/*
 * Decompiled with CFR 0.152.
 */
package io.camunda.zeebe.engine.processing.bpmn.multiinstance;

import io.camunda.zeebe.engine.util.EngineRule;
import io.camunda.zeebe.model.bpmn.Bpmn;
import io.camunda.zeebe.model.bpmn.BpmnModelInstance;
import io.camunda.zeebe.model.bpmn.builder.MultiInstanceLoopCharacteristicsBuilder;
import io.camunda.zeebe.model.bpmn.builder.ReceiveTaskBuilder;
import io.camunda.zeebe.protocol.record.Record;
import io.camunda.zeebe.protocol.record.intent.Intent;
import io.camunda.zeebe.protocol.record.intent.MessageSubscriptionIntent;
import io.camunda.zeebe.protocol.record.intent.ProcessInstanceIntent;
import io.camunda.zeebe.protocol.record.value.BpmnElementType;
import io.camunda.zeebe.protocol.record.value.VariableRecordValue;
import io.camunda.zeebe.test.util.record.MessageSubscriptionRecordStream;
import io.camunda.zeebe.test.util.record.ProcessInstanceRecordStream;
import io.camunda.zeebe.test.util.record.RecordingExporter;
import io.camunda.zeebe.test.util.record.RecordingExporterTestWatcher;
import java.time.Duration;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.assertj.core.api.Assertions;
import org.assertj.core.api.ListAssert;
import org.assertj.core.groups.Tuple;
import org.junit.ClassRule;
import org.junit.Rule;
import org.junit.Test;

public final class MultiInstanceReceiveTaskTest {
    @ClassRule
    public static final EngineRule ENGINE = EngineRule.singlePartition();
    private static final String PROCESS_ID = "process";
    private static final String ELEMENT_ID = "task";
    private static final String MESSAGE_NAME = "test";
    private static final String INPUT_COLLECTION = "items";
    private static final String INPUT_ELEMENT = "item";
    private static final String OUTPUT_ELEMENT = "result";
    private static final String OUTPUT_COLLECTION = "results";
    private static final BpmnModelInstance PROCESS = Bpmn.createExecutableProcess((String)"process").startEvent().receiveTask("task", t -> ((ReceiveTaskBuilder)t.message(m -> m.name(MESSAGE_NAME).zeebeCorrelationKeyExpression(INPUT_ELEMENT))).multiInstance(b -> ((MultiInstanceLoopCharacteristicsBuilder)((MultiInstanceLoopCharacteristicsBuilder)((MultiInstanceLoopCharacteristicsBuilder)b.zeebeInputCollectionExpression(INPUT_COLLECTION)).zeebeInputElement(INPUT_ELEMENT)).zeebeOutputElementExpression(OUTPUT_ELEMENT)).zeebeOutputCollection(OUTPUT_COLLECTION))).endEvent().done();
    @Rule
    public final RecordingExporterTestWatcher recordingExporterTestWatcher = new RecordingExporterTestWatcher();

    @Test
    public void shouldCreateOneMessageSubscriptionForEachElement() {
        ENGINE.deployment().withXmlResource(PROCESS).deploy();
        long processInstanceKey = ENGINE.processInstance().ofBpmnProcessId(PROCESS_ID).withVariable(INPUT_COLLECTION, Arrays.asList("a", "b", "c")).create();
        List elementInstanceKey = ((ProcessInstanceRecordStream)((ProcessInstanceRecordStream)RecordingExporter.processInstanceRecords((ProcessInstanceIntent)ProcessInstanceIntent.ELEMENT_ACTIVATED).withProcessInstanceKey(processInstanceKey).withElementId(ELEMENT_ID).skip(1L)).limit(3L)).map(Record::getKey).collect(Collectors.toList());
        ((ListAssert)Assertions.assertThat((Stream)RecordingExporter.messageSubscriptionRecords((MessageSubscriptionIntent)MessageSubscriptionIntent.CREATED).withProcessInstanceKey(processInstanceKey).limit(3L)).hasSize(3)).extracting(Record::getValue).extracting(r -> Tuple.tuple((Object[])new Object[]{r.getCorrelationKey(), r.getElementInstanceKey()})).containsExactly((Object[])new Tuple[]{Tuple.tuple((Object[])new Object[]{"a", elementInstanceKey.get(0)}), Tuple.tuple((Object[])new Object[]{"b", elementInstanceKey.get(1)}), Tuple.tuple((Object[])new Object[]{"c", elementInstanceKey.get(2)})});
    }

