/*
 * Decompiled with CFR 0.152.
 */
package io.camunda.zeebe.engine.processing.bpmn.multiinstance;

import io.camunda.zeebe.engine.util.EngineRule;
import io.camunda.zeebe.engine.util.client.PublishMessageClient;
import io.camunda.zeebe.model.bpmn.Bpmn;
import io.camunda.zeebe.model.bpmn.BpmnModelInstance;
import io.camunda.zeebe.model.bpmn.builder.EventSubProcessBuilder;
import io.camunda.zeebe.model.bpmn.builder.MultiInstanceLoopCharacteristicsBuilder;
import io.camunda.zeebe.model.bpmn.builder.StartEventBuilder;
import io.camunda.zeebe.model.bpmn.builder.SubProcessBuilder;
import io.camunda.zeebe.protocol.record.Record;
import io.camunda.zeebe.protocol.record.intent.Intent;
import io.camunda.zeebe.protocol.record.intent.JobIntent;
import io.camunda.zeebe.protocol.record.intent.MessageSubscriptionIntent;
import io.camunda.zeebe.protocol.record.intent.ProcessInstanceIntent;
import io.camunda.zeebe.protocol.record.intent.TimerIntent;
import io.camunda.zeebe.protocol.record.value.BpmnElementType;
import io.camunda.zeebe.protocol.record.value.JobBatchRecordValue;
import io.camunda.zeebe.protocol.record.value.MessageSubscriptionRecordValue;
import io.camunda.zeebe.protocol.record.value.ProcessInstanceRecordValue;
import io.camunda.zeebe.protocol.record.value.TimerRecordValue;
import io.camunda.zeebe.test.util.record.JobRecordStream;
import io.camunda.zeebe.test.util.record.ProcessInstanceRecordStream;
import io.camunda.zeebe.test.util.record.RecordingExporter;
import io.camunda.zeebe.test.util.record.RecordingExporterTestWatcher;
import java.time.Duration;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.assertj.core.api.Assertions;
import org.assertj.core.api.ListAssert;
import org.assertj.core.groups.Tuple;
import org.junit.ClassRule;
import org.junit.Rule;
import org.junit.Test;

public final class MultiInstanceSubProcessTest {
    @ClassRule
    public static final EngineRule ENGINE = EngineRule.singlePartition();
    public static final String TASK_ELEMENT_ID = "task";
    private static final String PROCESS_ID = "process";
    private static final String SUB_PROCESS_ELEMENT_ID = "sub-process";
    private static final String JOB_TYPE = "test";
    private static final String INPUT_COLLECTION = "items";
    private static final String INPUT_ELEMENT = "item";
    private static final BpmnModelInstance EMPTY_SUB_PROCESS = MultiInstanceSubProcessTest.process(b -> b.sequenceFlowId("sub-process-to-end"));
    private static final BpmnModelInstance SERVICE_TASK_SUB_PROCESS = MultiInstanceSubProcessTest.process(b -> b.serviceTask(TASK_ELEMENT_ID, t -> t.zeebeJobType(JOB_TYPE)));
    @Rule
    public final RecordingExporterTestWatcher recordingExporterTestWatcher = new RecordingExporterTestWatcher();

    private static BpmnModelInstance process(Consumer<StartEventBuilder> subProcessBuilder) {
        StartEventBuilder process = Bpmn.createExecutableProcess((String)PROCESS_ID).startEvent().subProcess(SUB_PROCESS_ELEMENT_ID, s -> s.multiInstance(b -> ((MultiInstanceLoopCharacteristicsBuilder)b.zeebeInputCollectionExpression(INPUT_COLLECTION)).zeebeInputElement(INPUT_ELEMENT))).embeddedSubProcess().startEvent("sub-process-start");
        subProcessBuilder.accept(process);
        return process.endEvent("sub-process-end").done();
    }

