/*
 * Decompiled with CFR 0.152.
 */
package io.camunda.zeebe.engine.processing.message;

import io.camunda.zeebe.engine.util.EngineRule;
import io.camunda.zeebe.engine.util.client.ProcessInstanceClient;
import io.camunda.zeebe.model.bpmn.Bpmn;
import io.camunda.zeebe.model.bpmn.BpmnModelInstance;
import io.camunda.zeebe.model.bpmn.builder.IntermediateCatchEventBuilder;
import io.camunda.zeebe.protocol.impl.SubscriptionUtil;
import io.camunda.zeebe.protocol.record.intent.MessageSubscriptionIntent;
import io.camunda.zeebe.protocol.record.value.MessageSubscriptionRecordValue;
import io.camunda.zeebe.test.util.MsgPackUtil;
import io.camunda.zeebe.test.util.collection.Maps;
import io.camunda.zeebe.test.util.record.MessageSubscriptionRecordStream;
import io.camunda.zeebe.test.util.record.ProcessInstances;
import io.camunda.zeebe.test.util.record.RecordingExporter;
import io.camunda.zeebe.util.buffer.BufferUtil;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.stream.IntStream;
import java.util.stream.Stream;
import org.agrona.DirectBuffer;
import org.assertj.core.api.AbstractListAssert;
import org.assertj.core.api.Assertions;
import org.assertj.core.data.MapEntry;
import org.assertj.core.groups.Tuple;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;

public final class MessageCorrelationMultiplePartitionsTest {
    private static final Map<Integer, String> CORRELATION_KEYS = Maps.of((Map.Entry[])new Map.Entry[]{MapEntry.entry((Object)1, (Object)"item-2"), MapEntry.entry((Object)2, (Object)"item-1"), MapEntry.entry((Object)3, (Object)"item-0")});
    private static final String PROCESS_ID = "process";
    private static final BpmnModelInstance PROCESS = ((IntermediateCatchEventBuilder)Bpmn.createExecutableProcess((String)"process").startEvent().intermediateCatchEvent("receive-message").message(m -> m.name("message").zeebeCorrelationKeyExpression("key"))).endEvent("end").done();
    @Rule
    public final EngineRule engine = EngineRule.multiplePartition(3);

    @Before
    public void init() {
        Assertions.assertThat((int)this.getPartitionId(CORRELATION_KEYS.get(1))).isEqualTo(1);
        Assertions.assertThat((int)this.getPartitionId(CORRELATION_KEYS.get(2))).isEqualTo(2);
        Assertions.assertThat((int)this.getPartitionId(CORRELATION_KEYS.get(3))).isEqualTo(3);
        this.engine.deployment().withXmlResource(PROCESS).deploy();
    }

    @Test
    public void shouldOpenMessageSubscriptionsOnDifferentPartitions() {
        IntStream.range(0, 10).forEach(i -> {
            ProcessInstanceClient.ProcessInstanceCreationClient processInstanceCreationClient = this.engine.processInstance().ofBpmnProcessId(PROCESS_ID);
            processInstanceCreationClient.withVariable("key", CORRELATION_KEYS.get(1)).create();
            processInstanceCreationClient.withVariable("key", CORRELATION_KEYS.get(2)).create();
            processInstanceCreationClient.withVariable("key", CORRELATION_KEYS.get(3)).create();
        });
        Assertions.assertThat((Stream)RecordingExporter.messageSubscriptionRecords((MessageSubscriptionIntent)MessageSubscriptionIntent.CREATED).limit(30L)).extracting(r -> Assertions.tuple((Object[])new Object[]{r.getPartitionId(), ((MessageSubscriptionRecordValue)r.getValue()).getCorrelationKey()})).containsOnly((Object[])new Tuple[]{Assertions.tuple((Object[])new Object[]{1, CORRELATION_KEYS.get(1)}), Assertions.tuple((Object[])new Object[]{2, CORRELATION_KEYS.get(2)}), Assertions.tuple((Object[])new Object[]{3, CORRELATION_KEYS.get(3)})});
    }

