/*
 * Decompiled with CFR 0.152.
 */
package io.camunda.zeebe.engine.processing.bpmn.activity;

import io.camunda.zeebe.engine.util.EngineRule;
import io.camunda.zeebe.model.bpmn.Bpmn;
import io.camunda.zeebe.model.bpmn.BpmnModelInstance;
import io.camunda.zeebe.model.bpmn.builder.BoundaryEventBuilder;
import io.camunda.zeebe.model.bpmn.builder.ServiceTaskBuilder;
import io.camunda.zeebe.model.bpmn.builder.StartEventBuilder;
import io.camunda.zeebe.protocol.record.Record;
import io.camunda.zeebe.protocol.record.RecordType;
import io.camunda.zeebe.protocol.record.intent.IncidentIntent;
import io.camunda.zeebe.protocol.record.intent.Intent;
import io.camunda.zeebe.protocol.record.intent.JobIntent;
import io.camunda.zeebe.protocol.record.intent.ProcessInstanceIntent;
import io.camunda.zeebe.protocol.record.intent.VariableIntent;
import io.camunda.zeebe.protocol.record.value.BpmnElementType;
import io.camunda.zeebe.protocol.record.value.ErrorType;
import io.camunda.zeebe.protocol.record.value.IncidentRecordValue;
import io.camunda.zeebe.protocol.record.value.JobBatchRecordValue;
import io.camunda.zeebe.protocol.record.value.JobKind;
import io.camunda.zeebe.protocol.record.value.JobRecordValue;
import io.camunda.zeebe.protocol.record.value.JobRecordValueAssert;
import io.camunda.zeebe.protocol.record.value.ProcessInstanceRecordValue;
import io.camunda.zeebe.protocol.record.value.VariableRecordValue;
import io.camunda.zeebe.test.util.record.IncidentRecordStream;
import io.camunda.zeebe.test.util.record.JobRecordStream;
import io.camunda.zeebe.test.util.record.ProcessInstanceRecordStream;
import io.camunda.zeebe.test.util.record.RecordingExporter;
import io.camunda.zeebe.test.util.record.RecordingExporterTestWatcher;
import java.time.Duration;
import java.util.Map;
import java.util.Optional;
import java.util.function.Function;
import java.util.stream.Stream;
import org.assertj.core.api.Assertions;
import org.assertj.core.groups.Tuple;
import org.junit.ClassRule;
import org.junit.Rule;
import org.junit.Test;

public class ExecutionListenerJobTest {
    @ClassRule
    public static final EngineRule ENGINE = EngineRule.singlePartition();
    private static final String PROCESS_ID = "process";
    private static final String SERVICE_TASK_TYPE = "test_service_task";
    private static final String START_EL_TYPE = "start_execution_listener";
    private static final String END_EL_TYPE = "end_execution_listener";
    private static final BpmnModelInstance SIMPLE_PROCESS = Bpmn.createExecutableProcess((String)"process").startEvent().serviceTask("task", t -> ((ServiceTaskBuilder)t.zeebeJobType(SERVICE_TASK_TYPE)).zeebeStartExecutionListener(START_EL_TYPE)).endEvent().done();
    @Rule
    public final RecordingExporterTestWatcher recordingExporterTestWatcher = new RecordingExporterTestWatcher();

