package io.camunda.zeebe.engine.processing.streamprocessor;

import io.camunda.zeebe.db.TransactionContext;
import io.camunda.zeebe.db.ZeebeDb;
import io.camunda.zeebe.engine.metrics.StreamProcessorMetrics;
import io.camunda.zeebe.engine.processing.streamprocessor.writers.TypedStreamWriterImpl;
import io.camunda.zeebe.engine.state.EventApplier;
import io.camunda.zeebe.engine.state.ZeebeDbState;
import io.camunda.zeebe.engine.state.mutable.MutableZeebeState;
import io.camunda.zeebe.logstreams.impl.Loggers;
import io.camunda.zeebe.logstreams.log.LogStream;
import io.camunda.zeebe.logstreams.log.LogStreamBatchWriter;
import io.camunda.zeebe.logstreams.log.LogStreamReader;
import io.camunda.zeebe.util.exception.UnrecoverableException;
import io.camunda.zeebe.util.health.FailureListener;
import io.camunda.zeebe.util.health.HealthMonitorable;
import io.camunda.zeebe.util.health.HealthStatus;
import io.camunda.zeebe.util.sched.Actor;
import io.camunda.zeebe.util.sched.ActorCondition;
import io.camunda.zeebe.util.sched.ActorControl;
import io.camunda.zeebe.util.sched.ActorScheduler;
import io.camunda.zeebe.util.sched.clock.ActorClock;
import io.camunda.zeebe.util.sched.future.ActorFuture;
import io.camunda.zeebe.util.sched.future.CompletableActorFuture;
import java.time.Duration;
import java.util.Iterator;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Function;
import org.slf4j.Logger;

/* loaded from: input_file:io/camunda/zeebe/engine/processing/streamprocessor/StreamProcessor.class */
/**
 * Actor that drives record processing for the partition of a single {@link LogStream}.
 *
 * <p>Lifecycle as implemented below:
 *
 * <ol>
 *   <li>{@link #openAsync(boolean)} submits this actor to the scheduler.
 *   <li>While starting, a batch writer and then a reader are obtained from the log stream and
 *       registered on the {@link ProcessingContext}.
 *   <li>Once started, the state is recovered from the database, the record processors are created
 *       and a {@link ReProcessingStateMachine} is started from the recovered position.
 *   <li>When recovery completes, the phase switches to {@link Phase#PROCESSING} (or {@link
 *       Phase#PAUSED} when opened paused) and the {@link ProcessingStateMachine} takes over.
 * </ol>
 *
 * <p>Any failure is routed through {@code onFailure}, which fails the actor and notifies the
 * registered {@link FailureListener}.
 */
public class StreamProcessor extends Actor implements HealthMonitorable {
    public static final long UNSET_POSITION = -1;
    /** Interval at which the actor refreshes {@code lastTickTime}; see {@link #getHealthStatus()}. */
    public static final Duration HEALTH_CHECK_TICK_DURATION = Duration.ofSeconds(5);

    private static final String ERROR_MESSAGE_RECOVER_FROM_SNAPSHOT_FAILED = "Expected to find event with the snapshot position %s in log stream, but nothing was found. Failed to recover '%s'.";
    private static final Logger LOG = Loggers.LOGSTREAMS_LOGGER;

    private final ActorScheduler actorScheduler;
    private final List<StreamProcessorLifecycleAware> lifecycleAwareListeners;
    private final Function<MutableZeebeState, EventApplier> eventApplierFactory;
    private final LogStream logStream;
    private final int partitionId;
    private final ZeebeDb zeebeDb;
    private final ProcessingContext processingContext;
    private final TypedRecordProcessorFactory typedRecordProcessorFactory;
    private final String actorName;
    private final AtomicBoolean isOpened = new AtomicBoolean(false);

