/*
 * Decompiled with CFR 0.152.
 */
package org.infinispan.client.hotrod.impl.iteration;

import io.netty.channel.Channel;
import java.lang.invoke.MethodHandles;
import java.net.ConnectException;
import java.net.SocketAddress;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Supplier;
import org.infinispan.client.hotrod.exceptions.RemoteIllegalLifecycleStateException;
import org.infinispan.client.hotrod.exceptions.TransportException;
import org.infinispan.client.hotrod.impl.iteration.RemotePublisher;
import org.infinispan.client.hotrod.impl.operations.IterationNextResponse;
import org.infinispan.client.hotrod.impl.operations.IterationStartResponse;
import org.infinispan.client.hotrod.logging.Log;
import org.infinispan.client.hotrod.logging.LogFactory;
import org.infinispan.commons.reactive.AbstractAsyncPublisherHandler;
import org.infinispan.commons.util.IntSet;
import org.infinispan.commons.util.logging.TraceException;

class RemoteInnerPublisherHandler<K, E>
extends AbstractAsyncPublisherHandler<Map.Entry<SocketAddress, IntSet>, Map.Entry<K, E>, IterationStartResponse, IterationNextResponse<K, E>> {
    private static final Log log = LogFactory.getLog(MethodHandles.lookup().lookupClass());
    protected final RemotePublisher<K, E> publisher;
    protected volatile Channel channel;
    private volatile byte[] iterationId;
    private AtomicBoolean cancelled = new AtomicBoolean();

    protected RemoteInnerPublisherHandler(RemotePublisher<K, E> parent, int batchSize, Supplier<Map.Entry<SocketAddress, IntSet>> supplier, Map.Entry<SocketAddress, IntSet> firstTarget) {
        super(batchSize, supplier, firstTarget);
        this.publisher = parent;
    }

    private String iterationId() {
        return this.publisher.iterationId(this.iterationId);
    }

    @Override
    protected void sendCancel(Map.Entry<SocketAddress, IntSet> target) {
        if (!this.cancelled.getAndSet(true)) {
            this.actualCancel();
        }
    }

    private void actualCancel() {
        if (this.iterationId != null && this.channel != null) {
            this.publisher.sendCancel(this.iterationId, this.channel);
        }
    }

    @Override
    protected CompletionStage<IterationStartResponse> sendInitialCommand(Map.Entry<SocketAddress, IntSet> target, int batchSize) {
        SocketAddress address = target.getKey();
        IntSet segments = target.getValue();
        log.tracef("Starting iteration with segments %s", (Object)segments);
        return this.publisher.newIteratorStartOperation(address, segments, batchSize);
    }

    @Override
    protected CompletionStage<IterationNextResponse<K, E>> sendNextCommand(Map.Entry<SocketAddress, IntSet> target, int batchSize) {
        return this.publisher.newIteratorNextOperation(this.iterationId, this.channel);
    }

    @Override
    protected long handleInitialResponse(IterationStartResponse startResponse, Map.Entry<SocketAddress, IntSet> target) {
        this.channel = startResponse.getChannel();
        this.iterationId = startResponse.getIterationId();
        if (log.isDebugEnabled()) {
            log.iterationTransportObtained(this.channel.remoteAddress(), this.iterationId());
            log.startedIteration(this.iterationId());
        }
        if (this.cancelled.get()) {
            this.actualCancel();
        }
        return 0L;
    }

    @Override
    protected long handleNextResponse(IterationNextResponse<K, E> nextResponse, Map.Entry<SocketAddress, IntSet> target) {
        IntSet targetSegments;
        IntSet completedSegments;
        if (!nextResponse.hasMore()) {
            this.sendCancel(target);
            this.publisher.completeSegments(target.getValue());
            this.targetComplete();
        }
        if ((completedSegments = nextResponse.getCompletedSegments()) != null && log.isTraceEnabled() && (targetSegments = target.getValue()) != null) {
            targetSegments.removeAll(completedSegments);
        }
        this.publisher.completeSegments(completedSegments);
        List<Map.Entry<K, E>> entries = nextResponse.getEntries();
        for (Map.Entry<K, E> entry : entries) {
            if (!this.onNext(entry)) break;
        }
        return entries.size();
    }

    @Override
    protected void handleThrowableInResponse(Throwable t2, Map.Entry<SocketAddress, IntSet> target) {
        if (t2 instanceof TransportException || t2 instanceof RemoteIllegalLifecycleStateException || t2 instanceof ConnectException) {
            IntSet targetSegments;
            log.throwableDuringPublisher(t2);
            if (log.isTraceEnabled() && (targetSegments = target.getValue()) != null) {
                log.tracef("There are still outstanding segments %s that will need to be retried", (Object)targetSegments);
            }
            this.publisher.erroredServer(target.getKey());
            this.targetComplete();
            this.accept(0L);
        } else {
            t2.addSuppressed(new TraceException());
            super.handleThrowableInResponse(t2, target);
        }
    }
}

