package io.camunda.zeebe.streamprocessor;

import io.camunda.zeebe.db.TransactionContext;
import io.camunda.zeebe.db.ZeebeDb;
import io.camunda.zeebe.engine.api.RecordProcessor;
import io.camunda.zeebe.engine.api.StreamProcessorLifecycleAware;
import io.camunda.zeebe.engine.metrics.StreamProcessorMetrics;
import io.camunda.zeebe.engine.processing.streamprocessor.RecordValues;
import io.camunda.zeebe.engine.state.EventApplier;
import io.camunda.zeebe.engine.state.mutable.MutableProcessingState;
import io.camunda.zeebe.engine.state.processing.DbKeyGenerator;
import io.camunda.zeebe.logstreams.impl.Loggers;
import io.camunda.zeebe.logstreams.log.LogRecordAwaiter;
import io.camunda.zeebe.logstreams.log.LogStream;
import io.camunda.zeebe.logstreams.log.LogStreamBatchWriter;
import io.camunda.zeebe.logstreams.log.LogStreamReader;
import io.camunda.zeebe.scheduler.Actor;
import io.camunda.zeebe.scheduler.ActorControl;
import io.camunda.zeebe.scheduler.ActorSchedulingService;
import io.camunda.zeebe.scheduler.clock.ActorClock;
import io.camunda.zeebe.scheduler.future.ActorFuture;
import io.camunda.zeebe.scheduler.future.CompletableActorFuture;
import io.camunda.zeebe.streamprocessor.state.StreamProcessorDbState;
import io.camunda.zeebe.util.exception.UnrecoverableException;
import io.camunda.zeebe.util.health.FailureListener;
import io.camunda.zeebe.util.health.HealthMonitorable;
import io.camunda.zeebe.util.health.HealthReport;
import io.prometheus.client.Gauge;
import java.time.Duration;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.BooleanSupplier;
import java.util.function.Function;
import java.util.function.Supplier;
import org.slf4j.Logger;

