package io.zeebe.broker.workflow.processor;

import io.zeebe.broker.clustering.base.topology.TopologyManager;
import io.zeebe.broker.incident.data.ErrorType;
import io.zeebe.broker.incident.data.IncidentRecord;
import io.zeebe.broker.job.data.JobHeaders;
import io.zeebe.broker.job.data.JobRecord;
import io.zeebe.broker.logstreams.processor.CommandProcessor;
import io.zeebe.broker.logstreams.processor.StreamProcessorLifecycleAware;
import io.zeebe.broker.logstreams.processor.TypedBatchWriter;
import io.zeebe.broker.logstreams.processor.TypedRecord;
import io.zeebe.broker.logstreams.processor.TypedRecordProcessor;
import io.zeebe.broker.logstreams.processor.TypedResponseWriter;
import io.zeebe.broker.logstreams.processor.TypedStreamEnvironment;
import io.zeebe.broker.logstreams.processor.TypedStreamProcessor;
import io.zeebe.broker.logstreams.processor.TypedStreamReader;
import io.zeebe.broker.logstreams.processor.TypedStreamWriter;
import io.zeebe.broker.workflow.data.WorkflowInstanceRecord;
import io.zeebe.broker.workflow.map.ActivityInstanceMap;
import io.zeebe.broker.workflow.map.DeployedWorkflow;
import io.zeebe.broker.workflow.map.PayloadCache;
import io.zeebe.broker.workflow.map.WorkflowCache;
import io.zeebe.broker.workflow.map.WorkflowInstanceIndex;
import io.zeebe.logstreams.log.LogStream;
import io.zeebe.logstreams.processor.EventLifecycleContext;
import io.zeebe.map.ZbMap;
import io.zeebe.model.bpmn.BpmnAspect;
import io.zeebe.model.bpmn.instance.EndEvent;
import io.zeebe.model.bpmn.instance.ExclusiveGateway;
import io.zeebe.model.bpmn.instance.FlowElement;
import io.zeebe.model.bpmn.instance.FlowNode;
import io.zeebe.model.bpmn.instance.InputOutputMapping;
import io.zeebe.model.bpmn.instance.OutputBehavior;
import io.zeebe.model.bpmn.instance.SequenceFlow;
import io.zeebe.model.bpmn.instance.ServiceTask;
import io.zeebe.model.bpmn.instance.TaskDefinition;
import io.zeebe.model.bpmn.instance.TaskHeaders;
import io.zeebe.msgpack.el.JsonConditionException;
import io.zeebe.msgpack.el.JsonConditionInterpreter;
import io.zeebe.msgpack.mapping.Mapping;
import io.zeebe.msgpack.mapping.MappingException;
import io.zeebe.msgpack.mapping.MappingProcessor;
import io.zeebe.protocol.clientapi.RejectionType;
import io.zeebe.protocol.clientapi.ValueType;
import io.zeebe.protocol.impl.RecordMetadata;
import io.zeebe.protocol.intent.IncidentIntent;
import io.zeebe.protocol.intent.Intent;
import io.zeebe.protocol.intent.JobIntent;
import io.zeebe.protocol.intent.WorkflowInstanceIntent;
import io.zeebe.transport.ClientResponse;
import io.zeebe.transport.ClientTransport;
import io.zeebe.util.metrics.Metric;
import io.zeebe.util.metrics.MetricsManager;
import io.zeebe.util.sched.ActorControl;
import io.zeebe.util.sched.future.ActorFuture;
import io.zeebe.util.sched.future.CompletableActorFuture;
import java.util.EnumMap;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;
import org.agrona.DirectBuffer;
import org.agrona.concurrent.UnsafeBuffer;

/* loaded from: input_file:io/zeebe/broker/workflow/processor/WorkflowInstanceStreamProcessor.class */
public class WorkflowInstanceStreamProcessor implements StreamProcessorLifecycleAware {
    /** Empty job type; set on the job record that accompanies a job CANCEL command when an instance is canceled. */
    private static final UnsafeBuffer EMPTY_JOB_TYPE = new UnsafeBuffer("".getBytes());
    // The three metrics are not assigned in this part of the file;
    // presumably they are created when the processor is opened - TODO confirm.
    private Metric workflowInstanceEventCreate;
    private Metric workflowInstanceEventCanceled;
    private Metric workflowInstanceEventCompleted;
    /**
     * Workflow instance payloads keyed by workflow instance key: filled while an activity becomes
     * ready, read while an activity completes, removed when the instance completes or is canceled.
     * Final without an initializer, so it must be assigned in a constructor.
     */
    private final PayloadCache payloadCache;
    // Not used in this part of the file; NOTE(review): presumably needed to fetch workflows from other nodes - confirm.
    private ClientTransport managementApiClient;
    private TopologyManager topologyManager;
    /** Looks up deployed workflows by key, by process id and version, or latest version; can also fetch them asynchronously. */
    private WorkflowCache workflowCache;
    /** Used to run a callback once an asynchronous workflow fetch has completed. */
    private ActorControl actor;
    /** Per workflow instance state (token count, position, current activity instance key), keyed by workflow instance key. */
    private final WorkflowInstanceIndex workflowInstanceIndex = new WorkflowInstanceIndex();
    /** Per activity instance state (activity id, job key), keyed by activity instance key. */
    private final ActivityInstanceMap activityInstanceMap = new ActivityInstanceMap();
    /** Applies input mappings (extract) and output mappings (merge) to payloads; the meaning of 4096 is not visible here - presumably a size limit, TODO confirm. */
    private final MappingProcessor payloadMappingProcessor = new MappingProcessor(4096);
    /** Evaluates sequence flow conditions against a payload for exclusive gateways. */
    private final JsonConditionInterpreter conditionInterpreter = new JsonConditionInterpreter();

    /* loaded from: input_file:io/zeebe/broker/workflow/processor/WorkflowInstanceStreamProcessor$ActivityActivatedEventProcessor.class */
    /**
     * Processes an activated service task: fills a job record from the task definition and the
     * activity's workflow instance record, then writes it as a new job CREATE command.
     */
    private final class ActivityActivatedEventProcessor extends FlowElementEventProcessor<ServiceTask> {
        /** Reusable job record; reset at the start of every processing pass. */
        private final JobRecord jobCommand;

        private ActivityActivatedEventProcessor() {
            super();
            this.jobCommand = new JobRecord();
        }

        /**
         * Typed implementation behind the raw-typed {@code processFlowElementEvent} bridge at the
         * end of this class (the "2" suffix is a decompiler rename that avoids a signature clash).
         *
         * @param typedRecord the activity record being processed; its key becomes the activity instance key of the job
         * @param serviceTask the service task the record refers to
         */
        void processFlowElementEvent2(TypedRecord<WorkflowInstanceRecord> typedRecord, ServiceTask serviceTask) {
            TaskDefinition definition = serviceTask.getTaskDefinition();
            WorkflowInstanceRecord activityEvent = typedRecord.getValue();

            this.jobCommand.reset();
            this.jobCommand
                    .setType(definition.getTypeAsBuffer())
                    .setRetries(definition.getRetries())
                    .setPayload(activityEvent.getPayload())
                    .headers()
                    .setBpmnProcessId(activityEvent.getBpmnProcessId())
                    .setWorkflowDefinitionVersion(activityEvent.getVersion())
                    .setWorkflowKey(activityEvent.getWorkflowKey())
                    .setWorkflowInstanceKey(activityEvent.getWorkflowInstanceKey().longValue())
                    .setActivityId(serviceTask.getIdAsBuffer())
                    .setActivityInstanceKey(typedRecord.getKey());

            // Custom headers are only attached when the task actually declares some.
            TaskHeaders customHeaders = serviceTask.getTaskHeaders();
            if (!customHeaders.isEmpty()) {
                this.jobCommand.setCustomHeaders(customHeaders.asMsgpackEncoded());
            }
        }

        /** Writes the job record prepared during processing as a new CREATE command. */
        @Override // io.zeebe.broker.logstreams.processor.TypedRecordProcessor
        public long writeRecord(TypedRecord<WorkflowInstanceRecord> typedRecord, TypedStreamWriter typedStreamWriter) {
            return typedStreamWriter.writeNewCommand(JobIntent.CREATE, this.jobCommand);
        }

        /** Raw-typed bridge that forwards to {@link #processFlowElementEvent2}. */
        @Override // io.zeebe.broker.workflow.processor.WorkflowInstanceStreamProcessor.FlowElementEventProcessor
        /* bridge */ /* synthetic */ void processFlowElementEvent(TypedRecord typedRecord, ServiceTask serviceTask) {
            processFlowElementEvent2((TypedRecord<WorkflowInstanceRecord>) typedRecord, serviceTask);
        }
    }

