package com.twitter.distributedlog.service;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Optional;
import com.google.common.base.Stopwatch;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import com.twitter.distributedlog.DLSN;
import com.twitter.distributedlog.DistributedLogConfiguration;
import com.twitter.distributedlog.acl.AccessControlManager;
import com.twitter.distributedlog.config.DynamicDistributedLogConfiguration;
import com.twitter.distributedlog.exceptions.RegionUnavailableException;
import com.twitter.distributedlog.exceptions.ServiceUnavailableException;
import com.twitter.distributedlog.exceptions.StreamUnavailableException;
import com.twitter.distributedlog.exceptions.TooManyStreamsException;
import com.twitter.distributedlog.feature.AbstractFeatureProvider;
import com.twitter.distributedlog.namespace.DistributedLogNamespace;
import com.twitter.distributedlog.namespace.DistributedLogNamespaceBuilder;
import com.twitter.distributedlog.rate.MovingAverageRate;
import com.twitter.distributedlog.rate.MovingAverageRateFactory;
import com.twitter.distributedlog.service.config.ServerConfiguration;
import com.twitter.distributedlog.service.config.StreamConfigProvider;
import com.twitter.distributedlog.service.stream.BulkWriteOp;
import com.twitter.distributedlog.service.stream.CreateOp;
import com.twitter.distributedlog.service.stream.DeleteOp;
import com.twitter.distributedlog.service.stream.HeartbeatOp;
import com.twitter.distributedlog.service.stream.ReleaseOp;
import com.twitter.distributedlog.service.stream.Stream;
import com.twitter.distributedlog.service.stream.StreamFactory;
import com.twitter.distributedlog.service.stream.StreamFactoryImpl;
import com.twitter.distributedlog.service.stream.StreamManager;
import com.twitter.distributedlog.service.stream.StreamManagerImpl;
import com.twitter.distributedlog.service.stream.StreamOp;
import com.twitter.distributedlog.service.stream.StreamOpStats;
import com.twitter.distributedlog.service.stream.TruncateOp;
import com.twitter.distributedlog.service.stream.WriteOp;
import com.twitter.distributedlog.service.stream.WriteOpWithPayload;
import com.twitter.distributedlog.service.stream.limiter.ServiceRequestLimiter;
import com.twitter.distributedlog.service.streamset.StreamPartitionConverter;
import com.twitter.distributedlog.thrift.service.BulkWriteResponse;
import com.twitter.distributedlog.thrift.service.ClientInfo;
import com.twitter.distributedlog.thrift.service.DistributedLogService;
import com.twitter.distributedlog.thrift.service.HeartbeatOptions;
import com.twitter.distributedlog.thrift.service.ResponseHeader;
import com.twitter.distributedlog.thrift.service.ServerInfo;
import com.twitter.distributedlog.thrift.service.ServerStatus;
import com.twitter.distributedlog.thrift.service.StatusCode;
import com.twitter.distributedlog.thrift.service.WriteContext;
import com.twitter.distributedlog.thrift.service.WriteResponse;
import com.twitter.distributedlog.util.ConfUtils;
import com.twitter.distributedlog.util.OrderedScheduler;
import com.twitter.distributedlog.util.SchedulerUtils;
import com.twitter.util.Await;
import com.twitter.util.Duration;
import com.twitter.util.Function0;
import com.twitter.util.Future;
import com.twitter.util.FutureEventListener;
import com.twitter.util.ScheduledThreadPoolTimer;
import com.twitter.util.Timer;
import java.io.IOException;
import java.net.URI;
import java.nio.ByteBuffer;
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import org.apache.bookkeeper.feature.Feature;
import org.apache.bookkeeper.feature.FeatureProvider;
import org.apache.bookkeeper.stats.Counter;
import org.apache.bookkeeper.stats.Gauge;
import org.apache.bookkeeper.stats.StatsLogger;
import org.jboss.netty.util.HashedWheelTimer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import scala.runtime.BoxedUnit;