/* loaded from: input_file:io/camunda/zeebe/streamprocessor/StreamProcessor.class */
public class StreamProcessor extends Actor implements HealthMonitorable, LogRecordAwaiter {
    /**
     * Sentinel position value. The asynchronous position getters of this class return the same
     * value ({@code -1}) while no processing state machine exists.
     */
    public static final long UNSET_POSITION = -1;
    /** Message of the exception thrown by {@code recoverFromSnapshot()} when the snapshot position cannot be found in the log. */
    private static final String ERROR_MESSAGE_RECOVER_FROM_SNAPSHOT_FAILED = "Expected to find event with the snapshot position %s in log stream, but nothing was found. Failed to recover '%s'.";
    /** Scheduler this actor and the async schedule actor are submitted to. */
    private final ActorSchedulingService actorSchedulingService;
    /** Listeners notified on recovered/paused/resumed/close/failed; extended by the record processors' listeners during init. */
    private final List<StreamProcessorLifecycleAware> lifecycleAwareListeners;
    /** Factory handed to the record processor context. */
    private final Function<MutableProcessingState, EventApplier> eventApplierFactory;
    private final StreamProcessorMetrics metrics;
    private final LogStream logStream;
    /** Partition id, taken from the log stream. */
    private final int partitionId;
    private final ZeebeDb zeebeDb;
    /** Shared context carrying reader, writer, phase, schedule service, etc. */
    private final StreamProcessorContext streamProcessorContext;
    /** Name returned by {@link #getName()}. */
    private final String actorName;
    /** Reader obtained in {@code onActorStarting}; used to seek to the snapshot position. */
    private LogStreamReader logStreamReader;
    /** Created once replay has finished and processing starts; {@code null} before that. */
    private ProcessingStateMachine processingStateMachine;
    /** Created in {@code onActorStarted}; {@code null} before that. */
    private ReplayStateMachine replayStateMachine;
    /** Created by {@code openAsync}; completed in {@code onActorStarted} or exceptionally on failure. */
    private CompletableActorFuture<Void> openFuture;
    /** Written by the periodic health-check tick and read by {@link #getHealthReport()}. */
    private volatile long lastTickTime;
    /** Result of {@code ReplayStateMachine#startRecover}; set in {@code onActorStarted}. */
    private ActorFuture<LastProcessingPositions> replayCompletedFuture;
    private StreamProcessorDbState streamProcessorDbState;
    /** Schedule service opened on this actor once a log writer is available. */
    private ProcessingScheduleServiceImpl processorActorService;
    /** Schedule service opened on the separate async actor. */
    private ProcessingScheduleServiceImpl asyncScheduleService;
    /** Created in {@code onActorStarted}; {@code null} if startup failed before that point. */
    private AsyncProcessingScheduleServiceActor asyncActor;
    private final int nodeId;
    /** Interval of the health-check tick; the actor is reported as blocked after twice this interval without a tick. */
    public static final Duration HEALTH_CHECK_TICK_DURATION = Duration.ofSeconds(5);
    private static final Logger LOG = Loggers.LOGSTREAMS_LOGGER;
    /** Set by {@code openAsync}, cleared by {@code closeAsync} and {@code onActorFailed}. */
    private final AtomicBoolean isOpened = new AtomicBoolean(false);
    /** Only mutated from within actor jobs (see add/removeFailureListener). */
    private final Set<FailureListener> failureListeners = new HashSet<>();
    /** Completed when the actor has closed or failed. */
    private final CompletableActorFuture<Void> closeFuture = new CompletableActorFuture<>();
    /** {@code false} while processing/replay is paused. */
    private boolean shouldProcess = true;
    private final List<RecordProcessor> recordProcessors = new ArrayList<>();

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:io/camunda/zeebe/streamprocessor/StreamProcessor$AsyncProcessingScheduleServiceActor.class */
    public static final class AsyncProcessingScheduleServiceActor extends Actor {
        private final ProcessingScheduleServiceImpl scheduleService;
        private CompletableActorFuture<Void> closeFuture = CompletableActorFuture.completed((Object) null);
        private final String asyncScheduleActorName;
        private final int partitionId;

        public AsyncProcessingScheduleServiceActor(ProcessingScheduleServiceImpl processingScheduleServiceImpl, int i) {
            this.scheduleService = processingScheduleServiceImpl;
            this.asyncScheduleActorName = buildActorName("AsyncProcessingScheduleActor", i);
            this.partitionId = i;
        }

        protected Map<String, String> createContext() {
            Map<String, String> createContext = super.createContext();
            createContext.put("partitionId", Integer.toString(this.partitionId));
            return createContext;
        }

        public String getName() {
            return this.asyncScheduleActorName;
        }

        protected void onActorStarting() {
            this.actor.runOnCompletionBlockingCurrentPhase(this.scheduleService.open(this.actor), (r4, th) -> {
                if (th != null) {
                    this.actor.fail(th);
                }
            });
            this.closeFuture = new CompletableActorFuture<>();
        }

        protected void onActorClosed() {
            this.closeFuture.complete((Object) null);
        }

        /* renamed from: closeAsync, reason: merged with bridge method [inline-methods] */
        public CompletableActorFuture<Void> m65closeAsync() {
            this.actor.close();
            return this.closeFuture;
        }

        public void onActorFailed() {
            this.closeFuture.complete((Object) null);
        }

        public ActorControl getActorControl() {
            return this.actor;
        }
    }

