/*
 * Decompiled with CFR 0.152.
 */
package org.apache.pulsar.shade.org.apache.bookkeeper.replication;

import java.io.IOException;
import java.io.Serializable;
import java.nio.charset.StandardCharsets;
import java.util.Collections;
import java.util.Comparator;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.pulsar.shade.com.google.common.annotations.VisibleForTesting;
import org.apache.pulsar.shade.com.google.protobuf.Message;
import org.apache.pulsar.shade.com.google.protobuf.TextFormat;
import org.apache.pulsar.shade.org.apache.bookkeeper.client.BKException;
import org.apache.pulsar.shade.org.apache.bookkeeper.client.BookKeeper;
import org.apache.pulsar.shade.org.apache.bookkeeper.conf.ServerConfiguration;
import org.apache.pulsar.shade.org.apache.bookkeeper.meta.ZkLayoutManager;
import org.apache.pulsar.shade.org.apache.bookkeeper.meta.zk.ZKMetadataDriverBase;
import org.apache.pulsar.shade.org.apache.bookkeeper.net.BookieId;
import org.apache.pulsar.shade.org.apache.bookkeeper.proto.DataFormats;
import org.apache.pulsar.shade.org.apache.bookkeeper.replication.Auditor;
import org.apache.pulsar.shade.org.apache.bookkeeper.replication.ReplicationException;
import org.apache.pulsar.shade.org.apache.bookkeeper.stats.Counter;
import org.apache.pulsar.shade.org.apache.bookkeeper.stats.NullStatsLogger;
import org.apache.pulsar.shade.org.apache.bookkeeper.stats.StatsLogger;
import org.apache.pulsar.shade.org.apache.bookkeeper.stats.annotations.StatsDoc;
import org.apache.pulsar.shade.org.apache.bookkeeper.util.ZkUtils;
import org.apache.pulsar.shade.org.apache.commons.lang.StringUtils;
import org.apache.pulsar.shade.org.apache.zookeeper.CreateMode;
import org.apache.pulsar.shade.org.apache.zookeeper.KeeperException;
import org.apache.pulsar.shade.org.apache.zookeeper.WatchedEvent;
import org.apache.pulsar.shade.org.apache.zookeeper.Watcher;
import org.apache.pulsar.shade.org.apache.zookeeper.ZooKeeper;
import org.apache.pulsar.shade.org.apache.zookeeper.data.ACL;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Runs the auditor election for one bookie on top of ZooKeeper.
 *
 * <p>Each elector creates an ephemeral sequential vote znode under the election
 * path. The votes are sorted by their sequence number; the elector whose vote
 * sorts first starts an {@link Auditor}, every other elector sets a watch on the
 * vote immediately preceding its own and re-runs the election when that znode is
 * deleted. Election and cleanup work is funnelled through a single-thread executor.
 */
@StatsDoc(name="auditor", help="Auditor related stats")
public class AuditorElector {
    private static final Logger LOG = LoggerFactory.getLogger(AuditorElector.class);
    // Index, in the sorted vote list, of the vote whose owner becomes the auditor.
    private static final int AUDITOR_INDEX = 0;
    // Prefix of a vote znode name; the text after it is parsed as the vote's sequence id.
    private static final String VOTE_PREFIX = "V_";
    private static final String PATH_SEPARATOR = "/";
    // Name of the election znode created beneath the under-replication base path.
    private static final String ELECTION_ZNODE = "auditorelection";
    // Resolved ledgers root path followed by "/underreplication".
    private final String basePath;
    // basePath followed by "/" and ELECTION_ZNODE; parent of all vote znodes.
    private final String electionPath;
    // Bookie id stored in the vote data and in the election znode data.
    private final String bookieId;
    private final ServerConfiguration conf;
    private final BookKeeper bkc;
    // ZooKeeper handle taken from the BookKeeper client's layout manager.
    private final ZooKeeper zkc;
    // When true, shutdown() also closes bkc.
    private final boolean ownBkc;
    // Single-thread executor that runs the election and shutdown tasks.
    private final ExecutorService executor;
    // Path of this elector's vote znode as returned by ZooKeeper; null until a vote is created.
    private String myVote;
    // Created by the election task when this elector's vote sorts first; cleared by shutdown().
    Auditor auditor;
    // Set to true by start(); flipped back to false by the shutdown task.
    private AtomicBoolean running = new AtomicBoolean(false);
    // Incremented by the election task on rounds in which this elector does not become the auditor.
    @StatsDoc(name="election_attempts", help="The number of auditor election attempts")
    private final Counter electionAttempts;
    // Used to obtain electionAttempts and handed to the Auditor when one is created.
    private final StatsLogger statsLogger;

