package com.twitter.distributedlog;

import com.google.common.base.Stopwatch;
import com.google.common.base.Ticker;
import com.twitter.distributedlog.callback.ReadAheadCallback;
import java.io.IOException;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.bookkeeper.client.LedgerEntry;
import org.apache.bookkeeper.stats.AlertStatsLogger;
import org.apache.bookkeeper.stats.OpStatsLogger;
import org.apache.bookkeeper.stats.StatsLogger;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/twitter/distributedlog/ReadAheadCache.class */
/**
 * Buffer that sits between the read-ahead machinery and the record consumer.
 *
 * <p>Records are held in a {@link LinkedBlockingQueue}; the total payload size is tracked in
 * {@link #cacheBytes}. The cache counts as "full" once the number of queued records reaches
 * {@code maxCachedRecords}. A single {@link ReadAheadCallback} can be parked via
 * {@link #setReadAheadCallback(ReadAheadCallback)}; it is fired once, as soon as the cache is
 * observed to be below capacity, and then forgotten.
 *
 * <p>Access to the parked callback is guarded by this object's monitor. The queue and the
 * counters rely on their own concurrent implementations.
 *
 * <p>NOTE(review): this source is decompiler output. The body of
 * {@code processNewLedgerEntry} was not recovered, so several fields below appear unused here;
 * they are presumably consumed by that method and must not be removed.
 */
public class ReadAheadCache {
    static final Logger LOG = LoggerFactory.getLogger(ReadAheadCache.class);

    /** Stream name; only used for {@link #toString()} in the visible code. */
    private final String streamName;
    /** Record-count threshold at or above which {@link #isCacheFull()} reports {@code true}. */
    private final int maxCachedRecords;
    private final boolean deserializeRecordSet;
    /** Optional listener notified after every {@link #set} call; may be {@code null}. */
    private final AsyncNotification notification;
    /** Restarted on every {@link #set} call; drives {@link #isReadAheadIdle(int, TimeUnit)}. */
    private final Stopwatch lastEntryProcessTime;
    private final AlertStatsLogger alertStatsLogger;
    /** The constructor's stats logger, scoped to {@code "readahead"}. */
    private final StatsLogger statsLogger;
    private final OpStatsLogger readAheadDeliveryLatencyStat;
    private final OpStatsLogger negativeReadAheadDeliveryLatencyStat;
    private final boolean traceDeliveryLatencyEnabled;
    private final long deliveryLatencyWarnThresholdMillis;
    private final AtomicReference<DLSN> minActiveDLSN = new AtomicReference<>(DLSN.NonInclusiveLowerBound);
    private DLSN lastReadAheadDLSN = DLSN.InvalidDLSN;
    private DLSN lastReadAheadUserDLSN = DLSN.InvalidDLSN;
    /** Once non-null, every {@link #getNextReadAheadRecord()} call rethrows it. */
    private final AtomicReference<IOException> lastException = new AtomicReference<>();
    /** Parked one-shot callback; guarded by {@code this}. */
    private ReadAheadCallback readAheadCallback = null;
    /** Sum of payload lengths of the queued records, as maintained by this class. */
    private final AtomicLong cacheBytes = new AtomicLong(0);
    private volatile boolean suppressDeliveryLatency = true;
    private final LinkedBlockingQueue<LogRecordWithDLSN> readAheadRecords = new LinkedBlockingQueue<>();

