/*
 * Decompiled with CFR 0.152.
 */
package io.camunda.zeebe.engine.processing.message;

import io.camunda.zeebe.engine.util.EngineRule;
import io.camunda.zeebe.model.bpmn.Bpmn;
import io.camunda.zeebe.model.bpmn.builder.StartEventBuilder;
import io.camunda.zeebe.protocol.record.Record;
import io.camunda.zeebe.protocol.record.intent.Intent;
import io.camunda.zeebe.protocol.record.intent.MessageIntent;
import io.camunda.zeebe.protocol.record.intent.MessageStartEventSubscriptionIntent;
import io.camunda.zeebe.protocol.record.intent.MessageSubscriptionIntent;
import io.camunda.zeebe.protocol.record.intent.ProcessInstanceIntent;
import io.camunda.zeebe.protocol.record.intent.ProcessMessageSubscriptionIntent;
import io.camunda.zeebe.protocol.record.value.BpmnElementType;
import io.camunda.zeebe.protocol.record.value.MessageRecordValue;
import io.camunda.zeebe.protocol.record.value.MessageStartEventSubscriptionRecordValue;
import io.camunda.zeebe.protocol.record.value.MessageSubscriptionRecordValue;
import io.camunda.zeebe.protocol.record.value.ProcessMessageSubscriptionRecordValue;
import io.camunda.zeebe.test.util.Strings;
import io.camunda.zeebe.test.util.record.MessageStartEventSubscriptionRecordStream;
import io.camunda.zeebe.test.util.record.MessageSubscriptionRecordStream;
import io.camunda.zeebe.test.util.record.ProcessInstanceRecordStream;
import io.camunda.zeebe.test.util.record.ProcessMessageSubscriptionRecordStream;
import io.camunda.zeebe.test.util.record.RecordingExporter;
import io.camunda.zeebe.test.util.record.RecordingExporterTestWatcher;
import java.time.Duration;
import java.util.Optional;
import java.util.UUID;
import java.util.function.Function;
import java.util.stream.Stream;
import org.assertj.core.api.Assertions;
import org.assertj.core.api.OptionalAssert;
import org.assertj.core.groups.Tuple;
import org.junit.ClassRule;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TestWatcher;

public class MessageMultiTenancyTest {
    @ClassRule
    public static final EngineRule ENGINE = EngineRule.singlePartition().maxCommandsInBatch(1);
    @Rule
    public final TestWatcher recordingExporterTestWatcher = new RecordingExporterTestWatcher();

    @Test
    public void shouldStartAndCompleteProcessWithMessageStartEvent() {
        String tenantId = "tenant" + String.valueOf(UUID.randomUUID());
        String processId = Strings.newRandomValidBpmnId();
        String messageName = "msg" + String.valueOf(UUID.randomUUID());
        ENGINE.deployment().withXmlResource(((StartEventBuilder)Bpmn.createExecutableProcess((String)processId).startEvent().message(messageName)).endEvent().done()).withTenantId(tenantId).deploy();
        ENGINE.message().withTenantId(tenantId).withName(messageName).withCorrelationKey("").publish();
        MessageMultiTenancyTest.assertMessagePublishedForTenantId(messageName, tenantId);
        MessageMultiTenancyTest.assertMessageStartEventSubscriptionCreatedForTenant(processId, messageName, tenantId);
        MessageMultiTenancyTest.assertMessageStartEventSubscriptionCorrelatedForTenant(processId, messageName, tenantId);
        MessageMultiTenancyTest.assertProcessInstanceCompleted(processId, tenantId);
    }

