package com.twitter.distributedlog;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Optional;
import com.google.common.base.Preconditions;
import com.twitter.distributedlog.ZooKeeperClient;
import com.twitter.distributedlog.bk.DynamicQuorumConfigProvider;
import com.twitter.distributedlog.bk.LedgerAllocator;
import com.twitter.distributedlog.bk.LedgerAllocatorDelegator;
import com.twitter.distributedlog.bk.SimpleLedgerAllocator;
import com.twitter.distributedlog.callback.LogSegmentListener;
import com.twitter.distributedlog.config.DynamicDistributedLogConfiguration;
import com.twitter.distributedlog.exceptions.AlreadyClosedException;
import com.twitter.distributedlog.exceptions.DLInterruptedException;
import com.twitter.distributedlog.exceptions.LogEmptyException;
import com.twitter.distributedlog.exceptions.LogNotFoundException;
import com.twitter.distributedlog.exceptions.UnexpectedException;
import com.twitter.distributedlog.impl.ZKLogSegmentMetadataStore;
import com.twitter.distributedlog.impl.metadata.ZKLogMetadataForReader;
import com.twitter.distributedlog.impl.metadata.ZKLogMetadataForWriter;
import com.twitter.distributedlog.io.AsyncCloseable;
import com.twitter.distributedlog.lock.DistributedLock;
import com.twitter.distributedlog.lock.SessionLockFactory;
import com.twitter.distributedlog.lock.ZKSessionLockFactory;
import com.twitter.distributedlog.logsegment.LogSegmentMetadataStore;
import com.twitter.distributedlog.metadata.BKDLConfig;
import com.twitter.distributedlog.stats.BroadCastStatsLogger;
import com.twitter.distributedlog.stats.ReadAheadExceptionsLogger;
import com.twitter.distributedlog.subscription.SubscriptionStateStore;
import com.twitter.distributedlog.subscription.SubscriptionsStore;
import com.twitter.distributedlog.subscription.ZKSubscriptionStateStore;
import com.twitter.distributedlog.subscription.ZKSubscriptionsStore;
import com.twitter.distributedlog.util.ConfUtils;
import com.twitter.distributedlog.util.DLUtils;
import com.twitter.distributedlog.util.FutureUtils;
import com.twitter.distributedlog.util.MonitoredFuturePool;
import com.twitter.distributedlog.util.OrderedScheduler;
import com.twitter.distributedlog.util.PermitLimiter;
import com.twitter.distributedlog.util.PermitManager;
import com.twitter.distributedlog.util.SchedulerUtils;
import com.twitter.distributedlog.util.Utils;
import com.twitter.util.ExceptionalFunction;
import com.twitter.util.ExceptionalFunction0;
import com.twitter.util.ExecutorServiceFuturePool;
import com.twitter.util.Function;
import com.twitter.util.Future;
import com.twitter.util.FutureEventListener;
import com.twitter.util.FuturePool;
import com.twitter.util.Promise;
import java.io.IOException;
import java.net.URI;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.TimeUnit;
import org.apache.bookkeeper.feature.FeatureProvider;
import org.apache.bookkeeper.stats.AlertStatsLogger;
import org.apache.bookkeeper.stats.NullStatsLogger;
import org.apache.bookkeeper.stats.StatsLogger;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZKUtil;
import org.apache.zookeeper.ZooKeeper;
import org.jboss.netty.channel.socket.ClientSocketChannelFactory;
import org.jboss.netty.util.HashedWheelTimer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import scala.runtime.AbstractFunction0;
import scala.runtime.AbstractFunction1;
import scala.runtime.BoxedUnit;

/* JADX INFO: Access modifiers changed from: package-private */
/* loaded from: input_file:com/twitter/distributedlog/BKDistributedLogManager.class */
public class BKDistributedLogManager extends ZKMetadataAccessor implements DistributedLogManager {
    static final Logger LOG = LoggerFactory.getLogger(BKDistributedLogManager.class);
    static final Function<LogRecordWithDLSN, Long> RECORD_2_TXID_FUNCTION = new Function<LogRecordWithDLSN, Long>() { // from class: com.twitter.distributedlog.BKDistributedLogManager.1
        public Long apply(LogRecordWithDLSN logRecordWithDLSN) {
            return Long.valueOf(logRecordWithDLSN.getTransactionId());
        }
    };
    static final Function<LogRecordWithDLSN, DLSN> RECORD_2_DLSN_FUNCTION = new Function<LogRecordWithDLSN, DLSN>() { // from class: com.twitter.distributedlog.BKDistributedLogManager.2
        public DLSN apply(LogRecordWithDLSN logRecordWithDLSN) {
            return logRecordWithDLSN.getDlsn();
        }
    };
    private final String clientId;
    private final int regionId;
    private final String streamIdentifier;
    private final DistributedLogConfiguration conf;
    private final DynamicDistributedLogConfiguration dynConf;
    private Promise<Void> closePromise;
    private final OrderedScheduler scheduler;
    private final OrderedScheduler readAheadScheduler;
    private boolean ownExecutor;
    private final FeatureProvider featureProvider;
    private final StatsLogger statsLogger;
    private final StatsLogger perLogStatsLogger;
    private final AlertStatsLogger alertStatsLogger;
    private SessionLockFactory lockFactory;
    private final LogSegmentMetadataStore writerMetadataStore;
    private final LogSegmentMetadataStore readerMetadataStore;
    private final BookKeeperClientBuilder writerBKCBuilder;
    private final BookKeeperClient writerBKC;
    private final boolean ownWriterBKC;
    private final BookKeeperClientBuilder readerBKCBuilder;
    private final BookKeeperClient readerBKC;
    private final boolean ownReaderBKC;
    private final LedgerAllocator ledgerAllocator;
    private final PermitLimiter writeLimiter;
    private final PermitManager logSegmentRollingPermitManager;
    private OrderedScheduler lockStateExecutor;
    private BKLogReadHandler readHandlerForListener;
    private FuturePool readerFuturePool;
    private final PendingReaders pendingReaders;
    private final ReadAheadExceptionsLogger readAheadExceptionsLogger;

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:com/twitter/distributedlog/BKDistributedLogManager$PendingReaders.class */
    public static class PendingReaders implements AsyncCloseable {
        final ExecutorService executorService;
        final Set<AsyncLogReader> readers = new HashSet();

