/*
 * Decompiled with CFR 0.152.
 */
package io.camunda.zeebe.engine.processing.message;

import io.camunda.zeebe.engine.EngineConfiguration;
import io.camunda.zeebe.engine.processing.bpmn.behavior.BpmnBehaviors;
import io.camunda.zeebe.engine.processing.message.MessageEventProcessors;
import io.camunda.zeebe.engine.processing.message.MessageObserver;
import io.camunda.zeebe.engine.processing.message.command.SubscriptionCommandSender;
import io.camunda.zeebe.engine.processing.streamprocessor.TypedRecordProcessors;
import io.camunda.zeebe.engine.processing.streamprocessor.writers.Writers;
import io.camunda.zeebe.engine.state.EventApplier;
import io.camunda.zeebe.engine.state.appliers.EventAppliers;
import io.camunda.zeebe.engine.state.mutable.MutableProcessingState;
import io.camunda.zeebe.engine.util.StreamProcessorRule;
import io.camunda.zeebe.engine.util.TypedRecordStream;
import io.camunda.zeebe.protocol.impl.record.UnifiedRecordValue;
import io.camunda.zeebe.protocol.impl.record.value.message.MessageRecord;
import io.camunda.zeebe.protocol.impl.record.value.message.MessageSubscriptionRecord;
import io.camunda.zeebe.protocol.record.Record;
import io.camunda.zeebe.protocol.record.RejectionType;
import io.camunda.zeebe.protocol.record.ValueType;
import io.camunda.zeebe.protocol.record.intent.Intent;
import io.camunda.zeebe.protocol.record.intent.MessageIntent;
import io.camunda.zeebe.protocol.record.intent.MessageSubscriptionIntent;
import io.camunda.zeebe.protocol.record.intent.ProcessMessageSubscriptionIntent;
import io.camunda.zeebe.stream.api.InterPartitionCommandSender;
import io.camunda.zeebe.stream.api.ProcessingResultBuilder;
import io.camunda.zeebe.test.util.MsgPackUtil;
import io.camunda.zeebe.test.util.TestUtil;
import io.camunda.zeebe.util.FeatureFlags;
import io.camunda.zeebe.util.buffer.BufferUtil;
import java.time.Duration;
import java.util.function.Supplier;
import org.agrona.DirectBuffer;
import org.assertj.core.api.Assertions;
import org.awaitility.Awaitility;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.mockito.ArgumentMatchers;
import org.mockito.Mockito;
import org.mockito.verification.VerificationMode;

/**
 * Stream-processor level tests for the processors registered through
 * {@link MessageEventProcessors#addMessageProcessors}.
 *
 * <p>Every test writes commands via the {@link StreamProcessorRule}, waits for the records it
 * expects to appear, and then checks either the rejection that was written or the calls that
 * reached the spied {@link SubscriptionCommandSender} / the mocked
 * {@link InterPartitionCommandSender}.
 */
public final class MessageStreamProcessorTest {
    private static final EngineConfiguration DEFAULT_ENGINE_CONFIGURATION = new EngineConfiguration();
    private static final String DEFAULT_TENANT = "<default>";

    /** Timeout, in milliseconds, handed to Mockito when verifying calls on the spied sender. */
    private static final long VERIFY_TIMEOUT_MILLIS = 5000L;

    @Rule
    public final StreamProcessorRule rule = new StreamProcessorRule();

    private SubscriptionCommandSender spySubscriptionCommandSender;
    private InterPartitionCommandSender mockInterpartitionCommandSender;

    /**
     * Creates the mocked inter-partition sender, wraps a real {@link SubscriptionCommandSender} in
     * a spy, and starts a typed stream processor that has the message processors registered.
     */
    @Before
    public void setup() {
        mockInterpartitionCommandSender = Mockito.mock(InterPartitionCommandSender.class);

        // The spied sender gets writers that are backed entirely by mocks.
        final ProcessingResultBuilder resultBuilder = Mockito.mock(ProcessingResultBuilder.class);
        final EventAppliers eventAppliers = Mockito.mock(EventAppliers.class);
        final Writers senderWriters = new Writers(() -> resultBuilder, eventAppliers);

        spySubscriptionCommandSender =
            Mockito.spy(new SubscriptionCommandSender(1, mockInterpartitionCommandSender));
        spySubscriptionCommandSender.setWriters(senderWriters);

        rule.startTypedStreamProcessor((typedRecordProcessors, processingContext) -> {
            MessageEventProcessors.addMessageProcessors(
                Mockito.mock(BpmnBehaviors.class),
                typedRecordProcessors,
                processingContext.getProcessingState(),
                processingContext.getScheduledTaskStateFactory(),
                spySubscriptionCommandSender,
                processingContext.getWriters(),
                DEFAULT_ENGINE_CONFIGURATION,
                FeatureFlags.createDefault());
            return typedRecordProcessors;
        });
    }

