/*
 * Decompiled with CFR 0.152.
 */
package org.infinispan.scattered.impl;

import io.reactivex.rxjava3.core.Flowable;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.PrimitiveIterator;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicLong;
import net.jcip.annotations.GuardedBy;
import org.infinispan.Cache;
import org.infinispan.commands.ReplicableCommand;
import org.infinispan.commands.remote.ClusteredGetAllCommand;
import org.infinispan.commands.statetransfer.ScatteredStateConfirmRevokedCommand;
import org.infinispan.commands.write.InvalidateVersionsCommand;
import org.infinispan.commands.write.PutKeyValueCommand;
import org.infinispan.commands.write.PutMapCommand;
import org.infinispan.commons.CacheException;
import org.infinispan.commons.util.IntSet;
import org.infinispan.commons.util.IntSets;
import org.infinispan.commons.util.concurrent.CompletableFutures;
import org.infinispan.container.entries.InternalCacheEntry;
import org.infinispan.container.entries.InternalCacheValue;
import org.infinispan.container.entries.RemoteMetadata;
import org.infinispan.container.impl.InternalEntryFactory;
import org.infinispan.container.versioning.EntryVersion;
import org.infinispan.container.versioning.SimpleClusteredVersion;
import org.infinispan.context.impl.FlagBitSets;
import org.infinispan.distribution.ch.ConsistentHash;
import org.infinispan.factories.annotations.Inject;
import org.infinispan.lifecycle.ComponentStatus;
import org.infinispan.metadata.Metadata;
import org.infinispan.metadata.impl.InternalMetadataImpl;
import org.infinispan.persistence.manager.PersistenceManager;
import org.infinispan.remoting.responses.Response;
import org.infinispan.remoting.responses.SuccessfulResponse;
import org.infinispan.remoting.transport.Address;
import org.infinispan.remoting.transport.impl.MapResponseCollector;
import org.infinispan.remoting.transport.impl.SingleResponseCollector;
import org.infinispan.remoting.transport.jgroups.SuspectException;
import org.infinispan.scattered.ScatteredVersionManager;
import org.infinispan.statetransfer.InboundTransferTask;
import org.infinispan.statetransfer.StateConsumerImpl;
import org.infinispan.topology.CacheTopology;
import org.infinispan.util.logging.Log;
import org.infinispan.util.logging.LogFactory;
import org.reactivestreams.Publisher;

