package com.twitter.distributedlog;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Charsets;
import com.google.common.base.Stopwatch;
import com.google.common.collect.Lists;
import com.twitter.distributedlog.LogSegmentMetadata;
import com.twitter.distributedlog.bk.LedgerAllocator;
import com.twitter.distributedlog.config.DynamicDistributedLogConfiguration;
import com.twitter.distributedlog.exceptions.DLIllegalStateException;
import com.twitter.distributedlog.exceptions.DLInterruptedException;
import com.twitter.distributedlog.exceptions.EndOfStreamException;
import com.twitter.distributedlog.exceptions.LockingException;
import com.twitter.distributedlog.exceptions.TransactionIdOutOfOrderException;
import com.twitter.distributedlog.exceptions.UnexpectedException;
import com.twitter.distributedlog.exceptions.ZKException;
import com.twitter.distributedlog.function.GetLastTxIdFunction;
import com.twitter.distributedlog.impl.BKLogSegmentEntryWriter;
import com.twitter.distributedlog.impl.ZKLogSegmentFilters;
import com.twitter.distributedlog.impl.metadata.ZKLogMetadataForWriter;
import com.twitter.distributedlog.lock.DistributedLock;
import com.twitter.distributedlog.logsegment.LogSegmentMetadataStore;
import com.twitter.distributedlog.logsegment.RollingPolicy;
import com.twitter.distributedlog.logsegment.SizeBasedRollingPolicy;
import com.twitter.distributedlog.logsegment.TimeBasedRollingPolicy;
import com.twitter.distributedlog.metadata.LogSegmentMetadataStoreUpdater;
import com.twitter.distributedlog.metadata.MetadataUpdater;
import com.twitter.distributedlog.util.DLUtils;
import com.twitter.distributedlog.util.FailpointUtils;
import com.twitter.distributedlog.util.FutureUtils;
import com.twitter.distributedlog.util.OrderedScheduler;
import com.twitter.distributedlog.util.PermitLimiter;
import com.twitter.distributedlog.util.Transaction;
import com.twitter.distributedlog.util.Utils;
import com.twitter.distributedlog.zk.ZKOp;
import com.twitter.distributedlog.zk.ZKTransaction;
import com.twitter.distributedlog.zk.ZKVersionedSetOp;
import com.twitter.util.Function;
import com.twitter.util.Future;
import com.twitter.util.FutureEventListener;
import com.twitter.util.Promise;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.TimeUnit;
import org.apache.bookkeeper.client.AsyncCallback;
import org.apache.bookkeeper.client.BKException;
import org.apache.bookkeeper.client.LedgerHandle;
import org.apache.bookkeeper.feature.FeatureProvider;
import org.apache.bookkeeper.meta.ZkVersion;
import org.apache.bookkeeper.stats.AlertStatsLogger;
import org.apache.bookkeeper.stats.OpStatsLogger;
import org.apache.bookkeeper.stats.StatsLogger;
import org.apache.bookkeeper.versioning.Version;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.Op;
import org.apache.zookeeper.OpResult;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZKUtil;
import org.apache.zookeeper.data.ACL;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import scala.runtime.AbstractFunction1;

/* JADX INFO: Access modifiers changed from: package-private */
/* loaded from: input_file:com/twitter/distributedlog/BKLogWriteHandler.class */
public class BKLogWriteHandler extends BKLogHandler {
    static final Logger LOG = LoggerFactory.getLogger(BKLogReadHandler.class);
    protected final DistributedLock lock;
    protected final int ensembleSize;
    protected final int writeQuorumSize;
    protected final int ackQuorumSize;
    protected final LedgerAllocator ledgerAllocator;
    protected final MaxTxId maxTxId;
    protected final MaxLogSegmentSequenceNo maxLogSegmentSequenceNo;
    protected final boolean sanityCheckTxnId;
    protected final boolean validateLogSegmentSequenceNumber;
    protected final int regionId;
    protected volatile boolean closed;
    protected final RollingPolicy rollingPolicy;
    protected Future<DistributedLock> lockFuture;
    protected final PermitLimiter writeLimiter;
    protected final FeatureProvider featureProvider;
    protected final DynamicDistributedLogConfiguration dynConf;
    protected final MetadataUpdater metadataUpdater;
    protected final LinkedList<Long> inprogressLSSNs;
    private final RecoverLogSegmentFunction recoverLogSegmentFunction;
    private final AbstractFunction1<List<LogSegmentMetadata>, Future<Long>> recoverLogSegmentsFunction;
    private final StatsLogger perLogStatsLogger;
    private final OpStatsLogger closeOpStats;
    private final OpStatsLogger openOpStats;
    private final OpStatsLogger recoverOpStats;
    private final OpStatsLogger deleteOpStats;

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:com/twitter/distributedlog/BKLogWriteHandler$RecoverLogSegmentFunction.class */
    public class RecoverLogSegmentFunction extends Function<LogSegmentMetadata, Future<LogSegmentMetadata>> {
        RecoverLogSegmentFunction() {
        }

        public Future<LogSegmentMetadata> apply(final LogSegmentMetadata logSegmentMetadata) {
            if (!logSegmentMetadata.isInProgress()) {
                return Future.value(logSegmentMetadata);
            }
            BKLogWriteHandler.LOG.info("Recovering last record in log segment {} for {}.", logSegmentMetadata, BKLogWriteHandler.this.getFullyQualifiedName());
            return BKLogWriteHandler.this.asyncReadLastRecord(logSegmentMetadata, true, true, true).flatMap(new AbstractFunction1<LogRecordWithDLSN, Future<LogSegmentMetadata>>() { // from class: com.twitter.distributedlog.BKLogWriteHandler.RecoverLogSegmentFunction.1
                public Future<LogSegmentMetadata> apply(LogRecordWithDLSN logRecordWithDLSN) {
                    return RecoverLogSegmentFunction.this.completeLogSegment(logSegmentMetadata, logRecordWithDLSN);
                }
            });
        }