    @Test
    public void shouldCompleteServiceTaskWithExecutionListeners() {
        BpmnModelInstance modelInstance = ((ServiceTaskBuilder)((ServiceTaskBuilder)((ServiceTaskBuilder)Bpmn.createExecutableProcess((String)PROCESS_ID).startEvent().serviceTask("task", t -> t.zeebeJobType(SERVICE_TASK_TYPE)).zeebeStartExecutionListener("start_execution_listener_1")).zeebeStartExecutionListener("start_execution_listener_2")).zeebeEndExecutionListener(END_EL_TYPE)).endEvent().done();
        ENGINE.deployment().withXmlResource(modelInstance).deploy();
        long processInstanceKey = ENGINE.processInstance().ofBpmnProcessId(PROCESS_ID).create();
        this.verifyJobCreationThenComplete(processInstanceKey, 0L, "start_execution_listener_1", JobKind.EXECUTION_LISTENER);
        this.verifyJobCreationThenComplete(processInstanceKey, 1L, "start_execution_listener_2", JobKind.EXECUTION_LISTENER);
        this.verifyJobCreationThenComplete(processInstanceKey, 2L, SERVICE_TASK_TYPE, JobKind.BPMN_ELEMENT);
        this.verifyJobCreationThenComplete(processInstanceKey, 3L, END_EL_TYPE, JobKind.EXECUTION_LISTENER);
        ProcessInstanceRecordStream processInstanceRecordStream = RecordingExporter.processInstanceRecords().withProcessInstanceKey(processInstanceKey).limitToProcessInstanceCompleted();
        Assertions.assertThat((Stream)processInstanceRecordStream).extracting(new Function[]{r -> ((ProcessInstanceRecordValue)r.getValue()).getBpmnElementType(), Record::getIntent}).containsSubsequence((Object[])new Tuple[]{Assertions.tuple((Object[])new Object[]{BpmnElementType.SERVICE_TASK, ProcessInstanceIntent.ELEMENT_ACTIVATING}), Assertions.tuple((Object[])new Object[]{BpmnElementType.SERVICE_TASK, ProcessInstanceIntent.COMPLETE_EXECUTION_LISTENER}), Assertions.tuple((Object[])new Object[]{BpmnElementType.SERVICE_TASK, ProcessInstanceIntent.COMPLETE_EXECUTION_LISTENER}), Assertions.tuple((Object[])new Object[]{BpmnElementType.SERVICE_TASK, ProcessInstanceIntent.ELEMENT_ACTIVATED}), Assertions.tuple((Object[])new Object[]{BpmnElementType.SERVICE_TASK, ProcessInstanceIntent.COMPLETE_ELEMENT}), Assertions.tuple((Object[])new Object[]{BpmnElementType.SERVICE_TASK, ProcessInstanceIntent.ELEMENT_COMPLETING}), Assertions.tuple((Object[])new Object[]{BpmnElementType.SERVICE_TASK, ProcessInstanceIntent.COMPLETE_EXECUTION_LISTENER}), Assertions.tuple((Object[])new Object[]{BpmnElementType.SERVICE_TASK, ProcessInstanceIntent.ELEMENT_COMPLETED}), Assertions.tuple((Object[])new Object[]{BpmnElementType.PROCESS, ProcessInstanceIntent.ELEMENT_COMPLETED})});
    }

    @Test
    public void shouldRetryExecutionListener() {
        ENGINE.deployment().withXmlResource(SIMPLE_PROCESS).deploy();
        long processInstanceKey = ENGINE.processInstance().ofBpmnProcessId(PROCESS_ID).create();
        ENGINE.job().ofInstance(processInstanceKey).withType(START_EL_TYPE).withRetries(1).fail();
        ENGINE.job().ofInstance(processInstanceKey).withType(START_EL_TYPE).complete();
        this.assertJobState(processInstanceKey, 1L, SERVICE_TASK_TYPE, JobIntent.CREATED, JobKind.BPMN_ELEMENT);
    }

    @Test
    public void shouldCreateIncidentForExecutionListenerWhenNoRetriesLeft() {
        ENGINE.deployment().withXmlResource(SIMPLE_PROCESS).deploy();
        long processInstanceKey = ENGINE.processInstance().ofBpmnProcessId(PROCESS_ID).create();
        ENGINE.job().ofInstance(processInstanceKey).withType(START_EL_TYPE).withRetries(0).fail();
        Record incident = (Record)RecordingExporter.incidentRecords((IncidentIntent)IncidentIntent.CREATED).withProcessInstanceKey(processInstanceKey).getFirst();
        io.camunda.zeebe.protocol.record.Assertions.assertThat((IncidentRecordValue)((IncidentRecordValue)incident.getValue())).hasProcessInstanceKey(processInstanceKey).hasErrorType(ErrorType.JOB_NO_RETRIES).hasErrorMessage("No more retries left.");
        ENGINE.incident().ofInstance(processInstanceKey).withKey(incident.getKey()).resolve();
        ENGINE.job().ofInstance(processInstanceKey).withType(START_EL_TYPE).complete();
        this.assertJobState(processInstanceKey, 1L, SERVICE_TASK_TYPE, JobIntent.CREATED, JobKind.BPMN_ELEMENT);
    }

