package org.apache.distributedlog;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Optional;
import com.google.common.base.Preconditions;
import java.io.IOException;
import java.net.URI;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.CancellationException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.ExecutorService;
import java.util.function.BiConsumer;
import java.util.function.Function;
import org.apache.bookkeeper.feature.FeatureProvider;
import org.apache.bookkeeper.stats.AlertStatsLogger;
import org.apache.bookkeeper.stats.StatsLogger;
import org.apache.distributedlog.api.AsyncLogReader;
import org.apache.distributedlog.api.AsyncLogWriter;
import org.apache.distributedlog.api.DistributedLogManager;
import org.apache.distributedlog.api.LogReader;
import org.apache.distributedlog.api.subscription.SubscriptionsStore;
import org.apache.distributedlog.callback.LogSegmentListener;
import org.apache.distributedlog.common.concurrent.FutureEventListener;
import org.apache.distributedlog.common.concurrent.FutureUtils;
import org.apache.distributedlog.common.stats.BroadCastStatsLogger;
import org.apache.distributedlog.common.util.PermitLimiter;
import org.apache.distributedlog.common.util.PermitManager;
import org.apache.distributedlog.config.DynamicDistributedLogConfiguration;
import org.apache.distributedlog.exceptions.AlreadyClosedException;
import org.apache.distributedlog.exceptions.LogEmptyException;
import org.apache.distributedlog.exceptions.LogNotFoundException;
import org.apache.distributedlog.exceptions.UnexpectedException;
import org.apache.distributedlog.injector.AsyncFailureInjector;
import org.apache.distributedlog.io.AsyncCloseable;
import org.apache.distributedlog.lock.DistributedLock;
import org.apache.distributedlog.lock.NopDistributedLock;
import org.apache.distributedlog.logsegment.LogSegmentEntryStore;
import org.apache.distributedlog.logsegment.LogSegmentFilter;
import org.apache.distributedlog.logsegment.LogSegmentMetadataCache;
import org.apache.distributedlog.metadata.LogMetadataForReader;
import org.apache.distributedlog.metadata.LogMetadataForWriter;
import org.apache.distributedlog.metadata.LogStreamMetadataStore;
import org.apache.distributedlog.namespace.NamespaceDriver;
import org.apache.distributedlog.util.DLUtils;
import org.apache.distributedlog.util.OrderedScheduler;
import org.apache.distributedlog.util.Utils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* JADX INFO: Access modifiers changed from: package-private */
/* loaded from: input_file:org/apache/distributedlog/BKDistributedLogManager.class */
public class BKDistributedLogManager implements DistributedLogManager {
    /** Logger shared by this class and its anonymous callbacks. */
    static final Logger LOG = LoggerFactory.getLogger(BKDistributedLogManager.class);
    /** Maps a record to its transaction id (boxed); used by {@link #getLastTxIdAsync()}. */
    static final Function<LogRecordWithDLSN, Long> RECORD_2_TXID_FUNCTION = logRecordWithDLSN -> {
        return Long.valueOf(logRecordWithDLSN.getTransactionId());
    };
    /** Maps a record to its DLSN; used by {@link #getFirstDLSNAsync()} and {@link #getLastDLSNAsync()}. */
    static final Function<LogRecordWithDLSN, DLSN> RECORD_2_DLSN_FUNCTION = logRecordWithDLSN -> {
        return logRecordWithDLSN.getDlsn();
    };
    // Namespace URI and stream name this manager was created for.
    private final URI uri;
    private final String name;
    // Client id and region id are handed to the read/write handlers this manager creates.
    private final String clientId;
    private final int regionId;
    // Initialised from conf.getUnpartitionedStreamName() in the constructor.
    private final String streamIdentifier;
    private final DistributedLogConfiguration conf;
    private final DynamicDistributedLogConfiguration dynConf;
    // Source of the metadata stores, entry stores and subscription stores used below.
    private final NamespaceDriver driver;
    // Null until asyncClose() is first called; a non-null value marks this manager as closed.
    // Read and written under this object's monitor.
    private CompletableFuture<Void> closePromise;
    private final OrderedScheduler scheduler;
    private final FeatureProvider featureProvider;
    private final AsyncFailureInjector failureInjector;
    private final StatsLogger statsLogger;
    // Built with BroadCastStatsLogger.masterslave(perLogStatsLogger argument, statsLogger).
    private final StatsLogger perLogStatsLogger;
    // Alert logger scoped "dl_alert" on top of perLogStatsLogger; see raiseAlert().
    final AlertStatsLogger alertStatsLogger;
    private final LogSegmentMetadataCache logSegmentMetadataCache;
    private final PermitLimiter writeLimiter;
    // Lazily created by getReadHandlerAndRegisterListener(); closed in asyncClose().
    private BKLogReadHandler readHandlerForListener = null;
    // Async readers registered by this manager so asyncClose() can close them.
    private final PendingReaders pendingReaders;
    // Optional extra resource that asyncClose() closes after the handler and the readers.
    private final Optional<AsyncCloseable> resourcesCloseable;

    /**
     * Set of closeables (async readers) that must be closed together with the owning manager.
     *
     * <p>Every access to the backing set happens under this object's monitor, including the
     * snapshot and the final clear performed by {@link #asyncClose()}.
     */
    public static class PendingReaders implements AsyncCloseable {
        final ExecutorService executorService;
        final Set<AsyncCloseable> readers = new HashSet<>();

        /**
         * @param executorService executor handed to {@code Utils.closeSequence} when closing the readers
         */
        PendingReaders(ExecutorService executorService) {
            this.executorService = executorService;
        }

        /** Stops tracking the given closeable; a no-op if it is not tracked. */
        public synchronized void remove(AsyncCloseable asyncCloseable) {
            this.readers.remove(asyncCloseable);
        }

        /** Starts tracking the given closeable. */
        public synchronized void add(AsyncCloseable asyncCloseable) {
            this.readers.add(asyncCloseable);
        }

