/*
 * Decompiled with CFR 0.152.
 */
package io.camunda.zeebe.engine.processing.message;

import io.camunda.zeebe.engine.util.EngineRule;
import io.camunda.zeebe.engine.util.RecordToWrite;
import io.camunda.zeebe.model.bpmn.Bpmn;
import io.camunda.zeebe.model.bpmn.BpmnModelInstance;
import io.camunda.zeebe.model.bpmn.builder.ProcessBuilder;
import io.camunda.zeebe.model.bpmn.builder.StartEventBuilder;
import io.camunda.zeebe.protocol.impl.record.value.message.MessageRecord;
import io.camunda.zeebe.protocol.record.Record;
import io.camunda.zeebe.protocol.record.RecordValueWithVariables;
import io.camunda.zeebe.protocol.record.intent.JobIntent;
import io.camunda.zeebe.protocol.record.intent.MessageIntent;
import io.camunda.zeebe.protocol.record.intent.MessageStartEventSubscriptionIntent;
import io.camunda.zeebe.protocol.record.intent.MessageSubscriptionIntent;
import io.camunda.zeebe.protocol.record.intent.ProcessInstanceIntent;
import io.camunda.zeebe.protocol.record.intent.ProcessMessageSubscriptionIntent;
import io.camunda.zeebe.protocol.record.value.BpmnElementType;
import io.camunda.zeebe.protocol.record.value.JobRecordValue;
import io.camunda.zeebe.protocol.record.value.MessageRecordValue;
import io.camunda.zeebe.protocol.record.value.MessageStartEventSubscriptionRecordValue;
import io.camunda.zeebe.protocol.record.value.ProcessInstanceRecordValue;
import io.camunda.zeebe.protocol.record.value.ProcessMessageSubscriptionRecordValue;
import io.camunda.zeebe.protocol.record.value.VariableRecordValue;
import io.camunda.zeebe.protocol.record.value.deployment.Process;
import io.camunda.zeebe.test.util.MsgPackUtil;
import io.camunda.zeebe.test.util.record.JobRecordStream;
import io.camunda.zeebe.test.util.record.MessageStartEventSubscriptionRecordStream;
import io.camunda.zeebe.test.util.record.ProcessInstanceRecordStream;
import io.camunda.zeebe.test.util.record.RecordingExporter;
import java.time.Duration;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import java.util.stream.Stream;
import org.assertj.core.api.AbstractListAssert;
import org.assertj.core.api.Assertions;
import org.assertj.core.api.ListAssert;
import org.assertj.core.groups.Tuple;
import org.junit.Rule;
import org.junit.Test;

public final class MessageStartEventTest {
    // Message names and correlation keys shared by the tests below.
    private static final String MESSAGE_NAME_1 = "a";
    // FEEL string-literal expression; its text evaluates to the same name as MESSAGE_NAME_1.
    private static final String MESSAGE_NAME_EXPRESSION_1 = "=\"a\"";
    private static final String MESSAGE_NAME_2 = "b";
    private static final String CORRELATION_KEY_1 = "key-1";
    private static final String CORRELATION_KEY_2 = "key-2";

    // Pre-built process models; the no-op customizer leaves the start event untouched.
    private static final BpmnModelInstance SINGLE_START_EVENT_1 =
        singleStartEvent(startEvent -> {}, MESSAGE_NAME_1);
    private static final BpmnModelInstance SINGLE_START_EVENT_EXPRESSION_1 =
        singleStartEvent(startEvent -> {}, MESSAGE_NAME_EXPRESSION_1);
    private static final BpmnModelInstance MULTIPLE_START_EVENTS = multipleStartEvents();

    @Rule
    public final EngineRule engine = EngineRule.singlePartition();

    /**
     * Builds the single-start-event process with the default message name {@code MESSAGE_NAME_1}.
     *
     * @param customizer callback that receives the start event builder before the rest of the
     *     process is appended
     * @return the finished model
     */
    private static BpmnModelInstance singleStartEvent(final Consumer<StartEventBuilder> customizer) {
        final String defaultMessageName = MESSAGE_NAME_1;
        return singleStartEvent(customizer, defaultMessageName);
    }

    /**
     * Builds process "wf": a message start event "start" followed by service task "task" with job
     * type "test".
     *
     * @param customizer callback that receives the start event builder before the service task is
     *     appended
     * @param messageName message name (or expression) passed to the start event's message definition
     * @return the finished model
     */
    private static BpmnModelInstance singleStartEvent(
            final Consumer<StartEventBuilder> customizer, final String messageName) {
        final StartEventBuilder messageStart =
            Bpmn.createExecutableProcess("wf").startEvent("start").message(messageName);

        // Let the caller tweak the start event (id, output mappings, ...) before continuing.
        customizer.accept(messageStart);

        return messageStart.serviceTask("task", task -> task.zeebeJobType("test")).done();
    }