    @Test
    public void shouldCorrelateMessageOnDifferentPartitions() {
        this.engine.forEachPartition(partitionId -> this.engine.message().onPartition((int)partitionId).withName("message").withCorrelationKey(CORRELATION_KEYS.get(partitionId)).withVariables(MsgPackUtil.asMsgPack((String)"p", (Object)("p" + partitionId))).publish());
        ProcessInstanceClient.ProcessInstanceCreationClient processInstanceCreationClient = this.engine.processInstance().ofBpmnProcessId(PROCESS_ID);
        long processInstanceKey1 = processInstanceCreationClient.withVariable("key", CORRELATION_KEYS.get(1)).create();
        long processInstanceKey2 = processInstanceCreationClient.withVariable("key", CORRELATION_KEYS.get(2)).create();
        long processInstanceKey3 = processInstanceCreationClient.withVariable("key", CORRELATION_KEYS.get(3)).create();
        List<String> correlatedValues = Arrays.asList((String)ProcessInstances.getCurrentVariables((long)processInstanceKey1).get("p"), (String)ProcessInstances.getCurrentVariables((long)processInstanceKey2).get("p"), (String)ProcessInstances.getCurrentVariables((long)processInstanceKey3).get("p"));
        Assertions.assertThat(correlatedValues).contains((Object[])new String[]{"\"p1\"", "\"p2\"", "\"p3\""});
    }

    @Test
    public void shouldOpenMessageSubscriptionsOnSamePartitionsAfterRestart() {
        ProcessInstanceClient.ProcessInstanceCreationClient processInstanceCreationClient = this.engine.processInstance().ofBpmnProcessId(PROCESS_ID);
        IntStream.range(0, 5).forEach(i -> {
            processInstanceCreationClient.withVariable("key", CORRELATION_KEYS.get(1)).create();
            processInstanceCreationClient.withVariable("key", CORRELATION_KEYS.get(2)).create();
            processInstanceCreationClient.withVariable("key", CORRELATION_KEYS.get(3)).create();
        });
        Assertions.assertThat((long)((MessageSubscriptionRecordStream)RecordingExporter.messageSubscriptionRecords((MessageSubscriptionIntent)MessageSubscriptionIntent.CREATED).limit(15L)).count()).isEqualTo(15L);
        this.engine.stop();
        RecordingExporter.reset();
        this.engine.start();
        IntStream.range(0, 5).forEach(i -> {
            processInstanceCreationClient.withVariable("key", CORRELATION_KEYS.get(1)).create();
            processInstanceCreationClient.withVariable("key", CORRELATION_KEYS.get(2)).create();
            processInstanceCreationClient.withVariable("key", CORRELATION_KEYS.get(3)).create();
        });
        ((AbstractListAssert)Assertions.assertThat((Stream)RecordingExporter.messageSubscriptionRecords((MessageSubscriptionIntent)MessageSubscriptionIntent.CREATED).limit(30L)).extracting(r -> Assertions.tuple((Object[])new Object[]{r.getPartitionId(), ((MessageSubscriptionRecordValue)r.getValue()).getCorrelationKey()})).hasSize(30)).containsOnly((Object[])new Tuple[]{Assertions.tuple((Object[])new Object[]{1, CORRELATION_KEYS.get(1)}), Assertions.tuple((Object[])new Object[]{2, CORRELATION_KEYS.get(2)}), Assertions.tuple((Object[])new Object[]{3, CORRELATION_KEYS.get(3)})});
    }

    private int getPartitionId(String correlationKey) {
        List<Integer> partitionIds = this.engine.getPartitionIds();
        return SubscriptionUtil.getSubscriptionPartitionId((DirectBuffer)BufferUtil.wrapString((String)correlationKey), (int)partitionIds.size());
    }
}

