package io.zeebe.broker.workflow.message;

import io.zeebe.broker.exporter.util.TestJarExporter;
import io.zeebe.broker.test.EmbeddedBrokerRule;
import io.zeebe.broker.workflow.WorkflowAssert;
import io.zeebe.broker.workflow.processor.WorkflowInstanceStreamProcessorRule;
import io.zeebe.exporter.record.Record;
import io.zeebe.exporter.record.value.WorkflowInstanceRecordValue;
import io.zeebe.exporter.record.value.WorkflowInstanceSubscriptionRecordValue;
import io.zeebe.model.bpmn.Bpmn;
import io.zeebe.model.bpmn.BpmnModelInstance;
import io.zeebe.protocol.BpmnElementType;
import io.zeebe.protocol.intent.Intent;
import io.zeebe.protocol.intent.MessageSubscriptionIntent;
import io.zeebe.protocol.intent.WorkflowInstanceIntent;
import io.zeebe.protocol.intent.WorkflowInstanceSubscriptionIntent;
import io.zeebe.test.broker.protocol.clientapi.ClientApiRule;
import io.zeebe.test.broker.protocol.clientapi.PartitionTestClient;
import io.zeebe.test.util.MsgPackUtil;
import io.zeebe.test.util.record.RecordingExporter;
import java.util.List;
import java.util.function.Consumer;
import java.util.stream.Collectors;
import org.assertj.core.api.Assertions;
import org.assertj.core.groups.Tuple;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.RuleChain;

/* loaded from: input_file:io/zeebe/broker/workflow/message/MessageCorrelationTest.class */
/**
 * Integration tests for message correlation against an embedded broker.
 *
 * <p>Each test deploys a workflow that waits for a message (intermediate catch event, receive
 * task or boundary event), creates an instance whose payload carries the correlation key under
 * {@code key}, publishes one or more messages and then inspects the exported records.
 */
public class MessageCorrelationTest {
    private static final String PROCESS_ID = "process";

    /** Start event, then a receive task waiting for "message", then an end event. */
    private static final BpmnModelInstance RECEIVE_TASK_WORKFLOW =
        Bpmn.createExecutableProcess(PROCESS_ID)
            .startEvent()
            .receiveTask("receive-message")
            .message(m -> m.name("message").zeebeCorrelationKey("$.key"))
            .endEvent()
            .done();

    /** Start event, then an intermediate catch event waiting for "message", then an end event. */
    private static final BpmnModelInstance SINGLE_MESSAGE_WORKFLOW =
        Bpmn.createExecutableProcess(PROCESS_ID)
            .startEvent()
            .intermediateCatchEvent("receive-message")
            .message(m -> m.name("message").zeebeCorrelationKey("$.key"))
            .endEvent()
            .done();

    /** Two sequential catch events that both wait for a message named "ping". */
    private static final BpmnModelInstance TWO_MESSAGES_WORKFLOW =
        Bpmn.createExecutableProcess(PROCESS_ID)
            .startEvent()
            .intermediateCatchEvent("message1")
            .message(m -> m.name("ping").zeebeCorrelationKey("$.key"))
            .intermediateCatchEvent("message2")
            .message(m -> m.name("ping").zeebeCorrelationKey("$.key"))
            .done();

    /**
     * Receive task "task" (message "taskMsg") with two message boundary events "msg1" and "msg2".
     * Every path ends in its own end event ("taskEnd", "msg1End", "msg2End") so the tests can tell
     * which path was taken.
     */
    private static final BpmnModelInstance BOUNDARY_EVENTS_WORKFLOW =
        Bpmn.createExecutableProcess(PROCESS_ID)
            .startEvent()
            .receiveTask("task")
            .message(m -> m.name("taskMsg").zeebeCorrelationKey("$.key"))
            .boundaryEvent("msg1")
            .message(m -> m.name("msg1").zeebeCorrelationKey("$.key"))
            .endEvent("msg1End")
            .moveToActivity("task")
            .boundaryEvent("msg2")
            .message(m -> m.name("msg2").zeebeCorrelationKey("$.key"))
            .endEvent("msg2End")
            .moveToActivity("task")
            .endEvent("taskEnd")
            .done();