    /* loaded from: input_file:io/camunda/zeebe/streamprocessor/StreamProcessor$Phase.class */
    /** Phases of the stream processor as stored in the stream processor context. */
    public enum Phase {
        // Never assigned within this class. NOTE(review): presumably the context's default — confirm.
        INITIAL,
        // Set when startup begins (unless paused) and when a paused replay is resumed.
        REPLAY,
        // Set once a log writer was retrieved after replay, and when paused processing is resumed.
        PROCESSING,
        // Set in onActorFailed.
        FAILED,
        // Set whenever replay or processing is paused.
        PAUSED
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:io/camunda/zeebe/streamprocessor/StreamProcessor$Step.class */
    /** A single asynchronous startup step; steps are executed one after another by {@code chainSteps}. */
    public interface Step {
        /** Starts the step and returns a future that completes when the step is done. */
        ActorFuture<Void> run();
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /**
     * Creates a stream processor from the collaborators configured on the given builder.
     *
     * @param streamProcessorBuilder source of the scheduling service, listeners, database, event
     *     applier factory, processing context, node id and record processors
     */
    public StreamProcessor(StreamProcessorBuilder streamProcessorBuilder) {
        this.actorSchedulingService = streamProcessorBuilder.getActorSchedulingService();
        this.lifecycleAwareListeners = streamProcessorBuilder.getLifecycleListeners();
        this.zeebeDb = streamProcessorBuilder.getZeebeDb();
        this.eventApplierFactory = streamProcessorBuilder.getEventApplierFactory();

        // Complete the builder's context with what only this actor can provide.
        this.streamProcessorContext =
                streamProcessorBuilder
                        .getProcessingContext()
                        .eventCache(new RecordValues())
                        .actor(this.actor)
                        .abortCondition(this::isClosed);

        // The partition id comes from the log stream and feeds the actor name and the metrics.
        this.logStream = this.streamProcessorContext.getLogStream();
        this.partitionId = this.logStream.getPartitionId();
        this.nodeId = streamProcessorBuilder.getNodeId();
        this.actorName = buildActorName("StreamProcessor", this.partitionId);
        this.metrics = new StreamProcessorMetrics(this.partitionId);

        this.recordProcessors.addAll(streamProcessorBuilder.getRecordProcessors());
    }

    /** Returns a new, empty {@link StreamProcessorBuilder}. */
    public static StreamProcessorBuilder builder() {
        return new StreamProcessorBuilder();
    }

    @Deprecated
    // Returns the state created by recoverFromSnapshot(); null until the actor has started recovering.
    public StreamProcessorDbState getStreamProcessorDbState() {
        return this.streamProcessorDbState;
    }

    /** Extends the inherited actor context with the {@code partitionId} of this processor. */
    protected Map<String, String> createContext() {
        final Map<String, String> context = super.createContext();
        context.put("partitionId", String.valueOf(this.partitionId));
        return context;
    }

    /** Returns the actor name built in the constructor from "StreamProcessor" and the partition id. */
    public String getName() {
        return this.actorName;
    }

    /** Requests a new log stream reader and hands the result to {@code onRetrievingReader}. */
    protected void onActorStarting() {
        this.actor.runOnCompletionBlockingCurrentPhase(this.logStream.newLogStreamReader(), this::onRetrievingReader);
    }

