/*
 * Decompiled with CFR 0.152.
 */
package io.camunda.zeebe.engine.processing.bpmn.multiinstance;

import io.camunda.zeebe.engine.util.EngineRule;
import io.camunda.zeebe.model.bpmn.Bpmn;
import io.camunda.zeebe.model.bpmn.BpmnModelInstance;
import io.camunda.zeebe.model.bpmn.builder.BoundaryEventBuilder;
import io.camunda.zeebe.model.bpmn.builder.MultiInstanceLoopCharacteristicsBuilder;
import io.camunda.zeebe.model.bpmn.builder.ServiceTaskBuilder;
import io.camunda.zeebe.model.bpmn.builder.zeebe.MessageBuilder;
import io.camunda.zeebe.model.bpmn.instance.ServiceTask;
import io.camunda.zeebe.protocol.record.Record;
import io.camunda.zeebe.protocol.record.intent.Intent;
import io.camunda.zeebe.protocol.record.intent.JobBatchIntent;
import io.camunda.zeebe.protocol.record.intent.JobIntent;
import io.camunda.zeebe.protocol.record.intent.MessageSubscriptionIntent;
import io.camunda.zeebe.protocol.record.intent.ProcessInstanceIntent;
import io.camunda.zeebe.protocol.record.intent.VariableIntent;
import io.camunda.zeebe.protocol.record.value.BpmnElementType;
import io.camunda.zeebe.protocol.record.value.JobBatchRecordValue;
import io.camunda.zeebe.protocol.record.value.JobRecordValue;
import io.camunda.zeebe.protocol.record.value.ProcessInstanceRecordValue;
import io.camunda.zeebe.protocol.record.value.VariableRecordValue;
import io.camunda.zeebe.test.util.BrokerClassRuleHelper;
import io.camunda.zeebe.test.util.JsonUtil;
import io.camunda.zeebe.test.util.record.JobRecordStream;
import io.camunda.zeebe.test.util.record.ProcessInstanceRecordStream;
import io.camunda.zeebe.test.util.record.RecordingExporter;
import io.camunda.zeebe.test.util.record.RecordingExporterTestWatcher;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import java.util.stream.Stream;
import org.assertj.core.api.AbstractBooleanAssert;
import org.assertj.core.api.Assertions;
import org.assertj.core.api.ListAssert;
import org.assertj.core.groups.Tuple;
import org.junit.Before;
import org.junit.ClassRule;
import org.junit.Rule;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;

@RunWith(value=Parameterized.class)
public final class MultiInstanceActivityTest {
    @ClassRule
    public static final EngineRule ENGINE = EngineRule.singlePartition();
    private static final String PROCESS_ID = "process";
    private static final String ELEMENT_ID = "task";
    private static final String INPUT_COLLECTION_EXPRESSION = "items";
    private static final String INPUT_ELEMENT_VARIABLE = "item";
    private static final List<Integer> INPUT_COLLECTION = List.of(Integer.valueOf(10), Integer.valueOf(20), Integer.valueOf(30));
    private static final String OUTPUT_COLLECTION_VARIABLE = "results";
    private static final String OUTPUT_ELEMENT_EXPRESSION = "result";
    private static final List<Integer> OUTPUT_COLLECTION = List.of(Integer.valueOf(11), Integer.valueOf(22), Integer.valueOf(33));
    private static final String MESSAGE_CORRELATION_KEY_VARIABLE = "correlationKey";
    private static final String MESSAGE_CORRELATION_KEY = "key-123";
    private static final String MESSAGE_NAME = "message";
    private static final Consumer<MultiInstanceLoopCharacteristicsBuilder> INPUT_VARIABLE_BUILDER = MultiInstanceActivityTest.multiInstance(m -> ((MultiInstanceLoopCharacteristicsBuilder)((MultiInstanceLoopCharacteristicsBuilder)((MultiInstanceLoopCharacteristicsBuilder)m.zeebeInputCollectionExpression(INPUT_COLLECTION_EXPRESSION)).zeebeInputElement(INPUT_ELEMENT_VARIABLE)).zeebeOutputElementExpression(OUTPUT_ELEMENT_EXPRESSION)).zeebeOutputCollection(OUTPUT_COLLECTION_VARIABLE));
    private static final Consumer<MessageBuilder> MESSAGE_BUILDER = m -> m.name(MESSAGE_NAME).zeebeCorrelationKeyExpression(MESSAGE_CORRELATION_KEY_VARIABLE);
    @Rule
    public final RecordingExporterTestWatcher recordingExporterTestWatcher = new RecordingExporterTestWatcher();
    @Rule
    public final BrokerClassRuleHelper helper = new BrokerClassRuleHelper();
    @Parameterized.Parameter(value=0)
    public String loopCharacteristics;
    @Parameterized.Parameter(value=1)
    public Consumer<MultiInstanceLoopCharacteristicsBuilder> miBuilder;
    @Parameterized.Parameter(value=2)
    public List<Tuple> expectedLifecycle;
    private String jobType;

    private BpmnModelInstance process(Consumer<MultiInstanceLoopCharacteristicsBuilder> builder) {
        return Bpmn.createExecutableProcess((String)PROCESS_ID).startEvent().serviceTask(ELEMENT_ID, t -> ((ServiceTaskBuilder)t.zeebeJobType(this.jobType)).multiInstance(INPUT_VARIABLE_BUILDER.andThen(builder))).endEvent().done();
    }

    @Parameterized.Parameters(name="{0} multi-instance")
    public static Object[][] parameters() {
        return new Object[][]{{"parallel", MultiInstanceActivityTest.multiInstance(m -> m.parallel()), MultiInstanceActivityTest.parallelLifecycle()}, {"sequential", MultiInstanceActivityTest.multiInstance(m -> m.sequential()), MultiInstanceActivityTest.sequentialLifecycle()}};
    }

    @Before
    public void init() {
        this.jobType = this.helper.getJobType();
    }

    private static Consumer<MultiInstanceLoopCharacteristicsBuilder> multiInstance(Consumer<MultiInstanceLoopCharacteristicsBuilder> builder) {
        return builder;
    }