    /**
     * Writing the same CREATE command twice must yield an INVALID_STATE rejection, while the
     * "open process message subscription" call is still verified to happen twice.
     */
    @Test
    public void shouldRejectDuplicatedOpenMessageSubscription() {
        // given
        final MessageSubscriptionRecord subscription = messageSubscription();

        // when
        rule.writeCommand(MessageSubscriptionIntent.CREATE, subscription);
        rule.writeCommand(MessageSubscriptionIntent.CREATE, subscription);

        // then
        final Record<MessageSubscriptionRecord> rejection = awaitAndGetFirstSubscriptionRejection();
        Assertions.assertThat(rejection.getIntent()).isEqualTo(MessageSubscriptionIntent.CREATE);
        Assertions.assertThat(rejection.getRejectionType()).isEqualTo(RejectionType.INVALID_STATE);

        Mockito.verify(spySubscriptionCommandSender, Mockito.timeout(VERIFY_TIMEOUT_MILLIS).times(2))
            .openProcessMessageSubscription(
                ArgumentMatchers.eq(subscription.getProcessInstanceKey()),
                ArgumentMatchers.eq(subscription.getElementInstanceKey()),
                ArgumentMatchers.any(),
                ArgumentMatchers.anyBoolean(),
                ArgumentMatchers.eq(DEFAULT_TENANT));
    }

    /** Subscription is created first, the message is published afterwards. */
    @Test
    public void shouldRetryToCorrelateMessageSubscriptionAfterPublishedMessage() {
        // given
        final MessageSubscriptionRecord subscription = messageSubscription();
        final MessageRecord message = message();

        // when
        rule.writeCommand(MessageSubscriptionIntent.CREATE, subscription);
        rule.writeCommand(MessageIntent.PUBLISH, message);
        awaitMessagePublished();

        // then
        awaitCorrelateCommandSentTwice();
    }

    /** Message is published first, the subscription is created afterwards. */
    @Test
    public void shouldRetryToCorrelateMessageSubscriptionAfterOpenedSubscription() {
        // given
        final MessageSubscriptionRecord subscription = messageSubscription();
        final MessageRecord message = message();

        // when
        rule.writeCommand(MessageIntent.PUBLISH, message);
        rule.writeCommand(MessageSubscriptionIntent.CREATE, subscription);
        awaitSubscriptionCreated();

        // then
        awaitCorrelateCommandSentTwice();
    }

    /** A CORRELATE command written after DELETE must be rejected with NOT_FOUND. */
    @Test
    public void shouldRejectCorrelateIfMessageSubscriptionClosed() {
        // given
        final MessageSubscriptionRecord subscription = messageSubscription();
        final MessageRecord message = message();

        rule.writeCommand(MessageIntent.PUBLISH, message);
        rule.writeCommand(MessageSubscriptionIntent.CREATE, subscription);
        awaitSubscriptionCreated();

        // when
        rule.writeCommand(MessageSubscriptionIntent.DELETE, subscription);
        rule.writeCommand(MessageSubscriptionIntent.CORRELATE, subscription);

        // then
        final Record<MessageSubscriptionRecord> rejection = awaitAndGetFirstSubscriptionRejection();
        Assertions.assertThat(rejection.getIntent()).isEqualTo(MessageSubscriptionIntent.CORRELATE);
        Assertions.assertThat(rejection.getRejectionType()).isEqualTo(RejectionType.NOT_FOUND);
    }