    /* loaded from: input_file:io/zeebe/broker/workflow/processor/WorkflowInstanceStreamProcessor$ActivityCompletingEventProcessor.class */
    /**
     * Processes a completing service task: applies the task's output behavior and output mappings
     * to the record payload. On success an ACTIVITY_COMPLETED follow-up event is written; when the
     * mapping fails an incident is created (or a RESOLVE_FAILED event is written if the record
     * already carries an incident key).
     */
    private final class ActivityCompletingEventProcessor extends FlowElementEventProcessor<ServiceTask> {
        /** Reusable incident record; reset before each use. */
        private final IncidentRecord incidentCommand;
        /** Set when the output mapping of the current record failed. */
        private boolean hasIncident;
        /** Whether the current record's metadata carries an incident key. */
        private boolean isResolvingIncident;

        private ActivityCompletingEventProcessor() {
            super();
            this.incidentCommand = new IncidentRecord();
        }

        /**
         * Typed implementation behind the raw-typed {@code processFlowElementEvent} bridge at the
         * end of this class. Resets the per-record flags and applies the output behavior using the
         * payload cached for the workflow instance.
         */
        void processFlowElementEvent2(TypedRecord<WorkflowInstanceRecord> typedRecord, ServiceTask serviceTask) {
            this.hasIncident = false;
            this.isResolvingIncident = typedRecord.getMetadata().hasIncidentKey();

            WorkflowInstanceRecord activityEvent = typedRecord.getValue();
            DirectBuffer cachedInstancePayload = WorkflowInstanceStreamProcessor.this.payloadCache
                    .getPayload(activityEvent.getWorkflowInstanceKey().longValue());
            InputOutputMapping ioMapping = serviceTask.getInputOutputMapping();
            tryToExecuteOutputBehavior(typedRecord, activityEvent, cachedInstancePayload, ioMapping);
        }

        /**
         * Sets the resulting payload on {@code activityEvent} according to the output behavior.
         *
         * <ul>
         *   <li>NONE: the cached instance payload is used unchanged.
         *   <li>OVERWRITE: the record payload is merged into an empty payload.
         *   <li>otherwise: the record payload is merged into the cached instance payload.
         * </ul>
         *
         * A {@link MappingException} during the merge prepares an incident and sets {@link #hasIncident}.
         */
        private void tryToExecuteOutputBehavior(TypedRecord<WorkflowInstanceRecord> typedRecord, WorkflowInstanceRecord activityEvent, DirectBuffer instancePayload, InputOutputMapping ioMapping) {
            OutputBehavior behavior = ioMapping.getOutputBehavior();
            if (behavior == OutputBehavior.NONE) {
                activityEvent.setPayload(instancePayload);
                return;
            }

            DirectBuffer mergeTarget = behavior == OutputBehavior.OVERWRITE
                    ? WorkflowInstanceRecord.EMPTY_PAYLOAD
                    : instancePayload;
            Mapping[] outputMappings = ioMapping.getOutputMappings();
            try {
                activityEvent.setPayload(
                        WorkflowInstanceStreamProcessor.this.payloadMappingProcessor.getResultBuffer(),
                        0,
                        WorkflowInstanceStreamProcessor.this.payloadMappingProcessor.merge(activityEvent.getPayload(), mergeTarget, outputMappings));
            } catch (MappingException e) {
                createIncident(typedRecord, e.getMessage());
                this.hasIncident = true;
            }
        }

        /** Fills the reusable incident record with an IO mapping error for the given record. */
        private void createIncident(TypedRecord<WorkflowInstanceRecord> typedRecord, String errorMessage) {
            this.incidentCommand.reset();
            this.incidentCommand
                    .initFromWorkflowInstanceFailure(typedRecord)
                    .setErrorType(ErrorType.IO_MAPPING_ERROR)
                    .setErrorMessage(errorMessage);
        }

        /**
         * Writes ACTIVITY_COMPLETED when the mapping succeeded; otherwise writes either a
         * RESOLVE_FAILED follow-up event (record carries an incident key) or an incident CREATE command.
         */
        @Override // io.zeebe.broker.logstreams.processor.TypedRecordProcessor
        public long writeRecord(TypedRecord<WorkflowInstanceRecord> typedRecord, TypedStreamWriter typedStreamWriter) {
            if (!this.hasIncident) {
                return typedStreamWriter.writeFollowUpEvent(typedRecord.getKey(), WorkflowInstanceIntent.ACTIVITY_COMPLETED, typedRecord.getValue());
            }
            if (this.isResolvingIncident) {
                return typedStreamWriter.writeFollowUpEvent(typedRecord.getMetadata().getIncidentKey(), IncidentIntent.RESOLVE_FAILED, this.incidentCommand);
            }
            return typedStreamWriter.writeNewCommand(IncidentIntent.CREATE, this.incidentCommand);
        }

        /** On success, clears the instance's current activity instance key and drops the activity instance entry. */
        @Override // io.zeebe.broker.logstreams.processor.TypedRecordProcessor
        public void updateState(TypedRecord<WorkflowInstanceRecord> typedRecord) {
            if (!this.hasIncident) {
                WorkflowInstanceStreamProcessor.this.workflowInstanceIndex
                        .get(typedRecord.getValue().getWorkflowInstanceKey().longValue())
                        .setActivityInstanceKey(-1L)
                        .write();
                WorkflowInstanceStreamProcessor.this.activityInstanceMap.remove(typedRecord.getKey());
            }
        }

        /** Raw-typed bridge that forwards to {@link #processFlowElementEvent2}. */
        @Override // io.zeebe.broker.workflow.processor.WorkflowInstanceStreamProcessor.FlowElementEventProcessor
        /* bridge */ /* synthetic */ void processFlowElementEvent(TypedRecord typedRecord, ServiceTask serviceTask) {
            processFlowElementEvent2((TypedRecord<WorkflowInstanceRecord>) typedRecord, serviceTask);
        }
    }

    /* loaded from: input_file:io/zeebe/broker/workflow/processor/WorkflowInstanceStreamProcessor$ActivityReadyEventProcessor.class */
    /**
     * Processes a ready service task: applies the task's input mappings to the record payload.
     * On success an ACTIVITY_ACTIVATED follow-up event is written; when the mapping fails an
     * incident is created (or a RESOLVE_FAILED event is written if the record already carries an
     * incident key).
     */
    private final class ActivityReadyEventProcessor extends FlowElementEventProcessor<ServiceTask> {
        /** Reusable incident record; reset before each use. */
        private final IncidentRecord incidentCommand;
        /** Set when the input mapping of the current record failed. */
        private boolean createsIncident;
        /** Whether the current record's metadata carries an incident key. */
        private boolean isResolvingIncident;
        /** View on the record payload as it was before input mappings were applied. */
        private UnsafeBuffer wfInstancePayload;

        private ActivityReadyEventProcessor() {
            super();
            this.incidentCommand = new IncidentRecord();
            this.wfInstancePayload = new UnsafeBuffer(0L, 0);
        }

        /**
         * Typed implementation behind the raw-typed {@code processFlowElementEvent} bridge at the
         * end of this class. Wraps the incoming payload, then replaces the record payload with the
         * result of the input mappings, if there are any.
         */
        void processFlowElementEvent2(TypedRecord<WorkflowInstanceRecord> typedRecord, ServiceTask serviceTask) {
            this.createsIncident = false;
            this.isResolvingIncident = typedRecord.getMetadata().hasIncidentKey();

            WorkflowInstanceRecord activityEvent = typedRecord.getValue();
            this.wfInstancePayload.wrap(activityEvent.getPayload());

            Mapping[] inputMappings = serviceTask.getInputOutputMapping().getInputMappings();
            if (inputMappings.length == 0) {
                // Nothing to extract: the record payload stays as it is.
                return;
            }
            try {
                activityEvent.setPayload(
                        WorkflowInstanceStreamProcessor.this.payloadMappingProcessor.getResultBuffer(),
                        0,
                        WorkflowInstanceStreamProcessor.this.payloadMappingProcessor.extract(activityEvent.getPayload(), inputMappings));
            } catch (MappingException e) {
                this.incidentCommand.reset();
                this.incidentCommand
                        .initFromWorkflowInstanceFailure(typedRecord)
                        .setErrorType(ErrorType.IO_MAPPING_ERROR)
                        .setErrorMessage(e.getMessage());
                this.createsIncident = true;
            }
        }

        /**
         * Writes ACTIVITY_ACTIVATED when the mapping succeeded; otherwise writes either a
         * RESOLVE_FAILED follow-up event (record carries an incident key) or an incident CREATE command.
         */
        @Override // io.zeebe.broker.logstreams.processor.TypedRecordProcessor
        public long writeRecord(TypedRecord<WorkflowInstanceRecord> typedRecord, TypedStreamWriter typedStreamWriter) {
            if (!this.createsIncident) {
                return typedStreamWriter.writeFollowUpEvent(typedRecord.getKey(), WorkflowInstanceIntent.ACTIVITY_ACTIVATED, typedRecord.getValue());
            }
            if (this.isResolvingIncident) {
                return typedStreamWriter.writeFollowUpEvent(typedRecord.getMetadata().getIncidentKey(), IncidentIntent.RESOLVE_FAILED, this.incidentCommand);
            }
            return typedStreamWriter.writeNewCommand(IncidentIntent.CREATE, this.incidentCommand);
        }