    @Test
    public void shouldProceedWithRemainingExecutionListenersAfterResolvingIncidentForEndEL() {
        BpmnModelInstance modelInstance = Bpmn.createExecutableProcess((String)PROCESS_ID).startEvent("start").serviceTask("task", t -> ((ServiceTaskBuilder)((ServiceTaskBuilder)((ServiceTaskBuilder)((ServiceTaskBuilder)t.zeebeJobType(SERVICE_TASK_TYPE)).zeebeStartExecutionListener(START_EL_TYPE)).zeebeEndExecutionListener("end_execution_listener_1")).zeebeEndExecutionListener("end_execution_listener_2")).zeebeEndExecutionListener("end_execution_listener_3")).endEvent("end").done();
        ENGINE.deployment().withXmlResource(modelInstance).deploy();
        long processInstanceKey = ENGINE.processInstance().ofBpmnProcessId(PROCESS_ID).create();
        ENGINE.job().ofInstance(processInstanceKey).withType(START_EL_TYPE).complete();
        ENGINE.job().ofInstance(processInstanceKey).withType(SERVICE_TASK_TYPE).complete();
        ENGINE.job().ofInstance(processInstanceKey).withType("end_execution_listener_1").complete();
        ENGINE.job().ofInstance(processInstanceKey).withType("end_execution_listener_2").withRetries(0).fail();
        Record incident = (Record)RecordingExporter.incidentRecords((IncidentIntent)IncidentIntent.CREATED).withProcessInstanceKey(processInstanceKey).getFirst();
        io.camunda.zeebe.protocol.record.Assertions.assertThat((IncidentRecordValue)((IncidentRecordValue)incident.getValue())).hasProcessInstanceKey(processInstanceKey).hasErrorType(ErrorType.JOB_NO_RETRIES).hasErrorMessage("No more retries left.");
        ENGINE.incident().ofInstance(processInstanceKey).withKey(incident.getKey()).resolve();
        ENGINE.job().ofInstance(processInstanceKey).withType("end_execution_listener_2").complete();
        this.verifyJobCreationThenComplete(processInstanceKey, 4L, "end_execution_listener_3", JobKind.EXECUTION_LISTENER);
    }

    @Test
    public void shouldCreateIncidentDuringEvaluatingServiceTaskInputMappingsAndResumeWithStartELsAfterResolving() {
        BpmnModelInstance modelInstance = Bpmn.createExecutableProcess((String)PROCESS_ID).startEvent("start").serviceTask("task", t -> ((ServiceTaskBuilder)((ServiceTaskBuilder)((ServiceTaskBuilder)t.zeebeJobType(SERVICE_TASK_TYPE)).zeebeInputExpression("assert(some_var, some_var != null)", "o_var_1")).zeebeStartExecutionListener(START_EL_TYPE)).zeebeEndExecutionListener(END_EL_TYPE)).endEvent("end").done();
        ENGINE.deployment().withXmlResource(modelInstance).deploy();
        long processInstanceKey = ENGINE.processInstance().ofBpmnProcessId(PROCESS_ID).create();
        Record incident = (Record)RecordingExporter.incidentRecords((IncidentIntent)IncidentIntent.CREATED).withProcessInstanceKey(processInstanceKey).getFirst();
        io.camunda.zeebe.protocol.record.Assertions.assertThat((IncidentRecordValue)((IncidentRecordValue)incident.getValue())).hasProcessInstanceKey(processInstanceKey).hasErrorType(ErrorType.IO_MAPPING_ERROR).hasErrorMessage("Assertion failure on evaluate the expression '{o_var_1:assert(some_var, some_var != null)}': The condition is not fulfilled The evaluation reported the following warnings:\n[NO_VARIABLE_FOUND] No variable found with name 'some_var'\n[NO_VARIABLE_FOUND] No variable found with name 'some_var'\n[ASSERT_FAILURE] The condition is not fulfilled");
        ENGINE.variables().ofScope(processInstanceKey).withDocument(Map.of("some_var", "foo_bar")).update();
        ENGINE.incident().ofInstance(processInstanceKey).withKey(incident.getKey()).resolve();
        this.verifyJobCreationThenComplete(processInstanceKey, 0L, START_EL_TYPE, JobKind.EXECUTION_LISTENER);
        this.verifyJobCreationThenComplete(processInstanceKey, 1L, SERVICE_TASK_TYPE, JobKind.BPMN_ELEMENT);
        this.verifyJobCreationThenComplete(processInstanceKey, 2L, END_EL_TYPE, JobKind.EXECUTION_LISTENER);
        Assertions.assertThat((Stream)RecordingExporter.processInstanceRecords().withProcessInstanceKey(processInstanceKey).limitToProcessInstanceCompleted()).extracting(new Function[]{r -> ((ProcessInstanceRecordValue)r.getValue()).getBpmnElementType(), Record::getIntent}).containsSubsequence((Object[])new Tuple[]{Assertions.tuple((Object[])new Object[]{BpmnElementType.PROCESS, ProcessInstanceIntent.ELEMENT_ACTIVATING}), Assertions.tuple((Object[])new Object[]{BpmnElementType.START_EVENT, ProcessInstanceIntent.ELEMENT_ACTIVATED}), Assertions.tuple((Object[])new Object[]{BpmnElementType.START_EVENT, ProcessInstanceIntent.ELEMENT_COMPLETED}), Assertions.tuple((Object[])new Object[]{BpmnElementType.SERVICE_TASK, ProcessInstanceIntent.ELEMENT_ACTIVATING}), Assertions.tuple((Object[])new Object[]{BpmnElementType.SERVICE_TASK, ProcessInstanceIntent.COMPLETE_EXECUTION_LISTENER}), Assertions.tuple((Object[])new Object[]{BpmnElementType.SERVICE_TASK, ProcessInstanceIntent.ELEMENT_ACTIVATED})});
    }

