package io.zeebe.broker.workflow.processor;

import io.zeebe.broker.clustering.base.topology.TopologyManager;
import io.zeebe.broker.logstreams.processor.CommandProcessor;
import io.zeebe.broker.logstreams.processor.KeyGenerator;
import io.zeebe.broker.logstreams.processor.StreamProcessorLifecycleAware;
import io.zeebe.broker.logstreams.processor.TypedEventStreamProcessorBuilder;
import io.zeebe.broker.logstreams.processor.TypedRecordProcessor;
import io.zeebe.broker.logstreams.processor.TypedStreamEnvironment;
import io.zeebe.broker.logstreams.processor.TypedStreamProcessor;
import io.zeebe.broker.logstreams.processor.TypedStreamReader;
import io.zeebe.broker.subscription.command.SubscriptionCommandSender;
import io.zeebe.broker.workflow.processor.deployment.DeploymentCreateProcessor;
import io.zeebe.broker.workflow.processor.deployment.TransformingDeploymentCreateProcessor;
import io.zeebe.broker.workflow.processor.instance.CancelWorkflowInstanceProcessor;
import io.zeebe.broker.workflow.processor.instance.CreateWorkflowInstanceEventProcessor;
import io.zeebe.broker.workflow.processor.instance.UpdatePayloadProcessor;
import io.zeebe.broker.workflow.processor.instance.WorkflowInstanceCreatedEventProcessor;
import io.zeebe.broker.workflow.processor.instance.WorkflowInstanceRejectedEventProcessor;
import io.zeebe.broker.workflow.processor.job.JobCompletedEventProcessor;
import io.zeebe.broker.workflow.processor.job.JobCreatedProcessor;
import io.zeebe.broker.workflow.processor.message.CorrelateWorkflowInstanceSubscription;
import io.zeebe.broker.workflow.processor.message.OpenWorkflowInstanceSubscriptionProcessor;
import io.zeebe.broker.workflow.state.WorkflowState;
import io.zeebe.logstreams.processor.StreamProcessorContext;
import io.zeebe.logstreams.state.StateSnapshotController;
import io.zeebe.logstreams.state.StateStorage;
import io.zeebe.protocol.clientapi.ValueType;
import io.zeebe.protocol.intent.DeploymentIntent;
import io.zeebe.protocol.intent.Intent;
import io.zeebe.protocol.intent.JobIntent;
import io.zeebe.protocol.intent.WorkflowInstanceIntent;
import io.zeebe.protocol.intent.WorkflowInstanceSubscriptionIntent;
import java.util.function.Consumer;

/* loaded from: input_file:io/zeebe/broker/workflow/processor/WorkflowInstanceStreamProcessor.class */
public class WorkflowInstanceStreamProcessor implements StreamProcessorLifecycleAware {
    private TypedStreamReader streamReader;
    private final TopologyManager topologyManager;
    private final WorkflowState workflowState;
    private final SubscriptionCommandSender subscriptionCommandSender;
    private final Consumer<StreamProcessorContext> onRecoveredCallback;
    private final Runnable onClosedCallback;

    public WorkflowInstanceStreamProcessor(WorkflowState workflowState, SubscriptionCommandSender subscriptionCommandSender, TopologyManager topologyManager) {
        this(streamProcessorContext -> {
        }, () -> {
        }, workflowState, subscriptionCommandSender, topologyManager);
    }

    public WorkflowInstanceStreamProcessor(Consumer<StreamProcessorContext> consumer, Runnable runnable, WorkflowState workflowState, SubscriptionCommandSender subscriptionCommandSender, TopologyManager topologyManager) {
        this.onRecoveredCallback = consumer;
        this.onClosedCallback = runnable;
        this.workflowState = workflowState;
        this.subscriptionCommandSender = subscriptionCommandSender;
        this.topologyManager = topologyManager;
    }

    public TypedStreamProcessor createStreamProcessor(TypedStreamEnvironment typedStreamEnvironment) {
        int partitionId = typedStreamEnvironment.getStream().getPartitionId();
        TypedEventStreamProcessorBuilder keyGenerator = typedStreamEnvironment.newStreamProcessor().keyGenerator(KeyGenerator.createWorkflowInstanceKeyGenerator(partitionId, this.workflowState));
        addWorkflowInstanceEventStreamProcessors(keyGenerator);
        addBpmnStepProcessor(keyGenerator);
        addJobStreamProcessors(keyGenerator);
        addMessageStreamProcessors(keyGenerator);
        addDeploymentStreamProcessors(keyGenerator, partitionId);
        return keyGenerator.withStateController(this.workflowState).withListener(this).build();
    }

    public StateSnapshotController createSnapshotController(StateStorage stateStorage) {
        return new StateSnapshotController(this.workflowState, stateStorage);
    }

    private void addDeploymentStreamProcessors(TypedEventStreamProcessorBuilder typedEventStreamProcessorBuilder, int i) {
        typedEventStreamProcessorBuilder.onCommand(ValueType.DEPLOYMENT, (Intent) DeploymentIntent.CREATE, 0 == i ? new TransformingDeploymentCreateProcessor(this.workflowState) : new DeploymentCreateProcessor(this.workflowState));
    }

