package org.apache.pulsar.broker.service.streamingdispatch;

import java.util.ArrayList;
import java.util.Iterator;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.Executor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
import org.apache.bookkeeper.mledger.AsyncCallbacks;
import org.apache.bookkeeper.mledger.Entry;
import org.apache.bookkeeper.mledger.ManagedLedgerException;
import org.apache.bookkeeper.mledger.Position;
import org.apache.bookkeeper.mledger.WaitingEntryCallBack;
import org.apache.bookkeeper.mledger.impl.ManagedCursorImpl;
import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl;
import org.apache.bookkeeper.mledger.impl.PositionImpl;
import org.apache.pulsar.broker.service.persistent.PersistentTopic;
import org.apache.pulsar.broker.transaction.exception.TransactionException;
import org.apache.pulsar.common.util.Backoff;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/pulsar/broker/service/streamingdispatch/StreamingEntryReader.class */
/**
 * Reads entries from a managed ledger one entry per request and hands them to a
 * {@link StreamingDispatcher} in the order the requests were created.
 *
 * <p>Each call to {@link #asyncReadEntries(int, long, Object)} creates one
 * {@code PendingReadEntryRequest} per requested entry. Requests whose position already has data are
 * placed in {@code issuedReads} and read immediately; the rest wait in {@code pendingReads} until the
 * managed ledger signals new entries through {@link #entriesAvailable()}.
 *
 * <p>Completion and failure callbacks are re-dispatched onto {@code dispatcherExecutor}; asynchronous
 * cancellation runs on {@code topicExecutor}. {@link #asyncReadEntries(int, long, Object)}, the
 * deferred cancellation task and the "entries available" handler synchronize on this instance.
 */
public class StreamingEntryReader implements AsyncCallbacks.ReadEntryCallback, WaitingEntryCallBack {
    private static final Logger log = LoggerFactory.getLogger(StreamingEntryReader.class);
    private static final AtomicReferenceFieldUpdater<StreamingEntryReader, State> STATE_UPDATER =
            AtomicReferenceFieldUpdater.newUpdater(StreamingEntryReader.class, State.class, "state");

    /** A request whose retry counter exceeds this value causes the whole batch to be cancelled. */
    private final int maxRetry = 3;

    private final ManagedCursorImpl cursor;
    private final StreamingDispatcher dispatcher;
    private final PersistentTopic topic;
    private final Executor topicExecutor;
    private final Executor dispatcherExecutor;

    /** Reads that have been submitted to the managed ledger, in request order. */
    private final Queue<PendingReadEntryRequest> issuedReads = new ConcurrentLinkedQueue<>();
    /** Reads waiting for their position to become readable, in request order. */
    private final Queue<PendingReadEntryRequest> pendingReads = new ConcurrentLinkedQueue<>();
    /** Bytes accumulated by the current batch; reset when a new batch is issued. */
    private final AtomicInteger currentReadSizeByte = new AtomicInteger(0);
    /** Supplies the delay before a failed read is retried; halved after every successful read. */
    private final Backoff readFailureBackoff =
            new Backoff(10, TimeUnit.MILLISECONDS, 1, TimeUnit.SECONDS, 0, TimeUnit.MILLISECONDS);

    private volatile State state;
    private volatile long maxReadSizeByte;

    /**
     * Lifecycle of a read batch.
     *
     * <ul>
     *   <li>{@code Issued} - set when {@link #asyncReadEntries(int, long, Object)} creates a batch.</li>
     *   <li>{@code Canceling} - set by {@link #cancelReadRequests()} when cancellation is requested.</li>
     *   <li>{@code Canceled} - set right before the outstanding requests are actually dropped.</li>
     *   <li>{@code Completed} - set when the batch ends (last entry delivered, size limit exceeded,
     *       or retries exhausted).</li>
     * </ul>
     */
    public enum State {
        Issued,
        Canceling,
        Canceled,
        Completed
    }

    /**
     * Creates a reader bound to the given cursor, dispatcher and topic.
     *
     * @param managedCursorImpl   cursor whose read position is the starting point of each batch
     * @param streamingDispatcher dispatcher that receives entries and flow-control notifications
     * @param persistentTopic     topic used to look up the broker's executors
     */
    public StreamingEntryReader(ManagedCursorImpl managedCursorImpl, StreamingDispatcher streamingDispatcher,
                                PersistentTopic persistentTopic) {
        this.cursor = managedCursorImpl;
        this.dispatcher = streamingDispatcher;
        this.topic = persistentTopic;
        this.topicExecutor = persistentTopic.getBrokerService().getTopicOrderedExecutor()
                .chooseThread(persistentTopic.getName());
        this.dispatcherExecutor = persistentTopic.getBrokerService().getTopicOrderedExecutor()
                .chooseThread(streamingDispatcher.getName());
    }