/* loaded from: input_file:com/twitter/distributedlog/service/DistributedLogServiceImpl.class */
public class DistributedLogServiceImpl implements DistributedLogService.ServiceIface, FatalErrorHandler {
    static final Logger logger = LoggerFactory.getLogger(DistributedLogServiceImpl.class);
    private final ServerConfiguration serverConfig;
    private final DistributedLogConfiguration dlConfig;
    private final DistributedLogNamespace dlNamespace;
    private final int serverRegionId;
    private final CountDownLatch keepAliveLatch;
    private final byte dlsnVersion;
    private final String clientId;
    private final OrderedScheduler scheduler;
    private final AccessControlManager accessControlManager;
    private final StreamConfigProvider streamConfigProvider;
    private final StreamManager streamManager;
    private final StreamFactory streamFactory;
    private final MovingAverageRateFactory movingAvgFactory;
    private final MovingAverageRate windowedRps;
    private final MovingAverageRate windowedBps;
    private final ServiceRequestLimiter limiter;
    private final Timer timer;
    private final HashedWheelTimer requestTimer;
    private final FeatureProvider featureProvider;
    private final Feature featureRegionStopAcceptNewStream;
    private final Feature featureChecksumDisabled;
    private final Feature limiterDisabledFeature;
    private final StatsLogger statsLogger;
    private final StatsLogger perStreamStatsLogger;
    private final StreamOpStats streamOpStats;
    private final Counter bulkWritePendingStat;
    private final Counter writePendingStat;
    private final Counter redirects;
    private final Counter receivedRecordCounter;
    private final StatsLogger statusCodeStatLogger;
    private final Counter statusCodeTotal;
    private final int MOVING_AVERAGE_WINDOW_SECS = 60;
    private ServerStatus serverStatus = ServerStatus.WRITE_AND_ACCEPT;
    private final ReentrantReadWriteLock closeLock = new ReentrantReadWriteLock();
    private final ConcurrentHashMap<StatusCode, Counter> statusCodeCounters = new ConcurrentHashMap<>();