    @Test
    public void shouldActivateStartEventForEachElement() {
        ENGINE.deployment().withXmlResource(EMPTY_SUB_PROCESS).deploy();
        long processInstanceKey = ENGINE.processInstance().ofBpmnProcessId(PROCESS_ID).withVariable(INPUT_COLLECTION, Arrays.asList(10, 20, 30)).create();
        List subProcessInstanceKey = ((ProcessInstanceRecordStream)((ProcessInstanceRecordStream)RecordingExporter.processInstanceRecords((ProcessInstanceIntent)ProcessInstanceIntent.ELEMENT_ACTIVATED).withProcessInstanceKey(processInstanceKey).withElementId(SUB_PROCESS_ELEMENT_ID).skip(1L)).limit(3L)).map(Record::getKey).collect(Collectors.toList());
        Assertions.assertThat((Stream)RecordingExporter.processInstanceRecords((ProcessInstanceIntent)ProcessInstanceIntent.ELEMENT_ACTIVATED).withElementId("sub-process-start").withProcessInstanceKey(processInstanceKey).limit(3L)).extracting(r -> ((ProcessInstanceRecordValue)r.getValue()).getFlowScopeKey()).containsExactly((Object[])new Long[]{(Long)subProcessInstanceKey.get(0), (Long)subProcessInstanceKey.get(1), (Long)subProcessInstanceKey.get(2)});
    }

    @Test
    public void shouldActivateAllElementsOfSubProcess() {
        ENGINE.deployment().withXmlResource(EMPTY_SUB_PROCESS).deploy();
        long processInstanceKey = ENGINE.processInstance().ofBpmnProcessId(PROCESS_ID).withVariable(INPUT_COLLECTION, Arrays.asList(10, 20, 30)).create();
        long subProcessInstanceKey = ((Record)((ProcessInstanceRecordStream)RecordingExporter.processInstanceRecords((ProcessInstanceIntent)ProcessInstanceIntent.ELEMENT_ACTIVATED).withElementId(SUB_PROCESS_ELEMENT_ID).skip(1L)).getFirst()).getKey();
        Assertions.assertThat((Stream)((ProcessInstanceRecordStream)RecordingExporter.processInstanceRecords().onlyEvents()).withProcessInstanceKey(processInstanceKey).limitToProcessInstanceCompleted().withFlowScopeKey(subProcessInstanceKey)).extracting(r -> Assertions.tuple((Object[])new Object[]{((ProcessInstanceRecordValue)r.getValue()).getElementId(), r.getIntent()})).containsExactly((Object[])new Tuple[]{Assertions.tuple((Object[])new Object[]{"sub-process-start", ProcessInstanceIntent.ELEMENT_ACTIVATING}), Assertions.tuple((Object[])new Object[]{"sub-process-start", ProcessInstanceIntent.ELEMENT_ACTIVATED}), Assertions.tuple((Object[])new Object[]{"sub-process-start", ProcessInstanceIntent.ELEMENT_COMPLETING}), Assertions.tuple((Object[])new Object[]{"sub-process-start", ProcessInstanceIntent.ELEMENT_COMPLETED}), Assertions.tuple((Object[])new Object[]{"sub-process-to-end", ProcessInstanceIntent.SEQUENCE_FLOW_TAKEN}), Assertions.tuple((Object[])new Object[]{"sub-process-end", ProcessInstanceIntent.ELEMENT_ACTIVATING}), Assertions.tuple((Object[])new Object[]{"sub-process-end", ProcessInstanceIntent.ELEMENT_ACTIVATED}), Assertions.tuple((Object[])new Object[]{"sub-process-end", ProcessInstanceIntent.ELEMENT_COMPLETING}), Assertions.tuple((Object[])new Object[]{"sub-process-end", ProcessInstanceIntent.ELEMENT_COMPLETED})});
    }

