package org.df4j.core.boundconnector.messagestream;

import java.util.HashSet;
import java.util.Set;
import java.util.function.Consumer;
import org.df4j.core.boundconnector.messagescalar.SimpleSubscription;
import org.df4j.core.tasknode.AsyncProc;

/* loaded from: input_file:org/df4j/core/boundconnector/messagestream/StreamOutput.class */
/**
 * Output connector that fans every posted message, completion signal or
 * failure out to all currently registered {@link StreamSubscriber}s.
 *
 * <p>The subscription set is guarded by this object's monitor. Delivery to
 * subscribers always runs over a snapshot of the set, so a subscriber may
 * cancel its subscription from inside a callback without disturbing the
 * iteration.
 *
 * @param <M> type of the messages passed to subscribers
 */
public class StreamOutput<M> extends AsyncProc.Lock implements StreamPublisher<M>, StreamCollector<M> {
    /** The node passed to the constructor. */
    protected AsyncProc actor;

    /** Registered subscriptions; set to {@code null} by {@link #close()}. Guarded by {@code this}. */
    protected Set<StreamOutput<M>.SimpleSubscriptionImpl> subscriptions;

    /**
     * Link between this output and a single subscriber. It is handed to the
     * subscriber through {@code onSubscribe} so that the subscriber can cancel.
     */
    public class SimpleSubscriptionImpl implements SimpleSubscription {
        /** Target of the forwarded signals; set to {@code null} by {@link #complete()}. */
        protected StreamSubscriber<? super M> subscriber;
        private volatile boolean closed = false;

        /**
         * @param streamSubscriber the subscriber that will receive forwarded signals
         */
        public SimpleSubscriptionImpl(StreamSubscriber<? super M> streamSubscriber) {
            this.subscriber = streamSubscriber;
        }

        /**
         * Forwards a message to the subscriber. Does nothing once
         * {@link #complete()} has released the subscriber.
         *
         * @param m the message to forward
         */
        public void post(M m) {
            // Read the field once: complete() may null it out.
            StreamSubscriber<? super M> target = this.subscriber;
            if (target == null) {
                return;
            }
            target.post(m);
        }

        /**
         * Forwards a failure to the subscriber (if it has not been released by
         * {@link #complete()}) and then cancels this subscription.
         *
         * @param th the failure to forward
         */
        public void postFailure(Throwable th) {
            StreamSubscriber<? super M> target = this.subscriber;
            if (target != null) {
                target.completeExceptionally(th);
            }
            cancel();
        }

        /**
         * Signals completion to the subscriber and releases the reference to
         * it. Repeated calls are no-ops.
         */
        public void complete() {
            StreamSubscriber<? super M> target = this.subscriber;
            if (target == null) {
                return;
            }
            target.complete();
            this.subscriber = null;
        }

        /**
         * Marks this subscription as closed and removes it from the enclosing
         * output's subscription set. Safe to call repeatedly and after the
         * enclosing output has been closed.
         *
         * @return always {@code false}
         */
        @Override // org.df4j.core.boundconnector.messagescalar.SimpleSubscription
        public boolean cancel() {
            synchronized (StreamOutput.this) {
                if (this.closed) {
                    return false;
                }
                this.closed = true;
                // close() replaces the set with null; nothing to remove in that case.
                Set<StreamOutput<M>.SimpleSubscriptionImpl> live = StreamOutput.this.subscriptions;
                if (live != null) {
                    live.remove(this);
                }
                // NOTE(review): false is returned even when the cancellation took effect.
                // Kept as-is for compatibility; confirm against the SimpleSubscription contract.
                return false;
            }
        }
    }

    /**
     * Creates an output connector with an empty subscription set.
     *
     * @param asyncProc the owning node; must not be {@code null}
     * @throws NullPointerException if {@code asyncProc} is {@code null}
     */
    public StreamOutput(AsyncProc asyncProc) {
        // NOTE(review): this source was decompiled; the original may have been the
        // qualified call asyncProc.super(false) - confirm whether AsyncProc.Lock is an inner class.
        super(false);
        if (asyncProc == null) {
            throw new NullPointerException("asyncProc");
        }
        this.subscriptions = new HashSet<>();
        this.actor = asyncProc;
    }

    /**
     * Registers a subscriber and hands it its subscription via {@code onSubscribe}.
     *
     * @param s the subscriber to register
     * @return the same subscriber, to allow chaining
     * @throws IllegalStateException if this output has already been closed
     */
    @Override // org.df4j.core.boundconnector.messagestream.StreamPublisher
    public <S extends StreamSubscriber<? super M>> S subscribe(S s) {
        StreamOutput<M>.SimpleSubscriptionImpl subscription = new SimpleSubscriptionImpl(s);
        // Mutate the set under the same monitor that cancel() and close() use.
        synchronized (this) {
            if (this.subscriptions == null) {
                throw new IllegalStateException("StreamOutput is closed");
            }
            this.subscriptions.add(subscription);
        }
        // Invoke the subscriber outside the lock.
        s.onSubscribe(subscription);
        return s;
    }

    /** Drops all subscriptions and turns the underlying lock off. */
    public synchronized void close() {
        this.subscriptions = null;
        super.turnOff();
    }

    /**
     * @return the value of {@code isBlocked()} of the underlying lock
     */
    public synchronized boolean closed() {
        return super.isBlocked();
    }

    /**
     * Applies {@code consumer} to every subscription registered at the moment
     * of the call. Does nothing if this output is closed.
     *
     * @param consumer action to run for each subscription
     */
    public void forEachSubscription(Consumer<? super StreamOutput<M>.SimpleSubscriptionImpl> consumer) {
        Set<StreamOutput<M>.SimpleSubscriptionImpl> snapshot;
        synchronized (this) {
            if (closed() || this.subscriptions == null) {
                return;
            }
            // Iterate a copy: the consumer may cancel subscriptions (see postFailure),
            // which would otherwise modify the set during iteration.
            snapshot = new HashSet<>(this.subscriptions);
        }
        snapshot.forEach(consumer);
    }

    /**
     * Forwards a message to every registered subscriber.
     *
     * @param m the message; must not be {@code null}
     * @throws NullPointerException if {@code m} is {@code null}
     */
    @Override // org.df4j.core.boundconnector.messagestream.StreamCollector
    public void post(M m) {
        if (m == null) {
            throw new NullPointerException();
        }
        forEachSubscription(subscription -> subscription.post(m));
    }

    /**
     * Forwards a failure to every registered subscriber; each subscription is
     * cancelled after it has been notified.
     *
     * @param th the failure to forward
     * @return always {@code false}
     */
    @Override // org.df4j.core.boundconnector.messagescalar.ScalarCollector
    public boolean completeExceptionally(Throwable th) {
        forEachSubscription(subscription -> subscription.postFailure(th));
        return false;
    }

    /** Signals completion to every registered subscriber. */
    @Override // org.df4j.core.boundconnector.messagestream.StreamCollector
    public synchronized void complete() {
        forEachSubscription(subscription -> subscription.complete());
    }
}
