package org.apache.pulsar.broker.service;

import com.google.common.collect.Lists;
import java.util.ArrayList;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
import org.apache.pulsar.broker.service.persistent.PersistentDispatcherMultipleConsumers;
import org.apache.pulsar.broker.service.persistent.PersistentTopic;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.SubscriptionType;
import org.apache.pulsar.client.impl.ConsumerImpl;
import org.apache.pulsar.common.util.FutureUtil;
import org.awaitility.Awaitility;
import org.testng.Assert;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;

@Test(groups = {"broker"})
/* loaded from: input_file:org/apache/pulsar/broker/service/BatchMessageWithBatchIndexLevelTest.class */
public class BatchMessageWithBatchIndexLevelTest extends BatchMessageTest {
    @Override // org.apache.pulsar.broker.service.BatchMessageTest, org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest
    @BeforeClass
    protected void setup() throws Exception {
        this.conf.setAcknowledgmentAtBatchIndexLevelEnabled(true);
        super.baseSetup();
    }

    @Test
    public void testBatchMessageAck() {
        String str = "persistent://prop/ns-abc/batchMessageAck-" + UUID.randomUUID();
        ConsumerImpl subscribe = this.pulsarClient.newConsumer().topic(new String[]{str}).subscriptionName("sub-batch-1").receiverQueueSize(10).subscriptionType(SubscriptionType.Shared).enableBatchIndexAcknowledgment(true).negativeAckRedeliveryDelay(100L, TimeUnit.MILLISECONDS).subscribe();
        Producer create = this.pulsarClient.newProducer().topic(str).batchingMaxMessages(20).batchingMaxPublishDelay(1L, TimeUnit.HOURS).enableBatching(true).create();
        ArrayList newArrayList = Lists.newArrayList();
        for (int i = 0; i < 40; i++) {
            newArrayList.add(create.newMessage().value(("batch-message-" + i).getBytes()).sendAsync());
        }
        FutureUtil.waitForAll(newArrayList).get();
        PersistentDispatcherMultipleConsumers dispatcher = ((PersistentTopic) this.pulsar.getBrokerService().getTopicReference(str).get()).getSubscription("sub-batch-1").getDispatcher();
        Message receive = subscribe.receive();
        Message receive2 = subscribe.receive();
        subscribe.acknowledge(receive);
        subscribe.acknowledge(receive2);
        Awaitility.await().untilAsserted(() -> {
            Assert.assertEquals(((Consumer) dispatcher.getConsumers().get(0)).getUnackedMessages(), 18);
        });
        Message receive3 = subscribe.receive();
        Message receive4 = subscribe.receive();
        subscribe.acknowledge(receive3);
        subscribe.acknowledge(receive4);
        Awaitility.await().untilAsserted(() -> {
            Assert.assertEquals(((Consumer) dispatcher.getConsumers().get(0)).getUnackedMessages(), 16);
        });
        subscribe.negativeAcknowledge(subscribe.receive());
        Awaitility.await().pollInterval(1L, TimeUnit.MILLISECONDS).untilAsserted(() -> {
            Assert.assertEquals(((Consumer) dispatcher.getConsumers().get(0)).getUnackedMessages(), 0);
        });
        subscribe.receive();
        Awaitility.await().untilAsserted(() -> {
            Assert.assertEquals(((Consumer) dispatcher.getConsumers().get(0)).getUnackedMessages(), 16);
        });
    }
}
