/*
 * Decompiled with CFR 0.152.
 */
package io.camunda.zeebe.engine.processing.bpmn.multiinstance;

import io.camunda.zeebe.engine.util.EngineRule;
import io.camunda.zeebe.model.bpmn.Bpmn;
import io.camunda.zeebe.model.bpmn.BpmnModelInstance;
import io.camunda.zeebe.model.bpmn.builder.CallActivityBuilder;
import io.camunda.zeebe.model.bpmn.builder.MultiInstanceLoopCharacteristicsBuilder;
import io.camunda.zeebe.protocol.record.Record;
import io.camunda.zeebe.protocol.record.intent.JobIntent;
import io.camunda.zeebe.protocol.record.intent.ProcessInstanceIntent;
import io.camunda.zeebe.protocol.record.value.BpmnElementType;
import io.camunda.zeebe.protocol.record.value.JobBatchRecordValue;
import io.camunda.zeebe.protocol.record.value.ProcessInstanceRecordValue;
import io.camunda.zeebe.protocol.record.value.VariableRecordValue;
import io.camunda.zeebe.test.util.BrokerClassRuleHelper;
import io.camunda.zeebe.test.util.JsonUtil;
import io.camunda.zeebe.test.util.record.JobRecordStream;
import io.camunda.zeebe.test.util.record.ProcessInstanceRecordStream;
import io.camunda.zeebe.test.util.record.RecordingExporter;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import java.util.stream.Stream;
import org.assertj.core.api.AbstractLongAssert;
import org.assertj.core.api.Assertions;
import org.junit.Before;
import org.junit.ClassRule;
import org.junit.Rule;
import org.junit.Test;

public final class MultiInstanceCallActivityTest {
    @ClassRule
    public static final EngineRule ENGINE = EngineRule.singlePartition();
    private static final String PROCESS_ID_PARENT = "wf-parent";
    private static final String PROCESS_ID_CHILD = "wf-child";
    private static final String CALL_ACTIVITY_ID = "call";
    private static final String INPUT_COLLECTION_VARIABLE = "items";
    private static final List<Integer> INPUT_COLLECTION = List.of(Integer.valueOf(10), Integer.valueOf(20), Integer.valueOf(30));
    @Rule
    public final BrokerClassRuleHelper helper = new BrokerClassRuleHelper();
    private String jobType;

    private static BpmnModelInstance parentProcessWithCallActivity(Consumer<CallActivityBuilder> callActivityBuilder) {
        CallActivityBuilder process = Bpmn.createExecutableProcess((String)PROCESS_ID_PARENT).startEvent().callActivity(CALL_ACTIVITY_ID, c -> c.zeebeProcessId(PROCESS_ID_CHILD));
        callActivityBuilder.accept(process);
        return process.endEvent().done();
    }

    @Before
    public void init() {
        this.jobType = this.helper.getJobType();
        BpmnModelInstance childProcess = Bpmn.createExecutableProcess((String)PROCESS_ID_CHILD).startEvent().serviceTask("task", t -> t.zeebeJobType(this.jobType)).endEvent().done();
        BpmnModelInstance parentProcess = MultiInstanceCallActivityTest.parentProcessWithCallActivity(callActivity -> callActivity.multiInstance(b -> b.zeebeInputCollectionExpression(INPUT_COLLECTION_VARIABLE)));
        ENGINE.deployment().withXmlResource("wf-parent.bpmn", parentProcess).withXmlResource("wf-child.bpmn", childProcess).deploy();
    }

    @Test
    public void shouldCreateChildInstanceForEachElement() {
        long processInstanceKey = ENGINE.processInstance().ofBpmnProcessId(PROCESS_ID_PARENT).withVariable(INPUT_COLLECTION_VARIABLE, INPUT_COLLECTION).create();
        List callActivityInstanceKey = ((ProcessInstanceRecordStream)RecordingExporter.processInstanceRecords((ProcessInstanceIntent)ProcessInstanceIntent.ELEMENT_ACTIVATED).withProcessInstanceKey(processInstanceKey).withElementType(BpmnElementType.CALL_ACTIVITY).limit((long)INPUT_COLLECTION.size())).map(Record::getKey).collect(Collectors.toList());
        Assertions.assertThat((Stream)RecordingExporter.processInstanceRecords((ProcessInstanceIntent)ProcessInstanceIntent.ELEMENT_ACTIVATED).withParentProcessInstanceKey(processInstanceKey).withBpmnProcessId(PROCESS_ID_CHILD).filterRootScope().limit((long)INPUT_COLLECTION.size())).extracting(r -> ((ProcessInstanceRecordValue)r.getValue()).getParentElementInstanceKey()).containsExactly((Object[])new Long[]{(Long)callActivityInstanceKey.get(0), (Long)callActivityInstanceKey.get(1), (Long)callActivityInstanceKey.get(2)});
    }

