package io.zeebe.broker.subscription.message;

import io.zeebe.broker.clustering.base.topology.TopologyManager;
import io.zeebe.broker.exporter.util.TestJarExporter;
import io.zeebe.broker.logstreams.processor.TypedRecord;
import io.zeebe.broker.logstreams.processor.TypedStreamProcessorTest;
import io.zeebe.broker.logstreams.state.ZeebeState;
import io.zeebe.broker.subscription.command.SubscriptionCommandSender;
import io.zeebe.broker.subscription.message.data.MessageSubscriptionRecord;
import io.zeebe.broker.subscription.message.processor.MessageEventProcessors;
import io.zeebe.broker.subscription.message.processor.MessageObserver;
import io.zeebe.broker.util.StreamProcessorControl;
import io.zeebe.broker.util.StreamProcessorRule;
import io.zeebe.msgpack.UnpackedObject;
import io.zeebe.protocol.clientapi.RejectionType;
import io.zeebe.protocol.impl.record.value.message.MessageRecord;
import io.zeebe.protocol.intent.MessageIntent;
import io.zeebe.protocol.intent.MessageSubscriptionIntent;
import io.zeebe.test.util.MsgPackUtil;
import io.zeebe.test.util.TestUtil;
import io.zeebe.util.buffer.BufferUtil;
import org.agrona.DirectBuffer;
import org.assertj.core.api.Assertions;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.mockito.ArgumentCaptor;
import org.mockito.ArgumentMatchers;
import org.mockito.Mock;
import org.mockito.Mockito;
import org.mockito.MockitoAnnotations;

/* loaded from: input_file:io/zeebe/broker/subscription/message/MessageStreamProcessorTest.class */
public class MessageStreamProcessorTest {

    @Rule
    public StreamProcessorRule rule = new StreamProcessorRule();

    @Mock
    private SubscriptionCommandSender mockSubscriptionCommandSender;

    @Mock
    private TopologyManager mockTopologyManager;
    private StreamProcessorControl streamProcessor;

    @Before
    public void setup() {
        MockitoAnnotations.initMocks(this);
        Mockito.when(Boolean.valueOf(this.mockSubscriptionCommandSender.openWorkflowInstanceSubscription(ArgumentMatchers.anyLong(), ArgumentMatchers.anyLong(), (DirectBuffer) ArgumentMatchers.any(), ArgumentMatchers.anyBoolean()))).thenReturn(true);
        Mockito.when(Boolean.valueOf(this.mockSubscriptionCommandSender.correlateWorkflowInstanceSubscription(ArgumentMatchers.anyLong(), ArgumentMatchers.anyLong(), (DirectBuffer) ArgumentMatchers.any(), (DirectBuffer) ArgumentMatchers.any()))).thenReturn(true);
        Mockito.when(Boolean.valueOf(this.mockSubscriptionCommandSender.closeWorkflowInstanceSubscription(ArgumentMatchers.anyLong(), ArgumentMatchers.anyLong(), (DirectBuffer) ArgumentMatchers.any(DirectBuffer.class)))).thenReturn(true);
        this.streamProcessor = this.rule.runStreamProcessor((typedEventStreamProcessorBuilder, zeebeDb) -> {
            MessageEventProcessors.addMessageProcessors(typedEventStreamProcessorBuilder, new ZeebeState(zeebeDb), this.mockSubscriptionCommandSender, this.mockTopologyManager);
            return typedEventStreamProcessorBuilder.build();
        });
    }

    @Test
    public void shouldRejectDuplicatedOpenMessageSubscription() {
        UnpackedObject messageSubscription = messageSubscription();
        this.rule.writeCommand(MessageSubscriptionIntent.OPEN, messageSubscription);
        this.rule.writeCommand(MessageSubscriptionIntent.OPEN, messageSubscription);
        this.streamProcessor.unblock();
        TypedRecord<MessageSubscriptionRecord> awaitAndGetFirstSubscriptionRejection = awaitAndGetFirstSubscriptionRejection();
        Assertions.assertThat(awaitAndGetFirstSubscriptionRejection.getMetadata().getIntent()).isEqualTo(MessageSubscriptionIntent.OPEN);
        Assertions.assertThat(awaitAndGetFirstSubscriptionRejection.getMetadata().getRejectionType()).isEqualTo(RejectionType.INVALID_STATE);
        ((SubscriptionCommandSender) Mockito.verify(this.mockSubscriptionCommandSender, Mockito.timeout(5000L).times(2))).openWorkflowInstanceSubscription(ArgumentMatchers.eq(messageSubscription.getWorkflowInstanceKey()), ArgumentMatchers.eq(messageSubscription.getElementInstanceKey()), (DirectBuffer) ArgumentMatchers.any(), ArgumentMatchers.anyBoolean());
    }

