/*
 * Decompiled with CFR 0.152.
 */
package org.apache.pulsar.broker.service.streamingdispatch;

import java.util.ArrayList;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
import org.apache.bookkeeper.common.util.SafeRunnable;
import org.apache.bookkeeper.mledger.AsyncCallbacks;
import org.apache.bookkeeper.mledger.Entry;
import org.apache.bookkeeper.mledger.ManagedLedgerException;
import org.apache.bookkeeper.mledger.Position;
import org.apache.bookkeeper.mledger.WaitingEntryCallBack;
import org.apache.bookkeeper.mledger.impl.ManagedCursorImpl;
import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl;
import org.apache.bookkeeper.mledger.impl.PositionImpl;
import org.apache.bookkeeper.mledger.util.SafeRun;
import org.apache.pulsar.broker.service.persistent.PersistentTopic;
import org.apache.pulsar.broker.service.streamingdispatch.PendingReadEntryRequest;
import org.apache.pulsar.broker.service.streamingdispatch.StreamingDispatcher;
import org.apache.pulsar.broker.transaction.buffer.exceptions.TransactionNotSealedException;
import org.apache.pulsar.client.impl.Backoff;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class StreamingEntryReader
implements AsyncCallbacks.ReadEntryCallback,
WaitingEntryCallBack {
    private static final Logger log = LoggerFactory.getLogger(StreamingEntryReader.class);
    private final int maxRetry = 3;
    private ConcurrentLinkedQueue<PendingReadEntryRequest> issuedReads = new ConcurrentLinkedQueue();
    private ConcurrentLinkedQueue<PendingReadEntryRequest> pendingReads = new ConcurrentLinkedQueue();
    private final ManagedCursorImpl cursor;
    private final StreamingDispatcher dispatcher;
    private final PersistentTopic topic;
    private AtomicInteger currentReadSizeByte = new AtomicInteger(0);
    private volatile State state;
    private static final AtomicReferenceFieldUpdater<StreamingEntryReader, State> STATE_UPDATER = AtomicReferenceFieldUpdater.newUpdater(StreamingEntryReader.class, State.class, "state");
    private volatile long maxReadSizeByte;
    private final Backoff readFailureBackoff = new Backoff(10L, TimeUnit.MILLISECONDS, 1L, TimeUnit.SECONDS, 0L, TimeUnit.MILLISECONDS);

    public synchronized void asyncReadEntries(int numEntriesToRead, long maxReadSizeByte, Object ctx) {
        if (STATE_UPDATER.compareAndSet(this, State.Canceling, State.Canceled)) {
            this.internalCancelReadRequests();
        }
        if (!this.issuedReads.isEmpty() || !this.pendingReads.isEmpty()) {
            if (log.isDebugEnabled()) {
                log.debug("[{}] There's pending streaming read not completed yet. Not scheduling next read request.", (Object)this.cursor.getName());
            }
            return;
        }
        PositionImpl nextReadPosition = (PositionImpl)this.cursor.getReadPosition();
        ManagedLedgerImpl managedLedger = (ManagedLedgerImpl)this.cursor.getManagedLedger();
        if (!managedLedger.isValidPosition(nextReadPosition)) {
            nextReadPosition = managedLedger.getNextValidPosition(nextReadPosition);
        }
        boolean hasEntriesToRead = managedLedger.hasMoreEntries(nextReadPosition);
        this.currentReadSizeByte.set(0);
        STATE_UPDATER.set(this, State.Issued);
        this.maxReadSizeByte = maxReadSizeByte;
        for (int c = 0; c < numEntriesToRead; ++c) {
            PendingReadEntryRequest pendingReadEntryRequest = PendingReadEntryRequest.create(ctx, nextReadPosition);
            if (hasEntriesToRead && managedLedger.hasMoreEntries(nextReadPosition)) {
                this.issuedReads.offer(pendingReadEntryRequest);
            } else {
                this.pendingReads.offer(pendingReadEntryRequest);
            }
            nextReadPosition = managedLedger.getNextValidPosition(nextReadPosition);
        }
        for (PendingReadEntryRequest request : this.issuedReads) {
            managedLedger.asyncReadEntry(request.position, (AsyncCallbacks.ReadEntryCallback)this, (Object)request);
        }
        if (!this.pendingReads.isEmpty()) {
            if (log.isDebugEnabled()) {
                log.debug("[{}} Streaming entry reader has {} pending read requests waiting on new entry.", (Object)this.cursor.getName(), (Object)this.pendingReads.size());
            }
            if (managedLedger.hasMoreEntries(this.pendingReads.peek().position)) {
                this.entriesAvailable();
            } else if (managedLedger.isTerminated()) {
                this.dispatcher.notifyConsumersEndOfTopic();
                this.cleanQueue(this.pendingReads);
                if (this.issuedReads.size() == 0) {
                    this.dispatcher.canReadMoreEntries(true);
                }
            } else {
                managedLedger.addWaitingEntryCallBack((WaitingEntryCallBack)this);
            }
        }
    }

    public void readEntryComplete(Entry entry, Object ctx) {
        this.topic.getBrokerService().getTopicOrderedExecutor().executeOrdered((Object)this.dispatcher.getName(), (SafeRunnable)SafeRun.safeRun(() -> this.internalReadEntryComplete(entry, ctx)));
    }

    private void internalReadEntryComplete(Entry entry, Object ctx) {
        PendingReadEntryRequest pendingReadEntryRequest = (PendingReadEntryRequest)ctx;
        pendingReadEntryRequest.entry = entry;
        this.readFailureBackoff.reduceToHalf();
        if (!this.issuedReads.isEmpty() && this.issuedReads.peek() == pendingReadEntryRequest) {
            while (!this.issuedReads.isEmpty() && this.issuedReads.peek().entry != null) {
                PendingReadEntryRequest firstPendingReadEntryRequest = this.issuedReads.poll();
                Entry readEntry = firstPendingReadEntryRequest.entry;
                this.currentReadSizeByte.addAndGet(readEntry.getLength());
                if ((long)this.currentReadSizeByte.get() > this.maxReadSizeByte) {
                    this.cancelReadRequests(readEntry.getPosition());
                    this.dispatcher.canReadMoreEntries(false);
                    STATE_UPDATER.set(this, State.Completed);
                    return;
                }
                if (this.issuedReads.isEmpty() && this.pendingReads.isEmpty()) {
                    firstPendingReadEntryRequest.isLast = true;
                    STATE_UPDATER.set(this, State.Completed);
                }
                this.dispatcher.readEntryComplete(readEntry, firstPendingReadEntryRequest);
            }
        } else if (!this.issuedReads.isEmpty() && this.issuedReads.peek().retry > 3) {
            this.cancelReadRequests((Position)this.issuedReads.peek().position);
            this.dispatcher.canReadMoreEntries(true);
            STATE_UPDATER.set(this, State.Completed);
        }
    }

    public void readEntryFailed(ManagedLedgerException exception, Object ctx) {
        this.topic.getBrokerService().getTopicOrderedExecutor().executeOrdered((Object)this.dispatcher.getName(), (SafeRunnable)SafeRun.safeRun(() -> this.internalReadEntryFailed(exception, ctx)));
    }

    private void internalReadEntryFailed(ManagedLedgerException exception, Object ctx) {
        PendingReadEntryRequest pendingReadEntryRequest = (PendingReadEntryRequest)ctx;
        PositionImpl readPosition = pendingReadEntryRequest.position;
        ++pendingReadEntryRequest.retry;
        long waitTimeMillis = this.readFailureBackoff.next();
        if (exception.getCause() instanceof TransactionNotSealedException) {
            waitTimeMillis = 1L;
            if (log.isDebugEnabled()) {
                log.debug("[{}] Error reading transaction entries : {}, - Retrying to read in {} seconds", new Object[]{this.cursor.getName(), exception.getMessage(), (double)waitTimeMillis / 1000.0});
            }
        } else if (!(exception instanceof ManagedLedgerException.TooManyRequestsException)) {
            log.error("[{} Error reading entries at {} : {} - Retrying to read in {} seconds", new Object[]{this.cursor.getName(), readPosition, exception.getMessage(), (double)waitTimeMillis / 1000.0});
        } else if (log.isDebugEnabled()) {
            log.debug("[{}] Got throttled by bookies while reading at {} : {} - Retrying to read in {} seconds", new Object[]{this.cursor.getName(), readPosition, exception.getMessage(), (double)waitTimeMillis / 1000.0});
        }
        if (!this.issuedReads.isEmpty()) {
            if (this.issuedReads.peek().retry > 3) {
                this.cancelReadRequests((Position)this.issuedReads.peek().position);
                this.dispatcher.canReadMoreEntries(true);
                STATE_UPDATER.set(this, State.Completed);
                return;
            }
            if (pendingReadEntryRequest.retry <= 3) {
                this.retryReadRequest(pendingReadEntryRequest, waitTimeMillis);
            }
        }
    }

    private void cancelReadRequests(Position position) {
        if (!this.issuedReads.isEmpty()) {
            this.cleanQueue(this.issuedReads);
            this.cursor.seek(position);
        }
        if (!this.pendingReads.isEmpty()) {
            this.cleanQueue(this.pendingReads);
        }
    }

    private void internalCancelReadRequests() {
        PositionImpl readPosition = !this.issuedReads.isEmpty() ? this.issuedReads.peek().position : this.pendingReads.peek().position;
        this.cancelReadRequests((Position)readPosition);
    }

    public boolean cancelReadRequests() {
        if (STATE_UPDATER.compareAndSet(this, State.Issued, State.Canceling)) {
            this.topic.getBrokerService().getTopicOrderedExecutor().executeOrdered((Object)this.topic.getName(), (SafeRunnable)SafeRun.safeRun(() -> {
                StreamingEntryReader streamingEntryReader = this;
                synchronized (streamingEntryReader) {
                    if (STATE_UPDATER.compareAndSet(this, State.Canceling, State.Canceled)) {
                        this.internalCancelReadRequests();
                    }
                }
            }));
            return true;
        }
        return false;
    }

    private void cleanQueue(Queue<PendingReadEntryRequest> queue) {
        while (!queue.isEmpty()) {
            PendingReadEntryRequest pendingReadEntryRequest = queue.poll();
            if (pendingReadEntryRequest.entry == null) continue;
            pendingReadEntryRequest.entry.release();
            pendingReadEntryRequest.recycle();
        }
    }

    private void retryReadRequest(PendingReadEntryRequest pendingReadEntryRequest, long delay) {
        this.topic.getBrokerService().executor().schedule(() -> this.topic.getBrokerService().getTopicOrderedExecutor().executeOrdered((Object)this.dispatcher.getName(), (SafeRunnable)SafeRun.safeRun(() -> {
            ManagedLedgerImpl managedLedger = (ManagedLedgerImpl)this.cursor.getManagedLedger();
            managedLedger.asyncReadEntry(pendingReadEntryRequest.position, (AsyncCallbacks.ReadEntryCallback)this, (Object)pendingReadEntryRequest);
        })), delay, TimeUnit.MILLISECONDS);
    }

    public void entriesAvailable() {
        this.topic.getBrokerService().getTopicOrderedExecutor().executeOrdered((Object)this.dispatcher.getName(), (SafeRunnable)SafeRun.safeRun(() -> this.internalEntriesAvailable()));
    }

    private synchronized void internalEntriesAvailable() {
        if (log.isDebugEnabled()) {
            log.debug("[{}} Streaming entry reader get notification of newly added entries from managed ledger, trying to issued pending read requests.", (Object)this.cursor.getName());
        }
        ManagedLedgerImpl managedLedger = (ManagedLedgerImpl)this.cursor.getManagedLedger();
        ArrayList<PendingReadEntryRequest> newlyIssuedRequests = new ArrayList<PendingReadEntryRequest>();
        if (!this.pendingReads.isEmpty()) {
            if (!managedLedger.isValidPosition(this.pendingReads.peek().position)) {
                this.pendingReads.peek().position = managedLedger.getNextValidPosition(this.pendingReads.peek().position);
            }
            while (!this.pendingReads.isEmpty() && managedLedger.hasMoreEntries(this.pendingReads.peek().position)) {
                PendingReadEntryRequest next = this.pendingReads.poll();
                this.issuedReads.offer(next);
                newlyIssuedRequests.add(next);
                if (this.pendingReads.isEmpty()) continue;
                this.pendingReads.peek().position = managedLedger.getNextValidPosition(next.position);
            }
            for (PendingReadEntryRequest request : newlyIssuedRequests) {
                managedLedger.asyncReadEntry(request.position, (AsyncCallbacks.ReadEntryCallback)this, (Object)request);
            }
            if (!this.pendingReads.isEmpty()) {
                if (log.isDebugEnabled()) {
                    log.debug("[{}} Streaming entry reader has {} pending read requests waiting on new entry.", (Object)this.cursor.getName(), (Object)this.pendingReads.size());
                }
                if (managedLedger.hasMoreEntries(this.pendingReads.peek().position)) {
                    this.entriesAvailable();
                } else {
                    managedLedger.addWaitingEntryCallBack((WaitingEntryCallBack)this);
                }
            }
        }
    }

    protected State getState() {
        return STATE_UPDATER.get(this);
    }

    public StreamingEntryReader(ManagedCursorImpl cursor, StreamingDispatcher dispatcher, PersistentTopic topic) {
        this.cursor = cursor;
        this.dispatcher = dispatcher;
        this.topic = topic;
    }

    static enum State {
        Issued,
        Canceling,
        Canceled,
        Completed;

    }
}

