package org.apache.pulsar.broker.transaction.buffer.impl;

import com.google.common.annotations.VisibleForTesting;
import io.netty.buffer.ByteBuf;
import io.netty.util.Timeout;
import io.netty.util.Timer;
import io.netty.util.TimerTask;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.LongAdder;
import lombok.Generated;
import org.apache.bookkeeper.mledger.AsyncCallbacks;
import org.apache.bookkeeper.mledger.Entry;
import org.apache.bookkeeper.mledger.ManagedCursor;
import org.apache.bookkeeper.mledger.ManagedLedgerException;
import org.apache.bookkeeper.mledger.Position;
import org.apache.bookkeeper.mledger.impl.PositionImpl;
import org.apache.commons.collections4.map.LinkedMap;
import org.apache.pulsar.broker.service.BrokerServiceException;
import org.apache.pulsar.broker.service.persistent.PersistentTopic;
import org.apache.pulsar.broker.systopic.SystemTopicClient;
import org.apache.pulsar.broker.transaction.buffer.AbortedTxnProcessor;
import org.apache.pulsar.broker.transaction.buffer.TransactionBuffer;
import org.apache.pulsar.broker.transaction.buffer.TransactionBufferReader;
import org.apache.pulsar.broker.transaction.buffer.TransactionMeta;
import org.apache.pulsar.broker.transaction.buffer.impl.TopicTransactionBufferState;
import org.apache.pulsar.broker.transaction.buffer.metadata.TransactionBufferSnapshot;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.transaction.TxnID;
import org.apache.pulsar.common.api.proto.MessageMetadata;
import org.apache.pulsar.common.policies.data.TransactionBufferStats;
import org.apache.pulsar.common.policies.data.TransactionInBufferStats;
import org.apache.pulsar.common.protocol.Commands;
import org.apache.pulsar.common.protocol.Markers;
import org.apache.pulsar.common.util.Codec;
import org.apache.pulsar.common.util.RecoverTimeRecord;
import org.jctools.queues.MessagePassingQueue;
import org.jctools.queues.SpscArrayQueue;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/pulsar/broker/transaction/buffer/impl/TopicTransactionBuffer.class */
public class TopicTransactionBuffer extends TopicTransactionBufferState implements TransactionBuffer, TimerTask {

    // Class-wide SLF4J logger.
    @Generated
    private static final Logger log = LoggerFactory.getLogger(TopicTransactionBuffer.class);
    // Topic whose managed ledger receives the transactional entries and the commit/abort markers.
    private final PersistentTopic topic;
    // Position returned by getMaxReadPosition() once the buffer is Ready or in the NoSnapshot state.
    private volatile PositionImpl maxReadPosition;
    // Ongoing transactions mapped to the position recorded the first time each transaction was seen.
    // The first key is used to derive maxReadPosition. Mutations happen under synchronized (this).
    private final LinkedMap<TxnID, PositionImpl> ongoingTxns;
    // Number of forward moves of maxReadPosition since the last snapshot was requested.
    private final AtomicLong changeMaxReadPositionCount;
    // Incremented after a commit marker has been appended successfully.
    private final LongAdder txnCommittedCounter;
    // Incremented after an abort marker has been appended successfully.
    private final LongAdder txnAbortedCounter;
    // Timer on which this object (a TimerTask) schedules itself for periodic snapshots.
    private final Timer timer;
    // Threshold for changeMaxReadPositionCount at which takeSnapshotByChangeTimes() requests a snapshot.
    private final int takeSnapshotIntervalNumber;
    // Delay, in milliseconds, used when scheduling the snapshot timer.
    private final int takeSnapshotIntervalTime;
    // Completed (normally or exceptionally) by the recovery callbacks; commit/abort chain on it.
    private final CompletableFuture<Void> transactionBufferFuture;
    // Highest low water mark seen so far, keyed by TxnID.getMostSigBits().
    private final ConcurrentHashMap<Long, Long> lowWaterMarks;
    // Wall-clock start/end timestamps of the recovery run, reported through getStats().
    public final RecoverTimeRecord recoverTime;
    // Single-permit semaphore so that only one low-water-mark triggered abort is in flight at a time.
    private final Semaphore handleLowWaterMark;
    // Keeps track of aborted transactions and persists/recovers their snapshot.
    private final AbortedTxnProcessor snapshotAbortedTxnProcessor;
    // Notified when maxReadPosition moves forward (unless the caller disabled the callback).
    private final MaxReadPositionCallBack maxReadPositionCallBack;

    /* loaded from: input_file:org/apache/pulsar/broker/transaction/buffer/impl/TopicTransactionBuffer$FillEntryQueueCallback.class */
    static class FillEntryQueueCallback implements AsyncCallbacks.ReadEntriesCallback {
        private final SpscArrayQueue<Entry> entryQueue;
        private final ManagedCursor cursor;
        private final TopicTransactionBufferRecover recover;
        private static final int NUMBER_OF_PER_READ_ENTRY = 100;
        private final AtomicLong outstandingReadsRequests = new AtomicLong(0);
        private volatile boolean isReadable = true;

        private FillEntryQueueCallback(SpscArrayQueue<Entry> spscArrayQueue, ManagedCursor managedCursor, TopicTransactionBufferRecover topicTransactionBufferRecover) {
            this.entryQueue = spscArrayQueue;
            this.cursor = managedCursor;
            this.recover = topicTransactionBufferRecover;
        }