    @Test
    public void shouldStartAndCompleteProcessWithIntermediateCatchEvent() {
        String tenantId = "tenant" + String.valueOf(UUID.randomUUID());
        String processId = Strings.newRandomValidBpmnId();
        String messageName = "msg" + String.valueOf(UUID.randomUUID());
        String correlationKey = "corr" + UUID.randomUUID().toString().replace("-", "");
        ENGINE.deployment().withXmlResource(Bpmn.createExecutableProcess((String)processId).startEvent().intermediateCatchEvent("catch", c -> c.message(m -> m.name(messageName).zeebeCorrelationKeyExpression("key"))).endEvent().done()).withTenantId(tenantId).deploy();
        long processInstanceKey = ENGINE.processInstance().ofBpmnProcessId(processId).withVariable("key", correlationKey).withTenantId(tenantId).create();
        ((MessageSubscriptionRecordStream)RecordingExporter.messageSubscriptionRecords((MessageSubscriptionIntent)MessageSubscriptionIntent.CREATED).withProcessInstanceKey(processInstanceKey).withMessageName(messageName).withCorrelationKey(correlationKey).limit(1L)).await();
        ENGINE.message().withTenantId(tenantId).withName(messageName).withCorrelationKey(correlationKey).publish();
        MessageMultiTenancyTest.assertMessagePublishedForTenantId(messageName, tenantId);
        MessageMultiTenancyTest.assertProcessMessageSubscriptionCreatedForTenantId(tenantId, messageName, processInstanceKey);
        MessageMultiTenancyTest.assertProcessMessageSubscriptionCorrelatedForTenantId(tenantId, messageName, processInstanceKey);
        MessageMultiTenancyTest.assertMessageSubscriptionCreatedForTenantId(tenantId, messageName, processInstanceKey);
        MessageMultiTenancyTest.assertMessageSubscriptionCorrelatedForTenantId(tenantId, messageName, processInstanceKey);
        MessageMultiTenancyTest.assertProcessInstanceCompleted(processId, tenantId);
    }

    @Test
    public void shouldNotCorrelateToMessageStartOfDifferentTenant() {
        String tenantId = "tenant" + String.valueOf(UUID.randomUUID());
        String otherTenant = "tenant" + String.valueOf(UUID.randomUUID());
        String processId = Strings.newRandomValidBpmnId();
        String messageName = "msg" + String.valueOf(UUID.randomUUID());
        ENGINE.deployment().withXmlResource(((StartEventBuilder)Bpmn.createExecutableProcess((String)processId).startEvent().message(messageName)).endEvent().done()).withTenantId(tenantId).deploy();
        ENGINE.message().withTenantId(otherTenant).withName(messageName).withCorrelationKey("").publish();
        MessageMultiTenancyTest.assertMessageStartEventSubscriptionCreatedForTenant(processId, messageName, tenantId);
        MessageMultiTenancyTest.assertMessagePublishedForTenantId(messageName, otherTenant);
        MessageMultiTenancyTest.assertMessageStartEventSubscriptionNotCorrelatedForTenant(processId, messageName, tenantId);
        MessageMultiTenancyTest.assertMessageStartEventSubscriptionNotCorrelatedForTenant(processId, messageName, otherTenant);
        MessageMultiTenancyTest.assertProcessInstanceNotCompleted(processId, tenantId);
    }

    @Test
    public void shouldNotCorrelateToIntermediateCatchEventOfDifferentTenant() {
        String tenantId = "tenant" + String.valueOf(UUID.randomUUID());
        String otherTenant = "tenant" + String.valueOf(UUID.randomUUID());
        String processId = Strings.newRandomValidBpmnId();
        String messageName = "msg" + String.valueOf(UUID.randomUUID());
        String correlationKey = "corr" + UUID.randomUUID().toString().replace("-", "");
        ENGINE.deployment().withXmlResource(Bpmn.createExecutableProcess((String)processId).startEvent().intermediateCatchEvent("catch", c -> c.message(m -> m.name(messageName).zeebeCorrelationKeyExpression("key"))).endEvent().done()).withTenantId(tenantId).deploy();
        long processInstanceKey = ENGINE.processInstance().ofBpmnProcessId(processId).withVariable("key", correlationKey).withTenantId(tenantId).create();
        ENGINE.message().withTenantId(otherTenant).withName(messageName).withCorrelationKey(correlationKey).publish();
        MessageMultiTenancyTest.assertProcessMessageSubscriptionCreatedForTenantId(tenantId, messageName, processInstanceKey);
        MessageMultiTenancyTest.assertMessageSubscriptionCreatedForTenantId(tenantId, messageName, processInstanceKey);
        MessageMultiTenancyTest.assertMessagePublishedForTenantId(messageName, otherTenant);
        MessageMultiTenancyTest.assertProcessMessageSubscriptionNotCorrelatedForTenantId(tenantId, messageName, processInstanceKey);
        MessageMultiTenancyTest.assertProcessMessageSubscriptionNotCorrelatedForTenantId(otherTenant, messageName, processInstanceKey);
        MessageMultiTenancyTest.assertMessageSubscriptionNotCorrelatedForTenantId(tenantId, messageName, processInstanceKey);
        MessageMultiTenancyTest.assertMessageSubscriptionNotCorrelatedForTenantId(otherTenant, messageName, processInstanceKey);
        MessageMultiTenancyTest.assertProcessInstanceNotCompleted(processId, tenantId);
    }

