package io.zeebe.broker.event;

import io.zeebe.broker.job.ActivateJobsTest;
import io.zeebe.broker.test.EmbeddedBrokerRule;
import io.zeebe.protocol.clientapi.ControlMessageType;
import io.zeebe.protocol.clientapi.RecordType;
import io.zeebe.protocol.clientapi.ValueType;
import io.zeebe.protocol.intent.JobIntent;
import io.zeebe.protocol.intent.SubscriptionIntent;
import io.zeebe.test.broker.protocol.clientapi.ClientApiRule;
import io.zeebe.test.broker.protocol.clientapi.ControlMessageRequestBuilder;
import io.zeebe.test.broker.protocol.clientapi.ExecuteCommandRequestBuilder;
import io.zeebe.test.broker.protocol.clientapi.ExecuteCommandResponse;
import io.zeebe.test.broker.protocol.clientapi.SubscribedRecord;
import java.util.List;
import java.util.Optional;
import java.util.function.Consumer;
import java.util.stream.Collectors;
import org.assertj.core.api.Assertions;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.RuleChain;

/* loaded from: input_file:io/zeebe/broker/event/TopicSubscriptionAcknowledgementTest.class */
/**
 * Integration tests for acknowledging positions on a topic subscription.
 *
 * <p>Every test runs against an embedded broker and starts with a subscription named
 * {@link #SUBSCRIPTION_NAME} that was opened at start position {@code 0} (see
 * {@link #openSubscription()}).
 */
public class TopicSubscriptionAcknowledgementTest {
    /** Name under which the subscription is opened and acknowledged. */
    protected static final String SUBSCRIPTION_NAME = "foo";

    // An empty configurator array is passed, i.e. no explicit configuration overrides.
    public EmbeddedBrokerRule brokerRule = new EmbeddedBrokerRule(new Consumer[0]);

    /** Client API rule that resolves the broker's client address through {@link #brokerRule}. */
    public ClientApiRule apiRule = new ClientApiRule(brokerRule::getClientAddress);

    /** Applies the broker rule as the outer rule and the client API rule inside it. */
    @Rule
    public RuleChain ruleChain = RuleChain.outerRule(brokerRule).around(apiRule);

    /** Key of the most recently opened subscription; needed to close it again. */
    protected long subscriberKey;

    /** Opens the subscription at start position {@code 0} before each test. */
    @Before
    public void openSubscription() {
        openSubscription(0L);
    }

    /**
     * Opens the subscription {@link #SUBSCRIPTION_NAME} and remembers its subscriber key.
     *
     * @param j start position passed to the open-subscription request
     */
    public void openSubscription(long j) {
        subscriberKey = apiRule.openTopicSubscription(SUBSCRIPTION_NAME, j).await().key();
    }

    /**
     * Sends a {@code REMOVE_TOPIC_SUBSCRIPTION} control message for the current
     * {@link #subscriberKey} on the default partition and waits for the response.
     */
    protected void closeSubscription() {
        ((ControlMessageRequestBuilder) apiRule.createControlMessageRequest()
                .messageType(ControlMessageType.REMOVE_TOPIC_SUBSCRIPTION)
                .partitionId(apiRule.getDefaultPartitionId())
                .data()
                .put("subscriberKey", Long.valueOf(subscriberKey))
                .done())
            .sendAndAwait();
    }

    @Test
    public void shouldAcknowledgePosition() {
        // when
        ExecuteCommandResponse response = ((ExecuteCommandRequestBuilder) apiRule.createCmdRequest()
                .type(ValueType.SUBSCRIPTION, SubscriptionIntent.ACKNOWLEDGE)
                .command()
                .put("name", SUBSCRIPTION_NAME)
                .put("ackPosition", 0)
                .done())
            .sendAndAwait();

        // then
        Assertions.assertThat(response.getValue()).containsEntry("name", SUBSCRIPTION_NAME);
        Assertions.assertThat(response.recordType()).isEqualTo(RecordType.EVENT);
        Assertions.assertThat(response.intent()).isEqualTo(SubscriptionIntent.ACKNOWLEDGED);
    }