    public EmbeddedBrokerRule brokerRule = new EmbeddedBrokerRule();
    public ClientApiRule apiRule = new ClientApiRule(brokerRule::getClientAddress);

    // The broker rule is the outer rule and the client API rule is wrapped inside it.
    @Rule
    public RuleChain ruleChain = RuleChain.outerRule(brokerRule).around(apiRule);
    private PartitionTestClient testClient;

    @Before
    public void init() {
        testClient = apiRule.partitionClient();
    }

    /** The instance is created first; the message is published once the subscription is open. */
    @Test
    public void shouldCorrelateMessageIfEnteredBefore() {
        testClient.deploy(SINGLE_MESSAGE_WORKFLOW);
        testClient.createWorkflowInstance(PROCESS_ID, MsgPackUtil.asMsgPack("key", "order-123"));
        Assertions.assertThat(
                RecordingExporter.messageSubscriptionRecords(MessageSubscriptionIntent.OPENED).exists())
            .isTrue();

        testClient.publishMessage("message", "order-123", MsgPackUtil.asMsgPack("foo", "bar"));

        final Record<WorkflowInstanceRecordValue> completed =
            testClient.receiveElementInState("receive-message", WorkflowInstanceIntent.ELEMENT_COMPLETED);
        WorkflowAssert.assertWorkflowInstancePayload(completed, "{'key':'order-123', 'foo':'bar'}");
    }

    /** The message is published before the instance that consumes it is created. */
    @Test
    public void shouldCorrelateMessageIfPublishedBefore() {
        testClient.deploy(SINGLE_MESSAGE_WORKFLOW);
        testClient.publishMessage("message", "order-123", MsgPackUtil.asMsgPack("foo", "bar"));

        testClient.createWorkflowInstance(PROCESS_ID, MsgPackUtil.asMsgPack("key", "order-123"));

        final Record<WorkflowInstanceRecordValue> completed =
            testClient.receiveElementInState("receive-message", WorkflowInstanceIntent.ELEMENT_COMPLETED);
        WorkflowAssert.assertWorkflowInstancePayload(completed, "{'key':'order-123', 'foo':'bar'}");
    }

    /** The payload holds the number 123 while the message is published with the string "123". */
    @Test
    public void shouldCorrelateMessageIfCorrelationKeyIsANumber() {
        testClient.deploy(SINGLE_MESSAGE_WORKFLOW);
        testClient.publishMessage("message", "123", MsgPackUtil.asMsgPack("foo", "bar"));

        testClient.createWorkflowInstance(PROCESS_ID, MsgPackUtil.asMsgPack("key", 123));

        final Record<WorkflowInstanceRecordValue> completed =
            testClient.receiveFirstWorkflowInstanceEvent(
                WorkflowInstanceIntent.ELEMENT_COMPLETED, BpmnElementType.PROCESS);
        WorkflowAssert.assertWorkflowInstancePayload(completed, "{'key':123, 'foo':'bar'}");
    }

    /** Of two messages published up front, the one published first is expected in the payload. */
    @Test
    public void shouldCorrelateFirstPublishedMessage() {
        testClient.deploy(SINGLE_MESSAGE_WORKFLOW);
        testClient.publishMessage("message", "order-123", MsgPackUtil.asMsgPack("nr", 1));
        testClient.publishMessage("message", "order-123", MsgPackUtil.asMsgPack("nr", 2));

        testClient.createWorkflowInstance(PROCESS_ID, MsgPackUtil.asMsgPack("key", "order-123"));

        final Record<WorkflowInstanceRecordValue> completed =
            testClient.receiveFirstWorkflowInstanceEvent(
                WorkflowInstanceIntent.ELEMENT_COMPLETED, BpmnElementType.PROCESS);
        WorkflowAssert.assertWorkflowInstancePayload(completed, "{'key':'order-123', 'nr':1}");
    }

