package io.servicetalk.concurrent.api;

import io.servicetalk.concurrent.PublisherSource;
import io.servicetalk.concurrent.api.MulticastUtils;
import io.servicetalk.concurrent.internal.ConcurrentSubscription;
import io.servicetalk.concurrent.internal.DuplicateSubscribeException;
import io.servicetalk.concurrent.internal.EmptySubscription;
import io.servicetalk.concurrent.internal.FlowControlUtils;
import io.servicetalk.concurrent.internal.SubscriberUtils;
import io.servicetalk.concurrent.internal.TerminalNotification;
import java.util.Iterator;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.CancellationException;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
import java.util.concurrent.atomic.AtomicLongFieldUpdater;
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
import java.util.function.Consumer;
import javax.annotation.Nullable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* JADX INFO: Access modifiers changed from: package-private */
/* loaded from: input_file:io/servicetalk/concurrent/api/AbstractPublisherGroupBy.class */
public abstract class AbstractPublisherGroupBy<Key, T> extends AbstractSynchronousPublisherOperator<T, GroupedPublisher<Key, T>> {
    /** Logger shared by this operator and its nested subscriber/sink classes. */
    private static final Logger LOGGER = LoggerFactory.getLogger(AbstractPublisherGroupBy.class);
    /** Group-count hint supplied at construction; validated to be strictly positive. */
    final int initialCapacityForGroups;
    /** Queue size supplied at construction; validated to be strictly positive. */
    final int groupQueueSize;

    /* loaded from: input_file:io/servicetalk/concurrent/api/AbstractPublisherGroupBy$AbstractSourceSubscriber.class */
    static abstract class AbstractSourceSubscriber<Key, T> implements PublisherSource.Subscriber<T>, PublisherSource.Subscription {
        // Atomic updaters for the volatile fields below; they are created in the static initializer of this class.
        private static final AtomicLongFieldUpdater<AbstractSourceSubscriber> groupRequestedUpdater;
        private static final AtomicIntegerFieldUpdater<AbstractSourceSubscriber> subscriberStateUpdater;
        private static final AtomicReferenceFieldUpdater<AbstractSourceSubscriber, Throwable> cancelCauseUpdater;
        // Set to true by cancelSourceFromSource(...); onNext/onError ignore signals once it is set.
        // Not volatile. NOTE(review): presumably only touched from the source's signal thread - confirm.
        private boolean terminatedPrematurely;

        // First cause handed to cancelSourceFromExternal(...); written once via compare-and-set from null.
        @Nullable
        private volatile Throwable cancelCause;
        // Outstanding demand for groups: increased by request(long), decreased while pending groups are drained.
        private volatile long groupRequested;
        // Emission state for target. This class moves it 0 -> 1 while it delivers on a cancel path (then back to 0)
        // and 0 -> 2 right before a direct terminal signal; it is also handed to MulticastUtils.drainToSubscriber.
        private volatile int subscriberState;

        // Upstream subscription, wrapped with ConcurrentSubscription in onSubscribe; null until then.
        @Nullable
        private volatile PublisherSource.Subscription subscription;

        // Groups (and a terminal notification) waiting to be delivered to target; null until a queue is created.
        @Nullable
        private volatile MulticastUtils.SpscQueue<GroupedPublisher<Key, T>> groupQueue;
        // Stored by the constructor; not read by any code visible in this class.
        private final Executor executor;
        // Downstream subscriber that receives the grouped publishers.
        private final PublisherSource.Subscriber<? super GroupedPublisher<Key, T>> target;
        // Active group sinks by key, backed by a ConcurrentHashMap.
        private final Map<Key, GroupSink<Key, T>> groups;
        // Decompiled form of the compiler's assertion switch; true when assertions are disabled for the class.
        static final /* synthetic */ boolean $assertionsDisabled;

        /* JADX INFO: Access modifiers changed from: package-private */
        /**
         * Creates a source subscriber that forwards grouped publishers to {@code subscriber}.
         *
         * @param executor   executor stored on this instance.
         * @param i          initial capacity for the map of active groups.
         * @param subscriber downstream subscriber that receives every new {@link GroupedPublisher}.
         */
        public AbstractSourceSubscriber(Executor executor, int i, PublisherSource.Subscriber<? super GroupedPublisher<Key, T>> subscriber) {
            this.executor = executor;
            this.target = subscriber;
            // Diamond instead of a raw type: same map (load factor 1, concurrency level 1), no unchecked warning.
            this.groups = new ConcurrentHashMap<>(i, 1.0f, 1);
        }

