package io.camunda.zeebe.streamprocessor;

import io.camunda.zeebe.db.TransactionContext;
import io.camunda.zeebe.db.ZeebeDbTransaction;
import io.camunda.zeebe.engine.api.RecordProcessor;
import io.camunda.zeebe.engine.api.TypedRecord;
import io.camunda.zeebe.engine.metrics.ReplayMetrics;
import io.camunda.zeebe.engine.processing.streamprocessor.RecordValues;
import io.camunda.zeebe.engine.state.KeyGeneratorControls;
import io.camunda.zeebe.logstreams.impl.Loggers;
import io.camunda.zeebe.logstreams.impl.log.LogStreamBatchReaderImpl;
import io.camunda.zeebe.logstreams.log.LogRecordAwaiter;
import io.camunda.zeebe.logstreams.log.LogStream;
import io.camunda.zeebe.logstreams.log.LogStreamBatchReader;
import io.camunda.zeebe.logstreams.log.LoggedEvent;
import io.camunda.zeebe.protocol.Protocol;
import io.camunda.zeebe.protocol.impl.record.RecordMetadata;
import io.camunda.zeebe.protocol.record.RecordType;
import io.camunda.zeebe.scheduler.ActorControl;
import io.camunda.zeebe.scheduler.future.ActorFuture;
import io.camunda.zeebe.scheduler.future.CompletableActorFuture;
import io.camunda.zeebe.scheduler.retry.RecoverableRetryStrategy;
import io.camunda.zeebe.scheduler.retry.RetryStrategy;
import io.camunda.zeebe.streamprocessor.state.MutableLastProcessedPositionState;
import io.prometheus.client.Histogram;
import java.util.List;
import java.util.function.BooleanSupplier;
import org.slf4j.Logger;

/* loaded from: input_file:io/camunda/zeebe/streamprocessor/ReplayStateMachine.class */
/**
 * Replays the events of a log stream to rebuild state.
 *
 * <p>The machine reads the log batch by batch. Every record of a batch that is of type
 * {@link RecordType#EVENT}, passes the protocol version filter and is not already covered by the
 * snapshot position is handed to the first {@link RecordProcessor} that accepts its value type.
 * A whole batch is applied inside one database transaction; attempts are driven by a
 * {@link RecoverableRetryStrategy}.
 *
 * <p>When no further batch is available the behaviour depends on the {@link StreamProcessorMode}:
 * in {@code PROCESSING} mode the recovery future is completed with the last processing positions,
 * in any other mode the machine switches to {@link State#AWAIT_RECORD} and continues once
 * {@link #onRecordAvailable()} is invoked.
 */
public final class ReplayStateMachine implements LogRecordAwaiter {
    private static final Logger LOG = Loggers.PROCESSOR_LOGGER;
    private static final String LOG_STMT_REPLAY_FINISHED = "Processor finished replay, with {}";
    private static final String ERROR_INCONSISTENT_LOG = "Expected that position '%d' of current event is higher then position '%d' of last event, but was not. Inconsistent log detected!";
    private static final String ERROR_MSG_EXPECTED_TO_READ_METADATA = "Expected to read the metadata for the record '%s', but an exception was thrown.";

    /** Marker for a position that has not been observed yet. */
    private static final long UNSET_POSITION = -1L;

    /** Restricts replay to records of type {@link RecordType#EVENT}. */
    private static final MetadataFilter REPLAY_FILTER =
            recordMetadata -> recordMetadata.getRecordType() == RecordType.EVENT;

    private final KeyGeneratorControls keyGeneratorControls;
    private final MutableLastProcessedPositionState lastProcessedPositionState;
    private final ActorControl actor;
    private final TypedRecordImpl typedEvent;
    private final RecordValues recordValues;
    private final LogStreamBatchReader logStreamBatchReader;
    private final TransactionContext transactionContext;
    private final RetryStrategy replayStrategy;
    private final BooleanSupplier abortCondition;
    private final StreamProcessorMode streamProcessorMode;
    private final LogStream logStream;
    /** Evaluates to {@code true} while replay must not continue. */
    private final BooleanSupplier shouldPause;
    private final ReplayMetrics replayMetrics;
    private final List<RecordProcessor> recordProcessors;
    private final int partitionId;

