/*
 * Decompiled with CFR 0.152.
 */
package dlshade.org.apache.bookkeeper.client;

import dlshade.com.google.common.annotations.VisibleForTesting;
import dlshade.com.google.common.base.Preconditions;
import dlshade.com.google.common.cache.CacheBuilder;
import dlshade.com.google.common.cache.CacheLoader;
import dlshade.com.google.common.cache.LoadingCache;
import dlshade.com.google.common.collect.Iterators;
import dlshade.com.google.common.collect.Sets;
import dlshade.com.google.common.util.concurrent.RateLimiter;
import dlshade.org.apache.bookkeeper.client.AsyncCallback;
import dlshade.org.apache.bookkeeper.client.BKException;
import dlshade.org.apache.bookkeeper.client.BookKeeper;
import dlshade.org.apache.bookkeeper.client.BookiesHealthInfo;
import dlshade.org.apache.bookkeeper.client.ClientContext;
import dlshade.org.apache.bookkeeper.client.DistributionSchedule;
import dlshade.org.apache.bookkeeper.client.EnsembleUtils;
import dlshade.org.apache.bookkeeper.client.ExplicitLacFlushPolicy;
import dlshade.org.apache.bookkeeper.client.ForceLedgerOp;
import dlshade.org.apache.bookkeeper.client.LedgerEntry;
import dlshade.org.apache.bookkeeper.client.LedgerMetadataBuilder;
import dlshade.org.apache.bookkeeper.client.LedgerMetadataUtils;
import dlshade.org.apache.bookkeeper.client.MetadataUpdateLoop;
import dlshade.org.apache.bookkeeper.client.PendingAddOp;
import dlshade.org.apache.bookkeeper.client.PendingReadLacOp;
import dlshade.org.apache.bookkeeper.client.PendingReadOp;
import dlshade.org.apache.bookkeeper.client.ReadLastConfirmedAndEntryOp;
import dlshade.org.apache.bookkeeper.client.ReadLastConfirmedOp;
import dlshade.org.apache.bookkeeper.client.RoundRobinDistributionSchedule;
import dlshade.org.apache.bookkeeper.client.SyncCallbackUtils;
import dlshade.org.apache.bookkeeper.client.TryReadLastConfirmedOp;
import dlshade.org.apache.bookkeeper.client.api.LastConfirmedAndEntry;
import dlshade.org.apache.bookkeeper.client.api.LedgerEntries;
import dlshade.org.apache.bookkeeper.client.api.LedgerMetadata;
import dlshade.org.apache.bookkeeper.client.api.WriteFlag;
import dlshade.org.apache.bookkeeper.client.api.WriteHandle;
import dlshade.org.apache.bookkeeper.client.impl.LedgerEntryImpl;
import dlshade.org.apache.bookkeeper.common.concurrent.FutureEventListener;
import dlshade.org.apache.bookkeeper.common.concurrent.FutureUtils;
import dlshade.org.apache.bookkeeper.common.util.MathUtils;
import dlshade.org.apache.bookkeeper.net.BookieId;
import dlshade.org.apache.bookkeeper.proto.checksum.DigestManager;
import dlshade.org.apache.bookkeeper.stats.Counter;
import dlshade.org.apache.bookkeeper.stats.Gauge;
import dlshade.org.apache.bookkeeper.stats.OpStatsLogger;
import dlshade.org.apache.bookkeeper.versioning.Versioned;
import dlshade.org.apache.commons.collections4.IteratorUtils;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import java.security.GeneralSecurityException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.EnumSet;
import java.util.Enumeration;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.NavigableMap;
import java.util.Optional;
import java.util.Queue;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.Executor;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class LedgerHandle
implements WriteHandle {
    static final Logger LOG = LoggerFactory.getLogger(LedgerHandle.class);
    private static final int STICKY_READ_BOOKIE_INDEX_UNSET = -1;
    final ClientContext clientCtx;
    final byte[] ledgerKey;
    private Versioned<LedgerMetadata> versionedMetadata;
    final long ledgerId;
    long lastAddPushed;
    private HandleState handleState = HandleState.OPEN;
    private final CompletableFuture<Void> closePromise = new CompletableFuture();
    volatile long lastAddConfirmed;
    volatile long pendingAddsSequenceHead;
    private int stickyBookieIndex;
    long length;
    final DigestManager macManager;
    final DistributionSchedule distributionSchedule;
    final RateLimiter throttler;
    final LoadingCache<BookieId, Long> bookieFailureHistory;
    final BookiesHealthInfo bookiesHealthInfo;
    final EnumSet<WriteFlag> writeFlags;
    ScheduledFuture<?> timeoutFuture = null;
    @VisibleForTesting
    final Map<Integer, BookieId> delayedWriteFailedBookies = new HashMap<Integer, BookieId>();
    public static final long INVALID_ENTRY_ID = -1L;
    public static final long INVALID_LEDGER_ID = -2882382797L;
    final Object metadataLock = new Object();
    boolean changingEnsemble = false;
    final AtomicInteger numEnsembleChanges = new AtomicInteger(0);
    Queue<PendingAddOp> pendingAddOps;
    ExplicitLacFlushPolicy explicitLacFlushPolicy;
    final Counter ensembleChangeCounter;
    final Counter lacUpdateHitsCounter;
    final Counter lacUpdateMissesCounter;
    private final OpStatsLogger clientChannelWriteWaitStats;

    LedgerHandle(final ClientContext clientCtx, final long ledgerId, Versioned<LedgerMetadata> versionedMetadata, BookKeeper.DigestType digestType, byte[] password, EnumSet<WriteFlag> writeFlags) throws GeneralSecurityException, NumberFormatException {
        this.clientCtx = clientCtx;
        this.versionedMetadata = versionedMetadata;
        this.pendingAddOps = new ConcurrentLinkedQueue<PendingAddOp>();
        this.writeFlags = writeFlags;
        LedgerMetadata metadata = versionedMetadata.getValue();
        if (metadata.isClosed()) {
            this.lastAddConfirmed = this.lastAddPushed = metadata.getLastEntryId();
            this.length = metadata.getLength();
        } else {
            this.lastAddPushed = -1L;
            this.lastAddConfirmed = -1L;
            this.length = 0L;
        }
        this.pendingAddsSequenceHead = this.lastAddConfirmed;
        this.ledgerId = ledgerId;
        this.stickyBookieIndex = clientCtx.getConf().enableStickyReads && this.getLedgerMetadata().getEnsembleSize() == this.getLedgerMetadata().getWriteQuorumSize() ? clientCtx.getPlacementPolicy().getStickyReadBookieIndex(metadata, Optional.empty()) : -1;
        this.throttler = clientCtx.getConf().throttleValue > 0 ? RateLimiter.create(clientCtx.getConf().throttleValue) : null;
        this.macManager = DigestManager.instantiate(ledgerId, password, BookKeeper.DigestType.toProtoDigestType(digestType), clientCtx.getByteBufAllocator(), clientCtx.getConf().useV2WireProtocol);
        this.ledgerKey = DigestManager.generateMasterKey(password);
        this.distributionSchedule = new RoundRobinDistributionSchedule(metadata.getWriteQuorumSize(), metadata.getAckQuorumSize(), metadata.getEnsembleSize());
        this.bookieFailureHistory = CacheBuilder.newBuilder().expireAfterWrite(clientCtx.getConf().bookieFailureHistoryExpirationMSec, TimeUnit.MILLISECONDS).build(new CacheLoader<BookieId, Long>(){

            @Override
            public Long load(BookieId key) {
                return -1L;
            }
        });
        this.bookiesHealthInfo = new BookiesHealthInfo(){

            @Override
            public long getBookieFailureHistory(BookieId bookieSocketAddress) {
                Long lastFailure = (Long)LedgerHandle.this.bookieFailureHistory.getIfPresent(bookieSocketAddress);
                return lastFailure == null ? -1L : lastFailure;
            }

            @Override
            public long getBookiePendingRequests(BookieId bookieSocketAddress) {
                return clientCtx.getBookieClient().getNumPendingRequests(bookieSocketAddress, ledgerId);
            }
        };
        this.ensembleChangeCounter = clientCtx.getClientStats().getEnsembleChangeCounter();
        this.lacUpdateHitsCounter = clientCtx.getClientStats().getLacUpdateHitsCounter();
        this.lacUpdateMissesCounter = clientCtx.getClientStats().getLacUpdateMissesCounter();
        this.clientChannelWriteWaitStats = clientCtx.getClientStats().getClientChannelWriteWaitLogger();
        clientCtx.getClientStats().registerPendingAddsGauge(new Gauge<Integer>(){

            @Override
            public Integer getDefaultValue() {
                return 0;
            }

            @Override
            public Integer getSample() {
                return LedgerHandle.this.pendingAddOps.size();
            }
        });
        this.initializeWriteHandleState();
    }

    void recordReadErrorOnBookie(int bookieIndex) {
        if (this.stickyBookieIndex != -1) {
            this.stickyBookieIndex = this.clientCtx.getPlacementPolicy().getStickyReadBookieIndex(this.getLedgerMetadata(), Optional.of(bookieIndex));
        }
    }

    protected void initializeWriteHandleState() {
        this.explicitLacFlushPolicy = this.clientCtx.getConf().explicitLacInterval > 0 ? new ExplicitLacFlushPolicy.ExplicitLacFlushPolicyImpl(this, this.clientCtx) : ExplicitLacFlushPolicy.VOID_EXPLICITLAC_FLUSH_POLICY;
        if (this.clientCtx.getConf().addEntryQuorumTimeoutNanos > 0L) {
            this.timeoutFuture = this.clientCtx.getScheduler().scheduleAtFixedRate(() -> this.monitorPendingAddOps(), this.clientCtx.getConf().timeoutMonitorIntervalSec, this.clientCtx.getConf().timeoutMonitorIntervalSec, TimeUnit.SECONDS);
        }
    }

    private void tearDownWriteHandleState() {
        this.explicitLacFlushPolicy.stopExplicitLacFlush();
        if (this.timeoutFuture != null) {
            this.timeoutFuture.cancel(false);
        }
    }

    @Override
    public long getId() {
        return this.ledgerId;
    }

    @VisibleForTesting
    public EnumSet<WriteFlag> getWriteFlags() {
        return this.writeFlags;
    }

    @Override
    public synchronized long getLastAddConfirmed() {
        return this.lastAddConfirmed;
    }

    synchronized void setLastAddConfirmed(long lac) {
        this.lastAddConfirmed = lac;
    }

    @Override
    public synchronized long getLastAddPushed() {
        return this.lastAddPushed;
    }

    public byte[] getLedgerKey() {
        return Arrays.copyOf(this.ledgerKey, this.ledgerKey.length);
    }

    @Override
    public LedgerMetadata getLedgerMetadata() {
        return this.versionedMetadata.getValue();
    }

    Versioned<LedgerMetadata> getVersionedLedgerMetadata() {
        return this.versionedMetadata;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    boolean setLedgerMetadata(Versioned<LedgerMetadata> expected, Versioned<LedgerMetadata> newMetadata) {
        LedgerHandle ledgerHandle = this;
        synchronized (ledgerHandle) {
            if (this.versionedMetadata == expected) {
                this.versionedMetadata = newMetadata;
                LedgerMetadata metadata = this.versionedMetadata.getValue();
                if (metadata.isClosed()) {
                    this.lastAddConfirmed = this.lastAddPushed = metadata.getLastEntryId();
                    this.length = metadata.getLength();
                }
                return true;
            }
            return false;
        }
    }

    public Map<String, byte[]> getCustomMetadata() {
        return this.getLedgerMetadata().getCustomMetadata();
    }

    public synchronized long getNumFragments() {
        return this.getLedgerMetadata().getAllEnsembles().size();
    }

    public synchronized long getNumBookies() {
        NavigableMap<Long, ? extends List<BookieId>> m = this.getLedgerMetadata().getAllEnsembles();
        HashSet s = Sets.newHashSet();
        for (List aList : m.values()) {
            s.addAll(aList);
        }
        return s.size();
    }

    DigestManager getDigestManager() {
        return this.macManager;
    }

    synchronized long addToLength(long delta) {
        this.length += delta;
        return this.length;
    }

    @Override
    public synchronized long getLength() {
        return this.length;
    }

    public long getCtime() {
        return this.getLedgerMetadata().getCtime();
    }

    DistributionSchedule getDistributionSchedule() {
        return this.distributionSchedule;
    }

    BookiesHealthInfo getBookiesHealthInfo() {
        return this.bookiesHealthInfo;
    }

    @Override
    public void close() throws InterruptedException, BKException {
        SyncCallbackUtils.waitForResult(this.closeAsync());
    }

    @Override
    public CompletableFuture<Void> closeAsync() {
        CompletableFuture<Void> result = new CompletableFuture<Void>();
        SyncCallbackUtils.SyncCloseCallback callback = new SyncCallbackUtils.SyncCloseCallback(result);
        this.asyncClose(callback, null);
        return result;
    }

    public void asyncClose(AsyncCallback.CloseCallback cb, Object ctx) {
        this.asyncCloseInternal(cb, ctx, -11);
    }

    @Override
    public synchronized boolean isClosed() {
        return this.getLedgerMetadata().isClosed();
    }

    boolean isHandleWritable() {
        return !this.getLedgerMetadata().isClosed() && this.handleState == HandleState.OPEN;
    }

    void asyncCloseInternal(AsyncCallback.CloseCallback cb, Object ctx, int rc) {
        try {
            this.doAsyncCloseInternal(cb, ctx, rc);
        }
        catch (RejectedExecutionException re) {
            if (LOG.isDebugEnabled()) {
                LOG.debug("Failed to close ledger {} : ", (Object)this.ledgerId, (Object)re);
            }
            this.errorOutPendingAdds(BookKeeper.getReturnRc(this.clientCtx.getBookieClient(), rc));
            cb.closeComplete(BookKeeper.getReturnRc(this.clientCtx.getBookieClient(), -15), this, ctx);
        }
    }

    void doAsyncCloseInternal(AsyncCallback.CloseCallback cb, Object ctx, int rc) {
        this.executeOrdered(() -> {
            long finalLength;
            long lastEntry;
            List<PendingAddOp> pendingAdds;
            HandleState prevHandleState;
            this.closePromise.whenComplete((ignore, ex) -> {
                if (ex != null) {
                    cb.closeComplete(BKException.getExceptionCode(ex, -999), this, ctx);
                } else {
                    cb.closeComplete(0, this, ctx);
                }
            });
            LedgerHandle ledgerHandle = this;
            synchronized (ledgerHandle) {
                prevHandleState = this.handleState;
                pendingAdds = this.drainPendingAddsAndAdjustLength();
                lastEntry = this.lastAddPushed = this.lastAddConfirmed;
                finalLength = this.length;
                this.handleState = HandleState.CLOSED;
            }
            try {
                this.errorOutPendingAdds(rc, pendingAdds);
            }
            catch (Throwable e) {
                this.closePromise.completeExceptionally(e);
                return;
            }
            if (prevHandleState != HandleState.CLOSED) {
                if (LOG.isDebugEnabled()) {
                    LOG.debug("Closing ledger: {} at entryId {} with {} bytes", new Object[]{this.getId(), lastEntry, finalLength});
                }
                this.tearDownWriteHandleState();
                new MetadataUpdateLoop(this.clientCtx.getLedgerManager(), this.getId(), this::getVersionedLedgerMetadata, metadata -> {
                    if (metadata.isClosed()) {
                        if (lastEntry == metadata.getLastEntryId() && finalLength == metadata.getLength()) {
                            return false;
                        }
                        LOG.error("Metadata conflict when closing ledger {}. Another client may have recovered the ledger while there were writes outstanding. (local lastEntry:{} length:{})  (metadata lastEntry:{} length:{})", new Object[]{this.getId(), lastEntry, finalLength, metadata.getLastEntryId(), metadata.getLength()});
                        throw new BKException.BKMetadataVersionException();
                    }
                    return true;
                }, metadata -> LedgerMetadataBuilder.from(metadata).withClosedState().withLastEntryId(lastEntry).withLength(finalLength).build(), this::setLedgerMetadata).run().whenComplete((metadata, ex) -> {
                    if (ex != null) {
                        this.closePromise.completeExceptionally((Throwable)ex);
                    } else {
                        FutureUtils.complete(this.closePromise, null);
                    }
                });
            }
        });
    }

    public Enumeration<LedgerEntry> readEntries(long firstEntry, long lastEntry) throws InterruptedException, BKException {
        CompletableFuture<Enumeration<LedgerEntry>> result = new CompletableFuture<Enumeration<LedgerEntry>>();
        this.asyncReadEntries(firstEntry, lastEntry, new SyncCallbackUtils.SyncReadCallback(result), null);
        return SyncCallbackUtils.waitForResult(result);
    }

    public Enumeration<LedgerEntry> readUnconfirmedEntries(long firstEntry, long lastEntry) throws InterruptedException, BKException {
        CompletableFuture<Enumeration<LedgerEntry>> result = new CompletableFuture<Enumeration<LedgerEntry>>();
        this.asyncReadUnconfirmedEntries(firstEntry, lastEntry, new SyncCallbackUtils.SyncReadCallback(result), null);
        return SyncCallbackUtils.waitForResult(result);
    }

    public void asyncReadEntries(long firstEntry, long lastEntry, AsyncCallback.ReadCallback cb, Object ctx) {
        if (firstEntry < 0L || firstEntry > lastEntry) {
            LOG.error("IncorrectParameterException on ledgerId:{} firstEntry:{} lastEntry:{}", new Object[]{this.ledgerId, firstEntry, lastEntry});
            cb.readComplete(-14, this, null, ctx);
            return;
        }
        if (lastEntry > this.lastAddConfirmed) {
            LOG.error("ReadEntries exception on ledgerId:{} firstEntry:{} lastEntry:{} lastAddConfirmed:{}", new Object[]{this.ledgerId, firstEntry, lastEntry, this.lastAddConfirmed});
            cb.readComplete(-1, this, null, ctx);
            return;
        }
        this.asyncReadEntriesInternal(firstEntry, lastEntry, cb, ctx, false);
    }

    public void asyncReadUnconfirmedEntries(long firstEntry, long lastEntry, AsyncCallback.ReadCallback cb, Object ctx) {
        if (firstEntry < 0L || firstEntry > lastEntry) {
            LOG.error("IncorrectParameterException on ledgerId:{} firstEntry:{} lastEntry:{}", new Object[]{this.ledgerId, firstEntry, lastEntry});
            cb.readComplete(-14, this, null, ctx);
            return;
        }
        this.asyncReadEntriesInternal(firstEntry, lastEntry, cb, ctx, false);
    }

    @Override
    public CompletableFuture<LedgerEntries> readAsync(long firstEntry, long lastEntry) {
        if (firstEntry < 0L || firstEntry > lastEntry) {
            LOG.error("IncorrectParameterException on ledgerId:{} firstEntry:{} lastEntry:{}", new Object[]{this.ledgerId, firstEntry, lastEntry});
            return FutureUtils.exception(new BKException.BKIncorrectParameterException());
        }
        if (lastEntry > this.lastAddConfirmed) {
            LOG.error("ReadAsync exception on ledgerId:{} firstEntry:{} lastEntry:{} lastAddConfirmed:{}", new Object[]{this.ledgerId, firstEntry, lastEntry, this.lastAddConfirmed});
            return FutureUtils.exception(new BKException.BKReadException());
        }
        return this.readEntriesInternalAsync(firstEntry, lastEntry, false);
    }

    @Override
    public CompletableFuture<LedgerEntries> readUnconfirmedAsync(long firstEntry, long lastEntry) {
        if (firstEntry < 0L || firstEntry > lastEntry) {
            LOG.error("IncorrectParameterException on ledgerId:{} firstEntry:{} lastEntry:{}", new Object[]{this.ledgerId, firstEntry, lastEntry});
            return FutureUtils.exception(new BKException.BKIncorrectParameterException());
        }
        return this.readEntriesInternalAsync(firstEntry, lastEntry, false);
    }

    void asyncReadEntriesInternal(long firstEntry, long lastEntry, final AsyncCallback.ReadCallback cb, final Object ctx, boolean isRecoveryRead) {
        if (!this.clientCtx.isClientClosed()) {
            this.readEntriesInternalAsync(firstEntry, lastEntry, isRecoveryRead).whenCompleteAsync(new FutureEventListener<LedgerEntries>(){

                @Override
                public void onSuccess(LedgerEntries entries) {
                    cb.readComplete(0, LedgerHandle.this, IteratorUtils.asEnumeration(Iterators.transform(entries.iterator(), le -> {
                        LedgerEntry entry = new LedgerEntry((LedgerEntryImpl)le);
                        le.close();
                        return entry;
                    })), ctx);
                }

                @Override
                public void onFailure(Throwable cause) {
                    if (cause instanceof BKException) {
                        BKException bke = (BKException)cause;
                        cb.readComplete(bke.getCode(), LedgerHandle.this, null, ctx);
                    } else {
                        cb.readComplete(-999, LedgerHandle.this, null, ctx);
                    }
                }
            }, (Executor)this.clientCtx.getMainWorkerPool().chooseThread(this.ledgerId));
        } else {
            cb.readComplete(-19, this, null, ctx);
        }
    }

    public void asyncReadLastEntry(AsyncCallback.ReadCallback cb, Object ctx) {
        long lastEntryId = this.getLastAddConfirmed();
        if (lastEntryId < 0L) {
            cb.readComplete(-13, this, null, ctx);
        } else {
            this.asyncReadEntriesInternal(lastEntryId, lastEntryId, cb, ctx, false);
        }
    }

    public LedgerEntry readLastEntry() throws InterruptedException, BKException {
        long lastEntryId = this.getLastAddConfirmed();
        if (lastEntryId < 0L) {
            throw new BKException.BKNoSuchEntryException();
        }
        CompletableFuture<Enumeration<LedgerEntry>> result = new CompletableFuture<Enumeration<LedgerEntry>>();
        this.asyncReadEntries(lastEntryId, lastEntryId, new SyncCallbackUtils.SyncReadCallback(result), null);
        return SyncCallbackUtils.waitForResult(result).nextElement();
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    CompletableFuture<LedgerEntries> readEntriesInternalAsync(long firstEntry, long lastEntry, boolean isRecoveryRead) {
        PendingReadOp op = new PendingReadOp(this, this.clientCtx, firstEntry, lastEntry, isRecoveryRead);
        if (!this.clientCtx.isClientClosed()) {
            if (this.clientCtx.getConf().waitForWriteSetMs >= 0L) {
                DistributionSchedule.WriteSet ws = this.distributionSchedule.getWriteSet(firstEntry);
                try {
                    if (!this.waitForWritable(ws, ws.size() - 1, this.clientCtx.getConf().waitForWriteSetMs)) {
                        op.allowFailFastOnUnwritableChannel();
                    }
                }
                finally {
                    ws.recycle();
                }
            }
            if (this.isHandleWritable()) {
                this.executeOrdered(op);
            } else {
                op.run();
            }
        } else {
            op.future().completeExceptionally(BKException.create(-19));
        }
        return op.future();
    }

    public long addEntry(byte[] data) throws InterruptedException, BKException {
        return this.addEntry(data, 0, data.length);
    }

    @Override
    public CompletableFuture<Long> appendAsync(ByteBuf data) {
        SyncCallbackUtils.SyncAddCallback callback = new SyncCallbackUtils.SyncAddCallback();
        this.asyncAddEntry(data, (AsyncCallback.AddCallback)callback, null);
        return callback;
    }

    public long addEntry(long entryId, byte[] data) throws InterruptedException, BKException {
        LOG.error("To use this feature Ledger must be created with createLedgerAdv interface.");
        throw BKException.create(-100);
    }

    public long addEntry(byte[] data, int offset, int length) throws InterruptedException, BKException {
        if (LOG.isDebugEnabled()) {
            LOG.debug("Adding entry {}", (Object)data);
        }
        SyncCallbackUtils.SyncAddCallback callback = new SyncCallbackUtils.SyncAddCallback();
        this.asyncAddEntry(data, offset, length, callback, null);
        return SyncCallbackUtils.waitForResult(callback);
    }

    public long addEntry(long entryId, byte[] data, int offset, int length) throws InterruptedException, BKException {
        LOG.error("To use this feature Ledger must be created with createLedgerAdv() interface.");
        throw BKException.create(-100);
    }

    public void asyncAddEntry(byte[] data, AsyncCallback.AddCallback cb, Object ctx) {
        this.asyncAddEntry(data, 0, data.length, cb, ctx);
    }

    public void asyncAddEntry(long entryId, byte[] data, AsyncCallback.AddCallback cb, Object ctx) {
        LOG.error("To use this feature Ledger must be created with createLedgerAdv() interface.");
        cb.addCompleteWithLatency(-100, this, entryId, 0L, ctx);
    }

    public void asyncAddEntry(byte[] data, int offset, int length, AsyncCallback.AddCallback cb, Object ctx) {
        if (offset < 0 || length < 0 || offset + length > data.length) {
            throw new ArrayIndexOutOfBoundsException("Invalid values for offset(" + offset + ") or length(" + length + ")");
        }
        this.asyncAddEntry(Unpooled.wrappedBuffer((byte[])data, (int)offset, (int)length), cb, ctx);
    }

    public void asyncAddEntry(ByteBuf data, AsyncCallback.AddCallback cb, Object ctx) {
        PendingAddOp op = PendingAddOp.create(this, this.clientCtx, this.getCurrentEnsemble(), data, this.writeFlags, cb, ctx);
        this.doAsyncAddEntry(op);
    }

    public void asyncAddEntry(long entryId, byte[] data, int offset, int length, AsyncCallback.AddCallback cb, Object ctx) {
        LOG.error("To use this feature Ledger must be created with createLedgerAdv() interface.");
        cb.addCompleteWithLatency(-100, this, entryId, 0L, ctx);
    }

    public void asyncAddEntry(long entryId, byte[] data, int offset, int length, AsyncCallback.AddCallbackWithLatency cb, Object ctx) {
        LOG.error("To use this feature Ledger must be created with createLedgerAdv() interface.");
        cb.addCompleteWithLatency(-100, this, entryId, 0L, ctx);
    }

    public void asyncAddEntry(long entryId, ByteBuf data, AsyncCallback.AddCallbackWithLatency cb, Object ctx) {
        LOG.error("To use this feature Ledger must be created with createLedgerAdv() interface.");
        cb.addCompleteWithLatency(-100, this, entryId, 0L, ctx);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public CompletableFuture<Void> force() {
        final CompletableFuture<Void> result = new CompletableFuture<Void>();
        ForceLedgerOp op = new ForceLedgerOp(this, this.clientCtx.getBookieClient(), this.getCurrentEnsemble(), result);
        boolean wasClosed = false;
        LedgerHandle ledgerHandle = this;
        synchronized (ledgerHandle) {
            if (!this.isHandleWritable()) {
                wasClosed = true;
            }
        }
        if (wasClosed) {
            try {
                this.executeOrdered(new Runnable(){

                    @Override
                    public void run() {
                        LOG.warn("Force() attempted on a closed ledger: {}", (Object)LedgerHandle.this.ledgerId);
                        result.completeExceptionally(new BKException.BKLedgerClosedException());
                    }

                    public String toString() {
                        return String.format("force(lid=%d)", LedgerHandle.this.ledgerId);
                    }
                });
            }
            catch (RejectedExecutionException e) {
                result.completeExceptionally(new BKException.BKInterruptedException());
            }
            return result;
        }
        if (this.pendingAddsSequenceHead == -1L) {
            this.executeOrdered(new Runnable(){

                @Override
                public void run() {
                    FutureUtils.complete(result, null);
                }

                public String toString() {
                    return String.format("force(lid=%d)", LedgerHandle.this.ledgerId);
                }
            });
            return result;
        }
        try {
            this.executeOrdered(op);
        }
        catch (RejectedExecutionException e) {
            result.completeExceptionally(new BKException.BKInterruptedException());
        }
        return result;
    }

    void asyncRecoveryAddEntry(byte[] data, int offset, int length, AsyncCallback.AddCallback cb, Object ctx) {
        PendingAddOp op = PendingAddOp.create(this, this.clientCtx, this.getCurrentEnsemble(), Unpooled.wrappedBuffer((byte[])data, (int)offset, (int)length), this.writeFlags, cb, ctx).enableRecoveryAdd();
        this.doAsyncAddEntry(op);
    }

    private boolean isWriteSetWritable(DistributionSchedule.WriteSet writeSet, int allowedNonWritableCount) {
        if (allowedNonWritableCount < 0) {
            allowedNonWritableCount = 0;
        }
        int sz = writeSet.size();
        int requiredWritable = sz - allowedNonWritableCount;
        int nonWritableCount = 0;
        List<BookieId> currentEnsemble = this.getCurrentEnsemble();
        for (int i = 0; i < sz; ++i) {
            int writeBookieIndex = writeSet.get(i);
            if (writeBookieIndex < currentEnsemble.size() && !this.clientCtx.getBookieClient().isWritable(currentEnsemble.get(writeBookieIndex), this.ledgerId)) {
                if (++nonWritableCount < allowedNonWritableCount) continue;
                return false;
            }
            int knownWritable = i - nonWritableCount;
            if (knownWritable < requiredWritable) continue;
            return true;
        }
        return true;
    }

    @VisibleForTesting
    protected boolean waitForWritable(DistributionSchedule.WriteSet writeSet, int allowedNonWritableCount, long durationMs) {
        if (durationMs < 0L) {
            return true;
        }
        long startTime = MathUtils.nowInNano();
        boolean writableResult = this.isWriteSetWritable(writeSet, allowedNonWritableCount);
        if (!writableResult && durationMs > 0L) {
            int backoff = 1;
            int maxBackoff = 4;
            long deadline = startTime + TimeUnit.MILLISECONDS.toNanos(durationMs);
            while (!(writableResult = this.isWriteSetWritable(writeSet, allowedNonWritableCount))) {
                if (MathUtils.nowInNano() < deadline) {
                    long maxSleep = MathUtils.elapsedMSec(startTime);
                    if (maxSleep < 0L) {
                        maxSleep = 1L;
                    }
                    long sleepMs = Math.min((long)backoff, maxSleep);
                    try {
                        TimeUnit.MILLISECONDS.sleep(sleepMs);
                    }
                    catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                        writableResult = this.isWriteSetWritable(writeSet, allowedNonWritableCount);
                        break;
                    }
                    if (backoff > 4) continue;
                    ++backoff;
                    continue;
                }
                writableResult = false;
                break;
            }
            if (backoff > 1) {
                LOG.info("Spent {} ms waiting for {} writable channels, writable result {}", new Object[]{MathUtils.elapsedMSec(startTime), writeSet.size() - allowedNonWritableCount, writableResult});
            }
        }
        if (writableResult) {
            this.clientChannelWriteWaitStats.registerSuccessfulEvent(MathUtils.elapsedNanos(startTime), TimeUnit.NANOSECONDS);
        } else {
            this.clientChannelWriteWaitStats.registerFailedEvent(MathUtils.elapsedNanos(startTime), TimeUnit.NANOSECONDS);
        }
        return writableResult;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    protected void doAsyncAddEntry(final PendingAddOp op) {
        if (this.throttler != null) {
            this.throttler.acquire();
        }
        boolean wasClosed = false;
        LedgerHandle ledgerHandle = this;
        synchronized (ledgerHandle) {
            if (this.isHandleWritable()) {
                long entryId = ++this.lastAddPushed;
                long currentLedgerLength = this.addToLength(op.payload.readableBytes());
                op.setEntryId(entryId);
                op.setLedgerLength(currentLedgerLength);
                this.pendingAddOps.add(op);
            } else {
                wasClosed = true;
            }
        }
        if (wasClosed) {
            try {
                this.executeOrdered(new Runnable(){

                    @Override
                    public void run() {
                        LOG.warn("Attempt to add to closed ledger: {}", (Object)LedgerHandle.this.ledgerId);
                        op.cb.addCompleteWithLatency(-11, LedgerHandle.this, -1L, 0L, op.ctx);
                        op.recyclePendAddOpObject();
                    }

                    public String toString() {
                        return String.format("AsyncAddEntryToClosedLedger(lid=%d)", LedgerHandle.this.ledgerId);
                    }
                });
            }
            catch (RejectedExecutionException e) {
                op.cb.addCompleteWithLatency(BookKeeper.getReturnRc(this.clientCtx.getBookieClient(), -15), this, -1L, 0L, op.ctx);
                op.recyclePendAddOpObject();
            }
            return;
        }
        if (this.clientCtx.getConf().waitForWriteSetMs >= 0L) {
            DistributionSchedule.WriteSet ws = this.distributionSchedule.getWriteSet(op.getEntryId());
            try {
                if (!this.waitForWritable(ws, 0, this.clientCtx.getConf().waitForWriteSetMs)) {
                    op.allowFailFastOnUnwritableChannel();
                }
            }
            finally {
                ws.recycle();
            }
        }
        op.initiate();
    }

    synchronized void updateLastConfirmed(long lac, long len) {
        if (lac > this.lastAddConfirmed) {
            this.lastAddConfirmed = lac;
            this.lacUpdateHitsCounter.inc();
        } else {
            this.lacUpdateMissesCounter.inc();
        }
        this.lastAddPushed = Math.max(this.lastAddPushed, lac);
        this.length = Math.max(this.length, len);
    }

    public void asyncReadLastConfirmed(AsyncCallback.ReadLastConfirmedCallback cb, Object ctx) {
        if (this.clientCtx.getConf().useV2WireProtocol) {
            this.asyncReadPiggybackLastConfirmed(cb, ctx);
        } else {
            this.asyncReadExplicitLastConfirmed(cb, ctx);
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void asyncReadPiggybackLastConfirmed(final AsyncCallback.ReadLastConfirmedCallback cb, final Object ctx) {
        long lastEntryId;
        boolean isClosed;
        LedgerHandle ledgerHandle = this;
        synchronized (ledgerHandle) {
            LedgerMetadata metadata = this.getLedgerMetadata();
            isClosed = metadata.isClosed();
            lastEntryId = metadata.getLastEntryId();
        }
        if (isClosed) {
            cb.readLastConfirmedComplete(0, lastEntryId, ctx);
            return;
        }
        ReadLastConfirmedOp.LastConfirmedDataCallback innercb = new ReadLastConfirmedOp.LastConfirmedDataCallback(){

            @Override
            public void readLastConfirmedDataComplete(int rc, DigestManager.RecoveryData data) {
                if (rc == 0) {
                    LedgerHandle.this.updateLastConfirmed(data.getLastAddConfirmed(), data.getLength());
                    cb.readLastConfirmedComplete(rc, data.getLastAddConfirmed(), ctx);
                } else {
                    cb.readLastConfirmedComplete(rc, -1L, ctx);
                }
            }
        };
        new ReadLastConfirmedOp(this.clientCtx.getBookieClient(), this.distributionSchedule, this.macManager, this.ledgerId, this.getCurrentEnsemble(), this.ledgerKey, innercb).initiate();
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void asyncTryReadLastConfirmed(final AsyncCallback.ReadLastConfirmedCallback cb, final Object ctx) {
        long lastEntryId;
        boolean isClosed;
        LedgerHandle ledgerHandle = this;
        synchronized (ledgerHandle) {
            LedgerMetadata metadata = this.getLedgerMetadata();
            isClosed = metadata.isClosed();
            lastEntryId = metadata.getLastEntryId();
        }
        if (isClosed) {
            cb.readLastConfirmedComplete(0, lastEntryId, ctx);
            return;
        }
        ReadLastConfirmedOp.LastConfirmedDataCallback innercb = new ReadLastConfirmedOp.LastConfirmedDataCallback(){
            AtomicBoolean completed = new AtomicBoolean(false);

            @Override
            public void readLastConfirmedDataComplete(int rc, DigestManager.RecoveryData data) {
                if (rc == 0) {
                    LedgerHandle.this.updateLastConfirmed(data.getLastAddConfirmed(), data.getLength());
                    if (this.completed.compareAndSet(false, true)) {
                        cb.readLastConfirmedComplete(rc, data.getLastAddConfirmed(), ctx);
                    }
                } else if (this.completed.compareAndSet(false, true)) {
                    cb.readLastConfirmedComplete(rc, -1L, ctx);
                }
            }
        };
        new TryReadLastConfirmedOp(this, this.clientCtx.getBookieClient(), this.getCurrentEnsemble(), innercb, this.getLastAddConfirmed()).initiate();
    }

    @Override
    public CompletableFuture<Long> tryReadLastAddConfirmedAsync() {
        SyncCallbackUtils.FutureReadLastConfirmed result = new SyncCallbackUtils.FutureReadLastConfirmed();
        this.asyncTryReadLastConfirmed(result, null);
        return result;
    }

    @Override
    public CompletableFuture<Long> readLastAddConfirmedAsync() {
        SyncCallbackUtils.FutureReadLastConfirmed result = new SyncCallbackUtils.FutureReadLastConfirmed();
        this.asyncReadLastConfirmed(result, null);
        return result;
    }

    @Override
    public CompletableFuture<LastConfirmedAndEntry> readLastAddConfirmedAndEntryAsync(long entryId, long timeOutInMillis, boolean parallel) {
        SyncCallbackUtils.FutureReadLastConfirmedAndEntry result = new SyncCallbackUtils.FutureReadLastConfirmedAndEntry();
        this.asyncReadLastConfirmedAndEntry(entryId, timeOutInMillis, parallel, result, null);
        return result;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void asyncReadLastConfirmedAndEntry(long entryId, long timeOutInMillis, boolean parallel, final AsyncCallback.ReadLastConfirmedAndEntryCallback cb, final Object ctx) {
        long lac;
        boolean isClosed;
        LedgerHandle ledgerHandle = this;
        synchronized (ledgerHandle) {
            LedgerMetadata metadata = this.getLedgerMetadata();
            isClosed = metadata.isClosed();
            lac = metadata.getLastEntryId();
        }
        if (isClosed) {
            if (entryId > lac) {
                cb.readLastConfirmedAndEntryComplete(0, lac, null, ctx);
                return;
            }
        } else {
            lac = this.getLastAddConfirmed();
        }
        if (entryId <= lac) {
            this.asyncReadEntries(entryId, entryId, new AsyncCallback.ReadCallback(){

                @Override
                public void readComplete(int rc, LedgerHandle lh, Enumeration<LedgerEntry> seq, Object ctx) {
                    if (0 == rc) {
                        if (seq.hasMoreElements()) {
                            cb.readLastConfirmedAndEntryComplete(rc, LedgerHandle.this.getLastAddConfirmed(), seq.nextElement(), ctx);
                        } else {
                            cb.readLastConfirmedAndEntryComplete(rc, LedgerHandle.this.getLastAddConfirmed(), null, ctx);
                        }
                    } else {
                        cb.readLastConfirmedAndEntryComplete(rc, -1L, null, ctx);
                    }
                }
            }, ctx);
            return;
        }
        ReadLastConfirmedAndEntryOp.LastConfirmedAndEntryCallback innercb = new ReadLastConfirmedAndEntryOp.LastConfirmedAndEntryCallback(){
            AtomicBoolean completed = new AtomicBoolean(false);

            @Override
            public void readLastConfirmedAndEntryComplete(int rc, long lastAddConfirmed, LedgerEntry entry) {
                if (rc == 0) {
                    if (this.completed.compareAndSet(false, true)) {
                        cb.readLastConfirmedAndEntryComplete(rc, lastAddConfirmed, entry, ctx);
                    }
                } else if (this.completed.compareAndSet(false, true)) {
                    cb.readLastConfirmedAndEntryComplete(rc, -1L, null, ctx);
                }
            }
        };
        new ReadLastConfirmedAndEntryOp(this, this.clientCtx, this.getCurrentEnsemble(), innercb, entryId - 1L, timeOutInMillis).parallelRead(parallel).initiate();
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public long readLastConfirmed() throws InterruptedException, BKException {
        LastConfirmedCtx ctx = new LastConfirmedCtx();
        this.asyncReadLastConfirmed(new SyncCallbackUtils.SyncReadLastConfirmedCallback(), ctx);
        LastConfirmedCtx lastConfirmedCtx = ctx;
        synchronized (lastConfirmedCtx) {
            while (!ctx.ready()) {
                ctx.wait();
            }
        }
        if (ctx.getRC() != 0) {
            throw BKException.create(ctx.getRC());
        }
        return ctx.getlastConfirmed();
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public long tryReadLastConfirmed() throws InterruptedException, BKException {
        LastConfirmedCtx ctx = new LastConfirmedCtx();
        this.asyncTryReadLastConfirmed(new SyncCallbackUtils.SyncReadLastConfirmedCallback(), ctx);
        LastConfirmedCtx lastConfirmedCtx = ctx;
        synchronized (lastConfirmedCtx) {
            while (!ctx.ready()) {
                ctx.wait();
            }
        }
        if (ctx.getRC() != 0) {
            throw BKException.create(ctx.getRC());
        }
        return ctx.getlastConfirmed();
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void asyncReadExplicitLastConfirmed(final AsyncCallback.ReadLastConfirmedCallback cb, final Object ctx) {
        boolean isClosed;
        LedgerHandle ledgerHandle = this;
        synchronized (ledgerHandle) {
            LedgerMetadata metadata = this.getLedgerMetadata();
            isClosed = metadata.isClosed();
            if (isClosed) {
                this.lastAddConfirmed = metadata.getLastEntryId();
                this.length = metadata.getLength();
            }
        }
        if (isClosed) {
            cb.readLastConfirmedComplete(0, this.lastAddConfirmed, ctx);
            return;
        }
        PendingReadLacOp.LacCallback innercb = new PendingReadLacOp.LacCallback(){

            @Override
            public void getLacComplete(int rc, long lac) {
                if (rc == 0) {
                    LedgerHandle.this.updateLastConfirmed(lac, 0L);
                    cb.readLastConfirmedComplete(rc, lac, ctx);
                } else {
                    cb.readLastConfirmedComplete(rc, -1L, ctx);
                }
            }
        };
        new PendingReadLacOp(this, this.clientCtx.getBookieClient(), this.getCurrentEnsemble(), innercb).initiate();
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public long readExplicitLastConfirmed() throws InterruptedException, BKException {
        LastConfirmedCtx ctx = new LastConfirmedCtx();
        this.asyncReadExplicitLastConfirmed(new SyncCallbackUtils.SyncReadLastConfirmedCallback(), ctx);
        LastConfirmedCtx lastConfirmedCtx = ctx;
        synchronized (lastConfirmedCtx) {
            while (!ctx.ready()) {
                ctx.wait();
            }
        }
        if (ctx.getRC() != 0) {
            throw BKException.create(ctx.getRC());
        }
        return ctx.getlastConfirmed();
    }

    void handleUnrecoverableErrorDuringAdd(int rc) {
        if (this.getLedgerMetadata().getState() == LedgerMetadata.State.IN_RECOVERY) {
            this.errorOutPendingAdds(rc);
            return;
        }
        LOG.error("Closing ledger {} due to {}", (Object)this.ledgerId, BKException.codeLogger(rc));
        this.asyncCloseInternal(NoopCloseCallback.instance, null, rc);
    }

    private void monitorPendingAddOps() {
        int timedOut = 0;
        for (PendingAddOp op : this.pendingAddOps) {
            if (!op.maybeTimeout()) continue;
            ++timedOut;
        }
        if (timedOut > 0) {
            LOG.info("Timed out {} add ops", (Object)timedOut);
        }
    }

    void errorOutPendingAdds(int rc) {
        this.errorOutPendingAdds(rc, this.drainPendingAddsAndAdjustLength());
    }

    synchronized List<PendingAddOp> drainPendingAddsAndAdjustLength() {
        PendingAddOp pendingAddOp;
        ArrayList<PendingAddOp> opsDrained = new ArrayList<PendingAddOp>(this.pendingAddOps.size());
        while ((pendingAddOp = this.pendingAddOps.poll()) != null) {
            this.addToLength(-pendingAddOp.entryLength);
            opsDrained.add(pendingAddOp);
        }
        return opsDrained;
    }

    void errorOutPendingAdds(int rc, List<PendingAddOp> ops) {
        for (PendingAddOp op : ops) {
            op.submitCallback(rc);
        }
    }

    void sendAddSuccessCallbacks() {
        PendingAddOp pendingAddOp;
        while ((pendingAddOp = this.pendingAddOps.peek()) != null && !this.changingEnsemble) {
            if (!pendingAddOp.completed) {
                if (LOG.isDebugEnabled()) {
                    LOG.debug("pending add not completed: {}", (Object)pendingAddOp);
                }
                return;
            }
            if (pendingAddOp.entryId != 0L && pendingAddOp.entryId != this.pendingAddsSequenceHead + 1L) {
                if (LOG.isDebugEnabled()) {
                    LOG.debug("Head of the queue entryId: {} is not the expected value: {}", (Object)pendingAddOp.entryId, (Object)(this.pendingAddsSequenceHead + 1L));
                }
                return;
            }
            this.pendingAddOps.remove();
            this.explicitLacFlushPolicy.updatePiggyBackedLac(this.lastAddConfirmed);
            this.pendingAddsSequenceHead = pendingAddOp.entryId;
            if (!this.writeFlags.contains((Object)WriteFlag.DEFERRED_SYNC)) {
                this.lastAddConfirmed = this.pendingAddsSequenceHead;
            }
            pendingAddOp.submitCallback(0);
        }
    }

    @VisibleForTesting
    boolean hasDelayedWriteFailedBookies() {
        return !this.delayedWriteFailedBookies.isEmpty();
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    void notifyWriteFailed(int index, BookieId addr) {
        Object object = this.metadataLock;
        synchronized (object) {
            this.delayedWriteFailedBookies.put(index, addr);
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    void maybeHandleDelayedWriteBookieFailure() {
        Object object = this.metadataLock;
        synchronized (object) {
            if (this.delayedWriteFailedBookies.isEmpty()) {
                return;
            }
            HashMap<Integer, BookieId> toReplace = new HashMap<Integer, BookieId>(this.delayedWriteFailedBookies);
            this.delayedWriteFailedBookies.clear();
            this.handleBookieFailure(toReplace);
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    void handleBookieFailure(Map<Integer, BookieId> failedBookies) {
        if (this.clientCtx.getConf().disableEnsembleChangeFeature.isAvailable()) {
            if (LOG.isDebugEnabled()) {
                LOG.debug("Ensemble change is disabled. Retry sending to failed bookies {} for ledger {}.", failedBookies, (Object)this.ledgerId);
            }
            this.executeOrdered(() -> this.unsetSuccessAndSendWriteRequest(this.getCurrentEnsemble(), failedBookies.keySet()));
            return;
        }
        if (this.writeFlags.contains((Object)WriteFlag.DEFERRED_SYNC)) {
            if (LOG.isDebugEnabled()) {
                LOG.debug("Cannot perform ensemble change with write flags {}. Failed bookies {} for ledger {}.", new Object[]{this.writeFlags, failedBookies, this.ledgerId});
            }
            this.handleUnrecoverableErrorDuringAdd(-12);
            return;
        }
        boolean triggerLoop = false;
        HashMap<Integer, BookieId> toReplace = null;
        List<BookieId> origEnsemble = null;
        Object object = this.metadataLock;
        synchronized (object) {
            if (this.changingEnsemble) {
                this.delayedWriteFailedBookies.putAll(failedBookies);
            } else {
                this.changingEnsemble = true;
                triggerLoop = true;
                toReplace = new HashMap<Integer, BookieId>(this.delayedWriteFailedBookies);
                this.delayedWriteFailedBookies.clear();
                toReplace.putAll(failedBookies);
                origEnsemble = this.getCurrentEnsemble();
            }
        }
        if (triggerLoop) {
            this.ensembleChangeLoop(origEnsemble, toReplace);
        }
    }

    void ensembleChangeLoop(List<BookieId> origEnsemble, Map<Integer, BookieId> failedBookies) {
        int ensembleChangeId = this.numEnsembleChanges.incrementAndGet();
        this.ensembleChangeCounter.inc();
        String logContext = String.format("[EnsembleChange(ledger:%d, change-id:%010d)]", this.ledgerId, ensembleChangeId);
        if (ensembleChangeId > this.clientCtx.getConf().maxAllowedEnsembleChanges) {
            LOG.info("{} reaches max allowed ensemble change number {}", (Object)logContext, (Object)this.clientCtx.getConf().maxAllowedEnsembleChanges);
            this.handleUnrecoverableErrorDuringAdd(-12);
            return;
        }
        if (LOG.isDebugEnabled()) {
            LOG.debug("{} Replacing {} in {}", new Object[]{logContext, failedBookies, origEnsemble});
        }
        AtomicInteger attempts = new AtomicInteger(0);
        new MetadataUpdateLoop(this.clientCtx.getLedgerManager(), this.getId(), this::getVersionedLedgerMetadata, metadata -> metadata.getState() == LedgerMetadata.State.OPEN && failedBookies.entrySet().stream().anyMatch(e -> LedgerMetadataUtils.getLastEnsembleValue(metadata).get((Integer)e.getKey()).equals(e.getValue())), metadata -> {
            attempts.incrementAndGet();
            List<BookieId> currentEnsemble = this.getCurrentEnsemble();
            List<BookieId> newEnsemble = EnsembleUtils.replaceBookiesInEnsemble(this.clientCtx.getBookieWatcher(), metadata, currentEnsemble, failedBookies, logContext);
            Long lastEnsembleKey = LedgerMetadataUtils.getLastEnsembleKey(metadata);
            LedgerMetadataBuilder builder = LedgerMetadataBuilder.from(metadata);
            long newEnsembleStartEntry = this.getLastAddConfirmed() + 1L;
            Preconditions.checkState(lastEnsembleKey <= newEnsembleStartEntry, "New ensemble must either replace the last ensemble, or add a new one");
            if (LOG.isDebugEnabled()) {
                LOG.debug("{}[attempt:{}] changing ensemble from: {} to: {} starting at entry: {}", new Object[]{logContext, attempts.get(), currentEnsemble, newEnsemble, newEnsembleStartEntry});
            }
            if (lastEnsembleKey.equals(newEnsembleStartEntry)) {
                return builder.replaceEnsembleEntry(newEnsembleStartEntry, newEnsemble).build();
            }
            return builder.newEnsembleEntry(newEnsembleStartEntry, newEnsemble).build();
        }, this::setLedgerMetadata).run().whenCompleteAsync((metadata, ex) -> {
            if (ex != null) {
                LOG.warn("{}[attempt:{}] Exception changing ensemble", new Object[]{logContext, attempts.get(), ex});
                this.handleUnrecoverableErrorDuringAdd(BKException.getExceptionCode(ex, -12));
            } else if (((LedgerMetadata)metadata.getValue()).isClosed()) {
                if (LOG.isDebugEnabled()) {
                    LOG.debug("{}[attempt:{}] Metadata closed during attempt to replace bookie. Another client must have recovered the ledger.", (Object)logContext, (Object)attempts.get());
                }
                this.handleUnrecoverableErrorDuringAdd(-11);
            } else if (((LedgerMetadata)metadata.getValue()).getState() == LedgerMetadata.State.IN_RECOVERY) {
                if (LOG.isDebugEnabled()) {
                    LOG.debug("{}[attempt:{}] Metadata marked as in-recovery during attempt to replace bookie. Another client must be recovering the ledger.", (Object)logContext, (Object)attempts.get());
                }
                this.handleUnrecoverableErrorDuringAdd(-101);
            } else {
                if (LOG.isDebugEnabled()) {
                    LOG.debug("{}[attempt:{}] Success updating metadata.", (Object)logContext, (Object)attempts.get());
                }
                List<BookieId> newEnsemble = null;
                Set<Integer> replaced = null;
                Object object = this.metadataLock;
                synchronized (object) {
                    if (!this.delayedWriteFailedBookies.isEmpty()) {
                        HashMap<Integer, BookieId> toReplace = new HashMap<Integer, BookieId>(this.delayedWriteFailedBookies);
                        this.delayedWriteFailedBookies.clear();
                        this.ensembleChangeLoop(origEnsemble, toReplace);
                    } else {
                        newEnsemble = this.getCurrentEnsemble();
                        replaced = EnsembleUtils.diffEnsemble(origEnsemble, newEnsemble);
                        LOG.info("New Ensemble: {} for ledger: {}", newEnsemble, (Object)this.ledgerId);
                        this.changingEnsemble = false;
                    }
                }
                if (newEnsemble != null) {
                    this.unsetSuccessAndSendWriteRequest(newEnsemble, replaced);
                }
            }
        }, (Executor)this.clientCtx.getMainWorkerPool().chooseThread(this.ledgerId));
    }

    void unsetSuccessAndSendWriteRequest(List<BookieId> ensemble, Set<Integer> bookies) {
        for (PendingAddOp pendingAddOp : this.pendingAddOps) {
            for (Integer bookieIndex : bookies) {
                pendingAddOp.unsetSuccessAndSendWriteRequest(ensemble, bookieIndex);
            }
        }
    }

    void registerOperationFailureOnBookie(BookieId bookie, long entryId) {
        if (this.clientCtx.getConf().enableBookieFailureTracking) {
            this.bookieFailureHistory.put(bookie, entryId);
        }
    }

    List<BookieId> getCurrentEnsemble() {
        return LedgerMetadataUtils.getCurrentEnsemble(this.versionedMetadata.getValue());
    }

    DistributionSchedule.WriteSet getWriteSetForReadOperation(long entryId) {
        if (this.stickyBookieIndex != -1) {
            return this.distributionSchedule.getWriteSet(this.stickyBookieIndex);
        }
        return this.distributionSchedule.getWriteSet(entryId);
    }

    void executeOrdered(Runnable runnable) throws RejectedExecutionException {
        this.clientCtx.getMainWorkerPool().executeOrdered(this.ledgerId, runnable);
    }

    static class NoopCloseCallback
    implements AsyncCallback.CloseCallback {
        static NoopCloseCallback instance = new NoopCloseCallback();

        NoopCloseCallback() {
        }

        @Override
        public void closeComplete(int rc, LedgerHandle lh, Object ctx) {
            if (rc != 0) {
                LOG.warn("Close failed: {}", BKException.codeLogger(rc));
            }
        }
    }

    static class LastConfirmedCtx {
        static final long ENTRY_ID_PENDING = -10L;
        long response = -10L;
        int rc;

        LastConfirmedCtx() {
        }

        void setLastConfirmed(long lastConfirmed) {
            this.response = lastConfirmed;
        }

        long getlastConfirmed() {
            return this.response;
        }

        void setRC(int rc) {
            this.rc = rc;
        }

        int getRC() {
            return this.rc;
        }

        boolean ready() {
            return this.response != -10L;
        }
    }

    private static enum HandleState {
        OPEN,
        CLOSED;

    }
}

