/*
 * Decompiled with CFR 0.152.
 */
package org.apache.distributedlog;

import dlshade.com.google.common.annotations.VisibleForTesting;
import dlshade.com.google.common.base.Preconditions;
import dlshade.org.apache.bookkeeper.common.concurrent.FutureEventListener;
import dlshade.org.apache.bookkeeper.common.concurrent.FutureUtils;
import dlshade.org.apache.bookkeeper.common.util.OrderedScheduler;
import dlshade.org.apache.bookkeeper.feature.FeatureProvider;
import dlshade.org.apache.bookkeeper.stats.AlertStatsLogger;
import dlshade.org.apache.bookkeeper.stats.StatsLogger;
import java.io.IOException;
import java.net.URI;
import java.util.HashSet;
import java.util.List;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CancellationException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.function.Function;
import org.apache.distributedlog.AppendOnlyStreamReader;
import org.apache.distributedlog.AppendOnlyStreamWriter;
import org.apache.distributedlog.AsyncNotification;
import org.apache.distributedlog.BKAsyncLogReader;
import org.apache.distributedlog.BKAsyncLogWriter;
import org.apache.distributedlog.BKLogReadHandler;
import org.apache.distributedlog.BKLogWriteHandler;
import org.apache.distributedlog.BKSyncLogReader;
import org.apache.distributedlog.BKSyncLogWriter;
import org.apache.distributedlog.DLSN;
import org.apache.distributedlog.DistributedLogConfiguration;
import org.apache.distributedlog.LogRecordWithDLSN;
import org.apache.distributedlog.LogSegmentMetadata;
import org.apache.distributedlog.ReadUtils;
import org.apache.distributedlog.api.AsyncLogReader;
import org.apache.distributedlog.api.AsyncLogWriter;
import org.apache.distributedlog.api.DistributedLogManager;
import org.apache.distributedlog.api.LogReader;
import org.apache.distributedlog.api.subscription.SubscriptionsStore;
import org.apache.distributedlog.bk.LedgerMetadata;
import org.apache.distributedlog.callback.LogSegmentListener;
import org.apache.distributedlog.common.stats.BroadCastStatsLogger;
import org.apache.distributedlog.common.util.PermitLimiter;
import org.apache.distributedlog.common.util.PermitManager;
import org.apache.distributedlog.config.DynamicDistributedLogConfiguration;
import org.apache.distributedlog.exceptions.AlreadyClosedException;
import org.apache.distributedlog.exceptions.LogEmptyException;
import org.apache.distributedlog.exceptions.LogNotFoundException;
import org.apache.distributedlog.exceptions.UnexpectedException;
import org.apache.distributedlog.injector.AsyncFailureInjector;
import org.apache.distributedlog.io.AsyncCloseable;
import org.apache.distributedlog.lock.DistributedLock;
import org.apache.distributedlog.lock.NopDistributedLock;
import org.apache.distributedlog.logsegment.LogSegmentEntryStore;
import org.apache.distributedlog.logsegment.LogSegmentEntryWriter;
import org.apache.distributedlog.logsegment.LogSegmentFilter;
import org.apache.distributedlog.logsegment.LogSegmentMetadataCache;
import org.apache.distributedlog.metadata.LogMetadataForReader;
import org.apache.distributedlog.metadata.LogMetadataForWriter;
import org.apache.distributedlog.metadata.LogStreamMetadataStore;
import org.apache.distributedlog.namespace.NamespaceDriver;
import org.apache.distributedlog.util.Allocator;
import org.apache.distributedlog.util.DLUtils;
import org.apache.distributedlog.util.Utils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

