package io.camunda.zeebe.engine.processing.message;

import io.camunda.zeebe.engine.processing.bpmn.BpmnEventTypeTest;
import io.camunda.zeebe.engine.util.EngineRule;
import io.camunda.zeebe.model.bpmn.Bpmn;
import io.camunda.zeebe.protocol.record.Assertions;
import io.camunda.zeebe.protocol.record.Record;
import io.camunda.zeebe.protocol.record.RecordType;
import io.camunda.zeebe.protocol.record.ValueType;
import io.camunda.zeebe.protocol.record.intent.Intent;
import io.camunda.zeebe.protocol.record.intent.MessageCorrelationIntent;
import io.camunda.zeebe.protocol.record.intent.MessageIntent;
import io.camunda.zeebe.protocol.record.intent.MessageStartEventSubscriptionIntent;
import io.camunda.zeebe.protocol.record.intent.MessageSubscriptionIntent;
import io.camunda.zeebe.protocol.record.intent.ProcessInstanceIntent;
import io.camunda.zeebe.protocol.record.intent.ProcessMessageSubscriptionIntent;
import io.camunda.zeebe.protocol.record.intent.VariableIntent;
import io.camunda.zeebe.protocol.record.value.BpmnElementType;
import io.camunda.zeebe.protocol.record.value.MessageCorrelationRecordValue;
import io.camunda.zeebe.test.util.MsgPackUtil;
import io.camunda.zeebe.test.util.record.RecordingExporter;
import io.camunda.zeebe.test.util.record.RecordingExporterTestWatcher;
import java.util.List;
import java.util.function.Function;
import org.agrona.DirectBuffer;
import org.junit.Rule;
import org.junit.Test;

/* loaded from: input_file:io/camunda/zeebe/engine/processing/message/CorrelateMessageTest.class */
public final class CorrelateMessageTest {
    /** Correlation key sent with the correlate commands and used by the subscriptions in this class. */
    public static final String CORRELATION_KEY = "correlationKey";
    /** Message name used by most tests in this class (one test uses the literal "message" instead). */
    public static final String MESSAGE_NAME = "messageName";

    /** Engine under test, created with {@code EngineRule.singlePartition()}. */
    @Rule
    public final EngineRule engine = EngineRule.singlePartition();

    // NOTE(review): presumably reports the recorded records when a test fails — confirm against
    // RecordingExporterTestWatcher.
    @Rule
    public final RecordingExporterTestWatcher recordingExporterTestWatcher = new RecordingExporterTestWatcher();

    /**
     * Sends a correlate command without deploying any process and expects it to be rejected.
     *
     * <p>The first command rejection recorded for the CORRELATE intent must still carry the message
     * name and correlation key of the command.
     */
    @Test
    public void shouldNotCorrelateWhenNoSubscriptions() {
        // when
        engine.messageCorrelation()
            .withCorrelationKey(CORRELATION_KEY)
            .withName(MESSAGE_NAME)
            .expectRejection()
            .correlate();

        // then
        // No raw (Record) cast here: a raw record only exposes the erased value type, which
        // would hide the message-correlation specific assertions used below.
        Assertions.assertThat(
                RecordingExporter.messageCorrelationRecords(MessageCorrelationIntent.CORRELATE)
                    .onlyCommandRejections()
                    .getFirst()
                    .getValue())
            .hasName(MESSAGE_NAME)
            .hasCorrelationKey(CORRELATION_KEY);
    }