    /**
     * Recovers the state from the snapshot, wires up the schedule services and record processors
     * and starts the replay.
     *
     * <p>In replay-only mode the replay future is only observed for failures; otherwise its
     * successful completion triggers the transition to processing. Any {@link RuntimeException}
     * thrown during startup is routed to {@code onFailure}.
     */
    protected void onActorStarted() {
        try {
            LOG.debug("Recovering state of partition {} from snapshot", this.partitionId);
            final Gauge.Timer startRecoveryTimer = this.metrics.startRecoveryTimer();
            final long snapshotPosition = recoverFromSnapshot();

            // Schedule service running on this actor; it is opened later, once a writer is available.
            this.processorActorService =
                    new ProcessingScheduleServiceImpl(
                            this.streamProcessorContext::getStreamProcessorPhase,
                            this.streamProcessorContext.getAbortCondition(),
                            this.logStream::newLogStreamBatchWriter);
            // Schedule service running on the separate async actor; it never aborts on its own.
            this.asyncScheduleService =
                    new ProcessingScheduleServiceImpl(
                            this.streamProcessorContext::getStreamProcessorPhase,
                            () -> false,
                            this.logStream::newLogStreamBatchWriter);
            this.asyncActor = new AsyncProcessingScheduleServiceActor(this.asyncScheduleService, this.partitionId);
            this.streamProcessorContext.scheduleService(
                    new ExtendedProcessingScheduleServiceImpl(
                            this.processorActorService, this.asyncScheduleService, this.asyncActor.getActorControl()));

            initRecordProcessors();
            healthCheckTick();

            this.replayStateMachine =
                    new ReplayStateMachine(this.recordProcessors, this.streamProcessorContext, this::shouldProcessNext);
            // Opening is reported as done before the replay has finished.
            this.openFuture.complete(null);
            this.replayCompletedFuture = this.replayStateMachine.startRecover(snapshotPosition);

            if (this.shouldProcess) {
                this.streamProcessorContext.streamProcessorPhase(Phase.REPLAY);
            } else {
                setStateToPausedAndNotifyListeners();
            }

            if (isInReplayOnlyMode()) {
                this.replayCompletedFuture.onComplete((lastProcessingPositions, error) -> {
                    if (error != null) {
                        LOG.error("The replay of events failed.", error);
                        onFailure(error);
                    }
                });
            } else {
                this.replayCompletedFuture.onComplete((lastProcessingPositions, error) -> {
                    if (error != null) {
                        LOG.error("The replay of events failed.", error);
                        onFailure(error);
                    } else {
                        onRecovered(lastProcessingPositions);
                        // The recovery time is only observed when the replay succeeded.
                        startRecoveryTimer.close();
                    }
                });
            }
        } catch (RuntimeException e) {
            onFailure(e);
        }
    }

    /** Releases the resources held by this processor (see {@code tearDown}). */
    protected void onActorClosing() {
        tearDown();
    }

    /** Completes the close future handed out by {@link #closeAsync()} and logs the closing. */
    protected void onActorClosed() {
        this.closeFuture.complete(null);
        LOG.debug("Closed stream processor controller {}.", getName());
    }

    /** Notifies the lifecycle listeners about the close, unless the processor is in the failed phase. */
    protected void onActorCloseRequested() {
        if (!isFailed()) {
            this.lifecycleAwareListeners.forEach(StreamProcessorLifecycleAware::onClose);
        }
    }

    /**
     * Closes the processor if it is currently opened: first the async schedule actor, then this
     * actor.
     *
     * @return future that completes once this actor has closed or failed; the same future is
     *     returned when the processor was not opened
     */
    public ActorFuture<Void> closeAsync() {
        if (this.isOpened.getAndSet(false)) {
            this.actor.run(() -> {
                if (this.asyncActor == null) {
                    // The async actor is only created in onActorStarted; when startup failed
                    // before that point there is nothing to wait for.
                    this.actor.close();
                    return;
                }
                this.asyncActor.m65closeAsync().onComplete((ignored, error) -> {
                    this.actor.close();
                });
            });
        }
        return this.closeFuture;
    }

    /** Delegates the given failure to {@code onFailure}. */
    protected void handleFailure(Throwable th) {
        onFailure(th);
    }

    /**
     * Moves the processor into the {@link Phase#FAILED} phase, marks it as not opened, notifies the
     * lifecycle listeners, releases the resources and completes the close future.
     */
    public void onActorFailed() {
        this.streamProcessorContext.streamProcessorPhase(Phase.FAILED);
        this.isOpened.set(false);
        this.lifecycleAwareListeners.forEach(StreamProcessorLifecycleAware::onFailed);
        tearDown();
        // Callers waiting for the close must not hang when the actor failed instead of closing.
        this.closeFuture.complete(null);
    }

    /** Returns {@code true} while the processor is opened and not paused. */
    private boolean shouldProcessNext() {
        return isOpened() && this.shouldProcess;
    }

