/*
 * Decompiled with CFR 0.152.
 */
package io.camunda.zeebe.engine.processing.message;

import io.camunda.zeebe.engine.util.EngineRule;
import io.camunda.zeebe.engine.util.client.PublishMessageClient;
import io.camunda.zeebe.model.bpmn.Bpmn;
import io.camunda.zeebe.model.bpmn.BpmnModelInstance;
import io.camunda.zeebe.model.bpmn.builder.BoundaryEventBuilder;
import io.camunda.zeebe.model.bpmn.builder.IntermediateCatchEventBuilder;
import io.camunda.zeebe.model.bpmn.builder.ReceiveTaskBuilder;
import io.camunda.zeebe.protocol.record.Record;
import io.camunda.zeebe.protocol.record.RecordType;
import io.camunda.zeebe.protocol.record.intent.Intent;
import io.camunda.zeebe.protocol.record.intent.JobIntent;
import io.camunda.zeebe.protocol.record.intent.MessageSubscriptionIntent;
import io.camunda.zeebe.protocol.record.intent.ProcessInstanceIntent;
import io.camunda.zeebe.protocol.record.intent.ProcessMessageSubscriptionIntent;
import io.camunda.zeebe.protocol.record.value.BpmnElementType;
import io.camunda.zeebe.protocol.record.value.MessageRecordValue;
import io.camunda.zeebe.protocol.record.value.ProcessInstanceRecordValue;
import io.camunda.zeebe.protocol.record.value.ProcessMessageSubscriptionRecordValue;
import io.camunda.zeebe.protocol.record.value.VariableRecordValue;
import io.camunda.zeebe.test.util.MsgPackUtil;
import io.camunda.zeebe.test.util.record.ProcessInstanceRecordStream;
import io.camunda.zeebe.test.util.record.ProcessInstances;
import io.camunda.zeebe.test.util.record.ProcessMessageSubscriptionRecordStream;
import io.camunda.zeebe.test.util.record.RecordingExporter;
import java.time.Duration;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.assertj.core.api.AbstractListAssert;
import org.assertj.core.api.Assertions;
import org.assertj.core.api.ListAssert;
import org.assertj.core.groups.Tuple;
import org.junit.Rule;
import org.junit.Test;

public final class MessageCorrelationTest {
    // BPMN process id the tests pass to ofBpmnProcessId(...); the models below are built with the same literal.
    private static final String PROCESS_ID = "process";
    // start event -> receive task "receive-message" (message "message", correlation key expression "key") -> end event
    private static final BpmnModelInstance RECEIVE_TASK_PROCESS = ((ReceiveTaskBuilder)Bpmn.createExecutableProcess((String)"process").startEvent().receiveTask("receive-message").message(m -> m.name("message").zeebeCorrelationKeyExpression("key"))).endEvent().done();
    // start event -> intermediate catch event "receive-message" (message "message", correlation key expression "key") -> end event
    private static final BpmnModelInstance SINGLE_MESSAGE_PROCESS = ((IntermediateCatchEventBuilder)Bpmn.createExecutableProcess((String)"process").startEvent().intermediateCatchEvent("receive-message").message(m -> m.name("message").zeebeCorrelationKeyExpression("key"))).endEvent().done();
    // Same shape as SINGLE_MESSAGE_PROCESS, but the message name is given as the expression "\"message\"" via nameExpression(...)
    private static final BpmnModelInstance SINGLE_MESSAGE_PROCESS_WITH_FEEL_EXPRESSION_MESSAGE_NAME = ((IntermediateCatchEventBuilder)Bpmn.createExecutableProcess((String)"process").startEvent().intermediateCatchEvent("receive-message").message(m -> m.nameExpression("\"message\"").zeebeCorrelationKeyExpression("key"))).endEvent().done();
    // start event -> catch event "message1" -> catch event "message2"; both wait for message "ping" with correlation key expression "key"
    private static final BpmnModelInstance TWO_MESSAGES_PROCESS = ((IntermediateCatchEventBuilder)((IntermediateCatchEventBuilder)Bpmn.createExecutableProcess((String)"process").startEvent().intermediateCatchEvent("message1").message(m -> m.name("ping").zeebeCorrelationKeyExpression("key"))).intermediateCatchEvent("message2").message(m -> m.name("ping").zeebeCorrelationKeyExpression("key"))).done();
    // Receive task "task" (message "taskMsg") with two message boundary events "msg1" and "msg2";
    // each path has its own end event: "msg1End", "msg2End" and "taskEnd".
    private static final BpmnModelInstance BOUNDARY_EVENTS_PROCESS = ((BoundaryEventBuilder)((BoundaryEventBuilder)((ReceiveTaskBuilder)Bpmn.createExecutableProcess((String)"process").startEvent().receiveTask("task").message(m -> m.name("taskMsg").zeebeCorrelationKeyExpression("key"))).boundaryEvent("msg1").message(m -> m.name("msg1").zeebeCorrelationKeyExpression("key"))).endEvent("msg1End").moveToActivity("task").boundaryEvent("msg2").message(m -> m.name("msg2").zeebeCorrelationKeyExpression("key"))).endEvent("msg2End").moveToActivity("task").endEvent("taskEnd").done();
    // JUnit rule providing the engine under test, created via EngineRule.singlePartition().
    @Rule
    public final EngineRule engine = EngineRule.singlePartition();

    /**
     * Creates the instance first, checks that a message subscription CREATED record exists, then
     * publishes the message and expects the message variables to be visible once the
     * "receive-message" element has completed.
     */
    @Test
    public void shouldCorrelateMessageIfEnteredBefore() {
        // given: an instance that already entered the catch event
        engine.deployment().withXmlResource(SINGLE_MESSAGE_PROCESS).deploy();
        long instanceKey = engine.processInstance()
            .ofBpmnProcessId(PROCESS_ID)
            .withVariable("key", "order-123")
            .create();
        boolean subscriptionCreated = RecordingExporter.messageSubscriptionRecords((MessageSubscriptionIntent)MessageSubscriptionIntent.CREATED).exists();
        Assertions.assertThat(subscriptionCreated).isTrue();

        // when: a matching message with an explicit id and time-to-live is published
        String messageId = UUID.randomUUID().toString();
        engine.message()
            .withName("message")
            .withCorrelationKey("order-123")
            .withTimeToLive(1000L)
            .withVariables(MsgPackUtil.asMsgPack("foo", (Object)"bar"))
            .withId(messageId)
            .publish();

        // then: the instance holds its own variable plus the message variable
        ProcessInstanceRecordStream completedCatchEvents = (ProcessInstanceRecordStream)RecordingExporter.processInstanceRecords()
            .withElementId("receive-message")
            .withIntent((Intent)ProcessInstanceIntent.ELEMENT_COMPLETED);
        Record catchEventCompleted = (Record)completedCatchEvents.getFirst();
        Map instanceVariables = ProcessInstances.getCurrentVariables(instanceKey, catchEventCompleted.getPosition());
        Assertions.assertThat(instanceVariables)
            .containsOnly(Assertions.entry("key", "\"order-123\""), Assertions.entry("foo", "\"bar\""));
    }

