package io.zeebe.broker.event;

import io.zeebe.UnstableCI;
import io.zeebe.broker.clustering.base.partitions.Partition;
import io.zeebe.broker.job.ActivateJobsTest;
import io.zeebe.broker.logstreams.processor.TypedStreamProcessorTest;
import io.zeebe.broker.test.EmbeddedBrokerRule;
import io.zeebe.protocol.clientapi.ControlMessageType;
import io.zeebe.protocol.clientapi.ErrorCode;
import io.zeebe.protocol.clientapi.RecordType;
import io.zeebe.protocol.clientapi.RejectionType;
import io.zeebe.protocol.clientapi.SubscriptionType;
import io.zeebe.protocol.clientapi.ValueType;
import io.zeebe.protocol.intent.JobIntent;
import io.zeebe.protocol.intent.SubscriberIntent;
import io.zeebe.protocol.intent.SubscriptionIntent;
import io.zeebe.protocol.intent.WorkflowInstanceIntent;
import io.zeebe.servicecontainer.ServiceName;
import io.zeebe.test.broker.protocol.MsgPackHelper;
import io.zeebe.test.broker.protocol.clientapi.ClientApiRule;
import io.zeebe.test.broker.protocol.clientapi.ControlMessageRequestBuilder;
import io.zeebe.test.broker.protocol.clientapi.ErrorResponse;
import io.zeebe.test.broker.protocol.clientapi.ExecuteCommandRequestBuilder;
import io.zeebe.test.broker.protocol.clientapi.ExecuteCommandResponse;
import io.zeebe.test.broker.protocol.clientapi.RawMessage;
import io.zeebe.test.broker.protocol.clientapi.SubscribedRecord;
import io.zeebe.test.util.TestUtil;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.function.Consumer;
import java.util.stream.Collectors;
import org.assertj.core.api.Assertions;
import org.junit.Ignore;
import org.junit.Rule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.rules.RuleChain;

/* loaded from: input_file:io/zeebe/broker/event/TopicSubscriptionTest.class */
/**
 * Integration tests for opening, closing and consuming topic subscriptions through the client API
 * of an embedded broker.
 *
 * <p>The broker rule is the outer rule of {@link #ruleChain}, so it is applied around the client
 * API rule. JUnit creates a new instance of this class per test method, hence every test gets its
 * own rule instances.
 */
public class TopicSubscriptionTest {
    /** Longest subscription name, in characters, that the tests expect the broker to accept. */
    public static final int MAXIMUM_SUBSCRIPTION_NAME_LENGTH = 32;

    /** Subscription name used by all tests that do not exercise name validation. */
    private static final String SUBSCRIPTION_NAME = TypedStreamProcessorTest.STREAM_NAME;

    /** Partition id the tests use to address a partition that is expected not to exist. */
    private static final int NON_EXISTING_PARTITION_ID = 999;

    public EmbeddedBrokerRule brokerRule = new EmbeddedBrokerRule(new Consumer[0]);
    public ClientApiRule apiRule = new ClientApiRule(brokerRule::getClientAddress);

    @Rule
    public RuleChain ruleChain = RuleChain.outerRule(brokerRule).around(apiRule);

    /** Opening a subscription yields a non-negative subscriber key. */
    @Test
    public void shouldOpenSubscription() {
        long subscriberKey = openSubscription();

        Assertions.assertThat(subscriberKey).isGreaterThanOrEqualTo(0L);
    }

    /** Closing an open subscription echoes the subscriber key in the response data. */
    @Test
    public void shouldCloseSubscription() {
        long subscriberKey = openSubscription();

        Assertions.assertThat(removeSubscriptionRequest(subscriberKey).sendAndAwait().getData())
                .containsOnly(Assertions.entry("subscriberKey", subscriberKey));
    }

    /**
     * After a subscription has been closed, a newly created job must not produce pushed records.
     *
     * @throws InterruptedException if the waiting thread is interrupted
     */
    @Test
    public void shouldNotPushEventsAfterClose() throws InterruptedException {
        removeSubscriptionRequest(openSubscription()).sendAndAwait();
        apiRule.moveMessageStreamToTail();

        ((ExecuteCommandRequestBuilder) apiRule.createCmdRequest()
                .type(ValueType.JOB, JobIntent.CREATE)
                .command()
                .put("type", ActivateJobsTest.JOB_TYPE)
                .done())
                .sendAndAwait();

        // NOTE(review): fixed wait so that an erroneous push would have had time to arrive before
        // the absence of records is asserted; there is no event to wait on for "nothing happens".
        Thread.sleep(1000L);
        Assertions.assertThat(apiRule.numSubscribedEventsAvailable()).isEqualTo(0);
    }

