/*
 * Decompiled with CFR 0.152.
 */
package org.apache.distributedlog.admin;

import dlshade.com.google.common.base.Preconditions;
import dlshade.com.google.common.collect.Lists;
import dlshade.org.apache.bookkeeper.common.concurrent.FutureUtils;
import dlshade.org.apache.bookkeeper.common.util.OrderedScheduler;
import dlshade.org.apache.bookkeeper.util.IOUtils;
import dlshade.org.apache.commons.cli.CommandLine;
import dlshade.org.apache.commons.cli.Options;
import dlshade.org.apache.commons.cli.ParseException;
import dlshade.org.apache.zookeeper.KeeperException;
import java.io.IOException;
import java.net.URI;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.SortedSet;
import java.util.TreeSet;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;
import org.apache.distributedlog.LogRecordWithDLSN;
import org.apache.distributedlog.LogSegmentMetadata;
import org.apache.distributedlog.ReadUtils;
import org.apache.distributedlog.ZooKeeperClient;
import org.apache.distributedlog.ZooKeeperClientBuilder;
import org.apache.distributedlog.api.DistributedLogManager;
import org.apache.distributedlog.api.namespace.Namespace;
import org.apache.distributedlog.common.util.SchedulerUtils;
import org.apache.distributedlog.exceptions.DLIllegalStateException;
import org.apache.distributedlog.impl.BKNamespaceDriver;
import org.apache.distributedlog.impl.acl.ZKAccessControl;
import org.apache.distributedlog.impl.federated.FederatedZKLogMetadataStore;
import org.apache.distributedlog.impl.metadata.BKDLConfig;
import org.apache.distributedlog.logsegment.LogSegmentEntryStore;
import org.apache.distributedlog.metadata.DLMetadata;
import org.apache.distributedlog.metadata.DryrunLogSegmentMetadataStoreUpdater;
import org.apache.distributedlog.metadata.LogSegmentMetadataStoreUpdater;
import org.apache.distributedlog.metadata.MetadataUpdater;
import org.apache.distributedlog.namespace.NamespaceDriver;
import org.apache.distributedlog.thrift.AccessControlEntry;
import org.apache.distributedlog.tools.DistributedLogTool;
import org.apache.distributedlog.tools.Tool;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class DistributedLogAdmin
extends DistributedLogTool {
    static final Logger LOG = LoggerFactory.getLogger(DistributedLogAdmin.class);
    private static final Comparator<LogSegmentCandidate> LOG_SEGMENT_CANDIDATE_COMPARATOR = new Comparator<LogSegmentCandidate>(){

        @Override
        public int compare(LogSegmentCandidate o1, LogSegmentCandidate o2) {
            return LogSegmentMetadata.COMPARATOR.compare(o1.metadata, o2.metadata);
        }
    };

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public static void fixInprogressSegmentWithLowerSequenceNumber(Namespace namespace, MetadataUpdater metadataUpdater, String streamName, boolean verbose, boolean interactive) throws Exception {
        try (DistributedLogManager dlm = namespace.openLog(streamName);){
            List<LogSegmentMetadata> segments = dlm.getLogSegments();
            if (verbose) {
                System.out.println("LogSegments for " + streamName + " : ");
                for (LogSegmentMetadata segment : segments) {
                    System.out.println(segment.getLogSegmentSequenceNumber() + "\t: " + segment);
                }
            }
            LOG.info("Get log segments for {} : {}", (Object)streamName, segments);
            long maxCompletedLogSegmentSequenceNumber = -1L;
            LogSegmentMetadata inprogressSegment = null;
            for (LogSegmentMetadata segment : segments) {
                if (!segment.isInProgress()) {
                    maxCompletedLogSegmentSequenceNumber = Math.max(maxCompletedLogSegmentSequenceNumber, segment.getLogSegmentSequenceNumber());
                    continue;
                }
                if (null != inprogressSegment) {
                    throw new DLIllegalStateException("Multiple inprogress segments found for stream " + streamName + " : " + segments);
                }
                inprogressSegment = segment;
            }
            if (null == inprogressSegment || inprogressSegment.getLogSegmentSequenceNumber() > maxCompletedLogSegmentSequenceNumber) {
                return;
            }
            long newLogSegmentSequenceNumber = maxCompletedLogSegmentSequenceNumber + 1L;
            if (interactive && !IOUtils.confirmPrompt("Confirm to fix (Y/N), Ctrl+C to break : ")) {
                return;
            }
            LogSegmentMetadata newSegment = FutureUtils.result(metadataUpdater.changeSequenceNumber(inprogressSegment, newLogSegmentSequenceNumber));
            LOG.info("Fixed {} : {} -> {} ", new Object[]{streamName, inprogressSegment, newSegment});
            if (verbose) {
                System.out.println("Fixed " + streamName + " : " + inprogressSegment.getZNodeName() + " -> " + newSegment.getZNodeName());
                System.out.println("\t old: " + inprogressSegment);
                System.out.println("\t new: " + newSegment);
                System.out.println();
            }
        }
    }

    public static void checkAndRepairDLNamespace(URI uri, Namespace namespace, MetadataUpdater metadataUpdater, OrderedScheduler scheduler, boolean verbose, boolean interactive) throws Exception {
        DistributedLogAdmin.checkAndRepairDLNamespace(uri, namespace, metadataUpdater, scheduler, verbose, interactive, 1);
    }

    public static void checkAndRepairDLNamespace(URI uri, Namespace namespace, MetadataUpdater metadataUpdater, OrderedScheduler scheduler, boolean verbose, boolean interactive, int concurrency) throws Exception {
        Preconditions.checkArgument(concurrency > 0, "Invalid concurrency " + concurrency + " found.");
        Iterator<String> streamsIter = namespace.getLogs();
        ArrayList<String> streams = Lists.newArrayList();
        while (streamsIter.hasNext()) {
            streams.add(streamsIter.next());
        }
        if (verbose) {
            System.out.println("- 0. checking streams under " + uri);
        }
        if (streams.size() == 0) {
            System.out.println("+ 0. nothing to check. quit.");
            return;
        }
        Map<String, StreamCandidate> streamCandidates = DistributedLogAdmin.checkStreams(namespace, streams, scheduler, concurrency);
        if (verbose) {
            System.out.println("+ 0. " + streamCandidates.size() + " corrupted streams found.");
        }
        if (interactive && !IOUtils.confirmPrompt("Do you want to fix all " + streamCandidates.size() + " corrupted streams (Y/N) : ")) {
            return;
        }
        if (verbose) {
            System.out.println("- 1. repairing " + streamCandidates.size() + " corrupted streams.");
        }
        for (StreamCandidate candidate : streamCandidates.values()) {
            if (DistributedLogAdmin.repairStream(metadataUpdater, candidate, verbose, interactive)) continue;
            if (verbose) {
                System.out.println("* 1. aborted repairing corrupted streams.");
            }
            return;
        }
        if (verbose) {
            System.out.println("+ 1. repaired " + streamCandidates.size() + " corrupted streams.");
        }
    }

    private static Map<String, StreamCandidate> checkStreams(final Namespace namespace, Collection<String> streams, final OrderedScheduler scheduler, int concurrency) throws IOException {
        final LinkedBlockingQueue<String> streamQueue = new LinkedBlockingQueue<String>();
        streamQueue.addAll(streams);
        final ConcurrentSkipListMap<String, StreamCandidate> candidateMap = new ConcurrentSkipListMap<String, StreamCandidate>();
        final AtomicInteger numPendingStreams = new AtomicInteger(streams.size());
        final CountDownLatch doneLatch = new CountDownLatch(1);
        Runnable checkRunnable = new Runnable(){

            @Override
            public void run() {
                while (!streamQueue.isEmpty()) {
                    StreamCandidate candidate;
                    String stream;
                    try {
                        stream = (String)streamQueue.take();
                    }
                    catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                        break;
                    }
                    try {
                        LOG.info("Checking stream {}.", (Object)stream);
                        candidate = DistributedLogAdmin.checkStream(namespace, stream, scheduler);
                        LOG.info("Checked stream {} - {}.", (Object)stream, (Object)candidate);
                    }
                    catch (Throwable e) {
                        LOG.error("Error on checking stream {} : ", (Object)stream, (Object)e);
                        doneLatch.countDown();
                        break;
                    }
                    if (null != candidate) {
                        candidateMap.put(stream, candidate);
                    }
                    if (numPendingStreams.decrementAndGet() != 0) continue;
                    doneLatch.countDown();
                }
            }
        };
        Thread[] threads = new Thread[concurrency];
        for (int i = 0; i < concurrency; ++i) {
            threads[i] = new Thread(checkRunnable, "check-thread-" + i);
            threads[i].start();
        }
        try {
            doneLatch.await();
        }
        catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        if (numPendingStreams.get() != 0) {
            throw new IOException(numPendingStreams.get() + " streams left w/o checked");
        }
        for (int i = 0; i < concurrency; ++i) {
            threads[i].interrupt();
            try {
                threads[i].join();
                continue;
            }
            catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
        return candidateMap;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private static StreamCandidate checkStream(Namespace namespace, String streamName, OrderedScheduler scheduler) throws IOException {
        try (DistributedLogManager dlm = namespace.openLog(streamName);){
            Object object;
            List<LogSegmentCandidate> segmentCandidates;
            List<LogSegmentMetadata> segments = dlm.getLogSegments();
            if (segments.isEmpty()) {
                StreamCandidate streamCandidate = null;
                return streamCandidate;
            }
            ArrayList futures = new ArrayList(segments.size());
            for (LogSegmentMetadata segment : segments) {
                futures.add(DistributedLogAdmin.checkLogSegment(namespace, streamName, segment, scheduler));
            }
            try {
                segmentCandidates = FutureUtils.result(FutureUtils.collect(futures));
            }
            catch (Exception e) {
                throw new IOException("Failed on checking stream " + streamName, e);
            }
            StreamCandidate streamCandidate = new StreamCandidate(streamName);
            for (LogSegmentCandidate segmentCandidate : segmentCandidates) {
                if (null == segmentCandidate) continue;
                streamCandidate.addLogSegmentCandidate(segmentCandidate);
            }
            if (streamCandidate.segmentCandidates.isEmpty()) {
                object = null;
                return object;
            }
            object = streamCandidate;
            return object;
        }
    }

    private static CompletableFuture<LogSegmentCandidate> checkLogSegment(Namespace namespace, String streamName, final LogSegmentMetadata metadata, OrderedScheduler scheduler) {
        if (metadata.isInProgress()) {
            return FutureUtils.value(null);
        }
        LogSegmentEntryStore entryStore = namespace.getNamespaceDriver().getLogSegmentEntryStore(NamespaceDriver.Role.READER);
        return ReadUtils.asyncReadLastRecord(streamName, metadata, true, false, true, 4, 16, new AtomicInteger(0), scheduler, entryStore).thenApply(new Function<LogRecordWithDLSN, LogSegmentCandidate>(){

            @Override
            public LogSegmentCandidate apply(LogRecordWithDLSN record) {
                if (!(null == record || record.getDlsn().compareTo(metadata.getLastDLSN()) <= 0 && record.getTransactionId() <= metadata.getLastTxId() && metadata.isRecordPositionWithinSegmentScope(record))) {
                    return new LogSegmentCandidate(metadata, record);
                }
                return null;
            }
        });
    }

    private static boolean repairStream(MetadataUpdater metadataUpdater, StreamCandidate streamCandidate, boolean verbose, boolean interactive) throws Exception {
        if (verbose) {
            System.out.println("Stream " + streamCandidate.streamName + " : ");
            for (LogSegmentCandidate segmentCandidate : streamCandidate.segmentCandidates) {
                System.out.println("  " + segmentCandidate.metadata.getLogSegmentSequenceNumber() + " : metadata = " + segmentCandidate.metadata + ", last dlsn = " + segmentCandidate.lastRecord.getDlsn());
            }
            System.out.println("-------------------------------------------");
        }
        if (interactive && !IOUtils.confirmPrompt("Do you want to fix the stream " + streamCandidate.streamName + " (Y/N) : ")) {
            return false;
        }
        for (LogSegmentCandidate segmentCandidate : streamCandidate.segmentCandidates) {
            LogSegmentMetadata newMetadata = FutureUtils.result(metadataUpdater.updateLastRecord(segmentCandidate.metadata, segmentCandidate.lastRecord));
            if (!verbose) continue;
            System.out.println(" Fixed segment " + segmentCandidate.metadata.getLogSegmentSequenceNumber() + " : ");
            System.out.println("    old metadata : " + segmentCandidate.metadata);
            System.out.println("    new metadata : " + newMetadata);
        }
        if (verbose) {
            System.out.println("-------------------------------------------");
        }
        return true;
    }

    public DistributedLogAdmin() {
        this.commands.clear();
        this.addCommand(new Tool.HelpCommand(this));
        this.addCommand(new BindCommand());
        this.addCommand(new UnbindCommand());
        this.addCommand(new RepairSeqNoCommand());
        this.addCommand(new DLCKCommand());
        this.addCommand(new SetDefaultACLCommand());
        this.addCommand(new SetStreamACLCommand());
        this.addCommand(new DeleteStreamACLCommand());
    }

    @Override
    protected String getName() {
        return "dlog_admin";
    }

    static /* synthetic */ Comparator access$000() {
        return LOG_SEGMENT_CANDIDATE_COMPARATOR;
    }

    static abstract class SetACLCommand
    extends DistributedLogTool.PerDLCommand {
        boolean denyWrite = false;
        boolean denyTruncate = false;
        boolean denyDelete = false;
        boolean denyAcquire = false;
        boolean denyRelease = false;

        protected SetACLCommand(String name, String description) {
            super(name, description);
            this.options.addOption("dw", "deny-write", false, "Deny write/bulkWrite requests");
            this.options.addOption("dt", "deny-truncate", false, "Deny truncate requests");
            this.options.addOption("dd", "deny-delete", false, "Deny delete requests");
            this.options.addOption("da", "deny-acquire", false, "Deny acquire requests");
            this.options.addOption("dr", "deny-release", false, "Deny release requests");
        }

        @Override
        protected void parseCommandLine(CommandLine cmdline) throws ParseException {
            super.parseCommandLine(cmdline);
            this.denyWrite = cmdline.hasOption("dw");
            this.denyTruncate = cmdline.hasOption("dt");
            this.denyDelete = cmdline.hasOption("dd");
            this.denyAcquire = cmdline.hasOption("da");
            this.denyRelease = cmdline.hasOption("dr");
        }

        protected abstract String getZKPath(String var1);

        protected ZKAccessControl getZKAccessControl(ZooKeeperClient zkc, String zkPath) throws Exception {
            ZKAccessControl accessControl;
            try {
                accessControl = FutureUtils.result(ZKAccessControl.read(zkc, zkPath, null));
            }
            catch (KeeperException.NoNodeException nne) {
                accessControl = new ZKAccessControl(new AccessControlEntry(), zkPath);
            }
            return accessControl;
        }

        protected void setZKAccessControl(ZooKeeperClient zkc, ZKAccessControl accessControl) throws Exception {
            String zkPath = accessControl.getZKPath();
            if (null == zkc.get().exists(zkPath, false)) {
                accessControl.create(zkc);
            } else {
                accessControl.update(zkc);
            }
        }

        @Override
        protected int runCmd() throws Exception {
            BKDLConfig bkdlConfig = BKDLConfig.resolveDLConfig(this.getZooKeeperClient(), this.getUri());
            if (null == bkdlConfig.getACLRootPath()) {
                System.err.println("ACL isn't enabled for namespace " + this.getUri());
                return -1;
            }
            String zkPath = this.getZKPath(this.getUri().getPath() + "/" + bkdlConfig.getACLRootPath());
            ZKAccessControl accessControl = this.getZKAccessControl(this.getZooKeeperClient(), zkPath);
            AccessControlEntry acl = accessControl.getAccessControlEntry();
            acl.setDenyWrite(this.denyWrite);
            acl.setDenyTruncate(this.denyTruncate);
            acl.setDenyDelete(this.denyDelete);
            acl.setDenyAcquire(this.denyAcquire);
            acl.setDenyRelease(this.denyRelease);
            this.setZKAccessControl(this.getZooKeeperClient(), accessControl);
            return 0;
        }
    }

    static class SetDefaultACLCommand
    extends SetACLCommand {
        SetDefaultACLCommand() {
            super("set_default_acl", "Set Default ACL for a namespace");
        }

        @Override
        protected String getZKPath(String zkRootPath) {
            return zkRootPath;
        }

        @Override
        protected String getUsage() {
            return "set_default_acl [options]";
        }
    }

    static class SetStreamACLCommand
    extends SetACLCommand {
        String stream = null;

        SetStreamACLCommand() {
            super("set_stream_acl", "Set Default ACL for a given stream");
            this.options.addOption("s", "stream", true, "Stream to set ACL");
        }

        @Override
        protected void parseCommandLine(CommandLine cmdline) throws ParseException {
            super.parseCommandLine(cmdline);
            if (!cmdline.hasOption("s")) {
                throw new ParseException("No stream to set ACL");
            }
            this.stream = cmdline.getOptionValue("s");
        }

        @Override
        protected String getZKPath(String zkRootPath) {
            return zkRootPath + "/" + this.stream;
        }

        @Override
        protected String getUsage() {
            return "set_stream_acl [options]";
        }
    }

    static class DeleteStreamACLCommand
    extends DistributedLogTool.PerDLCommand {
        String stream = null;

        DeleteStreamACLCommand() {
            super("delete_stream_acl", "Delete ACL for a given stream");
            this.options.addOption("s", "stream", true, "Stream to set ACL");
        }

        @Override
        protected void parseCommandLine(CommandLine cmdline) throws ParseException {
            super.parseCommandLine(cmdline);
            if (!cmdline.hasOption("s")) {
                throw new ParseException("No stream to set ACL");
            }
            this.stream = cmdline.getOptionValue("s");
        }

        @Override
        protected int runCmd() throws Exception {
            BKDLConfig bkdlConfig = BKDLConfig.resolveDLConfig(this.getZooKeeperClient(), this.getUri());
            if (null == bkdlConfig.getACLRootPath()) {
                System.err.println("ACL isn't enabled for namespace " + this.getUri());
                return -1;
            }
            String zkPath = this.getUri() + "/" + bkdlConfig.getACLRootPath() + "/" + this.stream;
            ZKAccessControl.delete(this.getZooKeeperClient(), zkPath);
            return 0;
        }

        @Override
        protected String getUsage() {
            return null;
        }
    }

    static class DLCKCommand
    extends DistributedLogTool.PerDLCommand {
        boolean dryrun = false;
        boolean verbose = false;
        int concurrency = 1;

        DLCKCommand() {
            super("dlck", "Check and repair a distributedlog namespace");
            this.options.addOption("d", "dryrun", false, "Dry run without repairing");
            this.options.addOption("v", "verbose", false, "Print verbose messages");
            this.options.addOption("cy", "concurrency", true, "Concurrency on checking streams");
        }

        @Override
        protected void parseCommandLine(CommandLine cmdline) throws ParseException {
            super.parseCommandLine(cmdline);
            this.dryrun = cmdline.hasOption("d");
            this.verbose = cmdline.hasOption("v");
            if (cmdline.hasOption("cy")) {
                try {
                    this.concurrency = Integer.parseInt(cmdline.getOptionValue("cy"));
                }
                catch (NumberFormatException nfe) {
                    throw new ParseException("Invalid concurrency value : " + cmdline.getOptionValue("cy"));
                }
            }
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        @Override
        protected int runCmd() throws Exception {
            MetadataUpdater metadataUpdater = this.dryrun ? new DryrunLogSegmentMetadataStoreUpdater(this.getConf(), this.getLogSegmentMetadataStore()) : LogSegmentMetadataStoreUpdater.createMetadataUpdater(this.getConf(), this.getLogSegmentMetadataStore());
            OrderedScheduler scheduler = (OrderedScheduler)OrderedScheduler.newSchedulerBuilder().name("dlck-scheduler").numThreads(Runtime.getRuntime().availableProcessors()).build();
            ExecutorService executorService = Executors.newCachedThreadPool();
            try {
                DistributedLogAdmin.checkAndRepairDLNamespace(this.getUri(), this.getNamespace(), metadataUpdater, scheduler, this.verbose, !this.getForce(), this.concurrency);
            }
            finally {
                SchedulerUtils.shutdownScheduler(executorService, 5L, TimeUnit.MINUTES);
            }
            return 0;
        }

        @Override
        protected String getUsage() {
            return "dlck [options]";
        }
    }

    static class RepairSeqNoCommand
    extends DistributedLogTool.PerDLCommand {
        boolean dryrun = false;
        boolean verbose = false;
        final List<String> streams = new ArrayList<String>();

        RepairSeqNoCommand() {
            super("repairseqno", "Repair a stream whose inprogress log segment has lower sequence number.");
            this.options.addOption("d", "dryrun", false, "Dry run without repairing");
            this.options.addOption("l", "list", true, "List of streams to repair, separated by comma");
            this.options.addOption("v", "verbose", false, "Print verbose messages");
        }

        @Override
        protected void parseCommandLine(CommandLine cmdline) throws ParseException {
            super.parseCommandLine(cmdline);
            this.dryrun = cmdline.hasOption("d");
            this.verbose = cmdline.hasOption("v");
            boolean bl = this.force = !this.dryrun && cmdline.hasOption("f");
            if (!cmdline.hasOption("l")) {
                throw new ParseException("No streams provided to repair");
            }
            String streamsList = cmdline.getOptionValue("l");
            Collections.addAll(this.streams, streamsList.split(","));
        }

        @Override
        protected int runCmd() throws Exception {
            MetadataUpdater metadataUpdater = this.dryrun ? new DryrunLogSegmentMetadataStoreUpdater(this.getConf(), this.getLogSegmentMetadataStore()) : LogSegmentMetadataStoreUpdater.createMetadataUpdater(this.getConf(), this.getLogSegmentMetadataStore());
            System.out.println("List of streams : ");
            System.out.println(this.streams);
            if (!IOUtils.confirmPrompt("Do you want to repair all these streams (Y/N):")) {
                return -1;
            }
            for (String stream : this.streams) {
                DistributedLogAdmin.fixInprogressSegmentWithLowerSequenceNumber(this.getNamespace(), metadataUpdater, stream, this.verbose, !this.getForce());
            }
            return 0;
        }

        @Override
        protected String getUsage() {
            return "repairseqno [options]";
        }
    }

    class BindCommand
    extends Tool.OptsCommand {
        Options options;

        BindCommand() {
            super("bind", "bind the bookkeeper environment settings for a given distributedlog instance.");
            this.options = new Options();
            this.options.addOption("l", "bkLedgers", true, "ZooKeeper ledgers path for bookkeeper instance.");
            this.options.addOption("s", "bkZkServers", true, "ZooKeeper servers used for bookkeeper for writers.");
            this.options.addOption("bkzr", "bkZkServersForReader", true, "ZooKeeper servers used for bookkeeper for readers.");
            this.options.addOption("dlzw", "dlZkServersForWriter", true, "ZooKeeper servers used for distributedlog for writers.");
            this.options.addOption("dlzr", "dlZkServersForReader", true, "ZooKeeper servers used for distributedlog for readers.");
            this.options.addOption("i", "sanityCheckTxnID", true, "Flag to sanity check highest txn id.");
            this.options.addOption("r", "encodeRegionID", true, "Flag to encode region id.");
            this.options.addOption("seqno", "firstLogSegmentSeqNo", true, "The first log segment sequence number to use after upgrade");
            this.options.addOption("fns", "federatedNamespace", false, "Flag to turn a namespace to federated namespace");
            this.options.addOption("f", "force", false, "Force binding without prompt.");
            this.options.addOption("c", "creation", false, "Whether is it a creation binding.");
            this.options.addOption("q", "query", false, "Query the bookkeeper bindings");
        }

        @Override
        protected Options getOptions() {
            return this.options;
        }

        @Override
        protected String getUsage() {
            return "bind [options] <distributedlog uri>";
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        @Override
        protected int runCmd(CommandLine cmdline) throws Exception {
            boolean isQuery = cmdline.hasOption("q");
            if (!(isQuery || cmdline.hasOption("l") && cmdline.hasOption("s"))) {
                System.err.println("Error: Neither zkServers nor ledgersPath specified for bookkeeper environment.");
                this.printUsage();
                return -1;
            }
            String[] args = cmdline.getArgs();
            if (args.length <= 0) {
                System.err.println("No distributedlog uri specified.");
                this.printUsage();
                return -1;
            }
            boolean force = cmdline.hasOption("f");
            boolean creation = cmdline.hasOption("c");
            String bkLedgersPath = cmdline.getOptionValue("l");
            String bkZkServersForWriter = cmdline.getOptionValue("s");
            boolean sanityCheckTxnID = !cmdline.hasOption("i") || Boolean.parseBoolean(cmdline.getOptionValue("i"));
            boolean encodeRegionID = cmdline.hasOption("r") && Boolean.parseBoolean(cmdline.getOptionValue("r"));
            String bkZkServersForReader = cmdline.hasOption("bkzr") ? cmdline.getOptionValue("bkzr") : bkZkServersForWriter;
            URI uri = URI.create(args[0]);
            String dlZkServersForWriter = cmdline.hasOption("dlzw") ? cmdline.getOptionValue("dlzw") : BKNamespaceDriver.getZKServersFromDLUri(uri);
            String dlZkServersForReader = cmdline.hasOption("dlzr") ? cmdline.getOptionValue("dlzr") : dlZkServersForWriter;
            try (ZooKeeperClient zkc = ZooKeeperClientBuilder.newBuilder().uri(uri).zkAclId(null).sessionTimeoutMs(10000).build();){
                BKDLConfig bkdlConfig;
                BKDLConfig newBKDLConfig = new BKDLConfig(dlZkServersForWriter, dlZkServersForReader, bkZkServersForWriter, bkZkServersForReader, bkLedgersPath).setSanityCheckTxnID(sanityCheckTxnID).setEncodeRegionID(encodeRegionID);
                if (cmdline.hasOption("seqno")) {
                    newBKDLConfig = newBKDLConfig.setFirstLogSegmentSeqNo(Long.parseLong(cmdline.getOptionValue("seqno")));
                }
                if (cmdline.hasOption("fns")) {
                    newBKDLConfig = newBKDLConfig.setFederatedNamespace(true);
                }
                try {
                    bkdlConfig = BKDLConfig.resolveDLConfig(zkc, uri);
                }
                catch (IOException ie) {
                    bkdlConfig = null;
                }
                if (null == bkdlConfig) {
                    System.out.println("No bookkeeper is bound to " + uri);
                } else {
                    System.out.println("There is bookkeeper bound to " + uri + " : ");
                    System.out.println("");
                    System.out.println(bkdlConfig.toString());
                    System.out.println("");
                    if (!isQuery) {
                        if (newBKDLConfig.equals(bkdlConfig)) {
                            System.out.println("No bookkeeper binding needs to be updated. Quit.");
                            int ie = 0;
                            return ie;
                        }
                        if (!newBKDLConfig.isFederatedNamespace() && bkdlConfig.isFederatedNamespace()) {
                            System.out.println("You can't turn a federated namespace back to non-federated.");
                            int ie = 0;
                            return ie;
                        }
                        if (!force && !IOUtils.confirmPrompt("Do you want to bind " + uri + " with new bookkeeper instance :\n" + newBKDLConfig)) {
                            int ie = 0;
                            return ie;
                        }
                    }
                }
                if (isQuery) {
                    System.out.println("Done.");
                    int ie = 0;
                    return ie;
                }
                DLMetadata dlMetadata = DLMetadata.create(newBKDLConfig);
                if (creation) {
                    try {
                        dlMetadata.create(uri);
                        System.out.println("Created binding on " + uri + ".");
                    }
                    catch (IOException ie) {
                        System.err.println("Failed to create binding on " + uri + " : " + ie.getMessage());
                    }
                } else {
                    try {
                        dlMetadata.update(uri);
                        System.out.println("Updated binding on " + uri + " : ");
                        System.out.println("");
                        System.out.println(newBKDLConfig.toString());
                        System.out.println("");
                    }
                    catch (IOException ie) {
                        System.err.println("Failed to update binding on " + uri + " : " + ie.getMessage());
                    }
                }
                if (newBKDLConfig.isFederatedNamespace()) {
                    try {
                        FederatedZKLogMetadataStore.createFederatedNamespace(uri, zkc);
                    }
                    catch (KeeperException.NodeExistsException nodeExistsException) {
                        // empty catch block
                    }
                }
                int n = 0;
                return n;
            }
        }
    }

    class UnbindCommand
    extends Tool.OptsCommand {
        Options options;

        UnbindCommand() {
            super("unbind", "unbind the bookkeeper environment bound for a given distributedlog instance.");
            this.options = new Options();
            this.options.addOption("f", "force", false, "Force unbinding without prompt.");
        }

        @Override
        protected Options getOptions() {
            return this.options;
        }

        @Override
        protected String getUsage() {
            return "unbind [options] <distributedlog uri>";
        }

        @Override
        protected int runCmd(CommandLine cmdline) throws Exception {
            BKDLConfig bkdlConfig;
            String[] args = cmdline.getArgs();
            if (args.length <= 0) {
                System.err.println("No distributedlog uri specified.");
                this.printUsage();
                return -1;
            }
            boolean force = cmdline.hasOption("f");
            URI uri = URI.create(args[0]);
            ZooKeeperClient zkc = ZooKeeperClientBuilder.newBuilder().uri(uri).zkAclId(null).sessionTimeoutMs(10000).build();
            try {
                bkdlConfig = BKDLConfig.resolveDLConfig(zkc, uri);
            }
            catch (IOException ie) {
                bkdlConfig = null;
            }
            if (null == bkdlConfig) {
                System.out.println("No bookkeeper is bound to " + uri);
                return 0;
            }
            System.out.println("There is bookkeeper bound to " + uri + " : ");
            System.out.println("");
            System.out.println(bkdlConfig.toString());
            System.out.println("");
            if (!force && !IOUtils.confirmPrompt("Do you want to unbind " + uri + " :\n")) {
                return 0;
            }
            DLMetadata.unbind(uri);
            System.out.println("Unbound on " + uri + ".");
            return 0;
        }
    }

    private static class StreamCandidate {
        final String streamName;
        final SortedSet<LogSegmentCandidate> segmentCandidates = new TreeSet<LogSegmentCandidate>(DistributedLogAdmin.access$000());

        StreamCandidate(String streamName) {
            this.streamName = streamName;
        }

        synchronized void addLogSegmentCandidate(LogSegmentCandidate segmentCandidate) {
            this.segmentCandidates.add(segmentCandidate);
        }

        public String toString() {
            return "StreamCandidate[ name = " + this.streamName + ", segments = " + this.segmentCandidates + " ]";
        }
    }

    private static class LogSegmentCandidate {
        final LogSegmentMetadata metadata;
        final LogRecordWithDLSN lastRecord;

        LogSegmentCandidate(LogSegmentMetadata metadata, LogRecordWithDLSN lastRecord) {
            this.metadata = metadata;
            this.lastRecord = lastRecord;
        }

        public String toString() {
            return "LogSegmentCandidate[ metadata = " + this.metadata + ", last record = " + this.lastRecord + " ]";
        }
    }
}