        /**
         * Accepts the upstream subscription, wraps it for concurrent use and hands this object to the
         * downstream subscriber as its subscription. Nothing is stored or forwarded when the duplicate
         * check fails.
         *
         * @param subscription the subscription offered by the source.
         */
        public final void onSubscribe(PublisherSource.Subscription subscription) {
            if (!SubscriberUtils.checkDuplicateSubscription(this.subscription, subscription)) {
                return;
            }
            this.subscription = ConcurrentSubscription.wrap(subscription);
            target.onSubscribe(this);
        }

        /**
         * Handles one item from the source. Invoked by {@link #onNext(Object)} for every item received
         * while this subscriber has not been terminated prematurely.
         *
         * @param t the item emitted by the source (may be {@code null}).
         */
        abstract void onNext0(@Nullable T t);

        /**
         * Supplies the size used when this subscriber has to create its queue of pending groups.
         *
         * @return the size passed to the {@code SpscQueue} constructor.
         */
        abstract int groupQueueSize();

        /* JADX INFO: Access modifiers changed from: package-private */
        /* JADX WARN: Code restructure failed: missing block: B:36:0x0133, code lost:
        
            r9.target.onNext(r12.groupedPublisher);
         */
        /*
            Code decompiled incorrectly, please refer to instructions dump.
            To view partially-correct add '--show-bad-code' argument
        */
        /**
         * Decompiler placeholder: the bytecode of this method could not be turned back into Java, so as
         * written it throws {@link UnsupportedOperationException} on every call.
         *
         * NOTE(review): judging by the name and by the one instruction quoted in the decompiler warning
         * that precedes this method ({@code target.onNext(groupedPublisher)}), the real implementation
         * presumably delivers {@code r11} to the group identified by {@code r10} and announces new groups
         * to the downstream subscriber - restore the body from the original sources before use.
         *
         * @param r10 the group key.
         * @param r11 the item for that group (may be {@code null}).
         * @throws UnsupportedOperationException always, in this decompiled form.
         */
        public final void onNextGroup(Key r10, @javax.annotation.Nullable T r11) {
            /*
                Method dump skipped, instructions count: 585
                To view this dump add '--comments-level debug' option
            */
            throw new UnsupportedOperationException("Method not decompiled: io.servicetalk.concurrent.api.AbstractPublisherGroupBy.AbstractSourceSubscriber.onNextGroup(java.lang.Object, java.lang.Object):void");
        }

        /**
         * Forwards an item to {@link #onNext0(Object)} unless this subscriber was terminated prematurely,
         * in which case the item is dropped.
         *
         * @param t the item emitted by the source.
         */
        public final void onNext(T t) {
            if (!terminatedPrematurely) {
                onNext0(t);
            }
        }

        /**
         * Handles an error from the source. The error is ignored after premature termination. Otherwise it
         * is delivered straight to the downstream subscriber (and then to every group) when there is no
         * queue, or when the queue is empty and the state could be moved from 0 to 2; in all other cases it
         * is appended to the queue as a terminal notification and the queue is drained.
         *
         * @param th the error signalled by the source.
         */
        public final void onError(Throwable th) {
            if (terminatedPrematurely) {
                return;
            }
            final MulticastUtils.SpscQueue<GroupedPublisher<Key, T>> pendingGroups = groupQueue;
            // Same short-circuit order as before: null check, emptiness check, then the state CAS.
            final boolean deliverDirectly = pendingGroups == null
                    || (pendingGroups.isEmpty() && subscriberStateUpdater.compareAndSet(this, 0, 2));
            if (deliverDirectly) {
                try {
                    target.onError(th);
                } finally {
                    sendErrorToAllGroups(th);
                }
                return;
            }
            pendingGroups.addTerminal(TerminalNotification.error(th));
            drainPendingGroupsFromSource(pendingGroups);
        }

