package io.zeebe.broker.logstreams;

import io.zeebe.broker.clustering.base.partitions.Partition;
import io.zeebe.broker.clustering.base.topology.TopologyManager;
import io.zeebe.broker.incident.processor.IncidentEventProcessors;
import io.zeebe.broker.job.JobEventProcessors;
import io.zeebe.broker.logstreams.processor.StreamProcessorServiceFactory;
import io.zeebe.broker.logstreams.processor.TypedEventStreamProcessorBuilder;
import io.zeebe.broker.logstreams.processor.TypedRecordProcessor;
import io.zeebe.broker.logstreams.processor.TypedStreamEnvironment;
import io.zeebe.broker.logstreams.processor.TypedStreamProcessor;
import io.zeebe.broker.logstreams.state.DefaultZeebeDbFactory;
import io.zeebe.broker.logstreams.state.ZeebeState;
import io.zeebe.broker.subscription.command.SubscriptionCommandSender;
import io.zeebe.broker.subscription.message.processor.MessageEventProcessors;
import io.zeebe.broker.system.configuration.ClusterCfg;
import io.zeebe.broker.transport.controlmessage.ControlMessageHandlerManager;
import io.zeebe.broker.workflow.deployment.distribute.processor.DeploymentDistributeProcessor;
import io.zeebe.broker.workflow.processor.BpmnStepProcessor;
import io.zeebe.broker.workflow.processor.CatchEventBehavior;
import io.zeebe.broker.workflow.processor.WorkflowEventProcessors;
import io.zeebe.broker.workflow.processor.deployment.DeploymentCreatedProcessor;
import io.zeebe.broker.workflow.processor.deployment.DeploymentEventProcessors;
import io.zeebe.broker.workflow.processor.timer.DueDateTimerChecker;
import io.zeebe.broker.workflow.repository.WorkflowRepository;
import io.zeebe.broker.workflow.state.WorkflowState;
import io.zeebe.logstreams.log.LogStreamWriterImpl;
import io.zeebe.logstreams.state.StateSnapshotController;
import io.zeebe.protocol.clientapi.ValueType;
import io.zeebe.protocol.intent.DeploymentIntent;
import io.zeebe.protocol.intent.Intent;
import io.zeebe.servicecontainer.Injector;
import io.zeebe.servicecontainer.Service;
import io.zeebe.servicecontainer.ServiceGroupReference;
import io.zeebe.servicecontainer.ServiceName;
import io.zeebe.servicecontainer.ServiceStartContext;
import io.zeebe.transport.ClientTransport;
import io.zeebe.transport.ServerTransport;

/* loaded from: input_file:io/zeebe/broker/logstreams/ZbStreamProcessorService.class */
public class ZbStreamProcessorService implements Service<ZbStreamProcessorService> {
    public static final String PROCESSOR_NAME = "zb-stream-processor";
    private final Injector<ServerTransport> clientApiTransportInjector = new Injector<>();
    private final Injector<ClientTransport> managementApiClientInjector = new Injector<>();
    private final Injector<ClientTransport> subscriptionApiClientInjector = new Injector<>();
    private final Injector<TopologyManager> topologyManagerInjector = new Injector<>();
    private final Injector<ControlMessageHandlerManager> controlMessageHandlerManagerServiceInjector = new Injector<>();
    private final Injector<StreamProcessorServiceFactory> streamProcessorServiceFactoryInjector = new Injector<>();
    private final ServiceGroupReference<Partition> partitionsGroupReference = ServiceGroupReference.create().onAdd((serviceName, partition) -> {
        startStreamProcessors(serviceName, partition);
    }).build();
    private final ClusterCfg clusterCfg;
    private ControlMessageHandlerManager controlMessageHandlerManager;
    private StreamProcessorServiceFactory streamProcessorServiceFactory;
    private ServerTransport clientApiTransport;
    private TopologyManager topologyManager;
    private ServiceStartContext startContext;
    private ClientTransport managementApi;

    public ZbStreamProcessorService(ClusterCfg clusterCfg) {
        this.clusterCfg = clusterCfg;
    }

    public void start(ServiceStartContext serviceStartContext) {
        this.startContext = serviceStartContext;
        this.managementApi = (ClientTransport) this.managementApiClientInjector.getValue();
        this.clientApiTransport = (ServerTransport) this.clientApiTransportInjector.getValue();
        this.streamProcessorServiceFactory = (StreamProcessorServiceFactory) this.streamProcessorServiceFactoryInjector.getValue();
        this.topologyManager = (TopologyManager) this.topologyManagerInjector.getValue();
        this.controlMessageHandlerManager = (ControlMessageHandlerManager) this.controlMessageHandlerManagerServiceInjector.getValue();
    }

    public void startStreamProcessors(ServiceName<Partition> serviceName, Partition partition) {
        int partitionId = partition.getInfo().getPartitionId();
        this.streamProcessorServiceFactory.createService(partition, serviceName).processorId(partitionId).processorName(PROCESSOR_NAME).snapshotController(new StateSnapshotController(DefaultZeebeDbFactory.DEFAULT_DB_FACTORY, partition.getStateStorageFactory().create(partitionId, PROCESSOR_NAME))).streamProcessorFactory(zeebeDb -> {
            return createTypedStreamProcessor(serviceName, partitionId, new TypedStreamEnvironment(partition.getLogStream(), this.clientApiTransport.getOutput()), new ZeebeState(partitionId, zeebeDb));
        }).build();
    }

