package io.zeebe.broker.workflow;

import io.zeebe.broker.exporter.util.TestJarExporter;
import io.zeebe.broker.logstreams.processor.TypedStreamProcessorTest;
import io.zeebe.broker.test.EmbeddedBrokerConfigurator;
import io.zeebe.broker.test.EmbeddedBrokerRule;
import io.zeebe.model.bpmn.Bpmn;
import io.zeebe.model.bpmn.BpmnModelInstance;
import io.zeebe.msgpack.spec.MsgPackHelper;
import io.zeebe.protocol.clientapi.RecordType;
import io.zeebe.protocol.clientapi.ValueType;
import io.zeebe.protocol.impl.SubscriptionUtil;
import io.zeebe.protocol.intent.DeploymentIntent;
import io.zeebe.protocol.intent.Intent;
import io.zeebe.protocol.intent.MessageSubscriptionIntent;
import io.zeebe.protocol.intent.WorkflowInstanceIntent;
import io.zeebe.protocol.intent.WorkflowInstanceSubscriptionIntent;
import io.zeebe.test.broker.protocol.clientapi.ClientApiRule;
import io.zeebe.test.broker.protocol.clientapi.ExecuteCommandRequestBuilder;
import io.zeebe.test.broker.protocol.clientapi.SubscribedRecord;
import io.zeebe.test.broker.protocol.clientapi.TestPartitionClient;
import io.zeebe.test.util.MsgPackUtil;
import io.zeebe.util.buffer.BufferUtil;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import org.agrona.DirectBuffer;
import org.assertj.core.api.Assertions;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.RuleChain;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;

/**
 * Tests message correlation against an embedded broker configured with three partitions.
 *
 * <p>Every test runs twice via the {@link Parameterized} runner: once with a workflow that waits
 * for the message in an intermediate catch event and once with a workflow that waits in a receive
 * task. Both workflows use the process id {@code wf}, the element id {@code receive-message}, the
 * message name {@code order canceled} and the correlation key expression {@code $.orderId}.
 */
@RunWith(Parameterized.class)
public class MessageCorrelationTest {
    private static final int PARTITION_COUNT = 3;

    private static final String PROCESS_ID = "wf";
    private static final String ELEMENT_ID = "receive-message";
    private static final String SEQUENCE_FLOW_ID = "to-end";
    private static final String MESSAGE_NAME = "order canceled";
    private static final String CORRELATION_KEY = "order-123";

    private static final BpmnModelInstance CATCH_EVENT_WORKFLOW =
            Bpmn.createExecutableProcess(PROCESS_ID)
                    .startEvent()
                    .intermediateCatchEvent(ELEMENT_ID)
                    .message(m -> m.name(MESSAGE_NAME).zeebeCorrelationKey("$.orderId"))
                    .sequenceFlowId(SEQUENCE_FLOW_ID)
                    .endEvent()
                    .done();

    private static final BpmnModelInstance RECEIVE_TASK_WORKFLOW =
            Bpmn.createExecutableProcess(PROCESS_ID)
                    .startEvent()
                    .receiveTask(ELEMENT_ID)
                    .message(m -> m.name(MESSAGE_NAME).zeebeCorrelationKey("$.orderId"))
                    .sequenceFlowId(SEQUENCE_FLOW_ID)
                    .endEvent()
                    .done();

    public EmbeddedBrokerRule brokerRule =
            new EmbeddedBrokerRule(EmbeddedBrokerConfigurator.setPartitionCount(PARTITION_COUNT));
    public ClientApiRule apiRule = new ClientApiRule(brokerRule::getClientAddress);

    // The broker rule is the outer rule, the client API rule is wrapped around by it.
    @Rule
    public RuleChain ruleChain = RuleChain.outerRule(brokerRule).around(apiRule);

    /** Display name of the element under test; first value of each parameter row. */
    @Parameterized.Parameter(0)
    public String elementType;

    /** Workflow deployed before each test; second value of each parameter row. */
    @Parameterized.Parameter(1)
    public BpmnModelInstance workflow;

    private TestPartitionClient testClient;

    /**
     * Supplies the parameter rows for the runner.
     *
     * @return one row per element type, each holding the display name and the workflow model
     */
    @Parameterized.Parameters(name = "{0}")
    public static final Object[][] parameters() {
        return new Object[][] {
            {"intermediate message catch event", CATCH_EVENT_WORKFLOW},
            {"receive task", RECEIVE_TASK_WORKFLOW}
        };
    }