    private static List<Tuple> parallelLifecycle() {
        return Arrays.asList(Assertions.tuple((Object[])new Object[]{BpmnElementType.SERVICE_TASK, ProcessInstanceIntent.ELEMENT_ACTIVATING}), Assertions.tuple((Object[])new Object[]{BpmnElementType.SERVICE_TASK, ProcessInstanceIntent.ELEMENT_ACTIVATED}), Assertions.tuple((Object[])new Object[]{BpmnElementType.SERVICE_TASK, ProcessInstanceIntent.ELEMENT_ACTIVATING}), Assertions.tuple((Object[])new Object[]{BpmnElementType.SERVICE_TASK, ProcessInstanceIntent.ELEMENT_ACTIVATED}), Assertions.tuple((Object[])new Object[]{BpmnElementType.SERVICE_TASK, ProcessInstanceIntent.ELEMENT_ACTIVATING}), Assertions.tuple((Object[])new Object[]{BpmnElementType.SERVICE_TASK, ProcessInstanceIntent.ELEMENT_ACTIVATED}), Assertions.tuple((Object[])new Object[]{BpmnElementType.SERVICE_TASK, ProcessInstanceIntent.ELEMENT_COMPLETING}), Assertions.tuple((Object[])new Object[]{BpmnElementType.SERVICE_TASK, ProcessInstanceIntent.ELEMENT_COMPLETED}), Assertions.tuple((Object[])new Object[]{BpmnElementType.SERVICE_TASK, ProcessInstanceIntent.ELEMENT_COMPLETING}), Assertions.tuple((Object[])new Object[]{BpmnElementType.SERVICE_TASK, ProcessInstanceIntent.ELEMENT_COMPLETED}), Assertions.tuple((Object[])new Object[]{BpmnElementType.SERVICE_TASK, ProcessInstanceIntent.ELEMENT_COMPLETING}), Assertions.tuple((Object[])new Object[]{BpmnElementType.SERVICE_TASK, ProcessInstanceIntent.ELEMENT_COMPLETED}));
    }

    private static List<Tuple> sequentialLifecycle() {
        return Arrays.asList(Assertions.tuple((Object[])new Object[]{BpmnElementType.SERVICE_TASK, ProcessInstanceIntent.ELEMENT_ACTIVATING}), Assertions.tuple((Object[])new Object[]{BpmnElementType.SERVICE_TASK, ProcessInstanceIntent.ELEMENT_ACTIVATED}), Assertions.tuple((Object[])new Object[]{BpmnElementType.SERVICE_TASK, ProcessInstanceIntent.ELEMENT_COMPLETING}), Assertions.tuple((Object[])new Object[]{BpmnElementType.SERVICE_TASK, ProcessInstanceIntent.ELEMENT_COMPLETED}), Assertions.tuple((Object[])new Object[]{BpmnElementType.SERVICE_TASK, ProcessInstanceIntent.ELEMENT_ACTIVATING}), Assertions.tuple((Object[])new Object[]{BpmnElementType.SERVICE_TASK, ProcessInstanceIntent.ELEMENT_ACTIVATED}), Assertions.tuple((Object[])new Object[]{BpmnElementType.SERVICE_TASK, ProcessInstanceIntent.ELEMENT_COMPLETING}), Assertions.tuple((Object[])new Object[]{BpmnElementType.SERVICE_TASK, ProcessInstanceIntent.ELEMENT_COMPLETED}), Assertions.tuple((Object[])new Object[]{BpmnElementType.SERVICE_TASK, ProcessInstanceIntent.ELEMENT_ACTIVATING}), Assertions.tuple((Object[])new Object[]{BpmnElementType.SERVICE_TASK, ProcessInstanceIntent.ELEMENT_ACTIVATED}), Assertions.tuple((Object[])new Object[]{BpmnElementType.SERVICE_TASK, ProcessInstanceIntent.ELEMENT_COMPLETING}), Assertions.tuple((Object[])new Object[]{BpmnElementType.SERVICE_TASK, ProcessInstanceIntent.ELEMENT_COMPLETED}));
    }

    @Test
    public void shouldActivateActivitiesWithLoopCharacteristics() {
        ENGINE.deployment().withXmlResource(this.process(this.miBuilder)).deploy();
        long processInstanceKey = ENGINE.processInstance().ofBpmnProcessId(PROCESS_ID).withVariable(INPUT_COLLECTION_EXPRESSION, INPUT_COLLECTION).create();
        this.completeJobs(processInstanceKey, INPUT_COLLECTION.size());
        Assertions.assertThat((Stream)RecordingExporter.processInstanceRecords().withProcessInstanceKey(processInstanceKey).limitToProcessInstanceCompleted().withElementId(ELEMENT_ID)).extracting(r -> Assertions.tuple((Object[])new Object[]{((ProcessInstanceRecordValue)r.getValue()).getBpmnElementType(), r.getIntent()})).containsSubsequence(this.expectedLifecycle);
    }

    @Test
    public void shouldActivateActivitiesForEachElement() {
        ENGINE.deployment().withXmlResource(this.process(this.miBuilder)).deploy();
        long processInstanceKey = ENGINE.processInstance().ofBpmnProcessId(PROCESS_ID).withVariable(INPUT_COLLECTION_EXPRESSION, INPUT_COLLECTION).create();
        this.completeJobs(processInstanceKey, INPUT_COLLECTION.size());
        Assertions.assertThat((Stream)RecordingExporter.processInstanceRecords().withProcessInstanceKey(processInstanceKey).limitToProcessInstanceCompleted().withElementId(ELEMENT_ID)).extracting(r -> Assertions.tuple((Object[])new Object[]{((ProcessInstanceRecordValue)r.getValue()).getBpmnElementType(), r.getIntent()})).containsSubsequence((Object[])new Tuple[]{Assertions.tuple((Object[])new Object[]{BpmnElementType.MULTI_INSTANCE_BODY, ProcessInstanceIntent.ELEMENT_ACTIVATING}), Assertions.tuple((Object[])new Object[]{BpmnElementType.MULTI_INSTANCE_BODY, ProcessInstanceIntent.ELEMENT_ACTIVATED}), Assertions.tuple((Object[])new Object[]{BpmnElementType.SERVICE_TASK, ProcessInstanceIntent.ELEMENT_ACTIVATED}), Assertions.tuple((Object[])new Object[]{BpmnElementType.SERVICE_TASK, ProcessInstanceIntent.ELEMENT_ACTIVATED})});
    }