    /** Reused for every record; reset before each read. */
    private final RecordMetadata metadata = new RecordMetadata();
    private final EventFilter eventFilter =
            new MetadataEventFilter(new RecordProtocolVersionFilter().and(REPLAY_FILTER));

    private long snapshotPosition;
    private ActorFuture<LastProcessingPositions> recoveryFuture;
    /** Non-null while a batch attempt is in flight or has failed; cleared after a commit. */
    private ZeebeDbTransaction zeebeDbTransaction;

    private long lastSourceEventPosition = UNSET_POSITION;
    private long batchSourceEventPosition = UNSET_POSITION;
    private long lastReadRecordPosition = UNSET_POSITION;
    private long lastReplayedEventPosition = UNSET_POSITION;
    /** Value of {@link #lastReadRecordPosition} right before the current batch was started. */
    private long readPositionBeforeBatch = UNSET_POSITION;
    private State currentState = State.AWAIT_RECORD;

    /** Phases of the replay loop. */
    public enum State {
        /** No batch is being replayed; waiting for new records to become available. */
        AWAIT_RECORD,
        /** A batch has been read and is being replayed. */
        REPLAY_EVENT
    }

    /**
     * Creates a replay state machine.
     *
     * @param recordProcessors candidates for replaying a record; the first one accepting the
     *     record's value type is used
     * @param streamProcessorContext source of the actor, log stream, reader, transaction context
     *     and the other collaborators used during replay
     * @param shouldReplayNext replay only continues while this supplier returns {@code true}
     */
    public ReplayStateMachine(
            final List<RecordProcessor> recordProcessors,
            final StreamProcessorContext streamProcessorContext,
            final BooleanSupplier shouldReplayNext) {
        this.partitionId = streamProcessorContext.getPartitionId();
        this.recordProcessors = recordProcessors;
        this.shouldPause = () -> !shouldReplayNext.getAsBoolean();
        this.actor = streamProcessorContext.getActor();
        this.recordValues = streamProcessorContext.getRecordValues();
        this.transactionContext = streamProcessorContext.getTransactionContext();
        this.abortCondition = streamProcessorContext.getAbortCondition();
        this.keyGeneratorControls = streamProcessorContext.getKeyGeneratorControls();
        this.lastProcessedPositionState = streamProcessorContext.getLastProcessedPositionState();
        this.typedEvent = new TypedRecordImpl(streamProcessorContext.getLogStream().getPartitionId());
        this.replayStrategy = new RecoverableRetryStrategy(this.actor);
        this.streamProcessorMode = streamProcessorContext.getProcessorMode();
        this.logStream = streamProcessorContext.getLogStream();
        this.logStreamBatchReader = new LogStreamBatchReaderImpl(streamProcessorContext.getLogStreamReader());
        this.replayMetrics = new ReplayMetrics(this.logStream.getPartitionId());
    }

    /**
     * Starts replaying from the batch following the given snapshot position.
     *
     * <p>In {@link StreamProcessorMode#REPLAY} mode the machine additionally registers itself as a
     * record-available listener on the log stream so that replay resumes when new records arrive.
     *
     * @param snapshotPosition position covered by the snapshot; events whose source position is
     *     not greater than it (and not negative) are skipped
     * @return future that is completed with the last processing positions when replay finishes in
     *     {@code PROCESSING} mode, or completed exceptionally when {@link #replayNextEvent()}
     *     catches a {@link RuntimeException}
     */
    public ActorFuture<LastProcessingPositions> startRecover(final long snapshotPosition) {
        recoveryFuture = new CompletableActorFuture<>();
        this.snapshotPosition = snapshotPosition;
        lastSourceEventPosition = snapshotPosition > 0 ? snapshotPosition : UNSET_POSITION;
        logStreamBatchReader.seekToNextBatch(snapshotPosition);
        LOG.info(
                "Processor starts replay of events. [snapshot-position: {}, replay-mode: {}]",
                snapshotPosition,
                streamProcessorMode);
        if (streamProcessorMode == StreamProcessorMode.REPLAY) {
            logStream.registerRecordAvailableListener(this);
        }
        replayNextEvent();
        return recoveryFuture;
    }