    private LogStreamReader logStreamReader;
    private ActorCondition onCommitPositionUpdatedCondition;
    // Assigned in onActorStarted(); null until the actor has started.
    private ProcessingStateMachine processingStateMachine;
    // Assigned in openAsync(); completed once recovery has been kicked off, or failed in onFailure().
    private CompletableActorFuture<Void> openFuture;
    private FailureListener failureListener;
    // Written by healthCheckTick(), read by getHealthStatus().
    private volatile long lastTickTime;
    // Assigned in onActorStarted(); null until reprocessing has been started.
    private ActorFuture<Long> recoverFuture;
    private long snapshotPosition = UNSET_POSITION;
    private volatile Phase phase = Phase.REPROCESSING;
    // Starts out completed so that closeAsync() on a never-opened processor returns a done future.
    private CompletableActorFuture<Void> closeFuture = CompletableActorFuture.completed(null);
    private boolean shouldProcess = true;

    /** Phases this processor moves through; exposed via {@link #getCurrentPhase()}. */
    public enum Phase {
        REPROCESSING,
        PROCESSING,
        FAILED,
        PAUSED
    }

    /**
     * Creates a processor from the given builder and derives the actor name from the builder's node
     * id and the log stream's partition id.
     *
     * <p>NOTE(review): the decompiler reports that this constructor was originally {@code
     * protected}; it is kept {@code public} here so existing callers are unaffected.
     *
     * @param streamProcessorBuilder source of the scheduler, listeners, factories, database and
     *     processing context
     */
    public StreamProcessor(StreamProcessorBuilder streamProcessorBuilder) {
        actorScheduler = streamProcessorBuilder.getActorScheduler();
        lifecycleAwareListeners = streamProcessorBuilder.getLifecycleListeners();
        typedRecordProcessorFactory = streamProcessorBuilder.getTypedRecordProcessorFactory();
        zeebeDb = streamProcessorBuilder.getZeebeDb();
        eventApplierFactory = streamProcessorBuilder.getEventApplierFactory();
        processingContext = streamProcessorBuilder
                .getProcessingContext()
                .eventCache(new RecordValues())
                .actor(actor)
                .abortCondition(this::isClosed);
        logStream = processingContext.getLogStream();
        partitionId = logStream.getPartitionId();
        actorName = buildActorName(streamProcessorBuilder.getNodeId(), "StreamProcessor", partitionId);
    }

    /** @return a fresh {@link StreamProcessorBuilder} */
    public static StreamProcessorBuilder builder() {
        return new StreamProcessorBuilder();
    }

    /** @return the actor name computed in the constructor */
    public String getName() {
        return actorName;
    }

    /** Requests a batch writer from the log stream; the reader is requested once it arrives. */
    protected void onActorStarting() {
        actor.runOnCompletionBlockingCurrentPhase(logStream.newLogStreamBatchWriter(), this::onRetrievingWriter);
    }

    /**
     * Recovers the state, creates the processors and both state machines, completes the open future
     * and starts reprocessing from the recovered position. A {@link RuntimeException} thrown along
     * the way is handed to {@code onFailure}.
     */
    protected void onActorStarted() {
        try {
            LOG.debug("Recovering state of partition {} from snapshot", partitionId);
            final long recoveryStartMillis = System.currentTimeMillis();

            snapshotPosition = recoverFromSnapshot();
            initProcessors();
            processingStateMachine = new ProcessingStateMachine(processingContext, this::shouldProcessNext);

            healthCheckTick();
            openFuture.complete(null);

            final ReProcessingStateMachine reProcessingStateMachine = new ReProcessingStateMachine(processingContext);
            // While reprocessing, the regular log stream writer is switched off in favour of the
            // reprocessing writer; onRecovered() re-enables the log stream writer.
            processingContext.disableLogStreamWriter();
            processingContext.enableReprocessingStreamWriter();

            recoverFuture = reProcessingStateMachine.startRecover(snapshotPosition);
            actor.runOnCompletion(recoverFuture, (recoveredPosition, error) -> {
                if (error != null) {
                    LOG.error("Unexpected error on recovery happens.", error);
                    onFailure(error);
                    return;
                }
                onRecovered(recoveredPosition);
                new StreamProcessorMetrics(partitionId).recoveryTime(System.currentTimeMillis() - recoveryStartMillis);
            });
        } catch (RuntimeException e) {
            onFailure(e);
        }
    }

    /** Releases the reader and the commit-position condition. */
    protected void onActorClosing() {
        tearDown();
    }