    @Test
    public void shouldCompleteBodyWhenAllChildInstancesAreCompleted() {
        long processInstanceKey = ENGINE.processInstance().ofBpmnProcessId(PROCESS_ID_PARENT).withVariable(INPUT_COLLECTION_VARIABLE, INPUT_COLLECTION).create();
        this.awaitJobsCreated(INPUT_COLLECTION.size());
        ((JobBatchRecordValue)ENGINE.jobs().withType(this.jobType).activate().getValue()).getJobKeys().forEach(jobKey -> ENGINE.job().withKey((long)jobKey).complete());
        ((AbstractLongAssert)Assertions.assertThat((long)((ProcessInstanceRecordStream)RecordingExporter.processInstanceRecords((ProcessInstanceIntent)ProcessInstanceIntent.ELEMENT_COMPLETED).withParentProcessInstanceKey(processInstanceKey).filterRootScope().limit((long)INPUT_COLLECTION.size())).count()).describedAs("Expected child process instances to be completed", new Object[0])).isEqualTo((long)INPUT_COLLECTION.size());
        Assertions.assertThat((Stream)RecordingExporter.processInstanceRecords((ProcessInstanceIntent)ProcessInstanceIntent.ELEMENT_COMPLETED).withProcessInstanceKey(processInstanceKey).limitToProcessInstanceCompleted()).extracting(r -> ((ProcessInstanceRecordValue)r.getValue()).getBpmnElementType()).containsSequence((Object[])new BpmnElementType[]{BpmnElementType.CALL_ACTIVITY, BpmnElementType.CALL_ACTIVITY, BpmnElementType.CALL_ACTIVITY, BpmnElementType.MULTI_INSTANCE_BODY, BpmnElementType.END_EVENT, BpmnElementType.PROCESS});
    }

    @Test
    public void shouldCancelChildInstancesOnTermination() {
        long processInstanceKey = ENGINE.processInstance().ofBpmnProcessId(PROCESS_ID_PARENT).withVariable(INPUT_COLLECTION_VARIABLE, INPUT_COLLECTION).create();
        this.awaitJobsCreated(INPUT_COLLECTION.size());
        ENGINE.processInstance().withInstanceKey(processInstanceKey).cancel();
        ((AbstractLongAssert)Assertions.assertThat((long)((ProcessInstanceRecordStream)RecordingExporter.processInstanceRecords((ProcessInstanceIntent)ProcessInstanceIntent.ELEMENT_TERMINATED).withParentProcessInstanceKey(processInstanceKey).filterRootScope().limit((long)INPUT_COLLECTION.size())).count()).describedAs("Expected child process instances to be terminated", new Object[0])).isEqualTo((long)INPUT_COLLECTION.size());
        Assertions.assertThat((Stream)RecordingExporter.processInstanceRecords((ProcessInstanceIntent)ProcessInstanceIntent.ELEMENT_TERMINATED).withProcessInstanceKey(processInstanceKey).limitToProcessInstanceTerminated()).extracting(r -> ((ProcessInstanceRecordValue)r.getValue()).getBpmnElementType()).containsExactly((Object[])new BpmnElementType[]{BpmnElementType.CALL_ACTIVITY, BpmnElementType.CALL_ACTIVITY, BpmnElementType.CALL_ACTIVITY, BpmnElementType.MULTI_INSTANCE_BODY, BpmnElementType.PROCESS});
    }

    @Test
    public void shouldCollectOutputFromChildInstance() {
        BpmnModelInstance parentProcess = MultiInstanceCallActivityTest.parentProcessWithCallActivity(callActivity -> ((CallActivityBuilder)callActivity.zeebeOutputExpression("x", "result")).multiInstance(b -> ((MultiInstanceLoopCharacteristicsBuilder)((MultiInstanceLoopCharacteristicsBuilder)b.zeebeInputCollectionExpression(INPUT_COLLECTION_VARIABLE)).zeebeOutputElementExpression("result")).zeebeOutputCollection("results")));
        ENGINE.deployment().withXmlResource("wf-parent.bpmn", parentProcess).deploy();
        long processInstanceKey = ENGINE.processInstance().ofBpmnProcessId(PROCESS_ID_PARENT).withVariable(INPUT_COLLECTION_VARIABLE, INPUT_COLLECTION).create();
        this.awaitJobsCreated(INPUT_COLLECTION.size());
        AtomicInteger jobCounter = new AtomicInteger();
        ((JobBatchRecordValue)ENGINE.jobs().withType(this.jobType).activate().getValue()).getJobKeys().forEach(jobKey -> ENGINE.job().withKey((long)jobKey).withVariable("x", jobCounter.incrementAndGet()).complete());
        RecordingExporter.processInstanceRecords((ProcessInstanceIntent)ProcessInstanceIntent.ELEMENT_COMPLETED).withProcessInstanceKey(processInstanceKey).withElementType(BpmnElementType.MULTI_INSTANCE_BODY).await();
        String expectedOutputCollection = JsonUtil.toJson(IntStream.rangeClosed(1, INPUT_COLLECTION.size()).boxed().collect(Collectors.toList()));
        Assertions.assertThat((Stream)RecordingExporter.records().betweenProcessInstance(processInstanceKey).variableRecords().withName("results").withScopeKey(processInstanceKey)).extracting(r -> ((VariableRecordValue)r.getValue()).getValue()).containsExactly((Object[])new String[]{expectedOutputCollection});
    }

    private void awaitJobsCreated(int size) {
        ((AbstractLongAssert)Assertions.assertThat((long)((JobRecordStream)RecordingExporter.jobRecords((JobIntent)JobIntent.CREATED).withType(this.jobType).limit((long)size)).count()).describedAs("Expected %d jobs to be created", new Object[]{size})).isEqualTo((long)size);
    }
}

