/*
 * Decompiled with CFR 0.152.
 */
package org.apache.pulsar.broker.service;

import com.google.common.collect.Lists;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashSet;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.commons.collections.CollectionUtils;
import org.apache.pulsar.broker.service.BrokerTestBase;
import org.apache.pulsar.broker.service.persistent.PersistentSubscription;
import org.apache.pulsar.broker.service.persistent.PersistentTopic;
import org.apache.pulsar.client.admin.PulsarAdminException;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.ConsumerBuilder;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.MessageListener;
import org.apache.pulsar.client.api.MessageRoutingMode;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.SubscriptionType;
import org.apache.pulsar.client.impl.ConsumerImpl;
import org.apache.pulsar.common.api.proto.CommandSubscribe;
import org.apache.pulsar.common.policies.data.ConsumerStats;
import org.apache.pulsar.common.policies.data.SubscriptionStats;
import org.apache.pulsar.common.policies.data.TopicStats;
import org.apache.pulsar.common.util.FutureUtil;
import org.awaitility.Awaitility;
import org.eclipse.jetty.util.BlockingArrayQueue;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testng.Assert;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;

@Test(groups={"broker"})
public class PersistentQueueE2ETest
extends BrokerTestBase {
    private static final Logger log = LoggerFactory.getLogger(PersistentQueueE2ETest.class);

    /**
     * Runs once before the tests of this class and delegates to the base class's
     * {@code baseSetup()}.
     *
     * @throws Exception if the base setup fails
     */
    @BeforeClass
    @Override
    public void setup() throws Exception {
        super.baseSetup();
    }

    /**
     * Runs once after the tests of this class (even when earlier steps failed, per
     * {@code alwaysRun}) and delegates to the base class's {@code internalCleanup()}.
     *
     * @throws Exception if the base cleanup fails
     */
    @AfterClass(alwaysRun = true)
    @Override
    public void cleanup() throws Exception {
        super.internalCleanup();
    }

    /**
     * Best-effort deletion of a topic through the admin client.
     *
     * <p>A failed deletion is deliberately not propagated, so that cleanup at the end of a test
     * cannot turn a passing test into a failing one. The failure is logged instead of being
     * silently discarded.
     *
     * @param topicName fully qualified name of the topic to delete
     */
    private void deleteTopic(String topicName) {
        try {
            this.admin.topics().delete(topicName);
        } catch (PulsarAdminException e) {
            log.warn("Failed to delete topic {}", topicName, e);
        }
    }

    /**
     * Exercises the lifecycle of a Shared subscription served by two consumers: dispatcher
     * state, backlog accounting while messages are individually acknowledged, rejection of
     * cumulative acknowledgements, and removal of the subscription after an unsubscribe.
     *
     * @throws Exception if a client, admin or broker call fails unexpectedly
     */
    @Test
    public void testSimpleConsumerEvents() throws Exception {
        final String topicName = "persistent://prop/use/ns-abc/shared-topic1";
        final String subName = "sub1";
        final int numMsgs = 100;
        final int totalMsgs = numMsgs * 2;

        // 1. Two consumers on the same Shared subscription; the second one is created
        //    from a separate PulsarClient instance.
        Consumer<byte[]> consumer1 = this.pulsarClient.newConsumer().topic(topicName).subscriptionName(subName)
                .subscriptionType(SubscriptionType.Shared).subscribe();
        PulsarClient newPulsarClient = this.newPulsarClient(this.lookupUrl.toString(), 0);
        Consumer<byte[]> consumer2 = newPulsarClient.newConsumer().topic(topicName).subscriptionName(subName)
                .subscriptionType(SubscriptionType.Shared).subscribe();

        // Check each reference before it is dereferenced, so that a missing topic or
        // subscription shows up as an assertion failure instead of a NullPointerException.
        PersistentTopic topicRef =
                (PersistentTopic) this.pulsar.getBrokerService().getTopicReference(topicName).get();
        Assert.assertNotNull(topicRef);
        PersistentSubscription subRef = topicRef.getSubscription(subName);
        Assert.assertNotNull(subRef);

        // 2. Basic dispatcher state.
        Assert.assertTrue(subRef.getDispatcher().isConsumerConnected());
        Assert.assertEquals((Object) subRef.getDispatcher().getType(), (Object) CommandSubscribe.SubType.Shared);

        // 3. Publish the messages and wait until every send has completed.
        List<CompletableFuture<MessageId>> futures = Lists.newArrayListWithCapacity(totalMsgs);
        Producer<byte[]> producer = this.pulsarClient.newProducer().topic(topicName)
                .enableBatching(false)
                .messageRoutingMode(MessageRoutingMode.SinglePartition)
                .create();
        for (int i = 0; i < totalMsgs; i++) {
            String message = "my-message-" + i;
            futures.add(producer.sendAsync(message.getBytes()));
        }
        FutureUtil.waitForAll(futures).get();

        this.rolloverPerIntervalStats();
        Assert.assertEquals(subRef.getNumberOfEntriesInBacklog(false), (long) totalMsgs);
        Thread.sleep(100L);

        // 4. Drain with consumer1 first, then with consumer2. The loop ends through a
        //    PulsarClientException: presumably acknowledge() rejecting the null that a
        //    timed-out receive() returns (NOTE(review): confirm against the client API).
        Consumer<byte[]> active = consumer1;
        while (true) {
            try {
                Message<byte[]> received = active.receive(1, TimeUnit.SECONDS);
                active.acknowledge(received);
            } catch (PulsarClientException e) {
                if (!active.equals(consumer1)) {
                    // consumer2 has nothing left either: draining is complete.
                    break;
                }
                consumer1.close();
                active = consumer2;
            }
        }

        // 5. Every message was individually acknowledged, so the backlog must be empty.
        this.rolloverPerIntervalStats();
        Thread.sleep(100L);
        Assert.assertEquals(subRef.getNumberOfEntriesInBacklog(false), 0L);

        // 6. consumer1 has already been closed above; unsubscribe() on it must be rejected.
        try {
            consumer1.unsubscribe();
            Assert.fail("should fail");
        } catch (PulsarClientException expected) {
            // expected
        }
        consumer1.close();

        // 7. Cumulative acknowledgement must be rejected on this Shared subscription.
        producer.send("message".getBytes());
        Message<byte[]> lastMessage = consumer2.receive();
        try {
            consumer2.acknowledgeCumulative(lastMessage);
            Assert.fail("Should fail");
        } catch (PulsarClientException e) {
            Assert.assertTrue(e instanceof PulsarClientException.InvalidConfigurationException);
        }

        // 8. Unsubscribing through the remaining consumer must succeed and remove the subscription.
        try {
            consumer2.unsubscribe();
        } catch (PulsarClientException e) {
            Assert.fail("Should not fail", e);
        }
        Thread.sleep(100L);
        subRef = topicRef.getSubscription(subName);
        Assert.assertNull(subRef);

        producer.close();
        consumer2.close();
        newPulsarClient.close();
        this.deleteTopic(topicName);
    }

    /**
     * Two shared consumers subscribe with message listeners: consumer1 acknowledges and records
     * every payload, consumer2 does nothing with what it receives. After consumer2 is closed the
     * test requires that consumer1 ends up having recorded every produced message.
     *
     * @throws Exception if a client, admin or broker call fails unexpectedly
     */
    @Test
    public void testReplayOnConsumerDisconnect() throws Exception {
        final String topicName = "persistent://prop/use/ns-abc/shared-topic3";
        final String subName = "sub3";
        final int numMsgs = 100;

        final List<String> messagesProduced = Lists.newArrayListWithCapacity(numMsgs);
        // Written from the listener callback and read from the test thread.
        final List<String> messagesConsumed = new BlockingArrayQueue<>(numMsgs);

        Consumer<byte[]> consumer1 = this.pulsarClient.newConsumer().topic(topicName).subscriptionName(subName)
                .subscriptionType(SubscriptionType.Shared).messageListener((consumer, msg) -> {
                    try {
                        consumer.acknowledge(msg);
                        messagesConsumed.add(new String(msg.getData()));
                    } catch (Exception e) {
                        Assert.fail("Should not fail", e);
                    }
                }).subscribe();

        PulsarClient newPulsarClient = this.newPulsarClient(this.lookupUrl.toString(), 0);
        // consumer2 never acknowledges: whatever is dispatched to it stays unacknowledged.
        Consumer<byte[]> consumer2 = newPulsarClient.newConsumer().topic(topicName).subscriptionName(subName)
                .subscriptionType(SubscriptionType.Shared).messageListener((consumer, msg) -> {
                }).subscribe();

        List<CompletableFuture<MessageId>> futures = Lists.newArrayListWithCapacity(numMsgs);
        Producer<byte[]> producer = this.pulsarClient.newProducer().topic(topicName).create();
        for (int i = 0; i < numMsgs; i++) {
            String message = "msg-" + i;
            futures.add(producer.sendAsync(message.getBytes()));
            messagesProduced.add(message);
        }
        FutureUtil.waitForAll(futures).get();
        producer.close();

        consumer2.close();

        // Poll until consumer1 has seen every produced message instead of sleeping a fixed
        // number of times; the assertion is retried until it holds or Awaitility times out.
        Awaitility.await().untilAsserted(() ->
                Assert.assertTrue(CollectionUtils.subtract(messagesProduced, messagesConsumed).isEmpty()));

        consumer1.close();
        newPulsarClient.close();
        this.deleteTopic(topicName);
    }

    /**
     * (Currently disabled.) Three shared consumers with a receiver queue of 10 each count the
     * messages they receive; after {@code 3 * numMsgs} messages are published the per-consumer
     * counts are compared with the expected distribution {140, 140, 131}.
     *
     * <p>NOTE(review): the expected counts presumably come from batches of 10 being dealt
     * round-robin (14 + 14 + 13 batches plus one message); confirm before re-enabling.
     *
     * @throws Exception if a client, admin or broker call fails unexpectedly
     */
    @Test(enabled = false)
    public void testRoundRobinBatchDistribution() throws Exception {
        final String topicName = "persistent://prop/use/ns-abc/shared-topic5";
        final String subName = "sub5";
        final int numMsgs = 137;
        final int totalMsgs = numMsgs * 3;

        final AtomicInteger counter1 = new AtomicInteger(0);
        final AtomicInteger counter2 = new AtomicInteger(0);
        final AtomicInteger counter3 = new AtomicInteger(0);
        final CountDownLatch latch = new CountDownLatch(totalMsgs);

        ConsumerBuilder<byte[]> consumerBuilder = this.pulsarClient.newConsumer().topic(topicName)
                .subscriptionName(subName).receiverQueueSize(10).subscriptionType(SubscriptionType.Shared);

        // Each consumer gets its own counter; the third one previously incremented counter1,
        // which left counter3 at zero and made the expected distribution unreachable.
        Consumer<byte[]> consumer1 =
                consumerBuilder.clone().messageListener(countingAckListener(counter1, latch)).subscribe();
        Consumer<byte[]> consumer2 =
                consumerBuilder.clone().messageListener(countingAckListener(counter2, latch)).subscribe();
        Consumer<byte[]> consumer3 =
                consumerBuilder.clone().messageListener(countingAckListener(counter3, latch)).subscribe();

        List<CompletableFuture<MessageId>> futures = Lists.newArrayListWithCapacity(totalMsgs);
        Producer<byte[]> producer = this.pulsarClient.newProducer().topic(topicName).create();
        for (int i = 0; i < totalMsgs; i++) {
            String message = "msg-" + i;
            futures.add(producer.sendAsync(message.getBytes()));
        }
        FutureUtil.waitForAll(futures).get();
        producer.close();

        // Give the listeners up to one second; the distribution check below is the real verdict.
        latch.await(1L, TimeUnit.SECONDS);

        // The expected counts must all be present among the observed counts (order-insensitive).
        Assert.assertTrue(CollectionUtils.subtract(
                Lists.newArrayList(140, 140, 131),
                Lists.newArrayList(counter1.get(), counter2.get(), counter3.get())).isEmpty());

        consumer1.close();
        consumer2.close();
        consumer3.close();
        this.deleteTopic(topicName);
    }

    /**
     * Builds a listener that increments {@code counter}, acknowledges the message and counts
     * down {@code latch} for every message it receives.
     */
    private static MessageListener<byte[]> countingAckListener(AtomicInteger counter, CountDownLatch latch) {
        return (consumer, msg) -> {
            try {
                counter.incrementAndGet();
                consumer.acknowledge(msg);
                latch.countDown();
            } catch (Exception e) {
                Assert.fail("Should not fail", e);
            }
        };
    }

    /**
     * Two shared consumers (receiver queue size 10, on separate clients) receive messages
     * without ever acknowledging them. Together they must see every produced message; after
     * consumer1 is closed, consumer2 alone must have received the full set.
     *
     * @throws Exception if a client or broker call fails unexpectedly
     */
    @Test(timeOut = 300000L)
    public void testSharedSingleAckedNormalTopic() throws Exception {
        final String key = "test1";
        final String topicName = "persistent://prop/use/ns-abc/topic-" + key;
        final String subscriptionName = "my-shared-subscription-" + key;
        final String messagePredicate = "my-message-" + key + "-";
        final int totalMessages = 50;

        // 1. Producer, and a sanity check that the broker sees it.
        Producer<byte[]> producer = this.pulsarClient.newProducer().topic(topicName).create();
        PersistentTopic topicRef =
                (PersistentTopic) this.pulsar.getBrokerService().getTopicReference(topicName).get();
        Assert.assertNotNull(topicRef);
        Assert.assertEquals(topicRef.getProducers().size(), 1);

        // 2. Two consumers on the same Shared subscription, created from different clients.
        Consumer<byte[]> consumer1 = this.pulsarClient.newConsumer().topic(topicName)
                .subscriptionName(subscriptionName).receiverQueueSize(10)
                .subscriptionType(SubscriptionType.Shared).subscribe();
        PulsarClient newPulsarClient = this.newPulsarClient(this.lookupUrl.toString(), 0);
        Consumer<byte[]> consumer2 = newPulsarClient.newConsumer().topic(topicName)
                .subscriptionName(subscriptionName).receiverQueueSize(10)
                .subscriptionType(SubscriptionType.Shared).subscribe();

        // 3. Publish.
        for (int i = 0; i < totalMessages; i++) {
            String message = messagePredicate + i;
            producer.send(message.getBytes());
            log.info("Producer produced {}", message);
        }

        // 4. Receive on both consumers, without acknowledging, until both time out.
        int receivedConsumer1 = 0;
        int receivedConsumer2 = 0;
        Message<byte[]> message1 = consumer1.receive();
        Message<byte[]> message2 = consumer2.receive();
        do {
            if (message1 != null) {
                log.info("Consumer 1 Received: {}", new String(message1.getData()));
                receivedConsumer1++;
            }
            if (message2 != null) {
                log.info("Consumer 2 Received: {}", new String(message2.getData()));
                receivedConsumer2++;
            }
            message1 = consumer1.receive(10000, TimeUnit.MILLISECONDS);
            message2 = consumer2.receive(10000, TimeUnit.MILLISECONDS);
        } while (message1 != null || message2 != null);

        log.info("Total receives = {}", receivedConsumer2 + receivedConsumer1);
        Assert.assertEquals(receivedConsumer2 + receivedConsumer1, totalMessages);

        // 5. Close consumer1; consumer2 keeps counting whatever it receives afterwards.
        log.info("Consumer 1 closed");
        consumer1.close();

        for (int i = 0; i < totalMessages; i++) {
            message2 = consumer2.receive(100, TimeUnit.MILLISECONDS);
            if (message2 == null) {
                log.info("Consumer 2 - No Message in Incoming Message Queue, will try again");
            } else {
                log.info("Consumer 2 Received: {}", new String(message2.getData()));
                receivedConsumer2++;
            }
        }

        newPulsarClient.close();
        // The producer was previously left open when the test finished.
        producer.close();

        log.info("Total receives by Consumer 2 = {}", receivedConsumer2);
        Assert.assertEquals(receivedConsumer2, totalMessages);
    }

    /**
     * Two shared consumers receive and acknowledge a first batch of messages and are then both
     * closed. A second batch is published while no consumer is connected; a consumer that
     * subscribes afterwards must receive exactly that second batch.
     *
     * @throws Exception if a client or broker call fails unexpectedly
     */
    @Test(timeOut = 60000L)
    public void testCancelReadRequestOnLastDisconnect() throws Exception {
        final String key = "testCancelReadRequestOnLastDisconnect";
        final String topicName = "persistent://prop/use/ns-abc/topic-" + key;
        final String subscriptionName = "my-shared-subscription-" + key;
        final String messagePredicate = "my-message-" + key + "-";
        final int totalMessages = 10;

        // 1. Producer, and a sanity check that the broker sees it.
        Producer<byte[]> producer = this.pulsarClient.newProducer().topic(topicName).create();
        PersistentTopic topicRef =
                (PersistentTopic) this.pulsar.getBrokerService().getTopicReference(topicName).get();
        Assert.assertNotNull(topicRef);
        Assert.assertEquals(topicRef.getProducers().size(), 1);

        // 2. Two consumers from the same builder on one Shared subscription.
        ConsumerBuilder<byte[]> consumerBuilder = this.pulsarClient.newConsumer().topic(topicName)
                .subscriptionName(subscriptionName).receiverQueueSize(1000)
                .subscriptionType(SubscriptionType.Shared);
        Consumer<byte[]> consumer1 = consumerBuilder.subscribe();
        Consumer<byte[]> consumer2 = consumerBuilder.subscribe();

        // 3. First batch.
        for (int i = 0; i < totalMessages; i++) {
            String message = messagePredicate + i;
            producer.send(message.getBytes());
            log.info("Producer produced {}", message);
        }

        // 4. Receive and acknowledge on both consumers until both time out.
        int receivedConsumer1 = 0;
        int receivedConsumer2 = 0;
        Message<byte[]> message1 = consumer1.receive();
        Message<byte[]> message2 = consumer2.receive();
        do {
            if (message1 != null) {
                log.info("Consumer 1 Received: {}", new String(message1.getData()));
                receivedConsumer1++;
                consumer1.acknowledge(message1);
            }
            if (message2 != null) {
                log.info("Consumer 2 Received: {}", new String(message2.getData()));
                receivedConsumer2++;
                consumer2.acknowledge(message2);
            }
            message1 = consumer1.receive(5000, TimeUnit.MILLISECONDS);
            message2 = consumer2.receive(5000, TimeUnit.MILLISECONDS);
        } while (message1 != null || message2 != null);

        log.info("Total receives = {}", receivedConsumer2 + receivedConsumer1);
        Assert.assertEquals(receivedConsumer2 + receivedConsumer1, totalMessages);

        // 5. Disconnect both consumers; log only after each close has actually happened.
        consumer1.close();
        log.info("Consumer 1 closed");
        consumer2.close();
        log.info("Consumer 2 closed");

        // 6. Second batch, published while no consumer is connected.
        for (int i = totalMessages; i < 2 * totalMessages; i++) {
            String message = messagePredicate + i;
            producer.send(message.getBytes());
            log.info("Producer produced {}", message);
        }

        // 7. A consumer subscribing now must receive exactly the second batch.
        consumer1 = consumerBuilder.subscribe();
        receivedConsumer1 = 0;
        message1 = consumer1.receive();
        while (message1 != null) {
            log.info("Consumer 1 Received: {}", new String(message1.getData()));
            receivedConsumer1++;
            message1 = consumer1.receive(5000, TimeUnit.MILLISECONDS);
        }

        // Release the client resources that were previously left open.
        consumer1.close();
        producer.close();

        // Report the counter that is actually asserted (the log used to print consumer 2's).
        log.info("Total receives by Consumer 1 = {}", receivedConsumer1);
        Assert.assertEquals(receivedConsumer1, totalMessages);
    }

    /**
     * Receives a batch on consumer1 without acknowledging, requests redelivery of those messages
     * once a second consumer has joined, lets consumer2 receive the same number of messages, and
     * then acknowledges the original ids through consumer1. The topic stats must eventually show
     * an empty backlog and zero unacked messages for every consumer of the subscription.
     *
     * @throws Exception if a client, admin or broker call fails unexpectedly
     */
    @Test
    public void testUnackedCountWithRedeliveries() throws Exception {
        final String topicName = "persistent://prop/use/ns-abc/testUnackedCountWithRedeliveries";
        final String subName = "sub3";
        final int numMsgs = 10;

        Producer<byte[]> producer = this.pulsarClient.newProducer().topic(topicName).create();

        ConsumerBuilder<byte[]> consumerBuilder = this.pulsarClient.newConsumer()
                .topic(topicName)
                .subscriptionName(subName)
                .receiverQueueSize(10)
                .subscriptionType(SubscriptionType.Shared)
                .acknowledgmentGroupTime(0L, TimeUnit.SECONDS);
        ConsumerImpl<byte[]> consumer1 = (ConsumerImpl<byte[]>) consumerBuilder.subscribe();

        for (int sent = 0; sent < numMsgs; sent++) {
            byte[] payload = ("hello-" + sent).getBytes();
            producer.send(payload);
        }

        // consumer1 takes everything but acknowledges nothing yet; remember the ids.
        HashSet<MessageId> receivedByFirst = new HashSet<MessageId>();
        for (int got = 0; got < numMsgs; got++) {
            Message<byte[]> delivered = consumer1.receive();
            receivedByFirst.add(delivered.getMessageId());
        }

        // A second consumer joins before consumer1 asks for redelivery of its unacked messages.
        Consumer<byte[]> consumer2 = consumerBuilder.subscribe();
        consumer1.redeliverUnacknowledgedMessages(receivedByFirst);

        int pending = numMsgs;
        while (pending > 0) {
            consumer2.receive();
            pending--;
        }

        // Acknowledge through consumer1 using the ids it originally received.
        for (MessageId id : receivedByFirst) {
            consumer1.acknowledge(id);
        }

        Awaitility.await().untilAsserted(() -> {
            TopicStats topicStats = this.admin.topics().getStats(topicName);
            SubscriptionStats subscriptionStats = (SubscriptionStats) topicStats.subscriptions.get(subName);
            Assert.assertEquals((long) subscriptionStats.msgBacklog, (long) 0L);
            for (ConsumerStats consumerStats : subscriptionStats.consumers) {
                Assert.assertEquals((int) consumerStats.unackedMessages, (int) 0);
            }
        });

        producer.close();
        consumer1.close();
        consumer2.close();
        this.deleteTopic(topicName);
    }

    /**
     * Listener body used by {@code testReplayOnConsumerDisconnect}: acknowledges the message and
     * then records its payload, decoded as a String, in {@code messagesConsumed}.
     *
     * <p>Any exception is turned into a TestNG failure that carries the original exception as
     * its cause, so the reason for the failure is not lost.
     *
     * @param messagesConsumed collector the decoded payload is appended to
     * @param consumer consumer the message was delivered to
     * @param msg the delivered message
     */
    private static /* synthetic */ void lambda$testReplayOnConsumerDisconnect$eb11ebcf$1(List messagesConsumed, Consumer consumer, Message msg) {
        try {
            consumer.acknowledge(msg);
            messagesConsumed.add(new String(msg.getData()));
        }
        catch (Exception e) {
            Assert.fail("Should not fail", e);
        }
    }
}