        boolean fillQueue() {
            if (this.entryQueue.size() + NUMBER_OF_PER_READ_ENTRY < this.entryQueue.capacity() && this.outstandingReadsRequests.get() == 0) {
                if (this.cursor.hasMoreEntries()) {
                    this.outstandingReadsRequests.incrementAndGet();
                    this.cursor.asyncReadEntries(NUMBER_OF_PER_READ_ENTRY, this, Long.valueOf(System.nanoTime()), PositionImpl.LATEST);
                } else if (this.entryQueue.size() == 0) {
                    this.isReadable = false;
                }
            }
            return this.isReadable;
        }

        public void readEntriesComplete(final List<Entry> list, Object obj) {
            this.entryQueue.fill(new MessagePassingQueue.Supplier<Entry>() { // from class: org.apache.pulsar.broker.transaction.buffer.impl.TopicTransactionBuffer.FillEntryQueueCallback.1
                private int i = 0;

                /* renamed from: get, reason: merged with bridge method [inline-methods] */
                public Entry m706get() {
                    Entry entry = (Entry) list.get(this.i);
                    this.i++;
                    return entry;
                }
            }, list.size());
            this.outstandingReadsRequests.decrementAndGet();
        }

        public void readEntriesFailed(ManagedLedgerException managedLedgerException, Object obj) {
            if ((this.recover.topic.getManagedLedger().getConfig().isAutoSkipNonRecoverableData() && (managedLedgerException instanceof ManagedLedgerException.NonRecoverableLedgerException)) || (managedLedgerException instanceof ManagedLedgerException.ManagedLedgerFencedException) || (managedLedgerException instanceof ManagedLedgerException.CursorAlreadyClosedException)) {
                this.isReadable = false;
            } else {
                this.outstandingReadsRequests.decrementAndGet();
            }
            this.recover.callBackException(managedLedgerException);
        }
    }

    /* loaded from: input_file:org/apache/pulsar/broker/transaction/buffer/impl/TopicTransactionBuffer$MaxReadPositionCallBack.class */
    /**
     * Listener invoked by {@code updateMaxReadPosition} when the max read position has moved forward.
     */
    public interface MaxReadPositionCallBack {
        /**
         * @param oldPosition the max read position before the update
         * @param newPosition the max read position after the update
         */
        void maxReadPositionMovedForward(PositionImpl oldPosition, PositionImpl newPosition);
    }

    /**
     * Recovery task: restores the aborted-transaction snapshot and then replays the topic entries
     * from the snapshot position up to the last confirmed entry, handing each entry to the
     * {@link TopicTransactionBufferRecoverCallBack}.
     */
    @VisibleForTesting
    public static class TopicTransactionBufferRecover implements Runnable {
        public static final String SUBSCRIPTION_NAME = "transaction-buffer-sub";

        private final PersistentTopic topic;
        private final TopicTransactionBufferRecoverCallBack callBack;
        private final TopicTransactionBuffer topicTransactionBuffer;
        private final AbortedTxnProcessor abortedTxnProcessor;
        private Position startReadCursorPosition = PositionImpl.EARLIEST;
        // Counts read failures reported by FillEntryQueueCallback.
        private final AtomicLong exceptionNumber = new AtomicLong();
        private final SpscArrayQueue<Entry> entryQueue = new SpscArrayQueue<>(2000);

        private TopicTransactionBufferRecover(TopicTransactionBufferRecoverCallBack topicTransactionBufferRecoverCallBack, PersistentTopic persistentTopic, TopicTransactionBuffer topicTransactionBuffer, AbortedTxnProcessor abortedTxnProcessor) {
            this.topic = persistentTopic;
            this.callBack = topicTransactionBufferRecoverCallBack;
            this.topicTransactionBuffer = topicTransactionBuffer;
            this.abortedTxnProcessor = abortedTxnProcessor;
        }

