/*
 * Decompiled with CFR 0.152.
 */
package io.camunda.zeebe.engine.processing.message;

import io.camunda.zeebe.engine.util.EngineRule;
import io.camunda.zeebe.model.bpmn.Bpmn;
import io.camunda.zeebe.model.bpmn.BpmnModelInstance;
import io.camunda.zeebe.model.bpmn.builder.BoundaryEventBuilder;
import io.camunda.zeebe.model.bpmn.builder.IntermediateCatchEventBuilder;
import io.camunda.zeebe.model.bpmn.builder.ReceiveTaskBuilder;
import io.camunda.zeebe.protocol.record.Record;
import io.camunda.zeebe.protocol.record.RecordType;
import io.camunda.zeebe.protocol.record.ValueType;
import io.camunda.zeebe.protocol.record.intent.Intent;
import io.camunda.zeebe.protocol.record.intent.MessageSubscriptionIntent;
import io.camunda.zeebe.protocol.record.intent.ProcessInstanceIntent;
import io.camunda.zeebe.protocol.record.intent.ProcessMessageSubscriptionIntent;
import io.camunda.zeebe.protocol.record.value.MessageRecordValue;
import io.camunda.zeebe.protocol.record.value.MessageSubscriptionRecordValue;
import io.camunda.zeebe.protocol.record.value.ProcessInstanceRecordValue;
import io.camunda.zeebe.protocol.record.value.ProcessMessageSubscriptionRecordValue;
import io.camunda.zeebe.test.util.MsgPackUtil;
import io.camunda.zeebe.test.util.record.ProcessMessageSubscriptionRecordStream;
import io.camunda.zeebe.test.util.record.RecordingExporter;
import io.camunda.zeebe.test.util.record.RecordingExporterTestWatcher;
import java.util.Map;
import java.util.UUID;
import java.util.function.Function;
import java.util.stream.Stream;
import org.assertj.core.api.Assertions;
import org.assertj.core.api.ListAssert;
import org.assertj.core.groups.Tuple;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Rule;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;

@RunWith(value=Parameterized.class)
public final class MessageCatchElementTest {
    private static final int PARTITION_COUNT = 3;
    @ClassRule
    public static final EngineRule ENGINE_RULE = EngineRule.multiplePartition(3);
    private static final String ELEMENT_ID = "receive-message";
    private static final String CORRELATION_VARIABLE = "orderId";
    private static final String MESSAGE_NAME = "order canceled";
    private static final String SEQUENCE_FLOW_ID = "to-end";
    private static final String CATCH_EVENT_PROCESS_PROCESS_ID = "catchEventProcess";
    private static final BpmnModelInstance CATCH_EVENT_PROCESS = ((IntermediateCatchEventBuilder)((IntermediateCatchEventBuilder)Bpmn.createExecutableProcess((String)"catchEventProcess").startEvent().intermediateCatchEvent("receive-message").message(m -> m.name(MESSAGE_NAME).zeebeCorrelationKeyExpression(CORRELATION_VARIABLE))).sequenceFlowId("to-end")).endEvent().done();
    private static final String RECEIVE_TASK_PROCESS_PROCESS_ID = "receiveTaskProcess";
    private static final BpmnModelInstance RECEIVE_TASK_PROCESS = ((ReceiveTaskBuilder)((ReceiveTaskBuilder)Bpmn.createExecutableProcess((String)"receiveTaskProcess").startEvent().receiveTask("receive-message").message(m -> m.name(MESSAGE_NAME).zeebeCorrelationKeyExpression(CORRELATION_VARIABLE))).sequenceFlowId("to-end")).endEvent().done();
    private static final String BOUNDARY_EVENT_PROCESS_PROCESS_ID = "boundaryEventProcess";
    private static final BpmnModelInstance BOUNDARY_EVENT_PROCESS = ((BoundaryEventBuilder)((BoundaryEventBuilder)Bpmn.createExecutableProcess((String)"boundaryEventProcess").startEvent().serviceTask("receive-message", b -> b.zeebeJobType("type")).boundaryEvent().message(m -> m.name(MESSAGE_NAME).zeebeCorrelationKeyExpression(CORRELATION_VARIABLE))).sequenceFlowId("to-end")).endEvent().done();
    private static final String NON_INT_BOUNDARY_EVENT_PROCESS_PROCESS_ID = "nonIntBoundaryEventProcess";
    private static final BpmnModelInstance NON_INT_BOUNDARY_EVENT_PROCESS = ((BoundaryEventBuilder)((BoundaryEventBuilder)((BoundaryEventBuilder)Bpmn.createExecutableProcess((String)"nonIntBoundaryEventProcess").startEvent().serviceTask("receive-message", b -> b.zeebeJobType("type")).boundaryEvent("event").cancelActivity(Boolean.valueOf(false))).message(m -> m.name(MESSAGE_NAME).zeebeCorrelationKeyExpression(CORRELATION_VARIABLE))).sequenceFlowId("to-end")).endEvent().done();
    @Rule
    public final RecordingExporterTestWatcher recordingExporterTestWatcher = new RecordingExporterTestWatcher();
    @Parameterized.Parameter
    public String elementType;
    @Parameterized.Parameter(value=1)
    public String bpmnProcessId;
    @Parameterized.Parameter(value=2)
    public ProcessInstanceIntent enteredState;
    @Parameterized.Parameter(value=3)
    public ProcessInstanceIntent continueState;
    @Parameterized.Parameter(value=4)
    public String continuedElementId;
    private String correlationKey;
    private long processInstanceKey;