    /**
     * Builds process "wf" with two message start events (one per message name) that both lead to
     * the same service task "task" with job type "test".
     *
     * @return the finished model
     */
    private static BpmnModelInstance multipleStartEvents() {
        final ProcessBuilder processBuilder = Bpmn.createExecutableProcess("wf");

        // First start event creates the task ...
        processBuilder
            .startEvent()
            .message(MESSAGE_NAME_1)
            .serviceTask("task", task -> task.zeebeJobType("test"));

        // ... the second one is wired to that existing task.
        processBuilder.startEvent().message(MESSAGE_NAME_2).connectTo("task");

        return processBuilder.done();
    }

    /**
     * Publishes a message for the deployed start event and checks that the activating start event
     * record carries the same process metadata as the activating root-scope record.
     */
    @Test
    public void shouldCorrelateMessageToStartEvent() {
        // given
        engine.deployment().withXmlResource(SINGLE_START_EVENT_1).deploy();

        // when
        engine.message().withCorrelationKey(CORRELATION_KEY_1).withName(MESSAGE_NAME_1).publish();

        // then
        final Record<ProcessInstanceRecordValue> rootActivating =
            RecordingExporter.processInstanceRecords(ProcessInstanceIntent.ELEMENT_ACTIVATING)
                .filterRootScope()
                .getFirst();
        final Record<ProcessInstanceRecordValue> startEventActivating =
            RecordingExporter.processInstanceRecords(ProcessInstanceIntent.ELEMENT_ACTIVATING)
                .withElementType(BpmnElementType.START_EVENT)
                .getFirst();

        final ProcessInstanceRecordValue rootValue = rootActivating.getValue();
        final long rootKey = rootActivating.getKey();
        io.camunda.zeebe.protocol.record.Assertions.assertThat(startEventActivating.getValue())
            .hasProcessDefinitionKey(rootValue.getProcessDefinitionKey())
            .hasBpmnProcessId(rootValue.getBpmnProcessId())
            .hasVersion(rootValue.getVersion())
            .hasProcessInstanceKey(rootKey)
            .hasFlowScopeKey(rootKey);
    }

    /**
     * Checks that the CORRELATED message start event subscription record refers to the published
     * message and to the process instance whose start event was activated.
     */
    @Test
    public void shouldCorrelateMessageSubscription() {
        // given
        engine.deployment().withXmlResource(SINGLE_START_EVENT_1).deploy();

        // when
        final Record<MessageRecordValue> publishedMessage =
            engine.message().withCorrelationKey(CORRELATION_KEY_1).withName(MESSAGE_NAME_1).publish();

        // then
        final Record<ProcessInstanceRecordValue> activatedStart =
            RecordingExporter.processInstanceRecords(ProcessInstanceIntent.ELEMENT_ACTIVATED)
                .withElementType(BpmnElementType.START_EVENT)
                .getFirst();
        final Record<MessageStartEventSubscriptionRecordValue> correlated =
            RecordingExporter.messageStartEventSubscriptionRecords(
                    MessageStartEventSubscriptionIntent.CORRELATED)
                .getFirst();

        final ProcessInstanceRecordValue startValue = activatedStart.getValue();
        io.camunda.zeebe.protocol.record.Assertions.assertThat(correlated.getValue())
            .hasProcessDefinitionKey(startValue.getProcessDefinitionKey())
            .hasBpmnProcessId(startValue.getBpmnProcessId())
            .hasProcessInstanceKey(startValue.getProcessInstanceKey())
            .hasStartEventId(startValue.getElementId())
            .hasMessageKey(publishedMessage.getKey())
            .hasMessageName(MESSAGE_NAME_1)
            .hasCorrelationKey(CORRELATION_KEY_1);
    }

    /**
     * Publishes a message whose name matches the literal start event message name, completes the
     * created job, and checks the process/start-event lifecycle record sequence up to process
     * instance completion.
     */
    @Test
    public void shouldCreateNewInstanceWithNameLiteral() {
        // given
        engine.deployment().withXmlResource(SINGLE_START_EVENT_1).deploy();

        // when
        engine.message().withCorrelationKey(CORRELATION_KEY_1).withName(MESSAGE_NAME_1).publish();
        final Record<JobRecordValue> job = RecordingExporter.jobRecords(JobIntent.CREATED).getFirst();
        engine.job().withKey(job.getKey()).complete();

        // then
        // Typed stream (no raw cast) so the extractor lambdas see Record<ProcessInstanceRecordValue>.
        Assertions.assertThat(RecordingExporter.processInstanceRecords().limitToProcessInstanceCompleted())
            .extracting(r -> r.getValue().getBpmnElementType(), Record::getIntent)
            .containsSequence(
                Assertions.tuple(BpmnElementType.PROCESS, ProcessInstanceIntent.ELEMENT_ACTIVATING),
                Assertions.tuple(BpmnElementType.PROCESS, ProcessInstanceIntent.ELEMENT_ACTIVATED),
                Assertions.tuple(BpmnElementType.START_EVENT, ProcessInstanceIntent.ELEMENT_ACTIVATING),
                Assertions.tuple(BpmnElementType.START_EVENT, ProcessInstanceIntent.ELEMENT_ACTIVATED),
                Assertions.tuple(BpmnElementType.START_EVENT, ProcessInstanceIntent.COMPLETE_ELEMENT),
                Assertions.tuple(BpmnElementType.START_EVENT, ProcessInstanceIntent.ELEMENT_COMPLETING),
                Assertions.tuple(BpmnElementType.START_EVENT, ProcessInstanceIntent.ELEMENT_COMPLETED));
    }