        /**
         * Closes every tracked closeable through {@code Utils.closeSequence} and then empties the set.
         *
         * @return future completed once the close sequence has finished and the set was cleared
         */
        public CompletableFuture<Void> asyncClose() {
            // Snapshot under the lock: add()/remove() may run concurrently from other threads.
            // The array is typed AsyncCloseable[] because the set is not restricted to AsyncLogReader.
            final AsyncCloseable[] snapshot;
            synchronized (this) {
                snapshot = this.readers.toArray(new AsyncCloseable[0]);
            }
            return Utils.closeSequence(this.executorService, true, snapshot).thenApply(ignored -> {
                synchronized (this) {
                    this.readers.clear();
                }
                return null;
            });
        }
    }

    /**
     * Creates a manager for a single log stream.
     *
     * @param str stream name
     * @param distributedLogConfiguration static configuration; also supplies the stream identifier
     * @param dynamicDistributedLogConfiguration dynamic configuration
     * @param uri namespace URI
     * @param namespaceDriver driver providing metadata, entry and subscription stores
     * @param logSegmentMetadataCache cache passed to the handlers created by this manager
     * @param orderedScheduler scheduler used for reader operations and for closing pending readers
     * @param str2 client id
     * @param num region id; must not be {@code null} (it is unboxed here)
     * @param permitLimiter write limiter passed to write handlers
     * @param featureProvider feature provider passed to write handlers and writers
     * @param asyncFailureInjector failure injector exposed through {@link #getFailureInjector()}
     * @param statsLogger base stats logger
     * @param statsLogger2 per-log stats logger, combined with {@code statsLogger}
     * @param optional extra resource closed together with this manager, if present
     */
    public BKDistributedLogManager(String str, DistributedLogConfiguration distributedLogConfiguration, DynamicDistributedLogConfiguration dynamicDistributedLogConfiguration, URI uri, NamespaceDriver namespaceDriver, LogSegmentMetadataCache logSegmentMetadataCache, OrderedScheduler orderedScheduler, String str2, Integer num, PermitLimiter permitLimiter, FeatureProvider featureProvider, AsyncFailureInjector asyncFailureInjector, StatsLogger statsLogger, StatsLogger statsLogger2, Optional<AsyncCloseable> optional) {
        // Identity and configuration.
        this.name = str;
        this.conf = distributedLogConfiguration;
        this.dynConf = dynamicDistributedLogConfiguration;
        this.uri = uri;

        // Collaborators.
        this.driver = namespaceDriver;
        this.logSegmentMetadataCache = logSegmentMetadataCache;
        this.scheduler = orderedScheduler;

        // Stats: the per-log logger broadcasts to both loggers.
        this.statsLogger = statsLogger;
        this.perLogStatsLogger = BroadCastStatsLogger.masterslave(statsLogger2, statsLogger);

        this.pendingReaders = new PendingReaders(orderedScheduler);
        this.regionId = num;
        this.clientId = str2;
        this.streamIdentifier = distributedLogConfiguration.getUnpartitionedStreamName();
        this.writeLimiter = permitLimiter;
        this.featureProvider = featureProvider;
        this.failureInjector = asyncFailureInjector;
        this.alertStatsLogger = new AlertStatsLogger(this.perLogStatsLogger, "dl_alert");
        this.resourcesCloseable = optional;
    }

    /** @return the stream name this manager was constructed with */
    @Override
    public String getStreamName() {
        return name;
    }

    /** @return the namespace driver backing this manager */
    @Override
    public NamespaceDriver getNamespaceDriver() {
        return driver;
    }

    /** @return the namespace URI this manager was constructed with */
    URI getUri() {
        return uri;
    }

    /** @return the static configuration of this manager */
    public DistributedLogConfiguration getConf() {
        return conf;
    }

    /** @return the scheduler used by this manager */
    public OrderedScheduler getScheduler() {
        return scheduler;
    }

    /** @return the failure injector supplied at construction */
    public AsyncFailureInjector getFailureInjector() {
        return failureInjector;
    }

    /** @return the driver's log stream metadata store for the WRITER role */
    @VisibleForTesting
    LogStreamMetadataStore getWriterMetadataStore() {
        final NamespaceDriver.Role role = NamespaceDriver.Role.WRITER;
        return driver.getLogStreamMetadataStore(role);
    }

    /** @return the driver's log segment entry store for the READER role */
    @VisibleForTesting
    public LogSegmentEntryStore getReaderEntryStore() {
        final NamespaceDriver.Role role = NamespaceDriver.Role.READER;
        return driver.getLogSegmentEntryStore(role);
    }

    /** @return the feature provider supplied at construction */
    @VisibleForTesting
    FeatureProvider getFeatureProvider() {
        return featureProvider;
    }

    /**
     * Returns the shared read handler, optionally creating it, and registers a listener on it.
     *
     * @param z whether to create the shared handler when it does not exist yet
     * @param logSegmentListener listener to register; may be {@code null}
     * @return the shared handler, or {@code null} when none exists and {@code z} is false
     */
    private synchronized BKLogReadHandler getReadHandlerAndRegisterListener(boolean z, LogSegmentListener logSegmentListener) {
        if (null != this.readHandlerForListener) {
            // Handler already exists: only attach a real listener.
            if (null != logSegmentListener) {
                this.readHandlerForListener.registerListener(logSegmentListener);
            }
            return this.readHandlerForListener;
        }
        if (!z) {
            return null;
        }
        // First use: publish the handler, register the listener as given (possibly null),
        // then start fetching log segments.
        final BKLogReadHandler handler = createReadHandler();
        this.readHandlerForListener = handler;
        handler.registerListener(logSegmentListener);
        handler.asyncStartFetchLogSegments();
        return handler;
    }

    /**
     * Blocking variant of {@link #getLogSegmentsAsync()}.
     *
     * @return the log segments of this stream
     * @throws IOException if the asynchronous lookup fails
     */
    @Override
    public List<LogSegmentMetadata> getLogSegments() throws IOException {
        final CompletableFuture<List<LogSegmentMetadata>> segmentsFuture = getLogSegmentsAsync();
        return Utils.ioResult(segmentsFuture);
    }

