/*
 * Decompiled with CFR 0.152.
 */
package org.apache.pulsar.functions.runtime.shaded.org.apache.bookkeeper.stream.storage.impl.cluster;

import org.apache.pulsar.functions.runtime.shaded.com.google.common.base.Preconditions;
import org.apache.pulsar.functions.runtime.shaded.org.apache.bookkeeper.stream.storage.StorageConstants;
import org.apache.pulsar.functions.runtime.shaded.org.apache.bookkeeper.stream.storage.api.cluster.ClusterControllerLeader;
import org.apache.pulsar.functions.runtime.shaded.org.apache.bookkeeper.stream.storage.api.cluster.ClusterControllerLeaderSelector;
import org.apache.pulsar.functions.runtime.shaded.org.apache.bookkeeper.stream.storage.impl.cluster.ZkClusterControllerLeaderSelectorListener;
import org.apache.pulsar.functions.runtime.shaded.org.apache.curator.framework.CuratorFramework;
import org.apache.pulsar.functions.runtime.shaded.org.apache.curator.framework.recipes.leader.LeaderSelector;
import org.apache.pulsar.functions.runtime.shaded.org.apache.curator.framework.recipes.leader.LeaderSelectorListener;
import org.apache.pulsar.functions.runtime.shaded.org.apache.curator.framework.state.ConnectionState;
import org.apache.pulsar.functions.runtime.shaded.org.apache.curator.framework.state.ConnectionStateListener;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class ZkClusterControllerLeaderSelector
implements ClusterControllerLeaderSelector,
ConnectionStateListener {
    private static final Logger log = LoggerFactory.getLogger(ZkClusterControllerLeaderSelector.class);
    private final CuratorFramework client;
    private final String controllerZkPath;
    private ClusterControllerLeader leader;
    private LeaderSelector leaderSelector;

    public ZkClusterControllerLeaderSelector(CuratorFramework client, String zkRootPath) {
        this.client = client;
        this.controllerZkPath = StorageConstants.getControllerPath(zkRootPath);
    }

    @Override
    public void initialize(ClusterControllerLeader leader) {
        this.leader = leader;
        ZkClusterControllerLeaderSelectorListener zkLeader = new ZkClusterControllerLeaderSelectorListener(leader);
        this.leaderSelector = new LeaderSelector(this.client, this.controllerZkPath, (LeaderSelectorListener)zkLeader);
        this.client.getConnectionStateListenable().addListener((Object)this);
    }

    @Override
    public void start() {
        Preconditions.checkNotNull(this.leaderSelector, "leader selector is not initialized");
        this.leaderSelector.autoRequeue();
        this.leaderSelector.start();
    }

    @Override
    public void close() {
        if (null != this.leaderSelector) {
            this.leaderSelector.interruptLeadership();
            this.leaderSelector.close();
        }
    }

    public void stateChanged(CuratorFramework client, ConnectionState newState) {
        switch (newState) {
            case LOST: {
                log.warn("Connection to zookeeper is lost. So interrupt my current leadership.");
                this.leaderSelector.interruptLeadership();
                break;
            }
            case SUSPENDED: {
                if (!this.leaderSelector.hasLeadership()) break;
                log.info("Connection to zookeeper is disconnected, suspend the leader until it is reconnected.");
                this.leader.suspend();
                break;
            }
            case RECONNECTED: {
                if (!this.leaderSelector.hasLeadership()) break;
                log.info("Connection to zookeeper is reconnected, resume the leader");
                this.leader.resume();
                break;
            }
        }
    }
}