    @Test
    public void shouldRetryToCorrelateMessageSubscriptionAfterPublishedMessage() {
        UnpackedObject messageSubscription = messageSubscription();
        UnpackedObject message = message();
        this.streamProcessor.blockAfterMessageEvent(typedRecord -> {
            return typedRecord.getMetadata().getIntent() == MessageIntent.PUBLISHED;
        });
        this.rule.writeCommand(MessageSubscriptionIntent.OPEN, messageSubscription);
        this.rule.writeCommand(MessageIntent.PUBLISH, message);
        TestUtil.waitUntil(() -> {
            return this.streamProcessor.isBlocked();
        });
        this.rule.getClock().addTime(MessageObserver.SUBSCRIPTION_CHECK_INTERVAL.plus(MessageObserver.SUBSCRIPTION_TIMEOUT));
        this.streamProcessor.unblock();
        ((SubscriptionCommandSender) Mockito.verify(this.mockSubscriptionCommandSender, Mockito.timeout(5000L).times(2))).correlateWorkflowInstanceSubscription(messageSubscription.getWorkflowInstanceKey(), messageSubscription.getElementInstanceKey(), messageSubscription.getMessageName(), message.getPayload());
    }

    @Test
    public void shouldRetryToCorrelateMessageSubscriptionAfterOpenedSubscription() {
        UnpackedObject messageSubscription = messageSubscription();
        UnpackedObject message = message();
        this.streamProcessor.blockAfterMessageSubscriptionEvent(typedRecord -> {
            return typedRecord.getMetadata().getIntent() == MessageSubscriptionIntent.OPENED;
        });
        this.rule.writeCommand(MessageIntent.PUBLISH, message);
        this.rule.writeCommand(MessageSubscriptionIntent.OPEN, messageSubscription);
        TestUtil.waitUntil(() -> {
            return this.streamProcessor.isBlocked();
        });
        this.rule.getClock().addTime(MessageObserver.SUBSCRIPTION_CHECK_INTERVAL.plus(MessageObserver.SUBSCRIPTION_TIMEOUT));
        this.streamProcessor.unblock();
        ((SubscriptionCommandSender) Mockito.verify(this.mockSubscriptionCommandSender, Mockito.timeout(5000L).times(2))).correlateWorkflowInstanceSubscription(messageSubscription.getWorkflowInstanceKey(), messageSubscription.getElementInstanceKey(), messageSubscription.getMessageName(), message.getPayload());
    }

    @Test
    public void shouldRejectCorrelateIfMessageSubscriptionClosed() {
        UnpackedObject messageSubscription = messageSubscription();
        this.rule.writeCommand(MessageIntent.PUBLISH, message());
        this.streamProcessor.blockAfterMessageSubscriptionEvent(typedRecord -> {
            return typedRecord.getMetadata().getIntent() == MessageSubscriptionIntent.OPENED;
        });
        this.rule.writeCommand(MessageSubscriptionIntent.OPEN, messageSubscription);
        TestUtil.waitUntil(() -> {
            return this.streamProcessor.isBlocked();
        });
        this.rule.writeCommand(MessageSubscriptionIntent.CLOSE, messageSubscription);
        this.rule.writeCommand(MessageSubscriptionIntent.CORRELATE, messageSubscription);
        this.streamProcessor.unblock();
        TypedRecord<MessageSubscriptionRecord> awaitAndGetFirstSubscriptionRejection = awaitAndGetFirstSubscriptionRejection();
        Assertions.assertThat(awaitAndGetFirstSubscriptionRejection.getMetadata().getIntent()).isEqualTo(MessageSubscriptionIntent.CORRELATE);
        Assertions.assertThat(awaitAndGetFirstSubscriptionRejection.getMetadata().getRejectionType()).isEqualTo(RejectionType.NOT_FOUND);
    }