    /** A message with a time-to-live of zero is correlated when the subscription is already open. */
    @Test
    public void shouldCorrelateMessageWithZeroTTL() {
        testClient.deploy(SINGLE_MESSAGE_WORKFLOW);
        final long workflowInstanceKey =
            testClient.createWorkflowInstance(PROCESS_ID, MsgPackUtil.asMsgPack("key", "order-123"));
        Assertions.assertThat(
                RecordingExporter.messageSubscriptionRecords(MessageSubscriptionIntent.OPENED).exists())
            .isTrue();

        testClient.publishMessage("message", "order-123", MsgPackUtil.asMsgPack("foo", "bar"), 0L);

        final Record<WorkflowInstanceRecordValue> completed =
            testClient.receiveElementInState("receive-message", WorkflowInstanceIntent.ELEMENT_COMPLETED);
        Assertions.assertThat(completed.getValue().getWorkflowInstanceKey()).isEqualTo(workflowInstanceKey);
    }

    /**
     * Message 1 is published with a time-to-live of zero before any instance exists, message 2 with
     * ten seconds. The instance created afterwards is expected to receive message 2 only.
     */
    @Test
    public void shouldNotCorrelateMessageAfterTTL() {
        testClient.deploy(SINGLE_MESSAGE_WORKFLOW);
        testClient.publishMessage("message", "order-123", MsgPackUtil.asMsgPack("nr", 1), 0L);
        testClient.publishMessage("message", "order-123", MsgPackUtil.asMsgPack("nr", 2), 10000L);

        testClient.createWorkflowInstance(PROCESS_ID, MsgPackUtil.asMsgPack("key", "order-123"));

        final Record<WorkflowInstanceRecordValue> completed =
            testClient.receiveElementInState("receive-message", WorkflowInstanceIntent.ELEMENT_COMPLETED);
        WorkflowAssert.assertWorkflowInstancePayload(completed, "{'key':'order-123', 'nr':2}");
    }

    /** Two instances with different keys each receive the message published for their own key. */
    @Test
    public void shouldCorrelateMessageByCorrelationKey() {
        testClient.deploy(SINGLE_MESSAGE_WORKFLOW);
        final long firstInstanceKey =
            testClient.createWorkflowInstance(PROCESS_ID, MsgPackUtil.asMsgPack("key", "order-123"));
        final long secondInstanceKey =
            testClient.createWorkflowInstance(PROCESS_ID, MsgPackUtil.asMsgPack("key", "order-456"));

        testClient.publishMessage("message", "order-123", MsgPackUtil.asMsgPack("nr", 1));
        testClient.publishMessage("message", "order-456", MsgPackUtil.asMsgPack("nr", 2));

        final Record<WorkflowInstanceRecordValue> firstCompleted =
            testClient.receiveFirstWorkflowInstanceEvent(
                firstInstanceKey,
                WorkflowInstanceIntent.ELEMENT_COMPLETED,
                BpmnElementType.INTERMEDIATE_CATCH_EVENT);
        WorkflowAssert.assertWorkflowInstancePayload(firstCompleted, "{'key':'order-123', 'nr':1}");

        final Record<WorkflowInstanceRecordValue> secondCompleted =
            testClient.receiveFirstWorkflowInstanceEvent(
                secondInstanceKey,
                WorkflowInstanceIntent.ELEMENT_COMPLETED,
                BpmnElementType.INTERMEDIATE_CATCH_EVENT);
        WorkflowAssert.assertWorkflowInstancePayload(secondCompleted, "{'key':'order-456', 'nr':2}");
    }

    /** One published message completes the catch event of both instances sharing the same key. */
    @Test
    public void shouldCorrelateMessageToAllSubscriptions() {
        testClient.deploy(SINGLE_MESSAGE_WORKFLOW);
        final long firstInstanceKey =
            testClient.createWorkflowInstance(PROCESS_ID, MsgPackUtil.asMsgPack("key", "order-123"));
        final long secondInstanceKey =
            testClient.createWorkflowInstance(PROCESS_ID, MsgPackUtil.asMsgPack("key", "order-123"));

        testClient.publishMessage("message", "order-123");

        final List<Record<WorkflowInstanceRecordValue>> completedEvents =
            testClient
                .receiveWorkflowInstances()
                .withIntent(WorkflowInstanceIntent.ELEMENT_COMPLETED)
                .withElementId("receive-message")
                .limit(2L)
                .collect(Collectors.toList());
        Assertions.assertThat(completedEvents)
            .extracting(event -> event.getValue().getWorkflowInstanceKey())
            .contains(firstInstanceKey, secondInstanceKey);
    }