    /**
     * Creates an empty cache.
     *
     * @param streamName name reported by {@link #toString()}
     * @param statsLogger parent stats logger; a {@code "readahead"} scope is derived from it
     * @param alertStatsLogger alert logger, stored for later use
     * @param notification listener notified after each {@link #set} call, or {@code null}
     * @param maxCachedRecords record count at which the cache is considered full
     * @param deserializeRecordSet stored flag (consumed outside the visible code)
     * @param traceDeliveryLatencyEnabled stored flag (consumed outside the visible code)
     * @param deliveryLatencyWarnThresholdMillis stored threshold (consumed outside the visible code)
     * @param ticker time source for the idle stopwatch, which starts running immediately
     */
    public ReadAheadCache(String streamName,
                          StatsLogger statsLogger,
                          AlertStatsLogger alertStatsLogger,
                          AsyncNotification notification,
                          int maxCachedRecords,
                          boolean deserializeRecordSet,
                          boolean traceDeliveryLatencyEnabled,
                          long deliveryLatencyWarnThresholdMillis,
                          Ticker ticker) {
        this.streamName = streamName;
        this.maxCachedRecords = maxCachedRecords;
        this.notification = notification;
        this.deserializeRecordSet = deserializeRecordSet;
        this.lastEntryProcessTime = Stopwatch.createStarted(ticker);
        this.traceDeliveryLatencyEnabled = traceDeliveryLatencyEnabled;
        this.deliveryLatencyWarnThresholdMillis = deliveryLatencyWarnThresholdMillis;
        StatsLogger readAheadScope = statsLogger.scope("readahead");
        this.statsLogger = readAheadScope;
        this.alertStatsLogger = alertStatsLogger;
        this.readAheadDeliveryLatencyStat = readAheadScope.getOpStatsLogger("delivery_latency");
        this.negativeReadAheadDeliveryLatencyStat = readAheadScope.getOpStatsLogger("negative_delivery_latency");
    }

    /**
     * Returns the current value of {@code lastReadAheadUserDLSN}
     * ({@code DLSN.InvalidDLSN} until something updates it).
     */
    /* JADX INFO: Access modifiers changed from: package-private */
    public DLSN getLastReadAheadUserDLSN() {
        return this.lastReadAheadUserDLSN;
    }

    /**
     * Fires the parked callback, if any, and then forgets it.
     *
     * <p>NOTE(review): the field is cleared only after {@code resumeReadAhead()} returns, so a
     * callback registered re-entrantly from inside {@code resumeReadAhead()} on this thread would
     * be overwritten with {@code null}. Confirm that {@code resumeReadAhead()} never calls
     * {@link #setReadAheadCallback(ReadAheadCallback)} synchronously.
     */
    private synchronized void invokeReadAheadCallback() {
        if (this.readAheadCallback == null) {
            return;
        }
        if (LOG.isTraceEnabled()) {
            LOG.trace("Cache has space, schedule the read ahead");
        }
        this.readAheadCallback.resumeReadAhead();
        this.readAheadCallback = null;
    }

    /**
     * Parks a callback to be fired when the cache has room. If the cache is not full right now,
     * the callback is fired immediately (and cleared) before this method returns.
     *
     * @param readAheadCallback callback to park; replaces any previously parked callback
     */
    public synchronized void setReadAheadCallback(ReadAheadCallback readAheadCallback) {
        this.readAheadCallback = readAheadCallback;
        if (!isCacheFull()) {
            invokeReadAheadCallback();
        }
    }

    /** Records a failure that subsequent {@link #getNextReadAheadRecord()} calls will rethrow. */
    private void setLastException(IOException exception) {
        this.lastException.set(exception);
    }

    /**
     * Removes and returns the oldest cached record without blocking.
     *
     * <p>When a record is returned, its payload length is subtracted from the byte counter and,
     * if the cache is no longer full, the parked read-ahead callback (if any) is fired.
     *
     * @return the next record, or {@code null} if the cache is currently empty
     * @throws IOException the recorded failure, if one has been set
     */
    public LogRecordWithDLSN getNextReadAheadRecord() throws IOException {
        // Read the reference once so the value checked is the value thrown.
        IOException failure = this.lastException.get();
        if (failure != null) {
            throw failure;
        }
        LogRecordWithDLSN record = this.readAheadRecords.poll();
        if (record == null) {
            return null;
        }
        this.cacheBytes.addAndGet(-record.getPayload().length);
        if (!isCacheFull()) {
            invokeReadAheadCallback();
        }
        return record;
    }