    /** Completes the future handed out by {@link #closeAsync()}. */
    protected void onActorClosed() {
        closeFuture.complete(null);
        LOG.debug("Closed stream processor controller {}.", getName());
    }

    /** Notifies the lifecycle listeners about the close, unless the processor has already failed. */
    protected void onActorCloseRequested() {
        if (isFailed()) {
            return;
        }
        lifecycleAwareListeners.forEach(StreamProcessorLifecycleAware::onClose);
    }

    /**
     * Closes the actor if this processor is currently opened.
     *
     * @return the close future; when the processor was not opened this is whatever close future is
     *     currently stored (initially an already completed one)
     */
    public ActorFuture<Void> closeAsync() {
        if (isOpened.compareAndSet(true, false)) {
            closeFuture = new CompletableActorFuture<>();
            actor.close();
        }
        return closeFuture;
    }

    /** Delegates to {@code onFailure}. */
    protected void handleFailure(Exception failure) {
        onFailure(failure);
    }

    /**
     * Marks the processor as failed and no longer opened, replaces the close future with a completed
     * one, notifies the lifecycle listeners and releases resources.
     */
    public void onActorFailed() {
        phase = Phase.FAILED;
        closeFuture = CompletableActorFuture.completed(null);
        isOpened.set(false);
        lifecycleAwareListeners.forEach(StreamProcessorLifecycleAware::onFailed);
        tearDown();
    }

    /** Predicate handed to the {@link ProcessingStateMachine}: opened and not paused. */
    private boolean shouldProcessNext() {
        return isOpened() && shouldProcess;
    }

    /** Closes the reader registered on the processing context and unregisters the commit condition. */
    private void tearDown() {
        // The error paths of onRetrievingWriter/onRetrievingReader close the actor before any
        // reader has been registered on the context, so guard against a missing reader here.
        // NOTE(review): assumes getLogStreamReader() yields null when no reader was set - confirm.
        final LogStreamReader reader = processingContext.getLogStreamReader();
        if (reader != null) {
            reader.close();
        }

        if (onCommitPositionUpdatedCondition != null) {
            logStream.removeOnCommitPositionUpdatedCondition(onCommitPositionUpdatedCondition);
            onCommitPositionUpdatedCondition = null;
        }
    }

    /** Stores the current actor-clock time and reschedules itself after the tick duration. */
    private void healthCheckTick() {
        lastTickTime = ActorClock.currentTimeMillis();
        actor.runDelayed(HEALTH_CHECK_TICK_DURATION, this::healthCheckTick);
    }

    /**
     * Callback for the batch writer request: registers the writer and its max fragment length on
     * the context and requests the reader, or closes the actor on error.
     */
    private void onRetrievingWriter(LogStreamBatchWriter logStreamBatchWriter, Throwable th) {
        if (th != null) {
            LOG.error("Unexpected error on retrieving batch writer from log stream.", th);
            actor.close();
            return;
        }

        processingContext
                .maxFragmentSize(logStreamBatchWriter.getMaxFragmentLength())
                .logStreamWriter(new TypedStreamWriterImpl(logStreamBatchWriter));
        actor.runOnCompletionBlockingCurrentPhase(logStream.newLogStreamReader(), this::onRetrievingReader);
    }

    /** Callback for the reader request: stores and registers the reader, or closes the actor on error. */
    private void onRetrievingReader(LogStreamReader logStreamReader, Throwable th) {
        if (th != null) {
            LOG.error("Unexpected error on retrieving reader from log stream.", th);
            actor.close();
            return;
        }

        this.logStreamReader = logStreamReader;
        processingContext.logStreamReader(logStreamReader);
    }

    /**
     * Opens the processor by submitting this actor to the scheduler. Calling this while already
     * opened does not resubmit the actor.
     *
     * @param pauseOnStart when {@code true}, processing is paused once recovery has completed
     * @return the future of the most recent open attempt; it is completed in {@code
     *     onActorStarted} or completed exceptionally in {@code onFailure}
     */
    public ActorFuture<Void> openAsync(boolean pauseOnStart) {
        if (isOpened.compareAndSet(false, true)) {
            shouldProcess = !pauseOnStart;
            openFuture = new CompletableActorFuture<>();
            actorScheduler.submitActor(this);
        }
        return openFuture;
    }

