package com.twitter.distributedlog.service.stream;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Optional;
import com.google.common.base.Stopwatch;
import com.twitter.distributedlog.AsyncLogWriter;
import com.twitter.distributedlog.DistributedLogConfiguration;
import com.twitter.distributedlog.DistributedLogManager;
import com.twitter.distributedlog.config.DynamicDistributedLogConfiguration;
import com.twitter.distributedlog.exceptions.AlreadyClosedException;
import com.twitter.distributedlog.exceptions.DLException;
import com.twitter.distributedlog.exceptions.InvalidStreamNameException;
import com.twitter.distributedlog.exceptions.OverCapacityException;
import com.twitter.distributedlog.exceptions.OwnershipAcquireFailedException;
import com.twitter.distributedlog.exceptions.StreamNotReadyException;
import com.twitter.distributedlog.exceptions.StreamUnavailableException;
import com.twitter.distributedlog.exceptions.UnexpectedException;
import com.twitter.distributedlog.io.Abortables;
import com.twitter.distributedlog.io.AsyncAbortable;
import com.twitter.distributedlog.namespace.DistributedLogNamespace;
import com.twitter.distributedlog.service.FatalErrorHandler;
import com.twitter.distributedlog.service.ServerFeatureKeys;
import com.twitter.distributedlog.service.config.ServerConfiguration;
import com.twitter.distributedlog.service.config.StreamConfigProvider;
import com.twitter.distributedlog.service.stream.limiter.StreamRequestLimiter;
import com.twitter.distributedlog.service.streamset.Partition;
import com.twitter.distributedlog.stats.BroadCastStatsLogger;
import com.twitter.distributedlog.thrift.service.StatusCode;
import com.twitter.distributedlog.util.FutureUtils;
import com.twitter.distributedlog.util.OrderedScheduler;
import com.twitter.distributedlog.util.TimeSequencer;
import com.twitter.distributedlog.util.Utils;
import com.twitter.util.Function0;
import com.twitter.util.Future;
import com.twitter.util.FutureEventListener;
import com.twitter.util.Promise;
import java.io.IOException;
import java.util.ArrayDeque;
import java.util.Iterator;
import java.util.Queue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import org.apache.bookkeeper.feature.Feature;
import org.apache.bookkeeper.feature.FeatureProvider;
import org.apache.bookkeeper.stats.Counter;
import org.apache.bookkeeper.stats.Gauge;
import org.apache.bookkeeper.stats.OpStatsLogger;
import org.apache.bookkeeper.stats.StatsLogger;
import org.jboss.netty.util.HashedWheelTimer;
import org.jboss.netty.util.Timeout;
import org.jboss.netty.util.TimerTask;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import scala.runtime.AbstractFunction1;
import scala.runtime.BoxedUnit;

/*  JADX ERROR: NullPointerException in pass: ClassModifier
    java.lang.NullPointerException: Cannot invoke "java.util.List.forEach(java.util.function.Consumer)" because "blocks" is null
    	at jadx.core.utils.BlockUtils.collectAllInsns(BlockUtils.java:1017)
    	at jadx.core.dex.visitors.ClassModifier.removeBridgeMethod(ClassModifier.java:239)
    	at jadx.core.dex.visitors.ClassModifier.removeSyntheticMethods(ClassModifier.java:154)
    	at java.base/java.util.ArrayList.forEach(ArrayList.java:1596)
    	at jadx.core.dex.visitors.ClassModifier.visit(ClassModifier.java:64)
    */
/* loaded from: input_file:com/twitter/distributedlog/service/stream/StreamImpl.class */
public class StreamImpl implements Stream {
    static final Logger logger = LoggerFactory.getLogger(StreamImpl.class);

    // --- Identity -----------------------------------------------------------
    private final String name;
    private final Partition partition;

    // --- Log handles and ownership state -------------------------------------
    // manager is assigned in initialize(); writer/owner/lastException are read
    // without a lock, hence volatile.
    // NOTE(review): presumably these three are updated together by
    // setStreamStatus(...) - confirm against that method.
    private DistributedLogManager manager;
    private volatile AsyncLogWriter writer;
    private volatile String owner;
    private volatile Throwable lastException;

    // Delay between ownership acquire attempts (3/5 of the ZK session timeout,
    // see the constructor).
    private final long nextAcquireWaitTimeMs;

    // --- Collaborators and configuration -------------------------------------
    private final StreamRequestLimiter limiter;
    private final DynamicDistributedLogConfiguration dynConf;
    private final DistributedLogConfiguration dlConfig;
    private final DistributedLogNamespace dlNamespace;
    private final String clientId;
    private final OrderedScheduler scheduler;
    private final Feature featureRateLimitDisabled;
    private final StreamManager streamManager;
    private final StreamConfigProvider streamConfigProvider;
    private final FatalErrorHandler fatalErrorHandler;
    private final long streamProbationTimeoutMs;
    // A value <= 0 disables per-request service timeouts (see submit()).
    private long serviceTimeoutMs;
    private final boolean failFastOnStreamNotReady;
    private final HashedWheelTimer requestTimer;

    // --- Stats ----------------------------------------------------------------
    private final StatsLogger streamLogger;
    private final StatsLogger streamExceptionStatLogger;
    private final StatsLogger limiterStatLogger;
    private final Counter serviceTimeout;
    private final OpStatsLogger streamAcquireStat;
    private final Counter pendingOpsCounter;
    private final Counter unexpectedExceptions;
    private final StatsLogger exceptionStatLogger;

    // --- Mutable runtime state -------------------------------------------------
    // Set by submit(); consulted when deciding whether to re-acquire from BACKOFF.
    private volatile boolean writeSinceLastAcquire = false;
    private volatile boolean running = true;
    private volatile boolean suspended = false;
    // Requests queued while the stream is not yet INITIALIZED; swapped for a
    // fresh queue under the StreamImpl monitor when an acquire attempt completes.
    private volatile Queue<StreamOp> pendingOps = new ArrayDeque<>();
    private final Promise<Void> closePromise = new Promise<>();
    // Guards the transaction-id sequencer.
    private final Object txnLock = new Object();
    private final TimeSequencer sequencer = new TimeSequencer();
    private final Stopwatch lastAcquireWatch = Stopwatch.createUnstarted();
    private final Stopwatch lastAcquireFailureWatch = Stopwatch.createUnstarted();
    // Pending acquire task and the delay it was scheduled with; both are
    // accessed under the StreamImpl monitor.
    private ScheduledFuture<?> tryAcquireScheduledFuture = null;
    private long scheduledAcquireDelayMs = 0;
    private final ReentrantReadWriteLock closeLock = new ReentrantReadWriteLock();
    private final ConcurrentHashMap<String, Counter> exceptionCounters = new ConcurrentHashMap<>();
    private volatile StreamStatus status = StreamStatus.UNINITIALIZED;

    /* renamed from: com.twitter.distributedlog.service.stream.StreamImpl$1 */
    /* loaded from: input_file:com/twitter/distributedlog/service/stream/StreamImpl$1.class */
    class AnonymousClass1 implements Gauge<Number> {
        AnonymousClass1() {
        }

        /** Default gauge value: the numeric code of {@link StreamStatus#UNINITIALIZED}. */
        public Number getDefaultValue() {
            final int uninitializedCode = StreamStatus.UNINITIALIZED.getCode();
            return uninitializedCode;
        }

        /** Current gauge value: the numeric code of the enclosing stream's status. */
        public Number getSample() {
            final int currentCode = status.getCode();
            return currentCode;
        }
    }

    /* renamed from: com.twitter.distributedlog.service.stream.StreamImpl$10 */
    /* loaded from: input_file:com/twitter/distributedlog/service/stream/StreamImpl$10.class */
    public class AnonymousClass10 implements FutureEventListener<Void> {
        AnonymousClass10() {
        }

        /** Completes the close sequence once the awaited future succeeds. */
        public void onSuccess(Void r4) {
            finishClose();
        }

        /** Completes the close sequence even when the awaited future failed. */
        public void onFailure(Throwable th) {
            finishClose();
        }

        // Shared tail of both callbacks: release the manager, fail whatever is
        // still queued, then satisfy the close promise.
        private void finishClose() {
            closeManagerAndErrorOutPendingRequests();
            FutureUtils.setValue(closePromise, (Object) null);
        }
    }