    /**
     * Checks the exact intent sequence produced when a message is correlated to a message start
     * event.
     *
     * <p>Only message correlation, message and message start event subscription records are
     * considered, read up to and including the first record with intent {@code MessageIntent.EXPIRED}.
     */
    @Test
    public void shouldHaveCorrectCorrelatedLifecycleForStartEvent() {
        // given
        deployProcessWithMessageStartEvent();

        // when
        engine.messageCorrelation()
            .withCorrelationKey(CORRELATION_KEY)
            .withName(MESSAGE_NAME)
            .correlate();

        // then
        final List<ValueType> relevantValueTypes =
            List.of(
                ValueType.MESSAGE_CORRELATION,
                ValueType.MESSAGE,
                ValueType.MESSAGE_START_EVENT_SUBSCRIPTION);

        org.assertj.core.api.Assertions.assertThat(
                RecordingExporter.records()
                    .limit(r -> r.getIntent().equals(MessageIntent.EXPIRED))
                    .filter(r -> relevantValueTypes.contains(r.getValueType())))
            .extracting(Record::getIntent)
            .containsExactly(
                MessageStartEventSubscriptionIntent.CREATED,
                MessageCorrelationIntent.CORRELATE,
                MessageIntent.PUBLISHED,
                MessageCorrelationIntent.CORRELATING,
                MessageStartEventSubscriptionIntent.CORRELATED,
                MessageCorrelationIntent.CORRELATED,
                MessageIntent.EXPIRED);
    }

    /**
     * Checks the relative order of intents produced when a message is correlated to an
     * intermediate message catch event.
     *
     * <p>Records are read up to the first {@code MessageCorrelationIntent.CORRELATED} record and
     * restricted to the message, subscription and correlation value types; the expected intents
     * must appear as a subsequence (other intents may be interleaved).
     */
    @Test
    public void shouldHaveCorrectCorrelatedLifeCycleForMessageEvent() {
        // given
        deployAndStartProcessWithIntermediaryMessageEvent(MESSAGE_NAME, CORRELATION_KEY);

        // when
        engine.messageCorrelation()
            .withName(MESSAGE_NAME)
            .withCorrelationKey(CORRELATION_KEY)
            .correlate();

        // then
        final List<ValueType> relevantValueTypes =
            List.of(
                ValueType.MESSAGE_CORRELATION,
                ValueType.MESSAGE_SUBSCRIPTION,
                ValueType.PROCESS_MESSAGE_SUBSCRIPTION,
                ValueType.MESSAGE);

        org.assertj.core.api.Assertions.assertThat(
                RecordingExporter.records()
                    .limit(r -> r.getIntent().equals(MessageCorrelationIntent.CORRELATED))
                    .filter(r -> relevantValueTypes.contains(r.getValueType())))
            .extracting(Record::getIntent)
            .containsSubsequence(
                MessageCorrelationIntent.CORRELATE,
                MessageIntent.PUBLISHED,
                MessageSubscriptionIntent.CORRELATING,
                ProcessMessageSubscriptionIntent.CORRELATE,
                MessageIntent.EXPIRED,
                ProcessMessageSubscriptionIntent.CORRELATED,
                MessageSubscriptionIntent.CORRELATE,
                MessageSubscriptionIntent.CORRELATED,
                MessageCorrelationIntent.CORRELATED);
    }

    /** Correlates a message against a deployed message start event and checks the response record. */
    @Test
    public void shouldCorrelateMessageToStartEvent() {
        // given
        deployProcessWithMessageStartEvent();

        // when
        final Record<MessageCorrelationRecordValue> correlated =
            engine.messageCorrelation()
                .withCorrelationKey(CORRELATION_KEY)
                .withName(MESSAGE_NAME)
                .correlate();

        // then
        assertMessageIsCorrelated(correlated);
    }

    /**
     * Correlates a message against a running instance waiting at an intermediate message catch
     * event and checks the response record.
     */
    @Test
    public void shouldCorrelateToMessageIntermediaryEvent() {
        // given
        deployAndStartProcessWithIntermediaryMessageEvent(MESSAGE_NAME, CORRELATION_KEY);

        // when
        final Record<MessageCorrelationRecordValue> correlated =
            engine.messageCorrelation()
                .withName(MESSAGE_NAME)
                .withCorrelationKey(CORRELATION_KEY)
                .correlate();

        // then
        assertMessageIsCorrelated(correlated);
    }