    /**
     * A second DELETE command must be rejected with NOT_FOUND, while the "close process message
     * subscription" call is still verified to happen twice.
     */
    @Test
    public void shouldRejectDuplicatedCloseMessageSubscription() {
        // given
        final MessageSubscriptionRecord subscription = messageSubscription();
        rule.writeCommand(MessageSubscriptionIntent.CREATE, subscription);
        awaitSubscriptionCreated();

        // when
        rule.writeCommand(MessageSubscriptionIntent.DELETE, subscription);
        rule.writeCommand(MessageSubscriptionIntent.DELETE, subscription);

        // then
        final Record<MessageSubscriptionRecord> rejection = awaitAndGetFirstSubscriptionRejection();
        Assertions.assertThat(rejection.getIntent()).isEqualTo(MessageSubscriptionIntent.DELETE);
        Assertions.assertThat(rejection.getRejectionType()).isEqualTo(RejectionType.NOT_FOUND);

        Mockito.verify(spySubscriptionCommandSender, Mockito.timeout(VERIFY_TIMEOUT_MILLIS).times(2))
            .closeProcessMessageSubscription(
                ArgumentMatchers.eq(subscription.getProcessInstanceKey()),
                ArgumentMatchers.eq(subscription.getElementInstanceKey()),
                ArgumentMatchers.any(DirectBuffer.class),
                ArgumentMatchers.eq(DEFAULT_TENANT));
    }

    /**
     * Publishes the same message a second time after the first one was published and verifies a
     * single correlate call for the first message's key.
     *
     * <p>NOTE(review): the verification matches on the first message's key only, so it does not by
     * itself constrain calls that carry a different message key.
     */
    @Test
    public void shouldNotCorrelateNewMessagesIfSubscriptionNotCorrelatable() {
        // given
        final MessageSubscriptionRecord subscription = messageSubscription();
        final MessageRecord message = message();

        rule.writeCommand(MessageSubscriptionIntent.CREATE, subscription);
        rule.writeCommand(MessageIntent.PUBLISH, message);
        awaitMessagePublished();

        // when
        rule.writeCommand(MessageIntent.PUBLISH, message);

        // then
        final long messageKey = firstPublishedMessageKey();
        Mockito.verify(spySubscriptionCommandSender, Mockito.timeout(VERIFY_TIMEOUT_MILLIS).times(1))
            .correlateProcessMessageSubscription(
                ArgumentMatchers.eq(subscription.getProcessInstanceKey()),
                ArgumentMatchers.eq(subscription.getElementInstanceKey()),
                ArgumentMatchers.any(),
                ArgumentMatchers.any(),
                ArgumentMatchers.eq(messageKey),
                ArgumentMatchers.any(),
                ArgumentMatchers.any(),
                ArgumentMatchers.eq(DEFAULT_TENANT));
    }

    /**
     * With a non-interrupting subscription, a correlate call is expected for each of the two
     * published messages.
     */
    @Test
    public void shouldCorrelateNewMessagesIfSubscriptionIsReusable() {
        // given
        final MessageSubscriptionRecord subscription = messageSubscription();
        final MessageRecord message = message();
        subscription.setInterrupting(false);

        // when
        rule.writeCommand(MessageSubscriptionIntent.CREATE, subscription);
        rule.writeCommand(MessageIntent.PUBLISH, message);
        rule.writeCommand(MessageSubscriptionIntent.CORRELATE, subscription);
        rule.writeCommand(MessageIntent.PUBLISH, message);

        // then
        awaitTwoMessagesPublished();
        final long[] publishedKeys = {firstPublishedMessageKey(), secondPublishedMessageKey()};
        for (final long publishedKey : publishedKeys) {
            Mockito.verify(spySubscriptionCommandSender, Mockito.timeout(VERIFY_TIMEOUT_MILLIS))
                .correlateProcessMessageSubscription(
                    ArgumentMatchers.eq(subscription.getProcessInstanceKey()),
                    ArgumentMatchers.eq(subscription.getElementInstanceKey()),
                    ArgumentMatchers.eq(subscription.getBpmnProcessIdBuffer()),
                    ArgumentMatchers.any(),
                    ArgumentMatchers.eq(publishedKey),
                    ArgumentMatchers.any(),
                    ArgumentMatchers.any(),
                    ArgumentMatchers.eq(DEFAULT_TENANT));
        }
    }

    /** One message is published before the subscription is created, a second one after it. */
    @Test
    public void shouldCorrelateMultipleMessagesOneBeforeOpenOneAfter() {
        // given
        final MessageSubscriptionRecord subscription = messageSubscription().setInterrupting(false);
        final MessageRecord first = message().setVariables(MsgPackUtil.asMsgPack("foo", "bar"));
        final MessageRecord second = message().setVariables(MsgPackUtil.asMsgPack("foo", "baz"));

        // when
        rule.writeCommand(MessageIntent.PUBLISH, first);
        rule.writeCommand(MessageSubscriptionIntent.CREATE, subscription);
        awaitSubscriptionCreated();

        rule.writeCommand(MessageSubscriptionIntent.CORRELATE, subscription);
        rule.writeCommand(MessageIntent.PUBLISH, second);

        // then
        assertAllMessagesReceived(subscription);
    }