    /**
     * Same scenario as the literal-name test, but the deployed start event declares its message
     * name through the FEEL expression {@code MESSAGE_NAME_EXPRESSION_1}.
     */
    @Test
    public void shouldCreateNewInstanceWithNameFeelExpression() {
        // given
        engine.deployment().withXmlResource(SINGLE_START_EVENT_EXPRESSION_1).deploy();

        // when
        engine.message().withCorrelationKey(CORRELATION_KEY_1).withName(MESSAGE_NAME_1).publish();
        final Record<JobRecordValue> job = RecordingExporter.jobRecords(JobIntent.CREATED).getFirst();
        engine.job().withKey(job.getKey()).complete();

        // then
        // Typed stream (no raw cast) so the extractor lambdas see Record<ProcessInstanceRecordValue>.
        Assertions.assertThat(RecordingExporter.processInstanceRecords().limitToProcessInstanceCompleted())
            .extracting(r -> r.getValue().getBpmnElementType(), Record::getIntent)
            .containsSequence(
                Assertions.tuple(BpmnElementType.PROCESS, ProcessInstanceIntent.ELEMENT_ACTIVATING),
                Assertions.tuple(BpmnElementType.PROCESS, ProcessInstanceIntent.ELEMENT_ACTIVATED),
                Assertions.tuple(BpmnElementType.START_EVENT, ProcessInstanceIntent.ELEMENT_ACTIVATING),
                Assertions.tuple(BpmnElementType.START_EVENT, ProcessInstanceIntent.ELEMENT_ACTIVATED),
                Assertions.tuple(BpmnElementType.START_EVENT, ProcessInstanceIntent.COMPLETE_ELEMENT),
                Assertions.tuple(BpmnElementType.START_EVENT, ProcessInstanceIntent.ELEMENT_COMPLETING),
                Assertions.tuple(BpmnElementType.START_EVENT, ProcessInstanceIntent.ELEMENT_COMPLETED));
    }

    /**
     * Publishes a message carrying variables x and y and checks that the first two exported
     * variable records contain both of them.
     */
    @Test
    public void shouldCreateNewInstanceWithMessageVariables() {
        // given
        engine.deployment().withXmlResource(SINGLE_START_EVENT_1).deploy();

        // when
        engine.message()
            .withCorrelationKey(CORRELATION_KEY_1)
            .withName(MESSAGE_NAME_1)
            .withVariables(Map.of("x", 1, "y", 2))
            .publish();

        // then
        // Variable values are compared as the strings "1" and "2".
        Assertions.assertThat(RecordingExporter.variableRecords().limit(2))
            .extracting(Record::getValue)
            .extracting(VariableRecordValue::getName, VariableRecordValue::getValue)
            .hasSize(2)
            .contains(Assertions.tuple("x", "1"), Assertions.tuple("y", "2"));
    }

    /**
     * Deploys a start event with an output expression from "x" to "y" and checks that the first
     * variable record in the process instance scope is y = "1".
     */
    @Test
    public void shouldApplyOutputMappings() {
        // given
        engine.deployment()
            .withXmlResource(singleStartEvent(startEvent -> startEvent.zeebeOutputExpression("x", "y")))
            .deploy();

        // when
        engine.message()
            .withCorrelationKey(CORRELATION_KEY_1)
            .withName(MESSAGE_NAME_1)
            .withVariables(Map.of("x", 1))
            .publish();

        // then
        final Record<ProcessInstanceRecordValue> processInstance =
            RecordingExporter.processInstanceRecords(ProcessInstanceIntent.ELEMENT_ACTIVATED)
                .filterRootScope()
                .getFirst();

        Assertions.assertThat(
                RecordingExporter.variableRecords().withScopeKey(processInstance.getKey()).limit(1))
            .extracting(Record::getValue)
            .extracting(VariableRecordValue::getName, VariableRecordValue::getValue)
            .contains(Assertions.tuple("y", "1"));
    }

    /**
     * Deploys two versions of the process (start event ids "v1" and "v2") and checks that a
     * published message activates the start event with id "v2".
     */
    @Test
    public void shouldCreateInstanceOfLatestVersion() {
        // given
        final BpmnModelInstance firstVersion = singleStartEvent(startEvent -> startEvent.id("v1"));
        engine.deployment().withXmlResource(firstVersion).deploy();
        final BpmnModelInstance secondVersion = singleStartEvent(startEvent -> startEvent.id("v2"));
        engine.deployment().withXmlResource(secondVersion).deploy();

        // when
        engine.message().withCorrelationKey(CORRELATION_KEY_1).withName(MESSAGE_NAME_1).publish();

        // then
        final Record<ProcessInstanceRecordValue> activatedStart =
            RecordingExporter.processInstanceRecords(ProcessInstanceIntent.ELEMENT_ACTIVATED)
                .withElementType(BpmnElementType.START_EVENT)
                .getFirst();
        io.camunda.zeebe.protocol.record.Assertions.assertThat(activatedStart.getValue())
            .hasElementId("v2");
    }