    /**
     * Creates the record processors, adds their lifecycle listeners as well as every processor
     * itself to the listener list, and registers the processor map on the context.
     */
    private void initProcessors() {
        final TypedRecordProcessors typedRecordProcessors = typedRecordProcessorFactory.createProcessors(processingContext);
        lifecycleAwareListeners.addAll(typedRecordProcessors.getLifecycleListeners());

        final RecordProcessorMap recordProcessorMap = typedRecordProcessors.getRecordProcessorMap();
        final Iterator<TypedRecordProcessor> processors = recordProcessorMap.values();
        // Each processor is also tracked as a lifecycle listener.
        processors.forEachRemaining(lifecycleAwareListeners::add);

        processingContext.recordProcessorMap(recordProcessorMap);
    }

    /**
     * Recovers the state and positions the reader after the last successfully processed record.
     *
     * @return the last successfully processed record position read from the recovered state
     * @throws IllegalStateException if the reader cannot seek to the event following that position
     */
    private long recoverFromSnapshot() {
        final long lastProcessedPosition =
                recoverState().getLastProcessedPositionState().getLastSuccessfulProcessedRecordPosition();

        if (!logStreamReader.seekToNextEvent(lastProcessedPosition)) {
            throw new IllegalStateException(
                    String.format(ERROR_MESSAGE_RECOVER_FROM_SNAPSHOT_FAILED, lastProcessedPosition, getName()));
        }

        LOG.info("Recovered state of partition {} from snapshot at position {}", partitionId, lastProcessedPosition);
        return lastProcessedPosition;
    }

    /**
     * Creates a transaction context and the state on top of the database and registers both, plus
     * the event applier built from that state, on the processing context.
     */
    private ZeebeDbState recoverState() {
        final TransactionContext transactionContext = zeebeDb.createContext();
        final ZeebeDbState zeebeState = new ZeebeDbState(partitionId, zeebeDb, transactionContext);

        processingContext.transactionContext(transactionContext);
        processingContext.zeebeState(zeebeState);
        processingContext.eventApplier(eventApplierFactory.apply(zeebeState));
        return zeebeState;
    }

    /**
     * Switches to the processing phase after recovery: re-enables the log stream writer, registers
     * the commit-position condition, notifies the listeners and starts the processing state
     * machine. If the processor was opened paused, it is moved to the paused state afterwards.
     *
     * @param recoveredPosition the position the recovery future completed with
     */
    private void onRecovered(long recoveredPosition) {
        phase = Phase.PROCESSING;
        processingContext.enableLogStreamWriter();

        onCommitPositionUpdatedCondition =
                actor.onCondition(getName() + "-on-commit-position-updated", processingStateMachine::readNextEvent);
        logStream.registerOnCommitPositionUpdatedCondition(onCommitPositionUpdatedCondition);

        lifecycleAwareListeners.forEach(listener -> listener.onRecovered(processingContext));
        processingStateMachine.startProcessing(recoveredPosition);

        if (!shouldProcess) {
            setStateToPausedAndNotifyListeners();
        }
    }

    /**
     * Central failure handling: logs, fails the actor, fails a still pending open future and
     * notifies the failure listener, distinguishing {@link UnrecoverableException}s.
     */
    private void onFailure(Throwable th) {
        // The trailing throwable is logged with its stack trace, not used as a format argument.
        LOG.error("Actor {} failed in phase {}.", actorName, actor.getLifecyclePhase(), th);
        actor.fail();

        if (!openFuture.isDone()) {
            openFuture.completeExceptionally(th);
        }

        if (failureListener == null) {
            return;
        }
        if (th instanceof UnrecoverableException) {
            failureListener.onUnrecoverableFailure();
        } else {
            failureListener.onFailure();
        }
    }

    /** @return {@code true} while the opened flag is set */
    public boolean isOpened() {
        return isOpened.get();
    }

