/*
 * Decompiled with CFR 0.152.
 */
package org.elasticsoftware.elasticactors.kubernetes.cluster;

import io.fabric8.kubernetes.api.model.apps.StatefulSet;
import io.fabric8.kubernetes.api.model.apps.StatefulSetList;
import io.fabric8.kubernetes.client.DefaultKubernetesClient;
import io.fabric8.kubernetes.client.KubernetesClient;
import io.fabric8.kubernetes.client.KubernetesClientException;
import io.fabric8.kubernetes.client.Watch;
import io.fabric8.kubernetes.client.Watcher;
import io.fabric8.kubernetes.client.dsl.NonNamespaceOperation;
import io.fabric8.kubernetes.client.dsl.RollableScalableResource;
import io.fabric8.kubernetes.client.dsl.WatchAndWaitable;
import java.util.ArrayList;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;
import org.elasticsoftware.elasticactors.PhysicalNode;
import org.elasticsoftware.elasticactors.cluster.ClusterEventListener;
import org.elasticsoftware.elasticactors.cluster.ClusterMessageHandler;
import org.elasticsoftware.elasticactors.cluster.ClusterService;
import org.elasticsoftware.elasticactors.kubernetes.cluster.DaemonThreadFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * {@link ClusterService} implementation that derives the cluster topology from the replica count of a
 * Kubernetes StatefulSet.
 *
 * <p>Node ids are generated as {@code <name>-<ordinal>}; the node with ordinal {@code 0} is always
 * announced as the master. Topology changes are detected by watching the StatefulSet and are dispatched
 * to the registered {@link ClusterEventListener}s on a dedicated single-threaded executor.
 *
 * <p>Cluster messaging ({@link #sendMessage}, {@link #setClusterMessageHandler}) and
 * {@link #reportPlannedShutdown()} are no-ops in this implementation.
 */
public final class KubernetesClusterService implements ClusterService {
    private static final Logger logger = LoggerFactory.getLogger(KubernetesClusterService.class);
    /** HTTP status code "Gone"; a watch closed with this code is logged at info instead of error level. */
    private static final int HTTP_GONE = 410;

    private KubernetesClient client;
    private final String namespace;
    private final String name;
    private final String nodeId;
    private final String masterNodeId;
    private final Queue<ClusterEventListener> eventListeners = new ConcurrentLinkedQueue<>();
    // Single worker thread: topology updates are processed one at a time, in submission order.
    private final ExecutorService clusterServiceExecutor = Executors.newSingleThreadExecutor(new DaemonThreadFactory("KUBERNETES_CLUSTER_SERVICE"));
    /** Last replica count that was signalled to the listeners (0 until the first update). */
    private final AtomicInteger currentTopology = new AtomicInteger();
    private final AtomicReference<Watch> currentWatch = new AtomicReference<>();
    private final Boolean useDesiredReplicas;
    /** Set by {@link #destroy()}; read from the watcher callbacks, hence volatile. */
    private volatile boolean shuttingDown;

    private final Watcher<StatefulSet> watcher = new Watcher<StatefulSet>() {

        public void eventReceived(Watcher.Action action, StatefulSet resource) {
            // Only modifications are handled; all other actions are ignored.
            if (action == Watcher.Action.MODIFIED) {
                handleStatefulSetUpdate(resource);
            }
        }

        public void onClose(KubernetesClientException cause) {
            // A null cause means the watch was closed without an error: nothing to recover from.
            // During shutdown the watch must not be re-established either.
            if (cause == null || shuttingDown) {
                return;
            }
            if (cause.getCode() == HTTP_GONE) {
                logger.info("Watcher on StatefulSet {} was closed. Reason: {} ", name, cause.getMessage());
            } else {
                logger.error("Watcher on StatefulSet {} was closed", name, cause);
            }
            try {
                watchStatefulSet();
            } catch (RuntimeException e) {
                // Do not let the failure escape into the client's callback thread unnoticed.
                logger.error("Unable to re-establish watch on StatefulSet {}", name, e);
            }
        }
    };

    /**
     * Creates the service; no connection to Kubernetes is made until {@link #init()} is called.
     *
     * @param namespace          namespace that contains the StatefulSet
     * @param name               name of the StatefulSet; also the prefix of the generated node ids
     * @param nodeId             id of the local node, compared against the generated ids to flag the local node
     * @param useDesiredReplicas when {@code TRUE}, the topology size is the maximum of {@code spec.replicas}
     *                           and {@code status.replicas}; otherwise only {@code status.replicas} is used
     */
    public KubernetesClusterService(String namespace, String name, String nodeId, Boolean useDesiredReplicas) {
        this.namespace = namespace;
        this.name = name;
        this.nodeId = nodeId;
        this.useDesiredReplicas = useDesiredReplicas;
        this.masterNodeId = String.format("%s-0", name);
    }

    /**
     * Creates the Kubernetes client.
     *
     * @throws IllegalStateException if the client cannot be created
     */
    @PostConstruct
    public void init() {
        try {
            this.client = new DefaultKubernetesClient();
        } catch (KubernetesClientException e) {
            logger.error("Exception creation DefaultKubernetesClient", e);
            throw new IllegalStateException("Unable to create DefaultKubernetesClient", e);
        }
    }

    /**
     * Closes the active watch, stops the update executor and closes the Kubernetes client.
     */
    @PreDestroy
    public void destroy() {
        this.shuttingDown = true;
        Watch watch = this.currentWatch.getAndSet(null);
        if (watch != null) {
            try {
                watch.close();
            } catch (RuntimeException e) {
                // Keep going: the executor and the client still have to be released.
                logger.warn("Exception while closing watch on StatefulSet {}", this.name, e);
            }
        }
        this.clusterServiceExecutor.shutdownNow();
        KubernetesClient kubernetesClient = this.client;
        if (kubernetesClient != null) {
            kubernetesClient.close();
        }
    }

    /**
     * Reads the current StatefulSet state, schedules the initial topology notification, starts watching
     * the StatefulSet and finally announces the master node ({@code <name>-0}) to all listeners.
     *
     * @throws IllegalStateException if the StatefulSet does not exist in the configured namespace
     */
    public void reportReady() throws Exception {
        StatefulSet resource = this.client.apps().statefulSets().inNamespace(this.namespace).withName(this.name).get();
        if (resource == null) {
            throw new IllegalStateException(String.format("StatefulSet %s not found in namespace %s", this.name, this.namespace));
        }
        handleStatefulSetUpdate(resource);
        watchStatefulSet();
        // masterNodeId is never null, so this comparison is safe even when nodeId is null.
        PhysicalNode masterNode = new PhysicalNode(this.masterNodeId, null, this.masterNodeId.equals(this.nodeId));
        this.eventListeners.forEach(clusterEventListener -> {
            try {
                clusterEventListener.onMasterElected(masterNode);
            } catch (Exception e) {
                logger.error("Unexpected exception while calling clusterEventListener.onMasterElected", e);
            }
        });
    }

    /**
     * Starts a watch on the StatefulSet, beginning at the resource version currently reported by the
     * StatefulSet list of the namespace, and records it as the current watch.
     */
    private void watchStatefulSet() {
        String resourceVersion = this.client.apps().statefulSets().inNamespace(this.namespace).list().getMetadata().getResourceVersion();
        Watch watch = this.client.apps().statefulSets()
                .inNamespace(this.namespace)
                .withName(this.name)
                .withResourceVersion(resourceVersion)
                .watch(this.watcher);
        this.currentWatch.set(watch);
        logger.info("Watching StatefulSet {} on namespace {} for resource version {}", this.name, this.namespace, resourceVersion);
    }

    /**
     * Schedules the evaluation of a StatefulSet state on the update executor.
     * Updates arriving after {@link #destroy()} are dropped.
     */
    private void handleStatefulSetUpdate(StatefulSet resource) {
        if (this.shuttingDown) {
            logger.debug("Ignoring update for StatefulSet {}: service is shutting down", this.name);
            return;
        }
        this.clusterServiceExecutor.submit(() -> {
            try {
                reportTopologyChangeIfNeeded(resource);
            } catch (Exception e) {
                // submit() would otherwise park the exception in a Future nobody inspects.
                logger.error("Unexpected exception while processing update for StatefulSet {}", this.name, e);
            }
        });
    }

    /**
     * Computes the replica count from the given StatefulSet state and, when it differs from the last
     * signalled count, notifies all listeners with the node list {@code <name>-0 .. <name>-(n-1)}.
     * Updates without a {@code status.replicas} value are ignored.
     */
    private void reportTopologyChangeIfNeeded(StatefulSet resource) {
        // Replica counts are boxed and spec/status may be absent, so read everything null-safely.
        Integer specReplicas = resource.getSpec() != null ? resource.getSpec().getReplicas() : null;
        Integer statusReplicas = resource.getStatus() != null ? resource.getStatus().getReplicas() : null;
        Integer readyReplicas = resource.getStatus() != null ? resource.getStatus().getReadyReplicas() : null;
        logger.info("Received Cluster State Update: spec.replicas={}, status.replicas={}, status.readyReplicas={}", specReplicas, statusReplicas, readyReplicas);
        if (statusReplicas == null) {
            logger.warn("Ignoring update for StatefulSet {}: status.replicas is not available", this.name);
            return;
        }
        boolean includeDesired = Boolean.TRUE.equals(this.useDesiredReplicas) && specReplicas != null;
        int totalReplicas = includeDesired ? Math.max(specReplicas, statusReplicas) : statusReplicas;
        if (this.currentTopology.getAndSet(totalReplicas) == totalReplicas) {
            return;
        }
        logger.info("Signalling Cluster Topology change to {} nodes", totalReplicas);
        ArrayList<PhysicalNode> nodeList = new ArrayList<>(totalReplicas);
        for (int i = 0; i < totalReplicas; ++i) {
            String id = String.format("%s-%d", this.name, i);
            nodeList.add(new PhysicalNode(id, null, id.equals(this.nodeId)));
        }
        this.eventListeners.forEach(clusterEventListener -> {
            try {
                clusterEventListener.onTopologyChanged(nodeList);
            } catch (Exception e) {
                logger.error("Unexpected exception while calling clusterEventListener.onTopologyChanged", e);
            }
        });
    }

    /** No-op in this implementation. */
    public void reportPlannedShutdown() {
    }

    /**
     * Registers a listener for master election and topology change notifications.
     *
     * @param eventListener listener to add
     */
    public void addEventListener(ClusterEventListener eventListener) {
        this.eventListeners.add(eventListener);
    }

    /**
     * Unregisters a previously added listener.
     *
     * @param eventListener listener to remove
     */
    public void removeEventListener(ClusterEventListener eventListener) {
        this.eventListeners.remove(eventListener);
    }

    /** No-op in this implementation: the message is discarded. */
    public void sendMessage(String memberToken, byte[] message) throws Exception {
    }

    /** No-op in this implementation: the handler is not stored. */
    public void setClusterMessageHandler(ClusterMessageHandler clusterMessageHandler) {
    }
}