    /**
     * Publishes the message before the instance is created and expects the message variables to
     * be visible once the "receive-message" element has completed.
     */
    @Test
    public void shouldCorrelateMessageIfPublishedBefore() {
        // given: the message is published before any instance exists
        engine.deployment().withXmlResource(SINGLE_MESSAGE_PROCESS).deploy();
        engine.message()
            .withName("message")
            .withCorrelationKey("order-123")
            .withVariables(MsgPackUtil.asMsgPack("foo", (Object)"bar"))
            .publish();

        // when
        long instanceKey = engine.processInstance()
            .ofBpmnProcessId(PROCESS_ID)
            .withVariable("key", "order-123")
            .create();

        // then
        ProcessInstanceRecordStream completedCatchEvents = (ProcessInstanceRecordStream)RecordingExporter.processInstanceRecords()
            .withElementId("receive-message")
            .withIntent((Intent)ProcessInstanceIntent.ELEMENT_COMPLETED);
        Record catchEventCompleted = (Record)completedCatchEvents.getFirst();
        Map instanceVariables = ProcessInstances.getCurrentVariables(instanceKey, catchEventCompleted.getPosition());
        Assertions.assertThat(instanceVariables)
            .containsOnly(Assertions.entry("key", "\"order-123\""), Assertions.entry("foo", "\"bar\""));
    }

    /**
     * Same scenario as publishing before instance creation, but against the model whose message
     * name is defined through a name expression.
     */
    @Test
    public void shouldCorrelateMessageToMessageWithFeelExpressionNameIfPublishedBefore() {
        // given: the message is published before any instance exists
        engine.deployment().withXmlResource(SINGLE_MESSAGE_PROCESS_WITH_FEEL_EXPRESSION_MESSAGE_NAME).deploy();
        engine.message()
            .withName("message")
            .withCorrelationKey("order-123")
            .withVariables(MsgPackUtil.asMsgPack("foo", (Object)"bar"))
            .publish();

        // when
        long instanceKey = engine.processInstance()
            .ofBpmnProcessId(PROCESS_ID)
            .withVariable("key", "order-123")
            .create();

        // then
        ProcessInstanceRecordStream completedCatchEvents = (ProcessInstanceRecordStream)RecordingExporter.processInstanceRecords()
            .withElementId("receive-message")
            .withIntent((Intent)ProcessInstanceIntent.ELEMENT_COMPLETED);
        Record catchEventCompleted = (Record)completedCatchEvents.getFirst();
        Map instanceVariables = ProcessInstances.getCurrentVariables(instanceKey, catchEventCompleted.getPosition());
        Assertions.assertThat(instanceVariables)
            .containsOnly(Assertions.entry("key", "\"order-123\""), Assertions.entry("foo", "\"bar\""));
    }

    /**
     * Publishes a message with correlation key "123" and creates an instance whose "key" variable
     * is the number 123; expects the message variables to be present when the process completes.
     */
    @Test
    public void shouldCorrelateMessageIfCorrelationKeyIsANumber() {
        // given
        engine.deployment().withXmlResource(SINGLE_MESSAGE_PROCESS).deploy();
        engine.message()
            .withName("message")
            .withCorrelationKey("123")
            .withVariables(MsgPackUtil.asMsgPack("foo", (Object)"bar"))
            .publish();

        // when: the instance variable is numeric rather than a string
        long instanceKey = engine.processInstance()
            .ofBpmnProcessId(PROCESS_ID)
            .withVariable("key", 123)
            .create();

        // then: inspect the variables at the position where the process element completed
        ProcessInstanceRecordStream completedProcesses = (ProcessInstanceRecordStream)RecordingExporter.processInstanceRecords()
            .withElementType(BpmnElementType.PROCESS)
            .withIntent((Intent)ProcessInstanceIntent.ELEMENT_COMPLETED);
        Record processCompleted = (Record)completedProcesses.getFirst();
        Map instanceVariables = ProcessInstances.getCurrentVariables(instanceKey, processCompleted.getPosition());
        Assertions.assertThat(instanceVariables)
            .containsOnly(Assertions.entry("key", "123"), Assertions.entry("foo", "\"bar\""));
    }

    /**
     * Publishes two messages (nr=1, then nr=2) with the same name and correlation key before the
     * instance is created and expects the variables of the first one on the completed process.
     */
    @Test
    public void shouldCorrelateFirstPublishedMessage() {
        // given: two messages published through the same client, in this order
        engine.deployment().withXmlResource(SINGLE_MESSAGE_PROCESS).deploy();
        PublishMessageClient publisher = engine.message().withName("message").withCorrelationKey("order-123");
        publisher.withVariables(MsgPackUtil.asMsgPack("nr", (Object)1)).publish();
        publisher.withVariables(MsgPackUtil.asMsgPack("nr", (Object)2)).publish();

        // when
        long instanceKey = engine.processInstance()
            .ofBpmnProcessId(PROCESS_ID)
            .withVariable("key", "order-123")
            .create();

        // then
        ProcessInstanceRecordStream completedProcesses = (ProcessInstanceRecordStream)RecordingExporter.processInstanceRecords()
            .withElementType(BpmnElementType.PROCESS)
            .withIntent((Intent)ProcessInstanceIntent.ELEMENT_COMPLETED);
        Record processCompleted = (Record)completedProcesses.getFirst();
        Map instanceVariables = ProcessInstances.getCurrentVariables(instanceKey, processCompleted.getPosition());
        Assertions.assertThat(instanceVariables)
            .containsOnly(Assertions.entry("key", "\"order-123\""), Assertions.entry("nr", "1"));
    }