    @Test
    public void shouldCreateOneJobForEachElement() {
        ENGINE.deployment().withXmlResource(this.process(this.miBuilder)).deploy();
        long processInstanceKey = ENGINE.processInstance().ofBpmnProcessId(PROCESS_ID).withVariable(INPUT_COLLECTION_EXPRESSION, INPUT_COLLECTION).create();
        this.completeJobs(processInstanceKey, INPUT_COLLECTION.size());
        ((ListAssert)Assertions.assertThat((Stream)RecordingExporter.jobRecords((JobIntent)JobIntent.CREATED).withProcessInstanceKey(processInstanceKey).limit((long)INPUT_COLLECTION.size())).hasSize(INPUT_COLLECTION.size())).extracting(Record::getValue).extracting(JobRecordValue::getElementId).containsOnly((Object[])new String[]{ELEMENT_ID});
    }

    @Test
    public void shouldCompleteBodyWhenAllJobsAreCompleted() {
        ENGINE.deployment().withXmlResource(this.process(this.miBuilder)).deploy();
        long processInstanceKey = ENGINE.processInstance().ofBpmnProcessId(PROCESS_ID).withVariable(INPUT_COLLECTION_EXPRESSION, INPUT_COLLECTION).create();
        this.completeJobs(processInstanceKey, INPUT_COLLECTION.size());
        Assertions.assertThat((Stream)RecordingExporter.processInstanceRecords().withProcessInstanceKey(processInstanceKey).limitToProcessInstanceCompleted().withElementId(ELEMENT_ID)).extracting(r -> Assertions.tuple((Object[])new Object[]{((ProcessInstanceRecordValue)r.getValue()).getBpmnElementType(), r.getIntent()})).containsSubsequence((Object[])new Tuple[]{Assertions.tuple((Object[])new Object[]{BpmnElementType.SERVICE_TASK, ProcessInstanceIntent.ELEMENT_COMPLETED}), Assertions.tuple((Object[])new Object[]{BpmnElementType.SERVICE_TASK, ProcessInstanceIntent.ELEMENT_COMPLETED}), Assertions.tuple((Object[])new Object[]{BpmnElementType.SERVICE_TASK, ProcessInstanceIntent.ELEMENT_COMPLETED}), Assertions.tuple((Object[])new Object[]{BpmnElementType.MULTI_INSTANCE_BODY, ProcessInstanceIntent.COMPLETE_ELEMENT}), Assertions.tuple((Object[])new Object[]{BpmnElementType.MULTI_INSTANCE_BODY, ProcessInstanceIntent.ELEMENT_COMPLETING}), Assertions.tuple((Object[])new Object[]{BpmnElementType.MULTI_INSTANCE_BODY, ProcessInstanceIntent.ELEMENT_COMPLETED})});
    }

    @Test
    public void shouldGoThroughMultiInstanceActivity() {
        ENGINE.deployment().withXmlResource(this.process(this.miBuilder)).deploy();
        long processInstanceKey = ENGINE.processInstance().ofBpmnProcessId(PROCESS_ID).withVariable(INPUT_COLLECTION_EXPRESSION, INPUT_COLLECTION).create();
        this.completeJobs(processInstanceKey, INPUT_COLLECTION.size());
        Assertions.assertThat((Stream)RecordingExporter.processInstanceRecords().withProcessInstanceKey(processInstanceKey).limitToProcessInstanceCompleted()).extracting(r -> Assertions.tuple((Object[])new Object[]{((ProcessInstanceRecordValue)r.getValue()).getBpmnElementType(), r.getIntent()})).containsSubsequence((Object[])new Tuple[]{Assertions.tuple((Object[])new Object[]{BpmnElementType.START_EVENT, ProcessInstanceIntent.ELEMENT_COMPLETED}), Assertions.tuple((Object[])new Object[]{BpmnElementType.SEQUENCE_FLOW, ProcessInstanceIntent.SEQUENCE_FLOW_TAKEN}), Assertions.tuple((Object[])new Object[]{BpmnElementType.MULTI_INSTANCE_BODY, ProcessInstanceIntent.ELEMENT_ACTIVATING}), Assertions.tuple((Object[])new Object[]{BpmnElementType.MULTI_INSTANCE_BODY, ProcessInstanceIntent.ELEMENT_ACTIVATED}), Assertions.tuple((Object[])new Object[]{BpmnElementType.MULTI_INSTANCE_BODY, ProcessInstanceIntent.ELEMENT_COMPLETING}), Assertions.tuple((Object[])new Object[]{BpmnElementType.MULTI_INSTANCE_BODY, ProcessInstanceIntent.ELEMENT_COMPLETED}), Assertions.tuple((Object[])new Object[]{BpmnElementType.SEQUENCE_FLOW, ProcessInstanceIntent.SEQUENCE_FLOW_TAKEN}), Assertions.tuple((Object[])new Object[]{BpmnElementType.END_EVENT, ProcessInstanceIntent.ELEMENT_COMPLETED}), Assertions.tuple((Object[])new Object[]{BpmnElementType.PROCESS, ProcessInstanceIntent.ELEMENT_COMPLETED})});
    }

    @Test
    public void shouldSetInputElementVariable() {
        ENGINE.deployment().withXmlResource(this.process(this.miBuilder)).deploy();
        long processInstanceKey = ENGINE.processInstance().ofBpmnProcessId(PROCESS_ID).withVariable(INPUT_COLLECTION_EXPRESSION, INPUT_COLLECTION).create();
        this.completeJobs(processInstanceKey, INPUT_COLLECTION.size());
        Assertions.assertThat((Stream)RecordingExporter.jobBatchRecords((JobBatchIntent)JobBatchIntent.ACTIVATED).withType(this.jobType).limit(3L)).flatExtracting(r -> ((JobBatchRecordValue)r.getValue()).getJobs()).extracting(j -> j.getVariables().get(INPUT_ELEMENT_VARIABLE)).containsExactlyElementsOf(INPUT_COLLECTION);
        List jsonInputCollection = INPUT_COLLECTION.stream().map(JsonUtil::toJson).collect(Collectors.toList());
        Assertions.assertThat((Stream)RecordingExporter.variableRecords((VariableIntent)VariableIntent.CREATED).withProcessInstanceKey(processInstanceKey).withName(INPUT_ELEMENT_VARIABLE).limit(3L)).extracting(r -> ((VariableRecordValue)r.getValue()).getValue()).containsExactlyElementsOf(jsonInputCollection);
    }