    /**
     * Publishes one message per start event of the two-start-event process (different correlation
     * keys) and checks that both "x" values show up as process-instance-scoped variables.
     */
    @Test
    public void shouldCreateNewInstanceWithMultipleStartEvents() {
        // given
        engine.deployment().withXmlResource(MULTIPLE_START_EVENTS).deploy();

        // when
        engine.message()
            .withName(MESSAGE_NAME_1)
            .withCorrelationKey(CORRELATION_KEY_1)
            .withVariables(Map.of("x", 1))
            .publish();
        engine.message()
            .withName(MESSAGE_NAME_2)
            .withCorrelationKey(CORRELATION_KEY_2)
            .withVariables(Map.of("x", 2))
            .publish();

        // then
        Assertions.assertThat(
                RecordingExporter.variableRecords().withName("x").filterProcessInstanceScope().limit(2))
            .extracting(variable -> variable.getValue().getValue())
            .describedAs("Expected messages [1,2] to be correlated")
            .containsExactly("1", "2");
    }

    /**
     * Deploys a process with a none, a message and a timer start event and checks that, after
     * publishing the message, only the start event "message-start" appears among the start event
     * records up to process instance completion.
     */
    @Test
    public void shouldTriggerOnlyMessageStartEvent() {
        // given
        final ProcessBuilder process = Bpmn.createExecutableProcess("process");
        process.startEvent("none-start").endEvent();
        process.startEvent("message-start").message(MESSAGE_NAME_1).endEvent();
        process.startEvent("timer-start").timerWithCycle("R/PT1H").endEvent();
        engine.deployment().withXmlResource(process.done()).deploy();

        // when
        engine.message().withName(MESSAGE_NAME_1).withCorrelationKey(CORRELATION_KEY_1).publish();

        // then
        Assertions.assertThat(
                RecordingExporter.processInstanceRecords()
                    .limitToProcessInstanceCompleted()
                    .withElementType(BpmnElementType.START_EVENT))
            .extracting(r -> r.getValue().getElementId())
            .containsOnly("message-start");
    }

    /**
     * Uses a process whose message start event is followed by a catch event for the same message
     * name. The first message starts the instance; the test then checks that the catch event's
     * subscription is correlated with the second message, not the first.
     */
    @Test
    public void shouldNotCorrelateSameMessageToCreatedInstance() {
        // given
        final BpmnModelInstance process =
            Bpmn.createExecutableProcess("wf")
                .startEvent()
                .message(MESSAGE_NAME_1)
                .intermediateCatchEvent(
                    "catch",
                    e -> e.message(m -> m.name(MESSAGE_NAME_1).zeebeCorrelationKeyExpression("key")))
                .endEvent()
                .done();
        engine.deployment().withXmlResource(process).deploy();

        // when
        engine.message()
            .withName(MESSAGE_NAME_1)
            .withCorrelationKey(CORRELATION_KEY_1)
            .withVariables(Map.of("key", CORRELATION_KEY_1, "x", 1))
            .publish();

        // Wait until the catch event's subscription exists before publishing the second message.
        RecordingExporter.messageSubscriptionRecords(MessageSubscriptionIntent.CREATED).await();

        final Record<MessageRecordValue> message2 =
            engine.message()
                .withName(MESSAGE_NAME_1)
                .withCorrelationKey(CORRELATION_KEY_1)
                .withVariables(Map.of("x", 2))
                .publish();

        // then
        Assertions.assertThat(
                RecordingExporter.variableRecords().withName("x").filterProcessInstanceScope().limit(2))
            .extracting(variable -> variable.getValue().getValue())
            .describedAs("Expected messages [1,2] to be correlated")
            .containsExactly("1", "2");

        final Record<ProcessMessageSubscriptionRecordValue> subscription =
            RecordingExporter.processMessageSubscriptionRecords(
                    ProcessMessageSubscriptionIntent.CORRELATED)
                .getFirst();
        io.camunda.zeebe.protocol.record.Assertions.assertThat(subscription.getValue())
            .hasMessageKey(message2.getKey());
    }

    /**
     * Publishes two messages with an empty correlation key, the second one while the first
     * instance's job is still open, and checks that both "x" values are written.
     */
    @Test
    public void shouldCreateMultipleInstancesIfCorrelationKeyIsEmpty() {
        // given
        engine.deployment().withXmlResource(SINGLE_START_EVENT_1).deploy();

        // when
        engine.message()
            .withName(MESSAGE_NAME_1)
            .withCorrelationKey("")
            .withVariables(Map.of("x", 1))
            .publish();

        // The first instance has reached its service task when the second message is published.
        RecordingExporter.jobRecords(JobIntent.CREATED).await();

        engine.message()
            .withName(MESSAGE_NAME_1)
            .withCorrelationKey("")
            .withVariables(Map.of("x", 2))
            .publish();

        // then
        Assertions.assertThat(
                RecordingExporter.variableRecords().withName("x").filterProcessInstanceScope().limit(2))
            .extracting(variable -> variable.getValue().getValue())
            .describedAs("Expected messages [1,2] to be correlated")
            .containsExactly("1", "2");
    }