    /* JADX INFO: Access modifiers changed from: package-private */
    public DistributedLogServiceImpl(ServerConfiguration serverConfiguration, DistributedLogConfiguration distributedLogConfiguration, DynamicDistributedLogConfiguration dynamicDistributedLogConfiguration, StreamConfigProvider streamConfigProvider, URI uri, StreamPartitionConverter streamPartitionConverter, StatsLogger statsLogger, StatsLogger statsLogger2, CountDownLatch countDownLatch) throws IOException {
        this.serverConfig = serverConfiguration;
        this.dlConfig = distributedLogConfiguration;
        this.perStreamStatsLogger = statsLogger2;
        this.dlsnVersion = serverConfiguration.getDlsnVersion();
        this.serverRegionId = serverConfiguration.getRegionId();
        int serverPort = serverConfiguration.getServerPort();
        int serverShardId = serverConfiguration.getServerShardId();
        int serverThreads = serverConfiguration.getServerThreads();
        this.clientId = DLSocketAddress.toLockId(DLSocketAddress.getSocketAddress(serverPort), serverShardId);
        String format = String.format("allocator_%04d_%010d", Integer.valueOf(this.serverRegionId), Integer.valueOf(serverShardId));
        distributedLogConfiguration.setLedgerAllocatorPoolName(format);
        this.featureProvider = AbstractFeatureProvider.getFeatureProvider("", distributedLogConfiguration, statsLogger.scope("features"));
        if (this.featureProvider instanceof AbstractFeatureProvider) {
            this.featureProvider.start();
        }
        this.dlNamespace = DistributedLogNamespaceBuilder.newBuilder().conf(distributedLogConfiguration).uri(uri).statsLogger(statsLogger).featureProvider(this.featureProvider).clientId(this.clientId).regionId(this.serverRegionId).build();
        this.accessControlManager = this.dlNamespace.createAccessControlManager();
        this.keepAliveLatch = countDownLatch;
        this.streamConfigProvider = streamConfigProvider;
        this.streamOpStats = new StreamOpStats(statsLogger, statsLogger2);
        this.scheduler = OrderedScheduler.newBuilder().corePoolSize(serverThreads).name("DistributedLogService-Executor").traceTaskExecution(true).statsLogger(statsLogger.scope("scheduler")).build();
        this.requestTimer = new HashedWheelTimer(new ThreadFactoryBuilder().setNameFormat("DLServiceTimer-%d").build(), distributedLogConfiguration.getTimeoutTimerTickDurationMs(), TimeUnit.MILLISECONDS, distributedLogConfiguration.getTimeoutTimerNumTicks());
        this.streamFactory = new StreamFactoryImpl(this.clientId, this.streamOpStats, serverConfiguration, distributedLogConfiguration, this.featureProvider, streamConfigProvider, streamPartitionConverter, this.dlNamespace, this.scheduler, this, this.requestTimer);
        this.streamManager = new StreamManagerImpl(this.clientId, distributedLogConfiguration, this.scheduler, this.streamFactory, streamPartitionConverter, streamConfigProvider, this.dlNamespace);
        this.featureRegionStopAcceptNewStream = this.featureProvider.getFeature(ServerFeatureKeys.REGION_STOP_ACCEPT_NEW_STREAM.name().toLowerCase());
        this.featureChecksumDisabled = this.featureProvider.getFeature(ServerFeatureKeys.SERVICE_CHECKSUM_DISABLED.name().toLowerCase());
        this.limiterDisabledFeature = this.featureProvider.getFeature(ServerFeatureKeys.SERVICE_GLOBAL_LIMITER_DISABLED.name().toLowerCase());
        this.timer = new ScheduledThreadPoolTimer(1, "timer", true);
        this.movingAvgFactory = new MovingAverageRateFactory(this.timer);
        this.windowedRps = this.movingAvgFactory.create(60);
        this.windowedBps = this.movingAvgFactory.create(60);
        this.limiter = new ServiceRequestLimiter(dynamicDistributedLogConfiguration, this.streamOpStats.baseScope("service_limiter"), this.windowedRps, this.windowedBps, this.streamManager, this.limiterDisabledFeature);
        this.statsLogger = statsLogger;
        statsLogger.registerGauge("proxy_status", new Gauge<Number>() { // from class: com.twitter.distributedlog.service.DistributedLogServiceImpl.1
            public Number getDefaultValue() {
                return 0;
            }

            public Number getSample() {
                return Integer.valueOf(ServerStatus.DOWN == DistributedLogServiceImpl.this.serverStatus ? -1 : DistributedLogServiceImpl.this.featureRegionStopAcceptNewStream.isAvailable() ? 3 : ServerStatus.WRITE_AND_ACCEPT == DistributedLogServiceImpl.this.serverStatus ? 1 : 2);
            }
        });
        statsLogger.registerGauge("moving_avg_rps", new Gauge<Number>() { // from class: com.twitter.distributedlog.service.DistributedLogServiceImpl.2
            public Number getDefaultValue() {
                return 0;
            }

            public Number getSample() {
                return Double.valueOf(DistributedLogServiceImpl.this.windowedRps.get());
            }
        });
        statsLogger.registerGauge("moving_avg_bps", new Gauge<Number>() { // from class: com.twitter.distributedlog.service.DistributedLogServiceImpl.3
            public Number getDefaultValue() {
                return 0;
            }

            public Number getSample() {
                return Double.valueOf(DistributedLogServiceImpl.this.windowedBps.get());
            }
        });
        this.bulkWritePendingStat = this.streamOpStats.requestPendingCounter("bulkWritePending");
        this.writePendingStat = this.streamOpStats.requestPendingCounter("writePending");
        this.redirects = this.streamOpStats.requestCounter("redirect");
        this.statusCodeStatLogger = this.streamOpStats.requestScope("statuscode");
        this.statusCodeTotal = this.streamOpStats.requestCounter("statuscode_count");
        this.receivedRecordCounter = this.streamOpStats.recordsCounter("received");
        StatsLogger scope = statsLogger.scope("streams");
        scope.registerGauge("acquired", new Gauge<Number>() { // from class: com.twitter.distributedlog.service.DistributedLogServiceImpl.4
            public Number getDefaultValue() {
                return 0;
            }

            public Number getSample() {
                return Integer.valueOf(DistributedLogServiceImpl.this.streamManager.numAcquired());
            }
        });
        scope.registerGauge("cached", new Gauge<Number>() { // from class: com.twitter.distributedlog.service.DistributedLogServiceImpl.5
            public Number getDefaultValue() {
                return 0;
            }

            public Number getSample() {
                return Integer.valueOf(DistributedLogServiceImpl.this.streamManager.numCached());
            }
        });
        logger.info("Running distributedlog server : client id {}, allocator pool {}, perstream stat {}, dlsn version {}.", new Object[]{this.clientId, format, Boolean.valueOf(serverConfiguration.isPerStreamStatEnabled()), Byte.valueOf(this.dlsnVersion)});
    }

    /* JADX INFO: Access modifiers changed from: private */
    public void countStatusCode(StatusCode statusCode) {
        Counter counter = this.statusCodeCounters.get(statusCode);
        if (null == counter) {
            counter = this.statusCodeStatLogger.getCounter(statusCode.name());
            Counter putIfAbsent = this.statusCodeCounters.putIfAbsent(statusCode, counter);
            if (null != putIfAbsent) {
                counter = putIfAbsent;
            }
        }
        counter.inc();
        this.statusCodeTotal.inc();
    }