    /**
     * Deploys the parameterized workflow and waits until the deployment CREATED event has been
     * received through the default partition client and through the clients of partitions 1 and 2.
     */
    @Before
    public void init() {
        apiRule.waitForPartition(PARTITION_COUNT);
        testClient = apiRule.partition();

        final long deploymentKey = testClient.deploy(workflow);

        testClient.receiveFirstDeploymentEvent(DeploymentIntent.CREATED, deploymentKey);
        apiRule.partition(1).receiveFirstDeploymentEvent(DeploymentIntent.CREATED, deploymentKey);
        apiRule.partition(2).receiveFirstDeploymentEvent(DeploymentIntent.CREATED, deploymentKey);
    }

    /** Checks the exact sequence of the first eleven workflow instance record intents. */
    @Test
    public void testWorkflowInstanceLifeCycle() {
        testClient.publishMessage(MESSAGE_NAME, CORRELATION_KEY);
        testClient.createWorkflowInstance(PROCESS_ID, MsgPackUtil.asMsgPack("orderId", CORRELATION_KEY));

        final List<SubscribedRecord> records =
                testClient.receiveRecords().ofTypeWorkflowInstance().limit(11L).collect(Collectors.toList());

        Assertions.assertThat(records)
                .extracting(SubscribedRecord::intent)
                .containsExactly(
                        WorkflowInstanceIntent.CREATE,
                        WorkflowInstanceIntent.CREATED,
                        WorkflowInstanceIntent.ELEMENT_READY,
                        WorkflowInstanceIntent.ELEMENT_ACTIVATED,
                        WorkflowInstanceIntent.START_EVENT_OCCURRED,
                        WorkflowInstanceIntent.SEQUENCE_FLOW_TAKEN,
                        WorkflowInstanceIntent.ELEMENT_READY,
                        WorkflowInstanceIntent.ELEMENT_ACTIVATED,
                        WorkflowInstanceIntent.ELEMENT_COMPLETING,
                        WorkflowInstanceIntent.ELEMENT_COMPLETED,
                        WorkflowInstanceIntent.SEQUENCE_FLOW_TAKEN);
    }

    /** Checks the ELEMENT_ACTIVATED record of the message-receiving element. */
    @Test
    public void shouldActivateElement() {
        // The instance has to be created before waiting for the activation record.
        final long workflowInstanceKey =
                testClient.createWorkflowInstance(PROCESS_ID, MsgPackUtil.asMsgPack("orderId", CORRELATION_KEY));

        final SubscribedRecord activated =
                testClient.receiveElementInState(ELEMENT_ID, WorkflowInstanceIntent.ELEMENT_ACTIVATED);

        Assertions.assertThat(activated.value())
                .containsEntry("bpmnProcessId", PROCESS_ID)
                .containsEntry("version", 1L)
                .containsEntry("workflowInstanceKey", workflowInstanceKey)
                .containsEntry("activityId", ELEMENT_ID);
    }

    /** Checks type and value of the message subscription OPENED event. */
    @Test
    public void shouldOpenMessageSubscription() {
        final long workflowInstanceKey =
                testClient.createWorkflowInstance(PROCESS_ID, MsgPackUtil.asMsgPack("orderId", CORRELATION_KEY));
        final SubscribedRecord activated =
                testClient.receiveElementInState(ELEMENT_ID, WorkflowInstanceIntent.ELEMENT_ACTIVATED);

        final SubscribedRecord subscription = findMessageSubscription(MessageSubscriptionIntent.OPENED);

        Assertions.assertThat(subscription.valueType()).isEqualTo(ValueType.MESSAGE_SUBSCRIPTION);
        Assertions.assertThat(subscription.recordType()).isEqualTo(RecordType.EVENT);
        Assertions.assertThat(subscription.value())
                .containsExactly(
                        Assertions.entry("workflowInstancePartitionId", (long) activated.partitionId()),
                        Assertions.entry("workflowInstanceKey", workflowInstanceKey),
                        Assertions.entry("activityInstanceKey", activated.key()),
                        Assertions.entry("messageName", MESSAGE_NAME),
                        Assertions.entry("correlationKey", CORRELATION_KEY));
    }

