package io.camunda.zeebe.broker.system.partitions.impl.steps;

import io.atomix.raft.RaftServer;
import io.camunda.zeebe.broker.system.partitions.PartitionTransitionContext;
import io.camunda.zeebe.broker.system.partitions.PartitionTransitionStep;
import io.camunda.zeebe.engine.Engine;
import io.camunda.zeebe.engine.api.TypedRecord;
import io.camunda.zeebe.engine.state.appliers.EventAppliers;
import io.camunda.zeebe.logstreams.log.LoggedEvent;
import io.camunda.zeebe.scheduler.ConcurrencyControl;
import io.camunda.zeebe.scheduler.future.ActorFuture;
import io.camunda.zeebe.streamprocessor.StreamProcessor;
import io.camunda.zeebe.streamprocessor.StreamProcessorListener;
import io.camunda.zeebe.streamprocessor.StreamProcessorMode;
import java.util.List;
import java.util.function.BiFunction;

/* loaded from: input_file:io/camunda/zeebe/broker/system/partitions/impl/steps/StreamProcessorTransitionStep.class */
public final class StreamProcessorTransitionStep implements PartitionTransitionStep {
    private final BiFunction<PartitionTransitionContext, RaftServer.Role, StreamProcessor> streamProcessorCreator;

    public StreamProcessorTransitionStep() {
        this(StreamProcessorTransitionStep::createStreamProcessor);
    }

    public StreamProcessorTransitionStep(BiFunction<PartitionTransitionContext, RaftServer.Role, StreamProcessor> biFunction) {
        this.streamProcessorCreator = biFunction;
    }

    @Override // io.camunda.zeebe.broker.system.partitions.PartitionTransitionStep
    public void onNewRaftRole(PartitionTransitionContext partitionTransitionContext, RaftServer.Role role) {
        RaftServer.Role currentRole = partitionTransitionContext.getCurrentRole();
        StreamProcessor streamProcessor = partitionTransitionContext.getStreamProcessor();
        if (streamProcessor == null) {
            return;
        }
        if (shouldInstallOnTransition(role, currentRole) || role == RaftServer.Role.INACTIVE) {
            streamProcessor.pauseProcessing();
        }
    }

    @Override // io.camunda.zeebe.broker.system.partitions.PartitionTransitionStep
    public ActorFuture<Void> prepareTransition(PartitionTransitionContext partitionTransitionContext, long j, RaftServer.Role role) {
        ConcurrencyControl concurrencyControl = partitionTransitionContext.getConcurrencyControl();
        RaftServer.Role currentRole = partitionTransitionContext.getCurrentRole();
        StreamProcessor streamProcessor = partitionTransitionContext.getStreamProcessor();
        if (streamProcessor == null || !(shouldInstallOnTransition(role, currentRole) || role == RaftServer.Role.INACTIVE)) {
            return concurrencyControl.createCompletedFuture();
        }
        partitionTransitionContext.getComponentHealthMonitor().removeComponent(streamProcessor.getName());
        ActorFuture<Void> closeAsync = streamProcessor.closeAsync();
        closeAsync.onComplete((r4, th) -> {
            if (th == null) {
                partitionTransitionContext.setStreamProcessor(null);
            }
        });
        return closeAsync;
    }

    @Override // io.camunda.zeebe.broker.system.partitions.PartitionTransitionStep
    public ActorFuture<Void> transitionTo(PartitionTransitionContext partitionTransitionContext, long j, RaftServer.Role role) {
        RaftServer.Role currentRole = partitionTransitionContext.getCurrentRole();
        ConcurrencyControl concurrencyControl = partitionTransitionContext.getConcurrencyControl();
        if (!shouldInstallOnTransition(role, currentRole) && (partitionTransitionContext.getStreamProcessor() != null || role == RaftServer.Role.INACTIVE)) {
            return concurrencyControl.createCompletedFuture();
        }
        StreamProcessor apply = this.streamProcessorCreator.apply(partitionTransitionContext, role);
        partitionTransitionContext.setStreamProcessor(apply);
        ActorFuture openAsync = apply.openAsync(!partitionTransitionContext.shouldProcess());
        ActorFuture<Void> createFuture = concurrencyControl.createFuture();
        openAsync.onComplete((r7, th) -> {
            if (th != null) {
                createFuture.completeExceptionally(th);
                return;
            }
            if (partitionTransitionContext.shouldProcess()) {
                apply.resumeProcessing();
            } else {
                apply.pauseProcessing();
            }
            partitionTransitionContext.getComponentHealthMonitor().registerComponent(apply.getName(), apply);
            createFuture.complete((Object) null);
        });
        return createFuture;
    }

    @Override // io.camunda.zeebe.broker.system.partitions.PartitionTransitionStep
    public String getName() {
        return "StreamProcessor";
    }

    private boolean shouldInstallOnTransition(RaftServer.Role role, RaftServer.Role role2) {
        return role == RaftServer.Role.LEADER || (role == RaftServer.Role.FOLLOWER && role2 != RaftServer.Role.CANDIDATE) || (role == RaftServer.Role.CANDIDATE && role2 != RaftServer.Role.FOLLOWER);
    }

    private static StreamProcessor createStreamProcessor(final PartitionTransitionContext partitionTransitionContext, RaftServer.Role role) {
        StreamProcessorMode streamProcessorMode = role == RaftServer.Role.LEADER ? StreamProcessorMode.PROCESSING : StreamProcessorMode.REPLAY;
        boolean isEnableBackup = partitionTransitionContext.getBrokerCfg().getExperimental().getFeatures().isEnableBackup();
        Engine engine = new Engine(partitionTransitionContext.getTypedRecordProcessorFactory());
        return StreamProcessor.builder().logStream(partitionTransitionContext.getLogStream()).actorSchedulingService(partitionTransitionContext.getActorSchedulingService()).zeebeDb(partitionTransitionContext.getZeebeDb()).recordProcessors(isEnableBackup ? List.of(engine, partitionTransitionContext.getCheckpointProcessor()) : List.of(engine)).eventApplierFactory(EventAppliers::new).nodeId(partitionTransitionContext.getNodeId()).commandResponseWriter(partitionTransitionContext.getCommandResponseWriter()).listener(new StreamProcessorListener() { // from class: io.camunda.zeebe.broker.system.partitions.impl.steps.StreamProcessorTransitionStep.1
            public void onProcessed(TypedRecord<?> typedRecord) {
                PartitionTransitionContext.this.getOnProcessedListener().accept(typedRecord);
            }

            public void onSkipped(LoggedEvent loggedEvent) {
            }
        }).streamProcessorMode(streamProcessorMode).partitionCommandSender(partitionTransitionContext.getPartitionCommandSender()).build();
    }
}