    /**
     * Issues a new batch of single-entry reads starting at the cursor's read position.
     *
     * <p>Does nothing if a previous batch still has issued or pending requests. A cancellation that was
     * requested but not yet executed is carried out first.
     *
     * @param numEntriesToRead number of single-entry read requests to create
     * @param maxReadSizeByte  byte budget for the batch; exceeding it cancels the remaining requests
     * @param ctx              opaque context attached to every created request
     */
    public synchronized void asyncReadEntries(int numEntriesToRead, long maxReadSizeByte, Object ctx) {
        // Finish a cancellation that was requested but whose deferred task has not run yet.
        if (STATE_UPDATER.compareAndSet(this, State.Canceling, State.Canceled)) {
            internalCancelReadRequests();
        }
        if (!issuedReads.isEmpty() || !pendingReads.isEmpty()) {
            if (log.isDebugEnabled()) {
                log.debug("[{}] There's pending streaming read not completed yet. Not scheduling next read request.",
                        cursor.getName());
            }
            return;
        }

        PositionImpl nextPosition = cursor.getReadPosition();
        ManagedLedgerImpl managedLedger = cursor.getManagedLedger();
        if (!managedLedger.isValidPosition(nextPosition)) {
            nextPosition = managedLedger.getNextValidPosition(nextPosition);
        }
        boolean hasEntriesAtStart = managedLedger.hasMoreEntries(nextPosition);

        currentReadSizeByte.set(0);
        STATE_UPDATER.set(this, State.Issued);
        this.maxReadSizeByte = maxReadSizeByte;

        // Split the requests: readable positions are issued now, the others wait for new entries.
        for (int n = 0; n < numEntriesToRead; n++) {
            PendingReadEntryRequest request = PendingReadEntryRequest.create(ctx, nextPosition);
            if (hasEntriesAtStart && managedLedger.hasMoreEntries(nextPosition)) {
                issuedReads.offer(request);
            } else {
                pendingReads.offer(request);
            }
            nextPosition = managedLedger.getNextValidPosition(nextPosition);
        }

        for (PendingReadEntryRequest request : issuedReads) {
            managedLedger.asyncReadEntry(request.position, this, request);
        }

        if (pendingReads.isEmpty()) {
            return;
        }
        if (log.isDebugEnabled()) {
            log.debug("[{}] Streaming entry reader has {} pending read requests waiting on new entry.",
                    cursor.getName(), pendingReads.size());
        }
        // Entries may have been added while the requests were being built.
        if (managedLedger.hasMoreEntries(pendingReads.peek().position)) {
            entriesAvailable();
            return;
        }
        if (!managedLedger.isTerminated()) {
            managedLedger.addWaitingEntryCallBack(this);
            return;
        }
        // The ledger is terminated: no more entries will arrive for the pending requests.
        dispatcher.notifyConsumersEndOfTopic();
        cleanQueue(pendingReads);
        if (issuedReads.isEmpty()) {
            dispatcher.canReadMoreEntries(true);
        }
    }

    /**
     * Managed-ledger callback for a successful read; processing is moved to the dispatcher executor.
     *
     * @param entry the entry that was read
     * @param obj   the {@code PendingReadEntryRequest} passed as context when the read was issued
     */
    public void readEntryComplete(Entry entry, Object obj) {
        dispatcherExecutor.execute(() -> internalReadEntryComplete(entry, obj));
    }

