package org.df4j.core.boundconnector.reactivestream;

import java.util.ArrayDeque;
import java.util.Deque;
import java.util.Iterator;
import org.df4j.core.boundconnector.messagestream.StreamInput;
import org.df4j.core.tasknode.AsyncProc;

/* loaded from: input_file:org/df4j/core/boundconnector/reactivestream/ReactiveInput.class */
/**
 * A {@link StreamInput} that additionally acts as a {@link ReactiveSubscriber}.
 *
 * <p>On subscription it stores the {@link ReactiveSubscription} and requests
 * {@code capacity} tokens from it; every call to {@link #next()} then requests
 * one more token before delegating to the superclass.
 *
 * <p>{@link #post(Object)}, {@link #next()} and {@link #isClosed()} are
 * {@code synchronized} on this instance; {@link #onSubscribe(ReactiveSubscription)}
 * is not.
 *
 * @param <T> type of the tokens flowing through this input
 */
public class ReactiveInput<T> extends StreamInput<T> implements ReactiveSubscriber<T>, Iterator<T> {

    /**
     * Deque whose size is reported by {@link #size()} and checked against
     * {@link #capacity} in {@link #post(Object)}.
     * NOTE(review): nothing in this class adds to or removes from it; presumably
     * the superclass does -- confirm against StreamInput.
     */
    protected Deque<T> queue;

    /**
     * Read by {@link #isClosed()}. It is never set to {@code true} in this class.
     */
    protected boolean closeRequested = false;

    /** Upper bound for {@code queue.size()} and the initial demand requested on subscription. */
    protected int capacity;

    /** The subscription received in {@link #onSubscribe}; {@code null} until then. */
    protected ReactiveSubscription subscription;

    /**
     * Creates an input bound to the given node with an explicit capacity.
     *
     * @param asyncProc the node passed to the superclass constructor
     * @param capacity  initial size hint for the internal deque, the number of tokens
     *                  requested on subscription, and the limit enforced by {@link #post(Object)}
     */
    public ReactiveInput(AsyncProc asyncProc, int capacity) {
        super(asyncProc);
        // Diamond instead of the raw ArrayDeque: avoids an unchecked conversion to Deque<T>.
        this.queue = new ArrayDeque<>(capacity);
        this.capacity = capacity;
    }

    /**
     * Creates an input bound to the given node with a capacity of 8.
     *
     * @param asyncProc the node passed to the superclass constructor
     */
    public ReactiveInput(AsyncProc asyncProc) {
        this(asyncProc, 8);
    }

    /**
     * @return the current number of elements in {@link #queue}
     */
    @Override
    protected int size() {
        return this.queue.size();
    }

    /**
     * Stores the subscription and immediately requests {@code capacity} tokens from it.
     *
     * @param reactiveSubscription the subscription to remember and request from
     */
    @Override
    public void onSubscribe(ReactiveSubscription reactiveSubscription) {
        this.subscription = reactiveSubscription;
        reactiveSubscription.request(this.capacity);
    }

    /**
     * Accepts a token and hands it to the superclass via {@code super.complete(t)}.
     * NOTE(review): this delegates to {@code complete}, not {@code super.post} --
     * confirm that this is the intended superclass entry point.
     *
     * @param t the token to accept
     * @throws IllegalStateException if {@link #onSubscribe} has not been called yet,
     *                               or if {@code queue.size()} has reached {@code capacity}
     */
    @Override
    public synchronized void post(T t) {
        if (this.subscription == null) {
            throw new IllegalStateException("not yet subscribed");
        }
        if (this.queue.size() >= this.capacity) {
            throw new IllegalStateException("no space for next token");
        }
        super.complete(t);
    }

    /**
     * Requests one more token from the subscription, then returns the result of
     * {@code super.next()}.
     *
     * @return whatever {@code super.next()} returns
     * @throws IllegalStateException if {@link #onSubscribe} has not been called yet
     */
    @Override
    public synchronized T next() {
        // Fail with the same explicit error as post() rather than an accidental
        // NullPointerException when no subscription has been established.
        if (this.subscription == null) {
            throw new IllegalStateException("not yet subscribed");
        }
        // The request is issued before the element is taken, as in the original ordering.
        this.subscription.request(1L);
        return (T) super.next();
    }

    /**
     * @return {@code true} only when {@link #closeRequested} is set and the inherited
     *         {@code value} field is {@code null}
     */
    @Override
    public synchronized boolean isClosed() {
        return this.closeRequested && this.value == null;
    }
}
