/*
 * Decompiled with CFR 0.152.
 */
package org.apache.pulsar.functions.runtime.shaded.org.apache.bookkeeper.stream.storage.impl.sc;

import java.io.IOException;
import java.time.Duration;
import java.util.Collections;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import java.util.stream.Collectors;
import org.apache.pulsar.functions.runtime.shaded.com.google.common.collect.ImmutableSet;
import org.apache.pulsar.functions.runtime.shaded.com.google.common.collect.Maps;
import org.apache.pulsar.functions.runtime.shaded.com.google.common.collect.Sets;
import org.apache.pulsar.functions.runtime.shaded.com.google.common.util.concurrent.ThreadFactoryBuilder;
import org.apache.pulsar.functions.runtime.shaded.org.apache.bookkeeper.clients.utils.NetUtils;
import org.apache.pulsar.functions.runtime.shaded.org.apache.bookkeeper.common.component.AbstractLifecycleComponent;
import org.apache.pulsar.functions.runtime.shaded.org.apache.bookkeeper.common.concurrent.FutureUtils;
import org.apache.pulsar.functions.runtime.shaded.org.apache.bookkeeper.stats.StatsLogger;
import org.apache.pulsar.functions.runtime.shaded.org.apache.bookkeeper.stream.proto.cluster.ClusterAssignmentData;
import org.apache.pulsar.functions.runtime.shaded.org.apache.bookkeeper.stream.proto.cluster.ServerAssignmentData;
import org.apache.pulsar.functions.runtime.shaded.org.apache.bookkeeper.stream.proto.common.Endpoint;
import org.apache.pulsar.functions.runtime.shaded.org.apache.bookkeeper.stream.storage.api.cluster.ClusterMetadataStore;
import org.apache.pulsar.functions.runtime.shaded.org.apache.bookkeeper.stream.storage.api.sc.StorageContainer;
import org.apache.pulsar.functions.runtime.shaded.org.apache.bookkeeper.stream.storage.api.sc.StorageContainerManager;
import org.apache.pulsar.functions.runtime.shaded.org.apache.bookkeeper.stream.storage.api.sc.StorageContainerRegistry;
import org.apache.pulsar.functions.runtime.shaded.org.apache.bookkeeper.stream.storage.conf.StorageConfiguration;
import org.apache.pulsar.functions.runtime.shaded.org.apache.bookkeeper.util.collections.ConcurrentLongHashMap;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * A {@link StorageContainerManager} that reads the cluster assignment data from a
 * {@link ClusterMetadataStore} and starts/stops storage containers through a
 * {@link StorageContainerRegistry} so that the set of live containers follows the
 * assignment recorded for this server's {@link Endpoint}.
 *
 * <p>The assignment is re-evaluated both periodically (every {@code probeInterval}) and whenever
 * the metadata store invokes the watcher callback ({@link #accept(Void)}); both paths run on the
 * single-threaded {@code executor} owned by this instance.
 */
public class ZkStorageContainerManager
extends AbstractLifecycleComponent<StorageConfiguration>
implements StorageContainerManager,
Consumer<Void> {
    private static final Logger log = LoggerFactory.getLogger(ZkStorageContainerManager.class);
    /** The endpoint of this server; used to look up its own assignment. */
    private final Endpoint endpoint;
    private final ClusterMetadataStore metadataStore;
    private final StorageContainerRegistry registry;
    /** Single-threaded scheduler running the periodic probe and the watcher-triggered probes. */
    private final ScheduledExecutorService executor;
    private volatile ClusterAssignmentData clusterAssignmentData;
    /** Last observed assignment, keyed by server endpoint. */
    private volatile Map<Endpoint, ServerAssignmentData> clusterAssignmentMap;
    /** Last observed assignment of this server; {@code null} when this server has no entry. */
    private volatile ServerAssignmentData myAssignmentData;
    /** Container id to owning endpoint, maintained incrementally on every refresh. */
    private volatile ConcurrentLongHashMap<Endpoint> containerAssignmentMap;
    private ScheduledFuture<?> containerProbeTask;
    private final Duration probeInterval;
    /** Containers that have been started successfully and not yet stopped. */
    private final Map<Long, StorageContainer> liveContainers;
    /** Containers with a start or stop operation currently in flight. */
    private final Set<Long> pendingStartStopContainers;

    /**
     * Creates the manager. The probe interval is half of
     * {@code conf.getClusterControllerScheduleIntervalMs()}.
     *
     * @param myEndpoint the endpoint identifying this server in the assignment data
     * @param conf the storage configuration
     * @param clusterMetadataStore the store providing (and notifying about) cluster assignment data
     * @param registry the registry used to start and stop storage containers
     * @param statsLogger the stats logger handed to the lifecycle component base class
     */
    public ZkStorageContainerManager(Endpoint myEndpoint, StorageConfiguration conf, ClusterMetadataStore clusterMetadataStore, StorageContainerRegistry registry, StatsLogger statsLogger) {
        super("zk-storage-container-manager", conf, statsLogger);
        this.endpoint = myEndpoint;
        this.metadataStore = clusterMetadataStore;
        this.registry = registry;
        this.executor = Executors.newSingleThreadScheduledExecutor(new ThreadFactoryBuilder().setNameFormat("zk-storage-container-manager").build());
        this.liveContainers = Collections.synchronizedMap(Maps.newConcurrentMap());
        this.pendingStartStopContainers = Collections.synchronizedSet(Sets.newConcurrentHashSet());
        this.containerAssignmentMap = new ConcurrentLongHashMap<>();
        this.clusterAssignmentMap = Maps.newHashMap();
        this.probeInterval = Duration.ofMillis(conf.getClusterControllerScheduleIntervalMs() / 2L);
    }

    /**
     * Registers this instance as a watcher of the cluster assignment data and schedules the
     * periodic container probe.
     */
    @Override
    protected void doStart() {
        this.metadataStore.watchClusterAssignmentData(this, this.executor);
        log.info("Watched cluster assignment data.");
        // Schedule the exception-safe wrapper: a periodic task that throws is never run again.
        this.containerProbeTask = this.executor.scheduleAtFixedRate(this::probeContainersSafely, 0L, this.probeInterval.toMillis(), TimeUnit.MILLISECONDS);
        log.info("Scheduled storage container probe task at every {} ms", this.probeInterval.toMillis());
    }

    /**
     * Unregisters the watcher, cancels the periodic probe (if one was scheduled) and requests a
     * stop of every live container.
     */
    @Override
    protected void doStop() {
        this.metadataStore.unwatchClusterAssignmentData(this);
        // The task is only assigned in doStart(); guard against it never having been scheduled.
        ScheduledFuture<?> probeTask = this.containerProbeTask;
        if (null != probeTask && !probeTask.cancel(true)) {
            log.warn("Failed to cancel the container probe task.");
        }
        this.stopContainers();
    }

    /**
     * Closes the container registry and shuts down the executor.
     *
     * @throws IOException declared by the overridden method
     */
    @Override
    protected void doClose() throws IOException {
        this.registry.close();
        this.executor.shutdown();
    }

    /**
     * Returns the endpoint currently recorded as the owner of the given storage container.
     *
     * @param scId the storage container id
     * @return whatever the container-to-endpoint map holds for {@code scId}
     */
    @Override
    public Endpoint getStorageContainer(long scId) {
        return this.containerAssignmentMap.get(scId);
    }

    /**
     * Refreshes the assignment from the metadata store and reconciles the live containers with
     * it: if this server has no assignment all live containers are stopped, otherwise containers
     * are started/stopped to match the assignment. Does nothing when no assignment data is
     * available.
     */
    void probeContainers() {
        if (!this.refreshMyAssignment()) {
            return;
        }
        // Read the volatile field once so the null check and the use see the same value.
        ServerAssignmentData myAssignment = this.myAssignmentData;
        if (myAssignment == null) {
            this.stopContainers();
        } else {
            this.processMyAssignment(myAssignment);
        }
    }

    /**
     * Runs {@link #probeContainers()} and logs (rather than propagates) any runtime failure.
     *
     * <p>Used for the executor-driven invocations: an exception escaping a task passed to
     * {@code scheduleAtFixedRate} suppresses all subsequent executions, and an exception escaping
     * a task passed to {@code submit} is only recorded in a future that nobody inspects.
     */
    private void probeContainersSafely() {
        try {
            this.probeContainers();
        } catch (RuntimeException e) {
            log.warn("Failed to probe storage containers, will retry at next probe", e);
        }
    }

    /**
     * Fetches the latest cluster assignment data, updates the container-to-endpoint map according
     * to the difference with the previously observed assignment, and records the new assignment
     * map and this server's own assignment.
     *
     * @return {@code false} if the metadata store returned no assignment data (nothing is
     *     updated), {@code true} otherwise
     */
    private boolean refreshMyAssignment() {
        ClusterAssignmentData latestAssignmentData = this.metadataStore.getClusterAssignmentData();
        if (null == latestAssignmentData) {
            log.info("Cluster assignment data is empty, so skip refreshing");
            return false;
        }
        Map<Endpoint, ServerAssignmentData> newAssignmentMap = latestAssignmentData.getServersMap().entrySet().stream().collect(Collectors.toMap(e -> NetUtils.parseEndpoint((String)e.getKey()), e -> (ServerAssignmentData)e.getValue()));
        Map<Endpoint, ServerAssignmentData> oldAssignmentMap = this.clusterAssignmentMap;
        Set<Endpoint> oldAssignedServers = oldAssignmentMap.keySet();
        Set<Endpoint> newAssignedServers = newAssignmentMap.keySet();
        ImmutableSet<Endpoint> serversJoined = Sets.difference(newAssignedServers, oldAssignedServers).immutableCopy();
        ImmutableSet<Endpoint> serversLeft = Sets.difference(oldAssignedServers, newAssignedServers).immutableCopy();
        ImmutableSet<Endpoint> commonServers = Sets.intersection(newAssignedServers, oldAssignedServers).immutableCopy();
        // Removals of departed servers are applied before additions of joined servers.
        this.processServersLeft(serversLeft, oldAssignmentMap);
        this.processServersJoined(serversJoined, newAssignmentMap);
        this.processServersAssignmentChanged(commonServers, oldAssignmentMap, newAssignmentMap);
        this.clusterAssignmentMap = newAssignmentMap;
        this.myAssignmentData = newAssignmentMap.get(this.endpoint);
        return true;
    }

    /**
     * Records, for every newly joined server, that it owns each container in its assignment.
     *
     * @param serversJoined servers present in the new assignment but not in the old one
     * @param newAssignmentMap the new assignment, keyed by endpoint
     */
    private void processServersJoined(Set<Endpoint> serversJoined, Map<Endpoint, ServerAssignmentData> newAssignmentMap) {
        if (!serversJoined.isEmpty()) {
            log.info("Servers joined : {}", serversJoined);
        }
        for (Endpoint ep : serversJoined) {
            ServerAssignmentData sad = newAssignmentMap.get(ep);
            if (null == sad) {
                continue;
            }
            for (long containerId : sad.getContainersList()) {
                this.containerAssignmentMap.put(containerId, ep);
            }
        }
    }

    /**
     * Removes, for every departed server, the ownership entries of the containers it used to
     * hold. An entry is only removed if it still maps to that server.
     *
     * @param serversLeft servers present in the old assignment but not in the new one
     * @param oldAssignmentMap the previous assignment, keyed by endpoint
     */
    private void processServersLeft(Set<Endpoint> serversLeft, Map<Endpoint, ServerAssignmentData> oldAssignmentMap) {
        if (!serversLeft.isEmpty()) {
            log.info("Servers left : {}", serversLeft);
        }
        for (Endpoint ep : serversLeft) {
            ServerAssignmentData sad = oldAssignmentMap.get(ep);
            if (null == sad) {
                continue;
            }
            for (long containerId : sad.getContainersList()) {
                this.containerAssignmentMap.remove(containerId, ep);
            }
        }
    }

    /**
     * For every server present in both assignments whose assignment differs, drops its old
     * ownership entries and records the new ones.
     *
     * @param commonServers servers present in both the old and the new assignment
     * @param oldAssignmentMap the previous assignment, keyed by endpoint
     * @param newAssignmentMap the new assignment, keyed by endpoint
     */
    private void processServersAssignmentChanged(Set<Endpoint> commonServers, Map<Endpoint, ServerAssignmentData> oldAssignmentMap, Map<Endpoint, ServerAssignmentData> newAssignmentMap) {
        for (Endpoint ep : commonServers) {
            ServerAssignmentData oldSad = oldAssignmentMap.getOrDefault(ep, ServerAssignmentData.getDefaultInstance());
            ServerAssignmentData newSad = newAssignmentMap.getOrDefault(ep, ServerAssignmentData.getDefaultInstance());
            if (oldSad.equals(newSad)) {
                continue;
            }
            log.info("Server assignment is change for {}:\nold assignment: {}\nnew assignment: {}", new Object[]{NetUtils.endpointToString(ep), oldSad, newSad});
            for (long containerId : oldSad.getContainersList()) {
                this.containerAssignmentMap.remove(containerId, ep);
            }
            for (long containerId : newSad.getContainersList()) {
                this.containerAssignmentMap.put(containerId, ep);
            }
        }
    }

    /** Requests a stop of every container currently in the live set. */
    private void stopContainers() {
        // Iterate over a snapshot: stop completions remove entries from the live map.
        ImmutableSet<Long> liveContainerSet = ImmutableSet.copyOf(this.liveContainers.keySet());
        liveContainerSet.forEach(this::stopStorageContainer);
    }

    /**
     * Starts the containers that are assigned but not live and stops the containers that are
     * live but no longer assigned, skipping containers with a start/stop already in flight.
     *
     * @param myAssignment the assignment of this server
     */
    private void processMyAssignment(ServerAssignmentData myAssignment) {
        Set<Long> assignedContainerSet = myAssignment.getContainersList().stream().collect(Collectors.toSet());
        HashSet<Long> liveContainerSet = Sets.newHashSet(this.liveContainers.keySet());
        Set<Long> containersToStart = Sets.newHashSet(Sets.difference(assignedContainerSet, liveContainerSet).immutableCopy());
        Set<Long> containersToStop = Sets.newHashSet(Sets.difference(liveContainerSet, assignedContainerSet).immutableCopy());
        // Sets.filter returns views: the pending check is evaluated when the sets are traversed.
        containersToStart = Sets.filter(containersToStart, container -> !this.pendingStartStopContainers.contains(container));
        containersToStop = Sets.filter(containersToStop, container -> !this.pendingStartStopContainers.contains(container));
        if (!containersToStart.isEmpty() || !containersToStop.isEmpty()) {
            log.info("Process container changes:\n\tIdeal = {}\n\tLive = {}\n\tPending = {}\n\tToStart = {}\n\tToStop = {}", new Object[]{assignedContainerSet, liveContainerSet, this.pendingStartStopContainers, containersToStart, containersToStop});
        }
        containersToStart.forEach(this::startStorageContainer);
        containersToStop.forEach(this::stopStorageContainer);
    }

    /**
     * Starts a storage container through the registry unless it is already live. The container
     * is marked pending for the duration of the start and added to the live set on success.
     *
     * @param scId the storage container id
     * @return a future of the live container if there is one already, otherwise the registry's
     *     start future
     */
    private CompletableFuture<StorageContainer> startStorageContainer(long scId) {
        log.info("Starting storage container ({})", scId);
        StorageContainer sc = this.liveContainers.get(scId);
        if (null != sc) {
            log.warn("Storage container ({}) is already started", scId);
            return FutureUtils.value(sc);
        }
        this.pendingStartStopContainers.add(scId);
        return this.registry.startStorageContainer(scId).whenComplete((container, cause) -> {
            try {
                if (null != cause) {
                    log.warn("Failed to start storage container ({})", scId, cause);
                } else {
                    log.info("Successfully started storage container ({})", scId);
                    this.addStorageContainer(scId, (StorageContainer)container);
                }
            }
            finally {
                // Always clear the pending mark so a later probe can retry.
                this.pendingStartStopContainers.remove(scId);
            }
        });
    }

    /**
     * Stops a live storage container through the registry. The container is marked pending for
     * the duration of the stop and removed from the live set on success.
     *
     * @param scId the storage container id
     * @return a completed future if the container is not live, otherwise the registry's stop
     *     future
     */
    private CompletableFuture<Void> stopStorageContainer(long scId) {
        log.info("Stopping storage container ({})", scId);
        StorageContainer sc = this.liveContainers.get(scId);
        if (null == sc) {
            log.warn("Storage container ({}) is not alive anymore", scId);
            return FutureUtils.Void();
        }
        this.pendingStartStopContainers.add(scId);
        return this.registry.stopStorageContainer(scId, sc).whenComplete((container, cause) -> {
            try {
                if (cause != null) {
                    log.warn("Failed to stop storage container ({})", scId, cause);
                } else {
                    log.info("Successfully stopped storage container ({})", scId);
                    this.removeStorageContainer(scId, sc);
                }
            }
            finally {
                // Always clear the pending mark so a later probe can retry.
                this.pendingStartStopContainers.remove(scId);
            }
        });
    }

    /**
     * Adds a started container to the live set if no container is registered under the same id;
     * otherwise stops the given container and keeps the existing one.
     *
     * @param scId the storage container id
     * @param sc the newly started container
     * @return the container that is in the live set after the call
     */
    private StorageContainer addStorageContainer(long scId, StorageContainer sc) {
        StorageContainer oldSc = this.liveContainers.putIfAbsent(scId, sc);
        if (null == oldSc) {
            log.info("Storage container ({}) is added to live set.", sc);
            return sc;
        }
        log.warn("Storage container ({}) has already been added to live set", sc);
        sc.stop();
        return oldSc;
    }

    /**
     * Removes a container from the live set, but only if the entry still maps to {@code sc}.
     *
     * @param scId the storage container id
     * @param sc the container instance expected to be registered under {@code scId}
     */
    private void removeStorageContainer(long scId, StorageContainer sc) {
        if (this.liveContainers.remove(scId, sc)) {
            log.info("Storage container ({}) is removed from live set.", scId);
        } else {
            log.warn("Storage container ({}) can't be removed from live set.", scId);
        }
    }

    /**
     * Watcher callback: queues a container probe on the executor.
     *
     * @param aVoid unused
     */
    @Override
    public void accept(Void aVoid) {
        // Use the exception-safe wrapper; the returned future is not inspected.
        this.executor.submit(this::probeContainersSafely);
    }

    /** Returns the (mutable, internal) map of live containers keyed by container id. */
    Map<Long, StorageContainer> getLiveContainers() {
        return this.liveContainers;
    }

    /** Returns the (mutable, internal) set of containers with a start/stop in flight. */
    Set<Long> getPendingStartStopContainers() {
        return this.pendingStartStopContainers;
    }
}

