package reactor.core.publisher;

import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
import java.util.concurrent.atomic.AtomicLongFieldUpdater;
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
import java.util.stream.Stream;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;
import reactor.core.Exceptions;
import reactor.core.Receiver;
import reactor.core.Scannable;
import reactor.core.publisher.EventLoopProcessor;
import reactor.core.publisher.RingBuffer;
import reactor.util.concurrent.QueueSupplier;

/* loaded from: input_file:reactor/core/publisher/EmitterProcessor.class */
public final class EmitterProcessor<T> extends FluxProcessor<T, T> implements Receiver {
    final int maxConcurrency;
    final int bufferSize;
    final int limit;
    final boolean autoCancel;
    Subscription upstreamSubscription;
    private volatile RingBuffer<EventLoopProcessor.Slot<T>> emitBuffer;
    private volatile boolean done;
    private volatile Throwable error;
    volatile EmitterInner<?>[] subscribers;
    private volatile int running;
    private volatile int outstanding;
    boolean firstDrain = true;
    static final EmitterInner<?>[] EMPTY = new EmitterInner[0];
    static final EmitterInner<?>[] CANCELLED = new EmitterInner[0];
    static final AtomicReferenceFieldUpdater<EmitterProcessor, Throwable> ERROR = AtomicReferenceFieldUpdater.newUpdater(EmitterProcessor.class, Throwable.class, "error");
    static final AtomicReferenceFieldUpdater<EmitterProcessor, EmitterInner[]> SUBSCRIBERS = AtomicReferenceFieldUpdater.newUpdater(EmitterProcessor.class, EmitterInner[].class, "subscribers");
    static final AtomicIntegerFieldUpdater<EmitterProcessor> RUNNING = AtomicIntegerFieldUpdater.newUpdater(EmitterProcessor.class, "running");
    static final AtomicIntegerFieldUpdater<EmitterProcessor> OUTSTANDING = AtomicIntegerFieldUpdater.newUpdater(EmitterProcessor.class, "outstanding");

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:reactor/core/publisher/EmitterProcessor$EmitterInner.class */
    public static final class EmitterInner<T> implements InnerProducer<T> {
        static final long MASK_NOT_SUBSCRIBED = Long.MIN_VALUE;
        final EmitterProcessor<T> parent;
        final Subscriber<? super T> actual;
        volatile boolean done;
        boolean unbounded = false;
        private volatile long requested = MASK_NOT_SUBSCRIBED;
        volatile RingBuffer.Sequence pollCursor;
        static final AtomicLongFieldUpdater<EmitterInner> REQUESTED = AtomicLongFieldUpdater.newUpdater(EmitterInner.class, "requested");
        static final AtomicReferenceFieldUpdater<EmitterInner, RingBuffer.Sequence> CURSOR = AtomicReferenceFieldUpdater.newUpdater(EmitterInner.class, RingBuffer.Sequence.class, "pollCursor");

        EmitterInner(EmitterProcessor<T> emitterProcessor, Subscriber<? super T> subscriber) {
            this.actual = subscriber;
            this.parent = emitterProcessor;
        }

        @Override // org.reactivestreams.Subscription
        public void request(long j) {
            if (Operators.checkRequest(j, this.actual)) {
                Operators.getAndAddCap(REQUESTED, this, j);
                if (EmitterProcessor.RUNNING.getAndIncrement(this.parent) == 0) {
                    this.parent.drainLoop();
                }
            }
        }

        @Override // org.reactivestreams.Subscription
        public void cancel() {
            this.done = true;
            this.parent.drain();
        }

        @Override // reactor.core.publisher.InnerProducer, reactor.core.Scannable
        public final Object scan(Scannable.Attr attr) {
            switch (attr) {
                case PARENT:
                    return this.parent;
                case BUFFERED:
                    return Long.valueOf((this.pollCursor == null || this.done) ? -1L : ((EmitterProcessor) this.parent).emitBuffer.getCursor() - this.pollCursor.getAsLong());
                case CANCELLED:
                case TERMINATED:
                    return Boolean.valueOf(this.done);
                default:
                    return super.scan(attr);
            }
        }

        void startTracking(long j) {
            RingBuffer.Sequence newSequence = RingBuffer.newSequence(j - 1);
            if (CURSOR.compareAndSet(this, null, newSequence)) {
                ((EmitterProcessor) this.parent).emitBuffer.addGatingSequence(newSequence);
            }
        }

        void start() {
            if (REQUESTED.compareAndSet(this, MASK_NOT_SUBSCRIBED, 0L)) {
                RingBuffer ringBuffer = ((EmitterProcessor) this.parent).emitBuffer;
                if (ringBuffer != null) {
                    startTracking(Math.max(0L, ringBuffer.getMinimumGatingSequence() + 1));
                }
                this.actual.onSubscribe(this);
            }
        }