    /* renamed from: com.twitter.distributedlog.service.stream.StreamImpl$11 */
    /* loaded from: input_file:com/twitter/distributedlog/service/stream/StreamImpl$11.class */
    /**
     * Compiler-generated holder of the lookup tables behind {@code switch}
     * statements over {@link StatusCode} and {@link StreamStatus}. Each table is
     * indexed by enum ordinal and yields the 1-based case label.
     *
     * <p>Every entry is populated in its own try block so that a constant missing
     * at runtime ({@link NoSuchFieldError}) only disables its own case label.
     */
    public static /* synthetic */ class AnonymousClass11 {
        static final /* synthetic */ int[] $SwitchMap$com$twitter$distributedlog$thrift$service$StatusCode = new int[StatusCode.values().length];
        // Previously assigned in the static initializer without ever being declared.
        static final /* synthetic */ int[] $SwitchMap$com$twitter$distributedlog$service$stream$StreamImpl$StreamStatus;

        static {
            try {
                $SwitchMap$com$twitter$distributedlog$thrift$service$StatusCode[StatusCode.FOUND.ordinal()] = 1;
            } catch (NoSuchFieldError ignored) {
                // constant absent at runtime: leave its slot at 0 (no case label)
            }
            try {
                $SwitchMap$com$twitter$distributedlog$thrift$service$StatusCode[StatusCode.ALREADY_CLOSED.ordinal()] = 2;
            } catch (NoSuchFieldError ignored) {
            }
            try {
                $SwitchMap$com$twitter$distributedlog$thrift$service$StatusCode[StatusCode.NOT_IMPLEMENTED.ordinal()] = 3;
            } catch (NoSuchFieldError ignored) {
            }
            try {
                $SwitchMap$com$twitter$distributedlog$thrift$service$StatusCode[StatusCode.METADATA_EXCEPTION.ordinal()] = 4;
            } catch (NoSuchFieldError ignored) {
            }
            try {
                $SwitchMap$com$twitter$distributedlog$thrift$service$StatusCode[StatusCode.LOG_EMPTY.ordinal()] = 5;
            } catch (NoSuchFieldError ignored) {
            }
            try {
                $SwitchMap$com$twitter$distributedlog$thrift$service$StatusCode[StatusCode.LOG_NOT_FOUND.ordinal()] = 6;
            } catch (NoSuchFieldError ignored) {
            }
            try {
                $SwitchMap$com$twitter$distributedlog$thrift$service$StatusCode[StatusCode.TRUNCATED_TRANSACTION.ordinal()] = 7;
            } catch (NoSuchFieldError ignored) {
            }
            try {
                $SwitchMap$com$twitter$distributedlog$thrift$service$StatusCode[StatusCode.END_OF_STREAM.ordinal()] = 8;
            } catch (NoSuchFieldError ignored) {
            }
            try {
                $SwitchMap$com$twitter$distributedlog$thrift$service$StatusCode[StatusCode.TRANSACTION_OUT_OF_ORDER.ordinal()] = 9;
            } catch (NoSuchFieldError ignored) {
            }
            try {
                $SwitchMap$com$twitter$distributedlog$thrift$service$StatusCode[StatusCode.INVALID_STREAM_NAME.ordinal()] = 10;
            } catch (NoSuchFieldError ignored) {
            }
            try {
                $SwitchMap$com$twitter$distributedlog$thrift$service$StatusCode[StatusCode.TOO_LARGE_RECORD.ordinal()] = 11;
            } catch (NoSuchFieldError ignored) {
            }
            try {
                $SwitchMap$com$twitter$distributedlog$thrift$service$StatusCode[StatusCode.STREAM_NOT_READY.ordinal()] = 12;
            } catch (NoSuchFieldError ignored) {
            }
            try {
                $SwitchMap$com$twitter$distributedlog$thrift$service$StatusCode[StatusCode.OVER_CAPACITY.ordinal()] = 13;
            } catch (NoSuchFieldError ignored) {
            }
            $SwitchMap$com$twitter$distributedlog$service$stream$StreamImpl$StreamStatus = new int[StreamStatus.values().length];
            try {
                $SwitchMap$com$twitter$distributedlog$service$stream$StreamImpl$StreamStatus[StreamStatus.INITIALIZING.ordinal()] = 1;
            } catch (NoSuchFieldError ignored) {
            }
            try {
                $SwitchMap$com$twitter$distributedlog$service$stream$StreamImpl$StreamStatus[StreamStatus.FAILED.ordinal()] = 2;
            } catch (NoSuchFieldError ignored) {
            }
            try {
                $SwitchMap$com$twitter$distributedlog$service$stream$StreamImpl$StreamStatus[StreamStatus.BACKOFF.ordinal()] = 3;
            } catch (NoSuchFieldError ignored) {
            }
        }
    }

    /* renamed from: com.twitter.distributedlog.service.stream.StreamImpl$2 */
    /* loaded from: input_file:com/twitter/distributedlog/service/stream/StreamImpl$2.class */
    public class AnonymousClass2 implements FutureEventListener<Boolean> {
        AnonymousClass2() {
        }

        public void onSuccess(Boolean bool) {
            synchronized (StreamImpl.this) {
                StreamImpl.access$102(StreamImpl.this, 0L);
                StreamImpl.this.tryAcquireScheduledFuture = null;
            }
            if (bool.booleanValue()) {
                return;
            }
            StreamImpl.this.scheduleTryAcquireOnce(StreamImpl.this.nextAcquireWaitTimeMs);
        }

        public void onFailure(Throwable th) {
            StreamImpl.logger.error("Stream {} threw unhandled exception : ", StreamImpl.this.name, th);
            StreamImpl.this.setStreamInErrorStatus();
            StreamImpl.this.requestClose("Unhandled exception");
        }
    }

    /* renamed from: com.twitter.distributedlog.service.stream.StreamImpl$3 */
    /* loaded from: input_file:com/twitter/distributedlog/service/stream/StreamImpl$3.class */
    public class AnonymousClass3 implements Runnable {
        AnonymousClass3() {
        }

        /** Runs a single acquire attempt on the enclosing stream. */
        @Override // java.lang.Runnable
        public void run() {
            tryAcquireStreamOnce();
        }
    }

    /* renamed from: com.twitter.distributedlog.service.stream.StreamImpl$4 */
    /* loaded from: input_file:com/twitter/distributedlog/service/stream/StreamImpl$4.class */
    /**
     * Timer task fired when a request exceeds the service timeout: bumps the
     * timeout counter and hands the stream to {@code handleServiceTimeout}.
     */
    public class AnonymousClass4 implements TimerTask {
        final /* synthetic */ StreamOp val$op;

        /**
         * @param streamOp the operation being timed; only its class name is used,
         *                 for the timeout message
         */
        AnonymousClass4(StreamOp streamOp) {
            // Store the captured operation; the original assigned an undeclared register.
            this.val$op = streamOp;
        }

        /** Does nothing if the timeout was cancelled before firing. */
        public void run(Timeout timeout) throws Exception {
            if (timeout.isCancelled()) {
                return;
            }
            StreamImpl.this.serviceTimeout.inc();
            StreamImpl.this.handleServiceTimeout("Operation " + this.val$op.getClass().getName() + " timeout");
        }
    }

    /* renamed from: com.twitter.distributedlog.service.stream.StreamImpl$5 */
    /* loaded from: input_file:com/twitter/distributedlog/service/stream/StreamImpl$5.class */
    /** Callback that cancels the captured request timeout. */
    public class AnonymousClass5 extends Function0<BoxedUnit> {
        final /* synthetic */ Timeout val$timeout;

        /** @param timeout the pending timeout to cancel when this function is applied */
        AnonymousClass5(Timeout timeout) {
            // Store the captured timeout; the original assigned an undeclared register.
            this.val$timeout = timeout;
        }

        /**
         * Cancels the captured timeout.
         *
         * @return always {@code null}, as in the original body
         */
        public BoxedUnit apply() {
            this.val$timeout.cancel();
            return null;
        }

        /** Decompiler-assigned alias of {@link #apply()}, kept so existing references still resolve. */
        public BoxedUnit m22apply() {
            return apply();
        }
    }

    /* renamed from: com.twitter.distributedlog.service.stream.StreamImpl$6 */
    /* loaded from: input_file:com/twitter/distributedlog/service/stream/StreamImpl$6.class */
    public class AnonymousClass6 extends AbstractFunction1<Void, BoxedUnit> {
        AnonymousClass6() {
        }

        /** Asks the stream manager to schedule removal of the enclosing stream after the probation timeout. */
        public BoxedUnit apply(Void ignored) {
            final long probationMs = streamProbationTimeoutMs;
            streamManager.scheduleRemoval(StreamImpl.this, probationMs);
            return BoxedUnit.UNIT;
        }
    }