    /**
     * Publishes a message with a time-to-live of 0 after the message subscription was created and
     * expects the "receive-message" element of that instance to complete.
     */
    @Test
    public void shouldCorrelateMessageWithZeroTTL() {
        // given: an instance with an existing message subscription
        engine.deployment().withXmlResource(SINGLE_MESSAGE_PROCESS).deploy();
        long instanceKey = engine.processInstance()
            .ofBpmnProcessId(PROCESS_ID)
            .withVariable("key", "order-123")
            .create();
        boolean subscriptionCreated = RecordingExporter.messageSubscriptionRecords((MessageSubscriptionIntent)MessageSubscriptionIntent.CREATED).exists();
        Assertions.assertThat(subscriptionCreated).isTrue();

        // when
        engine.message()
            .withName("message")
            .withCorrelationKey("order-123")
            .withVariables(MsgPackUtil.asMsgPack("foo", (Object)"bar"))
            .withTimeToLive(0L)
            .publish();

        // then: the completed catch event belongs to the created instance
        ProcessInstanceRecordStream completedCatchEvents = (ProcessInstanceRecordStream)RecordingExporter.processInstanceRecords()
            .withElementId("receive-message")
            .withIntent((Intent)ProcessInstanceIntent.ELEMENT_COMPLETED);
        Record catchEventCompleted = (Record)completedCatchEvents.getFirst();
        ProcessInstanceRecordValue completedValue = (ProcessInstanceRecordValue)catchEventCompleted.getValue();
        Assertions.assertThat(completedValue.getProcessInstanceKey()).isEqualTo(instanceKey);
    }

    /**
     * Runs two instances with different "key" values and publishes one message per key; each
     * instance is expected to receive only the variables of the message matching its key.
     */
    @Test
    public void shouldCorrelateMessageByCorrelationKey() {
        // given: two instances that differ only in their correlation key variable
        engine.deployment().withXmlResource(SINGLE_MESSAGE_PROCESS).deploy();
        long firstInstanceKey = engine.processInstance()
            .ofBpmnProcessId(PROCESS_ID)
            .withVariable("key", "order-123")
            .create();
        long secondInstanceKey = engine.processInstance()
            .ofBpmnProcessId(PROCESS_ID)
            .withVariable("key", "order-456")
            .create();

        // when: one message per correlation key
        engine.message()
            .withName("message")
            .withCorrelationKey("order-123")
            .withVariables(MsgPackUtil.asMsgPack("nr", (Object)1))
            .publish();
        engine.message()
            .withName("message")
            .withCorrelationKey("order-456")
            .withVariables(MsgPackUtil.asMsgPack("nr", (Object)2))
            .publish();

        // then: first instance got nr=1
        ProcessInstanceRecordStream firstCompletedCatchEvents = (ProcessInstanceRecordStream)RecordingExporter.processInstanceRecords()
            .withProcessInstanceKey(firstInstanceKey)
            .withElementType(BpmnElementType.INTERMEDIATE_CATCH_EVENT)
            .withIntent((Intent)ProcessInstanceIntent.ELEMENT_COMPLETED);
        Record firstCatchCompleted = (Record)firstCompletedCatchEvents.getFirst();
        Map firstVariables = ProcessInstances.getCurrentVariables(firstInstanceKey, firstCatchCompleted.getPosition());
        Assertions.assertThat(firstVariables)
            .containsOnly(Assertions.entry("key", "\"order-123\""), Assertions.entry("nr", "1"));

        // and: second instance got nr=2
        ProcessInstanceRecordStream secondCompletedCatchEvents = (ProcessInstanceRecordStream)RecordingExporter.processInstanceRecords()
            .withProcessInstanceKey(secondInstanceKey)
            .withElementType(BpmnElementType.INTERMEDIATE_CATCH_EVENT)
            .withIntent((Intent)ProcessInstanceIntent.ELEMENT_COMPLETED);
        Record secondCatchCompleted = (Record)secondCompletedCatchEvents.getFirst();
        Map secondVariables = ProcessInstances.getCurrentVariables(secondInstanceKey, secondCatchCompleted.getPosition());
        Assertions.assertThat(secondVariables)
            .containsOnly(Assertions.entry("key", "\"order-456\""), Assertions.entry("nr", "2"));
    }

    /**
     * Deploys two different processes waiting for the same message name and correlation key and
     * expects a single published message to be correlated to an instance of each.
     */
    @Test
    public void shouldCorrelateMessageToDifferentProcesses() {
        // given: two processes ("process" and "process-2") with one instance each
        engine.deployment()
            .withXmlResource("wf-1.bpmn", SINGLE_MESSAGE_PROCESS)
            .withXmlResource("wf-2.bpmn", Bpmn.createExecutableProcess("process-2").startEvent().intermediateCatchEvent("catch", c -> c.message(m -> m.name("message").zeebeCorrelationKeyExpression("key"))).endEvent().done())
            .deploy();
        long firstInstanceKey = engine.processInstance()
            .ofBpmnProcessId(PROCESS_ID)
            .withVariable("key", "order-123")
            .create();
        long secondInstanceKey = engine.processInstance()
            .ofBpmnProcessId("process-2")
            .withVariable("key", "order-123")
            .create();

        // when
        Record<MessageRecordValue> publishedMessage = engine.message().withName("message").withCorrelationKey("order-123").publish();

        // then: the first two CORRELATED records pair the message with both instances
        Stream firstTwoCorrelations = (Stream)RecordingExporter.processMessageSubscriptionRecords((ProcessMessageSubscriptionIntent)ProcessMessageSubscriptionIntent.CORRELATED).limit(2L);
        Tuple[] expectedCorrelations = new Tuple[]{
            Assertions.tuple(publishedMessage.getKey(), firstInstanceKey),
            Assertions.tuple(publishedMessage.getKey(), secondInstanceKey)
        };
        Assertions.assertThat(firstTwoCorrelations)
            .extracting(Record::getValue)
            .extracting(v -> Assertions.tuple(v.getMessageKey(), v.getProcessInstanceKey()))
            .contains((Object[])expectedCorrelations);
    }

    /**
     * Creates two instances of the same process with the same correlation key, publishes two
     * messages and expects the first message on the first instance and the second message on the
     * second instance.
     */
    @Test
    public void shouldCorrelateMessageOnlyOncePerProcess() {
        // given: two instances waiting with an identical correlation key
        engine.deployment().withXmlResource(SINGLE_MESSAGE_PROCESS).deploy();
        long firstInstanceKey = engine.processInstance()
            .ofBpmnProcessId(PROCESS_ID)
            .withVariable("key", "order-123")
            .create();
        long secondInstanceKey = engine.processInstance()
            .ofBpmnProcessId(PROCESS_ID)
            .withVariable("key", "order-123")
            .create();

        // when
        Record<MessageRecordValue> firstMessage = engine.message().withName("message").withCorrelationKey("order-123").publish();
        Record<MessageRecordValue> secondMessage = engine.message().withName("message").withCorrelationKey("order-123").publish();

        // then
        Stream firstTwoCorrelations = (Stream)RecordingExporter.processMessageSubscriptionRecords((ProcessMessageSubscriptionIntent)ProcessMessageSubscriptionIntent.CORRELATED).limit(2L);
        Tuple[] expectedCorrelations = new Tuple[]{
            Assertions.tuple(firstMessage.getKey(), firstInstanceKey),
            Assertions.tuple(secondMessage.getKey(), secondInstanceKey)
        };
        Assertions.assertThat(firstTwoCorrelations)
            .extracting(Record::getValue)
            .extracting(v -> Assertions.tuple(v.getMessageKey(), v.getProcessInstanceKey()))
            .contains((Object[])expectedCorrelations);
    }