        /**
         * Runs the recovery. Does nothing (besides logging) when the buffer cannot be switched to
         * the Initializing state. When the snapshot yields no start position the callback's
         * {@code noNeedToRecover()} is invoked; otherwise entries are replayed and
         * {@code recoverComplete()} is invoked. Failures are reported via {@code recoverExceptionally}.
         */
        @Override
        public void run() {
            if (!this.topicTransactionBuffer.changeToInitializingState()) {
                log.warn("TransactionBuffer {} of topic {} can not change state to Initializing", this, this.topic.getName());
                return;
            }
            this.abortedTxnProcessor.recoverFromSnapshot().thenAccept(startPosition -> {
                if (startPosition == null) {
                    this.callBack.noNeedToRecover();
                    return;
                }
                this.startReadCursorPosition = startPosition;
                try {
                    ManagedCursor recoverCursor = this.topic.getManagedLedger().newNonDurableCursor(this.startReadCursorPosition, SUBSCRIPTION_NAME);
                    PositionImpl lastConfirmedEntry = (PositionImpl) this.topic.getManagedLedger().getLastConfirmedEntry();
                    // Position of the entry handled most recently; the loop ends once it reaches
                    // the last confirmed entry.
                    PositionImpl currentLoadPosition = (PositionImpl) this.startReadCursorPosition;
                    FillEntryQueueCallback fillEntryQueueCallback = new FillEntryQueueCallback(this.entryQueue, recoverCursor, this);
                    if (lastConfirmedEntry.getEntryId() != -1) {
                        while (lastConfirmedEntry.compareTo(currentLoadPosition) > 0 && fillEntryQueueCallback.fillQueue()) {
                            Entry entry = this.entryQueue.poll();
                            if (entry != null) {
                                try {
                                    currentLoadPosition = PositionImpl.get(entry.getLedgerId(), entry.getEntryId());
                                    this.callBack.handleTxnEntry(entry);
                                } finally {
                                    // Always give the entry back, even when the handler throws.
                                    entry.release();
                                }
                            } else {
                                try {
                                    // Nothing queued yet: wait briefly for the asynchronous read.
                                    Thread.sleep(1L);
                                } catch (InterruptedException ignored) {
                                    // Deliberately ignored: recovery keeps polling until it
                                    // finishes or the cursor becomes unreadable.
                                }
                            }
                        }
                    }
                    closeCursor(SUBSCRIPTION_NAME);
                    this.callBack.recoverComplete();
                } catch (ManagedLedgerException e) {
                    this.callBack.recoverExceptionally(e);
                    log.error("[{}]Transaction buffer recover fail when open cursor!", this.topic.getName(), e);
                }
            }).exceptionally(th -> {
                this.callBack.recoverExceptionally(th.getCause());
                log.error("[{}]Transaction buffer failed to recover snapshot!", this.topic.getName(), th);
                return null;
            });
        }

        /** Asynchronously deletes the recovery cursor with the given (unencoded) name; outcome is only logged. */
        private void closeCursor(String str) {
            this.topic.getManagedLedger().asyncDeleteCursor(Codec.encode(str), new AsyncCallbacks.DeleteCursorCallback() {
                @Override
                public void deleteCursorComplete(Object obj) {
                    log.info("[{}]Transaction buffer snapshot recover cursor close complete.", TopicTransactionBufferRecover.this.topic.getName());
                }

                @Override
                public void deleteCursorFailed(ManagedLedgerException managedLedgerException, Object obj) {
                    // Include the exception so the reason for the failure is not lost.
                    log.error("[{}]Transaction buffer snapshot recover cursor close fail.", TopicTransactionBufferRecover.this.topic.getName(), managedLedgerException);
                }
            }, null);
        }

        /** Logs a read failure reported by {@link FillEntryQueueCallback} and counts it. */
        private void callBackException(ManagedLedgerException managedLedgerException) {
            log.error("Transaction buffer recover fail when recover transaction entry!", managedLedgerException);
            this.exceptionNumber.getAndIncrement();
        }

        /** Closes the given snapshot reader asynchronously, logging a failure to close. */
        private void closeReader(SystemTopicClient.Reader<TransactionBufferSnapshot> reader) {
            reader.closeAsync().exceptionally(th -> {
                log.error("[{}]Transaction buffer reader close error!", this.topic.getName(), th);
                return null;
            });
        }
    }

    /**
     * Creates the transaction buffer of the given topic in state {@code None} and immediately
     * schedules its recovery on the transaction executor.
     *
     * @param persistentTopic topic that owns this buffer; its broker configuration decides the
     *                        snapshot thresholds and whether the segmented snapshot processor is used
     */
    public TopicTransactionBuffer(PersistentTopic persistentTopic) {
        super(TopicTransactionBufferState.State.None);
        this.ongoingTxns = new LinkedMap<>();
        this.changeMaxReadPositionCount = new AtomicLong();
        this.txnCommittedCounter = new LongAdder();
        this.txnAbortedCounter = new LongAdder();
        this.transactionBufferFuture = new CompletableFuture<>();
        this.lowWaterMarks = new ConcurrentHashMap<>();
        this.recoverTime = new RecoverTimeRecord();
        this.handleLowWaterMark = new Semaphore(1);
        this.topic = persistentTopic;
        this.timer = persistentTopic.getBrokerService().getPulsar().getTransactionTimer();
        this.takeSnapshotIntervalNumber = persistentTopic.getBrokerService().getPulsar().getConfiguration().getTransactionBufferSnapshotMaxTransactionCount();
        this.takeSnapshotIntervalTime = persistentTopic.getBrokerService().getPulsar().getConfiguration().getTransactionBufferSnapshotMinTimeInMillis();
        // Explicit downcast, as done for the same call in removeTxnAndUpdateMaxReadPosition.
        this.maxReadPosition = (PositionImpl) persistentTopic.getManagedLedger().getLastConfirmedEntry();
        if (persistentTopic.getBrokerService().getPulsar().getConfiguration().isTransactionBufferSegmentedSnapshotEnabled()) {
            this.snapshotAbortedTxnProcessor = new SnapshotSegmentAbortedTxnProcessorImpl(persistentTopic);
        } else {
            this.snapshotAbortedTxnProcessor = new SingleSnapshotAbortedTxnProcessorImpl(persistentTopic);
        }
        this.maxReadPositionCallBack = persistentTopic.getMaxReadPositionCallBack();
        // Must stay last: the recover task reads the fields assigned above.
        recover();
    }

