package io.zeebe.broker.event;

import io.zeebe.broker.job.ActivateJobsTest;
import io.zeebe.broker.test.EmbeddedBrokerRule;
import io.zeebe.protocol.clientapi.ValueType;
import io.zeebe.protocol.intent.JobIntent;
import io.zeebe.protocol.intent.SubscriberIntent;
import io.zeebe.protocol.intent.SubscriptionIntent;
import io.zeebe.test.broker.protocol.clientapi.ClientApiRule;
import io.zeebe.test.broker.protocol.clientapi.ExecuteCommandRequestBuilder;
import io.zeebe.test.util.TestUtil;
import java.util.List;
import java.util.function.Consumer;
import java.util.stream.Collectors;
import org.assertj.core.api.Assertions;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.RuleChain;

/* loaded from: input_file:io/zeebe/broker/event/TopicSubscriptionThrottlingTest.class */
/**
 * Verifies that a topic subscription opened with a given {@code bufferSize} receives at most that
 * many unacknowledged events, and that acknowledging events lets further events be pushed.
 *
 * <p>Each test starts an embedded broker and talks to it through the client API rule.
 */
public class TopicSubscriptionThrottlingTest {
    /** Name under which the subscription is opened and acknowledged. */
    protected static final String SUBSCRIPTION_NAME = "foo";

    /** Number of jobs created up front; deliberately larger than {@link #BUFFER_SIZE}. */
    private static final int JOB_COUNT = 5;

    /** Buffer size requested when opening the subscription in the tests. */
    private static final int BUFFER_SIZE = 3;

    /** Grace period during which additional (unexpected) events would have time to arrive. */
    private static final long SETTLE_TIME_MILLIS = 1000L;

    // NOTE(review): the empty array looks like an empty varargs call ("no configurators");
    // confirm against EmbeddedBrokerRule before simplifying to a no-arg constructor call.
    public EmbeddedBrokerRule brokerRule = new EmbeddedBrokerRule(new Consumer[0]);

    // Field initializers run in declaration order, so brokerRule is already assigned here.
    public ClientApiRule apiRule = new ClientApiRule(brokerRule::getClientAddress);

    /** Applies the broker rule as the outer rule and the client API rule inside it. */
    @Rule
    public RuleChain ruleChain = RuleChain.outerRule(brokerRule).around(apiRule);

    /**
     * Sends a SUBSCRIBE command for {@link #SUBSCRIPTION_NAME} starting at position 0 and waits
     * for the response.
     *
     * @param i the value sent as the subscription's {@code bufferSize}
     */
    public void openSubscription(int i) {
        ((ExecuteCommandRequestBuilder) apiRule.createCmdRequest()
                .type(ValueType.SUBSCRIBER, SubscriberIntent.SUBSCRIBE)
                .command()
                .put("startPosition", 0)
                .put("name", SUBSCRIPTION_NAME)
                .put("bufferSize", Integer.valueOf(i))
                .done())
                .sendAndAwait();
    }

    /**
     * With more events available than the buffer size, exactly {@code BUFFER_SIZE} events must
     * have been received, even after waiting for stragglers.
     */
    @Test
    public void shouldNotPushMoreThanBufferSize() throws InterruptedException {
        createJobs(JOB_COUNT);
        openSubscription(BUFFER_SIZE);

        TestUtil.waitUntil(() -> apiRule.numSubscribedEventsAvailable() >= BUFFER_SIZE);

        // Negative check: give the broker time to (wrongly) push more before asserting.
        Thread.sleep(SETTLE_TIME_MILLIS);
        Assertions.assertThat(apiRule.numSubscribedEventsAvailable()).isEqualTo(BUFFER_SIZE);
    }

    /**
     * After acknowledging the second of the three initially received events, exactly two further
     * events must arrive, both positioned after the third initial event.
     */
    @Test
    public void shouldPushMoreAfterAck() throws InterruptedException {
        createJobs(JOB_COUNT);
        openSubscription(BUFFER_SIZE);

        TestUtil.waitUntil(() -> apiRule.numSubscribedEventsAvailable() == BUFFER_SIZE);
        List<Long> initialPositions = collectEventPositions(BUFFER_SIZE);

        // NOTE(review): presumably discards what was already received so that the counts below
        // only reflect newly pushed events - confirm against ClientApiRule.
        apiRule.moveMessageStreamToTail();

        ((ExecuteCommandRequestBuilder) apiRule.createCmdRequest()
                .type(ValueType.SUBSCRIPTION, SubscriptionIntent.ACKNOWLEDGE)
                .command()
                .put("name", SUBSCRIPTION_NAME)
                .put("ackPosition", initialPositions.get(1))
                .done())
                .sendAndAwait();

        // Give the broker time to push the follow-up events (and any unexpected extras).
        Thread.sleep(SETTLE_TIME_MILLIS);
        Assertions.assertThat(apiRule.numSubscribedEventsAvailable()).isEqualTo(2);

        List<Long> positionsAfterAck = collectEventPositions(2);
        Long lastInitialPosition = initialPositions.get(2);
        Assertions.assertThat(positionsAfterAck.get(0)).isGreaterThan(lastInitialPosition);
        Assertions.assertThat(positionsAfterAck.get(1)).isGreaterThan(lastInitialPosition);
    }

    /**
     * Sends {@code i} JOB CREATE commands of type {@link ActivateJobsTest#JOB_TYPE}, awaiting the
     * response to each before sending the next.
     *
     * @param i number of jobs to create; nothing is sent when it is zero or negative
     */
    protected void createJobs(int i) {
        for (int created = 0; created < i; created++) {
            ((ExecuteCommandRequestBuilder) apiRule.createCmdRequest()
                    .type(ValueType.JOB, JobIntent.CREATE)
                    .command()
                    .put("type", ActivateJobsTest.JOB_TYPE)
                    .done())
                    .sendAndAwait();
        }
    }

    /** Returns the positions of the first {@code count} subscribed events, in stream order. */
    private List<Long> collectEventPositions(int count) {
        return apiRule.subscribedEvents()
                .limit(count)
                .map(event -> Long.valueOf(event.position()))
                .collect(Collectors.toList());
    }
}
