/*
 * Decompiled with CFR 0.152.
 */
package dlshade.org.apache.bookkeeper.meta;

import dlshade.com.google.common.base.Joiner;
import dlshade.com.google.protobuf.InvalidProtocolBufferException;
import dlshade.com.google.protobuf.Message;
import dlshade.com.google.protobuf.ProtocolStringList;
import dlshade.com.google.protobuf.TextFormat;
import dlshade.org.apache.bookkeeper.common.concurrent.FutureUtils;
import dlshade.org.apache.bookkeeper.conf.AbstractConfiguration;
import dlshade.org.apache.bookkeeper.meta.LedgerUnderreplicationManager;
import dlshade.org.apache.bookkeeper.meta.UnderreplicatedLedger;
import dlshade.org.apache.bookkeeper.meta.zk.ZKMetadataDriverBase;
import dlshade.org.apache.bookkeeper.net.DNS;
import dlshade.org.apache.bookkeeper.proto.BookkeeperInternalCallbacks;
import dlshade.org.apache.bookkeeper.proto.DataFormats;
import dlshade.org.apache.bookkeeper.replication.ReplicationEnableCb;
import dlshade.org.apache.bookkeeper.replication.ReplicationException;
import dlshade.org.apache.bookkeeper.util.SubTreeCache;
import dlshade.org.apache.bookkeeper.util.ZkUtils;
import dlshade.org.apache.zookeeper.CreateMode;
import dlshade.org.apache.zookeeper.KeeperException;
import dlshade.org.apache.zookeeper.WatchedEvent;
import dlshade.org.apache.zookeeper.Watcher;
import dlshade.org.apache.zookeeper.ZooDefs;
import dlshade.org.apache.zookeeper.ZooKeeper;
import dlshade.org.apache.zookeeper.data.ACL;
import dlshade.org.apache.zookeeper.data.Stat;
import java.net.UnknownHostException;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Queue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.function.Predicate;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class ZkLedgerUnderreplicationManager
implements LedgerUnderreplicationManager {
    static final Logger LOG = LoggerFactory.getLogger(ZkLedgerUnderreplicationManager.class);
    static final String LAYOUT = "BASIC";
    static final int LAYOUT_VERSION = 1;
    private static final byte[] LOCK_DATA = ZkLedgerUnderreplicationManager.getLockData();
    private final Map<Long, Lock> heldLocks = new ConcurrentHashMap<Long, Lock>();
    private final Pattern idExtractionPattern;
    private final String rootPath;
    private final String basePath;
    private final String urLedgerPath;
    private final String urLockPath;
    private final String layoutZNode;
    private final AbstractConfiguration conf;
    private final String lostBookieRecoveryDelayZnode;
    private final String checkAllLedgersCtimeZnode;
    private final String placementPolicyCheckCtimeZnode;
    private final String replicasCheckCtimeZnode;
    private final ZooKeeper zkc;
    private final SubTreeCache subTreeCache;

    public ZkLedgerUnderreplicationManager(AbstractConfiguration conf, final ZooKeeper zkc) throws KeeperException, InterruptedException, ReplicationException.CompatibilityException {
        this.conf = conf;
        this.rootPath = ZKMetadataDriverBase.resolveZkLedgersRootPath(conf);
        this.basePath = ZkLedgerUnderreplicationManager.getBasePath(this.rootPath);
        this.layoutZNode = this.basePath + '/' + "LAYOUT";
        this.urLedgerPath = this.basePath + "/ledgers";
        this.urLockPath = this.basePath + '/' + "locks";
        this.lostBookieRecoveryDelayZnode = this.basePath + '/' + "lostBookieRecoveryDelay";
        this.checkAllLedgersCtimeZnode = this.basePath + '/' + "checkallledgersctime";
        this.placementPolicyCheckCtimeZnode = this.basePath + '/' + "placementpolicycheckctime";
        this.replicasCheckCtimeZnode = this.basePath + '/' + "replicascheckctime";
        this.idExtractionPattern = Pattern.compile("urL(\\d+)$");
        this.zkc = zkc;
        this.subTreeCache = new SubTreeCache(new SubTreeCache.TreeProvider(){

            @Override
            public List<String> getChildren(String path, Watcher watcher) throws InterruptedException, KeeperException {
                return zkc.getChildren(path, watcher);
            }
        });
        this.checkLayout();
    }

    public static String getBasePath(String rootPath) {
        return String.format("%s/%s", rootPath, "underreplication");
    }

    public static String getUrLockPath(String rootPath) {
        return String.format("%s/%s", ZkLedgerUnderreplicationManager.getBasePath(rootPath), "locks");
    }

    public static byte[] getLockData() {
        DataFormats.LockDataFormat.Builder lockDataBuilder = DataFormats.LockDataFormat.newBuilder();
        try {
            lockDataBuilder.setBookieId(DNS.getDefaultHost("default"));
        }
        catch (UnknownHostException unknownHostException) {
            // empty catch block
        }
        return lockDataBuilder.build().toString().getBytes(StandardCharsets.UTF_8);
    }

    private void checkLayout() throws KeeperException, InterruptedException, ReplicationException.CompatibilityException {
        List<ACL> zkAcls = ZkUtils.getACLs(this.conf);
        if (this.zkc.exists(this.basePath, false) == null) {
            try {
                this.zkc.create(this.basePath, new byte[0], zkAcls, CreateMode.PERSISTENT);
            }
            catch (KeeperException.NodeExistsException nodeExistsException) {
                // empty catch block
            }
        }
        while (this.zkc.exists(this.layoutZNode, false) == null) {
            DataFormats.LedgerRereplicationLayoutFormat.Builder builder = DataFormats.LedgerRereplicationLayoutFormat.newBuilder();
            builder.setType(LAYOUT).setVersion(1);
            try {
                this.zkc.create(this.layoutZNode, builder.build().toString().getBytes(StandardCharsets.UTF_8), zkAcls, CreateMode.PERSISTENT);
            }
            catch (KeeperException.NodeExistsException nne) {}
        }
        byte[] layoutData = this.zkc.getData(this.layoutZNode, false, null);
        DataFormats.LedgerRereplicationLayoutFormat.Builder builder = DataFormats.LedgerRereplicationLayoutFormat.newBuilder();
        try {
            TextFormat.merge(new String(layoutData, StandardCharsets.UTF_8), (Message.Builder)builder);
            DataFormats.LedgerRereplicationLayoutFormat layout = builder.build();
            if (!layout.getType().equals(LAYOUT) || layout.getVersion() != 1) {
                throw new ReplicationException.CompatibilityException("Incompatible layout found (BASIC:1)");
            }
        }
        catch (TextFormat.ParseException pe) {
            throw new ReplicationException.CompatibilityException("Invalid data found", pe);
        }
        if (this.zkc.exists(this.urLedgerPath, false) == null) {
            try {
                this.zkc.create(this.urLedgerPath, new byte[0], zkAcls, CreateMode.PERSISTENT);
            }
            catch (KeeperException.NodeExistsException nodeExistsException) {
                // empty catch block
            }
        }
        if (this.zkc.exists(this.urLockPath, false) == null) {
            try {
                this.zkc.create(this.urLockPath, new byte[0], zkAcls, CreateMode.PERSISTENT);
            }
            catch (KeeperException.NodeExistsException nodeExistsException) {
                // empty catch block
            }
        }
    }

    private long getLedgerId(String path) throws NumberFormatException {
        Matcher m = this.idExtractionPattern.matcher(path);
        if (m.find()) {
            return Long.parseLong(m.group(1));
        }
        throw new NumberFormatException("Couldn't find ledgerid in path");
    }

    public static String getParentZnodePath(String base, long ledgerId) {
        String subdir1 = String.format("%04x", ledgerId >> 48 & 0xFFFFL);
        String subdir2 = String.format("%04x", ledgerId >> 32 & 0xFFFFL);
        String subdir3 = String.format("%04x", ledgerId >> 16 & 0xFFFFL);
        String subdir4 = String.format("%04x", ledgerId & 0xFFFFL);
        return String.format("%s/%s/%s/%s/%s", base, subdir1, subdir2, subdir3, subdir4);
    }

    public static String getUrLedgerZnode(String base, long ledgerId) {
        return String.format("%s/urL%010d", ZkLedgerUnderreplicationManager.getParentZnodePath(base, ledgerId), ledgerId);
    }

    public static String getUrLedgerLockZnode(String base, long ledgerId) {
        return String.format("%s/urL%010d", base, ledgerId);
    }

    private String getUrLedgerZnode(long ledgerId) {
        return ZkLedgerUnderreplicationManager.getUrLedgerZnode(this.urLedgerPath, ledgerId);
    }

    @Override
    public UnderreplicatedLedger getLedgerUnreplicationInfo(long ledgerId) throws ReplicationException.UnavailableException {
        try {
            String znode = this.getUrLedgerZnode(ledgerId);
            DataFormats.UnderreplicatedLedgerFormat.Builder builder = DataFormats.UnderreplicatedLedgerFormat.newBuilder();
            byte[] data = null;
            try {
                data = this.zkc.getData(znode, false, null);
            }
            catch (KeeperException.NoNodeException nne) {
                if (LOG.isDebugEnabled()) {
                    LOG.debug("Ledger: {} is not marked underreplicated", (Object)ledgerId);
                }
                return null;
            }
            TextFormat.merge(new String(data, StandardCharsets.UTF_8), (Message.Builder)builder);
            DataFormats.UnderreplicatedLedgerFormat underreplicatedLedgerFormat = builder.build();
            UnderreplicatedLedger underreplicatedLedger = new UnderreplicatedLedger(ledgerId);
            ProtocolStringList replicaList = underreplicatedLedgerFormat.getReplicaList();
            long ctime = underreplicatedLedgerFormat.hasCtime() ? underreplicatedLedgerFormat.getCtime() : -1L;
            underreplicatedLedger.setCtime(ctime);
            underreplicatedLedger.setReplicaList(replicaList);
            return underreplicatedLedger;
        }
        catch (KeeperException ke) {
            throw new ReplicationException.UnavailableException("Error contacting zookeeper", ke);
        }
        catch (InterruptedException ie) {
            Thread.currentThread().interrupt();
            throw new ReplicationException.UnavailableException("Interrupted while connecting zookeeper", ie);
        }
        catch (TextFormat.ParseException pe) {
            throw new ReplicationException.UnavailableException("Error parsing proto message", pe);
        }
    }

    @Override
    public CompletableFuture<Void> markLedgerUnderreplicatedAsync(long ledgerId, Collection<String> missingReplicas) {
        if (LOG.isDebugEnabled()) {
            LOG.debug("markLedgerUnderreplicated(ledgerId={}, missingReplica={})", (Object)ledgerId, missingReplicas);
        }
        List<ACL> zkAcls = ZkUtils.getACLs(this.conf);
        String znode = this.getUrLedgerZnode(ledgerId);
        CompletableFuture<Void> createFuture = new CompletableFuture<Void>();
        this.tryMarkLedgerUnderreplicatedAsync(znode, missingReplicas, zkAcls, createFuture);
        return createFuture;
    }

    private void tryMarkLedgerUnderreplicatedAsync(String znode, Collection<String> missingReplicas, List<ACL> zkAcls, CompletableFuture<Void> finalFuture) {
        DataFormats.UnderreplicatedLedgerFormat.Builder builder = DataFormats.UnderreplicatedLedgerFormat.newBuilder();
        if (this.conf.getStoreSystemTimeAsLedgerUnderreplicatedMarkTime()) {
            builder.setCtime(System.currentTimeMillis());
        }
        missingReplicas.forEach(builder::addReplica);
        byte[] urLedgerData = builder.build().toString().getBytes(StandardCharsets.UTF_8);
        ZkUtils.asyncCreateFullPathOptimistic(this.zkc, znode, urLedgerData, zkAcls, CreateMode.PERSISTENT, (rc, path, ctx, name) -> {
            if (KeeperException.Code.OK.intValue() == rc) {
                FutureUtils.complete(finalFuture, null);
            } else if (KeeperException.Code.NODEEXISTS.intValue() == rc) {
                this.handleLedgerUnderreplicatedAlreadyMarked(znode, missingReplicas, zkAcls, finalFuture);
            } else {
                FutureUtils.completeExceptionally(finalFuture, KeeperException.create(KeeperException.Code.get(rc)));
            }
        }, null);
    }

    private void handleLedgerUnderreplicatedAlreadyMarked(String znode, Collection<String> missingReplicas, List<ACL> zkAcls, CompletableFuture<Void> finalFuture) {
        this.zkc.getData(znode, false, (getRc, getPath, getCtx, existingUrLedgerData, getStat) -> {
            if (KeeperException.Code.OK.intValue() == getRc) {
                DataFormats.UnderreplicatedLedgerFormat.Builder builder = DataFormats.UnderreplicatedLedgerFormat.newBuilder();
                try {
                    TextFormat.merge(new String(existingUrLedgerData, StandardCharsets.UTF_8), (Message.Builder)builder);
                }
                catch (TextFormat.ParseException e) {
                    FutureUtils.completeExceptionally(finalFuture, new ReplicationException.UnavailableException("Invalid underreplicated ledger data for ledger " + znode, e));
                    return;
                }
                DataFormats.UnderreplicatedLedgerFormat existingUrLedgerFormat = builder.build();
                boolean replicaAdded = false;
                for (String missingReplica : missingReplicas) {
                    if (existingUrLedgerFormat.getReplicaList().contains(missingReplica)) continue;
                    builder.addReplica(missingReplica);
                    replicaAdded = true;
                }
                if (!replicaAdded) {
                    FutureUtils.complete(finalFuture, null);
                    return;
                }
                if (this.conf.getStoreSystemTimeAsLedgerUnderreplicatedMarkTime()) {
                    builder.setCtime(System.currentTimeMillis());
                }
                byte[] newUrLedgerData = builder.build().toString().getBytes(StandardCharsets.UTF_8);
                this.zkc.setData(znode, newUrLedgerData, getStat.getVersion(), (setRc, setPath, setCtx, setStat) -> {
                    if (KeeperException.Code.OK.intValue() == setRc) {
                        FutureUtils.complete(finalFuture, null);
                    } else if (KeeperException.Code.NONODE.intValue() == setRc) {
                        this.tryMarkLedgerUnderreplicatedAsync(znode, missingReplicas, zkAcls, finalFuture);
                    } else if (KeeperException.Code.BADVERSION.intValue() == setRc) {
                        this.handleLedgerUnderreplicatedAlreadyMarked(znode, missingReplicas, zkAcls, finalFuture);
                    } else {
                        FutureUtils.completeExceptionally(finalFuture, KeeperException.create(KeeperException.Code.get(setRc)));
                    }
                }, null);
            } else if (KeeperException.Code.NONODE.intValue() == getRc) {
                this.tryMarkLedgerUnderreplicatedAsync(znode, missingReplicas, zkAcls, finalFuture);
            } else {
                FutureUtils.completeExceptionally(finalFuture, KeeperException.create(KeeperException.Code.get(getRc)));
            }
        }, null);
    }

    @Override
    public void markLedgerReplicated(long ledgerId) throws ReplicationException.UnavailableException {
        block12: {
            if (LOG.isDebugEnabled()) {
                LOG.debug("markLedgerReplicated(ledgerId={})", (Object)ledgerId);
            }
            try {
                Optional<Integer> ledgerZNodeVersion;
                Lock l = this.heldLocks.get(ledgerId);
                if (l == null || !(ledgerZNodeVersion = l.getLedgerZNodeVersion()).isPresent()) break block12;
                this.zkc.delete(this.getUrLedgerZnode(ledgerId), ledgerZNodeVersion.get());
                try {
                    String[] parts = this.getUrLedgerZnode(ledgerId).split("/");
                    for (int i = 1; i <= 4; ++i) {
                        Object[] p = Arrays.copyOf(parts, parts.length - i);
                        String path = Joiner.on("/").join(p);
                        Stat s = this.zkc.exists(path, null);
                        if (s == null) continue;
                        this.zkc.delete(path, s.getVersion());
                    }
                }
                catch (KeeperException.NotEmptyException notEmptyException) {
                    // empty catch block
                }
            }
            catch (KeeperException.NoNodeException l) {
            }
            catch (KeeperException.BadVersionException l) {
            }
            catch (KeeperException ke) {
                LOG.error("Error deleting underreplicated ledger znode", (Throwable)ke);
                throw new ReplicationException.UnavailableException("Error contacting zookeeper", ke);
            }
            catch (InterruptedException ie) {
                Thread.currentThread().interrupt();
                throw new ReplicationException.UnavailableException("Interrupted while contacting zookeeper", ie);
            }
            finally {
                this.releaseUnderreplicatedLedger(ledgerId);
            }
        }
    }

    @Override
    public Iterator<UnderreplicatedLedger> listLedgersToRereplicate(final Predicate<List<String>> predicate) {
        final LinkedList<String> queue = new LinkedList<String>();
        queue.add(this.urLedgerPath);
        return new Iterator<UnderreplicatedLedger>(){
            final Queue<UnderreplicatedLedger> curBatch = new LinkedList<UnderreplicatedLedger>();

            @Override
            public void remove() {
                throw new UnsupportedOperationException();
            }

            @Override
            public boolean hasNext() {
                if (this.curBatch.size() > 0) {
                    return true;
                }
                while (queue.size() > 0 && this.curBatch.size() == 0) {
                    String parent = (String)queue.remove();
                    try {
                        for (String c : ZkLedgerUnderreplicationManager.this.zkc.getChildren(parent, false)) {
                            String child = parent + "/" + c;
                            if (c.startsWith("urL")) {
                                long ledgerId = ZkLedgerUnderreplicationManager.this.getLedgerId(child);
                                UnderreplicatedLedger underreplicatedLedger = ZkLedgerUnderreplicationManager.this.getLedgerUnreplicationInfo(ledgerId);
                                if (underreplicatedLedger == null) continue;
                                List<String> replicaList = underreplicatedLedger.getReplicaList();
                                if (predicate != null && !predicate.test(replicaList)) continue;
                                this.curBatch.add(underreplicatedLedger);
                                continue;
                            }
                            queue.add(child);
                        }
                    }
                    catch (InterruptedException ie) {
                        Thread.currentThread().interrupt();
                        return false;
                    }
                    catch (KeeperException.NoNodeException ie) {
                    }
                    catch (Exception e) {
                        throw new RuntimeException("Error reading list", e);
                    }
                }
                return this.curBatch.size() > 0;
            }

            @Override
            public UnderreplicatedLedger next() {
                assert (this.curBatch.size() > 0);
                return this.curBatch.remove();
            }
        };
    }

    private long getLedgerToRereplicateFromHierarchy(String parent, long depth) throws KeeperException, InterruptedException {
        List<String> children;
        if (depth == 4L) {
            List<String> children2;
            try {
                children2 = this.subTreeCache.getChildren(parent);
            }
            catch (KeeperException.NoNodeException nne) {
                return -1L;
            }
            Collections.shuffle(children2);
            List<ACL> zkAcls = ZkUtils.getACLs(this.conf);
            while (children2.size() > 0) {
                String tryChild = children2.get(0);
                try {
                    List<String> locks = this.subTreeCache.getChildren(this.urLockPath);
                    if (locks.contains(tryChild)) {
                        children2.remove(tryChild);
                        continue;
                    }
                    Stat stat = this.zkc.exists(parent + "/" + tryChild, false);
                    if (stat == null) {
                        if (LOG.isDebugEnabled()) {
                            LOG.debug("{}/{} doesn't exist", (Object)parent, (Object)tryChild);
                        }
                        children2.remove(tryChild);
                        continue;
                    }
                    String lockPath = this.urLockPath + "/" + tryChild;
                    long ledgerId = this.getLedgerId(tryChild);
                    this.zkc.create(lockPath, LOCK_DATA, zkAcls, CreateMode.EPHEMERAL);
                    this.heldLocks.put(ledgerId, new Lock(lockPath, Optional.of(stat.getVersion())));
                    return ledgerId;
                }
                catch (KeeperException.NodeExistsException nee) {
                    children2.remove(tryChild);
                }
                catch (NumberFormatException nfe) {
                    children2.remove(tryChild);
                }
            }
            return -1L;
        }
        try {
            children = this.subTreeCache.getChildren(parent);
        }
        catch (KeeperException.NoNodeException nne) {
            return -1L;
        }
        Collections.shuffle(children);
        while (children.size() > 0) {
            String tryChild = children.get(0);
            String tryPath = parent + "/" + tryChild;
            long ledger = this.getLedgerToRereplicateFromHierarchy(tryPath, depth + 1L);
            if (ledger != -1L) {
                return ledger;
            }
            children.remove(tryChild);
        }
        return -1L;
    }

    @Override
    public long pollLedgerToRereplicate() throws ReplicationException.UnavailableException {
        if (LOG.isDebugEnabled()) {
            LOG.debug("pollLedgerToRereplicate()");
        }
        try {
            return this.getLedgerToRereplicateFromHierarchy(this.urLedgerPath, 0L);
        }
        catch (KeeperException ke) {
            throw new ReplicationException.UnavailableException("Error contacting zookeeper", ke);
        }
        catch (InterruptedException ie) {
            Thread.currentThread().interrupt();
            throw new ReplicationException.UnavailableException("Interrupted while connecting zookeeper", ie);
        }
    }

    /*
     * Enabled aggressive block sorting
     * Enabled unnecessary exception pruning
     * Enabled aggressive exception aggregation
     */
    @Override
    public long getLedgerToRereplicate() throws ReplicationException.UnavailableException {
        if (LOG.isDebugEnabled()) {
            LOG.debug("getLedgerToRereplicate()");
        }
        while (true) {
            final CountDownLatch changedLatch = new CountDownLatch(1);
            Watcher w = new Watcher(){

                @Override
                public void process(WatchedEvent e) {
                    LOG.info("Latch countdown due to ZK event: " + e);
                    changedLatch.countDown();
                }
            };
            try {
                SubTreeCache.WatchGuard wg = this.subTreeCache.registerWatcherWithGuard(w);
                Throwable throwable = null;
                try {
                    this.waitIfLedgerReplicationDisabled();
                    long ledger = this.getLedgerToRereplicateFromHierarchy(this.urLedgerPath, 0L);
                    if (ledger != -1L) {
                        long l = ledger;
                        return l;
                    }
                    changedLatch.await();
                }
                catch (Throwable throwable2) {
                    throwable = throwable2;
                    throw throwable2;
                }
                finally {
                    if (wg == null) continue;
                    if (throwable != null) {
                        try {
                            wg.close();
                        }
                        catch (Throwable throwable3) {
                            throwable.addSuppressed(throwable3);
                        }
                        continue;
                    }
                    wg.close();
                }
            }
            catch (KeeperException ke) {
                throw new ReplicationException.UnavailableException("Error contacting zookeeper", ke);
            }
            catch (InterruptedException ie) {
                Thread.currentThread().interrupt();
                throw new ReplicationException.UnavailableException("Interrupted while connecting zookeeper", ie);
            }
        }
    }

    private void waitIfLedgerReplicationDisabled() throws ReplicationException.UnavailableException, InterruptedException {
        ReplicationEnableCb cb = new ReplicationEnableCb();
        if (!this.isLedgerReplicationEnabled()) {
            this.notifyLedgerReplicationEnabled(cb);
            cb.await();
        }
    }

    @Override
    public void releaseUnderreplicatedLedger(long ledgerId) throws ReplicationException.UnavailableException {
        if (LOG.isDebugEnabled()) {
            LOG.debug("releaseLedger(ledgerId={})", (Object)ledgerId);
        }
        try {
            Lock l = this.heldLocks.get(ledgerId);
            if (l != null) {
                this.zkc.delete(l.getLockZNode(), -1);
            }
        }
        catch (KeeperException.NoNodeException l) {
        }
        catch (KeeperException ke) {
            LOG.error("Error deleting underreplicated ledger lock", (Throwable)ke);
            throw new ReplicationException.UnavailableException("Error contacting zookeeper", ke);
        }
        catch (InterruptedException ie) {
            Thread.currentThread().interrupt();
            throw new ReplicationException.UnavailableException("Interrupted while connecting zookeeper", ie);
        }
        this.heldLocks.remove(ledgerId);
    }

    @Override
    public void close() throws ReplicationException.UnavailableException {
        if (LOG.isDebugEnabled()) {
            LOG.debug("close()");
        }
        try {
            for (Map.Entry<Long, Lock> e : this.heldLocks.entrySet()) {
                this.zkc.delete(e.getValue().getLockZNode(), -1);
            }
        }
        catch (KeeperException.NoNodeException noNodeException) {
        }
        catch (KeeperException ke) {
            LOG.error("Error deleting underreplicated ledger lock", (Throwable)ke);
            throw new ReplicationException.UnavailableException("Error contacting zookeeper", ke);
        }
        catch (InterruptedException ie) {
            Thread.currentThread().interrupt();
            throw new ReplicationException.UnavailableException("Interrupted while connecting zookeeper", ie);
        }
    }

    @Override
    public void disableLedgerReplication() throws ReplicationException.UnavailableException {
        List<ACL> zkAcls = ZkUtils.getACLs(this.conf);
        if (LOG.isDebugEnabled()) {
            LOG.debug("disableLedegerReplication()");
        }
        try {
            String znode = this.basePath + '/' + "disable";
            this.zkc.create(znode, "".getBytes(StandardCharsets.UTF_8), zkAcls, CreateMode.PERSISTENT);
            LOG.info("Auto ledger re-replication is disabled!");
        }
        catch (KeeperException.NodeExistsException ke) {
            LOG.warn("AutoRecovery is already disabled!", (Throwable)ke);
            throw new ReplicationException.UnavailableException("AutoRecovery is already disabled!", ke);
        }
        catch (KeeperException ke) {
            LOG.error("Exception while stopping auto ledger re-replication", (Throwable)ke);
            throw new ReplicationException.UnavailableException("Exception while stopping auto ledger re-replication", ke);
        }
        catch (InterruptedException ie) {
            Thread.currentThread().interrupt();
            throw new ReplicationException.UnavailableException("Interrupted while stopping auto ledger re-replication", ie);
        }
    }

    @Override
    public void enableLedgerReplication() throws ReplicationException.UnavailableException {
        if (LOG.isDebugEnabled()) {
            LOG.debug("enableLedegerReplication()");
        }
        try {
            this.zkc.delete(this.basePath + '/' + "disable", -1);
            LOG.info("Resuming automatic ledger re-replication");
        }
        catch (KeeperException.NoNodeException ke) {
            LOG.warn("AutoRecovery is already enabled!", (Throwable)ke);
            throw new ReplicationException.UnavailableException("AutoRecovery is already enabled!", ke);
        }
        catch (KeeperException ke) {
            LOG.error("Exception while resuming ledger replication", (Throwable)ke);
            throw new ReplicationException.UnavailableException("Exception while resuming auto ledger re-replication", ke);
        }
        catch (InterruptedException ie) {
            Thread.currentThread().interrupt();
            throw new ReplicationException.UnavailableException("Interrupted while resuming auto ledger re-replication", ie);
        }
    }

    @Override
    public boolean isLedgerReplicationEnabled() throws ReplicationException.UnavailableException {
        if (LOG.isDebugEnabled()) {
            LOG.debug("isLedgerReplicationEnabled()");
        }
        try {
            return null == this.zkc.exists(this.basePath + '/' + "disable", false);
        }
        catch (KeeperException ke) {
            LOG.error("Error while checking the state of ledger re-replication", (Throwable)ke);
            throw new ReplicationException.UnavailableException("Error contacting zookeeper", ke);
        }
        catch (InterruptedException ie) {
            Thread.currentThread().interrupt();
            throw new ReplicationException.UnavailableException("Interrupted while contacting zookeeper", ie);
        }
    }

    @Override
    public void notifyLedgerReplicationEnabled(final BookkeeperInternalCallbacks.GenericCallback<Void> cb) throws ReplicationException.UnavailableException {
        if (LOG.isDebugEnabled()) {
            LOG.debug("notifyLedgerReplicationEnabled()");
        }
        Watcher w = new Watcher(){

            @Override
            public void process(WatchedEvent e) {
                if (e.getType() == Watcher.Event.EventType.NodeDeleted) {
                    LOG.info("LedgerReplication is enabled externally through Zookeeper, since DISABLE_NODE ZNode is deleted");
                    cb.operationComplete(0, null);
                }
            }
        };
        try {
            if (null == this.zkc.exists(this.basePath + '/' + "disable", w)) {
                LOG.info("LedgerReplication is enabled externally through Zookeeper, since DISABLE_NODE ZNode is deleted");
                cb.operationComplete(0, null);
                return;
            }
        }
        catch (KeeperException ke) {
            LOG.error("Error while checking the state of ledger re-replication", (Throwable)ke);
            throw new ReplicationException.UnavailableException("Error contacting zookeeper", ke);
        }
        catch (InterruptedException ie) {
            Thread.currentThread().interrupt();
            throw new ReplicationException.UnavailableException("Interrupted while contacting zookeeper", ie);
        }
    }

    @Override
    public boolean isLedgerBeingReplicated(long ledgerId) throws ReplicationException {
        try {
            return this.zkc.exists(ZkLedgerUnderreplicationManager.getUrLedgerLockZnode(this.urLockPath, ledgerId), false) != null;
        }
        catch (Exception e) {
            throw new ReplicationException.UnavailableException("Failed to check ledger lock", e);
        }
    }

    public static String acquireUnderreplicatedLedgerLock(ZooKeeper zkc, String zkLedgersRootPath, long ledgerId, List<ACL> zkAcls) throws KeeperException, InterruptedException {
        String lockPath = ZkLedgerUnderreplicationManager.getUrLedgerLockZnode(ZkLedgerUnderreplicationManager.getUrLockPath(zkLedgersRootPath), ledgerId);
        ZkUtils.createFullPathOptimistic(zkc, lockPath, LOCK_DATA, zkAcls, CreateMode.EPHEMERAL);
        return lockPath;
    }

    @Override
    public void acquireUnderreplicatedLedger(long ledgerId) throws ReplicationException {
        try {
            String lockPath = ZkLedgerUnderreplicationManager.acquireUnderreplicatedLedgerLock(this.zkc, this.rootPath, ledgerId, ZkUtils.getACLs(this.conf));
            this.heldLocks.put(ledgerId, new Lock(lockPath, Optional.empty()));
        }
        catch (Exception e) {
            throw new ReplicationException.UnavailableException("Failed to acquire underreplicated ledger lock for " + ledgerId, e);
        }
    }

    @Override
    public boolean initializeLostBookieRecoveryDelay(int lostBookieRecoveryDelay) throws ReplicationException.UnavailableException {
        LOG.debug("initializeLostBookieRecoveryDelay()");
        try {
            this.zkc.create(this.lostBookieRecoveryDelayZnode, Integer.toString(lostBookieRecoveryDelay).getBytes(StandardCharsets.UTF_8), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
        }
        catch (KeeperException.NodeExistsException ke) {
            LOG.info("lostBookieRecoveryDelay Znode is already present, so using existing lostBookieRecoveryDelay Znode value");
            return false;
        }
        catch (KeeperException.NoNodeException nne) {
            LOG.error("lostBookieRecoveryDelay Znode not found. Please verify if Auditor has been initialized.", (Throwable)nne);
            return false;
        }
        catch (KeeperException ke) {
            LOG.error("Error while initializing LostBookieRecoveryDelay", (Throwable)ke);
            throw new ReplicationException.UnavailableException("Error contacting zookeeper", ke);
        }
        catch (InterruptedException ie) {
            Thread.currentThread().interrupt();
            throw new ReplicationException.UnavailableException("Interrupted while contacting zookeeper", ie);
        }
        return true;
    }

    @Override
    public void setLostBookieRecoveryDelay(int lostBookieRecoveryDelay) throws ReplicationException.UnavailableException {
        LOG.debug("setLostBookieRecoveryDelay()");
        try {
            if (this.zkc.exists(this.lostBookieRecoveryDelayZnode, false) != null) {
                this.zkc.setData(this.lostBookieRecoveryDelayZnode, Integer.toString(lostBookieRecoveryDelay).getBytes(StandardCharsets.UTF_8), -1);
            } else {
                this.zkc.create(this.lostBookieRecoveryDelayZnode, Integer.toString(lostBookieRecoveryDelay).getBytes(StandardCharsets.UTF_8), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
            }
        }
        catch (KeeperException ke) {
            LOG.error("Error while setting LostBookieRecoveryDelay ", (Throwable)ke);
            throw new ReplicationException.UnavailableException("Error contacting zookeeper", ke);
        }
        catch (InterruptedException ie) {
            Thread.currentThread().interrupt();
            throw new ReplicationException.UnavailableException("Interrupted while contacting zookeeper", ie);
        }
    }

    @Override
    public int getLostBookieRecoveryDelay() throws ReplicationException.UnavailableException {
        LOG.debug("getLostBookieRecoveryDelay()");
        try {
            byte[] data = this.zkc.getData(this.lostBookieRecoveryDelayZnode, false, null);
            return Integer.parseInt(new String(data, StandardCharsets.UTF_8));
        }
        catch (KeeperException ke) {
            LOG.error("Error while getting LostBookieRecoveryDelay ", (Throwable)ke);
            throw new ReplicationException.UnavailableException("Error contacting zookeeper", ke);
        }
        catch (InterruptedException ie) {
            Thread.currentThread().interrupt();
            throw new ReplicationException.UnavailableException("Interrupted while contacting zookeeper", ie);
        }
    }

    @Override
    public void notifyLostBookieRecoveryDelayChanged(final BookkeeperInternalCallbacks.GenericCallback<Void> cb) throws ReplicationException.UnavailableException {
        LOG.debug("notifyLostBookieRecoveryDelayChanged()");
        Watcher w = new Watcher(){

            @Override
            public void process(WatchedEvent e) {
                if (e.getType() == Watcher.Event.EventType.NodeDataChanged) {
                    cb.operationComplete(0, null);
                }
            }
        };
        try {
            if (null == this.zkc.exists(this.lostBookieRecoveryDelayZnode, w)) {
                cb.operationComplete(0, null);
                return;
            }
        }
        catch (KeeperException ke) {
            LOG.error("Error while checking the state of lostBookieRecoveryDelay", (Throwable)ke);
            throw new ReplicationException.UnavailableException("Error contacting zookeeper", ke);
        }
        catch (InterruptedException ie) {
            Thread.currentThread().interrupt();
            throw new ReplicationException.UnavailableException("Interrupted while contacting zookeeper", ie);
        }
    }

    @Override
    public String getReplicationWorkerIdRereplicatingLedger(long ledgerId) throws ReplicationException.UnavailableException {
        String replicationWorkerId = null;
        try {
            byte[] lockData = this.zkc.getData(ZkLedgerUnderreplicationManager.getUrLedgerLockZnode(this.urLockPath, ledgerId), false, null);
            DataFormats.LockDataFormat.Builder lockDataBuilder = DataFormats.LockDataFormat.newBuilder();
            TextFormat.merge(new String(lockData, StandardCharsets.UTF_8), (Message.Builder)lockDataBuilder);
            DataFormats.LockDataFormat lock = lockDataBuilder.build();
            replicationWorkerId = lock.getBookieId();
        }
        catch (KeeperException.NoNodeException lockData) {
        }
        catch (KeeperException e) {
            LOG.error("Error while getting ReplicationWorkerId rereplicating Ledger", (Throwable)e);
            throw new ReplicationException.UnavailableException("Error while getting ReplicationWorkerId rereplicating Ledger", e);
        }
        catch (InterruptedException e) {
            LOG.error("Got interrupted while getting ReplicationWorkerId rereplicating Ledger", (Throwable)e);
            Thread.currentThread().interrupt();
            throw new ReplicationException.UnavailableException("Interrupted while contacting zookeeper", e);
        }
        catch (TextFormat.ParseException e) {
            LOG.error("Error while parsing ZK data of lock", (Throwable)e);
            throw new ReplicationException.UnavailableException("Error while parsing ZK data of lock", e);
        }
        return replicationWorkerId;
    }

    @Override
    public void setCheckAllLedgersCTime(long checkAllLedgersCTime) throws ReplicationException.UnavailableException {
        if (LOG.isDebugEnabled()) {
            LOG.debug("setCheckAllLedgersCTime");
        }
        try {
            List<ACL> zkAcls = ZkUtils.getACLs(this.conf);
            DataFormats.CheckAllLedgersFormat.Builder builder = DataFormats.CheckAllLedgersFormat.newBuilder();
            builder.setCheckAllLedgersCTime(checkAllLedgersCTime);
            byte[] checkAllLedgersFormatByteArray = builder.build().toByteArray();
            if (this.zkc.exists(this.checkAllLedgersCtimeZnode, false) != null) {
                this.zkc.setData(this.checkAllLedgersCtimeZnode, checkAllLedgersFormatByteArray, -1);
            } else {
                this.zkc.create(this.checkAllLedgersCtimeZnode, checkAllLedgersFormatByteArray, zkAcls, CreateMode.PERSISTENT);
            }
        }
        catch (KeeperException ke) {
            throw new ReplicationException.UnavailableException("Error contacting zookeeper", ke);
        }
        catch (InterruptedException ie) {
            Thread.currentThread().interrupt();
            throw new ReplicationException.UnavailableException("Interrupted while contacting zookeeper", ie);
        }
    }

    @Override
    public long getCheckAllLedgersCTime() throws ReplicationException.UnavailableException {
        if (LOG.isDebugEnabled()) {
            LOG.debug("setCheckAllLedgersCTime");
        }
        try {
            byte[] data = this.zkc.getData(this.checkAllLedgersCtimeZnode, false, null);
            DataFormats.CheckAllLedgersFormat checkAllLedgersFormat = DataFormats.CheckAllLedgersFormat.parseFrom(data);
            return checkAllLedgersFormat.hasCheckAllLedgersCTime() ? checkAllLedgersFormat.getCheckAllLedgersCTime() : -1L;
        }
        catch (KeeperException.NoNodeException ne) {
            LOG.warn("checkAllLedgersCtimeZnode is not yet available");
            return -1L;
        }
        catch (KeeperException ke) {
            throw new ReplicationException.UnavailableException("Error contacting zookeeper", ke);
        }
        catch (InterruptedException ie) {
            Thread.currentThread().interrupt();
            throw new ReplicationException.UnavailableException("Interrupted while contacting zookeeper", ie);
        }
        catch (InvalidProtocolBufferException ipbe) {
            throw new ReplicationException.UnavailableException("Error while parsing ZK protobuf binary data", ipbe);
        }
    }

    @Override
    public void setPlacementPolicyCheckCTime(long placementPolicyCheckCTime) throws ReplicationException.UnavailableException {
        if (LOG.isDebugEnabled()) {
            LOG.debug("setPlacementPolicyCheckCTime");
        }
        try {
            List<ACL> zkAcls = ZkUtils.getACLs(this.conf);
            DataFormats.PlacementPolicyCheckFormat.Builder builder = DataFormats.PlacementPolicyCheckFormat.newBuilder();
            builder.setPlacementPolicyCheckCTime(placementPolicyCheckCTime);
            byte[] placementPolicyCheckFormatByteArray = builder.build().toByteArray();
            if (this.zkc.exists(this.placementPolicyCheckCtimeZnode, false) != null) {
                this.zkc.setData(this.placementPolicyCheckCtimeZnode, placementPolicyCheckFormatByteArray, -1);
            } else {
                this.zkc.create(this.placementPolicyCheckCtimeZnode, placementPolicyCheckFormatByteArray, zkAcls, CreateMode.PERSISTENT);
            }
        }
        catch (KeeperException ke) {
            throw new ReplicationException.UnavailableException("Error contacting zookeeper", ke);
        }
        catch (InterruptedException ie) {
            Thread.currentThread().interrupt();
            throw new ReplicationException.UnavailableException("Interrupted while contacting zookeeper", ie);
        }
    }

    @Override
    public long getPlacementPolicyCheckCTime() throws ReplicationException.UnavailableException {
        if (LOG.isDebugEnabled()) {
            LOG.debug("getPlacementPolicyCheckCTime");
        }
        try {
            byte[] data = this.zkc.getData(this.placementPolicyCheckCtimeZnode, false, null);
            DataFormats.PlacementPolicyCheckFormat placementPolicyCheckFormat = DataFormats.PlacementPolicyCheckFormat.parseFrom(data);
            return placementPolicyCheckFormat.hasPlacementPolicyCheckCTime() ? placementPolicyCheckFormat.getPlacementPolicyCheckCTime() : -1L;
        }
        catch (KeeperException.NoNodeException ne) {
            LOG.warn("placementPolicyCheckCtimeZnode is not yet available");
            return -1L;
        }
        catch (KeeperException ke) {
            throw new ReplicationException.UnavailableException("Error contacting zookeeper", ke);
        }
        catch (InterruptedException ie) {
            Thread.currentThread().interrupt();
            throw new ReplicationException.UnavailableException("Interrupted while contacting zookeeper", ie);
        }
        catch (InvalidProtocolBufferException ipbe) {
            throw new ReplicationException.UnavailableException("Error while parsing ZK protobuf binary data", ipbe);
        }
    }

    @Override
    public void setReplicasCheckCTime(long replicasCheckCTime) throws ReplicationException.UnavailableException {
        try {
            List<ACL> zkAcls = ZkUtils.getACLs(this.conf);
            DataFormats.ReplicasCheckFormat.Builder builder = DataFormats.ReplicasCheckFormat.newBuilder();
            builder.setReplicasCheckCTime(replicasCheckCTime);
            byte[] replicasCheckFormatByteArray = builder.build().toByteArray();
            if (this.zkc.exists(this.replicasCheckCtimeZnode, false) != null) {
                this.zkc.setData(this.replicasCheckCtimeZnode, replicasCheckFormatByteArray, -1);
            } else {
                this.zkc.create(this.replicasCheckCtimeZnode, replicasCheckFormatByteArray, zkAcls, CreateMode.PERSISTENT);
            }
            if (LOG.isDebugEnabled()) {
                LOG.debug("setReplicasCheckCTime completed successfully");
            }
        }
        catch (KeeperException ke) {
            throw new ReplicationException.UnavailableException("Error contacting zookeeper", ke);
        }
        catch (InterruptedException ie) {
            Thread.currentThread().interrupt();
            throw new ReplicationException.UnavailableException("Interrupted while contacting zookeeper", ie);
        }
    }

    @Override
    public long getReplicasCheckCTime() throws ReplicationException.UnavailableException {
        try {
            byte[] data = this.zkc.getData(this.replicasCheckCtimeZnode, false, null);
            DataFormats.ReplicasCheckFormat replicasCheckFormat = DataFormats.ReplicasCheckFormat.parseFrom(data);
            if (LOG.isDebugEnabled()) {
                LOG.debug("getReplicasCheckCTime completed successfully");
            }
            return replicasCheckFormat.hasReplicasCheckCTime() ? replicasCheckFormat.getReplicasCheckCTime() : -1L;
        }
        catch (KeeperException.NoNodeException ne) {
            LOG.warn("replicasCheckCtimeZnode is not yet available");
            return -1L;
        }
        catch (KeeperException ke) {
            throw new ReplicationException.UnavailableException("Error contacting zookeeper", ke);
        }
        catch (InterruptedException ie) {
            Thread.currentThread().interrupt();
            throw new ReplicationException.UnavailableException("Interrupted while contacting zookeeper", ie);
        }
        catch (InvalidProtocolBufferException ipbe) {
            throw new ReplicationException.UnavailableException("Error while parsing ZK protobuf binary data", ipbe);
        }
    }

    private static class Lock {
        private final String lockZNode;
        private final Optional<Integer> ledgerZNodeVersion;

        Lock(String lockZNode, Optional<Integer> ledgerZNodeVersion) {
            this.lockZNode = lockZNode;
            this.ledgerZNodeVersion = ledgerZNodeVersion;
        }

        String getLockZNode() {
            return this.lockZNode;
        }

        Optional<Integer> getLedgerZNodeVersion() {
            return this.ledgerZNodeVersion;
        }
    }
}