    /**
     * Like the per-process variant, but the second instance is created after a second deployment
     * ("wf_v2.bpmn") under the same process id; each of the two messages is expected on a
     * different instance.
     */
    @Test
    public void shouldCorrelateMessageOnlyOncePerProcessAcrossVersions() {
        // given: one instance created after the first deployment ...
        engine.deployment().withXmlResource(SINGLE_MESSAGE_PROCESS).deploy();
        long firstInstanceKey = engine.processInstance()
            .ofBpmnProcessId(PROCESS_ID)
            .withVariable("key", "order-123")
            .create();

        // ... and one created after a second deployment with the same process id
        engine.deployment()
            .withXmlResource("wf_v2.bpmn", Bpmn.createExecutableProcess(PROCESS_ID).startEvent().intermediateCatchEvent("catch", c -> c.message(m -> m.name("message").zeebeCorrelationKeyExpression("key"))).endEvent().done())
            .deploy();
        long secondInstanceKey = engine.processInstance()
            .ofBpmnProcessId(PROCESS_ID)
            .withVariable("key", "order-123")
            .create();

        // when
        Record<MessageRecordValue> firstMessage = engine.message().withName("message").withCorrelationKey("order-123").publish();
        Record<MessageRecordValue> secondMessage = engine.message().withName("message").withCorrelationKey("order-123").publish();

        // then
        Stream firstTwoCorrelations = (Stream)RecordingExporter.processMessageSubscriptionRecords((ProcessMessageSubscriptionIntent)ProcessMessageSubscriptionIntent.CORRELATED).limit(2L);
        Tuple[] expectedCorrelations = new Tuple[]{
            Assertions.tuple(firstMessage.getKey(), firstInstanceKey),
            Assertions.tuple(secondMessage.getKey(), secondInstanceKey)
        };
        Assertions.assertThat(firstTwoCorrelations)
            .extracting(Record::getValue)
            .extracting(v -> Assertions.tuple(v.getMessageKey(), v.getProcessInstanceKey()))
            .contains((Object[])expectedCorrelations);
    }

    /**
     * Publishes two "ping" messages (nr=1, nr=2) before creating an instance of the two-catch-event
     * process and expects "message1" to see nr=1 and "message2" to see nr=2.
     */
    @Test
    public void shouldCorrelateMessageOnlyOnceIfPublishedBefore() {
        // given: both messages are published before the instance exists
        engine.deployment().withXmlResource(TWO_MESSAGES_PROCESS).deploy();
        PublishMessageClient publisher = engine.message().withName("ping").withCorrelationKey("123");
        publisher.withVariables(MsgPackUtil.asMsgPack("nr", (Object)1)).publish();
        publisher.withVariables(MsgPackUtil.asMsgPack("nr", (Object)2)).publish();

        // when
        engine.processInstance().ofBpmnProcessId(PROCESS_ID).withVariable("key", "123").create();

        // then: pair each completed "message*" element with the value of "nr" at that position
        ProcessInstanceRecordStream completedMessageElements = (ProcessInstanceRecordStream)RecordingExporter.processInstanceRecords((ProcessInstanceIntent)ProcessInstanceIntent.ELEMENT_COMPLETED)
            .filter(rec -> ((ProcessInstanceRecordValue)rec.getValue()).getElementId().startsWith("message"));
        ProcessInstanceRecordStream firstTwoCompleted = (ProcessInstanceRecordStream)completedMessageElements.limit(2L);
        List elementsWithNr = firstTwoCompleted
            .map(completed -> {
                ProcessInstanceRecordValue value = (ProcessInstanceRecordValue)completed.getValue();
                return Assertions.tuple(value.getElementId(), ProcessInstances.getCurrentVariables(value.getProcessInstanceKey(), completed.getPosition()).get("nr"));
            })
            .collect(Collectors.toList());
        Assertions.assertThat(elementsWithNr)
            .contains(Assertions.tuple("message1", "1"), Assertions.tuple("message2", "2"));
    }

    /**
     * Creates the instance first, publishes nr=1 once a process message subscription exists,
     * waits for a second subscription CREATED record, publishes nr=2 and expects "message1" to
     * see nr=1 and "message2" to see nr=2.
     */
    @Test
    public void shouldCorrelateMessageOnlyOnceIfEnteredBefore() {
        // given: an instance with its first process message subscription created
        engine.deployment().withXmlResource(TWO_MESSAGES_PROCESS).deploy();
        engine.processInstance().ofBpmnProcessId(PROCESS_ID).withVariable("key", "123").create();
        boolean subscriptionCreated = RecordingExporter.processMessageSubscriptionRecords((ProcessMessageSubscriptionIntent)ProcessMessageSubscriptionIntent.CREATED).exists();
        Assertions.assertThat(subscriptionCreated).isTrue();

        // when: the second message is only published after two subscriptions were created
        PublishMessageClient publisher = engine.message().withName("ping").withCorrelationKey("123");
        publisher.withVariables(MsgPackUtil.asMsgPack("nr", (Object)1)).publish();
        ProcessMessageSubscriptionRecordStream firstTwoSubscriptions = (ProcessMessageSubscriptionRecordStream)RecordingExporter.processMessageSubscriptionRecords((ProcessMessageSubscriptionIntent)ProcessMessageSubscriptionIntent.CREATED).limit(2L);
        Assertions.assertThat(firstTwoSubscriptions.count()).isEqualTo(2L);
        publisher.withVariables(MsgPackUtil.asMsgPack("nr", (Object)2)).publish();

        // then: pair each completed "message*" element with the value of "nr" at that position
        ProcessInstanceRecordStream completedMessageElements = (ProcessInstanceRecordStream)RecordingExporter.processInstanceRecords((ProcessInstanceIntent)ProcessInstanceIntent.ELEMENT_COMPLETED)
            .filter(rec -> ((ProcessInstanceRecordValue)rec.getValue()).getElementId().startsWith("message"));
        ProcessInstanceRecordStream firstTwoCompleted = (ProcessInstanceRecordStream)completedMessageElements.limit(2L);
        List elementsWithNr = firstTwoCompleted
            .map(completed -> {
                ProcessInstanceRecordValue value = (ProcessInstanceRecordValue)completed.getValue();
                return Assertions.tuple(value.getElementId(), ProcessInstances.getCurrentVariables(value.getProcessInstanceKey(), completed.getPosition()).get("nr"));
            })
            .collect(Collectors.toList());
        Assertions.assertThat(elementsWithNr)
            .contains(Assertions.tuple("message1", "1"), Assertions.tuple("message2", "2"));
    }