    /**
     * Reads the log segment list from the store with a throw-away read handler.
     *
     * <p>Segments are read with {@code LogSegmentMetadata.COMPARATOR} and
     * {@code LogSegmentFilter.DEFAULT_FILTER}. The handler is closed asynchronously once the read
     * completes, whether it succeeded or failed.
     *
     * @return future holding the segment list
     */
    protected CompletableFuture<List<LogSegmentMetadata>> getLogSegmentsAsync() {
        final BKLogReadHandler readHandler = createReadHandler();
        return readHandler
                .readLogSegmentsFromStore(LogSegmentMetadata.COMPARATOR, LogSegmentFilter.DEFAULT_FILTER, null)
                .thenApply(versioned -> versioned.getValue())
                // Release the temporary handler regardless of the outcome.
                .whenComplete((segments, cause) -> readHandler.asyncClose());
    }

    /**
     * Registers a log segment listener, creating the shared read handler if needed.
     *
     * @param logSegmentListener listener to register
     * @throws IOException declared by the interface; not thrown by the visible code
     */
    @Override
    public void registerListener(LogSegmentListener logSegmentListener) throws IOException {
        final boolean createIfMissing = true;
        getReadHandlerAndRegisterListener(createIfMissing, logSegmentListener);
    }

    /**
     * Unregisters a listener from the shared read handler; does nothing if no handler exists.
     *
     * @param logSegmentListener listener to remove
     */
    @Override
    public synchronized void unregisterListener(LogSegmentListener logSegmentListener) {
        final BKLogReadHandler handler = this.readHandlerForListener;
        if (null == handler) {
            return;
        }
        handler.unregisterListener(logSegmentListener);
    }

    /**
     * Fails if {@link #asyncClose()} has already been called on this manager.
     *
     * @param str name of the operation, included in the exception message
     * @throws AlreadyClosedException if the manager is closed or closing
     */
    public void checkClosedOrInError(String str) throws AlreadyClosedException {
        final boolean closed;
        synchronized (this) {
            closed = null != this.closePromise;
        }
        if (closed) {
            throw new AlreadyClosedException("Executing " + str + " on already closed DistributedLogManager");
        }
    }

    /** Creates a read handler with no subscriber id and the boolean flag set to {@code false}. */
    synchronized BKLogReadHandler createReadHandler() {
        final Optional<String> noSubscriber = Optional.absent();
        return createReadHandler(noSubscriber, false);
    }

    /**
     * Creates a read handler for the given subscriber id with the boolean flag set to {@code false}.
     *
     * @param optional subscriber id, possibly absent
     */
    synchronized BKLogReadHandler createReadHandler(Optional<String> optional) {
        final boolean flag = false;
        return createReadHandler(optional, flag);
    }

    /**
     * Creates a read handler without a notification callback.
     *
     * @param optional subscriber id, possibly absent
     * @param z flag forwarded unchanged to the handler constructor
     */
    synchronized BKLogReadHandler createReadHandler(Optional<String> optional, boolean z) {
        final AsyncNotification noNotification = null;
        return createReadHandler(optional, noNotification, z);
    }

    /**
     * Builds a new read handler for this stream from the READER-role stores of the driver.
     *
     * @param optional subscriber id, possibly absent
     * @param asyncNotification notification callback passed to the handler; may be {@code null}
     * @param z flag forwarded unchanged as the last constructor argument of the handler
     * @return a new, independent read handler
     */
    public synchronized BKLogReadHandler createReadHandler(Optional<String> optional, AsyncNotification asyncNotification, boolean z) {
        final LogMetadataForReader readerMetadata = LogMetadataForReader.of(this.uri, this.name, this.streamIdentifier);
        final LogStreamMetadataStore readerMetadataStore = this.driver.getLogStreamMetadataStore(NamespaceDriver.Role.READER);
        final LogSegmentEntryStore readerEntryStore = this.driver.getLogSegmentEntryStore(NamespaceDriver.Role.READER);
        return new BKLogReadHandler(
                readerMetadata,
                optional,
                this.conf,
                this.dynConf,
                readerMetadataStore,
                this.logSegmentMetadataCache,
                readerEntryStore,
                this.scheduler,
                this.alertStatsLogger,
                this.statsLogger,
                this.perLogStatsLogger,
                this.clientId,
                asyncNotification,
                z);
    }

    /**
     * Blocking variant of {@link #asyncCreateWriteHandler(boolean)}.
     *
     * @param z whether to acquire the handler's lock before returning it
     * @return the created write handler
     * @throws IOException if creation (or locking) fails
     */
    public BKLogWriteHandler createWriteHandler(boolean z) throws IOException {
        final CompletableFuture<BKLogWriteHandler> handlerFuture = asyncCreateWriteHandler(z);
        return Utils.ioResult(handlerFuture);
    }

    /**
     * Fetches the writer-side log metadata and builds a write handler from it.
     *
     * @param z whether to acquire the handler's lock before completing the returned future
     * @return future holding the write handler
     */
    public CompletableFuture<BKLogWriteHandler> asyncCreateWriteHandler(boolean z) {
        final LogStreamMetadataStore writerMetadataStore = this.driver.getLogStreamMetadataStore(NamespaceDriver.Role.WRITER);
        final boolean createIfNotExists = this.conf.getCreateStreamIfNotExists();
        return writerMetadataStore.getLog(this.uri, this.name, true, createIfNotExists).thenCompose(logMetadata -> {
            final CompletableFuture<BKLogWriteHandler> handlerPromise = new CompletableFuture<>();
            createWriteHandler(logMetadata, z, handlerPromise);
            return handlerPromise;
        });
    }