    /** Both messages are published before the instance is created. */
    @Test
    public void shouldCorrelateMessageOnlyOnceIfPublishedBefore() {
        testClient.deploy(TWO_MESSAGES_WORKFLOW);
        testClient.publishMessage("ping", "123", MsgPackUtil.asMsgPack("nr", 1));
        testClient.publishMessage("ping", "123", MsgPackUtil.asMsgPack("nr", 2));

        testClient.createWorkflowInstance(PROCESS_ID, MsgPackUtil.asMsgPack("key", "123"));

        assertEachCatchEventCompletedWithItsOwnMessage();
    }

    /** Each message is published only after the corresponding subscription has been opened. */
    @Test
    public void shouldCorrelateMessageOnlyOnceIfEnteredBefore() {
        testClient.deploy(TWO_MESSAGES_WORKFLOW);
        testClient.createWorkflowInstance(PROCESS_ID, MsgPackUtil.asMsgPack("key", "123"));
        Assertions.assertThat(
                RecordingExporter.workflowInstanceSubscriptionRecords(WorkflowInstanceSubscriptionIntent.OPENED)
                    .exists())
            .isTrue();

        testClient.publishMessage("ping", "123", MsgPackUtil.asMsgPack("nr", 1));
        // The second subscription must have been opened before the second message is published.
        Assertions.assertThat(
                RecordingExporter.workflowInstanceSubscriptionRecords(WorkflowInstanceSubscriptionIntent.OPENED)
                    .limit(2L)
                    .count())
            .isEqualTo(2L);
        testClient.publishMessage("ping", "123", MsgPackUtil.asMsgPack("nr", 2));

        assertEachCatchEventCompletedWithItsOwnMessage();
    }

    /** Two parallel catch events of the same instance wait for the same message name and key. */
    @Test
    public void shouldCorrelateMessageOnlyOnceToInstance() {
        testClient.deploy(
            Bpmn.createExecutableProcess(PROCESS_ID)
                .startEvent()
                .parallelGateway()
                .intermediateCatchEvent("message1")
                .message(m -> m.name("ping").zeebeCorrelationKey("$.key"))
                .moveToLastGateway()
                .intermediateCatchEvent("message2")
                .message(m -> m.name("ping").zeebeCorrelationKey("$.key"))
                .done());
        testClient.createWorkflowInstance(PROCESS_ID, MsgPackUtil.asMsgPack("key", "123"));
        Assertions.assertThat(
                RecordingExporter.workflowInstanceSubscriptionRecords(WorkflowInstanceSubscriptionIntent.OPENED)
                    .limit(2L)
                    .count())
            .isEqualTo(2L);

        testClient.publishMessage("ping", "123", MsgPackUtil.asMsgPack("nr", 1));
        testClient.publishMessage("ping", "123", MsgPackUtil.asMsgPack("nr", 2));

        // Which branch receives which message is not asserted, only that both numbers show up.
        Assertions.assertThat(completedMessageCatchEvents(2))
            .extracting(event -> event.getValue().getPayloadAsMap().get("nr"))
            .contains(1, 2);
    }

    /** Both messages are published while only the first catch event is known to be subscribed. */
    @Test
    public void shouldCorrelateOnlyOneMessagePerCatchElement() {
        testClient.deploy(TWO_MESSAGES_WORKFLOW);
        testClient.createWorkflowInstance(PROCESS_ID, MsgPackUtil.asMsgPack("key", "123"));
        Assertions.assertThat(
                RecordingExporter.workflowInstanceSubscriptionRecords(WorkflowInstanceSubscriptionIntent.OPENED)
                    .exists())
            .isTrue();

        testClient.publishMessage("ping", "123", MsgPackUtil.asMsgPack("nr", 1));
        testClient.publishMessage("ping", "123", MsgPackUtil.asMsgPack("nr", 2));

        assertEachCatchEventCompletedWithItsOwnMessage();
    }

    /** Publishing "msg1" must activate only the end event behind boundary event "msg1". */
    @Test
    public void shouldCorrelateCorrectBoundaryEvent() {
        testClient.deploy(BOUNDARY_EVENTS_WORKFLOW);
        testClient.createWorkflowInstance(PROCESS_ID, MsgPackUtil.asMsgPack("key", "123"));
        awaitSubscriptionsOpened(3);

        testClient.publishMessage("msg1", "123", MsgPackUtil.asMsgPack("foo", 1));

        assertActivatedElements("msg1End", "taskEnd", "msg2End");
    }