    @Parameterized.Parameters(name="{0}")
    public static Object[][] parameters() {
        return new Object[][]{{"intermediate message catch event", CATCH_EVENT_PROCESS_PROCESS_ID, ProcessInstanceIntent.ELEMENT_ACTIVATED, ProcessInstanceIntent.ELEMENT_COMPLETED, ELEMENT_ID}, {"receive task", RECEIVE_TASK_PROCESS_PROCESS_ID, ProcessInstanceIntent.ELEMENT_ACTIVATED, ProcessInstanceIntent.ELEMENT_COMPLETED, ELEMENT_ID}, {"int boundary event", BOUNDARY_EVENT_PROCESS_PROCESS_ID, ProcessInstanceIntent.ELEMENT_ACTIVATED, ProcessInstanceIntent.ELEMENT_TERMINATED, ELEMENT_ID}, {"non int boundary event", NON_INT_BOUNDARY_EVENT_PROCESS_PROCESS_ID, ProcessInstanceIntent.ELEMENT_ACTIVATED, ProcessInstanceIntent.ELEMENT_COMPLETED, "event"}};
    }

    @BeforeClass
    public static void awaitCluster() {
        MessageCatchElementTest.deploy(CATCH_EVENT_PROCESS);
        MessageCatchElementTest.deploy(RECEIVE_TASK_PROCESS);
        MessageCatchElementTest.deploy(BOUNDARY_EVENT_PROCESS);
        MessageCatchElementTest.deploy(NON_INT_BOUNDARY_EVENT_PROCESS);
    }

    private static void deploy(BpmnModelInstance modelInstance) {
        ENGINE_RULE.deployment().withXmlResource(modelInstance).deploy();
    }

    @Before
    public void init() {
        this.correlationKey = UUID.randomUUID().toString();
        this.processInstanceKey = ENGINE_RULE.processInstance().ofBpmnProcessId(this.bpmnProcessId).withVariable(CORRELATION_VARIABLE, this.correlationKey).create();
    }