    /**
     * Stores the entry on its request and, if the request is at the head of {@code issuedReads},
     * delivers every consecutive completed request to the dispatcher in order.
     */
    private void internalReadEntryComplete(Entry entry, Object obj) {
        PendingReadEntryRequest request = (PendingReadEntryRequest) obj;
        request.entry = entry;
        readFailureBackoff.reduceToHalf();

        // Peek once: the queue is concurrent, so the head may change between separate calls.
        PendingReadEntryRequest head = issuedReads.peek();
        if (head != request) {
            // Out-of-order completion: the entry stays parked on its request until the head completes.
            if (head == null || head.retry <= maxRetry) {
                return;
            }
            // The head has exhausted its retries; give up on the whole batch.
            cancelReadRequests(head.position);
            dispatcher.canReadMoreEntries(true);
            STATE_UPDATER.set(this, State.Completed);
            return;
        }

        while (!issuedReads.isEmpty() && issuedReads.peek().entry != null) {
            PendingReadEntryRequest completed = issuedReads.poll();
            Entry readEntry = completed.entry;
            long batchSize = currentReadSizeByte.addAndGet(readEntry.getLength());
            if (batchSize > maxReadSizeByte) {
                // Capture the position before releasing: the entry must not be touched afterwards.
                Position overflowPosition = readEntry.getPosition();
                cancelReadRequests(overflowPosition);
                // This entry was already removed from the queue and is not handed to the dispatcher,
                // so it has to be released here or it is leaked.
                readEntry.release();
                completed.recycle();
                dispatcher.canReadMoreEntries(false);
                STATE_UPDATER.set(this, State.Completed);
                return;
            }
            // Nothing left in either queue: flag this entry as the last one of the batch.
            if (issuedReads.isEmpty() && pendingReads.isEmpty()) {
                completed.isLast = true;
                STATE_UPDATER.set(this, State.Completed);
            }
            dispatcher.readEntryComplete(readEntry, completed);
        }
    }

    /**
     * Managed-ledger callback for a failed read; processing is moved to the dispatcher executor.
     *
     * @param managedLedgerException the failure reported by the managed ledger
     * @param obj                    the {@code PendingReadEntryRequest} passed as context when the read
     *                               was issued
     */
    public void readEntryFailed(ManagedLedgerException managedLedgerException, Object obj) {
        dispatcherExecutor.execute(() -> internalReadEntryFailed(managedLedgerException, obj));
    }

    /**
     * Counts the failure against the request, logs it, and either retries the read after a delay or
     * cancels the batch when the head request has exceeded {@code maxRetry}.
     */
    private void internalReadEntryFailed(ManagedLedgerException managedLedgerException, Object obj) {
        PendingReadEntryRequest request = (PendingReadEntryRequest) obj;
        PositionImpl failedPosition = request.position;
        request.retry++;
        long retryDelayMs = readFailureBackoff.next();

        Throwable cause = managedLedgerException.getCause();
        if (cause instanceof TransactionException.TransactionNotSealedException
                || cause instanceof ManagedLedgerException.OffloadReadHandleClosedException) {
            // These failures are retried after 1 ms instead of the backoff delay.
            retryDelayMs = 1;
            if (log.isDebugEnabled()) {
                log.debug("[{}] Error reading transaction entries : {}, - Retrying to read in {} seconds",
                        cursor.getName(), managedLedgerException.getMessage(), retryDelayMs / 1000.0d);
            }
        } else if (!(managedLedgerException instanceof ManagedLedgerException.TooManyRequestsException)) {
            log.error("[{}] Error reading entries at {} : {} - Retrying to read in {} seconds",
                    cursor.getName(), failedPosition, managedLedgerException.getMessage(),
                    retryDelayMs / 1000.0d);
        } else if (log.isDebugEnabled()) {
            log.debug("[{}] Got throttled by bookies while reading at {} : {} - Retrying to read in {} seconds",
                    cursor.getName(), failedPosition, managedLedgerException.getMessage(),
                    retryDelayMs / 1000.0d);
        }

        // Peek once: the queue is concurrent, so the head may change between separate calls.
        PendingReadEntryRequest head = issuedReads.peek();
        if (head == null) {
            return;
        }
        if (head.retry > maxRetry) {
            cancelReadRequests(head.position);
            dispatcher.canReadMoreEntries(true);
            STATE_UPDATER.set(this, State.Completed);
        } else if (request.retry <= maxRetry) {
            retryReadRequest(request, retryDelayMs);
        }
    }

    /**
     * Drops every issued and pending request. If there were issued requests, the cursor is moved to
     * {@code position} so a later batch starts from there.
     */
    private void cancelReadRequests(Position position) {
        if (!issuedReads.isEmpty()) {
            cleanQueue(issuedReads);
            cursor.seek(position);
        }
        if (!pendingReads.isEmpty()) {
            cleanQueue(pendingReads);
        }
    }