    /* renamed from: com.twitter.distributedlog.service.stream.StreamImpl$7 */
    /* loaded from: input_file:com/twitter/distributedlog/service/stream/StreamImpl$7.class */
    /**
     * Listener on the outcome of an executed {@link StreamOp}. Success needs no
     * action; failures are dispatched by exception type and status code.
     */
    public class AnonymousClass7 implements FutureEventListener<Void> {
        final /* synthetic */ StreamOp val$op;

        /** @param streamOp the operation whose result is being observed */
        AnonymousClass7(StreamOp streamOp) {
            // Store the captured operation; the original assigned an undeclared register.
            this.val$op = streamOp;
        }

        public void onSuccess(Void r2) {
        }

        /**
         * Routes a failed operation to the matching handler and counts the
         * exception, except for ownership-acquire failures, which are not counted here.
         */
        public void onFailure(Throwable th) {
            boolean countAsException = true;
            if (th instanceof DLException) {
                // Enum switch replaces the ordinal-indexed lookup table; the case
                // labels follow that table's mapping one-to-one.
                switch (((DLException) th).getCode()) {
                    case FOUND:
                        assert th instanceof OwnershipAcquireFailedException;
                        countAsException = false;
                        StreamImpl.this.handleOwnershipAcquireFailedException(this.val$op, (OwnershipAcquireFailedException) th);
                        break;
                    case ALREADY_CLOSED:
                        assert th instanceof AlreadyClosedException;
                        this.val$op.fail(th);
                        StreamImpl.this.handleAlreadyClosedException((AlreadyClosedException) th);
                        break;
                    // These codes are returned to the caller as-is; no stream-level handling.
                    case NOT_IMPLEMENTED:
                    case METADATA_EXCEPTION:
                    case LOG_EMPTY:
                    case LOG_NOT_FOUND:
                    case TRUNCATED_TRANSACTION:
                    case END_OF_STREAM:
                    case TRANSACTION_OUT_OF_ORDER:
                    case INVALID_STREAM_NAME:
                    case TOO_LARGE_RECORD:
                    case STREAM_NOT_READY:
                    case OVER_CAPACITY:
                        this.val$op.fail(th);
                        break;
                    default:
                        StreamImpl.this.handleRecoverableDLException(this.val$op, th);
                        break;
                }
            } else {
                StreamImpl.this.handleUnknownException(this.val$op, th);
            }
            if (countAsException) {
                StreamImpl.this.countException(th, StreamImpl.this.streamExceptionStatLogger);
            }
        }
    }

    /* renamed from: com.twitter.distributedlog.service.stream.StreamImpl$8 */
    /* loaded from: input_file:com/twitter/distributedlog/service/stream/StreamImpl$8.class */
    public class AnonymousClass8 implements FutureEventListener<AsyncLogWriter> {
        final /* synthetic */ Stopwatch val$stopwatch;
        final /* synthetic */ Promise val$acquirePromise;

        AnonymousClass8(Stopwatch stopwatch, Promise promise) {
            r5 = stopwatch;
            r6 = promise;
        }

        public void onSuccess(AsyncLogWriter asyncLogWriter) {
            AsyncAbortable streamStatus;
            Queue<StreamOp> queue;
            boolean z;
            synchronized (StreamImpl.this.txnLock) {
                StreamImpl.this.sequencer.setLastId(asyncLogWriter.getLastTxId());
            }
            synchronized (StreamImpl.this) {
                streamStatus = StreamImpl.this.setStreamStatus(StreamStatus.INITIALIZED, StreamStatus.INITIALIZING, asyncLogWriter, null, null);
                queue = StreamImpl.this.pendingOps;
                StreamImpl.this.pendingOps = new ArrayDeque();
                z = true;
            }
            if (!StreamImpl.this.streamManager.allowAcquire(StreamImpl.this)) {
                if (null != streamStatus) {
                    Abortables.asyncAbort(streamStatus, true);
                }
                Throwable streamUnavailableException = new StreamUnavailableException("Stream " + StreamImpl.this.partition.getStream() + " is not allowed to acquire more than " + StreamImpl.this.dynConf.getMaxAcquiredPartitionsPerProxy() + " partitions");
                StreamImpl.this.countException(streamUnavailableException, StreamImpl.this.exceptionStatLogger);
                StreamImpl.logger.error("Failed to acquire stream {} because it is unavailable : {}", StreamImpl.this.name, streamUnavailableException.getMessage());
                synchronized (this) {
                    streamStatus = StreamImpl.this.setStreamStatus(StreamStatus.ERROR, StreamStatus.INITIALIZED, null, null, streamUnavailableException);
                    z = false;
                }
            }
            processPendingRequestsAfterOpen(z, streamStatus, queue);
        }

        public void onFailure(Throwable th) {
            AsyncLogWriter streamStatus;
            Queue<StreamOp> queue;
            boolean z;
            if (th instanceof AlreadyClosedException) {
                StreamImpl.this.countException(th, StreamImpl.this.streamExceptionStatLogger);
                StreamImpl.this.handleAlreadyClosedException((AlreadyClosedException) th);
                return;
            }
            if (th instanceof OwnershipAcquireFailedException) {
                Throwable th2 = (OwnershipAcquireFailedException) th;
                StreamImpl.logger.warn("Failed to acquire stream ownership for {}, current owner is {} : {}", new Object[]{StreamImpl.this.name, th2.getCurrentOwner(), th2.getMessage()});
                synchronized (StreamImpl.this) {
                    streamStatus = StreamImpl.this.setStreamStatus(StreamStatus.BACKOFF, StreamStatus.INITIALIZING, null, th2.getCurrentOwner(), th2);
                    queue = StreamImpl.this.pendingOps;
                    StreamImpl.this.pendingOps = new ArrayDeque();
                    z = false;
                }
            } else if (th instanceof InvalidStreamNameException) {
                Throwable th3 = (InvalidStreamNameException) th;
                StreamImpl.this.countException(th3, StreamImpl.this.streamExceptionStatLogger);
                StreamImpl.logger.error("Failed to acquire stream {} due to its name is invalid", StreamImpl.this.name);
                synchronized (StreamImpl.this) {
                    streamStatus = StreamImpl.this.setStreamStatus(StreamStatus.ERROR, StreamStatus.INITIALIZING, null, null, th3);
                    queue = StreamImpl.this.pendingOps;
                    StreamImpl.this.pendingOps = new ArrayDeque();
                    z = false;
                }
            } else {
                StreamImpl.this.countException(th, StreamImpl.this.streamExceptionStatLogger);
                StreamImpl.logger.error("Failed to initialize stream {} : ", StreamImpl.this.name, th);
                synchronized (StreamImpl.this) {
                    streamStatus = StreamImpl.this.setStreamStatus(StreamStatus.FAILED, StreamStatus.INITIALIZING, null, null, th);
                    queue = StreamImpl.this.pendingOps;
                    StreamImpl.this.pendingOps = new ArrayDeque();
                    z = false;
                }
            }
            processPendingRequestsAfterOpen(z, streamStatus, queue);
        }

        void processPendingRequestsAfterOpen(boolean z, AsyncLogWriter asyncLogWriter, Queue<StreamOp> queue) {
            if (z) {
                StreamImpl.this.streamAcquireStat.registerSuccessfulEvent(r5.elapsed(TimeUnit.MICROSECONDS));
            } else {
                StreamImpl.this.streamAcquireStat.registerFailedEvent(r5.elapsed(TimeUnit.MICROSECONDS));
            }
            Iterator<StreamOp> it = queue.iterator();
            while (it.hasNext()) {
                StreamImpl.this.executeOp(it.next(), z);
                StreamImpl.this.pendingOpsCounter.dec();
            }
            Abortables.asyncAbort(asyncLogWriter, true);
            FutureUtils.setValue(r6, Boolean.valueOf(z));
        }
    }

    /* renamed from: com.twitter.distributedlog.service.stream.StreamImpl$9 */
    /* loaded from: input_file:com/twitter/distributedlog/service/stream/StreamImpl$9.class */
    public class AnonymousClass9 extends AbstractFunction1<Void, BoxedUnit> {
        AnonymousClass9() {
        }

        /** Notifies the stream manager of the removal and logs when the manager reports {@code true}. */
        public BoxedUnit apply(Void ignored) {
            final boolean removed = streamManager.notifyRemoved(StreamImpl.this);
            if (removed) {
                logger.info("Removed cached stream {} after closed.", name);
            }
            return BoxedUnit.UNIT;
        }
    }