    @Test
    public void shouldCancelSubProcessOnTermination() {
        ENGINE.deployment().withXmlResource(SERVICE_TASK_SUB_PROCESS).deploy();
        long processInstanceKey = ENGINE.processInstance().ofBpmnProcessId(PROCESS_ID).withVariable(INPUT_COLLECTION, Arrays.asList(10, 20, 30)).create();
        ((JobRecordStream)RecordingExporter.jobRecords((JobIntent)JobIntent.CREATED).withProcessInstanceKey(processInstanceKey).limit(3L)).await();
        ENGINE.processInstance().withInstanceKey(processInstanceKey).cancel();
        Assertions.assertThat((Stream)RecordingExporter.processInstanceRecords((ProcessInstanceIntent)ProcessInstanceIntent.ELEMENT_TERMINATED).withProcessInstanceKey(processInstanceKey).limitToProcessInstanceTerminated()).extracting(r -> ((ProcessInstanceRecordValue)r.getValue()).getBpmnElementType()).containsExactly((Object[])new BpmnElementType[]{BpmnElementType.SERVICE_TASK, BpmnElementType.SUB_PROCESS, BpmnElementType.SERVICE_TASK, BpmnElementType.SUB_PROCESS, BpmnElementType.SERVICE_TASK, BpmnElementType.SUB_PROCESS, BpmnElementType.MULTI_INSTANCE_BODY, BpmnElementType.PROCESS});
    }

    @Test
    public void shouldCreateJobForEachSubProcess() {
        ENGINE.deployment().withXmlResource(SERVICE_TASK_SUB_PROCESS).deploy();
        long processInstanceKey = ENGINE.processInstance().ofBpmnProcessId(PROCESS_ID).withVariable(INPUT_COLLECTION, Arrays.asList(10, 20, 30)).create();
        Assertions.assertThat((Stream)RecordingExporter.jobRecords((JobIntent)JobIntent.CREATED).withProcessInstanceKey(processInstanceKey).limit(3L)).hasSize(3);
        JobBatchRecordValue jobActivation = (JobBatchRecordValue)ENGINE.jobs().withType(JOB_TYPE).activate().getValue();
        jobActivation.getJobKeys().forEach(jobKey -> ENGINE.job().withKey((long)jobKey).complete());
        Assertions.assertThat((List)jobActivation.getJobs()).extracting(j -> j.getVariables().get(INPUT_ELEMENT)).containsExactly(new Object[]{10, 20, 30});
        Assertions.assertThat((Stream)RecordingExporter.processInstanceRecords((ProcessInstanceIntent)ProcessInstanceIntent.ELEMENT_COMPLETED).filterRootScope().limitToProcessInstanceCompleted()).extracting(Record::getIntent).containsExactly((Object[])new Intent[]{ProcessInstanceIntent.ELEMENT_COMPLETED});
    }

    @Test
    public void shouldCreateMessageSubscriptionForEachSubProcess() {
        BpmnModelInstance process = MultiInstanceSubProcessTest.process(b -> b.intermediateCatchEvent().message(m -> m.name("message").zeebeCorrelationKeyExpression(INPUT_ELEMENT)));
        ENGINE.deployment().withXmlResource(process).deploy();
        List<String> inputCollection = Arrays.asList("a", "b", "c");
        long processInstanceKey = ENGINE.processInstance().ofBpmnProcessId(PROCESS_ID).withVariable(INPUT_COLLECTION, inputCollection).create();
        ((ListAssert)Assertions.assertThat((Stream)RecordingExporter.messageSubscriptionRecords((MessageSubscriptionIntent)MessageSubscriptionIntent.CREATED).withProcessInstanceKey(processInstanceKey).limit(3L)).hasSize(3)).extracting(r -> ((MessageSubscriptionRecordValue)r.getValue()).getCorrelationKey()).containsExactly((Object[])new String[]{"a", "b", "c"});
        inputCollection.forEach(element -> ENGINE.message().withName("message").withCorrelationKey((String)element).publish());
        Assertions.assertThat((Stream)RecordingExporter.processInstanceRecords((ProcessInstanceIntent)ProcessInstanceIntent.ELEMENT_COMPLETED).filterRootScope().limitToProcessInstanceCompleted()).extracting(Record::getIntent).containsExactly((Object[])new Intent[]{ProcessInstanceIntent.ELEMENT_COMPLETED});
    }