    /**
     * Creates two instances with the same correlation key on the first partition and expects both
     * message subscriptions to be opened on the partition computed by {@link #getPartitionId}.
     */
    @Test
    public void shouldOpenMessageSubscriptionsOnSamePartition() {
        final int firstPartitionId = apiRule.getPartitionIds().get(0);
        final TestPartitionClient workflowPartition = apiRule.partition(firstPartitionId);
        final TestPartitionClient subscriptionPartition = apiRule.partition(getPartitionId(CORRELATION_KEY));

        testClient.deploy(CATCH_EVENT_WORKFLOW);

        // Both instances must exist before the OPENED events can be collected.
        final long firstInstanceKey =
                workflowPartition.createWorkflowInstance(PROCESS_ID, MsgPackUtil.asMsgPack("orderId", CORRELATION_KEY));
        final long secondInstanceKey =
                workflowPartition.createWorkflowInstance(PROCESS_ID, MsgPackUtil.asMsgPack("orderId", CORRELATION_KEY));

        final List<SubscribedRecord> subscriptions =
                subscriptionPartition
                        .receiveEvents()
                        .filter(TestPartitionClient.intent(MessageSubscriptionIntent.OPENED))
                        .limit(2L)
                        .collect(Collectors.toList());

        Assertions.assertThat(subscriptions)
                .extracting(record -> record.value().get("workflowInstanceKey"))
                .contains(firstInstanceKey, secondInstanceKey);
    }

    /** Checks type and value of the workflow instance subscription OPENED event. */
    @Test
    public void shouldOpenWorkflowInstanceSubscription() {
        final long workflowInstanceKey =
                testClient.createWorkflowInstance(PROCESS_ID, MsgPackUtil.asMsgPack("orderId", CORRELATION_KEY));
        final SubscribedRecord activated =
                testClient.receiveElementInState(ELEMENT_ID, WorkflowInstanceIntent.ELEMENT_ACTIVATED);

        final SubscribedRecord subscription =
                testClient
                        .receiveEvents()
                        .filter(TestPartitionClient.intent(WorkflowInstanceSubscriptionIntent.OPENED))
                        .findFirst()
                        .orElseThrow(() -> new AssertionError("no workflow instance subscription event found"));

        Assertions.assertThat(subscription.valueType()).isEqualTo(ValueType.WORKFLOW_INSTANCE_SUBSCRIPTION);
        Assertions.assertThat(subscription.recordType()).isEqualTo(RecordType.EVENT);
        Assertions.assertThat(subscription.value())
                .containsExactly(
                        Assertions.entry("subscriptionPartitionId", (long) getPartitionId(CORRELATION_KEY)),
                        Assertions.entry("workflowInstanceKey", workflowInstanceKey),
                        Assertions.entry("activityInstanceKey", activated.key()),
                        Assertions.entry("messageName", MESSAGE_NAME),
                        Assertions.entry("payload", MsgPackHelper.EMTPY_OBJECT));
    }

    /** Publishes the message after the subscription was opened and checks the completed element. */
    @Test
    public void shouldCorrelateMessageIfEnteredBefore() throws Exception {
        final long workflowInstanceKey =
                testClient.createWorkflowInstance(PROCESS_ID, MsgPackUtil.asMsgPack("orderId", CORRELATION_KEY));
        // Wait for the subscription so that the message is published afterwards.
        findMessageSubscription(MessageSubscriptionIntent.OPENED);

        testClient.publishMessage(MESSAGE_NAME, CORRELATION_KEY, MsgPackUtil.asMsgPack("foo", "bar"));

        final SubscribedRecord completed =
                testClient.receiveFirstWorkflowInstanceEvent(WorkflowInstanceIntent.ELEMENT_COMPLETED);

        Assertions.assertThat(completed.value())
                .containsEntry("bpmnProcessId", PROCESS_ID)
                .containsEntry("version", 1L)
                .containsEntry("workflowInstanceKey", workflowInstanceKey)
                .containsEntry("activityId", ELEMENT_ID);
        MsgPackUtil.assertEquality(
                (byte[]) completed.value().get("payload"), "{'orderId':'order-123', 'foo':'bar'}");
    }