    @Test
    public void shouldOpenMessageSubscription() {
        Record<ProcessInstanceRecordValue> catchEventEntered = this.getFirstElementRecord(this.enteredState);
        Record<MessageSubscriptionRecordValue> messageSubscription = this.getFirstMessageSubscriptionRecord(MessageSubscriptionIntent.CREATED);
        Assertions.assertThat((Comparable)messageSubscription.getValueType()).isEqualTo((Object)ValueType.MESSAGE_SUBSCRIPTION);
        Assertions.assertThat((Comparable)messageSubscription.getRecordType()).isEqualTo((Object)RecordType.EVENT);
        io.camunda.zeebe.protocol.record.Assertions.assertThat((MessageSubscriptionRecordValue)((MessageSubscriptionRecordValue)messageSubscription.getValue())).hasProcessInstanceKey(this.processInstanceKey).hasElementInstanceKey(catchEventEntered.getKey()).hasMessageName(MESSAGE_NAME).hasCorrelationKey(this.correlationKey);
    }

    @Test
    public void shouldOpenProcessMessageSubscription() {
        Record<ProcessMessageSubscriptionRecordValue> subscriptionCreating = this.getFirstProcessMessageSubscriptionRecord(ProcessMessageSubscriptionIntent.CREATING);
        Record<ProcessInstanceRecordValue> catchEventEntered = this.getFirstElementRecord(this.enteredState);
        Record<ProcessMessageSubscriptionRecordValue> processMessageSubscription = this.getFirstProcessMessageSubscriptionRecord(ProcessMessageSubscriptionIntent.CREATED);
        io.camunda.zeebe.protocol.record.Assertions.assertThat(processMessageSubscription).hasValueType(ValueType.PROCESS_MESSAGE_SUBSCRIPTION).hasRecordType(RecordType.EVENT).hasKey(subscriptionCreating.getKey());
        io.camunda.zeebe.protocol.record.Assertions.assertThat((ProcessMessageSubscriptionRecordValue)((ProcessMessageSubscriptionRecordValue)processMessageSubscription.getValue())).hasProcessInstanceKey(this.processInstanceKey).hasElementInstanceKey(catchEventEntered.getKey()).hasMessageName(MESSAGE_NAME);
        Assertions.assertThat((Map)((ProcessMessageSubscriptionRecordValue)processMessageSubscription.getValue()).getVariables()).isEmpty();
    }

    @Test
    public void shouldCorrelateProcessMessageSubscription() {
        Record<ProcessInstanceRecordValue> catchEventEntered = this.getFirstElementRecord(this.enteredState);
        ENGINE_RULE.message().withCorrelationKey(this.correlationKey).withName(MESSAGE_NAME).withVariables(MsgPackUtil.asMsgPack((String)"foo", (Object)"bar")).publish();
        Record<ProcessMessageSubscriptionRecordValue> subscription = this.getFirstProcessMessageSubscriptionRecord(ProcessMessageSubscriptionIntent.CORRELATED);
        Assertions.assertThat((Comparable)subscription.getValueType()).isEqualTo((Object)ValueType.PROCESS_MESSAGE_SUBSCRIPTION);
        Assertions.assertThat((Comparable)subscription.getRecordType()).isEqualTo((Object)RecordType.EVENT);
        io.camunda.zeebe.protocol.record.Assertions.assertThat((ProcessMessageSubscriptionRecordValue)((ProcessMessageSubscriptionRecordValue)subscription.getValue())).hasProcessInstanceKey(this.processInstanceKey).hasElementInstanceKey(catchEventEntered.getKey()).hasMessageName(MESSAGE_NAME);
        Assertions.assertThat((Map)((ProcessMessageSubscriptionRecordValue)subscription.getValue()).getVariables()).containsExactly(new Map.Entry[]{Assertions.entry((Object)"foo", (Object)"bar")});
    }