    @Test
    public void shouldCreateTimerForEachSubProcess() {
        BpmnModelInstance process = MultiInstanceSubProcessTest.process(b -> b.intermediateCatchEvent("timer").timerWithDuration("PT1S"));
        ENGINE.deployment().withXmlResource(process).deploy();
        List<String> inputCollection = Arrays.asList("a", "b", "c");
        long processInstanceKey = ENGINE.processInstance().ofBpmnProcessId(PROCESS_ID).withVariable(INPUT_COLLECTION, inputCollection).create();
        ((ListAssert)Assertions.assertThat((Stream)RecordingExporter.timerRecords((TimerIntent)TimerIntent.CREATED).withProcessInstanceKey(processInstanceKey).limit(3L)).hasSize(3)).extracting(r -> ((TimerRecordValue)r.getValue()).getTargetElementId()).containsOnly((Object[])new String[]{"timer"});
        ENGINE.getClock().addTime(Duration.ofSeconds(1L));
        Assertions.assertThat((Stream)RecordingExporter.processInstanceRecords((ProcessInstanceIntent)ProcessInstanceIntent.ELEMENT_COMPLETED).filterRootScope().limitToProcessInstanceCompleted()).extracting(Record::getIntent).containsExactly((Object[])new Intent[]{ProcessInstanceIntent.ELEMENT_COMPLETED});
    }

    @Test
    public void shouldCorrelateMessagesToEventSubProcessForEachSubProcess() {
        Consumer<EventSubProcessBuilder> eventSubprocess = s -> ((StartEventBuilder)s.startEvent().message(m -> m.name("msg").zeebeCorrelationKeyExpression("item.id"))).endEvent();
        ENGINE.deployment().withXmlResource(Bpmn.createExecutableProcess((String)PROCESS_ID).startEvent().subProcess("subprocess", s -> ((SubProcessBuilder)s.multiInstance(m -> ((MultiInstanceLoopCharacteristicsBuilder)((MultiInstanceLoopCharacteristicsBuilder)m.parallel()).zeebeInputCollectionExpression(INPUT_COLLECTION)).zeebeInputElement(INPUT_ELEMENT))).embeddedSubProcess().eventSubProcess("msg-subprocess", eventSubprocess).startEvent().userTask(TASK_ELEMENT_ID).endEvent()).endEvent().done()).deploy();
        long processInstanceKey = ENGINE.processInstance().ofBpmnProcessId(PROCESS_ID).withVariable(INPUT_COLLECTION, Arrays.asList(Map.of("id", 1), Map.of("id", 2), Map.of("id", 3))).create();
        Stream.of("1", "2", "3").map(correlationKey -> ENGINE.message().withName("msg").withCorrelationKey((String)correlationKey)).forEach(PublishMessageClient::publish);
        ((ListAssert)Assertions.assertThat((Stream)RecordingExporter.messageSubscriptionRecords((MessageSubscriptionIntent)MessageSubscriptionIntent.CORRELATED).withProcessInstanceKey(processInstanceKey).limit(3L)).describedAs("Expect that each message is correlated", new Object[0])).hasSize(3);
        ((ListAssert)Assertions.assertThat((Stream)RecordingExporter.processInstanceRecords((ProcessInstanceIntent)ProcessInstanceIntent.ELEMENT_COMPLETED).withProcessInstanceKey(processInstanceKey).withElementType(BpmnElementType.PROCESS).limitToProcessInstanceCompleted()).describedAs("Expect that each sub process is interrupted so process could complete", new Object[0])).hasSize(1);
    }
}