        @Override // reactor.core.publisher.InnerProducer
        public Subscriber<? super T> actual() {
            return this.actual;
        }
    }

    public static <E> EmitterProcessor<E> create() {
        return create(true);
    }

    public static <E> EmitterProcessor<E> create(boolean z) {
        return create(QueueSupplier.SMALL_BUFFER_SIZE, z);
    }

    public static <E> EmitterProcessor<E> create(int i) {
        return create(i, Integer.MAX_VALUE);
    }

    public static <E> EmitterProcessor<E> create(int i, int i2) {
        return create(i, i2, true);
    }

    public static <E> EmitterProcessor<E> create(int i, boolean z) {
        return create(i, Integer.MAX_VALUE, z);
    }

    public static <E> EmitterProcessor<E> create(int i, int i2, boolean z) {
        return new EmitterProcessor<>(z, i2, i);
    }

    @Override // reactor.core.Receiver
    public Subscription upstream() {
        return this.upstreamSubscription;
    }

    EmitterProcessor(boolean z, int i, int i2) {
        if (i2 < 1) {
            throw new IllegalArgumentException("bufferSize must be strictly positive, was: " + i2);
        }
        this.autoCancel = z;
        this.maxConcurrency = i;
        this.bufferSize = i2;
        this.limit = Math.max(1, i2 / 2);
        OUTSTANDING.lazySet(this, i2);
        SUBSCRIBERS.lazySet(this, EMPTY);
    }

    @Override // reactor.core.publisher.FluxProcessor, reactor.core.Scannable
    public Stream<? extends Scannable> inners() {
        return Stream.of((Object[]) this.subscribers);
    }

    @Override // reactor.core.publisher.FluxProcessor, org.reactivestreams.Publisher
    public void subscribe(Subscriber<? super T> subscriber) {
        if (subscriber == null) {
            throw Exceptions.argumentIsNullException();
        }
        EmitterInner<T> emitterInner = new EmitterInner<>(this, subscriber);
        if (!addInner(emitterInner)) {
            Operators.complete(emitterInner.actual);
        } else if (this.upstreamSubscription != null) {
            emitterInner.start();
        }
    }

    @Override // reactor.core.publisher.FluxProcessor
    public EmitterProcessor<T> connect() {
        onSubscribe(Operators.emptySubscription());
        return this;
    }

    @Override // reactor.core.Trackable
    public long getPending() {
        if (this.emitBuffer == null) {
            return -1L;
        }
        return this.emitBuffer.getPending();
    }

    @Override // org.reactivestreams.Subscriber
    public void onNext(T t) {
        if (t == null) {
            throw Exceptions.argumentIsNullException();
        }
        EmitterInner<?>[] emitterInnerArr = this.subscribers;
        if (emitterInnerArr == CANCELLED) {
            return;
        }
        int length = emitterInnerArr.length;
        if (length == 0) {
            buffer((EmitterProcessor<T>) t);
            return;
        }
        long j = -1;
        if (this.upstreamSubscription != Operators.emptySubscription()) {
            if (this.outstanding == 0) {
                buffer((EmitterProcessor<T>) t);
                drain();
                return;
            }
            OUTSTANDING.decrementAndGet(this);
        }
        for (int i = 0; i < length; i++) {
            EmitterInner<?> emitterInner = emitterInnerArr[i];
            if (emitterInner.done) {
                removeInner(emitterInner, this.autoCancel ? CANCELLED : EMPTY);
                if (this.autoCancel && this.subscribers == CANCELLED) {
                    if (RUNNING.compareAndSet(this, 0, 1)) {
                        cancel();
                        return;
                    }
                    return;
                }
            } else {
                long j2 = ((EmitterInner) emitterInner).requested;
                emitterInner.unbounded = j2 == Long.MAX_VALUE;
                RingBuffer.Sequence sequence = emitterInner.unbounded ? null : emitterInner.pollCursor;
                if (j2 > 0 && sequence == null) {
                    if (j2 != Long.MAX_VALUE) {
                        EmitterInner.REQUESTED.decrementAndGet(emitterInner);
                    }
                    emitterInner.actual.onNext(t);
                } else if (j == -1) {
                    j = buffer((EmitterProcessor<T>) t);
                    startAllTrackers(emitterInnerArr, j, i + 1);
                } else if (sequence == null) {
                    emitterInner.startTracking(j);
                }
            }
        }
        if (RUNNING.getAndIncrement(this) != 0) {
            return;
        }
        drainLoop();
    }

