/*
 * Decompiled with CFR 0.152.
 */
package org.apache.distributedlog;

import dlshade.com.google.common.annotations.VisibleForTesting;
import dlshade.com.google.common.base.Stopwatch;
import dlshade.com.google.common.base.Ticker;
import dlshade.com.google.common.collect.Lists;
import dlshade.org.apache.bookkeeper.common.concurrent.FutureEventListener;
import dlshade.org.apache.bookkeeper.common.concurrent.FutureUtils;
import dlshade.org.apache.bookkeeper.common.util.OrderedScheduler;
import dlshade.org.apache.bookkeeper.common.util.SafeRunnable;
import dlshade.org.apache.bookkeeper.stats.AlertStatsLogger;
import dlshade.org.apache.bookkeeper.versioning.Versioned;
import java.io.IOException;
import java.util.ArrayList;
import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CopyOnWriteArraySet;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.BiConsumer;
import java.util.function.Function;
import java.util.stream.Collectors;
import org.apache.distributedlog.AsyncNotification;
import org.apache.distributedlog.BKLogReadHandler;
import org.apache.distributedlog.DLSN;
import org.apache.distributedlog.DistributedLogConfiguration;
import org.apache.distributedlog.Entry;
import org.apache.distributedlog.EntryPosition;
import org.apache.distributedlog.LogSegmentMetadata;
import org.apache.distributedlog.callback.LogSegmentListener;
import org.apache.distributedlog.exceptions.AlreadyTruncatedTransactionException;
import org.apache.distributedlog.exceptions.DLIllegalStateException;
import org.apache.distributedlog.exceptions.DLInterruptedException;
import org.apache.distributedlog.exceptions.EndOfLogSegmentException;
import org.apache.distributedlog.exceptions.LogNotFoundException;
import org.apache.distributedlog.exceptions.UnexpectedException;
import org.apache.distributedlog.io.AsyncCloseable;
import org.apache.distributedlog.logsegment.LogSegmentEntryReader;
import org.apache.distributedlog.logsegment.LogSegmentEntryStore;
import org.apache.distributedlog.logsegment.LogSegmentFilter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