        /**
         * Handles completion of the source. Completion is delivered straight to the downstream subscriber
         * (and then to every group) when there is no queue, or when the queue is empty and the state could
         * be moved from 0 to 2; otherwise it is appended to the queue as a terminal notification and the
         * queue is drained.
         */
        public final void onComplete() {
            // Once cancelSourceFromSource(...) has terminated this subscriber with an error, a late
            // completion from the source must be dropped (as onNext/onError already do); otherwise the
            // downstream subscriber could observe a second terminal signal.
            if (this.terminatedPrematurely) {
                return;
            }
            MulticastUtils.SpscQueue<GroupedPublisher<Key, T>> spscQueue = this.groupQueue;
            if (spscQueue != null && (!spscQueue.isEmpty() || !subscriberStateUpdater.compareAndSet(this, 0, 2))) {
                spscQueue.addTerminal(TerminalNotification.complete());
                drainPendingGroupsFromSource(spscQueue);
            } else {
                try {
                    this.target.onComplete();
                } finally {
                    sendCompleteToAllGroups();
                }
            }
        }

        /**
         * Handles demand for groups from the downstream subscriber. An invalid amount is passed to the
         * upstream subscription unchanged. A valid amount is added to the outstanding group demand; if a
         * queue of pending groups exists it is drained first and only the part of {@code j} that the drain
         * did not satisfy is requested upstream, otherwise {@code j} is requested upstream as is.
         *
         * @param j the number of groups requested.
         */
        public final void request(long j) {
            final PublisherSource.Subscription upstream = this.subscription;
            if (!$assertionsDisabled && upstream == null) {
                throw new AssertionError("Subscription can not be null");
            }
            if (SubscriberUtils.isRequestNValid(j)) {
                groupRequestedUpdater.accumulateAndGet(this, j, FlowControlUtils::addWithOverflowProtection);
                final MulticastUtils.SpscQueue<GroupedPublisher<Key, T>> pendingGroups = this.groupQueue;
                if (pendingGroups != null) {
                    final long delivered = drainPendingGroupsFromSubscription(pendingGroups);
                    // A negative result, or one that covers the whole request, needs no upstream demand.
                    if (delivered >= 0 && delivered < j) {
                        upstream.request(j - delivered);
                    }
                    return;
                }
            }
            upstream.request(j);
        }

        /**
         * Requests {@code j} items from the upstream subscription on behalf of a group.
         *
         * @param j the amount to request; passed upstream without validation.
         */
        final void requestFromGroup(long j) {
            final PublisherSource.Subscription upstream = this.subscription;
            if (!$assertionsDisabled && upstream == null) {
                throw new AssertionError("Subscription can not be null");
            }
            upstream.request(j);
        }

        /**
         * Cancels the source on behalf of the downstream subscriber, using a {@link CancellationException}
         * as the cause reported to the groups.
         */
        public final void cancel() {
            final CancellationException cause =
                    new CancellationException("Group subscriber cancelled its subscription");
            cancelSourceFromExternal(cause);
        }

        /**
         * Cancels the upstream subscription. The first caller to record a cancel cause, if it can also move
         * the state from 0 to 1, fails every group with {@code th}, resets the state to 0 and drains any
         * pending groups.
         *
         * @param th the cause reported to the groups.
         */
        final void cancelSourceFromExternal(Throwable th) {
            final PublisherSource.Subscription upstream = this.subscription;
            if (!$assertionsDisabled && upstream == null) {
                throw new AssertionError("Subscription can not be null");
            }
            upstream.cancel();
            if (cancelCauseUpdater.compareAndSet(this, null, th) && subscriberStateUpdater.compareAndSet(this, 0, 1)) {
                try {
                    sendErrorToAllGroups(th);
                } finally {
                    // Always hand the state back, even if a group throws; otherwise it would stay at 1 and
                    // no later signal could ever reach the downstream subscriber. This mirrors the cleanup
                    // in cancelSourceFromSource(...).
                    this.subscriberState = 0;
                }
                final MulticastUtils.SpscQueue<GroupedPublisher<Key, T>> pendingGroups = this.groupQueue;
                if (pendingGroups != null) {
                    drainPendingGroupsFromSubscription(pendingGroups);
                }
            }
        }

        /**
         * Removes a group from the map of active groups, but only while its key is still mapped to this
         * exact sink.
         *
         * @param groupSink the sink to remove.
         */
        final void removeGroup(GroupSink<Key, T> groupSink) {
            final Key groupKey = groupSink.groupedPublisher.key();
            groups.remove(groupKey, groupSink);
        }