    public Future<ServerInfo> handshake() {
        return handshakeWithClientInfo(new ClientInfo());
    }

    public Future<ServerInfo> handshakeWithClientInfo(ClientInfo clientInfo) {
        ServerInfo serverInfo = new ServerInfo();
        this.closeLock.readLock().lock();
        try {
            serverInfo.setServerStatus(this.serverStatus);
            this.closeLock.readLock().unlock();
            if (clientInfo.isSetGetOwnerships() && !clientInfo.isGetOwnerships()) {
                return Future.value(serverInfo);
            }
            Optional<String> absent = Optional.absent();
            if (clientInfo.isSetStreamNameRegex()) {
                absent = Optional.of(clientInfo.getStreamNameRegex());
            }
            serverInfo.setOwnerships(this.streamManager.getStreamOwnershipMap(absent));
            return Future.value(serverInfo);
        } catch (Throwable th) {
            this.closeLock.readLock().unlock();
            throw th;
        }
    }

    @VisibleForTesting
    Stream getLogWriter(String str) throws IOException {
        Stream stream = this.streamManager.getStream(str);
        if (null == stream) {
            this.closeLock.readLock().lock();
            try {
                if (this.featureRegionStopAcceptNewStream.isAvailable()) {
                    throw new RegionUnavailableException("Region is unavailable right now.");
                }
                if (ServerStatus.WRITE_AND_ACCEPT != this.serverStatus) {
                    return null;
                }
                stream = this.streamManager.getOrCreateStream(str);
                this.closeLock.readLock().unlock();
            } finally {
                this.closeLock.readLock().unlock();
            }
        }
        return stream;
    }

    public Future<WriteResponse> write(String str, ByteBuffer byteBuffer) {
        this.receivedRecordCounter.inc();
        return doWrite(str, byteBuffer, null, false);
    }

    public Future<BulkWriteResponse> writeBulkWithContext(String str, List<ByteBuffer> list, WriteContext writeContext) {
        this.bulkWritePendingStat.inc();
        this.receivedRecordCounter.add(list.size());
        BulkWriteOp bulkWriteOp = new BulkWriteOp(str, list, this.statsLogger, this.perStreamStatsLogger, getChecksum(writeContext), this.featureChecksumDisabled, this.accessControlManager);
        executeStreamOp(bulkWriteOp);
        return bulkWriteOp.result().ensure(new Function0<BoxedUnit>() { // from class: com.twitter.distributedlog.service.DistributedLogServiceImpl.6
            /* renamed from: apply, reason: merged with bridge method [inline-methods] */
            public BoxedUnit m4apply() {
                DistributedLogServiceImpl.this.bulkWritePendingStat.dec();
                return null;
            }
        });
    }

    public Future<WriteResponse> writeWithContext(String str, ByteBuffer byteBuffer, WriteContext writeContext) {
        return doWrite(str, byteBuffer, getChecksum(writeContext), writeContext.isIsRecordSet());
    }

    public Future<WriteResponse> heartbeat(String str, WriteContext writeContext) {
        HeartbeatOp heartbeatOp = new HeartbeatOp(str, this.statsLogger, this.perStreamStatsLogger, this.dlsnVersion, getChecksum(writeContext), this.featureChecksumDisabled, this.accessControlManager);
        executeStreamOp(heartbeatOp);
        return heartbeatOp.result();
    }

    public Future<WriteResponse> heartbeatWithOptions(String str, WriteContext writeContext, HeartbeatOptions heartbeatOptions) {
        HeartbeatOp heartbeatOp = new HeartbeatOp(str, this.statsLogger, this.perStreamStatsLogger, this.dlsnVersion, getChecksum(writeContext), this.featureChecksumDisabled, this.accessControlManager);
        if (heartbeatOptions.isSendHeartBeatToReader()) {
            heartbeatOp.setWriteControlRecord(true);
        }
        executeStreamOp(heartbeatOp);
        return heartbeatOp.result();
    }

    public Future<WriteResponse> truncate(String str, String str2, WriteContext writeContext) {
        TruncateOp truncateOp = new TruncateOp(str, DLSN.deserialize(str2), this.statsLogger, this.perStreamStatsLogger, getChecksum(writeContext), this.featureChecksumDisabled, this.accessControlManager);
        executeStreamOp(truncateOp);
        return truncateOp.result();
    }