    /**
     * Records the recovery start time and submits a {@link TopicTransactionBufferRecover} task to
     * the transaction executor. The callback passed to the task updates the buffer state and
     * completes {@code transactionBufferFuture} on every outcome, so callers chained on that
     * future are never left waiting.
     */
    private void recover() {
        this.recoverTime.setRecoverStartTime(System.currentTimeMillis());
        TopicTransactionBufferRecoverCallBack recoverCallBack = new TopicTransactionBufferRecoverCallBack() {
            /** Replay finished: switch to Ready, start the snapshot timer and release waiters. */
            @Override
            public void recoverComplete() {
                synchronized (TopicTransactionBuffer.this) {
                    // Without ongoing transactions everything written so far is readable.
                    if (TopicTransactionBuffer.this.ongoingTxns.isEmpty()) {
                        TopicTransactionBuffer.this.maxReadPosition = (PositionImpl) TopicTransactionBuffer.this.topic.getManagedLedger().getLastConfirmedEntry();
                    }
                    if (TopicTransactionBuffer.this.changeToReadyState()) {
                        TopicTransactionBuffer.this.timer.newTimeout(TopicTransactionBuffer.this, TopicTransactionBuffer.this.takeSnapshotIntervalTime, TimeUnit.MILLISECONDS);
                        TopicTransactionBuffer.this.transactionBufferFuture.complete(null);
                        TopicTransactionBuffer.this.recoverTime.setRecoverEndTime(System.currentTimeMillis());
                    } else {
                        log.error("[{}]Transaction buffer recover fail, current state: {}", TopicTransactionBuffer.this.topic.getName(), TopicTransactionBuffer.this.getState());
                        TopicTransactionBuffer.this.transactionBufferFuture.completeExceptionally(new BrokerServiceException.ServiceUnitNotReadyException("Transaction buffer recover failed to change the status to Ready,current state is: " + TopicTransactionBuffer.this.getState()));
                    }
                }
            }

            /** No snapshot to recover from: switch to NoSnapshot and release waiters. */
            @Override
            public void noNeedToRecover() {
                synchronized (TopicTransactionBuffer.this) {
                    TopicTransactionBuffer.this.maxReadPosition = (PositionImpl) TopicTransactionBuffer.this.topic.getManagedLedger().getLastConfirmedEntry();
                    if (TopicTransactionBuffer.this.changeToNoSnapshotState()) {
                        TopicTransactionBuffer.this.transactionBufferFuture.complete(null);
                        TopicTransactionBuffer.this.recoverTime.setRecoverEndTime(System.currentTimeMillis());
                    } else {
                        log.error("[{}]Transaction buffer recover fail", TopicTransactionBuffer.this.topic.getName());
                        // Fail the future like recoverComplete() does; otherwise commitTxn/abortTxn
                        // and checkIfTBRecoverCompletely callers would wait on it forever.
                        TopicTransactionBuffer.this.transactionBufferFuture.completeExceptionally(new BrokerServiceException.ServiceUnitNotReadyException("Transaction buffer recover failed to change the status to NoSnapshot,current state is: " + TopicTransactionBuffer.this.getState()));
                    }
                }
            }

            /**
             * Replays one entry: abort markers register the aborted transaction, any marker ends
             * the transaction, and a regular transactional message registers it as ongoing.
             * Entries without a transaction id are ignored.
             */
            @Override
            public void handleTxnEntry(Entry entry) {
                MessageMetadata metadata = Commands.peekMessageMetadata(entry.getDataBuffer(), TopicTransactionBufferRecover.SUBSCRIPTION_NAME, -1L);
                if (metadata == null || !metadata.hasTxnidMostBits() || !metadata.hasTxnidLeastBits()) {
                    return;
                }
                TxnID txnID = new TxnID(metadata.getTxnidMostBits(), metadata.getTxnidLeastBits());
                PositionImpl entryPosition = PositionImpl.get(entry.getLedgerId(), entry.getEntryId());
                synchronized (TopicTransactionBuffer.this) {
                    if (Markers.isTxnMarker(metadata)) {
                        if (Markers.isTxnAbortMarker(metadata)) {
                            TopicTransactionBuffer.this.snapshotAbortedTxnProcessor.putAbortedTxnAndPosition(txnID, entryPosition);
                        }
                        TopicTransactionBuffer.this.removeTxnAndUpdateMaxReadPosition(txnID);
                    } else {
                        TopicTransactionBuffer.this.handleTransactionMessage(txnID, entryPosition);
                    }
                }
            }

            /** Recovery failed: fail the future, record the end time and close the topic. */
            @Override
            public void recoverExceptionally(Throwable th) {
                log.warn("Closing topic {} due to read transaction buffer snapshot while recovering the transaction buffer throw exception", TopicTransactionBuffer.this.topic.getName(), th);
                if (th instanceof PulsarClientException) {
                    TopicTransactionBuffer.this.transactionBufferFuture.completeExceptionally(new BrokerServiceException.ServiceUnitNotReadyException(th.getMessage(), th));
                } else {
                    TopicTransactionBuffer.this.transactionBufferFuture.completeExceptionally(th);
                }
                TopicTransactionBuffer.this.recoverTime.setRecoverEndTime(System.currentTimeMillis());
                TopicTransactionBuffer.this.topic.close(true);
            }
        };
        this.topic.getBrokerService().getPulsar().getTransactionExecutorProvider().getExecutor(this)
                .execute(new TopicTransactionBufferRecover(recoverCallBack, this.topic, this, this.snapshotAbortedTxnProcessor));
    }

