package org.apache.distributedlog;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Stopwatch;
import com.google.common.base.Ticker;
import com.google.common.collect.Lists;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CopyOnWriteArraySet;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.BiConsumer;
import java.util.function.Function;
import org.apache.bookkeeper.common.concurrent.FutureEventListener;
import org.apache.bookkeeper.common.concurrent.FutureUtils;
import org.apache.bookkeeper.common.util.OrderedScheduler;
import org.apache.bookkeeper.common.util.SafeRunnable;
import org.apache.bookkeeper.stats.AlertStatsLogger;
import org.apache.bookkeeper.versioning.Versioned;
import org.apache.distributedlog.Entry;
import org.apache.distributedlog.callback.LogSegmentListener;
import org.apache.distributedlog.exceptions.AlreadyTruncatedTransactionException;
import org.apache.distributedlog.exceptions.DLIllegalStateException;
import org.apache.distributedlog.exceptions.DLInterruptedException;
import org.apache.distributedlog.exceptions.EndOfLogSegmentException;
import org.apache.distributedlog.exceptions.LogNotFoundException;
import org.apache.distributedlog.exceptions.UnexpectedException;
import org.apache.distributedlog.io.AsyncCloseable;
import org.apache.distributedlog.logsegment.LogSegmentEntryReader;
import org.apache.distributedlog.logsegment.LogSegmentEntryStore;
import org.apache.distributedlog.logsegment.LogSegmentFilter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* JADX INFO: Access modifiers changed from: package-private */
/* loaded from: input_file:org/apache/distributedlog/ReadAheadEntryReader.class */
public class ReadAheadEntryReader implements AsyncCloseable, LogSegmentListener, LogSegmentEntryReader.StateChangeListener, FutureEventListener<List<Entry.Reader>> {
    private final Function<LogSegmentEntryReader, CompletableFuture<List<Entry.Reader>>> readFunc;
    private final DistributedLogConfiguration conf;
    private final BKLogReadHandler readHandler;
    private final LogSegmentEntryStore entryStore;
    private final OrderedScheduler scheduler;
    private final String streamName;
    private final DLSN fromDLSN;
    private final int maxCachedEntries;
    private final int numReadAheadEntries;
    private final int idleWarnThresholdMillis;
    private long currentSegmentSequenceNumber;
    private SegmentReader currentSegmentReader;
    private SegmentReader nextSegmentReader;
    private DLSN lastDLSN;
    private final EntryPosition nextEntryPosition;
    private final Stopwatch lastEntryAddedTime;
    private final AlertStatsLogger alertStatsLogger;
    private static final Logger logger = LoggerFactory.getLogger(ReadAheadEntryReader.class);
    private static Function<LogSegmentEntryReader, Void> START_READER_FUNC = logSegmentEntryReader -> {
        logSegmentEntryReader.start();
        return null;
    };
    private State state = State.IDLE;
    private final Runnable removeClosedSegmentReadersFunc = () -> {
        removeClosedSegmentReaders();
    };
    private final AtomicBoolean started = new AtomicBoolean(false);
    private boolean isInitialized = false;
    private boolean readAheadPaused = false;
    private CompletableFuture<Void> closePromise = null;
    private volatile boolean isCatchingUp = true;
    private final AtomicReference<IOException> lastException = new AtomicReference<>(null);
    private final CopyOnWriteArraySet<AsyncNotification> stateChangeNotifications = new CopyOnWriteArraySet<>();
    private final LinkedList<SegmentReader> segmentReaders = new LinkedList<>();
    private final LinkedList<SegmentReader> segmentReadersToClose = new LinkedList<>();
    private final LinkedBlockingQueue<Entry.Reader> entryQueue = new LinkedBlockingQueue<>();
    private final ScheduledFuture<?> idleReaderCheckTask = scheduleIdleReaderTaskIfNecessary();