    /* loaded from: input_file:com/twitter/distributedlog/service/stream/StreamImpl$StreamStatus.class */
    /** Lifecycle states of a stream, each tagged with the integer code exposed through the status gauge. */
    public enum StreamStatus {
        UNINITIALIZED(-1),
        INITIALIZING(0),
        INITIALIZED(1),
        FAILED(-2),
        BACKOFF(-3),
        CLOSING(-4),
        CLOSED(-5),
        ERROR(-6);

        final int code;

        StreamStatus(int code) {
            this.code = code;
        }

        /** @return the integer code associated with this status */
        int getCode() {
            return code;
        }

        /**
         * Tells whether a status means the stream can no longer serve requests.
         *
         * @param candidate status to test; {@code null} yields {@code false}
         * @return {@code true} for {@code ERROR}, {@code CLOSING} and {@code CLOSED}
         */
        static boolean isUnavailable(StreamStatus candidate) {
            if (candidate == CLOSED || candidate == CLOSING) {
                return true;
            }
            return candidate == ERROR;
        }
    }

    /**
     * Creates a stream in the {@code UNINITIALIZED} state. No log is opened here;
     * that happens in {@link #initialize()}.
     *
     * @param name                 stream name, used for scheduling, stats scopes and messages
     * @param partition            partition this stream belongs to
     * @param clientId             client identifier
     * @param streamManager        manager notified of acquire/release/removal events
     * @param streamOpStats        factory for the counters and stat scopes used by this stream
     * @param serverConfig         source of the service and probation timeouts
     * @param dlConfig             source of the ZK session timeout and fail-fast flag
     * @param streamConf           dynamic per-stream configuration
     * @param featureProvider      source of the rate-limit-disabled feature
     * @param streamConfigProvider stream configuration provider
     * @param dlNamespace          namespace used to open the log
     * @param scheduler            scheduler on which acquire attempts run
     * @param fatalErrorHandler    handler for fatal errors
     * @param requestTimer         timer used for per-request service timeouts
     */
    public StreamImpl(String name, Partition partition, String clientId, StreamManager streamManager, StreamOpStats streamOpStats, ServerConfiguration serverConfig, DistributedLogConfiguration dlConfig, DynamicDistributedLogConfiguration streamConf, FeatureProvider featureProvider, StreamConfigProvider streamConfigProvider, DistributedLogNamespace dlNamespace, OrderedScheduler scheduler, FatalErrorHandler fatalErrorHandler, HashedWheelTimer requestTimer) {
        this.clientId = clientId;
        this.dlConfig = dlConfig;
        this.streamManager = streamManager;
        this.name = name;
        this.partition = partition;
        // Placeholder failure until a real exception is recorded.
        this.lastException = new IOException("Fail to write record to stream " + name);
        // Retry ownership acquisition after three fifths of the ZK session timeout.
        this.nextAcquireWaitTimeMs = (dlConfig.getZKSessionTimeoutMilliseconds() * 3) / 5;
        this.streamConfigProvider = streamConfigProvider;
        this.dlNamespace = dlNamespace;
        // Locale.ROOT keeps the feature key stable: under e.g. a Turkish default
        // locale, 'I' would lower-case to a dotless i and the lookup would miss.
        this.featureRateLimitDisabled = featureProvider.getFeature(ServerFeatureKeys.SERVICE_RATE_LIMIT_DISABLED.name().toLowerCase(java.util.Locale.ROOT));
        this.scheduler = scheduler;
        this.serviceTimeoutMs = serverConfig.getServiceTimeoutMs();
        this.streamProbationTimeoutMs = serverConfig.getStreamProbationTimeoutMs();
        this.failFastOnStreamNotReady = dlConfig.getFailFastOnStreamNotReady();
        this.fatalErrorHandler = fatalErrorHandler;
        this.dynConf = streamConf;
        // Limiter stats are reported both to the shared scope and to the per-stream scope.
        this.limiter = new StreamRequestLimiter(name, this.dynConf, BroadCastStatsLogger.two(streamOpStats.baseScope("stream_limiter"), streamOpStats.streamRequestScope(name, "limiter")), this.featureRateLimitDisabled);
        this.requestTimer = requestTimer;
        this.streamLogger = streamOpStats.streamRequestStatsLogger(name);
        this.limiterStatLogger = streamOpStats.baseScope("request_limiter");
        this.streamExceptionStatLogger = this.streamLogger.scope("exceptions");
        this.serviceTimeout = streamOpStats.baseCounter("serviceTimeout");
        this.streamAcquireStat = streamOpStats.baseScope("streams").getOpStatsLogger("acquire");
        this.pendingOpsCounter = streamOpStats.baseCounter("pending_ops");
        this.unexpectedExceptions = streamOpStats.baseCounter("unexpected_exceptions");
        this.exceptionStatLogger = streamOpStats.requestScope("exceptions");
    }

    /** @return the currently recorded owner of this stream; may be {@code null} */
    @Override // com.twitter.distributedlog.service.stream.Stream
    public String getOwner() {
        final String currentOwner = owner;
        return currentOwner;
    }

    /** @return the name this stream was constructed with */
    @Override // com.twitter.distributedlog.service.stream.Stream
    public String getStreamName() {
        return name;
    }

    /** @return the dynamic configuration this stream was constructed with */
    @Override // com.twitter.distributedlog.service.stream.Stream
    public DynamicDistributedLogConfiguration getStreamConfiguration() {
        return dynConf;
    }

    /** @return the partition this stream was constructed with */
    @Override // com.twitter.distributedlog.service.stream.Stream
    public Partition getPartition() {
        return partition;
    }

    /**
     * Opens the named log through the namespace, passing no static configuration
     * override and this stream's dynamic configuration.
     *
     * @param streamName name of the log to open
     * @return the manager returned by the namespace
     * @throws IOException if the namespace fails to open the log
     */
    private DistributedLogManager openLog(String streamName) throws IOException {
        return dlNamespace.openLog(
                streamName,
                Optional.absent(),
                Optional.of(dynConf));
    }

    @Override // com.twitter.distributedlog.service.stream.Stream
    public void initialize() throws IOException {
        this.manager = openLog(this.name);
        this.streamLogger.registerGauge("stream_status", new Gauge<Number>() { // from class: com.twitter.distributedlog.service.stream.StreamImpl.1
            AnonymousClass1() {
            }

            public Number getDefaultValue() {
                return Integer.valueOf(StreamStatus.UNINITIALIZED.getCode());
            }

            public Number getSample() {
                return Integer.valueOf(StreamImpl.this.status.getCode());
            }
        });
        this.status = StreamStatus.INITIALIZING;
    }

    /** Renders the stream name, log manager, writer and status for diagnostics. */
    public String toString() {
        final Object[] parts = {this.name, this.manager, this.writer, this.status};
        return String.format("Stream:%s, %s, %s Status:%s", parts);
    }

    public void tryAcquireStreamOnce() {
        if (this.running) {
            boolean z = false;
            boolean z2 = false;
            synchronized (this) {
                switch (this.status) {
                    case INITIALIZING:
                        this.streamManager.notifyReleased(this);
                        z = true;
                        break;
                    case FAILED:
                        this.status = StreamStatus.INITIALIZING;
                        this.streamManager.notifyReleased(this);
                        z = true;
                        break;
                    case BACKOFF:
                        if (!this.writeSinceLastAcquire) {
                            z2 = true;
                            break;
                        } else {
                            this.status = StreamStatus.INITIALIZING;
                            this.streamManager.notifyReleased(this);
                            z = true;
                            break;
                        }
                }
            }
            if (z) {
                this.lastAcquireWatch.reset().start();
                acquireStream().addEventListener(new FutureEventListener<Boolean>() { // from class: com.twitter.distributedlog.service.stream.StreamImpl.2
                    AnonymousClass2() {
                    }

                    public void onSuccess(Boolean bool) {
                        synchronized (StreamImpl.this) {
                            StreamImpl.access$102(StreamImpl.this, 0L);
                            StreamImpl.this.tryAcquireScheduledFuture = null;
                        }
                        if (bool.booleanValue()) {
                            return;
                        }
                        StreamImpl.this.scheduleTryAcquireOnce(StreamImpl.this.nextAcquireWaitTimeMs);
                    }

                    public void onFailure(Throwable th) {
                        StreamImpl.logger.error("Stream {} threw unhandled exception : ", StreamImpl.this.name, th);
                        StreamImpl.this.setStreamInErrorStatus();
                        StreamImpl.this.requestClose("Unhandled exception");
                    }
                });
                return;
            }
            if (StreamStatus.isUnavailable(this.status)) {
                requestClose("Stream is unavailable anymore");
                return;
            }
            if (StreamStatus.INITIALIZED != this.status && this.lastAcquireWatch.elapsed(TimeUnit.HOURS) > 2) {
                requestClose("Stream not used anymore");
            } else if (z2) {
                synchronized (this) {
                    this.scheduledAcquireDelayMs = 0L;
                    this.tryAcquireScheduledFuture = null;
                }
                scheduleTryAcquireOnce(this.nextAcquireWaitTimeMs);
            }
        }
    }

