package org.neo4j.causalclustering.catchup.tx;

import io.netty.buffer.ByteBufAllocator;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.stream.ChunkedInput;
import org.neo4j.causalclustering.catchup.CatchupResult;
import org.neo4j.causalclustering.catchup.CatchupServerProtocol;
import org.neo4j.causalclustering.catchup.ResponseMessageType;
import org.neo4j.causalclustering.identity.StoreId;
import org.neo4j.cursor.IOCursor;
import org.neo4j.kernel.impl.transaction.CommittedTransactionRepresentation;
import org.neo4j.logging.Log;

/* loaded from: input_file:org/neo4j/causalclustering/catchup/tx/ChunkedTransactionStream.class */
/**
 * Streams committed transactions from a cursor as a sequence of chunks.
 *
 * <p>Every logical message is emitted as two consecutive chunks: first a
 * {@link ResponseMessageType} marker, then the message payload itself. The payload is
 * parked in {@link #pending} between the two {@code readChunk} calls.
 *
 * <p>Once the cursor is exhausted, a final {@link ResponseMessageType#TX_STREAM_FINISHED}
 * marker followed by a {@link TxStreamFinishedResponse} is emitted, after which
 * {@link #isEndOfInput()} returns {@code true}.
 *
 * <p>Transactions must come out of the cursor with strictly consecutive ids starting at
 * the {@code firstTxId} given to the constructor; anything else is reported as an
 * {@link IllegalStateException}.
 */
public class ChunkedTransactionStream implements ChunkedInput<Object> {
    private final Log log;
    private final StoreId storeId;
    private final IOCursor<CommittedTransactionRepresentation> txCursor;
    private final CatchupServerProtocol protocol;
    /** Transaction id the stream is expected to reach; falling short is reported as pruned. */
    private final long txIdPromise;

    /** Set once the final payload (the stream-finished response) has been handed out. */
    private boolean endOfInput;
    /** Set once the cursor reported that it has no further transactions. */
    private boolean noMoreTransactions;
    /** Id the next transaction read from the cursor must carry. */
    private long expectedTxId;
    /**
     * Id of the most recent transaction read from the cursor.
     * NOTE(review): stays at the default 0 when the cursor yields nothing, and that value is
     * what gets reported in the stream-finished response — confirm this is intended.
     */
    private long lastTxId;
    /** Payload announced by the previously returned message-type marker, or {@code null}. */
    private Object pending;

    /**
     * Creates a stream over the given transaction cursor.
     *
     * <p>NOTE(review): the decompiler reported that this constructor was originally
     * package-private; it is kept public here so existing callers keep compiling.
     *
     * @param log         used to warn when the cursor ends before {@code txIdPromise}
     * @param storeId     store id attached to every {@link TxPullResponse}
     * @param firstTxId   id the first transaction from the cursor must have
     * @param txIdPromise id the stream must reach to finish with
     *                    {@link CatchupResult#SUCCESS_END_OF_STREAM}
     * @param txCursor    source of transactions; closed by {@link #close()}
     * @param protocol    protocol state holder, updated when the stream runs dry
     */
    public ChunkedTransactionStream(Log log, StoreId storeId, long firstTxId, long txIdPromise, IOCursor<CommittedTransactionRepresentation> txCursor, CatchupServerProtocol protocol) {
        this.log = log;
        this.storeId = storeId;
        this.expectedTxId = firstTxId;
        this.txIdPromise = txIdPromise;
        this.txCursor = txCursor;
        this.protocol = protocol;
    }

    /**
     * @return {@code true} once the stream-finished response has been handed out
     */
    public boolean isEndOfInput() {
        return this.endOfInput;
    }

    /**
     * Closes the underlying transaction cursor.
     *
     * @throws Exception if closing the cursor fails
     */
    public void close() throws Exception {
        this.txCursor.close();
    }

    /**
     * Delegates to {@link #readChunk(ByteBufAllocator)} using the context's allocator.
     *
     * @param channelHandlerContext context supplying the allocator
     * @return the next chunk, see {@link #readChunk(ByteBufAllocator)}
     * @throws Exception if reading from the cursor fails
     */
    public Object readChunk(ChannelHandlerContext channelHandlerContext) throws Exception {
        return readChunk(channelHandlerContext.alloc());
    }

    /**
     * Produces the next chunk of the stream.
     *
     * <p>Calls alternate between returning a {@link ResponseMessageType} marker and the
     * payload that marker announced. The allocator argument is not used.
     *
     * @param byteBufAllocator unused
     * @return a {@link ResponseMessageType}, a {@link TxPullResponse} or a
     *         {@link TxStreamFinishedResponse}
     * @throws IllegalStateException if the cursor yields a transaction whose id is not the
     *                               expected one, or if the stream has already delivered
     *                               its final message
     * @throws Exception             if advancing the cursor fails
     */
    public Object readChunk(ByteBufAllocator byteBufAllocator) throws Exception {
        assert !this.endOfInput : "readChunk called after end of input";

        if (this.pending != null) {
            // Second half of a message: hand out the payload announced by the previous call.
            if (this.noMoreTransactions) {
                // The payload being handed out is the stream-finished response.
                this.endOfInput = true;
            }
            return consumePending();
        }

        if (this.noMoreTransactions) {
            // The final message has already been produced and consumed.
            throw new IllegalStateException("Transaction stream already finished; no pending message to send");
        }

        if (this.txCursor.next()) {
            CommittedTransactionRepresentation tx = (CommittedTransactionRepresentation) this.txCursor.get();
            this.lastTxId = tx.getCommitEntry().getTxId();
            if (this.lastTxId != this.expectedTxId) {
                throw new IllegalStateException(String.format("Transaction cursor out of order. Expected %d but was %d", this.expectedTxId, this.lastTxId));
            }
            this.expectedTxId++;
            this.pending = new TxPullResponse(this.storeId, tx);
            return ResponseMessageType.TX;
        }

        // Cursor exhausted: queue the closing response and announce it.
        this.noMoreTransactions = true;
        this.protocol.expect(CatchupServerProtocol.State.MESSAGE_TYPE);
        CatchupResult result;
        if (this.lastTxId >= this.txIdPromise) {
            result = CatchupResult.SUCCESS_END_OF_STREAM;
        } else {
            result = CatchupResult.E_TRANSACTION_PRUNED;
            this.log.warn("Transaction cursor fell short. Expected at least %d but only got to %d.", new Object[]{this.txIdPromise, this.lastTxId});
        }
        this.pending = new TxStreamFinishedResponse(result, this.lastTxId);
        return ResponseMessageType.TX_STREAM_FINISHED;
    }

    /** Returns the parked payload and clears the slot. */
    private Object consumePending() {
        Object payload = this.pending;
        this.pending = null;
        return payload;
    }

    /**
     * @return always {@code -1}
     */
    public long length() {
        return -1L;
    }

    /**
     * @return always {@code 0}
     */
    public long progress() {
        return 0L;
    }

    /**
     * @return id of the most recent transaction read from the cursor, or {@code 0} if none
     *         has been read yet
     */
    public long lastTxId() {
        return this.lastTxId;
    }
}