    @Override // org.reactivestreams.Subscriber
    public void onError(Throwable th) {
        if (th == null) {
            throw Exceptions.argumentIsNullException();
        }
        if (this.done) {
            Operators.onErrorDropped(th);
        }
        reportError(th);
        this.done = true;
        drain();
    }

    @Override // org.reactivestreams.Subscriber
    public void onComplete() {
        if (this.done) {
            return;
        }
        this.done = true;
        drain();
    }

    @Override // org.reactivestreams.Subscriber
    public void onSubscribe(Subscription subscription) {
        if (Operators.validate(this.upstreamSubscription, subscription)) {
            this.upstreamSubscription = subscription;
            EmitterInner<?>[] emitterInnerArr = this.subscribers;
            if (emitterInnerArr == CANCELLED || emitterInnerArr.length == 0) {
                return;
            }
            for (EmitterInner<?> emitterInner : emitterInnerArr) {
                emitterInner.start();
            }
        }
    }

    @Override // reactor.core.publisher.FluxProcessor, reactor.core.Trackable
    public Throwable getError() {
        return this.error;
    }

    @Override // reactor.core.Trackable
    public boolean isCancelled() {
        return this.subscribers == CANCELLED;
    }

    @Override // reactor.core.publisher.FluxProcessor
    public final int getBufferSize() {
        return this.bufferSize;
    }

    @Override // reactor.core.publisher.FluxProcessor, reactor.core.Trackable
    public boolean isStarted() {
        return this.upstreamSubscription != null;
    }

    @Override // reactor.core.publisher.FluxProcessor, reactor.core.Trackable
    public boolean isTerminated() {
        return this.done && (this.emitBuffer == null || this.emitBuffer.getPending() == 0);
    }

    @Override // reactor.core.publisher.FluxProcessor, reactor.core.Scannable
    public Object scan(Scannable.Attr attr) {
        switch (attr) {
            case PARENT:
                return this.upstreamSubscription;
            case BUFFERED:
                return Integer.valueOf(this.emitBuffer == null ? 0 : this.emitBuffer.getPending());
            case CANCELLED:
                return Boolean.valueOf(isCancelled());
            default:
                return super.scan(attr);
        }
    }

    RingBuffer<EventLoopProcessor.Slot<T>> getMainQueue() {
        RingBuffer<EventLoopProcessor.Slot<T>> ringBuffer = this.emitBuffer;
        if (ringBuffer == null) {
            ringBuffer = EventLoopProcessor.createSingleProducer(this.bufferSize);
            this.emitBuffer = ringBuffer;
        }
        return ringBuffer;
    }

    final long buffer(T t) {
        RingBuffer<EventLoopProcessor.Slot<T>> mainQueue = getMainQueue();
        long next = mainQueue.next();
        mainQueue.get(next).value = t;
        mainQueue.publish(next);
        return next;
    }

    final void drain() {
        if (RUNNING.getAndIncrement(this) == 0) {
            drainLoop();
        }
    }

    final void drainLoop() {
        long j;
        int i = 1;
        RingBuffer<EventLoopProcessor.Slot<T>> ringBuffer = null;
        do {
            EmitterInner<?>[] emitterInnerArr = this.subscribers;
            if (emitterInnerArr == CANCELLED) {
                cancel();
                return;
            }
            boolean z = this.done;
            if (z && emitterInnerArr == EMPTY) {
                return;
            }
            if (emitterInnerArr.length != 0) {
                for (EmitterInner<?> emitterInner : emitterInnerArr) {
                    long j2 = ((EmitterInner) emitterInner).requested;
                    if (emitterInner.done) {
                        removeInner(emitterInner, this.autoCancel ? CANCELLED : EMPTY);
                        if (this.autoCancel && this.subscribers == CANCELLED) {
                            cancel();
                            return;
                        }
                    } else {
                        if (ringBuffer == null) {
                            ringBuffer = this.emitBuffer;
                        }
                        RingBuffer.Sequence sequence = emitterInner.pollCursor;
                        if (emitterInner.unbounded || sequence == null || j2 <= 0) {
                            j = 0;
                        } else {
                            j = j2;
                            boolean z2 = j == Long.MAX_VALUE;
                            long asLong = sequence.getAsLong();
                            long cursor = ringBuffer.getCursor();
                            while (j != 0) {
                                asLong++;
                                if (cursor < asLong) {
                                    break;
                                }
                                T t = ringBuffer.get(asLong).value;
                                sequence.set(asLong);
                                emitterInner.actual.onNext(t);
                                if (!z2) {
                                    j--;
                                }
                            }
                            if (!z2 && j2 > j) {
                                EmitterInner.REQUESTED.addAndGet(emitterInner, j - j2);
                            }
                        }
                        if (z) {
                            checkTerminal(emitterInner, sequence, j);
                        }
                    }
                }
                if (!this.done && this.firstDrain) {
                    Subscription subscription = this.upstreamSubscription;
                    if (subscription != null) {
                        this.firstDrain = false;
                        subscription.request(this.bufferSize);
                    }
                } else if (ringBuffer != null) {
                    requestMore(ringBuffer.getPending());
                } else {
                    requestMore(0);
                }
            }
            i = RUNNING.addAndGet(this, -i);
        } while (i != 0);
    }

