/*
 * Decompiled with CFR 0.152.
 */
package org.infinispan.reactive.publisher.impl;

import io.reactivex.rxjava3.functions.Action;
import io.reactivex.rxjava3.functions.LongConsumer;
import io.reactivex.rxjava3.processors.FlowableProcessor;
import io.reactivex.rxjava3.processors.UnicastProcessor;
import java.lang.invoke.MethodHandles;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Supplier;
import org.infinispan.commons.reactive.RxJavaInterop;
import org.infinispan.commons.util.IntSet;
import org.infinispan.commons.util.concurrent.CompletableFutures;
import org.infinispan.reactive.publisher.impl.ClusterPublisherManagerImpl;
import org.infinispan.reactive.publisher.impl.Notifications;
import org.infinispan.reactive.publisher.impl.PublisherHandler;
import org.infinispan.reactive.publisher.impl.SegmentPublisherSupplier;
import org.infinispan.reactive.publisher.impl.commands.batch.KeyPublisherResponse;
import org.infinispan.reactive.publisher.impl.commands.batch.PublisherResponse;
import org.infinispan.remoting.transport.Address;
import org.infinispan.util.logging.Log;
import org.infinispan.util.logging.LogFactory;
import org.reactivestreams.Publisher;

public class InnerPublisherSubscription<K, I, R, E>
implements LongConsumer,
Action {
    protected static final Log log = LogFactory.getLog(MethodHandles.lookup().lookupClass());
    private final InnerPublisherSubscriptionBuilder<K, I, R> builder;
    private final FlowableProcessor<E> flowableProcessor;
    private final AtomicLong requestedAmount = new AtomicLong();
    private volatile Map.Entry<Address, IntSet> currentTarget;
    private volatile boolean cancelled;
    private volatile boolean alreadyCreated;

    private InnerPublisherSubscription(InnerPublisherSubscriptionBuilder<K, I, R> builder, FlowableProcessor<E> flowableProcessor, Map.Entry<Address, IntSet> firstTarget) {
        this.builder = builder;
        this.flowableProcessor = flowableProcessor;
        this.currentTarget = firstTarget;
    }

    @Override
    public void run() {
        Map.Entry<Address, IntSet> target;
        this.cancelled = true;
        if (this.alreadyCreated && (target = this.currentTarget) != null) {
            this.builder.parent.sendCancelCommand(target.getKey());
        }
    }

    @Override
    public void accept(long count) {
        if (this.shouldSubmit(count)) {
            CompletionStage<PublisherResponse> stage;
            if (this.checkCancelled()) {
                return;
            }
            Map.Entry<Address, IntSet> target = this.currentTarget;
            if (target == null) {
                this.alreadyCreated = false;
                target = this.builder.supplier.get();
                if (target == null) {
                    if (log.isTraceEnabled()) {
                        log.tracef("Completing processor %s", (Object)this.flowableProcessor);
                    }
                    this.flowableProcessor.onComplete();
                    return;
                }
                this.currentTarget = target;
            }
            ClusterPublisherManagerImpl.SubscriberHandler parent = this.builder.parent;
            Address address = target.getKey();
            IntSet segments = target.getValue();
            try {
                if (this.alreadyCreated) {
                    stage = parent.sendNextCommand(address, this.builder.topologyId);
                } else {
                    this.alreadyCreated = true;
                    stage = parent.sendInitialCommand(address, segments, this.builder.batchSize, this.builder.excludedKeys.remove(address), this.builder.topologyId);
                }
            }
            catch (Throwable t3) {
                this.handleThrowableInResponse(t3, address, segments);
                return;
            }
            stage.whenComplete((values, t2) -> {
                if (t2 != null) {
                    this.handleThrowableInResponse(CompletableFutures.extractException(t2), address, segments);
                    return;
                }
                try {
                    KeyPublisherResponse kpr;
                    int extraSize;
                    boolean complete;
                    IntSet lostSegments;
                    IntSet completedSegments;
                    if (log.isTraceEnabled()) {
                        log.tracef("Received %s for id %s from %s", values, (Object)parent.requestId, (Object)address);
                    }
                    if ((completedSegments = values.getCompletedSegments()) != null) {
                        if (log.isTraceEnabled()) {
                            log.tracef("Completed segments %s for id %s from %s", (Object)completedSegments, (Object)parent.requestId, (Object)address);
                        }
                        completedSegments.forEach(parent::completeSegment);
                        completedSegments.forEach(segments::remove);
                    }
                    if ((lostSegments = values.getLostSegments()) != null) {
                        if (log.isTraceEnabled()) {
                            log.tracef("Lost segments %s for id %s from %s", (Object)completedSegments, (Object)parent.requestId, (Object)address);
                        }
                        lostSegments.forEach(segments::remove);
                    }
                    if (complete = values.isComplete()) {
                        this.currentTarget = null;
                    } else {
                        values.keysForNonCompletedSegments(parent);
                    }
                    Object[] valueArray = values.getResults();
                    if (values instanceof KeyPublisherResponse && (extraSize = (kpr = (KeyPublisherResponse)values).getExtraSize()) > 0) {
                        int arrayLength = valueArray.length;
                        Object[] newArray = new Object[arrayLength + extraSize];
                        System.arraycopy(valueArray, 0, newArray, 0, arrayLength);
                        System.arraycopy(kpr.getExtraObjects(), 0, newArray, arrayLength, extraSize);
                        valueArray = newArray;
                    }
                    int pos = 0;
                    for (PublisherHandler.SegmentResult segmentResult : values.getSegmentResults()) {
                        if (this.checkCancelled()) {
                            return;
                        }
                        int segment = segmentResult.getSegment();
                        for (int i = 0; i < segmentResult.getEntryCount(); ++i) {
                            Object value = valueArray[pos++];
                            this.doOnValue(value, segment);
                        }
                        if (completedSegments == null || !completedSegments.remove(segment)) continue;
                        this.doOnSegmentComplete(segment);
                    }
                    if (completedSegments != null) {
                        completedSegments.forEach(this::doOnSegmentComplete);
                    }
                    this.accept(-pos);
                }
                catch (Throwable innerT) {
                    this.handleThrowableInResponse(innerT, address, segments);
                }
            });
        }
    }

    protected void doOnValue(R value, int segment) {
    }

    protected void doOnSegmentComplete(int segment) {
    }

    private boolean shouldSubmit(long count) {
        long newValue;
        long prev;
        while (!this.requestedAmount.compareAndSet(prev = this.requestedAmount.get(), newValue = prev + count)) {
        }
        return newValue > 0L && (prev <= 0L || count <= 0L);
    }

    private void handleThrowableInResponse(Throwable t2, Address address, IntSet segments) {
        if (this.cancelled) {
            log.tracef("Encountered exception after subscription was cancelled, this can most likely ignored, message is %s", (Object)t2.getMessage());
        } else if (this.builder.parent.handleThrowable(t2, address, segments)) {
            this.currentTarget = null;
            this.accept(0L);
        } else {
            this.flowableProcessor.onError(t2);
        }
    }

    private boolean checkCancelled() {
        if (this.cancelled) {
            if (log.isTraceEnabled()) {
                log.tracef("Subscription %s was cancelled, terminating early", (Object)this);
            }
            return true;
        }
        return false;
    }

    public static class InnerPublisherSubscriptionBuilder<K, I, R> {
        private final ClusterPublisherManagerImpl.SubscriberHandler<I, R> parent;
        private final int batchSize;
        private final Supplier<Map.Entry<Address, IntSet>> supplier;
        private final Map<Address, Set<K>> excludedKeys;
        private final int topologyId;

        public InnerPublisherSubscriptionBuilder(ClusterPublisherManagerImpl.SubscriberHandler<I, R> parent, int batchSize, Supplier<Map.Entry<Address, IntSet>> supplier, Map<Address, Set<K>> excludedKeys, int topologyId) {
            this.parent = parent;
            this.batchSize = batchSize;
            this.supplier = supplier;
            this.excludedKeys = excludedKeys;
            this.topologyId = topologyId;
        }

        Publisher<R> createValuePublisher(Map.Entry<Address, IntSet> firstTarget) {
            final UnicastProcessor unicastProcessor = UnicastProcessor.create(this.batchSize);
            InnerPublisherSubscription innerPublisherSubscription = new InnerPublisherSubscription<K, I, R, R>(this, unicastProcessor, firstTarget){

                @Override
                protected void doOnValue(R value, int segment) {
                    unicastProcessor.onNext(value);
                }
            };
            return unicastProcessor.doOnLifecycle(RxJavaInterop.emptyConsumer(), innerPublisherSubscription, innerPublisherSubscription);
        }

        Publisher<SegmentPublisherSupplier.Notification<R>> createNotificationPublisher(Map.Entry<Address, IntSet> firstTarget) {
            final UnicastProcessor unicastProcessor = UnicastProcessor.create(this.batchSize);
            InnerPublisherSubscription innerPublisherSubscription = new InnerPublisherSubscription<K, I, R, SegmentPublisherSupplier.Notification<R>>(this, unicastProcessor, firstTarget){

                @Override
                protected void doOnValue(R value, int segment) {
                    unicastProcessor.onNext(Notifications.value(value, segment));
                }

                @Override
                protected void doOnSegmentComplete(int segment) {
                    unicastProcessor.onNext(Notifications.segmentComplete(segment));
                }
            };
            return unicastProcessor.doOnLifecycle(RxJavaInterop.emptyConsumer(), innerPublisherSubscription, innerPublisherSubscription);
        }
    }
}