    /** Publishes the message before the instance is created and checks the completed element. */
    @Test
    public void shouldCorrelateMessageIfPublishedBefore() throws Exception {
        testClient.publishMessage(MESSAGE_NAME, CORRELATION_KEY, MsgPackUtil.asMsgPack("foo", "bar"));

        final long workflowInstanceKey =
                testClient.createWorkflowInstance(PROCESS_ID, MsgPackUtil.asMsgPack("orderId", CORRELATION_KEY));

        final SubscribedRecord completed =
                testClient.receiveFirstWorkflowInstanceEvent(WorkflowInstanceIntent.ELEMENT_COMPLETED);

        Assertions.assertThat(completed.value())
                .containsEntry("bpmnProcessId", PROCESS_ID)
                .containsEntry("version", 1L)
                .containsEntry("workflowInstanceKey", workflowInstanceKey)
                .containsEntry("activityId", ELEMENT_ID);
        MsgPackUtil.assertEquality(
                (byte[]) completed.value().get("payload"), "{'orderId':'order-123', 'foo':'bar'}");
    }

    /** Publishes two messages and expects the payload of the first one on the completed element. */
    @Test
    public void shouldCorrelateFirstPublishedMessage() throws Exception {
        testClient.publishMessage(MESSAGE_NAME, CORRELATION_KEY, MsgPackUtil.asMsgPack("nr", "first"));
        testClient.publishMessage(MESSAGE_NAME, CORRELATION_KEY, MsgPackUtil.asMsgPack("nr", "second"));

        testClient.createWorkflowInstance(PROCESS_ID, MsgPackUtil.asMsgPack("orderId", CORRELATION_KEY));

        final SubscribedRecord completed =
                testClient.receiveFirstWorkflowInstanceEvent(WorkflowInstanceIntent.ELEMENT_COMPLETED);
        MsgPackUtil.assertEquality(
                (byte[]) completed.value().get("payload"), "{'orderId':'order-123', 'nr':'first'}");
    }

    /** Expects the sequence flow behind the receiving element to be taken after publishing. */
    @Test
    public void shouldContinueInstanceAfteMessageIsCorrelated() {
        final long workflowInstanceKey =
                testClient.createWorkflowInstance(PROCESS_ID, MsgPackUtil.asMsgPack("orderId", CORRELATION_KEY));

        testClient.publishMessage(MESSAGE_NAME, CORRELATION_KEY);

        final SubscribedRecord flowTaken =
                testClient.receiveFirstWorkflowInstanceEvent(
                        workflowInstanceKey, SEQUENCE_FLOW_ID, WorkflowInstanceIntent.SEQUENCE_FLOW_TAKEN);

        Assertions.assertThat(flowTaken.value())
                .containsEntry("bpmnProcessId", PROCESS_ID)
                .containsEntry("version", 1L)
                .containsEntry("workflowInstanceKey", workflowInstanceKey)
                .containsEntry("activityId", SEQUENCE_FLOW_ID);
    }

    /**
     * Publishes a message with a last argument of {@code 0} (presumably the time-to-live) after
     * the subscription was opened and expects the element to complete.
     */
    @Test
    public void shouldCorrelateMessageWithZeroTTL() throws Exception {
        final long workflowInstanceKey =
                testClient.createWorkflowInstance(PROCESS_ID, MsgPackUtil.asMsgPack("orderId", CORRELATION_KEY));
        findMessageSubscription(MessageSubscriptionIntent.OPENED);

        testClient.publishMessage(MESSAGE_NAME, CORRELATION_KEY, MsgPackUtil.asMsgPack("foo", "bar"), 0L);

        final SubscribedRecord completed =
                testClient.receiveElementInState(ELEMENT_ID, WorkflowInstanceIntent.ELEMENT_COMPLETED);
        Assertions.assertThat(completed.value()).containsEntry("workflowInstanceKey", workflowInstanceKey);
    }

    /**
     * Publishes one message with a last argument of {@code 0} and one with {@code 10000}
     * (presumably the time-to-live in milliseconds) before creating the instance, and expects the
     * payload of the second message on the completed element.
     */
    @Test
    public void shouldNotCorrelateMessageAfterTTL() throws Exception {
        testClient.publishMessage(MESSAGE_NAME, CORRELATION_KEY, MsgPackUtil.asMsgPack("nr", "first"), 0L);
        testClient.publishMessage(MESSAGE_NAME, CORRELATION_KEY, MsgPackUtil.asMsgPack("nr", "second"), 10000L);

        testClient.createWorkflowInstance(PROCESS_ID, MsgPackUtil.asMsgPack("orderId", CORRELATION_KEY));

        final SubscribedRecord completed =
                testClient.receiveElementInState(ELEMENT_ID, WorkflowInstanceIntent.ELEMENT_COMPLETED);
        MsgPackUtil.assertEquality(
                (byte[]) completed.value().get("payload"), "{'orderId':'order-123', 'nr':'second'}");
    }