    /**
     * Builds a {@code BKLogWriteHandler} from the given metadata and reports the outcome through
     * {@code completableFuture}.
     *
     * @param logMetadataForWriter writer-side metadata of the log
     * @param z when true, the handler's lock is acquired before the future is completed
     * @param completableFuture completed with the handler, or exceptionally on failure
     */
    private void createWriteHandler(LogMetadataForWriter logMetadataForWriter, boolean z, final CompletableFuture<BKLogWriteHandler> completableFuture) {
        try {
            // All stores come from the WRITER role. The write lock is a no-op lock when
            // conf.isWriteLockEnabled() is false.
            final BKLogWriteHandler bKLogWriteHandler = new BKLogWriteHandler(logMetadataForWriter, this.conf, this.driver.getLogStreamMetadataStore(NamespaceDriver.Role.WRITER), this.logSegmentMetadataCache, this.driver.getLogSegmentEntryStore(NamespaceDriver.Role.WRITER), this.scheduler, this.driver.getLogSegmentEntryStore(NamespaceDriver.Role.WRITER).newLogSegmentAllocator(logMetadataForWriter, this.dynConf), this.statsLogger, this.perLogStatsLogger, this.alertStatsLogger, this.clientId, this.regionId, this.writeLimiter, this.featureProvider, this.dynConf, this.conf.isWriteLockEnabled() ? this.driver.getLogStreamMetadataStore(NamespaceDriver.Role.WRITER).createWriteLock(logMetadataForWriter) : NopDistributedLock.INSTANCE);
            if (z) {
                bKLogWriteHandler.lockHandler().whenComplete((BiConsumer<? super Object, ? super Throwable>) new FutureEventListener<DistributedLock>() { // from class: org.apache.distributedlog.BKDistributedLogManager.1
                    public void onSuccess(DistributedLock distributedLock) {
                        FutureUtils.complete(completableFuture, bKLogWriteHandler);
                    }

                    public void onFailure(Throwable th) {
                        // Lock acquisition failed: close the handler and report the original
                        // lock error. NOTE(review): FutureUtils.ensure presumably runs the
                        // callback once the close future finishes, success or not — confirm.
                        CompletableFuture<Void> asyncClose = bKLogWriteHandler.asyncClose();
                        CompletableFuture completableFuture2 = completableFuture;
                        FutureUtils.ensure(asyncClose, () -> {
                            FutureUtils.completeExceptionally(completableFuture2, th);
                        });
                    }
                });
            } else {
                // No lock requested: hand out the handler immediately.
                FutureUtils.complete(completableFuture, bKLogWriteHandler);
            }
        } catch (IOException e) {
            // Construction failed; surface the error through the future instead of throwing.
            FutureUtils.completeExceptionally(completableFuture, e);
        }
    }

    /** @return the permit manager of the driver's WRITER-role log stream metadata store */
    public PermitManager getLogSegmentRollingPermitManager() {
        final LogStreamMetadataStore writerMetadataStore = this.driver.getLogStreamMetadataStore(NamespaceDriver.Role.WRITER);
        return writerMetadataStore.getPermitManager();
    }

    /**
     * Runs an operation against the shared read handler on the scheduler.
     *
     * <p>The shared handler is created on first use. The future returned by {@code function} is
     * proxied into the future returned from this method.
     *
     * @param function operation to apply to the shared read handler
     * @param <T> result type of the operation
     * @return future mirroring the operation's result
     */
    <T> CompletableFuture<T> processReaderOperation(Function<BKLogReadHandler, CompletableFuture<T>> function) {
        final CompletableFuture<T> result = FutureUtils.createFuture();
        this.scheduler.submit(() -> {
            final BKLogReadHandler sharedHandler = getReadHandlerAndRegisterListener(true, null);
            final CompletableFuture<T> operationFuture = function.apply(sharedHandler);
            FutureUtils.proxyTo(operationFuture, result);
        });
        return result;
    }

    /**
     * Checks whether the last record of the log carries {@code DistributedLogConstants.MAX_TXID}.
     *
     * @return true if the last record's transaction id equals {@code MAX_TXID}
     * @throws IOException if the manager is closed or the last record cannot be read
     */
    @Override
    public boolean isEndOfStreamMarked() throws IOException {
        checkClosedOrInError("isEndOfStreamMarked");
        final LogRecordWithDLSN lastRecord = Utils.ioResult(getLastLogRecordAsyncInternal(false, true));
        return DistributedLogConstants.MAX_TXID == lastRecord.getTransactionId();
    }

    /**
     * Opens an append-only stream writer positioned after the last record of the log.
     *
     * <p>The position is the transaction id of the last record. It falls back to 0 when the log
     * is missing or empty, or when the id is one of the sentinel values -999 / -99.
     * NOTE(review): these presumably mirror constants in DistributedLogConstants — confirm.
     *
     * @return a new append-only writer
     * @throws IOException if the last record cannot be read or the async writer cannot be opened
     */
    @Override
    public AppendOnlyStreamWriter getAppendOnlyStreamWriter() throws IOException {
        long position;
        try {
            final LogRecordWithDLSN lastRecord = Utils.ioResult(getLastLogRecordAsyncInternal(true, false));
            final long lastTxId = lastRecord.getTransactionId();
            final boolean sentinel = -999 == lastTxId || -99 == lastTxId;
            position = sentinel ? 0 : lastTxId;
        } catch (LogNotFoundException notFound) {
            // No such log yet: start from the beginning.
            position = 0;
        } catch (LogEmptyException empty) {
            // Log exists but has no records: start from the beginning.
            position = 0;
        }
        final BKAsyncLogWriter asyncWriter = startAsyncLogSegmentNonPartitioned();
        return new AppendOnlyStreamWriter(asyncWriter, position);
    }

    /**
     * @return a new append-only stream reader bound to this manager
     * @throws IOException if the reader cannot be created
     */
    @Override
    public AppendOnlyStreamReader getAppendOnlyStreamReader() throws IOException {
        final AppendOnlyStreamReader reader = new AppendOnlyStreamReader(this);
        return reader;
    }

    /**
     * Alias of {@link #openLogWriter()}.
     *
     * @return a new synchronous log writer
     * @throws IOException if the writer cannot be opened
     */
    @Override
    public BKSyncLogWriter startLogSegmentNonPartitioned() throws IOException {
        final BKSyncLogWriter writer = openLogWriter();
        return writer;
    }

    /**
     * Opens a synchronous log writer and acquires its write handler's lock.
     *
     * <p>If creating the write handler or acquiring the lock fails, the writer is aborted before
     * the failure propagates to the caller.
     *
     * @return a writer whose handler lock has been acquired
     * @throws IOException if the manager is closed, or handler creation or locking fails
     */
    @Override
    public BKSyncLogWriter openLogWriter() throws IOException {
        checkClosedOrInError("startLogSegmentNonPartitioned");
        final BKSyncLogWriter writer = new BKSyncLogWriter(this.conf, this.dynConf, this);
        boolean locked = false;
        try {
            writer.createAndCacheWriteHandler();
            Utils.ioResult(writer.getWriteHandler().lockHandler());
            locked = true;
            return writer;
        } finally {
            // Only reached with locked == false when one of the calls above threw.
            if (!locked) {
                writer.abort();
            }
        }
    }