    /** Both messages are published before the subscription is created. */
    @Test
    public void shouldCorrelateMultipleMessagesTwoBeforeOpen() {
        // given
        final MessageSubscriptionRecord subscription = messageSubscription().setInterrupting(false);
        final MessageRecord first = message().setVariables(MsgPackUtil.asMsgPack("foo", "bar"));
        final MessageRecord second = message().setVariables(MsgPackUtil.asMsgPack("foo", "baz"));

        // when
        rule.writeCommand(MessageIntent.PUBLISH, first);
        rule.writeCommand(MessageIntent.PUBLISH, second);
        rule.writeCommand(MessageSubscriptionIntent.CREATE, subscription);
        awaitSubscriptionCreated();

        rule.writeCommand(MessageSubscriptionIntent.CORRELATE, subscription);

        // then
        assertAllMessagesReceived(subscription);
    }

    /**
     * Two subscriptions that differ only in their element instance key are created for one
     * published message; after a REJECT command for the first subscription, a correlate call for
     * that message is expected for both subscriptions.
     */
    @Test
    public void shouldCorrelateToFirstSubscriptionAfterRejection() {
        // given
        final MessageRecord message = message();
        final MessageSubscriptionRecord firstSubscription = messageSubscription().setElementInstanceKey(5L);
        final MessageSubscriptionRecord secondSubscription = messageSubscription().setElementInstanceKey(10L);

        rule.writeCommand(MessageIntent.PUBLISH, message);
        rule.writeCommand(MessageSubscriptionIntent.CREATE, firstSubscription);
        rule.writeCommand(MessageSubscriptionIntent.CREATE, secondSubscription);

        // Wait for the CREATED event that belongs to the second subscription.
        final long secondElementInstanceKey = secondSubscription.getElementInstanceKey();
        TestUtil.waitUntil(() ->
            rule.events()
                .onlyMessageSubscriptionRecords()
                .withIntent(MessageSubscriptionIntent.CREATED)
                .filter(r -> r.getValue().getElementInstanceKey() == secondElementInstanceKey)
                .exists());

        final long messageKey = firstPublishedMessageKey();
        firstSubscription.setMessageKey(messageKey);

        // when
        rule.writeCommand(MessageSubscriptionIntent.REJECT, firstSubscription);

        // then
        verifyCorrelateSentFor(firstSubscription, messageKey);
        verifyCorrelateSentFor(secondSubscription, messageKey);
    }

    /**
     * Waits until two PUBLISHED message events exist and verifies a correlate call, matching the
     * subscription's process id, message name and correlation key, for each of their keys.
     */
    private void assertAllMessagesReceived(MessageSubscriptionRecord subscription) {
        awaitTwoMessagesPublished();
        final long[] publishedKeys = {firstPublishedMessageKey(), secondPublishedMessageKey()};
        for (final long publishedKey : publishedKeys) {
            Mockito.verify(spySubscriptionCommandSender, Mockito.timeout(VERIFY_TIMEOUT_MILLIS))
                .correlateProcessMessageSubscription(
                    ArgumentMatchers.eq(subscription.getProcessInstanceKey()),
                    ArgumentMatchers.eq(subscription.getElementInstanceKey()),
                    ArgumentMatchers.eq(subscription.getBpmnProcessIdBuffer()),
                    ArgumentMatchers.eq(subscription.getMessageNameBuffer()),
                    ArgumentMatchers.eq(publishedKey),
                    ArgumentMatchers.any(),
                    ArgumentMatchers.eq(subscription.getCorrelationKeyBuffer()),
                    ArgumentMatchers.eq(DEFAULT_TENANT));
        }
    }

    /**
     * Verifies a correlate call for the given subscription and message key; the message name and
     * variables arguments may be any {@link DirectBuffer} instance.
     */
    private void verifyCorrelateSentFor(MessageSubscriptionRecord subscription, long messageKey) {
        Mockito.verify(spySubscriptionCommandSender, Mockito.timeout(VERIFY_TIMEOUT_MILLIS))
            .correlateProcessMessageSubscription(
                ArgumentMatchers.eq(subscription.getProcessInstanceKey()),
                ArgumentMatchers.eq(subscription.getElementInstanceKey()),
                ArgumentMatchers.eq(subscription.getBpmnProcessIdBuffer()),
                ArgumentMatchers.any(DirectBuffer.class),
                ArgumentMatchers.eq(messageKey),
                ArgumentMatchers.any(DirectBuffer.class),
                ArgumentMatchers.eq(subscription.getCorrelationKeyBuffer()),
                ArgumentMatchers.eq(DEFAULT_TENANT));
    }