    /** Expects each of two instances to receive the message matching its own correlation key. */
    @Test
    public void shouldCorrelateMessageByCorrelationKey() throws Exception {
        final long firstInstanceKey =
                testClient.createWorkflowInstance(PROCESS_ID, MsgPackUtil.asMsgPack("orderId", "order-123"));
        final long secondInstanceKey =
                testClient.createWorkflowInstance(PROCESS_ID, MsgPackUtil.asMsgPack("orderId", "order-456"));

        testClient.publishMessage(MESSAGE_NAME, "order-123", MsgPackUtil.asMsgPack("foo", "bar"));
        testClient.publishMessage(MESSAGE_NAME, "order-456", MsgPackUtil.asMsgPack("foo", "baz"));

        final SubscribedRecord firstCompleted =
                testClient.receiveFirstWorkflowInstanceEvent(firstInstanceKey, WorkflowInstanceIntent.ELEMENT_COMPLETED);
        MsgPackUtil.assertEquality(
                (byte[]) firstCompleted.value().get("payload"), "{'orderId':'order-123', 'foo':'bar'}");

        final SubscribedRecord secondCompleted =
                testClient.receiveFirstWorkflowInstanceEvent(secondInstanceKey, WorkflowInstanceIntent.ELEMENT_COMPLETED);
        MsgPackUtil.assertEquality(
                (byte[]) secondCompleted.value().get("payload"), "{'orderId':'order-456', 'foo':'baz'}");
    }

    /** Expects a single published message to complete the element in both waiting instances. */
    @Test
    public void shouldCorrelateMessageToAllSubscriptions() {
        final long firstInstanceKey =
                testClient.createWorkflowInstance(PROCESS_ID, MsgPackUtil.asMsgPack("orderId", CORRELATION_KEY));
        final long secondInstanceKey =
                testClient.createWorkflowInstance(PROCESS_ID, MsgPackUtil.asMsgPack("orderId", CORRELATION_KEY));

        testClient.publishMessage(MESSAGE_NAME, CORRELATION_KEY);

        final List<SubscribedRecord> completedElements =
                testClient
                        .receiveEvents()
                        .filter(TestPartitionClient.intent(WorkflowInstanceIntent.ELEMENT_COMPLETED))
                        .filter(record -> ELEMENT_ID.equals(record.value().get("activityId")))
                        .limit(2L)
                        .collect(Collectors.toList());

        Assertions.assertThat(completedElements)
                .extracting(record -> record.value().get("workflowInstanceKey"))
                .contains(firstInstanceKey, secondInstanceKey);
    }

    /** Checks type and value of the workflow instance subscription CORRELATED event. */
    @Test
    public void shouldCorrelateWorkflowInstanceSubscription() {
        final long workflowInstanceKey =
                testClient.createWorkflowInstance(PROCESS_ID, MsgPackUtil.asMsgPack("orderId", CORRELATION_KEY));
        final SubscribedRecord activated =
                testClient.receiveElementInState(ELEMENT_ID, WorkflowInstanceIntent.ELEMENT_ACTIVATED);

        final DirectBuffer messagePayload = MsgPackUtil.asMsgPack("foo", "bar");
        // The partition reported by the publish response is the expected subscription partition.
        final int messagePartitionId =
                testClient.publishMessage(MESSAGE_NAME, CORRELATION_KEY, messagePayload).partitionId();

        final SubscribedRecord subscription =
                testClient
                        .receiveEvents()
                        .filter(TestPartitionClient.intent(WorkflowInstanceSubscriptionIntent.CORRELATED))
                        .findFirst()
                        .orElseThrow(() -> new AssertionError("no subscription event found"));

        Assertions.assertThat(subscription.valueType()).isEqualTo(ValueType.WORKFLOW_INSTANCE_SUBSCRIPTION);
        Assertions.assertThat(subscription.recordType()).isEqualTo(RecordType.EVENT);
        Assertions.assertThat(subscription.value())
                .containsExactly(
                        Assertions.entry("subscriptionPartitionId", (long) messagePartitionId),
                        Assertions.entry("workflowInstanceKey", workflowInstanceKey),
                        Assertions.entry("activityInstanceKey", activated.key()),
                        Assertions.entry("messageName", MESSAGE_NAME),
                        Assertions.entry("payload", messagePayload.byteArray()));
    }

