/*
 * Decompiled with CFR 0.152.
 */
package org.apache.pulsar.broker.service;

import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import org.apache.pulsar.broker.BrokerTestUtil;
import org.apache.pulsar.broker.service.BrokerTestBase;
import org.apache.pulsar.broker.service.persistent.PersistentDispatcherSingleActiveConsumer;
import org.apache.pulsar.broker.service.persistent.PersistentSubscription;
import org.apache.pulsar.broker.service.persistent.PersistentTopic;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.ConsumerBuilder;
import org.apache.pulsar.client.api.ConsumerEventListener;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.MessageListener;
import org.apache.pulsar.client.api.MessageRoutingMode;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.SubscriptionType;
import org.apache.pulsar.client.impl.MessageIdImpl;
import org.apache.pulsar.client.impl.TopicMessageImpl;
import org.apache.pulsar.common.api.proto.CommandSubscribe;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.util.FutureUtil;
import org.testng.Assert;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;

@Test(groups={"broker"})
public class PersistentFailoverE2ETest
extends BrokerTestBase {
    private static final int CONSUMER_ADD_OR_REMOVE_WAIT_TIME = 100;

    /**
     * Class-level fixture set-up: TestNG invokes this once before the tests of this class
     * ({@code @BeforeClass}) and it simply delegates to the inherited {@code baseSetup()}.
     *
     * @throws Exception if the inherited set-up fails
     */
    @Override
    @BeforeClass
    protected void setup() throws Exception {
        super.baseSetup();
    }

    /**
     * Class-level fixture tear-down: runs after the tests of this class ({@code @AfterClass}, with
     * {@code alwaysRun=true}) and delegates to the inherited {@code internalCleanup()}.
     *
     * @throws Exception if the inherited clean-up fails
     */
    @Override
    @AfterClass(alwaysRun=true)
    protected void cleanup() throws Exception {
        super.internalCleanup();
    }

    /**
     * Asserts that the listener has no pending notification in either of its queues.
     *
     * <p>Both queues are polled without waiting, so only events that were already delivered are seen.
     *
     * @param listener the listener whose queues are inspected
     */
    private void verifyConsumerNotReceiveAnyStateChanges(TestConsumerStateEventListener listener) {
        final Integer pendingActive = listener.activeQueue.poll();
        Assert.assertNull(pendingActive);

        final Integer pendingInactive = listener.inActiveQueue.poll();
        Assert.assertNull(pendingInactive);
    }

    /**
     * Waits for the next "became active" notification recorded by the listener and checks that it was
     * raised for the expected partition, and that no "became inactive" notification is pending.
     *
     * @param listener    the listener attached to the consumer under test
     * @param partitionId the partition id the notification must carry
     * @throws Exception if the waiting thread is interrupted
     */
    private void verifyConsumerActive(TestConsumerStateEventListener listener, int partitionId) throws Exception {
        // Bounded wait: an unbounded take() would hang the whole suite if the event never arrives,
        // and would make the null check below unreachable.
        final long timeoutSeconds = 10L;
        Integer pid = listener.activeQueue.poll(timeoutSeconds, TimeUnit.SECONDS);
        Assert.assertNotNull(pid, "no 'active' notification received within " + timeoutSeconds + "s");
        // TestNG argument order is (actual, expected).
        Assert.assertEquals(pid.intValue(), partitionId);
        Assert.assertNull(listener.inActiveQueue.poll());
    }

    /**
     * Waits for the next "became inactive" notification recorded by the listener and checks that it was
     * raised for the expected partition, and that no "became active" notification is pending.
     *
     * @param listener    the listener attached to the consumer under test
     * @param partitionId the partition id the notification must carry
     * @throws Exception if the waiting thread is interrupted
     */
    private void verifyConsumerInactive(TestConsumerStateEventListener listener, int partitionId) throws Exception {
        // Bounded wait: an unbounded take() would hang the whole suite if the event never arrives,
        // and would make the null check below unreachable.
        final long timeoutSeconds = 10L;
        Integer pid = listener.inActiveQueue.poll(timeoutSeconds, TimeUnit.SECONDS);
        Assert.assertNotNull(pid, "no 'inactive' notification received within " + timeoutSeconds + "s");
        // TestNG argument order is (actual, expected).
        Assert.assertEquals(pid.intValue(), partitionId);
        Assert.assertNull(listener.activeQueue.poll());
    }

    /**
     * Exercises a Failover subscription with two consumers on a single, non-partitioned topic.
     *
     * <p>Steps, in order:
     * <ol>
     *   <li>subscribe consumers "1" and "2" and check that listener-1 receives an "active" event and
     *       listener-2 an "inactive" event, both carrying partition id {@code -1};</li>
     *   <li>publish {@code numMsgs} messages and check that consumer "1" receives all of them in
     *       publish order while consumer "2" receives nothing;</li>
     *   <li>publish again, let consumer "1" acknowledge messages 0-4 and receive 5-9 without
     *       acknowledging, close it, and check that consumer "2" gets an "active" event and is
     *       delivered messages 5 to {@code numMsgs - 1};</li>
     *   <li>check that unsubscribing through the closed consumer fails, that unsubscribing through
     *       consumer "2" succeeds, and that the topic no longer has the subscription afterwards.</li>
     * </ol>
     *
     * @throws Exception on any client, broker or interruption failure
     */
    @Test
    public void testSimpleConsumerEventsWithoutPartition() throws Exception {
        final String topicName = "persistent://prop/use/ns-abc/failover-topic1-" + System.currentTimeMillis();
        final String subName = "sub1";
        final int numMsgs = 100;
        // Pause used before re-checking broker-side state after an operation.
        final long settleMillis = 100L;

        TestConsumerStateEventListener listener1 = new TestConsumerStateEventListener("listener-1");
        TestConsumerStateEventListener listener2 = new TestConsumerStateEventListener("listener-2");
        ConsumerBuilder<byte[]> consumerBuilder = this.pulsarClient.newConsumer()
                .topic(topicName)
                .subscriptionName(subName)
                .acknowledgmentGroupTime(0L, TimeUnit.SECONDS)
                .subscriptionType(SubscriptionType.Failover);

        // 1. Two consumers on the same failover subscription: the first one subscribed is active.
        Consumer<byte[]> consumer1 = consumerBuilder.clone()
                .consumerName("1")
                .consumerEventListener(listener1)
                .subscribe();
        Consumer<byte[]> consumer2 = consumerBuilder.clone()
                .consumerName("2")
                .consumerEventListener(listener2)
                .subscribe();
        this.verifyConsumerActive(listener1, -1);
        this.verifyConsumerInactive(listener2, -1);
        listener2.inActiveQueue.clear();

        PersistentTopic topicRef =
                (PersistentTopic) this.pulsar.getBrokerService().getTopicReference(topicName).get();
        // Check the topic reference before it is dereferenced, not after.
        Assert.assertNotNull(topicRef);
        PersistentSubscription subRef = topicRef.getSubscription(subName);
        Assert.assertNotNull(subRef);
        Assert.assertTrue(subRef.getDispatcher().isConsumerConnected());
        Assert.assertEquals(subRef.getDispatcher().getType(), CommandSubscribe.SubType.Failover);

        // 2. Publish and make sure only the active consumer is served.
        List<CompletableFuture<?>> pendingSends = new ArrayList<>(numMsgs);
        Producer<byte[]> producer = this.pulsarClient.newProducer()
                .topic(topicName)
                .enableBatching(false)
                .messageRoutingMode(MessageRoutingMode.SinglePartition)
                .create();
        for (int i = 0; i < numMsgs; i++) {
            String message = "my-message-" + i;
            pendingSends.add(producer.sendAsync(message.getBytes()));
        }
        CompletableFuture.allOf(pendingSends.toArray(new CompletableFuture<?>[0])).get();
        pendingSends.clear();

        this.rolloverPerIntervalStats();
        Assert.assertEquals(subRef.getNumberOfEntriesInBacklog(false), numMsgs);
        Thread.sleep(settleMillis);

        Message<byte[]> msg;
        Assert.assertNull(consumer2.receive(100, TimeUnit.MILLISECONDS));
        for (int i = 0; i < numMsgs; i++) {
            msg = consumer1.receive(1, TimeUnit.SECONDS);
            Assert.assertNotNull(msg);
            Assert.assertEquals(new String(msg.getData()), "my-message-" + i);
            consumer1.acknowledge(msg);
        }

        this.rolloverPerIntervalStats();
        Thread.sleep(settleMillis);
        Assert.assertEquals(subRef.getNumberOfEntriesInBacklog(false), 0L);

        // 3. Publish again, consume part of it on consumer "1", then fail over to consumer "2".
        for (int i = 0; i < numMsgs; i++) {
            String message = "my-message-" + i;
            pendingSends.add(producer.sendAsync(message.getBytes()));
        }
        CompletableFuture.allOf(pendingSends.toArray(new CompletableFuture<?>[0])).get();
        pendingSends.clear();

        // Messages 0-4 are acknowledged ...
        for (int i = 0; i < 5; i++) {
            msg = consumer1.receive(1, TimeUnit.SECONDS);
            Assert.assertNotNull(msg);
            Assert.assertEquals(new String(msg.getData()), "my-message-" + i);
            consumer1.acknowledge(msg);
        }
        // ... messages 5-9 are received but deliberately left unacknowledged.
        for (int i = 5; i < 10; i++) {
            msg = consumer1.receive(1, TimeUnit.SECONDS);
            Assert.assertNotNull(msg);
            Assert.assertEquals(new String(msg.getData()), "my-message-" + i);
        }

        consumer1.close();
        Thread.sleep(settleMillis);
        this.verifyConsumerActive(listener2, -1);
        this.verifyConsumerNotReceiveAnyStateChanges(listener1);

        // Consumer "2" must be handed everything that was not acknowledged, starting at message 5.
        for (int i = 5; i < numMsgs; i++) {
            msg = consumer2.receive(1, TimeUnit.SECONDS);
            Assert.assertNotNull(msg);
            Assert.assertEquals(new String(msg.getData()), "my-message-" + i);
            consumer2.acknowledge(msg);
        }
        Assert.assertNull(consumer2.receive(100, TimeUnit.MILLISECONDS));

        this.rolloverPerIntervalStats();
        Thread.sleep(settleMillis);
        Assert.assertEquals(subRef.getNumberOfEntriesInBacklog(false), 0L);

        // 4. Unsubscribe: must fail on the closed consumer and succeed on the remaining one.
        try {
            consumer1.unsubscribe();
            Assert.fail("should fail");
        } catch (PulsarClientException expected) {
            // Expected: consumer1 has already been closed above.
        }
        consumer1.close();
        Thread.sleep(settleMillis);

        try {
            consumer2.unsubscribe();
        } catch (PulsarClientException e) {
            Assert.fail("Should not fail", e);
        }
        Thread.sleep(settleMillis);

        subRef = topicRef.getSubscription(subName);
        Assert.assertNull(subRef);

        producer.close();
        consumer2.close();
        this.admin.topics().delete(topicName);
    }

    /**
     * Exercises a Failover subscription on a 4-partition topic while consumers join and leave.
     *
     * <p>Steps, in order:
     * <ol>
     *   <li>restart the broker with an active-consumer failover delay of 0 ms, create the partitioned
     *       topic and subscribe consumers "1" and "2" (receiver queue size 1);</li>
     *   <li>publish {@code numMsgs} messages round-robin; check that consumer "1" drains half of them,
     *       consumer "2" the rest, that the partitions reported by the listeners are among the
     *       partitions messages were received from, and that the active consumers of partitions
     *       0..3 are "1", "2", "1", "2";</li>
     *   <li>publish again, consume 20 messages on "1", add consumer "3", drain all consumers and check
     *       that every distinct payload was seen and the active consumers are "1", "2", "3", "1";</li>
     *   <li>publish again, consume 10 messages on "1", close it, drain "2" and "3" and check that
     *       every distinct payload was seen and the active consumers are "2", "3", "2", "3".</li>
     * </ol>
     *
     * @throws Exception on any client, broker, admin or interruption failure
     */
    @Test
    public void testSimpleConsumerEventsWithPartition() throws Exception {
        this.conf.setActiveConsumerFailoverDelayTimeMillis(0);
        this.restartBroker();

        final int numPartitions = 4;
        final String topicName =
                BrokerTestUtil.newUniqueName("persistent://prop/use/ns-abc/testSimpleConsumerEventsWithPartition");
        final TopicName destName = TopicName.get(topicName);
        final String subName = "sub1";
        final int numMsgs = 100;
        // Pause used after a consumer joins or leaves, before draining the remaining consumers.
        final long settleMillis = 100L;
        final Set<String> uniqueMessages = new HashSet<>();
        this.admin.topics().createPartitionedTopic(topicName, numPartitions);

        ConsumerBuilder<byte[]> consumerBuilder = this.pulsarClient.newConsumer()
                .topic(topicName)
                .subscriptionName(subName)
                .subscriptionType(SubscriptionType.Failover);
        ActiveInactiveListenerEvent listener1 = new ActiveInactiveListenerEvent();
        ActiveInactiveListenerEvent listener2 = new ActiveInactiveListenerEvent();
        Consumer<byte[]> consumer1 = consumerBuilder.clone()
                .consumerName("1")
                .consumerEventListener(listener1)
                .receiverQueueSize(1)
                .subscribe();
        Consumer<byte[]> consumer2 = consumerBuilder.clone()
                .consumerName("2")
                .consumerEventListener(listener2)
                .receiverQueueSize(1)
                .subscribe();

        // One dispatcher per partition, indexed by partition number.
        final PersistentDispatcherSingleActiveConsumer[] dispatchers =
                new PersistentDispatcherSingleActiveConsumer[numPartitions];
        for (int partition = 0; partition < numPartitions; partition++) {
            dispatchers[partition] = this.singleActiveDispatcherFor(destName, partition, subName);
        }

        Producer<byte[]> producer = this.pulsarClient.newProducer()
                .topic(topicName)
                .enableBatching(false)
                .messageRoutingMode(MessageRoutingMode.RoundRobinPartition)
                .create();

        // Round 1: two consumers share the four partitions.
        publishAndFlush(producer, numMsgs);

        Set<Integer> receivedPtns = new HashSet<>();
        int totalMessages = drainRecordingPartitions(consumer1, receivedPtns);
        Assert.assertTrue(Sets.difference(listener1.activePtns, receivedPtns).isEmpty());
        Assert.assertTrue(Sets.difference(listener2.inactivePtns, receivedPtns).isEmpty());
        Assert.assertEquals(totalMessages, numMsgs / 2);

        receivedPtns = new HashSet<>();
        totalMessages += drainRecordingPartitions(consumer2, receivedPtns);
        Assert.assertTrue(Sets.difference(listener1.inactivePtns, receivedPtns).isEmpty());
        Assert.assertTrue(Sets.difference(listener2.activePtns, receivedPtns).isEmpty());
        Assert.assertEquals(totalMessages, numMsgs);
        assertActiveConsumers(dispatchers, "1", "2", "1", "2");

        // Round 2: a third consumer joins while messages are in flight.
        publishAndFlush(producer, numMsgs);
        receiveAndAcknowledge(consumer1, 20, uniqueMessages);

        Consumer<byte[]> consumer3 = consumerBuilder.clone()
                .consumerName("3")
                .receiverQueueSize(1)
                .subscribe();
        Thread.sleep(settleMillis);

        drainAndAcknowledge(consumer1, uniqueMessages);
        drainAndAcknowledge(consumer2, uniqueMessages);
        drainAndAcknowledge(consumer3, uniqueMessages);
        Assert.assertEquals(uniqueMessages.size(), numMsgs);
        assertActiveConsumers(dispatchers, "1", "2", "3", "1");

        // Round 3: consumer "1" leaves while messages are in flight.
        uniqueMessages.clear();
        publishAndFlush(producer, numMsgs);
        receiveAndAcknowledge(consumer1, 10, uniqueMessages);

        consumer1.close();
        Thread.sleep(settleMillis);

        drainAndAcknowledge(consumer2, uniqueMessages);
        drainAndAcknowledge(consumer3, uniqueMessages);
        Assert.assertEquals(uniqueMessages.size(), numMsgs);
        assertActiveConsumers(dispatchers, "2", "3", "2", "3");

        producer.close();
        consumer2.close();
        consumer3.unsubscribe();
        this.admin.topics().deletePartitionedTopic(topicName);
    }

    /** Looks up the single-active-consumer dispatcher of {@code subscription} on one partition of {@code topic}. */
    private PersistentDispatcherSingleActiveConsumer singleActiveDispatcherFor(
            TopicName topic, int partition, String subscription) throws Exception {
        PersistentTopic partitionRef = (PersistentTopic) this.pulsar.getBrokerService()
                .getTopicReference(topic.getPartition(partition).toString())
                .get();
        return (PersistentDispatcherSingleActiveConsumer) partitionRef.getSubscription(subscription).getDispatcher();
    }

    /** Sends {@code count} messages with payload {@code "my-message-<index>"} asynchronously, then flushes. */
    private static void publishAndFlush(Producer<byte[]> producer, int count) throws Exception {
        for (int i = 0; i < count; i++) {
            String message = "my-message-" + i;
            producer.sendAsync(message.getBytes());
        }
        producer.flush();
    }

    /**
     * Receives and acknowledges messages until a 1 s receive returns nothing, adding the partition index
     * of each message to {@code partitions}; returns the number of messages received.
     */
    private static int drainRecordingPartitions(Consumer<byte[]> consumer, Set<Integer> partitions)
            throws Exception {
        int received = 0;
        Message<byte[]> msg;
        while ((msg = consumer.receive(1, TimeUnit.SECONDS)) != null) {
            ++received;
            consumer.acknowledge(msg);
            MessageIdImpl msgId = (MessageIdImpl) ((TopicMessageImpl<byte[]>) msg).getInnerMessageId();
            partitions.add(msgId.getPartitionIndex());
        }
        return received;
    }

    /** Receives exactly {@code count} messages (failing if one is missing), recording and acknowledging each. */
    private static void receiveAndAcknowledge(Consumer<byte[]> consumer, int count, Set<String> payloads)
            throws Exception {
        for (int i = 0; i < count; i++) {
            Message<byte[]> msg = consumer.receive(1, TimeUnit.SECONDS);
            Assert.assertNotNull(msg);
            payloads.add(new String(msg.getData()));
            consumer.acknowledge(msg);
        }
    }

    /** Receives, records and acknowledges messages until a 1 s receive returns nothing. */
    private static void drainAndAcknowledge(Consumer<byte[]> consumer, Set<String> payloads) throws Exception {
        Message<byte[]> msg;
        while ((msg = consumer.receive(1, TimeUnit.SECONDS)) != null) {
            payloads.add(new String(msg.getData()));
            consumer.acknowledge(msg);
        }
    }

    /** Asserts that the active consumer of partition {@code p} is named {@code expectedNames[p]}. */
    private static void assertActiveConsumers(
            PersistentDispatcherSingleActiveConsumer[] dispatchers, String... expectedNames) {
        for (int partition = 0; partition < dispatchers.length; partition++) {
            Assert.assertEquals(
                    dispatchers[partition].getActiveConsumer().consumerName(),
                    expectedNames[partition],
                    "active consumer of partition " + partition);
        }
    }

    /**
     * Checks that messages are delivered exactly once and in order when two listener-based consumers
     * subscribe concurrently to a Failover subscription on a broker restarted with an active-consumer
     * failover delay of 500 ms.
     *
     * <p>Steps, in order: create the subscription with a first consumer and close it, publish
     * {@code numMsgs} messages, subscribe consumers "2" and "1" asynchronously, poll (at most 20 times,
     * 100 ms apart) until all messages were received and the backlog is empty, then verify count,
     * backlog and payload order.
     *
     * @throws Exception on any client, broker, admin or interruption failure
     */
    @Test
    public void testActiveConsumerFailoverWithDelay() throws Exception {
        final String topicName = "persistent://prop/use/ns-abc/failover-topic3";
        final String subName = "sub1";
        final int numMsgs = 100;
        // Written by the message-listener callback and read by this thread: every access, on either
        // side, holds the list's own monitor.
        final List<Message<byte[]>> receivedMessages = new ArrayList<>();

        ConsumerBuilder<byte[]> consumerBuilder = this.pulsarClient.newConsumer()
                .topic(topicName)
                .subscriptionName(subName)
                .subscriptionType(SubscriptionType.Failover)
                .messageListener((consumer, msg) -> {
                    try {
                        synchronized (receivedMessages) {
                            receivedMessages.add(msg);
                        }
                        consumer.acknowledge(msg);
                    } catch (Exception e) {
                        // Keep the cause so the failure is diagnosable.
                        Assert.fail("Should not fail", e);
                    }
                });
        ConsumerBuilder<byte[]> consumerBuilder1 = consumerBuilder.clone().consumerName("1");
        ConsumerBuilder<byte[]> consumerBuilder2 = consumerBuilder.clone().consumerName("2");

        this.conf.setActiveConsumerFailoverDelayTimeMillis(500);
        this.restartBroker();

        // Subscribe once and close again so that the subscription exists before anything is published.
        Consumer<byte[]> initialConsumer = consumerBuilder1.subscribe();
        initialConsumer.close();

        PersistentTopic topicRef =
                (PersistentTopic) this.pulsar.getBrokerService().getTopicReference(topicName).get();
        PersistentSubscription subRef = topicRef.getSubscription(subName);

        List<CompletableFuture<?>> pendingSends = new ArrayList<>(numMsgs);
        Producer<byte[]> producer = this.pulsarClient.newProducer()
                .topic(topicName)
                .enableBatching(false)
                .messageRoutingMode(MessageRoutingMode.SinglePartition)
                .create();
        for (int i = 0; i < numMsgs; i++) {
            String message = "my-message-" + i;
            pendingSends.add(producer.sendAsync(message.getBytes()));
        }
        CompletableFuture.allOf(pendingSends.toArray(new CompletableFuture<?>[0])).get();
        pendingSends.clear();
        producer.close();

        // Consumer "2" is deliberately requested before consumer "1".
        CompletableFuture<Consumer<byte[]>> subscribeFuture2 = consumerBuilder2.subscribeAsync();
        CompletableFuture<Consumer<byte[]>> subscribeFuture1 = consumerBuilder1.subscribeAsync();

        // Poll until everything has been delivered and acknowledged; no sleep after the last attempt.
        final int retry = 20;
        for (int attempt = 0; attempt < retry; attempt++) {
            final int receivedSoFar;
            synchronized (receivedMessages) {
                receivedSoFar = receivedMessages.size();
            }
            if (receivedSoFar >= numMsgs && subRef.getNumberOfEntriesInBacklog(false) == 0L) {
                break;
            }
            if (attempt != retry - 1) {
                Thread.sleep(100L);
            }
        }

        // Verify against a snapshot taken under the lock the listener uses.
        final List<Message<byte[]>> delivered;
        synchronized (receivedMessages) {
            delivered = new ArrayList<>(receivedMessages);
        }
        Assert.assertEquals(delivered.size(), numMsgs);
        Assert.assertEquals(subRef.getNumberOfEntriesInBacklog(false), 0L);
        for (int i = 0; i < delivered.size(); i++) {
            Assert.assertNotNull(delivered.get(i));
            Assert.assertEquals(new String(delivered.get(i).getData()), "my-message-" + i);
        }

        subscribeFuture1.get().close();
        subscribeFuture2.get().unsubscribe();
        this.admin.topics().delete(topicName);
    }

    private static class ActiveInactiveListenerEvent
    implements ConsumerEventListener {
        private final Set<Integer> activePtns = Sets.newHashSet();
        private final Set<Integer> inactivePtns = Sets.newHashSet();

        private ActiveInactiveListenerEvent() {
        }

        public synchronized void becameActive(Consumer<?> consumer, int partitionId) {
            this.activePtns.add(partitionId);
            this.inactivePtns.remove(partitionId);
        }

        public synchronized void becameInactive(Consumer<?> consumer, int partitionId) {
            this.activePtns.remove(partitionId);
            this.inactivePtns.add(partitionId);
        }
    }

    private static class TestConsumerStateEventListener
    implements ConsumerEventListener {
        final LinkedBlockingQueue<Integer> activeQueue = new LinkedBlockingQueue();
        final LinkedBlockingQueue<Integer> inActiveQueue = new LinkedBlockingQueue();
        String name = "";

        public TestConsumerStateEventListener(String name) {
            this.name = name;
        }

        public void becameActive(Consumer<?> consumer, int partitionId) {
            try {
                this.activeQueue.put(partitionId);
            }
            catch (InterruptedException interruptedException) {
                // empty catch block
            }
        }

        public void becameInactive(Consumer<?> consumer, int partitionId) {
            try {
                this.inActiveQueue.put(partitionId);
            }
            catch (InterruptedException interruptedException) {
                // empty catch block
            }
        }
    }
}