    /**
     * Deploys a process with two catch events for message "ping" behind a parallel gateway,
     * publishes nr=1 and nr=2 once both subscriptions exist and expects both values to show up
     * on the two completed catch events.
     */
    @Test
    public void shouldCorrelateMessageOnlyOnceToInstance() {
        // given: two catch events for the same message, both attached to the parallel gateway
        IntermediateCatchEventBuilder firstCatch = (IntermediateCatchEventBuilder)Bpmn.createExecutableProcess(PROCESS_ID)
            .startEvent()
            .parallelGateway()
            .intermediateCatchEvent("message1")
            .message(m -> m.name("ping").zeebeCorrelationKeyExpression("key"));
        IntermediateCatchEventBuilder secondCatch = (IntermediateCatchEventBuilder)firstCatch
            .moveToLastGateway()
            .intermediateCatchEvent("message2")
            .message(m -> m.name("ping").zeebeCorrelationKeyExpression("key"));
        engine.deployment().withXmlResource(secondCatch.done()).deploy();
        engine.processInstance().ofBpmnProcessId(PROCESS_ID).withVariable("key", "123").create();
        ProcessMessageSubscriptionRecordStream firstTwoSubscriptions = (ProcessMessageSubscriptionRecordStream)RecordingExporter.processMessageSubscriptionRecords((ProcessMessageSubscriptionIntent)ProcessMessageSubscriptionIntent.CREATED).limit(2L);
        Assertions.assertThat(firstTwoSubscriptions.count()).isEqualTo(2L);

        // when
        PublishMessageClient publisher = engine.message().withName("ping").withCorrelationKey("123");
        publisher.withVariables(MsgPackUtil.asMsgPack("nr", (Object)1)).publish();
        publisher.withVariables(MsgPackUtil.asMsgPack("nr", (Object)2)).publish();

        // then: collect the value of "nr" at each of the first two completed "message*" elements
        ProcessInstanceRecordStream completedMessageElements = (ProcessInstanceRecordStream)RecordingExporter.processInstanceRecords((ProcessInstanceIntent)ProcessInstanceIntent.ELEMENT_COMPLETED)
            .filter(rec -> ((ProcessInstanceRecordValue)rec.getValue()).getElementId().startsWith("message"));
        ProcessInstanceRecordStream firstTwoCompleted = (ProcessInstanceRecordStream)completedMessageElements.limit(2L);
        List correlatedNrs = firstTwoCompleted
            .map(completed -> {
                ProcessInstanceRecordValue value = (ProcessInstanceRecordValue)completed.getValue();
                return (String)ProcessInstances.getCurrentVariables(value.getProcessInstanceKey(), completed.getPosition()).get("nr");
            })
            .collect(Collectors.toList());
        Assertions.assertThat(correlatedNrs).contains("1", "2");
    }

    /**
     * Publishes nr=1 and nr=2 back to back once a process message subscription exists and
     * expects "message1" to see nr=1 and "message2" to see nr=2.
     */
    @Test
    public void shouldCorrelateOnlyOneMessagePerCatchElement() {
        // given: an instance with a created process message subscription
        engine.deployment().withXmlResource(TWO_MESSAGES_PROCESS).deploy();
        engine.processInstance().ofBpmnProcessId(PROCESS_ID).withVariable("key", "123").create();
        boolean subscriptionCreated = RecordingExporter.processMessageSubscriptionRecords((ProcessMessageSubscriptionIntent)ProcessMessageSubscriptionIntent.CREATED).exists();
        Assertions.assertThat(subscriptionCreated).isTrue();

        // when: both messages are published without waiting in between
        PublishMessageClient publisher = engine.message().withName("ping").withCorrelationKey("123");
        publisher.withVariables(MsgPackUtil.asMsgPack("nr", (Object)1)).publish();
        publisher.withVariables(MsgPackUtil.asMsgPack("nr", (Object)2)).publish();

        // then: pair each completed "message*" element with the value of "nr" at that position
        ProcessInstanceRecordStream completedMessageElements = (ProcessInstanceRecordStream)RecordingExporter.processInstanceRecords((ProcessInstanceIntent)ProcessInstanceIntent.ELEMENT_COMPLETED)
            .filter(rec -> ((ProcessInstanceRecordValue)rec.getValue()).getElementId().startsWith("message"));
        ProcessInstanceRecordStream firstTwoCompleted = (ProcessInstanceRecordStream)completedMessageElements.limit(2L);
        List elementsWithNr = firstTwoCompleted
            .map(completed -> {
                ProcessInstanceRecordValue value = (ProcessInstanceRecordValue)completed.getValue();
                return Assertions.tuple(value.getElementId(), ProcessInstances.getCurrentVariables(value.getProcessInstanceKey(), completed.getPosition()).get("nr"));
            })
            .collect(Collectors.toList());
        Assertions.assertThat(elementsWithNr)
            .contains(Assertions.tuple("message1", "1"), Assertions.tuple("message2", "2"));
    }

    /**
     * Publishes "msg1" to the boundary-events process and expects only the "msg1End" end event to
     * be activated, not "taskEnd" or "msg2End".
     */
    @Test
    public void shouldCorrelateCorrectBoundaryEvent() {
        // given
        engine.deployment().withXmlResource(BOUNDARY_EVENTS_PROCESS).deploy();
        engine.processInstance().ofBpmnProcessId(PROCESS_ID).withVariable("key", "123").create();
        awaitSubscriptionsOpened(3);

        // when
        engine.message()
            .withName("msg1")
            .withCorrelationKey("123")
            .withVariables(MsgPackUtil.asMsgPack("foo", (Object)1))
            .publish();

        // then: look at the element ids of all ELEMENT_ACTIVATED records up to process completion
        ListAssert activatedRecords = (ListAssert)Assertions.assertThat((Stream)RecordingExporter.processInstanceRecords().limitToProcessInstanceCompleted())
            .filteredOn(r -> r.getIntent() == ProcessInstanceIntent.ELEMENT_ACTIVATED);
        AbstractListAssert activatedElementIds = (AbstractListAssert)activatedRecords
            .extracting(Record::getValue)
            .extracting(ProcessInstanceRecordValue::getElementId)
            .contains((Object[])new String[]{"msg1End"});
        activatedElementIds.doesNotContain((Object[])new String[]{"taskEnd", "msg2End"});
    }