    /**
     * Closes the schedule services, the log stream reader and the replay state machine, and
     * unregisters this processor as record-available listener.
     *
     * <p>The schedule services and the replay state machine are only created in
     * {@code onActorStarted}, and the reader is only set when retrieving it succeeded. Each of
     * them is therefore skipped when absent, so that a failure early during startup still
     * performs the remaining cleanup instead of aborting with a {@link NullPointerException}.
     */
    private void tearDown() {
        if (this.processorActorService != null) {
            this.processorActorService.close();
        }
        if (this.asyncScheduleService != null) {
            this.asyncScheduleService.close();
        }
        final LogStreamReader reader = this.streamProcessorContext.getLogStreamReader();
        if (reader != null) {
            reader.close();
        }
        this.logStream.removeRecordAvailableListener(this);
        if (this.replayStateMachine != null) {
            this.replayStateMachine.close();
        }
    }

    /**
     * Records the current time as the last tick and re-schedules itself after
     * {@link #HEALTH_CHECK_TICK_DURATION}.
     */
    private void healthCheckTick() {
        final long now = ActorClock.currentTimeMillis();
        this.lastTickTime = now;
        this.actor.runDelayed(HEALTH_CHECK_TICK_DURATION, () -> healthCheckTick());
    }

    /**
     * Runs the steps one after another, starting at the given index, and invokes the callback once
     * all of them completed successfully. The first failing step aborts the chain and is reported
     * to {@code onFailure}.
     *
     * @param i index of the next step to run
     * @param stepArr all steps of the chain
     * @param runnable callback executed after the last step succeeded
     */
    private void chainSteps(int i, Step[] stepArr, Runnable runnable) {
        final boolean allStepsDone = i == stepArr.length;
        if (allStepsDone) {
            runnable.run();
            return;
        }

        final ActorFuture<Void> stepResult = stepArr[i].run();
        stepResult.onComplete((ignored, failure) -> {
            if (failure != null) {
                onFailure(failure);
                return;
            }
            chainSteps(i + 1, stepArr, runnable);
        });
    }

    /**
     * Continues the startup once the log stream writer was requested: stores the writer, switches
     * to the {@link Phase#PROCESSING} phase, opens the schedule service of this actor, submits the
     * async actor and finally starts processing.
     *
     * @param logStreamBatchWriter the retrieved writer; only used when no error occurred
     * @param th error of the writer retrieval, or {@code null} on success
     * @param lastProcessingPositions positions the replay ended with
     */
    private void onRetrievingWriter(LogStreamBatchWriter logStreamBatchWriter, Throwable th, LastProcessingPositions lastProcessingPositions) {
        if (th == null) {
            this.streamProcessorContext.logStreamBatchWriter(logStreamBatchWriter);
            this.streamProcessorContext.streamProcessorPhase(Phase.PROCESSING);

            final Step openProcessorScheduleService = () -> this.processorActorService.open(this.actor);
            final Step submitAsyncActor = () -> this.actorSchedulingService.submitActor(this.asyncActor);
            final Step[] startupSteps = {openProcessorScheduleService, submitAsyncActor};

            chainSteps(0, startupSteps, () -> startProcessing(lastProcessingPositions));
        } else {
            onFailure(th);
        }
    }

    /**
     * Creates the processing state machine, registers for new records, tells the lifecycle
     * listeners that recovery is done and starts processing. If processing was paused in the
     * meantime, the paused state is applied afterwards.
     *
     * @param lastProcessingPositions positions the replay ended with
     */
    private void startProcessing(LastProcessingPositions lastProcessingPositions) {
        this.processingStateMachine =
                new ProcessingStateMachine(this.streamProcessorContext, this::shouldProcessNext, this.recordProcessors);
        this.logStream.registerRecordAvailableListener(this);

        this.lifecycleAwareListeners.forEach(listener -> listener.onRecovered(this.streamProcessorContext));

        this.processingStateMachine.startProcessing(lastProcessingPositions);

        if (!this.shouldProcess) {
            setStateToPausedAndNotifyListeners();
        }
    }