        /**
         * Terminates the source with {@code th}, passing {@code true} for the flag of the three-argument
         * overload and the current queue of pending groups.
         *
         * @param th the error to propagate.
         */
        final void cancelSourceFromSource(Throwable th) {
            cancelSourceFromSource(true, th, this.groupQueue);
        }

        /* JADX INFO: Access modifiers changed from: package-private */
        /**
         * Terminates the source with {@code th}, using the current queue of pending groups.
         *
         * @param z  forwarded unchanged; when {@code true} the state is neither acquired nor reset here.
         * @param th the error to propagate.
         */
        public final void cancelSourceFromSource(boolean z, Throwable th) {
            final MulticastUtils.SpscQueue<GroupedPublisher<Key, T>> pendingGroups = this.groupQueue;
            cancelSourceFromSource(z, th, pendingGroups);
        }

        /**
         * Terminates this subscriber with an error: marks it as terminated prematurely, cancels the
         * upstream subscription and then propagates {@code th}.
         * <ul>
         *   <li>If groups are still queued, the error is appended to the queue and the queue is drained.</li>
         *   <li>Otherwise, if {@code z} is {@code true} or the state can be moved from 0 to 1, the error is
         *   delivered to the downstream subscriber and then to every group.</li>
         *   <li>Otherwise the error is queued (creating the queue if necessary) and the queue is drained.</li>
         * </ul>
         *
         * @param z         {@code true} to deliver without acquiring the state; the state is then left
         *                  untouched afterwards.
         * @param th        the error to propagate.
         * @param spscQueue the current queue of pending groups, or {@code null} if none exists yet.
         */
        private void cancelSourceFromSource(boolean z, Throwable th, @Nullable MulticastUtils.SpscQueue<GroupedPublisher<Key, T>> spscQueue) {
            final PublisherSource.Subscription upstream = this.subscription;
            if (!$assertionsDisabled && upstream == null) {
                throw new AssertionError("Subscription can not be null in cancel()");
            }
            this.terminatedPrematurely = true;
            upstream.cancel();

            if (spscQueue != null && !spscQueue.isEmpty()) {
                // Groups are still pending: the error must be delivered after them.
                spscQueue.addTerminal(TerminalNotification.error(th));
                drainPendingGroupsFromSource(spscQueue);
                return;
            }
            if (!z && !subscriberStateUpdater.compareAndSet(this, 0, 1)) {
                // The state could not be acquired: leave the error in the queue and drain.
                if (spscQueue == null) {
                    spscQueue = new MulticastUtils.SpscQueue<>(groupQueueSize());
                    this.groupQueue = spscQueue;
                }
                spscQueue.addTerminal(TerminalNotification.error(th));
                drainPendingGroupsFromSource(spscQueue);
                return;
            }
            // The decompiled form repeated the cleanup in every branch, so a failure while notifying groups
            // was logged as an onError failure and the groups were notified again. With try/catch/finally
            // the groups are notified exactly once and the state is always handed back.
            try {
                this.target.onError(th);
            } catch (Throwable onErrorFailure) {
                AbstractPublisherGroupBy.LOGGER.error("Subscriber {} threw from onError for exception {}", this.target, th, onErrorFailure);
            } finally {
                try {
                    sendErrorToAllGroups(th);
                } finally {
                    if (!z) {
                        this.subscriberState = 0;
                    }
                }
            }
        }

        /**
         * Error handler used while draining on behalf of the downstream subscription: cancels the upstream
         * subscription, fails every group with {@code th} and logs the error. With assertions enabled it
         * also checks that the state is not 0 at this point.
         *
         * @param th the exception that was thrown.
         */
        private void cancelSourceFromSubscription(Throwable th) {
            final PublisherSource.Subscription upstream = this.subscription;
            if (!$assertionsDisabled && upstream == null) {
                throw new AssertionError("Subscription can not be null in cancel()");
            }
            upstream.cancel();
            if (!$assertionsDisabled && subscriberState == 0) {
                throw new AssertionError();
            }
            sendErrorToAllGroups(th);
            LOGGER.error("Unexpected exception thrown from subscriber", th);
        }

        /**
         * Drains pending groups, handing any error reported by the drain to
         * {@link #cancelSourceFromSource(Throwable)}.
         *
         * @param spscQueue the queue of pending groups.
         */
        private void drainPendingGroupsFromSource(MulticastUtils.SpscQueue<GroupedPublisher<Key, T>> spscQueue) {
            drainPendingGroups(spscQueue, cause -> cancelSourceFromSource(cause));
        }