    @Test
    public void shouldCreateIncidentDuringEvaluatingServiceTaskOutputMappingsAndResumeWithEndELsAfterResolving() {
        BpmnModelInstance modelInstance = Bpmn.createExecutableProcess((String)PROCESS_ID).startEvent("start").serviceTask("task", t -> ((ServiceTaskBuilder)((ServiceTaskBuilder)((ServiceTaskBuilder)t.zeebeJobType(SERVICE_TASK_TYPE)).zeebeOutputExpression("assert(some_var, some_var != null)", "o_var_1")).zeebeStartExecutionListener(START_EL_TYPE)).zeebeEndExecutionListener(END_EL_TYPE)).endEvent("end").done();
        ENGINE.deployment().withXmlResource(modelInstance).deploy();
        long processInstanceKey = ENGINE.processInstance().ofBpmnProcessId(PROCESS_ID).create();
        ENGINE.job().ofInstance(processInstanceKey).withType(START_EL_TYPE).complete();
        ENGINE.job().ofInstance(processInstanceKey).withType(SERVICE_TASK_TYPE).complete();
        Record incident = (Record)RecordingExporter.incidentRecords((IncidentIntent)IncidentIntent.CREATED).withProcessInstanceKey(processInstanceKey).getFirst();
        io.camunda.zeebe.protocol.record.Assertions.assertThat((IncidentRecordValue)((IncidentRecordValue)incident.getValue())).hasProcessInstanceKey(processInstanceKey).hasErrorType(ErrorType.IO_MAPPING_ERROR).hasErrorMessage("Assertion failure on evaluate the expression '{o_var_1:assert(some_var, some_var != null)}': The condition is not fulfilled The evaluation reported the following warnings:\n[NO_VARIABLE_FOUND] No variable found with name 'some_var'\n[NO_VARIABLE_FOUND] No variable found with name 'some_var'\n[ASSERT_FAILURE] The condition is not fulfilled");
        ENGINE.variables().ofScope(processInstanceKey).withDocument(Map.of("some_var", "foo_bar")).update();
        ENGINE.incident().ofInstance(processInstanceKey).withKey(incident.getKey()).resolve();
        this.verifyJobCreationThenComplete(processInstanceKey, 2L, END_EL_TYPE, JobKind.EXECUTION_LISTENER);
    }

    @Test
    public void shouldCreateIncidentWhenCorrelationKeyNotProvidedBeforeProcessingTaskWithMessageBoundaryEvent() {
        BpmnModelInstance modelInstance = ((BoundaryEventBuilder)Bpmn.createExecutableProcess((String)PROCESS_ID).startEvent().serviceTask("task", t -> ((ServiceTaskBuilder)((ServiceTaskBuilder)t.zeebeJobType(SERVICE_TASK_TYPE)).zeebeStartExecutionListener(START_EL_TYPE)).zeebeEndExecutionListener(END_EL_TYPE)).boundaryEvent("boundary_event").message(b -> b.name("service_task_event").zeebeCorrelationKeyExpression("order_id"))).endEvent().done();
        ENGINE.deployment().withXmlResource(modelInstance).deploy();
        long processInstanceKey = ENGINE.processInstance().ofBpmnProcessId(PROCESS_ID).create();
        ENGINE.job().ofInstance(processInstanceKey).withType(START_EL_TYPE).complete();
        Record firstIncident = (Record)((IncidentRecordStream)RecordingExporter.incidentRecords((IncidentIntent)IncidentIntent.CREATED).withProcessInstanceKey(processInstanceKey).skip(0L)).getFirst();
        io.camunda.zeebe.protocol.record.Assertions.assertThat((IncidentRecordValue)((IncidentRecordValue)firstIncident.getValue())).hasProcessInstanceKey(processInstanceKey).hasErrorType(ErrorType.EXTRACT_VALUE_ERROR).hasErrorMessage("Failed to extract the correlation key for 'order_id': The value must be either a string or a number, but was 'NULL'. The evaluation reported the following warnings:\n[NO_VARIABLE_FOUND] No variable found with name 'order_id'");
        ENGINE.variables().ofScope(processInstanceKey).withDocument(Map.of("order_id", 123)).update();
        ENGINE.incident().ofInstance(processInstanceKey).withKey(firstIncident.getKey()).resolve();
        ExecutionListenerJobTest.completeRecreatedJobWithType(processInstanceKey, START_EL_TYPE);
        this.verifyJobCreationThenComplete(processInstanceKey, 2L, SERVICE_TASK_TYPE, JobKind.BPMN_ELEMENT);
        this.assertJobState(processInstanceKey, 3L, END_EL_TYPE, JobIntent.CREATED, JobKind.EXECUTION_LISTENER);
    }