    /**
     * Stores the retrieved log stream reader in this processor and its context; closes the actor
     * when the reader could not be retrieved.
     *
     * @param logStreamReader the retrieved reader; only used when no error occurred
     * @param th error of the retrieval, or {@code null} on success
     */
    private void onRetrievingReader(LogStreamReader logStreamReader, Throwable th) {
        if (th != null) {
            LOG.error("Unexpected error on retrieving reader from log stream.", th);
            this.actor.close();
            return;
        }

        this.logStreamReader = logStreamReader;
        this.streamProcessorContext.logStreamReader(logStreamReader);
    }

    /**
     * Opens the processor by submitting this actor to the scheduler, unless it is opened already.
     *
     * @param z {@code true} to start in paused state
     * @return the future of the current opening; when the processor is already opened, the future
     *     of that earlier opening
     */
    public ActorFuture<Void> openAsync(boolean z) {
        final boolean openedNow = this.isOpened.compareAndSet(false, true);
        if (!openedNow) {
            return this.openFuture;
        }

        this.shouldProcess = !z;
        this.openFuture = new CompletableActorFuture<>();
        this.actorSchedulingService.submitActor(this);
        return this.openFuture;
    }

    /**
     * Initializes every record processor with a shared context and adopts the lifecycle listeners
     * the processors registered on that context.
     */
    private void initRecordProcessors() {
        final RecordProcessorContextImpl processorContext =
                new RecordProcessorContextImpl(
                        this.partitionId,
                        this.streamProcessorContext.getScheduleService(),
                        this.zeebeDb,
                        this.streamProcessorContext.getTransactionContext(),
                        this.eventApplierFactory,
                        this.streamProcessorContext.getPartitionCommandSender(),
                        this.streamProcessorContext.getKeyGeneratorControls());

        for (final RecordProcessor processor : this.recordProcessors) {
            processor.init(processorContext);
        }

        this.lifecycleAwareListeners.addAll(processorContext.getLifecycleListeners());
    }

    /**
     * Sets up the database-backed state on the context and positions the log stream reader behind
     * the last successfully processed record.
     *
     * @return the last successfully processed record position read from the state
     * @throws IllegalStateException if the reader cannot seek to that position while the processor
     *     runs in {@link StreamProcessorMode#PROCESSING} mode
     */
    private long recoverFromSnapshot() {
        final TransactionContext transactionContext = this.zeebeDb.createContext();
        this.streamProcessorContext.transactionContext(transactionContext);
        this.streamProcessorContext.keyGeneratorControls(
                new DbKeyGenerator(this.partitionId, this.zeebeDb, transactionContext));

        this.streamProcessorDbState = new StreamProcessorDbState(this.zeebeDb, transactionContext);
        this.streamProcessorContext.lastProcessedPositionState(
                this.streamProcessorDbState.getLastProcessedPositionState());

        final long snapshotPosition =
                this.streamProcessorDbState.getLastProcessedPositionState().getLastSuccessfulProcessedRecordPosition();

        // The seek has to happen in every mode; only the processing mode treats a miss as fatal.
        final boolean foundRecord = this.logStreamReader.seekToNextEvent(snapshotPosition);
        if (!foundRecord && this.streamProcessorContext.getProcessorMode() == StreamProcessorMode.PROCESSING) {
            throw new IllegalStateException(
                    String.format(ERROR_MESSAGE_RECOVER_FROM_SNAPSHOT_FAILED, snapshotPosition, getName()));
        }

        LOG.info("Recovered state of partition {} from snapshot at position {}", this.partitionId, snapshotPosition);
        return snapshotPosition;
    }

    /**
     * Requests a log stream writer after a successful replay and continues in
     * {@code onRetrievingWriter}.
     *
     * @param lastProcessingPositions positions the replay ended with
     */
    private void onRecovered(LastProcessingPositions lastProcessingPositions) {
        this.logStream
                .newLogStreamBatchWriter()
                .onComplete((writer, error) -> onRetrievingWriter(writer, error, lastProcessingPositions));
    }