    final void checkTerminal(EmitterInner<T> emitterInner, RingBuffer.Sequence sequence, long j) {
        Throwable th = this.error;
        if ((th == null || j != 0) && sequence != null && !emitterInner.unbounded && sequence.getAsLong() < this.emitBuffer.getCursor()) {
            return;
        }
        removeInner(emitterInner, EMPTY);
        if (emitterInner.done) {
            return;
        }
        if (th == null) {
            emitterInner.actual.onComplete();
        } else {
            emitterInner.actual.onError(th);
        }
    }

    final void startAllTrackers(EmitterInner<?>[] emitterInnerArr, long j, int i) {
        for (int i2 = 0; i2 < i - 1; i2++) {
            if (emitterInnerArr[i2].pollCursor == null) {
                emitterInnerArr[i2].startTracking(j + 1);
            }
        }
        emitterInnerArr[i - 1].startTracking(j);
    }

    final void reportError(Throwable th) {
        ERROR.compareAndSet(this, null, th);
    }

    final boolean addInner(EmitterInner<T> emitterInner) {
        EmitterInner<?>[] emitterInnerArr;
        EmitterInner[] emitterInnerArr2;
        do {
            emitterInnerArr = this.subscribers;
            if (emitterInnerArr == CANCELLED) {
                return false;
            }
            int length = emitterInnerArr.length;
            if (length + 1 > this.maxConcurrency) {
                throw Exceptions.failWithOverflow();
            }
            emitterInnerArr2 = new EmitterInner[length + 1];
            System.arraycopy(emitterInnerArr, 0, emitterInnerArr2, 0, length);
            emitterInnerArr2[length] = emitterInner;
        } while (!SUBSCRIBERS.compareAndSet(this, emitterInnerArr, emitterInnerArr2));
        return true;
    }

    final void removeInner(EmitterInner<?> emitterInner, EmitterInner<?>[] emitterInnerArr) {
        EmitterInner<?>[] emitterInnerArr2;
        EmitterInner<?>[] emitterInnerArr3;
        do {
            emitterInnerArr2 = this.subscribers;
            if (emitterInnerArr2 == CANCELLED || emitterInnerArr2 == EMPTY) {
                return;
            }
            int length = emitterInnerArr2.length;
            int i = 0;
            int i2 = 0;
            while (true) {
                if (i2 >= length) {
                    break;
                }
                if (emitterInnerArr2[i2] == emitterInner) {
                    i = i2;
                    break;
                }
                i2++;
            }
            if (length == 1) {
                emitterInnerArr3 = emitterInnerArr;
            } else {
                emitterInnerArr3 = new EmitterInner[length - 1];
                System.arraycopy(emitterInnerArr2, 0, emitterInnerArr3, 0, i);
                System.arraycopy(emitterInnerArr2, i + 1, emitterInnerArr3, i, (length - i) - 1);
            }
        } while (!SUBSCRIBERS.compareAndSet(this, emitterInnerArr2, emitterInnerArr3));
        RingBuffer.Sequence sequence = emitterInner.pollCursor;
        if (sequence != null) {
            getMainQueue().removeGatingSequence(sequence);
        }
    }

    final void requestMore(int i) {
        int i2;
        int i3;
        Subscription subscription = this.upstreamSubscription;
        if (subscription != Operators.emptySubscription() && i < this.bufferSize && (i2 = this.outstanding) <= this.limit && (i3 = (this.bufferSize - i2) - i) > 0 && subscription != null) {
            OUTSTANDING.addAndGet(this, i3);
            subscription.request(i3);
        }
    }

    final void cancel() {
        Subscription subscription;
        if (this.done || (subscription = this.upstreamSubscription) == null) {
            return;
        }
        this.upstreamSubscription = null;
        subscription.cancel();
    }

    @Override // reactor.core.publisher.FluxProcessor, reactor.core.MultiProducer
    public long downstreamCount() {
        return this.subscribers.length;
    }

    @Override // reactor.core.publisher.Flux
    public String toString() {
        return "{done: " + this.done + (this.error != null ? ", error: '" + this.error.getMessage() + "', " : "") + ", outstanding: " + this.outstanding + ", pending: " + this.emitBuffer + '}';
    }
}