    @Test
    public void shouldCorrelateBufferedMessageToCorrectTenant() {
        String tenantId = "tenant" + String.valueOf(UUID.randomUUID());
        String otherTenant = "otherTenant" + String.valueOf(UUID.randomUUID());
        String processId = Strings.newRandomValidBpmnId();
        String messageName = "msg" + String.valueOf(UUID.randomUUID());
        String correlationKey = "corr" + UUID.randomUUID().toString().replace("-", "");
        ENGINE.deployment().withXmlResource(Bpmn.createExecutableProcess((String)processId).startEvent().intermediateCatchEvent("catch", c -> c.message(m -> m.name(messageName).zeebeCorrelationKeyExpression("key"))).endEvent().done()).withTenantId(tenantId).deploy();
        ENGINE.deployment().withXmlResource(Bpmn.createExecutableProcess((String)processId).startEvent().intermediateCatchEvent("catch", c -> c.message(m -> m.name(messageName).zeebeCorrelationKeyExpression("key"))).endEvent().done()).withTenantId(otherTenant).deploy();
        ENGINE.message().withTenantId(tenantId).withName(messageName).withCorrelationKey(correlationKey).withTimeToLive(Duration.ofMinutes(5L)).publish();
        long processInstanceKeyOtherTenant = ENGINE.processInstance().ofBpmnProcessId(processId).withVariable("key", correlationKey).withTenantId(otherTenant).create();
        long processInstanceKeyTenant = ENGINE.processInstance().ofBpmnProcessId(processId).withVariable("key", correlationKey).withTenantId(tenantId).create();
        MessageMultiTenancyTest.assertMessagePublishedForTenantId(messageName, tenantId);
        MessageMultiTenancyTest.assertProcessMessageSubscriptionCreatedForTenantId(otherTenant, messageName, processInstanceKeyOtherTenant);
        MessageMultiTenancyTest.assertMessageSubscriptionCreatedForTenantId(tenantId, messageName, processInstanceKeyTenant);
        MessageMultiTenancyTest.assertMessageSubscriptionCreatedForTenantId(otherTenant, messageName, processInstanceKeyOtherTenant);
        MessageMultiTenancyTest.assertProcessMessageSubscriptionCorrelatedForTenantId(tenantId, messageName, processInstanceKeyTenant);
        MessageMultiTenancyTest.assertProcessMessageSubscriptionNotCorrelatedForTenantId(otherTenant, messageName, processInstanceKeyOtherTenant);
        MessageMultiTenancyTest.assertMessageSubscriptionCorrelatedForTenantId(tenantId, messageName, processInstanceKeyTenant);
        MessageMultiTenancyTest.assertMessageSubscriptionNotCorrelatedForTenantId(otherTenant, messageName, processInstanceKeyOtherTenant);
        MessageMultiTenancyTest.assertProcessInstanceCompleted(processId, tenantId);
        MessageMultiTenancyTest.assertProcessInstanceNotCompleted(processId, otherTenant);
    }