    /**
     * While an instance for correlation key 1 is still active, publishes a second message with the
     * same key and a third with a different key. Expects the first two "x" variables to come from
     * messages 1 and 3.
     */
    @Test
    public void shouldCreateOnlyOneInstancePerCorrelationKey() {
        // given
        engine.deployment().withXmlResource(SINGLE_START_EVENT_1).deploy();

        // when
        engine.message()
            .withName(MESSAGE_NAME_1)
            .withCorrelationKey(CORRELATION_KEY_1)
            .withVariables(Map.of("x", 1))
            .publish();

        // The first instance has reached its service task before further messages are published.
        RecordingExporter.jobRecords(JobIntent.CREATED).await();

        engine.message()
            .withName(MESSAGE_NAME_1)
            .withCorrelationKey(CORRELATION_KEY_1)
            .withVariables(Map.of("x", 2))
            .publish();
        engine.message()
            .withName(MESSAGE_NAME_1)
            .withCorrelationKey(CORRELATION_KEY_2)
            .withVariables(Map.of("x", 3))
            .publish();

        // then
        Assertions.assertThat(
                RecordingExporter.variableRecords().withName("x").filterProcessInstanceScope().limit(2))
            .extracting(variable -> variable.getValue().getValue())
            .describedAs("Expected messages [1,3] to be correlated")
            .containsExactly("1", "3");
    }

    /**
     * Starts an instance of version "v1" for correlation key 1, deploys version "v2", then
     * publishes a message with the same key and one with a different key. Expects the first two
     * "x" variables to come from messages 1 and 3.
     */
    @Test
    public void shouldNotCreateInstanceForDifferentVersion() {
        // given
        engine.deployment()
            .withXmlResource(singleStartEvent(startEvent -> startEvent.id("v1")))
            .deploy();
        engine.message()
            .withName(MESSAGE_NAME_1)
            .withCorrelationKey(CORRELATION_KEY_1)
            .withVariables(Map.of("x", 1))
            .publish();

        // The v1 instance has reached its service task before the new version is deployed.
        RecordingExporter.jobRecords(JobIntent.CREATED).await();

        engine.deployment()
            .withXmlResource(singleStartEvent(startEvent -> startEvent.id("v2")))
            .deploy();

        // when
        engine.message()
            .withName(MESSAGE_NAME_1)
            .withCorrelationKey(CORRELATION_KEY_1)
            .withVariables(Map.of("x", 2))
            .publish();
        engine.message()
            .withName(MESSAGE_NAME_1)
            .withCorrelationKey(CORRELATION_KEY_2)
            .withVariables(Map.of("x", 3))
            .publish();

        // then
        Assertions.assertThat(
                RecordingExporter.variableRecords().withName("x").filterProcessInstanceScope().limit(2))
            .extracting(variable -> variable.getValue().getValue())
            .describedAs("Expected messages [1,3] to be correlated")
            .containsExactly("1", "3");
    }

    /**
     * Completes the first instance for a correlation key, then publishes a second message with the
     * same key and checks that both "x" values are written.
     */
    @Test
    public void shouldCreateNewInstanceAfterCompletion() {
        // given
        engine.deployment().withXmlResource(SINGLE_START_EVENT_1).deploy();
        engine.message()
            .withName(MESSAGE_NAME_1)
            .withCorrelationKey(CORRELATION_KEY_1)
            .withVariables(Map.of("x", 1))
            .publish();

        final Record<JobRecordValue> job = RecordingExporter.jobRecords(JobIntent.CREATED).getFirst();
        engine.job().withKey(job.getKey()).complete();

        // Wait for the root scope of the first instance to be completed.
        RecordingExporter.processInstanceRecords(ProcessInstanceIntent.ELEMENT_COMPLETED)
            .withProcessInstanceKey(job.getValue().getProcessInstanceKey())
            .filterRootScope()
            .await();

        // when
        engine.message()
            .withName(MESSAGE_NAME_1)
            .withCorrelationKey(CORRELATION_KEY_1)
            .withVariables(Map.of("x", 2))
            .publish();

        // then
        Assertions.assertThat(
                RecordingExporter.variableRecords().withName("x").filterProcessInstanceScope().limit(2))
            .extracting(variable -> variable.getValue().getValue())
            .describedAs("Expected messages [1,2] to be correlated")
            .containsExactly("1", "2");
    }

    /**
     * Cancels the first instance for a correlation key, then publishes a second message with the
     * same key and checks that both "x" values are written.
     */
    @Test
    public void shouldCreateNewInstanceAfterTermination() {
        // given
        engine.deployment().withXmlResource(SINGLE_START_EVENT_1).deploy();
        engine.message()
            .withName(MESSAGE_NAME_1)
            .withCorrelationKey(CORRELATION_KEY_1)
            .withVariables(Map.of("x", 1))
            .publish();

        final Record<JobRecordValue> job = RecordingExporter.jobRecords(JobIntent.CREATED).getFirst();
        engine.processInstance().withInstanceKey(job.getValue().getProcessInstanceKey()).cancel();

        // when
        engine.message()
            .withName(MESSAGE_NAME_1)
            .withCorrelationKey(CORRELATION_KEY_1)
            .withVariables(Map.of("x", 2))
            .publish();

        // then
        Assertions.assertThat(
                RecordingExporter.variableRecords().withName("x").filterProcessInstanceScope().limit(2))
            .extracting(variable -> variable.getValue().getValue())
            .describedAs("Expected messages [1,2] to be correlated")
            .containsExactly("1", "2");
    }