    /**
     * A subscription opened at position 0 receives the job CREATE command followed by the CREATED
     * event, each carrying the subscription metadata and the controlled clock's timestamp.
     */
    @Test
    public void shouldPushEvents() {
        final long timestamp = 1L;
        brokerRule.getClock().setCurrentTime(timestamp);
        createJob();

        long subscriberKey = openSubscription();

        List<SubscribedRecord> jobRecords = firstTwoJobRecords();
        Assertions.assertThat(jobRecords).hasSize(2);

        SubscribedRecord command = jobRecords.get(0);
        assertJobRecordMetadata(command, subscriberKey, RecordType.COMMAND, JobIntent.CREATE);
        Assertions.assertThat(command.position()).isGreaterThan(0L);
        Assertions.assertThat(command.sourceRecordPosition()).isEqualTo(-1L);
        Assertions.assertThat(command.timestamp()).isEqualTo(timestamp);

        SubscribedRecord event = jobRecords.get(1);
        assertJobRecordMetadata(event, subscriberKey, RecordType.EVENT, JobIntent.CREATED);
        Assertions.assertThat(event.position()).isGreaterThan(command.position());
        // The event must point back at the command that caused it.
        Assertions.assertThat(event.sourceRecordPosition()).isEqualTo(command.position());
        Assertions.assertThat(event.timestamp()).isEqualTo(timestamp);
    }

    /** A rejected workflow instance creation is pushed to the subscriber as a command rejection. */
    @Test
    public void shouldPushRejection() {
        ((ExecuteCommandRequestBuilder) apiRule.createCmdRequest()
                .type(ValueType.WORKFLOW_INSTANCE, WorkflowInstanceIntent.CREATE)
                .command()
                .put("bpmnProcessId", "does not exist")
                .put("version", -1)
                .done())
                .sendAndAwait();

        openSubscription();

        SubscribedRecord rejection = apiRule.subscribedEvents()
                .filter(record -> record.valueType() == ValueType.WORKFLOW_INSTANCE)
                .filter(record -> record.recordType() == RecordType.COMMAND_REJECTION)
                .findFirst()
                .orElseThrow(() -> new AssertionError("No workflow instance rejection was pushed"));

        Assertions.assertThat(rejection.recordType()).isEqualTo(RecordType.COMMAND_REJECTION);
        Assertions.assertThat(rejection.valueType()).isEqualTo(ValueType.WORKFLOW_INSTANCE);
        Assertions.assertThat(rejection.intent()).isEqualTo(WorkflowInstanceIntent.CREATE);
        Assertions.assertThat(rejection.rejectionType()).isEqualTo(RejectionType.BAD_VALUE);
        Assertions.assertThat(rejection.rejectionReason()).isEqualTo("Workflow is not deployed");
    }

    /** The start position reported on open lies before the position of the first pushed record. */
    @Test
    public void shouldReturnStartPositionOnOpen() {
        createJob();

        long startPosition =
                (Long) apiRule.openTopicSubscription(SUBSCRIPTION_NAME, 0L).await().getValue().get("startPosition");
        long firstRecordPosition = apiRule.subscribedEvents()
                .findFirst()
                .orElseThrow(() -> new AssertionError("No record was pushed to the subscription"))
                .position();

        Assertions.assertThat(startPosition).isLessThan(firstRecordPosition);
    }

    /** Subscribing on an unknown partition is answered with a PARTITION_NOT_FOUND error. */
    @Test
    public void shouldNotOpenSubscriptionForNonExistingPartition() {
        ErrorResponse error = ((ExecuteCommandRequestBuilder) apiRule.createCmdRequest()
                .partitionId(NON_EXISTING_PARTITION_ID)
                .type(ValueType.SUBSCRIBER, SubscriberIntent.SUBSCRIBE)
                .command()
                .put("startPosition", 0)
                .put("name", SUBSCRIPTION_NAME)
                .done())
                .sendWithoutRetries()
                .awaitError();

        String expectedMessage = String.format(
                "Cannot execute command. Partition with id '%d' not found", NON_EXISTING_PARTITION_ID);
        Assertions.assertThat(error.getErrorCode()).isEqualTo(ErrorCode.PARTITION_NOT_FOUND);
        Assertions.assertThat(error.getErrorData()).isEqualTo(expectedMessage);
    }