    /**
     * Creates an elector that builds its own BookKeeper client.
     *
     * <p>The client is obtained from
     * {@code Auditor.createBookKeeperClientThrowUnavailableException(conf)} and is
     * passed on with {@code ownBkc == true}, so {@link #shutdown()} closes it.
     *
     * @param bookieId id of the bookie taking part in the election
     * @param conf server configuration used for the client and for path/ACL resolution
     * @throws ReplicationException.UnavailableException if the client cannot be created
     *         or the delegated constructor fails
     */
    @VisibleForTesting
    public AuditorElector(String bookieId, ServerConfiguration conf) throws ReplicationException.UnavailableException {
        this(bookieId, conf, Auditor.createBookKeeperClientThrowUnavailableException(conf), true);
    }

    /**
     * Creates an elector around an existing BookKeeper client without stats
     * reporting ({@code NullStatsLogger.INSTANCE} is used as the stats logger).
     *
     * @param bookieId id of the bookie taking part in the election
     * @param conf server configuration
     * @param bkc BookKeeper client to use
     * @param ownBkc whether {@link #shutdown()} should close {@code bkc}
     * @throws ReplicationException.UnavailableException if the delegated constructor fails
     */
    public AuditorElector(String bookieId, ServerConfiguration conf, BookKeeper bkc, boolean ownBkc) throws ReplicationException.UnavailableException {
        this(bookieId, conf, bkc, NullStatsLogger.INSTANCE, ownBkc);
    }

    /**
     * Creates an elector, makes sure the election znodes exist and prepares the
     * single-thread executor on which election and shutdown tasks run.
     *
     * <p>The ZooKeeper handle is taken from the BookKeeper client's layout manager,
     * which is cast to {@code ZkLayoutManager}; a different layout manager type
     * fails that cast.
     *
     * @param bookieId id of the bookie taking part in the election; also used in the executor thread name
     * @param conf server configuration used to resolve the ledgers root path and ACLs
     * @param bkc BookKeeper client to use
     * @param statsLogger source of the {@code election_attempts} counter
     * @param ownBkc whether {@link #shutdown()} should close {@code bkc}
     * @throws ReplicationException.UnavailableException if the election znodes cannot be created
     */
    public AuditorElector(final String bookieId, ServerConfiguration conf, BookKeeper bkc, StatsLogger statsLogger, boolean ownBkc) throws ReplicationException.UnavailableException {
        this.bookieId = bookieId;
        this.conf = conf;
        this.bkc = bkc;
        this.ownBkc = ownBkc;
        this.statsLogger = statsLogger;

        final ZkLayoutManager layoutManager = (ZkLayoutManager) bkc.getMetadataClientDriver().getLayoutManager();
        this.zkc = layoutManager.getZk();
        this.electionAttempts = statsLogger.getCounter("election_attempts");

        final String ledgersRoot = ZKMetadataDriverBase.resolveZkLedgersRootPath(conf);
        this.basePath = ledgersRoot + PATH_SEPARATOR + "underreplication";
        this.electionPath = this.basePath + PATH_SEPARATOR + ELECTION_ZNODE;

        // The znodes must exist before any election task can create a vote beneath them.
        createElectorPath();

        final String threadName = "AuditorElector-" + bookieId;
        this.executor = Executors.newSingleThreadExecutor(new ThreadFactory() {
            @Override
            public Thread newThread(Runnable task) {
                return new Thread(task, threadName);
            }
        });
    }