    /**
     * Cancels all outstanding requests, using the position of the first issued request, or of the first
     * pending request when nothing is issued. Does nothing when both queues are empty.
     */
    private void internalCancelReadRequests() {
        PendingReadEntryRequest first = issuedReads.peek();
        if (first == null) {
            first = pendingReads.peek();
        }
        if (first == null) {
            // The batch drained before the cancellation ran; there is nothing to cancel.
            return;
        }
        cancelReadRequests(first.position);
    }

    /**
     * Requests cancellation of the current batch. The actual cancellation is executed later on the
     * topic executor, or by the next {@link #asyncReadEntries(int, long, Object)} call, whichever
     * performs the {@code Canceling -> Canceled} transition first.
     *
     * @return {@code true} if the reader was in the {@code Issued} state and cancellation was
     *         scheduled; {@code false} otherwise
     */
    public boolean cancelReadRequests() {
        if (!STATE_UPDATER.compareAndSet(this, State.Issued, State.Canceling)) {
            return false;
        }
        topicExecutor.execute(() -> {
            synchronized (this) {
                if (STATE_UPDATER.compareAndSet(this, State.Canceling, State.Canceled)) {
                    internalCancelReadRequests();
                }
            }
        });
        return true;
    }

    /**
     * Empties the queue. Requests that already hold an entry have the entry released and are recycled;
     * requests without an entry are only removed.
     */
    private void cleanQueue(Queue<PendingReadEntryRequest> queue) {
        PendingReadEntryRequest request;
        while ((request = queue.poll()) != null) {
            if (request.entry != null) {
                request.entry.release();
                request.recycle();
            }
        }
    }

    /** Re-issues the read for {@code pendingReadEntryRequest} after {@code delayMs} milliseconds. */
    private void retryReadRequest(PendingReadEntryRequest pendingReadEntryRequest, long delayMs) {
        topic.getBrokerService().executor().schedule(
                () -> dispatcherExecutor.execute(
                        () -> cursor.getManagedLedger().asyncReadEntry(
                                pendingReadEntryRequest.position, this, pendingReadEntryRequest)),
                delayMs, TimeUnit.MILLISECONDS);
    }

    /** Callback signalling that new entries may be readable; handled on the dispatcher executor. */
    public void entriesAvailable() {
        dispatcherExecutor.execute(this::internalEntriesAvailable);
    }

    /**
     * Moves every pending request whose position is now readable into {@code issuedReads} and issues
     * its read. If requests remain pending, either re-schedules itself or registers for the next
     * notification from the managed ledger.
     */
    private synchronized void internalEntriesAvailable() {
        if (log.isDebugEnabled()) {
            log.debug("[{}] Streaming entry reader get notification of newly added entries from managed ledger,"
                    + " trying to issued pending read requests.", cursor.getName());
        }
        ManagedLedgerImpl managedLedger = cursor.getManagedLedger();
        PendingReadEntryRequest firstPending = pendingReads.peek();
        if (firstPending == null) {
            return;
        }
        if (!managedLedger.isValidPosition(firstPending.position)) {
            firstPending.position = managedLedger.getNextValidPosition(firstPending.position);
        }

        ArrayList<PendingReadEntryRequest> newlyIssued = new ArrayList<>();
        while (!pendingReads.isEmpty() && managedLedger.hasMoreEntries(pendingReads.peek().position)) {
            PendingReadEntryRequest promoted = pendingReads.poll();
            issuedReads.offer(promoted);
            newlyIssued.add(promoted);
            // Re-derive the next pending position from the one just promoted.
            if (!pendingReads.isEmpty()) {
                pendingReads.peek().position = managedLedger.getNextValidPosition(promoted.position);
            }
        }
        for (PendingReadEntryRequest request : newlyIssued) {
            managedLedger.asyncReadEntry(request.position, this, request);
        }

        if (pendingReads.isEmpty()) {
            return;
        }
        if (log.isDebugEnabled()) {
            log.debug("[{}] Streaming entry reader has {} pending read requests waiting on new entry.",
                    cursor.getName(), pendingReads.size());
        }
        if (managedLedger.hasMoreEntries(pendingReads.peek().position)) {
            entriesAvailable();
        } else {
            managedLedger.addWaitingEntryCallBack(this);
        }
    }

    /** Returns the current state of the reader. */
    protected State getState() {
        return STATE_UPDATER.get(this);
    }
}