    private static void assertMessageSubscriptionCreatedForTenantId(String tenantId, String messageName, long processInstanceKey) {
        Assertions.assertThat((Stream)((MessageSubscriptionRecordStream)RecordingExporter.messageSubscriptionRecords().withIntent((Intent)MessageSubscriptionIntent.CREATED)).withProcessInstanceKey(processInstanceKey).withMessageName(messageName).limit(1L)).extracting(new Function[]{Record::getIntent, r -> ((MessageSubscriptionRecordValue)r.getValue()).getTenantId()}).containsExactly((Object[])new Tuple[]{Assertions.tuple((Object[])new Object[]{MessageSubscriptionIntent.CREATED, tenantId})});
    }

    private static void assertMessageSubscriptionCorrelatedForTenantId(String tenantId, String messageName, long processInstanceKey) {
        Assertions.assertThat((Stream)((MessageSubscriptionRecordStream)RecordingExporter.messageSubscriptionRecords().withIntents(new Intent[]{MessageSubscriptionIntent.CORRELATING, MessageSubscriptionIntent.CORRELATED})).withProcessInstanceKey(processInstanceKey).withMessageName(messageName).limit(2L)).extracting(new Function[]{Record::getIntent, r -> ((MessageSubscriptionRecordValue)r.getValue()).getTenantId()}).containsExactly((Object[])new Tuple[]{Assertions.tuple((Object[])new Object[]{MessageSubscriptionIntent.CORRELATING, tenantId}), Assertions.tuple((Object[])new Object[]{MessageSubscriptionIntent.CORRELATED, tenantId})});
    }

    private static void assertMessageSubscriptionNotCorrelatedForTenantId(String tenantId, String messageName, long processInstanceKey) {
        long finalPosition = MessageMultiTenancyTest.getFinalPosition();
        Assertions.assertThat((Stream)((MessageSubscriptionRecordStream)RecordingExporter.records().between(0L, finalPosition).messageSubscriptionRecords().withIntents(new Intent[]{MessageSubscriptionIntent.CORRELATING, MessageSubscriptionIntent.CORRELATED})).withProcessInstanceKey(processInstanceKey).withMessageName(messageName).withTenantId(tenantId)).isEmpty();
    }

    private static void assertProcessMessageSubscriptionCreatedForTenantId(String tenantId, String messageName, long processInstanceKey) {
        Assertions.assertThat((Stream)((ProcessMessageSubscriptionRecordStream)RecordingExporter.processMessageSubscriptionRecords().withIntents(new Intent[]{ProcessMessageSubscriptionIntent.CREATING, ProcessMessageSubscriptionIntent.CREATED})).withProcessInstanceKey(processInstanceKey).withMessageName(messageName).withTenantId(tenantId).limit(2L)).extracting(new Function[]{Record::getIntent, r -> ((ProcessMessageSubscriptionRecordValue)r.getValue()).getTenantId()}).containsExactly((Object[])new Tuple[]{Assertions.tuple((Object[])new Object[]{ProcessMessageSubscriptionIntent.CREATING, tenantId}), Assertions.tuple((Object[])new Object[]{ProcessMessageSubscriptionIntent.CREATED, tenantId})});
    }

    private static void assertProcessMessageSubscriptionCorrelatedForTenantId(String tenantId, String messageName, long processInstanceKey) {
        Assertions.assertThat((Stream)((ProcessMessageSubscriptionRecordStream)RecordingExporter.processMessageSubscriptionRecords().withIntent((Intent)ProcessMessageSubscriptionIntent.CORRELATED)).withProcessInstanceKey(processInstanceKey).withMessageName(messageName).withTenantId(tenantId).limit(1L)).extracting(new Function[]{Record::getIntent, r -> ((ProcessMessageSubscriptionRecordValue)r.getValue()).getTenantId()}).containsExactly((Object[])new Tuple[]{Assertions.tuple((Object[])new Object[]{ProcessMessageSubscriptionIntent.CORRELATED, tenantId})});
    }