    /**
     * Publishes "taskMsg" to the boundary-events process and expects only the "taskEnd" end event
     * to be activated, not "msg1End" or "msg2End".
     */
    @Test
    public void shouldNotTriggerBoundaryEventIfReceiveTaskTriggeredFirst() {
        // given
        engine.deployment().withXmlResource(BOUNDARY_EVENTS_PROCESS).deploy();
        engine.processInstance().ofBpmnProcessId(PROCESS_ID).withVariable("key", "123").create();
        awaitSubscriptionsOpened(3);

        // when: the receive task's own message arrives
        engine.message()
            .withName("taskMsg")
            .withCorrelationKey("123")
            .withVariables(MsgPackUtil.asMsgPack("foo", (Object)1))
            .publish();

        // then: look at the element ids of all ELEMENT_ACTIVATED records up to process completion
        ListAssert activatedRecords = (ListAssert)Assertions.assertThat((Stream)RecordingExporter.processInstanceRecords().limitToProcessInstanceCompleted())
            .filteredOn(r -> r.getIntent() == ProcessInstanceIntent.ELEMENT_ACTIVATED);
        AbstractListAssert activatedElementIds = (AbstractListAssert)activatedRecords
            .extracting(Record::getValue)
            .extracting(ProcessInstanceRecordValue::getElementId)
            .contains((Object[])new String[]{"taskEnd"});
        activatedElementIds.doesNotContain((Object[])new String[]{"msg1End", "msg2End"});
    }

    /**
     * Publishes "msg2" to the boundary-events process and expects only the "msg2End" end event to
     * be activated, not "taskEnd" or "msg1End".
     */
    @Test
    public void shouldNotTriggerReceiveTaskIfBoundaryEventTriggeredFirst() {
        // given
        engine.deployment().withXmlResource(BOUNDARY_EVENTS_PROCESS).deploy();
        engine.processInstance().ofBpmnProcessId(PROCESS_ID).withVariable("key", "123").create();
        awaitSubscriptionsOpened(3);

        // when: the second boundary event's message arrives
        engine.message()
            .withName("msg2")
            .withCorrelationKey("123")
            .withVariables(MsgPackUtil.asMsgPack("foo", (Object)1))
            .publish();

        // then: look at the element ids of all ELEMENT_ACTIVATED records up to process completion
        ListAssert activatedRecords = (ListAssert)Assertions.assertThat((Stream)RecordingExporter.processInstanceRecords().limitToProcessInstanceCompleted())
            .filteredOn(r -> r.getIntent() == ProcessInstanceIntent.ELEMENT_ACTIVATED);
        AbstractListAssert activatedElementIds = (AbstractListAssert)activatedRecords
            .extracting(Record::getValue)
            .extracting(ProcessInstanceRecordValue::getElementId)
            .contains((Object[])new String[]{"msg2End"});
        activatedElementIds.doesNotContain((Object[])new String[]{"taskEnd", "msg1End"});
    }

    /**
     * Verifies the exact sequence of record types and intents written for the "receive-message"
     * intermediate catch event when the message was published before the instance was created.
     */
    @Test
    public void testIntermediateMessageEventLifeCycle() {
        // given / when
        engine.deployment().withXmlResource(SINGLE_MESSAGE_PROCESS).deploy();
        engine.message().withName("message").withCorrelationKey("order-123").publish();
        engine.processInstance().ofBpmnProcessId(PROCESS_ID).withVariable("key", "order-123").create();

        // then: all process instance records up to process completion
        List instanceRecords = (List)RecordingExporter.processInstanceRecords().limitToProcessInstanceCompleted().collect(Collectors.toList());
        Tuple[] expectedLifecycle = new Tuple[]{
            Assertions.tuple(RecordType.COMMAND, ProcessInstanceIntent.ACTIVATE_ELEMENT),
            Assertions.tuple(RecordType.EVENT, ProcessInstanceIntent.ELEMENT_ACTIVATING),
            Assertions.tuple(RecordType.EVENT, ProcessInstanceIntent.ELEMENT_ACTIVATED),
            Assertions.tuple(RecordType.COMMAND, ProcessInstanceIntent.COMPLETE_ELEMENT),
            Assertions.tuple(RecordType.EVENT, ProcessInstanceIntent.ELEMENT_COMPLETING),
            Assertions.tuple(RecordType.EVENT, ProcessInstanceIntent.ELEMENT_COMPLETED)
        };
        ListAssert catchEventRecords = (ListAssert)Assertions.assertThat(instanceRecords)
            .filteredOn(r -> ((ProcessInstanceRecordValue)r.getValue()).getElementId().equals("receive-message"));
        catchEventRecords
            .extracting(new Function[]{Record::getRecordType, Record::getIntent})
            .containsExactly((Object[])expectedLifecycle);
    }

    /**
     * Verifies the exact sequence of intents written for the "receive-message" receive task when
     * the message was published before the instance was created.
     */
    @Test
    public void testReceiveTaskLifeCycle() {
        // given / when
        engine.deployment().withXmlResource(RECEIVE_TASK_PROCESS).deploy();
        engine.message().withName("message").withCorrelationKey("order-123").publish();
        engine.processInstance().ofBpmnProcessId(PROCESS_ID).withVariable("key", "order-123").create();

        // then: all process instance records up to process completion
        List instanceRecords = (List)RecordingExporter.processInstanceRecords().limitToProcessInstanceCompleted().collect(Collectors.toList());
        Intent[] expectedIntents = new Intent[]{
            ProcessInstanceIntent.ACTIVATE_ELEMENT,
            ProcessInstanceIntent.ELEMENT_ACTIVATING,
            ProcessInstanceIntent.ELEMENT_ACTIVATED,
            ProcessInstanceIntent.COMPLETE_ELEMENT,
            ProcessInstanceIntent.ELEMENT_COMPLETING,
            ProcessInstanceIntent.ELEMENT_COMPLETED
        };
        ListAssert receiveTaskRecords = (ListAssert)Assertions.assertThat(instanceRecords)
            .filteredOn(r -> ((ProcessInstanceRecordValue)r.getValue()).getElementId().equals("receive-message"));
        receiveTaskRecords
            .extracting(Record::getIntent)
            .containsExactly((Object[])expectedIntents);
    }