    @Test
    public void shouldCompleteBodyWhenAllMessagesAreCorrelated() {
        ENGINE.deployment().withXmlResource(PROCESS).deploy();
        List<String> inputCollection = Arrays.asList("a", "b", "c");
        long processInstanceKey = ENGINE.processInstance().ofBpmnProcessId(PROCESS_ID).withVariable(INPUT_COLLECTION, inputCollection).create();
        inputCollection.forEach(element -> ENGINE.message().withName(MESSAGE_NAME).withCorrelationKey((String)element).withTimeToLive(Duration.ofSeconds(3L).toMillis()).publish());
        Assertions.assertThat((Stream)RecordingExporter.processInstanceRecords((ProcessInstanceIntent)ProcessInstanceIntent.ELEMENT_COMPLETED).withProcessInstanceKey(processInstanceKey).limitToProcessInstanceCompleted().withElementId(ELEMENT_ID)).hasSize(4);
        Assertions.assertThat((Stream)RecordingExporter.processInstanceRecords().filterRootScope().limitToProcessInstanceCompleted()).extracting(Record::getIntent).contains((Object[])new Intent[]{ProcessInstanceIntent.ELEMENT_COMPLETED});
    }

    @Test
    public void shouldCloseMessageSubscriptionOnTermination() {
        ENGINE.deployment().withXmlResource(PROCESS).deploy();
        long processInstanceKey = ENGINE.processInstance().ofBpmnProcessId(PROCESS_ID).withVariable(INPUT_COLLECTION, Arrays.asList(10, 20, 30)).create();
        ((MessageSubscriptionRecordStream)RecordingExporter.messageSubscriptionRecords((MessageSubscriptionIntent)MessageSubscriptionIntent.CREATED).withProcessInstanceKey(processInstanceKey).limit(3L)).exists();
        ENGINE.processInstance().withInstanceKey(processInstanceKey).cancel();
        Assertions.assertThat((Stream)RecordingExporter.messageSubscriptionRecords((MessageSubscriptionIntent)MessageSubscriptionIntent.DELETED).withProcessInstanceKey(processInstanceKey).limit(3L)).hasSize(3);
    }

    @Test
    public void shouldCollectOutputFromChildInstance() {
        List<Integer> inputCollection = Arrays.asList(10, 20, 30);
        ENGINE.deployment().withXmlResource(PROCESS).deploy();
        long processInstanceKey = ENGINE.processInstance().ofBpmnProcessId(PROCESS_ID).withVariable(INPUT_COLLECTION, inputCollection).create();
        inputCollection.stream().map(Objects::toString).forEach(element -> ENGINE.message().withName(MESSAGE_NAME).withCorrelationKey((String)element).withTimeToLive(Duration.ofSeconds(3L).toMillis()).withVariables(Map.of(OUTPUT_ELEMENT, element)).publish());
        RecordingExporter.processInstanceRecords((ProcessInstanceIntent)ProcessInstanceIntent.ELEMENT_COMPLETED).withProcessInstanceKey(processInstanceKey).withElementType(BpmnElementType.MULTI_INSTANCE_BODY).await();
        Assertions.assertThat((Stream)RecordingExporter.records().limitToProcessInstance(processInstanceKey).variableRecords().withName(OUTPUT_COLLECTION).withScopeKey(processInstanceKey)).extracting(r -> ((VariableRecordValue)r.getValue()).getValue()).containsExactly((Object[])new String[]{"[\"10\",\"20\",\"30\"]"});
    }
}