    public void addWorkflowInstanceEventStreamProcessors(TypedEventStreamProcessorBuilder typedEventStreamProcessorBuilder) {
        typedEventStreamProcessorBuilder.onCommand(ValueType.WORKFLOW_INSTANCE, (Intent) WorkflowInstanceIntent.CREATE, (TypedRecordProcessor<?>) new CreateWorkflowInstanceEventProcessor(this.workflowState)).onEvent(ValueType.WORKFLOW_INSTANCE, (Intent) WorkflowInstanceIntent.CREATED, (TypedRecordProcessor<?>) new WorkflowInstanceCreatedEventProcessor(this.workflowState)).onRejection(ValueType.WORKFLOW_INSTANCE, WorkflowInstanceIntent.CREATE, new WorkflowInstanceRejectedEventProcessor()).onCommand(ValueType.WORKFLOW_INSTANCE, (Intent) WorkflowInstanceIntent.CANCEL, (TypedRecordProcessor<?>) new CancelWorkflowInstanceProcessor(this.workflowState)).onCommand(ValueType.WORKFLOW_INSTANCE, (Intent) WorkflowInstanceIntent.UPDATE_PAYLOAD, (CommandProcessor) new UpdatePayloadProcessor(this.workflowState));
    }

    private void addBpmnStepProcessor(TypedEventStreamProcessorBuilder typedEventStreamProcessorBuilder) {
        BpmnStepProcessor bpmnStepProcessor = new BpmnStepProcessor(this.workflowState, this.subscriptionCommandSender);
        typedEventStreamProcessorBuilder.onEvent(ValueType.WORKFLOW_INSTANCE, (Intent) WorkflowInstanceIntent.SEQUENCE_FLOW_TAKEN, (TypedRecordProcessor<?>) bpmnStepProcessor).onEvent(ValueType.WORKFLOW_INSTANCE, (Intent) WorkflowInstanceIntent.ELEMENT_READY, (TypedRecordProcessor<?>) bpmnStepProcessor).onEvent(ValueType.WORKFLOW_INSTANCE, (Intent) WorkflowInstanceIntent.ELEMENT_ACTIVATED, (TypedRecordProcessor<?>) bpmnStepProcessor).onEvent(ValueType.WORKFLOW_INSTANCE, (Intent) WorkflowInstanceIntent.ELEMENT_COMPLETING, (TypedRecordProcessor<?>) bpmnStepProcessor).onEvent(ValueType.WORKFLOW_INSTANCE, (Intent) WorkflowInstanceIntent.START_EVENT_OCCURRED, (TypedRecordProcessor<?>) bpmnStepProcessor).onEvent(ValueType.WORKFLOW_INSTANCE, (Intent) WorkflowInstanceIntent.END_EVENT_OCCURRED, (TypedRecordProcessor<?>) bpmnStepProcessor).onEvent(ValueType.WORKFLOW_INSTANCE, (Intent) WorkflowInstanceIntent.GATEWAY_ACTIVATED, (TypedRecordProcessor<?>) bpmnStepProcessor).onEvent(ValueType.WORKFLOW_INSTANCE, (Intent) WorkflowInstanceIntent.ELEMENT_COMPLETED, (TypedRecordProcessor<?>) bpmnStepProcessor).onEvent(ValueType.WORKFLOW_INSTANCE, (Intent) WorkflowInstanceIntent.ELEMENT_TERMINATING, (TypedRecordProcessor<?>) bpmnStepProcessor).onEvent(ValueType.WORKFLOW_INSTANCE, (Intent) WorkflowInstanceIntent.ELEMENT_TERMINATED, (TypedRecordProcessor<?>) bpmnStepProcessor);
    }

    private void addMessageStreamProcessors(TypedEventStreamProcessorBuilder typedEventStreamProcessorBuilder) {
        typedEventStreamProcessorBuilder.onCommand(ValueType.WORKFLOW_INSTANCE_SUBSCRIPTION, (Intent) WorkflowInstanceSubscriptionIntent.OPEN, (TypedRecordProcessor<?>) new OpenWorkflowInstanceSubscriptionProcessor(this.workflowState)).onCommand(ValueType.WORKFLOW_INSTANCE_SUBSCRIPTION, (Intent) WorkflowInstanceSubscriptionIntent.CORRELATE, (TypedRecordProcessor<?>) new CorrelateWorkflowInstanceSubscription(this.topologyManager, this.workflowState, this.subscriptionCommandSender));
    }

    private void addJobStreamProcessors(TypedEventStreamProcessorBuilder typedEventStreamProcessorBuilder) {
        typedEventStreamProcessorBuilder.onEvent(ValueType.JOB, (Intent) JobIntent.CREATED, (TypedRecordProcessor<?>) new JobCreatedProcessor(this.workflowState)).onEvent(ValueType.JOB, (Intent) JobIntent.COMPLETED, (TypedRecordProcessor<?>) new JobCompletedEventProcessor(this.workflowState));
    }

    @Override // io.zeebe.broker.logstreams.processor.StreamProcessorLifecycleAware
    public void onOpen(TypedStreamProcessor typedStreamProcessor) {
        this.streamReader = typedStreamProcessor.getEnvironment().buildStreamReader();
    }

    @Override // io.zeebe.broker.logstreams.processor.StreamProcessorLifecycleAware
    public void onRecovered(TypedStreamProcessor typedStreamProcessor) {
        this.onRecoveredCallback.accept(typedStreamProcessor.getStreamProcessorContext());
    }

    @Override // io.zeebe.broker.logstreams.processor.StreamProcessorLifecycleAware
    public void onClose() {
        this.onClosedCallback.run();
        this.streamReader.close();
    }
}