        /**
         * Registers the activity instance (always) and, unless an incident was raised, caches the
         * pre-mapping payload for the workflow instance.
         */
        @Override // io.zeebe.broker.logstreams.processor.TypedRecordProcessor
        public void updateState(TypedRecord<WorkflowInstanceRecord> typedRecord) {
            WorkflowInstanceRecord activityEvent = typedRecord.getValue();

            WorkflowInstanceStreamProcessor.this.workflowInstanceIndex
                    .get(activityEvent.getWorkflowInstanceKey().longValue())
                    .setActivityInstanceKey(typedRecord.getKey())
                    .write();
            WorkflowInstanceStreamProcessor.this.activityInstanceMap
                    .newActivityInstance(typedRecord.getKey())
                    .setActivityId(activityEvent.getActivityId())
                    .setJobKey(-1L)
                    .write();

            if (!this.createsIncident) {
                WorkflowInstanceStreamProcessor.this.payloadCache.addPayload(
                        activityEvent.getWorkflowInstanceKey().longValue(),
                        typedRecord.getPosition(),
                        this.wfInstancePayload);
            }
        }

        /** Raw-typed bridge that forwards to {@link #processFlowElementEvent2}. */
        @Override // io.zeebe.broker.workflow.processor.WorkflowInstanceStreamProcessor.FlowElementEventProcessor
        /* bridge */ /* synthetic */ void processFlowElementEvent(TypedRecord typedRecord, ServiceTask serviceTask) {
            processFlowElementEvent2((TypedRecord<WorkflowInstanceRecord>) typedRecord, serviceTask);
        }
    }

    /* loaded from: input_file:io/zeebe/broker/workflow/processor/WorkflowInstanceStreamProcessor$BpmnAspectEventProcessor.class */
    /**
     * Dispatches a flow element event to the handler registered for the element's
     * {@link BpmnAspect}. The handler chosen while processing a record is remembered in
     * {@link #delegate} and receives the subsequent side-effect, write and state-update calls
     * for that record.
     */
    private final class BpmnAspectEventProcessor extends FlowElementEventProcessor<FlowElement> {
        /** Handler selected by the most recent {@link #processFlowElementEvent} call. */
        private FlowElementEventProcessor delegate;
        /** Handlers by aspect; aspects without an entry have no handler. */
        protected final Map<BpmnAspect, FlowElementEventProcessor> aspectHandlers;

        private BpmnAspectEventProcessor() {
            super();
            // Diamond instead of the raw EnumMap constructor avoids an unchecked conversion.
            this.aspectHandlers = new EnumMap<>(BpmnAspect.class);
            this.aspectHandlers.put(BpmnAspect.TAKE_SEQUENCE_FLOW, new TakeSequenceFlowAspectHandler());
            this.aspectHandlers.put(BpmnAspect.CONSUME_TOKEN, new ConsumeTokenAspectHandler());
            this.aspectHandlers.put(BpmnAspect.EXCLUSIVE_SPLIT, new ExclusiveSplitAspectHandler());
        }

        /**
         * Selects the handler for the element's aspect and lets it process the record.
         * NOTE(review): an aspect without a registered handler leaves {@code delegate} null and
         * fails with a NullPointerException here - confirm callers only route supported aspects.
         */
        @Override // io.zeebe.broker.workflow.processor.WorkflowInstanceStreamProcessor.FlowElementEventProcessor
        void processFlowElementEvent(TypedRecord<WorkflowInstanceRecord> typedRecord, FlowElement flowElement) {
            this.delegate = this.aspectHandlers.get(flowElement.getBpmnAspect());
            this.delegate.processFlowElementEvent(typedRecord, flowElement);
        }

        /** Forwards to the handler selected during processing. */
        @Override // io.zeebe.broker.logstreams.processor.TypedRecordProcessor
        public boolean executeSideEffects(TypedRecord<WorkflowInstanceRecord> typedRecord, TypedResponseWriter typedResponseWriter) {
            return this.delegate.executeSideEffects(typedRecord, typedResponseWriter);
        }

        /** Forwards to the handler selected during processing. */
        @Override // io.zeebe.broker.logstreams.processor.TypedRecordProcessor
        public long writeRecord(TypedRecord<WorkflowInstanceRecord> typedRecord, TypedStreamWriter typedStreamWriter) {
            return this.delegate.writeRecord(typedRecord, typedStreamWriter);
        }

        /** Forwards to the handler selected during processing. */
        @Override // io.zeebe.broker.logstreams.processor.TypedRecordProcessor
        public void updateState(TypedRecord<WorkflowInstanceRecord> typedRecord) {
            this.delegate.updateState(typedRecord);
        }
    }

    /* loaded from: input_file:io/zeebe/broker/workflow/processor/WorkflowInstanceStreamProcessor$CancelWorkflowInstanceProcessor.class */
    /**
     * Handles a cancel command for a workflow instance. The command is rejected when the instance
     * is not in the index or has no tokens. Otherwise a batch is written containing an optional
     * job CANCEL command, an optional ACTIVITY_TERMINATED event and the CANCELED event, and the
     * instance's state is removed.
     */
    private final class CancelWorkflowInstanceProcessor implements TypedRecordProcessor<WorkflowInstanceRecord> {
        /** Reusable record for the ACTIVITY_TERMINATED event. */
        private final WorkflowInstanceRecord activityInstanceEvent;
        /** Reusable record for the job CANCEL command. */
        private final JobRecord jobRecord;
        private boolean isCanceled;
        private RejectionType rejectionType;
        private String rejectionReason;
        private long activityInstanceKey;
        private long jobKey;
        /** Reads the workflow instance record back from the stream; built on open, closed on close. */
        private TypedStreamReader reader;
        /** The record read from the position stored in the instance index. */
        private TypedRecord<WorkflowInstanceRecord> workflowInstanceEvent;

        private CancelWorkflowInstanceProcessor() {
            this.activityInstanceEvent = new WorkflowInstanceRecord();
            this.jobRecord = new JobRecord();
        }

        @Override // io.zeebe.broker.logstreams.processor.StreamProcessorLifecycleAware
        public void onOpen(TypedStreamProcessor typedStreamProcessor) {
            this.reader = typedStreamProcessor.getEnvironment().buildStreamReader();
        }

        @Override // io.zeebe.broker.logstreams.processor.StreamProcessorLifecycleAware
        public void onClose() {
            this.reader.close();
        }

        /**
         * Decides whether the instance can be canceled and, if so, loads its record (with the
         * payload cleared) together with the current activity instance key and job key.
         */
        @Override // io.zeebe.broker.logstreams.processor.TypedRecordProcessor
        public void processRecord(TypedRecord<WorkflowInstanceRecord> typedRecord) {
            WorkflowInstanceIndex.WorkflowInstance instance = WorkflowInstanceStreamProcessor.this.workflowInstanceIndex.get(typedRecord.getKey());
            this.isCanceled = instance != null && instance.getTokenCount() > 0;

            if (this.isCanceled) {
                this.workflowInstanceEvent = this.reader.readValue(instance.getPosition(), WorkflowInstanceRecord.class);
                this.workflowInstanceEvent.getValue().setPayload(WorkflowInstanceRecord.EMPTY_PAYLOAD);
                this.activityInstanceKey = instance.getActivityInstanceKey();
                this.jobKey = WorkflowInstanceStreamProcessor.this.activityInstanceMap
                        .wrapActivityInstanceKey(this.activityInstanceKey)
                        .getJobKey();
            } else {
                this.rejectionType = RejectionType.NOT_APPLICABLE;
                this.rejectionReason = "Workflow instance is not running";
            }
        }

        /** Writes the rejection, or the cancellation batch described on the class. */
        @Override // io.zeebe.broker.logstreams.processor.TypedRecordProcessor
        public long writeRecord(TypedRecord<WorkflowInstanceRecord> typedRecord, TypedStreamWriter typedStreamWriter) {
            if (!this.isCanceled) {
                return typedStreamWriter.writeRejection(typedRecord, this.rejectionType, this.rejectionReason);
            }

            // Point the activity instance map at the current activity so getActivityId() below reads its entry.
            WorkflowInstanceStreamProcessor.this.activityInstanceMap.wrapActivityInstanceKey(this.activityInstanceKey);
            WorkflowInstanceRecord instanceRecord = this.workflowInstanceEvent.getValue();
            TypedBatchWriter batch = typedStreamWriter.newBatch();

            if (this.jobKey > 0) {
                this.jobRecord.reset();
                this.jobRecord
                        .setType(WorkflowInstanceStreamProcessor.EMPTY_JOB_TYPE)
                        .headers()
                        .setBpmnProcessId(instanceRecord.getBpmnProcessId())
                        .setWorkflowDefinitionVersion(instanceRecord.getVersion())
                        .setWorkflowInstanceKey(typedRecord.getKey())
                        .setActivityId(WorkflowInstanceStreamProcessor.this.activityInstanceMap.getActivityId())
                        .setActivityInstanceKey(this.activityInstanceKey);
                batch.addFollowUpCommand(this.jobKey, JobIntent.CANCEL, this.jobRecord);
            }

            if (this.activityInstanceKey > 0) {
                this.activityInstanceEvent.reset();
                this.activityInstanceEvent
                        .setBpmnProcessId(instanceRecord.getBpmnProcessId())
                        .setVersion(instanceRecord.getVersion())
                        .setWorkflowInstanceKey(typedRecord.getKey())
                        .setActivityId(WorkflowInstanceStreamProcessor.this.activityInstanceMap.getActivityId());
                batch.addFollowUpEvent(this.activityInstanceKey, WorkflowInstanceIntent.ACTIVITY_TERMINATED, this.activityInstanceEvent);
            }

            batch.addFollowUpEvent(typedRecord.getKey(), WorkflowInstanceIntent.CANCELED, instanceRecord);
            return batch.write();
        }