    /**
     * This implementation keeps no per-transaction metadata.
     *
     * @return an already completed future holding {@code null}
     */
    @Override // org.apache.pulsar.broker.transaction.buffer.TransactionBuffer
    public CompletableFuture<TransactionMeta> getTransactionMeta(TxnID txnID) {
        final CompletableFuture<TransactionMeta> noMeta = CompletableFuture.completedFuture(null);
        return noMeta;
    }

    /**
     * Waits for recovery to finish and, when the buffer is still in the NoSnapshot state, takes the
     * first snapshot, switches the buffer to Ready and starts the snapshot timer.
     *
     * @param z when {@code false} the method returns an already completed future without waiting
     *          (presumably "transactions enabled for this producer" - confirm against callers)
     * @return a future completed when the buffer is usable, or failed with the recovery or
     *         snapshot error
     */
    @Override // org.apache.pulsar.broker.transaction.buffer.TransactionBuffer
    public CompletableFuture<Void> checkIfTBRecoverCompletely(boolean z) {
        if (!z) {
            return CompletableFuture.completedFuture(null);
        }
        CompletableFuture<Void> completableFuture = new CompletableFuture<>();
        this.transactionBufferFuture.thenRun(() -> {
            if (checkIfNoSnapshot()) {
                this.snapshotAbortedTxnProcessor.takeAbortedTxnsSnapshot(this.maxReadPosition).thenRun(() -> {
                    if (changeToReadyStateFromNoSnapshot()) {
                        this.timer.newTimeout(this, this.takeSnapshotIntervalTime, TimeUnit.MILLISECONDS);
                    }
                    completableFuture.complete(null);
                }).exceptionally(th -> {
                    // Log the failure itself, not only the topic name.
                    log.error("Topic {} failed to take snapshot", this.topic.getName(), th);
                    completableFuture.completeExceptionally(th);
                    return null;
                });
            } else {
                completableFuture.complete(null);
            }
        }).exceptionally(th -> {
            log.error("Topic {}: TransactionBuffer recover failed", this.topic.getName(), th.getCause());
            completableFuture.completeExceptionally(th.getCause());
            return null;
        });
        return completableFuture;
    }

    /** @return the number of transactions currently tracked as ongoing */
    @Override // org.apache.pulsar.broker.transaction.buffer.TransactionBuffer
    public long getOngoingTxnCount() {
        final int ongoing = ongoingTxns.size();
        return ongoing;
    }

    /** @return the number of abort markers appended successfully by this buffer instance */
    @Override // org.apache.pulsar.broker.transaction.buffer.TransactionBuffer
    public long getAbortedTxnCount() {
        final long aborted = txnAbortedCounter.sum();
        return aborted;
    }

    /** @return the number of commit markers appended successfully by this buffer instance */
    @Override // org.apache.pulsar.broker.transaction.buffer.TransactionBuffer
    public long getCommittedTxnCount() {
        final long committed = txnCommittedCounter.sum();
        return committed;
    }

    /**
     * Appends a transactional message to the topic's managed ledger and registers the transaction
     * as ongoing once the write succeeded.
     *
     * @param txnID   transaction the message belongs to
     * @param j       not used by this implementation
     * @param byteBuf payload to append
     * @return a future holding the position of the appended entry; it fails with
     *         {@code NotAllowedException} when the low water mark recorded for the transaction's
     *         most-significant bits is at or above its least-significant bits, or with the
     *         managed-ledger error when the write fails
     */
    @Override // org.apache.pulsar.broker.transaction.buffer.TransactionBuffer
    public CompletableFuture<Position> appendBufferToTxn(final TxnID txnID, long j, ByteBuf byteBuf) {
        final CompletableFuture<Position> appendFuture = new CompletableFuture<>();
        final Long lowWaterMark = lowWaterMarks.get(txnID.getMostSigBits());
        // Guard: a transaction at or below the low water mark has already ended.
        if (lowWaterMark != null && lowWaterMark >= txnID.getLeastSigBits()) {
            appendFuture.completeExceptionally(new BrokerServiceException.NotAllowedException("Transaction [" + txnID + "] has been ended. Please use a new transaction to send message."));
            return appendFuture;
        }
        topic.getManagedLedger().asyncAddEntry(byteBuf, new AsyncCallbacks.AddEntryCallback() {
            @Override
            public void addComplete(Position position, ByteBuf entryData, Object ctx) {
                synchronized (TopicTransactionBuffer.this) {
                    handleTransactionMessage(txnID, position);
                }
                appendFuture.complete(position);
            }

            @Override
            public void addFailed(ManagedLedgerException exception, Object ctx) {
                log.error("Failed to append buffer to txn {}", txnID, exception);
                appendFuture.completeExceptionally(exception);
            }
        }, null);
        return appendFuture;
    }

    /**
     * Registers a transaction as ongoing the first time one of its messages is seen (unless it is
     * already known as aborted) and moves the max read position to just before the first message
     * of the oldest ongoing transaction. Callers hold the monitor of this buffer.
     */
    private void handleTransactionMessage(TxnID txnID, Position position) {
        final boolean alreadyTracked = ongoingTxns.containsKey(txnID)
                || snapshotAbortedTxnProcessor.checkAbortedTransaction(txnID);
        if (!alreadyTracked) {
            ongoingTxns.put(txnID, (PositionImpl) position);
            final PositionImpl oldestOngoing = (PositionImpl) ongoingTxns.get(ongoingTxns.firstKey());
            updateMaxReadPosition(topic.getManagedLedger().getPreviousPosition(oldestOngoing), false);
        }
    }