    /**
     * Blocking variant of {@link #openAsyncLogWriter()}.
     *
     * @return the opened asynchronous writer
     * @throws IOException if the writer cannot be opened
     */
    @Override
    public BKAsyncLogWriter startAsyncLogSegmentNonPartitioned() throws IOException {
        final AsyncLogWriter writer = Utils.ioResult(openAsyncLogWriter());
        // openAsyncLogWriter() always produces a BKAsyncLogWriter instance.
        return (BKAsyncLogWriter) writer;
    }

    /**
     * Opens an asynchronous log writer.
     *
     * <p>A locked write handler is created first, then incomplete log segments are recovered and
     * the recovered last transaction id is set on the writer. If recovery fails the writer is
     * aborted and the returned future fails with the recovery error.
     *
     * @return future holding the writer, or failed with {@code AlreadyClosedException} when this
     *         manager is already closed
     */
    @Override
    public CompletableFuture<AsyncLogWriter> openAsyncLogWriter() {
        final CompletableFuture<BKLogWriteHandler> handlerFuture;
        try {
            checkClosedOrInError("startLogSegmentNonPartitioned");
            synchronized (this) {
                handlerFuture = asyncCreateWriteHandler(true);
            }
        } catch (AlreadyClosedException e) {
            return FutureUtils.exception(e);
        }
        return handlerFuture.thenCompose(writeHandler -> {
            final BKAsyncLogWriter writer;
            synchronized (this) {
                writer = new BKAsyncLogWriter(this.conf, this.dynConf, this, writeHandler, this.featureProvider, this.statsLogger);
            }
            // The writer is only handed out once recovery is done. The explicit type argument
            // types the chain as AsyncLogWriter rather than the concrete writer class.
            return writeHandler.recoverIncompleteLogSegments()
                    .<AsyncLogWriter>thenApply(lastTxId -> {
                        writer.setLastTxId(lastTxId.longValue());
                        return writer;
                    })
                    .whenComplete((ignored, cause) -> {
                        if (null != cause) {
                            writer.asyncAbort();
                        }
                    });
        });
    }

    /**
     * Finds the DLSN of the first record whose transaction id is not less than {@code j}.
     *
     * @param j transaction id to search for
     * @return future holding the matching DLSN
     */
    @Override
    public CompletableFuture<DLSN> getDLSNNotLessThanTxId(long j) {
        final CompletableFuture<List<LogSegmentMetadata>> segmentsFuture = getLogSegmentsAsync();
        return segmentsFuture.thenCompose(segments -> getDLSNNotLessThanTxId(j, segments));
    }

    /**
     * Resolves the DLSN for {@code j} against an already fetched segment list.
     *
     * <p>With no segments the last DLSN of the log is requested. When
     * {@code DLUtils.findLogSegmentNotLessThanTxnId} returns a negative index, the start of the
     * first segment is returned. Otherwise the search continues inside the located segment.
     *
     * @param j transaction id to search for
     * @param list log segments of the stream
     * @return future holding the matching DLSN
     */
    private CompletableFuture<DLSN> getDLSNNotLessThanTxId(long j, List<LogSegmentMetadata> list) {
        if (list.isEmpty()) {
            return getLastDLSNAsync();
        }
        final int segmentIndex = DLUtils.findLogSegmentNotLessThanTxnId(list, j);
        if (segmentIndex >= 0) {
            final LogSegmentEntryStore readerEntryStore = this.driver.getLogSegmentEntryStore(NamespaceDriver.Role.READER);
            return getDLSNNotLessThanTxIdInSegment(j, segmentIndex, list, readerEntryStore);
        }
        final LogSegmentMetadata firstSegment = list.get(0);
        return FutureUtils.value(new DLSN(firstSegment.getLogSegmentSequenceNumber(), 0L, 0L));
    }

    /**
     * Searches segment {@code i} for the first record with transaction id not less than {@code j},
     * moving on to later segments when the current one has no such record.
     *
     * <p>If the last segment has no matching record, the last record of the log decides the
     * result: its own DLSN if its transaction id is at least {@code j}, otherwise the DLSN
     * following it.
     *
     * @param j transaction id to search for
     * @param i index of the segment to search in {@code list}
     * @param list log segments of the stream
     * @param logSegmentEntryStore entry store used to read the segment
     * @return future holding the matching DLSN
     */
    private CompletableFuture<DLSN> getDLSNNotLessThanTxIdInSegment(long j, int i, List<LogSegmentMetadata> list, LogSegmentEntryStore logSegmentEntryStore) {
        final int fetchBatchSize = Math.max(2, this.dynConf.getReadAheadBatchSize());
        return ReadUtils.getLogRecordNotLessThanTxId(this.name, list.get(i), j, this.scheduler, logSegmentEntryStore, fetchBatchSize).thenCompose(found -> {
            if (found.isPresent()) {
                return FutureUtils.value(((LogRecordWithDLSN) found.get()).getDlsn());
            }
            final boolean isLastSegment = list.size() - 1 == i;
            if (!isLastSegment) {
                return getDLSNNotLessThanTxIdInSegment(j, i + 1, list, logSegmentEntryStore);
            }
            return getLastLogRecordAsync().thenApply(lastRecord -> {
                if (lastRecord.getTransactionId() >= j) {
                    return lastRecord.getDlsn();
                }
                return lastRecord.getDlsn().getNextDLSN();
            });
        });
    }

    /**
     * Opens a synchronous reader starting at the given transaction id.
     *
     * @param j transaction id to start from
     * @throws IOException if the start position cannot be resolved or the manager is closed
     */
    @Override
    public LogReader openLogReader(long j) throws IOException {
        final LogReader reader = getInputStreamInternal(j);
        return reader;
    }

    /**
     * Opens a synchronous reader starting at the given DLSN, with no start transaction id.
     *
     * @param dlsn position to start from
     * @throws IOException if the manager is closed
     */
    @Override
    public LogReader openLogReader(DLSN dlsn) throws IOException {
        final Optional<Long> noStartTxId = Optional.absent();
        return getInputStreamInternal(dlsn, noStartTxId);
    }