    @Test
    public void shouldNotPropagateInputElementVariable() {
        ENGINE.deployment().withXmlResource(this.process(this.miBuilder)).deploy();
        long processInstanceKey = ENGINE.processInstance().ofBpmnProcessId(PROCESS_ID).withVariable(INPUT_COLLECTION_EXPRESSION, INPUT_COLLECTION).create();
        this.completeJobs(processInstanceKey, INPUT_COLLECTION.size());
        Assertions.assertThat((Stream)RecordingExporter.records().limitToProcessInstance(processInstanceKey).variableRecords().withProcessInstanceKey(processInstanceKey).withScopeKey(processInstanceKey)).extracting(r -> ((VariableRecordValue)r.getValue()).getName()).doesNotContain((Object[])new String[]{INPUT_ELEMENT_VARIABLE});
    }

    @Test
    public void shouldCancelJobsOnTermination() {
        ENGINE.deployment().withXmlResource(this.process(this.miBuilder)).deploy();
        long processInstanceKey = ENGINE.processInstance().ofBpmnProcessId(PROCESS_ID).withVariable(INPUT_COLLECTION_EXPRESSION, INPUT_COLLECTION).create();
        this.completeJobs(processInstanceKey, 1);
        Record createdJob = (Record)((JobRecordStream)RecordingExporter.jobRecords((JobIntent)JobIntent.CREATED).withProcessInstanceKey(processInstanceKey).skip(1L)).getFirst();
        ENGINE.processInstance().withInstanceKey(processInstanceKey).cancel();
        ((ListAssert)Assertions.assertThat((Stream)RecordingExporter.jobRecords((JobIntent)JobIntent.CANCELED).withProcessInstanceKey(processInstanceKey).limit(1L)).hasSize(1)).extracting(Record::getKey).containsExactly((Object[])new Long[]{createdJob.getKey()});
    }

    @Test
    public void shouldTerminateInstancesOnTerminatingBody() {
        ENGINE.deployment().withXmlResource(this.process(this.miBuilder)).deploy();
        long processInstanceKey = ENGINE.processInstance().ofBpmnProcessId(PROCESS_ID).withVariable(INPUT_COLLECTION_EXPRESSION, INPUT_COLLECTION).create();
        int completedJobs = INPUT_COLLECTION.size() - 1;
        this.completeJobs(processInstanceKey, completedJobs);
        ((ProcessInstanceRecordStream)RecordingExporter.processInstanceRecords((ProcessInstanceIntent)ProcessInstanceIntent.ELEMENT_ACTIVATED).withProcessInstanceKey(processInstanceKey).withElementId(ELEMENT_ID).withElementType(BpmnElementType.SERVICE_TASK).skip((long)completedJobs)).exists();
        ENGINE.processInstance().withInstanceKey(processInstanceKey).cancel();
        Assertions.assertThat((Stream)RecordingExporter.processInstanceRecords().withProcessInstanceKey(processInstanceKey).limitToProcessInstanceTerminated().withElementId(ELEMENT_ID)).extracting(r -> Assertions.tuple((Object[])new Object[]{((ProcessInstanceRecordValue)r.getValue()).getBpmnElementType(), r.getIntent()})).containsSubsequence((Object[])new Tuple[]{Assertions.tuple((Object[])new Object[]{BpmnElementType.MULTI_INSTANCE_BODY, ProcessInstanceIntent.ELEMENT_TERMINATING}), Assertions.tuple((Object[])new Object[]{BpmnElementType.SERVICE_TASK, ProcessInstanceIntent.ELEMENT_TERMINATING}), Assertions.tuple((Object[])new Object[]{BpmnElementType.SERVICE_TASK, ProcessInstanceIntent.ELEMENT_TERMINATED}), Assertions.tuple((Object[])new Object[]{BpmnElementType.MULTI_INSTANCE_BODY, ProcessInstanceIntent.ELEMENT_TERMINATED})});
    }

    @Test
    public void shouldSkipIfCollectionIsEmpty() {
        ENGINE.deployment().withXmlResource(this.process(this.miBuilder)).deploy();
        long processInstanceKey = ENGINE.processInstance().ofBpmnProcessId(PROCESS_ID).withVariable(INPUT_COLLECTION_EXPRESSION, Collections.emptyList()).create();
        Assertions.assertThat((Stream)RecordingExporter.processInstanceRecords().withProcessInstanceKey(processInstanceKey).withElementId(ELEMENT_ID).limit(6L)).extracting(r -> Assertions.tuple((Object[])new Object[]{((ProcessInstanceRecordValue)r.getValue()).getBpmnElementType(), r.getIntent()})).containsExactly((Object[])new Tuple[]{Assertions.tuple((Object[])new Object[]{BpmnElementType.MULTI_INSTANCE_BODY, ProcessInstanceIntent.ACTIVATE_ELEMENT}), Assertions.tuple((Object[])new Object[]{BpmnElementType.MULTI_INSTANCE_BODY, ProcessInstanceIntent.ELEMENT_ACTIVATING}), Assertions.tuple((Object[])new Object[]{BpmnElementType.MULTI_INSTANCE_BODY, ProcessInstanceIntent.ELEMENT_ACTIVATED}), Assertions.tuple((Object[])new Object[]{BpmnElementType.MULTI_INSTANCE_BODY, ProcessInstanceIntent.COMPLETE_ELEMENT}), Assertions.tuple((Object[])new Object[]{BpmnElementType.MULTI_INSTANCE_BODY, ProcessInstanceIntent.ELEMENT_COMPLETING}), Assertions.tuple((Object[])new Object[]{BpmnElementType.MULTI_INSTANCE_BODY, ProcessInstanceIntent.ELEMENT_COMPLETED})});
        Assertions.assertThat((Stream)RecordingExporter.processInstanceRecords().filterRootScope().limitToProcessInstanceCompleted()).extracting(Record::getIntent).contains((Object[])new Intent[]{ProcessInstanceIntent.ELEMENT_COMPLETED});
    }