    /** Forwards the given position (may be {@code null}) to the topic's last-dispatchable-position update. */
    private void updateLastDispatchablePosition(Position position) {
        final PersistentTopic owner = topic;
        owner.updateLastDispatchablePosition(position);
    }

    /**
     * Not supported by this implementation.
     *
     * @return always {@code null} (not a future) - callers must not dereference the result
     */
    @Override // org.apache.pulsar.broker.transaction.buffer.TransactionBuffer
    public CompletableFuture<TransactionBufferReader> openTransactionBufferReader(TxnID txnID, long j) {
        final CompletableFuture<TransactionBufferReader> unsupported = null;
        return unsupported;
    }

    /**
     * Commits a transaction on this topic by appending a commit marker once recovery has finished.
     *
     * @param txnID transaction to commit
     * @param j     low water mark reported with the commit; recorded via {@code handleLowWaterMark}
     * @return a future completed after the marker was written and the buffer state updated; it
     *         fails with {@code PersistenceException} when the write fails, or with the recovery
     *         error when recovery failed
     */
    @Override // org.apache.pulsar.broker.transaction.buffer.TransactionBuffer
    public CompletableFuture<Void> commitTxn(TxnID txnID, long j) {
        if (log.isDebugEnabled()) {
            log.debug("Transaction {} commit on topic {}.", txnID.toString(), topic.getName());
        }
        final CompletableFuture<Void> commitFuture = new CompletableFuture<>();
        // Wait for recovery before writing the marker.
        transactionBufferFuture.thenRun(() -> {
            final ByteBuf commitMarker = Markers.newTxnCommitMarker(-1L, txnID.getMostSigBits(), txnID.getLeastSigBits());
            try {
                topic.getManagedLedger().asyncAddEntry(commitMarker, new AsyncCallbacks.AddEntryCallback() {
                    @Override
                    public void addComplete(Position position, ByteBuf entryData, Object ctx) {
                        synchronized (TopicTransactionBuffer.this) {
                            removeTxnAndUpdateMaxReadPosition(txnID);
                            handleLowWaterMark(txnID, j);
                            snapshotAbortedTxnProcessor.trimExpiredAbortedTxns();
                            takeSnapshotByChangeTimes();
                        }
                        txnCommittedCounter.increment();
                        commitFuture.complete(null);
                    }

                    @Override
                    public void addFailed(ManagedLedgerException exception, Object ctx) {
                        log.error("Failed to commit for txn {}", txnID, exception);
                        checkAppendMarkerException(exception);
                        commitFuture.completeExceptionally(new BrokerServiceException.PersistenceException((Throwable) exception));
                    }
                }, null);
            } finally {
                // Drop this method's reference to the marker whether or not the call succeeded.
                commitMarker.release();
            }
        }).exceptionally(failure -> {
            log.error("Transaction {} commit on topic {}.", txnID.toString(), topic.getName(), failure.getCause());
            commitFuture.completeExceptionally(failure.getCause());
            return null;
        });
        return commitFuture;
    }

    /**
     * Aborts a transaction on this topic by appending an abort marker once recovery has finished.
     * When the buffer is not in the Ready state no marker is written and the future completes
     * normally.
     *
     * @param txnID transaction to abort
     * @param j     low water mark reported with the abort; recorded via {@code handleLowWaterMark}
     * @return a future completed after the marker was written and the buffer state updated; it
     *         fails with {@code PersistenceException} when the write fails, or with the recovery
     *         error when recovery failed
     */
    @Override // org.apache.pulsar.broker.transaction.buffer.TransactionBuffer
    public CompletableFuture<Void> abortTxn(TxnID txnID, long j) {
        if (log.isDebugEnabled()) {
            log.debug("Transaction {} abort on topic {}.", txnID.toString(), topic.getName());
        }
        final CompletableFuture<Void> abortFuture = new CompletableFuture<>();
        // Wait for recovery before deciding whether a marker is needed.
        transactionBufferFuture.thenRun(() -> {
            if (!checkIfReady()) {
                abortFuture.complete(null);
                return;
            }
            final ByteBuf abortMarker = Markers.newTxnAbortMarker(-1L, txnID.getMostSigBits(), txnID.getLeastSigBits());
            try {
                topic.getManagedLedger().asyncAddEntry(abortMarker, new AsyncCallbacks.AddEntryCallback() {
                    @Override
                    public void addComplete(Position position, ByteBuf entryData, Object ctx) {
                        synchronized (TopicTransactionBuffer.this) {
                            snapshotAbortedTxnProcessor.putAbortedTxnAndPosition(txnID, (PositionImpl) position);
                            removeTxnAndUpdateMaxReadPosition(txnID);
                            snapshotAbortedTxnProcessor.trimExpiredAbortedTxns();
                            takeSnapshotByChangeTimes();
                            txnAbortedCounter.increment();
                            abortFuture.complete(null);
                            handleLowWaterMark(txnID, j);
                        }
                    }

                    @Override
                    public void addFailed(ManagedLedgerException exception, Object ctx) {
                        log.error("Failed to abort for txn {}", txnID, exception);
                        checkAppendMarkerException(exception);
                        abortFuture.completeExceptionally(new BrokerServiceException.PersistenceException((Throwable) exception));
                    }
                }, null);
            } finally {
                // Drop this method's reference to the marker whether or not the call succeeded.
                abortMarker.release();
            }
        }).exceptionally(failure -> {
            log.error("Transaction {} abort on topic {}.", txnID.toString(), topic.getName(), failure.getCause());
            abortFuture.completeExceptionally(failure.getCause());
            return null;
        });
        return abortFuture;
    }

