/*
 * Decompiled with CFR 0.152.
 */
package org.apache.pulsar.broker.service;

import com.google.common.collect.Lists;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Optional;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import org.apache.pulsar.broker.service.BatchMessageTest;
import org.apache.pulsar.broker.service.Consumer;
import org.apache.pulsar.broker.service.Topic;
import org.apache.pulsar.broker.service.persistent.PersistentDispatcherMultipleConsumers;
import org.apache.pulsar.broker.service.persistent.PersistentSubscription;
import org.apache.pulsar.broker.service.persistent.PersistentTopic;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.api.SubscriptionType;
import org.apache.pulsar.client.impl.ConsumerImpl;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.util.FutureUtil;
import org.awaitility.Awaitility;
import org.testng.Assert;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.DataProvider;
import org.testng.annotations.Test;

@Test(groups={"broker"})
public class BatchMessageWithBatchIndexLevelTest
extends BatchMessageTest {
    @Override
    @BeforeClass
    protected void setup() throws Exception {
        this.conf.setAcknowledgmentAtBatchIndexLevelEnabled(true);
        super.baseSetup();
    }

    @Test
    public void testBatchMessageAck() {
        int numMsgs = 40;
        String topicName = "persistent://prop/ns-abc/batchMessageAck-" + UUID.randomUUID();
        String subscriptionName = "sub-batch-1";
        ConsumerImpl consumer = (ConsumerImpl)this.pulsarClient.newConsumer().topic(new String[]{topicName}).subscriptionName("sub-batch-1").receiverQueueSize(10).subscriptionType(SubscriptionType.Shared).enableBatchIndexAcknowledgment(true).negativeAckRedeliveryDelay(100L, TimeUnit.MILLISECONDS).subscribe();
        Producer producer = this.pulsarClient.newProducer().topic(topicName).batchingMaxMessages(20).batchingMaxPublishDelay(1L, TimeUnit.HOURS).enableBatching(true).create();
        ArrayList sendFutureList = Lists.newArrayList();
        for (int i = 0; i < numMsgs; ++i) {
            byte[] message = ("batch-message-" + i).getBytes();
            sendFutureList.add(producer.newMessage().value((Object)message).sendAsync());
        }
        FutureUtil.waitForAll((List)sendFutureList).get();
        PersistentTopic topic = (PersistentTopic)this.pulsar.getBrokerService().getTopicReference(topicName).get();
        PersistentDispatcherMultipleConsumers dispatcher = (PersistentDispatcherMultipleConsumers)topic.getSubscription("sub-batch-1").getDispatcher();
        Message receive1 = consumer.receive();
        Message receive2 = consumer.receive();
        consumer.acknowledge(receive1);
        consumer.acknowledge(receive2);
        Awaitility.await().untilAsserted(() -> Assert.assertEquals((int)((Consumer)dispatcher.getConsumers().get(0)).getUnackedMessages(), (int)18));
        Message receive3 = consumer.receive();
        Message receive4 = consumer.receive();
        consumer.acknowledge(receive3);
        consumer.acknowledge(receive4);
        Awaitility.await().untilAsserted(() -> Assert.assertEquals((int)((Consumer)dispatcher.getConsumers().get(0)).getUnackedMessages(), (int)16));
        Message receive5 = consumer.receive();
        consumer.negativeAcknowledge(receive5);
        Awaitility.await().pollInterval(1L, TimeUnit.MILLISECONDS).untilAsserted(() -> Assert.assertEquals((int)((Consumer)dispatcher.getConsumers().get(0)).getUnackedMessages(), (int)0));
        consumer.receive();
        Awaitility.await().untilAsserted(() -> Assert.assertEquals((int)((Consumer)dispatcher.getConsumers().get(0)).getUnackedMessages(), (int)16));
    }

    @DataProvider(name="testSubTypeAndEnableBatch")
    public Object[][] testSubTypeAndEnableBatch() {
        return new Object[][]{{SubscriptionType.Shared, Boolean.TRUE}, {SubscriptionType.Failover, Boolean.TRUE}, {SubscriptionType.Shared, Boolean.FALSE}, {SubscriptionType.Failover, Boolean.FALSE}};
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test(dataProvider="testSubTypeAndEnableBatch")
    private void testDecreaseUnAckMessageCountWithAckReceipt(SubscriptionType subType, boolean enableBatch) throws Exception {
        int messageCount = 50;
        String topicName = "persistent://prop/ns-abc/testDecreaseWithAckReceipt" + UUID.randomUUID();
        String subscriptionName = "sub-batch-1";
        ConsumerImpl consumer = (ConsumerImpl)this.pulsarClient.newConsumer(Schema.BYTES).topic(new String[]{topicName}).isAckReceiptEnabled(true).subscriptionName("sub-batch-1").subscriptionType(subType).enableBatchIndexAcknowledgment(true).subscribe();
        try {
            Producer producer = this.pulsarClient.newProducer().enableBatching(enableBatch).topic(topicName).batchingMaxMessages(10).create();
            try {
                int i;
                CountDownLatch countDownLatch = new CountDownLatch(50);
                for (i = 0; i < 50; ++i) {
                    producer.sendAsync((Object)(i + "").getBytes()).thenRun(countDownLatch::countDown);
                }
                countDownLatch.await();
                for (i = 0; i < 50; ++i) {
                    Message message = consumer.receive();
                    if (i >= 25) continue;
                    consumer.acknowledgeAsync(message.getMessageId()).get();
                }
                String topic = TopicName.get((String)topicName).toString();
                PersistentSubscription persistentSubscription = (PersistentSubscription)((Topic)((Optional)this.pulsar.getBrokerService().getTopic(topic, false).get()).get()).getSubscription("sub-batch-1");
                Awaitility.await().untilAsserted(() -> {
                    if (subType == SubscriptionType.Shared) {
                        Assert.assertEquals((int)((Consumer)persistentSubscription.getConsumers().get(0)).getUnackedMessages(), (int)25);
                    } else {
                        Assert.assertEquals((int)((Consumer)persistentSubscription.getConsumers().get(0)).getUnackedMessages(), (int)0);
                    }
                });
            }
            finally {
                if (Collections.singletonList(producer).get(0) != null) {
                    producer.close();
                }
            }
        }
        finally {
            if (Collections.singletonList(consumer).get(0) != null) {
                consumer.close();
            }
        }
    }
}