    @Test
    public void shouldIgnoreInputElementVariableIfNotDefined() {
        ENGINE.deployment().withXmlResource(this.process(this.miBuilder.andThen(m -> ((MultiInstanceLoopCharacteristicsBuilder)m.zeebeInputCollectionExpression(INPUT_COLLECTION_EXPRESSION)).zeebeInputElement(null)))).deploy();
        long processInstanceKey = ENGINE.processInstance().ofBpmnProcessId(PROCESS_ID).withVariable(INPUT_COLLECTION_EXPRESSION, INPUT_COLLECTION).create();
        this.completeJobs(processInstanceKey, INPUT_COLLECTION.size());
        Assertions.assertThat((Stream)RecordingExporter.jobBatchRecords((JobBatchIntent)JobBatchIntent.ACTIVATED).withType(this.jobType).limit((long)INPUT_COLLECTION.size())).flatExtracting(r -> ((JobBatchRecordValue)r.getValue()).getJobs()).flatExtracting(j -> j.getVariables().keySet()).doesNotContain((Object[])new String[]{INPUT_ELEMENT_VARIABLE});
    }

    @Test
    public void shouldIterateOverNestedInputCollection() {
        ENGINE.deployment().withXmlResource(this.process(this.miBuilder.andThen(m -> m.zeebeInputCollectionExpression("nested.items")))).deploy();
        long processInstanceKey = ENGINE.processInstance().ofBpmnProcessId(PROCESS_ID).withVariable("nested", Collections.singletonMap(INPUT_COLLECTION_EXPRESSION, INPUT_COLLECTION)).create();
        ((JobRecordStream)RecordingExporter.jobRecords((JobIntent)JobIntent.CREATED).withProcessInstanceKey(processInstanceKey).limit((long)INPUT_COLLECTION.size())).exists();
        this.completeJobs(processInstanceKey, INPUT_COLLECTION.size());
        Assertions.assertThat((Stream)RecordingExporter.jobBatchRecords((JobBatchIntent)JobBatchIntent.ACTIVATED).withType(this.jobType).limit((long)INPUT_COLLECTION.size())).flatExtracting(r -> ((JobBatchRecordValue)r.getValue()).getJobs()).extracting(j -> j.getVariables().get(INPUT_ELEMENT_VARIABLE)).containsExactlyElementsOf(INPUT_COLLECTION);
    }

    @Test
    public void shouldCollectNestedOutputElements() {
        ENGINE.deployment().withXmlResource(this.process(this.miBuilder.andThen(m -> m.zeebeOutputElementExpression("result.nested")))).deploy();
        long processInstanceKey = ENGINE.processInstance().ofBpmnProcessId(PROCESS_ID).withVariable(INPUT_COLLECTION_EXPRESSION, INPUT_COLLECTION).create();
        this.completeJobs(processInstanceKey, INPUT_COLLECTION.size(), i -> Map.of("nested", OUTPUT_COLLECTION.get((int)i)));
        Assertions.assertThat((Stream)RecordingExporter.variableRecords((VariableIntent)VariableIntent.CREATED).withName(OUTPUT_ELEMENT_EXPRESSION).withValue("null").limit((long)INPUT_COLLECTION.size())).hasSize(INPUT_COLLECTION.size());
        io.camunda.zeebe.protocol.record.Assertions.assertThat((VariableRecordValue)((VariableRecordValue)((Record)RecordingExporter.variableRecords().withName(OUTPUT_COLLECTION_VARIABLE).withScopeKey(processInstanceKey).getFirst()).getValue())).hasValue(JsonUtil.toJson(OUTPUT_COLLECTION));
    }

    @Test
    public void shouldCollectOutputElementsFromExpression() {
        ENGINE.deployment().withXmlResource(this.process(this.miBuilder.andThen(m -> m.zeebeOutputElementExpression("number(string(loopCounter) + string(loopCounter))")))).deploy();
        long processInstanceKey = ENGINE.processInstance().ofBpmnProcessId(PROCESS_ID).withVariable(INPUT_COLLECTION_EXPRESSION, INPUT_COLLECTION).create();
        this.completeJobs(processInstanceKey, INPUT_COLLECTION.size());
        Assertions.assertThat(RecordingExporter.records().limitToProcessInstance(processInstanceKey).variableRecords().map(Record::getValue).map(VariableRecordValue::getName)).noneMatch("number(string(loopCounter) + string(loopCounter))"::equals);
        io.camunda.zeebe.protocol.record.Assertions.assertThat((VariableRecordValue)((VariableRecordValue)((Record)RecordingExporter.variableRecords().withName(OUTPUT_COLLECTION_VARIABLE).withScopeKey(processInstanceKey).getFirst()).getValue())).hasValue(JsonUtil.toJson(OUTPUT_COLLECTION));
    }

    @Test
    public void shouldSetOutputCollectionVariable() {
        ENGINE.deployment().withXmlResource(this.process(this.miBuilder)).deploy();
        long processInstanceKey = ENGINE.processInstance().ofBpmnProcessId(PROCESS_ID).withVariable(INPUT_COLLECTION_EXPRESSION, INPUT_COLLECTION).create();
        this.completeJobs(processInstanceKey, INPUT_COLLECTION.size());
        Record variableRecord = (Record)RecordingExporter.variableRecords().withName(OUTPUT_COLLECTION_VARIABLE).withScopeKey(processInstanceKey).getFirst();
        io.camunda.zeebe.protocol.record.Assertions.assertThat((VariableRecordValue)((VariableRecordValue)variableRecord.getValue())).hasValue(JsonUtil.toJson(OUTPUT_COLLECTION));
    }

    @Test
    public void shouldCollectOutputInVariable() {
        ENGINE.deployment().withXmlResource(this.process(this.miBuilder)).deploy();
        long processInstanceKey = ENGINE.processInstance().ofBpmnProcessId(PROCESS_ID).withVariable(INPUT_COLLECTION_EXPRESSION, INPUT_COLLECTION).create();
        this.completeJobs(processInstanceKey, INPUT_COLLECTION.size());
        Record multiInstanceBody = (Record)RecordingExporter.processInstanceRecords((ProcessInstanceIntent)ProcessInstanceIntent.ELEMENT_COMPLETED).withProcessInstanceKey(processInstanceKey).withElementType(BpmnElementType.MULTI_INSTANCE_BODY).getFirst();
        Assertions.assertThat((Stream)RecordingExporter.variableRecords().withName(OUTPUT_COLLECTION_VARIABLE).withScopeKey(multiInstanceBody.getKey()).limit((long)(INPUT_COLLECTION.size() + 1))).extracting(r -> ((VariableRecordValue)r.getValue()).getValue()).contains((Object[])new String[]{"[null,null,null]", "[11,null,null]", "[11,22,null]", "[11,22,33]"});
    }