    @Test
    public void shouldCorrelateMessageSubscription() {
        Record<ProcessInstanceRecordValue> catchEventEntered = this.getFirstElementRecord(this.enteredState);
        this.getFirstMessageSubscriptionRecord(MessageSubscriptionIntent.CREATED);
        Record<MessageRecordValue> messagePublished = ENGINE_RULE.message().withCorrelationKey(this.correlationKey).withName(MESSAGE_NAME).withVariables(MsgPackUtil.asMsgPack((String)"foo", (Object)"bar")).publish();
        Record<MessageSubscriptionRecordValue> subscriptionCorrelating = this.getFirstMessageSubscriptionRecord(MessageSubscriptionIntent.CORRELATING);
        io.camunda.zeebe.protocol.record.Assertions.assertThat((MessageSubscriptionRecordValue)((MessageSubscriptionRecordValue)subscriptionCorrelating.getValue())).hasProcessInstanceKey(this.processInstanceKey).hasElementInstanceKey(catchEventEntered.getKey()).hasMessageName(MESSAGE_NAME).hasCorrelationKey(this.correlationKey).hasBpmnProcessId(this.bpmnProcessId).hasMessageKey(messagePublished.getKey());
        Record<MessageSubscriptionRecordValue> subscriptionCorrelated = this.getFirstMessageSubscriptionRecord(MessageSubscriptionIntent.CORRELATED);
        io.camunda.zeebe.protocol.record.Assertions.assertThat((MessageSubscriptionRecordValue)((MessageSubscriptionRecordValue)subscriptionCorrelated.getValue())).hasProcessInstanceKey(this.processInstanceKey).hasElementInstanceKey(catchEventEntered.getKey()).hasMessageName(MESSAGE_NAME).hasCorrelationKey(this.correlationKey);
    }

    @Test
    public void shouldCloseMessageSubscription() {
        Record subscriptionCreated = (Record)RecordingExporter.messageSubscriptionRecords((MessageSubscriptionIntent)MessageSubscriptionIntent.CREATED).withProcessInstanceKey(this.processInstanceKey).getFirst();
        ENGINE_RULE.processInstance().withInstanceKey(this.processInstanceKey).cancel();
        Record<MessageSubscriptionRecordValue> messageSubscription = this.getFirstMessageSubscriptionRecord(MessageSubscriptionIntent.DELETED);
        io.camunda.zeebe.protocol.record.Assertions.assertThat(messageSubscription).hasRecordType(RecordType.EVENT).hasKey(subscriptionCreated.getKey());
        io.camunda.zeebe.protocol.record.Assertions.assertThat((MessageSubscriptionRecordValue)((MessageSubscriptionRecordValue)messageSubscription.getValue())).hasProcessInstanceKey(this.processInstanceKey).hasElementInstanceKey(((MessageSubscriptionRecordValue)subscriptionCreated.getValue()).getElementInstanceKey()).hasMessageName(MESSAGE_NAME).hasCorrelationKey(((MessageSubscriptionRecordValue)subscriptionCreated.getValue()).getCorrelationKey());
    }

    @Test
    public void shouldCloseProcessMessageSubscription() {
        Record<ProcessMessageSubscriptionRecordValue> subscriptionCreated = this.getFirstProcessMessageSubscriptionRecord(ProcessMessageSubscriptionIntent.CREATED);
        Record<ProcessInstanceRecordValue> catchEventEntered = this.getFirstElementRecord(this.enteredState);
        ENGINE_RULE.processInstance().withInstanceKey(this.processInstanceKey).cancel();
        ((ListAssert)Assertions.assertThat((Stream)((ProcessMessageSubscriptionRecordStream)((ProcessMessageSubscriptionRecordStream)RecordingExporter.processMessageSubscriptionRecords().onlyEvents()).limit(r -> r.getIntent() == ProcessMessageSubscriptionIntent.DELETED)).withMessageName(MESSAGE_NAME).withProcessInstanceKey(this.processInstanceKey).withElementInstanceKey(catchEventEntered.getKey()).map(Record::getIntent)).as("the lifecycle of the subscription should end with DELETING and DELETED on close", new Object[0])).containsSubsequence((Object[])new Intent[]{ProcessMessageSubscriptionIntent.DELETING, ProcessMessageSubscriptionIntent.DELETED});
        Record subscriptionDeleted = (Record)RecordingExporter.processMessageSubscriptionRecords((ProcessMessageSubscriptionIntent)ProcessMessageSubscriptionIntent.DELETED).getFirst();
        io.camunda.zeebe.protocol.record.Assertions.assertThat((Record)subscriptionDeleted).hasKey(subscriptionCreated.getKey());
    }