        /* JADX INFO: Access modifiers changed from: private */
        public Future<LogSegmentMetadata> completeLogSegment(LogSegmentMetadata logSegmentMetadata, LogRecordWithDLSN logRecordWithDLSN) {
            BKLogWriteHandler.LOG.info("Recovered last record in log segment {} for {}.", logSegmentMetadata, BKLogWriteHandler.this.getFullyQualifiedName());
            long j = -99;
            int i = 0;
            long j2 = -1;
            long j3 = -1;
            if (null != logRecordWithDLSN) {
                j = logRecordWithDLSN.getTransactionId();
                i = logRecordWithDLSN.getLastPositionWithinLogSegment();
                j2 = logRecordWithDLSN.getDlsn().getEntryId();
                j3 = logRecordWithDLSN.getDlsn().getSlotId();
            }
            if (j == -999) {
                BKLogWriteHandler.LOG.error("Unrecoverable corruption has occurred in segment " + logSegmentMetadata.toString() + " at path " + logSegmentMetadata.getZkPath() + ". Unable to continue recovery.");
                return Future.exception(new IOException("Unrecoverable corruption, please check logs."));
            }
            if (j == -99) {
                j = logSegmentMetadata.getFirstTxId();
            }
            Promise<LogSegmentMetadata> promise = new Promise<>();
            BKLogWriteHandler.this.doCompleteAndCloseLogSegment(logSegmentMetadata.getZNodeName(), logSegmentMetadata.getLogSegmentSequenceNumber(), logSegmentMetadata.getLedgerId(), logSegmentMetadata.getFirstTxId(), j, i, j2, j3, promise);
            return promise;
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public BKLogWriteHandler(ZKLogMetadataForWriter zKLogMetadataForWriter, DistributedLogConfiguration distributedLogConfiguration, ZooKeeperClientBuilder zooKeeperClientBuilder, BookKeeperClientBuilder bookKeeperClientBuilder, LogSegmentMetadataStore logSegmentMetadataStore, OrderedScheduler orderedScheduler, LedgerAllocator ledgerAllocator, StatsLogger statsLogger, StatsLogger statsLogger2, AlertStatsLogger alertStatsLogger, String str, int i, PermitLimiter permitLimiter, FeatureProvider featureProvider, DynamicDistributedLogConfiguration dynamicDistributedLogConfiguration, DistributedLock distributedLock) {
        super(zKLogMetadataForWriter, distributedLogConfiguration, zooKeeperClientBuilder, bookKeeperClientBuilder, logSegmentMetadataStore, orderedScheduler, statsLogger, alertStatsLogger, null, ZKLogSegmentFilters.WRITE_HANDLE_FILTER, str);
        this.closed = false;
        this.lockFuture = null;
        this.recoverLogSegmentFunction = new RecoverLogSegmentFunction();
        this.recoverLogSegmentsFunction = new AbstractFunction1<List<LogSegmentMetadata>, Future<Long>>() { // from class: com.twitter.distributedlog.BKLogWriteHandler.1
            public Future<Long> apply(List<LogSegmentMetadata> list) {
                BKLogWriteHandler.LOG.info("Initiating Recovery For {} : {}", BKLogWriteHandler.this.getFullyQualifiedName(), list);
                synchronized (BKLogWriteHandler.this) {
                    if (BKLogWriteHandler.this.lastLedgerRollingTimeMillis < 0) {
                        BKLogWriteHandler.this.lastLedgerRollingTimeMillis = Utils.nowInMillis();
                    }
                }
                if (BKLogWriteHandler.this.validateLogSegmentSequenceNumber) {
                    synchronized (BKLogWriteHandler.this.inprogressLSSNs) {
                        for (LogSegmentMetadata logSegmentMetadata : list) {
                            if (logSegmentMetadata.isInProgress()) {
                                BKLogWriteHandler.this.inprogressLSSNs.addLast(Long.valueOf(logSegmentMetadata.getLogSegmentSequenceNumber()));
                            }
                        }
                    }
                }
                return FutureUtils.processList(list, BKLogWriteHandler.this.recoverLogSegmentFunction, BKLogWriteHandler.this.scheduler).map(GetLastTxIdFunction.INSTANCE);
            }
        };
        this.perLogStatsLogger = statsLogger2;
        this.writeLimiter = permitLimiter;
        this.featureProvider = featureProvider;
        this.dynConf = dynamicDistributedLogConfiguration;
        this.ledgerAllocator = ledgerAllocator;
        this.lock = distributedLock;
        this.metadataUpdater = LogSegmentMetadataStoreUpdater.createMetadataUpdater(distributedLogConfiguration, logSegmentMetadataStore);
        this.ensembleSize = distributedLogConfiguration.getEnsembleSize();
        if (this.ensembleSize < distributedLogConfiguration.getWriteQuorumSize()) {
            this.writeQuorumSize = this.ensembleSize;
            LOG.warn("Setting write quorum size {} greater than ensemble size {}", Integer.valueOf(distributedLogConfiguration.getWriteQuorumSize()), Integer.valueOf(this.ensembleSize));
        } else {
            this.writeQuorumSize = distributedLogConfiguration.getWriteQuorumSize();
        }
        if (this.writeQuorumSize < distributedLogConfiguration.getAckQuorumSize()) {
            this.ackQuorumSize = this.writeQuorumSize;
            LOG.warn("Setting write ack quorum size {} greater than write quorum size {}", Integer.valueOf(distributedLogConfiguration.getAckQuorumSize()), Integer.valueOf(this.writeQuorumSize));
        } else {
            this.ackQuorumSize = distributedLogConfiguration.getAckQuorumSize();
        }
        if (distributedLogConfiguration.getEncodeRegionIDInLogSegmentMetadata()) {
            this.regionId = i;
        } else {
            this.regionId = 0;
        }
        this.sanityCheckTxnId = distributedLogConfiguration.getSanityCheckTxnID();
        this.validateLogSegmentSequenceNumber = distributedLogConfiguration.isLogSegmentSequenceNumberValidationEnabled();
        this.maxLogSegmentSequenceNo = new MaxLogSegmentSequenceNo(zKLogMetadataForWriter.getMaxLSSNData());
        this.inprogressLSSNs = new LinkedList<>();
        this.maxTxId = new MaxTxId(this.zooKeeperClient, zKLogMetadataForWriter.getMaxTxIdPath(), distributedLogConfiguration.getSanityCheckTxnID(), zKLogMetadataForWriter.getMaxTxIdData());
        scheduleGetLedgersTask(false, true);
        setLastLedgerRollingTimeMillis(Utils.nowInMillis());
        if (distributedLogConfiguration.getLogSegmentRollingIntervalMinutes() > 0) {
            this.rollingPolicy = new TimeBasedRollingPolicy(distributedLogConfiguration.getLogSegmentRollingIntervalMinutes() * 60 * 1000);
        } else {
            this.rollingPolicy = new SizeBasedRollingPolicy(distributedLogConfiguration.getMaxLogSegmentBytes());
        }
        StatsLogger scope = statsLogger.scope("segments");
        this.openOpStats = scope.getOpStatsLogger("open");
        this.closeOpStats = scope.getOpStatsLogger("close");
        this.recoverOpStats = scope.getOpStatsLogger("recover");
        this.deleteOpStats = scope.getOpStatsLogger("delete");
    }

    void storeMaxSequenceNumber(Transaction transaction, final MaxLogSegmentSequenceNo maxLogSegmentSequenceNo, final long j, final boolean z) {
        transaction.addOp(new ZKVersionedSetOp(Op.setData(this.logMetadata.getLogSegmentsPath(), DLUtils.serializeLogSegmentSequenceNumber(j), maxLogSegmentSequenceNo.getZkVersion()), new Transaction.OpListener<Version>() { // from class: com.twitter.distributedlog.BKLogWriteHandler.2
            @Override // com.twitter.distributedlog.util.Transaction.OpListener
            public void onCommit(Version version) {
                if (BKLogWriteHandler.this.validateLogSegmentSequenceNumber) {
                    synchronized (BKLogWriteHandler.this.inprogressLSSNs) {
                        if (z) {
                            BKLogWriteHandler.this.inprogressLSSNs.add(Long.valueOf(j));
                        } else {
                            BKLogWriteHandler.this.inprogressLSSNs.removeFirst();
                        }
                    }
                }
                maxLogSegmentSequenceNo.update((ZkVersion) version, j);
            }

            @Override // com.twitter.distributedlog.util.Transaction.OpListener
            public void onAbort(Throwable th) {
            }
        }));
    }

    void storeMaxTxId(ZKTransaction zKTransaction, final MaxTxId maxTxId, final long j) {
        byte[] couldStore = maxTxId.couldStore(j);
        if (null != couldStore) {
            zKTransaction.addOp(new ZKVersionedSetOp(Op.setData(maxTxId.getZkPath(), couldStore, -1), new Transaction.OpListener<Version>() { // from class: com.twitter.distributedlog.BKLogWriteHandler.3
                @Override // com.twitter.distributedlog.util.Transaction.OpListener
                public void onCommit(Version version) {
                    maxTxId.setMaxTxId(j);
                }

                @Override // com.twitter.distributedlog.util.Transaction.OpListener
                public void onAbort(Throwable th) {
                }
            }));
        }
    }

    void writeLogSegment(ZKTransaction zKTransaction, List<ACL> list, final String str, final LogSegmentMetadata logSegmentMetadata, String str2) {
        zKTransaction.addOp(new ZKOp(Op.create(str2, logSegmentMetadata.getFinalisedData().getBytes(Charsets.UTF_8), list, CreateMode.PERSISTENT)) { // from class: com.twitter.distributedlog.BKLogWriteHandler.4
            /* JADX INFO: Access modifiers changed from: protected */
            @Override // com.twitter.distributedlog.zk.ZKOp
            public void commitOpResult(OpResult opResult) {
                BKLogWriteHandler.this.addLogSegmentToCache(str, logSegmentMetadata);
            }

            /* JADX INFO: Access modifiers changed from: protected */
            @Override // com.twitter.distributedlog.zk.ZKOp
            public void abortOpResult(Throwable th, OpResult opResult) {
            }
        });
    }

    void deleteLogSegment(ZKTransaction zKTransaction, final String str, String str2) {
        zKTransaction.addOp(new ZKOp(Op.delete(str2, -1)) { // from class: com.twitter.distributedlog.BKLogWriteHandler.5
            /* JADX INFO: Access modifiers changed from: protected */
            @Override // com.twitter.distributedlog.zk.ZKOp
            public void commitOpResult(OpResult opResult) {
                BKLogWriteHandler.this.removeLogSegmentFromCache(str);
            }

            /* JADX INFO: Access modifiers changed from: protected */
            @Override // com.twitter.distributedlog.zk.ZKOp
            public void abortOpResult(Throwable th, OpResult opResult) {
            }
        });
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public Future<DistributedLock> lockHandler() {
        if (null != this.lockFuture) {
            return this.lockFuture;
        }
        this.lockFuture = this.lock.asyncAcquire();
        return this.lockFuture;
    }

    Future<Void> unlockHandler() {
        return null != this.lockFuture ? this.lock.asyncClose() : Future.Void();
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public void register(Watcher watcher) {
        this.zooKeeperClient.register(watcher);
    }

    public BKLogSegmentWriter startLogSegment(long j) throws IOException {
        return startLogSegment(j, false, false);
    }

    public BKLogSegmentWriter startLogSegment(long j, boolean z, boolean z2) throws IOException {
        Stopwatch createStarted = Stopwatch.createStarted();
        boolean z3 = false;
        try {
            BKLogSegmentWriter doStartLogSegment = doStartLogSegment(j, z, z2);
            z3 = true;
            if (1 != 0) {
                this.openOpStats.registerSuccessfulEvent(createStarted.stop().elapsed(TimeUnit.MICROSECONDS));
            } else {
                this.openOpStats.registerFailedEvent(createStarted.stop().elapsed(TimeUnit.MICROSECONDS));
            }
            return doStartLogSegment;
        } catch (Throwable th) {
            if (z3) {
                this.openOpStats.registerSuccessfulEvent(createStarted.stop().elapsed(TimeUnit.MICROSECONDS));
            } else {
                this.openOpStats.registerFailedEvent(createStarted.stop().elapsed(TimeUnit.MICROSECONDS));
            }
            throw th;
        }
    }

    protected long assignLogSegmentSequenceNumber() throws IOException {
        long j = 0;
        boolean z = false;
        if (LogSegmentMetadata.supportsLogSegmentSequenceNo(this.conf.getDLLedgerMetadataLayoutVersion())) {
            Long nextLogSegmentSequenceNumber = DLUtils.nextLogSegmentSequenceNumber(getFilteredLedgerListDesc(false, false));
            if (null == nextLogSegmentSequenceNumber) {
                z = false;
                j = this.conf.getFirstLogSegmentSequenceNumber();
            } else {
                z = true;
                j = nextLogSegmentSequenceNumber.longValue();
            }
        }
        if (!z && 0 == this.maxLogSegmentSequenceNo.getSequenceNumber()) {
            LOG.info("No max ledger sequence number found while creating log segment {} for {}.", Long.valueOf(j), getFullyQualifiedName());
        } else if (this.maxLogSegmentSequenceNo.getSequenceNumber() + 1 != j) {
            LOG.warn("Unexpected max log segment sequence number {} for {} : list of cached segments = {}", new Object[]{Long.valueOf(this.maxLogSegmentSequenceNo.getSequenceNumber()), getFullyQualifiedName(), getCachedLogSegments(LogSegmentMetadata.DESC_COMPARATOR)});
            throw new DLIllegalStateException("Unexpected max log segment sequence number " + this.maxLogSegmentSequenceNo.getSequenceNumber() + " for " + getFullyQualifiedName() + ", expected " + (j - 1));
        }
        return j;
    }

    protected BKLogSegmentWriter doStartLogSegment(long j, boolean z, boolean z2) throws IOException {
        return (BKLogSegmentWriter) FutureUtils.result(asyncStartLogSegment(j, z, z2));
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public Future<BKLogSegmentWriter> asyncStartLogSegment(long j, boolean z, boolean z2) {
        Promise<BKLogSegmentWriter> promise = new Promise<>();
        try {
            this.lock.checkOwnershipAndReacquire();
            doStartLogSegment(j, z, z2, promise);
            return promise;
        } catch (LockingException e) {
            FutureUtils.setException(promise, e);
            return promise;
        }
    }

    protected void doStartLogSegment(final long j, final boolean z, boolean z2, final Promise<BKLogSegmentWriter> promise) {
        if (j < 0 || (!z2 && j == DistributedLogConstants.MAX_TXID)) {
            FutureUtils.setException(promise, new IOException("Invalid Transaction Id " + j));
            return;
        }
        if (this.sanityCheckTxnId) {
            long j2 = this.maxTxId.get();
            if (j < j2) {
                if (j2 == DistributedLogConstants.MAX_TXID) {
                    LOG.error("We've already marked the stream as ended and attempting to start a new log segment");
                    FutureUtils.setException(promise, new EndOfStreamException("Writing to a stream after it has been marked as completed"));
                    return;
                } else {
                    LOG.error("We've already seen TxId {} the max TXId is {}", Long.valueOf(j), Long.valueOf(j2));
                    FutureUtils.setException(promise, new TransactionIdOutOfOrderException(j, j2));
                    return;
                }
            }
        }
        try {
            this.ledgerAllocator.allocate();
            final ZKTransaction zKTransaction = new ZKTransaction(this.zooKeeperClient);
            try {
                FailpointUtils.checkFailPoint(FailpointUtils.FailPointName.FP_StartLogSegmentBeforeLedgerCreate);
                this.ledgerAllocator.tryObtain(zKTransaction, new Transaction.OpListener<LedgerHandle>() { // from class: com.twitter.distributedlog.BKLogWriteHandler.7
                    @Override // com.twitter.distributedlog.util.Transaction.OpListener
                    public void onCommit(LedgerHandle ledgerHandle) {
                    }

                    @Override // com.twitter.distributedlog.util.Transaction.OpListener
                    public void onAbort(Throwable th) {
                    }
                }).addEventListener(new FutureEventListener<LedgerHandle>() { // from class: com.twitter.distributedlog.BKLogWriteHandler.6
                    public void onSuccess(LedgerHandle ledgerHandle) {
                        BKLogWriteHandler.this.createInprogressLogSegment(zKTransaction, j, ledgerHandle, z, promise);
                    }

                    public void onFailure(Throwable th) {
                        BKLogWriteHandler.this.failStartLogSegment(promise, z, th);
                    }
                });
            } catch (IOException e) {
                failStartLogSegment(promise, z, e);
            }
        } catch (IOException e2) {
            failStartLogSegment(promise, z, e2);
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    public void failStartLogSegment(Promise<BKLogSegmentWriter> promise, boolean z, Throwable th) {
        if (z) {
            FutureUtils.setValue(promise, null);
        } else {
            FutureUtils.setException(promise, th);
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    public void createInprogressLogSegment(ZKTransaction zKTransaction, final long j, final LedgerHandle ledgerHandle, boolean z, final Promise<BKLogSegmentWriter> promise) {
        try {
            FailpointUtils.checkFailPoint(FailpointUtils.FailPointName.FP_StartLogSegmentOnAssignLogSegmentSequenceNumber);
            final long assignLogSegmentSequenceNumber = assignLogSegmentSequenceNumber();
            final String inprogressZNodeName = inprogressZNodeName(ledgerHandle.getId(), j, assignLogSegmentSequenceNumber);
            String inprogressZNode = inprogressZNode(ledgerHandle.getId(), j, assignLogSegmentSequenceNumber);
            writeLogSegment(zKTransaction, this.zooKeeperClient.getDefaultACL(), inprogressZNodeName, new LogSegmentMetadata.LogSegmentMetadataBuilder(inprogressZNode, this.conf.getDLLedgerMetadataLayoutVersion(), ledgerHandle.getId(), j).setLogSegmentSequenceNo(assignLogSegmentSequenceNumber).setRegionId(this.regionId).setEnvelopeEntries(LogSegmentMetadata.supportsEnvelopedEntries(this.conf.getDLLedgerMetadataLayoutVersion())).build(), inprogressZNode);
            LOG.debug("Try storing max sequence number in startLogSegment {} : {}", inprogressZNode, Long.valueOf(assignLogSegmentSequenceNumber));
            storeMaxSequenceNumber(zKTransaction, this.maxLogSegmentSequenceNo, assignLogSegmentSequenceNumber, true);
            LOG.debug("Try storing MaxTxId in startLogSegment  {} {}", inprogressZNode, Long.valueOf(j));
            storeMaxTxId(zKTransaction, this.maxTxId, j);
            zKTransaction.execute().addEventListener(FutureUtils.FutureEventListenerRunnable.of(new FutureEventListener<Void>() { // from class: com.twitter.distributedlog.BKLogWriteHandler.8
                public void onSuccess(Void r22) {
                    try {
                        FutureUtils.setValue(promise, new BKLogSegmentWriter(BKLogWriteHandler.this.getFullyQualifiedName(), inprogressZNodeName, BKLogWriteHandler.this.conf, BKLogWriteHandler.this.conf.getDLLedgerMetadataLayoutVersion(), new BKLogSegmentEntryWriter(ledgerHandle), BKLogWriteHandler.this.lock, j, assignLogSegmentSequenceNumber, BKLogWriteHandler.this.scheduler, BKLogWriteHandler.this.statsLogger, BKLogWriteHandler.this.perLogStatsLogger, BKLogWriteHandler.this.alertStatsLogger, BKLogWriteHandler.this.writeLimiter, BKLogWriteHandler.this.featureProvider, BKLogWriteHandler.this.dynConf));
                    } catch (IOException e) {
                        BKLogWriteHandler.this.failStartLogSegment(promise, false, e);
                    }
                }

                public void onFailure(Throwable th) {
                    BKLogWriteHandler.this.failStartLogSegment(promise, false, th);
                }
            }, this.scheduler));
        } catch (IOException e) {
            zKTransaction.abort(e);
            failStartLogSegment(promise, z, e);
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public boolean shouldStartNewSegment(BKLogSegmentWriter bKLogSegmentWriter) {
        return this.rollingPolicy.shouldRollover(bKLogSegmentWriter, this.lastLedgerRollingTimeMillis);
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public Future<LogSegmentMetadata> completeAndCloseLogSegment(BKLogSegmentWriter bKLogSegmentWriter) {
        Promise<LogSegmentMetadata> promise = new Promise<>();
        completeAndCloseLogSegment(bKLogSegmentWriter, promise);
        return promise;
    }

    private void completeAndCloseLogSegment(final BKLogSegmentWriter bKLogSegmentWriter, final Promise<LogSegmentMetadata> promise) {
        bKLogSegmentWriter.asyncClose().addEventListener(new FutureEventListener<Void>() { // from class: com.twitter.distributedlog.BKLogWriteHandler.9
            public void onSuccess(Void r18) {
                if (bKLogSegmentWriter.shouldFailCompleteLogSegment()) {
                    FutureUtils.setException(promise, new IOException("LogSegmentWriter for " + bKLogSegmentWriter.getFullyQualifiedLogSegment() + " is already in error."));
                } else {
                    BKLogWriteHandler.this.doCompleteAndCloseLogSegment(BKLogWriteHandler.this.inprogressZNodeName(bKLogSegmentWriter.getLogSegmentId(), bKLogSegmentWriter.getStartTxId(), bKLogSegmentWriter.getLogSegmentSequenceNumber()), bKLogSegmentWriter.getLogSegmentSequenceNumber(), bKLogSegmentWriter.getLogSegmentId(), bKLogSegmentWriter.getStartTxId(), bKLogSegmentWriter.getLastTxId(), bKLogSegmentWriter.getPositionWithinLogSegment(), bKLogSegmentWriter.getLastDLSN().getEntryId(), bKLogSegmentWriter.getLastDLSN().getSlotId(), promise);
                }
            }

            public void onFailure(Throwable th) {
                FutureUtils.setException(promise, th);
            }
        });
    }

    @VisibleForTesting
    LogSegmentMetadata completeAndCloseLogSegment(long j, long j2, long j3, long j4, int i) throws IOException {
        return completeAndCloseLogSegment(inprogressZNodeName(j2, j3, j), j, j2, j3, j4, i, -1L, -1L);
    }

    LogSegmentMetadata completeAndCloseLogSegment(String str, long j, long j2, long j3, long j4, int i, long j5, long j6) throws IOException {
        Stopwatch createStarted = Stopwatch.createStarted();
        boolean z = false;
        try {
            LogSegmentMetadata doCompleteAndCloseLogSegment = doCompleteAndCloseLogSegment(str, j, j2, j3, j4, i, j5, j6);
            z = true;
            if (1 != 0) {
                this.closeOpStats.registerSuccessfulEvent(createStarted.stop().elapsed(TimeUnit.MICROSECONDS));
            } else {
                this.closeOpStats.registerFailedEvent(createStarted.stop().elapsed(TimeUnit.MICROSECONDS));
            }
            return doCompleteAndCloseLogSegment;
        } catch (Throwable th) {
            if (z) {
                this.closeOpStats.registerSuccessfulEvent(createStarted.stop().elapsed(TimeUnit.MICROSECONDS));
            } else {
                this.closeOpStats.registerFailedEvent(createStarted.stop().elapsed(TimeUnit.MICROSECONDS));
            }
            throw th;
        }
    }

    protected long computeStartSequenceId(LogSegmentMetadata logSegmentMetadata) throws IOException {
        if (!logSegmentMetadata.isInProgress()) {
            return logSegmentMetadata.getStartSequenceId();
        }
        long j = -1;
        if (LogSegmentMetadata.supportsSequenceId(this.conf.getDLLedgerMetadataLayoutVersion()) && logSegmentMetadata.supportsSequenceId()) {
            j = DLUtils.computeStartSequenceId(getFilteredLedgerListDesc(false, false), logSegmentMetadata);
        }
        return j;
    }

    protected LogSegmentMetadata doCompleteAndCloseLogSegment(String str, long j, long j2, long j3, long j4, int i, long j5, long j6) throws IOException {
        Promise<LogSegmentMetadata> promise = new Promise<>();
        doCompleteAndCloseLogSegment(str, j, j2, j3, j4, i, j5, j6, promise);
        return (LogSegmentMetadata) FutureUtils.result(promise);
    }

    protected void doCompleteAndCloseLogSegment(final String str, long j, long j2, long j3, long j4, int i, long j5, long j6, final Promise<LogSegmentMetadata> promise) {
        try {
            this.lock.checkOwnershipAndReacquire();
            LOG.debug("Completing and Closing Log Segment {} {}", Long.valueOf(j3), Long.valueOf(j4));
            String inprogressZNode = inprogressZNode(str);
            LogSegmentMetadata readLogSegmentFromCache = readLogSegmentFromCache(str);
            if (readLogSegmentFromCache.getLedgerId() != j2) {
                FutureUtils.setException(promise, new IOException("Active ledger has different ID to inprogress. " + readLogSegmentFromCache.getLedgerId() + " found, " + j2 + " expected"));
                return;
            }
            if (readLogSegmentFromCache.getFirstTxId() != j3) {
                FutureUtils.setException(promise, new IOException("Transaction id not as expected, " + readLogSegmentFromCache.getFirstTxId() + " found, " + j3 + " expected"));
                return;
            }
            if (this.validateLogSegmentSequenceNumber) {
                synchronized (this.inprogressLSSNs) {
                    if (this.inprogressLSSNs.isEmpty()) {
                        FutureUtils.setException(promise, new UnexpectedException("Didn't find matched inprogress log segments when completing inprogress " + readLogSegmentFromCache));
                        return;
                    }
                    long longValue = this.inprogressLSSNs.getFirst().longValue();
                    if (readLogSegmentFromCache.getLogSegmentSequenceNumber() != j || longValue != j) {
                        FutureUtils.setException(promise, new UnexpectedException("Didn't find matched inprogress log segments when completing inprogress " + readLogSegmentFromCache));
                        return;
                    }
                }
            }
            long max = Math.max(j, this.maxLogSegmentSequenceNo.getSequenceNumber());
            if (this.maxLogSegmentSequenceNo.getSequenceNumber() == j || this.maxLogSegmentSequenceNo.getSequenceNumber() == j + 1) {
                LOG.info("Try storing max sequence number {} in completing {}.", new Object[]{Long.valueOf(j), inprogressZNode});
            } else {
                LOG.warn("Unexpected max ledger sequence number {} found while completing log segment {} for {}", new Object[]{Long.valueOf(this.maxLogSegmentSequenceNo.getSequenceNumber()), Long.valueOf(j), getFullyQualifiedName()});
                if (this.validateLogSegmentSequenceNumber) {
                    FutureUtils.setException(promise, new DLIllegalStateException("Unexpected max log segment sequence number " + this.maxLogSegmentSequenceNo.getSequenceNumber() + " for " + getFullyQualifiedName() + ", expected " + (j - 1)));
                    return;
                }
            }
            final String completedLedgerZNodeName = completedLedgerZNodeName(j3, j4, j);
            String completedLedgerZNode = completedLedgerZNode(j3, j4, j);
            try {
                final LogSegmentMetadata completeLogSegment = readLogSegmentFromCache.completeLogSegment(completedLedgerZNode, j4, i, j5, j6, computeStartSequenceId(readLogSegmentFromCache));
                setLastLedgerRollingTimeMillis(completeLogSegment.getCompletionTime());
                ZKTransaction zKTransaction = new ZKTransaction(this.zooKeeperClient);
                writeLogSegment(zKTransaction, this.zooKeeperClient.getDefaultACL(), completedLedgerZNodeName, completeLogSegment, completedLedgerZNode);
                deleteLogSegment(zKTransaction, str, inprogressZNode);
                storeMaxSequenceNumber(zKTransaction, this.maxLogSegmentSequenceNo, max, false);
                LOG.debug("Trying storing LastTxId in Finalize Path {} LastTxId {}", completedLedgerZNode, Long.valueOf(j4));
                storeMaxTxId(zKTransaction, this.maxTxId, j4);
                zKTransaction.execute().addEventListener(FutureUtils.FutureEventListenerRunnable.of(new FutureEventListener<Void>() { // from class: com.twitter.distributedlog.BKLogWriteHandler.10
                    public void onSuccess(Void r8) {
                        BKLogWriteHandler.LOG.info("Completed {} to {} for {} : {}", new Object[]{str, completedLedgerZNodeName, BKLogWriteHandler.this.getFullyQualifiedName(), completeLogSegment});
                        FutureUtils.setValue(promise, completeLogSegment);
                    }

                    public void onFailure(Throwable th) {
                        FutureUtils.setException(promise, th);
                    }
                }, this.scheduler));
            } catch (IOException e) {
                FutureUtils.setException(promise, e);
            }
        } catch (IOException e2) {
            FutureUtils.setException(promise, e2);
        }
    }

    public Future<Long> recoverIncompleteLogSegments() {
        try {
            FailpointUtils.checkFailPoint(FailpointUtils.FailPointName.FP_RecoverIncompleteLogSegments);
            return asyncGetFilteredLedgerList(false, false).flatMap(this.recoverLogSegmentsFunction);
        } catch (IOException e) {
            return Future.exception(e);
        }
    }

    public void deleteLog() throws IOException {
        this.lock.checkOwnershipAndReacquire();
        FutureUtils.result(purgeLogSegmentsOlderThanTxnId(-1L));
        try {
            Utils.closeQuietly(this.lock);
            this.zooKeeperClient.get().exists(this.logMetadata.getLogSegmentsPath(), false);
            this.zooKeeperClient.get().exists(this.logMetadata.getMaxTxIdPath(), false);
            if (this.logMetadata.getLogRootPath().toLowerCase().contains(DistributedLogConstants.SCHEME_PREFIX)) {
                ZKUtil.deleteRecursive(this.zooKeeperClient.get(), this.logMetadata.getLogRootPath());
            } else {
                LOG.warn("Skip deletion of unrecognized ZK Path {}", this.logMetadata.getLogRootPath());
            }
        } catch (KeeperException e) {
            LOG.error("Error deleting" + this.logMetadata.getLogRootPath() + " in zookeeper", e);
        } catch (InterruptedException e2) {
            LOG.error("Interrupted while deleting log znodes", e2);
            throw new DLInterruptedException("Interrupted while deleting " + this.logMetadata.getLogRootPath(), e2);
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public Future<List<LogSegmentMetadata>> setLogSegmentsOlderThanDLSNTruncated(final DLSN dlsn) {
        if (DLSN.InvalidDLSN == dlsn) {
            return Future.value(new ArrayList(0));
        }
        scheduleGetAllLedgersTaskIfNeeded();
        return asyncGetFullLedgerList(false, false).flatMap(new AbstractFunction1<List<LogSegmentMetadata>, Future<List<LogSegmentMetadata>>>() { // from class: com.twitter.distributedlog.BKLogWriteHandler.11
            public Future<List<LogSegmentMetadata>> apply(List<LogSegmentMetadata> list) {
                return BKLogWriteHandler.this.setLogSegmentsOlderThanDLSNTruncated(list, dlsn);
            }
        });
    }

    /* JADX INFO: Access modifiers changed from: private */
    public Future<List<LogSegmentMetadata>> setLogSegmentsOlderThanDLSNTruncated(List<LogSegmentMetadata> list, DLSN dlsn) {
        LOG.debug("Setting truncation status on logs older than {} from {} for {}", new Object[]{dlsn, list, getFullyQualifiedName()});
        ArrayList arrayList = new ArrayList(list.size());
        LogSegmentMetadata logSegmentMetadata = null;
        LOG.info("{}: Truncating log segments older than {}", getFullyQualifiedName(), dlsn);
        int numCandidateLogSegmentsToTruncate = getNumCandidateLogSegmentsToTruncate(list);
        for (int i = 0; i < numCandidateLogSegmentsToTruncate; i++) {
            LogSegmentMetadata logSegmentMetadata2 = list.get(i);
            if (logSegmentMetadata2.isInProgress()) {
                break;
            }
            if (logSegmentMetadata2.getLastDLSN().compareTo(dlsn) >= 0) {
                if (logSegmentMetadata2.getFirstDLSN().compareTo(dlsn) >= 0) {
                    break;
                }
                if (null != logSegmentMetadata) {
                    String format = String.format("Potential metadata inconsistency for stream %s at segment %s", getFullyQualifiedName(), logSegmentMetadata2);
                    LOG.error(format);
                    return Future.exception(new DLIllegalStateException(format));
                }
                LOG.info("{}: Partially truncating log segment {} older than {}.", new Object[]{getFullyQualifiedName(), logSegmentMetadata2, dlsn});
                logSegmentMetadata = logSegmentMetadata2;
            } else {
                LOG.debug("{}: Truncating log segment {} ", getFullyQualifiedName(), logSegmentMetadata2);
                arrayList.add(logSegmentMetadata2);
            }
        }
        return setLogSegmentTruncationStatus(arrayList, logSegmentMetadata, dlsn);
    }

    /* JADX INFO: Access modifiers changed from: private */
    public int getNumCandidateLogSegmentsToTruncate(List<LogSegmentMetadata> list) {
        if (list.isEmpty()) {
            return 0;
        }
        int i = 0;
        Iterator<LogSegmentMetadata> it = list.iterator();
        while (it.hasNext() && !it.next().isInProgress()) {
            i++;
        }
        return i - 1;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public Future<List<LogSegmentMetadata>> purgeLogSegmentsOlderThanTimestamp(final long j) {
        return j >= Utils.nowInMillis() ? Future.exception(new IllegalArgumentException("Invalid timestamp " + j + " to purge logs for " + getFullyQualifiedName())) : asyncGetFullLedgerList(false, false).flatMap(new Function<List<LogSegmentMetadata>, Future<List<LogSegmentMetadata>>>() { // from class: com.twitter.distributedlog.BKLogWriteHandler.12
            public Future<List<LogSegmentMetadata>> apply(List<LogSegmentMetadata> list) {
                ArrayList arrayList = new ArrayList(list.size());
                int numCandidateLogSegmentsToTruncate = BKLogWriteHandler.this.getNumCandidateLogSegmentsToTruncate(list);
                for (int i = 0; i < numCandidateLogSegmentsToTruncate; i++) {
                    LogSegmentMetadata logSegmentMetadata = list.get(i);
                    if ((!logSegmentMetadata.isTruncated() && BKLogWriteHandler.this.conf.getExplicitTruncationByApplication()) || logSegmentMetadata.isInProgress() || logSegmentMetadata.getCompletionTime() >= j) {
                        break;
                    }
                    arrayList.add(logSegmentMetadata);
                }
                BKLogWriteHandler.LOG.info("Deleting log segments older than {} for {} : {}", new Object[]{Long.valueOf(j), BKLogWriteHandler.this.getFullyQualifiedName(), arrayList});
                return BKLogWriteHandler.this.deleteLogSegments(arrayList);
            }
        });
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public Future<List<LogSegmentMetadata>> purgeLogSegmentsOlderThanTxnId(final long j) {
        return asyncGetFullLedgerList(true, false).flatMap(new AbstractFunction1<List<LogSegmentMetadata>, Future<List<LogSegmentMetadata>>>() { // from class: com.twitter.distributedlog.BKLogWriteHandler.13
            public Future<List<LogSegmentMetadata>> apply(List<LogSegmentMetadata> list) {
                int size = j < 0 ? list.size() : BKLogWriteHandler.this.getNumCandidateLogSegmentsToTruncate(list);
                ArrayList newArrayListWithExpectedSize = Lists.newArrayListWithExpectedSize(size);
                for (int i = 0; i < size; i++) {
                    LogSegmentMetadata logSegmentMetadata = list.get(i);
                    if (j >= 0 && ((!logSegmentMetadata.isTruncated() && BKLogWriteHandler.this.conf.getExplicitTruncationByApplication()) || logSegmentMetadata.isInProgress() || logSegmentMetadata.getLastTxId() >= j)) {
                        break;
                    }
                    newArrayListWithExpectedSize.add(logSegmentMetadata);
                }
                return BKLogWriteHandler.this.deleteLogSegments(newArrayListWithExpectedSize);
            }
        });
    }

    private Future<List<LogSegmentMetadata>> setLogSegmentTruncationStatus(List<LogSegmentMetadata> list, LogSegmentMetadata logSegmentMetadata, DLSN dlsn) {
        final ArrayList newArrayListWithCapacity = Lists.newArrayListWithCapacity(list.size() + 1);
        final ArrayList newArrayListWithCapacity2 = Lists.newArrayListWithCapacity(list.size() + 1);
        Transaction<Object> transaction = this.metadataUpdater.transaction();
        for (LogSegmentMetadata logSegmentMetadata2 : list) {
            if (!logSegmentMetadata2.isTruncated()) {
                LogSegmentMetadata logSegmentTruncated = this.metadataUpdater.setLogSegmentTruncated(transaction, logSegmentMetadata2);
                newArrayListWithCapacity.add(logSegmentMetadata2);
                newArrayListWithCapacity2.add(logSegmentTruncated);
            }
        }
        if (null != logSegmentMetadata && (logSegmentMetadata.isNonTruncated() || (logSegmentMetadata.isPartiallyTruncated() && logSegmentMetadata.getMinActiveDLSN().compareTo(dlsn) < 0))) {
            LogSegmentMetadata logSegmentPartiallyTruncated = this.metadataUpdater.setLogSegmentPartiallyTruncated(transaction, logSegmentMetadata, dlsn);
            newArrayListWithCapacity.add(logSegmentMetadata);
            newArrayListWithCapacity2.add(logSegmentPartiallyTruncated);
        }
        return transaction.execute().map(new AbstractFunction1<Void, List<LogSegmentMetadata>>() { // from class: com.twitter.distributedlog.BKLogWriteHandler.14
            public List<LogSegmentMetadata> apply(Void r5) {
                for (int i = 0; i < newArrayListWithCapacity.size(); i++) {
                    BKLogWriteHandler.this.removeLogSegmentFromCache(((LogSegmentMetadata) newArrayListWithCapacity.get(i)).getSegmentName());
                    LogSegmentMetadata logSegmentMetadata3 = (LogSegmentMetadata) newArrayListWithCapacity2.get(i);
                    BKLogWriteHandler.this.addLogSegmentToCache(logSegmentMetadata3.getSegmentName(), logSegmentMetadata3);
                }
                return newArrayListWithCapacity2;
            }
        });
    }

    /* JADX INFO: Access modifiers changed from: private */
    public Future<List<LogSegmentMetadata>> deleteLogSegments(List<LogSegmentMetadata> list) {
        if (LOG.isTraceEnabled()) {
            LOG.trace("Purging logs for {} : {}", getFullyQualifiedName(), list);
        }
        return FutureUtils.processList(list, new Function<LogSegmentMetadata, Future<LogSegmentMetadata>>() { // from class: com.twitter.distributedlog.BKLogWriteHandler.15
            public Future<LogSegmentMetadata> apply(LogSegmentMetadata logSegmentMetadata) {
                return BKLogWriteHandler.this.deleteLogSegment(logSegmentMetadata);
            }
        }, this.scheduler);
    }

    /* JADX INFO: Access modifiers changed from: private */
    public Future<LogSegmentMetadata> deleteLogSegment(final LogSegmentMetadata logSegmentMetadata) {
        LOG.info("Deleting ledger {} for {}", logSegmentMetadata, getFullyQualifiedName());
        final Promise promise = new Promise();
        final Stopwatch createStarted = Stopwatch.createStarted();
        promise.addEventListener(new FutureEventListener<LogSegmentMetadata>() { // from class: com.twitter.distributedlog.BKLogWriteHandler.16
            public void onSuccess(LogSegmentMetadata logSegmentMetadata2) {
                BKLogWriteHandler.this.deleteOpStats.registerSuccessfulEvent(createStarted.stop().elapsed(TimeUnit.MICROSECONDS));
            }

            public void onFailure(Throwable th) {
                BKLogWriteHandler.this.deleteOpStats.registerFailedEvent(createStarted.stop().elapsed(TimeUnit.MICROSECONDS));
            }
        });
        try {
            this.bookKeeperClient.get().asyncDeleteLedger(logSegmentMetadata.getLedgerId(), new AsyncCallback.DeleteCallback() { // from class: com.twitter.distributedlog.BKLogWriteHandler.17
                public void deleteComplete(int i, Object obj) {
                    if (-7 == i) {
                        BKLogWriteHandler.LOG.warn("No ledger {} found to delete for {} : {}.", new Object[]{Long.valueOf(logSegmentMetadata.getLedgerId()), BKLogWriteHandler.this.getFullyQualifiedName(), logSegmentMetadata});
                    } else if (0 != i) {
                        BKException create = BKException.create(i);
                        BKLogWriteHandler.LOG.error("Couldn't delete ledger {} from bookkeeper for {} : ", new Object[]{Long.valueOf(logSegmentMetadata.getLedgerId()), BKLogWriteHandler.this.getFullyQualifiedName(), create});
                        promise.setException(create);
                        return;
                    }
                    BKLogWriteHandler.this.scheduler.submit(new Runnable() { // from class: com.twitter.distributedlog.BKLogWriteHandler.17.1
                        @Override // java.lang.Runnable
                        public void run() {
                            BKLogWriteHandler.this.deleteLogSegmentMetadata(logSegmentMetadata, promise);
                        }
                    });
                }
            }, (Object) null);
        } catch (IOException e) {
            promise.setException(BKException.create(-8));
        }
        return promise;
    }

    /* JADX INFO: Access modifiers changed from: private */
    public void deleteLogSegmentMetadata(final LogSegmentMetadata logSegmentMetadata, final Promise<LogSegmentMetadata> promise) {
        Transaction<Object> transaction = this.metadataStore.transaction();
        this.metadataStore.deleteLogSegment(transaction, logSegmentMetadata);
        transaction.execute().addEventListener(new FutureEventListener<Void>() { // from class: com.twitter.distributedlog.BKLogWriteHandler.18
            public void onSuccess(Void r4) {
                BKLogWriteHandler.this.removeLogSegmentFromCache(logSegmentMetadata.getZNodeName());
                promise.setValue(logSegmentMetadata);
            }

            public void onFailure(Throwable th) {
                if (!(th instanceof ZKException) || KeeperException.Code.NONODE != ((ZKException) th).getKeeperExceptionCode()) {
                    BKLogWriteHandler.LOG.error("Couldn't purge {} for {}: with error {}", new Object[]{logSegmentMetadata, BKLogWriteHandler.this.getFullyQualifiedName(), th});
                    promise.setException(th);
                } else {
                    BKLogWriteHandler.LOG.error("No log segment {} found for {}.", logSegmentMetadata, BKLogWriteHandler.this.getFullyQualifiedName());
                    BKLogWriteHandler.this.removeLogSegmentFromCache(logSegmentMetadata.getZNodeName());
                    promise.setValue(logSegmentMetadata);
                }
            }
        });
    }

    @Override // com.twitter.distributedlog.BKLogHandler, com.twitter.distributedlog.io.AsyncCloseable
    public Future<Void> asyncClose() {
        return Utils.closeSequence(this.scheduler, this.lock, this.ledgerAllocator).flatMap(new AbstractFunction1<Void, Future<Void>>() { // from class: com.twitter.distributedlog.BKLogWriteHandler.19
            public Future<Void> apply(Void r3) {
                return BKLogWriteHandler.super.asyncClose();
            }
        });
    }

    @Override // com.twitter.distributedlog.BKLogHandler, com.twitter.distributedlog.io.AsyncAbortable
    public Future<Void> asyncAbort() {
        return asyncClose();
    }

    String completedLedgerZNodeName(long j, long j2, long j3) {
        return 1 == this.conf.getLogSegmentNameVersion() ? String.format("%s_%018d", DistributedLogConstants.COMPLETED_LOGSEGMENT_PREFIX, Long.valueOf(j3)) : String.format("%s_%018d_%018d", DistributedLogConstants.COMPLETED_LOGSEGMENT_PREFIX, Long.valueOf(j), Long.valueOf(j2));
    }

    String completedLedgerZNode(long j, long j2, long j3) {
        return String.format("%s/%s", this.logMetadata.getLogSegmentsPath(), completedLedgerZNodeName(j, j2, j3));
    }

    String inprogressZNodeName(long j, long j2, long j3) {
        return 1 == this.conf.getLogSegmentNameVersion() ? String.format("%s_%018d", DistributedLogConstants.INPROGRESS_LOGSEGMENT_PREFIX, Long.valueOf(j3)) : "inprogress_" + Long.toString(j2, 16);
    }

    String inprogressZNode(long j, long j2, long j3) {
        return this.logMetadata.getLogSegmentsPath() + "/" + inprogressZNodeName(j, j2, j3);
    }

    String inprogressZNode(String str) {
        return this.logMetadata.getLogSegmentsPath() + "/" + str;
    }
}