        /** Responds with CANCELED on success, otherwise with the prepared rejection. */
        @Override // io.zeebe.broker.logstreams.processor.TypedRecordProcessor
        public boolean executeSideEffects(TypedRecord<WorkflowInstanceRecord> typedRecord, TypedResponseWriter typedResponseWriter) {
            if (this.isCanceled) {
                return typedResponseWriter.writeRecord(WorkflowInstanceIntent.CANCELED, typedRecord);
            }
            return typedResponseWriter.writeRejection((TypedRecord<?>) typedRecord, this.rejectionType, this.rejectionReason);
        }

        /** Removes index entry, cached payload and activity instance entry of a canceled instance. */
        @Override // io.zeebe.broker.logstreams.processor.TypedRecordProcessor
        public void updateState(TypedRecord<WorkflowInstanceRecord> typedRecord) {
            if (!this.isCanceled) {
                return;
            }
            WorkflowInstanceStreamProcessor.this.workflowInstanceIndex.remove(typedRecord.getKey());
            WorkflowInstanceStreamProcessor.this.payloadCache.remove(typedRecord.getKey());
            WorkflowInstanceStreamProcessor.this.activityInstanceMap.remove(this.activityInstanceKey);
        }
    }

    /* loaded from: input_file:io/zeebe/broker/workflow/processor/WorkflowInstanceStreamProcessor$ConsumeTokenAspectHandler.class */
    /**
     * Handles elements with the consume-token aspect. When the workflow instance has exactly one
     * token, the instance is completed: a COMPLETED follow-up event is written and the instance's
     * index entry and cached payload are removed. Otherwise nothing is written.
     */
    private final class ConsumeTokenAspectHandler extends FlowElementEventProcessor<FlowElement> {
        /** True when the record being processed completes the workflow instance. */
        private boolean isCompleted;
        /** Token count read from the index; 0 when the instance is not in the index. */
        private int activeTokenCount;

        private ConsumeTokenAspectHandler() {
            super();
        }

        /** Reads the token count and, on completion, clears the activity id of the record. */
        @Override // io.zeebe.broker.workflow.processor.WorkflowInstanceStreamProcessor.FlowElementEventProcessor
        void processFlowElementEvent(TypedRecord<WorkflowInstanceRecord> typedRecord, FlowElement flowElement) {
            WorkflowInstanceRecord instanceEvent = typedRecord.getValue();
            WorkflowInstanceIndex.WorkflowInstance instance = WorkflowInstanceStreamProcessor.this.workflowInstanceIndex
                    .get(instanceEvent.getWorkflowInstanceKey().longValue());

            if (instance != null) {
                this.activeTokenCount = instance.getTokenCount();
            } else {
                this.activeTokenCount = 0;
            }

            this.isCompleted = this.activeTokenCount == 1;
            if (this.isCompleted) {
                instanceEvent.setActivityId("");
            }
        }

        /** Writes COMPLETED keyed by the workflow instance key, or returns 0 when nothing is written. */
        @Override // io.zeebe.broker.logstreams.processor.TypedRecordProcessor
        public long writeRecord(TypedRecord<WorkflowInstanceRecord> typedRecord, TypedStreamWriter typedStreamWriter) {
            if (!this.isCompleted) {
                return 0L;
            }
            return typedStreamWriter.writeFollowUpEvent(
                    typedRecord.getValue().getWorkflowInstanceKey().longValue(),
                    WorkflowInstanceIntent.COMPLETED,
                    typedRecord.getValue());
        }

        /** Drops the state of a completed workflow instance. */
        @Override // io.zeebe.broker.logstreams.processor.TypedRecordProcessor
        public void updateState(TypedRecord<WorkflowInstanceRecord> typedRecord) {
            if (!this.isCompleted) {
                return;
            }
            long workflowInstanceKey = typedRecord.getValue().getWorkflowInstanceKey().longValue();
            WorkflowInstanceStreamProcessor.this.workflowInstanceIndex.remove(workflowInstanceKey);
            WorkflowInstanceStreamProcessor.this.payloadCache.remove(workflowInstanceKey);
        }
    }

    /* loaded from: input_file:io/zeebe/broker/workflow/processor/WorkflowInstanceStreamProcessor$CreateWorkflowInstanceEventProcessor.class */
    /**
     * Handles a create command for a workflow instance. The workflow definition is resolved from
     * the workflow cache, or fetched asynchronously when it is not cached. An accepted command
     * produces a batch with the CREATED event and a START_EVENT_OCCURRED event; otherwise a
     * rejection is written. Request id and request stream id of the command are copied onto the
     * written CREATED event or rejection.
     */
    private final class CreateWorkflowInstanceEventProcessor implements TypedRecordProcessor<WorkflowInstanceRecord> {
        private boolean accepted;
        /** Reusable record for the START_EVENT_OCCURRED event. */
        private final WorkflowInstanceRecord startEventRecord;
        private RejectionType rejectionType;
        private String rejectionReason;
        private long requestId;
        private int requestStreamId;

        private CreateWorkflowInstanceEventProcessor() {
            this.startEventRecord = new WorkflowInstanceRecord();
        }

        /**
         * Remembers the request metadata, uses the record key as workflow instance key and
         * resolves the workflow definition (possibly asynchronously via the lifecycle context).
         */
        @Override // io.zeebe.broker.logstreams.processor.TypedRecordProcessor
        public void processRecord(TypedRecord<WorkflowInstanceRecord> typedRecord, EventLifecycleContext eventLifecycleContext) {
            WorkflowInstanceRecord command = typedRecord.getValue();
            this.requestId = typedRecord.getMetadata().getRequestId();
            this.requestStreamId = typedRecord.getMetadata().getRequestStreamId();
            command.setWorkflowInstanceKey(typedRecord.getKey());

            // Accepted unless the asynchronous fetch below decides otherwise.
            this.accepted = true;
            resolveWorkflowDefinition(command, eventLifecycleContext);
        }

        /** Copies the remembered request id and stream id onto the metadata of a written record. */
        private void addRequestMetadata(RecordMetadata recordMetadata) {
            recordMetadata.requestId(this.requestId).requestStreamId(this.requestStreamId);
        }