public class ScatteredStateConsumerImpl
extends StateConsumerImpl {
    private static final Log log = LogFactory.getLog(ScatteredStateConsumerImpl.class);
    protected static final long SKIP_OWNERSHIP_FLAGS = FlagBitSets.SKIP_OWNERSHIP_CHECK;
    @Inject
    protected InternalEntryFactory entryFactory;
    @Inject
    protected ScatteredVersionManager<?> svm;
    @GuardedBy(value="transferMapsLock")
    protected IntSet inboundSegments;
    protected AtomicLong chunkCounter = new AtomicLong();
    protected final ConcurrentMap<Address, BlockingQueue<Object>> retrievedEntries = new ConcurrentHashMap<Address, BlockingQueue<Object>>();
    protected BlockingQueue<InternalCacheEntry<?, ?>> backupQueue;
    protected final ConcurrentMap<Address, BlockingQueue<KeyAndVersion>> invalidations = new ConcurrentHashMap<Address, BlockingQueue<KeyAndVersion>>();
    protected Collection<Address> backupAddress;
    protected Collection<Address> nonBackupAddresses;
    private int chunkSize;

    @Override
    public void start() {
        super.start();
        this.chunkSize = this.configuration.clustering().stateTransfer().chunkSize();
        this.backupQueue = new ArrayBlockingQueue(this.chunkSize);
    }

    @Override
    public CompletionStage<CompletionStage<Void>> onTopologyUpdate(CacheTopology cacheTopology, boolean isRebalance) {
        Address nextMember = this.getNextMember(cacheTopology);
        this.backupAddress = nextMember == null ? Collections.emptySet() : Collections.singleton(nextMember);
        this.nonBackupAddresses = new ArrayList<Address>(cacheTopology.getActualMembers());
        this.nonBackupAddresses.remove(nextMember);
        this.nonBackupAddresses.remove(this.rpcManager.getAddress());
        return super.onTopologyUpdate(cacheTopology, isRebalance);
    }

    @Override
    protected void beforeTopologyInstalled(int topologyId, ConsistentHash previousWriteCh, ConsistentHash newWriteCh) {
        for (int segment = 0; segment < newWriteCh.getNumSegments(); ++segment) {
            if (newWriteCh.isSegmentLocalToNode(this.rpcManager.getAddress(), segment)) continue;
            this.cancelTransfers(IntSets.immutableSet(segment));
            this.svm.unregisterSegment(segment);
        }
        IntSet addedSegments = this.getOwnedSegments(newWriteCh);
        if (previousWriteCh != null && !addedSegments.isEmpty()) {
            addedSegments.removeAll(this.getOwnedSegments(previousWriteCh));
        }
        this.svm.setTopologyId(topologyId);
        if (previousWriteCh == null || !this.isFetchEnabled) {
            log.trace("This is the first topology or state transfer is disabled, not expecting any state transfer.");
            this.svm.setOwnedSegments(addedSegments);
            return;
        }
        if (!addedSegments.isEmpty()) {
            this.svm.setValuesTransferTopology(topologyId);
            PrimitiveIterator.OfInt segmentIterator = addedSegments.iterator();
            while (segmentIterator.hasNext()) {
                this.svm.registerSegment(segmentIterator.nextInt());
            }
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    protected CompletionStage<Void> handleSegments(boolean isRebalance, IntSet addedSegments, IntSet transactionOnlySegments) {
        if (!isRebalance) {
            log.trace("This is not a rebalance, not doing anything...");
            return CompletableFutures.completedNull();
        }
        if (addedSegments.isEmpty()) {
            log.trace("No segments missing");
            return CompletableFutures.completedNull();
        }
        Object object = this.transferMapsLock;
        synchronized (object) {
            this.inboundSegments = IntSets.mutableFrom(addedSegments);
        }
        this.chunkCounter.set(0L);
        if (log.isTraceEnabled()) {
            log.tracef("Revoking all segments, chunk counter reset to 0", new Object[0]);
        }
        ScatteredStateConfirmRevokedCommand command = this.commandsFactory.buildScatteredStateConfirmRevokeCommand(this.cacheTopology.getTopologyId(), addedSegments);
        return this.rpcManager.invokeCommandOnAll(command, MapResponseCollector.ignoreLeavers(), this.rpcManager.getSyncRpcOptions()).handle((responses, throwable) -> {
            if (throwable == null) {
                try {
                    this.svm.startKeyTransfer(addedSegments);
                    this.requestKeyTransfer(addedSegments);
                }
                catch (SuspectException e) {
                    log.tracef("Key transfer source %s was suspected, another source will be selected", (Object)e.getSuspect());
                }
                catch (Throwable t2) {
                    log.failedToRequestSegments(this.cacheName, null, addedSegments, t2);
                }
            } else {
                if (((Cache)this.cache.wired()).getStatus() == ComponentStatus.RUNNING) {
                    log.failedConfirmingRevokedSegments((Throwable)throwable);
                } else {
                    log.debug("Failed confirming revoked segments", (Throwable)throwable);
                }
                PrimitiveIterator.OfInt ofInt = addedSegments.iterator();
                while (ofInt.hasNext()) {
                    int segment = (Integer)ofInt.next();
                    this.svm.notifyKeyTransferFinished(segment, false, false);
                }
                this.notifyEndOfStateTransferIfNeeded();
            }
            return null;
        });
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void requestKeyTransfer(IntSet segments) {
        boolean isTransferringKeys = false;
        Object object = this.transferMapsLock;
        synchronized (object) {
            ArrayList<Address> members = new ArrayList<Address>(this.cacheTopology.getActualMembers());
            Collections.shuffle(members);
            for (Address source : members) {
                if (source.equals(this.rpcManager.getAddress())) continue;
                isTransferringKeys = true;
                InboundTransferTask inboundTransfer = new InboundTransferTask(segments, source, this.cacheTopology.getTopologyId(), this.rpcManager, this.commandsFactory, this.configuration.clustering().stateTransfer().timeout(), this.cacheName, true);
                this.addTransfer(inboundTransfer, segments);
                this.stateRequestExecutor.executeAsync(() -> {
                    log.tracef("Requesting keys for segments %s from %s", (Object)inboundTransfer.getSegments(), (Object)inboundTransfer.getSource());
                    return inboundTransfer.requestKeys().whenComplete((nil, e) -> this.onTaskCompletion(inboundTransfer));
                });
            }
        }
        if (!isTransferringKeys) {
            log.trace("No keys in transfer, finishing segments " + String.valueOf(segments));
            object = segments.iterator();
            while (object.hasNext()) {
                int segment = (Integer)object.next();
                this.svm.notifyKeyTransferFinished(segment, false, false);
            }
            this.notifyEndOfStateTransferIfNeeded();
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    protected void onTaskCompletion(InboundTransferTask inboundTransfer) {
        IntSet completedSegments = IntSets.immutableEmptySet();
        if (log.isTraceEnabled()) {
            log.tracef("Inbound transfer finished %s: %s", (Object)inboundTransfer, (Object)(inboundTransfer.isCompletedSuccessfully() ? "successfully" : "unsuccessfuly"));
        }
        Object object = this.transferMapsLock;
        synchronized (object) {
            PrimitiveIterator.OfInt iter = inboundTransfer.getSegments().iterator();
            block12: while (iter.hasNext()) {
                int segment = iter.nextInt();
                List transfers = (List)this.transfersBySegment.get(segment);
                if (transfers == null) {
                    log.tracef("Transfers for segment %d have not been found.", segment);
                    continue;
                }
                transfers.remove(inboundTransfer);
                if (!transfers.isEmpty()) continue;
                this.transfersBySegment.remove(segment);
                if (log.isTraceEnabled()) {
                    log.tracef("All transfer tasks for segment %d have completed.", segment);
                }
                this.svm.notifyKeyTransferFinished(segment, inboundTransfer.isCompletedSuccessfully(), inboundTransfer.isCancelled());
                switch (completedSegments.size()) {
                    case 0: {
                        completedSegments = IntSets.immutableSet(segment);
                        continue block12;
                    }
                    case 1: {
                        completedSegments = IntSets.mutableCopyFrom(completedSegments);
                    }
                }
                completedSegments.set(segment);
            }
        }
        if (completedSegments.isEmpty()) {
            log.tracef("Not requesting any values yet because no segments have been completed.", new Object[0]);
        } else if (inboundTransfer.isCompletedSuccessfully()) {
            log.tracef("Requesting values from segments %s, for in-memory keys", (Object)completedSegments);
            this.dataContainer.forEach(completedSegments, ice -> {
                if (ice.getMetadata() instanceof RemoteMetadata) {
                    Address backup = ((RemoteMetadata)ice.getMetadata()).getAddress();
                    this.retrieveEntry(ice.getKey(), backup);
                    for (Address member : this.cacheTopology.getActualMembers()) {
                        if (member.equals(backup)) continue;
                        this.invalidate(ice.getKey(), ice.getMetadata().version(), member);
                    }
                } else {
                    this.backupEntry((InternalCacheEntry<?, ?>)ice);
                    for (Address member : this.nonBackupAddresses) {
                        this.invalidate(ice.getKey(), ice.getMetadata().version(), member);
                    }
                }
            });
            Publisher persistencePublisher = this.persistenceManager.publishEntries(completedSegments, k -> this.dataContainer.peek(k) == null, true, true, PersistenceManager.AccessMode.PRIVATE);
            try {
                ScatteredStateConsumerImpl.blockingSubscribe(Flowable.fromPublisher(persistencePublisher).doOnNext(me -> {
                    try {
                        Metadata metadata = me.getMetadata();
                        if (metadata instanceof RemoteMetadata) {
                            Address backup = ((RemoteMetadata)metadata).getAddress();
                            this.retrieveEntry(me.getKey(), backup);
                            for (Address member : this.cacheTopology.getActualMembers()) {
                                if (member.equals(backup)) continue;
                                this.invalidate(me.getKey(), metadata.version(), member);
                            }
                        } else {
                            this.backupEntry(this.entryFactory.create(me.getKey(), me.getValue(), me.getMetadata()));
                            for (Address member : this.nonBackupAddresses) {
                                this.invalidate(me.getKey(), metadata.version(), member);
                            }
                        }
                    }
                    catch (CacheException e) {
                        log.failedLoadingValueFromCacheStore(me.getKey(), e);
                    }
                }));
            }
            catch (CacheException e) {
                Log.PERSISTENCE.failedLoadingKeysFromCacheStore(e);
            }
        }
        boolean lastTransfer = false;
        Iterator e = this.transferMapsLock;
        synchronized (e) {
            this.inboundSegments.removeAll(completedSegments);
            log.tracef("Unfinished inbound segments: " + String.valueOf(this.inboundSegments), new Object[0]);
            if (this.inboundSegments.isEmpty()) {
                lastTransfer = true;
            }
        }
        if (lastTransfer) {
            for (Map.Entry pair : this.retrievedEntries.entrySet()) {
                BlockingQueue queue = (BlockingQueue)pair.getValue();
                ArrayList<Object> keys = new ArrayList<Object>(queue.size());
                queue.drainTo(keys);
                if (keys.isEmpty()) continue;
                this.getValuesAndApply((Address)pair.getKey(), keys);
            }
            ArrayList entries = new ArrayList(this.backupQueue.size());
            this.backupQueue.drainTo(entries);
            if (!entries.isEmpty()) {
                this.backupEntries(entries);
            }
            for (Map.Entry pair : this.invalidations.entrySet()) {
                BlockingQueue queue = (BlockingQueue)pair.getValue();
                ArrayList<KeyAndVersion> list = new ArrayList<KeyAndVersion>(queue.size());
                queue.drainTo(list);
                if (list.isEmpty()) continue;
                this.invalidate(list, (Address)pair.getKey());
            }
        }
        this.removeTransfer(inboundTransfer);
        if (lastTransfer) {
            if (log.isTraceEnabled()) {
                log.tracef("Inbound transfer removed, chunk counter is %s", this.chunkCounter.get());
            }
            if (this.chunkCounter.get() == 0L) {
                this.notifyEndOfStateTransferIfNeeded();
            }
        }
    }

    @Override
    protected void onCompletedSegment(int segmentId, InboundTransferTask inboundTransfer) {
    }

    private static void blockingSubscribe(Flowable<?> flowable) {
        flowable.blockingSubscribe();
    }

    private <T> List<T> offerAndDrain(BlockingQueue<T> queue, T element) {
        ArrayList<T> list = null;
        if (queue.offer(element)) {
            if (queue.size() >= this.chunkSize) {
                list = new ArrayList(this.chunkSize);
                queue.drainTo(list, this.chunkSize);
            }
        } else {
            list = new ArrayList<T>(this.chunkSize);
            list.add(element);
            queue.drainTo(list, this.chunkSize - 1);
        }
        return list;
    }

    private void invalidate(Object key, EntryVersion version, Address member) {
        BlockingQueue queue = this.invalidations.computeIfAbsent(member, m4 -> new ArrayBlockingQueue(this.chunkSize));
        List<KeyAndVersion> list = this.offerAndDrain(queue, new KeyAndVersion(key, version));
        if (list != null && !list.isEmpty()) {
            this.invalidate(list, member);
        }
    }

    private void invalidate(List<KeyAndVersion> list, Address member) {
        Object[] keys = new Object[list.size()];
        int[] topologyIds = new int[list.size()];
        long[] versions = new long[list.size()];
        int i = 0;
        for (KeyAndVersion pair : list) {
            keys[i] = pair.key;
            SimpleClusteredVersion version = (SimpleClusteredVersion)pair.version;
            topologyIds[i] = version.getTopologyId();
            versions[i] = version.getVersion();
            ++i;
        }
        long incrementedCounter = this.chunkCounter.incrementAndGet();
        if (log.isTraceEnabled()) {
            log.tracef("Invalidating versions on %s, chunk counter incremented to %d", (Object)member, (Object)incrementedCounter);
        }
        InvalidateVersionsCommand ivc = this.commandsFactory.buildInvalidateVersionsCommand(this.cacheTopology.getTopologyId(), keys, topologyIds, versions, true);
        this.rpcManager.invokeCommand(member, (ReplicableCommand)ivc, SingleResponseCollector.validOnly(), this.rpcManager.getSyncRpcOptions()).whenComplete((response, t2) -> {
            if (t2 != null) {
                log.failedInvalidatingRemoteCache((Throwable)t2);
            }
            long decrementedCounter = this.chunkCounter.decrementAndGet();
            if (log.isTraceEnabled()) {
                log.tracef("Versions invalidated on %s, chunk counter decremented to %d", (Object)member, (Object)decrementedCounter);
            }
            if (decrementedCounter == 0L) {
                this.notifyEndOfStateTransferIfNeeded();
            }
        });
    }

    private void backupEntry(InternalCacheEntry<?, ?> entry) {
        List<InternalCacheEntry<?, ?>> entries = this.offerAndDrain(this.backupQueue, entry);
        if (entries != null && !entries.isEmpty()) {
            this.backupEntries(entries);
        }
    }

    private void backupEntries(List<InternalCacheEntry<?, ?>> entries) {
        long incrementedCounter = this.chunkCounter.incrementAndGet();
        if (log.isTraceEnabled()) {
            log.tracef("Backing up entries, chunk counter is %d", incrementedCounter);
        }
        HashMap map = new HashMap();
        for (InternalCacheEntry<?, ?> entry : entries) {
            map.put(entry.getKey(), entry.toInternalCacheValue());
        }
        PutMapCommand putMapCommand = this.commandsFactory.buildPutMapCommand(map, null, STATE_TRANSFER_FLAGS);
        putMapCommand.setTopologyId(this.rpcManager.getTopologyId());
        this.rpcManager.invokeCommand(this.backupAddress, (ReplicableCommand)putMapCommand, SingleResponseCollector.validOnly(), this.rpcManager.getSyncRpcOptions()).whenComplete((response, throwable) -> {
            try {
                if (throwable != null) {
                    log.failedOutBoundTransferExecution((Throwable)throwable);
                }
            }
            finally {
                long decrementedCounter = this.chunkCounter.decrementAndGet();
                if (log.isTraceEnabled()) {
                    log.tracef("Backed up entries, chunk counter is %d", decrementedCounter);
                }
                if (decrementedCounter == 0L) {
                    this.notifyEndOfStateTransferIfNeeded();
                }
            }
        });
    }

    private void retrieveEntry(Object key, Address address) {
        BlockingQueue queue = this.retrievedEntries.computeIfAbsent(address, k -> new ArrayBlockingQueue(this.chunkSize));
        List<Object> keys = this.offerAndDrain(queue, key);
        if (keys != null && !keys.isEmpty()) {
            this.getValuesAndApply(address, keys);
        }
    }

    private void getValuesAndApply(Address address, List<Object> keys) {
        long incrementedCounter = this.chunkCounter.incrementAndGet();
        if (log.isTraceEnabled()) {
            log.tracef("Retrieving values, chunk counter is %d", incrementedCounter);
        }
        ClusteredGetAllCommand command = this.commandsFactory.buildClusteredGetAllCommand(keys, SKIP_OWNERSHIP_FLAGS, null);
        command.setTopologyId(this.rpcManager.getTopologyId());
        this.rpcManager.invokeCommand(address, (ReplicableCommand)command, SingleResponseCollector.validOnly(), this.rpcManager.getSyncRpcOptions()).whenComplete((response, throwable) -> {
            try {
                if (throwable != null) {
                    throw Log.CONTAINER.exceptionProcessingEntryRetrievalValues((Throwable)throwable);
                }
                this.applyValues(address, keys, (Response)response);
            }
            catch (Throwable t2) {
                log.failedProcessingValuesDuringRebalance(t2);
                throw t2;
            }
            finally {
                long decrementedCounter = this.chunkCounter.decrementAndGet();
                if (log.isTraceEnabled()) {
                    log.tracef("Applied values, chunk counter is %d", decrementedCounter);
                }
                if (decrementedCounter == 0L) {
                    this.notifyEndOfStateTransferIfNeeded();
                }
            }
        });
    }

    private void applyValues(Address address, List<Object> keys, Response response) {
        if (response == null) {
            throw new CacheException("Did not get response from " + String.valueOf(address));
        }
        if (!response.isSuccessful()) {
            throw new CacheException("Response from " + String.valueOf(address) + " is unsuccessful: " + String.valueOf(response));
        }
        InternalCacheValue[] values = (InternalCacheValue[])((SuccessfulResponse)response).getResponseValue();
        if (values == null) {
            throw new IllegalStateException();
        }
        for (int i = 0; i < keys.size(); ++i) {
            Object key = keys.get(i);
            InternalCacheValue icv = values[i];
            if (icv == null) continue;
            InternalMetadataImpl metadata = new InternalMetadataImpl(icv);
            PutKeyValueCommand put = this.commandsFactory.buildPutKeyValueCommand(key, icv.getValue(), this.keyPartitioner.getSegment(key), metadata, STATE_TRANSFER_FLAGS);
            try {
                this.interceptorChain.invoke(this.icf.createSingleKeyNonTxInvocationContext(), put);
                continue;
            }
            catch (Exception e) {
                if (!((Cache)this.cache.wired()).getStatus().allowInvocations()) {
                    log.debugf("Cache %s is shutting down, stopping state transfer", (Object)this.cacheName);
                    break;
                }
                log.problemApplyingStateForKey(key, e);
            }
        }
    }

    @Override
    public void stopApplyingState(int topologyId) {
        this.svm.notifyValueTransferFinished();
        super.stopApplyingState(topologyId);
    }

    @Override
    protected CompletionStage<Void> removeStaleData(IntSet removedSegments) {
        return CompletableFutures.completedNull();
    }

    private Address getNextMember(CacheTopology cacheTopology) {
        Address myAddress = this.rpcManager.getAddress();
        List<Address> members = cacheTopology.getActualMembers();
        if (members.size() <= 1) {
            return null;
        }
        Iterator<Address> it = members.iterator();
        while (it.hasNext()) {
            Address member = it.next();
            if (!member.equals(myAddress)) continue;
            if (it.hasNext()) {
                return it.next();
            }
            return members.get(0);
        }
        return null;
    }

    protected static class KeyAndVersion {
        public final Object key;
        public final EntryVersion version;

        public KeyAndVersion(Object key, EntryVersion version) {
            this.key = key;
            this.version = version;
        }
    }
}

