package org.df4j.core.boundconnector.reactivestream;

import java.util.HashSet;
import java.util.Set;
import java.util.function.Consumer;
import org.df4j.core.boundconnector.messagestream.StreamCollector;
import org.df4j.core.boundconnector.permitstream.Semafor;
import org.df4j.core.tasknode.AsyncProc;

/* loaded from: input_file:org/df4j/core/boundconnector/reactivestream/ReactiveOutput.class */
public class ReactiveOutput<M> extends AsyncProc.Lock implements ReactivePublisher<M>, StreamCollector<M> {
    protected AsyncProc actor;
    protected Set<ReactiveOutput<M>.SimpleReactiveSubscriptionImpl> subscriptions;

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:org/df4j/core/boundconnector/reactivestream/ReactiveOutput$SimpleReactiveSubscriptionImpl.class */
    public class SimpleReactiveSubscriptionImpl extends Semafor implements ReactiveSubscription {
        protected ReactiveSubscriber<? super M> subscriber;
        private volatile boolean closed;

        public SimpleReactiveSubscriptionImpl(ReactiveSubscriber<? super M> reactiveSubscriber) {
            super(ReactiveOutput.this.actor);
            this.closed = false;
            if (reactiveSubscriber == null) {
                throw new NullPointerException();
            }
            this.subscriber = reactiveSubscriber;
        }

        public void post(M m) {
            if (isCompleted()) {
                throw new IllegalStateException("post to completed connector");
            }
            this.subscriber.post(m);
        }

        public void postFailure(Throwable th) {
            if (isCompleted()) {
                throw new IllegalStateException("completeExceptionally to completed connector");
            }
            this.subscriber.completeExceptionally(th);
            cancel();
        }

        @Override // org.df4j.core.boundconnector.permitstream.Semafor, org.df4j.core.tasknode.AsyncProc.Lock
        public void purge() {
        }

        public void complete() {
            if (isCompleted()) {
                return;
            }
            this.subscriber.complete();
            this.subscriber = null;
        }

        private boolean isCompleted() {
            return this.subscriber == null;
        }

        @Override // org.df4j.core.boundconnector.messagescalar.SimpleSubscription
        public synchronized boolean cancel() {
            if (this.closed) {
                return false;
            }
            this.closed = true;
            ReactiveOutput.this.subscriptions.remove(this);
            super.unRegister();
            return false;
        }

        @Override // org.df4j.core.boundconnector.reactivestream.ReactiveSubscription
        public void request(long j) {
            super.release(j);
        }
    }

    /* JADX WARN: 'super' call moved to the top of the method (can break code semantics) */
    public ReactiveOutput(AsyncProc asyncProc) {
        super(false);
        asyncProc.getClass();
        this.subscriptions = new HashSet();
        this.actor = asyncProc;
    }

    @Override // org.df4j.core.boundconnector.reactivestream.ReactivePublisher
    public <S extends ReactiveSubscriber<? super M>> S subscribe(S s) {
        ReactiveOutput<M>.SimpleReactiveSubscriptionImpl simpleReactiveSubscriptionImpl = new SimpleReactiveSubscriptionImpl(s);
        this.subscriptions.add(simpleReactiveSubscriptionImpl);
        s.onSubscribe(simpleReactiveSubscriptionImpl);
        return s;
    }

    public synchronized void close() {
        this.subscriptions = null;
        super.turnOff();
    }

    public synchronized boolean closed() {
        return super.isBlocked();
    }

    public void forEachSubscription(Consumer<? super ReactiveOutput<M>.SimpleReactiveSubscriptionImpl> consumer) {
        if (closed()) {
            return;
        }
        this.subscriptions.forEach(consumer);
    }

    @Override // org.df4j.core.boundconnector.messagestream.StreamCollector
    public void post(M m) {
        forEachSubscription(simpleReactiveSubscriptionImpl -> {
            simpleReactiveSubscriptionImpl.post(m);
        });
    }

    @Override // org.df4j.core.boundconnector.messagestream.StreamCollector
    public synchronized void complete() {
        forEachSubscription((v0) -> {
            v0.complete();
        });
    }

    @Override // org.df4j.core.boundconnector.messagescalar.ScalarCollector
    public boolean completeExceptionally(Throwable th) {
        forEachSubscription(simpleReactiveSubscriptionImpl -> {
            simpleReactiveSubscriptionImpl.postFailure(th);
        });
        return false;
    }
}