    /**
     * Alias of {@link #openLogReader(long)}.
     *
     * @param j transaction id to start from
     * @throws IOException if the reader cannot be opened
     */
    @Override
    public LogReader getInputStream(long j) throws IOException {
        final LogReader reader = openLogReader(j);
        return reader;
    }

    /**
     * Alias of {@link #openLogReader(DLSN)}.
     *
     * @param dlsn position to start from
     * @throws IOException if the reader cannot be opened
     */
    @Override
    public LogReader getInputStream(DLSN dlsn) throws IOException {
        final LogReader reader = openLogReader(dlsn);
        return reader;
    }

    /**
     * Blocking variant of {@link #openAsyncLogReader(long)}.
     *
     * @param j transaction id to start from
     * @throws IOException if the reader cannot be opened
     */
    @Override
    public AsyncLogReader getAsyncLogReader(long j) throws IOException {
        final CompletableFuture<AsyncLogReader> readerFuture = openAsyncLogReader(j);
        return Utils.ioResult(readerFuture);
    }

    /**
     * Opens an asynchronous reader starting at the first record whose transaction id is not less
     * than {@code j}.
     *
     * <p>An empty log ({@code LogEmptyException}) is not an error here: the reader then starts
     * at {@code DLSN.InitialDLSN}. Any other lookup failure fails the returned future.
     *
     * @param j transaction id to start from
     * @return future holding the reader
     */
    @Override
    public CompletableFuture<AsyncLogReader> openAsyncLogReader(long j) {
        // Typed as CompletableFuture<DLSN> so the thenCompose lambda below resolves to the
        // DLSN overload of openAsyncLogReader.
        final CompletableFuture<DLSN> startPosition = new CompletableFuture<>();
        getDLSNNotLessThanTxId(j).whenComplete(new FutureEventListener<DLSN>() {
            @Override
            public void onSuccess(DLSN dlsn) {
                startPosition.complete(dlsn);
            }

            @Override
            public void onFailure(Throwable th) {
                if (th instanceof LogEmptyException) {
                    startPosition.complete(DLSN.InitialDLSN);
                } else {
                    startPosition.completeExceptionally(th);
                }
            }
        });
        return startPosition.thenCompose(dlsn -> openAsyncLogReader(dlsn));
    }

    /**
     * Blocking variant of {@link #openAsyncLogReader(DLSN)}.
     *
     * @param dlsn position to start from
     * @throws IOException if the reader cannot be opened
     */
    @Override
    public AsyncLogReader getAsyncLogReader(DLSN dlsn) throws IOException {
        final CompletableFuture<AsyncLogReader> readerFuture = openAsyncLogReader(dlsn);
        return Utils.ioResult(readerFuture);
    }

    /**
     * Creates an asynchronous reader at the given DLSN and tracks it so that closing this
     * manager also closes the reader.
     *
     * @param dlsn position to start from
     * @return an already completed future holding the reader
     */
    @Override
    public CompletableFuture<AsyncLogReader> openAsyncLogReader(DLSN dlsn) {
        final Optional<String> noSubscriber = Optional.absent();
        final BKAsyncLogReader reader = new BKAsyncLogReader(this, this.scheduler, dlsn, noSubscriber, false, this.statsLogger);
        this.pendingReaders.add(reader);
        return FutureUtils.value(reader);
    }

    /**
     * Opens a locked asynchronous reader at the given DLSN without a subscriber id.
     *
     * @param dlsn position to start from
     * @return future holding the reader once the stream lock is acquired
     */
    @Override
    public CompletableFuture<AsyncLogReader> getAsyncLogReaderWithLock(DLSN dlsn) {
        final Optional<DLSN> from = Optional.of(dlsn);
        final Optional<String> noSubscriber = Optional.absent();
        return getAsyncLogReaderWithLock(from, noSubscriber);
    }

    /**
     * Opens a locked asynchronous reader at the given DLSN for the given subscriber.
     *
     * @param dlsn position to start from
     * @param str subscriber id
     * @return future holding the reader once the stream lock is acquired
     */
    @Override
    public CompletableFuture<AsyncLogReader> getAsyncLogReaderWithLock(DLSN dlsn, String str) {
        final Optional<DLSN> from = Optional.of(dlsn);
        final Optional<String> subscriber = Optional.of(str);
        return getAsyncLogReaderWithLock(from, subscriber);
    }

    /**
     * Opens a locked asynchronous reader for the given subscriber, positioned at the
     * subscriber's last commit position.
     *
     * @param str subscriber id
     * @return future holding the reader once the stream lock is acquired
     */
    @Override
    public CompletableFuture<AsyncLogReader> getAsyncLogReaderWithLock(String str) {
        final Optional<DLSN> noStart = Optional.absent();
        final Optional<String> subscriber = Optional.of(str);
        return getAsyncLogReaderWithLock(noStart, subscriber);
    }