    /**
     * Correlates a message against a running instance whose user task has a message boundary
     * event and checks the response record.
     */
    @Test
    public void shouldCorrelateToMessageBoundaryEvent() {
        // given
        deployAndStartProcessWithMessageBoundaryEvent(MESSAGE_NAME, CORRELATION_KEY);

        // when
        final Record<MessageCorrelationRecordValue> correlated =
            engine.messageCorrelation()
                .withName(MESSAGE_NAME)
                .withCorrelationKey(CORRELATION_KEY)
                .correlate();

        // then
        assertMessageIsCorrelated(correlated);
    }

    /**
     * Deploys two processes ("process1", "process2") that both start on the same message and
     * checks that the correlation response carries the key of the first ELEMENT_ACTIVATED record
     * of "process1".
     */
    @Test
    public void shouldRespondFirstProcessInstanceKeyWhenMultipleMessageStartEvent() {
        // given
        deployProcessWithMessageStartEvent("process1");
        deployProcessWithMessageStartEvent("process2");

        // when
        final Record<MessageCorrelationRecordValue> correlated =
            engine.messageCorrelation()
                .withName(MESSAGE_NAME)
                .withCorrelationKey(CORRELATION_KEY)
                .correlate();

        // then
        assertMessageIsCorrelated(correlated);
        org.assertj.core.api.Assertions.assertThat(correlated.getValue().getProcessInstanceKey())
            .isEqualTo(
                RecordingExporter.processInstanceRecords(ProcessInstanceIntent.ELEMENT_ACTIVATED)
                    .filter(r -> r.getValue().getBpmnProcessId().equals("process1"))
                    .getFirst()
                    .getKey());
    }

    /**
     * Correlates one message while both a message start event ("processStartEvent") and a running
     * instance with an intermediate message catch event are subscribed to it.
     *
     * <p>The response must carry the key of the first ELEMENT_ACTIVATED record of
     * "processStartEvent", and the instance waiting at the catch event must reach completion.
     */
    @Test
    public void shouldResponseWithCreatedProcessInstanceWhenCorrelatedToStartEventAndIntermediateEvent() {
        // given
        deployProcessWithMessageStartEvent("processStartEvent");
        final long waitingInstanceKey =
            deployAndStartProcessWithIntermediaryMessageEvent(MESSAGE_NAME, CORRELATION_KEY);

        // when
        final Record<MessageCorrelationRecordValue> correlated =
            engine.messageCorrelation()
                .withName(MESSAGE_NAME)
                .withCorrelationKey(CORRELATION_KEY)
                .correlate();

        // then
        assertMessageIsCorrelated(correlated);
        org.assertj.core.api.Assertions.assertThat(correlated.getValue().getProcessInstanceKey())
            .isEqualTo(
                RecordingExporter.processInstanceRecords(ProcessInstanceIntent.ELEMENT_ACTIVATED)
                    .filter(r -> r.getValue().getBpmnProcessId().equals("processStartEvent"))
                    .getFirst()
                    .getKey());
        org.assertj.core.api.Assertions.assertThat(
                RecordingExporter.processInstanceRecords()
                    .withProcessInstanceKey(waitingInstanceKey)
                    .limitToProcessInstanceCompleted())
            .describedAs("Has completed intermediary message process")
            .isNotEmpty();
    }