    /**
     * Resumes replay through the actor, but only if the machine is currently in
     * {@link State#AWAIT_RECORD}; a replay that is already in progress is left alone.
     */
    @Override
    public void onRecordAvailable() {
        actor.call(() -> {
            if (currentState == State.AWAIT_RECORD) {
                replayNextEvent();
            }
        });
    }

    /**
     * Performs one step of the replay loop: replays the next batch if one is available, otherwise
     * finishes replay ({@code PROCESSING} mode) or waits for further records (other modes). Does
     * nothing while replay is paused.
     *
     * <p>A {@link RuntimeException} raised synchronously by this step completes the recovery
     * future exceptionally instead of propagating to the caller.
     */
    public void replayNextEvent() {
        if (shouldPause.getAsBoolean()) {
            return;
        }
        try {
            if (logStreamBatchReader.hasNext()) {
                replayNextBatch();
            } else if (streamProcessorMode == StreamProcessorMode.PROCESSING) {
                onRecordsReplayed();
            } else {
                currentState = State.AWAIT_RECORD;
            }
        } catch (final RuntimeException e) {
            recoveryFuture.completeExceptionally(new RuntimeException(String.format("Failed to replay records. [snapshot-position: %d, last-read-record-position: %d, last-replayed-event-position: %d]", snapshotPosition, lastReadRecordPosition, lastReplayedEventPosition), e));
        }
    }

    /**
     * Reads the next batch and replays it through the retry strategy; once the batch succeeded the
     * source position bookkeeping is updated and the next step is submitted to the actor.
     */
    private void replayNextBatch() {
        currentState = State.REPLAY_EVENT;
        final Histogram.Timer replayDurationTimer = replayMetrics.startReplayDurationTimer();
        final LogStreamBatchReader.Batch batch = (LogStreamBatchReader.Batch) logStreamBatchReader.next();
        // Remembered so that a retried attempt can rewind the read-position bookkeeping.
        readPositionBeforeBatch = lastReadRecordPosition;
        replayStrategy
                .runWithRetry(() -> tryToReplayBatch(batch), abortCondition)
                .onComplete((success, failure) -> {
                    if (failure != null) {
                        throw new RuntimeException("Failed to replay batch at '%s %s'".formatted(batch.current(), typedEvent.getMetadata()), failure);
                    }
                    replayDurationTimer.close();
                    lastSourceEventPosition = Math.max(lastSourceEventPosition, batchSourceEventPosition);
                    replayMetrics.setLastSourcePosition(lastSourceEventPosition);
                    actor.submit(this::replayNextEvent);
                });
    }

    /**
     * Replays all records of the batch inside one transaction and commits it.
     *
     * <p>A transaction left over from a previous attempt marks this call as a retry: the old
     * transaction is rolled back and the batch is rewound to its head. The read position is
     * rewound along with it, because the records are read again from the start of the batch and
     * the monotonicity check in {@link #onRecordReplayed(LoggedEvent)} would otherwise reject the
     * first of them as an inconsistent log.
     *
     * @param batch the batch to replay
     * @return always {@code true}; failures are reported through exceptions
     * @throws Exception if replaying or committing the batch fails
     */
    private boolean tryToReplayBatch(final LogStreamBatchReader.Batch batch) throws Exception {
        final boolean onRetry = zeebeDbTransaction != null;
        if (onRetry) {
            lastReadRecordPosition = readPositionBeforeBatch;
            zeebeDbTransaction.rollback();
            batch.head();
        }
        zeebeDbTransaction = transactionContext.getCurrentTransaction();
        zeebeDbTransaction.run(() -> {
            batch.forEachRemaining(this::replayEvent);
            // Positions at or below the snapshot position are never marked as processed here.
            if (batchSourceEventPosition > snapshotPosition) {
                lastProcessedPositionState.markAsProcessed(batchSourceEventPosition);
            }
        });
        zeebeDbTransaction.commit();
        // Cleared so that the next batch is not mistaken for a retry.
        zeebeDbTransaction = null;
        return true;
    }