    public Future<WriteResponse> delete(String str, WriteContext writeContext) {
        DeleteOp deleteOp = new DeleteOp(str, this.statsLogger, this.perStreamStatsLogger, this.streamManager, getChecksum(writeContext), this.featureChecksumDisabled, this.accessControlManager);
        executeStreamOp(deleteOp);
        return deleteOp.result();
    }

    public Future<WriteResponse> release(String str, WriteContext writeContext) {
        ReleaseOp releaseOp = new ReleaseOp(str, this.statsLogger, this.perStreamStatsLogger, this.streamManager, getChecksum(writeContext), this.featureChecksumDisabled, this.accessControlManager);
        executeStreamOp(releaseOp);
        return releaseOp.result();
    }

    public Future<WriteResponse> create(String str, WriteContext writeContext) {
        CreateOp createOp = new CreateOp(str, this.statsLogger, this.streamManager, getChecksum(writeContext), this.featureChecksumDisabled);
        executeStreamOp(createOp);
        return createOp.result();
    }

    public Future<Void> setAcceptNewStream(boolean z) {
        this.closeLock.writeLock().lock();
        try {
            logger.info("Set AcceptNewStream = {}", Boolean.valueOf(z));
            if (ServerStatus.DOWN != this.serverStatus) {
                if (z) {
                    this.serverStatus = ServerStatus.WRITE_AND_ACCEPT;
                } else {
                    this.serverStatus = ServerStatus.WRITE_ONLY;
                }
            }
            return Future.Void();
        } finally {
            this.closeLock.writeLock().unlock();
        }
    }

    private Future<WriteResponse> doWrite(String str, ByteBuffer byteBuffer, Long l, boolean z) {
        this.writePendingStat.inc();
        this.receivedRecordCounter.inc();
        WriteOp newWriteOp = newWriteOp(str, byteBuffer, l, z);
        executeStreamOp(newWriteOp);
        return newWriteOp.result().ensure(new Function0<BoxedUnit>() { // from class: com.twitter.distributedlog.service.DistributedLogServiceImpl.7
            /* renamed from: apply, reason: merged with bridge method [inline-methods] */
            public BoxedUnit m5apply() {
                DistributedLogServiceImpl.this.writePendingStat.dec();
                return null;
            }
        });
    }

    private Long getChecksum(WriteContext writeContext) {
        if (writeContext.isSetCrc32()) {
            return Long.valueOf(writeContext.getCrc32());
        }
        return null;
    }