    /**
     * Correlates a message carrying the variable {@code foo = "bar"} to a message start event and
     * checks that the variable is created in the scope of the started process instance.
     *
     * <p>The scope key is the process instance key taken from the first ELEMENT_ACTIVATED record
     * of element type PROCESS for process id "processId".
     */
    @Test
    public void shouldCorrelateMessageWithVariablesToStartEvent() {
        // given
        deployProcessWithMessageStartEvent("processId");

        // when
        final Record<MessageCorrelationRecordValue> correlated =
            engine.messageCorrelation()
                .withCorrelationKey(CORRELATION_KEY)
                .withName(MESSAGE_NAME)
                .withVariables(MsgPackUtil.asMsgPack("foo", "bar"))
                .correlate();

        // then
        assertMessageIsCorrelated(correlated);

        // Kept fully typed (no raw Record cast): the raw type would erase the record value and
        // hide getProcessInstanceKey().
        final long processInstanceKey =
            RecordingExporter.processInstanceRecords(ProcessInstanceIntent.ELEMENT_ACTIVATED)
                .withBpmnProcessId("processId")
                .filter(r -> r.getValue().getBpmnElementType().equals(BpmnElementType.PROCESS))
                .getFirst()
                .getValue()
                .getProcessInstanceKey();

        // Extractors are passed as varargs lambdas rather than a raw Function[] so that their
        // parameter is the typed variable record.
        org.assertj.core.api.Assertions.assertThat(
                RecordingExporter.variableRecords(VariableIntent.CREATED)
                    .withScopeKey(processInstanceKey)
                    .getFirst())
            .extracting(r -> r.getValue().getName(), r -> r.getValue().getValue())
            .containsExactly("foo", "\"bar\"");
    }

    /**
     * Correlates a message carrying the variable {@code foo = "bar"} to an intermediate message
     * catch event and checks that the variable is created with the process instance key as scope.
     */
    @Test
    public void shouldCorrelateMessageWithVariablesToIntermediaryEvent() {
        // given
        final DirectBuffer variables = MsgPackUtil.asMsgPack("foo", "bar");
        final long processInstanceKey =
            deployAndStartProcessWithIntermediaryMessageEvent(MESSAGE_NAME, CORRELATION_KEY);

        // when
        final Record<MessageCorrelationRecordValue> correlated =
            engine.messageCorrelation()
                .withCorrelationKey(CORRELATION_KEY)
                .withName(MESSAGE_NAME)
                .withVariables(variables)
                .correlate();

        // then
        assertMessageIsCorrelated(correlated);

        // Extractors are passed as varargs lambdas on the typed record (no raw Record cast or
        // raw Function[]), otherwise getValue().getName() is not resolvable.
        org.assertj.core.api.Assertions.assertThat(
                RecordingExporter.variableRecords(VariableIntent.CREATED)
                    .withScopeKey(processInstanceKey)
                    .getFirst())
            .extracting(r -> r.getValue().getName(), r -> r.getValue().getValue())
            .containsExactly("foo", "\"bar\"");
    }

    /**
     * Correlates a message carrying the variable {@code foo = "bar"} to a message boundary event
     * and checks that the variable is created in the scope of the process instance.
     *
     * <p>The scope key is the process instance key taken from the first ELEMENT_ACTIVATED record
     * of element type PROCESS belonging to the started instance.
     */
    @Test
    public void shouldCorrelateMessageWithVariablesToBoundaryEvent() {
        // given
        final DirectBuffer variables = MsgPackUtil.asMsgPack("foo", "bar");
        final long startedInstanceKey =
            deployAndStartProcessWithMessageBoundaryEvent(MESSAGE_NAME, CORRELATION_KEY);

        // when
        final Record<MessageCorrelationRecordValue> correlated =
            engine.messageCorrelation()
                .withCorrelationKey(CORRELATION_KEY)
                .withName(MESSAGE_NAME)
                .withVariables(variables)
                .correlate();

        // then
        assertMessageIsCorrelated(correlated);

        // Kept fully typed (no raw Record cast): the raw type would erase the record value and
        // hide getProcessInstanceKey().
        final long scopeKey =
            RecordingExporter.processInstanceRecords(ProcessInstanceIntent.ELEMENT_ACTIVATED)
                .withProcessInstanceKey(startedInstanceKey)
                .filter(r -> r.getValue().getBpmnElementType().equals(BpmnElementType.PROCESS))
                .getFirst()
                .getValue()
                .getProcessInstanceKey();

        // Extractors are passed as varargs lambdas rather than a raw Function[] so that their
        // parameter is the typed variable record.
        org.assertj.core.api.Assertions.assertThat(
                RecordingExporter.variableRecords(VariableIntent.CREATED)
                    .withScopeKey(scopeKey)
                    .getFirst())
            .extracting(r -> r.getValue().getName(), r -> r.getValue().getValue())
            .containsExactly("foo", "\"bar\"");
    }

