/*
 * Decompiled with CFR 0.152.
 */
package org.apache.pulsar.client.api;

import java.util.Collections;
import java.util.UUID;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.TimeUnit;
import org.apache.pulsar.client.api.BatchReceivePolicy;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.Messages;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.ProducerBuilder;
import org.apache.pulsar.client.api.ProducerConsumerBase;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.Schema;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testng.Assert;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.DataProvider;
import org.testng.annotations.Test;

@Test(groups={"quarantine"})
public class ConsumerBatchReceiveTest
extends ProducerConsumerBase {
    private static final Logger log = LoggerFactory.getLogger(ConsumerBatchReceiveTest.class);

    /**
     * Prepares the shared test environment before the tests in this class run
     * (TestNG {@code @BeforeClass}, executed even when group filters would skip it
     * because of {@code alwaysRun=true}).
     *
     * <p>Delegates to the inherited {@code internalSetup()} and
     * {@code producerBaseSetup()}, in that order. Both are defined in a superclass
     * that is not part of this file, so what they provision is not documented here.
     *
     * @throws Exception if either inherited setup step fails
     */
    @Override
    @BeforeClass(alwaysRun=true)
    protected void setup() throws Exception {
        super.internalSetup();
        super.producerBaseSetup();
    }

    /**
     * Tears the shared test environment down after the tests in this class have run
     * (TestNG {@code @AfterClass} with {@code alwaysRun=true}).
     *
     * <p>Delegates to the inherited {@code internalCleanup()}, which is defined in a
     * superclass that is not part of this file.
     *
     * @throws Exception if the inherited cleanup step fails
     */
    @Override
    @AfterClass(alwaysRun=true)
    protected void cleanup() throws Exception {
        super.internalCleanup();
    }

    /**
     * Supplies the parameter matrix shared by the data-driven tests in this class.
     *
     * <p>Each row is
     * {@code {batchReceivePolicy, batchProduce, receiverQueueSize, isEnableAckReceipt}},
     * matching the parameter order of the test methods that use this provider.
     * The first 22 rows run with the last flag {@code false}; the remaining 22 rows
     * repeat the same combinations with the flag set to {@code true}.
     *
     * @return the rows handed to tests declaring {@code dataProvider="batchReceivePolicy"}
     */
    @DataProvider(name="batchReceivePolicy")
    public Object[][] batchReceivePolicyProvider() {
        final Object[][] rows = {
            // ---- isEnableAckReceipt = false ----
            // receiver queue size 1000, batching producer
            {BatchReceivePolicy.DEFAULT_POLICY, true, 1000, false},
            {BatchReceivePolicy.builder().timeout(50, TimeUnit.MILLISECONDS).build(), true, 1000, false},
            {BatchReceivePolicy.builder().maxNumMessages(10).build(), true, 1000, false},
            {BatchReceivePolicy.builder().maxNumMessages(13).timeout(50, TimeUnit.MILLISECONDS).build(), true, 1000, false},
            {BatchReceivePolicy.builder().maxNumBytes(64).timeout(50, TimeUnit.MILLISECONDS).build(), true, 1000, false},
            // receiver queue size 1000, non-batching producer
            {BatchReceivePolicy.DEFAULT_POLICY, false, 1000, false},
            {BatchReceivePolicy.builder().timeout(50, TimeUnit.MILLISECONDS).build(), false, 1000, false},
            {BatchReceivePolicy.builder().maxNumMessages(10).build(), false, 1000, false},
            {BatchReceivePolicy.builder().maxNumMessages(13).timeout(50, TimeUnit.MILLISECONDS).build(), false, 1000, false},
            {BatchReceivePolicy.builder().maxNumBytes(64).timeout(50, TimeUnit.MILLISECONDS).build(), false, 1000, false},
            // small receiver queues (including negative limits), batching producer
            {BatchReceivePolicy.builder().maxNumMessages(70).build(), true, 50, false},
            {BatchReceivePolicy.builder().maxNumMessages(50).timeout(10, TimeUnit.MILLISECONDS).build(), true, 30, false},
            {BatchReceivePolicy.builder().maxNumMessages(-10).timeout(10, TimeUnit.MILLISECONDS).build(), true, 10, false},
            {BatchReceivePolicy.builder().maxNumBytes(-100).timeout(50, TimeUnit.MILLISECONDS).build(), true, 30, false},
            {BatchReceivePolicy.builder().maxNumMessages(-10).maxNumBytes(-100).timeout(50, TimeUnit.MILLISECONDS).build(), true, 30, false},
            // small receiver queues (including zero and negative limits), non-batching producer
            {BatchReceivePolicy.builder().maxNumMessages(70).build(), false, 50, false},
            {BatchReceivePolicy.builder().maxNumMessages(50).timeout(50, TimeUnit.MILLISECONDS).build(), false, 30, false},
            {BatchReceivePolicy.builder().maxNumMessages(-10).timeout(50, TimeUnit.MILLISECONDS).build(), false, 30, false},
            {BatchReceivePolicy.builder().maxNumBytes(-100).timeout(50, TimeUnit.MILLISECONDS).build(), false, 30, false},
            {BatchReceivePolicy.builder().maxNumMessages(-10).maxNumBytes(-100).timeout(50, TimeUnit.MILLISECONDS).build(), false, 30, false},
            {BatchReceivePolicy.builder().maxNumMessages(0).maxNumBytes(0).timeout(50, TimeUnit.MILLISECONDS).build(), false, 30, false},
            {BatchReceivePolicy.builder().maxNumMessages(-1).maxNumBytes(-1).timeout(50, TimeUnit.MILLISECONDS).build(), false, 30, false},

            // ---- isEnableAckReceipt = true ----
            // receiver queue size 1000, batching producer
            {BatchReceivePolicy.DEFAULT_POLICY, true, 1000, true},
            {BatchReceivePolicy.builder().timeout(50, TimeUnit.MILLISECONDS).build(), true, 1000, true},
            {BatchReceivePolicy.builder().maxNumMessages(10).build(), true, 1000, true},
            {BatchReceivePolicy.builder().maxNumMessages(13).timeout(50, TimeUnit.MILLISECONDS).build(), true, 1000, true},
            {BatchReceivePolicy.builder().maxNumBytes(64).timeout(50, TimeUnit.MILLISECONDS).build(), true, 1000, true},
            // receiver queue size 1000, non-batching producer
            {BatchReceivePolicy.DEFAULT_POLICY, false, 1000, true},
            {BatchReceivePolicy.builder().timeout(50, TimeUnit.MILLISECONDS).build(), false, 1000, true},
            {BatchReceivePolicy.builder().maxNumMessages(10).build(), false, 1000, true},
            {BatchReceivePolicy.builder().maxNumMessages(13).timeout(50, TimeUnit.MILLISECONDS).build(), false, 1000, true},
            {BatchReceivePolicy.builder().maxNumBytes(64).timeout(50, TimeUnit.MILLISECONDS).build(), false, 1000, true},
            // small receiver queues (including negative limits), batching producer
            {BatchReceivePolicy.builder().maxNumMessages(70).build(), true, 50, true},
            {BatchReceivePolicy.builder().maxNumMessages(50).timeout(10, TimeUnit.MILLISECONDS).build(), true, 30, true},
            {BatchReceivePolicy.builder().maxNumMessages(-10).timeout(10, TimeUnit.MILLISECONDS).build(), true, 10, true},
            {BatchReceivePolicy.builder().maxNumBytes(-100).timeout(50, TimeUnit.MILLISECONDS).build(), true, 30, true},
            {BatchReceivePolicy.builder().maxNumMessages(-10).maxNumBytes(-100).timeout(50, TimeUnit.MILLISECONDS).build(), true, 30, true},
            // small receiver queues (including zero and negative limits), non-batching producer
            {BatchReceivePolicy.builder().maxNumMessages(70).build(), false, 50, true},
            {BatchReceivePolicy.builder().maxNumMessages(50).timeout(50, TimeUnit.MILLISECONDS).build(), false, 30, true},
            {BatchReceivePolicy.builder().maxNumMessages(-10).timeout(50, TimeUnit.MILLISECONDS).build(), false, 30, true},
            {BatchReceivePolicy.builder().maxNumBytes(-100).timeout(50, TimeUnit.MILLISECONDS).build(), false, 30, true},
            {BatchReceivePolicy.builder().maxNumMessages(-10).maxNumBytes(-100).timeout(50, TimeUnit.MILLISECONDS).build(), false, 30, true},
            {BatchReceivePolicy.builder().maxNumMessages(0).maxNumBytes(0).timeout(50, TimeUnit.MILLISECONDS).build(), false, 30, true},
            {BatchReceivePolicy.builder().maxNumMessages(-1).maxNumBytes(-1).timeout(50, TimeUnit.MILLISECONDS).build(), false, 30, true},
        };
        return rows;
    }

    /**
     * Runs the blocking batch-receive scenario on a uniquely named topic that this
     * method does not explicitly create as partitioned.
     *
     * @param batchReceivePolicy policy applied to the consumer under test
     * @param batchProduce       whether the producer keeps batching enabled
     * @param receiverQueueSize  receiver queue size configured on the consumer
     * @param isEnableAckReceipt forwarded to the consumer's ack-receipt and batch-index-ack settings
     */
    @Test(dataProvider="batchReceivePolicy")
    public void testBatchReceiveNonPartitionedTopic(BatchReceivePolicy batchReceivePolicy, boolean batchProduce, int receiverQueueSize, boolean isEnableAckReceipt) throws Exception {
        final String uniqueTopic =
                "persistent://my-property/my-ns/batch-receive-non-partition-" + UUID.randomUUID();
        testBatchReceive(uniqueTopic, batchReceivePolicy, batchProduce, receiverQueueSize, isEnableAckReceipt);
    }

    /**
     * Runs the blocking batch-receive scenario on a uniquely named topic that is first
     * created with three partitions through the admin client.
     *
     * @param batchReceivePolicy policy applied to the consumer under test
     * @param batchProduce       whether the producer keeps batching enabled
     * @param receiverQueueSize  receiver queue size configured on the consumer
     * @param isEnableAckReceipt forwarded to the consumer's ack-receipt and batch-index-ack settings
     */
    @Test(dataProvider="batchReceivePolicy")
    public void testBatchReceivePartitionedTopic(BatchReceivePolicy batchReceivePolicy, boolean batchProduce, int receiverQueueSize, boolean isEnableAckReceipt) throws Exception {
        final int partitionCount = 3;
        final String uniqueTopic = "persistent://my-property/my-ns/batch-receive-" + UUID.randomUUID();
        admin.topics().createPartitionedTopic(uniqueTopic, partitionCount);
        testBatchReceive(uniqueTopic, batchReceivePolicy, batchProduce, receiverQueueSize, isEnableAckReceipt);
    }

    /**
     * Runs the asynchronous batch-receive scenario on a uniquely named topic that this
     * method does not explicitly create as partitioned.
     *
     * @param batchReceivePolicy policy applied to the consumer under test
     * @param batchProduce       whether the producer keeps batching enabled
     * @param receiverQueueSize  receiver queue size configured on the consumer
     * @param isEnableAckReceipt forwarded to the consumer's ack-receipt and batch-index-ack settings
     */
    @Test(dataProvider="batchReceivePolicy")
    public void testAsyncBatchReceiveNonPartitionedTopic(BatchReceivePolicy batchReceivePolicy, boolean batchProduce, int receiverQueueSize, boolean isEnableAckReceipt) throws Exception {
        final String uniqueTopic =
                "persistent://my-property/my-ns/batch-receive-non-partition-async-" + UUID.randomUUID();
        testBatchReceiveAsync(uniqueTopic, batchReceivePolicy, batchProduce, receiverQueueSize, isEnableAckReceipt);
    }

    /**
     * Runs the asynchronous batch-receive scenario on a uniquely named topic that is
     * first created with three partitions through the admin client.
     *
     * @param batchReceivePolicy policy applied to the consumer under test
     * @param batchProduce       whether the producer keeps batching enabled
     * @param receiverQueueSize  receiver queue size configured on the consumer
     * @param isEnableAckReceipt forwarded to the consumer's ack-receipt and batch-index-ack settings
     */
    @Test(dataProvider="batchReceivePolicy")
    public void testAsyncBatchReceivePartitionedTopic(BatchReceivePolicy batchReceivePolicy, boolean batchProduce, int receiverQueueSize, boolean isEnableAckReceipt) throws Exception {
        final int partitionCount = 3;
        final String uniqueTopic = "persistent://my-property/my-ns/batch-receive-async-" + UUID.randomUUID();
        admin.topics().createPartitionedTopic(uniqueTopic, partitionCount);
        testBatchReceiveAsync(uniqueTopic, batchReceivePolicy, batchProduce, receiverQueueSize, isEnableAckReceipt);
    }

    /**
     * Runs the receive-then-redeliver scenario on a uniquely named topic that this
     * method does not explicitly create as partitioned.
     *
     * @param batchReceivePolicy policy applied to the consumer under test
     * @param batchProduce       whether the producer keeps batching enabled
     * @param receiverQueueSize  receiver queue size configured on the consumer
     * @param isEnableAckReceipt forwarded to the consumer's ack-receipt and batch-index-ack settings
     */
    @Test(dataProvider="batchReceivePolicy")
    public void testBatchReceiveAndRedeliveryNonPartitionedTopic(BatchReceivePolicy batchReceivePolicy, boolean batchProduce, int receiverQueueSize, boolean isEnableAckReceipt) throws Exception {
        final String uniqueTopic =
                "persistent://my-property/my-ns/batch-receive-and-redelivery-non-partition-" + UUID.randomUUID();
        testBatchReceiveAndRedelivery(uniqueTopic, batchReceivePolicy, batchProduce, receiverQueueSize, isEnableAckReceipt);
    }

    /**
     * Runs the receive-then-redeliver scenario on a uniquely named topic that is first
     * created with three partitions through the admin client.
     *
     * @param batchReceivePolicy policy applied to the consumer under test
     * @param batchProduce       whether the producer keeps batching enabled
     * @param receiverQueueSize  receiver queue size configured on the consumer
     * @param isEnableAckReceipt forwarded to the consumer's ack-receipt and batch-index-ack settings
     */
    @Test(dataProvider="batchReceivePolicy")
    public void testBatchReceiveAndRedeliveryPartitionedTopic(BatchReceivePolicy batchReceivePolicy, boolean batchProduce, int receiverQueueSize, boolean isEnableAckReceipt) throws Exception {
        final int partitionCount = 3;
        final String uniqueTopic =
                "persistent://my-property/my-ns/batch-receive-and-redelivery-" + UUID.randomUUID();
        admin.topics().createPartitionedTopic(uniqueTopic, partitionCount);
        testBatchReceiveAndRedelivery(uniqueTopic, batchReceivePolicy, batchProduce, receiverQueueSize, isEnableAckReceipt);
    }

    /**
     * Verifies that each blocking {@code batchReceive()} call returns exactly
     * {@code maxNumMessages} messages when the published backlog is an exact multiple
     * of that limit.
     *
     * <p>Publishes 500 messages with a policy of 100 messages per batch and then
     * expects 500 / 100 = 5 batches, each of size 100.
     *
     * @throws Exception if publishing, receiving, or closing a client resource fails
     */
    @Test
    public void verifyBatchSizeIsEqualToPolicyConfiguration() throws Exception {
        final int maxNumMessages = 100;
        final int messagesToSend = 500;
        final String topic = "persistent://my-property/my-ns/batch-receive-size" + UUID.randomUUID();
        final BatchReceivePolicy batchReceivePolicy =
                BatchReceivePolicy.builder().maxNumMessages(maxNumMessages).build();

        // try-with-resources closes the consumer first, then the producer, and keeps a
        // close() failure as a suppressed exception instead of letting it replace the
        // assertion failure that actually broke the test.
        try (Producer<String> producer = this.pulsarClient.newProducer(Schema.STRING)
                     .topic(topic)
                     .create();
             Consumer<String> consumer = this.pulsarClient.newConsumer(Schema.STRING)
                     .topic(topic)
                     .subscriptionName("s2")
                     .batchReceivePolicy(batchReceivePolicy)
                     .subscribe()) {
            this.sendMessagesAsyncAndWait(producer, messagesToSend);
            this.receiveAllBatchesAndVerifyBatchSizeIsEqualToMaxNumMessages(
                    consumer, batchReceivePolicy, messagesToSend / maxNumMessages);
        }
    }

    /**
     * Performs {@code numOfExpectedBatches} blocking batch receives and asserts that
     * every returned batch holds exactly {@code batchReceivePolicy.getMaxNumMessages()}
     * messages.
     *
     * @param consumer             consumer to read batches from
     * @param batchReceivePolicy   policy whose {@code maxNumMessages} is the expected batch size
     * @param numOfExpectedBatches number of batches to receive and check
     * @throws PulsarClientException if a batch receive fails
     */
    private void receiveAllBatchesAndVerifyBatchSizeIsEqualToMaxNumMessages(Consumer<String> consumer, BatchReceivePolicy batchReceivePolicy, int numOfExpectedBatches) throws PulsarClientException {
        int batchesLeft = numOfExpectedBatches;
        while (batchesLeft > 0) {
            final Messages<String> batch = consumer.batchReceive();
            final int batchSize = batch.size();
            log.info("Received {} messages in a single batch receive verifying batch size.", batchSize);
            Assert.assertEquals(batchSize, batchReceivePolicy.getMaxNumMessages());
            batchesLeft--;
        }
    }

    /**
     * Shared scenario for the blocking batch-receive tests: publishes 100 messages to
     * {@code topic} and drains them with blocking batch receives, acknowledging each batch.
     *
     * @param topic              topic to produce to and consume from
     * @param batchReceivePolicy policy applied to the consumer
     * @param batchProduce       when {@code false}, batching is explicitly disabled on the producer
     * @param receiverQueueSize  receiver queue size configured on the consumer
     * @param enableAckReceipt   used for both {@code isAckReceiptEnabled} and
     *                           {@code enableBatchIndexAcknowledgment} on the consumer
     * @throws Exception if publishing, receiving, or closing a client resource fails
     */
    private void testBatchReceive(String topic, BatchReceivePolicy batchReceivePolicy, boolean batchProduce, int receiverQueueSize, boolean enableAckReceipt) throws Exception {
        final int messageCount = 100;

        ProducerBuilder<String> producerBuilder = this.pulsarClient.newProducer(Schema.STRING).topic(topic);
        if (!batchProduce) {
            producerBuilder.enableBatching(false);
        }

        // try-with-resources closes the consumer first, then the producer, and keeps a
        // close() failure as a suppressed exception rather than masking the test failure.
        try (Producer<String> producer = producerBuilder.create();
             Consumer<String> consumer = this.pulsarClient.newConsumer(Schema.STRING)
                     .topic(topic)
                     .subscriptionName("s1")
                     .receiverQueueSize(receiverQueueSize)
                     .batchReceivePolicy(batchReceivePolicy)
                     .isAckReceiptEnabled(enableAckReceipt)
                     .enableBatchIndexAcknowledgment(enableAckReceipt)
                     .subscribe()) {
            this.sendMessagesAsyncAndWait(producer, messageCount);
            this.batchReceiveAndCheck(consumer, messageCount);
        }
    }

    /**
     * Shared scenario for the asynchronous batch-receive tests: publishes 100 messages
     * and consumes them through chained {@code batchReceiveAsync()} calls, using a
     * dedicated client built with 10 IO threads.
     *
     * <p>Policies whose timeout is not positive are skipped: the method returns
     * immediately and the calling test passes without exercising anything.
     * NOTE(review): presumably this is because a trailing partial batch would never
     * complete without a timeout - confirm against the client's batch-receive semantics.
     *
     * <p>The latch is sized to {@code messageCount + 1}: one count per received message
     * plus one final count that {@code receiveAsync} releases when everything expected
     * has arrived. The wait is bounded so a failed or asserting callback turns into a
     * test failure instead of an indefinite hang.
     *
     * @param topic              topic to produce to and consume from
     * @param batchReceivePolicy policy applied to the consumer
     * @param batchProduce       when {@code false}, batching is explicitly disabled on the producer
     * @param receiverQueueSize  receiver queue size configured on the consumer
     * @param isEnableAckReceipt used for both {@code isAckReceiptEnabled} and
     *                           {@code enableBatchIndexAcknowledgment} on the consumer
     * @throws Exception if publishing, receiving, or closing a client resource fails
     * @throws AssertionError if the expected messages are not all received in time
     */
    private void testBatchReceiveAsync(String topic, BatchReceivePolicy batchReceivePolicy, boolean batchProduce, int receiverQueueSize, boolean isEnableAckReceipt) throws Exception {
        if (batchReceivePolicy.getTimeoutMs() <= 0L) {
            return;
        }

        final int messageCount = 100;
        final long awaitTimeoutSeconds = 60L;

        // Resources are closed in reverse order (consumer, producer, client); a close()
        // failure is recorded as suppressed instead of replacing the real test failure.
        try (PulsarClient client = PulsarClient.builder()
                .ioThreads(10)
                .serviceUrl(this.lookupUrl.toString())
                .build()) {
            ProducerBuilder<String> producerBuilder = client.newProducer(Schema.STRING).topic(topic);
            if (!batchProduce) {
                producerBuilder.enableBatching(false);
            }

            try (Producer<String> producer = producerBuilder.create();
                 Consumer<String> consumer = client.newConsumer(Schema.STRING)
                         .topic(topic)
                         .subscriptionName("s1")
                         .receiverQueueSize(receiverQueueSize)
                         .batchReceivePolicy(batchReceivePolicy)
                         .isAckReceiptEnabled(isEnableAckReceipt)
                         .enableBatchIndexAcknowledgment(isEnableAckReceipt)
                         .subscribe()) {
                this.sendMessagesAsyncAndWait(producer, messageCount);

                final CountDownLatch latch = new CountDownLatch(messageCount + 1);
                this.receiveAsync(consumer, messageCount, latch);
                if (!latch.await(awaitTimeoutSeconds, TimeUnit.SECONDS)) {
                    throw new AssertionError("Timed out after " + awaitTimeoutSeconds
                            + "s waiting for async batch receive; " + latch.getCount()
                            + " of " + (messageCount + 1) + " latch counts still outstanding");
                }
            }
        }
    }

    /**
     * Shared scenario for the redelivery tests: publishes 100 messages to {@code topic}
     * and hands a consumer configured with a one second ack timeout to
     * {@code batchReceiveAndRedelivery}, which expects to see every message twice.
     *
     * @param topic              topic to produce to and consume from
     * @param batchReceivePolicy policy applied to the consumer
     * @param batchProduce       when {@code false}, batching is explicitly disabled on the producer
     * @param receiverQueueSize  receiver queue size configured on the consumer
     * @param isEnableAckReceipt used for both {@code isAckReceiptEnabled} and
     *                           {@code enableBatchIndexAcknowledgment} on the consumer
     * @throws Exception if publishing, receiving, or closing a client resource fails
     */
    private void testBatchReceiveAndRedelivery(String topic, BatchReceivePolicy batchReceivePolicy, boolean batchProduce, int receiverQueueSize, boolean isEnableAckReceipt) throws Exception {
        final int messageCount = 100;

        ProducerBuilder<String> producerBuilder = this.pulsarClient.newProducer(Schema.STRING).topic(topic);
        if (!batchProduce) {
            producerBuilder.enableBatching(false);
        }

        // try-with-resources closes the consumer first, then the producer, and keeps a
        // close() failure as a suppressed exception rather than masking the test failure.
        try (Producer<String> producer = producerBuilder.create();
             Consumer<String> consumer = this.pulsarClient.newConsumer(Schema.STRING)
                     .topic(topic)
                     .subscriptionName("s1")
                     .receiverQueueSize(receiverQueueSize)
                     .batchReceivePolicy(batchReceivePolicy)
                     .isAckReceiptEnabled(isEnableAckReceipt)
                     .enableBatchIndexAcknowledgment(isEnableAckReceipt)
                     .ackTimeout(1L, TimeUnit.SECONDS)
                     .subscribe()) {
            this.sendMessagesAsyncAndWait(producer, messageCount);
            this.batchReceiveAndRedelivery(consumer, messageCount);
        }
    }

    /**
     * Receives one batch asynchronously, counts the latch down once per message,
     * acknowledges the batch, and re-schedules itself on the common fork-join pool
     * until {@code expected} messages have been received in total.
     *
     * <p>When a batch brings the outstanding count to zero (or below), the method
     * asserts that exactly {@code expected} messages arrived and counts the latch down
     * one extra time to signal completion. A {@code null} batch is ignored and nothing
     * further is scheduled.
     *
     * <p>The future chain is not returned to the caller, so any failure of the receive
     * or any assertion error raised inside the callback would otherwise vanish; the
     * trailing {@code exceptionally} stage logs it so the cause is visible when the
     * caller's latch never completes.
     *
     * @param consumer consumer to read from
     * @param expected number of messages still expected
     * @param latch    latch counted down once per message and once more on completion
     */
    private void receiveAsync(Consumer<String> consumer, int expected, CountDownLatch latch) {
        consumer.batchReceiveAsync().thenAccept(messages -> {
            if (messages == null) {
                return;
            }
            final int received = messages.size();
            log.info("Received {} messages in a single batch receive.", received);
            for (Message<String> message : messages) {
                Assert.assertNotNull(message.getValue());
                log.info("Get message {} from batch", message.getValue());
                latch.countDown();
            }
            consumer.acknowledgeAsync(messages);
            if (received < expected) {
                // Hop to another thread so the next receive is not issued from within this callback.
                ForkJoinPool.commonPool().execute(() -> this.receiveAsync(consumer, expected - received, latch));
            } else {
                Assert.assertEquals(expected - received, 0);
                latch.countDown();
            }
        }).exceptionally(error -> {
            log.error("Async batch receive failed with {} messages still expected", expected, error);
            return null;
        });
    }

    /**
     * Publishes {@code messages} string payloads ({@code "my-message-0"},
     * {@code "my-message-1"}, ...) asynchronously and blocks until every send has been
     * confirmed with a non-null message id.
     *
     * <p>Failed sends are logged, and the wait is bounded so that a send which never
     * completes successfully fails the test instead of hanging it.
     *
     * @param producer producer used to publish
     * @param messages number of messages to publish
     * @throws Exception if the waiting thread is interrupted
     * @throws AssertionError if not all sends are confirmed within the timeout
     */
    private void sendMessagesAsyncAndWait(Producer<String> producer, int messages) throws Exception {
        final long awaitTimeoutSeconds = 60L;
        final CountDownLatch latch = new CountDownLatch(messages);
        for (int i = 0; i < messages; ++i) {
            final String message = "my-message-" + i;
            // The payload is passed as a String: Producer<String>.sendAsync does not accept Object.
            producer.sendAsync(message).thenAccept(messageId -> {
                log.info("Message {} published {}", message, messageId);
                if (messageId != null) {
                    latch.countDown();
                }
            }).exceptionally(error -> {
                log.error("Failed to publish message {}", message, error);
                return null;
            });
        }
        if (!latch.await(awaitTimeoutSeconds, TimeUnit.SECONDS)) {
            throw new AssertionError("Timed out after " + awaitTimeoutSeconds
                    + "s waiting for publish confirmations; " + latch.getCount()
                    + " of " + messages + " still outstanding");
        }
    }

    /**
     * Drains the subscription with blocking batch receives until at least
     * {@code expected} messages have been seen, acknowledging every batch, and then
     * asserts that exactly {@code expected} messages were received.
     *
     * @param consumer consumer to read from
     * @param expected number of messages that must be received in total
     * @throws Exception if a receive or an acknowledgement fails
     */
    private void batchReceiveAndCheck(Consumer<String> consumer, int expected) throws Exception {
        int messageReceived = 0;
        do {
            final Messages<String> messages = consumer.batchReceive();
            if (messages == null) {
                // Nothing delivered this round; re-check the loop condition and try again.
                continue;
            }
            messageReceived += messages.size();
            log.info("Received {} messages in a single batch receive.", messages.size());
            for (Message<String> message : messages) {
                Assert.assertNotNull(message.getValue());
                log.info("Get message {} from batch", message.getValue());
            }
            consumer.acknowledge(messages);
        } while (messageReceived < expected);
        // TestNG's assertEquals takes (actual, expected); this order keeps the failure message accurate.
        Assert.assertEquals(messageReceived, expected);
    }

    /**
     * Receives {@code expected} messages without acknowledging them, then keeps
     * receiving - this time acknowledging each batch - until {@code expected * 2}
     * messages have been seen in total.
     *
     * <p>The first pass deliberately skips the acknowledgement. The caller in this file
     * configures the consumer with a one second ack timeout, so the second pass
     * presumably observes the same messages again as redeliveries.
     *
     * @param consumer consumer to read from
     * @param expected number of distinct messages published to the topic
     * @throws Exception if a receive or an acknowledgement fails
     */
    private void batchReceiveAndRedelivery(Consumer<String> consumer, int expected) throws Exception {
        int messageReceived = 0;

        // Pass 1: receive everything once, leaving every message unacknowledged.
        do {
            final Messages<String> messages = consumer.batchReceive();
            if (messages == null) {
                continue;
            }
            messageReceived += messages.size();
            log.info("Received {} messages in a single batch receive.", messages.size());
            for (Message<String> message : messages) {
                Assert.assertNotNull(message.getValue());
                log.info("Get message {} from batch", message.getValue());
            }
        } while (messageReceived < expected);
        // TestNG's assertEquals takes (actual, expected).
        Assert.assertEquals(messageReceived, expected);

        // Pass 2: receive the redelivered messages and acknowledge them.
        do {
            final Messages<String> messages = consumer.batchReceive();
            if (messages == null) {
                // Never hand a null batch to acknowledge(); just try again.
                continue;
            }
            messageReceived += messages.size();
            log.info("Received {} messages in a single batch receive.", messages.size());
            for (Message<String> message : messages) {
                Assert.assertNotNull(message.getValue());
                log.info("Get message {} from batch", message.getValue());
            }
            consumer.acknowledge(messages);
        } while (messageReceived < expected * 2);
        Assert.assertEquals(messageReceived, expected * 2);
    }
}

