/*
 * Decompiled with CFR 0.152.
 */
package org.infinispan.topology;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.Function;
import net.jcip.annotations.GuardedBy;
import org.infinispan.commands.topology.CacheShutdownCommand;
import org.infinispan.commands.topology.CacheStatusRequestCommand;
import org.infinispan.commands.topology.RebalanceStartCommand;
import org.infinispan.commands.topology.RebalanceStatusRequestCommand;
import org.infinispan.commands.topology.TopologyUpdateCommand;
import org.infinispan.commands.topology.TopologyUpdateStableCommand;
import org.infinispan.commons.IllegalLifecycleStateException;
import org.infinispan.commons.time.TimeService;
import org.infinispan.commons.util.InfinispanCollections;
import org.infinispan.commons.util.ProcessorInfo;
import org.infinispan.commons.util.concurrent.CompletableFutures;
import org.infinispan.configuration.ConfigurationManager;
import org.infinispan.configuration.cache.CacheMode;
import org.infinispan.configuration.cache.Configuration;
import org.infinispan.configuration.global.GlobalConfiguration;
import org.infinispan.distribution.ch.ConsistentHash;
import org.infinispan.executors.LimitedExecutor;
import org.infinispan.factories.GlobalComponentRegistry;
import org.infinispan.factories.annotations.ComponentName;
import org.infinispan.factories.annotations.Inject;
import org.infinispan.factories.annotations.Start;
import org.infinispan.factories.annotations.Stop;
import org.infinispan.factories.scopes.Scope;
import org.infinispan.factories.scopes.Scopes;
import org.infinispan.globalstate.GlobalStateManager;
import org.infinispan.globalstate.ScopedPersistentState;
import org.infinispan.manager.EmbeddedCacheManager;
import org.infinispan.notifications.cachemanagerlistener.CacheManagerNotifier;
import org.infinispan.partitionhandling.AvailabilityMode;
import org.infinispan.partitionhandling.PartitionHandling;
import org.infinispan.partitionhandling.impl.AvailabilityStrategy;
import org.infinispan.partitionhandling.impl.LostDataCheck;
import org.infinispan.partitionhandling.impl.PreferAvailabilityStrategy;
import org.infinispan.partitionhandling.impl.PreferConsistencyStrategy;
import org.infinispan.remoting.inboundhandler.DeliverOrder;
import org.infinispan.remoting.responses.ValidResponse;
import org.infinispan.remoting.transport.Address;
import org.infinispan.remoting.transport.ResponseCollectors;
import org.infinispan.remoting.transport.Transport;
import org.infinispan.remoting.transport.ValidResponseCollector;
import org.infinispan.remoting.transport.impl.VoidResponseCollector;
import org.infinispan.statetransfer.RebalanceType;
import org.infinispan.topology.CacheJoinInfo;
import org.infinispan.topology.CacheStatusResponse;
import org.infinispan.topology.CacheTopology;
import org.infinispan.topology.ClusterCacheStatus;
import org.infinispan.topology.ClusterTopologyManager;
import org.infinispan.topology.EventLoggerViewListener;
import org.infinispan.topology.HeartBeatCommand;
import org.infinispan.topology.ManagerStatusResponse;
import org.infinispan.topology.PersistentUUIDManager;
import org.infinispan.topology.RebalancingStatus;
import org.infinispan.topology.TopologyManagementHelper;
import org.infinispan.util.concurrent.ActionSequencer;
import org.infinispan.util.concurrent.AggregateCompletionStage;
import org.infinispan.util.concurrent.CompletionStages;
import org.infinispan.util.concurrent.ConditionFuture;
import org.infinispan.util.concurrent.TimeoutException;
import org.infinispan.util.logging.Log;
import org.infinispan.util.logging.LogFactory;
import org.infinispan.util.logging.events.EventLogManager;