    /** @return {@code true} while the opened flag is not set */
    public boolean isClosed() {
        return !isOpened.get();
    }

    /** @return {@code true} if the current phase is {@link Phase#FAILED} */
    public boolean isFailed() {
        return phase == Phase.FAILED;
    }

    /**
     * @return a future, computed on the actor, with the processing state machine's last
     *     successfully processed event position
     * @throws NullPointerException if the processing state machine has not been created yet
     */
    public ActorFuture<Long> getLastProcessedPositionAsync() {
        return actor.call(processingStateMachine::getLastSuccessfulProcessedEventPosition);
    }

    /**
     * @return a future, computed on the actor, with the processing state machine's last written
     *     event position
     * @throws NullPointerException if the processing state machine has not been created yet
     */
    public ActorFuture<Long> getLastWrittenPositionAsync() {
        return actor.call(processingStateMachine::getLastWrittenEventPosition);
    }

    /**
     * Reports {@link HealthStatus#HEALTHY} only if the actor is not closed, the processing state
     * machine exists and is making progress, the last health tick is at most two tick durations
     * old, and the phase is not {@link Phase#FAILED}.
     */
    public HealthStatus getHealthStatus() {
        if (actor.isClosed()) {
            return HealthStatus.UNHEALTHY;
        }

        // The state machine is only created in onActorStarted(); a probe arriving earlier (or after
        // a failed recovery) must report unhealthy rather than throw.
        final ProcessingStateMachine stateMachine = processingStateMachine;
        if (stateMachine == null || !stateMachine.isMakingProgress()) {
            return HealthStatus.UNHEALTHY;
        }

        final long millisSinceLastTick = ActorClock.currentTimeMillis() - lastTickTime;
        if (millisSinceLastTick > HEALTH_CHECK_TICK_DURATION.toMillis() * 2) {
            return HealthStatus.UNHEALTHY;
        }

        if (phase == Phase.FAILED) {
            return HealthStatus.UNHEALTHY;
        }
        return HealthStatus.HEALTHY;
    }

    /** Registers the listener on the actor; a previously registered listener is replaced. */
    public void addFailureListener(FailureListener failureListener) {
        actor.run(() -> {
            this.failureListener = failureListener;
        });
    }

    /** @return a future, computed on the actor, with the current phase */
    public ActorFuture<Phase> getCurrentPhase() {
        return actor.call(() -> phase);
    }

    /**
     * Pauses processing once recovery has completed; a no-op if processing is already paused.
     *
     * <p>NOTE(review): dereferences the recovery future, which is only assigned in {@code
     * onActorStarted}; behaviour when invoked before that point depends on how the actor handles
     * the resulting exception.
     *
     * @return the future of the scheduled actor call
     */
    public ActorFuture<Void> pauseProcessing() {
        return actor.call(() -> {
            recoverFuture.onComplete((recoveredPosition, error) -> {
                if (shouldProcess) {
                    setStateToPausedAndNotifyListeners();
                }
            });
        });
    }

    /** Notifies the listeners, then clears the processing flag and switches to the paused phase. */
    private void setStateToPausedAndNotifyListeners() {
        lifecycleAwareListeners.forEach(StreamProcessorLifecycleAware::onPaused);
        shouldProcess = false;
        phase = Phase.PAUSED;
        LOG.debug("Paused processing for partition {}", partitionId);
    }

    /**
     * Resumes processing once recovery has completed; a no-op if processing is not paused.
     *
     * <p>NOTE(review): like {@link #pauseProcessing()}, this dereferences the recovery future, which
     * is only assigned in {@code onActorStarted}.
     */
    public void resumeProcessing() {
        actor.call(() -> {
            recoverFuture.onComplete((recoveredPosition, error) -> {
                if (shouldProcess) {
                    return;
                }
                lifecycleAwareListeners.forEach(StreamProcessorLifecycleAware::onResumed);
                shouldProcess = true;
                phase = Phase.PROCESSING;
                // Kick the state machine so reading continues without waiting for a new commit.
                actor.submit(processingStateMachine::readNextEvent);
                LOG.debug("Resumed processing for partition {}", partitionId);
            });
        });
    }
}
