package org.neo4j.coreedge.catchup.tx.edge;

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;
import org.neo4j.kernel.impl.api.TransactionCommitProcess;
import org.neo4j.kernel.impl.api.TransactionQueue;
import org.neo4j.kernel.impl.api.TransactionToApply;
import org.neo4j.kernel.impl.transaction.CommittedTransactionRepresentation;
import org.neo4j.kernel.impl.transaction.log.TransactionIdStore;
import org.neo4j.kernel.impl.transaction.tracing.CommitEvent;
import org.neo4j.kernel.internal.DatabaseHealth;
import org.neo4j.kernel.lifecycle.LifecycleAdapter;
import org.neo4j.kernel.monitoring.Monitors;
import org.neo4j.logging.Log;
import org.neo4j.logging.LogProvider;
import org.neo4j.storageengine.api.TransactionApplicationMode;

/* loaded from: input_file:org/neo4j/coreedge/catchup/tx/edge/BatchingTxApplier.class */
public class BatchingTxApplier extends LifecycleAdapter implements Runnable {
    private final int maxBatchSize;
    private final Supplier<TransactionIdStore> txIdStoreSupplier;
    private final Supplier<TransactionCommitProcess> commitProcessSupplier;
    private final Supplier<DatabaseHealth> healthSupplier;
    private final PullRequestMonitor monitor;
    private final Log log;
    private final ArrayBlockingQueue<CommittedTransactionRepresentation> txQueue;
    private TransactionQueue txBatcher;
    private volatile long lastQueuedTxId;
    private volatile long lastAppliedTxId;

    public BatchingTxApplier(int i, Supplier<TransactionIdStore> supplier, Supplier<TransactionCommitProcess> supplier2, Supplier<DatabaseHealth> supplier3, Monitors monitors, LogProvider logProvider) {
        this.maxBatchSize = i;
        this.txIdStoreSupplier = supplier;
        this.commitProcessSupplier = supplier2;
        this.healthSupplier = supplier3;
        this.log = logProvider.getLog(getClass());
        this.monitor = (PullRequestMonitor) monitors.newMonitor(PullRequestMonitor.class, new String[0]);
        this.txQueue = new ArrayBlockingQueue<>(i);
    }

    public void start() throws Throwable {
        TransactionCommitProcess transactionCommitProcess = this.commitProcessSupplier.get();
        this.txBatcher = new TransactionQueue(this.maxBatchSize, (transactionToApply, transactionToApply2) -> {
            transactionCommitProcess.commit(transactionToApply, CommitEvent.NULL, TransactionApplicationMode.EXTERNAL);
        });
        long lastCommittedTransactionId = this.txIdStoreSupplier.get().getLastCommittedTransactionId();
        this.lastAppliedTxId = lastCommittedTransactionId;
        this.lastQueuedTxId = lastCommittedTransactionId;
    }

    public void queue(CommittedTransactionRepresentation committedTransactionRepresentation) {
        long txId = committedTransactionRepresentation.getCommitEntry().getTxId();
        long j = this.lastQueuedTxId + 1;
        if (txId != j) {
            this.log.warn("Out of order transaction. Received: " + txId + " Expected: " + j);
            return;
        }
        try {
            this.txQueue.put(committedTransactionRepresentation);
            this.lastQueuedTxId = txId;
            this.monitor.txPullResponse(txId);
        } catch (InterruptedException e) {
            this.log.warn("Not expecting to be interrupted", e);
        }
    }

    @Override // java.lang.Runnable
    public void run() {
        long txId;
        CommittedTransactionRepresentation poll;
        CommittedTransactionRepresentation committedTransactionRepresentation = null;
        try {
            committedTransactionRepresentation = this.txQueue.poll(1L, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            this.log.warn("Not expecting to be interrupted", e);
        }
        if (committedTransactionRepresentation == null) {
            return;
        }
        do {
            try {
                txId = committedTransactionRepresentation.getCommitEntry().getTxId();
                this.txBatcher.queue(new TransactionToApply(committedTransactionRepresentation.getTransactionRepresentation(), txId));
                poll = this.txQueue.poll();
                committedTransactionRepresentation = poll;
            } catch (Exception e2) {
                this.log.error("Error during transaction application", e2);
                this.healthSupplier.get().panic(e2);
                return;
            }
        } while (poll != null);
        this.txBatcher.empty();
        this.lastAppliedTxId = txId;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public boolean workPending() {
        return this.lastQueuedTxId > this.lastAppliedTxId;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public long lastAppliedTxId() {
        return this.lastAppliedTxId;
    }
}
