package com.twitter.distributedlog;

import com.google.common.annotations.VisibleForTesting;
import com.twitter.distributedlog.config.DynamicDistributedLogConfiguration;
import com.twitter.distributedlog.exceptions.AlreadyClosedException;
import com.twitter.distributedlog.exceptions.LockingException;
import com.twitter.distributedlog.exceptions.UnexpectedException;
import com.twitter.distributedlog.exceptions.ZKException;
import com.twitter.distributedlog.io.Abortable;
import com.twitter.distributedlog.io.Abortables;
import com.twitter.distributedlog.io.AsyncAbortable;
import com.twitter.distributedlog.io.AsyncCloseable;
import com.twitter.distributedlog.util.FutureUtils;
import com.twitter.distributedlog.util.PermitManager;
import com.twitter.distributedlog.util.Utils;
import com.twitter.util.Function;
import com.twitter.util.Future;
import com.twitter.util.FutureEventListener;
import com.twitter.util.Promise;
import java.io.Closeable;
import java.io.IOException;
import java.util.List;
import java.util.concurrent.TimeUnit;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import scala.runtime.AbstractFunction0;
import scala.runtime.AbstractFunction1;
import scala.runtime.BoxedUnit;

/* JADX INFO: Access modifiers changed from: package-private */
/* loaded from: input_file:com/twitter/distributedlog/BKAbstractLogWriter.class */
public abstract class BKAbstractLogWriter implements Closeable, AsyncCloseable, Abortable, AsyncAbortable {
    static final Logger LOG = LoggerFactory.getLogger(BKAbstractLogWriter.class);
    protected final DistributedLogConfiguration conf;
    private final DynamicDistributedLogConfiguration dynConf;
    protected final BKDistributedLogManager bkDistributedLogManager;
    private Promise<Void> closePromise = null;
    private boolean forceRolling = false;
    private boolean forceRecovery = false;
    private Future<List<LogSegmentMetadata>> lastTruncationAttempt = null;

    @VisibleForTesting
    private Long minTimestampToKeepOverride = null;
    protected BKLogSegmentWriter segmentWriter = null;
    protected Future<BKLogSegmentWriter> segmentWriterFuture = null;
    protected BKLogSegmentWriter allocatedSegmentWriter = null;
    protected BKLogWriteHandler writeHandler = null;

    /* JADX INFO: Access modifiers changed from: package-private */
    public BKAbstractLogWriter(DistributedLogConfiguration distributedLogConfiguration, DynamicDistributedLogConfiguration dynamicDistributedLogConfiguration, BKDistributedLogManager bKDistributedLogManager) {
        this.conf = distributedLogConfiguration;
        this.dynConf = dynamicDistributedLogConfiguration;
        this.bkDistributedLogManager = bKDistributedLogManager;
        LOG.debug("Initial retention period for {} : {}", bKDistributedLogManager.getStreamName(), Long.valueOf(TimeUnit.MILLISECONDS.convert(dynamicDistributedLogConfiguration.getRetentionPeriodHours(), TimeUnit.HOURS)));
    }

