package io.zeebe.broker.event;

import io.zeebe.broker.logstreams.processor.TypedStreamProcessorTest;
import io.zeebe.broker.test.EmbeddedBrokerRule;
import io.zeebe.model.bpmn.Bpmn;
import io.zeebe.protocol.clientapi.ControlMessageType;
import io.zeebe.protocol.clientapi.ExecuteCommandResponseDecoder;
import io.zeebe.protocol.clientapi.RecordType;
import io.zeebe.protocol.clientapi.SubscriptionType;
import io.zeebe.protocol.clientapi.ValueType;
import io.zeebe.protocol.intent.DeploymentIntent;
import io.zeebe.protocol.intent.Intent;
import io.zeebe.test.broker.protocol.clientapi.ClientApiRule;
import io.zeebe.test.broker.protocol.clientapi.ControlMessageRequestBuilder;
import io.zeebe.test.broker.protocol.clientapi.SubscribedRecord;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;
import java.util.stream.Collectors;
import org.assertj.core.api.Assertions;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.RuleChain;

/* loaded from: input_file:io/zeebe/broker/event/SystemTopicSubscriptionTest.class */
public class SystemTopicSubscriptionTest {
    public static final int DEFAULT_PARTITION = 0;
    public EmbeddedBrokerRule brokerRule = new EmbeddedBrokerRule(new Consumer[0]);
    public ClientApiRule apiRule;

    @Rule
    public RuleChain ruleChain;

    public SystemTopicSubscriptionTest() {
        EmbeddedBrokerRule embeddedBrokerRule = this.brokerRule;
        embeddedBrokerRule.getClass();
        this.apiRule = new ClientApiRule(embeddedBrokerRule::getClientAddress);
        this.ruleChain = RuleChain.outerRule(this.brokerRule).around(this.apiRule);
    }

    @Before
    public void init() {
        this.brokerRule.getClock().pinCurrentTime();
    }

    @Test
    public void shouldOpenSubscription() {
        Assertions.assertThat(this.apiRule.openTopicSubscription(0, TypedStreamProcessorTest.STREAM_NAME, 0L).await().key()).isGreaterThanOrEqualTo(0L);
    }

    @Test
    public void shouldCloseSubscription() {
        long key = this.apiRule.openTopicSubscription(0, TypedStreamProcessorTest.STREAM_NAME, 0L).await().key();
        Assertions.assertThat(((ControlMessageRequestBuilder) this.apiRule.createControlMessageRequest().messageType(ControlMessageType.REMOVE_TOPIC_SUBSCRIPTION).partitionId(0).data().put("subscriberKey", Long.valueOf(key)).done()).sendAndAwait().getData()).containsOnly(new Map.Entry[]{Assertions.entry("subscriberKey", Long.valueOf(key))});
    }

    @Test
    public void shouldPushDeploymentEvents() {
        long deploy = this.apiRule.partition().deploy(Bpmn.createExecutableProcess("wf").startEvent().done());
        long key = this.apiRule.openTopicSubscription(0, TypedStreamProcessorTest.STREAM_NAME, 0L).await().key();
        List list = (List) this.apiRule.subscribedEvents().filter(subscribedRecord -> {
            return subscribedRecord.valueType() == ValueType.DEPLOYMENT;
        }).limit(4L).collect(Collectors.toList());
        Assertions.assertThat(list).hasSize(4);
        Assertions.assertThat(list).extracting((v0) -> {
            return v0.subscriberKey();
        }).containsOnly(new Long[]{Long.valueOf(key)});
        Assertions.assertThat(list).extracting((v0) -> {
            return v0.subscriptionType();
        }).containsOnly(new SubscriptionType[]{SubscriptionType.TOPIC_SUBSCRIPTION});
        Assertions.assertThat(list).extracting((v0) -> {
            return v0.key();
        }).containsOnly(new Long[]{Long.valueOf(ExecuteCommandResponseDecoder.keyNullValue()), Long.valueOf(deploy)});
        Assertions.assertThat(list).extracting((v0) -> {
            return v0.partitionId();
        }).containsOnly(new Integer[]{0});
        Assertions.assertThat(list).extracting((v0) -> {
            return v0.timestamp();
        }).containsOnly(new Long[]{Long.valueOf(this.brokerRule.getClock().getCurrentTimeInMillis())});
        Assertions.assertThat(list).extracting((v0) -> {
            return v0.valueType();
        }).containsOnly(new ValueType[]{ValueType.DEPLOYMENT});
        Assertions.assertThat(list).extracting((v0) -> {
            return v0.sourceRecordPosition();
        }).containsExactly(new Long[]{-1L, Long.valueOf(((SubscribedRecord) list.get(0)).position()), Long.valueOf(((SubscribedRecord) list.get(1)).position()), Long.valueOf(((SubscribedRecord) list.get(2)).position())});
        Assertions.assertThat(list).extracting((v0) -> {
            return v0.recordType();
        }).containsExactly(new RecordType[]{RecordType.COMMAND, RecordType.EVENT, RecordType.COMMAND, RecordType.EVENT});
        Assertions.assertThat(list).extracting((v0) -> {
            return v0.intent();
        }).containsExactly(new Intent[]{DeploymentIntent.CREATE, DeploymentIntent.CREATED, DeploymentIntent.DISTRIBUTE, DeploymentIntent.DISTRIBUTED});
    }
}