    @Test
    public void shouldRejectDuplicatedCloseMessageSubscription() {
        UnpackedObject messageSubscription = messageSubscription();
        this.streamProcessor.blockAfterMessageSubscriptionEvent(typedRecord -> {
            return typedRecord.getMetadata().getIntent() == MessageSubscriptionIntent.OPENED;
        });
        this.rule.writeCommand(MessageSubscriptionIntent.OPEN, messageSubscription);
        TestUtil.waitUntil(() -> {
            return this.streamProcessor.isBlocked();
        });
        this.rule.writeCommand(MessageSubscriptionIntent.CLOSE, messageSubscription);
        this.rule.writeCommand(MessageSubscriptionIntent.CLOSE, messageSubscription);
        this.streamProcessor.unblock();
        TypedRecord<MessageSubscriptionRecord> awaitAndGetFirstSubscriptionRejection = awaitAndGetFirstSubscriptionRejection();
        Assertions.assertThat(awaitAndGetFirstSubscriptionRejection.getMetadata().getIntent()).isEqualTo(MessageSubscriptionIntent.CLOSE);
        Assertions.assertThat(awaitAndGetFirstSubscriptionRejection.getMetadata().getRejectionType()).isEqualTo(RejectionType.NOT_FOUND);
        ((SubscriptionCommandSender) Mockito.verify(this.mockSubscriptionCommandSender, Mockito.timeout(5000L).times(2))).closeWorkflowInstanceSubscription(ArgumentMatchers.eq(messageSubscription.getWorkflowInstanceKey()), ArgumentMatchers.eq(messageSubscription.getElementInstanceKey()), (DirectBuffer) ArgumentMatchers.any(DirectBuffer.class));
    }

    @Test
    public void shouldNotCorrelateNewMessagesIfSubscriptionNotCorrelatable() {
        UnpackedObject messageSubscription = messageSubscription();
        UnpackedObject message = message();
        this.streamProcessor.blockAfterMessageEvent(typedRecord -> {
            return typedRecord.getMetadata().getIntent() == MessageIntent.PUBLISHED;
        });
        this.rule.writeCommand(MessageSubscriptionIntent.OPEN, messageSubscription);
        this.rule.writeCommand(MessageIntent.PUBLISH, message);
        TestUtil.waitUntil(() -> {
            return this.streamProcessor.isBlocked();
        });
        this.rule.writeCommand(MessageIntent.PUBLISH, message);
        this.streamProcessor.unblock();
        ((SubscriptionCommandSender) Mockito.verify(this.mockSubscriptionCommandSender, Mockito.timeout(5000L).times(1))).correlateWorkflowInstanceSubscription(messageSubscription.getWorkflowInstanceKey(), messageSubscription.getElementInstanceKey(), messageSubscription.getMessageName(), message.getPayload());
    }

    @Test
    public void shouldCorrelateNewMessagesIfSubscriptionIsReusable() {
        UnpackedObject messageSubscription = messageSubscription();
        UnpackedObject message = message();
        messageSubscription.setCloseOnCorrelate(false);
        this.streamProcessor.blockAfterMessageEvent(typedRecord -> {
            return typedRecord.getMetadata().getIntent() == MessageIntent.PUBLISHED;
        });
        this.rule.writeCommand(MessageSubscriptionIntent.OPEN, messageSubscription);
        this.rule.writeCommand(MessageIntent.PUBLISH, message);
        this.rule.writeCommand(MessageSubscriptionIntent.CORRELATE, messageSubscription);
        TestUtil.waitUntil(() -> {
            return this.streamProcessor.isBlocked();
        });
        this.rule.writeCommand(MessageIntent.PUBLISH, message);
        this.streamProcessor.unblock();
        ((SubscriptionCommandSender) Mockito.verify(this.mockSubscriptionCommandSender, Mockito.timeout(5000L).times(2))).correlateWorkflowInstanceSubscription(messageSubscription.getWorkflowInstanceKey(), messageSubscription.getElementInstanceKey(), messageSubscription.getMessageName(), message.getPayload());
    }

    @Test
    public void shouldCorrelateMultipleMessagesOneBeforeOpenOneAfter() {
        UnpackedObject closeOnCorrelate = messageSubscription().setCloseOnCorrelate(false);
        UnpackedObject payload = message().setPayload(MsgPackUtil.asMsgPack(TypedStreamProcessorTest.STREAM_NAME, TestJarExporter.FOO));
        UnpackedObject payload2 = message().setPayload(MsgPackUtil.asMsgPack(TypedStreamProcessorTest.STREAM_NAME, "baz"));
        this.streamProcessor.blockAfterMessageSubscriptionEvent(typedRecord -> {
            return typedRecord.getMetadata().getIntent() == MessageSubscriptionIntent.OPENED;
        });
        this.rule.writeCommand(MessageIntent.PUBLISH, payload);
        this.rule.writeCommand(MessageSubscriptionIntent.OPEN, closeOnCorrelate);
        TestUtil.waitUntil(() -> {
            return this.streamProcessor.isBlocked();
        });
        this.rule.writeCommand(MessageSubscriptionIntent.CORRELATE, closeOnCorrelate);
        this.streamProcessor.unblock();
        this.rule.writeCommand(MessageIntent.PUBLISH, payload2);
        assertAllMessagesReceived(closeOnCorrelate, payload, payload2);
    }