    protected synchronized BKLogWriteHandler getCachedWriteHandler() {
        return this.writeHandler;
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public BKLogWriteHandler getWriteHandler() throws IOException {
        BKLogWriteHandler createAndCacheWriteHandler = createAndCacheWriteHandler();
        createAndCacheWriteHandler.checkMetadataException();
        return createAndCacheWriteHandler;
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public BKLogWriteHandler createAndCacheWriteHandler() throws IOException {
        BKLogWriteHandler bKLogWriteHandler;
        synchronized (this) {
            if (this.writeHandler != null) {
                return this.writeHandler;
            }
            BKLogWriteHandler bKLogWriteHandler2 = (BKLogWriteHandler) FutureUtils.result(this.bkDistributedLogManager.asyncCreateWriteHandler(false));
            boolean z = false;
            try {
                synchronized (this) {
                    if (this.writeHandler == null) {
                        this.writeHandler = bKLogWriteHandler2;
                        z = true;
                    }
                    bKLogWriteHandler = this.writeHandler;
                }
                return bKLogWriteHandler;
            } finally {
                if (!z) {
                    bKLogWriteHandler2.asyncAbort();
                }
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public synchronized BKLogSegmentWriter getCachedLogWriter() {
        return this.segmentWriter;
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public synchronized Future<BKLogSegmentWriter> getCachedLogWriterFuture() {
        return this.segmentWriterFuture;
    }

    protected synchronized void cacheLogWriter(BKLogSegmentWriter bKLogSegmentWriter) {
        this.segmentWriter = bKLogSegmentWriter;
        this.segmentWriterFuture = Future.value(bKLogSegmentWriter);
    }

    protected synchronized BKLogSegmentWriter removeCachedLogWriter() {
        try {
            BKLogSegmentWriter bKLogSegmentWriter = this.segmentWriter;
            this.segmentWriter = null;
            this.segmentWriterFuture = null;
            return bKLogSegmentWriter;
        } catch (Throwable th) {
            this.segmentWriter = null;
            this.segmentWriterFuture = null;
            throw th;
        }
    }

    protected synchronized BKLogSegmentWriter getAllocatedLogWriter() {
        return this.allocatedSegmentWriter;
    }

    protected synchronized void cacheAllocatedLogWriter(BKLogSegmentWriter bKLogSegmentWriter) {
        this.allocatedSegmentWriter = bKLogSegmentWriter;
    }

    protected synchronized BKLogSegmentWriter removeAllocatedLogWriter() {
        try {
            BKLogSegmentWriter bKLogSegmentWriter = this.allocatedSegmentWriter;
            this.allocatedSegmentWriter = null;
            return bKLogSegmentWriter;
        } catch (Throwable th) {
            this.allocatedSegmentWriter = null;
            throw th;
        }
    }

    private Future<Void> asyncCloseAndComplete(boolean z) {
        BKLogSegmentWriter cachedLogWriter = getCachedLogWriter();
        BKLogWriteHandler cachedWriteHandler = getCachedWriteHandler();
        if (null == cachedLogWriter || null == cachedWriteHandler) {
            return closeNoThrow();
        }
        cancelTruncation();
        Promise<Void> promise = new Promise<>();
        asyncCloseAndComplete(cachedLogWriter, cachedWriteHandler, promise, z);
        return promise;
    }

    private void asyncCloseAndComplete(BKLogSegmentWriter bKLogSegmentWriter, BKLogWriteHandler bKLogWriteHandler, final Promise<Void> promise, final boolean z) {
        bKLogWriteHandler.completeAndCloseLogSegment(bKLogSegmentWriter).addEventListener(new FutureEventListener<LogSegmentMetadata>() { // from class: com.twitter.distributedlog.BKAbstractLogWriter.1
            public void onSuccess(LogSegmentMetadata logSegmentMetadata) {
                BKAbstractLogWriter.this.removeCachedLogWriter();
                complete(null);
            }

            public void onFailure(Throwable th) {
                BKAbstractLogWriter.LOG.error("Completing Log segments encountered exception", th);
                complete(th);
            }

            private void complete(final Throwable th) {
                BKAbstractLogWriter.this.closeNoThrow().ensure(new AbstractFunction0<BoxedUnit>() { // from class: com.twitter.distributedlog.BKAbstractLogWriter.1.1
                    /* renamed from: apply, reason: merged with bridge method [inline-methods] */
                    public BoxedUnit m3apply() {
                        if (null == th || !z) {
                            FutureUtils.setValue(promise, null);
                        } else {
                            FutureUtils.setException(promise, th);
                        }
                        return BoxedUnit.UNIT;
                    }
                });
            }
        });
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    @VisibleForTesting
    public void closeAndComplete() throws IOException {
        FutureUtils.result(asyncCloseAndComplete(true));
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public Future<Void> asyncCloseAndComplete() {
        return asyncCloseAndComplete(true);
    }

    @Override // java.io.Closeable, java.lang.AutoCloseable
    public void close() throws IOException {
        FutureUtils.result(asyncClose());
    }

    @Override // com.twitter.distributedlog.io.AsyncCloseable
    public Future<Void> asyncClose() {
        return asyncCloseAndComplete(false);
    }

    protected Future<Void> closeNoThrow() {
        synchronized (this) {
            if (null != this.closePromise) {
                return this.closePromise;
            }
            Promise<Void> promise = new Promise<>();
            this.closePromise = promise;
            cancelTruncation();
            Utils.closeSequence(this.bkDistributedLogManager.getScheduler(), true, getCachedLogWriter(), getAllocatedLogWriter(), getCachedWriteHandler()).proxyTo(promise);
            return promise;
        }
    }

    @Override // com.twitter.distributedlog.io.Abortable
    public void abort() throws IOException {
        FutureUtils.result(asyncAbort());
    }

    @Override // com.twitter.distributedlog.io.AsyncAbortable
    public Future<Void> asyncAbort() {
        synchronized (this) {
            if (null != this.closePromise) {
                return this.closePromise;
            }
            Promise<Void> promise = new Promise<>();
            this.closePromise = promise;
            cancelTruncation();
            Abortables.abortSequence(this.bkDistributedLogManager.getScheduler(), getCachedLogWriter(), getAllocatedLogWriter(), getCachedWriteHandler()).proxyTo(promise);
            return promise;
        }
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public BKLogSegmentWriter getLedgerWriter(long j, boolean z) throws IOException {
        Future<BKLogSegmentWriter> asyncGetLedgerWriter = asyncGetLedgerWriter(true);
        BKLogSegmentWriter bKLogSegmentWriter = null;
        if (null != asyncGetLedgerWriter) {
            bKLogSegmentWriter = (BKLogSegmentWriter) FutureUtils.result(asyncGetLedgerWriter);
        }
        if (null == bKLogSegmentWriter || shouldStartNewSegment(bKLogSegmentWriter) || this.forceRolling) {
            bKLogSegmentWriter = (BKLogSegmentWriter) FutureUtils.result(rollLogSegmentIfNecessary(bKLogSegmentWriter, j, true, z));
        }
        return bKLogSegmentWriter;
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public synchronized Future<BKLogSegmentWriter> asyncGetLedgerWriter(boolean z) {
        final BKLogSegmentWriter cachedLogWriter = getCachedLogWriter();
        Future<BKLogSegmentWriter> cachedLogWriterFuture = getCachedLogWriterFuture();
        if (null == cachedLogWriterFuture || null == cachedLogWriter) {
            return null;
        }
        if ((cachedLogWriter.isLogSegmentInError() || this.forceRecovery) && z) {
            return (cachedLogWriter.isLogSegmentInError() ? cachedLogWriter.asyncAbort() : cachedLogWriter.asyncClose()).flatMap(new AbstractFunction1<Void, Future<BKLogSegmentWriter>>() { // from class: com.twitter.distributedlog.BKAbstractLogWriter.2
                public Future<BKLogSegmentWriter> apply(Void r6) {
                    BKAbstractLogWriter.this.removeCachedLogWriter();
                    if (cachedLogWriter.isLogSegmentInError()) {
                        return Future.value((Object) null);
                    }
                    try {
                        BKLogWriteHandler writeHandler = BKAbstractLogWriter.this.getWriteHandler();
                        return (null == writeHandler || !BKAbstractLogWriter.this.forceRecovery) ? Future.value((Object) null) : writeHandler.completeAndCloseLogSegment(cachedLogWriter).map(new AbstractFunction1<LogSegmentMetadata, BKLogSegmentWriter>() { // from class: com.twitter.distributedlog.BKAbstractLogWriter.2.1
                            public BKLogSegmentWriter apply(LogSegmentMetadata logSegmentMetadata) {
                                return null;
                            }
                        });
                    } catch (IOException e) {
                        return Future.exception(e);
                    }
                }
            });
        }
        return cachedLogWriterFuture;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public boolean shouldStartNewSegment(BKLogSegmentWriter bKLogSegmentWriter) throws IOException {
        return null == bKLogSegmentWriter || getWriteHandler().shouldStartNewSegment(bKLogSegmentWriter) || this.forceRolling;
    }

    /* JADX INFO: Access modifiers changed from: private */
    public void truncateLogSegmentsIfNecessary(BKLogWriteHandler bKLogWriteHandler) {
        boolean z = false;
        long j = 0;
        long convert = TimeUnit.MILLISECONDS.convert(this.dynConf.getRetentionPeriodHours(), TimeUnit.HOURS);
        if (convert > 0) {
            j = Utils.nowInMillis() - convert;
            z = true;
        }
        if (null != this.minTimestampToKeepOverride) {
            j = this.minTimestampToKeepOverride.longValue();
            z = true;
        }
        if (z) {
            if (this.lastTruncationAttempt == null || this.lastTruncationAttempt.isDefined()) {
                this.lastTruncationAttempt = bKLogWriteHandler.purgeLogSegmentsOlderThanTimestamp(j);
            }
        }
    }

    private Future<BKLogSegmentWriter> asyncStartNewLogSegment(final BKLogWriteHandler bKLogWriteHandler, final long j, final boolean z) {
        return bKLogWriteHandler.recoverIncompleteLogSegments().flatMap(new AbstractFunction1<Long, Future<BKLogSegmentWriter>>() { // from class: com.twitter.distributedlog.BKAbstractLogWriter.3
            public Future<BKLogSegmentWriter> apply(Long l) {
                return bKLogWriteHandler.asyncStartLogSegment(j, false, z).onSuccess(new AbstractFunction1<BKLogSegmentWriter, BoxedUnit>() { // from class: com.twitter.distributedlog.BKAbstractLogWriter.3.1
                    public BoxedUnit apply(BKLogSegmentWriter bKLogSegmentWriter) {
                        BKAbstractLogWriter.this.cacheLogWriter(bKLogSegmentWriter);
                        return BoxedUnit.UNIT;
                    }
                });
            }
        });
    }

    private Future<BKLogSegmentWriter> closeOldLogSegmentAndStartNewOneWithPermit(final BKLogSegmentWriter bKLogSegmentWriter, final BKLogWriteHandler bKLogWriteHandler, long j, boolean z, boolean z2) {
        final PermitManager.Permit acquirePermit = this.bkDistributedLogManager.getLogSegmentRollingPermitManager().acquirePermit();
        if (acquirePermit.isAllowed()) {
            return closeOldLogSegmentAndStartNewOne(bKLogSegmentWriter, bKLogWriteHandler, j, z, z2).rescue(new Function<Throwable, Future<BKLogSegmentWriter>>() { // from class: com.twitter.distributedlog.BKAbstractLogWriter.5
                /* JADX WARN: Multi-variable type inference failed */
                public Future<BKLogSegmentWriter> apply(Throwable th) {
                    if (th instanceof LockingException) {
                        BKAbstractLogWriter.LOG.warn("We lost lock during completeAndClose log segment for {}. Disable ledger rolling until it is recovered : ", bKLogWriteHandler.getFullyQualifiedName(), th);
                        BKAbstractLogWriter.this.bkDistributedLogManager.getLogSegmentRollingPermitManager().disallowObtainPermits(acquirePermit);
                        return Future.value(bKLogSegmentWriter);
                    }
                    if (th instanceof ZKException) {
                        ZKException zKException = (ZKException) th;
                        if (ZKException.isRetryableZKException(zKException)) {
                            BKAbstractLogWriter.LOG.warn("Encountered zookeeper connection issues during completeAndClose log segment for {}. Disable ledger rolling until it is recovered : {}", bKLogWriteHandler.getFullyQualifiedName(), zKException.getKeeperExceptionCode());
                            BKAbstractLogWriter.this.bkDistributedLogManager.getLogSegmentRollingPermitManager().disallowObtainPermits(acquirePermit);
                            return Future.value(bKLogSegmentWriter);
                        }
                    }
                    return Future.exception(th);
                }
            }).ensure(new AbstractFunction0<BoxedUnit>() { // from class: com.twitter.distributedlog.BKAbstractLogWriter.4
                /* renamed from: apply, reason: merged with bridge method [inline-methods] */
                public BoxedUnit m4apply() {
                    BKAbstractLogWriter.this.bkDistributedLogManager.getLogSegmentRollingPermitManager().releasePermit(acquirePermit);
                    return BoxedUnit.UNIT;
                }
            });
        }
        this.bkDistributedLogManager.getLogSegmentRollingPermitManager().releasePermit(acquirePermit);
        return Future.value(bKLogSegmentWriter);
    }

    private Future<BKLogSegmentWriter> closeOldLogSegmentAndStartNewOne(final BKLogSegmentWriter bKLogSegmentWriter, final BKLogWriteHandler bKLogWriteHandler, final long j, final boolean z, boolean z2) {
        BKLogSegmentWriter allocatedLogWriter = getAllocatedLogWriter();
        if (null != allocatedLogWriter) {
            return completeOldSegmentAndCacheNewLogSegmentWriter(bKLogSegmentWriter, allocatedLogWriter);
        }
        if (LOG.isDebugEnabled()) {
            LOG.debug("Allocating a new log segment from {} for {}.", Long.valueOf(j), bKLogWriteHandler.getFullyQualifiedName());
        }
        return bKLogWriteHandler.asyncStartLogSegment(j, z, z2).flatMap(new AbstractFunction1<BKLogSegmentWriter, Future<BKLogSegmentWriter>>() { // from class: com.twitter.distributedlog.BKAbstractLogWriter.6
            public Future<BKLogSegmentWriter> apply(BKLogSegmentWriter bKLogSegmentWriter2) {
                if (null == bKLogSegmentWriter2) {
                    return z ? Future.value(bKLogSegmentWriter) : Future.exception(new UnexpectedException("StartLogSegment returns null for bestEffort rolling"));
                }
                BKAbstractLogWriter.this.cacheAllocatedLogWriter(bKLogSegmentWriter2);
                if (BKAbstractLogWriter.LOG.isDebugEnabled()) {
                    BKAbstractLogWriter.LOG.debug("Allocated a new log segment from {} for {}.", Long.valueOf(j), bKLogWriteHandler.getFullyQualifiedName());
                }
                return BKAbstractLogWriter.this.completeOldSegmentAndCacheNewLogSegmentWriter(bKLogSegmentWriter, bKLogSegmentWriter2);
            }
        });
    }

    /* JADX INFO: Access modifiers changed from: private */
    public Future<BKLogSegmentWriter> completeOldSegmentAndCacheNewLogSegmentWriter(BKLogSegmentWriter bKLogSegmentWriter, final BKLogSegmentWriter bKLogSegmentWriter2) {
        final Promise promise = new Promise();
        this.writeHandler.completeAndCloseLogSegment(bKLogSegmentWriter).addEventListener(new FutureEventListener<LogSegmentMetadata>() { // from class: com.twitter.distributedlog.BKAbstractLogWriter.7
            public void onSuccess(LogSegmentMetadata logSegmentMetadata) {
                BKAbstractLogWriter.this.cacheLogWriter(bKLogSegmentWriter2);
                BKAbstractLogWriter.this.removeAllocatedLogWriter();
                FutureUtils.setValue(promise, bKLogSegmentWriter2);
            }

            public void onFailure(Throwable th) {
                FutureUtils.setException(promise, th);
            }
        });
        return promise;
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public synchronized Future<BKLogSegmentWriter> rollLogSegmentIfNecessary(final BKLogSegmentWriter bKLogSegmentWriter, long j, boolean z, boolean z2) {
        try {
            final BKLogWriteHandler writeHandler = getWriteHandler();
            return ((null == bKLogSegmentWriter || !(writeHandler.shouldStartNewSegment(bKLogSegmentWriter) || this.forceRolling)) ? null == bKLogSegmentWriter ? asyncStartNewLogSegment(writeHandler, j, z2) : Future.value(bKLogSegmentWriter) : closeOldLogSegmentAndStartNewOneWithPermit(bKLogSegmentWriter, writeHandler, j, z, z2)).map(new AbstractFunction1<BKLogSegmentWriter, BKLogSegmentWriter>() { // from class: com.twitter.distributedlog.BKAbstractLogWriter.8
                public BKLogSegmentWriter apply(BKLogSegmentWriter bKLogSegmentWriter2) {
                    if (bKLogSegmentWriter == bKLogSegmentWriter2) {
                        return bKLogSegmentWriter2;
                    }
                    BKAbstractLogWriter.this.truncateLogSegmentsIfNecessary(writeHandler);
                    return bKLogSegmentWriter2;
                }
            });
        } catch (IOException e) {
            return Future.exception(e);
        }
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public synchronized void checkClosedOrInError(String str) throws AlreadyClosedException {
        if (null != this.closePromise) {
            LOG.error("Executing " + str + " on already closed Log Writer");
            throw new AlreadyClosedException("Executing " + str + " on already closed Log Writer");
        }
    }

    @VisibleForTesting
    public synchronized void setForceRolling(boolean z) {
        this.forceRolling = z;
    }

    @VisibleForTesting
    public synchronized void overRideMinTimeStampToKeep(Long l) {
        this.minTimestampToKeepOverride = l;
    }

    protected synchronized void cancelTruncation() {
        if (null != this.lastTruncationAttempt) {
            FutureUtils.cancel(this.lastTruncationAttempt);
            this.lastTruncationAttempt = null;
        }
    }

    @VisibleForTesting
    public synchronized void setForceRecovery(boolean z) {
        this.forceRecovery = z;
    }
}
