package io.zeebe.broker.workflow.message;

import io.zeebe.broker.test.EmbeddedBrokerConfigurator;
import io.zeebe.broker.test.EmbeddedBrokerRule;
import io.zeebe.model.bpmn.Bpmn;
import io.zeebe.model.bpmn.BpmnModelInstance;
import io.zeebe.protocol.impl.SubscriptionUtil;
import io.zeebe.protocol.intent.MessageSubscriptionIntent;
import io.zeebe.test.broker.protocol.clientapi.ClientApiRule;
import io.zeebe.test.broker.protocol.clientapi.PartitionTestClient;
import io.zeebe.test.util.MsgPackUtil;
import io.zeebe.test.util.record.RecordingExporter;
import io.zeebe.test.util.record.WorkflowInstances;
import io.zeebe.util.buffer.BufferUtil;
import java.util.Arrays;
import java.util.stream.IntStream;
import org.assertj.core.api.Assertions;
import org.assertj.core.groups.Tuple;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.RuleChain;

/* loaded from: input_file:io/zeebe/broker/workflow/message/MessageCorrelationMultiplePartitionsTest.class */
public class MessageCorrelationMultiplePartitionsTest {
    private static final String CORRELATION_KEY_PARTITION_0 = "item-2";
    private static final String CORRELATION_KEY_PARTITION_1 = "item-1";
    private static final String CORRELATION_KEY_PARTITION_2 = "item-0";
    private static final String PROCESS_ID = "process";
    private static final BpmnModelInstance WORKFLOW = Bpmn.createExecutableProcess("process").startEvent().intermediateCatchEvent("receive-message").message(messageBuilder -> {
        messageBuilder.name("message").zeebeCorrelationKey("key");
    }).endEvent("end").done();
    public EmbeddedBrokerRule brokerRule = new EmbeddedBrokerRule(EmbeddedBrokerConfigurator.setPartitionCount(3));
    public ClientApiRule apiRule;

    @Rule
    public RuleChain ruleChain;
    private PartitionTestClient testClient;

    public MessageCorrelationMultiplePartitionsTest() {
        EmbeddedBrokerRule embeddedBrokerRule = this.brokerRule;
        embeddedBrokerRule.getClass();
        this.apiRule = new ClientApiRule(embeddedBrokerRule::getClientAddress);
        this.ruleChain = RuleChain.outerRule(this.brokerRule).around(this.apiRule);
    }

    @Before
    public void init() {
        Assertions.assertThat(getPartitionId(CORRELATION_KEY_PARTITION_0)).isEqualTo(0);
        Assertions.assertThat(getPartitionId(CORRELATION_KEY_PARTITION_1)).isEqualTo(1);
        Assertions.assertThat(getPartitionId(CORRELATION_KEY_PARTITION_2)).isEqualTo(2);
        this.testClient = this.apiRule.partitionClient();
        this.testClient.deploy(WORKFLOW);
    }

    @Test
    public void shouldOpenMessageSubscriptionsOnDifferentPartitions() {
        IntStream.range(0, 10).forEach(i -> {
            this.testClient.createWorkflowInstance(workflowInstanceCreationRecord -> {
                return workflowInstanceCreationRecord.setBpmnProcessId("process").setVariables(MsgPackUtil.asMsgPack("key", CORRELATION_KEY_PARTITION_0));
            }).getInstanceKey();
            this.testClient.createWorkflowInstance(workflowInstanceCreationRecord2 -> {
                return workflowInstanceCreationRecord2.setBpmnProcessId("process").setVariables(MsgPackUtil.asMsgPack("key", CORRELATION_KEY_PARTITION_1));
            }).getInstanceKey();
            this.testClient.createWorkflowInstance(workflowInstanceCreationRecord3 -> {
                return workflowInstanceCreationRecord3.setBpmnProcessId("process").setVariables(MsgPackUtil.asMsgPack("key", CORRELATION_KEY_PARTITION_2));
            }).getInstanceKey();
        });
        Assertions.assertThat(RecordingExporter.messageSubscriptionRecords(MessageSubscriptionIntent.OPENED).limit(30L)).extracting(record -> {
            return Assertions.tuple(new Object[]{Integer.valueOf(record.getMetadata().getPartitionId()), record.getValue().getCorrelationKey()});
        }).containsOnly(new Tuple[]{Assertions.tuple(new Object[]{0, CORRELATION_KEY_PARTITION_0}), Assertions.tuple(new Object[]{1, CORRELATION_KEY_PARTITION_1}), Assertions.tuple(new Object[]{2, CORRELATION_KEY_PARTITION_2})});
    }

    @Test
    public void shouldCorrelateMessageOnDifferentPartitions() {
        this.apiRule.partitionClient(0).publishMessage("message", CORRELATION_KEY_PARTITION_0, MsgPackUtil.asMsgPack("p", "p0"));
        this.apiRule.partitionClient(1).publishMessage("message", CORRELATION_KEY_PARTITION_1, MsgPackUtil.asMsgPack("p", "p1"));
        this.apiRule.partitionClient(2).publishMessage("message", CORRELATION_KEY_PARTITION_2, MsgPackUtil.asMsgPack("p", "p2"));
        Assertions.assertThat(Arrays.asList((String) WorkflowInstances.getCurrentVariables(this.testClient.createWorkflowInstance(workflowInstanceCreationRecord -> {
            return workflowInstanceCreationRecord.setBpmnProcessId("process").setVariables(MsgPackUtil.asMsgPack("key", CORRELATION_KEY_PARTITION_0));
        }).getInstanceKey()).get("p"), (String) WorkflowInstances.getCurrentVariables(this.testClient.createWorkflowInstance(workflowInstanceCreationRecord2 -> {
            return workflowInstanceCreationRecord2.setBpmnProcessId("process").setVariables(MsgPackUtil.asMsgPack("key", CORRELATION_KEY_PARTITION_1));
        }).getInstanceKey()).get("p"), (String) WorkflowInstances.getCurrentVariables(this.testClient.createWorkflowInstance(workflowInstanceCreationRecord3 -> {
            return workflowInstanceCreationRecord3.setBpmnProcessId("process").setVariables(MsgPackUtil.asMsgPack("key", CORRELATION_KEY_PARTITION_2));
        }).getInstanceKey()).get("p"))).contains(new String[]{"\"p0\"", "\"p1\"", "\"p2\""});
    }

    private int getPartitionId(String str) {
        return SubscriptionUtil.getSubscriptionPartitionId(BufferUtil.wrapString(str), this.apiRule.getPartitionIds().size());
    }
}