    /**
     * Creates an asynchronous reader, locks the stream for it, and positions it.
     *
     * <p>At least one of the two arguments must be present. When no start DLSN is given, the
     * reader is moved to the subscriber's last commit position after the lock is acquired. The
     * reader is tracked as pending while it is being set up. On failure it is closed before the
     * returned future is failed. Cancelling the returned future cancels the lock acquisition.
     *
     * @param optional DLSN to start from, if any
     * @param optional2 subscriber id, if any
     * @return future holding the locked reader
     */
    protected CompletableFuture<AsyncLogReader> getAsyncLogReaderWithLock(final Optional<DLSN> optional, final Optional<String> optional2) {
        if (!optional.isPresent() && !optional2.isPresent()) {
            return FutureUtils.exception(new UnexpectedException("Neither from dlsn nor subscriber id is provided."));
        }
        final DLSN startDlsn = optional.isPresent() ? optional.get() : DLSN.InitialDLSN;
        final BKAsyncLogReader reader = new BKAsyncLogReader(this, this.scheduler, startDlsn, optional2, false, this.statsLogger);
        this.pendingReaders.add(reader);
        final CompletableFuture<Void> lockFuture = reader.lockStream();
        final CompletableFuture<AsyncLogReader> createPromise = FutureUtils.createFuture();
        createPromise.whenComplete((ignored, cause) -> {
            // Propagate cancellation of the creation future to the lock acquisition.
            if (cause instanceof CancellationException) {
                lockFuture.cancel(true);
            }
        });
        // Lock the stream, then (if needed) fetch the subscriber's last commit position.
        lockFuture.thenCompose(new Function<Void, CompletableFuture<AsyncLogReader>>() {
            @Override
            public CompletableFuture<AsyncLogReader> apply(Void unused) {
                if (optional.isPresent()) {
                    return FutureUtils.value(reader);
                }
                BKDistributedLogManager.LOG.info("Reader {} @ {} reading last commit position from subscription store after acquired lock.", optional2.get(), BKDistributedLogManager.this.name);
                final CompletableFuture<DLSN> lastCommitPosition = BKDistributedLogManager.this.driver
                        .getSubscriptionsStore(BKDistributedLogManager.this.getStreamName())
                        .getLastCommitPosition(optional2.get());
                return lastCommitPosition.thenCompose(dlsn -> {
                    BKDistributedLogManager.LOG.info("Reader {} @ {} positioned to last commit position {}.", new Object[]{optional2.get(), BKDistributedLogManager.this.name, dlsn});
                    try {
                        reader.setStartDLSN(dlsn);
                        return FutureUtils.value(reader);
                    } catch (UnexpectedException e) {
                        return FutureUtils.exception(e);
                    }
                });
            }
        }).whenComplete(new FutureEventListener<AsyncLogReader>() {
            @Override
            public void onSuccess(AsyncLogReader lockedReader) {
                BKDistributedLogManager.this.pendingReaders.remove(reader);
                FutureUtils.complete(createPromise, lockedReader);
            }

            @Override
            public void onFailure(Throwable cause) {
                BKDistributedLogManager.this.pendingReaders.remove(reader);
                // Close the half-initialised reader, then report the original failure.
                FutureUtils.ensure(reader.asyncClose(), () -> {
                    FutureUtils.completeExceptionally(createPromise, cause);
                });
            }
        });
        return createPromise;
    }

    /**
     * Opens a synchronous reader at the first record whose transaction id is not less than
     * {@code j}; an empty log starts the reader at {@code DLSN.InitialDLSN}.
     *
     * @param j transaction id to start from
     * @throws IOException if the start position cannot be resolved or the manager is closed
     */
    LogReader getInputStreamInternal(long j) throws IOException {
        DLSN startDlsn;
        try {
            startDlsn = Utils.ioResult(getDLSNNotLessThanTxId(j));
        } catch (LogEmptyException emptyLog) {
            // Nothing written yet: read from the very beginning.
            startDlsn = DLSN.InitialDLSN;
        }
        final Optional<Long> startTxId = Optional.of(j);
        return getInputStreamInternal(startDlsn, startTxId);
    }

    /**
     * Creates a synchronous reader at the given DLSN.
     *
     * @param dlsn position to start from
     * @param optional start transaction id, if the caller asked for one
     * @throws IOException if the manager is already closed
     */
    LogReader getInputStreamInternal(DLSN dlsn, Optional<Long> optional) throws IOException {
        LOG.info("Create sync reader starting from {}", dlsn);
        checkClosedOrInError("getInputStream");
        final BKSyncLogReader reader = new BKSyncLogReader(this.conf, this, dlsn, optional, this.statsLogger);
        return reader;
    }

    /**
     * Blocking variant of {@link #getLastLogRecordAsync()}.
     *
     * @return the last record of the log
     * @throws IOException if the manager is closed or the record cannot be read
     */
    @Override
    public LogRecordWithDLSN getLastLogRecord() throws IOException {
        checkClosedOrInError("getLastLogRecord");
        final CompletableFuture<LogRecordWithDLSN> recordFuture = getLastLogRecordAsync();
        return Utils.ioResult(recordFuture);
    }

    /**
     * @return the transaction id of the first record of the log
     * @throws IOException if the manager is closed or the record cannot be read
     */
    @Override
    public long getFirstTxId() throws IOException {
        checkClosedOrInError("getFirstTxId");
        final LogRecordWithDLSN firstRecord = Utils.ioResult(getFirstRecordAsyncInternal());
        return firstRecord.getTransactionId();
    }

    /**
     * Blocking variant of {@link #getLastTxIdAsync()}.
     *
     * @return the transaction id of the last record of the log
     * @throws IOException if the manager is closed or the record cannot be read
     */
    @Override
    public long getLastTxId() throws IOException {
        checkClosedOrInError("getLastTxId");
        final Long lastTxId = Utils.ioResult(getLastTxIdAsync());
        return lastTxId;
    }

    /**
     * @return the DLSN of the last record of the log
     * @throws IOException if the manager is closed or the record cannot be read
     */
    @Override
    public DLSN getLastDLSN() throws IOException {
        checkClosedOrInError("getLastDLSN");
        final LogRecordWithDLSN lastRecord = Utils.ioResult(getLastLogRecordAsyncInternal(false, false));
        return lastRecord.getDlsn();
    }

    /** @return future holding the last record of the log, read with both internal flags set to false */
    @Override
    public CompletableFuture<LogRecordWithDLSN> getLastLogRecordAsync() {
        final CompletableFuture<LogRecordWithDLSN> lastRecord = getLastLogRecordAsyncInternal(false, false);
        return lastRecord;
    }

    /**
     * Reads the last record through the shared read handler.
     *
     * @param z first flag forwarded unchanged to {@code BKLogReadHandler.getLastLogRecordAsync}
     * @param z2 second flag forwarded unchanged to the same call
     * @return future holding the last record
     */
    private CompletableFuture<LogRecordWithDLSN> getLastLogRecordAsyncInternal(final boolean z, final boolean z2) {
        return processReaderOperation(readHandler -> readHandler.getLastLogRecordAsync(z, z2));
    }

    /** @return future holding the transaction id of the last record of the log */
    @Override
    public CompletableFuture<Long> getLastTxIdAsync() {
        return getLastLogRecordAsyncInternal(false, false).thenApply(RECORD_2_TXID_FUNCTION);
    }

