/*
 * Decompiled with CFR 0.152.
 */
package org.apache.pulsar.broker.service;

import com.google.common.collect.Lists;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Random;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.pulsar.broker.service.BrokerTestBase;
import org.apache.pulsar.broker.service.Consumer;
import org.apache.pulsar.broker.service.Producer;
import org.apache.pulsar.broker.service.persistent.PersistentDispatcherMultipleConsumers;
import org.apache.pulsar.broker.service.persistent.PersistentTopic;
import org.apache.pulsar.client.api.BatcherBuilder;
import org.apache.pulsar.client.api.CompressionType;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.MessageRoutingMode;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.SubscriptionType;
import org.apache.pulsar.client.impl.ConsumerImpl;
import org.apache.pulsar.common.util.FutureUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testng.Assert;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.DataProvider;
import org.testng.annotations.Test;

@Test(groups={"broker"})
public class BatchMessageTest
extends BrokerTestBase {
    private static final Logger LOG = LoggerFactory.getLogger(BatchMessageTest.class);

    @Override
    @BeforeClass
    protected void setup() throws Exception {
        super.baseSetup();
    }

    @Override
    @AfterClass(alwaysRun=true)
    protected void cleanup() throws Exception {
        super.internalCleanup();
    }

    @DataProvider(name="codecAndContainerBuilder")
    public Object[][] codecAndContainerBuilderProvider() {
        return new Object[][]{{CompressionType.NONE, BatcherBuilder.DEFAULT}, {CompressionType.LZ4, BatcherBuilder.DEFAULT}, {CompressionType.ZLIB, BatcherBuilder.DEFAULT}, {CompressionType.NONE, BatcherBuilder.KEY_BASED}, {CompressionType.LZ4, BatcherBuilder.KEY_BASED}, {CompressionType.ZLIB, BatcherBuilder.KEY_BASED}};
    }

    @DataProvider(name="containerBuilder")
    public Object[][] containerBuilderProvider() {
        return new Object[][]{{BatcherBuilder.DEFAULT}, {BatcherBuilder.KEY_BASED}};
    }

    @Test(dataProvider="codecAndContainerBuilder")
    public void testSimpleBatchProducerWithFixedBatchSize(CompressionType compressionType, BatcherBuilder builder) throws Exception {
        int numMsgs = 50;
        int numMsgsInBatch = numMsgs / 2;
        String topicName = "persistent://prop/ns-abc/testSimpleBatchProducerWithFixedBatchSize-" + UUID.randomUUID();
        String subscriptionName = "sub-1" + compressionType.toString();
        org.apache.pulsar.client.api.Consumer consumer = this.pulsarClient.newConsumer().topic(new String[]{topicName}).subscriptionName(subscriptionName).subscribe();
        consumer.close();
        org.apache.pulsar.client.api.Producer producer = this.pulsarClient.newProducer().topic(topicName).compressionType(compressionType).batchingMaxPublishDelay(5L, TimeUnit.SECONDS).batchingMaxMessages(numMsgsInBatch).enableBatching(true).batcherBuilder(builder).create();
        ArrayList sendFutureList = Lists.newArrayList();
        for (int i = 0; i < numMsgs; ++i) {
            byte[] message = ("my-message-" + i).getBytes();
            sendFutureList.add(producer.sendAsync((Object)message));
        }
        FutureUtil.waitForAll((List)sendFutureList).get();
        PersistentTopic topic = (PersistentTopic)this.pulsar.getBrokerService().getTopicReference(topicName).get();
        this.rolloverPerIntervalStats();
        Assert.assertTrue((((Producer)topic.getProducers().values().iterator().next()).getStats().msgRateIn > 0.0 ? 1 : 0) != 0);
        Assert.assertEquals((long)topic.getSubscription(subscriptionName).getNumberOfEntriesInBacklog(false), (long)2L);
        consumer = this.pulsarClient.newConsumer().topic(new String[]{topicName}).subscriptionName(subscriptionName).subscribe();
        for (int i = 0; i < numMsgs; ++i) {
            Message msg = consumer.receive(5, TimeUnit.SECONDS);
            Assert.assertNotNull((Object)msg);
            String receivedMessage = new String(msg.getData());
            String expectedMessage = "my-message-" + i;
            Assert.assertEquals((String)receivedMessage, (String)expectedMessage, (String)("Received message " + receivedMessage + " did not match the expected message " + expectedMessage));
        }
        consumer.close();
        producer.close();
    }

    @Test(dataProvider="codecAndContainerBuilder")
    public void testSimpleBatchProducerWithFixedBatchBytes(CompressionType compressionType, BatcherBuilder builder) throws Exception {
        int numMsgs = 50;
        int numBytesInBatch = 600;
        String topicName = "persistent://prop/ns-abc/testSimpleBatchProducerWithFixedBatchSize-" + UUID.randomUUID();
        String subscriptionName = "sub-1" + compressionType.toString();
        org.apache.pulsar.client.api.Consumer consumer = this.pulsarClient.newConsumer().topic(new String[]{topicName}).subscriptionName(subscriptionName).subscribe();
        consumer.close();
        org.apache.pulsar.client.api.Producer producer = this.pulsarClient.newProducer().topic(topicName).compressionType(compressionType).batchingMaxPublishDelay(5L, TimeUnit.SECONDS).batchingMaxMessages(0).batchingMaxBytes(numBytesInBatch).enableBatching(true).batcherBuilder(builder).create();
        ArrayList sendFutureList = Lists.newArrayList();
        for (int i = 0; i < numMsgs; ++i) {
            byte[] message = ("my-message-" + i).getBytes();
            sendFutureList.add(producer.sendAsync((Object)message));
        }
        FutureUtil.waitForAll((List)sendFutureList).get();
        PersistentTopic topic = (PersistentTopic)this.pulsar.getBrokerService().getTopicReference(topicName).get();
        this.rolloverPerIntervalStats();
        Assert.assertTrue((((Producer)topic.getProducers().values().iterator().next()).getStats().msgRateIn > 0.0 ? 1 : 0) != 0);
        Assert.assertEquals((long)topic.getSubscription(subscriptionName).getNumberOfEntriesInBacklog(false), (long)2L);
        consumer = this.pulsarClient.newConsumer().topic(new String[]{topicName}).subscriptionName(subscriptionName).subscribe();
        for (int i = 0; i < numMsgs; ++i) {
            Message msg = consumer.receive(5, TimeUnit.SECONDS);
            Assert.assertNotNull((Object)msg);
            String receivedMessage = new String(msg.getData());
            String expectedMessage = "my-message-" + i;
            Assert.assertEquals((String)receivedMessage, (String)expectedMessage, (String)("Received message " + receivedMessage + " did not match the expected message " + expectedMessage));
        }
        consumer.close();
        producer.close();
    }

    @Test(dataProvider="codecAndContainerBuilder")
    public void testSimpleBatchProducerWithFixedBatchTime(CompressionType compressionType, BatcherBuilder builder) throws Exception {
        int numMsgs = 100;
        String topicName = "persistent://prop/ns-abc/testSimpleBatchProducerWithFixedBatchTime-" + UUID.randomUUID();
        String subscriptionName = "time-sub-1" + compressionType.toString();
        org.apache.pulsar.client.api.Consumer consumer = this.pulsarClient.newConsumer().topic(new String[]{topicName}).subscriptionName(subscriptionName).subscribe();
        consumer.close();
        org.apache.pulsar.client.api.Producer producer = this.pulsarClient.newProducer().topic(topicName).compressionType(compressionType).batchingMaxPublishDelay(10L, TimeUnit.MILLISECONDS).enableBatching(true).batcherBuilder(builder).create();
        Random random = new Random();
        ArrayList sendFutureList = Lists.newArrayList();
        for (int i = 0; i < numMsgs; ++i) {
            Thread.sleep(random.nextInt(4));
            byte[] message = ("msg-" + i).getBytes();
            sendFutureList.add(producer.sendAsync((Object)message));
        }
        FutureUtil.waitForAll((List)sendFutureList).get();
        PersistentTopic topic = (PersistentTopic)this.pulsar.getBrokerService().getTopicReference(topicName).get();
        this.rolloverPerIntervalStats();
        Assert.assertTrue((((Producer)topic.getProducers().values().iterator().next()).getStats().msgRateIn > 0.0 ? 1 : 0) != 0);
        LOG.info("Sent {} messages, backlog is {} messages", (Object)numMsgs, (Object)topic.getSubscription(subscriptionName).getNumberOfEntriesInBacklog(false));
        Assert.assertTrue((topic.getSubscription(subscriptionName).getNumberOfEntriesInBacklog(false) < (long)numMsgs ? 1 : 0) != 0);
        producer.close();
    }

    @Test(dataProvider="codecAndContainerBuilder")
    public void testSimpleBatchProducerWithFixedBatchSizeAndTime(CompressionType compressionType, BatcherBuilder builder) throws Exception {
        int numMsgs = 100;
        String topicName = "persistent://prop/ns-abc/testSimpleBatchProducerWithFixedBatchSizeAndTime-" + UUID.randomUUID();
        String subscriptionName = "time-size-sub-1" + compressionType.toString();
        org.apache.pulsar.client.api.Consumer consumer = this.pulsarClient.newConsumer().topic(new String[]{topicName}).subscriptionName(subscriptionName).subscribe();
        consumer.close();
        org.apache.pulsar.client.api.Producer producer = this.pulsarClient.newProducer().topic(topicName).batchingMaxPublishDelay(10L, TimeUnit.MILLISECONDS).batchingMaxMessages(5).batcherBuilder(builder).compressionType(compressionType).enableBatching(true).create();
        Random random = new Random();
        ArrayList sendFutureList = Lists.newArrayList();
        for (int i = 0; i < numMsgs; ++i) {
            Thread.sleep(random.nextInt(4));
            byte[] message = ("msg-" + i).getBytes();
            sendFutureList.add(producer.sendAsync((Object)message));
        }
        FutureUtil.waitForAll((List)sendFutureList).get();
        PersistentTopic topic = (PersistentTopic)this.pulsar.getBrokerService().getTopicReference(topicName).get();
        this.rolloverPerIntervalStats();
        Assert.assertTrue((((Producer)topic.getProducers().values().iterator().next()).getStats().msgRateIn > 0.0 ? 1 : 0) != 0);
        LOG.info("Sent {} messages, backlog is {} messages", (Object)numMsgs, (Object)topic.getSubscription(subscriptionName).getNumberOfEntriesInBacklog(false));
        Assert.assertTrue((topic.getSubscription(subscriptionName).getNumberOfEntriesInBacklog(false) < (long)numMsgs ? 1 : 0) != 0);
        producer.close();
    }

    @Test(dataProvider="codecAndContainerBuilder")
    public void testBatchProducerWithLargeMessage(CompressionType compressionType, BatcherBuilder builder) throws Exception {
        int numMsgs = 50;
        int numMsgsInBatch = numMsgs / 2;
        String topicName = "persistent://prop/ns-abc/testBatchProducerWithLargeMessage-" + UUID.randomUUID();
        String subscriptionName = "large-message-sub-1" + compressionType.toString();
        org.apache.pulsar.client.api.Consumer consumer = this.pulsarClient.newConsumer().topic(new String[]{topicName}).subscriptionName(subscriptionName).subscribe();
        consumer.close();
        org.apache.pulsar.client.api.Producer producer = this.pulsarClient.newProducer().topic(topicName).compressionType(compressionType).batchingMaxPublishDelay(5L, TimeUnit.SECONDS).batchingMaxMessages(numMsgsInBatch).enableBatching(true).batcherBuilder(builder).create();
        ArrayList sendFutureList = Lists.newArrayList();
        for (int i = 0; i < numMsgs; ++i) {
            if (i == 25) {
                byte[] largeMessage = new byte[131076];
                sendFutureList.add(producer.sendAsync((Object)largeMessage));
                continue;
            }
            byte[] message = ("msg-" + i).getBytes();
            sendFutureList.add(producer.sendAsync((Object)message));
        }
        byte[] lastMsg = "msg-last".getBytes();
        sendFutureList.add(producer.sendAsync((Object)lastMsg));
        FutureUtil.waitForAll((List)sendFutureList).get();
        PersistentTopic topic = (PersistentTopic)this.pulsar.getBrokerService().getTopicReference(topicName).get();
        this.rolloverPerIntervalStats();
        Assert.assertTrue((((Producer)topic.getProducers().values().iterator().next()).getStats().msgRateIn > 0.0 ? 1 : 0) != 0);
        Assert.assertEquals((long)topic.getSubscription(subscriptionName).getNumberOfEntriesInBacklog(false), (long)3L);
        consumer = this.pulsarClient.newConsumer().topic(new String[]{topicName}).subscriptionName(subscriptionName).acknowledgmentGroupTime(0L, TimeUnit.SECONDS).subscribe();
        for (int i = 0; i <= numMsgs; ++i) {
            Message msg = consumer.receive(5, TimeUnit.SECONDS);
            Assert.assertNotNull((Object)msg);
            LOG.info("received msg - {}", (Object)Arrays.toString(msg.getData()));
            consumer.acknowledge(msg);
        }
        Thread.sleep(100L);
        Assert.assertEquals((long)topic.getSubscription(subscriptionName).getNumberOfEntriesInBacklog(false), (long)0L);
        consumer.close();
        producer.close();
    }

    @Test(dataProvider="codecAndContainerBuilder")
    public void testSimpleBatchProducerConsumer(CompressionType compressionType, BatcherBuilder builder) throws Exception {
        int numMsgs = 500;
        int numMsgsInBatch = numMsgs / 20;
        String topicName = "persistent://prop/ns-abc/testSimpleBatchProducerConsumer-" + UUID.randomUUID();
        String subscriptionName = "pc-sub-1" + compressionType.toString();
        org.apache.pulsar.client.api.Consumer consumer = this.pulsarClient.newConsumer().topic(new String[]{topicName}).subscriptionName(subscriptionName).subscribe();
        consumer.close();
        org.apache.pulsar.client.api.Producer producer = this.pulsarClient.newProducer().topic(topicName).compressionType(compressionType).messageRoutingMode(MessageRoutingMode.SinglePartition).batchingMaxPublishDelay(60L, TimeUnit.HOURS).batchingMaxMessages(2 * numMsgs).enableBatching(true).batcherBuilder(builder).create();
        ArrayList sendFutureList = Lists.newArrayList();
        for (int i = 0; i < numMsgs; ++i) {
            byte[] message = ("msg-" + i).getBytes();
            sendFutureList.add(producer.sendAsync((Object)message));
            if ((i + 1) % numMsgsInBatch != 0) continue;
            producer.flush();
            LOG.info("Flush {} messages", (Object)(i + 1));
        }
        FutureUtil.waitForAll((List)sendFutureList).get();
        PersistentTopic topic = (PersistentTopic)this.pulsar.getBrokerService().getTopicReference(topicName).get();
        this.rolloverPerIntervalStats();
        Assert.assertTrue((((Producer)topic.getProducers().values().iterator().next()).getStats().msgRateIn > 0.0 ? 1 : 0) != 0);
        Assert.assertEquals((long)topic.getSubscription(subscriptionName).getNumberOfEntriesInBacklog(false), (long)(numMsgs / numMsgsInBatch));
        consumer = this.pulsarClient.newConsumer().topic(new String[]{topicName}).subscriptionName(subscriptionName).subscribe();
        Message lastunackedMsg = null;
        for (int i = 0; i < numMsgs; ++i) {
            Message msg = consumer.receive(5, TimeUnit.SECONDS);
            Assert.assertNotNull((Object)msg);
            if (i % 2 == 0) {
                consumer.acknowledgeCumulative(msg);
                continue;
            }
            lastunackedMsg = msg;
        }
        if (lastunackedMsg != null) {
            consumer.acknowledgeCumulative(lastunackedMsg);
        }
        Thread.sleep(100L);
        Assert.assertEquals((long)topic.getSubscription(subscriptionName).getNumberOfEntriesInBacklog(false), (long)0L);
        consumer.close();
        producer.close();
    }

    @Test(dataProvider="containerBuilder")
    public void testSimpleBatchSyncProducerWithFixedBatchSize(BatcherBuilder builder) throws Exception {
        int numMsgs = 10;
        int numMsgsInBatch = numMsgs / 2;
        String topicName = "persistent://prop/ns-abc/testSimpleBatchSyncProducerWithFixedBatchSize-" + UUID.randomUUID();
        String subscriptionName = "syncsub-1";
        org.apache.pulsar.client.api.Consumer consumer = this.pulsarClient.newConsumer().topic(new String[]{topicName}).subscriptionName("syncsub-1").subscribe();
        consumer.close();
        org.apache.pulsar.client.api.Producer producer = this.pulsarClient.newProducer().topic(topicName).batchingMaxPublishDelay(1L, TimeUnit.SECONDS).batchingMaxMessages(numMsgsInBatch).enableBatching(true).batcherBuilder(builder).create();
        for (int i = 0; i < numMsgs; ++i) {
            byte[] message = ("my-message-" + i).getBytes();
            producer.send((Object)message);
        }
        PersistentTopic topic = (PersistentTopic)this.pulsar.getBrokerService().getTopicReference(topicName).get();
        this.rolloverPerIntervalStats();
        Assert.assertTrue((((Producer)topic.getProducers().values().iterator().next()).getStats().msgRateIn > 0.0 ? 1 : 0) != 0);
        Assert.assertEquals((long)topic.getSubscription("syncsub-1").getNumberOfEntriesInBacklog(false), (long)10L);
        consumer = this.pulsarClient.newConsumer().topic(new String[]{topicName}).subscriptionName("syncsub-1").subscribe();
        for (int i = 0; i < numMsgs; ++i) {
            Message msg = consumer.receive(5, TimeUnit.SECONDS);
            Assert.assertNotNull((Object)msg);
            String receivedMessage = new String(msg.getData());
            String expectedMessage = "my-message-" + i;
            Assert.assertEquals((String)receivedMessage, (String)expectedMessage, (String)("Received message " + receivedMessage + " did not match the expected message " + expectedMessage));
        }
        consumer.close();
        producer.close();
    }

    @Test(dataProvider="containerBuilder")
    public void testSimpleBatchProducerConsumer1kMessages(BatcherBuilder builder) throws Exception {
        int numMsgs = 2000;
        int numMsgsInBatch = 4;
        String topicName = "persistent://prop/ns-abc/testSimpleBatchProducerConsumer1kMessages-" + UUID.randomUUID();
        String subscriptionName = "pc1k-sub-1";
        org.apache.pulsar.client.api.Consumer consumer = this.pulsarClient.newConsumer().topic(new String[]{topicName}).subscriptionName("pc1k-sub-1").subscribe();
        consumer.close();
        org.apache.pulsar.client.api.Producer producer = this.pulsarClient.newProducer().topic(topicName).maxPendingMessages(numMsgs + 1).batchingMaxPublishDelay(30L, TimeUnit.SECONDS).batchingMaxMessages(numMsgsInBatch).enableBatching(true).batcherBuilder(builder).create();
        ArrayList sendFutureList = Lists.newArrayList();
        for (int i = 0; i < numMsgs; ++i) {
            Object message = ("msg-" + i).getBytes();
            sendFutureList.add(producer.sendAsync(message));
        }
        FutureUtil.waitForAll((List)sendFutureList).get();
        int sendError = 0;
        for (CompletableFuture sendFuture : sendFutureList) {
            if (!sendFuture.isCompletedExceptionally()) continue;
            ++sendError;
        }
        if (sendError != 0) {
            LOG.warn("[{}] Error sending {} messages", (Object)"pc1k-sub-1", (Object)sendError);
            numMsgs -= sendError;
        }
        LOG.info("[{}] sent {} messages", (Object)"pc1k-sub-1", (Object)numMsgs);
        PersistentTopic topic = (PersistentTopic)this.pulsar.getBrokerService().getTopicReference(topicName).get();
        LOG.info("[{}] checking backlog stats..", (Object)topic);
        this.rolloverPerIntervalStats();
        Assert.assertEquals((long)topic.getSubscription("pc1k-sub-1").getNumberOfEntriesInBacklog(false), (long)(numMsgs / numMsgsInBatch));
        consumer = this.pulsarClient.newConsumer().topic(new String[]{topicName}).subscriptionName("pc1k-sub-1").subscribe();
        Message lastunackedMsg = null;
        for (int i = 0; i < numMsgs; ++i) {
            Message msg = consumer.receive(1, TimeUnit.SECONDS);
            Assert.assertNotNull((Object)msg);
            lastunackedMsg = msg;
        }
        if (lastunackedMsg != null) {
            consumer.acknowledgeCumulative(lastunackedMsg);
        }
        consumer.close();
        producer.close();
        Assert.assertEquals((long)topic.getSubscription("pc1k-sub-1").getNumberOfEntriesInBacklog(false), (long)0L);
    }

    @Test(groups={"broker"})
    public void testOutOfOrderAcksForBatchMessage() throws Exception {
        int numMsgs = 40;
        int numMsgsInBatch = numMsgs / 4;
        String topicName = "persistent://prop/ns-abc/testOutOfOrderAcksForBatchMessage";
        String subscriptionName = "oooack-sub-1";
        org.apache.pulsar.client.api.Consumer consumer = this.pulsarClient.newConsumer().topic(new String[]{"persistent://prop/ns-abc/testOutOfOrderAcksForBatchMessage"}).subscriptionName("oooack-sub-1").subscribe();
        consumer.close();
        org.apache.pulsar.client.api.Producer producer = this.pulsarClient.newProducer().topic("persistent://prop/ns-abc/testOutOfOrderAcksForBatchMessage").batchingMaxPublishDelay(5L, TimeUnit.SECONDS).batchingMaxMessages(numMsgsInBatch).enableBatching(true).create();
        ArrayList sendFutureList = Lists.newArrayList();
        for (int i = 0; i < numMsgs; ++i) {
            byte[] message = ("msg-" + i).getBytes();
            sendFutureList.add(producer.sendAsync((Object)message));
        }
        FutureUtil.waitForAll((List)sendFutureList).get();
        PersistentTopic topic = (PersistentTopic)this.pulsar.getBrokerService().getTopicReference("persistent://prop/ns-abc/testOutOfOrderAcksForBatchMessage").get();
        this.rolloverPerIntervalStats();
        Assert.assertEquals((long)topic.getSubscription("oooack-sub-1").getNumberOfEntriesInBacklog(false), (long)(numMsgs / numMsgsInBatch));
        consumer = this.pulsarClient.newConsumer().topic(new String[]{"persistent://prop/ns-abc/testOutOfOrderAcksForBatchMessage"}).subscriptionName("oooack-sub-1").subscribe();
        HashSet<Integer> individualAcks = new HashSet<Integer>();
        for (int i = 15; i < 20; ++i) {
            individualAcks.add(i);
        }
        Message lastunackedMsg = null;
        for (int i = 0; i < numMsgs; ++i) {
            Message msg = consumer.receive(5, TimeUnit.SECONDS);
            LOG.info("received message {}", (Object)new String(msg.getData(), StandardCharsets.UTF_8));
            Assert.assertNotNull((Object)msg);
            if (i == 8) {
                consumer.acknowledgeCumulative(msg);
                continue;
            }
            if (i == 9) continue;
            if (i == 14) {
                consumer.acknowledgeCumulative(msg);
                Thread.sleep(1000L);
                this.rolloverPerIntervalStats();
                Thread.sleep(1000L);
                Assert.assertEquals((long)topic.getSubscription("oooack-sub-1").getNumberOfEntriesInBacklog(false), (long)3L);
                continue;
            }
            if (individualAcks.contains(i)) {
                consumer.acknowledge(msg);
                continue;
            }
            lastunackedMsg = msg;
        }
        Thread.sleep(1000L);
        this.rolloverPerIntervalStats();
        Assert.assertEquals((long)topic.getSubscription("oooack-sub-1").getNumberOfEntriesInBacklog(false), (long)2L);
        if (lastunackedMsg != null) {
            consumer.acknowledgeCumulative(lastunackedMsg);
        }
        Thread.sleep(100L);
        Assert.assertEquals((long)topic.getSubscription("oooack-sub-1").getNumberOfEntriesInBacklog(false), (long)0L);
        consumer.close();
        producer.close();
    }

    @Test(dataProvider="containerBuilder")
    public void testNonBatchCumulativeAckAfterBatchPublish(BatcherBuilder builder) throws Exception {
        int numMsgs;
        int numMsgsInBatch = numMsgs = 10;
        String topicName = "persistent://prop/ns-abc/testNonBatchCumulativeAckAfterBatchPublish-" + UUID.randomUUID();
        String subscriptionName = "nbcaabp-sub-1";
        org.apache.pulsar.client.api.Consumer consumer = this.pulsarClient.newConsumer().topic(new String[]{topicName}).subscriptionName("nbcaabp-sub-1").subscribe();
        consumer.close();
        org.apache.pulsar.client.api.Producer producer = this.pulsarClient.newProducer().topic(topicName).batchingMaxPublishDelay(5L, TimeUnit.SECONDS).batchingMaxMessages(numMsgsInBatch).enableBatching(true).batcherBuilder(builder).create();
        org.apache.pulsar.client.api.Producer noBatchProducer = this.pulsarClient.newProducer().topic(topicName).create();
        ArrayList sendFutureList = Lists.newArrayList();
        for (int i = 0; i < numMsgs; ++i) {
            byte[] message = ("msg-" + i).getBytes();
            sendFutureList.add(producer.sendAsync((Object)message));
        }
        FutureUtil.waitForAll((List)sendFutureList).get();
        sendFutureList.clear();
        byte[] nobatchmsg = "nobatch".getBytes();
        noBatchProducer.sendAsync((Object)nobatchmsg).get();
        PersistentTopic topic = (PersistentTopic)this.pulsar.getBrokerService().getTopicReference(topicName).get();
        this.rolloverPerIntervalStats();
        Assert.assertTrue((((Producer)topic.getProducers().values().iterator().next()).getStats().msgRateIn > 0.0 ? 1 : 0) != 0);
        Assert.assertEquals((long)topic.getSubscription("nbcaabp-sub-1").getNumberOfEntriesInBacklog(false), (long)2L);
        consumer = this.pulsarClient.newConsumer().topic(new String[]{topicName}).subscriptionName("nbcaabp-sub-1").subscribe();
        Message lastunackedMsg = null;
        for (int i = 0; i <= numMsgs; ++i) {
            Message msg = consumer.receive(5, TimeUnit.SECONDS);
            Assert.assertNotNull((Object)msg);
            lastunackedMsg = msg;
        }
        consumer.acknowledgeCumulative(lastunackedMsg);
        Thread.sleep(100L);
        this.rolloverPerIntervalStats();
        Assert.assertEquals((long)topic.getSubscription("nbcaabp-sub-1").getNumberOfEntriesInBacklog(false), (long)0L);
        consumer.close();
        producer.close();
        noBatchProducer.close();
    }

    @Test(dataProvider="containerBuilder")
    public void testBatchAndNonBatchCumulativeAcks(BatcherBuilder builder) throws Exception {
        int numMsgs = 50;
        int numMsgsInBatch = numMsgs / 10;
        String topicName = "persistent://prop/ns-abc/testBatchAndNonBatchCumulativeAcks-" + UUID.randomUUID();
        String subscriptionName = "bnb-sub-1";
        org.apache.pulsar.client.api.Consumer consumer = this.pulsarClient.newConsumer().topic(new String[]{topicName}).subscriptionName("bnb-sub-1").subscribe();
        consumer.close();
        org.apache.pulsar.client.api.Producer producer = this.pulsarClient.newProducer().topic(topicName).batchingMaxPublishDelay(5L, TimeUnit.SECONDS).batchingMaxMessages(numMsgsInBatch).enableBatching(true).batcherBuilder(builder).messageRoutingMode(MessageRoutingMode.SinglePartition).create();
        org.apache.pulsar.client.api.Producer noBatchProducer = this.pulsarClient.newProducer().topic(topicName).enableBatching(false).messageRoutingMode(MessageRoutingMode.SinglePartition).create();
        ArrayList sendFutureList = Lists.newArrayList();
        for (int i = 0; i < numMsgs / 2; ++i) {
            byte[] message = ("msg-" + i).getBytes();
            sendFutureList.add(producer.sendAsync((Object)message));
            byte[] nobatchmsg = ("nobatch-" + i).getBytes();
            sendFutureList.add(noBatchProducer.sendAsync((Object)nobatchmsg));
        }
        FutureUtil.waitForAll((List)sendFutureList).get();
        PersistentTopic topic = (PersistentTopic)this.pulsar.getBrokerService().getTopicReference(topicName).get();
        this.rolloverPerIntervalStats();
        Assert.assertTrue((((Producer)topic.getProducers().values().iterator().next()).getStats().msgRateIn > 0.0 ? 1 : 0) != 0);
        Assert.assertEquals((long)topic.getSubscription("bnb-sub-1").getNumberOfEntriesInBacklog(false), (long)(numMsgs / 2 / numMsgsInBatch + numMsgs / 2));
        consumer = this.pulsarClient.newConsumer().topic(new String[]{topicName}).subscriptionName("bnb-sub-1").acknowledgmentGroupTime(0L, TimeUnit.SECONDS).subscribe();
        Message lastunackedMsg = null;
        for (int i = 0; i < numMsgs; ++i) {
            Message msg = consumer.receive(5, TimeUnit.SECONDS);
            Assert.assertNotNull((Object)msg);
            LOG.info("[{}] got message position{} data {}", new Object[]{"bnb-sub-1", msg.getMessageId(), Arrays.toString(msg.getData())});
            if (i % 2 == 0) {
                lastunackedMsg = msg;
                continue;
            }
            consumer.acknowledgeCumulative(msg);
            LOG.info("[{}] did cumulative ack on position{} ", (Object)"bnb-sub-1", (Object)msg.getMessageId());
        }
        consumer.acknowledgeCumulative(lastunackedMsg);
        BatchMessageTest.retryStrategically(t -> topic.getSubscription("bnb-sub-1").getNumberOfEntriesInBacklog(false) == 0L, 100, 100L);
        consumer.close();
        producer.close();
        noBatchProducer.close();
    }

    @Test(dataProvider="containerBuilder")
    public void testConcurrentBatchMessageAck(BatcherBuilder builder) throws Exception {
        int numMsgs = 10;
        String topicName = "persistent://prop/ns-abc/testConcurrentAck-" + UUID.randomUUID();
        String subscriptionName = "sub-1";
        org.apache.pulsar.client.api.Consumer consumer = this.pulsarClient.newConsumer().topic(new String[]{topicName}).subscriptionName("sub-1").subscriptionType(SubscriptionType.Shared).subscribe();
        consumer.close();
        org.apache.pulsar.client.api.Producer producer = this.pulsarClient.newProducer().topic(topicName).batchingMaxPublishDelay(5L, TimeUnit.SECONDS).batchingMaxMessages(numMsgs).enableBatching(true).batcherBuilder(builder).create();
        ArrayList sendFutureList = Lists.newArrayList();
        for (int i = 0; i < numMsgs; ++i) {
            byte[] message = ("my-message-" + i).getBytes();
            sendFutureList.add(producer.sendAsync((Object)message));
        }
        FutureUtil.waitForAll((List)sendFutureList).get();
        PersistentTopic topic = (PersistentTopic)this.pulsar.getBrokerService().getTopicReference(topicName).get();
        org.apache.pulsar.client.api.Consumer myConsumer = this.pulsarClient.newConsumer().topic(new String[]{topicName}).subscriptionName("sub-1").subscriptionType(SubscriptionType.Shared).subscribe();
        ExecutorService executor = Executors.newFixedThreadPool(10);
        CountDownLatch latch = new CountDownLatch(numMsgs);
        AtomicBoolean failed = new AtomicBoolean(false);
        for (int i = 0; i < numMsgs; ++i) {
            executor.submit(() -> {
                try {
                    Message msg = myConsumer.receive(1, TimeUnit.SECONDS);
                    myConsumer.acknowledge(msg);
                }
                catch (Exception e) {
                    failed.set(false);
                }
                latch.countDown();
            });
        }
        latch.await();
        PersistentDispatcherMultipleConsumers dispatcher = (PersistentDispatcherMultipleConsumers)topic.getSubscription("sub-1").getDispatcher();
        BatchMessageTest.retryStrategically(test -> ((Consumer)dispatcher.getConsumers().get(0)).getUnackedMessages() == 0, 50, 150L);
        Assert.assertEquals((int)((Consumer)dispatcher.getConsumers().get(0)).getUnackedMessages(), (int)0);
        executor.shutdownNow();
        myConsumer.close();
        producer.close();
    }

    @Test
    public void testOrderingOfKeyBasedBatchMessageContainer() throws PulsarClientException, ExecutionException, InterruptedException {
        int i;
        String topicName = "persistent://prop/ns-abc/testKeyBased";
        String subscriptionName = "sub-1";
        org.apache.pulsar.client.api.Producer producer = this.pulsarClient.newProducer().topic("persistent://prop/ns-abc/testKeyBased").batchingMaxPublishDelay(5L, TimeUnit.SECONDS).batchingMaxMessages(30).enableBatching(true).batcherBuilder(BatcherBuilder.KEY_BASED).create();
        org.apache.pulsar.client.api.Consumer consumer = this.pulsarClient.newConsumer().topic(new String[]{"persistent://prop/ns-abc/testKeyBased"}).subscriptionName("sub-1").subscriptionType(SubscriptionType.Key_Shared).subscribe();
        ArrayList sendFutureList = Lists.newArrayList();
        String[] keys = new String[]{"key-1", "key-2", "key-3"};
        for (int i2 = 0; i2 < 10; ++i2) {
            byte[] message = ("my-message-" + i2).getBytes();
            for (String key : keys) {
                sendFutureList.add(producer.newMessage().key(key).value((Object)message).sendAsync());
            }
        }
        FutureUtil.waitForAll((List)sendFutureList).get();
        String receivedKey = "";
        int receivedMessageIndex = 0;
        for (i = 0; i < 30; ++i) {
            Message received = consumer.receive();
            if (!received.getKey().equals(receivedKey)) {
                receivedKey = received.getKey();
                receivedMessageIndex = 0;
            }
            Assert.assertEquals((String)new String((byte[])received.getValue()), (String)("my-message-" + receivedMessageIndex % 10));
            consumer.acknowledge(received);
            ++receivedMessageIndex;
        }
        for (i = 0; i < 10; ++i) {
            byte[] message = ("my-message-" + i).getBytes();
            for (String key : keys) {
                sendFutureList.add(producer.newMessage().key(UUID.randomUUID().toString()).orderingKey(key.getBytes()).value((Object)message).sendAsync());
            }
        }
        FutureUtil.waitForAll((List)sendFutureList).get();
        receivedKey = "";
        receivedMessageIndex = 0;
        for (i = 0; i < 30; ++i) {
            Message received = consumer.receive();
            if (!new String(received.getOrderingKey()).equals(receivedKey)) {
                receivedKey = new String(received.getOrderingKey());
                receivedMessageIndex = 0;
            }
            Assert.assertEquals((String)new String((byte[])received.getValue()), (String)("my-message-" + receivedMessageIndex % 10));
            consumer.acknowledge(received);
            ++receivedMessageIndex;
        }
        consumer.close();
        producer.close();
    }

    @Test(dataProvider="containerBuilder")
    public void testRetrieveSequenceIdGenerated(BatcherBuilder builder) throws Exception {
        int i;
        int numMsgs = 10;
        String topicName = "persistent://prop/ns-abc/testRetrieveSequenceIdGenerated-" + UUID.randomUUID();
        String subscriptionName = "sub-1";
        org.apache.pulsar.client.api.Consumer consumer = this.pulsarClient.newConsumer().topic(new String[]{topicName}).subscriptionName("sub-1").subscriptionType(SubscriptionType.Shared).subscribe();
        org.apache.pulsar.client.api.Producer producer = this.pulsarClient.newProducer().topic(topicName).batchingMaxPublishDelay(5L, TimeUnit.SECONDS).batchingMaxMessages(numMsgs).enableBatching(true).batcherBuilder(builder).create();
        ArrayList sendFutureList = Lists.newArrayList();
        for (i = 0; i < numMsgs; ++i) {
            byte[] message = ("my-message-" + i).getBytes();
            sendFutureList.add(producer.sendAsync((Object)message));
        }
        FutureUtil.waitForAll((List)sendFutureList).get();
        for (i = 0; i < numMsgs; ++i) {
            Message received = consumer.receive();
            Assert.assertEquals((long)received.getSequenceId(), (long)i);
            consumer.acknowledge(received);
        }
        producer.close();
        consumer.close();
    }

    @Test(dataProvider="containerBuilder")
    public void testRetrieveSequenceIdSpecify(BatcherBuilder builder) throws Exception {
        int i;
        int numMsgs = 10;
        String topicName = "persistent://prop/ns-abc/testRetrieveSequenceIdSpecify-" + UUID.randomUUID();
        String subscriptionName = "sub-1";
        org.apache.pulsar.client.api.Consumer consumer = this.pulsarClient.newConsumer().topic(new String[]{topicName}).subscriptionName("sub-1").subscriptionType(SubscriptionType.Shared).subscribe();
        org.apache.pulsar.client.api.Producer producer = this.pulsarClient.newProducer().topic(topicName).batchingMaxPublishDelay(5L, TimeUnit.SECONDS).batchingMaxMessages(numMsgs).enableBatching(true).batcherBuilder(builder).create();
        ArrayList sendFutureList = Lists.newArrayList();
        for (i = 0; i < numMsgs; ++i) {
            byte[] message = ("my-message-" + i).getBytes();
            sendFutureList.add(producer.newMessage().sequenceId((long)(i + 100)).value((Object)message).sendAsync());
        }
        FutureUtil.waitForAll((List)sendFutureList).get();
        for (i = 0; i < numMsgs; ++i) {
            Message received = consumer.receive();
            Assert.assertEquals((long)received.getSequenceId(), (long)(i + 100));
            consumer.acknowledge(received);
        }
        producer.close();
        consumer.close();
    }

    @Test(dataProvider="codecAndContainerBuilder")
    public void testSendOverSizeMessage(CompressionType compressionType, BatcherBuilder builder) throws Exception {
        int numMsgs = 10;
        String topicName = "persistent://prop/ns-abc/testSendOverSizeMessage-" + UUID.randomUUID();
        org.apache.pulsar.client.api.Producer producer = this.pulsarClient.newProducer().topic(topicName).batchingMaxPublishDelay(1L, TimeUnit.MILLISECONDS).batchingMaxMessages(2).enableBatching(true).compressionType(compressionType).batcherBuilder(builder).create();
        try {
            producer.send((Object)new byte[0xA00000]);
        }
        catch (PulsarClientException e) {
            Assert.assertTrue((boolean)(e instanceof PulsarClientException.InvalidMessageException));
        }
        for (int i = 0; i < 10; ++i) {
            producer.send((Object)new byte[1024]);
        }
        producer.close();
    }

    @Test
    public void testBatchMessageDispatchingAccordingToPermits() throws Exception {
        int numMsgs = 1000;
        int batchMessages = 10;
        String topicName = "persistent://prop/ns-abc/testRetrieveSequenceIdSpecify-" + UUID.randomUUID();
        String subscriptionName = "sub-1";
        ConsumerImpl consumer1 = (ConsumerImpl)this.pulsarClient.newConsumer().topic(new String[]{topicName}).subscriptionName("sub-1").receiverQueueSize(10).subscriptionType(SubscriptionType.Shared).subscribe();
        ConsumerImpl consumer2 = (ConsumerImpl)this.pulsarClient.newConsumer().topic(new String[]{topicName}).subscriptionName("sub-1").receiverQueueSize(10).subscriptionType(SubscriptionType.Shared).subscribe();
        org.apache.pulsar.client.api.Producer producer = this.pulsarClient.newProducer().topic(topicName).batchingMaxMessages(batchMessages).batchingMaxPublishDelay(500L, TimeUnit.MILLISECONDS).enableBatching(true).create();
        ArrayList sendFutureList = Lists.newArrayList();
        for (int i = 0; i < numMsgs; ++i) {
            byte[] message = ("my-message-" + i).getBytes();
            sendFutureList.add(producer.newMessage().value((Object)message).sendAsync());
        }
        FutureUtil.waitForAll((List)sendFutureList).get();
        Assert.assertEquals((float)consumer1.numMessagesInQueue(), (float)batchMessages, (float)batchMessages);
        Assert.assertEquals((float)consumer2.numMessagesInQueue(), (float)batchMessages, (float)batchMessages);
        producer.close();
        consumer1.close();
    }
}