    /**
     * Verifies that, after "msg1" is correlated, the records contain the termination of "task"
     * followed by the activation and completion of the "msg1" boundary event as one contiguous
     * sequence.
     */
    @Test
    public void testBoundaryMessageEventLifecycle() {
        // given / when
        engine.deployment().withXmlResource(BOUNDARY_EVENTS_PROCESS).deploy();
        engine.message().withName("msg1").withCorrelationKey("order-123").publish();
        engine.processInstance().ofBpmnProcessId(PROCESS_ID).withVariable("key", "order-123").create();

        // then: all process instance records up to process completion
        List instanceRecords = (List)RecordingExporter.processInstanceRecords().limitToProcessInstanceCompleted().collect(Collectors.toList());
        Tuple[] expectedSequence = new Tuple[]{
            Assertions.tuple("task", ProcessInstanceIntent.ELEMENT_ACTIVATED),
            Assertions.tuple("task", ProcessInstanceIntent.TERMINATE_ELEMENT),
            Assertions.tuple("task", ProcessInstanceIntent.ELEMENT_TERMINATING),
            Assertions.tuple("task", ProcessInstanceIntent.ELEMENT_TERMINATED),
            Assertions.tuple("msg1", ProcessInstanceIntent.ELEMENT_ACTIVATING),
            Assertions.tuple("msg1", ProcessInstanceIntent.ELEMENT_ACTIVATED),
            Assertions.tuple("msg1", ProcessInstanceIntent.COMPLETE_ELEMENT),
            Assertions.tuple("msg1", ProcessInstanceIntent.ELEMENT_COMPLETING),
            Assertions.tuple("msg1", ProcessInstanceIntent.ELEMENT_COMPLETED)
        };
        Assertions.assertThat(instanceRecords)
            .extracting(r -> Assertions.tuple(((ProcessInstanceRecordValue)r.getValue()).getElementId(), r.getIntent()))
            .containsSequence((Object[])expectedSequence);
    }

    /**
     * Attaches a non-interrupting message boundary event to a service task, publishes three
     * messages (foo=0, 1, 2) and expects three correlations, three completed boundary events and
     * the variable "foo" to take the values "0", "1", "2" in that order.
     */
    @Test
    public void shouldCorrelateToNonInterruptingBoundaryEvent() {
        // given: service task "task" with boundary event "msg1" configured via cancelActivity(false)
        BoundaryEventBuilder nonInterruptingBoundary = (BoundaryEventBuilder)Bpmn.createExecutableProcess(PROCESS_ID)
            .startEvent()
            .serviceTask("task", b -> b.zeebeJobType("type"))
            .boundaryEvent("msg1")
            .cancelActivity(Boolean.valueOf(false));
        BoundaryEventBuilder messageBoundary = (BoundaryEventBuilder)nonInterruptingBoundary
            .message(m -> m.name("msg1").zeebeCorrelationKeyExpression("key"));
        BpmnModelInstance model = messageBoundary
            .endEvent("msg1End")
            .moveToActivity("task")
            .endEvent("taskEnd")
            .done();
        engine.deployment().withXmlResource(model).deploy();
        long instanceKey = engine.processInstance()
            .ofBpmnProcessId(PROCESS_ID)
            .withVariable("key", "123")
            .create();

        // when: three messages are published through the same client
        PublishMessageClient publisher = engine.message().withName("msg1").withCorrelationKey("123");
        publisher.withVariables(MsgPackUtil.asMsgPack("foo", (Object)0)).publish();
        publisher.withVariables(MsgPackUtil.asMsgPack("foo", (Object)1)).publish();
        publisher.withVariables(MsgPackUtil.asMsgPack("foo", (Object)2)).publish();

        // then
        Assertions.assertThat(awaitMessagesCorrelated(3)).hasSize(3);
        ProcessInstanceRecordStream completedBoundaryEvents = (ProcessInstanceRecordStream)RecordingExporter.processInstanceRecords((ProcessInstanceIntent)ProcessInstanceIntent.ELEMENT_COMPLETED)
            .withElementType(BpmnElementType.BOUNDARY_EVENT)
            .limit(3L);
        Assertions.assertThat(completedBoundaryEvents.count()).isEqualTo(3L);
        Stream fooVariableRecords = (Stream)RecordingExporter.variableRecords()
            .withName("foo")
            .withScopeKey(instanceKey)
            .limit(3L);
        Assertions.assertThat(fooVariableRecords)
            .extracting(r -> ((VariableRecordValue)r.getValue()).getValue())
            .containsExactly((Object[])new String[]{"0", "1", "2"});
    }

    /**
     * Attaches a non-interrupting message boundary event and a timer boundary event (PT5M) to a
     * service task, correlates one message, advances the engine time by five minutes and expects
     * exactly the boundary events "message" and "timer" to be activated, with no command
     * rejections for the instance.
     */
    @Test
    public void shouldCorrelateOnlyOnceToNonInterruptingBoundaryEvent() {
        // given: "task" with boundary event "message" (cancelActivity(false)) and boundary event "timer"
        BoundaryEventBuilder nonInterruptingBoundary = (BoundaryEventBuilder)Bpmn.createExecutableProcess(PROCESS_ID)
            .startEvent()
            .serviceTask("task", b -> b.zeebeJobType("test"))
            .boundaryEvent("message")
            .cancelActivity(Boolean.valueOf(false));
        BoundaryEventBuilder messageBoundary = (BoundaryEventBuilder)nonInterruptingBoundary
            .message(m -> m.name("message").zeebeCorrelationKeyExpression("key"));
        BoundaryEventBuilder timerBoundary = (BoundaryEventBuilder)messageBoundary
            .endEvent("correlated")
            .moveToActivity("task")
            .boundaryEvent("timer")
            .timerWithDuration("PT5M");
        BpmnModelInstance model = timerBoundary.endEvent().done();
        engine.deployment().withXmlResource(model).deploy();
        long instanceKey = engine.processInstance()
            .ofBpmnProcessId(PROCESS_ID)
            .withVariable("key", "key-1")
            .create();

        // when: the message is correlated, then the clock is moved forward by the timer duration
        engine.message().withName("message").withCorrelationKey("key-1").publish();
        RecordingExporter.processInstanceRecords((ProcessInstanceIntent)ProcessInstanceIntent.ELEMENT_COMPLETED)
            .withProcessInstanceKey(instanceKey)
            .withElementType(BpmnElementType.BOUNDARY_EVENT)
            .await();
        engine.increaseTime(Duration.ofMinutes(5L));

        // then: exactly two boundary events were activated, in this order
        ProcessInstanceRecordStream activatedRecords = (ProcessInstanceRecordStream)RecordingExporter.processInstanceRecords()
            .withProcessInstanceKey(instanceKey)
            .limitToProcessInstanceCompleted()
            .withIntent((Intent)ProcessInstanceIntent.ELEMENT_ACTIVATED);
        ListAssert activatedBoundaryEvents = (ListAssert)Assertions.assertThat((Stream)activatedRecords.withElementType(BpmnElementType.BOUNDARY_EVENT))
            .describedAs("Expected that the message boundary event is activated only once", new Object[0]);
        ((ListAssert)activatedBoundaryEvents.hasSize(2))
            .extracting(r -> ((ProcessInstanceRecordValue)r.getValue()).getElementId())
            .containsExactly((Object[])new String[]{"message", "timer"});

        // and: no command was rejected for this instance
        ListAssert commandRejections = (ListAssert)Assertions.assertThat((Stream)RecordingExporter.records().limitToProcessInstance(instanceKey).onlyCommandRejections())
            .describedAs("Expected no subscription command rejections", new Object[0]);
        commandRejections.isEmpty();
    }