class BKDistributedLogManager
implements DistributedLogManager {
    /** Class-wide logger. */
    static final Logger LOG = LoggerFactory.getLogger(BKDistributedLogManager.class);
    /** Projects a record to its transaction id; used by {@code getLastTxIdAsync}. */
    static final Function<LogRecordWithDLSN, Long> RECORD_2_TXID_FUNCTION = record -> record.getTransactionId();
    /** Projects a record to its DLSN; used by {@code getFirstDLSNAsync} and {@code getLastDLSNAsync}. */
    static final Function<LogRecordWithDLSN, DLSN> RECORD_2_DLSN_FUNCTION = record -> record.getDlsn();
    // Namespace URI and stream name; together they are used to look up reader/writer metadata.
    private final URI uri;
    private final String name;
    // Passed through to the read/write handlers created by this manager.
    private final String clientId;
    private final int regionId;
    // Initialised from conf.getUnpartitionedStreamName() in the constructor.
    private final String streamIdentifier;
    private final DistributedLogConfiguration conf;
    private final DynamicDistributedLogConfiguration dynConf;
    // Source of the metadata stores, entry stores and subscription stores used below.
    private final NamespaceDriver driver;
    // Null until asyncClose() is first called; a non-null value marks the manager as closed.
    // Read and written while holding this instance's monitor.
    private CompletableFuture<Void> closePromise;
    private final OrderedScheduler scheduler;
    private final FeatureProvider featureProvider;
    private final AsyncFailureInjector failureInjector;
    private final StatsLogger statsLogger;
    // Built in the constructor via BroadCastStatsLogger.masterslave(perLogStatsLogger, statsLogger).
    private final StatsLogger perLogStatsLogger;
    // Created in the constructor on top of perLogStatsLogger with the "dl_alert" scope.
    final AlertStatsLogger alertStatsLogger;
    private final LogSegmentMetadataCache logSegmentMetadataCache;
    private final PermitLimiter writeLimiter;
    // Created lazily by getReadHandlerAndRegisterListener(); closed by asyncClose().
    private BKLogReadHandler readHandlerForListener = null;
    // Readers created by getAsyncLogReaderWithLock() whose lock acquisition has not finished yet.
    private final PendingReaders pendingReaders;
    // Optional extra resource that asyncClose() closes after the read handler and pending readers.
    private final Optional<AsyncCloseable> resourcesCloseable;

    /**
     * Creates a manager for the stream {@code name} under namespace {@code uri}.
     *
     * <p>The per-log stats logger is combined with {@code statsLogger} through
     * {@code BroadCastStatsLogger.masterslave}, and the alert logger is created on top of
     * the combined logger. {@code regionId} is unboxed, so it must not be {@code null}.
     */
    BKDistributedLogManager(String name, DistributedLogConfiguration conf, DynamicDistributedLogConfiguration dynConf, URI uri, NamespaceDriver driver, LogSegmentMetadataCache logSegmentMetadataCache, OrderedScheduler scheduler, String clientId, Integer regionId, PermitLimiter writeLimiter, FeatureProvider featureProvider, AsyncFailureInjector failureInjector, StatsLogger statsLogger, StatsLogger perLogStatsLogger, Optional<AsyncCloseable> resourcesCloseable) {
        // Identity of the stream.
        this.uri = uri;
        this.name = name;
        this.clientId = clientId;
        this.regionId = regionId;

        // Configuration.
        this.conf = conf;
        this.dynConf = dynConf;
        this.streamIdentifier = conf.getUnpartitionedStreamName();

        // Collaborators.
        this.driver = driver;
        this.scheduler = scheduler;
        this.logSegmentMetadataCache = logSegmentMetadataCache;
        this.writeLimiter = writeLimiter;
        this.featureProvider = featureProvider;
        this.failureInjector = failureInjector;
        this.resourcesCloseable = resourcesCloseable;
        this.pendingReaders = new PendingReaders(scheduler);

        // Stats: the alert logger depends on the combined per-log logger, so build that first.
        this.statsLogger = statsLogger;
        this.perLogStatsLogger = BroadCastStatsLogger.masterslave(perLogStatsLogger, statsLogger);
        this.alertStatsLogger = new AlertStatsLogger(this.perLogStatsLogger, "dl_alert");
    }

    /** Returns the name of the stream this manager was created for. */
    @Override
    public String getStreamName() {
        return name;
    }

    /** Returns the namespace driver supplied at construction time. */
    @Override
    public NamespaceDriver getNamespaceDriver() {
        return driver;
    }

    /** Returns the namespace URI supplied at construction time. */
    URI getUri() {
        return uri;
    }

    /** Returns the static configuration supplied at construction time. */
    DistributedLogConfiguration getConf() {
        return conf;
    }

    /** Returns the scheduler on which reader operations are submitted. */
    OrderedScheduler getScheduler() {
        return scheduler;
    }

    /** Returns the failure injector supplied at construction time. */
    AsyncFailureInjector getFailureInjector() {
        return failureInjector;
    }

    /** Returns the driver's log stream metadata store for the WRITER role. */
    @VisibleForTesting
    LogStreamMetadataStore getWriterMetadataStore() {
        final NamespaceDriver.Role role = NamespaceDriver.Role.WRITER;
        return driver.getLogStreamMetadataStore(role);
    }

    /** Returns the driver's log segment entry store for the READER role. */
    @VisibleForTesting
    LogSegmentEntryStore getReaderEntryStore() {
        final NamespaceDriver.Role role = NamespaceDriver.Role.READER;
        return driver.getLogSegmentEntryStore(role);
    }

    /** Returns the feature provider supplied at construction time. */
    @VisibleForTesting
    FeatureProvider getFeatureProvider() {
        return featureProvider;
    }

    /**
     * Returns the shared read handler used for listeners and metadata queries, optionally
     * creating it, and registers {@code listener} on it when one is supplied.
     *
     * <p>A {@code null} listener is never registered. Previously only the "handler already
     * exists" path checked for {@code null}, while the creation path passed the value straight
     * through (and {@code processReaderOperation} always passes {@code null}).
     *
     * @param create   whether to create the handler when it does not exist yet
     * @param listener listener to register, or {@code null} for none
     * @return the shared handler, or {@code null} if it does not exist and {@code create} is false
     */
    private synchronized BKLogReadHandler getReadHandlerAndRegisterListener(boolean create, LogSegmentListener listener) {
        boolean created = false;
        if (null == this.readHandlerForListener) {
            if (!create) {
                return null;
            }
            this.readHandlerForListener = this.createReadHandler();
            created = true;
        }
        if (null != listener) {
            this.readHandlerForListener.registerListener(listener);
        }
        if (created) {
            // Start fetching log segments only after the listener (if any) has been registered.
            this.readHandlerForListener.asyncStartFetchLogSegments();
        }
        return this.readHandlerForListener;
    }

    /**
     * Blocking variant of {@link #getLogSegmentsAsync()}.
     *
     * @throws IOException if the asynchronous lookup fails
     */
    @Override
    public List<LogSegmentMetadata> getLogSegments() throws IOException {
        final CompletableFuture<List<LogSegmentMetadata>> segmentsFuture = getLogSegmentsAsync();
        return Utils.ioResult(segmentsFuture);
    }

    /**
     * Reads the log segment list from the store using a throwaway read handler.
     *
     * <p>The handler is closed asynchronously once the read completes, whether it
     * succeeded or failed.
     */
    @Override
    public CompletableFuture<List<LogSegmentMetadata>> getLogSegmentsAsync() {
        final BKLogReadHandler handler = createReadHandler();
        return handler
            .readLogSegmentsFromStore(LogSegmentMetadata.COMPARATOR, LogSegmentFilter.DEFAULT_FILTER, null)
            .thenApply(versioned -> versioned.getValue())
            .whenComplete((segments, error) -> handler.asyncClose());
    }

    /**
     * Registers {@code listener} on the shared read handler, creating the handler if needed.
     */
    @Override
    public void registerListener(LogSegmentListener listener) throws IOException {
        final boolean createIfMissing = true;
        getReadHandlerAndRegisterListener(createIfMissing, listener);
    }

    /**
     * Removes {@code listener} from the shared read handler; does nothing when the handler
     * has not been created.
     */
    @Override
    public synchronized void unregisterListener(LogSegmentListener listener) {
        final BKLogReadHandler handler = readHandlerForListener;
        if (handler == null) {
            return;
        }
        handler.unregisterListener(listener);
    }

    /**
     * Fails if {@code asyncClose()} has already been invoked on this manager.
     *
     * @param operation name of the operation being attempted, included in the error message
     * @throws AlreadyClosedException if the close promise has been set
     */
    public void checkClosedOrInError(String operation) throws AlreadyClosedException {
        final boolean closed;
        synchronized (this) {
            closed = closePromise != null;
        }
        if (closed) {
            throw new AlreadyClosedException("Executing " + operation + " on already closed DistributedLogManager");
        }
    }

    /** Creates a read handler with no subscriber that is not flagged as a handle for reading. */
    synchronized BKLogReadHandler createReadHandler() {
        final Optional<String> noSubscriber = Optional.empty();
        return createReadHandler(noSubscriber, false);
    }

    /** Creates a read handler for {@code subscriberId} that is not flagged as a handle for reading. */
    synchronized BKLogReadHandler createReadHandler(Optional<String> subscriberId) {
        final boolean isHandleForReading = false;
        return createReadHandler(subscriberId, isHandleForReading);
    }

    /** Creates a read handler without a notification callback. */
    synchronized BKLogReadHandler createReadHandler(Optional<String> subscriberId, boolean isHandleForReading) {
        final AsyncNotification noNotification = null;
        return createReadHandler(subscriberId, noNotification, isHandleForReading);
    }

    /**
     * Creates a new read handler for this stream using the driver's READER-role stores.
     *
     * @param subscriberId       optional subscriber id passed to the handler
     * @param notification       notification callback passed to the handler; may be {@code null}
     * @param isHandleForReading flag passed through to the handler
     */
    synchronized BKLogReadHandler createReadHandler(Optional<String> subscriberId, AsyncNotification notification, boolean isHandleForReading) {
        final LogMetadataForReader readerMetadata = LogMetadataForReader.of(uri, name, streamIdentifier);
        return new BKLogReadHandler(
            readerMetadata,
            subscriberId,
            conf,
            dynConf,
            driver.getLogStreamMetadataStore(NamespaceDriver.Role.READER),
            logSegmentMetadataCache,
            driver.getLogSegmentEntryStore(NamespaceDriver.Role.READER),
            scheduler,
            alertStatsLogger,
            statsLogger,
            perLogStatsLogger,
            clientId,
            notification,
            isHandleForReading);
    }

    /**
     * Creates a write handler without ledger metadata, blocking until it is ready.
     *
     * @param lockHandler whether to acquire the handler's lock before returning
     */
    public BKLogWriteHandler createWriteHandler(boolean lockHandler) throws IOException {
        final LedgerMetadata noLedgerMetadata = null;
        return createWriteHandler(lockHandler, noLedgerMetadata);
    }

    /**
     * Blocking variant of {@code asyncCreateWriteHandler}.
     *
     * @param lockHandler    whether to acquire the handler's lock before returning
     * @param ledgerMetadata ledger metadata handed to the segment allocator; may be {@code null}
     * @throws IOException if creating (or locking) the handler fails
     */
    public BKLogWriteHandler createWriteHandler(boolean lockHandler, LedgerMetadata ledgerMetadata) throws IOException {
        final CompletableFuture<BKLogWriteHandler> handlerFuture = asyncCreateWriteHandler(lockHandler, ledgerMetadata);
        return Utils.ioResult(handlerFuture);
    }

    /**
     * Looks up the writer metadata for this stream and then builds a write handler from it.
     *
     * <p>The lookup passes {@code conf.getCreateStreamIfNotExists()} to the metadata store.
     *
     * @param lockHandler    whether the handler's lock must be acquired before the future completes
     * @param ledgerMetadata ledger metadata handed to the segment allocator; may be {@code null}
     */
    CompletableFuture<BKLogWriteHandler> asyncCreateWriteHandler(boolean lockHandler, LedgerMetadata ledgerMetadata) {
        final LogStreamMetadataStore writerStore = driver.getLogStreamMetadataStore(NamespaceDriver.Role.WRITER);
        return writerStore
            .getLog(uri, name, true, conf.getCreateStreamIfNotExists())
            .thenCompose(writerMetadata -> {
                final CompletableFuture<BKLogWriteHandler> handlerPromise = new CompletableFuture<>();
                createWriteHandler(writerMetadata, ledgerMetadata, lockHandler, handlerPromise);
                return handlerPromise;
            });
    }

    /**
     * Builds a write handler from already-resolved writer metadata and completes
     * {@code createPromise} with it.
     *
     * <p>If the segment allocator cannot be created the promise fails with that
     * {@link IOException}. When {@code lockHandler} is set, the promise completes only after
     * the lock is acquired; on lock failure the handler is closed first and the promise then
     * fails with the lock error.
     */
    private void createWriteHandler(LogMetadataForWriter logMetadata, LedgerMetadata ledgerMetadata, boolean lockHandler, final CompletableFuture<BKLogWriteHandler> createPromise) {
        // Use a real write lock only when enabled; otherwise fall back to the no-op lock.
        final DistributedLock writeLock;
        if (conf.isWriteLockEnabled()) {
            writeLock = driver.getLogStreamMetadataStore(NamespaceDriver.Role.WRITER).createWriteLock(logMetadata);
        } else {
            writeLock = NopDistributedLock.INSTANCE;
        }

        final Allocator<LogSegmentEntryWriter, Object> allocator;
        try {
            allocator = driver.getLogSegmentEntryStore(NamespaceDriver.Role.WRITER).newLogSegmentAllocator(logMetadata, dynConf, ledgerMetadata);
        } catch (IOException allocatorFailure) {
            FutureUtils.completeExceptionally(createPromise, allocatorFailure);
            return;
        }

        final BKLogWriteHandler handler = new BKLogWriteHandler(
            logMetadata,
            conf,
            driver.getLogStreamMetadataStore(NamespaceDriver.Role.WRITER),
            logSegmentMetadataCache,
            driver.getLogSegmentEntryStore(NamespaceDriver.Role.WRITER),
            scheduler,
            allocator,
            statsLogger,
            perLogStatsLogger,
            alertStatsLogger,
            clientId,
            regionId,
            writeLimiter,
            featureProvider,
            dynConf,
            writeLock);

        if (!lockHandler) {
            FutureUtils.complete(createPromise, handler);
            return;
        }

        handler.lockHandler().whenComplete(new FutureEventListener<DistributedLock>() {

            @Override
            public void onSuccess(DistributedLock acquired) {
                FutureUtils.complete(createPromise, handler);
            }

            @Override
            public void onFailure(Throwable cause) {
                // Close the handler before surfacing the lock failure to the caller.
                FutureUtils.ensure(handler.asyncClose(), () -> FutureUtils.completeExceptionally(createPromise, cause));
            }
        });
    }

    /** Returns the permit manager exposed by the WRITER-role log stream metadata store. */
    PermitManager getLogSegmentRollingPermitManager() {
        final LogStreamMetadataStore writerStore = driver.getLogStreamMetadataStore(NamespaceDriver.Role.WRITER);
        return writerStore.getPermitManager();
    }

    /**
     * Runs {@code func} against the shared read handler on the scheduler and returns a future
     * mirroring the future produced by {@code func}.
     *
     * <p>Any exception thrown while obtaining the handler or invoking {@code func} (including
     * {@code func} returning {@code null}) fails the returned future. Without this, the
     * exception would be captured by the executor's own task future and the returned future
     * would never complete, blocking callers that wait on it.
     *
     * @param func operation to run against the read handler
     * @param <T>  result type of the operation
     * @return a future completed with the outcome of {@code func}
     */
    <T> CompletableFuture<T> processReaderOperation(Function<BKLogReadHandler, CompletableFuture<T>> func) {
        CompletableFuture<T> future = FutureUtils.createFuture();
        this.scheduler.submit(() -> {
            try {
                BKLogReadHandler readHandler = this.getReadHandlerAndRegisterListener(true, null);
                FutureUtils.proxyTo(func.apply(readHandler), future);
            } catch (Throwable cause) {
                // Task boundary: surface the failure through the returned future.
                future.completeExceptionally(cause);
            }
        });
        return future;
    }

    /**
     * Reports whether the last record (end-of-stream records included) carries the
     * transaction id {@code Long.MAX_VALUE}.
     *
     * @throws IOException if the manager is closed or the last record cannot be read
     */
    @Override
    public boolean isEndOfStreamMarked() throws IOException {
        checkClosedOrInError("isEndOfStreamMarked");
        final LogRecordWithDLSN lastRecord = Utils.ioResult(getLastLogRecordAsyncInternal(false, true));
        return Long.MAX_VALUE == lastRecord.getTransactionId();
    }

    /**
     * Opens an append-only writer positioned after the last record of the stream.
     *
     * <p>The position is the transaction id of the last record (read with recovery). It falls
     * back to 0 when the log is empty or missing, or when that id is -999 or -99.
     * NOTE(review): -999/-99 look like inlined sentinel constants; confirm their source.
     */
    @Override
    public AppendOnlyStreamWriter getAppendOnlyStreamWriter() throws IOException {
        long position = 0L;
        try {
            final long lastTxId = Utils.ioResult(getLastLogRecordAsyncInternal(true, false)).getTransactionId();
            if (-999L != lastTxId && -99L != lastTxId) {
                position = lastTxId;
            }
        } catch (LogEmptyException ignored) {
            // Empty log: start at position 0.
        } catch (LogNotFoundException ignored) {
            // Log does not exist yet: start at position 0.
        }
        return new AppendOnlyStreamWriter(startAsyncLogSegmentNonPartitioned(), position);
    }

    /** Creates an append-only stream reader backed by this manager. */
    @Override
    public AppendOnlyStreamReader getAppendOnlyStreamReader() throws IOException {
        final AppendOnlyStreamReader reader = new AppendOnlyStreamReader(this);
        return reader;
    }

    /** Alias for {@link #openLogWriter()}. */
    @Override
    public BKSyncLogWriter startLogSegmentNonPartitioned() throws IOException {
        return openLogWriter();
    }

    /** Opens a synchronous writer without ledger metadata. */
    @Override
    public BKSyncLogWriter openLogWriter() throws IOException {
        final LedgerMetadata noLedgerMetadata = null;
        return openLogWriter(noLedgerMetadata);
    }

    /**
     * Opens a synchronous writer, creating its write handler and acquiring the handler lock.
     *
     * <p>If handler creation or locking fails, the writer is aborted before the exception
     * propagates.
     *
     * @param ledgerMetadata ledger metadata passed to the write handler; may be {@code null}
     * @throws IOException if the manager is closed, or the handler cannot be created or locked
     */
    @Override
    public BKSyncLogWriter openLogWriter(LedgerMetadata ledgerMetadata) throws IOException {
        checkClosedOrInError("startLogSegmentNonPartitioned");
        final BKSyncLogWriter syncWriter = new BKSyncLogWriter(conf, dynConf, this);
        boolean opened = false;
        try {
            syncWriter.createAndCacheWriteHandler(ledgerMetadata);
            Utils.ioResult(syncWriter.getWriteHandler().lockHandler());
            opened = true;
            return syncWriter;
        } finally {
            if (!opened) {
                syncWriter.abort();
            }
        }
    }

    /**
     * Blocking variant of {@link #openAsyncLogWriter()}; the result is cast to
     * {@link BKAsyncLogWriter}.
     */
    @Override
    public BKAsyncLogWriter startAsyncLogSegmentNonPartitioned() throws IOException {
        final AsyncLogWriter opened = Utils.ioResult(openAsyncLogWriter());
        return (BKAsyncLogWriter) opened;
    }

    /** Opens an asynchronous writer without ledger metadata. */
    @Override
    public CompletableFuture<AsyncLogWriter> openAsyncLogWriter() {
        final LedgerMetadata noLedgerMetadata = null;
        return openAsyncLogWriter(noLedgerMetadata);
    }

    /**
     * Opens an asynchronous writer: creates a locked write handler, wraps it in a
     * {@link BKAsyncLogWriter}, recovers incomplete log segments and records the recovered
     * last transaction id on the writer.
     *
     * <p>If the manager is already closed, the returned future fails with
     * {@link AlreadyClosedException}. If recovery fails, the writer is aborted asynchronously.
     *
     * <p>The lambda no longer declares a second {@code bKDistributedLogManager} local, which
     * clashed with the one in the enclosing method and does not compile; it locks on
     * {@code this} directly. The raw-type casts are replaced by typed futures.
     *
     * @param ledgerMetadata ledger metadata passed to the write handler; may be {@code null}
     */
    @Override
    public CompletableFuture<AsyncLogWriter> openAsyncLogWriter(LedgerMetadata ledgerMetadata) {
        try {
            this.checkClosedOrInError("startLogSegmentNonPartitioned");
        } catch (AlreadyClosedException e) {
            return FutureUtils.exception(e);
        }
        final CompletableFuture<BKLogWriteHandler> createWriteHandleFuture;
        synchronized (this) {
            createWriteHandleFuture = this.asyncCreateWriteHandler(true, ledgerMetadata);
        }
        return createWriteHandleFuture.thenCompose(writeHandler -> {
            final BKAsyncLogWriter writer;
            synchronized (this) {
                writer = new BKAsyncLogWriter(this.conf, this.dynConf, this, writeHandler, this.featureProvider, this.statsLogger);
            }
            // Typed local gives thenApply the AsyncLogWriter target type required by the return type.
            final CompletableFuture<AsyncLogWriter> recovered = writeHandler.recoverIncompleteLogSegments().thenApply(lastTxId -> {
                writer.setLastTxId(lastTxId);
                return writer;
            });
            return recovered.whenComplete((opened, cause) -> {
                if (null != cause) {
                    writer.asyncAbort();
                }
            });
        });
    }

    /**
     * Resolves the DLSN to start reading from for {@code fromTxnId}, based on the current
     * log segment list.
     */
    @Override
    public CompletableFuture<DLSN> getDLSNNotLessThanTxId(long fromTxnId) {
        final CompletableFuture<List<LogSegmentMetadata>> segmentsFuture = getLogSegmentsAsync();
        return segmentsFuture.thenCompose(segmentList -> getDLSNNotLessThanTxId(fromTxnId, segmentList));
    }

    /**
     * Resolves the start DLSN for {@code fromTxnId} within {@code segments}.
     *
     * <ul>
     *   <li>No segments: delegates to {@link #getLastDLSNAsync()}.</li>
     *   <li>{@code DLUtils.findLogSegmentNotLessThanTxnId} returns a negative index: the start
     *       of the first segment (entry 0, slot 0).</li>
     *   <li>Otherwise: searches from the segment at the returned index.</li>
     * </ul>
     */
    private CompletableFuture<DLSN> getDLSNNotLessThanTxId(long fromTxnId, List<LogSegmentMetadata> segments) {
        if (segments.isEmpty()) {
            return getLastDLSNAsync();
        }
        final int startIdx = DLUtils.findLogSegmentNotLessThanTxnId(segments, fromTxnId);
        if (startIdx >= 0) {
            final LogSegmentEntryStore readerStore = driver.getLogSegmentEntryStore(NamespaceDriver.Role.READER);
            return getDLSNNotLessThanTxIdInSegment(fromTxnId, startIdx, segments, readerStore);
        }
        final LogSegmentMetadata firstSegment = segments.get(0);
        return FutureUtils.value(new DLSN(firstSegment.getLogSegmentSequenceNumber(), 0L, 0L));
    }

    /**
     * Searches segment {@code segmentIdx} for the first record whose transaction id is not
     * less than {@code fromTxnId}, moving on to later segments when nothing is found.
     *
     * <p>If the last segment yields no match, the result is derived from the last log record:
     * its own DLSN when its transaction id is at least {@code fromTxnId}, otherwise the DLSN
     * following it.
     */
    private CompletableFuture<DLSN> getDLSNNotLessThanTxIdInSegment(long fromTxnId, int segmentIdx, List<LogSegmentMetadata> segments, LogSegmentEntryStore entryStore) {
        final LogSegmentMetadata current = segments.get(segmentIdx);
        final int batchSize = Math.max(2, dynConf.getReadAheadBatchSize());
        return ReadUtils.getLogRecordNotLessThanTxId(name, current, fromTxnId, scheduler, entryStore, batchSize).thenCompose(match -> {
            if (match.isPresent()) {
                return FutureUtils.value(match.get().getDlsn());
            }
            final boolean isLastSegment = segmentIdx == segments.size() - 1;
            if (!isLastSegment) {
                return getDLSNNotLessThanTxIdInSegment(fromTxnId, segmentIdx + 1, segments, entryStore);
            }
            return getLastLogRecordAsync().thenApply(last ->
                last.getTransactionId() >= fromTxnId ? last.getDlsn() : last.getDlsn().getNextDLSN());
        });
    }

    /** Opens a synchronous reader starting at transaction id {@code fromTxnId}. */
    @Override
    public LogReader openLogReader(long fromTxnId) throws IOException {
        return getInputStreamInternal(fromTxnId);
    }

    /** Opens a synchronous reader starting at {@code fromDLSN}, with no start transaction id. */
    @Override
    public LogReader openLogReader(DLSN fromDLSN) throws IOException {
        final Optional<Long> noTxnId = Optional.empty();
        return getInputStreamInternal(fromDLSN, noTxnId);
    }

    /** Alias for {@link #openLogReader(long)}. */
    @Override
    public LogReader getInputStream(long fromTxnId) throws IOException {
        return openLogReader(fromTxnId);
    }

    /** Alias for {@link #openLogReader(DLSN)}. */
    @Override
    public LogReader getInputStream(DLSN fromDLSN) throws IOException {
        return openLogReader(fromDLSN);
    }

    /** Blocking variant of {@link #openAsyncLogReader(long)}. */
    @Override
    public AsyncLogReader getAsyncLogReader(long fromTxnId) throws IOException {
        final CompletableFuture<AsyncLogReader> readerFuture = openAsyncLogReader(fromTxnId);
        return Utils.ioResult(readerFuture);
    }

    /**
     * Opens an asynchronous reader starting at the DLSN resolved for {@code fromTxnId}.
     *
     * <p>If the lookup fails with {@link LogEmptyException}, the reader starts at
     * {@code DLSN.InitialDLSN}; any other failure fails the returned future.
     */
    @Override
    public CompletableFuture<AsyncLogReader> openAsyncLogReader(long fromTxnId) {
        final CompletableFuture<DLSN> startDlsn = new CompletableFuture<>();
        getDLSNNotLessThanTxId(fromTxnId).whenComplete(new FutureEventListener<DLSN>() {

            @Override
            public void onSuccess(DLSN resolved) {
                startDlsn.complete(resolved);
            }

            @Override
            public void onFailure(Throwable cause) {
                if (!(cause instanceof LogEmptyException)) {
                    startDlsn.completeExceptionally(cause);
                    return;
                }
                // An empty log is not an error here: read from the very beginning.
                startDlsn.complete(DLSN.InitialDLSN);
            }
        });
        return startDlsn.thenCompose(resolved -> openAsyncLogReader(resolved));
    }

    /** Blocking variant of {@link #openAsyncLogReader(DLSN)}. */
    @Override
    public AsyncLogReader getAsyncLogReader(DLSN fromDLSN) throws IOException {
        final CompletableFuture<AsyncLogReader> readerFuture = openAsyncLogReader(fromDLSN);
        return Utils.ioResult(readerFuture);
    }

    /**
     * Creates an asynchronous reader at {@code fromDLSN} with no subscriber id and returns it
     * in an already-completed future.
     */
    @Override
    public CompletableFuture<AsyncLogReader> openAsyncLogReader(DLSN fromDLSN) {
        final Optional<String> noSubscriber = Optional.empty();
        final BKAsyncLogReader asyncReader =
            new BKAsyncLogReader(this, scheduler, fromDLSN, noSubscriber, false, statsLogger);
        return FutureUtils.value(asyncReader);
    }

    /** Opens a locked asynchronous reader at {@code fromDLSN} without a subscriber id. */
    @Override
    public CompletableFuture<AsyncLogReader> getAsyncLogReaderWithLock(DLSN fromDLSN) {
        final Optional<DLSN> start = Optional.of(fromDLSN);
        final Optional<String> noSubscriber = Optional.empty();
        return getAsyncLogReaderWithLock(start, noSubscriber);
    }

    /** Opens a locked asynchronous reader at {@code fromDLSN} for {@code subscriberId}. */
    @Override
    public CompletableFuture<AsyncLogReader> getAsyncLogReaderWithLock(DLSN fromDLSN, String subscriberId) {
        final Optional<DLSN> start = Optional.of(fromDLSN);
        final Optional<String> subscriber = Optional.of(subscriberId);
        return getAsyncLogReaderWithLock(start, subscriber);
    }

    /**
     * Opens a locked asynchronous reader for {@code subscriberId} with no explicit start DLSN.
     */
    @Override
    public CompletableFuture<AsyncLogReader> getAsyncLogReaderWithLock(String subscriberId) {
        final Optional<DLSN> noStart = Optional.empty();
        final Optional<String> subscriber = Optional.of(subscriberId);
        return getAsyncLogReaderWithLock(noStart, subscriber);
    }

    /**
     * Creates an asynchronous reader, acquires its stream lock and, when no start DLSN was
     * given, positions it at the subscriber's last commit position.
     *
     * <p>The reader is tracked in {@code pendingReaders} until the lock step finishes. On
     * failure the reader is closed before the returned future is failed. If the returned
     * future completes with a {@link CancellationException}, the lock future is cancelled.
     *
     * @param fromDLSN     optional start position; {@code DLSN.InitialDLSN} is used to construct
     *                     the reader when absent
     * @param subscriberId optional subscriber id; required when {@code fromDLSN} is absent
     * @return a future for the locked reader; fails with {@link UnexpectedException} when
     *         neither argument is present
     */
    protected CompletableFuture<AsyncLogReader> getAsyncLogReaderWithLock(final Optional<DLSN> fromDLSN, final Optional<String> subscriberId) {
        if (!fromDLSN.isPresent() && !subscriberId.isPresent()) {
            return FutureUtils.exception(new UnexpectedException("Neither from dlsn nor subscriber id is provided."));
        }
        final BKAsyncLogReader reader = new BKAsyncLogReader(this, this.scheduler, fromDLSN.isPresent() ? fromDLSN.get() : DLSN.InitialDLSN, subscriberId, false, this.statsLogger);
        // Track the reader so asyncClose() can close it while lock acquisition is in flight.
        this.pendingReaders.add(reader);
        CompletableFuture<Void> lockFuture = reader.lockStream();
        final CompletableFuture<AsyncLogReader> createPromise = FutureUtils.createFuture();
        // Propagate cancellation of the returned promise to the pending lock attempt.
        createPromise.whenComplete((value, cause) -> {
            if (cause instanceof CancellationException) {
                lockFuture.cancel(true);
            }
        });
        ((CompletableFuture)lockFuture.thenCompose(new Function<Void, CompletableFuture<AsyncLogReader>>(){

            @Override
            public CompletableFuture<AsyncLogReader> apply(Void complete) {
                // Explicit start position: the reader is ready as soon as the lock is held.
                if (fromDLSN.isPresent()) {
                    return FutureUtils.value(reader);
                }
                LOG.info("Reader {} @ {} reading last commit position from subscription store after acquired lock.", subscriberId.get(), (Object)BKDistributedLogManager.this.name);
                SubscriptionsStore subscriptionsStore = BKDistributedLogManager.this.driver.getSubscriptionsStore(BKDistributedLogManager.this.getStreamName());
                return subscriptionsStore.getLastCommitPosition((String)subscriberId.get()).thenCompose(lastCommitPosition -> {
                    LOG.info("Reader {} @ {} positioned to last commit position {}.", new Object[]{subscriberId.get(), BKDistributedLogManager.this.name, lastCommitPosition});
                    try {
                        reader.setStartDLSN((DLSN)lastCommitPosition);
                    }
                    catch (UnexpectedException e) {
                        return FutureUtils.exception(e);
                    }
                    return FutureUtils.value(reader);
                });
            }
        })).whenComplete(new FutureEventListener<AsyncLogReader>(){

            @Override
            public void onSuccess(AsyncLogReader r) {
                // No longer pending: hand the reader to the caller.
                BKDistributedLogManager.this.pendingReaders.remove(reader);
                FutureUtils.complete(createPromise, r);
            }

            @Override
            public void onFailure(Throwable cause) {
                // Close the reader first, then report the original failure.
                BKDistributedLogManager.this.pendingReaders.remove(reader);
                FutureUtils.ensure(reader.asyncClose(), () -> FutureUtils.completeExceptionally(createPromise, cause));
            }
        });
        return createPromise;
    }

    /**
     * Resolves the start DLSN for {@code fromTxnId} (falling back to {@code DLSN.InitialDLSN}
     * when the log is empty) and opens a synchronous reader there.
     */
    LogReader getInputStreamInternal(long fromTxnId) throws IOException {
        DLSN startDlsn;
        try {
            startDlsn = Utils.ioResult(getDLSNNotLessThanTxId(fromTxnId));
        } catch (LogEmptyException emptyLog) {
            // Nothing written yet: read from the beginning.
            startDlsn = DLSN.InitialDLSN;
        }
        final Optional<Long> startTxnId = Optional.of(fromTxnId);
        return getInputStreamInternal(startDlsn, startTxnId);
    }

    /**
     * Creates a synchronous reader at {@code fromDLSN}.
     *
     * @throws IOException if the manager is already closed or the reader cannot be created
     */
    LogReader getInputStreamInternal(DLSN fromDLSN, Optional<Long> fromTxnId) throws IOException {
        LOG.info("Create sync reader starting from {}", fromDLSN);
        checkClosedOrInError("getInputStream");
        final BKSyncLogReader syncReader = new BKSyncLogReader(conf, this, fromDLSN, fromTxnId, statsLogger);
        return syncReader;
    }

    /** Blocking variant of {@link #getLastLogRecordAsync()}; fails if the manager is closed. */
    @Override
    public LogRecordWithDLSN getLastLogRecord() throws IOException {
        checkClosedOrInError("getLastLogRecord");
        final CompletableFuture<LogRecordWithDLSN> lastRecord = getLastLogRecordAsync();
        return Utils.ioResult(lastRecord);
    }

    /** Returns the transaction id of the first log record; fails if the manager is closed. */
    @Override
    public long getFirstTxId() throws IOException {
        checkClosedOrInError("getFirstTxId");
        final LogRecordWithDLSN firstRecord = Utils.ioResult(getFirstRecordAsyncInternal());
        return firstRecord.getTransactionId();
    }

    /** Blocking variant of {@link #getLastTxIdAsync()}; fails if the manager is closed. */
    @Override
    public long getLastTxId() throws IOException {
        checkClosedOrInError("getLastTxId");
        final CompletableFuture<Long> lastTxId = getLastTxIdAsync();
        return Utils.ioResult(lastTxId);
    }

    /** Returns the DLSN of the last log record; fails if the manager is closed. */
    @Override
    public DLSN getLastDLSN() throws IOException {
        checkClosedOrInError("getLastDLSN");
        final LogRecordWithDLSN lastRecord = Utils.ioResult(getLastLogRecordAsyncInternal(false, false));
        return lastRecord.getDlsn();
    }

    /** Fetches the last log record with the recover and include-end-of-stream flags both off. */
    @Override
    public CompletableFuture<LogRecordWithDLSN> getLastLogRecordAsync() {
        final boolean recover = false;
        final boolean includeEndOfStream = false;
        return getLastLogRecordAsyncInternal(recover, includeEndOfStream);
    }

    /**
     * Fetches the last log record through the shared read handler.
     *
     * @param recover            passed through to the handler's {@code getLastLogRecordAsync}
     * @param includeEndOfStream passed through to the handler's {@code getLastLogRecordAsync}
     */
    private CompletableFuture<LogRecordWithDLSN> getLastLogRecordAsyncInternal(final boolean recover, final boolean includeEndOfStream) {
        return processReaderOperation(handler -> handler.getLastLogRecordAsync(recover, includeEndOfStream));
    }

    /** Resolves the transaction id of the last log record. */
    @Override
    public CompletableFuture<Long> getLastTxIdAsync() {
        final CompletableFuture<LogRecordWithDLSN> lastRecord = getLastLogRecordAsyncInternal(false, false);
        return lastRecord.thenApply(RECORD_2_TXID_FUNCTION);
    }

    /** Resolves the DLSN of the first log record. */
    @Override
    public CompletableFuture<DLSN> getFirstDLSNAsync() {
        final CompletableFuture<LogRecordWithDLSN> firstRecord = getFirstRecordAsyncInternal();
        return firstRecord.thenApply(RECORD_2_DLSN_FUNCTION);
    }

    /** Blocking variant of {@link #getFirstLogRecordAsync()}. */
    @Override
    public LogRecordWithDLSN getFirstLogRecord() throws IOException {
        final CompletableFuture<LogRecordWithDLSN> firstRecord = getFirstRecordAsyncInternal();
        return Utils.ioResult(firstRecord);
    }

    /** Fetches the first log record through the shared read handler. */
    @Override
    public CompletableFuture<LogRecordWithDLSN> getFirstLogRecordAsync() {
        return getFirstRecordAsyncInternal();
    }

    /** Asks the shared read handler for the first log record. */
    private CompletableFuture<LogRecordWithDLSN> getFirstRecordAsyncInternal() {
        return processReaderOperation(handler -> handler.asyncGetFirstLogRecord());
    }

    /** Resolves the DLSN of the last log record. */
    @Override
    public CompletableFuture<DLSN> getLastDLSNAsync() {
        final CompletableFuture<LogRecordWithDLSN> lastRecord = getLastLogRecordAsyncInternal(false, false);
        return lastRecord.thenApply(RECORD_2_DLSN_FUNCTION);
    }

    /**
     * Counts log records starting from {@code DLSN.InitialDLSN}; fails if the manager is closed.
     */
    @Override
    public long getLogRecordCount() throws IOException {
        checkClosedOrInError("getLogRecordCount");
        final CompletableFuture<Long> count = getLogRecordCountAsync(DLSN.InitialDLSN);
        return Utils.ioResult(count);
    }

    /** Asks the shared read handler for the record count starting at {@code beginDLSN}. */
    @Override
    public CompletableFuture<Long> getLogRecordCountAsync(final DLSN beginDLSN) {
        return processReaderOperation(handler -> handler.asyncGetLogRecordCount(beginDLSN));
    }

    /** Recovers incomplete log segments of the unpartitioned stream. */
    @Override
    public void recover() throws IOException {
        final String unpartitionedStream = conf.getUnpartitionedStreamName();
        recoverInternal(unpartitionedStream);
    }

    /**
     * Creates a locked write handler, recovers incomplete log segments with it and always
     * closes the handler afterwards.
     *
     * @param streamIdentifier not used by the current implementation
     * @throws IOException if the manager is closed, or handler creation or recovery fails
     */
    private void recoverInternal(String streamIdentifier) throws IOException {
        checkClosedOrInError("recoverInternal");
        final BKLogWriteHandler writeHandler = createWriteHandler(true);
        try {
            Utils.ioResult(writeHandler.recoverIncompleteLogSegments());
        } finally {
            Utils.closeQuietly(writeHandler);
        }
    }

    /**
     * Deletes the log: removes its contents through a locked write handler, then deletes the
     * stream from the WRITER-role metadata store.
     *
     * <p>The write handler is now always closed once {@code deleteLog()} returns or throws,
     * as {@code recoverInternal} and {@code purgeLogsOlderThan} already do. Previously it was
     * never closed, leaking the locked handler.
     *
     * @throws IOException if the handler cannot be created or either delete step fails
     */
    @Override
    public void delete() throws IOException {
        BKLogWriteHandler ledgerHandler = this.createWriteHandler(true);
        try {
            ledgerHandler.deleteLog();
        } finally {
            Utils.closeQuietly(ledgerHandler);
        }
        Utils.ioResult(this.driver.getLogStreamMetadataStore(NamespaceDriver.Role.WRITER).deleteLog(this.uri, this.getStreamName()));
    }

    /**
     * Purges log segments older than {@code minTxIdToKeep} using a locked write handler,
     * which is always closed afterwards.
     *
     * @param minTxIdToKeep smallest transaction id to retain; must be positive
     * @throws IllegalArgumentException if {@code minTxIdToKeep} is not positive
     * @throws IOException if the manager is closed, or handler creation or the purge fails
     */
    @Override
    public void purgeLogsOlderThan(long minTxIdToKeep) throws IOException {
        Preconditions.checkArgument(minTxIdToKeep > 0L, "Invalid transaction id " + minTxIdToKeep);
        checkClosedOrInError("purgeLogSegmentsOlderThan");
        final BKLogWriteHandler writeHandler = createWriteHandler(true);
        try {
            LOG.info("Purging logs for {} older than {}", writeHandler.getFullyQualifiedName(), minTxIdToKeep);
            Utils.ioResult(writeHandler.purgeLogSegmentsOlderThanTxnId(minTxIdToKeep));
        } finally {
            Utils.closeQuietly(writeHandler);
        }
    }

    /**
     * Closes this manager once; later calls return the same future.
     *
     * <p>The first call records the close promise under the instance monitor, then closes the
     * listener read handler, the pending readers and the optional extra resource via
     * {@code Utils.closeSequence}, and forwards that outcome to the promise.
     */
    @Override
    public CompletableFuture<Void> asyncClose() {
        final CompletableFuture<Void> promise;
        final BKLogReadHandler listenerHandler;
        synchronized (this) {
            if (closePromise != null) {
                return closePromise;
            }
            promise = new CompletableFuture<Void>();
            closePromise = promise;
            listenerHandler = readHandlerForListener;
        }
        final AsyncCloseable extraResources = resourcesCloseable.orElse(AsyncCloseable.NULL);
        FutureUtils.proxyTo(Utils.closeSequence(null, true, listenerHandler, pendingReaders, extraResources), promise);
        return promise;
    }

    /** Blocking variant of {@link #asyncClose()}. */
    @Override
    public void close() throws IOException {
        final CompletableFuture<Void> closed = asyncClose();
        Utils.ioResult(closed);
    }

    /** Returns {@code DLM:<uri>:<stream name>}. */
    public String toString() {
        return "DLM:" + getUri() + ":" + getStreamName();
    }

    /**
     * Raises an alert through the alert stats logger.
     *
     * @param msg  alert message, passed through unchanged
     * @param args message arguments, passed through unchanged
     */
    public void raiseAlert(String msg, Object ... args) {
        alertStatsLogger.raise(msg, args);
    }

    /** Returns the driver's subscriptions store for this stream. */
    @Override
    public SubscriptionsStore getSubscriptionsStore() {
        final String stream = getStreamName();
        return driver.getSubscriptionsStore(stream);
    }

    /**
     * Set of closeables (readers whose setup is still in flight) that must be closed together
     * with the owning manager.
     *
     * <p>All access to {@link #readers} happens under this object's monitor. Previously
     * {@code asyncClose()} read and cleared the {@code HashSet} without the lock that
     * {@code add}/{@code remove} hold, and copied it into an {@code AsyncLogReader[]}, which
     * throws {@link ArrayStoreException} for any other {@code AsyncCloseable}.
     */
    static class PendingReaders
    implements AsyncCloseable {
        final ExecutorService executorService;
        final Set<AsyncCloseable> readers = new HashSet<AsyncCloseable>();

        PendingReaders(ExecutorService executorService) {
            this.executorService = executorService;
        }

        /** Stops tracking {@code reader}; does nothing if it is not tracked. */
        public synchronized void remove(AsyncCloseable reader) {
            this.readers.remove(reader);
        }

        /** Starts tracking {@code reader}. */
        public synchronized void add(AsyncCloseable reader) {
            this.readers.add(reader);
        }

        /**
         * Closes a snapshot of the tracked readers through {@code Utils.closeSequence} on
         * {@link #executorService}, then clears the set.
         */
        @Override
        public CompletableFuture<Void> asyncClose() {
            final AsyncCloseable[] snapshot;
            synchronized (this) {
                // Copy under the lock so concurrent add/remove cannot corrupt the iteration.
                snapshot = this.readers.toArray(new AsyncCloseable[0]);
            }
            return Utils.closeSequence(this.executorService, true, snapshot).thenApply(value -> {
                synchronized (this) {
                    this.readers.clear();
                }
                return null;
            });
        }
    }
}