    private static void assertProcessMessageSubscriptionNotCorrelatedForTenantId(String tenantId, String messageName, long processInstanceKey) {
        long finalPosition = MessageMultiTenancyTest.getFinalPosition();
        Assertions.assertThat((Stream)((ProcessMessageSubscriptionRecordStream)RecordingExporter.records().between(0L, finalPosition).processMessageSubscriptionRecords().withIntent((Intent)ProcessMessageSubscriptionIntent.CORRELATED)).withProcessInstanceKey(processInstanceKey).withMessageName(messageName).withTenantId(tenantId)).isEmpty();
    }

    private static void assertMessagePublishedForTenantId(String messageName, String tenantId) {
        ((OptionalAssert)Assertions.assertThat((Optional)RecordingExporter.messageRecords((MessageIntent)MessageIntent.PUBLISHED).withName(messageName).findFirst()).isPresent()).get().extracting(r -> ((MessageRecordValue)r.getValue()).getTenantId()).isEqualTo((Object)tenantId);
    }

    private static void assertMessageStartEventSubscriptionCreatedForTenant(String processId, String messageName, String tenantId) {
        Assertions.assertThat((Stream)((MessageStartEventSubscriptionRecordStream)RecordingExporter.messageStartEventSubscriptionRecords().withIntent((Intent)MessageStartEventSubscriptionIntent.CREATED)).withBpmnProcessId(processId).withMessageName(messageName).withTenantId(tenantId).limit(1L)).extracting(new Function[]{Record::getIntent, r -> ((MessageStartEventSubscriptionRecordValue)r.getValue()).getTenantId()}).containsExactly((Object[])new Tuple[]{Assertions.tuple((Object[])new Object[]{MessageStartEventSubscriptionIntent.CREATED, tenantId})});
    }

    private static void assertMessageStartEventSubscriptionCorrelatedForTenant(String processId, String messageName, String tenantId) {
        Assertions.assertThat((Stream)((MessageStartEventSubscriptionRecordStream)RecordingExporter.messageStartEventSubscriptionRecords().withIntent((Intent)MessageStartEventSubscriptionIntent.CORRELATED)).withBpmnProcessId(processId).withMessageName(messageName).withTenantId(tenantId).limit(1L)).extracting(new Function[]{Record::getIntent, r -> ((MessageStartEventSubscriptionRecordValue)r.getValue()).getTenantId()}).containsExactly((Object[])new Tuple[]{Assertions.tuple((Object[])new Object[]{MessageStartEventSubscriptionIntent.CORRELATED, tenantId})});
    }

    private static void assertMessageStartEventSubscriptionNotCorrelatedForTenant(String processId, String messageName, String tenantId) {
        long finalPosition = MessageMultiTenancyTest.getFinalPosition();
        Assertions.assertThat((Stream)((MessageStartEventSubscriptionRecordStream)RecordingExporter.records().between(0L, finalPosition).messageStartEventSubscriptionRecords().withIntent((Intent)MessageStartEventSubscriptionIntent.CORRELATED)).withBpmnProcessId(processId).withMessageName(messageName).withTenantId(tenantId)).isEmpty();
    }

    private static void assertProcessInstanceCompleted(String processId, String tenantId) {
        Assertions.assertThat((Stream)RecordingExporter.processInstanceRecords().withBpmnProcessId(processId).withTenantId(tenantId).limitToProcessInstanceCompleted()).isNotEmpty();
    }

    private static void assertProcessInstanceNotCompleted(String processId, String tenantId) {
        long finalPosition = MessageMultiTenancyTest.getFinalPosition();
        Assertions.assertThat((Stream)((ProcessInstanceRecordStream)RecordingExporter.records().between(0L, finalPosition).processInstanceRecords().withBpmnProcessId(processId).withTenantId(tenantId).withIntent((Intent)ProcessInstanceIntent.ELEMENT_COMPLETED)).withElementType(BpmnElementType.PROCESS)).isEmpty();
    }

    private static long getFinalPosition() {
        return ENGINE.decision().ofDecisionId(UUID.randomUUID().toString()).expectRejection().evaluate().getPosition();
    }
}