    /**
     * Uses one process that has both a message start event and an intermediate message catch
     * event for the same message, with an instance already waiting at the catch event.
     *
     * <p>After correlating, the message subscription must be CORRELATED, and up to the first
     * {@code MessageIntent.EXPIRED} record no message start event subscription may be CORRELATED.
     */
    @Test
    public void shouldNotCorrelateToMessageStartAndIntermediateCatchWithSameProcessId() {
        // given
        deployAndStartProcessWithMessageStartAndMessageIntermediaryEventInSameProcess(
            "message", CORRELATION_KEY);

        // when
        engine.messageCorrelation()
            .withName("message")
            .withCorrelationKey(CORRELATION_KEY)
            .correlate();

        // then
        final boolean catchEventCorrelated =
            RecordingExporter.messageSubscriptionRecords(MessageSubscriptionIntent.CORRELATED)
                .withMessageName("message")
                .withCorrelationKey(CORRELATION_KEY)
                .limit(1L)
                .exists();
        org.assertj.core.api.Assertions.assertThat(catchEventCorrelated).isTrue();

        final boolean startEventCorrelated =
            RecordingExporter.records()
                .limit(r -> r.getIntent() == MessageIntent.EXPIRED)
                .messageStartEventSubscriptionRecords()
                .withMessageName("message")
                .withIntent(MessageStartEventSubscriptionIntent.CORRELATED)
                .exists();
        org.assertj.core.api.Assertions.assertThat(startEventCorrelated).isFalse();
    }

    /**
     * Asserts that the given record is a CORRELATED message correlation event whose value carries
     * {@link #CORRELATION_KEY}, {@link #MESSAGE_NAME} and the tenant id {@code "<default>"}.
     *
     * @param record the record returned by the correlate command
     */
    private static void assertMessageIsCorrelated(Record<MessageCorrelationRecordValue> record) {
        // record metadata
        Assertions.assertThat(record)
            .hasIntent(MessageCorrelationIntent.CORRELATED)
            .hasRecordType(RecordType.EVENT)
            .hasValueType(ValueType.MESSAGE_CORRELATION);

        // record value
        final MessageCorrelationRecordValue correlation = record.getValue();
        Assertions.assertThat(correlation)
            .hasCorrelationKey(CORRELATION_KEY)
            .hasName(MESSAGE_NAME)
            .hasTenantId("<default>");
    }

    /**
     * Asserts that the given record is a NOT_CORRELATED message correlation event whose value
     * carries {@link #CORRELATION_KEY}, {@link #MESSAGE_NAME} and the tenant id {@code "<default>"}.
     *
     * <p>NOTE(review): no caller is visible in this file — confirm whether it is still needed.
     *
     * @param record the record returned by the correlate command
     */
    private static void assertMessageIsNotCorrelated(Record<MessageCorrelationRecordValue> record) {
        // record metadata
        Assertions.assertThat(record)
            .hasIntent(MessageCorrelationIntent.NOT_CORRELATED)
            .hasRecordType(RecordType.EVENT)
            .hasValueType(ValueType.MESSAGE_CORRELATION);

        // record value
        final MessageCorrelationRecordValue correlation = record.getValue();
        Assertions.assertThat(correlation)
            .hasCorrelationKey(CORRELATION_KEY)
            .hasName(MESSAGE_NAME)
            .hasTenantId("<default>");
    }