    /**
     * Replays a single record if it passes the filters, then updates the position bookkeeping
     * (which happens for every record, replayed or not).
     *
     * @throws NoSuchProcessorException if no record processor accepts the record's value type
     */
    private void replayEvent(final LoggedEvent loggedEvent) {
        if (eventFilter.applies(loggedEvent) && isNotCoveredBySnapshot(loggedEvent)) {
            readMetadata(loggedEvent);
            final TypedRecord<?> typedRecord = readRecordValue(loggedEvent);
            findProcessorFor(typedRecord).replay(typedRecord);
            lastReplayedEventPosition = typedRecord.getPosition();
        }
        onRecordReplayed(loggedEvent);
    }

    /**
     * Returns whether the record still has to be applied: its source event position is either
     * greater than the snapshot position or negative.
     */
    private boolean isNotCoveredBySnapshot(final LoggedEvent loggedEvent) {
        final long sourceEventPosition = loggedEvent.getSourceEventPosition();
        return sourceEventPosition > snapshotPosition || sourceEventPosition < 0;
    }

    /** Returns the first record processor that accepts the value type of the given record. */
    private RecordProcessor findProcessorFor(final TypedRecord<?> typedRecord) {
        return recordProcessors.stream()
                .filter(recordProcessor -> recordProcessor.accepts(typedRecord.getValueType()))
                .findFirst()
                .orElseThrow(() -> NoSuchProcessorException.forRecord(typedRecord));
    }

    /** Completes the recovery future with the positions reached by the replay. */
    private void onRecordsReplayed() {
        final LastProcessingPositions lastProcessingPositions =
                new LastProcessingPositions(lastSourceEventPosition, lastReadRecordPosition);
        LOG.info(LOG_STMT_REPLAY_FINISHED, lastProcessingPositions);
        recoveryFuture.complete(lastProcessingPositions);
    }

    /**
     * Updates metrics and bookkeeping after a record of the batch has been handled.
     *
     * @throws IllegalStateException if the record's position is not greater than the position of
     *     the previously read record
     */
    private void onRecordReplayed(final LoggedEvent loggedEvent) {
        replayMetrics.event();
        final long sourceEventPosition = loggedEvent.getSourceEventPosition();
        final long position = loggedEvent.getPosition();
        final long key = loggedEvent.getKey();
        // Record positions must strictly increase while reading the log.
        if (lastReadRecordPosition >= position) {
            throw new IllegalStateException(String.format(ERROR_INCONSISTENT_LOG, position, lastReadRecordPosition));
        }
        lastReadRecordPosition = position;
        batchSourceEventPosition = sourceEventPosition;
        // Only keys that decode to this partition may advance the key generator.
        if (Protocol.decodePartitionId(key) == partitionId) {
            keyGeneratorControls.setKeyIfHigher(key);
        }
    }

    /**
     * Reads the record's metadata into the shared {@link #metadata} instance.
     *
     * @throws ProcessingException if resetting or reading the metadata throws
     */
    private void readMetadata(final LoggedEvent loggedEvent) throws ProcessingException {
        try {
            metadata.reset();
            loggedEvent.readMetadata(metadata);
        } catch (final Exception e) {
            throw new ProcessingException(String.format(ERROR_MSG_EXPECTED_TO_READ_METADATA, loggedEvent), loggedEvent, null, e);
        }
    }

    /**
     * Wraps the record, the previously read metadata and the record value into the shared
     * {@link #typedEvent} instance and returns it. The returned object is reused across calls.
     */
    private TypedRecord<?> readRecordValue(final LoggedEvent loggedEvent) {
        typedEvent.wrap(loggedEvent, metadata, recordValues.readRecordValue(loggedEvent, metadata.getValueType()));
        return typedEvent;
    }

    /**
     * @return the highest source event position seen in a successfully replayed batch, the
     *     snapshot position if that is higher, or {@code -1} if neither is set
     */
    public long getLastSourceEventPosition() {
        return lastSourceEventPosition;
    }

    /**
     * @return the position of the record that was most recently handed to a record processor, or
     *     {@code -1} if none has been replayed yet
     */
    public long getLastReplayedEventPosition() {
        return lastReplayedEventPosition;
    }

    /** Removes this instance from the log stream's record-available listeners. */
    public void close() {
        logStream.removeRecordAvailableListener(this);
    }
}