    @Test
    public void shouldSetOutputElementVariable() {
        ENGINE.deployment().withXmlResource(this.process(this.miBuilder)).deploy();
        long processInstanceKey = ENGINE.processInstance().ofBpmnProcessId(PROCESS_ID).withVariable(INPUT_COLLECTION_EXPRESSION, INPUT_COLLECTION).create();
        this.completeJobs(processInstanceKey, INPUT_COLLECTION.size());
        Assertions.assertThat((Stream)RecordingExporter.variableRecords((VariableIntent)VariableIntent.CREATED).withName(OUTPUT_ELEMENT_EXPRESSION).limit((long)INPUT_COLLECTION.size())).extracting(r -> ((VariableRecordValue)r.getValue()).getValue()).containsOnly((Object[])new String[]{"null"});
        Assertions.assertThat((Stream)RecordingExporter.variableRecords((VariableIntent)VariableIntent.UPDATED).withName(OUTPUT_ELEMENT_EXPRESSION).limit((long)INPUT_COLLECTION.size())).extracting(r -> ((VariableRecordValue)r.getValue()).getValue()).containsExactlyElementsOf((Iterable)OUTPUT_COLLECTION.stream().map(JsonUtil::toJson).collect(Collectors.toList()));
    }

    @Test
    public void shouldSetEmptyOutputCollectionIfSkip() {
        ENGINE.deployment().withXmlResource(this.process(this.miBuilder)).deploy();
        long processInstanceKey = ENGINE.processInstance().ofBpmnProcessId(PROCESS_ID).withVariable(INPUT_COLLECTION_EXPRESSION, List.of()).create();
        Record variableRecord = (Record)RecordingExporter.variableRecords().withName(OUTPUT_COLLECTION_VARIABLE).withProcessInstanceKey(processInstanceKey).getFirst();
        io.camunda.zeebe.protocol.record.Assertions.assertThat((VariableRecordValue)((VariableRecordValue)variableRecord.getValue())).hasValue("[]");
    }

    @Test
    public void shouldIgnoreOutputCollectionVariableIfNotDefined() {
        ENGINE.deployment().withXmlResource(this.process(this.miBuilder.andThen(m -> ((MultiInstanceLoopCharacteristicsBuilder)m.zeebeOutputCollection(null)).zeebeOutputElementExpression(null)))).deploy();
        long processInstanceKey = ENGINE.processInstance().ofBpmnProcessId(PROCESS_ID).withVariable(INPUT_COLLECTION_EXPRESSION, INPUT_COLLECTION).create();
        this.completeJobs(processInstanceKey, INPUT_COLLECTION.size());
        Assertions.assertThat((Stream)RecordingExporter.records().limitToProcessInstance(processInstanceKey).variableRecords().withProcessInstanceKey(processInstanceKey).withScopeKey(processInstanceKey)).extracting(r -> ((VariableRecordValue)r.getValue()).getName()).doesNotContain((Object[])new String[]{OUTPUT_COLLECTION_VARIABLE});
    }

    @Test
    public void shouldNotPropagateOutputElementVariable() {
        ENGINE.deployment().withXmlResource(this.process(this.miBuilder)).deploy();
        long processInstanceKey = ENGINE.processInstance().ofBpmnProcessId(PROCESS_ID).withVariable(INPUT_COLLECTION_EXPRESSION, INPUT_COLLECTION).create();
        this.completeJobs(processInstanceKey, INPUT_COLLECTION.size());
        Assertions.assertThat((Stream)RecordingExporter.records().limitToProcessInstance(processInstanceKey).variableRecords().withProcessInstanceKey(processInstanceKey).withScopeKey(processInstanceKey)).extracting(r -> ((VariableRecordValue)r.getValue()).getName()).doesNotContain((Object[])new String[]{OUTPUT_ELEMENT_EXPRESSION});
    }

    @Test
    public void shouldSetLoopCounterVariable() {
        ENGINE.deployment().withXmlResource(this.process(this.miBuilder)).deploy();
        long processInstanceKey = ENGINE.processInstance().ofBpmnProcessId(PROCESS_ID).withVariable(INPUT_COLLECTION_EXPRESSION, INPUT_COLLECTION).create();
        this.completeJobs(processInstanceKey, INPUT_COLLECTION.size());
        List elementInstanceKeys = ((ProcessInstanceRecordStream)RecordingExporter.processInstanceRecords((ProcessInstanceIntent)ProcessInstanceIntent.ELEMENT_ACTIVATED).withProcessInstanceKey(processInstanceKey).withElementId(ELEMENT_ID).withElementType(BpmnElementType.SERVICE_TASK).limit(3L)).map(Record::getKey).collect(Collectors.toList());
        Assertions.assertThat((Stream)RecordingExporter.records().limitToProcessInstance(processInstanceKey).variableRecords().withProcessInstanceKey(processInstanceKey).withName("loopCounter")).extracting(Record::getValue).extracting(v -> Assertions.tuple((Object[])new Object[]{v.getScopeKey(), v.getValue()})).containsExactly((Object[])new Tuple[]{Assertions.tuple((Object[])new Object[]{elementInstanceKeys.get(0), "1"}), Assertions.tuple((Object[])new Object[]{elementInstanceKeys.get(1), "2"}), Assertions.tuple((Object[])new Object[]{elementInstanceKeys.get(2), "3"})});
    }