    /**
     * Publishes messages "a" and "b" (correlation key "123") before deploying a process whose
     * event-based gateway waits for either message and then for the respective other one.
     * Expects the recorded stream of the instance to contain a rejected CORRELATE command, a
     * REJECTED message subscription event and a subsequent CORRELATE command, that both messages
     * end up correlated, and that a process reaches ELEMENT_COMPLETED.
     */
    @Test
    public void shouldCorrelateMessageAgainAfterRejection() {
        // given: both messages are already published when the instance is created
        engine.message().withName("a").withCorrelationKey("123").publish();
        engine.message().withName("b").withCorrelationKey("123").publish();

        BpmnModelInstance model =
            Bpmn.createExecutableProcess(PROCESS_ID)
                .startEvent()
                .eventBasedGateway("split")
                // first branch: message "a", then message "b"
                .intermediateCatchEvent(
                    "element-a",
                    event -> event.message(msg -> msg.name("a").zeebeCorrelationKeyExpression("key")))
                .intermediateCatchEvent(
                    "element-ab",
                    event -> event.message(msg -> msg.name("b").zeebeCorrelationKeyExpression("key")))
                .exclusiveGateway("merge")
                .endEvent()
                // second branch: message "b", then message "a"
                .moveToNode("split")
                .intermediateCatchEvent(
                    "element-b",
                    event -> event.message(msg -> msg.name("b").zeebeCorrelationKeyExpression("key")))
                .intermediateCatchEvent(
                    "element-ba",
                    event -> event.message(msg -> msg.name("a").zeebeCorrelationKeyExpression("key")))
                .connectTo("merge")
                .done();
        engine.deployment().withXmlResource(model).deploy();

        // when
        long processInstanceKey =
            engine
                .processInstance()
                .ofBpmnProcessId(PROCESS_ID)
                .withVariable("key", "123")
                .create();

        // then: a correlation is rejected first and a new CORRELATE command follows
        Assertions.assertThat(RecordingExporter.records().limitToProcessInstance(processInstanceKey))
            .extracting(Record::getRecordType, Record::getIntent)
            .containsSubsequence(
                Assertions.tuple(
                    RecordType.COMMAND_REJECTION, ProcessMessageSubscriptionIntent.CORRELATE),
                Assertions.tuple(RecordType.EVENT, MessageSubscriptionIntent.REJECTED),
                Assertions.tuple(RecordType.COMMAND, ProcessMessageSubscriptionIntent.CORRELATE));

        Assertions.assertThat(
                RecordingExporter.processMessageSubscriptionRecords(
                        ProcessMessageSubscriptionIntent.CORRELATED)
                    .limit(2))
            .extracting(record -> record.getValue().getMessageName())
            .containsExactlyInAnyOrder("a", "b");

        RecordingExporter.processInstanceRecords(ProcessInstanceIntent.ELEMENT_COMPLETED)
            .withElementType(BpmnElementType.PROCESS)
            .await();
    }

    /**
     * Publishes three messages named "a" with time-to-live values of zero, 10 seconds and 20
     * seconds while the instance still has an open job, moves the engine clock forward by 10
     * seconds, completes the job and expects the first recorded variable "x" to have the value
     * "3" (the payload of the message with the longest time-to-live).
     */
    @Test
    public void shouldNotCorrelateMessageAfterTTL() {
        // given: a service task followed by a catch event for message "a"
        engine
            .deployment()
            .withXmlResource(
                Bpmn.createExecutableProcess("wf")
                    .startEvent()
                    .serviceTask("task", task -> task.zeebeJobType("test"))
                    .intermediateCatchEvent(
                        "catch",
                        event ->
                            event.message(
                                msg -> msg.name("a").zeebeCorrelationKeyExpression("key")))
                    .done())
            .deploy();

        engine.processInstance().ofBpmnProcessId("wf").withVariable("key", "key-1").create();

        engine
            .message()
            .withName("a")
            .withCorrelationKey("key-1")
            .withVariables(Map.of("x", 1))
            .withTimeToLive(Duration.ZERO)
            .publish();

        Duration messageTtl = Duration.ofSeconds(10);

        engine
            .message()
            .withName("a")
            .withCorrelationKey("key-1")
            .withVariables(Map.of("x", 2))
            .withTimeToLive(messageTtl)
            .publish();

        engine
            .message()
            .withName("a")
            .withCorrelationKey("key-1")
            .withVariables(Map.of("x", 3))
            .withTimeToLive(messageTtl.multipliedBy(2))
            .publish();

        // the job record is awaited before the clock is moved
        long jobKey = RecordingExporter.jobRecords(JobIntent.CREATED).getFirst().getKey();

        // when: the clock advances by the TTL of the second message and the job is completed
        engine.getClock().addTime(messageTtl);
        engine.job().withKey(jobKey).complete();

        // then: the first variable "x" carries the payload of the third message
        VariableRecordValue firstX =
            RecordingExporter.variableRecords().withName("x").getFirst().getValue();
        io.camunda.zeebe.protocol.record.Assertions.assertThat(firstX).hasValue("3");
    }

    /**
     * Collects process message subscription records with intent CORRELATED from the recording
     * exporter.
     *
     * @param messagesCount the limit applied to the record stream
     * @return the CORRELATED records as a list, limited to {@code messagesCount} entries
     */
    private List<Record<ProcessMessageSubscriptionRecordValue>> awaitMessagesCorrelated(int messagesCount) {
        ProcessMessageSubscriptionRecordStream correlated =
            RecordingExporter.processMessageSubscriptionRecords(
                ProcessMessageSubscriptionIntent.CORRELATED);
        return correlated.limit(messagesCount).asList();
    }

    /**
     * Collects process message subscription records with intent CREATED from the recording
     * exporter.
     *
     * @param subscriptionsCount the limit applied to the record stream
     * @return the CREATED records as a list, limited to {@code subscriptionsCount} entries
     */
    private List<Record<ProcessMessageSubscriptionRecordValue>> awaitSubscriptionsOpened(int subscriptionsCount) {
        ProcessMessageSubscriptionRecordStream created =
            RecordingExporter.processMessageSubscriptionRecords()
                .withIntent(ProcessMessageSubscriptionIntent.CREATED);
        return created.limit(subscriptionsCount).asList();
    }
}