    /**
     * Creates this elector's vote znode unless one is already recorded and still
     * present in ZooKeeper.
     *
     * <p>The vote is an ephemeral sequential znode under the election path whose
     * data is the text form of an {@code AuditorVoteFormat} carrying the bookie id.
     * The path returned by ZooKeeper is remembered in {@code myVote}.
     *
     * @throws KeeperException if a ZooKeeper operation fails
     * @throws InterruptedException if interrupted while talking to ZooKeeper
     */
    private void createMyVote() throws KeeperException, InterruptedException {
        final boolean voteStillPresent = myVote != null && zkc.exists(myVote, false) != null;
        if (voteStillPresent) {
            return;
        }
        final List<ACL> acls = ZkUtils.getACLs(conf);
        final DataFormats.AuditorVoteFormat vote = DataFormats.AuditorVoteFormat.newBuilder().setBookieId(bookieId).build();
        final byte[] voteData = TextFormat.printToString(vote).getBytes(StandardCharsets.UTF_8);
        final String votePathPrefix = getVotePath(PATH_SEPARATOR + VOTE_PREFIX);
        myVote = zkc.create(votePathPrefix, voteData, acls, CreateMode.EPHEMERAL_SEQUENTIAL);
    }

    /**
     * Returns the path of this elector's vote znode.
     *
     * @return the vote path, or {@code null} if no vote has been created yet
     */
    String getMyVote() {
        return myVote;
    }

    /**
     * Builds a path beneath the election znode.
     *
     * @param vote suffix appended verbatim to the election path; an empty string
     *        yields the election path itself
     * @return the election path followed by {@code vote}
     */
    private String getVotePath(String vote) {
        return electionPath + vote;
    }

    /**
     * Makes sure the base path and the election path exist as persistent znodes.
     *
     * @throws ReplicationException.UnavailableException if ZooKeeper reports an error
     *         or the calling thread is interrupted (the interrupt flag is restored)
     */
    private void createElectorPath() throws ReplicationException.UnavailableException {
        try {
            final List<ACL> acls = ZkUtils.getACLs(conf);
            ensurePersistentZnode(basePath, acls);
            ensurePersistentZnode(getVotePath(""), acls);
        }
        catch (KeeperException ke) {
            throw new ReplicationException.UnavailableException("Failed to initialize Auditor Elector", ke);
        }
        catch (InterruptedException ie) {
            Thread.currentThread().interrupt();
            throw new ReplicationException.UnavailableException("Failed to initialize Auditor Elector", ie);
        }
    }

    /** Creates {@code path} as an empty persistent znode when it does not exist yet. */
    private void ensurePersistentZnode(String path, List<ACL> acls) throws KeeperException, InterruptedException {
        if (zkc.exists(path, false) != null) {
            return;
        }
        try {
            zkc.create(path, new byte[0], acls, CreateMode.PERSISTENT);
        }
        catch (KeeperException.NodeExistsException ignored) {
            // The znode appeared between the exists() check and create(); that is the desired end state.
        }
    }

    /**
     * Marks the elector as running and queues the first election round.
     *
     * @return the future of the queued election task
     */
    public Future<?> start() {
        running.set(true);
        final Future<?> firstElection = submitElectionTask();
        return firstElection;
    }

    /**
     * Queues the elector's cleanup on the executor.
     *
     * <p>The cleanup only acts if the elector is currently marked as running: it
     * clears the running flag and, when a vote has been created, deletes the vote
     * znode. Failures while deleting are logged, not propagated.
     */
    private void submitShutdownTask() {
        final Runnable cleanup = new Runnable() {
            @Override
            public void run() {
                // compareAndSet lets only one cleanup proceed for a running elector.
                final boolean wasRunning = running.compareAndSet(true, false);
                if (!wasRunning) {
                    return;
                }
                LOG.info("Shutting down AuditorElector");
                if (myVote == null) {
                    return;
                }
                try {
                    zkc.delete(myVote, -1);
                }
                catch (KeeperException ke) {
                    LOG.error("Exception while deleting myVote:" + myVote, ke);
                }
                catch (InterruptedException ie) {
                    Thread.currentThread().interrupt();
                    LOG.warn("InterruptedException while deleting myVote: " + myVote, ie);
                }
            }
        };
        executor.submit(cleanup);
    }