    /** Closing a subscription on an unknown partition is answered with a processing failure. */
    @Test
    public void shouldNotCloseSubscriptionForNonExistingPartition() {
        ErrorResponse error = ((ControlMessageRequestBuilder) apiRule.createControlMessageRequest()
                .messageType(ControlMessageType.REMOVE_TOPIC_SUBSCRIPTION)
                .partitionId(NON_EXISTING_PARTITION_ID)
                .data()
                .put("subscriberKey", 0L)
                .done())
                .sendWithoutRetries()
                .awaitError();

        String expectedMessage = String.format(
                "Cannot close topic subscription. No subscription management processor registered for partition '%d'",
                NON_EXISTING_PARTITION_ID);
        Assertions.assertThat(error.getErrorCode()).isEqualTo(ErrorCode.REQUEST_PROCESSING_FAILURE);
        Assertions.assertThat(error.getErrorData()).isEqualTo(expectedMessage);
    }

    /** Closing a subscriber key that was never handed out still succeeds and echoes the key. */
    @Test
    public void shouldCloseSubscriptionNonExistingSubscription() {
        long unknownSubscriberKey = Long.MAX_VALUE;

        Assertions.assertThat(removeSubscriptionRequest(unknownSubscriberKey).sendAndAwait().getData())
                .containsOnly(Assertions.entry("subscriberKey", unknownSubscriberKey));
    }

    /** A name of exactly {@link #MAXIMUM_SUBSCRIPTION_NAME_LENGTH} characters is accepted. */
    @Test
    public void shouldOpenSubscriptionWithMaximumNameLength() {
        String longestValidName = getStringOfLength(MAXIMUM_SUBSCRIPTION_NAME_LENGTH);

        long subscriberKey = apiRule.openTopicSubscription(longestValidName, 0L).await().key();

        Assertions.assertThat(subscriberKey).isGreaterThanOrEqualTo(0L);
    }

    /** A name one character longer than {@link #MAXIMUM_SUBSCRIPTION_NAME_LENGTH} is rejected. */
    @Test
    public void shouldNotOpenSubscriptionWithOverlongName() {
        String overlongName = getStringOfLength(MAXIMUM_SUBSCRIPTION_NAME_LENGTH + 1);

        ErrorResponse error = apiRule.openTopicSubscription(overlongName, 0L).awaitError();

        Assertions.assertThat(error.getErrorCode()).isEqualTo(ErrorCode.REQUEST_PROCESSING_FAILURE);
        Assertions.assertThat(error.getErrorData())
                .isEqualTo("Cannot open topic subscription '" + overlongName
                        + "'. Subscription name must be " + MAXIMUM_SUBSCRIPTION_NAME_LENGTH
                        + " characters or shorter.");
    }

    /** A negative buffer size is rejected with a processing failure that names the subscription. */
    @Test
    public void shouldNotOpenSubscriptionWithNegativeBufferSize() {
        ErrorResponse error = ((ExecuteCommandRequestBuilder) apiRule.createCmdRequest()
                .type(ValueType.SUBSCRIBER, SubscriberIntent.SUBSCRIBE)
                .command()
                .put("startPosition", -1)
                .put("name", SUBSCRIPTION_NAME)
                .put("bufferSize", -1)
                .done())
                .send()
                .awaitError();

        Assertions.assertThat(error.getErrorCode()).isEqualTo(ErrorCode.REQUEST_PROCESSING_FAILURE);
        // The expected message is derived from the name that was actually sent.
        Assertions.assertThat(error.getErrorData())
                .isEqualTo("Cannot open topic subscription '" + SUBSCRIPTION_NAME
                        + "'. Buffer size must be greater than 0.");
    }

