/*
 * Decompiled with CFR 0.152.
 */
package org.apache.pulsar.broker.admin;

import com.google.common.collect.Maps;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.ConsumerBuilder;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.ProducerConsumerBase;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.api.SubscriptionType;
import org.apache.pulsar.common.naming.TopicName;
import org.awaitility.Awaitility;
import org.testng.Assert;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;
import org.testng.collections.Lists;

@Test(groups={"broker"})
public class MaxUnackedMessagesTest
extends ProducerConsumerBase {
    // Tenant segment that appears in the namespace and topic names below.
    private final String testTenant = "my-property";
    // Namespace segment (without the tenant prefix).
    private final String testNamespace = "my-ns";
    // Fully qualified namespace in "tenant/namespace" form, as passed to admin.namespaces() calls.
    private final String myNamespace = "my-property/my-ns";
    // Common prefix for topic names; each test appends a unique suffix (UUID or timestamp).
    private final String testTopic = "persistent://my-property/my-ns/max-unacked-";

    /**
     * Per-test fixture: turns on system topics and topic-level policies in the broker
     * configuration, then runs the base class's {@code internalSetup} and
     * {@code producerBaseSetup}.
     *
     * @throws Exception if any of the base-class setup steps fails
     */
    @Override
    @BeforeMethod
    protected void setup() throws Exception {
        // Both flags are set before the base setup runs.
        conf.setSystemTopicEnabled(true);
        conf.setTopicLevelPoliciesEnabled(true);

        internalSetup();
        producerBaseSetup();
    }

    /**
     * Per-test teardown: runs the base class's {@code internalCleanup} and then restores the
     * configuration via {@code resetConfig}. Runs even when the test method failed
     * ({@code alwaysRun = true}).
     *
     * @throws Exception if the base-class cleanup fails
     */
    @Override
    @AfterMethod(alwaysRun=true)
    protected void cleanup() throws Exception {
        internalCleanup();
        resetConfig();
    }

    /**
     * Round-trips the topic-level "max unacked messages on subscription" policy through the
     * admin API on a 3-partition topic: initially unset, then set to 2048, then removed.
     *
     * @throws Exception on any admin or client failure
     */
    @Test(timeOut=10000L)
    public void testMaxUnackedMessagesOnSubscriptionApi() throws Exception {
        final String topic = testTopic + UUID.randomUUID();
        final int limit = 2048;

        admin.topics().createPartitionedTopic(topic, 3);
        waitCacheInit(topic);

        // Nothing has been configured at topic level yet.
        Integer initial = admin.topics().getMaxUnackedMessagesOnSubscription(topic);
        Assert.assertNull(initial);

        // Set: poll until a value is visible, then check the value itself.
        admin.topics().setMaxUnackedMessagesOnSubscription(topic, limit);
        Awaitility.await().untilAsserted(
                () -> Assert.assertNotNull(admin.topics().getMaxUnackedMessagesOnSubscription(topic)));
        Assert.assertEquals(admin.topics().getMaxUnackedMessagesOnSubscription(topic).intValue(), limit);

        // Remove: poll until the value is gone again.
        admin.topics().removeMaxUnackedMessagesOnSubscription(topic);
        Awaitility.await().untilAsserted(
                () -> Assert.assertNull(admin.topics().getMaxUnackedMessagesOnSubscription(topic)));
        Assert.assertNull(admin.topics().getMaxUnackedMessagesOnSubscription(topic));
    }

    /**
     * Verifies that a topic-level "max unacked messages on subscription" limit throttles delivery
     * on a shared subscription.
     *
     * <p>Three shared consumers subscribe, the limit is set to {@code unackMsgAllowed}, and
     * {@code totalProducedMsgs} messages are published. Without acknowledging, the consumers
     * together are expected to receive roughly {@code unackMsgAllowed} messages. After those are
     * acknowledged, the remaining messages must all be delivered.
     *
     * @throws Exception on any admin or client failure
     */
    @Test(timeOut=20000L)
    public void testMaxUnackedMessagesOnSubscription() throws Exception {
        final String topicName = testTopic + System.currentTimeMillis();
        final String subscriberName = "test-sub" + System.currentTimeMillis();
        final int unackMsgAllowed = 100;
        final int receiverQueueSize = 10;
        final int totalProducedMsgs = 200;
        final int consumerCount = 3;
        // Presumably one receiver queue of slack per consumer (3 * 10) -- confirm.
        final float tolerance = 30.0f;

        pulsar.getConfiguration().setMaxUnackedMessagesPerSubscription(unackMsgAllowed);
        ConsumerBuilder<byte[]> consumerBuilder = pulsarClient.newConsumer()
                .topic(topicName)
                .subscriptionName(subscriberName)
                .receiverQueueSize(receiverQueueSize)
                .subscriptionType(SubscriptionType.Shared);

        List<Consumer<byte[]>> consumers = new ArrayList<>(consumerCount);
        try {
            for (int i = 0; i < consumerCount; i++) {
                consumers.add(consumerBuilder.subscribe());
            }

            waitCacheInit(topicName);
            admin.topics().setMaxUnackedMessagesOnSubscription(topicName, unackMsgAllowed);
            Awaitility.await().untilAsserted(
                    () -> Assert.assertNotNull(admin.topics().getMaxUnackedMessagesOnSubscription(topicName)));

            try (Producer<byte[]> producer = pulsarClient.newProducer().topic(topicName).create()) {
                for (int i = 0; i < totalProducedMsgs; i++) {
                    String message = "my-message-" + i;
                    producer.send(message.getBytes());
                }

                // Phase 1: drain every consumer WITHOUT acking; remember who received what.
                HashMap<Message<byte[]>, Consumer<byte[]>> messages = new HashMap<>();
                for (Consumer<byte[]> consumer : consumers) {
                    for (int j = 0; j < totalProducedMsgs; j++) {
                        Message<byte[]> msg = consumer.receive(500, TimeUnit.MILLISECONDS);
                        if (msg == null) {
                            break;
                        }
                        messages.put(msg, consumer);
                    }
                }
                Assert.assertEquals((float) messages.size(), (float) unackMsgAllowed, tolerance);

                // Phase 2: ack everything received so far.
                messages.forEach((m, c) -> {
                    try {
                        c.acknowledge(m);
                    } catch (PulsarClientException e) {
                        Assert.fail("ack failed", e);
                    }
                });

                // Phase 3: the rest must now arrive. Element type is Object only because the
                // message-id type is not imported in this file.
                ConcurrentHashMap.KeySetView<Object, Boolean> result = ConcurrentHashMap.newKeySet();
                int expectedRemainingMessages = totalProducedMsgs - messages.size();
                CountDownLatch latch = new CountDownLatch(expectedRemainingMessages);
                for (Consumer<byte[]> consumer : consumers) {
                    for (int j = 0; j < totalProducedMsgs; j++) {
                        consumer.receiveAsync().thenAccept(m -> {
                            result.add(m.getMessageId());
                            try {
                                consumer.acknowledge(m);
                            } catch (PulsarClientException e) {
                                // Thrown inside the future's callback, so the test thread never
                                // sees it directly; the latch then never reaches zero and the
                                // assertTrue below reports the failure.
                                Assert.fail("failed to ack msg", e);
                            }
                            latch.countDown();
                        });
                    }
                }
                Assert.assertTrue(latch.await(10L, TimeUnit.SECONDS),
                        "timed out waiting for the remaining messages to be received and acked");
                Assert.assertEquals(result.size(), expectedRemainingMessages);
            }
        } finally {
            // Best-effort close so that a failed assertion does not leave consumers attached.
            for (Consumer<byte[]> consumer : consumers) {
                try {
                    consumer.close();
                } catch (PulsarClientException ignored) {
                    // Closing is best-effort; a failure here must not mask the test result.
                }
            }
        }
    }

    /**
     * Round-trips the topic-level "max unacked messages on consumer" policy through the admin
     * API on a 3-partition topic: initially unset, then set to 2048, then removed.
     *
     * @throws Exception on any admin or client failure
     */
    @Test(timeOut=20000L)
    public void testMaxUnackedMessagesOnConsumerApi() throws Exception {
        final String topic = testTopic + UUID.randomUUID();
        final int limit = 2048;

        admin.topics().createPartitionedTopic(topic, 3);
        waitCacheInit(topic);

        // Nothing has been configured at topic level yet.
        Integer initial = admin.topics().getMaxUnackedMessagesOnConsumer(topic);
        Assert.assertNull(initial);

        // Set: poll until a value is visible, then check the value itself.
        admin.topics().setMaxUnackedMessagesOnConsumer(topic, limit);
        Awaitility.await().untilAsserted(
                () -> Assert.assertNotNull(admin.topics().getMaxUnackedMessagesOnConsumer(topic)));
        Assert.assertEquals(admin.topics().getMaxUnackedMessagesOnConsumer(topic).intValue(), limit);

        // Remove: poll until the value is gone again.
        admin.topics().removeMaxUnackedMessagesOnConsumer(topic);
        Awaitility.await().untilAsserted(
                () -> Assert.assertNull(admin.topics().getMaxUnackedMessagesOnConsumer(topic)));
        Assert.assertNull(admin.topics().getMaxUnackedMessagesOnConsumer(topic));
    }

    /**
     * Checks the "applied" view of the per-consumer unacked-message limit on a 3-partition
     * topic: it equals the broker configuration value at first, 10 after a namespace-level
     * policy is set, and 20 after a topic-level policy is set.
     *
     * @throws Exception on any admin or client failure
     */
    @Test(timeOut=20000L)
    public void testMaxUnackedMessagesOnConsumerAppliedApi() throws Exception {
        final String topic = testTopic + UUID.randomUUID();
        admin.topics().createPartitionedTopic(topic, 3);
        waitCacheInit(topic);

        // With no policies set, the applied value is the broker configuration value.
        Integer applied = admin.topics().getMaxUnackedMessagesOnConsumer(topic, true);
        int brokerValue = pulsar.getConfiguration().getMaxUnackedMessagesPerConsumer();
        Assert.assertEquals(applied.intValue(), brokerValue);

        // Namespace-level policy.
        admin.namespaces().setMaxUnackedMessagesPerConsumer(myNamespace, 10);
        Awaitility.await().untilAsserted(
                () -> Assert.assertNotNull(admin.namespaces().getMaxUnackedMessagesPerConsumer(myNamespace)));
        applied = admin.topics().getMaxUnackedMessagesOnConsumer(topic, true);
        Assert.assertEquals(applied.intValue(), 10);

        // Topic-level policy.
        admin.topics().setMaxUnackedMessagesOnConsumer(topic, 20);
        Awaitility.await().untilAsserted(
                () -> Assert.assertNotNull(admin.topics().getMaxUnackedMessagesOnConsumer(topic)));
        applied = admin.topics().getMaxUnackedMessagesOnConsumer(topic, true);
        Assert.assertEquals(applied.intValue(), 20);
    }

    /**
     * Checks the "applied" view of the per-subscription unacked-message limit as namespace- and
     * topic-level policies are set and removed again.
     *
     * <p>Every policy change is followed by an Awaitility poll, because the other tests in this
     * class show that changes made through the admin API are not necessarily visible at once.
     *
     * @throws Exception on any admin or client failure
     */
    @Test
    public void testMaxUnackedMessagesOnSubApplied() throws Exception {
        final String topicName = testTopic + UUID.randomUUID();
        waitCacheInit(topicName);

        // Nothing configured: applied value is the broker configuration value.
        Assert.assertNull(admin.namespaces().getMaxUnackedMessagesPerSubscription(myNamespace));
        Assert.assertNull(admin.topics().getMaxUnackedMessagesOnSubscription(topicName));
        Assert.assertEquals(admin.topics().getMaxUnackedMessagesOnSubscription(topicName, true),
                Integer.valueOf(conf.getMaxUnackedMessagesPerSubscription()));

        admin.namespaces().setMaxUnackedMessagesPerSubscription(myNamespace, 10);
        Awaitility.await().untilAsserted(() -> Assert.assertEquals(
                admin.namespaces().getMaxUnackedMessagesPerSubscription(myNamespace), Integer.valueOf(10)));

        admin.topics().setMaxUnackedMessagesOnSubscription(topicName, 20);
        Awaitility.await().untilAsserted(() -> Assert.assertEquals(
                admin.topics().getMaxUnackedMessagesOnSubscription(topicName), Integer.valueOf(20)));

        // Wait until the topic-level removal is actually visible, and confirm the namespace
        // policy is untouched. Checking only the namespace value would pass immediately.
        admin.topics().removeMaxUnackedMessagesOnSubscription(topicName);
        Awaitility.await().untilAsserted(() -> {
            Assert.assertNull(admin.topics().getMaxUnackedMessagesOnSubscription(topicName));
            Assert.assertEquals(
                    admin.namespaces().getMaxUnackedMessagesPerSubscription(myNamespace), Integer.valueOf(10));
        });

        // Poll here as well: asserting straight after the removal races with its propagation.
        admin.namespaces().removeMaxUnackedMessagesPerSubscription(myNamespace);
        Awaitility.await().untilAsserted(() -> Assert.assertEquals(
                admin.topics().getMaxUnackedMessagesOnSubscription(topicName, true),
                Integer.valueOf(conf.getMaxUnackedMessagesPerSubscription())));
    }

    /**
     * Verifies that a topic-level "max unacked messages on consumer" limit is enforced for
     * consumers that subscribe after the policy is set.
     *
     * <p>A first consumer receives and acknowledges {@code totalProducedMsgs} messages with no
     * topic-level limit in place. The limit is then set to {@code unackMsgAllowed}, two more
     * consumers join the same shared subscription, another {@code totalProducedMsgs} messages
     * are published, and each new consumer (which never acks) is expected to receive exactly
     * {@code unackMsgAllowed} messages.
     *
     * @throws Exception on any admin or client failure
     */
    @Test(timeOut=30000L)
    public void testMaxUnackedMessagesOnConsumer() throws Exception {
        final String topicName = testTopic + System.currentTimeMillis();
        final String subscriberName = "test-sub" + System.currentTimeMillis();
        final int unackMsgAllowed = 100;
        final int receiverQueueSize = 10;
        final int totalProducedMsgs = 300;

        ConsumerBuilder<String> consumerBuilder = pulsarClient.newConsumer(Schema.STRING)
                .topic(topicName)
                .subscriptionName(subscriberName)
                .receiverQueueSize(receiverQueueSize)
                .ackTimeout(1L, TimeUnit.MINUTES)
                .subscriptionType(SubscriptionType.Shared);

        // Resources are closed in reverse order of declaration: producer first, then consumer1.
        try (Consumer<String> consumer1 = consumerBuilder.subscribe();
             Producer<String> producer = pulsarClient.newProducer(Schema.STRING).topic(topicName).create()) {
            for (int i = 0; i < totalProducedMsgs; i++) {
                producer.send("my-message-" + i);
            }

            // Phase 1: consumer1 receives everything, stopping early on the first timeout.
            List<Message<String>> received = new ArrayList<>(totalProducedMsgs);
            for (int i = 0; i < totalProducedMsgs; i++) {
                Message<String> message = consumer1.receive(1, TimeUnit.SECONDS);
                if (message == null) {
                    break;
                }
                received.add(message);
            }
            Assert.assertEquals(received.size(), totalProducedMsgs);

            // An ack failure now fails the test instead of being silently swallowed.
            for (Message<String> message : received) {
                consumer1.acknowledge(message);
            }

            // Phase 2: install the topic-level limit and wait until it is readable.
            waitCacheInit(topicName);
            admin.topics().setMaxUnackedMessagesOnConsumer(topicName, unackMsgAllowed);
            Awaitility.await().untilAsserted(
                    () -> Assert.assertNotNull(admin.topics().getMaxUnackedMessagesOnConsumer(topicName)));
            Assert.assertEquals(
                    admin.topics().getMaxUnackedMessagesOnConsumer(topicName).intValue(), unackMsgAllowed);

            // Phase 3: two new consumers that never ack.
            try (Consumer<String> consumer2 = consumerBuilder.subscribe();
                 Consumer<String> consumer3 = consumerBuilder.subscribe()) {
                for (int i = 0; i < totalProducedMsgs; i++) {
                    producer.send("my-message-" + i);
                }

                AtomicInteger consumer2Counter = new AtomicInteger(0);
                AtomicInteger consumer3Counter = new AtomicInteger(0);
                CountDownLatch countDownLatch = new CountDownLatch(2);
                startConsumer(consumer2, consumer2Counter, countDownLatch);
                startConsumer(consumer3, consumer3Counter, countDownLatch);

                // If the drain threads have not finished, the counters below are still moving.
                Assert.assertTrue(countDownLatch.await(10L, TimeUnit.SECONDS),
                        "timed out waiting for consumer2/consumer3 to stop receiving");
                Assert.assertEquals(consumer2Counter.get(), unackMsgAllowed);
                Assert.assertEquals(consumer3Counter.get(), unackMsgAllowed);
            }
        }
    }

    /**
     * Starts a background thread that keeps receiving from {@code consumer} without
     * acknowledging, incrementing {@code consumerCounter} for every message, until a receive
     * call times out (returns {@code null}) or fails.
     *
     * <p>{@code countDownLatch} is counted down exactly once when the thread stops, for whatever
     * reason, so a waiting caller is released promptly even if receiving fails.
     *
     * @param consumer        consumer to drain
     * @param consumerCounter incremented once per received message
     * @param countDownLatch  counted down once when the drain loop ends
     */
    private void startConsumer(Consumer<String> consumer, AtomicInteger consumerCounter, CountDownLatch countDownLatch) {
        new Thread(() -> {
            try {
                while (consumer.receive(500, TimeUnit.MILLISECONDS) != null) {
                    consumerCounter.incrementAndGet();
                }
            } catch (PulsarClientException ignored) {
                // Stop draining; the caller's assertions on the counter decide the test outcome.
            } finally {
                countDownLatch.countDown();
            }
        }).start();
    }

    /**
     * Subscribes a throwaway consumer named {@code "my-sub"} to {@code topicName} and closes it
     * immediately.
     *
     * <p>NOTE(review): judging by the method name, this is presumably meant to make the broker
     * load the topic and its policy cache before the test touches topic-level policies --
     * confirm against the broker implementation.
     *
     * @param topicName topic to touch
     * @throws Exception if subscribing or closing the consumer fails
     */
    private void waitCacheInit(String topicName) throws Exception {
        pulsarClient.newConsumer()
                .topic(topicName)
                .subscriptionName("my-sub")
                .subscribe()
                .close();
    }
}