    @Test
    public void shouldResumeAfterAcknowledgedPosition() {
        // given: two received records, the first of which is acknowledged
        createJob();
        List<SubscribedRecord> receivedRecords =
            apiRule.subscribedEvents().limit(2L).collect(Collectors.toList());

        ((ExecuteCommandRequestBuilder) apiRule.createCmdRequest()
                .type(ValueType.SUBSCRIPTION, SubscriptionIntent.ACKNOWLEDGE)
                .command()
                .put("name", SUBSCRIPTION_NAME)
                .put("ackPosition", Long.valueOf(receivedRecords.get(0).position()))
                .done())
            .sendAndAwait();

        // when: the subscription is closed and reopened
        closeSubscription();
        apiRule.moveMessageStreamToTail();
        openSubscription();

        // then: the first record pushed is the one following the acknowledged record
        Optional<SubscribedRecord> firstAfterReopen = apiRule.subscribedEvents().findFirst();
        Assertions.assertThat(firstAfterReopen).isPresent();
        Assertions.assertThat(firstAfterReopen.get().position())
            .isEqualTo(receivedRecords.get(1).position());
    }

    @Test
    public void shouldResumeAtTailOnLongMaxAckPosition() {
        // given: the largest possible position is acknowledged
        ((ExecuteCommandRequestBuilder) apiRule.createCmdRequest()
                .type(ValueType.SUBSCRIPTION, SubscriptionIntent.ACKNOWLEDGE)
                .command()
                .put("name", SUBSCRIPTION_NAME)
                .put("ackPosition", Long.MAX_VALUE)
                .done())
            .sendAndAwait();

        closeSubscription();
        apiRule.moveMessageStreamToTail();
        openSubscription();

        // when: a new job is created after the subscription was reopened
        ExecuteCommandResponse createResponse = ((ExecuteCommandRequestBuilder) apiRule.createCmdRequest()
                .type(ValueType.JOB, JobIntent.CREATE)
                .command()
                .put("type", ActivateJobsTest.JOB_TYPE)
                .done())
            .sendAndAwait();

        // then: the first record pushed is the source record of that create response
        Optional<SubscribedRecord> firstAfterReopen = apiRule.subscribedEvents().findFirst();
        Assertions.assertThat(firstAfterReopen).isPresent();
        Assertions.assertThat(firstAfterReopen.get().position())
            .isEqualTo(createResponse.sourceRecordPosition());
    }

    @Test
    public void shouldPersistStartPosition() {
        // given
        createJob();
        List<Long> jobPositions = firstTwoJobPositions();

        closeSubscription();
        apiRule.moveMessageStreamToTail();

        // when: reopening with a later start position, without having acknowledged anything
        openSubscription(jobPositions.get(1));

        // then: the same two job positions are received again, i.e. the new start
        // position did not take effect
        Assertions.assertThat(firstTwoJobPositions()).containsExactlyElementsOf(jobPositions);
    }

    /** Collects the positions of the first two subscribed records whose value type is JOB. */
    private List<Long> firstTwoJobPositions() {
        return apiRule.subscribedEvents()
            .filter(record -> record.valueType() == ValueType.JOB)
            .map(record -> record.position())
            .limit(2L)
            .collect(Collectors.toList());
    }

    /**
     * Sends a job CREATE command with type {@code "foo"} and one retry, and waits for the response.
     *
     * @return the broker's response to the create command
     */
    private ExecuteCommandResponse createJob() {
        return ((ExecuteCommandRequestBuilder) apiRule.createCmdRequest()
                .type(ValueType.JOB, JobIntent.CREATE)
                .command()
                .put("type", "foo")
                .put("retries", 1)
                .done())
            .sendAndAwait();
    }
}