        /**
         * Completes key, version and process id of the record from the workflow cache. Lookup is
         * by workflow key if it is positive, else by process id and version if the version is
         * positive, else by latest version of the process id. On a cache miss the workflow is
         * fetched and processing continues asynchronously once the fetch has completed.
         */
        private void resolveWorkflowDefinition(WorkflowInstanceRecord workflowInstanceRecord, EventLifecycleContext eventLifecycleContext) {
            long workflowKey = workflowInstanceRecord.getWorkflowKey();
            DirectBuffer bpmnProcessId = workflowInstanceRecord.getBpmnProcessId();
            int version = workflowInstanceRecord.getVersion();

            ActorFuture<ClientResponse> pendingFetch = null;
            if (workflowKey > 0) {
                DeployedWorkflow byKey = WorkflowInstanceStreamProcessor.this.workflowCache.getWorkflowByKey(workflowKey);
                if (byKey == null) {
                    pendingFetch = WorkflowInstanceStreamProcessor.this.workflowCache.fetchWorkflowByKey(workflowKey);
                } else {
                    workflowInstanceRecord
                            .setVersion(byKey.getVersion())
                            .setBpmnProcessId(byKey.getWorkflow().getBpmnProcessId());
                    this.accepted = true;
                }
            } else if (version > 0) {
                DeployedWorkflow byIdAndVersion = WorkflowInstanceStreamProcessor.this.workflowCache.getWorkflowByProcessIdAndVersion(bpmnProcessId, version);
                if (byIdAndVersion == null) {
                    pendingFetch = WorkflowInstanceStreamProcessor.this.workflowCache.fetchWorkflowByBpmnProcessIdAndVersion(bpmnProcessId, version);
                } else {
                    workflowInstanceRecord.setWorkflowKey(byIdAndVersion.getKey());
                    this.accepted = true;
                }
            } else {
                DeployedWorkflow latest = WorkflowInstanceStreamProcessor.this.workflowCache.getLatestWorkflowVersionByProcessId(bpmnProcessId);
                // NOTE(review): version -2 always bypasses the cached latest version - presumably a
                // "force refresh" sentinel; confirm against the protocol constants.
                if (latest != null && version != -2) {
                    workflowInstanceRecord
                            .setWorkflowKey(latest.getKey())
                            .setVersion(latest.getVersion());
                    this.accepted = true;
                } else {
                    pendingFetch = WorkflowInstanceStreamProcessor.this.workflowCache.fetchLatestWorkflowByBpmnProcessId(bpmnProcessId);
                }
            }

            if (pendingFetch == null) {
                return;
            }

            // Cache miss: register a future with the lifecycle context and complete it once the
            // fetch result has been evaluated.
            CompletableActorFuture completableActorFuture = new CompletableActorFuture();
            eventLifecycleContext.async(completableActorFuture);
            WorkflowInstanceStreamProcessor.this.actor.runOnCompletion(pendingFetch, (clientResponse, th) -> {
                if (th == null) {
                    DeployedWorkflow fetched = WorkflowInstanceStreamProcessor.this.workflowCache.addWorkflow(clientResponse.getResponseBuffer());
                    if (fetched == null) {
                        this.accepted = false;
                        this.rejectionType = RejectionType.BAD_VALUE;
                        this.rejectionReason = "Workflow is not deployed";
                    } else {
                        workflowInstanceRecord
                                .setBpmnProcessId(fetched.getWorkflow().getBpmnProcessId())
                                .setWorkflowKey(fetched.getKey())
                                .setVersion(fetched.getVersion());
                        this.accepted = true;
                    }
                } else {
                    this.accepted = false;
                    this.rejectionType = RejectionType.PROCESSING_ERROR;
                    this.rejectionReason = "Could not fetch workflow: " + th.getMessage();
                }
                completableActorFuture.complete((Object) null);
            });
        }

        /** Writes the CREATED + START_EVENT_OCCURRED batch when accepted, otherwise the rejection. */
        @Override // io.zeebe.broker.logstreams.processor.TypedRecordProcessor
        public long writeRecord(TypedRecord<WorkflowInstanceRecord> typedRecord, TypedStreamWriter typedStreamWriter) {
            if (this.accepted) {
                TypedBatchWriter batch = typedStreamWriter.newBatch();
                batch.addFollowUpEvent(typedRecord.getKey(), WorkflowInstanceIntent.CREATED, typedRecord.getValue(), this::addRequestMetadata);
                addStartEventOccured(batch, typedRecord.getValue());
                return batch.write();
            }
            return typedStreamWriter.writeRejection(typedRecord, this.rejectionType, this.rejectionReason, this::addRequestMetadata);
        }

        /**
         * Adds a START_EVENT_OCCURRED event for the workflow's initial start event to the batch,
         * copying process id, payload, version and keys from the create command.
         */
        private void addStartEventOccured(TypedBatchWriter typedBatchWriter, WorkflowInstanceRecord workflowInstanceRecord) {
            this.startEventRecord
                    .setActivityId(WorkflowInstanceStreamProcessor.this.workflowCache.getWorkflowByKey(workflowInstanceRecord.getWorkflowKey()).getWorkflow().getInitialStartEvent().getIdAsBuffer())
                    .setBpmnProcessId(workflowInstanceRecord.getBpmnProcessId())
                    .setPayload(workflowInstanceRecord.getPayload())
                    .setVersion(workflowInstanceRecord.getVersion())
                    .setWorkflowInstanceKey(workflowInstanceRecord.getWorkflowInstanceKey().longValue())
                    .setWorkflowKey(workflowInstanceRecord.getWorkflowKey());
            typedBatchWriter.addNewEvent(WorkflowInstanceIntent.START_EVENT_OCCURRED, this.startEventRecord);
        }
    }

    /* loaded from: input_file:io/zeebe/broker/workflow/processor/WorkflowInstanceStreamProcessor$ExclusiveSplitAspectHandler.class */
    /**
     * Handles exclusive gateways: picks the first outgoing sequence flow whose condition evaluates
     * to true against the record payload, falling back to the gateway's default flow. When no flow
     * can be taken, or a condition cannot be evaluated, a condition-error incident is created (or
     * a RESOLVE_FAILED event is written if the record already carries an incident key).
     */
    private final class ExclusiveSplitAspectHandler extends FlowElementEventProcessor<ExclusiveGateway> {
        /** Set when no sequence flow could be chosen for the current record. */
        private boolean createsIncident;
        /** Whether the current record's metadata carries an incident key. */
        private boolean isResolvingIncident;
        /** Reusable incident record; reset before each use. */
        private final IncidentRecord incidentCommand;

        private ExclusiveSplitAspectHandler() {
            super();
            this.incidentCommand = new IncidentRecord();
        }

        /**
         * Typed implementation behind the raw-typed {@code processFlowElementEvent} bridge at the
         * end of this class. On success the record's activity id is set to the chosen flow's id.
         */
        void processFlowElementEvent2(TypedRecord<WorkflowInstanceRecord> typedRecord, ExclusiveGateway exclusiveGateway) {
            try {
                this.isResolvingIncident = typedRecord.getMetadata().hasIncidentKey();
                WorkflowInstanceRecord gatewayEvent = typedRecord.getValue();
                SequenceFlow chosenFlow = getSequenceFlowWithFulfilledCondition(exclusiveGateway, gatewayEvent.getPayload());
                if (chosenFlow == null) {
                    prepareConditionIncident(typedRecord, "All conditions evaluated to false and no default flow is set.");
                } else {
                    gatewayEvent.setActivityId(chosenFlow.getIdAsBuffer());
                    this.createsIncident = false;
                }
            } catch (JsonConditionException e) {
                prepareConditionIncident(typedRecord, e.getMessage());
            }
        }

        /** Fills the reusable incident record with a condition error and flags the incident. */
        private void prepareConditionIncident(TypedRecord<WorkflowInstanceRecord> typedRecord, String errorMessage) {
            this.incidentCommand.reset();
            this.incidentCommand
                    .initFromWorkflowInstanceFailure(typedRecord)
                    .setErrorType(ErrorType.CONDITION_ERROR)
                    .setErrorMessage(errorMessage);
            this.createsIncident = true;
        }

        /**
         * Returns the first conditional outgoing flow whose condition holds for the payload, or
         * the gateway's default flow (which may be null) when none does.
         */
        private SequenceFlow getSequenceFlowWithFulfilledCondition(ExclusiveGateway exclusiveGateway, DirectBuffer payload) {
            List conditionalFlows = exclusiveGateway.getOutgoingSequenceFlowsWithConditions();
            // Index-based loop kept on purpose: same iteration mechanics as before, no iterator.
            for (int index = 0; index < conditionalFlows.size(); index++) {
                SequenceFlow candidate = (SequenceFlow) conditionalFlows.get(index);
                boolean fulfilled = WorkflowInstanceStreamProcessor.this.conditionInterpreter.eval(candidate.getCondition().getCondition(), payload);
                if (fulfilled) {
                    return candidate;
                }
            }
            return exclusiveGateway.getDefaultFlow();
        }

        /**
         * Writes SEQUENCE_FLOW_TAKEN when a flow was chosen; otherwise writes either a
         * RESOLVE_FAILED follow-up event (record carries an incident key) or an incident CREATE command.
         */
        @Override // io.zeebe.broker.logstreams.processor.TypedRecordProcessor
        public long writeRecord(TypedRecord<WorkflowInstanceRecord> typedRecord, TypedStreamWriter typedStreamWriter) {
            if (!this.createsIncident) {
                return typedStreamWriter.writeNewEvent(WorkflowInstanceIntent.SEQUENCE_FLOW_TAKEN, typedRecord.getValue());
            }
            if (this.isResolvingIncident) {
                return typedStreamWriter.writeFollowUpEvent(typedRecord.getMetadata().getIncidentKey(), IncidentIntent.RESOLVE_FAILED, this.incidentCommand);
            }
            return typedStreamWriter.writeNewCommand(IncidentIntent.CREATE, this.incidentCommand);
        }

        /** Raw-typed bridge that forwards to {@link #processFlowElementEvent2}. */
        @Override // io.zeebe.broker.workflow.processor.WorkflowInstanceStreamProcessor.FlowElementEventProcessor
        /* bridge */ /* synthetic */ void processFlowElementEvent(TypedRecord typedRecord, ExclusiveGateway exclusiveGateway) {
            processFlowElementEvent2((TypedRecord<WorkflowInstanceRecord>) typedRecord, exclusiveGateway);
        }
    }