    public synchronized void scheduleTryAcquireOnce(long j) {
        if (null != this.tryAcquireScheduledFuture) {
            if (j > 0 || this.scheduledAcquireDelayMs <= 0) {
                return;
            }
            if (this.scheduledAcquireDelayMs > 0 && !this.tryAcquireScheduledFuture.cancel(false)) {
                return;
            }
        }
        this.tryAcquireScheduledFuture = schedule(new Runnable() { // from class: com.twitter.distributedlog.service.stream.StreamImpl.3
            AnonymousClass3() {
            }

            @Override // java.lang.Runnable
            public void run() {
                StreamImpl.this.tryAcquireStreamOnce();
            }
        }, j);
        this.scheduledAcquireDelayMs = j;
    }

    /** Starts the stream by scheduling an immediate acquire attempt. */
    @Override // com.twitter.distributedlog.service.stream.Stream
    public void start() {
        final long noDelayMs = 0L;
        scheduleTryAcquireOnce(noDelayMs);
    }

    /**
     * Submits a task to the scheduler, keyed by this stream's name.
     *
     * @param runnable task to run
     * @param j        delay in milliseconds
     * @return the scheduled future, or {@code null} when the stream is no longer
     *         running or the scheduler rejected the task
     */
    ScheduledFuture<?> schedule(Runnable runnable, long j) {
        if (this.running) {
            try {
                return scheduler.schedule(name, runnable, j, TimeUnit.MILLISECONDS);
            } catch (RejectedExecutionException rejected) {
                logger.error("Failed to schedule task {} in {} ms : ", runnable, j, rejected);
            }
        }
        return null;
    }

    /**
     * Registers a callback on the op's response-header future that cancels a {@code Timeout}
     * once that future completes.
     *
     * <p>NOTE(review): this is raw decompiler output and does not compile as written. The
     * {@code Timeout} captured by the callback ({@code val$timeout} / {@code r5}) is never
     * created in the visible body, and the anonymous class carries a synthetic named
     * constructor. The code that arms the timer is presumably missing from the decompilation;
     * recover it from the original source before changing this method.
     *
     * @param streamOp operation whose response-header future triggers the cancellation
     */
    void scheduleTimeout(StreamOp streamOp) {
        streamOp.responseHeader().ensure(new Function0<BoxedUnit>() { // from class: com.twitter.distributedlog.service.stream.StreamImpl.5
            final /* synthetic */ Timeout val$timeout;

            AnonymousClass5(Timeout timeout) {
                r5 = timeout;
            }

            /* renamed from: apply */
            public BoxedUnit m22apply() {
                // Cancel the captured timeout when the response-header future completes.
                r5.cancel();
                return null;
            }
        });
    }

    synchronized void handleServiceTimeout(String str) {
        if (StreamStatus.isUnavailable(this.status)) {
            return;
        }
        setStreamInErrorStatus();
        requestClose(str, false).onSuccess(new AbstractFunction1<Void, BoxedUnit>() { // from class: com.twitter.distributedlog.service.stream.StreamImpl.6
            AnonymousClass6() {
            }

            public BoxedUnit apply(Void r6) {
                StreamImpl.this.streamManager.scheduleRemoval(StreamImpl.this, StreamImpl.this.streamProbationTimeoutMs);
                return BoxedUnit.UNIT;
            }
        });
    }

    /**
     * Accepts an operation for this stream and executes it now, fails it, or queues it until
     * the stream has been acquired.
     *
     * <ul>
     *   <li>Rejected by the limiter: the op is failed with the {@link OverCapacityException}.</li>
     *   <li>Stream unavailable: the op is failed with a {@link StreamUnavailableException}.</li>
     *   <li>Stream initialized: the op is executed.</li>
     *   <li>Stream backing off and the wait time has not elapsed: the op is completed as a
     *       failure via {@link #executeOp}.</li>
     *   <li>Otherwise: the op is failed fast with a {@link StreamNotReadyException} or queued in
     *       {@code pendingOps}, and an acquire attempt is requested unless acquiring is
     *       suspended.</li>
     * </ul>
     *
     * @param streamOp the operation to submit
     */
    @Override // com.twitter.distributedlog.service.stream.Stream
    public void submit(StreamOp streamOp) {
        // Record that a write was attempted; acquireStream() resets this flag.
        this.writeSinceLastAcquire = true;
        try {
            this.limiter.apply(streamOp);
            if (this.serviceTimeoutMs > 0) {
                scheduleTimeout(streamOp);
            }
            boolean notifyAcquireThread = false;
            boolean completeOpNow = false;
            boolean success = true;
            if (StreamStatus.isUnavailable(this.status)) {
                streamOp.fail(new StreamUnavailableException("Stream " + this.name + " is closed."));
                return;
            }
            if (StreamStatus.INITIALIZED == this.status && this.writer != null) {
                completeOpNow = true;
            } else {
                synchronized (this) {
                    if (StreamStatus.isUnavailable(this.status) || StreamStatus.INITIALIZED == this.status) {
                        // The unavailable case must not fall through to the branches below:
                        // executeOp() fails the op itself, so queueing it or failing it here as
                        // well would complete the same op twice.
                        completeOpNow = true;
                    } else if (StreamStatus.BACKOFF == this.status && this.lastAcquireFailureWatch.elapsed(TimeUnit.MILLISECONDS) < this.nextAcquireWaitTimeMs) {
                        completeOpNow = true;
                        success = false;
                    } else if (this.failFastOnStreamNotReady) {
                        notifyAcquireThread = true;
                        streamOp.fail(new StreamNotReadyException("Stream " + this.name + " is not ready; status = " + this.status));
                    } else {
                        notifyAcquireThread = true;
                        this.pendingOps.add(streamOp);
                        this.pendingOpsCounter.inc();
                        // Only a heartbeat that is the first queued op is asked to write a control record.
                        if (1 == this.pendingOps.size() && (streamOp instanceof HeartbeatOp)) {
                            ((HeartbeatOp) streamOp).setWriteControlRecord(true);
                        }
                    }
                }
            }
            if (notifyAcquireThread && !this.suspended) {
                scheduleTryAcquireOnce(0L);
            }
            if (completeOpNow) {
                executeOp(streamOp, success);
            }
        } catch (OverCapacityException e) {
            streamOp.fail(e);
        }
    }

    /**
     * Executes {@code streamOp} while holding the close read lock.
     *
     * <p>If the stream is unavailable the op is failed with a
     * {@link StreamUnavailableException}; otherwise it is handed to {@code doExecuteOp}.
     * The read lock is released on every path, including when an exception propagates.
     *
     * @param streamOp operation to execute
     * @param z flag forwarded unchanged to {@code doExecuteOp}
     */
    void executeOp(StreamOp streamOp, boolean z) {
        closeLock.readLock().lock();
        try {
            if (!StreamStatus.isUnavailable(status)) {
                doExecuteOp(streamOp, z);
            } else {
                streamOp.fail(new StreamUnavailableException("Stream " + name + " is closed."));
            }
        } finally {
            closeLock.readLock().unlock();
        }
    }

