/*
 * Decompiled with CFR 0.152.
 */
package org.apache.pulsar.client.api;

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.ProducerConsumerBase;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.api.SubscriptionType;
import org.testng.Assert;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.DataProvider;
import org.testng.annotations.Test;

/**
 * Checks that acknowledging a whole list of message ids in one call leaves
 * nothing to redeliver.
 *
 * <p>Each scenario publishes a random number of messages, receives all of them,
 * acknowledges their ids with a single {@code acknowledge(List)} call, asks for
 * redelivery of unacknowledged messages and then asserts that no further message
 * arrives. The scenario is repeated for every combination of batching on/off and
 * partitioned/non-partitioned topic, once with ack receipts enabled and once
 * without.
 */
@Test(groups = {"broker-api"})
public class ConsumerAckListTest extends ProducerConsumerBase {

    /** Upper bound, in seconds, for all asynchronous sends of one scenario to settle. */
    private static final long SEND_TIMEOUT_SECONDS = 10L;

    @Override
    @BeforeClass
    protected void setup() throws Exception {
        super.internalSetup();
        super.producerBaseSetup();
    }

    @Override
    @AfterClass(alwaysRun = true)
    protected void cleanup() throws Exception {
        super.internalCleanup();
    }

    /**
     * Supplies both values of the flag that is passed to the consumer builder's
     * {@code isAckReceiptEnabled} and {@code enableBatchIndexAcknowledgment}.
     *
     * @return one row per flag value: {@code true}, then {@code false}
     */
    @DataProvider(name = "ackReceiptEnabled")
    public Object[][] ackReceiptEnabled() {
        return new Object[][]{{true}, {false}};
    }

    /**
     * Runs the list-acknowledgment scenario for all four batching/partitioning
     * combinations.
     *
     * @param ackReceiptEnabled value forwarded to every scenario
     * @throws Exception if any scenario fails
     */
    @Test(timeOut = 30000L, dataProvider = "ackReceiptEnabled")
    public void testBatchListAck(boolean ackReceiptEnabled) throws Exception {
        ackListMessage(true, true, ackReceiptEnabled);
        ackListMessage(true, false, ackReceiptEnabled);
        ackListMessage(false, false, ackReceiptEnabled);
        ackListMessage(false, true, ackReceiptEnabled);
    }

    /**
     * Publishes a random number of messages (50 to 99), receives them all,
     * acknowledges them as one list and asserts nothing is redelivered.
     *
     * @param isBatch           whether the producer batches messages
     * @param isPartitioned     whether the topic is created with 3 partitions first
     * @param ackReceiptEnabled passed to both {@code enableBatchIndexAcknowledgment}
     *                          and {@code isAckReceiptEnabled} on the consumer builder
     * @throws Exception if topic creation, publishing, receiving, acknowledging or
     *                   closing the clients fails
     */
    private void ackListMessage(boolean isBatch, boolean isPartitioned, boolean ackReceiptEnabled) throws Exception {
        // Unique names so repeated scenarios never share a topic or subscription.
        final String topic = "persistent://my-property/my-ns/batch-ack-" + UUID.randomUUID();
        final String subName = "testBatchAck-sub" + UUID.randomUUID();
        final int messageNum = ThreadLocalRandom.current().nextInt(50, 100);
        if (isPartitioned) {
            admin.topics().createPartitionedTopic(topic, 3);
        }

        // Resources are closed in reverse order: consumer first, then producer.
        try (Producer<String> producer = pulsarClient.newProducer(Schema.STRING)
                     .enableBatching(isBatch)
                     .batchingMaxPublishDelay(50L, TimeUnit.MILLISECONDS)
                     .topic(topic)
                     .create();
             Consumer<String> consumer = pulsarClient.newConsumer(Schema.STRING)
                     .subscriptionType(SubscriptionType.Shared)
                     .topic(topic)
                     .negativeAckRedeliveryDelay(1001L, TimeUnit.MILLISECONDS)
                     .subscriptionName(subName)
                     .enableBatchIndexAcknowledgment(ackReceiptEnabled)
                     .isAckReceiptEnabled(ackReceiptEnabled)
                     .subscribe()) {
            sendMessagesAsyncAndWait(producer, messageNum);

            // receive() has no timeout of its own; the TestNG timeOut on the
            // calling test bounds it.
            List<MessageId> messages = new ArrayList<>(messageNum);
            for (int i = 0; i < messageNum; i++) {
                messages.add(consumer.receive().getMessageId());
            }
            consumer.acknowledge(messages);

            // Pause before asking for redelivery; presumably this lets the
            // acknowledgment reach the broker first.
            Thread.sleep(1000L);
            consumer.redeliverUnacknowledgedMessages();

            Message<String> msg = consumer.receive(2, TimeUnit.SECONDS);
            Assert.assertNull(msg, "no message should be redelivered after the list acknowledgment");
        }
    }

    /**
     * Sends {@code messages} messages asynchronously and waits until every send
     * has completed.
     *
     * <p>The wait is bounded and propagates send failures, so a failed or stalled
     * send fails the test with its cause instead of blocking until the test
     * timeout.
     *
     * @param producer producer used to publish the messages
     * @param messages number of messages to send; payloads are {@code "my-message-" + index}
     * @throws Exception if a send fails, returns no message id, or the sends do
     *                   not all settle within {@link #SEND_TIMEOUT_SECONDS} seconds
     */
    private void sendMessagesAsyncAndWait(Producer<String> producer, int messages) throws Exception {
        List<CompletableFuture<MessageId>> pending = new ArrayList<>(messages);
        for (int i = 0; i < messages; i++) {
            pending.add(producer.sendAsync("my-message-" + i));
        }

        // Completes once every send has settled; throws ExecutionException if
        // any of them failed and TimeoutException if they do not settle in time.
        CompletableFuture.allOf(pending.toArray(new CompletableFuture[0]))
                .get(SEND_TIMEOUT_SECONDS, TimeUnit.SECONDS);

        // The previous implementation only counted a send when it produced a
        // non-null message id; keep that requirement explicit.
        for (CompletableFuture<MessageId> sent : pending) {
            Assert.assertNotNull(sent.join(), "send completed without a message id");
        }
    }
}