    /** Checks type and value of the message subscription CORRELATED event. */
    @Test
    public void shouldCorrelateMessageSubscription() {
        final long workflowInstanceKey =
                testClient.createWorkflowInstance(PROCESS_ID, MsgPackUtil.asMsgPack("orderId", CORRELATION_KEY));
        final SubscribedRecord activated =
                testClient.receiveElementInState(ELEMENT_ID, WorkflowInstanceIntent.ELEMENT_ACTIVATED);

        testClient.publishMessage(MESSAGE_NAME, CORRELATION_KEY, MsgPackUtil.asMsgPack("foo", "bar"));

        final SubscribedRecord subscription =
                testClient
                        .receiveEvents()
                        .filter(TestPartitionClient.intent(MessageSubscriptionIntent.CORRELATED))
                        .findFirst()
                        .orElseThrow(() -> new AssertionError("no subscription event found"));

        Assertions.assertThat(subscription.valueType()).isEqualTo(ValueType.MESSAGE_SUBSCRIPTION);
        Assertions.assertThat(subscription.recordType()).isEqualTo(RecordType.EVENT);
        // The CORRELATED event is expected to carry an empty correlation key.
        Assertions.assertThat(subscription.value())
                .containsExactly(
                        Assertions.entry("workflowInstancePartitionId", (long) activated.partitionId()),
                        Assertions.entry("workflowInstanceKey", workflowInstanceKey),
                        Assertions.entry("activityInstanceKey", activated.key()),
                        Assertions.entry("messageName", MESSAGE_NAME),
                        Assertions.entry("correlationKey", ""));
    }

    /**
     * Cancels the instance, publishes the message afterwards and expects a rejection of the
     * workflow instance subscription CORRELATE command.
     */
    @Test
    public void shouldRejectCorrelateCommand() throws Exception {
        final long workflowInstanceKey =
                testClient.createWorkflowInstance(PROCESS_ID, MsgPackUtil.asMsgPack("orderId", CORRELATION_KEY));
        final SubscribedRecord activated =
                testClient.receiveElementInState(ELEMENT_ID, WorkflowInstanceIntent.ELEMENT_ACTIVATED);

        final ExecuteCommandRequestBuilder cancelRequest =
                (ExecuteCommandRequestBuilder)
                        apiRule
                                .createCmdRequest()
                                .type(ValueType.WORKFLOW_INSTANCE, WorkflowInstanceIntent.CANCEL)
                                .key(workflowInstanceKey)
                                .command()
                                .done();
        cancelRequest.send();

        // Only publish once the whole workflow instance has been terminated.
        testClient.receiveElementInState(PROCESS_ID, WorkflowInstanceIntent.ELEMENT_TERMINATED);

        testClient.publishMessage(MESSAGE_NAME, CORRELATION_KEY);

        final SubscribedRecord rejection =
                testClient
                        .receiveRejections()
                        .filter(TestPartitionClient.intent(WorkflowInstanceSubscriptionIntent.CORRELATE))
                        .findFirst()
                        .orElseThrow(() -> new AssertionError("no rejection found"));

        Assertions.assertThat(rejection.value())
                .containsEntry("workflowInstanceKey", workflowInstanceKey)
                .containsEntry("activityInstanceKey", activated.key());
    }

    /**
     * Returns the first message subscription event with the given intent received by the test
     * client.
     *
     * @param messageSubscriptionIntent intent the event must have
     * @return the first matching event
     * @throws AssertionError if no matching event is found
     */
    private SubscribedRecord findMessageSubscription(MessageSubscriptionIntent messageSubscriptionIntent) {
        return testClient
                .receiveEvents()
                .filter(TestPartitionClient.intent(messageSubscriptionIntent))
                .findFirst()
                .orElseThrow(() -> new AssertionError("no message subscription event found"));
    }

    /**
     * Computes the subscription partition id for a correlation key via {@link SubscriptionUtil},
     * using the number of partition ids known to the client API rule.
     *
     * @param str the correlation key
     * @return the partition id returned by {@code SubscriptionUtil.getSubscriptionPartitionId}
     */
    private int getPartitionId(String str) {
        final int partitionCount = apiRule.getPartitionIds().size();
        return SubscriptionUtil.getSubscriptionPartitionId(BufferUtil.wrapString(str), partitionCount);
    }
}