    @Test
    public void shouldCreateIncidentWhenServiceTaskWithExecutionListenersFailed() {
        BpmnModelInstance modelInstance = Bpmn.createExecutableProcess((String)PROCESS_ID).startEvent("start").serviceTask("task", t -> ((ServiceTaskBuilder)((ServiceTaskBuilder)t.zeebeJobType(SERVICE_TASK_TYPE)).zeebeStartExecutionListener(START_EL_TYPE)).zeebeEndExecutionListener(END_EL_TYPE)).endEvent("end").done();
        ENGINE.deployment().withXmlResource(modelInstance).deploy();
        long processInstanceKey = ENGINE.processInstance().ofBpmnProcessId(PROCESS_ID).create();
        ENGINE.job().ofInstance(processInstanceKey).withType(START_EL_TYPE).complete();
        ENGINE.job().ofInstance(processInstanceKey).withType(SERVICE_TASK_TYPE).withRetries(0).fail();
        Record firstIncident = (Record)RecordingExporter.incidentRecords((IncidentIntent)IncidentIntent.CREATED).withProcessInstanceKey(processInstanceKey).getFirst();
        ENGINE.incident().ofInstance(processInstanceKey).withKey(firstIncident.getKey()).resolve();
        ENGINE.job().ofInstance(processInstanceKey).withType(SERVICE_TASK_TYPE).complete();
        this.assertJobState(processInstanceKey, 2L, END_EL_TYPE, JobIntent.CREATED, JobKind.EXECUTION_LISTENER);
    }

    @Test
    public void shouldRecurFailedExecutionListenerJobAfterBackoff() {
        BpmnModelInstance modelInstance = Bpmn.createExecutableProcess((String)PROCESS_ID).startEvent("start").serviceTask("task", t -> ((ServiceTaskBuilder)t.zeebeJobType(SERVICE_TASK_TYPE)).zeebeStartExecutionListener(START_EL_TYPE)).endEvent("end").done();
        ENGINE.deployment().withXmlResource(modelInstance).deploy();
        long processInstanceKey = ENGINE.processInstance().ofBpmnProcessId(PROCESS_ID).create();
        Duration backOff = Duration.ofMinutes(30L);
        Record<JobRecordValue> failedStartElJobRecord = ENGINE.job().ofInstance(processInstanceKey).withType(START_EL_TYPE).withBackOff(backOff).withRetries(2).fail();
        io.camunda.zeebe.protocol.record.Assertions.assertThat(failedStartElJobRecord).hasRecordType(RecordType.EVENT).hasIntent((Intent)JobIntent.FAILED);
        ENGINE.increaseTime(backOff);
        Record recurredJob = (Record)RecordingExporter.jobRecords((JobIntent)JobIntent.RECURRED_AFTER_BACKOFF).withType(START_EL_TYPE).getFirst();
        Assertions.assertThat((long)recurredJob.getKey()).isEqualTo(failedStartElJobRecord.getKey());
        ENGINE.job().ofInstance(processInstanceKey).withType(START_EL_TYPE).complete();
        this.assertJobState(processInstanceKey, 1L, SERVICE_TASK_TYPE, JobIntent.CREATED, JobKind.BPMN_ELEMENT);
    }

    @Test
    public void shouldReCreateFirstElJobAfterResolvingIncidentCreatedDuringResolvingServiceJobExpressions() {
        BpmnModelInstance modelInstance = Bpmn.createExecutableProcess((String)PROCESS_ID).startEvent("start").serviceTask("task", t -> ((ServiceTaskBuilder)t.zeebeJobTypeExpression("=service_task_job_name_var + \"_type\"")).zeebeStartExecutionListener(START_EL_TYPE)).endEvent("end").done();
        ENGINE.deployment().withXmlResource(modelInstance).deploy();
        long processInstanceKey = ENGINE.processInstance().ofBpmnProcessId(PROCESS_ID).create();
        ENGINE.job().ofInstance(processInstanceKey).withType(START_EL_TYPE).complete();
        Record incident = (Record)RecordingExporter.incidentRecords((IncidentIntent)IncidentIntent.CREATED).withProcessInstanceKey(processInstanceKey).getFirst();
        io.camunda.zeebe.protocol.record.Assertions.assertThat((IncidentRecordValue)((IncidentRecordValue)incident.getValue())).hasProcessInstanceKey(processInstanceKey).hasErrorType(ErrorType.EXTRACT_VALUE_ERROR).hasErrorMessage("Expected result of the expression 'service_task_job_name_var + \"_type\"' to be 'STRING', but was 'NULL'. The evaluation reported the following warnings:\n[NO_VARIABLE_FOUND] No variable found with name 'service_task_job_name_var'\n[INVALID_TYPE] Can't add '\"_type\"' to 'null'");
        ENGINE.incident().ofInstance(processInstanceKey).withKey(incident.getKey()).resolve();
        this.assertJobState(processInstanceKey, 1L, START_EL_TYPE, JobIntent.CREATED, JobKind.EXECUTION_LISTENER);
    }