        /**
         * Drains pending groups, handing any error reported by the drain to
         * {@link #cancelSourceFromSubscription(Throwable)}.
         *
         * @param spscQueue the queue of pending groups.
         * @return the value returned by the drain.
         */
        private long drainPendingGroupsFromSubscription(MulticastUtils.SpscQueue<GroupedPublisher<Key, T>> spscQueue) {
            final long drained = drainPendingGroups(spscQueue, cause -> cancelSourceFromSubscription(cause));
            return drained;
        }

        /**
         * Delegates to {@code MulticastUtils.drainToSubscriber} to deliver queued groups to the downstream
         * subscriber. The helper is given the state updater, a supplier of the current group demand, a
         * terminal handler that completes or fails every group, the supplied error handler and a callback
         * that reduces the group demand.
         *
         * @param spscQueue the queue of pending groups.
         * @param consumer  the error handler handed to the drain helper.
         * @return the value returned by the drain helper.
         */
        private long drainPendingGroups(MulticastUtils.SpscQueue<GroupedPublisher<Key, T>> spscQueue, Consumer<Throwable> consumer) {
            return MulticastUtils.drainToSubscriber(
                    spscQueue,
                    target,
                    subscriberStateUpdater,
                    () -> groupRequestedUpdater.get(this),
                    terminal -> {
                        final Throwable failure = terminal.cause();
                        if (failure != null) {
                            sendErrorToAllGroups(failure);
                        } else {
                            sendCompleteToAllGroups();
                        }
                    },
                    consumer,
                    this::drainPendingGroupsDecrementRequestN,
                    this);
        }

        /**
         * Subtracts {@code drainedCount} from the outstanding group demand.
         *
         * @param drainedCount the amount to subtract; expected to be positive (checked only when assertions
         *                     are enabled).
         */
        private void drainPendingGroupsDecrementRequestN(int drainedCount) {
            if (!$assertionsDisabled && drainedCount <= 0) {
                throw new AssertionError();
            }
            groupRequestedUpdater.addAndGet(this, -drainedCount);
        }

        /**
         * Fails every active group with {@code th}. Each sink is removed from the map before it is
         * notified.
         *
         * @param th the error delivered to each group.
         */
        private void sendErrorToAllGroups(Throwable th) {
            for (Iterator<GroupSink<Key, T>> sinks = groups.values().iterator(); sinks.hasNext();) {
                final GroupSink<Key, T> sink = sinks.next();
                sinks.remove();
                sink.onError(th);
            }
        }

        /**
         * Completes every active group. Each sink is removed from the map before it is notified.
         */
        private void sendCompleteToAllGroups() {
            for (Iterator<GroupSink<Key, T>> sinks = groups.values().iterator(); sinks.hasNext();) {
                final GroupSink<Key, T> sink = sinks.next();
                sinks.remove();
                sink.onComplete();
            }
        }