    /**
     * Deploys a process consisting of a message start event for {@link #MESSAGE_NAME} followed by
     * an end event.
     *
     * @param str the BPMN process id of the deployed process
     */
    private void deployProcessWithMessageStartEvent(String str) {
        final var process =
            Bpmn.createExecutableProcess(str)
                .startEvent()
                .message(MESSAGE_NAME)
                .endEvent()
                .done();

        engine.deployment().withXmlResource(process).deploy();
    }

    /** Deploys the message start event process under the process id "process". */
    private void deployProcessWithMessageStartEvent() {
        final String processId = "process";
        deployProcessWithMessageStartEvent(processId);
    }

    /**
     * Deploys the process "process" (start event, intermediate message catch event "msg", end
     * event) and creates one instance of it.
     *
     * @param str the name of the message the catch event subscribes to
     * @param str2 the correlation key, embedded as a quoted string literal in the correlation key
     *     expression
     * @return the value returned by creating the process instance
     */
    private long deployAndStartProcessWithIntermediaryMessageEvent(String str, String str2) {
        final String correlationKeyExpression = "=\"%s\"".formatted(str2);
        final var process =
            Bpmn.createExecutableProcess("process")
                .startEvent()
                .intermediateCatchEvent(
                    "msg",
                    catchEvent ->
                        catchEvent.message(
                            message ->
                                message.name(str).zeebeCorrelationKey(correlationKeyExpression)))
                .endEvent()
                .done();

        engine.deployment().withXmlResource(process).deploy();
        return engine.processInstance().ofBpmnProcessId("process").create();
    }

    /**
     * Deploys the process "process" (start event, user task with message boundary event "msg",
     * end event) and creates one instance of it.
     *
     * @param str the name of the message the boundary event subscribes to
     * @param str2 the correlation key, embedded as a quoted string literal in the correlation key
     *     expression
     * @return the value returned by creating the process instance
     */
    private long deployAndStartProcessWithMessageBoundaryEvent(String str, String str2) {
        final String correlationKeyExpression = "=\"%s\"".formatted(str2);
        final var process =
            Bpmn.createExecutableProcess("process")
                .startEvent()
                .userTask()
                .boundaryEvent(
                    "msg",
                    boundaryEvent ->
                        boundaryEvent.message(
                            message ->
                                message.name(str).zeebeCorrelationKey(correlationKeyExpression)))
                .endEvent()
                .done();

        engine.deployment().withXmlResource(process).deploy();
        return engine.processInstance().ofBpmnProcessId("process").create();
    }

    /**
     * Deploys the process "process" with two entry points — a message start event "msgStart" and
     * a plain start event leading to the intermediate message catch event "msgCatch" — and creates
     * one instance of it.
     *
     * <p>The catch event's correlation key expression is {@code BpmnEventTypeTest.CORRELATION_KEY};
     * the instance is created with a variable of that same name set to {@code str2}.
     *
     * @param str the message name used by both the start event and the catch event
     * @param str2 the value of the correlation key variable of the created instance
     */
    private void deployAndStartProcessWithMessageStartAndMessageIntermediaryEventInSameProcess(String str, String str2) {
        final var process =
            Bpmn.createExecutableProcess("process")
                .startEvent("msgStart")
                .message(str)
                .endEvent()
                .moveToProcess("process")
                .startEvent()
                .intermediateCatchEvent("msgCatch")
                .message(
                    message ->
                        message
                            .name(str)
                            .zeebeCorrelationKeyExpression(BpmnEventTypeTest.CORRELATION_KEY))
                .endEvent()
                .done();

        engine.deployment().withXmlResource(process).deploy();
        engine.processInstance()
            .ofBpmnProcessId("process")
            .withVariable(BpmnEventTypeTest.CORRELATION_KEY, str2)
            .create();
    }
}