class ReadAheadEntryReader
implements AsyncCloseable,
LogSegmentListener,
LogSegmentEntryReader.StateChangeListener,
FutureEventListener<List<Entry.Reader>> {
    private static final Logger logger = LoggerFactory.getLogger(ReadAheadEntryReader.class);
    // Current position in the read-ahead state machine (see State).
    private State state = State.IDLE;
    // Starts an entry reader once its open future completes; used by SegmentReader.startRead()
    // when the underlying reader is not available yet.
    private static final Function<LogSegmentEntryReader, Void> START_READER_FUNC = reader -> {
        reader.start();
        return null;
    };
    // Issues a batched read against an entry reader once its open future completes.
    private final Function<LogSegmentEntryReader, CompletableFuture<List<Entry.Reader>>> readFunc;
    // Callback attached to reader close futures; prunes closed readers from segmentReadersToClose.
    private final Runnable removeClosedSegmentReadersFunc = () -> this.removeClosedSegmentReaders();
    private final DistributedLogConfiguration conf;
    private final BKLogReadHandler readHandler;
    private final LogSegmentEntryStore entryStore;
    private final OrderedScheduler scheduler;
    // Also used as the ordering key for every task submitted to the scheduler.
    private final String streamName;
    private final DLSN fromDLSN;
    // Taken from conf.getReadAheadMaxRecords(); the cache counts as full at this size.
    private final int maxCachedEntries;
    // Taken from conf.getReadAheadBatchSize(); number of entries requested per read.
    private final int numReadAheadEntries;
    // Taken from conf.getReaderIdleWarnThresholdMillis(); period of the idle-reader check.
    private final int idleWarnThresholdMillis;
    // Entries that have been read ahead and not yet handed out by getNextReadAheadEntry().
    private final LinkedBlockingQueue<Entry.Reader> entryQueue;
    private final AtomicBoolean started = new AtomicBoolean(false);
    private boolean isInitialized = false;
    private boolean readAheadPaused = false;
    // Non-null once asyncClose() has been called; guarded by synchronized (this).
    private CompletableFuture<Void> closePromise = null;
    private long currentSegmentSequenceNumber;
    private SegmentReader currentSegmentReader;
    private SegmentReader nextSegmentReader;
    private DLSN lastDLSN;
    private final EntryPosition nextEntryPosition;
    private volatile boolean isCatchingUp = true;
    // Readers for segments that come after the current/next segment.
    private final LinkedList<SegmentReader> segmentReaders;
    // Readers whose close has been requested but not yet observed as completed.
    private final LinkedList<SegmentReader> segmentReadersToClose;
    // First failure recorded by setLastException(); only ever transitions from null to non-null.
    private final AtomicReference<IOException> lastException = new AtomicReference<>(null);
    // Restarted every time a batch of entries is delivered to onSuccess().
    private final Stopwatch lastEntryAddedTime;
    private final CopyOnWriteArraySet<AsyncNotification> stateChangeNotifications = new CopyOnWriteArraySet<>();
    // Null when the idle check is disabled by configuration.
    private final ScheduledFuture<?> idleReaderCheckTask;
    private final AlertStatsLogger alertStatsLogger;

    /**
     * Creates a read-ahead reader for a stream and, if configured, schedules the idle-reader check.
     *
     * @param streamName stream name; also used as the ordering key on the scheduler
     * @param fromDLSN position to start reading from; also seeds the next entry position
     * @param conf supplies the cache size, batch size and idle warning threshold
     * @param readHandler handler used to re-read the segment list when the reader is idle
     * @param entryStore store used to open per-segment entry readers
     * @param scheduler scheduler on which all state-mutating tasks are ordered
     * @param ticker time source for the "last entry added" stopwatch
     * @param alertStatsLogger sink for alerts raised while positioning on truncated segments
     */
    public ReadAheadEntryReader(String streamName, DLSN fromDLSN, DistributedLogConfiguration conf, BKLogReadHandler readHandler, LogSegmentEntryStore entryStore, OrderedScheduler scheduler, Ticker ticker, AlertStatsLogger alertStatsLogger) {
        this.streamName = streamName;
        this.fromDLSN = fromDLSN;
        this.lastDLSN = fromDLSN;
        this.nextEntryPosition = new EntryPosition(fromDLSN.getLogSegmentSequenceNo(), fromDLSN.getEntryId());
        this.conf = conf;
        this.maxCachedEntries = conf.getReadAheadMaxRecords();
        this.numReadAheadEntries = conf.getReadAheadBatchSize();
        this.idleWarnThresholdMillis = conf.getReaderIdleWarnThresholdMillis();
        this.readHandler = readHandler;
        this.entryStore = entryStore;
        this.scheduler = scheduler;
        this.readFunc = new ReadEntriesFunc(this.numReadAheadEntries);
        this.alertStatsLogger = alertStatsLogger;
        // Diamond instead of raw types so the element types are checked by the compiler.
        this.segmentReaders = new LinkedList<>();
        this.segmentReadersToClose = new LinkedList<>();
        this.entryQueue = new LinkedBlockingQueue<>();
        this.lastEntryAddedTime = Stopwatch.createStarted(ticker);
        // Must stay last: the idle task reads the threshold, scheduler and stream name set above.
        this.idleReaderCheckTask = this.scheduleIdleReaderTaskIfNecessary();
    }

    /**
     * Schedules the periodic idle-reader check when the configured threshold is enabled.
     *
     * @return the scheduled task, or {@code null} when the threshold is not strictly between
     *         0 and {@link Integer#MAX_VALUE}
     */
    private ScheduledFuture<?> scheduleIdleReaderTaskIfNecessary() {
        final boolean checkEnabled = idleWarnThresholdMillis > 0 && idleWarnThresholdMillis < Integer.MAX_VALUE;
        if (!checkEnabled) {
            return null;
        }
        // The threshold doubles as both the initial delay and the period.
        return scheduler.scheduleAtFixedRateOrdered(streamName, () -> {
            if (isReaderIdle(idleWarnThresholdMillis, TimeUnit.MILLISECONDS)) {
                unsafeCheckIfReadAheadIsIdle();
            }
        }, idleWarnThresholdMillis, idleWarnThresholdMillis, TimeUnit.MILLISECONDS);
    }

    /**
     * Re-reads the segment list from the store when there is no current segment reader, or when
     * the current one reports that it is beyond the last add confirmed.
     */
    private void unsafeCheckIfReadAheadIsIdle() {
        if (null != currentSegmentReader && !currentSegmentReader.isBeyondLastAddConfirmed()) {
            return;
        }
        readHandler.readLogSegmentsFromStore(LogSegmentMetadata.COMPARATOR, LogSegmentFilter.DEFAULT_FILTER, null).whenComplete(new FutureEventListener<Versioned<List<LogSegmentMetadata>>>(){

            @Override
            public void onSuccess(Versioned<List<LogSegmentMetadata>> segments) {
                onSegmentsUpdated(segments.getValue());
            }

            @Override
            public void onFailure(Throwable cause) {
                // Intentionally ignored: the fixed-rate idle check invokes this method again
                // while the reader remains idle.
            }
        });
    }

    /** Cancels the idle-reader check (interrupting it if running); no-op when none was scheduled. */
    private void cancelIdleReaderTask() {
        if (idleReaderCheckTask == null) {
            return;
        }
        idleReaderCheckTask.cancel(true);
    }

    /** @return the position tracker that {@code onSuccess} advances past the last delivered entry */
    @VisibleForTesting
    EntryPosition getNextEntryPosition() {
        return nextEntryPosition;
    }

    /** @return the reader currently being read from, or {@code null} if there is none */
    @VisibleForTesting
    SegmentReader getCurrentSegmentReader() {
        return currentSegmentReader;
    }

    /** @return the log segment sequence number recorded for the current segment reader */
    @VisibleForTesting
    long getCurrentSegmentSequenceNumber() {
        return currentSegmentSequenceNumber;
    }

    /** @return the prefetched reader for the following segment, or {@code null} if there is none */
    @VisibleForTesting
    SegmentReader getNextSegmentReader() {
        return nextSegmentReader;
    }

    /** @return the live (not copied) list of readers queued behind the current/next readers */
    @VisibleForTesting
    LinkedList<SegmentReader> getSegmentReaders() {
        return segmentReaders;
    }

    /** @return whether the initial segment list has been processed into segment readers */
    @VisibleForTesting
    boolean isInitialized() {
        return isInitialized;
    }

    /**
     * Submits a task to the scheduler under this stream's ordering key.
     *
     * <p>The task is dropped when the reader has already been closed. A rejected submission is
     * logged at debug level and otherwise ignored.
     *
     * @param runnable task to run on the ordered scheduler
     */
    private void orderedSubmit(SafeRunnable runnable) {
        // isClosed() performs the same synchronized closePromise check.
        if (isClosed()) {
            return;
        }
        try {
            scheduler.executeOrdered(streamName, runnable);
        }
        catch (RejectedExecutionException ree) {
            logger.debug("Failed to submit and execute an operation for readhead entry reader of {}", streamName, ree);
        }
    }

    /**
     * Marks the reader as started and queues processing of the initial segment list.
     *
     * @param segmentList segments known at start time; at most the first ten are logged
     */
    public void start(List<LogSegmentMetadata> segmentList) {
        logger.info("Starting the readahead entry reader for {} : number of segments: {}, top 10 segments = {}",
                readHandler.getFullyQualifiedName(),
                segmentList.size(),
                segmentList.size() <= 10
                        ? segmentList
                        : segmentList.stream().limit(10L).collect(Collectors.toList()));
        started.set(true);
        processLogSegments(segmentList);
    }

    /** Queues, on the ordered scheduler, a pass that prunes closed readers from the close list. */
    private void removeClosedSegmentReaders() {
        orderedSubmit(new CloseableRunnable(){

            @Override
            public void safeRun() {
                unsafeRemoveClosedSegmentReaders();
            }
        });
    }

    /**
     * Drops readers from the head of {@code segmentReadersToClose} for as long as the head reports
     * itself closed; stops at the first reader that is still closing.
     */
    private void unsafeRemoveClosedSegmentReaders() {
        for (SegmentReader head = segmentReadersToClose.peekFirst();
                head != null && head.isClosed();
                head = segmentReadersToClose.peekFirst()) {
            segmentReadersToClose.pollFirst();
        }
    }

    /** @return {@code true} once {@code asyncClose()} has installed a close promise */
    synchronized boolean isClosed() {
        return closePromise != null;
    }

    /**
     * Closes this reader asynchronously.
     *
     * <p>The first call installs the close promise, cancels the idle check and schedules the
     * actual close on the ordered scheduler; if the scheduler rejects the task the close runs on
     * the calling thread. Later calls return the same promise.
     *
     * @return future completed when all segment readers have been closed
     */
    @Override
    public CompletableFuture<Void> asyncClose() {
        final CompletableFuture<Void> closeFuture;
        synchronized (this) {
            if (closePromise != null) {
                return closePromise;
            }
            closeFuture = new CompletableFuture<Void>();
            closePromise = closeFuture;
        }
        cancelIdleReaderTask();
        try {
            scheduler.executeOrdered(streamName, () -> unsafeAsyncClose(closeFuture));
        }
        catch (RejectedExecutionException ree) {
            logger.warn("Scheduler has been shutdown before closing the readahead entry reader for stream {}", streamName, ree);
            unsafeAsyncClose(closeFuture);
        }
        return closeFuture;
    }

    /**
     * Moves the reader to {@code CLOSED}, requests close on every segment reader it holds and
     * completes {@code closePromise} when all of those closes have finished.
     *
     * @param closePromise promise to complete (or fail) with the combined close result
     */
    private void unsafeAsyncClose(CompletableFuture<Void> closePromise) {
        if (logger.isDebugEnabled()) {
            logger.debug("[{}][state:{}] Closing read ahead", (Object)this.streamName, (Object)this.state);
        }
        this.state = State.CLOSED;
        // Typed list (was a raw ArrayList) so collect()/proxyTo() are checked by the compiler.
        List<CompletableFuture<Void>> closeFutures = Lists.newArrayListWithExpectedSize(this.segmentReaders.size() + this.segmentReadersToClose.size() + 1);
        if (null != this.currentSegmentReader) {
            this.segmentReadersToClose.add(this.currentSegmentReader);
        }
        if (null != this.nextSegmentReader) {
            this.segmentReadersToClose.add(this.nextSegmentReader);
        }
        this.segmentReadersToClose.addAll(this.segmentReaders);
        this.segmentReaders.clear();
        // Includes readers queued for close earlier; close() is requested on them again.
        for (SegmentReader reader : this.segmentReadersToClose) {
            closeFutures.add(reader.close());
        }
        FutureUtils.proxyTo(FutureUtils.collect(closeFutures).thenApply(value -> null), closePromise);
    }

    /**
     * Registers a listener for read-ahead success/failure notifications.
     *
     * @param notification listener to add
     * @return this reader, for chaining
     */
    ReadAheadEntryReader addStateChangeNotification(AsyncNotification notification) {
        stateChangeNotifications.add(notification);
        return this;
    }

    /**
     * Unregisters a previously added notification listener.
     *
     * @param notification listener to remove
     * @return this reader, for chaining
     */
    ReadAheadEntryReader removeStateChangeNotification(AsyncNotification notification) {
        stateChangeNotifications.remove(notification);
        return this;
    }

    /** Tells every registered listener that an operation completed. */
    private void notifyStateChangeOnSuccess() {
        stateChangeNotifications.forEach(listener -> listener.notifyOnOperationComplete());
    }

    /**
     * Tells every registered listener that an operation failed.
     *
     * @param cause failure forwarded to each listener
     */
    private void notifyStateChangeOnFailure(Throwable cause) {
        stateChangeNotifications.forEach(listener -> listener.notifyOnError(cause));
    }

    /**
     * Records a failure, notifies listeners and queues a transition to the {@code ERROR} state.
     *
     * <p>Only the first failure is retained; listeners are notified with {@code cause} every time.
     *
     * @param cause failure to record
     */
    void setLastException(IOException cause) {
        final boolean recorded = lastException.compareAndSet(null, cause);
        if (!recorded) {
            logger.debug("last exception has already been set to ", (Throwable) lastException.get());
        }
        notifyStateChangeOnFailure(cause);
        orderedSubmit(new CloseableRunnable(){

            @Override
            public void safeRun() {
                if (logger.isDebugEnabled()) {
                    logger.debug("[{}][state:{}] Read ahead errored", (Object) streamName, (Object) state);
                }
                state = State.ERROR;
            }
        });
    }

    /**
     * Rethrows the recorded failure, if any.
     *
     * @throws IOException the failure previously stored by {@code setLastException}
     */
    void checkLastException() throws IOException {
        final IOException failure = lastException.get();
        if (failure != null) {
            throw failure;
        }
    }

    /**
     * Clears the catching-up flag when the given reader is on an in-progress segment and reports
     * that it has caught up on it.
     *
     * @param reader entry reader about to be read from
     */
    void checkCatchingUpStatus(LogSegmentEntryReader reader) {
        if (!reader.getSegment().isInProgress()) {
            return;
        }
        if (!isCatchingUp || !reader.hasCaughtUpOnInprogress()) {
            return;
        }
        logger.info("ReadAhead for {} is caught up at entry {} @ log segment {}.",
                readHandler.getFullyQualifiedName(), reader.getLastAddConfirmed(), reader.getSegment());
        isCatchingUp = false;
    }

    /** Clears the catching-up flag and logs the transition; no-op if already caught up. */
    void markCaughtup() {
        if (!isCatchingUp) {
            return;
        }
        isCatchingUp = false;
        logger.info("ReadAhead for {} is caught up", (Object) readHandler.getFullyQualifiedName());
    }

    /** @return {@code true} once the catching-up flag has been cleared */
    public boolean isReadAheadCaughtUp() {
        final boolean stillCatchingUp = isCatchingUp;
        return !stillCatchingUp;
    }

    /** Listener callback from an entry reader: marks read-ahead as caught up. */
    @Override
    public void onCaughtupOnInprogress() {
        markCaughtup();
    }

    /**
     * Handles a completed read: caches the entries, advances the next entry position past the
     * last one, notifies listeners and schedules the next read.
     *
     * <p>If the reader is already closed the entries are released instead of being cached.
     *
     * @param entries entries returned by the segment reader, in order
     */
    @Override
    public void onSuccess(List<Entry.Reader> entries) {
        if (isClosed()) {
            entries.forEach(entry -> entry.release());
            return;
        }
        lastEntryAddedTime.reset().start();
        entryQueue.addAll(entries);
        final int batchSize = entries.size();
        if (batchSize > 0) {
            final Entry.Reader tail = entries.get(batchSize - 1);
            nextEntryPosition.advance(tail.getLSSN(), tail.getEntryId() + 1L);
        }
        notifyStateChangeOnSuccess();
        completeRead();
        scheduleRead();
    }

    /**
     * Handles a failed read. End-of-segment moves on to the next segment; any other failure is
     * recorded as the last exception, wrapped in {@code UnexpectedException} when it is not an
     * {@code IOException}.
     *
     * @param cause failure reported by the segment reader
     */
    @Override
    public void onFailure(Throwable cause) {
        completeRead();
        if (cause instanceof EndOfLogSegmentException) {
            moveToNextLogSegment();
            return;
        }
        final IOException failure = cause instanceof IOException
                ? (IOException) cause
                : new UnexpectedException("Unexpected non I/O exception", cause);
        setLastException(failure);
    }

    /**
     * Takes the next cached entry, waiting up to the given time for one to arrive.
     *
     * <p>When an entry is returned and the cache is not full, another read is scheduled.
     *
     * @param waitTime maximum time to wait
     * @param waitTimeUnit unit of {@code waitTime}
     * @return the next entry, or {@code null} if none arrived within the wait time
     * @throws IOException the recorded read-ahead failure, or {@code DLInterruptedException} if
     *         the wait was interrupted (the interrupt flag is restored first)
     */
    public Entry.Reader getNextReadAheadEntry(long waitTime, TimeUnit waitTimeUnit) throws IOException {
        final IOException failure = lastException.get();
        if (failure != null) {
            throw failure;
        }
        Entry.Reader entry;
        try {
            entry = entryQueue.poll(waitTime, waitTimeUnit);
        }
        catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new DLInterruptedException("Interrupted on waiting next readahead entry : ", (Throwable)e);
        }
        // Taking an entry may have freed cache space, so read-ahead may resume.
        if (entry != null && !isCacheFull()) {
            scheduleRead();
        }
        return entry;
    }

    /** @return the number of entries currently held in the read-ahead queue */
    public int getNumCachedEntries() {
        return entryQueue.size();
    }

    /** @return {@code true} when the cached entry count has reached {@code maxCachedEntries} */
    public boolean isCacheFull() {
        final int cached = getNumCachedEntries();
        return cached >= maxCachedEntries;
    }

    /** @return {@code true} when no read-ahead entries are cached */
    @VisibleForTesting
    public boolean isCacheEmpty() {
        return entryQueue.isEmpty();
    }

    /**
     * Checks whether more than the given time has elapsed since entries were last delivered.
     *
     * @param idleReaderErrorThreshold threshold, expressed in {@code timeUnit}
     * @param timeUnit unit in which the elapsed time is measured
     * @return {@code true} if the elapsed time strictly exceeds the threshold
     */
    public boolean isReaderIdle(int idleReaderErrorThreshold, TimeUnit timeUnit) {
        final long elapsed = lastEntryAddedTime.elapsed(timeUnit);
        return elapsed > (long) idleReaderErrorThreshold;
    }

    /**
     * Queues processing of a segment list on the ordered scheduler.
     *
     * @param segments segment list to initialize from or reconcile against
     */
    void processLogSegments(final List<LogSegmentMetadata> segments) {
        orderedSubmit(new CloseableRunnable(){

            @Override
            public void safeRun() {
                unsafeProcessLogSegments(segments);
            }
        });
    }

    /**
     * Runs first-time initialization, or reconciliation if initialization has already happened.
     *
     * @param segments segment list to process
     */
    private void unsafeProcessLogSegments(List<LogSegmentMetadata> segments) {
        if (!isInitialized) {
            unsafeInitializeLogSegments(segments);
            return;
        }
        unsafeReinitializeLogSegments(segments);
    }

    /**
     * Validates refreshed metadata against the segment a reader was created for and, when the
     * segment went from in-progress to closed, pushes the new metadata to the reader.
     *
     * @param reader reader whose segment is being refreshed
     * @param newMetadata metadata from the latest segment list
     * @return {@code false} if the metadata is inconsistent (the failure has then been recorded
     *         via {@code setLastException}); {@code true} otherwise
     */
    private boolean updateLogSegmentMetadata(SegmentReader reader, LogSegmentMetadata newMetadata) {
        // The refreshed metadata must describe the same segment sequence number.
        if (reader.getSegment().getLogSegmentSequenceNumber() != newMetadata.getLogSegmentSequenceNumber()) {
            logger.error("Inconsistent state found in entry reader for {} : current segment = {}, new segment = {}", new Object[]{this.streamName, reader.getSegment(), newMetadata});
            this.setLastException(new DLIllegalStateException("Inconsistent state found in entry reader for " + this.streamName + " : current segment = " + reader.getSegment() + ", new segment = " + newMetadata));
            return false;
        }
        // A segment already seen as closed must not be reported as in-progress again.
        if (!reader.getSegment().isInProgress() && newMetadata.isInProgress()) {
            this.setLastException(new DLIllegalStateException("An inprogress log segment " + newMetadata + " received after a closed log segment " + reader.getSegment() + " on reading segment " + newMetadata.getLogSegmentSequenceNumber() + " @ stream " + this.streamName));
            return false;
        }
        // Only the in-progress -> closed transition is propagated to the reader.
        if (reader.getSegment().isInProgress() && !newMetadata.isInProgress()) {
            reader.updateLogSegmentMetadata(newMetadata);
        }
        return true;
    }

    /**
     * Reconciles the readers already held (current, next, queued) with a refreshed segment list,
     * opens readers for any additional trailing segments, and schedules a read.
     *
     * <p>Returns early, without scheduling a read, when the list holds nothing at or after the
     * current segment, when an inconsistency is detected, or when the list ends at the current
     * segment.
     *
     * @param segments refreshed segment list; the code walks it by index in sequence order
     */
    private void unsafeReinitializeLogSegments(List<LogSegmentMetadata> segments) {
        LogSegmentMetadata segment;
        int segmentIdx;
        logger.info("Reinitialize log segments with {}", segments);
        // Advance segmentIdx past every segment older than the current one (empty body is intentional).
        for (segmentIdx = 0; segmentIdx < segments.size() && (segment = segments.get(segmentIdx)).getLogSegmentSequenceNumber() < this.currentSegmentSequenceNumber; ++segmentIdx) {
        }
        if (segmentIdx >= segments.size()) {
            return;
        }
        // This entry is expected to be the current segment.
        segment = segments.get(segmentIdx);
        if (null != this.currentSegmentReader) {
            if (!this.updateLogSegmentMetadata(this.currentSegmentReader, segment)) {
                return;
            }
        } else if (this.currentSegmentSequenceNumber != segment.getLogSegmentSequenceNumber()) {
            logger.error("Inconsistent state found in entry reader for {} : current segment sn = {}, new segment sn = {}", new Object[]{this.streamName, this.currentSegmentSequenceNumber, segment.getLogSegmentSequenceNumber()});
            this.setLastException(new DLIllegalStateException("Inconsistent state found in entry reader for " + this.streamName + " : current segment sn = " + this.currentSegmentSequenceNumber + ", new segment sn = " + segment.getLogSegmentSequenceNumber()));
            return;
        }
        if (++segmentIdx >= segments.size()) {
            return;
        }
        // If a next reader exists it consumes this entry; otherwise the queued readers start here.
        segment = segments.get(segmentIdx);
        if (null != this.nextSegmentReader) {
            if (!this.updateLogSegmentMetadata(this.nextSegmentReader, segment)) {
                return;
            }
            ++segmentIdx;
        }
        // Pair the already-queued readers with the following list entries, one to one.
        for (int readerIdx = 0; readerIdx < this.segmentReaders.size() && segmentIdx < segments.size(); ++readerIdx, ++segmentIdx) {
            SegmentReader reader = this.segmentReaders.get(readerIdx);
            if (this.updateLogSegmentMetadata(reader, segment = segments.get(segmentIdx))) continue;
            return;
        }
        // Remaining entries have no reader yet: create one starting at entry 0 and open it.
        while (segmentIdx < segments.size()) {
            segment = segments.get(segmentIdx);
            SegmentReader reader = new SegmentReader(segment, 0L);
            reader.openReader();
            this.segmentReaders.add(reader);
            ++segmentIdx;
        }
        if (null == this.currentSegmentReader) {
            this.unsafeMoveToNextLogSegment();
        }
        this.scheduleRead();
    }

    /**
     * Builds the initial set of segment readers from the first segment list and starts reading.
     *
     * <p>{@code isInitialized} is set only when at least one reader was created and positioning
     * succeeded; otherwise the method returns early and leaves the flag unset.
     *
     * @param segments initial segment list
     */
    private void unsafeInitializeLogSegments(List<LogSegmentMetadata> segments) {
        if (segments.isEmpty()) {
            return;
        }
        boolean skipTruncatedLogSegments = true;
        DLSN dlsnToStart = this.fromDLSN;
        for (int i = 0; i < segments.size(); ++i) {
            LogSegmentMetadata segment = segments.get(i);
            // Skip segments before the start position and, until the first segment is kept,
            // fully truncated segments (unless truncation status is ignored by configuration).
            if (segment.getLogSegmentSequenceNumber() < this.fromDLSN.getLogSegmentSequenceNo() || skipTruncatedLogSegments && !this.conf.getIgnoreTruncationStatus() && segment.isTruncated()) continue;
            // First kept segment is partially truncated past fromDLSN: start at its min active DLSN.
            if (skipTruncatedLogSegments && !this.conf.getIgnoreTruncationStatus() && segment.isPartiallyTruncated() && segment.getMinActiveDLSN().compareTo(this.fromDLSN) > 0) {
                dlsnToStart = segment.getMinActiveDLSN();
            }
            skipTruncatedLogSegments = false;
            if (!this.isAllowedToPosition(segment, dlsnToStart)) {
                logger.error("segment {} is not allowed to position at {}", (Object)segment, (Object)dlsnToStart);
                return;
            }
            // Start mid-segment only for the segment that contains dlsnToStart; others start at entry 0.
            SegmentReader reader = new SegmentReader(segment, segment.getLogSegmentSequenceNumber() == dlsnToStart.getLogSegmentSequenceNo() ? dlsnToStart.getEntryId() : 0L);
            this.segmentReaders.add(reader);
        }
        if (this.segmentReaders.isEmpty()) {
            return;
        }
        // The first reader becomes the current one and is opened and started immediately.
        this.currentSegmentReader = this.segmentReaders.pollFirst();
        this.currentSegmentReader.openReader();
        this.currentSegmentReader.startRead();
        this.currentSegmentSequenceNumber = this.currentSegmentReader.getSegment().getLogSegmentSequenceNumber();
        this.scheduleRead();
        if (!this.segmentReaders.isEmpty()) {
            // Open all remaining readers; prefetch the next one only if its segment is in progress.
            for (SegmentReader reader : this.segmentReaders) {
                reader.openReader();
            }
            this.unsafePrefetchNextSegment(true);
        }
        this.isInitialized = true;
    }

    /**
     * Promotes the head of the queued readers to "next" and starts reading on it.
     *
     * @param onlyInprogressLogSegment when {@code true}, the head is promoted only if its segment
     *        is in progress
     */
    private void unsafePrefetchNextSegment(boolean onlyInprogressLogSegment) {
        final SegmentReader candidate = segmentReaders.peekFirst();
        if (candidate == null) {
            return;
        }
        if (onlyInprogressLogSegment && !candidate.getSegment().isInProgress()) {
            return;
        }
        candidate.startRead();
        nextSegmentReader = candidate;
        segmentReaders.pollFirst();
    }

    /**
     * Decides whether read-ahead may start at {@code fromDLSN} within the given segment.
     *
     * <p>Positioning is refused, and the failure recorded via {@code setLastException}, when the
     * position falls in a truncated range and the configuration does not ignore truncation
     * status. An alert is raised for the partially-truncated case when so configured.
     *
     * @param segment segment to position on
     * @param fromDLSN requested start position
     * @return {@code true} if positioning is allowed
     */
    private boolean isAllowedToPosition(LogSegmentMetadata segment, DLSN fromDLSN) {
        if (segment.isTruncated() && segment.getLastDLSN().compareTo(fromDLSN) >= 0 && !conf.getIgnoreTruncationStatus()) {
            setLastException(new AlreadyTruncatedTransactionException(streamName + " : trying to position read ahead at " + fromDLSN + " on a segment " + segment + " that is already marked as truncated"));
            return false;
        }
        // Nothing more to check unless the position lies before the first active DLSN.
        if (!segment.isPartiallyTruncated() || segment.getMinActiveDLSN().compareTo(fromDLSN) <= 0) {
            return true;
        }
        if (conf.getAlertWhenPositioningOnTruncated()) {
            alertStatsLogger.raise("Trying to position reader on {} when {} is marked partially truncated", fromDLSN, segment);
        }
        if (conf.getIgnoreTruncationStatus()) {
            return true;
        }
        logger.error("{}: Trying to position reader on {} when {} is marked partially truncated", new Object[]{streamName, fromDLSN, segment});
        setLastException(new AlreadyTruncatedTransactionException(streamName + " : trying to position read ahead at " + fromDLSN + " on a segment " + segment + " that is already marked as truncated"));
        return false;
    }

    /** Queues, on the ordered scheduler, the switch from the current segment to the next one. */
    void moveToNextLogSegment() {
        orderedSubmit(new CloseableRunnable(){

            @Override
            public void safeRun() {
                unsafeMoveToNextLogSegment();
            }
        });
    }

    /**
     * Closes the current segment reader (if any), promotes the next reader to current, prefetches
     * a new next reader and schedules a read.
     *
     * <p>If no next reader was prefetched, the head of the queued readers is used regardless of
     * whether its segment is in progress. When nothing is left to read, the reader is marked as
     * caught up.
     */
    private void unsafeMoveToNextLogSegment() {
        final SegmentReader finished = currentSegmentReader;
        if (finished != null) {
            segmentReadersToClose.add(finished);
            FutureUtils.ensure(finished.close(), removeClosedSegmentReadersFunc);
            logger.debug("close current segment reader {}", (Object) finished.getSegment());
            currentSegmentReader = null;
        }
        // No prefetched reader: try to take the queue head, whatever its segment state.
        if (null == nextSegmentReader) {
            unsafePrefetchNextSegment(false);
        }
        final boolean hasSegmentToRead = null != nextSegmentReader;
        if (hasSegmentToRead) {
            currentSegmentReader = nextSegmentReader;
            logger.debug("move to read segment {}", (Object) currentSegmentReader.getSegment());
            currentSegmentSequenceNumber = currentSegmentReader.getSegment().getLogSegmentSequenceNumber();
            nextSegmentReader = null;
            // Prefetch the following segment only if it is in progress.
            unsafePrefetchNextSegment(true);
        }
        if (!hasSegmentToRead && isCatchingUp) {
            logger.info("ReadAhead for {} is caught up and no log segments to read now", (Object) readHandler.getFullyQualifiedName());
            isCatchingUp = false;
        }
        scheduleRead();
    }

    /** Queues a task that returns the state machine from {@code READING} to {@code IDLE}. */
    void completeRead() {
        orderedSubmit(new CloseableRunnable(){

            @Override
            public void safeRun() {
                if (logger.isDebugEnabled()) {
                    logger.debug("[{}][state:{}] Read completed", (Object) streamName, (Object) state);
                }
                // Other states (PAUSED, ERROR, ...) are left untouched.
                if (State.READING == state) {
                    state = State.IDLE;
                }
            }
        });
    }

    /**
     * Queues a task that issues the next read if the state machine allows it.
     *
     * <p>Only the {@code IDLE} and {@code PAUSED} states act. When the cache is full or there is
     * no current segment reader the state becomes (or stays) {@code PAUSED}; otherwise a read is
     * issued, with this object as its completion listener, and the state becomes {@code READING}.
     */
    void scheduleRead() {
        orderedSubmit(new CloseableRunnable(){

            @Override
            public void safeRun() {
                final boolean cacheFull = isCacheFull();
                final SegmentReader reader = currentSegmentReader;
                final boolean hasMoreSegments = null != reader;
                if (logger.isDebugEnabled()) {
                    logger.debug("[{}][state:{}] scheduling read, cacheFull {}, hasMoreSegments {}", new Object[]{streamName, state, cacheFull, hasMoreSegments});
                }
                if (State.IDLE != state && State.PAUSED != state) {
                    return;
                }
                if (cacheFull || !hasMoreSegments) {
                    // A no-op assignment when already PAUSED.
                    state = State.PAUSED;
                    return;
                }
                reader.readNext().whenComplete((BiConsumer)ReadAheadEntryReader.this);
                state = State.READING;
            }
        });
    }

    /**
     * Listener callback for a refreshed segment list; ignored until {@code start} has been called.
     *
     * @param segments refreshed segment list
     */
    @Override
    public void onSegmentsUpdated(List<LogSegmentMetadata> segments) {
        if (started.get()) {
            logger.info("segments is updated with {}", segments);
            processLogSegments(segments);
        }
    }

    /** Listener callback for stream deletion: records a {@code LogNotFoundException}. */
    @Override
    public void onLogStreamDeleted() {
        final String message = "Log stream " + streamName + " is deleted";
        setLastException(new LogNotFoundException(message));
    }

    /**
     * Base class for scheduler tasks: skips the task once the enclosing reader is closed, and
     * logs (rather than propagates) anything thrown by {@code safeRun()}.
     */
    private abstract class CloseableRunnable
    implements SafeRunnable {
        private CloseableRunnable() {
        }

        @Override
        public void run() {
            // Same synchronized closePromise check as performed by the enclosing reader's isClosed().
            if (ReadAheadEntryReader.this.isClosed()) {
                return;
            }
            try {
                safeRun();
            }
            catch (Throwable cause) {
                logger.error("Caught unexpected exception : ", cause);
            }
        }
    }

    /**
     * Function applied to an opened entry reader: updates the catching-up status, then requests
     * a fixed number of entries.
     */
    private class ReadEntriesFunc
    implements Function<LogSegmentEntryReader, CompletableFuture<List<Entry.Reader>>> {
        private final int numEntries;

        /**
         * @param numEntries number of entries to request on every invocation
         */
        ReadEntriesFunc(int numEntries) {
            this.numEntries = numEntries;
        }

        @Override
        public CompletableFuture<List<Entry.Reader>> apply(LogSegmentEntryReader reader) {
            checkCatchingUpStatus(reader);
            final CompletableFuture<List<Entry.Reader>> pending = reader.readNext(numEntries);
            return pending;
        }
    }

    /**
     * Wraps the (asynchronously opened) entry reader of a single log segment.
     *
     * <p>All state is guarded by the instance monitor. Operations requested before the open
     * completes are chained onto the open future.
     */
    class SegmentReader
    implements FutureEventListener<LogSegmentEntryReader> {
        private LogSegmentMetadata metadata;
        private final long startEntryId;
        // Null until openReader() has been called.
        private CompletableFuture<LogSegmentEntryReader> openFuture = null;
        // Null until the open future completes successfully.
        private LogSegmentEntryReader reader = null;
        private boolean isStarted = false;
        private boolean isClosed = false;

        /**
         * @param metadata segment this reader reads from
         * @param startEntryId entry id passed to the entry store when opening the reader
         */
        SegmentReader(LogSegmentMetadata metadata, long startEntryId) {
            this.metadata = metadata;
            this.startEntryId = startEntryId;
        }

        /** @return the underlying entry reader, or {@code null} if the open has not completed */
        synchronized LogSegmentEntryReader getEntryReader() {
            return this.reader;
        }

        /** @return {@code false} while not yet open; otherwise what the entry reader reports */
        synchronized boolean isBeyondLastAddConfirmed() {
            return null != this.reader && this.reader.isBeyondLastAddConfirmed();
        }

        /** @return the most recently recorded metadata of the segment */
        synchronized LogSegmentMetadata getSegment() {
            return this.metadata;
        }

        /** @return {@code true} once an open has been requested (it may not have completed) */
        synchronized boolean isReaderOpen() {
            return null != this.openFuture;
        }

        /** Requests the open from the entry store; subsequent calls are no-ops. */
        synchronized void openReader() {
            if (null != this.openFuture) {
                return;
            }
            this.openFuture = ReadAheadEntryReader.this.entryStore.openReader(this.metadata, this.startEntryId).whenComplete((BiConsumer)this);
        }

        /** @return {@code true} once {@code startRead()} has been called */
        synchronized boolean isReaderStarted() {
            return this.isStarted;
        }

        /**
         * Starts the entry reader, immediately if it is open or otherwise when the open
         * completes; subsequent calls are no-ops. Requires {@code openReader()} to have been
         * called first.
         */
        synchronized void startRead() {
            if (this.isStarted) {
                return;
            }
            this.isStarted = true;
            if (null != this.reader) {
                this.reader.start();
            } else {
                this.openFuture.thenApply(START_READER_FUNC);
            }
        }

        /**
         * Requests the next batch of entries, chaining onto the open future when the reader is
         * not open yet. Requires {@code openReader()} to have been called first.
         *
         * @return future of the next batch
         */
        synchronized CompletableFuture<List<Entry.Reader>> readNext() {
            if (null != this.reader) {
                ReadAheadEntryReader.this.checkCatchingUpStatus(this.reader);
                return this.reader.readNext(ReadAheadEntryReader.this.numReadAheadEntries);
            }
            return this.openFuture.thenCompose(ReadAheadEntryReader.this.readFunc);
        }

        /**
         * Forwards refreshed metadata to the entry reader and records it, immediately if the
         * reader is open or otherwise when the open completes.
         *
         * @param segment refreshed metadata for this segment
         */
        synchronized void updateLogSegmentMetadata(LogSegmentMetadata segment) {
            if (null != this.reader) {
                this.reader.onLogSegmentMetadataUpdated(segment);
                this.metadata = segment;
            } else {
                this.openFuture.thenAccept(reader1 -> {
                    reader1.onLogSegmentMetadataUpdated(segment);
                    synchronized (this) {
                        this.metadata = segment;
                    }
                });
            }
        }

        /** Open completed: keeps the reader and, for in-progress segments, registers the listener. */
        @Override
        public synchronized void onSuccess(LogSegmentEntryReader reader) {
            this.reader = reader;
            if (reader.getSegment().isInProgress()) {
                reader.registerListener(ReadAheadEntryReader.this);
            }
        }

        /** Open failed: nothing is done here; readNext() chains onto the same failed open future. */
        @Override
        public void onFailure(Throwable cause) {
        }

        /** @return {@code true} once the close has finished (or there was nothing to close) */
        synchronized boolean isClosed() {
            return this.isClosed;
        }

        /**
         * Closes the underlying entry reader once it is open.
         *
         * @return future completed when the close has finished
         */
        synchronized CompletableFuture<Void> close() {
            if (null == this.openFuture) {
                // Never opened, so there is nothing to release. Still mark the reader closed:
                // otherwise it could never be pruned from segmentReadersToClose, whose head is
                // only removed once isClosed() returns true.
                this.isClosed = true;
                return FutureUtils.Void();
            }
            return FutureUtils.ensure(this.openFuture.thenCompose(reader1 -> reader1.asyncClose()), () -> {
                synchronized (this) {
                    this.isClosed = true;
                }
            });
        }
    }

    /** States of the read-ahead state machine kept in the enclosing reader's {@code state} field. */
    private static enum State {
        // Initial state; completeRead() returns here from READING.
        IDLE,
        // Set by scheduleRead() right after a read has been issued.
        READING,
        // Set by scheduleRead() when the cache is full or there is no current segment reader.
        PAUSED,
        // Set by unsafeAsyncClose().
        CLOSED,
        // Set by the task queued from setLastException().
        ERROR;

    }
}