    /**
     * Queues one election round on the executor.
     *
     * <p>The round does nothing if the elector is not running. Otherwise it makes
     * sure this elector has a vote znode, lists and sorts all votes by sequence id,
     * and then either
     * <ul>
     *   <li>becomes the auditor when its own vote sorts first: the bookie id is
     *       written to the election znode and an {@link Auditor} is created and
     *       started; or</li>
     *   <li>sets an {@link ElectionWatcher} on the vote immediately preceding its
     *       own, re-queuing the election right away if that znode is already gone,
     *       and increments the election-attempts counter.</li>
     * </ul>
     *
     * <p>ZooKeeper errors, interruption and {@code UnavailableException} are logged
     * and lead to a queued shutdown task. An empty vote list, or this elector's
     * vote being absent from the list, raises {@link IllegalArgumentException};
     * like any unchecked failure in the task it is reported through the returned
     * future.
     *
     * @return the future of the queued election task
     */
    @VisibleForTesting
    Future<?> submitElectionTask() {
        Runnable electionRound = new Runnable() {

            @Override
            public void run() {
                if (!running.get()) {
                    return;
                }
                try {
                    createMyVote();
                    List<String> children = zkc.getChildren(getVotePath(""), false);
                    if (children.isEmpty()) {
                        throw new IllegalArgumentException("Atleast one bookie server should present to elect the Auditor!");
                    }
                    // Ascending order of the sequence number ZooKeeper appended to each vote.
                    Collections.sort(children, new ElectionComparator());
                    String voteNode = StringUtils.substringAfterLast(myVote, PATH_SEPARATOR);
                    if (children.get(AUDITOR_INDEX).equals(voteNode)) {
                        // Record the winning bookie id on the election znode itself.
                        DataFormats.AuditorVoteFormat.Builder builder = DataFormats.AuditorVoteFormat.newBuilder().setBookieId(bookieId);
                        zkc.setData(getVotePath(""), TextFormat.printToString(builder.build()).getBytes(StandardCharsets.UTF_8), -1);
                        auditor = new Auditor(bookieId, conf, bkc, false, statsLogger);
                        auditor.start();
                    } else {
                        int myIndex = children.indexOf(voteNode);
                        if (myIndex < 0) {
                            // Without this check the predecessor lookup below would call
                            // children.get(-2) and fail with an unexplained IndexOutOfBoundsException.
                            LOG.error("Vote {} is not listed under {}", myVote, getVotePath(""));
                            throw new IllegalArgumentException("My vote has disappeared");
                        }
                        // myIndex is at least 1 here because the first entry is not our vote.
                        String predecessorPath = getVotePath(PATH_SEPARATOR) + children.get(myIndex - 1);
                        ElectionWatcher electionWatcher = new ElectionWatcher();
                        if (null == zkc.exists(predecessorPath, electionWatcher)) {
                            // The predecessor vanished before the watch could be set: run the election again.
                            submitElectionTask();
                        }
                        electionAttempts.inc();
                    }
                }
                catch (KeeperException e) {
                    LOG.error("Exception while performing auditor election", e);
                    submitShutdownTask();
                }
                catch (InterruptedException e) {
                    LOG.error("Interrupted while performing auditor election", e);
                    Thread.currentThread().interrupt();
                    submitShutdownTask();
                }
                catch (ReplicationException.UnavailableException e) {
                    LOG.error("Ledger underreplication manager unavailable during election", e);
                    submitShutdownTask();
                }
            }
        };
        return this.executor.submit(electionRound);
    }

    /**
     * Returns the auditor owned by this elector.
     *
     * @return the auditor, or {@code null} when none is currently held
     */
    @VisibleForTesting
    Auditor getAuditor() {
        return auditor;
    }