    /** @return future holding the DLSN of the first record of the log */
    @Override
    public CompletableFuture<DLSN> getFirstDLSNAsync() {
        return getFirstRecordAsyncInternal().thenApply(RECORD_2_DLSN_FUNCTION);
    }

    /** Reads the first record of the log through the shared read handler. */
    private CompletableFuture<LogRecordWithDLSN> getFirstRecordAsyncInternal() {
        return processReaderOperation(readHandler -> readHandler.asyncGetFirstLogRecord());
    }

    /** @return future holding the DLSN of the last record of the log */
    @Override
    public CompletableFuture<DLSN> getLastDLSNAsync() {
        return getLastLogRecordAsyncInternal(false, false).thenApply(RECORD_2_DLSN_FUNCTION);
    }

    /**
     * Counts the records of the log starting from {@code DLSN.InitialDLSN}.
     *
     * @return the record count
     * @throws IOException if the manager is closed or the count cannot be computed
     */
    @Override
    public long getLogRecordCount() throws IOException {
        checkClosedOrInError("getLogRecordCount");
        final Long count = Utils.ioResult(getLogRecordCountAsync(DLSN.InitialDLSN));
        return count;
    }

    /**
     * Counts records from the given DLSN through the shared read handler.
     *
     * @param dlsn position to start counting from
     * @return future holding the record count
     */
    @Override
    public CompletableFuture<Long> getLogRecordCountAsync(final DLSN dlsn) {
        return processReaderOperation(readHandler -> readHandler.asyncGetLogRecordCount(dlsn));
    }

    /**
     * Recovers incomplete log segments of this stream.
     *
     * @throws IOException if the manager is closed or recovery fails
     */
    @Override
    public void recover() throws IOException {
        final String streamId = this.conf.getUnpartitionedStreamName();
        recoverInternal(streamId);
    }

    /**
     * Recovers incomplete log segments with a temporary, locked write handler that is always
     * closed afterwards.
     *
     * @param str stream identifier; not used by the visible code
     * @throws IOException if the manager is closed, or handler creation or recovery fails
     */
    private void recoverInternal(String str) throws IOException {
        checkClosedOrInError("recoverInternal");
        final BKLogWriteHandler writeHandler = createWriteHandler(true);
        try {
            Utils.ioResult(writeHandler.recoverIncompleteLogSegments());
        } finally {
            // Release the handler (and its lock) whether or not recovery succeeded.
            Utils.closeQuietly(writeHandler);
        }
    }

    /**
     * Deletes the log: first through a locked write handler, then from the WRITER-role log
     * stream metadata store.
     *
     * <p>The temporary write handler is always closed, matching {@code recoverInternal} and
     * {@code purgeLogsOlderThan}; previously it was left open.
     *
     * @throws IOException if the handler cannot be created or either deletion step fails
     */
    @Override
    public void delete() throws IOException {
        final BKLogWriteHandler writeHandler = createWriteHandler(true);
        try {
            writeHandler.deleteLog();
        } finally {
            // Release the handler (and its lock) whether or not the deletion succeeded.
            Utils.closeQuietly(writeHandler);
        }
        Utils.ioResult(this.driver.getLogStreamMetadataStore(NamespaceDriver.Role.WRITER).deleteLog(this.uri, getStreamName()));
    }

    /**
     * Purges log segments older than the given transaction id with a temporary, locked write
     * handler that is always closed afterwards.
     *
     * @param j transaction id; must be positive
     * @throws IllegalArgumentException if {@code j} is not positive
     * @throws IOException if the manager is closed, or handler creation or purging fails
     */
    @Override
    public void purgeLogsOlderThan(long j) throws IOException {
        Preconditions.checkArgument(j > 0, "Invalid transaction id " + j);
        checkClosedOrInError("purgeLogSegmentsOlderThan");
        final BKLogWriteHandler writeHandler = createWriteHandler(true);
        try {
            LOG.info("Purging logs for {} older than {}", writeHandler.getFullyQualifiedName(), j);
            Utils.ioResult(writeHandler.purgeLogSegmentsOlderThanTxnId(j));
        } finally {
            Utils.closeQuietly(writeHandler);
        }
    }

    /**
     * Closes this manager: the shared read handler, the pending readers and the optional extra
     * resource, in that argument order, through {@code Utils.closeSequence}.
     *
     * <p>Only the first call starts the close; later calls return the same future.
     *
     * @return future mirroring the result of the close sequence
     */
    public CompletableFuture<Void> asyncClose() {
        synchronized (this) {
            if (null == this.closePromise) {
                final CompletableFuture<Void> promise = new CompletableFuture<>();
                // Publish the promise first: it is what marks the manager as closed.
                this.closePromise = promise;
                final CompletableFuture<Void> closeResult = Utils.closeSequence(
                        null,
                        true,
                        this.readHandlerForListener,
                        this.pendingReaders,
                        (AsyncCloseable) this.resourcesCloseable.or(AsyncCloseable.NULL));
                FutureUtils.proxyTo(closeResult, promise);
            }
            return this.closePromise;
        }
    }

    /**
     * Blocking variant of {@link #asyncClose()}.
     *
     * @throws IOException if the close sequence fails
     */
    @Override
    public void close() throws IOException {
        final CompletableFuture<Void> closeFuture = asyncClose();
        Utils.ioResult(closeFuture);
    }

    /** @return {@code "DLM:<uri>:<stream name>"} */
    @Override
    public String toString() {
        final Object[] parts = {getUri(), getStreamName()};
        return String.format("DLM:%s:%s", parts);
    }

    /**
     * Raises an alert on this manager's alert stats logger.
     *
     * @param str alert message, forwarded unchanged
     * @param objArr message arguments, forwarded unchanged
     */
    public void raiseAlert(String str, Object... objArr) {
        final AlertStatsLogger alerts = this.alertStatsLogger;
        alerts.raise(str, objArr);
    }

    /** @return the driver's subscriptions store for this stream */
    @Override
    public SubscriptionsStore getSubscriptionsStore() {
        final String streamName = getStreamName();
        return this.driver.getSubscriptionsStore(streamName);
    }
}