    /**
     * Publishes three messages with the same correlation key while the first instance is active,
     * then completes the first and second jobs one after the other. Expects "x" variables for all
     * three messages, in publish order.
     */
    @Test
    public void shouldCreateNewInstanceForBufferedMessageAfterCompletion() {
        // given
        engine.deployment().withXmlResource(SINGLE_START_EVENT_1).deploy();
        engine.message()
            .withName(MESSAGE_NAME_1)
            .withCorrelationKey(CORRELATION_KEY_1)
            .withVariables(Map.of("x", 1))
            .publish();
        final Record<JobRecordValue> job1 = RecordingExporter.jobRecords(JobIntent.CREATED).getFirst();

        engine.message()
            .withName(MESSAGE_NAME_1)
            .withCorrelationKey(CORRELATION_KEY_1)
            .withVariables(Map.of("x", 2))
            .publish();
        engine.message()
            .withName(MESSAGE_NAME_1)
            .withCorrelationKey(CORRELATION_KEY_1)
            .withVariables(Map.of("x", 3))
            .publish();

        // when
        engine.job().withKey(job1.getKey()).complete();

        // The second created job belongs to the instance started after the first one finished.
        final Record<JobRecordValue> job2 =
            RecordingExporter.jobRecords(JobIntent.CREATED).skip(1).getFirst();
        engine.job().withKey(job2.getKey()).complete();

        // then
        Assertions.assertThat(
                RecordingExporter.variableRecords().withName("x").filterProcessInstanceScope().limit(3))
            .extracting(variable -> variable.getValue().getValue())
            .describedAs("Expected messages [1,2,3] to be correlated")
            .containsExactly("1", "2", "3");
    }

    /**
     * Publishes three messages with the same correlation key while the first instance is active,
     * then cancels the first and second instances one after the other. Expects "x" variables for
     * all three messages, in publish order.
     */
    @Test
    public void shouldCreateNewInstanceForBufferedMessageAfterTermination() {
        // given
        engine.deployment().withXmlResource(SINGLE_START_EVENT_1).deploy();
        engine.message()
            .withName(MESSAGE_NAME_1)
            .withCorrelationKey(CORRELATION_KEY_1)
            .withVariables(Map.of("x", 1))
            .publish();
        final Record<JobRecordValue> job1 = RecordingExporter.jobRecords(JobIntent.CREATED).getFirst();

        engine.message()
            .withName(MESSAGE_NAME_1)
            .withCorrelationKey(CORRELATION_KEY_1)
            .withVariables(Map.of("x", 2))
            .publish();
        engine.message()
            .withName(MESSAGE_NAME_1)
            .withCorrelationKey(CORRELATION_KEY_1)
            .withVariables(Map.of("x", 3))
            .publish();

        // when
        engine.processInstance().withInstanceKey(job1.getValue().getProcessInstanceKey()).cancel();

        // The second created job identifies the instance started after the first was cancelled.
        final Record<JobRecordValue> job2 =
            RecordingExporter.jobRecords(JobIntent.CREATED).skip(1).getFirst();
        engine.processInstance().withInstanceKey(job2.getValue().getProcessInstanceKey()).cancel();

        // then
        Assertions.assertThat(
                RecordingExporter.variableRecords().withName("x").filterProcessInstanceScope().limit(3))
            .extracting(variable -> variable.getValue().getValue())
            .describedAs("Expected messages [1,2,3] to be correlated")
            .containsExactly("1", "2", "3");
    }