    /* loaded from: input_file:io/zeebe/broker/workflow/processor/WorkflowInstanceStreamProcessor$FlowElementEventProcessor.class */
    /**
     * Base class for processors that operate on the flow element a workflow instance record points
     * to. It looks up the deployed workflow for the record (from the cache, or by fetching it) and
     * then hands the record together with the resolved element to {@code processFlowElementEvent}.
     *
     * @param <T> the flow element type the concrete processor expects
     */
    private abstract class FlowElementEventProcessor<T extends FlowElement> implements TypedRecordProcessor<WorkflowInstanceRecord> {
        /** Record currently being processed; retained so a fetch callback can continue with it. */
        private TypedRecord<WorkflowInstanceRecord> event;

        private FlowElementEventProcessor() {
        }

        /**
         * Remembers the record and resolves its flow element, either immediately from the workflow
         * cache or via {@code fetchWorkflow} when the workflow is not cached.
         */
        @Override // io.zeebe.broker.logstreams.processor.TypedRecordProcessor
        public void processRecord(TypedRecord<WorkflowInstanceRecord> typedRecord, EventLifecycleContext eventLifecycleContext) {
            this.event = typedRecord;
            long workflowKey = this.event.getValue().getWorkflowKey();
            DeployedWorkflow cachedWorkflow = WorkflowInstanceStreamProcessor.this.workflowCache.getWorkflowByKey(workflowKey);
            if (cachedWorkflow != null) {
                resolveCurrentFlowNode(cachedWorkflow);
            } else {
                WorkflowInstanceStreamProcessor.this.fetchWorkflow(workflowKey, this::resolveCurrentFlowNode, eventLifecycleContext);
            }
        }

        /**
         * Finds the element identified by the record's activity id in the given workflow and passes
         * it on to the concrete processor.
         */
        private void resolveCurrentFlowNode(DeployedWorkflow deployedWorkflow) {
            FlowElement element = deployedWorkflow.getWorkflow().findFlowElementById(this.event.getValue().getActivityId());
            // The lookup is not tied to T, so the narrowing has to be spelled out. It cannot be
            // checked at runtime; a mismatching element surfaces as a ClassCastException in the subclass.
            @SuppressWarnings("unchecked")
            T typedElement = (T) element;
            processFlowElementEvent(this.event, typedElement);
        }

        /**
         * Processes the record against its resolved flow element.
         *
         * @param typedRecord the record being processed
         * @param t the flow element found for the record's activity id
         */
        abstract void processFlowElementEvent(TypedRecord<WorkflowInstanceRecord> typedRecord, T t);
    }

    /* loaded from: input_file:io/zeebe/broker/workflow/processor/WorkflowInstanceStreamProcessor$JobCompletedEventProcessor.class */
    /**
     * Processes job records registered for {@code JobIntent.COMPLETED}. When the job carries a
     * positive workflow instance key and is still the job recorded on its activity instance, an
     * {@code ACTIVITY_COMPLETING} follow-up event is written and the job key on the activity
     * instance is set to {@code -1}.
     */
    private final class JobCompletedEventProcessor implements TypedRecordProcessor<JobRecord> {
        /** Reused event instance that is filled from the job headers for every completed activity. */
        private final WorkflowInstanceRecord workflowInstanceEvent = new WorkflowInstanceRecord();

        /** Whether the record handled last leads to an activity completion. */
        private boolean activityCompleted;

        /** Activity instance key taken from the headers of the record handled last. */
        private long activityInstanceKey;

        private JobCompletedEventProcessor() {
        }

        /**
         * Decides whether the completed job completes an activity and, if so, copies the workflow
         * related job headers and the job payload into the reusable workflow instance event.
         */
        @Override // io.zeebe.broker.logstreams.processor.TypedRecordProcessor
        public void processRecord(TypedRecord<JobRecord> typedRecord) {
            this.activityCompleted = false;
            JobRecord job = typedRecord.getValue();
            JobHeaders jobHeaders = job.headers();
            this.activityInstanceKey = jobHeaders.getActivityInstanceKey();

            boolean completesActivity = jobHeaders.getWorkflowInstanceKey() > 0 && isJobOpen(typedRecord.getKey(), this.activityInstanceKey);
            if (completesActivity) {
                this.workflowInstanceEvent
                    .setBpmnProcessId(jobHeaders.getBpmnProcessId())
                    .setVersion(jobHeaders.getWorkflowDefinitionVersion())
                    .setWorkflowKey(jobHeaders.getWorkflowKey())
                    .setWorkflowInstanceKey(jobHeaders.getWorkflowInstanceKey())
                    .setActivityId(jobHeaders.getActivityId())
                    .setPayload(job.getPayload());
                this.activityCompleted = true;
            }
        }

        /** Returns whether the activity instance with the given key currently records the given job key. */
        private boolean isJobOpen(long jobKey, long instanceKey) {
            long recordedJobKey = WorkflowInstanceStreamProcessor.this.activityInstanceMap.wrapActivityInstanceKey(instanceKey).getJobKey();
            return recordedJobKey == jobKey;
        }

        /**
         * Writes the {@code ACTIVITY_COMPLETING} follow-up event for the activity instance, or
         * returns {@code 0} without writing when the job did not complete an activity.
         */
        @Override // io.zeebe.broker.logstreams.processor.TypedRecordProcessor
        public long writeRecord(TypedRecord<JobRecord> typedRecord, TypedStreamWriter typedStreamWriter) {
            if (!this.activityCompleted) {
                return 0L;
            }
            return typedStreamWriter.writeFollowUpEvent(this.activityInstanceKey, WorkflowInstanceIntent.ACTIVITY_COMPLETING, this.workflowInstanceEvent);
        }

        /** Sets the job key recorded on the activity instance to {@code -1} once the activity completes. */
        @Override // io.zeebe.broker.logstreams.processor.TypedRecordProcessor
        public void updateState(TypedRecord<JobRecord> typedRecord) {
            if (!this.activityCompleted) {
                return;
            }
            WorkflowInstanceStreamProcessor.this.activityInstanceMap
                .wrapActivityInstanceKey(this.activityInstanceKey)
                .setJobKey(-1L)
                .write();
        }
    }

    /* loaded from: input_file:io/zeebe/broker/workflow/processor/WorkflowInstanceStreamProcessor$JobCreatedProcessor.class */
    /**
     * Processes job records registered for {@code JobIntent.CREATED}. If the job's activity
     * instance key matches the one stored for its workflow instance, the job key is recorded on
     * that activity instance.
     */
    private final class JobCreatedProcessor implements TypedRecordProcessor<JobRecord> {
        /** Whether the record handled last belongs to the activity instance stored for its workflow instance. */
        private boolean isActive;

        private JobCreatedProcessor() {
        }

        /**
         * Determines whether the job's activity instance key is positive and equal to the activity
         * instance key stored in the workflow instance index entry.
         */
        @Override // io.zeebe.broker.logstreams.processor.TypedRecordProcessor
        public void processRecord(TypedRecord<JobRecord> typedRecord) {
            this.isActive = false;
            JobHeaders jobHeaders = typedRecord.getValue().headers();
            long jobActivityInstanceKey = jobHeaders.getActivityInstanceKey();
            if (jobActivityInstanceKey <= 0) {
                return;
            }
            WorkflowInstanceIndex.WorkflowInstance indexed = WorkflowInstanceStreamProcessor.this.workflowInstanceIndex.get(jobHeaders.getWorkflowInstanceKey());
            if (indexed == null) {
                this.isActive = false;
            } else {
                this.isActive = jobActivityInstanceKey == indexed.getActivityInstanceKey();
            }
        }

        /** Stores the record key as job key on the job's activity instance when the job is active. */
        @Override // io.zeebe.broker.logstreams.processor.TypedRecordProcessor
        public void updateState(TypedRecord<JobRecord> typedRecord) {
            if (!this.isActive) {
                return;
            }
            long jobActivityInstanceKey = typedRecord.getValue().headers().getActivityInstanceKey();
            WorkflowInstanceStreamProcessor.this.activityInstanceMap
                .wrapActivityInstanceKey(jobActivityInstanceKey)
                .setJobKey(typedRecord.getKey())
                .write();
        }
    }

    /* loaded from: input_file:io/zeebe/broker/workflow/processor/WorkflowInstanceStreamProcessor$SequenceFlowTakenEventProcessor.class */
    /**
     * Processes {@code SEQUENCE_FLOW_TAKEN} records: moves the record to the target node of the
     * sequence flow and writes the event that matches the kind of node reached.
     */
    private final class SequenceFlowTakenEventProcessor extends FlowElementEventProcessor<SequenceFlow> {
        /** Intent of the event to write for the record handled last. */
        private Intent nextState;

        private SequenceFlowTakenEventProcessor() {
            super();
        }