        // Class initialisation: derives the assertion switch from the outer class and creates the atomic
        // updaters for the volatile fields "groupRequested", "subscriberState" and "cancelCause".
        static {
            $assertionsDisabled = !AbstractPublisherGroupBy.class.desiredAssertionStatus();
            groupRequestedUpdater = AtomicLongFieldUpdater.newUpdater(AbstractSourceSubscriber.class, "groupRequested");
            subscriberStateUpdater = AtomicIntegerFieldUpdater.newUpdater(AbstractSourceSubscriber.class, "subscriberState");
            cancelCauseUpdater = AtomicReferenceFieldUpdater.newUpdater(AbstractSourceSubscriber.class, Throwable.class, "cancelCause");
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:io/servicetalk/concurrent/api/AbstractPublisherGroupBy$GroupSink.class */
    public static final class GroupSink<Key, T> extends MulticastUtils.IndividualMulticastSubscriber<T> {
        final GroupedPublisher<Key, T> groupedPublisher;
        private final AbstractSourceSubscriber<Key, T> sourceSubscriber;

        GroupSink(Executor executor, Key key, int i, AbstractSourceSubscriber<Key, T> abstractSourceSubscriber) {
            super(i);
            this.sourceSubscriber = abstractSourceSubscriber;
            this.groupedPublisher = new GroupedPublisherSource<Key, T>(executor, key) { // from class: io.servicetalk.concurrent.api.AbstractPublisherGroupBy.GroupSink.1
                @Override // io.servicetalk.concurrent.api.Publisher
                protected void handleSubscribe(PublisherSource.Subscriber<? super T> subscriber) {
                    Objects.requireNonNull(subscriber);
                    PublisherSource.Subscriber<? super T> subscriber2 = GroupSink.this.target;
                    if (subscriber2 != null) {
                        subscriber.onSubscribe(EmptySubscription.EMPTY_SUBSCRIPTION);
                        subscriber.onError(new DuplicateSubscribeException(subscriber2, subscriber));
                        return;
                    }
                    subscriber.onSubscribe(GroupSink.this);
                    GroupSink.this.target = subscriber;
                    MulticastUtils.SpscQueue<T> subscriberQueue = GroupSink.this.subscriberQueue();
                    if (subscriberQueue != null) {
                        GroupSink.this.drainPendingFromExternal(subscriberQueue, subscriber);
                    }
                }
            };
        }

        @Override // io.servicetalk.concurrent.api.MulticastUtils.IndividualMulticastSubscriber
        String queueIdentifier() {
            return this.groupedPublisher.key().toString();
        }

        @Override // io.servicetalk.concurrent.api.MulticastUtils.IndividualMulticastSubscriber
        void requestFromSource(int i) {
            this.sourceSubscriber.requestFromGroup(i);
        }

        @Override // io.servicetalk.concurrent.api.MulticastUtils.IndividualMulticastSubscriber
        void handleInvalidRequestN(long j) {
            this.sourceSubscriber.requestFromGroup(j);
        }

        @Override // io.servicetalk.concurrent.api.MulticastUtils.IndividualMulticastSubscriber
        void cancelSourceFromExternal(Throwable th) {
            this.sourceSubscriber.cancelSourceFromExternal(th);
            AbstractPublisherGroupBy.LOGGER.error("Unexpected exception thrown from group {} subscriber", this.groupedPublisher.key(), th);
        }

        @Override // io.servicetalk.concurrent.api.MulticastUtils.IndividualMulticastSubscriber
        void cancelSourceFromSource(boolean z, Throwable th) {
            this.sourceSubscriber.removeGroup(this);
            this.sourceSubscriber.cancelSourceFromSource(z, th);
        }

        public void cancel() {
            this.sourceSubscriber.removeGroup(this);
        }
    }

    /* loaded from: input_file:io/servicetalk/concurrent/api/AbstractPublisherGroupBy$GroupedPublisherSource.class */
    /**
     * A {@link GroupedPublisher} that can also be subscribed to through the {@link PublisherSource}
     * interface.
     *
     * @param <Key> type of the group key.
     * @param <T>   type of the items in the group.
     */
    private static abstract class GroupedPublisherSource<Key, T> extends GroupedPublisher<Key, T> implements PublisherSource<T> {
        /**
         * @param executor executor passed to the superclass.
         * @param key      key of the group this publisher represents.
         */
        GroupedPublisherSource(final Executor executor, final Key key) {
            super(executor, key);
        }

        /**
         * Subscribes by delegating to {@code subscribeInternal}.
         *
         * @param subscriber the subscriber to attach.
         */
        public final void subscribe(final PublisherSource.Subscriber<? super T> subscriber) {
            this.subscribeInternal(subscriber);
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public AbstractPublisherGroupBy(Publisher<T> publisher, int i, Executor executor) {
        this(publisher, i, 4, executor);
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Creates the operator.
     *
     * @param publisher the source publisher.
     * @param i         the group queue size; must be positive.
     * @param i2        the expected group count hint; must be positive.
     * @param executor  executor passed to the superclass.
     * @throws IllegalArgumentException if {@code i2} or {@code i} is not positive (checked in that order).
     */
    public AbstractPublisherGroupBy(Publisher<T> publisher, int i, int i2, Executor executor) {
        super(publisher, executor);
        this.initialCapacityForGroups = requirePositive(i2, "expectedGroupCountHint");
        this.groupQueueSize = requirePositive(i, "groupQueueSize");
    }

    /** Returns {@code value} if it is positive, otherwise throws naming the offending argument. */
    private static int requirePositive(int value, String name) {
        if (value <= 0) {
            throw new IllegalArgumentException(name + " " + value + " (expected >0)");
        }
        return value;
    }
}