    public TypedStreamProcessor createTypedStreamProcessor(ServiceName<Partition> serviceName, int i, TypedStreamEnvironment typedStreamEnvironment, ZeebeState zeebeState) {
        TypedEventStreamProcessorBuilder keyGenerator = typedStreamEnvironment.newStreamProcessor().keyGenerator(zeebeState.getKeyGenerator());
        addDistributeDeploymentProcessors(zeebeState, typedStreamEnvironment, keyGenerator);
        BpmnStepProcessor addWorkflowProcessors = addWorkflowProcessors(zeebeState, keyGenerator);
        addDeploymentRelatedProcessorAndServices(serviceName, i, zeebeState, keyGenerator);
        addIncidentProcessors(zeebeState, addWorkflowProcessors, keyGenerator);
        addJobProcessors(zeebeState, keyGenerator);
        addMessageProcessors(zeebeState, keyGenerator);
        return keyGenerator.build();
    }

    private void addDistributeDeploymentProcessors(ZeebeState zeebeState, TypedStreamEnvironment typedStreamEnvironment, TypedEventStreamProcessorBuilder typedEventStreamProcessorBuilder) {
        typedEventStreamProcessorBuilder.onCommand(ValueType.DEPLOYMENT, (Intent) DeploymentIntent.DISTRIBUTE, (TypedRecordProcessor<?>) new DeploymentDistributeProcessor(this.clusterCfg, this.topologyManager, zeebeState.getDeploymentState(), this.managementApi, new LogStreamWriterImpl(typedStreamEnvironment.getStream())));
    }

    private BpmnStepProcessor addWorkflowProcessors(ZeebeState zeebeState, TypedEventStreamProcessorBuilder typedEventStreamProcessorBuilder) {
        return WorkflowEventProcessors.addWorkflowProcessors(typedEventStreamProcessorBuilder, zeebeState, new SubscriptionCommandSender(this.clusterCfg, (ClientTransport) this.subscriptionApiClientInjector.getValue()), this.topologyManager, new DueDateTimerChecker(zeebeState.getWorkflowState()));
    }

    public void addDeploymentRelatedProcessorAndServices(ServiceName<Partition> serviceName, int i, ZeebeState zeebeState, TypedEventStreamProcessorBuilder typedEventStreamProcessorBuilder) {
        WorkflowState workflowState = zeebeState.getWorkflowState();
        if (i == 0) {
            typedEventStreamProcessorBuilder.withListener(new WorkflowRepository(this.clientApiTransport, this.controlMessageHandlerManager, this.startContext, workflowState, serviceName));
            DeploymentEventProcessors.addTransformingDeploymentProcessor(typedEventStreamProcessorBuilder, zeebeState, new CatchEventBehavior(zeebeState, new SubscriptionCommandSender(this.clusterCfg, this.managementApi)));
        } else {
            DeploymentEventProcessors.addDeploymentCreateProcessor(typedEventStreamProcessorBuilder, workflowState);
        }
        typedEventStreamProcessorBuilder.onEvent(ValueType.DEPLOYMENT, DeploymentIntent.CREATED, new DeploymentCreatedProcessor(workflowState));
    }

    private void addIncidentProcessors(ZeebeState zeebeState, BpmnStepProcessor bpmnStepProcessor, TypedEventStreamProcessorBuilder typedEventStreamProcessorBuilder) {
        IncidentEventProcessors.addProcessors(typedEventStreamProcessorBuilder, zeebeState, bpmnStepProcessor);
    }

    private void addJobProcessors(ZeebeState zeebeState, TypedEventStreamProcessorBuilder typedEventStreamProcessorBuilder) {
        JobEventProcessors.addJobProcessors(typedEventStreamProcessorBuilder, zeebeState);
    }

    private void addMessageProcessors(ZeebeState zeebeState, TypedEventStreamProcessorBuilder typedEventStreamProcessorBuilder) {
        MessageEventProcessors.addMessageProcessors(typedEventStreamProcessorBuilder, zeebeState, new SubscriptionCommandSender(this.clusterCfg, (ClientTransport) getSubscriptionApiClientInjector().getValue()), this.topologyManager);
    }

    /* renamed from: get, reason: merged with bridge method [inline-methods] */
    public ZbStreamProcessorService m40get() {
        return this;
    }

    public Injector<ServerTransport> getClientApiTransportInjector() {
        return this.clientApiTransportInjector;
    }

    public ServiceGroupReference<Partition> getPartitionsGroupReference() {
        return this.partitionsGroupReference;
    }

    public Injector<StreamProcessorServiceFactory> getStreamProcessorServiceFactoryInjector() {
        return this.streamProcessorServiceFactoryInjector;
    }

    public Injector<TopologyManager> getTopologyManagerInjector() {
        return this.topologyManagerInjector;
    }

    public Injector<ClientTransport> getManagementApiClientInjector() {
        return this.managementApiClientInjector;
    }

    public Injector<ClientTransport> getSubscriptionApiClientInjector() {
        return this.subscriptionApiClientInjector;
    }

    public Injector<ControlMessageHandlerManager> getControlMessageHandlerManagerServiceInjector() {
        return this.controlMessageHandlerManagerServiceInjector;
    }
}