    @Test
    public void shouldCorrelateMultipleMessagesTwoBeforeOpen() {
        UnpackedObject closeOnCorrelate = messageSubscription().setCloseOnCorrelate(false);
        UnpackedObject payload = message().setPayload(MsgPackUtil.asMsgPack(TypedStreamProcessorTest.STREAM_NAME, TestJarExporter.FOO));
        UnpackedObject payload2 = message().setPayload(MsgPackUtil.asMsgPack(TypedStreamProcessorTest.STREAM_NAME, "baz"));
        this.streamProcessor.blockAfterMessageSubscriptionEvent(typedRecord -> {
            return typedRecord.getMetadata().getIntent() == MessageSubscriptionIntent.OPENED;
        });
        this.rule.writeCommand(MessageIntent.PUBLISH, payload);
        this.rule.writeCommand(MessageIntent.PUBLISH, payload2);
        this.rule.writeCommand(MessageSubscriptionIntent.OPEN, closeOnCorrelate);
        TestUtil.waitUntil(() -> {
            return this.streamProcessor.isBlocked();
        });
        this.rule.writeCommand(MessageSubscriptionIntent.CORRELATE, closeOnCorrelate);
        this.streamProcessor.unblock();
        assertAllMessagesReceived(closeOnCorrelate, payload, payload2);
    }

    private void assertAllMessagesReceived(MessageSubscriptionRecord messageSubscriptionRecord, MessageRecord messageRecord, MessageRecord messageRecord2) {
        ArgumentCaptor forClass = ArgumentCaptor.forClass(DirectBuffer.class);
        ArgumentCaptor forClass2 = ArgumentCaptor.forClass(DirectBuffer.class);
        ((SubscriptionCommandSender) Mockito.verify(this.mockSubscriptionCommandSender, Mockito.timeout(5000L).times(2))).correlateWorkflowInstanceSubscription(ArgumentMatchers.eq(messageSubscriptionRecord.getWorkflowInstanceKey()), ArgumentMatchers.eq(messageSubscriptionRecord.getElementInstanceKey()), (DirectBuffer) forClass.capture(), (DirectBuffer) forClass2.capture());
        Assertions.assertThat((Comparable) forClass.getValue()).isEqualTo(messageSubscriptionRecord.getMessageName());
        Assertions.assertThat((Comparable) forClass2.getAllValues().get(0)).isEqualTo(messageRecord.getPayload());
        Assertions.assertThat((Comparable) forClass2.getAllValues().get(1)).isEqualTo(messageRecord2.getPayload());
    }

    private MessageSubscriptionRecord messageSubscription() {
        MessageSubscriptionRecord messageSubscriptionRecord = new MessageSubscriptionRecord();
        messageSubscriptionRecord.setWorkflowInstanceKey(1L).setElementInstanceKey(2L).setMessageName(BufferUtil.wrapString("order canceled")).setCorrelationKey(BufferUtil.wrapString("order-123")).setCloseOnCorrelate(true);
        return messageSubscriptionRecord;
    }

    private MessageRecord message() {
        MessageRecord messageRecord = new MessageRecord();
        messageRecord.setName(BufferUtil.wrapString("order canceled")).setCorrelationKey(BufferUtil.wrapString("order-123")).setTimeToLive(1L).setPayload(MsgPackUtil.asMsgPack("orderId", "order-123"));
        return messageRecord;
    }

    private TypedRecord<MessageSubscriptionRecord> awaitAndGetFirstSubscriptionRejection() {
        TestUtil.waitUntil(() -> {
            return this.rule.events().onlyMessageSubscriptionRecords().onlyRejections().findFirst().isPresent();
        });
        return (TypedRecord) this.rule.events().onlyMessageSubscriptionRecords().onlyRejections().findFirst().get();
    }
}