    /** Publishing the task's own message must activate only the end event behind the task. */
    @Test
    public void shouldNotTriggerBoundaryEventIfReceiveTaskTriggeredFirst() {
        testClient.deploy(BOUNDARY_EVENTS_WORKFLOW);
        testClient.createWorkflowInstance(PROCESS_ID, MsgPackUtil.asMsgPack("key", "123"));
        awaitSubscriptionsOpened(3);

        testClient.publishMessage("taskMsg", "123", MsgPackUtil.asMsgPack("foo", 1));

        assertActivatedElements("taskEnd", "msg1End", "msg2End");
    }

    /** Publishing "msg2" must activate only the end event behind boundary event "msg2". */
    @Test
    public void shouldNotTriggerReceiveTaskIfBoundaryEventTriggeredFirst() {
        testClient.deploy(BOUNDARY_EVENTS_WORKFLOW);
        testClient.createWorkflowInstance(PROCESS_ID, MsgPackUtil.asMsgPack("key", "123"));
        awaitSubscriptionsOpened(3);

        testClient.publishMessage("msg2", "123", MsgPackUtil.asMsgPack("foo", 1));

        assertActivatedElements("msg2End", "taskEnd", "msg1End");
    }

    @Test
    public void testIntermediateMessageEventLifeCycle() {
        assertMessageCatchLifeCycle(SINGLE_MESSAGE_WORKFLOW);
    }

    @Test
    public void testReceiveTaskLifeCycle() {
        assertMessageCatchLifeCycle(RECEIVE_TASK_WORKFLOW);
    }

    /** The task is terminated by the boundary event, which then runs through its own life cycle. */
    @Test
    public void testBoundaryMessageEventLifecycle() {
        testClient.deploy(BOUNDARY_EVENTS_WORKFLOW);
        testClient.publishMessage("msg1", "order-123");
        testClient.createWorkflowInstance(PROCESS_ID, MsgPackUtil.asMsgPack("key", "order-123"));

        final List<Record<WorkflowInstanceRecordValue>> events =
            testClient.receiveWorkflowInstances().limitToWorkflowInstanceCompleted().collect(Collectors.toList());

        Assertions.assertThat(events)
            .extracting(event -> Assertions.tuple(event.getValue().getElementId(), event.getMetadata().getIntent()))
            .containsSequence(
                Assertions.tuple("task", WorkflowInstanceIntent.ELEMENT_ACTIVATING),
                Assertions.tuple("task", WorkflowInstanceIntent.ELEMENT_ACTIVATED),
                Assertions.tuple("task", WorkflowInstanceIntent.EVENT_OCCURRED),
                Assertions.tuple("task", WorkflowInstanceIntent.ELEMENT_TERMINATING),
                Assertions.tuple("task", WorkflowInstanceIntent.ELEMENT_TERMINATED),
                Assertions.tuple("msg1", WorkflowInstanceIntent.ELEMENT_ACTIVATING),
                Assertions.tuple("msg1", WorkflowInstanceIntent.ELEMENT_ACTIVATED),
                Assertions.tuple("msg1", WorkflowInstanceIntent.ELEMENT_COMPLETING),
                Assertions.tuple("msg1", WorkflowInstanceIntent.ELEMENT_COMPLETED));
    }

    /** Three messages sent to a non-interrupting boundary event are each expected to complete it. */
    @Test
    public void shouldCorrelateToNonInterruptingBoundaryEvent() {
        testClient.deploy(
            Bpmn.createExecutableProcess(PROCESS_ID)
                .startEvent()
                .serviceTask("task", task -> task.zeebeTaskType("type"))
                .boundaryEvent("msg1")
                .cancelActivity(false)
                .message(m -> m.name("msg1").zeebeCorrelationKey("$.key"))
                .endEvent("msg1End")
                .moveToActivity("task")
                .endEvent("taskEnd")
                .done());
        testClient.createWorkflowInstance(PROCESS_ID, MsgPackUtil.asMsgPack("key", "123"));

        testClient.publishMessage("msg1", "123", MsgPackUtil.asMsgPack("foo", "0"));
        testClient.publishMessage("msg1", "123", MsgPackUtil.asMsgPack("foo", "1"));
        testClient.publishMessage("msg1", "123", MsgPackUtil.asMsgPack("foo", "2"));

        Assertions.assertThat(awaitMessagesCorrelated(3)).hasSize(3);
        Assertions.assertThat(
                RecordingExporter.workflowInstanceRecords(WorkflowInstanceIntent.ELEMENT_COMPLETED)
                    .withElementId("msg1")
                    .limit(3L)
                    .asList())
            .extracting(event -> event.getValue().getPayloadAsMap().get("foo"))
            .contains("0", "1", "2");
    }

