/*
 * Decompiled with CFR 0.152.
 */
package org.apache.distributedlog;

import dlshade.com.google.common.collect.Lists;
import dlshade.org.apache.bookkeeper.common.concurrent.FutureEventListener;
import dlshade.org.apache.bookkeeper.common.concurrent.FutureUtils;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.distributedlog.DLSN;
import org.apache.distributedlog.Entry;
import org.apache.distributedlog.LogRecordWithDLSN;
import org.apache.distributedlog.LogSegmentMetadata;
import org.apache.distributedlog.logsegment.LogSegmentEntryStore;
import org.apache.distributedlog.logsegment.LogSegmentRandomAccessEntryReader;
import org.apache.distributedlog.selector.FirstDLSNNotLessThanSelector;
import org.apache.distributedlog.selector.FirstTxIdNotLessThanSelector;
import org.apache.distributedlog.selector.LastRecordSelector;
import org.apache.distributedlog.selector.LogRecordSelector;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class ReadUtils {
    static final Logger LOG = LoggerFactory.getLogger(ReadUtils.class);
    private static final int MIN_SEARCH_BATCH_SIZE = 2;

    public static CompletableFuture<LogRecordWithDLSN> asyncReadLastRecord(String streamName, LogSegmentMetadata l, boolean fence, boolean includeControl, boolean includeEndOfStream, int scanStartBatchSize, int scanMaxBatchSize, AtomicInteger numRecordsScanned, ExecutorService executorService, LogSegmentEntryStore entryStore) {
        LastRecordSelector selector = new LastRecordSelector();
        return ReadUtils.asyncReadRecord(streamName, l, fence, includeControl, includeEndOfStream, scanStartBatchSize, scanMaxBatchSize, numRecordsScanned, executorService, entryStore, selector, true, 0L);
    }

    public static CompletableFuture<LogRecordWithDLSN> asyncReadFirstUserRecord(String streamName, LogSegmentMetadata l, int scanStartBatchSize, int scanMaxBatchSize, AtomicInteger numRecordsScanned, ExecutorService executorService, LogSegmentEntryStore entryStore, DLSN dlsn) {
        long startEntryId = 0L;
        if (l.getLogSegmentSequenceNumber() == dlsn.getLogSegmentSequenceNo()) {
            startEntryId = dlsn.getEntryId();
        }
        FirstDLSNNotLessThanSelector selector = new FirstDLSNNotLessThanSelector(dlsn);
        return ReadUtils.asyncReadRecord(streamName, l, false, false, false, scanStartBatchSize, scanMaxBatchSize, numRecordsScanned, executorService, entryStore, selector, false, startEntryId);
    }

    private static CompletableFuture<LogRecordWithDLSN> asyncReadRecordFromEntries(final String streamName, LogSegmentRandomAccessEntryReader reader, final LogSegmentMetadata metadata, ExecutorService executorService, final ScanContext context, final LogRecordSelector selector) {
        final CompletableFuture<LogRecordWithDLSN> promise = new CompletableFuture<LogRecordWithDLSN>();
        final long startEntryId = context.curStartEntryId.get();
        final long endEntryId = context.curEndEntryId.get();
        if (LOG.isDebugEnabled()) {
            LOG.debug("{} reading entries [{} - {}] from {}.", new Object[]{streamName, startEntryId, endEntryId, metadata});
        }
        FutureEventListener<List<Entry.Reader>> readEntriesListener = new FutureEventListener<List<Entry.Reader>>(){

            @Override
            public void onSuccess(List<Entry.Reader> entries) {
                if (LOG.isDebugEnabled()) {
                    LOG.debug("{} finished reading entries [{} - {}] from {}", new Object[]{streamName, startEntryId, endEntryId, metadata});
                }
                for (Entry.Reader entry : entries) {
                    try {
                        ReadUtils.visitEntryRecords(entry, context, selector);
                    }
                    catch (IOException ioe) {
                        promise.completeExceptionally(ioe);
                        return;
                    }
                }
                LogRecordWithDLSN record = selector.result();
                if (LOG.isDebugEnabled()) {
                    LOG.debug("{} got record from entries [{} - {}] of {} : {}", new Object[]{streamName, startEntryId, endEntryId, metadata, record});
                }
                promise.complete(record);
            }

            @Override
            public void onFailure(Throwable cause) {
                promise.completeExceptionally(cause);
            }
        };
        reader.readEntries(startEntryId, endEntryId).whenCompleteAsync(readEntriesListener, (Executor)executorService);
        return promise;
    }

    private static void visitEntryRecords(Entry.Reader entry, ScanContext context, LogRecordSelector selector) throws IOException {
        LogRecordWithDLSN nextRecord = entry.nextRecord();
        while (nextRecord != null) {
            LogRecordWithDLSN record = nextRecord;
            nextRecord = entry.nextRecord();
            context.numRecordsScanned.incrementAndGet();
            if (!context.includeControl && record.isControl() || !context.includeEndOfStream && record.isEndOfStream()) continue;
            selector.process(record);
        }
    }

    private static void asyncReadRecordFromEntries(final String streamName, final LogSegmentRandomAccessEntryReader reader, final LogSegmentMetadata metadata, final ExecutorService executorService, final CompletableFuture<LogRecordWithDLSN> promise, final ScanContext context, final LogRecordSelector selector) {
        FutureEventListener<LogRecordWithDLSN> readEntriesListener = new FutureEventListener<LogRecordWithDLSN>(){

            @Override
            public void onSuccess(LogRecordWithDLSN value) {
                if (LOG.isDebugEnabled()) {
                    LOG.debug("{} read record from [{} - {}] of {} : {}", new Object[]{streamName, context.curStartEntryId.get(), context.curEndEntryId.get(), metadata, value});
                }
                if (null != value) {
                    promise.complete(value);
                    return;
                }
                if (!context.moveToNextRange()) {
                    promise.complete(null);
                    return;
                }
                ReadUtils.asyncReadRecordFromEntries(streamName, reader, metadata, executorService, promise, context, selector);
            }

            @Override
            public void onFailure(Throwable cause) {
                promise.completeExceptionally(cause);
            }
        };
        ReadUtils.asyncReadRecordFromEntries(streamName, reader, metadata, executorService, context, selector).whenCompleteAsync(readEntriesListener, (Executor)executorService);
    }

    private static void asyncReadRecordFromLogSegment(String streamName, LogSegmentRandomAccessEntryReader reader, LogSegmentMetadata metadata, ExecutorService executorService, int scanStartBatchSize, int scanMaxBatchSize, boolean includeControl, boolean includeEndOfStream, CompletableFuture<LogRecordWithDLSN> promise, AtomicInteger numRecordsScanned, LogRecordSelector selector, boolean backward, long startEntryId) {
        long lastAddConfirmed = reader.getLastAddConfirmed();
        if (lastAddConfirmed < 0L) {
            if (LOG.isDebugEnabled()) {
                LOG.debug("Log segment {} is empty for {}.", new Object[]{metadata, streamName});
            }
            promise.complete(null);
            return;
        }
        ScanContext context = new ScanContext(startEntryId, lastAddConfirmed, scanStartBatchSize, scanMaxBatchSize, includeControl, includeEndOfStream, backward, numRecordsScanned);
        ReadUtils.asyncReadRecordFromEntries(streamName, reader, metadata, executorService, promise, context, selector);
    }

    private static CompletableFuture<LogRecordWithDLSN> asyncReadRecord(final String streamName, final LogSegmentMetadata l, boolean fence, final boolean includeControl, final boolean includeEndOfStream, final int scanStartBatchSize, final int scanMaxBatchSize, final AtomicInteger numRecordsScanned, final ExecutorService executorService, LogSegmentEntryStore entryStore, final LogRecordSelector selector, final boolean backward, final long startEntryId) {
        final CompletableFuture<LogRecordWithDLSN> promise = new CompletableFuture<LogRecordWithDLSN>();
        FutureEventListener<LogSegmentRandomAccessEntryReader> openReaderListener = new FutureEventListener<LogSegmentRandomAccessEntryReader>(){

            @Override
            public void onSuccess(LogSegmentRandomAccessEntryReader reader) {
                if (LOG.isDebugEnabled()) {
                    LOG.debug("{} Opened log segment {} for reading record", (Object)streamName, (Object)l);
                }
                promise.whenComplete((value, cause) -> reader.asyncClose());
                if (LOG.isDebugEnabled()) {
                    LOG.debug("{} {} scanning {}.", new Object[]{backward ? "backward" : "forward", streamName, l});
                }
                ReadUtils.asyncReadRecordFromLogSegment(streamName, reader, l, executorService, scanStartBatchSize, scanMaxBatchSize, includeControl, includeEndOfStream, promise, numRecordsScanned, selector, backward, startEntryId);
            }

            @Override
            public void onFailure(Throwable cause) {
                promise.completeExceptionally(cause);
            }
        };
        entryStore.openRandomAccessReader(l, fence).whenCompleteAsync(openReaderListener, (Executor)executorService);
        return promise;
    }

    public static CompletableFuture<Optional<LogRecordWithDLSN>> getLogRecordNotLessThanTxId(final String logName, final LogSegmentMetadata segment, final long transactionId, final ExecutorService executorService, LogSegmentEntryStore entryStore, final int nWays) {
        if (!segment.isInProgress() && segment.getLastTxId() < transactionId) {
            Optional noneRecord = Optional.empty();
            return FutureUtils.value(noneRecord);
        }
        final CompletableFuture<Optional<LogRecordWithDLSN>> promise = new CompletableFuture<Optional<LogRecordWithDLSN>>();
        FutureEventListener<LogSegmentRandomAccessEntryReader> openReaderListener = new FutureEventListener<LogSegmentRandomAccessEntryReader>(){

            @Override
            public void onSuccess(LogSegmentRandomAccessEntryReader reader) {
                promise.whenComplete((value, cause) -> reader.asyncClose());
                long lastEntryId = reader.getLastAddConfirmed();
                if (lastEntryId < 0L) {
                    Optional nonRecord = Optional.empty();
                    promise.complete(nonRecord);
                    return;
                }
                if (segment.getFirstTxId() >= transactionId) {
                    final FirstTxIdNotLessThanSelector selector = new FirstTxIdNotLessThanSelector(transactionId);
                    ReadUtils.asyncReadRecordFromEntries(logName, reader, segment, executorService, new SingleEntryScanContext(0L), selector).whenComplete(new FutureEventListener<LogRecordWithDLSN>(){

                        @Override
                        public void onSuccess(LogRecordWithDLSN value) {
                            promise.complete(Optional.of(selector.result()));
                        }

                        @Override
                        public void onFailure(Throwable cause) {
                            promise.completeExceptionally(cause);
                        }
                    });
                    return;
                }
                ReadUtils.getLogRecordNotLessThanTxIdFromEntries(logName, segment, transactionId, executorService, reader, Lists.newArrayList(0L, lastEntryId), nWays, Optional.empty(), promise);
            }

            @Override
            public void onFailure(Throwable cause) {
                promise.completeExceptionally(cause);
            }
        };
        entryStore.openRandomAccessReader(segment, false).whenCompleteAsync(openReaderListener, (Executor)executorService);
        return promise;
    }

    private static void getLogRecordNotLessThanTxIdFromEntries(final String logName, final LogSegmentMetadata segment, final long transactionId, final ExecutorService executorService, final LogSegmentRandomAccessEntryReader reader, List<Long> entriesToSearch, final int nWays, final Optional<LogRecordWithDLSN> prevFoundRecord, final CompletableFuture<Optional<LogRecordWithDLSN>> promise) {
        ArrayList searchResults = Lists.newArrayListWithExpectedSize(entriesToSearch.size());
        for (Long entryId : entriesToSearch) {
            FirstTxIdNotLessThanSelector selector = new FirstTxIdNotLessThanSelector(transactionId);
            CompletableFuture<LogRecordWithDLSN> searchResult = ReadUtils.asyncReadRecordFromEntries(logName, reader, segment, executorService, new SingleEntryScanContext(entryId), selector);
            searchResults.add(searchResult);
        }
        FutureEventListener<List<LogRecordWithDLSN>> processSearchResultsListener = new FutureEventListener<List<LogRecordWithDLSN>>(){

            @Override
            public void onSuccess(List<LogRecordWithDLSN> resultList) {
                ReadUtils.processSearchResults(logName, segment, transactionId, executorService, reader, resultList, nWays, prevFoundRecord, promise);
            }

            @Override
            public void onFailure(Throwable cause) {
                promise.completeExceptionally(cause);
            }
        };
        FutureUtils.collect(searchResults).whenCompleteAsync(processSearchResultsListener, (Executor)executorService);
    }

    static void processSearchResults(String logName, LogSegmentMetadata segment, long transactionId, ExecutorService executorService, LogSegmentRandomAccessEntryReader reader, List<LogRecordWithDLSN> searchResults, int nWays, Optional<LogRecordWithDLSN> prevFoundRecord, CompletableFuture<Optional<LogRecordWithDLSN>> promise) {
        int found = -1;
        for (int i = 0; i < searchResults.size(); ++i) {
            LogRecordWithDLSN record = searchResults.get(i);
            if (record.getTransactionId() < transactionId) continue;
            found = i;
            break;
        }
        if (found == -1) {
            promise.complete(prevFoundRecord);
            return;
        }
        LogRecordWithDLSN foundRecord = searchResults.get(found);
        if (foundRecord.getDlsn().getSlotId() != 0L || found == 0 || foundRecord.getDlsn().getEntryId() == searchResults.get(found - 1).getDlsn().getEntryId() + 1L) {
            promise.complete(Optional.of(foundRecord));
            return;
        }
        List<Long> nextSearchBatch = ReadUtils.getEntriesToSearch(transactionId, searchResults.get(found - 1), searchResults.get(found), nWays);
        if (nextSearchBatch.isEmpty()) {
            promise.complete(prevFoundRecord);
            return;
        }
        ReadUtils.getLogRecordNotLessThanTxIdFromEntries(logName, segment, transactionId, executorService, reader, nextSearchBatch, nWays, Optional.of(foundRecord), promise);
    }

    static List<Long> getEntriesToSearch(long transactionId, LogRecordWithDLSN firstRecord, LogRecordWithDLSN lastRecord, int nWays) {
        long txnDiff = lastRecord.getTransactionId() - firstRecord.getTransactionId();
        if (txnDiff > 0L) {
            if (lastRecord.getTransactionId() == transactionId) {
                List<Long> entries = ReadUtils.getEntriesToSearch(firstRecord.getDlsn().getEntryId() + 1L, lastRecord.getDlsn().getEntryId() - 2L, Math.max(2, nWays - 1));
                entries.add(lastRecord.getDlsn().getEntryId() - 1L);
                return entries;
            }
            return ReadUtils.getEntriesToSearch(firstRecord.getDlsn().getEntryId() + 1L, lastRecord.getDlsn().getEntryId() - 1L, nWays);
        }
        return Lists.newArrayList();
    }

    static List<Long> getEntriesToSearch(long startEntryId, long endEntryId, int nWays) {
        if (startEntryId > endEntryId) {
            return Lists.newArrayList();
        }
        long numEntries = endEntryId - startEntryId + 1L;
        long step = Math.max(1L, numEntries / (long)nWays);
        ArrayList<Long> entryList = Lists.newArrayListWithExpectedSize(nWays);
        long i = startEntryId;
        for (long j = (long)(nWays - 1); i <= endEntryId && j > 0L; i += step, --j) {
            entryList.add(i);
        }
        if ((Long)entryList.get(entryList.size() - 1) < endEntryId) {
            entryList.add(endEntryId);
        }
        return entryList;
    }

    private static class SingleEntryScanContext
    extends ScanContext {
        SingleEntryScanContext(long entryId) {
            super(entryId, entryId, 1, 1, true, true, false, new AtomicInteger(0));
        }
    }

    private static class ScanContext {
        final AtomicInteger numEntriesToScan;
        final AtomicLong curStartEntryId;
        final AtomicLong curEndEntryId;
        final long startEntryId;
        final long endEntryId;
        final int scanStartBatchSize;
        final int scanMaxBatchSize;
        final boolean includeControl;
        final boolean includeEndOfStream;
        final boolean backward;
        final AtomicInteger numRecordsScanned;

        ScanContext(long startEntryId, long endEntryId, int scanStartBatchSize, int scanMaxBatchSize, boolean includeControl, boolean includeEndOfStream, boolean backward, AtomicInteger numRecordsScanned) {
            this.startEntryId = startEntryId;
            this.endEntryId = endEntryId;
            this.scanStartBatchSize = scanStartBatchSize;
            this.scanMaxBatchSize = scanMaxBatchSize;
            this.includeControl = includeControl;
            this.includeEndOfStream = includeEndOfStream;
            this.backward = backward;
            this.numEntriesToScan = new AtomicInteger(scanStartBatchSize);
            if (backward) {
                this.curStartEntryId = new AtomicLong(Math.max(startEntryId, endEntryId - (long)scanStartBatchSize + 1L));
                this.curEndEntryId = new AtomicLong(endEntryId);
            } else {
                this.curStartEntryId = new AtomicLong(startEntryId);
                this.curEndEntryId = new AtomicLong(Math.min(endEntryId, startEntryId + (long)scanStartBatchSize - 1L));
            }
            this.numRecordsScanned = numRecordsScanned;
        }

        boolean moveToNextRange() {
            if (this.backward) {
                return this.moveBackward();
            }
            return this.moveForward();
        }

        boolean moveBackward() {
            long nextEndEntryId = this.curStartEntryId.get() - 1L;
            if (nextEndEntryId < this.startEntryId) {
                return false;
            }
            this.curEndEntryId.set(nextEndEntryId);
            this.numEntriesToScan.set(Math.min(this.numEntriesToScan.get() * 2, this.scanMaxBatchSize));
            this.curStartEntryId.set(Math.max(this.startEntryId, nextEndEntryId - (long)this.numEntriesToScan.get() + 1L));
            return true;
        }

        boolean moveForward() {
            long nextStartEntryId = this.curEndEntryId.get() + 1L;
            if (nextStartEntryId > this.endEntryId) {
                return false;
            }
            this.curStartEntryId.set(nextStartEntryId);
            this.numEntriesToScan.set(Math.min(this.numEntriesToScan.get() * 2, this.scanMaxBatchSize));
            this.curEndEntryId.set(Math.min(this.endEntryId, nextStartEntryId + (long)this.numEntriesToScan.get() - 1L));
            return true;
        }
    }
}