    /**
     * Reacts to a failed marker write: when the managed ledger reports it is already closed, asks
     * it to get ready to create a new ledger. Other failures are left to the caller.
     */
    private void checkAppendMarkerException(ManagedLedgerException managedLedgerException) {
        final boolean ledgerAlreadyClosed =
                managedLedgerException instanceof ManagedLedgerException.ManagedLedgerAlreadyClosedException;
        if (!ledgerAlreadyClosed) {
            return;
        }
        topic.getManagedLedger().readyToCreateNewLedger();
    }

    /**
     * Records the low water mark for the key {@code txnID.getMostSigBits()} (keeping the highest
     * value seen) and, if the oldest ongoing transaction is at or below the mark stored for its
     * own key, aborts that transaction. The single-permit semaphore ensures only one such abort
     * is in flight; the permit is released when the abort finishes or when no abort is needed.
     */
    private void handleLowWaterMark(TxnID txnID, long j) {
        lowWaterMarks.compute(txnID.getMostSigBits(), (key, current) -> {
            if (current != null && current >= j) {
                return current;
            }
            return j;
        });
        if (!handleLowWaterMark.tryAcquire()) {
            return;
        }
        if (!ongoingTxns.isEmpty()) {
            final TxnID firstTxn = (TxnID) ongoingTxns.firstKey();
            final Long firstTxnLowWaterMark = lowWaterMarks.get(firstTxn.getMostSigBits());
            if (firstTxnLowWaterMark != null && firstTxn.getLeastSigBits() <= firstTxnLowWaterMark) {
                abortTxn(firstTxn, firstTxnLowWaterMark).thenRun(() -> {
                    log.warn("Successes to abort low water mark for txn [{}], topic [{}], lowWaterMark [{}]", firstTxn, topic.getName(), firstTxnLowWaterMark);
                    handleLowWaterMark.release();
                }).exceptionally(failure -> {
                    log.warn("Failed to abort low water mark for txn {}, topic [{}], lowWaterMark [{}], ", firstTxn, topic.getName(), firstTxnLowWaterMark, failure);
                    handleLowWaterMark.release();
                    return null;
                });
                // The permit is released by the abort's completion handlers above.
                return;
            }
        }
        handleLowWaterMark.release();
    }

    /**
     * Requests a snapshot at the current max read position once the number of forward moves
     * since the last request has reached {@code takeSnapshotIntervalNumber}, resetting the counter.
     */
    private void takeSnapshotByChangeTimes() {
        final boolean thresholdReached = changeMaxReadPositionCount.get() >= takeSnapshotIntervalNumber;
        if (!thresholdReached) {
            return;
        }
        changeMaxReadPositionCount.set(0L);
        snapshotAbortedTxnProcessor.takeAbortedTxnsSnapshot(maxReadPosition);
    }

    /**
     * Timer-driven snapshot: requests a snapshot when the max read position moved at least once
     * since the last request, then re-arms the timer for the next interval.
     */
    private void takeSnapshotByTimeout() {
        final boolean positionMoved = changeMaxReadPositionCount.get() > 0;
        if (positionMoved) {
            changeMaxReadPositionCount.set(0L);
            snapshotAbortedTxnProcessor.takeAbortedTxnsSnapshot(maxReadPosition);
        }
        // Always schedule the next run, whether or not a snapshot was requested.
        timer.newTimeout(this, takeSnapshotIntervalTime, TimeUnit.MILLISECONDS);
    }

    /**
     * Stops tracking the given transaction and recomputes the max read position: just before the
     * first message of the oldest remaining ongoing transaction, or the last confirmed entry when
     * none remain. Finally resets the topic's last dispatchable position to {@code null}.
     */
    void removeTxnAndUpdateMaxReadPosition(TxnID txnID) {
        ongoingTxns.remove(txnID);
        if (!ongoingTxns.isEmpty()) {
            final PositionImpl oldestOngoing = (PositionImpl) ongoingTxns.get(ongoingTxns.firstKey());
            updateMaxReadPosition(topic.getManagedLedger().getPreviousPosition(oldestOngoing), false);
        } else {
            updateMaxReadPosition((PositionImpl) topic.getManagedLedger().getLastConfirmedEntry(), false);
        }
        updateLastDispatchablePosition(null);
    }

    /**
     * Sets the max read position. When it moved forward, the change counter is incremented
     * (unless the buffer is in the NoSnapshot state) and the callback is notified.
     *
     * @param positionImpl the new max read position
     * @param z            when {@code true} the moved-forward callback is not invoked
     */
    void updateMaxReadPosition(PositionImpl positionImpl, boolean z) {
        final PositionImpl previous = this.maxReadPosition;
        this.maxReadPosition = positionImpl;
        final boolean movedForward = previous.compareTo(this.maxReadPosition) < 0;
        if (!movedForward) {
            return;
        }
        if (!checkIfNoSnapshot()) {
            changeMaxReadPositionCount.getAndIncrement();
        }
        if (!z) {
            maxReadPositionCallBack.maxReadPositionMovedForward(previous, this.maxReadPosition);
        }
    }