    /**
     * Deploys {@code workflow}, publishes "message" for key "order-123", creates an instance with
     * that key and asserts the exact intent sequence recorded for element "receive-message".
     */
    private void assertMessageCatchLifeCycle(BpmnModelInstance workflow) {
        testClient.deploy(workflow);
        testClient.publishMessage("message", "order-123");
        testClient.createWorkflowInstance(PROCESS_ID, MsgPackUtil.asMsgPack("key", "order-123"));

        final List<Record<WorkflowInstanceRecordValue>> events =
            testClient.receiveWorkflowInstances().limitToWorkflowInstanceCompleted().collect(Collectors.toList());

        Assertions.assertThat(events)
            .filteredOn(event -> event.getValue().getElementId().equals("receive-message"))
            .extracting(event -> event.getMetadata().getIntent())
            .containsExactly(
                WorkflowInstanceIntent.ELEMENT_ACTIVATING,
                WorkflowInstanceIntent.ELEMENT_ACTIVATED,
                WorkflowInstanceIntent.EVENT_OCCURRED,
                WorkflowInstanceIntent.ELEMENT_COMPLETING,
                WorkflowInstanceIntent.ELEMENT_COMPLETED);
    }

    /**
     * Asserts, over the records up to workflow instance completion, that {@code expectedElementId}
     * was activated and that none of {@code unexpectedElementIds} were.
     */
    private static void assertActivatedElements(String expectedElementId, String... unexpectedElementIds) {
        Assertions.assertThat(RecordingExporter.workflowInstanceRecords().limitToWorkflowInstanceCompleted())
            .filteredOn(event -> event.getMetadata().getIntent() == WorkflowInstanceIntent.ELEMENT_ACTIVATED)
            .extracting(event -> event.getValue().getElementId())
            .contains(expectedElementId)
            .doesNotContain(unexpectedElementIds);
    }

    /** Asserts that "message1" completed with payload nr=1 and "message2" with nr=2. */
    private static void assertEachCatchEventCompletedWithItsOwnMessage() {
        Assertions.assertThat(completedMessageCatchEvents(2))
            .extracting(
                event ->
                    Assertions.tuple(
                        event.getValue().getElementId(), event.getValue().getPayloadAsMap().get("nr")))
            .contains(Assertions.tuple("message1", 1), Assertions.tuple("message2", 2));
    }

    /** Returns the first {@code count} ELEMENT_COMPLETED records whose element id starts with "message". */
    private static List<Record<WorkflowInstanceRecordValue>> completedMessageCatchEvents(int count) {
        return RecordingExporter.workflowInstanceRecords(WorkflowInstanceIntent.ELEMENT_COMPLETED)
            .filter(event -> event.getValue().getElementId().startsWith("message"))
            .limit(count)
            .asList();
    }

    /** Returns the first {@code count} CORRELATED workflow instance subscription records. */
    private List<Record<WorkflowInstanceSubscriptionRecordValue>> awaitMessagesCorrelated(int count) {
        return RecordingExporter.workflowInstanceSubscriptionRecords(WorkflowInstanceSubscriptionIntent.CORRELATED)
            .limit(count)
            .asList();
    }

    /** Returns the first {@code count} OPENED workflow instance subscription records. */
    private List<Record<WorkflowInstanceSubscriptionRecordValue>> awaitSubscriptionsOpened(int count) {
        return testClient
            .receiveWorkflowInstanceSubscriptions()
            .withIntent(WorkflowInstanceSubscriptionIntent.OPENED)
            .limit(count)
            .asList();
    }
}