    /**
     * Runs three publish/complete rounds with the same correlation key and checks that each
     * CORRELATED message start event subscription record carries the expected static fields as
     * well as the message key, process instance key and variables of its round.
     */
    @Test
    public void shouldWriteCorrelatedEventsForBufferedMessages() {
        // given
        final int instanceCount = 3;
        engine.deployment().withXmlResource(SINGLE_START_EVENT_1).deploy();

        // when
        // For each round: publish a message, complete the i-th created job, and remember the
        // (message key, process instance key, variables) triple expected on the subscription record.
        final List<Tuple> expectedSubscriptionTuples =
            IntStream.range(0, instanceCount)
                .mapToObj(
                    i -> {
                        final Map<String, Object> variables = Map.of("x", i);
                        final Record<MessageRecordValue> messagePublished =
                            engine.message()
                                .withName(MESSAGE_NAME_1)
                                .withCorrelationKey(CORRELATION_KEY_1)
                                .withVariables(variables)
                                .publish();

                        final Record<JobRecordValue> jobCreated =
                            RecordingExporter.jobRecords(JobIntent.CREATED).skip(i).getFirst();
                        engine.job().withKey(jobCreated.getKey()).complete();

                        final long processInstanceKey = jobCreated.getValue().getProcessInstanceKey();
                        return Assertions.tuple(messagePublished.getKey(), processInstanceKey, variables);
                    })
                .collect(Collectors.toList());

        // then
        final Process process = RecordingExporter.processRecords().getFirst().getValue();

        final List<MessageStartEventSubscriptionRecordValue> subscriptionRecords =
            RecordingExporter.messageStartEventSubscriptionRecords(
                    MessageStartEventSubscriptionIntent.CORRELATED)
                .limit(instanceCount)
                .map(Record::getValue)
                .collect(Collectors.toList());

        Assertions.assertThat(subscriptionRecords)
            .allSatisfy(
                value -> {
                    Assertions.assertThat(value.getMessageName()).isEqualTo(MESSAGE_NAME_1);
                    Assertions.assertThat(value.getCorrelationKey()).isEqualTo(CORRELATION_KEY_1);
                    Assertions.assertThat(value.getProcessDefinitionKey())
                        .isEqualTo(process.getProcessDefinitionKey());
                    Assertions.assertThat(value.getBpmnProcessId()).isEqualTo(process.getBpmnProcessId());
                    Assertions.assertThat(value.getStartEventId()).isEqualTo("start");
                });

        Assertions.assertThat(subscriptionRecords)
            .extracting(
                MessageStartEventSubscriptionRecordValue::getMessageKey,
                MessageStartEventSubscriptionRecordValue::getProcessInstanceKey,
                RecordValueWithVariables::getVariables)
            .containsSequence(expectedSubscriptionTuples);
    }

    /**
     * Starts an instance of version "v1", deploys version "v2", publishes a second message with
     * the same correlation key and then completes the first job. Expects the first two activated
     * start events to be "v1" followed by "v2".
     */
    @Test
    public void shouldCreateNewInstanceOfLatestProcessVersionForBufferedMessage() {
        // given
        engine.deployment()
            .withXmlResource(singleStartEvent(startEvent -> startEvent.id("v1")))
            .deploy();
        engine.message()
            .withName(MESSAGE_NAME_1)
            .withCorrelationKey(CORRELATION_KEY_1)
            .withVariables(Map.of("x", 1))
            .publish();
        final Record<JobRecordValue> job = RecordingExporter.jobRecords(JobIntent.CREATED).getFirst();

        engine.deployment()
            .withXmlResource(singleStartEvent(startEvent -> startEvent.id("v2")))
            .deploy();
        engine.message()
            .withName(MESSAGE_NAME_1)
            .withCorrelationKey(CORRELATION_KEY_1)
            .withVariables(Map.of("x", 2))
            .publish();

        // when
        engine.job().withKey(job.getKey()).complete();

        // then
        Assertions.assertThat(
                RecordingExporter.processInstanceRecords(ProcessInstanceIntent.ELEMENT_ACTIVATED)
                    .withElementType(BpmnElementType.START_EVENT)
                    .limit(2))
            .extracting(r -> r.getValue().getElementId())
            .containsExactly("v1", "v2");
    }

    /**
     * Publishes a second message with a 10 s time-to-live and a third with twice that while the
     * first instance is active, advances the engine clock by 10 s, then completes the first job.
     * Expects the first two "x" variables to come from messages 1 and 3.
     */
    @Test
    public void shouldNotCreateNewInstanceForBufferedMessageAfterTTL() {
        // given
        engine.deployment().withXmlResource(SINGLE_START_EVENT_1).deploy();
        engine.message()
            .withName(MESSAGE_NAME_1)
            .withCorrelationKey(CORRELATION_KEY_1)
            .withVariables(Map.of("x", 1))
            .publish();

        final Duration messageTTL = Duration.ofSeconds(10L);
        engine.message()
            .withName(MESSAGE_NAME_1)
            .withCorrelationKey(CORRELATION_KEY_1)
            .withVariables(Map.of("x", 2))
            .withTimeToLive(messageTTL)
            .publish();
        engine.message()
            .withName(MESSAGE_NAME_1)
            .withCorrelationKey(CORRELATION_KEY_1)
            .withVariables(Map.of("x", 3))
            .withTimeToLive(messageTTL.multipliedBy(2L))
            .publish();

        final Record<JobRecordValue> job = RecordingExporter.jobRecords(JobIntent.CREATED).getFirst();

        // when
        // Advance the clock by the shorter TTL before the first instance finishes.
        engine.getClock().addTime(messageTTL);
        engine.job().withKey(job.getKey()).complete();

        // then
        Assertions.assertThat(
                RecordingExporter.variableRecords().withName("x").filterProcessInstanceScope().limit(2))
            .extracting(variable -> variable.getValue().getValue())
            .describedAs("Expected messages [1,3] to be correlated")
            .containsExactly("1", "3");
    }