    /* renamed from: org.apache.distributedlog.ReadAheadEntryReader$8, reason: invalid class name */
    /* loaded from: input_file:org/apache/distributedlog/ReadAheadEntryReader$8.class */
    static /* synthetic */ class AnonymousClass8 {
        static final /* synthetic */ int[] $SwitchMap$org$apache$distributedlog$ReadAheadEntryReader$State = new int[State.values().length];

        static {
            try {
                $SwitchMap$org$apache$distributedlog$ReadAheadEntryReader$State[State.IDLE.ordinal()] = 1;
            } catch (NoSuchFieldError e) {
            }
            try {
                $SwitchMap$org$apache$distributedlog$ReadAheadEntryReader$State[State.PAUSED.ordinal()] = 2;
            } catch (NoSuchFieldError e2) {
            }
            try {
                $SwitchMap$org$apache$distributedlog$ReadAheadEntryReader$State[State.READING.ordinal()] = 3;
            } catch (NoSuchFieldError e3) {
            }
            try {
                $SwitchMap$org$apache$distributedlog$ReadAheadEntryReader$State[State.ERROR.ordinal()] = 4;
            } catch (NoSuchFieldError e4) {
            }
            try {
                $SwitchMap$org$apache$distributedlog$ReadAheadEntryReader$State[State.CLOSED.ordinal()] = 5;
            } catch (NoSuchFieldError e5) {
            }
        }
    }

    /* loaded from: input_file:org/apache/distributedlog/ReadAheadEntryReader$CloseableRunnable.class */
    private abstract class CloseableRunnable implements SafeRunnable {
        private CloseableRunnable() {
        }

        public void run() {
            synchronized (ReadAheadEntryReader.this) {
                if (null != ReadAheadEntryReader.this.closePromise) {
                    return;
                }
                try {
                    safeRun();
                } catch (Throwable th) {
                    ReadAheadEntryReader.logger.error("Caught unexpected exception : ", th);
                }
            }
        }
    }

    /* loaded from: input_file:org/apache/distributedlog/ReadAheadEntryReader$ReadEntriesFunc.class */
    private class ReadEntriesFunc implements Function<LogSegmentEntryReader, CompletableFuture<List<Entry.Reader>>> {
        private final int numEntries;

        ReadEntriesFunc(int i) {
            this.numEntries = i;
        }