        /**
         * Sets the record's activity id to the flow's target node and selects the follow-up intent:
         * {@code END_EVENT_OCCURRED} for an end event, {@code ACTIVITY_READY} for a service task and
         * {@code GATEWAY_ACTIVATED} for an exclusive gateway.
         *
         * @param typedRecord the record that took the sequence flow
         * @param sequenceFlow the sequence flow the record points to
         * @throws RuntimeException if the target node is none of the supported kinds; the activity id
         *     has already been updated at that point
         */
        @Override // io.zeebe.broker.workflow.processor.WorkflowInstanceStreamProcessor.FlowElementEventProcessor
        void processFlowElementEvent(TypedRecord<WorkflowInstanceRecord> typedRecord, SequenceFlow sequenceFlow) {
            FlowNode targetNode = sequenceFlow.getTargetNode();
            typedRecord.getValue().setActivityId(targetNode.getIdAsBuffer());
            if (targetNode instanceof EndEvent) {
                this.nextState = WorkflowInstanceIntent.END_EVENT_OCCURRED;
            } else if (targetNode instanceof ServiceTask) {
                this.nextState = WorkflowInstanceIntent.ACTIVITY_READY;
            } else if (targetNode instanceof ExclusiveGateway) {
                this.nextState = WorkflowInstanceIntent.GATEWAY_ACTIVATED;
            } else {
                // The placeholder is filled with the node itself (its toString()), not its class.
                throw new RuntimeException(String.format("Flow node of type '%s' is not supported.", targetNode));
            }
        }

        /** Writes a new event with the intent chosen by {@code processFlowElementEvent}. */
        @Override // io.zeebe.broker.logstreams.processor.TypedRecordProcessor
        public long writeRecord(TypedRecord<WorkflowInstanceRecord> typedRecord, TypedStreamWriter typedStreamWriter) {
            return typedStreamWriter.writeNewEvent(this.nextState, typedRecord.getValue());
        }
    }

    /* loaded from: input_file:io/zeebe/broker/workflow/processor/WorkflowInstanceStreamProcessor$TakeSequenceFlowAspectHandler.class */
    /**
     * Aspect handler that leaves a flow node through its first outgoing sequence flow and writes a
     * {@code SEQUENCE_FLOW_TAKEN} event for it.
     */
    private final class TakeSequenceFlowAspectHandler extends FlowElementEventProcessor<FlowNode> {
        private TakeSequenceFlowAspectHandler() {
            super();
        }

        /**
         * Replaces the record's activity id with the id of the node's first outgoing sequence flow.
         *
         * <p>NOTE(review): the node is assumed to have at least one outgoing flow; {@code get(0)} is
         * not guarded here — confirm that this handler is only selected for such nodes.
         *
         * @param typedRecord the record positioned at the flow node
         * @param flowNode the node to leave
         */
        @Override // io.zeebe.broker.workflow.processor.WorkflowInstanceStreamProcessor.FlowElementEventProcessor
        void processFlowElementEvent(TypedRecord<WorkflowInstanceRecord> typedRecord, FlowNode flowNode) {
            WorkflowInstanceRecord value = typedRecord.getValue();
            SequenceFlow firstOutgoing = (SequenceFlow) flowNode.getOutgoingSequenceFlows().get(0);
            value.setActivityId(firstOutgoing.getIdAsBuffer());
        }

        /** Writes the {@code SEQUENCE_FLOW_TAKEN} event carrying the updated record value. */
        @Override // io.zeebe.broker.logstreams.processor.TypedRecordProcessor
        public long writeRecord(TypedRecord<WorkflowInstanceRecord> typedRecord, TypedStreamWriter typedStreamWriter) {
            return typedStreamWriter.writeNewEvent(WorkflowInstanceIntent.SEQUENCE_FLOW_TAKEN, typedRecord.getValue());
        }
    }

    /* loaded from: input_file:io/zeebe/broker/workflow/processor/WorkflowInstanceStreamProcessor$UpdatePayloadProcessor.class */
    /**
     * Command processor registered for {@code WorkflowInstanceIntent.UPDATE_PAYLOAD}. The command
     * is accepted only while the workflow instance is indexed and has a positive token count.
     */
    private final class UpdatePayloadProcessor implements CommandProcessor<WorkflowInstanceRecord> {
        private UpdatePayloadProcessor() {
        }

        /**
         * Accepts the command with {@code PAYLOAD_UPDATED} when the targeted workflow instance is
         * running; otherwise rejects it as {@code NOT_APPLICABLE}.
         */
        @Override // io.zeebe.broker.logstreams.processor.CommandProcessor
        public CommandProcessor.CommandResult onCommand(TypedRecord<WorkflowInstanceRecord> typedRecord, CommandProcessor.CommandControl commandControl) {
            WorkflowInstanceIndex.WorkflowInstance indexed = WorkflowInstanceStreamProcessor.this.workflowInstanceIndex.get(typedRecord.getValue().getWorkflowInstanceKey().longValue());
            boolean running = indexed != null && indexed.getTokenCount() > 0;
            if (running) {
                return commandControl.accept(WorkflowInstanceIntent.PAYLOAD_UPDATED);
            }
            return commandControl.reject(RejectionType.NOT_APPLICABLE, "Workflow instance is not running");
        }

        /** Adds the accepted payload to the payload cache under the workflow instance key and the record position. */
        @Override // io.zeebe.broker.logstreams.processor.CommandProcessor
        public void updateStateOnAccept(TypedRecord<WorkflowInstanceRecord> typedRecord) {
            WorkflowInstanceRecord command = typedRecord.getValue();
            WorkflowInstanceStreamProcessor.this.payloadCache.addPayload(
                command.getWorkflowInstanceKey().longValue(),
                typedRecord.getPosition(),
                command.getPayload());
        }
    }

    /* loaded from: input_file:io/zeebe/broker/workflow/processor/WorkflowInstanceStreamProcessor$WorkflowInstanceCreatedEventProcessor.class */
    /**
     * Processes records registered for {@code WorkflowInstanceIntent.CREATED}: counts the event,
     * sends the record back as response and creates the index entry for the new workflow instance.
     */
    private final class WorkflowInstanceCreatedEventProcessor implements TypedRecordProcessor<WorkflowInstanceRecord> {
        private WorkflowInstanceCreatedEventProcessor() {
        }

        /**
         * Increments the {@code workflowInstanceEventCreate} metric and writes the record unchanged
         * to the response writer.
         *
         * <p>NOTE(review): the counter is incremented before the outcome of the write is known. If
         * the caller invokes this hook again after a {@code false} result, the same event would be
         * counted more than once — confirm against the side-effect retry contract.
         *
         * @return the result of {@code writeRecordUnchanged}
         */
        @Override // io.zeebe.broker.logstreams.processor.TypedRecordProcessor
        public boolean executeSideEffects(TypedRecord<WorkflowInstanceRecord> typedRecord, TypedResponseWriter typedResponseWriter) {
            WorkflowInstanceStreamProcessor.this.workflowInstanceEventCreate.incrementOrdered();
            boolean written = typedResponseWriter.writeRecordUnchanged(typedRecord);
            return written;
        }

        /**
         * Creates the index entry for the record key with the record position, one active token, an
         * activity instance key of {@code -1} and the workflow key from the record value.
         */
        @Override // io.zeebe.broker.logstreams.processor.TypedRecordProcessor
        public void updateState(TypedRecord<WorkflowInstanceRecord> typedRecord) {
            WorkflowInstanceStreamProcessor.this.workflowInstanceIndex
                .newWorkflowInstance(typedRecord.getKey())
                .setPosition(typedRecord.getPosition())
                .setActiveTokenCount(1)
                .setActivityInstanceKey(-1L)
                .setWorkflowKey(typedRecord.getValue().getWorkflowKey())
                .write();
        }
    }

    /* loaded from: input_file:io/zeebe/broker/workflow/processor/WorkflowInstanceStreamProcessor$WorkflowInstanceRejectedEventProcessor.class */
    /**
     * Processes rejections registered for {@code WorkflowInstanceIntent.CREATE}; its only effect is
     * to send the rejected record back through the response writer.
     */
    private final class WorkflowInstanceRejectedEventProcessor implements TypedRecordProcessor<WorkflowInstanceRecord> {
        private WorkflowInstanceRejectedEventProcessor() {
        }

        /**
         * Writes the record unchanged to the response writer.
         *
         * @return the result of {@code writeRecordUnchanged}
         */
        @Override // io.zeebe.broker.logstreams.processor.TypedRecordProcessor
        public boolean executeSideEffects(TypedRecord<WorkflowInstanceRecord> typedRecord, TypedResponseWriter typedResponseWriter) {
            boolean written = typedResponseWriter.writeRecordUnchanged(typedRecord);
            return written;
        }
    }

    /**
     * Creates the processor and its payload cache.
     *
     * @param clientTransport transport stored as the management API client
     * @param topologyManager topology manager later handed to the workflow cache
     * @param i argument passed to the {@code PayloadCache} constructor (presumably the cache size —
     *     TODO confirm)
     */
    public WorkflowInstanceStreamProcessor(ClientTransport clientTransport, TopologyManager topologyManager, int i) {
        this.topologyManager = topologyManager;
        this.managementApiClient = clientTransport;
        this.payloadCache = new PayloadCache(i);
    }