    @Test
    public void shouldAccessJobVariablesInEndListener() {
        ENGINE.deployment().withXmlResource(((ServiceTaskBuilder)Bpmn.createExecutableProcess((String)PROCESS_ID).startEvent().serviceTask("task", t -> t.zeebeJobType(SERVICE_TASK_TYPE)).zeebeEndExecutionListener(END_EL_TYPE)).endEvent().done()).deploy();
        long processInstanceKey = ENGINE.processInstance().ofBpmnProcessId(PROCESS_ID).create();
        ENGINE.job().ofInstance(processInstanceKey).withType(SERVICE_TASK_TYPE).withVariable("x", 1).complete();
        Optional<JobRecordValue> jobActivated = ((JobBatchRecordValue)ENGINE.jobs().withType(END_EL_TYPE).activate().getValue()).getJobs().stream().filter(job -> job.getProcessInstanceKey() == processInstanceKey).findFirst();
        Assertions.assertThat(jobActivated).isPresent();
        Assertions.assertThat((Map)jobActivated.get().getVariables()).contains(new Map.Entry[]{Assertions.entry((Object)"x", (Object)1)});
    }

    @Test
    public void shouldCompleteExecutionListenerJobWithVariablesMerging() {
        BpmnModelInstance modelInstance = ((ServiceTaskBuilder)((ServiceTaskBuilder)((StartEventBuilder)Bpmn.createExecutableProcess((String)PROCESS_ID).startEvent().zeebeOutput("=\"aValue\"", "startEventVar")).serviceTask("serviceTask", t -> ((ServiceTaskBuilder)((ServiceTaskBuilder)((ServiceTaskBuilder)t.zeebeJobType(SERVICE_TASK_TYPE)).zeebeStartExecutionListener(START_EL_TYPE)).zeebeEndExecutionListener("end_execution_listener_1")).zeebeEndExecutionListener("end_execution_listener_2")).zeebeInput("=\"bValue\"", "serviceTaskVar")).zeebeOutput("=startEventVar + \"+\" + serviceTaskVar", "mergedVars")).endEvent().done();
        ENGINE.deployment().withXmlResource(modelInstance).deploy();
        long processInstanceKey = ENGINE.processInstance().ofBpmnProcessId(PROCESS_ID).create();
        this.assertVariable(processInstanceKey, VariableIntent.CREATED, "startEventVar", "\"aValue\"");
        this.assertVariable(processInstanceKey, VariableIntent.CREATED, "serviceTaskVar", "\"bValue\"");
        ENGINE.job().ofInstance(processInstanceKey).withType(START_EL_TYPE).withVariables(Map.of("newVarFromStartListener", "cValue", "serviceTaskVar", "bValueUpdated")).complete();
        this.assertVariable(processInstanceKey, VariableIntent.UPDATED, "serviceTaskVar", "\"bValueUpdated\"");
        this.assertVariable(processInstanceKey, VariableIntent.CREATED, "newVarFromStartListener", "\"cValue\"");
        ENGINE.job().ofInstance(processInstanceKey).withType(SERVICE_TASK_TYPE).complete();
        ENGINE.job().ofInstance(processInstanceKey).withType("end_execution_listener_1").withVariable("startEventVar", "aValueUpdated").complete();
        this.assertVariable(processInstanceKey, VariableIntent.UPDATED, "startEventVar", "\"aValueUpdated\"");
        ENGINE.job().ofInstance(processInstanceKey).withType("end_execution_listener_2").complete();
        this.assertVariable(processInstanceKey, VariableIntent.CREATED, "mergedVars", "\"aValue+bValueUpdated\"");
    }

    @Test
    public void shouldEvaluateExpressionsForExecutionListenerJobs() {
        BpmnModelInstance modelInstance = Bpmn.createExecutableProcess((String)PROCESS_ID).startEvent("start").serviceTask("task", t -> ((ServiceTaskBuilder)((ServiceTaskBuilder)((ServiceTaskBuilder)t.zeebeJobType(SERVICE_TASK_TYPE)).zeebeExecutionListener(b -> b.start().typeExpression("listenerNameVar").retriesExpression("elRetries"))).zeebeExecutionListener(b -> b.start().type("start_execution_listener_2").retries("5"))).zeebeExecutionListener(b -> b.end().type(END_EL_TYPE).retriesExpression("elRetries + 5"))).endEvent("end").done();
        ENGINE.deployment().withXmlResource(modelInstance).deploy();
        long processInstanceKey = ENGINE.processInstance().ofBpmnProcessId(PROCESS_ID).withVariables(Map.of("elRetries", 6, "listenerNameVar", "start_execution_listener_1")).create();
        this.verifyJobCreationThenComplete(processInstanceKey, 0L, "start_execution_listener_1", JobKind.EXECUTION_LISTENER).hasRetries(6);
        this.verifyJobCreationThenComplete(processInstanceKey, 1L, "start_execution_listener_2", JobKind.EXECUTION_LISTENER).hasRetries(5);
        this.verifyJobCreationThenComplete(processInstanceKey, 2L, SERVICE_TASK_TYPE, JobKind.BPMN_ELEMENT).hasRetries(3);
        this.verifyJobCreationThenComplete(processInstanceKey, 3L, END_EL_TYPE, JobKind.EXECUTION_LISTENER).hasRetries(11);
    }