    /**
     * Runs {@code streamOp} against the current writer, or fails it with the last recorded
     * exception when there is no writer or {@code z} is false.
     *
     * <p>On execution failure the listener dispatches on the exception type and, for
     * {@code DLException}s, on the ordinal of their status code.
     *
     * <p>NOTE(review): this is raw decompiler output. The switch goes through the
     * compiler-generated {@code $SwitchMap} table, so the numeric case labels stand for
     * {@code StatusCode} constants that are not visible here. The anonymous class also carries
     * a synthetic constructor and {@code $assertionsDisabled} field. Recover the original
     * {@code switch} on the enum before modifying this logic.
     *
     * @param streamOp operation to execute
     * @param z when false the op is failed without being executed
     */
    private void doExecuteOp(StreamOp streamOp, boolean z) {
        AsyncLogWriter asyncLogWriter;
        Throwable th;
        // Snapshot writer and last exception together so they are read consistently.
        synchronized (this) {
            asyncLogWriter = this.writer;
            th = this.lastException;
        }
        if (null == asyncLogWriter || !z) {
            streamOp.fail(th);
        } else {
            streamOp.execute(asyncLogWriter, this.sequencer, this.txnLock).addEventListener(new FutureEventListener<Void>() { // from class: com.twitter.distributedlog.service.stream.StreamImpl.7
                static final /* synthetic */ boolean $assertionsDisabled;
                final /* synthetic */ StreamOp val$op;

                AnonymousClass7(StreamOp streamOp2) {
                    r5 = streamOp2;
                }

                public void onSuccess(Void r2) {
                }

                public void onFailure(Throwable th2) {
                    // z2: whether the failure is counted in the stream exception stats below.
                    boolean z2 = true;
                    if (th2 instanceof DLException) {
                        switch (AnonymousClass11.$SwitchMap$com$twitter$distributedlog$thrift$service$StatusCode[((DLException) th2).getCode().ordinal()]) {
                            case 1:
                                // Ownership lost to another owner: delegated, and not counted.
                                if (!$assertionsDisabled && !(th2 instanceof OwnershipAcquireFailedException)) {
                                    throw new AssertionError();
                                }
                                z2 = false;
                                StreamImpl.this.handleOwnershipAcquireFailedException(r5, (OwnershipAcquireFailedException) th2);
                                break;
                            case 2:
                                // Already-closed: fail the op, then escalate through the dedicated handler.
                                if (!$assertionsDisabled && !(th2 instanceof AlreadyClosedException)) {
                                    throw new AssertionError();
                                }
                                r5.fail(th2);
                                StreamImpl.this.handleAlreadyClosedException((AlreadyClosedException) th2);
                                break;
                                break;
                            // Cases 3-13 only fail the op; the stream status is left untouched.
                            case 3:
                            case 4:
                            case 5:
                            case 6:
                            case 7:
                            case 8:
                            case 9:
                            case 10:
                            case 11:
                            case 12:
                            case 13:
                                r5.fail(th2);
                                break;
                            default:
                                // Every other DLException code goes to the recoverable handler.
                                StreamImpl.this.handleRecoverableDLException(r5, th2);
                                break;
                        }
                    } else {
                        StreamImpl.this.handleUnknownException(r5, th2);
                    }
                    if (z2) {
                        StreamImpl.this.countException(th2, StreamImpl.this.streamExceptionStatLogger);
                    }
                }

                static {
                    $assertionsDisabled = !StreamImpl.class.desiredAssertionStatus();
                }
            });
        }
    }

    /**
     * Handles a write failure caused by a recoverable DL exception.
     *
     * <p>If the stream is currently {@code INITIALIZED} it is moved to {@code FAILED}, the
     * writer returned by the status change is aborted, the failure is logged and an immediate
     * acquire attempt is scheduled. The op is failed with {@code th} in every case.
     *
     * @param streamOp operation that failed
     * @param th cause of the failure
     */
    public void handleRecoverableDLException(StreamOp streamOp, Throwable th) {
        final boolean transitioned;
        final AsyncLogWriter previousWriter;
        synchronized (this) {
            transitioned = StreamStatus.INITIALIZED == status;
            previousWriter = transitioned
                    ? setStreamStatus(StreamStatus.FAILED, StreamStatus.INITIALIZED, null, null, th)
                    : null;
        }
        if (transitioned) {
            Abortables.asyncAbort(previousWriter, false);
            logger.error("Failed to write data into stream {} : ", name, th);
            scheduleTryAcquireOnce(0L);
        }
        streamOp.fail(th);
    }

    /**
     * Handles a write failure caused by an exception that is not a {@code DLException}.
     *
     * <p>If the stream is currently {@code INITIALIZED} it is moved to {@code FAILED}, the
     * writer returned by the status change is aborted, the failure is logged and an immediate
     * acquire attempt is scheduled. The op is failed with {@code th} in every case.
     *
     * @param streamOp operation that failed
     * @param th cause of the failure
     */
    public void handleUnknownException(StreamOp streamOp, Throwable th) {
        final boolean transitioned;
        final AsyncLogWriter previousWriter;
        synchronized (this) {
            transitioned = StreamStatus.INITIALIZED == status;
            previousWriter = transitioned
                    ? setStreamStatus(StreamStatus.FAILED, StreamStatus.INITIALIZED, null, null, th)
                    : null;
        }
        if (transitioned) {
            Abortables.asyncAbort(previousWriter, false);
            logger.error("Failed to write data into stream {} : ", name, th);
            scheduleTryAcquireOnce(0L);
        }
        streamOp.fail(th);
    }

    /**
     * Handles a write failure caused by another owner holding the stream.
     *
     * <p>Logs the current owner. If the stream is {@code INITIALIZED} it is moved to
     * {@code BACKOFF} with that owner recorded, the writer returned by the status change is
     * aborted, and an acquire attempt is scheduled after {@code nextAcquireWaitTimeMs}. The op
     * is failed with the exception in every case.
     *
     * @param streamOp operation that failed
     * @param ownershipAcquireFailedException failure carrying the current owner
     */
    public void handleOwnershipAcquireFailedException(StreamOp streamOp, OwnershipAcquireFailedException ownershipAcquireFailedException) {
        logger.warn("Failed to write data into stream {} because stream is acquired by {} : {}", new Object[]{name, ownershipAcquireFailedException.getCurrentOwner(), ownershipAcquireFailedException.getMessage()});
        final boolean transitioned;
        final AsyncLogWriter previousWriter;
        synchronized (this) {
            transitioned = StreamStatus.INITIALIZED == status;
            previousWriter = transitioned
                    ? setStreamStatus(StreamStatus.BACKOFF, StreamStatus.INITIALIZED, null, ownershipAcquireFailedException.getCurrentOwner(), ownershipAcquireFailedException)
                    : null;
        }
        if (transitioned) {
            Abortables.asyncAbort(previousWriter, false);
            scheduleTryAcquireOnce(nextAcquireWaitTimeMs);
        }
        streamOp.fail(ownershipAcquireFailedException);
    }

    /**
     * Treats an {@link AlreadyClosedException} as fatal: bumps the unexpected-exception
     * counter, logs the error and notifies the fatal error handler.
     *
     * @param alreadyClosedException the exception that was encountered
     */
    public void handleAlreadyClosedException(AlreadyClosedException alreadyClosedException) {
        unexpectedExceptions.inc();
        logger.error("Encountered unexpected exception when writing data into stream {} : ", name, alreadyClosedException);
        fatalErrorHandler.notifyFatalError();
    }

    /**
     * Increments two counters named after the exception's class (or {@code "null"} when
     * {@code th} is null): the one cached in {@code exceptionCounters} and the one obtained
     * from {@code statsLogger}.
     *
     * @param th exception to count; may be null
     * @param statsLogger additional stats logger whose counter is also incremented
     */
    void countException(Throwable th, StatsLogger statsLogger) {
        final String counterName = (th != null) ? th.getClass().getName() : "null";
        Counter cached = exceptionCounters.get(counterName);
        if (cached == null) {
            final Counter created = exceptionStatLogger.getCounter(counterName);
            // Another thread may have registered a counter first; prefer the one already in the map.
            final Counter existing = exceptionCounters.putIfAbsent(counterName, created);
            cached = (existing != null) ? existing : created;
        }
        cached.inc();
        statsLogger.getCounter(counterName).inc();
    }