    /**
     * With the two-start-event process: while an instance for correlation key 1 is active,
     * publishes the other message name with the same key and the first message name with a
     * different key. Expects the first two "x" variables to come from messages 1 and 3.
     */
    @Test
    public void shouldCreateOnlyOneInstancePerCorrelationKeyWithMultipleStartEvents() {
        // given
        engine.deployment().withXmlResource(MULTIPLE_START_EVENTS).deploy();

        // when
        engine.message()
            .withName(MESSAGE_NAME_1)
            .withCorrelationKey(CORRELATION_KEY_1)
            .withVariables(Map.of("x", 1))
            .publish();

        // The first instance has reached its service task before further messages are published.
        RecordingExporter.jobRecords(JobIntent.CREATED).await();

        engine.message()
            .withName(MESSAGE_NAME_2)
            .withCorrelationKey(CORRELATION_KEY_1)
            .withVariables(Map.of("x", 2))
            .publish();
        engine.message()
            .withName(MESSAGE_NAME_1)
            .withCorrelationKey(CORRELATION_KEY_2)
            .withVariables(Map.of("x", 3))
            .publish();

        // then
        Assertions.assertThat(
                RecordingExporter.variableRecords().withName("x").filterProcessInstanceScope().limit(2))
            .extracting(variable -> variable.getValue().getValue())
            .describedAs("Expected messages [1,3] to be correlated")
            .containsExactly("1", "3");
    }

    /**
     * With the two-start-event process: publishes four messages (alternating message names, same
     * correlation key), completes the four created jobs in order, and expects "x" variables for
     * all four messages in publish order.
     */
    @Test
    public void shouldCreateNewInstanceForBufferedMessageWithMultipleStartEvents() {
        // given
        engine.deployment().withXmlResource(MULTIPLE_START_EVENTS).deploy();

        engine.message()
            .withName(MESSAGE_NAME_1)
            .withCorrelationKey(CORRELATION_KEY_1)
            .withVariables(Map.of("x", 1))
            .publish();
        engine.message()
            .withName(MESSAGE_NAME_2)
            .withCorrelationKey(CORRELATION_KEY_1)
            .withVariables(Map.of("x", 2))
            .publish();
        engine.message()
            .withName(MESSAGE_NAME_1)
            .withCorrelationKey(CORRELATION_KEY_1)
            .withVariables(Map.of("x", 3))
            .publish();
        engine.message()
            .withName(MESSAGE_NAME_2)
            .withCorrelationKey(CORRELATION_KEY_1)
            .withVariables(Map.of("x", 4))
            .publish();

        // when
        // Complete the j-th created job; each completion lets the next job be created.
        IntStream.range(0, 4)
            .forEach(
                j -> {
                    final Record<JobRecordValue> job =
                        RecordingExporter.jobRecords(JobIntent.CREATED).skip(j).getFirst();
                    engine.job().withKey(job.getKey()).complete();
                });

        // then
        Assertions.assertThat(
                RecordingExporter.variableRecords().withName("x").filterProcessInstanceScope().limit(4))
            .extracting(variable -> variable.getValue().getValue())
            .describedAs("Expected messages [1,2,3,4] to be correlated")
            .containsExactly("1", "2", "3", "4");
    }

    /**
     * Writes two PUBLISH commands (different correlation keys, TTL 0) in a single
     * {@code writeRecords} call and checks that two process instances are activated, each with its
     * own "x" value.
     */
    @Test
    public void shouldCreateProcessInstancesAndPassVariables() {
        // given
        engine.deployment().withXmlResource(SINGLE_START_EVENT_1).deploy();

        final MessageRecord firstMessage =
            new MessageRecord()
                .setName(MESSAGE_NAME_1)
                .setTimeToLive(0L)
                .setCorrelationKey(CORRELATION_KEY_1)
                .setVariables(MsgPackUtil.asMsgPack("x", 1));
        final MessageRecord secondMessage =
            new MessageRecord()
                .setName(MESSAGE_NAME_1)
                .setTimeToLive(0L)
                .setCorrelationKey(CORRELATION_KEY_2)
                .setVariables(MsgPackUtil.asMsgPack("x", 2));

        // when
        engine.writeRecords(
            RecordToWrite.command().message(MessageIntent.PUBLISH, firstMessage),
            RecordToWrite.command().message(MessageIntent.PUBLISH, secondMessage));

        // then
        final List<Long> processInstanceKeys =
            RecordingExporter.processInstanceRecords(ProcessInstanceIntent.ELEMENT_ACTIVATED)
                .withElementType(BpmnElementType.PROCESS)
                .limit(2)
                .map(r -> r.getValue().getProcessInstanceKey())
                .collect(Collectors.toList());

        Assertions.assertThat(processInstanceKeys)
            .describedAs("Expected two process instances to be created")
            .hasSize(2);

        Assertions.assertThat(RecordingExporter.variableRecords().filterProcessInstanceScope().limit(2))
            .extracting(Record::getValue)
            .extracting(VariableRecordValue::getProcessInstanceKey, VariableRecordValue::getValue)
            .hasSize(2)
            .describedAs("Expected two process instances with different variables")
            .contains(
                Assertions.tuple(processInstanceKeys.get(0), "1"),
                Assertions.tuple(processInstanceKeys.get(1), "2"));
    }
}