@Scope(value=Scopes.GLOBAL)
public class ClusterTopologyManagerImpl
implements ClusterTopologyManager {
    public static final int INITIAL_CONNECTION_ATTEMPTS = 10;
    public static final int CLUSTER_RECOVERY_ATTEMPTS = 10;
    private static final Log log = LogFactory.getLog(ClusterTopologyManagerImpl.class);
    private static final CompletableFuture<CacheStatusResponseCollector> SKIP_RECOVERY_FUTURE = CompletableFutures.completedExceptionFuture(new IllegalStateException());
    @Inject
    Transport transport;
    @Inject
    GlobalConfiguration globalConfiguration;
    @Inject
    ConfigurationManager configurationManager;
    @Inject
    GlobalComponentRegistry gcr;
    @Inject
    CacheManagerNotifier cacheManagerNotifier;
    @Inject
    EmbeddedCacheManager cacheManager;
    @Inject
    @ComponentName(value="org.infinispan.executors.non-blocking")
    ExecutorService nonBlockingExecutor;
    @Inject
    @ComponentName(value="org.infinispan.executors.timeout")
    ScheduledExecutorService timeoutScheduledExecutor;
    @Inject
    EventLogManager eventLogManager;
    @Inject
    PersistentUUIDManager persistentUUIDManager;
    @Inject
    TimeService timeService;
    private TopologyManagementHelper helper;
    private ConditionFuture<ClusterTopologyManagerImpl> joinViewFuture;
    private ActionSequencer actionSequencer;
    private final Lock updateLock = new ReentrantLock();
    @GuardedBy(value="updateLock")
    private int viewId = -1;
    @GuardedBy(value="updateLock")
    private ClusterTopologyManager.ClusterManagerStatus clusterManagerStatus = ClusterTopologyManager.ClusterManagerStatus.INITIALIZING;
    @GuardedBy(value="updateLock")
    private final ConcurrentMap<String, ClusterCacheStatus> cacheStatusMap = new ConcurrentHashMap<String, ClusterCacheStatus>();
    private AtomicInteger recoveryAttemptCount = new AtomicInteger();
    private boolean globalRebalancingEnabled = true;
    private EventLoggerViewListener viewListener;

    @Start(priority=100)
    public void start() {
        this.helper = new TopologyManagementHelper(this.gcr);
        this.joinViewFuture = new ConditionFuture(this.timeoutScheduledExecutor);
        this.actionSequencer = new ActionSequencer(this.nonBlockingExecutor, true, this.timeService);
        this.viewListener = new EventLoggerViewListener(this.eventLogManager, e -> this.handleClusterView(e.isMergeView(), e.getViewId()));
        this.cacheManagerNotifier.addListener(this.viewListener);
        this.handleClusterView(false, this.transport.getViewId());
        this.globalRebalancingEnabled = CompletionStages.join(this.fetchRebalancingStatusFromCoordinator(10));
    }

    private CompletionStage<Boolean> fetchRebalancingStatusFromCoordinator(int attempts) {
        if (this.transport.isCoordinator()) {
            return CompletableFutures.completedTrue();
        }
        RebalanceStatusRequestCommand command = new RebalanceStatusRequestCommand();
        Address coordinator = this.transport.getCoordinator();
        return this.helper.executeOnCoordinator(this.transport, command, this.getGlobalTimeout() / 10).handle((rebalancingStatus, throwable) -> {
            if (throwable == null) {
                return CompletableFuture.completedFuture(rebalancingStatus != RebalancingStatus.SUSPENDED);
            }
            if (attempts == 1 || !(throwable instanceof TimeoutException)) {
                log.errorReadingRebalancingStatus(coordinator, (Throwable)throwable);
                return CompletableFutures.completedTrue();
            }
            log.debug("Timed out waiting for rebalancing status from coordinator, trying again");
            return this.fetchRebalancingStatusFromCoordinator(attempts - 1);
        }).thenCompose(Function.identity());
    }

    @Stop(priority=100)
    public void stop() {
        this.acquireUpdateLock();
        try {
            this.clusterManagerStatus = ClusterTopologyManager.ClusterManagerStatus.STOPPING;
            this.joinViewFuture.stop();
        }
        finally {
            this.releaseUpdateLock();
        }
        this.cacheManagerNotifier.removeListener(this.viewListener);
    }

    private void acquireUpdateLock() {
        this.updateLock.lock();
    }

    private void releaseUpdateLock() {
        this.updateLock.unlock();
    }

    @Override
    public ClusterTopologyManager.ClusterManagerStatus getStatus() {
        return this.clusterManagerStatus;
    }

    @Override
    public List<Address> currentJoiners(String cacheName) {
        if (!this.getStatus().isCoordinator()) {
            return null;
        }
        ClusterCacheStatus status = (ClusterCacheStatus)this.cacheStatusMap.get(cacheName);
        return status != null ? status.getExpectedMembers() : null;
    }

    @Override
    public CompletionStage<CacheStatusResponse> handleJoin(String cacheName, Address joiner, CacheJoinInfo joinInfo, int joinerViewId) {
        CompletionStage<Void> viewStage;
        if (this.canHandleJoin(joinerViewId)) {
            viewStage = CompletableFutures.completedNull();
        } else {
            if (log.isTraceEnabled()) {
                log.tracef("Delaying join request from %s until view %s is installed (and cluster status is recovered)", (Object)joiner, (Object)joinerViewId);
            }
            viewStage = this.joinViewFuture.newConditionStage(ctmi -> ctmi.canHandleJoin(joinerViewId), () -> Log.CLUSTER.coordinatorTimeoutWaitingForView(joinerViewId, this.viewId, (Object)this.clusterManagerStatus), joinInfo.getTimeout(), TimeUnit.MILLISECONDS);
        }
        return viewStage.thenCompose(v -> {
            ClusterCacheStatus cacheStatus = this.prepareJoin(cacheName, joiner, joinInfo, joinerViewId);
            if (cacheStatus == null) {
                return CompletableFutures.completedNull();
            }
            return cacheStatus.nodeCanJoinFuture(joinInfo).thenApply(ignored -> cacheStatus.doJoin(joiner, joinInfo));
        });
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private ClusterCacheStatus prepareJoin(String cacheName, Address joiner, CacheJoinInfo joinInfo, int joinerViewId) {
        this.acquireUpdateLock();
        try {
            if (!this.clusterManagerStatus.isRunning()) {
                log.debugf("Ignoring join request from %s for cache %s, the local cache manager is shutting down", (Object)joiner, (Object)cacheName);
                throw new IllegalLifecycleStateException();
            }
            if (joinerViewId < this.viewId) {
                log.debugf("Ignoring join request from %s for cache %s, joiner's view id is too old: %d", (Object)joiner, (Object)cacheName, (Object)joinerViewId);
                ClusterCacheStatus clusterCacheStatus = null;
                return clusterCacheStatus;
            }
            ClusterCacheStatus clusterCacheStatus = this.initCacheStatusIfAbsent(cacheName, joinInfo.getCacheMode());
            return clusterCacheStatus;
        }
        finally {
            this.releaseUpdateLock();
        }
    }

    private boolean canHandleJoin(int joinerViewId) {
        this.acquireUpdateLock();
        try {
            boolean bl = joinerViewId <= this.viewId && this.clusterManagerStatus != ClusterTopologyManager.ClusterManagerStatus.RECOVERING_CLUSTER && this.clusterManagerStatus != ClusterTopologyManager.ClusterManagerStatus.INITIALIZING;
            return bl;
        }
        finally {
            this.releaseUpdateLock();
        }
    }

    @Override
    public CompletionStage<Void> handleLeave(String cacheName, Address leaver, int viewId) throws Exception {
        if (!this.clusterManagerStatus.isRunning()) {
            log.debugf("Ignoring leave request from %s for cache %s, the local cache manager is shutting down", (Object)leaver, (Object)cacheName);
            return CompletableFutures.completedNull();
        }
        ClusterCacheStatus cacheStatus = (ClusterCacheStatus)this.cacheStatusMap.get(cacheName);
        if (cacheStatus == null) {
            log.tracef("Ignoring leave request from %s for cache %s because it doesn't have a cache status entry", (Object)leaver, (Object)cacheName);
            return CompletableFutures.completedNull();
        }
        return cacheStatus.doLeave(leaver);
    }

    synchronized void removeCacheStatus(String cacheName) {
        this.cacheStatusMap.remove(cacheName);
    }

    @Override
    public CompletionStage<Void> handleRebalancePhaseConfirm(String cacheName, Address node, int topologyId, Throwable throwable, int viewId) throws Exception {
        ClusterCacheStatus cacheStatus;
        if (throwable != null) {
            log.rebalanceError(cacheName, node, topologyId, throwable);
        }
        if ((cacheStatus = (ClusterCacheStatus)this.cacheStatusMap.get(cacheName)) == null) {
            log.debugf("Ignoring rebalance confirmation from %s for cache %s because it doesn't have a cache status entry", (Object)node, (Object)cacheName);
            return CompletableFutures.completedNull();
        }
        cacheStatus.confirmRebalancePhase(node, topologyId);
        return CompletableFutures.completedNull();
    }

    private void handleClusterView(boolean mergeView, int newViewId) {
        this.orderOnManager(() -> {
            try {
                if (!this.updateClusterState(mergeView, newViewId)) {
                    return CompletableFutures.completedNull();
                }
                if (this.clusterManagerStatus == ClusterTopologyManager.ClusterManagerStatus.RECOVERING_CLUSTER) {
                    return this.recoverClusterStatus(newViewId);
                }
                if (this.clusterManagerStatus == ClusterTopologyManager.ClusterManagerStatus.COORDINATOR) {
                    this.joinViewFuture.updateAsync(this, this.nonBlockingExecutor);
                    return this.updateCacheMembers(newViewId);
                }
            }
            catch (Throwable t2) {
                log.viewHandlingError(newViewId, t2);
            }
            return CompletableFutures.completedNull();
        });
    }

    private <T> CompletionStage<T> orderOnManager(Callable<CompletionStage<T>> action) {
        return this.actionSequencer.orderOnKey(ClusterTopologyManagerImpl.class, action);
    }

    private CompletionStage<Void> orderOnCache(String cacheName, Runnable action) {
        return this.actionSequencer.orderOnKey(cacheName, () -> {
            action.run();
            return CompletableFutures.completedNull();
        });
    }

    private CompletionStage<Void> recoverClusterStatus(int newViewId) {
        this.cacheStatusMap.clear();
        this.recoveryAttemptCount.set(0);
        return this.fetchClusterStatus(newViewId).thenCompose(responseCollector -> {
            Map<String, Map<Address, CacheStatusResponse>> responsesByCache = responseCollector.getResponsesByCache();
            log.debugf("Cluster recovery found %d caches, members are %s", responsesByCache.size(), (Object)this.transport.getMembers());
            int maxThreads = ProcessorInfo.availableProcessors() / 2 + 1;
            AggregateCompletionStage<Void> mergeStage = CompletionStages.aggregateCompletionStage();
            LimitedExecutor cs = new LimitedExecutor("Merge-" + newViewId, this.nonBlockingExecutor, maxThreads);
            for (Map.Entry<String, Map<Address, CacheStatusResponse>> e : responsesByCache.entrySet()) {
                CacheJoinInfo joinInfo = e.getValue().values().iterator().next().getCacheJoinInfo();
                ClusterCacheStatus cacheStatus = this.initCacheStatusIfAbsent(e.getKey(), joinInfo.getCacheMode());
                mergeStage.dependsOn(CompletableFuture.runAsync(() -> cacheStatus.doMergePartitions((Map)e.getValue()), cs));
            }
            return mergeStage.freeze().thenRun(() -> {
                this.acquireUpdateLock();
                try {
                    if (this.viewId != newViewId) {
                        log.debugf("View updated while we were recovering the cluster for view %d", newViewId);
                        return;
                    }
                    this.clusterManagerStatus = ClusterTopologyManager.ClusterManagerStatus.COORDINATOR;
                    this.globalRebalancingEnabled = responseCollector.getRebalancingEnabled();
                }
                finally {
                    this.releaseUpdateLock();
                }
                for (ClusterCacheStatus cacheStatus : this.cacheStatusMap.values()) {
                    this.orderOnCache(cacheStatus.getCacheName(), () -> {
                        block2: {
                            try {
                                cacheStatus.doHandleClusterView(newViewId);
                            }
                            catch (Throwable throwable) {
                                if (!this.clusterManagerStatus.isRunning()) break block2;
                                log.errorUpdatingMembersList(newViewId, throwable);
                            }
                        }
                    });
                }
                this.joinViewFuture.updateAsync(this, this.nonBlockingExecutor);
            });
        });
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private boolean updateClusterState(boolean mergeView, int newViewId) {
        this.acquireUpdateLock();
        try {
            boolean becameCoordinator;
            if (newViewId < this.transport.getViewId()) {
                log.tracef("Ignoring old cluster view notification: %s", newViewId);
                boolean bl = false;
                return bl;
            }
            boolean isCoordinator = this.transport.isCoordinator();
            boolean bl = becameCoordinator = isCoordinator && !this.clusterManagerStatus.isCoordinator();
            if (log.isTraceEnabled()) {
                log.tracef("Received new cluster view: %d, isCoordinator = %s, old status = %s", (Object)newViewId, (Object)isCoordinator, (Object)this.clusterManagerStatus);
            }
            if (!isCoordinator) {
                this.clusterManagerStatus = ClusterTopologyManager.ClusterManagerStatus.REGULAR_MEMBER;
                boolean bl2 = false;
                return bl2;
            }
            if (becameCoordinator || mergeView) {
                this.clusterManagerStatus = ClusterTopologyManager.ClusterManagerStatus.RECOVERING_CLUSTER;
            }
            this.viewId = newViewId;
        }
        finally {
            this.releaseUpdateLock();
        }
        return true;
    }

    private ClusterCacheStatus initCacheStatusIfAbsent(String cacheName, CacheMode cacheMode) {
        return this.cacheStatusMap.computeIfAbsent(cacheName, name -> {
            LostDataCheck lostDataCheck = cacheMode.isScattered() ? ClusterTopologyManagerImpl::scatteredLostDataCheck : ClusterTopologyManagerImpl::distLostDataCheck;
            Configuration config = this.configurationManager.getConfiguration(cacheName, true);
            PartitionHandling partitionHandling = config != null ? config.clustering().partitionHandling().whenSplit() : null;
            boolean resolveConflictsOnMerge = this.resolveConflictsOnMerge(config, cacheMode);
            AvailabilityStrategy availabilityStrategy = partitionHandling != null && partitionHandling != PartitionHandling.ALLOW_READ_WRITES ? new PreferConsistencyStrategy(this.eventLogManager, this.persistentUUIDManager, lostDataCheck) : new PreferAvailabilityStrategy(this.eventLogManager, this.persistentUUIDManager, lostDataCheck);
            Optional<GlobalStateManager> globalStateManager = this.gcr.getOptionalComponent(GlobalStateManager.class);
            Optional<ScopedPersistentState> persistedState = globalStateManager.flatMap(gsm -> gsm.readScopedState(cacheName));
            return new ClusterCacheStatus(this.cacheManager, this.gcr, cacheName, availabilityStrategy, RebalanceType.from(cacheMode), this, this.transport, this.persistentUUIDManager, this.eventLogManager, persistedState, resolveConflictsOnMerge);
        });
    }

    private boolean resolveConflictsOnMerge(Configuration config, CacheMode cacheMode) {
        if (config == null || cacheMode.isScattered() || cacheMode.isInvalidation()) {
            return false;
        }
        return config.clustering().partitionHandling().resolveConflictsOnMerge();
    }

    void broadcastRebalanceStart(String cacheName, CacheTopology cacheTopology) {
        RebalanceStartCommand command = new RebalanceStartCommand(cacheName, this.transport.getAddress(), cacheTopology, this.viewId);
        this.helper.executeOnClusterAsync(this.transport, command);
    }

    private CompletionStage<CacheStatusResponseCollector> fetchClusterStatus(int newViewId) {
        int attemptCount = this.recoveryAttemptCount.getAndIncrement();
        if (log.isTraceEnabled()) {
            log.debugf("Recovering cluster status for view %d, attempt %d", newViewId, attemptCount);
        }
        CacheStatusRequestCommand command = new CacheStatusRequestCommand(newViewId);
        CacheStatusResponseCollector responseCollector = new CacheStatusResponseCollector();
        int timeout = this.getGlobalTimeout() / 10;
        CompletionStage<CacheStatusResponseCollector> remoteStage = this.helper.executeOnClusterSync(this.transport, command, timeout, responseCollector);
        return CompletionStages.handleAndCompose(remoteStage, (collector, throwable) -> {
            if (newViewId < this.transport.getViewId()) {
                if (log.isTraceEnabled()) {
                    log.tracef("Ignoring cluster state responses for view %d, we already have view %d", newViewId, this.transport.getViewId());
                }
                return SKIP_RECOVERY_FUTURE;
            }
            if (throwable == null) {
                if (log.isTraceEnabled()) {
                    log.tracef("Received valid cluster state responses for view %d", newViewId);
                }
                if (!collector.getSuspectedMembers().isEmpty()) {
                    log.debugf("Missing cache status responses from nodes %s", (Object)collector.getSuspectedMembers());
                }
                return CompletableFuture.completedFuture(collector);
            }
            Throwable t2 = CompletableFutures.extractException(throwable);
            if (t2 instanceof IllegalLifecycleStateException) {
                return SKIP_RECOVERY_FUTURE;
            }
            log.failedToRecoverClusterState(t2);
            if (t2 instanceof TimeoutException && attemptCount < 10) {
                return this.fetchClusterStatus(newViewId);
            }
            throw CompletableFutures.asCompletionException(t2);
        });
    }

    private CompletionStage<Void> updateCacheMembers(int viewId) {
        return this.confirmMembersAvailable().whenComplete((ignored, throwable) -> {
            if (throwable == null) {
                try {
                    int newViewId = this.transport.getViewId();
                    if (newViewId != viewId) {
                        log.debugf("Skipping cache members update for view %d, newer view received: %d", viewId, newViewId);
                        return;
                    }
                    for (ClusterCacheStatus cacheStatus : this.cacheStatusMap.values()) {
                        cacheStatus.doHandleClusterView(viewId);
                    }
                }
                catch (Throwable t2) {
                    throwable = t2;
                }
            }
            if (throwable != null && this.clusterManagerStatus.isRunning()) {
                log.errorUpdatingMembersList(viewId, (Throwable)throwable);
            }
        });
    }

    private CompletionStage<Void> confirmMembersAvailable() {
        try {
            HashSet<Address> expectedMembers = new HashSet<Address>();
            for (ClusterCacheStatus cacheStatus : this.cacheStatusMap.values()) {
                expectedMembers.addAll(cacheStatus.getExpectedMembers());
            }
            expectedMembers.retainAll(this.transport.getMembers());
            return this.transport.invokeCommandOnAll(expectedMembers, HeartBeatCommand.INSTANCE, VoidResponseCollector.validOnly(), DeliverOrder.NONE, this.getGlobalTimeout() / 10, TimeUnit.MILLISECONDS);
        }
        catch (Exception e) {
            return CompletableFutures.completedExceptionFuture(e);
        }
    }

    private int getGlobalTimeout() {
        return (int)this.globalConfiguration.transport().distributedSyncTimeout();
    }

    void broadcastTopologyUpdate(String cacheName, CacheTopology cacheTopology, AvailabilityMode availabilityMode) {
        TopologyUpdateCommand command = new TopologyUpdateCommand(cacheName, this.transport.getAddress(), cacheTopology, availabilityMode, this.viewId);
        this.helper.executeOnClusterAsync(this.transport, command);
    }

    void broadcastStableTopologyUpdate(String cacheName, CacheTopology cacheTopology) {
        TopologyUpdateStableCommand command = new TopologyUpdateStableCommand(cacheName, this.transport.getAddress(), cacheTopology, this.viewId);
        this.helper.executeOnClusterAsync(this.transport, command);
    }

    @Override
    public boolean isRebalancingEnabled() {
        return this.globalRebalancingEnabled;
    }

    @Override
    public boolean isRebalancingEnabled(String cacheName) {
        if (cacheName == null) {
            return this.isRebalancingEnabled();
        }
        ClusterCacheStatus s2 = (ClusterCacheStatus)this.cacheStatusMap.get(cacheName);
        return s2 != null ? s2.isRebalanceEnabled() : this.isRebalancingEnabled();
    }

    @Override
    public CompletionStage<Void> setRebalancingEnabled(String cacheName, boolean enabled) {
        if (cacheName == null) {
            return this.setRebalancingEnabled(enabled);
        }
        ClusterCacheStatus clusterCacheStatus = (ClusterCacheStatus)this.cacheStatusMap.get(cacheName);
        if (clusterCacheStatus != null) {
            return clusterCacheStatus.setRebalanceEnabled(enabled);
        }
        log.debugf("Trying to enable rebalancing for inexistent cache %s", (Object)cacheName);
        return CompletableFutures.completedNull();
    }

    @Override
    public CompletionStage<Void> setRebalancingEnabled(boolean enabled) {
        if (enabled) {
            if (!this.globalRebalancingEnabled) {
                Log.CLUSTER.rebalancingEnabled();
            }
        } else if (this.globalRebalancingEnabled) {
            Log.CLUSTER.rebalancingSuspended();
        }
        this.globalRebalancingEnabled = enabled;
        this.cacheStatusMap.values().forEach(ClusterCacheStatus::startQueuedRebalance);
        return CompletableFutures.completedNull();
    }

    @Override
    public CompletionStage<Void> forceRebalance(String cacheName) {
        ClusterCacheStatus cacheStatus = (ClusterCacheStatus)this.cacheStatusMap.get(cacheName);
        if (cacheStatus != null) {
            cacheStatus.forceRebalance();
        }
        return CompletableFutures.completedNull();
    }

    @Override
    public CompletionStage<Void> forceAvailabilityMode(String cacheName, AvailabilityMode availabilityMode) {
        ClusterCacheStatus cacheStatus = (ClusterCacheStatus)this.cacheStatusMap.get(cacheName);
        if (cacheStatus != null) {
            return cacheStatus.forceAvailabilityMode(availabilityMode);
        }
        return CompletableFutures.completedNull();
    }

    @Override
    public RebalancingStatus getRebalancingStatus(String cacheName) {
        ClusterCacheStatus cacheStatus = (ClusterCacheStatus)this.cacheStatusMap.get(cacheName);
        if (cacheStatus != null) {
            return cacheStatus.getRebalancingStatus();
        }
        return RebalancingStatus.PENDING;
    }

    public CompletionStage<Void> broadcastShutdownCache(String cacheName) {
        CacheShutdownCommand command = new CacheShutdownCommand(cacheName);
        return this.helper.executeOnClusterSync(this.transport, command, this.getGlobalTimeout(), VoidResponseCollector.validOnly());
    }

    @Override
    public void setInitialCacheTopologyId(String cacheName, int topologyId) {
        Configuration configuration = this.configurationManager.getConfiguration(cacheName, true);
        ClusterCacheStatus cacheStatus = this.initCacheStatusIfAbsent(cacheName, configuration.clustering().cacheMode());
        cacheStatus.setInitialTopologyId(topologyId);
    }

    @Override
    public CompletionStage<Void> handleShutdownRequest(String cacheName) throws Exception {
        ClusterCacheStatus cacheStatus = (ClusterCacheStatus)this.cacheStatusMap.get(cacheName);
        return cacheStatus.shutdownCache();
    }

    public static boolean scatteredLostDataCheck(ConsistentHash stableCH, List<Address> newMembers) {
        HashSet<Address> lostMembers = new HashSet<Address>(stableCH.getMembers());
        lostMembers.removeAll(newMembers);
        log.tracef("Stable CH members: %s, actual members: %s, lost members: %s", (Object)stableCH.getMembers(), (Object)newMembers, (Object)lostMembers);
        return lostMembers.size() > 1;
    }

    public static boolean distLostDataCheck(ConsistentHash stableCH, List<Address> newMembers) {
        for (int i = 0; i < stableCH.getNumSegments(); ++i) {
            if (InfinispanCollections.containsAny(newMembers, stableCH.locateOwnersForSegment(i))) continue;
            return true;
        }
        return false;
    }

    private static class CacheStatusResponseCollector
    extends ValidResponseCollector<CacheStatusResponseCollector> {
        private final Map<String, Map<Address, CacheStatusResponse>> responsesByCache = new HashMap<String, Map<Address, CacheStatusResponse>>();
        private final List<Address> suspectedMembers = new ArrayList<Address>();
        private final Map<CacheTopology, CacheTopology> seenTopologies = new HashMap<CacheTopology, CacheTopology>();
        private final Map<CacheJoinInfo, CacheJoinInfo> seenInfos = new HashMap<CacheJoinInfo, CacheJoinInfo>();
        private boolean rebalancingEnabled = true;

        private CacheStatusResponseCollector() {
        }

        @Override
        protected CacheStatusResponseCollector addValidResponse(Address sender, ValidResponse response) {
            if (response.isSuccessful()) {
                ManagerStatusResponse nodeStatus = (ManagerStatusResponse)response.getResponseValue();
                this.rebalancingEnabled &= nodeStatus.isRebalancingEnabled();
                for (Map.Entry<String, CacheStatusResponse> entry : nodeStatus.getCaches().entrySet()) {
                    String cacheName = entry.getKey();
                    CacheStatusResponse csr = entry.getValue();
                    CacheTopology cacheTopology = this.intern(this.seenTopologies, csr.getCacheTopology());
                    CacheTopology stableTopology = this.intern(this.seenTopologies, csr.getStableTopology());
                    CacheJoinInfo info = this.intern(this.seenInfos, csr.getCacheJoinInfo());
                    Map cacheResponses = this.responsesByCache.computeIfAbsent(cacheName, k -> new HashMap());
                    cacheResponses.put(sender, new CacheStatusResponse(info, cacheTopology, stableTopology, csr.getAvailabilityMode(), csr.joinedMembers()));
                }
            }
            return null;
        }

        private <T> T intern(Map<T, T> internMap, T value) {
            T replacementValue = internMap.get(value);
            if (replacementValue == null) {
                internMap.put(value, value);
                replacementValue = value;
            }
            return replacementValue;
        }

        @Override
        protected CacheStatusResponseCollector addTargetNotFound(Address sender) {
            this.suspectedMembers.add(sender);
            return null;
        }

        @Override
        protected CacheStatusResponseCollector addException(Address sender, Exception exception) {
            throw ResponseCollectors.wrapRemoteException(sender, exception);
        }

        @Override
        public CacheStatusResponseCollector finish() {
            return this;
        }

        public Map<String, Map<Address, CacheStatusResponse>> getResponsesByCache() {
            return this.responsesByCache;
        }

        public boolean getRebalancingEnabled() {
            return this.rebalancingEnabled;
        }

        public List<Address> getSuspectedMembers() {
            return this.suspectedMembers;
        }
    }
}