    private void executeStreamOp(StreamOp streamOp) {
        streamOp.responseHeader().addEventListener(new FutureEventListener<ResponseHeader>() { // from class: com.twitter.distributedlog.service.DistributedLogServiceImpl.8
            public void onSuccess(ResponseHeader responseHeader) {
                if (responseHeader.getLocation() != null || responseHeader.getCode() == StatusCode.FOUND) {
                    DistributedLogServiceImpl.this.redirects.inc();
                }
                DistributedLogServiceImpl.this.countStatusCode(responseHeader.getCode());
            }

            public void onFailure(Throwable th) {
            }
        });
        try {
            this.limiter.apply(streamOp);
            streamOp.preExecute();
            try {
                Stream logWriter = getLogWriter(streamOp.streamName());
                if (null == logWriter) {
                    streamOp.fail(new ServiceUnavailableException("Server " + this.clientId + " is closed."));
                    return;
                }
                if (streamOp instanceof WriteOpWithPayload) {
                    this.windowedBps.add(((WriteOpWithPayload) streamOp).getPayloadSize());
                    this.windowedRps.inc();
                }
                logWriter.submit(streamOp);
            } catch (IOException e) {
                streamOp.fail(e);
            } catch (RegionUnavailableException e2) {
                streamOp.fail(new RegionUnavailableException("Region " + this.serverRegionId + " is unavailable."));
            }
        } catch (Exception e3) {
            streamOp.fail(e3);
        } catch (TooManyStreamsException e4) {
            streamOp.fail(new StreamUnavailableException(e4.getMessage()));
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public void shutdown() {
        try {
            try {
                this.closeLock.writeLock().lock();
                try {
                    if (ServerStatus.DOWN == this.serverStatus) {
                        this.keepAliveLatch.countDown();
                        logger.info("Finished shutting down distributedlog service.");
                        return;
                    }
                    this.serverStatus = ServerStatus.DOWN;
                    this.closeLock.writeLock().unlock();
                    this.streamManager.close();
                    this.movingAvgFactory.close();
                    this.limiter.close();
                    Stopwatch createStarted = Stopwatch.createStarted();
                    Future<List<Void>> closeStreams = this.streamManager.closeStreams();
                    logger.info("Waiting for closing all streams ...");
                    try {
                        Await.result(closeStreams, Duration.fromTimeUnit(5L, TimeUnit.MINUTES));
                        logger.info("Closed all streams in {} millis.", Long.valueOf(createStarted.elapsed(TimeUnit.MILLISECONDS)));
                    } catch (InterruptedException e) {
                        logger.warn("Interrupted on waiting for closing all streams : ", e);
                        Thread.currentThread().interrupt();
                    } catch (Exception e2) {
                        logger.warn("Sorry, we didn't close all streams gracefully in 5 minutes : ", e2);
                    }
                    logger.info("Closing distributedlog namespace ...");
                    this.dlNamespace.close();
                    logger.info("Closed distributedlog namespace .");
                    if (this.featureProvider instanceof AbstractFeatureProvider) {
                        this.featureProvider.stop();
                    }
                    this.timer.stop();
                    SchedulerUtils.shutdownScheduler(this.scheduler, 60L, TimeUnit.SECONDS);
                    this.keepAliveLatch.countDown();
                    logger.info("Finished shutting down distributedlog service.");
                } finally {
                    this.closeLock.writeLock().unlock();
                }
            } catch (Exception e3) {
                logger.info("Exception while shutting down distributedlog service.");
                this.keepAliveLatch.countDown();
                logger.info("Finished shutting down distributedlog service.");
            }
        } catch (Throwable th) {
            this.keepAliveLatch.countDown();
            logger.info("Finished shutting down distributedlog service.");
            throw th;
        }
    }

    @Override // com.twitter.distributedlog.service.FatalErrorHandler
    public void notifyFatalError() {
        triggerShutdown();
    }

    private void triggerShutdown() {
        logger.info("Releasing KeepAlive Latch to trigger shutdown ...");
        this.keepAliveLatch.countDown();
        logger.info("Released KeepAlive Latch. Main thread will shut the service down.");
    }

    @VisibleForTesting
    java.util.concurrent.Future<?> schedule(Runnable runnable, long j) {
        this.closeLock.readLock().lock();
        try {
            try {
                if (this.serverStatus != ServerStatus.WRITE_AND_ACCEPT) {
                    this.closeLock.readLock().unlock();
                    return null;
                }
                if (j > 0) {
                    ScheduledFuture schedule = this.scheduler.schedule(runnable, j, TimeUnit.MILLISECONDS);
                    this.closeLock.readLock().unlock();
                    return schedule;
                }
                java.util.concurrent.Future<?> submit = this.scheduler.submit(runnable);
                this.closeLock.readLock().unlock();
                return submit;
            } catch (RejectedExecutionException e) {
                logger.error("Failed to schedule task {} in {} ms : ", new Object[]{runnable, Long.valueOf(j), e});
                this.closeLock.readLock().unlock();
                return null;
            }
        } catch (Throwable th) {
            this.closeLock.readLock().unlock();
            throw th;
        }
    }

    private DynamicDistributedLogConfiguration getDynConf(String str) {
        Optional<DynamicDistributedLogConfiguration> dynamicStreamConfig = this.streamConfigProvider.getDynamicStreamConfig(str);
        return dynamicStreamConfig.isPresent() ? (DynamicDistributedLogConfiguration) dynamicStreamConfig.get() : ConfUtils.getConstDynConf(this.dlConfig);
    }

    @VisibleForTesting
    Stream newStream(String str) {
        return this.streamFactory.create(str, getDynConf(str), this.streamManager);
    }

    @VisibleForTesting
    WriteOp newWriteOp(String str, ByteBuffer byteBuffer, Long l) {
        return newWriteOp(str, byteBuffer, l, false);
    }

    WriteOp newWriteOp(String str, ByteBuffer byteBuffer, Long l, boolean z) {
        return new WriteOp(str, byteBuffer, this.statsLogger, this.perStreamStatsLogger, this.serverConfig, this.dlsnVersion, l, z, this.featureChecksumDisabled, this.accessControlManager);
    }

    @VisibleForTesting
    Future<List<Void>> closeStreams() {
        return this.streamManager.closeStreams();
    }

    @VisibleForTesting
    public DistributedLogNamespace getDistributedLogNamespace() {
        return this.dlNamespace;
    }

    @VisibleForTesting
    StreamManager getStreamManager() {
        return this.streamManager;
    }
}