    /**
     * Handles a failure of this processor: closes the async schedule actor (if it exists), fails
     * this actor, fails a still pending open future and informs the failure listeners.
     *
     * @param th the failure cause
     */
    private void onFailure(Throwable th) {
        LOG.error("Actor {} failed in phase {}.", this.actorName, this.actor.getLifecyclePhase(), th);

        if (this.asyncActor == null) {
            // The async actor is created in onActorStarted after the snapshot recovery; a failure
            // before that point (e.g. a failed recovery) must still fail this actor.
            failActorAndNotifyListeners(th);
            return;
        }

        this.asyncActor.m65closeAsync().onComplete((ignored, closeError) -> failActorAndNotifyListeners(th));
    }

    /** Fails this actor and a pending open future, then reports the failure to the failure listeners. */
    private void failActorAndNotifyListeners(Throwable th) {
        this.actor.fail(th);
        if (!this.openFuture.isDone()) {
            this.openFuture.completeExceptionally(th);
        }

        if (th instanceof UnrecoverableException) {
            final HealthReport deadReport = HealthReport.dead(this).withIssue(th);
            this.failureListeners.forEach(listener -> listener.onUnrecoverableFailure(deadReport));
        } else {
            final HealthReport unhealthyReport = HealthReport.unhealthy(this).withIssue(th);
            this.failureListeners.forEach(listener -> listener.onFailure(unhealthyReport));
        }
    }

    /** Returns {@code true} between a successful {@code openAsync} and the next close or failure. */
    public boolean isOpened() {
        return this.isOpened.get();
    }

    /** Negation of {@link #isOpened()}; also installed as the abort condition of the context. */
    public boolean isClosed() {
        return !this.isOpened.get();
    }

    /** Returns {@code true} if the context reports the {@link Phase#FAILED} phase. */
    public boolean isFailed() {
        return this.streamProcessorContext.getStreamProcessorPhase() == Phase.FAILED;
    }

    /**
     * Asynchronously determines the last processed position on the actor.
     *
     * @return in replay-only mode the last source event position of the replay; otherwise the last
     *     successfully processed record position, or {@code -1} while processing has not started
     */
    public ActorFuture<Long> getLastProcessedPositionAsync() {
        return this.actor.call(() -> {
            final long position;
            if (isInReplayOnlyMode()) {
                position = this.replayStateMachine.getLastSourceEventPosition();
            } else if (this.processingStateMachine == null) {
                position = -1L;
            } else {
                position = this.processingStateMachine.getLastSuccessfulProcessedRecordPosition();
            }
            return position;
        });
    }

    /** Returns {@code true} if the context's processor mode is {@link StreamProcessorMode#REPLAY}. */
    private boolean isInReplayOnlyMode() {
        return this.streamProcessorContext.getProcessorMode() == StreamProcessorMode.REPLAY;
    }

    /**
     * Asynchronously determines the last written position on the actor.
     *
     * @return in replay-only mode the last replayed event position; otherwise the last written
     *     position of the processing state machine, or {@code -1} while processing has not started
     */
    public ActorFuture<Long> getLastWrittenPositionAsync() {
        return this.actor.call(() -> {
            final long position;
            if (isInReplayOnlyMode()) {
                position = this.replayStateMachine.getLastReplayedEventPosition();
            } else if (this.processingStateMachine == null) {
                position = -1L;
            } else {
                position = this.processingStateMachine.getLastWrittenPosition();
            }
            return position;
        });
    }

    /**
     * Derives the health of this processor. The checks are applied in this order, the first match
     * wins: actor closed, processing not making progress, health-check tick overdue by more than
     * twice {@link #HEALTH_CHECK_TICK_DURATION}, failed phase. If none applies the processor is
     * healthy.
     */
    public HealthReport getHealthReport() {
        if (this.actor.isClosed()) {
            return HealthReport.unhealthy(this).withMessage("actor is closed");
        }

        if (this.processingStateMachine != null && !this.processingStateMachine.isMakingProgress()) {
            return HealthReport.unhealthy(this).withMessage("not making progress");
        }

        final long millisSinceLastTick = ActorClock.currentTimeMillis() - this.lastTickTime;
        if (millisSinceLastTick > HEALTH_CHECK_TICK_DURATION.toMillis() * 2) {
            return HealthReport.unhealthy(this).withMessage("actor appears blocked");
        }

        if (this.streamProcessorContext.getStreamProcessorPhase() == Phase.FAILED) {
            return HealthReport.unhealthy(this).withMessage("in failed phase");
        }

        return HealthReport.healthy(this);
    }