    @Test
    public void shouldCorrelateMessageAndContinue() {
        RecordingExporter.processMessageSubscriptionRecords((ProcessMessageSubscriptionIntent)ProcessMessageSubscriptionIntent.CREATED).withProcessInstanceKey(this.processInstanceKey).withMessageName(MESSAGE_NAME).await();
        ENGINE_RULE.message().withCorrelationKey(this.correlationKey).withName(MESSAGE_NAME).publish();
        Assertions.assertThat((boolean)RecordingExporter.processInstanceRecords((ProcessInstanceIntent)this.continueState).withProcessInstanceKey(this.processInstanceKey).withElementId(this.continuedElementId).exists()).isTrue();
        Assertions.assertThat((boolean)RecordingExporter.processInstanceRecords((ProcessInstanceIntent)ProcessInstanceIntent.SEQUENCE_FLOW_TAKEN).withProcessInstanceKey(this.processInstanceKey).withElementId(SEQUENCE_FLOW_ID).exists()).isTrue();
    }

    @Test
    public void testMessageSubscriptionLifecycle() {
        this.getFirstMessageSubscriptionRecord(MessageSubscriptionIntent.CREATED);
        ENGINE_RULE.message().withCorrelationKey(this.correlationKey).withName(MESSAGE_NAME).withVariables(MsgPackUtil.asMsgPack((String)"foo", (Object)"bar")).publish();
        Assertions.assertThat((Stream)RecordingExporter.messageSubscriptionRecords().withProcessInstanceKey(this.processInstanceKey).withMessageName(MESSAGE_NAME).limit(5L)).extracting(new Function[]{Record::getRecordType, Record::getIntent}).containsExactly((Object[])new Tuple[]{Assertions.tuple((Object[])new Object[]{RecordType.COMMAND, MessageSubscriptionIntent.CREATE}), Assertions.tuple((Object[])new Object[]{RecordType.EVENT, MessageSubscriptionIntent.CREATED}), Assertions.tuple((Object[])new Object[]{RecordType.EVENT, MessageSubscriptionIntent.CORRELATING}), Assertions.tuple((Object[])new Object[]{RecordType.COMMAND, MessageSubscriptionIntent.CORRELATE}), Assertions.tuple((Object[])new Object[]{RecordType.EVENT, MessageSubscriptionIntent.CORRELATED})});
    }

    @Test
    public void testProcessMessageSubscriptionLifecycle() {
        this.getFirstProcessMessageSubscriptionRecord(ProcessMessageSubscriptionIntent.CREATED);
        ENGINE_RULE.message().withCorrelationKey(this.correlationKey).withName(MESSAGE_NAME).publish();
        Assertions.assertThat((Stream)RecordingExporter.processMessageSubscriptionRecords().withProcessInstanceKey(this.processInstanceKey).withMessageName(MESSAGE_NAME).limit(5L)).extracting(new Function[]{Record::getRecordType, Record::getIntent}).containsExactly((Object[])new Tuple[]{Assertions.tuple((Object[])new Object[]{RecordType.EVENT, ProcessMessageSubscriptionIntent.CREATING}), Assertions.tuple((Object[])new Object[]{RecordType.COMMAND, ProcessMessageSubscriptionIntent.CREATE}), Assertions.tuple((Object[])new Object[]{RecordType.EVENT, ProcessMessageSubscriptionIntent.CREATED}), Assertions.tuple((Object[])new Object[]{RecordType.COMMAND, ProcessMessageSubscriptionIntent.CORRELATE}), Assertions.tuple((Object[])new Object[]{RecordType.EVENT, ProcessMessageSubscriptionIntent.CORRELATED})});
    }