    /**
     * Not supported by this implementation.
     *
     * @return always {@code null} (not a future) - callers must not dereference the result
     */
    @Override // org.apache.pulsar.broker.transaction.buffer.TransactionBuffer
    public CompletableFuture<Void> purgeTxns(List<Long> list) {
        final CompletableFuture<Void> unsupported = null;
        return unsupported;
    }

    /** Delegates to the aborted-transaction processor to clear its snapshot. */
    @Override // org.apache.pulsar.broker.transaction.buffer.TransactionBuffer
    public CompletableFuture<Void> clearSnapshot() {
        final CompletableFuture<Void> cleared = snapshotAbortedTxnProcessor.clearAbortedTxnSnapshot();
        return cleared;
    }

    /**
     * Moves the buffer to the Close state and closes the aborted-transaction processor.
     *
     * @return the future returned by the processor's {@code closeAsync()}
     */
    @Override // org.apache.pulsar.broker.transaction.buffer.TransactionBuffer
    public CompletableFuture<Void> closeAsync() {
        changeToCloseState();
        final CompletableFuture<Void> processorClosed = snapshotAbortedTxnProcessor.closeAsync();
        return processorClosed;
    }

    /**
     * Asks the aborted-transaction processor whether the transaction is aborted.
     *
     * @param txnID        transaction to look up
     * @param positionImpl not used by this implementation
     */
    @Override // org.apache.pulsar.broker.transaction.buffer.TransactionBuffer
    public synchronized boolean isTxnAborted(TxnID txnID, PositionImpl positionImpl) {
        final boolean aborted = snapshotAbortedTxnProcessor.checkAbortedTransaction(txnID);
        return aborted;
    }

    /**
     * Advances the max read position for a non-transactional publish. The position is applied
     * when the buffer is in the NoSnapshot state, or when it is Ready and has no ongoing
     * transactions; otherwise the max read position is left untouched.
     *
     * @param positionImpl position of the published entry
     * @param z            when {@code true} neither the moved-forward callback nor the
     *                     last-dispatchable-position update is performed
     */
    @Override // org.apache.pulsar.broker.transaction.buffer.TransactionBuffer
    public void syncMaxReadPositionForNormalPublish(PositionImpl positionImpl, boolean z) {
        synchronized (this) {
            if (checkIfNoSnapshot() || (checkIfReady() && ongoingTxns.isEmpty())) {
                updateMaxReadPosition(positionImpl, z);
            }
        }
        if (!z) {
            updateLastDispatchablePosition(positionImpl);
        }
    }

    /**
     * @return the current max read position when the buffer is Ready or in the NoSnapshot state;
     *         {@code PositionImpl.EARLIEST} in every other state
     */
    @Override // org.apache.pulsar.broker.transaction.buffer.TransactionBuffer
    public PositionImpl getMaxReadPosition() {
        if (checkIfReady() || checkIfNoSnapshot()) {
            return maxReadPosition;
        }
        return PositionImpl.EARLIEST;
    }

    /**
     * Builds the per-transaction stats: whether the transaction is aborted and, when it is
     * ongoing, the string form of its recorded start position.
     */
    @Override // org.apache.pulsar.broker.transaction.buffer.TransactionBuffer
    public TransactionInBufferStats getTransactionInBufferStats(TxnID txnID) {
        final TransactionInBufferStats stats = new TransactionInBufferStats();
        synchronized (this) {
            stats.aborted = isTxnAborted(txnID, null);
            final boolean ongoing = ongoingTxns.containsKey(txnID);
            if (ongoing) {
                final PositionImpl startPosition = (PositionImpl) ongoingTxns.get(txnID);
                stats.startPosition = startPosition.toString();
            }
        }
        return stats;
    }

    /**
     * Builds a stats snapshot of this buffer.
     *
     * @param z when {@code true} the low water marks map is attached to the stats (the live map,
     *          not a copy)
     */
    @Override // org.apache.pulsar.broker.transaction.buffer.TransactionBuffer
    public TransactionBufferStats getStats(boolean z) {
        final TransactionBufferStats stats = new TransactionBufferStats();
        stats.lastSnapshotTimestamps = snapshotAbortedTxnProcessor.getLastSnapshotTimestamps();
        stats.state = getState().name();
        stats.maxReadPosition = maxReadPosition.toString();
        if (z) {
            stats.lowWaterMarks = lowWaterMarks;
        }
        stats.ongoingTxnSize = ongoingTxns.size();
        stats.recoverStartTime = recoverTime.getRecoverStartTime();
        stats.recoverEndTime = recoverTime.getRecoverEndTime();
        return stats;
    }

    /**
     * Timer callback: when the buffer is Ready, takes the time-based snapshot (which also re-arms
     * the timer). When the buffer is not Ready nothing happens and the timer is not re-armed here.
     */
    @Override
    public void run(Timeout timeout) {
        if (!checkIfReady()) {
            return;
        }
        synchronized (this) {
            takeSnapshotByTimeout();
        }
    }
}