    /**
     * Builds the typed stream processor and registers every record handler of this class.
     *
     * <p>The casts and block-bodied lambdas are kept exactly as they are because they pin which
     * overload of {@code onCommand}/{@code onEvent} each registration resolves to.
     *
     * @param typedStreamEnvironment environment used to create the stream processor builder
     * @return the fully configured stream processor
     */
    public TypedStreamProcessor createStreamProcessor(TypedStreamEnvironment typedStreamEnvironment) {
        // One shared instance handles all four aspect-driven events registered below.
        BpmnAspectEventProcessor bpmnAspectEventProcessor = new BpmnAspectEventProcessor();
        return typedStreamEnvironment.newStreamProcessor()
            // workflow instance creation and cancellation
            .onCommand(ValueType.WORKFLOW_INSTANCE, (Intent) WorkflowInstanceIntent.CREATE, (TypedRecordProcessor<?>) new CreateWorkflowInstanceEventProcessor())
            .onEvent(ValueType.WORKFLOW_INSTANCE, (Intent) WorkflowInstanceIntent.CREATED, (TypedRecordProcessor<?>) new WorkflowInstanceCreatedEventProcessor())
            .onRejection(ValueType.WORKFLOW_INSTANCE, WorkflowInstanceIntent.CREATE, new WorkflowInstanceRejectedEventProcessor())
            .onCommand(ValueType.WORKFLOW_INSTANCE, (Intent) WorkflowInstanceIntent.CANCEL, (TypedRecordProcessor<?>) new CancelWorkflowInstanceProcessor())
            // events that are only handled while the workflow instance is active
            .onEvent(ValueType.WORKFLOW_INSTANCE, WorkflowInstanceIntent.SEQUENCE_FLOW_TAKEN, instanceRecord -> {
                return isActive(instanceRecord.getWorkflowInstanceKey().longValue());
            }, new SequenceFlowTakenEventProcessor())
            .onEvent(ValueType.WORKFLOW_INSTANCE, WorkflowInstanceIntent.ACTIVITY_READY, instanceRecord -> {
                return isActive(instanceRecord.getWorkflowInstanceKey().longValue());
            }, new ActivityReadyEventProcessor())
            .onEvent(ValueType.WORKFLOW_INSTANCE, WorkflowInstanceIntent.ACTIVITY_ACTIVATED, instanceRecord -> {
                return isActive(instanceRecord.getWorkflowInstanceKey().longValue());
            }, new ActivityActivatedEventProcessor())
            .onEvent(ValueType.WORKFLOW_INSTANCE, WorkflowInstanceIntent.ACTIVITY_COMPLETING, instanceRecord -> {
                return isActive(instanceRecord.getWorkflowInstanceKey().longValue());
            }, new ActivityCompletingEventProcessor())
            .onCommand(ValueType.WORKFLOW_INSTANCE, (Intent) WorkflowInstanceIntent.UPDATE_PAYLOAD, (CommandProcessor) new UpdatePayloadProcessor())
            // events dispatched through the shared aspect processor, again only while active
            .onEvent(ValueType.WORKFLOW_INSTANCE, WorkflowInstanceIntent.START_EVENT_OCCURRED, instanceRecord -> {
                return isActive(instanceRecord.getWorkflowInstanceKey().longValue());
            }, bpmnAspectEventProcessor)
            .onEvent(ValueType.WORKFLOW_INSTANCE, WorkflowInstanceIntent.END_EVENT_OCCURRED, instanceRecord -> {
                return isActive(instanceRecord.getWorkflowInstanceKey().longValue());
            }, bpmnAspectEventProcessor)
            .onEvent(ValueType.WORKFLOW_INSTANCE, WorkflowInstanceIntent.GATEWAY_ACTIVATED, instanceRecord -> {
                return isActive(instanceRecord.getWorkflowInstanceKey().longValue());
            }, bpmnAspectEventProcessor)
            .onEvent(ValueType.WORKFLOW_INSTANCE, WorkflowInstanceIntent.ACTIVITY_COMPLETED, instanceRecord -> {
                return isActive(instanceRecord.getWorkflowInstanceKey().longValue());
            }, bpmnAspectEventProcessor)
            // terminal events only update the metrics
            .onEvent(ValueType.WORKFLOW_INSTANCE, (Intent) WorkflowInstanceIntent.CANCELED, instanceRecord -> {
                this.workflowInstanceEventCanceled.incrementOrdered();
            })
            .onEvent(ValueType.WORKFLOW_INSTANCE, (Intent) WorkflowInstanceIntent.COMPLETED, instanceRecord -> {
                this.workflowInstanceEventCompleted.incrementOrdered();
            })
            // job events
            .onEvent(ValueType.JOB, (Intent) JobIntent.CREATED, (TypedRecordProcessor<?>) new JobCreatedProcessor())
            .onEvent(ValueType.JOB, (Intent) JobIntent.COMPLETED, (TypedRecordProcessor<?>) new JobCompletedEventProcessor())
            // state resources and lifecycle listeners
            .withStateResource((ZbMap<?, ?>) this.workflowInstanceIndex.getMap())
            .withStateResource((ZbMap<?, ?>) this.activityInstanceMap.getMap())
            .withStateResource((ZbMap<?, ?>) this.payloadCache.getMap())
            .withListener(this.payloadCache)
            .withListener(this)
            .build();
    }

    /**
     * Captures the processor's actor, creates the workflow cache for the stream's topic and
     * registers the three {@code workflow_instance_events_count} counters (created, canceled,
     * completed), each labelled with topic and partition.
     *
     * @param typedStreamProcessor the stream processor that is being opened
     */
    @Override // io.zeebe.broker.logstreams.processor.StreamProcessorLifecycleAware
    public void onOpen(TypedStreamProcessor typedStreamProcessor) {
        actor = typedStreamProcessor.getActor();
        LogStream logStream = typedStreamProcessor.getEnvironment().getStream();
        workflowCache = new WorkflowCache(managementApiClient, topologyManager, logStream.getTopicName());

        MetricsManager metrics = typedStreamProcessor.getStreamProcessorContext().getActorScheduler().getMetricsManager();
        String topicLabel = logStream.getTopicName().getStringWithoutLengthUtf8(0, logStream.getTopicName().capacity());
        String partitionLabel = Integer.toString(logStream.getPartitionId());

        workflowInstanceEventCreate = metrics.newMetric("workflow_instance_events_count")
            .type("counter")
            .label("topic", topicLabel)
            .label("partition", partitionLabel)
            .label("type", "created")
            .create();
        workflowInstanceEventCanceled = metrics.newMetric("workflow_instance_events_count")
            .type("counter")
            .label("topic", topicLabel)
            .label("partition", partitionLabel)
            .label("type", "canceled")
            .create();
        workflowInstanceEventCompleted = metrics.newMetric("workflow_instance_events_count")
            .type("counter")
            .label("topic", topicLabel)
            .label("partition", partitionLabel)
            .label("type", "completed")
            .create();
    }

    /**
     * Closes the workflow cache and then the three workflow instance counters, in that order.
     */
    @Override // io.zeebe.broker.logstreams.processor.StreamProcessorLifecycleAware
    public void onClose() {
        workflowCache.close();
        workflowInstanceEventCreate.close();
        workflowInstanceEventCanceled.close();
        workflowInstanceEventCompleted.close();
    }

    /**
     * Tells whether a workflow instance is still running.
     *
     * @param j key of the workflow instance
     * @return {@code true} if the index holds an entry for the key and its token count is positive
     */
    private boolean isActive(long j) {
        WorkflowInstanceIndex.WorkflowInstance indexed = this.workflowInstanceIndex.get(j);
        if (indexed == null) {
            return false;
        }
        return indexed.getTokenCount() > 0;
    }

    /**
     * Fetches the workflow with the given key through the workflow cache and hands it to the
     * consumer once the response has arrived.
     *
     * <p>A future is registered with the lifecycle context via {@code async} before the fetch
     * callback is scheduled. It is completed normally after the consumer ran, and exceptionally if
     * the fetch failed or if adding the workflow or the consumer threw.
     *
     * @param j key of the workflow to fetch
     * @param consumer receives the workflow that was added to the cache from the response
     * @param eventLifecycleContext context the completion future is registered with
     */
    public void fetchWorkflow(long j, Consumer<DeployedWorkflow> consumer, EventLifecycleContext eventLifecycleContext) {
        ActorFuture<ClientResponse> fetchFuture = this.workflowCache.fetchWorkflowByKey(j);
        // Carries no value; it only signals when handling of the fetched workflow has finished.
        CompletableActorFuture<Void> processingDone = new CompletableActorFuture<>();
        eventLifecycleContext.async(processingDone);
        this.actor.runOnCompletion(fetchFuture, (clientResponse, th) -> {
            if (th != null) {
                processingDone.completeExceptionally(new RuntimeException("Could not fetch workflow", th));
                return;
            }
            try {
                consumer.accept(this.workflowCache.addWorkflow(clientResponse.getResponseBuffer()));
                processingDone.complete(null);
            } catch (Exception e) {
                processingDone.completeExceptionally(new RuntimeException("Error while processing fetched workflow", e));
            }
        });
    }
}