    /**
     * Tells whether more than {@code threshold} units have elapsed since the idle stopwatch was
     * last restarted (at construction or by the most recent {@link #set} call).
     *
     * @param threshold idle threshold, expressed in {@code timeUnit}
     * @param timeUnit unit of {@code threshold}
     * @return {@code true} if the elapsed time strictly exceeds the threshold
     */
    public boolean isReadAheadIdle(int threshold, TimeUnit timeUnit) {
        return this.lastEntryProcessTime.elapsed(timeUnit) > ((long) threshold);
    }

    /**
     * Hands a freshly read ledger entry to {@code processNewLedgerEntry}, restarts the idle
     * stopwatch, and notifies the registered {@link AsyncNotification}, if there is one.
     *
     * <p>All five arguments are forwarded unchanged; the meaning of {@code str}, {@code z} and
     * {@code j} is not recoverable from the decompiled source.
     */
    public void set(LedgerReadPosition ledgerReadPosition, LedgerEntry ledgerEntry, String str, boolean z, long j) {
        processNewLedgerEntry(ledgerReadPosition, ledgerEntry, str, z, j);
        this.lastEntryProcessTime.reset().start();
        AsyncNotification listener = this.notification;
        if (listener != null) {
            listener.notifyOnOperationComplete();
        }
    }

    /** Returns {@code true} when the number of cached records is at or above the maximum. */
    public boolean isCacheFull() {
        return getNumCachedRecords() >= this.maxCachedRecords;
    }

    /** Returns the number of records currently queued. */
    public int getNumCachedRecords() {
        return this.readAheadRecords.size();
    }

    /** Returns the current value of the cached-bytes counter. */
    public long getNumCachedBytes() {
        return this.cacheBytes.get();
    }

    /** Sets the {@code suppressDeliveryLatency} flag (initially {@code true}). */
    public void setSuppressDeliveryLatency(boolean suppressed) {
        this.suppressDeliveryLatency = suppressed;
    }

    /** Replaces the stored minimum active DLSN. */
    public void setMinActiveDLSN(DLSN dlsn) {
        this.minActiveDLSN.set(dlsn);
    }

    /*
     * NOTE(review): the decompiler could not recover this method; the stub below throws
     * unconditionally, so set(...) cannot succeed when built from this source. The JADX
     * fragments that follow suggest it rejects out-of-order reads by logging and throwing a
     * LogReadException. Restore the real implementation from the original sources or bytecode
     * rather than guessing at it.
     */
    /* JADX WARN: Code restructure failed: missing block: B:34:0x004c, code lost:
    
        com.twitter.distributedlog.ReadAheadCache.LOG.error("Out of order reads last {} : curr {}", r8.lastReadAheadDLSN, r0.getDlsn());
     */
    /* JADX WARN: Code restructure failed: missing block: B:35:0x0068, code lost:
    
        throw new com.twitter.distributedlog.exceptions.LogReadException("Out of order reads");
     */
    /*
        Code decompiled incorrectly, please refer to instructions dump.
        To view partially-correct add '--show-bad-code' argument
    */
    private void processNewLedgerEntry(com.twitter.distributedlog.LedgerReadPosition r9, org.apache.bookkeeper.client.LedgerEntry r10, java.lang.String r11, boolean r12, long r13) {
        /*
            Method dump skipped, instructions count: 366
            To view this dump add '--comments-level debug' option
        */
        throw new UnsupportedOperationException("Method not decompiled: com.twitter.distributedlog.ReadAheadCache.processNewLedgerEntry(com.twitter.distributedlog.LedgerReadPosition, org.apache.bookkeeper.client.LedgerEntry, java.lang.String, boolean, long):void");
    }

    /**
     * Drops every queued record and resets the byte counter to zero.
     *
     * <p>The two steps are separate operations, and a parked read-ahead callback is not fired
     * here even though space has been freed.
     */
    public void clear() {
        this.readAheadRecords.clear();
        this.cacheBytes.set(0L);
    }

    /** Returns the stream name together with the cached byte and record counts. */
    @Override
    public String toString() {
        return String.format("%s: Cache Bytes: %d, Num Cached Records: %d",
                this.streamName, this.cacheBytes.get(), getNumCachedRecords());
    }
}