    /** Registers the listener from within an actor job. */
    public void addFailureListener(FailureListener failureListener) {
        final Runnable register = () -> this.failureListeners.add(failureListener);
        this.actor.run(register);
    }

    /** Unregisters the listener from within an actor job. */
    public void removeFailureListener(FailureListener failureListener) {
        final Runnable unregister = () -> this.failureListeners.remove(failureListener);
        this.actor.run(unregister);
    }

    /** Reads the current phase from the context within an actor job. */
    public ActorFuture<Phase> getCurrentPhase() {
        return this.actor.call(this.streamProcessorContext::getStreamProcessorPhase);
    }

    /**
     * Pauses replay or processing; does nothing when already paused.
     *
     * @return future completed after the pause request was handled on the actor
     */
    public ActorFuture<Void> pauseProcessing() {
        return this.actor.call(() -> {
            if (!this.shouldProcess) {
                return;
            }
            setStateToPausedAndNotifyListeners();
        });
    }

    /**
     * Asynchronously checks whether processing has reached the end.
     *
     * @return {@code true} only if processing has started, the processor is not in replay-only
     *     mode and the processing state machine reports that it reached the end
     */
    public ActorFuture<Boolean> hasProcessingReachedTheEnd() {
        return this.actor.call(() -> {
            final boolean reachedEnd =
                    this.processingStateMachine != null
                            && !isInReplayOnlyMode()
                            && this.processingStateMachine.hasReachedEnd();
            return reachedEnd;
        });
    }

    /**
     * Switches to the paused state. The lifecycle listeners are only notified when processing
     * (not replay) is paused, i.e. outside replay-only mode and after the replay has completed.
     */
    private void setStateToPausedAndNotifyListeners() {
        final boolean pausingReplay = isInReplayOnlyMode() || !this.replayCompletedFuture.isDone();
        if (!pausingReplay) {
            this.lifecycleAwareListeners.forEach(StreamProcessorLifecycleAware::onPaused);
            LOG.debug("Paused processing for partition {}", this.partitionId);
        } else {
            LOG.debug("Paused replay for partition {}", this.partitionId);
        }

        this.shouldProcess = false;
        this.streamProcessorContext.streamProcessorPhase(Phase.PAUSED);
    }

    /**
     * Resumes a paused processor on the actor; does nothing when it is not paused.
     *
     * <p>While in replay-only mode or while the replay has not completed, the replay is continued.
     * Otherwise the lifecycle listeners are notified and, if processing has already started,
     * reading of the next record is scheduled.
     */
    public void resumeProcessing() {
        this.actor.call(() -> {
            if (!this.shouldProcess) {
                this.shouldProcess = true;

                final boolean resumingReplay = isInReplayOnlyMode() || !this.replayCompletedFuture.isDone();
                if (resumingReplay) {
                    this.streamProcessorContext.streamProcessorPhase(Phase.REPLAY);
                    this.actor.submit(this.replayStateMachine::replayNextEvent);
                    LOG.debug("Resumed replay for partition {}", this.partitionId);
                } else {
                    this.lifecycleAwareListeners.forEach(StreamProcessorLifecycleAware::onResumed);
                    this.streamProcessorContext.streamProcessorPhase(Phase.PROCESSING);
                    if (this.processingStateMachine != null) {
                        this.actor.submit(this.processingStateMachine::readNextRecord);
                    }
                    LOG.debug("Resumed processing for partition {}", this.partitionId);
                }
            }
        });
    }

    /**
     * Schedules reading of the next record on the actor. This processor registers itself as
     * listener only after the processing state machine has been created.
     */
    public void onRecordAvailable() {
        this.actor.run(this.processingStateMachine::readNextRecord);
    }
}