    /**
     * Repeatedly advances the rule's clock by the subscription check interval plus the
     * subscription timeout until the CORRELATE command for partition 0 is verified two times on
     * the mocked inter-partition sender.
     */
    private void awaitCorrelateCommandSentTwice() {
        Awaitility.await("retry correlation").untilAsserted(() -> {
            rule.getClock().addTime(
                MessageObserver.SUBSCRIPTION_CHECK_INTERVAL.plus(MessageObserver.SUBSCRIPTION_TIMEOUT));
            Mockito.verify(mockInterpartitionCommandSender, Mockito.timeout(100L).times(2))
                .sendCommand(
                    ArgumentMatchers.eq(0),
                    ArgumentMatchers.eq(ValueType.PROCESS_MESSAGE_SUBSCRIPTION),
                    ArgumentMatchers.eq(ProcessMessageSubscriptionIntent.CORRELATE),
                    ArgumentMatchers.any());
        });
    }

    /** Blocks until at least one PUBLISHED message event exists. */
    private void awaitMessagePublished() {
        TestUtil.waitUntil(() ->
            rule.events().onlyMessageRecords().withIntent(MessageIntent.PUBLISHED).exists());
    }

    /** Blocks until two PUBLISHED message events exist. */
    private void awaitTwoMessagesPublished() {
        TestUtil.waitUntil(() ->
            rule.events().onlyMessageRecords().withIntent(MessageIntent.PUBLISHED).limit(2L).count() == 2L);
    }

    /** Blocks until at least one CREATED message subscription event exists. */
    private void awaitSubscriptionCreated() {
        TestUtil.waitUntil(() ->
            rule.events()
                .onlyMessageSubscriptionRecords()
                .withIntent(MessageSubscriptionIntent.CREATED)
                .exists());
    }

    /** Returns the key of the first PUBLISHED message event. */
    private long firstPublishedMessageKey() {
        return rule.events()
            .onlyMessageRecords()
            .withIntent(MessageIntent.PUBLISHED)
            .getFirst()
            .getKey();
    }

    /** Returns the key of the second PUBLISHED message event. */
    private long secondPublishedMessageKey() {
        return rule.events()
            .onlyMessageRecords()
            .withIntent(MessageIntent.PUBLISHED)
            .skip(1L)
            .getFirst()
            .getKey();
    }

    /**
     * Builds the interrupting subscription used by the tests: process instance key 1, element
     * instance key 2, message name "order canceled" and correlation key "order-123".
     */
    private MessageSubscriptionRecord messageSubscription() {
        final MessageSubscriptionRecord record = new MessageSubscriptionRecord();
        record.setProcessInstanceKey(1L);
        record.setElementInstanceKey(2L);
        record.setBpmnProcessId(BufferUtil.wrapString("process"));
        record.setMessageKey(-1L);
        record.setMessageName(BufferUtil.wrapString("order canceled"));
        record.setCorrelationKey(BufferUtil.wrapString("order-123"));
        record.setInterrupting(true);
        return record;
    }

    /**
     * Builds the message used by the tests; its name and correlation key are the same strings as
     * in {@link #messageSubscription()} and its time-to-live is ten seconds.
     */
    private MessageRecord message() {
        final MessageRecord record = new MessageRecord();
        record.setName(BufferUtil.wrapString("order canceled"));
        record.setCorrelationKey(BufferUtil.wrapString("order-123"));
        record.setTimeToLive(Duration.ofSeconds(10L).toMillis());
        record.setVariables(MsgPackUtil.asMsgPack("orderId", "order-123"));
        return record;
    }

    /** Waits for a message subscription rejection to be written and returns the first one. */
    private Record<MessageSubscriptionRecord> awaitAndGetFirstSubscriptionRejection() {
        TestUtil.waitUntil(() ->
            rule.events()
                .onlyMessageSubscriptionRecords()
                .onlyRejections()
                .findFirst()
                .isPresent());

        return rule.events()
            .onlyMessageSubscriptionRecords()
            .onlyRejections()
            .findFirst()
            .get();
    }
}