    @Test
    public void shouldHaveSameMessageSubscriptionKey() {
        long messageSubscriptionKey = this.getFirstMessageSubscriptionRecord(MessageSubscriptionIntent.CREATED).getKey();
        ENGINE_RULE.message().withCorrelationKey(this.correlationKey).withName(MESSAGE_NAME).publish();
        Assertions.assertThat((Stream)RecordingExporter.messageSubscriptionRecords().withProcessInstanceKey(this.processInstanceKey).withMessageName(MESSAGE_NAME).limit(5L)).extracting(new Function[]{Record::getIntent, Record::getKey}).containsExactly((Object[])new Tuple[]{Assertions.tuple((Object[])new Object[]{MessageSubscriptionIntent.CREATE, -1L}), Assertions.tuple((Object[])new Object[]{MessageSubscriptionIntent.CREATED, messageSubscriptionKey}), Assertions.tuple((Object[])new Object[]{MessageSubscriptionIntent.CORRELATING, messageSubscriptionKey}), Assertions.tuple((Object[])new Object[]{MessageSubscriptionIntent.CORRELATE, -1L}), Assertions.tuple((Object[])new Object[]{MessageSubscriptionIntent.CORRELATED, messageSubscriptionKey})});
    }

    @Test
    public void shouldHaveSameProcessMessageSubscriptionKey() {
        long subscriptionKey = this.getFirstProcessMessageSubscriptionRecord(ProcessMessageSubscriptionIntent.CREATING).getKey();
        ENGINE_RULE.message().withCorrelationKey(this.correlationKey).withName(MESSAGE_NAME).publish();
        Assertions.assertThat((Stream)((ProcessMessageSubscriptionRecordStream)RecordingExporter.processMessageSubscriptionRecords().withProcessInstanceKey(this.processInstanceKey).withMessageName(MESSAGE_NAME).filter(r -> r.getIntent() != ProcessMessageSubscriptionIntent.CREATE && r.getIntent() != ProcessMessageSubscriptionIntent.CREATED)).limit(3L)).extracting(new Function[]{Record::getIntent, Record::getKey}).containsExactly((Object[])new Tuple[]{Assertions.tuple((Object[])new Object[]{ProcessMessageSubscriptionIntent.CREATING, subscriptionKey}), Assertions.tuple((Object[])new Object[]{ProcessMessageSubscriptionIntent.CORRELATE, -1L}), Assertions.tuple((Object[])new Object[]{ProcessMessageSubscriptionIntent.CORRELATED, subscriptionKey})});
    }

    private Record<ProcessInstanceRecordValue> getFirstElementRecord(ProcessInstanceIntent intent) {
        return (Record)RecordingExporter.processInstanceRecords((ProcessInstanceIntent)intent).withProcessInstanceKey(this.processInstanceKey).withElementId(ELEMENT_ID).getFirst();
    }

    private Record<MessageSubscriptionRecordValue> getFirstMessageSubscriptionRecord(MessageSubscriptionIntent intent) {
        return (Record)RecordingExporter.messageSubscriptionRecords((MessageSubscriptionIntent)intent).withProcessInstanceKey(this.processInstanceKey).withMessageName(MESSAGE_NAME).getFirst();
    }

    private Record<ProcessMessageSubscriptionRecordValue> getFirstProcessMessageSubscriptionRecord(ProcessMessageSubscriptionIntent intent) {
        return (Record)RecordingExporter.processMessageSubscriptionRecords((ProcessMessageSubscriptionIntent)intent).withProcessInstanceKey(this.processInstanceKey).withMessageName(MESSAGE_NAME).getFirst();
    }
}