    @Test
    public void shouldApplyInputMapping() {
        ServiceTask task = (ServiceTask)this.process(this.miBuilder).getModelElementById(ELEMENT_ID);
        BpmnModelInstance process = ((ServiceTaskBuilder)((ServiceTaskBuilder)task.builder().zeebeInputExpression(INPUT_ELEMENT_VARIABLE, "x")).zeebeInputExpression("loopCounter", "y")).done();
        ENGINE.deployment().withXmlResource(process).deploy();
        long processInstanceKey = ENGINE.processInstance().ofBpmnProcessId(PROCESS_ID).withVariable(INPUT_COLLECTION_EXPRESSION, INPUT_COLLECTION).create();
        this.completeJobs(processInstanceKey, INPUT_COLLECTION.size());
        List elementInstanceKeys = ((ProcessInstanceRecordStream)RecordingExporter.processInstanceRecords((ProcessInstanceIntent)ProcessInstanceIntent.ELEMENT_ACTIVATED).withProcessInstanceKey(processInstanceKey).withElementId(ELEMENT_ID).withElementType(BpmnElementType.SERVICE_TASK).limit(3L)).map(Record::getKey).collect(Collectors.toList());
        Assertions.assertThat((Stream)RecordingExporter.records().limitToProcessInstance(processInstanceKey).variableRecords().withProcessInstanceKey(processInstanceKey)).extracting(Record::getValue).extracting(v -> Assertions.tuple((Object[])new Object[]{v.getScopeKey(), v.getName(), v.getValue()})).contains((Object[])new Tuple[]{Assertions.tuple((Object[])new Object[]{elementInstanceKeys.get(0), "x", JsonUtil.toJson((Object)INPUT_COLLECTION.get(0))}), Assertions.tuple((Object[])new Object[]{elementInstanceKeys.get(0), "y", "1"}), Assertions.tuple((Object[])new Object[]{elementInstanceKeys.get(1), "x", JsonUtil.toJson((Object)INPUT_COLLECTION.get(1))}), Assertions.tuple((Object[])new Object[]{elementInstanceKeys.get(1), "y", "2"}), Assertions.tuple((Object[])new Object[]{elementInstanceKeys.get(2), "x", JsonUtil.toJson((Object)INPUT_COLLECTION.get(2))}), Assertions.tuple((Object[])new Object[]{elementInstanceKeys.get(2), "y", "3"})});
    }

    @Test
    public void shouldApplyOutputMapping() {
        ServiceTask task = (ServiceTask)this.process(this.miBuilder).getModelElementById(ELEMENT_ID);
        BpmnModelInstance process = ((ServiceTaskBuilder)((ServiceTaskBuilder)task.builder().zeebeOutputExpression("loopCounter", OUTPUT_ELEMENT_EXPRESSION)).zeebeOutputExpression("loopCounter", "global")).done();
        ENGINE.deployment().withXmlResource(process).deploy();
        long processInstanceKey = ENGINE.processInstance().ofBpmnProcessId(PROCESS_ID).withVariable(INPUT_COLLECTION_EXPRESSION, INPUT_COLLECTION).create();
        this.completeJobs(processInstanceKey, INPUT_COLLECTION.size());
        io.camunda.zeebe.protocol.record.Assertions.assertThat((VariableRecordValue)((VariableRecordValue)((Record)RecordingExporter.variableRecords().withScopeKey(processInstanceKey).withName(OUTPUT_COLLECTION_VARIABLE).getFirst()).getValue())).hasValue("[1,2,3]");
        Assertions.assertThat((Stream)RecordingExporter.records().limitToProcessInstance(processInstanceKey).variableRecords().withProcessInstanceKey(processInstanceKey).withName("global")).extracting(Record::getValue).extracting(v -> Assertions.tuple((Object[])new Object[]{v.getScopeKey(), v.getValue()})).containsExactly((Object[])new Tuple[]{Assertions.tuple((Object[])new Object[]{processInstanceKey, "1"}), Assertions.tuple((Object[])new Object[]{processInstanceKey, "2"}), Assertions.tuple((Object[])new Object[]{processInstanceKey, "3"})});
    }

    @Test
    public void shouldTriggerInterruptingBoundaryEvent() {
        ServiceTask task = (ServiceTask)this.process(this.miBuilder).getModelElementById(ELEMENT_ID);
        BpmnModelInstance process = ((BoundaryEventBuilder)task.builder().boundaryEvent("boundary-event", b -> ((BoundaryEventBuilder)b.cancelActivity(Boolean.valueOf(true))).message(MESSAGE_BUILDER)).sequenceFlowId("to-canceled")).endEvent("canceled").done();
        ENGINE.deployment().withXmlResource(process).deploy();
        long processInstanceKey = ENGINE.processInstance().ofBpmnProcessId(PROCESS_ID).withVariables(Map.of(INPUT_COLLECTION_EXPRESSION, INPUT_COLLECTION, MESSAGE_CORRELATION_KEY_VARIABLE, MESSAGE_CORRELATION_KEY)).create();
        this.completeJobs(processInstanceKey, INPUT_COLLECTION.size() - 1);
        RecordingExporter.messageSubscriptionRecords((MessageSubscriptionIntent)MessageSubscriptionIntent.CREATED).withProcessInstanceKey(processInstanceKey).await();
        ENGINE.message().withName(MESSAGE_NAME).withCorrelationKey(MESSAGE_CORRELATION_KEY).withTimeToLive(0L).publish();
        Assertions.assertThat((Stream)RecordingExporter.messageSubscriptionRecords().withProcessInstanceKey(processInstanceKey).limit(5L)).extracting(Record::getIntent).containsExactly((Object[])new Intent[]{MessageSubscriptionIntent.CREATE, MessageSubscriptionIntent.CREATED, MessageSubscriptionIntent.CORRELATING, MessageSubscriptionIntent.CORRELATE, MessageSubscriptionIntent.CORRELATED});
        Assertions.assertThat((Stream)RecordingExporter.processInstanceRecords().withProcessInstanceKey(processInstanceKey).limitToProcessInstanceCompleted()).extracting(r -> Assertions.tuple((Object[])new Object[]{((ProcessInstanceRecordValue)r.getValue()).getElementId(), ((ProcessInstanceRecordValue)r.getValue()).getBpmnElementType(), r.getIntent()})).containsSubsequence((Object[])new Tuple[]{Assertions.tuple((Object[])new Object[]{ELEMENT_ID, BpmnElementType.MULTI_INSTANCE_BODY, ProcessInstanceIntent.TERMINATE_ELEMENT}), Assertions.tuple((Object[])new Object[]{ELEMENT_ID, BpmnElementType.MULTI_INSTANCE_BODY, ProcessInstanceIntent.ELEMENT_TERMINATING}), Assertions.tuple((Object[])new Object[]{ELEMENT_ID, BpmnElementType.SERVICE_TASK, ProcessInstanceIntent.ELEMENT_TERMINATING}), Assertions.tuple((Object[])new Object[]{ELEMENT_ID, BpmnElementType.SERVICE_TASK, ProcessInstanceIntent.ELEMENT_TERMINATED}), Assertions.tuple((Object[])new Object[]{ELEMENT_ID, BpmnElementType.MULTI_INSTANCE_BODY, ProcessInstanceIntent.ELEMENT_TERMINATED}), Assertions.tuple((Object[])new Object[]{"to-canceled", BpmnElementType.SEQUENCE_FLOW, ProcessInstanceIntent.SEQUENCE_FLOW_TAKEN}), Assertions.tuple((Object[])new Object[]{"canceled", BpmnElementType.END_EVENT, ProcessInstanceIntent.ELEMENT_COMPLETED}), Assertions.tuple((Object[])new Object[]{PROCESS_ID, BpmnElementType.PROCESS, ProcessInstanceIntent.ELEMENT_COMPLETED})});
        Assertions.assertThat((Stream)RecordingExporter.records().limitToProcessInstance(processInstanceKey).variableRecords().withScopeKey(processInstanceKey)).extracting(r -> ((VariableRecordValue)r.getValue()).getName()).doesNotContain((Object[])new String[]{OUTPUT_COLLECTION_VARIABLE});
    }