    Future<Boolean> acquireStream() {
        this.writeSinceLastAcquire = false;
        Stopwatch createStarted = Stopwatch.createStarted();
        Promise promise = new Promise();
        this.manager.openAsyncLogWriter().addEventListener(FutureUtils.OrderedFutureEventListener.of(new FutureEventListener<AsyncLogWriter>() { // from class: com.twitter.distributedlog.service.stream.StreamImpl.8
            final /* synthetic */ Stopwatch val$stopwatch;
            final /* synthetic */ Promise val$acquirePromise;

            AnonymousClass8(Stopwatch createStarted2, Promise promise2) {
                r5 = createStarted2;
                r6 = promise2;
            }

            public void onSuccess(AsyncLogWriter asyncLogWriter) {
                AsyncAbortable streamStatus;
                Queue<StreamOp> queue;
                boolean z;
                synchronized (StreamImpl.this.txnLock) {
                    StreamImpl.this.sequencer.setLastId(asyncLogWriter.getLastTxId());
                }
                synchronized (StreamImpl.this) {
                    streamStatus = StreamImpl.this.setStreamStatus(StreamStatus.INITIALIZED, StreamStatus.INITIALIZING, asyncLogWriter, null, null);
                    queue = StreamImpl.this.pendingOps;
                    StreamImpl.this.pendingOps = new ArrayDeque();
                    z = true;
                }
                if (!StreamImpl.this.streamManager.allowAcquire(StreamImpl.this)) {
                    if (null != streamStatus) {
                        Abortables.asyncAbort(streamStatus, true);
                    }
                    Throwable streamUnavailableException = new StreamUnavailableException("Stream " + StreamImpl.this.partition.getStream() + " is not allowed to acquire more than " + StreamImpl.this.dynConf.getMaxAcquiredPartitionsPerProxy() + " partitions");
                    StreamImpl.this.countException(streamUnavailableException, StreamImpl.this.exceptionStatLogger);
                    StreamImpl.logger.error("Failed to acquire stream {} because it is unavailable : {}", StreamImpl.this.name, streamUnavailableException.getMessage());
                    synchronized (this) {
                        streamStatus = StreamImpl.this.setStreamStatus(StreamStatus.ERROR, StreamStatus.INITIALIZED, null, null, streamUnavailableException);
                        z = false;
                    }
                }
                processPendingRequestsAfterOpen(z, streamStatus, queue);
            }

            public void onFailure(Throwable th) {
                AsyncLogWriter streamStatus;
                Queue<StreamOp> queue;
                boolean z;
                if (th instanceof AlreadyClosedException) {
                    StreamImpl.this.countException(th, StreamImpl.this.streamExceptionStatLogger);
                    StreamImpl.this.handleAlreadyClosedException((AlreadyClosedException) th);
                    return;
                }
                if (th instanceof OwnershipAcquireFailedException) {
                    Throwable th2 = (OwnershipAcquireFailedException) th;
                    StreamImpl.logger.warn("Failed to acquire stream ownership for {}, current owner is {} : {}", new Object[]{StreamImpl.this.name, th2.getCurrentOwner(), th2.getMessage()});
                    synchronized (StreamImpl.this) {
                        streamStatus = StreamImpl.this.setStreamStatus(StreamStatus.BACKOFF, StreamStatus.INITIALIZING, null, th2.getCurrentOwner(), th2);
                        queue = StreamImpl.this.pendingOps;
                        StreamImpl.this.pendingOps = new ArrayDeque();
                        z = false;
                    }
                } else if (th instanceof InvalidStreamNameException) {
                    Throwable th3 = (InvalidStreamNameException) th;
                    StreamImpl.this.countException(th3, StreamImpl.this.streamExceptionStatLogger);
                    StreamImpl.logger.error("Failed to acquire stream {} due to its name is invalid", StreamImpl.this.name);
                    synchronized (StreamImpl.this) {
                        streamStatus = StreamImpl.this.setStreamStatus(StreamStatus.ERROR, StreamStatus.INITIALIZING, null, null, th3);
                        queue = StreamImpl.this.pendingOps;
                        StreamImpl.this.pendingOps = new ArrayDeque();
                        z = false;
                    }
                } else {
                    StreamImpl.this.countException(th, StreamImpl.this.streamExceptionStatLogger);
                    StreamImpl.logger.error("Failed to initialize stream {} : ", StreamImpl.this.name, th);
                    synchronized (StreamImpl.this) {
                        streamStatus = StreamImpl.this.setStreamStatus(StreamStatus.FAILED, StreamStatus.INITIALIZING, null, null, th);
                        queue = StreamImpl.this.pendingOps;
                        StreamImpl.this.pendingOps = new ArrayDeque();
                        z = false;
                    }
                }
                processPendingRequestsAfterOpen(z, streamStatus, queue);
            }

            void processPendingRequestsAfterOpen(boolean z, AsyncLogWriter asyncLogWriter, Queue<StreamOp> queue) {
                if (z) {
                    StreamImpl.this.streamAcquireStat.registerSuccessfulEvent(r5.elapsed(TimeUnit.MICROSECONDS));
                } else {
                    StreamImpl.this.streamAcquireStat.registerFailedEvent(r5.elapsed(TimeUnit.MICROSECONDS));
                }
                Iterator<StreamOp> it = queue.iterator();
                while (it.hasNext()) {
                    StreamImpl.this.executeOp(it.next(), z);
                    StreamImpl.this.pendingOpsCounter.dec();
                }
                Abortables.asyncAbort(asyncLogWriter, true);
                FutureUtils.setValue(r6, Boolean.valueOf(z));
            }
        }, this.scheduler, getStreamName()));
        return promise2;
    }

    /**
     * Moves the stream to {@code ERROR} unless it is already {@code CLOSING} or {@code CLOSED}.
     */
    synchronized void setStreamInErrorStatus() {
        final boolean closingOrClosed = StreamStatus.CLOSING == status || StreamStatus.CLOSED == status;
        if (!closingOrClosed) {
            status = StreamStatus.ERROR;
        }
    }

    /**
     * Performs a compare-and-set style status transition and swaps in the given writer.
     *
     * <p>The transition happens only when the current status equals {@code streamStatus2};
     * otherwise the mismatch is logged and nothing changes. On a successful transition the
     * writer, owner, last exception and status are updated, the backoff stopwatch is restarted
     * when entering {@code BACKOFF} with a known owner, and the stream manager is told whether
     * the stream is now acquired or released. If the reported owner equals this proxy's own
     * client id, the situation is treated as fatal and the owner is cleared.
     *
     * @param streamStatus status to move to
     * @param streamStatus2 status the stream is expected to be in
     * @param asyncLogWriter writer to install; may be null
     * @param str owner to record; may be null
     * @param th exception to record as the last exception; may be null
     * @return the writer that was installed before the transition, or {@code null} when the
     *         transition did not happen
     */
    synchronized AsyncLogWriter setStreamStatus(StreamStatus streamStatus, StreamStatus streamStatus2, AsyncLogWriter asyncLogWriter, String str, Throwable th) {
        if (status != streamStatus2) {
            logger.info("Stream {} status already changed from {} -> {} when trying to change it to {}", new Object[]{name, streamStatus2, status, streamStatus});
            return null;
        }
        final AsyncLogWriter previousWriter = writer;
        writer = asyncLogWriter;
        final boolean ownerIsSelf = str != null && str.equals(clientId);
        if (ownerIsSelf) {
            unexpectedExceptions.inc();
            logger.error("I am waiting myself {} to release lock on stream {}, so have to shut myself down :", new Object[]{str, name, th});
            fatalErrorHandler.notifyFatalError();
            owner = null;
        } else {
            owner = str;
        }
        lastException = th;
        status = streamStatus;
        if (str != null && StreamStatus.BACKOFF == streamStatus) {
            lastAcquireFailureWatch.reset().start();
        }
        if (StreamStatus.INITIALIZED != streamStatus) {
            streamManager.notifyReleased(this);
            logger.info("Removed acquired stream {} -> writer {}", name, this);
        } else {
            streamManager.notifyAcquired(this);
            logger.info("Inserted acquired stream {} -> writer {}", name, this);
        }
        return previousWriter;
    }

    /**
     * Closes the given log manager, logging (rather than propagating) any {@link IOException}.
     *
     * @param distributedLogManager manager to close; ignored when null
     */
    void close(DistributedLogManager distributedLogManager) {
        if (null == distributedLogManager) {
            return;
        }
        try {
            distributedLogManager.close();
        } catch (IOException e) {
            // Pass the stream name for the "{}" placeholder; the trailing exception is
            // logged with its stack trace.
            logger.warn("Failed to close dlm for {} : ", this.name, e);
        }
    }

    /**
     * Requests that the stream be closed, delegating to
     * {@code requestClose(String, boolean)} with the flag set to {@code true}.
     *
     * @param str reason for closing
     * @return the future returned by the two-argument overload
     */
    @Override // com.twitter.distributedlog.service.stream.Stream
    public Future<Void> requestClose(String str) {
        final boolean notifyRemovedOnClose = true;
        return requestClose(str, notifyRemovedOnClose);
    }

    Future<Void> requestClose(String str, boolean z) {
        this.closeLock.writeLock().lock();
        try {
            if (StreamStatus.CLOSING == this.status || StreamStatus.CLOSED == this.status) {
                Promise<Void> promise = this.closePromise;
                this.closeLock.writeLock().unlock();
                return promise;
            }
            logger.info("Request to close stream {} : {}", getStreamName(), str);
            boolean z2 = StreamStatus.INITIALIZED != this.status;
            this.status = StreamStatus.CLOSING;
            this.streamManager.notifyReleased(this);
            this.closeLock.writeLock().unlock();
            close(z2);
            if (z) {
                this.closePromise.onSuccess(new AbstractFunction1<Void, BoxedUnit>() { // from class: com.twitter.distributedlog.service.stream.StreamImpl.9
                    AnonymousClass9() {
                    }

                    public BoxedUnit apply(Void r5) {
                        if (StreamImpl.this.streamManager.notifyRemoved(StreamImpl.this)) {
                            StreamImpl.logger.info("Removed cached stream {} after closed.", StreamImpl.this.name);
                        }
                        return BoxedUnit.UNIT;
                    }
                });
            }
            return this.closePromise;
        } catch (Throwable th) {
            this.closeLock.writeLock().unlock();
            throw th;
        }
    }