    @Test
    public void shouldRecreateStartExecutionListenerJobsAndProceedAfterIncidentResolution() {
        BpmnModelInstance modelInstance = Bpmn.createExecutableProcess((String)PROCESS_ID).startEvent("start").serviceTask("task", t -> ((ServiceTaskBuilder)((ServiceTaskBuilder)((ServiceTaskBuilder)((ServiceTaskBuilder)t.zeebeJobType(SERVICE_TASK_TYPE)).zeebeExecutionListener(b -> b.start().type("start_execution_listener_1"))).zeebeExecutionListener(b -> b.start().typeExpression("listenerNameVar"))).zeebeExecutionListener(b -> b.start().type("start_execution_listener_3"))).zeebeExecutionListener(b -> b.end().type(END_EL_TYPE))).endEvent("end").done();
        ENGINE.deployment().withXmlResource(modelInstance).deploy();
        long processInstanceKey = ENGINE.processInstance().ofBpmnProcessId(PROCESS_ID).create();
        this.verifyJobCreationThenComplete(processInstanceKey, 0L, "start_execution_listener_1", JobKind.EXECUTION_LISTENER);
        Record incident = (Record)RecordingExporter.incidentRecords((IncidentIntent)IncidentIntent.CREATED).withProcessInstanceKey(processInstanceKey).getFirst();
        io.camunda.zeebe.protocol.record.Assertions.assertThat((IncidentRecordValue)((IncidentRecordValue)incident.getValue())).hasProcessInstanceKey(processInstanceKey).hasErrorType(ErrorType.EXTRACT_VALUE_ERROR).hasErrorMessage("Expected result of the expression 'listenerNameVar' to be 'STRING', but was 'NULL'. The evaluation reported the following warnings:\n[NO_VARIABLE_FOUND] No variable found with name 'listenerNameVar'");
        ENGINE.variables().ofScope(processInstanceKey).withDocument(Map.of("listenerNameVar", "start_execution_listener_2")).update();
        ENGINE.incident().ofInstance(processInstanceKey).withKey(incident.getKey()).resolve();
        this.assertJobState(processInstanceKey, 1L, "start_execution_listener_1", JobIntent.CREATED, JobKind.EXECUTION_LISTENER);
        ExecutionListenerJobTest.completeRecreatedJobWithType(processInstanceKey, "start_execution_listener_1");
        this.verifyJobCreationThenComplete(processInstanceKey, 2L, "start_execution_listener_2", JobKind.EXECUTION_LISTENER);
        this.verifyJobCreationThenComplete(processInstanceKey, 3L, "start_execution_listener_3", JobKind.EXECUTION_LISTENER);
        this.verifyJobCreationThenComplete(processInstanceKey, 4L, SERVICE_TASK_TYPE, JobKind.BPMN_ELEMENT);
        this.verifyJobCreationThenComplete(processInstanceKey, 5L, END_EL_TYPE, JobKind.EXECUTION_LISTENER);
    }

