package org.neo4j.coreedge.catchup.tx;

import io.netty.channel.ChannelHandlerContext;
import java.util.function.Predicate;
import java.util.function.Supplier;
import org.neo4j.coreedge.VersionCheckerChannelInboundHandler;
import org.neo4j.coreedge.catchup.CatchupServerProtocol;
import org.neo4j.coreedge.catchup.ResponseMessageType;
import org.neo4j.coreedge.identity.StoreId;
import org.neo4j.coreedge.messaging.Message;
import org.neo4j.cursor.IOCursor;
import org.neo4j.kernel.impl.transaction.CommittedTransactionRepresentation;
import org.neo4j.kernel.impl.transaction.log.LogicalTransactionStore;
import org.neo4j.kernel.impl.transaction.log.NoSuchTransactionException;
import org.neo4j.kernel.impl.transaction.log.TransactionIdStore;
import org.neo4j.kernel.monitoring.Monitors;
import org.neo4j.logging.Log;
import org.neo4j.logging.LogProvider;

/* loaded from: input_file:org/neo4j/coreedge/catchup/tx/TxPullRequestHandler.class */
/**
 * Server-side handler for {@link TxPullRequest} messages.
 *
 * <p>For each request it writes every committed transaction after the requested transaction id
 * to the channel (each preceded by a {@link ResponseMessageType#TX} marker), then always finishes
 * with a {@link ResponseMessageType#TX_STREAM_FINISHED} marker and a
 * {@link TxStreamFinishedResponse} carrying the last transaction id that was written and whether
 * the request could be served.
 */
public class TxPullRequestHandler extends VersionCheckerChannelInboundHandler<TxPullRequest> {
    private final CatchupServerProtocol protocol;
    private final StoreId storeId;
    private final TransactionIdStore transactionIdStore;
    private final LogicalTransactionStore logicalTransactionStore;
    private final TxPullRequestsMonitor monitor;
    private final Log log;

    /**
     * Creates the handler. All suppliers are resolved once, eagerly, in this constructor.
     *
     * @param versionChecker predicate handed to the superclass together with {@code logProvider}
     * @param protocol protocol state that is reset to expect a message type after each request
     * @param storeIdSupplier supplies the store id of this machine
     * @param transactionIdStoreSupplier supplies the store used to read the last committed transaction id
     * @param logicalTransactionStoreSupplier supplies the store the transactions are read from
     * @param monitors used to create the {@link TxPullRequestsMonitor} incremented per request
     * @param logProvider provider of the log used by this handler
     */
    public TxPullRequestHandler(Predicate<Message> versionChecker, CatchupServerProtocol protocol, Supplier<StoreId> storeIdSupplier, Supplier<TransactionIdStore> transactionIdStoreSupplier, Supplier<LogicalTransactionStore> logicalTransactionStoreSupplier, Monitors monitors, LogProvider logProvider) {
        super(versionChecker, logProvider);
        this.protocol = protocol;
        this.storeId = storeIdSupplier.get();
        this.transactionIdStore = transactionIdStoreSupplier.get();
        this.logicalTransactionStore = logicalTransactionStoreSupplier.get();
        this.monitor = monitors.newMonitor(TxPullRequestsMonitor.class);
        this.log = logProvider.getLog(getClass());
    }

    /**
     * Serves a single pull request by streaming transactions and then the stream-finished response.
     *
     * @param channelHandlerContext context of the channel the responses are written to
     * @param txPullRequest the request, providing the transaction id to start after and the expected store id
     * @throws Exception if reading transactions or writing to the channel fails for any reason
     *     other than the requested transaction not existing
     */
    @Override // org.neo4j.coreedge.VersionCheckerChannelInboundHandler
    public void doChannelRead0(ChannelHandlerContext channelHandlerContext, TxPullRequest txPullRequest) throws Exception {
        // Requested ids below 1 are clamped to 1 before being used as the starting point.
        long startTxId = Math.max(txPullRequest.txId(), 1L);
        // Last transaction id actually written; stays at startTxId if nothing is streamed.
        long endTxId = startTxId;
        boolean success = true;

        if (!storeId.equals(txPullRequest.storeId())) {
            success = false;
            log.info("Failed to serve TxPullRequest for tx %d and storeId %s because that storeId is different from this machine with %s", endTxId, txPullRequest.storeId(), storeId);
        } else if (transactionIdStore.getLastCommittedTransactionId() > startTxId) {
            // try-with-resources guarantees the cursor is closed even when reading or writing
            // fails part-way through the stream.
            try (IOCursor<CommittedTransactionRepresentation> transactions = logicalTransactionStore.getTransactions(startTxId + 1)) {
                while (transactions.next()) {
                    channelHandlerContext.write(ResponseMessageType.TX);
                    CommittedTransactionRepresentation transaction = transactions.get();
                    endTxId = transaction.getCommitEntry().getTxId();
                    channelHandlerContext.write(new TxPullResponse((byte) 0, storeId, transaction));
                }
                channelHandlerContext.flush();
            } catch (NoSuchTransactionException e) {
                // Not fatal for the channel: report the failure to the client via the finished response.
                success = false;
                log.info("Failed to serve TxPullRequest for tx %d because the transaction does not exist.", endTxId);
            }
        }

        // The stream-finished response is sent on every path, successful or not.
        channelHandlerContext.write(ResponseMessageType.TX_STREAM_FINISHED);
        channelHandlerContext.write(new TxStreamFinishedResponse((byte) 0, endTxId, success));
        channelHandlerContext.flush();
        monitor.increment();
        protocol.expect(CatchupServerProtocol.State.MESSAGE_TYPE);
    }

    /**
     * Logs the failure and closes the channel.
     *
     * @param channelHandlerContext context of the channel to close
     * @param th the exception that was raised
     */
    public void exceptionCaught(ChannelHandlerContext channelHandlerContext, Throwable th) {
        // Report through the handler's log rather than printing the stack trace to stderr.
        log.error("Unexpected error while serving TxPullRequest, closing channel", th);
        channelHandlerContext.close();
    }
}