    /** Omitting the buffer size makes the subscribe command an invalid message. */
    @Test
    public void shouldNotOpenSubscriptionWithMissingBufferSize() {
        ErrorResponse error = ((ExecuteCommandRequestBuilder) apiRule.createCmdRequest()
                .type(ValueType.SUBSCRIBER, SubscriberIntent.SUBSCRIBE)
                .command()
                .put("startPosition", -1)
                .put("name", SUBSCRIPTION_NAME)
                .done())
                .send()
                .awaitError();

        Assertions.assertThat(error.getErrorCode()).isEqualTo(ErrorCode.INVALID_MESSAGE);
        Assertions.assertThat(error.getErrorData()).contains("Property 'bufferSize' has no valid value");
    }

    /**
     * After acknowledging the second job record and closing the subscription, re-subscribing with
     * {@code forceStart} at the first job record's position replays both job records.
     */
    @Test
    public void shouldOpenSubscriptionAndForceStartPosition() {
        long subscriberKey = openSubscription();
        createJob();
        List<Long> jobPositions = firstTwoJobRecordPositions();

        ((ExecuteCommandRequestBuilder) apiRule.createCmdRequest()
                .type(ValueType.SUBSCRIPTION, SubscriptionIntent.ACKNOWLEDGE)
                .command()
                .put("name", SUBSCRIPTION_NAME)
                .put("ackPosition", jobPositions.get(1))
                .done())
                .sendAndAwait();
        removeSubscriptionRequest(subscriberKey).sendAndAwait();
        apiRule.moveMessageStreamToTail();

        ((ExecuteCommandRequestBuilder) apiRule.createCmdRequest()
                .type(ValueType.SUBSCRIBER, SubscriberIntent.SUBSCRIBE)
                .command()
                .put("startPosition", jobPositions.get(0))
                .put("name", SUBSCRIPTION_NAME)
                .put("bufferSize", 32)
                .put("forceStart", true)
                .done())
                .sendAndAwait();

        List<Long> replayedPositions = firstTwoJobRecordPositions();
        Assertions.assertThat(replayedPositions).hasSize(2);
        Assertions.assertThat(replayedPositions).containsExactlyElementsOf(jobPositions);
    }

    /**
     * Re-opening a subscription under the same name with a later start position (and without
     * {@code forceStart}) still delivers the job records from the beginning.
     */
    @Test
    public void shouldPersistStartPositionOnOpen() {
        long subscriberKey = openSubscription();
        createJob();
        List<Long> jobPositions = firstTwoJobRecordPositions();

        removeSubscriptionRequest(subscriberKey).sendAndAwait();
        apiRule.moveMessageStreamToTail();

        // Request a start at the second job record; the test expects this to be ignored.
        apiRule.openTopicSubscription(SUBSCRIPTION_NAME, jobPositions.get(1)).await();

        Assertions.assertThat(firstTwoJobRecordPositions()).containsExactlyElementsOf(jobPositions);
    }

    /** Closing a subscription whose push service was removed reports a processing failure. */
    @Test
    @Ignore
    public void shouldReturnErrorIfSubscriptionProcessorRemovalFails() {
        long subscriberKey = openSubscription();
        String pushServiceName = "log.log." + Partition.getPartitionName(apiRule.getDefaultPartitionId())
                + ".subscription.push.foo";
        brokerRule.removeService(ServiceName.newServiceName(pushServiceName, Object.class));

        ErrorResponse error = removeSubscriptionRequest(subscriberKey).send().awaitError();

        Assertions.assertThat(error.getErrorCode()).isEqualTo(ErrorCode.REQUEST_PROCESSING_FAILURE);
        Assertions.assertThat(error.getErrorData())
                .contains("Cannot close topic subscription. Cannot remove service");
    }

    /**
     * After all transport channels were interrupted, the same subscription name can be opened
     * again and yields a different subscriber key.
     */
    @Test
    @Category({UnstableCI.class})
    public void shouldCloseSubscriptionOnTransportChannelClose() {
        long subscriberKey = openSubscription();

        apiRule.interruptAllChannels();

        // Retry until the subscription can be opened again.
        ExecuteCommandResponse reopened = (ExecuteCommandResponse) TestUtil.doRepeatedly(
                        () -> apiRule.openTopicSubscription(SUBSCRIPTION_NAME, 0L).await())
                .until(Objects::nonNull, 50, "Failed to reopen topic subscription", new Object[0]);

        Assertions.assertThat(reopened.key()).isNotEqualTo(subscriberKey);
    }