        @Override // java.util.function.Function
        public CompletableFuture<List<Entry.Reader>> apply(LogSegmentEntryReader logSegmentEntryReader) {
            ReadAheadEntryReader.this.checkCatchingUpStatus(logSegmentEntryReader);
            return logSegmentEntryReader.readNext(this.numEntries);
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:org/apache/distributedlog/ReadAheadEntryReader$SegmentReader.class */
    public class SegmentReader implements FutureEventListener<LogSegmentEntryReader> {
        private LogSegmentMetadata metadata;
        private final long startEntryId;
        private CompletableFuture<LogSegmentEntryReader> openFuture = null;
        private LogSegmentEntryReader reader = null;
        private boolean isStarted = false;
        private boolean isClosed = false;

        SegmentReader(LogSegmentMetadata logSegmentMetadata, long j) {
            this.metadata = logSegmentMetadata;
            this.startEntryId = j;
        }

        synchronized LogSegmentEntryReader getEntryReader() {
            return this.reader;
        }

        synchronized boolean isBeyondLastAddConfirmed() {
            return null != this.reader && this.reader.isBeyondLastAddConfirmed();
        }

        synchronized LogSegmentMetadata getSegment() {
            return this.metadata;
        }

        synchronized boolean isReaderOpen() {
            return null != this.openFuture;
        }

        synchronized void openReader() {
            if (null != this.openFuture) {
                return;
            }
            this.openFuture = ReadAheadEntryReader.this.entryStore.openReader(this.metadata, this.startEntryId).whenComplete((BiConsumer<? super LogSegmentEntryReader, ? super Throwable>) this);
        }

        synchronized boolean isReaderStarted() {
            return this.isStarted;
        }

        synchronized void startRead() {
            if (this.isStarted) {
                return;
            }
            this.isStarted = true;
            if (null != this.reader) {
                this.reader.start();
            } else {
                this.openFuture.thenApply(ReadAheadEntryReader.START_READER_FUNC);
            }
        }

        synchronized CompletableFuture<List<Entry.Reader>> readNext() {
            if (null == this.reader) {
                return this.openFuture.thenCompose(ReadAheadEntryReader.this.readFunc);
            }
            ReadAheadEntryReader.this.checkCatchingUpStatus(this.reader);
            return this.reader.readNext(ReadAheadEntryReader.this.numReadAheadEntries);
        }

        synchronized void updateLogSegmentMetadata(LogSegmentMetadata logSegmentMetadata) {
            if (null == this.reader) {
                this.openFuture.thenAccept(logSegmentEntryReader -> {
                    logSegmentEntryReader.onLogSegmentMetadataUpdated(logSegmentMetadata);
                    synchronized (this) {
                        this.metadata = logSegmentMetadata;
                    }
                });
            } else {
                this.reader.onLogSegmentMetadataUpdated(logSegmentMetadata);
                this.metadata = logSegmentMetadata;
            }
        }

        public synchronized void onSuccess(LogSegmentEntryReader logSegmentEntryReader) {
            this.reader = logSegmentEntryReader;
            if (logSegmentEntryReader.getSegment().isInProgress()) {
                logSegmentEntryReader.registerListener(ReadAheadEntryReader.this);
            }
        }

        public void onFailure(Throwable th) {
        }

        synchronized boolean isClosed() {
            return this.isClosed;
        }

        synchronized CompletableFuture<Void> close() {
            return null == this.openFuture ? FutureUtils.Void() : FutureUtils.ensure(this.openFuture.thenCompose(logSegmentEntryReader -> {
                return logSegmentEntryReader.asyncClose();
            }), () -> {
                synchronized (this) {
                    this.isClosed = true;
                }
            });
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/distributedlog/ReadAheadEntryReader$State.class */
    public enum State {
        IDLE,
        READING,
        PAUSED,
        CLOSED,
        ERROR
    }

    public ReadAheadEntryReader(String str, DLSN dlsn, DistributedLogConfiguration distributedLogConfiguration, BKLogReadHandler bKLogReadHandler, LogSegmentEntryStore logSegmentEntryStore, OrderedScheduler orderedScheduler, Ticker ticker, AlertStatsLogger alertStatsLogger) {
        this.streamName = str;
        this.lastDLSN = dlsn;
        this.fromDLSN = dlsn;
        this.nextEntryPosition = new EntryPosition(dlsn.getLogSegmentSequenceNo(), dlsn.getEntryId());
        this.conf = distributedLogConfiguration;
        this.maxCachedEntries = distributedLogConfiguration.getReadAheadMaxRecords();
        this.numReadAheadEntries = distributedLogConfiguration.getReadAheadBatchSize();
        this.idleWarnThresholdMillis = distributedLogConfiguration.getReaderIdleWarnThresholdMillis();
        this.readHandler = bKLogReadHandler;
        this.entryStore = logSegmentEntryStore;
        this.scheduler = orderedScheduler;
        this.readFunc = new ReadEntriesFunc(this.numReadAheadEntries);
        this.alertStatsLogger = alertStatsLogger;
        this.lastEntryAddedTime = Stopwatch.createStarted(ticker);
    }

    private ScheduledFuture<?> scheduleIdleReaderTaskIfNecessary() {
        if (this.idleWarnThresholdMillis >= Integer.MAX_VALUE || this.idleWarnThresholdMillis <= 0) {
            return null;
        }
        return this.scheduler.scheduleAtFixedRateOrdered(this.streamName, () -> {
            if (isReaderIdle(this.idleWarnThresholdMillis, TimeUnit.MILLISECONDS)) {
                unsafeCheckIfReadAheadIsIdle();
            }
        }, this.idleWarnThresholdMillis, this.idleWarnThresholdMillis, TimeUnit.MILLISECONDS);
    }

    private void unsafeCheckIfReadAheadIsIdle() {
        if (null == this.currentSegmentReader || this.currentSegmentReader.isBeyondLastAddConfirmed()) {
            this.readHandler.readLogSegmentsFromStore(LogSegmentMetadata.COMPARATOR, LogSegmentFilter.DEFAULT_FILTER, null).whenComplete((BiConsumer<? super Versioned<List<LogSegmentMetadata>>, ? super Throwable>) new FutureEventListener<Versioned<List<LogSegmentMetadata>>>() { // from class: org.apache.distributedlog.ReadAheadEntryReader.1
                public void onFailure(Throwable th) {
                }

                public void onSuccess(Versioned<List<LogSegmentMetadata>> versioned) {
                    ReadAheadEntryReader.this.onSegmentsUpdated((List) versioned.getValue());
                }
            });
        }
    }

    private void cancelIdleReaderTask() {
        if (null != this.idleReaderCheckTask) {
            this.idleReaderCheckTask.cancel(true);
        }
    }

    @VisibleForTesting
    EntryPosition getNextEntryPosition() {
        return this.nextEntryPosition;
    }

    @VisibleForTesting
    SegmentReader getCurrentSegmentReader() {
        return this.currentSegmentReader;
    }

    @VisibleForTesting
    long getCurrentSegmentSequenceNumber() {
        return this.currentSegmentSequenceNumber;
    }

    @VisibleForTesting
    SegmentReader getNextSegmentReader() {
        return this.nextSegmentReader;
    }

    @VisibleForTesting
    LinkedList<SegmentReader> getSegmentReaders() {
        return this.segmentReaders;
    }

    @VisibleForTesting
    boolean isInitialized() {
        return this.isInitialized;
    }

    private void orderedSubmit(SafeRunnable safeRunnable) {
        synchronized (this) {
            if (null != this.closePromise) {
                return;
            }
            try {
                this.scheduler.executeOrdered(this.streamName, safeRunnable);
            } catch (RejectedExecutionException e) {
                logger.debug("Failed to submit and execute an operation for readhead entry reader of {}", this.streamName, e);
            }
        }
    }

    public void start(List<LogSegmentMetadata> list) {
        logger.info("Starting the readahead entry reader for {} : segments = {}", this.readHandler.getFullyQualifiedName(), list);
        this.started.set(true);
        processLogSegments(list);
    }

    private void removeClosedSegmentReaders() {
        orderedSubmit(new CloseableRunnable() { // from class: org.apache.distributedlog.ReadAheadEntryReader.2
            public void safeRun() {
                ReadAheadEntryReader.this.unsafeRemoveClosedSegmentReaders();
            }
        });
    }

    /* JADX INFO: Access modifiers changed from: private */
    public void unsafeRemoveClosedSegmentReaders() {
        SegmentReader peekFirst = this.segmentReadersToClose.peekFirst();
        while (true) {
            SegmentReader segmentReader = peekFirst;
            if (null == segmentReader || !segmentReader.isClosed()) {
                return;
            }
            this.segmentReadersToClose.pollFirst();
            peekFirst = this.segmentReadersToClose.peekFirst();
        }
    }

    synchronized boolean isClosed() {
        return null != this.closePromise;
    }

    public CompletableFuture<Void> asyncClose() {
        synchronized (this) {
            if (null != this.closePromise) {
                return this.closePromise;
            }
            CompletableFuture<Void> completableFuture = new CompletableFuture<>();
            this.closePromise = completableFuture;
            cancelIdleReaderTask();
            try {
                this.scheduler.executeOrdered(this.streamName, () -> {
                    unsafeAsyncClose(completableFuture);
                });
            } catch (RejectedExecutionException e) {
                logger.warn("Scheduler has been shutdown before closing the readahead entry reader for stream {}", this.streamName, e);
                unsafeAsyncClose(completableFuture);
            }
            return completableFuture;
        }
    }

    private void unsafeAsyncClose(CompletableFuture<Void> completableFuture) {
        if (logger.isDebugEnabled()) {
            logger.debug("[{}][state:{}] Closing read ahead", this.streamName, this.state);
        }
        this.state = State.CLOSED;
        ArrayList newArrayListWithExpectedSize = Lists.newArrayListWithExpectedSize(this.segmentReaders.size() + this.segmentReadersToClose.size() + 1);
        if (null != this.currentSegmentReader) {
            this.segmentReadersToClose.add(this.currentSegmentReader);
        }
        if (null != this.nextSegmentReader) {
            this.segmentReadersToClose.add(this.nextSegmentReader);
        }
        Iterator<SegmentReader> it = this.segmentReaders.iterator();
        while (it.hasNext()) {
            this.segmentReadersToClose.add(it.next());
        }
        this.segmentReaders.clear();
        Iterator<SegmentReader> it2 = this.segmentReadersToClose.iterator();
        while (it2.hasNext()) {
            newArrayListWithExpectedSize.add(it2.next().close());
        }
        FutureUtils.proxyTo(FutureUtils.collect(newArrayListWithExpectedSize).thenApply(list -> {
            return null;
        }), completableFuture);
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public ReadAheadEntryReader addStateChangeNotification(AsyncNotification asyncNotification) {
        this.stateChangeNotifications.add(asyncNotification);
        return this;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public ReadAheadEntryReader removeStateChangeNotification(AsyncNotification asyncNotification) {
        this.stateChangeNotifications.remove(asyncNotification);
        return this;
    }

    private void notifyStateChangeOnSuccess() {
        Iterator<AsyncNotification> it = this.stateChangeNotifications.iterator();
        while (it.hasNext()) {
            it.next().notifyOnOperationComplete();
        }
    }

    private void notifyStateChangeOnFailure(Throwable th) {
        Iterator<AsyncNotification> it = this.stateChangeNotifications.iterator();
        while (it.hasNext()) {
            it.next().notifyOnError(th);
        }
    }

    void setLastException(IOException iOException) {
        if (!this.lastException.compareAndSet(null, iOException)) {
            logger.debug("last exception has already been set to ", this.lastException.get());
        }
        notifyStateChangeOnFailure(iOException);
        orderedSubmit(new CloseableRunnable() { // from class: org.apache.distributedlog.ReadAheadEntryReader.3
            public void safeRun() {
                if (ReadAheadEntryReader.logger.isDebugEnabled()) {
                    ReadAheadEntryReader.logger.debug("[{}][state:{}] Read ahead errored", ReadAheadEntryReader.this.streamName, ReadAheadEntryReader.this.state);
                }
                ReadAheadEntryReader.this.state = State.ERROR;
            }
        });
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public void checkLastException() throws IOException {
        if (null != this.lastException.get()) {
            throw this.lastException.get();
        }
    }

    void checkCatchingUpStatus(LogSegmentEntryReader logSegmentEntryReader) {
        if (logSegmentEntryReader.getSegment().isInProgress() && this.isCatchingUp && logSegmentEntryReader.hasCaughtUpOnInprogress()) {
            logger.info("ReadAhead for {} is caught up at entry {} @ log segment {}.", new Object[]{this.readHandler.getFullyQualifiedName(), Long.valueOf(logSegmentEntryReader.getLastAddConfirmed()), logSegmentEntryReader.getSegment()});
            this.isCatchingUp = false;
        }
    }

    void markCaughtup() {
        if (this.isCatchingUp) {
            this.isCatchingUp = false;
            logger.info("ReadAhead for {} is caught up", this.readHandler.getFullyQualifiedName());
        }
    }

    public boolean isReadAheadCaughtUp() {
        return !this.isCatchingUp;
    }

    @Override // org.apache.distributedlog.logsegment.LogSegmentEntryReader.StateChangeListener
    public void onCaughtupOnInprogress() {
        markCaughtup();
    }

    public void onSuccess(List<Entry.Reader> list) {
        if (isClosed()) {
            Iterator<Entry.Reader> it = list.iterator();
            while (it.hasNext()) {
                it.next().release();
            }
            return;
        }
        this.lastEntryAddedTime.reset().start();
        Iterator<Entry.Reader> it2 = list.iterator();
        while (it2.hasNext()) {
            this.entryQueue.add(it2.next());
        }
        if (!list.isEmpty()) {
            Entry.Reader reader = list.get(list.size() - 1);
            this.nextEntryPosition.advance(reader.getLSSN(), reader.getEntryId() + 1);
        }
        notifyStateChangeOnSuccess();
        completeRead();
        scheduleRead();
    }

    public void onFailure(Throwable th) {
        completeRead();
        if (th instanceof EndOfLogSegmentException) {
            moveToNextLogSegment();
        } else if (th instanceof IOException) {
            setLastException((IOException) th);
        } else {
            setLastException(new UnexpectedException("Unexpected non I/O exception", th));
        }
    }

    /* JADX WARN: Unreachable blocks removed: 9, instructions: 11 */
    public Entry.Reader getNextReadAheadEntry(long j, TimeUnit timeUnit) throws IOException {
        if (null != this.lastException.get()) {
            throw this.lastException.get();
        }
        try {
            Entry.Reader poll = this.entryQueue.poll(j, timeUnit);
            if (null != poll && !isCacheFull()) {
                scheduleRead();
            }
            return poll;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new DLInterruptedException("Interrupted on waiting next readahead entry : ", e);
        }
    }

    public int getNumCachedEntries() {
        return this.entryQueue.size();
    }

    public boolean isCacheFull() {
        return getNumCachedEntries() >= this.maxCachedEntries;
    }

    @VisibleForTesting
    public boolean isCacheEmpty() {
        return this.entryQueue.isEmpty();
    }

    public boolean isReaderIdle(int i, TimeUnit timeUnit) {
        return this.lastEntryAddedTime.elapsed(timeUnit) > ((long) i);
    }

    void processLogSegments(final List<LogSegmentMetadata> list) {
        orderedSubmit(new CloseableRunnable() { // from class: org.apache.distributedlog.ReadAheadEntryReader.4
            /* JADX WARN: 'super' call moved to the top of the method (can break code semantics) */
            {
                super();
            }

            public void safeRun() {
                ReadAheadEntryReader.this.unsafeProcessLogSegments(list);
            }
        });
    }

    /* JADX INFO: Access modifiers changed from: private */
    public void unsafeProcessLogSegments(List<LogSegmentMetadata> list) {
        if (this.isInitialized) {
            unsafeReinitializeLogSegments(list);
        } else {
            unsafeInitializeLogSegments(list);
        }
    }

    private boolean updateLogSegmentMetadata(SegmentReader segmentReader, LogSegmentMetadata logSegmentMetadata) {
        if (segmentReader.getSegment().getLogSegmentSequenceNumber() != logSegmentMetadata.getLogSegmentSequenceNumber()) {
            logger.error("Inconsistent state found in entry reader for {} : current segment = {}, new segment = {}", new Object[]{this.streamName, segmentReader.getSegment(), logSegmentMetadata});
            setLastException(new DLIllegalStateException("Inconsistent state found in entry reader for " + this.streamName + " : current segment = " + segmentReader.getSegment() + ", new segment = " + logSegmentMetadata));
            return false;
        }
        if (!segmentReader.getSegment().isInProgress() && logSegmentMetadata.isInProgress()) {
            setLastException(new DLIllegalStateException("An inprogress log segment " + logSegmentMetadata + " received after a closed log segment " + segmentReader.getSegment() + " on reading segment " + logSegmentMetadata.getLogSegmentSequenceNumber() + " @ stream " + this.streamName));
            return false;
        }
        if (!segmentReader.getSegment().isInProgress() || logSegmentMetadata.isInProgress()) {
            return true;
        }
        segmentReader.updateLogSegmentMetadata(logSegmentMetadata);
        return true;
    }

    private void unsafeReinitializeLogSegments(List<LogSegmentMetadata> list) {
        logger.info("Reinitialize log segments with {}", list);
        int i = 0;
        while (i < list.size() && list.get(i).getLogSegmentSequenceNumber() < this.currentSegmentSequenceNumber) {
            i++;
        }
        if (i >= list.size()) {
            return;
        }
        LogSegmentMetadata logSegmentMetadata = list.get(i);
        if (null != this.currentSegmentReader) {
            if (!updateLogSegmentMetadata(this.currentSegmentReader, logSegmentMetadata)) {
                return;
            }
        } else if (this.currentSegmentSequenceNumber != logSegmentMetadata.getLogSegmentSequenceNumber()) {
            logger.error("Inconsistent state found in entry reader for {} : current segment sn = {}, new segment sn = {}", new Object[]{this.streamName, Long.valueOf(this.currentSegmentSequenceNumber), Long.valueOf(logSegmentMetadata.getLogSegmentSequenceNumber())});
            setLastException(new DLIllegalStateException("Inconsistent state found in entry reader for " + this.streamName + " : current segment sn = " + this.currentSegmentSequenceNumber + ", new segment sn = " + logSegmentMetadata.getLogSegmentSequenceNumber()));
            return;
        }
        int i2 = i + 1;
        if (i2 >= list.size()) {
            return;
        }
        LogSegmentMetadata logSegmentMetadata2 = list.get(i2);
        if (null != this.nextSegmentReader) {
            if (!updateLogSegmentMetadata(this.nextSegmentReader, logSegmentMetadata2)) {
                return;
            } else {
                i2++;
            }
        }
        int i3 = 0;
        while (i3 < this.segmentReaders.size() && i2 < list.size()) {
            if (!updateLogSegmentMetadata(this.segmentReaders.get(i3), list.get(i2))) {
                return;
            }
            i3++;
            i2++;
        }
        while (i2 < list.size()) {
            SegmentReader segmentReader = new SegmentReader(list.get(i2), 0L);
            segmentReader.openReader();
            this.segmentReaders.add(segmentReader);
            i2++;
        }
        if (null == this.currentSegmentReader) {
            unsafeMoveToNextLogSegment();
        }
        scheduleRead();
    }

    private void unsafeInitializeLogSegments(List<LogSegmentMetadata> list) {
        if (list.isEmpty()) {
            return;
        }
        boolean z = true;
        DLSN dlsn = this.fromDLSN;
        for (int i = 0; i < list.size(); i++) {
            LogSegmentMetadata logSegmentMetadata = list.get(i);
            if (logSegmentMetadata.getLogSegmentSequenceNumber() >= this.fromDLSN.getLogSegmentSequenceNo() && (!z || this.conf.getIgnoreTruncationStatus() || !logSegmentMetadata.isTruncated())) {
                if (z && !this.conf.getIgnoreTruncationStatus() && logSegmentMetadata.isPartiallyTruncated() && logSegmentMetadata.getMinActiveDLSN().compareTo(this.fromDLSN) > 0) {
                    dlsn = logSegmentMetadata.getMinActiveDLSN();
                }
                z = false;
                if (!isAllowedToPosition(logSegmentMetadata, dlsn)) {
                    logger.error("segment {} is not allowed to position at {}", logSegmentMetadata, dlsn);
                    return;
                }
                this.segmentReaders.add(new SegmentReader(logSegmentMetadata, logSegmentMetadata.getLogSegmentSequenceNumber() == dlsn.getLogSegmentSequenceNo() ? dlsn.getEntryId() : 0L));
            }
        }
        if (this.segmentReaders.isEmpty()) {
            return;
        }
        this.currentSegmentReader = this.segmentReaders.pollFirst();
        this.currentSegmentReader.openReader();
        this.currentSegmentReader.startRead();
        this.currentSegmentSequenceNumber = this.currentSegmentReader.getSegment().getLogSegmentSequenceNumber();
        scheduleRead();
        if (!this.segmentReaders.isEmpty()) {
            Iterator<SegmentReader> it = this.segmentReaders.iterator();
            while (it.hasNext()) {
                it.next().openReader();
            }
            unsafePrefetchNextSegment(true);
        }
        this.isInitialized = true;
    }

    private void unsafePrefetchNextSegment(boolean z) {
        SegmentReader peekFirst = this.segmentReaders.peekFirst();
        if (null != peekFirst) {
            if (!z || peekFirst.getSegment().isInProgress()) {
                peekFirst.startRead();
                this.nextSegmentReader = peekFirst;
                this.segmentReaders.pollFirst();
            }
        }
    }

    private boolean isAllowedToPosition(LogSegmentMetadata logSegmentMetadata, DLSN dlsn) {
        if (logSegmentMetadata.isTruncated() && logSegmentMetadata.getLastDLSN().compareTo(dlsn) >= 0 && !this.conf.getIgnoreTruncationStatus()) {
            setLastException(new AlreadyTruncatedTransactionException(this.streamName + " : trying to position read ahead at " + dlsn + " on a segment " + logSegmentMetadata + " that is already marked as truncated"));
            return false;
        }
        if (!logSegmentMetadata.isPartiallyTruncated() || logSegmentMetadata.getMinActiveDLSN().compareTo(dlsn) <= 0) {
            return true;
        }
        if (this.conf.getAlertWhenPositioningOnTruncated()) {
            this.alertStatsLogger.raise("Trying to position reader on {} when {} is marked partially truncated", new Object[]{dlsn, logSegmentMetadata});
        }
        if (this.conf.getIgnoreTruncationStatus()) {
            return true;
        }
        logger.error("{}: Trying to position reader on {} when {} is marked partially truncated", new Object[]{this.streamName, dlsn, logSegmentMetadata});
        setLastException(new AlreadyTruncatedTransactionException(this.streamName + " : trying to position read ahead at " + dlsn + " on a segment " + logSegmentMetadata + " that is already marked as truncated"));
        return false;
    }

    void moveToNextLogSegment() {
        orderedSubmit(new CloseableRunnable() { // from class: org.apache.distributedlog.ReadAheadEntryReader.5
            public void safeRun() {
                ReadAheadEntryReader.this.unsafeMoveToNextLogSegment();
            }
        });
    }

    /* JADX INFO: Access modifiers changed from: private */
    public void unsafeMoveToNextLogSegment() {
        if (null != this.currentSegmentReader) {
            this.segmentReadersToClose.add(this.currentSegmentReader);
            FutureUtils.ensure(this.currentSegmentReader.close(), this.removeClosedSegmentReadersFunc);
            logger.debug("close current segment reader {}", this.currentSegmentReader.getSegment());
            this.currentSegmentReader = null;
        }
        boolean z = false;
        if (null != this.nextSegmentReader) {
            this.currentSegmentReader = this.nextSegmentReader;
            logger.debug("move to read segment {}", this.currentSegmentReader.getSegment());
            this.currentSegmentSequenceNumber = this.currentSegmentReader.getSegment().getLogSegmentSequenceNumber();
            this.nextSegmentReader = null;
            unsafePrefetchNextSegment(true);
            z = true;
        } else {
            unsafePrefetchNextSegment(false);
            if (null != this.nextSegmentReader) {
                this.currentSegmentReader = this.nextSegmentReader;
                logger.debug("move to read segment {}", this.currentSegmentReader.getSegment());
                this.currentSegmentSequenceNumber = this.currentSegmentReader.getSegment().getLogSegmentSequenceNumber();
                this.nextSegmentReader = null;
                unsafePrefetchNextSegment(true);
                z = true;
            }
        }
        if (!z && this.isCatchingUp) {
            logger.info("ReadAhead for {} is caught up and no log segments to read now", this.readHandler.getFullyQualifiedName());
            this.isCatchingUp = false;
        }
        scheduleRead();
    }

    void completeRead() {
        orderedSubmit(new CloseableRunnable() { // from class: org.apache.distributedlog.ReadAheadEntryReader.6
            public void safeRun() {
                if (ReadAheadEntryReader.logger.isDebugEnabled()) {
                    ReadAheadEntryReader.logger.debug("[{}][state:{}] Read completed", ReadAheadEntryReader.this.streamName, ReadAheadEntryReader.this.state);
                }
                if (ReadAheadEntryReader.this.state == State.READING) {
                    ReadAheadEntryReader.this.state = State.IDLE;
                }
            }
        });
    }

    void scheduleRead() {
        orderedSubmit(new CloseableRunnable() { // from class: org.apache.distributedlog.ReadAheadEntryReader.7
            public void safeRun() {
                boolean isCacheFull = ReadAheadEntryReader.this.isCacheFull();
                SegmentReader segmentReader = ReadAheadEntryReader.this.currentSegmentReader;
                boolean z = segmentReader != null;
                if (ReadAheadEntryReader.logger.isDebugEnabled()) {
                    ReadAheadEntryReader.logger.debug("[{}][state:{}] scheduling read, cacheFull {}, hasMoreSegments {}", new Object[]{ReadAheadEntryReader.this.streamName, ReadAheadEntryReader.this.state, Boolean.valueOf(isCacheFull), Boolean.valueOf(z)});
                }
                switch (AnonymousClass8.$SwitchMap$org$apache$distributedlog$ReadAheadEntryReader$State[ReadAheadEntryReader.this.state.ordinal()]) {
                    case 1:
                        if (isCacheFull || !z) {
                            ReadAheadEntryReader.this.state = State.PAUSED;
                            return;
                        } else {
                            segmentReader.readNext().whenComplete((BiConsumer<? super List<Entry.Reader>, ? super Throwable>) ReadAheadEntryReader.this);
                            ReadAheadEntryReader.this.state = State.READING;
                            return;
                        }
                    case 2:
                        if (isCacheFull || !z) {
                            return;
                        }
                        segmentReader.readNext().whenComplete((BiConsumer<? super List<Entry.Reader>, ? super Throwable>) ReadAheadEntryReader.this);
                        ReadAheadEntryReader.this.state = State.READING;
                        return;
                    case 3:
                    case 4:
                    case 5:
                    default:
                        return;
                }
            }
        });
    }

    @Override // org.apache.distributedlog.callback.LogSegmentListener
    public void onSegmentsUpdated(List<LogSegmentMetadata> list) {
        if (this.started.get()) {
            logger.info("segments is updated with {}", list);
            processLogSegments(list);
        }
    }

    @Override // org.apache.distributedlog.callback.LogSegmentListener
    public void onLogStreamDeleted() {
        setLastException(new LogNotFoundException("Log stream " + this.streamName + " is deleted"));
    }
}