    @Test
    public void shouldTriggerNonInterruptingBoundaryEvent() {
        ServiceTask task = (ServiceTask)this.process(this.miBuilder).getModelElementById(ELEMENT_ID);
        BpmnModelInstance process = ((BoundaryEventBuilder)task.builder().boundaryEvent("boundary-event", b -> ((BoundaryEventBuilder)b.cancelActivity(Boolean.valueOf(false))).message(MESSAGE_BUILDER)).sequenceFlowId("to-notified")).endEvent("notified").done();
        ENGINE.deployment().withXmlResource(process).deploy();
        long processInstanceKey = ENGINE.processInstance().ofBpmnProcessId(PROCESS_ID).withVariables(Map.of(INPUT_COLLECTION_EXPRESSION, INPUT_COLLECTION, MESSAGE_CORRELATION_KEY_VARIABLE, MESSAGE_CORRELATION_KEY)).create();
        this.completeJobs(processInstanceKey, INPUT_COLLECTION.size() - 1);
        RecordingExporter.messageSubscriptionRecords((MessageSubscriptionIntent)MessageSubscriptionIntent.CREATED).withProcessInstanceKey(processInstanceKey).await();
        ENGINE.message().withName(MESSAGE_NAME).withCorrelationKey(MESSAGE_CORRELATION_KEY).withTimeToLive(0L).publish();
        Assertions.assertThat((Stream)RecordingExporter.processInstanceRecords().withProcessInstanceKey(processInstanceKey).limit(r -> ((ProcessInstanceRecordValue)r.getValue()).getBpmnElementType() == BpmnElementType.END_EVENT)).extracting(r -> Assertions.tuple((Object[])new Object[]{((ProcessInstanceRecordValue)r.getValue()).getElementId(), ((ProcessInstanceRecordValue)r.getValue()).getBpmnElementType(), r.getIntent()})).containsSubsequence((Object[])new Tuple[]{Assertions.tuple((Object[])new Object[]{"to-notified", BpmnElementType.SEQUENCE_FLOW, ProcessInstanceIntent.SEQUENCE_FLOW_TAKEN}), Assertions.tuple((Object[])new Object[]{"notified", BpmnElementType.END_EVENT, ProcessInstanceIntent.ACTIVATE_ELEMENT})});
        this.completeJobs(processInstanceKey, 1);
        Assertions.assertThat((Stream)RecordingExporter.messageSubscriptionRecords().withProcessInstanceKey(processInstanceKey).limit(7L)).extracting(Record::getIntent).containsExactly((Object[])new Intent[]{MessageSubscriptionIntent.CREATE, MessageSubscriptionIntent.CREATED, MessageSubscriptionIntent.CORRELATING, MessageSubscriptionIntent.CORRELATE, MessageSubscriptionIntent.CORRELATED, MessageSubscriptionIntent.DELETE, MessageSubscriptionIntent.DELETED});
        Assertions.assertThat((Stream)RecordingExporter.processInstanceRecords().withProcessInstanceKey(processInstanceKey).limitToProcessInstanceCompleted()).extracting(r -> Assertions.tuple((Object[])new Object[]{((ProcessInstanceRecordValue)r.getValue()).getElementId(), ((ProcessInstanceRecordValue)r.getValue()).getBpmnElementType(), r.getIntent()})).containsSubsequence((Object[])new Tuple[]{Assertions.tuple((Object[])new Object[]{"notified", BpmnElementType.END_EVENT, ProcessInstanceIntent.ELEMENT_COMPLETED}), Assertions.tuple((Object[])new Object[]{ELEMENT_ID, BpmnElementType.SERVICE_TASK, ProcessInstanceIntent.ELEMENT_COMPLETED}), Assertions.tuple((Object[])new Object[]{ELEMENT_ID, BpmnElementType.MULTI_INSTANCE_BODY, ProcessInstanceIntent.ELEMENT_COMPLETED}), Assertions.tuple((Object[])new Object[]{PROCESS_ID, BpmnElementType.PROCESS, ProcessInstanceIntent.ELEMENT_COMPLETED})});
    }

    private void completeJobs(long processInstanceKey, int count) {
        Function<Integer, Object> defaultResultProvider = OUTPUT_COLLECTION::get;
        this.completeJobs(processInstanceKey, count, defaultResultProvider);
    }

    private void completeJobs(long processInstanceKey, int count, Function<Integer, Object> resultProvider) {
        IntStream.range(0, count).forEach(i -> {
            ((AbstractBooleanAssert)Assertions.assertThat((boolean)((JobRecordStream)RecordingExporter.jobRecords((JobIntent)JobIntent.CREATED).withProcessInstanceKey(processInstanceKey).skip((long)i)).exists()).describedAs("Expected job %d/%d to be created", new Object[]{i + 1, count})).isTrue();
            JobBatchRecordValue jobBatch = (JobBatchRecordValue)ENGINE.jobs().withType(this.jobType).withMaxJobsToActivate(1).activate().getValue();
            jobBatch.getJobKeys().forEach(jobKey -> ENGINE.job().withKey((long)jobKey).withVariable(OUTPUT_ELEMENT_EXPRESSION, resultProvider.apply(i)).complete());
        });
    }
}