    /** The first pushed record arrives after the SUBSCRIBED response, judged by sequence number. */
    @Test
    public void shouldNotPushEventsBeforeSubscriptionResponse() {
        createJob();
        openSubscription();

        RawMessage subscribedResponse = apiRule.commandResponses()
                .filter(message -> asCommandResponse(message).intent() == SubscriberIntent.SUBSCRIBED)
                .findFirst()
                .orElseThrow(() -> new AssertionError("No SUBSCRIBED command response was received"));

        apiRule.moveMessageStreamToHead();
        SubscribedRecord firstPushedRecord = apiRule.subscribedEvents()
                .findFirst()
                .orElseThrow(() -> new AssertionError("No record was pushed to the subscription"));

        Assertions.assertThat(firstPushedRecord.getRawMessage().getSequenceNumber())
                .isGreaterThan(subscribedResponse.getSequenceNumber());
    }

    /**
     * Builds a string consisting of {@code i} repetitions of the character {@code 'a'}.
     *
     * @param i desired length of the string
     * @return a string of exactly {@code i} characters
     */
    protected String getStringOfLength(int i) {
        char[] characters = new char[i];
        Arrays.fill(characters, 'a');
        return new String(characters);
    }

    /**
     * Wraps the full buffer of a raw message in a new {@link ExecuteCommandResponse}.
     *
     * @param rawMessage message whose buffer is wrapped from offset 0 up to its capacity
     * @return the response view over the message
     */
    protected static ExecuteCommandResponse asCommandResponse(RawMessage rawMessage) {
        ExecuteCommandResponse response = new ExecuteCommandResponse(new MsgPackHelper());
        response.wrap(rawMessage.getMessage(), 0, rawMessage.getMessage().capacity());
        return response;
    }

    /** Opens a subscription named {@link #SUBSCRIPTION_NAME} at position 0 and returns its key. */
    private long openSubscription() {
        return apiRule.openTopicSubscription(SUBSCRIPTION_NAME, 0L).await().key();
    }

    /** Sends a job CREATE command with one retry and waits for the response. */
    private void createJob() {
        ((ExecuteCommandRequestBuilder) apiRule.createCmdRequest()
                .type(ValueType.JOB, JobIntent.CREATE)
                .command()
                .put("type", TypedStreamProcessorTest.STREAM_NAME)
                .put("retries", 1)
                .done())
                .sendAndAwait();
    }

    /** Prepares (without sending) a REMOVE_TOPIC_SUBSCRIPTION request on the default partition. */
    private ControlMessageRequestBuilder removeSubscriptionRequest(long subscriberKey) {
        return (ControlMessageRequestBuilder) apiRule.createControlMessageRequest()
                .messageType(ControlMessageType.REMOVE_TOPIC_SUBSCRIPTION)
                .partitionId(apiRule.getDefaultPartitionId())
                .data()
                .put("subscriberKey", subscriberKey)
                .done();
    }

    /** Collects the first two pushed records whose value type is JOB. */
    private List<SubscribedRecord> firstTwoJobRecords() {
        return apiRule.subscribedEvents()
                .filter(record -> record.valueType() == ValueType.JOB)
                .limit(2L)
                .collect(Collectors.toList());
    }

    /** Collects the positions of the first two pushed records whose value type is JOB. */
    private List<Long> firstTwoJobRecordPositions() {
        return apiRule.subscribedEvents()
                .filter(record -> record.valueType() == ValueType.JOB)
                .limit(2L)
                .map(SubscribedRecord::position)
                .collect(Collectors.toList());
    }

    /** Asserts the subscription metadata shared by every pushed job record. */
    private void assertJobRecordMetadata(
            SubscribedRecord record, long subscriberKey, RecordType recordType, JobIntent intent) {
        Assertions.assertThat(record.subscriberKey()).isEqualTo(subscriberKey);
        Assertions.assertThat(record.subscriptionType()).isEqualTo(SubscriptionType.TOPIC_SUBSCRIPTION);
        Assertions.assertThat(record.partitionId()).isEqualTo(apiRule.getDefaultPartitionId());
        Assertions.assertThat(record.recordType()).isEqualTo(recordType);
        Assertions.assertThat(record.valueType()).isEqualTo(ValueType.JOB);
        Assertions.assertThat(record.intent()).isEqualTo(intent);
    }
}