    @Test
    public void shouldRecreateEndExecutionListenerJobsAndProceedAfterIncidentResolution() {
        BpmnModelInstance modelInstance = Bpmn.createExecutableProcess((String)PROCESS_ID).startEvent("start").serviceTask("task", t -> ((ServiceTaskBuilder)((ServiceTaskBuilder)((ServiceTaskBuilder)((ServiceTaskBuilder)t.zeebeJobType(SERVICE_TASK_TYPE)).zeebeExecutionListener(b -> b.start().type(START_EL_TYPE))).zeebeExecutionListener(b -> b.end().type("end_execution_listener_1"))).zeebeExecutionListener(b -> b.end().type("end_execution_listener_2"))).zeebeExecutionListener(b -> b.end().typeExpression("listenerNameVar"))).endEvent("end").done();
        ENGINE.deployment().withXmlResource(modelInstance).deploy();
        long processInstanceKey = ENGINE.processInstance().ofBpmnProcessId(PROCESS_ID).create();
        this.verifyJobCreationThenComplete(processInstanceKey, 0L, START_EL_TYPE, JobKind.EXECUTION_LISTENER);
        this.verifyJobCreationThenComplete(processInstanceKey, 1L, SERVICE_TASK_TYPE, JobKind.BPMN_ELEMENT);
        this.verifyJobCreationThenComplete(processInstanceKey, 2L, "end_execution_listener_1", JobKind.EXECUTION_LISTENER);
        this.verifyJobCreationThenComplete(processInstanceKey, 3L, "end_execution_listener_2", JobKind.EXECUTION_LISTENER);
        Record incident = (Record)RecordingExporter.incidentRecords((IncidentIntent)IncidentIntent.CREATED).withProcessInstanceKey(processInstanceKey).getFirst();
        io.camunda.zeebe.protocol.record.Assertions.assertThat((IncidentRecordValue)((IncidentRecordValue)incident.getValue())).hasProcessInstanceKey(processInstanceKey).hasErrorType(ErrorType.EXTRACT_VALUE_ERROR).hasErrorMessage("Expected result of the expression 'listenerNameVar' to be 'STRING', but was 'NULL'. The evaluation reported the following warnings:\n[NO_VARIABLE_FOUND] No variable found with name 'listenerNameVar'");
        ENGINE.variables().ofScope(processInstanceKey).withDocument(Map.of("listenerNameVar", "end_execution_listener_3")).update();
        ENGINE.incident().ofInstance(processInstanceKey).withKey(incident.getKey()).resolve();
        this.assertJobState(processInstanceKey, 4L, "end_execution_listener_1", JobIntent.CREATED, JobKind.EXECUTION_LISTENER);
        ExecutionListenerJobTest.completeRecreatedJobWithType(processInstanceKey, "end_execution_listener_1");
        ExecutionListenerJobTest.completeRecreatedJobWithType(processInstanceKey, "end_execution_listener_2");
        this.verifyJobCreationThenComplete(processInstanceKey, 6L, "end_execution_listener_3", JobKind.EXECUTION_LISTENER);
    }

    private static void completeRecreatedJobWithType(long processInstanceKey, String jobType) {
        long jobKey = ((Record)((JobRecordStream)RecordingExporter.jobRecords((JobIntent)JobIntent.CREATED).withProcessInstanceKey(processInstanceKey).withType(jobType).skip(1L)).getFirst()).getKey();
        ENGINE.job().ofInstance(processInstanceKey).withKey(jobKey).complete();
    }

    private void assertVariable(long processInstanceKey, VariableIntent intent, String varName, String expectedVarValue) {
        Record variableRecordValueRecord = (Record)RecordingExporter.variableRecords((VariableIntent)intent).withProcessInstanceKey(processInstanceKey).withName(varName).getFirst();
        io.camunda.zeebe.protocol.record.Assertions.assertThat((VariableRecordValue)((VariableRecordValue)variableRecordValueRecord.getValue())).hasName(varName).hasValue(expectedVarValue);
    }

    private JobRecordValueAssert assertJobState(long processInstanceKey, long jobIndex, String expectedJobType, JobIntent expectedJobIntent, JobKind expectedJobKind) {
        Record activatingJob = (Record)((ProcessInstanceRecordStream)RecordingExporter.processInstanceRecords().withProcessInstanceKey(processInstanceKey).withIntent((Intent)ProcessInstanceIntent.ELEMENT_ACTIVATING)).withElementType(BpmnElementType.SERVICE_TASK).getFirst();
        Record jobRecord = (Record)((JobRecordStream)RecordingExporter.jobRecords((JobIntent)expectedJobIntent).withProcessInstanceKey(processInstanceKey).skip(jobIndex)).getFirst();
        return io.camunda.zeebe.protocol.record.Assertions.assertThat((JobRecordValue)((JobRecordValue)jobRecord.getValue())).hasElementInstanceKey(activatingJob.getKey()).hasElementId(((ProcessInstanceRecordValue)activatingJob.getValue()).getElementId()).hasProcessDefinitionKey(((ProcessInstanceRecordValue)activatingJob.getValue()).getProcessDefinitionKey()).hasBpmnProcessId(((ProcessInstanceRecordValue)activatingJob.getValue()).getBpmnProcessId()).hasProcessDefinitionVersion(((ProcessInstanceRecordValue)activatingJob.getValue()).getVersion()).hasJobKind(expectedJobKind).hasType(expectedJobType);
    }

    private JobRecordValueAssert verifyJobCreationThenComplete(long processInstanceKey, long jobIndex, String jobType, JobKind jobKind) {
        this.assertJobState(processInstanceKey, jobIndex, jobType, JobIntent.CREATED, jobKind);
        ENGINE.job().ofInstance(processInstanceKey).withType(jobType).complete();
        return this.assertJobState(processInstanceKey, jobIndex, jobType, JobIntent.COMPLETED, jobKind);
    }
}