    /**
     * Looks up the bookie that currently holds the first vote in the election.
     *
     * <p>The votes under the election znode are sorted by sequence id and the data
     * of the first one is parsed as a text-format {@code AuditorVoteFormat}.
     *
     * @param conf server configuration used to resolve the ledgers root path
     * @param zk ZooKeeper handle to query
     * @return the bookie id stored in the first vote, or {@code null} if there are no votes
     * @throws KeeperException if a ZooKeeper operation fails
     * @throws InterruptedException if interrupted while talking to ZooKeeper
     * @throws IOException declared for failures while decoding the vote data
     */
    public static BookieId getCurrentAuditor(ServerConfiguration conf, ZooKeeper zk) throws KeeperException, InterruptedException, IOException {
        final String electionRoot = ZKMetadataDriverBase.resolveZkLedgersRootPath(conf)
                + PATH_SEPARATOR + "underreplication" + PATH_SEPARATOR + ELECTION_ZNODE;
        final List<String> votes = zk.getChildren(electionRoot, false);
        Collections.sort(votes, new ElectionComparator());
        if (votes.isEmpty()) {
            return null;
        }
        final String firstVotePath = electionRoot + PATH_SEPARATOR + votes.get(AUDITOR_INDEX);
        final byte[] rawVote = zk.getData(firstVotePath, false, null);
        final DataFormats.AuditorVoteFormat.Builder voteBuilder = DataFormats.AuditorVoteFormat.newBuilder();
        TextFormat.merge(new String(rawVote, StandardCharsets.UTF_8), (Message.Builder) voteBuilder);
        return BookieId.parse(voteBuilder.build().getBookieId());
    }

    /**
     * Shuts the elector down.
     *
     * <p>Under the elector's monitor a shutdown task is queued and the executor is
     * shut down; a repeated call returns immediately once the executor reports
     * being shut down. Afterwards the auditor, if any, is shut down and released,
     * and the BookKeeper client is closed when this elector owns it. A failure to
     * close the client is logged, not propagated.
     *
     * @throws InterruptedException if interrupted while shutting down
     */
    public void shutdown() throws InterruptedException {
        synchronized (this) {
            if (executor.isShutdown()) {
                return;
            }
            // Queue the cleanup before shutting the executor down so it is still accepted.
            submitShutdownTask();
            executor.shutdown();
        }
        if (auditor != null) {
            auditor.shutdown();
            auditor = null;
        }
        if (!ownBkc) {
            return;
        }
        try {
            bkc.close();
        }
        catch (BKException e) {
            LOG.warn("Failed to close bookkeeper client", e);
        }
    }

    /**
     * Reports whether the elector is active.
     *
     * <p>When an auditor is held, its running state is returned; otherwise the
     * elector's own running flag is returned.
     *
     * @return {@code true} if the held auditor, or else the elector itself, is running
     */
    public boolean isRunning() {
        // Read the field once: shutdown() sets it to null, so checking and then
        // dereferencing the field separately could hit a NullPointerException.
        final Auditor currentAuditor = this.auditor;
        if (currentAuditor != null) {
            return currentAuditor.isRunning();
        }
        return this.running.get();
    }

    /** Returns a short description that names the bookie this elector acts for. */
    @Override
    public String toString() {
        return "AuditorElector for " + bookieId;
    }

    /**
     * Orders vote znode names by the numeric sequence id that follows the vote
     * prefix. A name whose remainder is not a valid long makes the comparison
     * fail with {@link NumberFormatException}.
     */
    private static class ElectionComparator implements Comparator<String>, Serializable {

        @Override
        public int compare(String vote1, String vote2) {
            return Long.compare(sequenceIdOf(vote1), sequenceIdOf(vote2));
        }

        /** Parses the text after the vote prefix as the vote's sequence id. */
        private static long sequenceIdOf(String vote) {
            return Long.parseLong(StringUtils.substringAfter(vote, AuditorElector.VOTE_PREFIX));
        }
    }

    /**
     * Watcher the election task registers on the vote preceding this elector's own.
     * An expired session queues the shutdown task; deletion of the watched znode
     * queues a new election round. All other events are ignored.
     */
    private class ElectionWatcher implements Watcher {

        @Override
        public void process(WatchedEvent event) {
            final boolean sessionExpired = event.getState() == Watcher.Event.KeeperState.Expired;
            if (sessionExpired) {
                LOG.error("Lost ZK connection, shutting down");
                submitShutdownTask();
                return;
            }
            if (event.getType() == Watcher.Event.EventType.NodeDeleted) {
                submitElectionTask();
            }
        }
    }
}