    /**
     * Deletes the underlying log.
     *
     * <p>If a writer is present it is closed first and replaced by {@code null}, and the last
     * exception is set to a {@link StreamUnavailableException}. The delete is then delegated to
     * the log manager.
     *
     * @throws IOException if closing the writer or deleting the log fails; an
     *         {@link UnexpectedException} is thrown when there is no log manager
     */
    @Override // com.twitter.distributedlog.service.stream.Stream
    public void delete() throws IOException {
        if (writer != null) {
            Utils.close(writer);
            synchronized (this) {
                writer = null;
                lastException = new StreamUnavailableException("Stream was deleted");
            }
        }
        if (manager != null) {
            manager.delete();
            return;
        }
        throw new UnexpectedException("No stream " + name + " to delete");
    }

    private Future<Void> close(boolean z) {
        this.closeLock.writeLock().lock();
        try {
            if (StreamStatus.CLOSED == this.status) {
                Promise<Void> promise = this.closePromise;
                this.closeLock.writeLock().unlock();
                return promise;
            }
            boolean z2 = z || !(StreamStatus.INITIALIZED == this.status || StreamStatus.CLOSING == this.status);
            this.status = StreamStatus.CLOSED;
            this.streamManager.notifyReleased(this);
            this.closeLock.writeLock().unlock();
            logger.info("Closing stream {} ...", this.name);
            this.running = false;
            synchronized (this) {
                if (null != this.tryAcquireScheduledFuture) {
                    this.tryAcquireScheduledFuture.cancel(true);
                }
            }
            logger.info("Stopped threads of stream {}.", this.name);
            (z2 ? Abortables.asyncAbort(this.writer, true) : Utils.asyncClose(this.writer, true)).addEventListener(FutureUtils.OrderedFutureEventListener.of(new FutureEventListener<Void>() { // from class: com.twitter.distributedlog.service.stream.StreamImpl.10
                AnonymousClass10() {
                }

                public void onSuccess(Void r4) {
                    StreamImpl.this.closeManagerAndErrorOutPendingRequests();
                    FutureUtils.setValue(StreamImpl.this.closePromise, (Object) null);
                }

                public void onFailure(Throwable th) {
                    StreamImpl.this.closeManagerAndErrorOutPendingRequests();
                    FutureUtils.setValue(StreamImpl.this.closePromise, (Object) null);
                }
            }, this.scheduler, this.name));
            return this.closePromise;
        } catch (Throwable th) {
            this.closeLock.writeLock().unlock();
            throw th;
        }
    }

    /**
     * Closes the log manager, fails every op still queued in {@code pendingOps} with a
     * {@link StreamUnavailableException}, and closes the limiter.
     */
    public void closeManagerAndErrorOutPendingRequests() {
        close(manager);
        final Queue<StreamOp> drained;
        // Swap the queue out under the lock, then fail the drained ops outside it.
        synchronized (this) {
            drained = pendingOps;
            pendingOps = new ArrayDeque<StreamOp>();
        }
        final Throwable closedCause = new StreamUnavailableException("Stream " + name + " is closed.");
        for (StreamOp pendingOp : drained) {
            pendingOp.fail(closedCause);
            pendingOpsCounter.dec();
        }
        limiter.close();
        logger.info("Closed stream {}.", name);
    }

    /**
     * Sets the {@code suspended} flag, which {@code submit} checks before scheduling an
     * acquire attempt.
     *
     * @return this stream
     */
    @VisibleForTesting
    public StreamImpl suspendAcquiring() {
        suspended = true;
        return this;
    }

    /**
     * Clears the {@code suspended} flag and schedules an acquire attempt with no delay.
     *
     * @return this stream
     */
    @VisibleForTesting
    public StreamImpl resumeAcquiring() {
        suspended = false;
        scheduleTryAcquireOnce(0L);
        return this;
    }

    /**
     * Returns the number of ops currently queued in {@code pendingOps}.
     *
     * @return the queue size, or 0 when the queue reference is null
     */
    @VisibleForTesting
    public int numPendingOps() {
        // Read the field once so the null check and size() see the same queue.
        final Queue<StreamOp> snapshot = pendingOps;
        return (snapshot != null) ? snapshot.size() : 0;
    }

    /**
     * @return the current stream status
     */
    @VisibleForTesting
    public StreamStatus getStatus() {
        return status;
    }

    /**
     * Overwrites the stream status directly, without the checks done by
     * {@code setStreamStatus}.
     *
     * @param streamStatus status to set
     */
    @VisibleForTesting
    public void setStatus(StreamStatus streamStatus) {
        status = streamStatus;
    }

    /**
     * @return the current writer; may be null
     */
    @VisibleForTesting
    public AsyncLogWriter getWriter() {
        return writer;
    }

    /**
     * @return the log manager backing this stream
     */
    @VisibleForTesting
    public DistributedLogManager getManager() {
        return manager;
    }

    /**
     * @return the exception recorded by the most recent status change or delete; may be null
     */
    @VisibleForTesting
    public Throwable getLastException() {
        return lastException;
    }

    /**
     * @return the promise that is completed once the stream has finished closing
     */
    @VisibleForTesting
    public Future<Void> getCloseFuture() {
        return closePromise;
    }

    /*  JADX ERROR: Failed to decode insn: 0x0002: MOVE_MULTI, method: com.twitter.distributedlog.service.stream.StreamImpl.access$102(com.twitter.distributedlog.service.stream.StreamImpl, long):long
        java.lang.ArrayIndexOutOfBoundsException: arraycopy: source index -1 out of bounds for object array[6]
        	at java.base/java.lang.System.arraycopy(Native Method)
        	at jadx.plugins.input.java.data.code.StackState.insert(StackState.java:49)
        	at jadx.plugins.input.java.data.code.CodeDecodeState.insert(CodeDecodeState.java:118)
        	at jadx.plugins.input.java.data.code.JavaInsnsRegister.dup2x1(JavaInsnsRegister.java:313)
        	at jadx.plugins.input.java.data.code.JavaInsnData.decode(JavaInsnData.java:46)
        	at jadx.core.dex.instructions.InsnDecoder.lambda$process$0(InsnDecoder.java:54)
        	at jadx.plugins.input.java.data.code.JavaCodeReader.visitInstructions(JavaCodeReader.java:81)
        	at jadx.core.dex.instructions.InsnDecoder.process(InsnDecoder.java:50)
        	at jadx.core.dex.nodes.MethodNode.load(MethodNode.java:156)
        	at jadx.core.dex.nodes.ClassNode.load(ClassNode.java:443)
        	at jadx.core.ProcessClass.process(ProcessClass.java:70)
        	at jadx.core.ProcessClass.generateCode(ProcessClass.java:118)
        	at jadx.core.dex.nodes.ClassNode.generateClassCode(ClassNode.java:400)
        	at jadx.core.dex.nodes.ClassNode.decompile(ClassNode.java:388)
        	at jadx.core.dex.nodes.ClassNode.getCode(ClassNode.java:338)
        */
    // Synthetic accessor stub. The decompiler failed to decode the method body, so this
    // placeholder always throws UnsupportedOperationException. The residual pseudo-code below
    // shows the accessor storing its long argument into scheduledAcquireDelayMs on the given
    // StreamImpl. NOTE(review): presumably generated for an inner class writing that private
    // field; confirm against the original source.
    static /* synthetic */ long access$102(com.twitter.distributedlog.service.stream.StreamImpl r6, long r7) {
        /*
            r0 = r6
            r1 = r7
            // decode failed: arraycopy: source index -1 out of bounds for object array[6]
            r0.scheduledAcquireDelayMs = r1
            return r-1
        */
        throw new UnsupportedOperationException("Method not decompiled: com.twitter.distributedlog.service.stream.StreamImpl.access$102(com.twitter.distributedlog.service.stream.StreamImpl, long):long");
    }

    // Empty static initializer; it performs no work. NOTE(review): presumably left behind by
    // the decompiler after it dropped the original static initialization; confirm against the
    // original source.
    static {
    }
}
