/*
 * Decompiled with CFR 0.152.
 */
package org.infinispan.statetransfer;

import io.reactivex.rxjava3.core.Completable;
import io.reactivex.rxjava3.core.Flowable;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import org.infinispan.commands.CommandsFactory;
import org.infinispan.commands.ReplicableCommand;
import org.infinispan.commands.statetransfer.StateResponseCommand;
import org.infinispan.commons.IllegalLifecycleStateException;
import org.infinispan.commons.util.IntSet;
import org.infinispan.commons.util.IntSets;
import org.infinispan.commons.util.concurrent.CompletableFutures;
import org.infinispan.container.entries.InternalCacheEntry;
import org.infinispan.reactive.publisher.impl.SegmentPublisherSupplier;
import org.infinispan.remoting.inboundhandler.DeliverOrder;
import org.infinispan.remoting.rpc.RpcManager;
import org.infinispan.remoting.rpc.RpcOptions;
import org.infinispan.remoting.transport.Address;
import org.infinispan.remoting.transport.impl.SingleResponseCollector;
import org.infinispan.remoting.transport.jgroups.SuspectException;
import org.infinispan.statetransfer.StateChunk;
import org.infinispan.util.logging.Log;
import org.infinispan.util.logging.LogFactory;

public class OutboundTransferTask {
    private static final Log log = LogFactory.getLog(OutboundTransferTask.class);
    private final Consumer<Collection<StateChunk>> onChunkReplicated;
    private final int topologyId;
    private final Address destination;
    private final IntSet segments;
    private final int chunkSize;
    private final RpcManager rpcManager;
    private final CommandsFactory commandsFactory;
    private final long timeout;
    private final String cacheName;
    private final boolean applyState;
    private final boolean pushTransfer;
    private final RpcOptions rpcOptions;
    private volatile boolean cancelled;

    public OutboundTransferTask(Address destination, IntSet segments, int segmentCount, int chunkSize, int topologyId, Consumer<Collection<StateChunk>> onChunkReplicated, RpcManager rpcManager, CommandsFactory commandsFactory, long timeout, String cacheName, boolean applyState, boolean pushTransfer) {
        if (segments == null || segments.isEmpty()) {
            throw new IllegalArgumentException("Segments must not be null or empty");
        }
        if (destination == null) {
            throw new IllegalArgumentException("Destination address cannot be null");
        }
        if (chunkSize <= 0) {
            throw new IllegalArgumentException("chunkSize must be greater than 0");
        }
        this.onChunkReplicated = onChunkReplicated;
        this.destination = destination;
        this.segments = IntSets.concurrentCopyFrom(segments, segmentCount);
        this.chunkSize = chunkSize;
        this.topologyId = topologyId;
        this.rpcManager = rpcManager;
        this.commandsFactory = commandsFactory;
        this.timeout = timeout;
        this.cacheName = cacheName;
        this.applyState = applyState;
        this.pushTransfer = pushTransfer;
        this.rpcOptions = new RpcOptions(DeliverOrder.NONE, timeout, TimeUnit.MILLISECONDS);
    }

    public Address getDestination() {
        return this.destination;
    }

    public IntSet getSegments() {
        return this.segments;
    }

    public int getTopologyId() {
        return this.topologyId;
    }

    public CompletionStage<Void> execute(Flowable<SegmentPublisherSupplier.Notification<InternalCacheEntry<?, ?>>> notifications) {
        return notifications.buffer(this.chunkSize).takeUntil(batch -> this.cancelled).concatMapCompletable(batch -> {
            HashMap<Integer, StateChunk> chunks = new HashMap<Integer, StateChunk>();
            for (SegmentPublisherSupplier.Notification notification : batch) {
                if (notification.isValue()) {
                    StateChunk chunk = chunks.computeIfAbsent(notification.valueSegment(), segment -> new StateChunk((int)segment, new ArrayList(), false));
                    chunk.getCacheEntries().add((InternalCacheEntry)notification.value());
                }
                if (!notification.isSegmentComplete()) continue;
                int segment2 = notification.completedSegment();
                chunks.compute(segment2, (s2, previous) -> previous == null ? new StateChunk((int)s2, (Collection<InternalCacheEntry<?, ?>>)Collections.emptyList(), true) : new StateChunk(segment2, previous.getCacheEntries(), true));
            }
            return Completable.fromCompletionStage(this.sendChunks(chunks));
        }, 1).toCompletionStage(null);
    }

    private CompletionStage<Void> sendChunks(Map<Integer, StateChunk> chunks) {
        if (chunks.isEmpty()) {
            return CompletableFutures.completedNull();
        }
        if (log.isTraceEnabled()) {
            long entriesSize = chunks.values().stream().mapToInt(v -> v.getCacheEntries().size()).sum();
            log.tracef("Sending to node %s %d cache entries from segments %s", (Object)this.destination, (Object)entriesSize, (Object)chunks.keySet());
        }
        StateResponseCommand cmd = this.commandsFactory.buildStateResponseCommand(this.topologyId, chunks.values(), this.applyState, this.pushTransfer);
        try {
            return this.rpcManager.invokeCommand(this.destination, (ReplicableCommand)cmd, SingleResponseCollector.validOnly(), this.rpcOptions).handle((response, throwable) -> {
                if (throwable == null) {
                    this.onChunkReplicated.accept(chunks.values());
                    return null;
                }
                this.logSendException((Throwable)throwable);
                this.cancel();
                return null;
            });
        }
        catch (IllegalLifecycleStateException e) {
            this.cancel();
        }
        catch (Exception e) {
            this.logSendException(e);
            this.cancel();
        }
        return CompletableFutures.completedNull();
    }

    private void logSendException(Throwable throwable) {
        Throwable t2 = CompletableFutures.extractException(throwable);
        if (t2 instanceof SuspectException) {
            log.debugf("Node %s left cache %s while we were sending state to it, cancelling transfer.", (Object)this.destination, (Object)this.cacheName);
        } else if (this.isCancelled()) {
            log.debugf("Stopping cancelled transfer to node %s, segments %s", (Object)this.destination, (Object)this.segments);
        } else {
            log.errorf(t2, "Failed to send entries to node %s: %s", (Object)this.destination, (Object)t2.getMessage());
        }
    }

    void cancelSegments(IntSet cancelledSegments) {
        if (this.segments.removeAll(cancelledSegments)) {
            if (log.isTraceEnabled()) {
                log.tracef("Cancelling outbound transfer to node %s, segments %s (remaining segments %s)", (Object)this.destination, (Object)cancelledSegments, (Object)this.segments);
            }
            if (this.segments.isEmpty()) {
                this.cancel();
            }
        }
    }

    public void cancel() {
        if (!this.cancelled) {
            log.debugf("Cancelling outbound transfer to node %s, segments %s", (Object)this.destination, (Object)this.segments);
            this.cancelled = true;
        }
    }

    public boolean isCancelled() {
        return this.cancelled;
    }

    public String toString() {
        return "OutboundTransferTask{topologyId=" + this.topologyId + ", destination=" + String.valueOf(this.destination) + ", segments=" + String.valueOf(this.segments) + ", chunkSize=" + this.chunkSize + ", timeout=" + this.timeout + ", cacheName='" + this.cacheName + "'}";
    }
}