        PendingReaders(ExecutorService executorService) {
            this.executorService = executorService;
        }

        public synchronized void remove(AsyncLogReader asyncLogReader) {
            this.readers.remove(asyncLogReader);
        }

        public synchronized void add(AsyncLogReader asyncLogReader) {
            this.readers.add(asyncLogReader);
        }

        @Override // com.twitter.distributedlog.io.AsyncCloseable
        public Future<Void> asyncClose() {
            return Utils.closeSequence(this.executorService, true, (AsyncCloseable[]) this.readers.toArray(new AsyncLogReader[this.readers.size()])).onSuccess(new AbstractFunction1<Void, BoxedUnit>() { // from class: com.twitter.distributedlog.BKDistributedLogManager.PendingReaders.1
                public BoxedUnit apply(Void r3) {
                    PendingReaders.this.readers.clear();
                    return BoxedUnit.UNIT;
                }
            });
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public static void createLog(DistributedLogConfiguration distributedLogConfiguration, ZooKeeperClient zooKeeperClient, URI uri, String str) throws IOException, InterruptedException {
        FutureUtils.result(ZKLogMetadataForWriter.of(uri, str, distributedLogConfiguration.getUnpartitionedStreamName(), zooKeeperClient.get(), zooKeeperClient.getDefaultACL(), true, true));
    }

    BKDistributedLogManager(String str, DistributedLogConfiguration distributedLogConfiguration, URI uri, ZooKeeperClientBuilder zooKeeperClientBuilder, ZooKeeperClientBuilder zooKeeperClientBuilder2, ZooKeeperClient zooKeeperClient, ZooKeeperClient zooKeeperClient2, BookKeeperClientBuilder bookKeeperClientBuilder, BookKeeperClientBuilder bookKeeperClientBuilder2, FeatureProvider featureProvider, PermitLimiter permitLimiter, StatsLogger statsLogger) throws IOException {
        this(str, distributedLogConfiguration, ConfUtils.getConstDynConf(distributedLogConfiguration), uri, zooKeeperClientBuilder, zooKeeperClientBuilder2, zooKeeperClient, zooKeeperClient2, bookKeeperClientBuilder, bookKeeperClientBuilder2, null, null, null, OrderedScheduler.newBuilder().name("BKDL-" + str).corePoolSize(1).build(), null, null, null, null, new ReadAheadExceptionsLogger(statsLogger), DistributedLogConstants.UNKNOWN_CLIENT_ID, 0, null, permitLimiter, PermitManager.UNLIMITED_PERMIT_MANAGER, featureProvider, statsLogger, NullStatsLogger.INSTANCE);
        this.ownExecutor = true;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public BKDistributedLogManager(String str, DistributedLogConfiguration distributedLogConfiguration, DynamicDistributedLogConfiguration dynamicDistributedLogConfiguration, URI uri, ZooKeeperClientBuilder zooKeeperClientBuilder, ZooKeeperClientBuilder zooKeeperClientBuilder2, ZooKeeperClient zooKeeperClient, ZooKeeperClient zooKeeperClient2, BookKeeperClientBuilder bookKeeperClientBuilder, BookKeeperClientBuilder bookKeeperClientBuilder2, SessionLockFactory sessionLockFactory, LogSegmentMetadataStore logSegmentMetadataStore, LogSegmentMetadataStore logSegmentMetadataStore2, OrderedScheduler orderedScheduler, OrderedScheduler orderedScheduler2, OrderedScheduler orderedScheduler3, ClientSocketChannelFactory clientSocketChannelFactory, HashedWheelTimer hashedWheelTimer, ReadAheadExceptionsLogger readAheadExceptionsLogger, String str2, Integer num, LedgerAllocator ledgerAllocator, PermitLimiter permitLimiter, PermitManager permitManager, FeatureProvider featureProvider, StatsLogger statsLogger, StatsLogger statsLogger2) throws IOException {
        super(str, distributedLogConfiguration, uri, zooKeeperClientBuilder, zooKeeperClientBuilder2, statsLogger);
        this.lockFactory = null;
        this.lockStateExecutor = null;
        this.readHandlerForListener = null;
        this.readerFuturePool = null;
        Preconditions.checkNotNull(readAheadExceptionsLogger, "No ReadAhead Stats Logger Provided.");
        this.conf = distributedLogConfiguration;
        this.dynConf = dynamicDistributedLogConfiguration;
        this.scheduler = orderedScheduler;
        this.lockFactory = sessionLockFactory;
        this.lockStateExecutor = orderedScheduler3;
        this.readAheadScheduler = null == orderedScheduler2 ? orderedScheduler : orderedScheduler2;
        this.statsLogger = statsLogger;
        this.perLogStatsLogger = BroadCastStatsLogger.masterslave(statsLogger2, statsLogger);
        this.ownExecutor = false;
        this.pendingReaders = new PendingReaders(orderedScheduler);
        this.regionId = num.intValue();
        this.clientId = str2;
        this.streamIdentifier = distributedLogConfiguration.getUnpartitionedStreamName();
        this.ledgerAllocator = ledgerAllocator;
        this.writeLimiter = permitLimiter;
        this.logSegmentRollingPermitManager = permitManager;
        if (null == logSegmentMetadataStore) {
            this.writerMetadataStore = new ZKLogSegmentMetadataStore(distributedLogConfiguration, this.writerZKC, orderedScheduler);
        } else {
            this.writerMetadataStore = logSegmentMetadataStore;
        }
        if (null == logSegmentMetadataStore2) {
            this.readerMetadataStore = new ZKLogSegmentMetadataStore(distributedLogConfiguration, this.readerZKC, orderedScheduler);
        } else {
            this.readerMetadataStore = logSegmentMetadataStore2;
        }
        if (null == bookKeeperClientBuilder) {
            BKDLConfig resolveDLConfig = BKDLConfig.resolveDLConfig(this.writerZKC, uri);
            BKDLConfig.propagateConfiguration(resolveDLConfig, distributedLogConfiguration);
            this.writerBKCBuilder = BookKeeperClientBuilder.newBuilder().dlConfig(distributedLogConfiguration).name(String.format("bk:%s:dlm_writer_shared", str)).ledgersPath(resolveDLConfig.getBkLedgersPath()).channelFactory(clientSocketChannelFactory).requestTimer(hashedWheelTimer).statsLogger(statsLogger);
            if (null == zooKeeperClient) {
                this.writerBKCBuilder.zkServers(resolveDLConfig.getBkZkServersForWriter());
            } else {
                this.writerBKCBuilder.zkc(zooKeeperClient);
            }
            this.ownWriterBKC = true;
        } else {
            this.writerBKCBuilder = bookKeeperClientBuilder;
            this.ownWriterBKC = false;
        }
        this.writerBKC = this.writerBKCBuilder.build();
        if (null == bookKeeperClientBuilder2) {
            BKDLConfig resolveDLConfig2 = BKDLConfig.resolveDLConfig(this.writerZKC, uri);
            BKDLConfig.propagateConfiguration(resolveDLConfig2, distributedLogConfiguration);
            if (resolveDLConfig2.getBkZkServersForWriter().equals(resolveDLConfig2.getBkZkServersForReader())) {
                this.readerBKCBuilder = this.writerBKCBuilder;
                this.ownReaderBKC = false;
            } else {
                this.readerBKCBuilder = BookKeeperClientBuilder.newBuilder().dlConfig(distributedLogConfiguration).name(String.format("bk:%s:dlm_reader_shared", str)).ledgersPath(resolveDLConfig2.getBkLedgersPath()).channelFactory(clientSocketChannelFactory).requestTimer(hashedWheelTimer).statsLogger(statsLogger);
                if (null == zooKeeperClient2) {
                    this.readerBKCBuilder.zkServers(resolveDLConfig2.getBkZkServersForReader());
                } else {
                    this.readerBKCBuilder.zkc(zooKeeperClient2);
                }
                this.ownReaderBKC = true;
            }
        } else {
            this.readerBKCBuilder = bookKeeperClientBuilder2;
            this.ownReaderBKC = false;
        }
        this.readerBKC = this.readerBKCBuilder.build();
        this.featureProvider = featureProvider;
        this.alertStatsLogger = new AlertStatsLogger(this.perLogStatsLogger, "dl_alert");
        this.readAheadExceptionsLogger = readAheadExceptionsLogger;
    }

    /* JADX INFO: Access modifiers changed from: private */
    public synchronized OrderedScheduler getLockStateExecutor(boolean z) {
        if (z && null == this.lockStateExecutor && this.ownExecutor) {
            this.lockStateExecutor = OrderedScheduler.newBuilder().corePoolSize(1).name("BKDL-LockState").build();
        }
        return this.lockStateExecutor;
    }

    private synchronized SessionLockFactory getLockFactory(boolean z) {
        if (z && null == this.lockFactory) {
            this.lockFactory = new ZKSessionLockFactory(this.writerZKC, this.clientId, getLockStateExecutor(z), this.conf.getZKNumRetries(), this.conf.getLockTimeoutMilliSeconds(), this.conf.getZKRetryBackoffStartMillis(), this.statsLogger);
        }
        return this.lockFactory;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public DistributedLogConfiguration getConf() {
        return this.conf;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public OrderedScheduler getScheduler() {
        return this.scheduler;
    }

    @VisibleForTesting
    BookKeeperClient getWriterBKC() {
        return this.writerBKC;
    }

    @VisibleForTesting
    BookKeeperClient getReaderBKC() {
        return this.readerBKC;
    }

    @VisibleForTesting
    FuturePool getReaderFuturePool() {
        return this.readerFuturePool;
    }

    @VisibleForTesting
    FeatureProvider getFeatureProvider() {
        return this.featureProvider;
    }

    /* JADX INFO: Access modifiers changed from: private */
    public synchronized BKLogReadHandler getReadHandlerForListener(boolean z) {
        if (null == this.readHandlerForListener && z) {
            this.readHandlerForListener = createReadHandler();
            this.readHandlerForListener.scheduleGetLedgersTask(true, true);
        }
        return this.readHandlerForListener;
    }

    @Override // com.twitter.distributedlog.DistributedLogManager
    public List<LogSegmentMetadata> getLogSegments() throws IOException {
        return (List) FutureUtils.result(getLogSegmentsAsync());
    }

    protected Future<List<LogSegmentMetadata>> getLogSegmentsAsync() {
        final BKLogReadHandler createReadHandler = createReadHandler();
        return createReadHandler.asyncGetFullLedgerList(true, false).ensure(new AbstractFunction0<BoxedUnit>() { // from class: com.twitter.distributedlog.BKDistributedLogManager.3
            /* renamed from: apply, reason: merged with bridge method [inline-methods] */
            public BoxedUnit m10apply() {
                createReadHandler.asyncClose();
                return BoxedUnit.UNIT;
            }
        });
    }

    @Override // com.twitter.distributedlog.DistributedLogManager
    public void registerListener(LogSegmentListener logSegmentListener) throws IOException {
        getReadHandlerForListener(true).registerListener(logSegmentListener);
    }

    @Override // com.twitter.distributedlog.DistributedLogManager
    public synchronized void unregisterListener(LogSegmentListener logSegmentListener) {
        if (null != this.readHandlerForListener) {
            this.readHandlerForListener.unregisterListener(logSegmentListener);
        }
    }

    @Override // com.twitter.distributedlog.ZKMetadataAccessor
    public void checkClosedOrInError(String str) throws AlreadyClosedException {
        if (null != this.closePromise) {
            throw new AlreadyClosedException("Executing " + str + " on already closed DistributedLogManager");
        }
        if (null != this.writerBKC) {
            this.writerBKC.checkClosedOrInError();
        }
        if (null != this.readerBKC) {
            this.readerBKC.checkClosedOrInError();
        }
    }

    synchronized BKLogReadHandler createReadHandler() {
        return createReadHandler(Optional.absent(), false);
    }

    synchronized BKLogReadHandler createReadHandler(Optional<String> optional) {
        return createReadHandler(optional, false);
    }

    synchronized BKLogReadHandler createReadHandler(Optional<String> optional, boolean z) {
        return createReadHandler(optional, getLockStateExecutor(true), null, true, z);
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public synchronized BKLogReadHandler createReadHandler(Optional<String> optional, OrderedScheduler orderedScheduler, AsyncNotification asyncNotification, boolean z, boolean z2) {
        return new BKLogReadHandler(ZKLogMetadataForReader.of(this.uri, this.name, this.streamIdentifier), optional, this.conf, this.dynConf, this.readerZKCBuilder, this.readerBKCBuilder, this.readerMetadataStore, this.scheduler, orderedScheduler, this.readAheadScheduler, this.alertStatsLogger, this.readAheadExceptionsLogger, this.statsLogger, this.perLogStatsLogger, this.clientId, asyncNotification, z2, z);
    }

    LedgerAllocator createLedgerAllocator(ZKLogMetadataForWriter zKLogMetadataForWriter) throws IOException {
        LedgerAllocator ledgerAllocator;
        if (this.dynConf.getEnableLedgerAllocatorPool()) {
            ledgerAllocator = this.ledgerAllocator;
        } else {
            ledgerAllocator = new LedgerAllocatorDelegator(new SimpleLedgerAllocator(zKLogMetadataForWriter.getAllocationPath(), zKLogMetadataForWriter.getAllocationData(), new DynamicQuorumConfigProvider(this.dynConf), this.writerZKC, this.writerBKC), true);
        }
        return ledgerAllocator;
    }

    public BKLogWriteHandler createWriteHandler(boolean z) throws IOException {
        return (BKLogWriteHandler) FutureUtils.result(asyncCreateWriteHandler(z));
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public Future<BKLogWriteHandler> asyncCreateWriteHandler(final boolean z) {
        try {
            ZooKeeper zooKeeper = this.writerZKC.get();
            boolean z2 = null == this.ledgerAllocator;
            return ZKLogMetadataForWriter.of(this.uri, this.name, this.streamIdentifier, zooKeeper, this.writerZKC.getDefaultACL(), z2, this.conf.getCreateStreamIfNotExists() || z2).flatMap(new AbstractFunction1<ZKLogMetadataForWriter, Future<BKLogWriteHandler>>() { // from class: com.twitter.distributedlog.BKDistributedLogManager.4
                public Future<BKLogWriteHandler> apply(ZKLogMetadataForWriter zKLogMetadataForWriter) {
                    Promise promise = new Promise();
                    BKDistributedLogManager.this.createWriteHandler(zKLogMetadataForWriter, z, promise);
                    return promise;
                }
            });
        } catch (ZooKeeperClient.ZooKeeperConnectionException e) {
            return Future.exception(FutureUtils.zkException(e, this.uri.getPath()));
        } catch (InterruptedException e2) {
            LOG.error("Failed to initialize zookeeper client : ", e2);
            return Future.exception(new DLInterruptedException("Failed to initialize zookeeper client", e2));
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    public void createWriteHandler(ZKLogMetadataForWriter zKLogMetadataForWriter, boolean z, final Promise<BKLogWriteHandler> promise) {
        DistributedLock distributedLock = new DistributedLock(getLockStateExecutor(true), getLockFactory(true), zKLogMetadataForWriter.getLockPath(), this.conf.getLockTimeoutMilliSeconds(), this.statsLogger);
        try {
            final BKLogWriteHandler bKLogWriteHandler = new BKLogWriteHandler(zKLogMetadataForWriter, this.conf, this.writerZKCBuilder, this.writerBKCBuilder, this.writerMetadataStore, this.scheduler, createLedgerAllocator(zKLogMetadataForWriter), this.statsLogger, this.perLogStatsLogger, this.alertStatsLogger, this.clientId, this.regionId, this.writeLimiter, this.featureProvider, this.dynConf, distributedLock);
            PermitManager logSegmentRollingPermitManager = getLogSegmentRollingPermitManager();
            if (logSegmentRollingPermitManager instanceof Watcher) {
                bKLogWriteHandler.register((Watcher) logSegmentRollingPermitManager);
            }
            if (z) {
                bKLogWriteHandler.lockHandler().addEventListener(new FutureEventListener<DistributedLock>() { // from class: com.twitter.distributedlog.BKDistributedLogManager.5
                    public void onSuccess(DistributedLock distributedLock2) {
                        FutureUtils.setValue(promise, bKLogWriteHandler);
                    }

                    public void onFailure(final Throwable th) {
                        bKLogWriteHandler.asyncClose().ensure(new AbstractFunction0<BoxedUnit>() { // from class: com.twitter.distributedlog.BKDistributedLogManager.5.1
                            /* renamed from: apply, reason: merged with bridge method [inline-methods] */
                            public BoxedUnit m11apply() {
                                FutureUtils.setException(promise, th);
                                return BoxedUnit.UNIT;
                            }
                        });
                    }
                });
            } else {
                FutureUtils.setValue(promise, bKLogWriteHandler);
            }
        } catch (IOException e) {
            FutureUtils.setException(promise, e);
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public PermitManager getLogSegmentRollingPermitManager() {
        return this.logSegmentRollingPermitManager;
    }

    <T> Future<T> processReaderOperation(final Function<BKLogReadHandler, Future<T>> function) {
        initializeFuturePool(false);
        return this.readerFuturePool.apply(new ExceptionalFunction0<BKLogReadHandler>() { // from class: com.twitter.distributedlog.BKDistributedLogManager.7
            /* renamed from: applyE, reason: merged with bridge method [inline-methods] */
            public BKLogReadHandler m12applyE() throws Throwable {
                return BKDistributedLogManager.this.getReadHandlerForListener(true);
            }
        }).flatMap(new ExceptionalFunction<BKLogReadHandler, Future<T>>() { // from class: com.twitter.distributedlog.BKDistributedLogManager.6
            public Future<T> applyE(BKLogReadHandler bKLogReadHandler) throws Throwable {
                return (Future) function.apply(bKLogReadHandler);
            }
        });
    }

    @Override // com.twitter.distributedlog.DistributedLogManager
    public boolean isEndOfStreamMarked() throws IOException {
        checkClosedOrInError("isEndOfStreamMarked");
        return ((LogRecordWithDLSN) FutureUtils.result(getLastLogRecordAsyncInternal(false, true))).getTransactionId() == DistributedLogConstants.MAX_TXID;
    }

    @Override // com.twitter.distributedlog.DistributedLogManager
    public AppendOnlyStreamWriter getAppendOnlyStreamWriter() throws IOException {
        long j;
        try {
            j = ((LogRecordWithDLSN) FutureUtils.result(getLastLogRecordAsyncInternal(true, false))).getTransactionId();
            if (-999 == j || -99 == j) {
                j = 0;
            }
        } catch (LogEmptyException e) {
            j = 0;
        } catch (LogNotFoundException e2) {
            j = 0;
        }
        return new AppendOnlyStreamWriter(startAsyncLogSegmentNonPartitioned(), j);
    }

    @Override // com.twitter.distributedlog.DistributedLogManager
    public AppendOnlyStreamReader getAppendOnlyStreamReader() throws IOException {
        return new AppendOnlyStreamReader(this);
    }

    @Override // com.twitter.distributedlog.DistributedLogManager
    public BKSyncLogWriter startLogSegmentNonPartitioned() throws IOException {
        checkClosedOrInError("startLogSegmentNonPartitioned");
        BKSyncLogWriter bKSyncLogWriter = new BKSyncLogWriter(this.conf, this.dynConf, this);
        boolean z = false;
        try {
            bKSyncLogWriter.createAndCacheWriteHandler();
            FutureUtils.result(bKSyncLogWriter.getWriteHandler().lockHandler());
            z = true;
            if (1 == 0) {
                bKSyncLogWriter.abort();
            }
            return bKSyncLogWriter;
        } catch (Throwable th) {
            if (!z) {
                bKSyncLogWriter.abort();
            }
            throw th;
        }
    }

    @Override // com.twitter.distributedlog.DistributedLogManager
    public BKAsyncLogWriter startAsyncLogSegmentNonPartitioned() throws IOException {
        return (BKAsyncLogWriter) FutureUtils.result(openAsyncLogWriter());
    }

    @Override // com.twitter.distributedlog.DistributedLogManager
    public Future<AsyncLogWriter> openAsyncLogWriter() {
        Future<BKLogWriteHandler> asyncCreateWriteHandler;
        try {
            checkClosedOrInError("startLogSegmentNonPartitioned");
            synchronized (this) {
                asyncCreateWriteHandler = asyncCreateWriteHandler(true);
            }
            return asyncCreateWriteHandler.flatMap(new AbstractFunction1<BKLogWriteHandler, Future<AsyncLogWriter>>() { // from class: com.twitter.distributedlog.BKDistributedLogManager.8
                public Future<AsyncLogWriter> apply(BKLogWriteHandler bKLogWriteHandler) {
                    final BKAsyncLogWriter bKAsyncLogWriter;
                    synchronized (BKDistributedLogManager.this) {
                        bKAsyncLogWriter = new BKAsyncLogWriter(BKDistributedLogManager.this.conf, BKDistributedLogManager.this.dynConf, BKDistributedLogManager.this, bKLogWriteHandler, BKDistributedLogManager.this.featureProvider, BKDistributedLogManager.this.statsLogger);
                    }
                    return bKLogWriteHandler.recoverIncompleteLogSegments().map(new AbstractFunction1<Long, AsyncLogWriter>() { // from class: com.twitter.distributedlog.BKDistributedLogManager.8.2
                        public AsyncLogWriter apply(Long l) {
                            bKAsyncLogWriter.setLastTxId(l.longValue());
                            return bKAsyncLogWriter;
                        }
                    }).onFailure(new AbstractFunction1<Throwable, BoxedUnit>() { // from class: com.twitter.distributedlog.BKDistributedLogManager.8.1
                        public BoxedUnit apply(Throwable th) {
                            bKAsyncLogWriter.asyncAbort();
                            return BoxedUnit.UNIT;
                        }
                    });
                }
            });
        } catch (AlreadyClosedException e) {
            return Future.exception(e);
        }
    }

    @Override // com.twitter.distributedlog.DistributedLogManager
    public Future<DLSN> getDLSNNotLessThanTxId(final long j) {
        return getLogSegmentsAsync().flatMap(new AbstractFunction1<List<LogSegmentMetadata>, Future<DLSN>>() { // from class: com.twitter.distributedlog.BKDistributedLogManager.9
            public Future<DLSN> apply(List<LogSegmentMetadata> list) {
                return BKDistributedLogManager.this.getDLSNNotLessThanTxId(j, list);
            }
        });
    }

    /* JADX INFO: Access modifiers changed from: private */
    public Future<DLSN> getDLSNNotLessThanTxId(long j, List<LogSegmentMetadata> list) {
        if (list.isEmpty()) {
            return getLastDLSNAsync();
        }
        int findLogSegmentNotLessThanTxnId = DLUtils.findLogSegmentNotLessThanTxnId(list, j);
        if (findLogSegmentNotLessThanTxnId < 0) {
            return Future.value(new DLSN(list.get(0).getLogSegmentSequenceNumber(), 0L, 0L));
        }
        final LedgerHandleCache build = LedgerHandleCache.newBuilder().bkc(this.readerBKC).conf(this.conf).build();
        return getDLSNNotLessThanTxIdInSegment(j, findLogSegmentNotLessThanTxnId, list, build).ensure(new AbstractFunction0<BoxedUnit>() { // from class: com.twitter.distributedlog.BKDistributedLogManager.10
            /* renamed from: apply, reason: merged with bridge method [inline-methods] */
            public BoxedUnit m8apply() {
                build.clear();
                return BoxedUnit.UNIT;
            }
        });
    }

    /* JADX INFO: Access modifiers changed from: private */
    public Future<DLSN> getDLSNNotLessThanTxIdInSegment(final long j, final int i, final List<LogSegmentMetadata> list, final LedgerHandleCache ledgerHandleCache) {
        return ReadUtils.getLogRecordNotLessThanTxId(this.name, list.get(i), j, this.scheduler, ledgerHandleCache, Math.max(2, this.dynConf.getReadAheadBatchSize())).flatMap(new AbstractFunction1<Optional<LogRecordWithDLSN>, Future<DLSN>>() { // from class: com.twitter.distributedlog.BKDistributedLogManager.11
            public Future<DLSN> apply(Optional<LogRecordWithDLSN> optional) {
                return optional.isPresent() ? Future.value(((LogRecordWithDLSN) optional.get()).getDlsn()) : list.size() - 1 == i ? BKDistributedLogManager.this.getLastLogRecordAsync().map(new AbstractFunction1<LogRecordWithDLSN, DLSN>() { // from class: com.twitter.distributedlog.BKDistributedLogManager.11.1
                    public DLSN apply(LogRecordWithDLSN logRecordWithDLSN) {
                        return logRecordWithDLSN.getTransactionId() >= j ? logRecordWithDLSN.getDlsn() : logRecordWithDLSN.getDlsn().getNextDLSN();
                    }
                }) : BKDistributedLogManager.this.getDLSNNotLessThanTxIdInSegment(j, i + 1, list, ledgerHandleCache);
            }
        });
    }

    @Override // com.twitter.distributedlog.DistributedLogManager
    public LogReader getInputStream(long j) throws IOException {
        return getInputStreamInternal(j);
    }

    @Override // com.twitter.distributedlog.DistributedLogManager
    public LogReader getInputStream(DLSN dlsn) throws IOException {
        return getInputStreamInternal(dlsn, Optional.absent());
    }

    @Override // com.twitter.distributedlog.DistributedLogManager
    public AsyncLogReader getAsyncLogReader(long j) throws IOException {
        return (AsyncLogReader) FutureUtils.result(openAsyncLogReader(j));
    }

    @Override // com.twitter.distributedlog.DistributedLogManager
    public Future<AsyncLogReader> openAsyncLogReader(long j) {
        final Promise promise = new Promise();
        getDLSNNotLessThanTxId(j).addEventListener(new FutureEventListener<DLSN>() { // from class: com.twitter.distributedlog.BKDistributedLogManager.12
            public void onSuccess(DLSN dlsn) {
                promise.setValue(dlsn);
            }

            public void onFailure(Throwable th) {
                if (th instanceof LogEmptyException) {
                    promise.setValue(DLSN.InitialDLSN);
                } else {
                    promise.setException(th);
                }
            }
        });
        return promise.flatMap(new AbstractFunction1<DLSN, Future<AsyncLogReader>>() { // from class: com.twitter.distributedlog.BKDistributedLogManager.13
            public Future<AsyncLogReader> apply(DLSN dlsn) {
                return BKDistributedLogManager.this.openAsyncLogReader(dlsn);
            }
        });
    }

    @Override // com.twitter.distributedlog.DistributedLogManager
    public AsyncLogReader getAsyncLogReader(DLSN dlsn) throws IOException {
        return (AsyncLogReader) FutureUtils.result(openAsyncLogReader(dlsn));
    }

    @Override // com.twitter.distributedlog.DistributedLogManager
    public Future<AsyncLogReader> openAsyncLogReader(DLSN dlsn) {
        return Future.value(new BKAsyncLogReaderDLSN(this, this.scheduler, getLockStateExecutor(true), dlsn, Optional.absent(), false, this.dynConf.getDeserializeRecordSetOnReads(), this.statsLogger));
    }

    @Override // com.twitter.distributedlog.DistributedLogManager
    public Future<AsyncLogReader> getAsyncLogReaderWithLock(DLSN dlsn) {
        return getAsyncLogReaderWithLock(Optional.of(dlsn), Optional.absent());
    }

    @Override // com.twitter.distributedlog.DistributedLogManager
    public Future<AsyncLogReader> getAsyncLogReaderWithLock(DLSN dlsn, String str) {
        return getAsyncLogReaderWithLock(Optional.of(dlsn), Optional.of(str));
    }

    @Override // com.twitter.distributedlog.DistributedLogManager
    public Future<AsyncLogReader> getAsyncLogReaderWithLock(String str) {
        return getAsyncLogReaderWithLock(Optional.absent(), Optional.of(str));
    }

    protected Future<AsyncLogReader> getAsyncLogReaderWithLock(final Optional<DLSN> optional, final Optional<String> optional2) {
        if (!optional.isPresent() && !optional2.isPresent()) {
            return Future.exception(new UnexpectedException("Neither from dlsn nor subscriber id is provided."));
        }
        final BKAsyncLogReaderDLSN bKAsyncLogReaderDLSN = new BKAsyncLogReaderDLSN(this, this.scheduler, getLockStateExecutor(true), optional.isPresent() ? (DLSN) optional.get() : DLSN.InitialDLSN, optional2, false, this.dynConf.getDeserializeRecordSetOnReads(), this.statsLogger);
        this.pendingReaders.add(bKAsyncLogReaderDLSN);
        final Future<Void> lockStream = bKAsyncLogReaderDLSN.lockStream();
        final Promise promise = new Promise(new Function<Throwable, BoxedUnit>() { // from class: com.twitter.distributedlog.BKDistributedLogManager.14
            public BoxedUnit apply(Throwable th) {
                lockStream.cancel();
                return BoxedUnit.UNIT;
            }
        });
        lockStream.flatMap(new Function<Void, Future<AsyncLogReader>>() { // from class: com.twitter.distributedlog.BKDistributedLogManager.16
            public Future<AsyncLogReader> apply(Void r6) {
                if (optional.isPresent()) {
                    return Future.value(bKAsyncLogReaderDLSN);
                }
                BKDistributedLogManager.LOG.info("Reader {} @ {} reading last commit position from subscription store after acquired lock.", optional2.get(), BKDistributedLogManager.this.name);
                return BKDistributedLogManager.this.getSubscriptionStateStore((String) optional2.get()).getLastCommitPosition().map(new ExceptionalFunction<DLSN, AsyncLogReader>() { // from class: com.twitter.distributedlog.BKDistributedLogManager.16.1
                    public AsyncLogReader applyE(DLSN dlsn) throws UnexpectedException {
                        BKDistributedLogManager.LOG.info("Reader {} @ {} positioned to last commit position {}.", new Object[]{optional2.get(), BKDistributedLogManager.this.name, dlsn});
                        bKAsyncLogReaderDLSN.setStartDLSN(dlsn);
                        return bKAsyncLogReaderDLSN;
                    }
                });
            }
        }).addEventListener(new FutureEventListener<AsyncLogReader>() { // from class: com.twitter.distributedlog.BKDistributedLogManager.15
            public void onSuccess(AsyncLogReader asyncLogReader) {
                BKDistributedLogManager.this.pendingReaders.remove(bKAsyncLogReaderDLSN);
                FutureUtils.setValue(promise, asyncLogReader);
            }

            public void onFailure(final Throwable th) {
                BKDistributedLogManager.this.pendingReaders.remove(bKAsyncLogReaderDLSN);
                bKAsyncLogReaderDLSN.asyncClose().ensure(new AbstractFunction0<BoxedUnit>() { // from class: com.twitter.distributedlog.BKDistributedLogManager.15.1
                    /* renamed from: apply, reason: merged with bridge method [inline-methods] */
                    public BoxedUnit m9apply() {
                        FutureUtils.setException(promise, th);
                        return BoxedUnit.UNIT;
                    }
                });
            }
        });
        return promise;
    }

    LogReader getInputStreamInternal(long j) throws IOException {
        DLSN dlsn;
        try {
            dlsn = (DLSN) FutureUtils.result(getDLSNNotLessThanTxId(j));
        } catch (LogEmptyException e) {
            dlsn = DLSN.InitialDLSN;
        }
        return getInputStreamInternal(dlsn, Optional.of(Long.valueOf(j)));
    }

    LogReader getInputStreamInternal(DLSN dlsn, Optional<Long> optional) throws IOException {
        LOG.info("Create async reader starting from {}", dlsn);
        checkClosedOrInError("getInputStream");
        return new BKSyncLogReaderDLSN(this.conf, new BKAsyncLogReaderDLSN(this, this.scheduler, getLockStateExecutor(true), dlsn, Optional.absent(), true, this.dynConf.getDeserializeRecordSetOnReads(), this.statsLogger), this.scheduler, optional);
    }

    @Override // com.twitter.distributedlog.DistributedLogManager
    public LogRecordWithDLSN getLastLogRecord() throws IOException {
        checkClosedOrInError("getLastLogRecord");
        return (LogRecordWithDLSN) FutureUtils.result(getLastLogRecordAsync());
    }

    @Override // com.twitter.distributedlog.DistributedLogManager
    public long getFirstTxId() throws IOException {
        checkClosedOrInError("getFirstTxId");
        return ((LogRecordWithDLSN) FutureUtils.result(getFirstRecordAsyncInternal())).getTransactionId();
    }

    @Override // com.twitter.distributedlog.DistributedLogManager
    public long getLastTxId() throws IOException {
        checkClosedOrInError("getLastTxId");
        return ((Long) FutureUtils.result(getLastTxIdAsync())).longValue();
    }

    @Override // com.twitter.distributedlog.DistributedLogManager
    public DLSN getLastDLSN() throws IOException {
        checkClosedOrInError("getLastDLSN");
        return ((LogRecordWithDLSN) FutureUtils.result(getLastLogRecordAsyncInternal(false, false))).getDlsn();
    }

    @Override // com.twitter.distributedlog.DistributedLogManager
    public Future<LogRecordWithDLSN> getLastLogRecordAsync() {
        return getLastLogRecordAsyncInternal(false, false);
    }

    private Future<LogRecordWithDLSN> getLastLogRecordAsyncInternal(final boolean z, final boolean z2) {
        return processReaderOperation(new Function<BKLogReadHandler, Future<LogRecordWithDLSN>>() { // from class: com.twitter.distributedlog.BKDistributedLogManager.17
            public Future<LogRecordWithDLSN> apply(BKLogReadHandler bKLogReadHandler) {
                return bKLogReadHandler.getLastLogRecordAsync(z, z2);
            }
        });
    }

    @Override // com.twitter.distributedlog.DistributedLogManager
    public Future<Long> getLastTxIdAsync() {
        return getLastLogRecordAsyncInternal(false, false).map(RECORD_2_TXID_FUNCTION);
    }

    @Override // com.twitter.distributedlog.DistributedLogManager
    public Future<DLSN> getFirstDLSNAsync() {
        return getFirstRecordAsyncInternal().map(RECORD_2_DLSN_FUNCTION);
    }

    private Future<LogRecordWithDLSN> getFirstRecordAsyncInternal() {
        return processReaderOperation(new Function<BKLogReadHandler, Future<LogRecordWithDLSN>>() { // from class: com.twitter.distributedlog.BKDistributedLogManager.18
            public Future<LogRecordWithDLSN> apply(BKLogReadHandler bKLogReadHandler) {
                return bKLogReadHandler.asyncGetFirstLogRecord();
            }
        });
    }

    @Override // com.twitter.distributedlog.DistributedLogManager
    public Future<DLSN> getLastDLSNAsync() {
        return getLastLogRecordAsyncInternal(false, false).map(RECORD_2_DLSN_FUNCTION);
    }

    @Override // com.twitter.distributedlog.DistributedLogManager
    public long getLogRecordCount() throws IOException {
        checkClosedOrInError("getLogRecordCount");
        return ((Long) FutureUtils.result(getLogRecordCountAsync(DLSN.InitialDLSN))).longValue();
    }

    @Override // com.twitter.distributedlog.DistributedLogManager
    public Future<Long> getLogRecordCountAsync(final DLSN dlsn) {
        return processReaderOperation(new Function<BKLogReadHandler, Future<Long>>() { // from class: com.twitter.distributedlog.BKDistributedLogManager.19
            public Future<Long> apply(BKLogReadHandler bKLogReadHandler) {
                return bKLogReadHandler.asyncGetLogRecordCount(dlsn);
            }
        });
    }

    @Override // com.twitter.distributedlog.DistributedLogManager
    public void recover() throws IOException {
        recoverInternal(this.conf.getUnpartitionedStreamName());
    }

    private void recoverInternal(String str) throws IOException {
        checkClosedOrInError("recoverInternal");
        BKLogWriteHandler createWriteHandler = createWriteHandler(true);
        try {
            FutureUtils.result(createWriteHandler.recoverIncompleteLogSegments());
            Utils.closeQuietly(createWriteHandler);
        } catch (Throwable th) {
            Utils.closeQuietly(createWriteHandler);
            throw th;
        }
    }

    @Override // com.twitter.distributedlog.DistributedLogManager
    public void delete() throws IOException {
        BKLogWriteHandler createWriteHandler = createWriteHandler(true);
        try {
            createWriteHandler.deleteLog();
            Utils.closeQuietly(createWriteHandler);
            String zKPath = getZKPath();
            if (!zKPath.toLowerCase().contains(DistributedLogConstants.SCHEME_PREFIX)) {
                LOG.warn("Skip deletion of unrecognized ZK Path {}", zKPath);
                return;
            }
            try {
                LOG.info("Delete the path associated with the log {}, ZK Path {}", this.name, zKPath);
                ZKUtil.deleteRecursive(this.writerZKC.get(), zKPath);
            } catch (KeeperException e) {
                LOG.error("Error accessing entry in zookeeper", e);
                throw new IOException("Error initializing zk", e);
            } catch (InterruptedException e2) {
                LOG.error("Interrupted while accessing ZK", e2);
                throw new DLInterruptedException("Error initializing zk", e2);
            }
        } catch (Throwable th) {
            Utils.closeQuietly(createWriteHandler);
            throw th;
        }
    }

    @Override // com.twitter.distributedlog.DistributedLogManager
    public void purgeLogsOlderThan(long j) throws IOException {
        Preconditions.checkArgument(j > 0, "Invalid transaction id " + j);
        checkClosedOrInError("purgeLogSegmentsOlderThan");
        BKLogWriteHandler createWriteHandler = createWriteHandler(true);
        try {
            LOG.info("Purging logs for {} older than {}", createWriteHandler.getFullyQualifiedName(), Long.valueOf(j));
            FutureUtils.result(createWriteHandler.purgeLogSegmentsOlderThanTxnId(j));
            Utils.closeQuietly(createWriteHandler);
        } catch (Throwable th) {
            Utils.closeQuietly(createWriteHandler);
            throw th;
        }
    }

    @Override // com.twitter.distributedlog.ZKMetadataAccessor, com.twitter.distributedlog.io.AsyncCloseable
    public Future<Void> asyncClose() {
        synchronized (this) {
            if (null != this.closePromise) {
                return this.closePromise;
            }
            Promise<Void> promise = new Promise<>();
            this.closePromise = promise;
            Utils.closeSequence(null, true, this.readHandlerForListener, this.pendingReaders, new AsyncCloseable() { // from class: com.twitter.distributedlog.BKDistributedLogManager.20
                @Override // com.twitter.distributedlog.io.AsyncCloseable
                public Future<Void> asyncClose() {
                    int schedulerShutdownTimeoutMs = BKDistributedLogManager.this.conf.getSchedulerShutdownTimeoutMs();
                    if (BKDistributedLogManager.this.ownExecutor) {
                        SchedulerUtils.shutdownScheduler(BKDistributedLogManager.this.scheduler, schedulerShutdownTimeoutMs, TimeUnit.MILLISECONDS);
                        BKDistributedLogManager.LOG.info("Stopped BKDL executor service for {}.", BKDistributedLogManager.this.name);
                        if (BKDistributedLogManager.this.scheduler != BKDistributedLogManager.this.readAheadScheduler) {
                            SchedulerUtils.shutdownScheduler(BKDistributedLogManager.this.readAheadScheduler, schedulerShutdownTimeoutMs, TimeUnit.MILLISECONDS);
                            BKDistributedLogManager.LOG.info("Stopped BKDL ReadAhead Executor Service for {}.", BKDistributedLogManager.this.name);
                        }
                        SchedulerUtils.shutdownScheduler(BKDistributedLogManager.this.getLockStateExecutor(false), schedulerShutdownTimeoutMs, TimeUnit.MILLISECONDS);
                        BKDistributedLogManager.LOG.info("Stopped BKDL Lock State Executor for {}.", BKDistributedLogManager.this.name);
                    }
                    if (BKDistributedLogManager.this.ownWriterBKC) {
                        BKDistributedLogManager.this.writerBKC.close();
                    }
                    if (BKDistributedLogManager.this.ownReaderBKC) {
                        BKDistributedLogManager.this.readerBKC.close();
                    }
                    return Future.Void();
                }
            }, new AsyncCloseable() { // from class: com.twitter.distributedlog.BKDistributedLogManager.21
                @Override // com.twitter.distributedlog.io.AsyncCloseable
                public Future<Void> asyncClose() {
                    return BKDistributedLogManager.super.asyncClose();
                }
            }).proxyTo(promise);
            return promise;
        }
    }

    @Override // com.twitter.distributedlog.ZKMetadataAccessor, com.twitter.distributedlog.MetadataAccessor, java.io.Closeable, java.lang.AutoCloseable
    public void close() throws IOException {
        FutureUtils.result(asyncClose());
    }

    public boolean scheduleTask(Runnable runnable) {
        try {
            this.scheduler.submit(runnable);
            return true;
        } catch (RejectedExecutionException e) {
            LOG.error("Task {} is rejected : ", runnable, e);
            return false;
        }
    }

    private FuturePool buildFuturePool(ExecutorService executorService, StatsLogger statsLogger) {
        return new MonitoredFuturePool(new ExecutorServiceFuturePool(executorService), statsLogger, this.conf.getEnableTaskExecutionStats(), this.conf.getTaskExecutionWarnTimeMicros());
    }

    private void initializeFuturePool(boolean z) {
        if (null == this.readerFuturePool) {
            this.readerFuturePool = buildFuturePool(this.scheduler, this.statsLogger.scope("reader_future_pool"));
        }
    }

    public String toString() {
        return String.format("DLM:%s:%s", getZKPath(), getStreamName());
    }

    public void raiseAlert(String str, Object... objArr) {
        this.alertStatsLogger.raise(str, objArr);
    }

    @Override // com.twitter.distributedlog.DistributedLogManager
    @Deprecated
    public SubscriptionStateStore getSubscriptionStateStore(String str) {
        return getSubscriptionStateStoreInternal(this.conf.getUnpartitionedStreamName(), str);
    }

    private SubscriptionStateStore getSubscriptionStateStoreInternal(String str, String str2) {
        return new ZKSubscriptionStateStore(this.writerZKC, ZKLogMetadataForReader.getSubscriberPath(this.uri, this.name, str, str2));
    }

    @Override // com.twitter.distributedlog.DistributedLogManager
    public SubscriptionsStore getSubscriptionsStore() {
        return getSubscriptionsStoreInternal(this.conf.getUnpartitionedStreamName());
    }

    private SubscriptionsStore getSubscriptionsStoreInternal(String str) {
        return new ZKSubscriptionsStore(this.writerZKC, ZKLogMetadataForReader.getSubscribersPath(this.uri, this.name, str));
    }
}
