/*
 * Decompiled with CFR 0.152.
 */
package org.apache.pulsar.client.api;

import com.google.common.collect.Sets;
import io.netty.util.Timeout;
import io.netty.util.concurrent.DefaultThreadFactory;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.MessageListener;
import org.apache.pulsar.client.api.MessageRouter;
import org.apache.pulsar.client.api.MessageRoutingMode;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.ProducerConsumerBase;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.SubscriptionInitialPosition;
import org.apache.pulsar.client.api.SubscriptionType;
import org.apache.pulsar.client.api.TopicMetadata;
import org.apache.pulsar.client.impl.MessageIdImpl;
import org.apache.pulsar.client.impl.MultiTopicsConsumerImpl;
import org.apache.pulsar.client.impl.PartitionedProducerImpl;
import org.apache.pulsar.client.impl.TopicMessageIdImpl;
import org.apache.pulsar.client.impl.TypedMessageBuilderImpl;
import org.apache.pulsar.common.naming.TopicName;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testng.Assert;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;

@Test(groups={"broker-api"})
public class PartitionedProducerConsumerTest
extends ProducerConsumerBase {
    private static final Logger log = LoggerFactory.getLogger(PartitionedProducerConsumerTest.class);
    private ExecutorService executor;

    @Override
    @BeforeClass
    protected void setup() throws Exception {
        super.internalSetup();
        super.producerBaseSetup();
        this.executor = Executors.newFixedThreadPool(1, (ThreadFactory)new DefaultThreadFactory("PartitionedProducerConsumerTest"));
    }

    @Override
    @AfterClass(alwaysRun=true)
    protected void cleanup() throws Exception {
        super.internalCleanup();
        this.executor.shutdownNow();
    }

    @Test(timeOut=30000L)
    public void testRoundRobinProducer() throws Exception {
        log.info("-- Starting {} test --", (Object)this.methodName);
        PulsarClient pulsarClient = this.newPulsarClient(this.lookupUrl.toString(), 0);
        int numPartitions = 4;
        TopicName topicName = TopicName.get((String)("persistent://my-property/my-ns/my-partitionedtopic1-" + System.currentTimeMillis()));
        this.admin.topics().createPartitionedTopic(topicName.toString(), numPartitions);
        Producer producer = pulsarClient.newProducer().topic(topicName.toString()).enableBatching(false).messageRoutingMode(MessageRoutingMode.RoundRobinPartition).create();
        Consumer consumer = pulsarClient.newConsumer().topic(new String[]{topicName.toString()}).subscriptionName("my-partitioned-subscriber").subscribe();
        Assert.assertEquals((String)consumer.getTopic(), (String)topicName.toString());
        for (int i = 0; i < 10; ++i) {
            String message = "my-message-" + i;
            producer.send((Object)message.getBytes());
        }
        HashSet messageSet = Sets.newHashSet();
        for (int i = 0; i < 10; ++i) {
            Message msg = consumer.receive(5, TimeUnit.SECONDS);
            Assert.assertNotNull((Object)msg, (String)"Message should not be null");
            consumer.acknowledge(msg);
            String receivedMessage = new String(msg.getData());
            log.debug("Received message: [{}]", (Object)receivedMessage);
            Assert.assertTrue((boolean)messageSet.add(receivedMessage), (String)("Message " + receivedMessage + " already received"));
        }
        producer.close();
        consumer.unsubscribe();
        consumer.close();
        pulsarClient.close();
        this.admin.topics().deletePartitionedTopic(topicName.toString());
        log.info("-- Exiting {} test --", (Object)this.methodName);
    }

    @Test(timeOut=30000L)
    public void testPartitionedTopicNameWithSpecialCharacter() throws Exception {
        log.info("-- Starting {} test --", (Object)this.methodName);
        int numPartitions = 4;
        String specialCharacter = "! * ' ( ) ; : @ & = + $ , \\ ? % # [ ]";
        TopicName topicName = TopicName.get((String)("persistent://my-property/my-ns/my-partitionedtopic1-" + System.currentTimeMillis() + "! * ' ( ) ; : @ & = + $ , \\ ? % # [ ]"));
        this.admin.topics().createPartitionedTopic(topicName.toString(), numPartitions);
        Producer producer = this.pulsarClient.newProducer().topic(topicName.toString()).messageRoutingMode(MessageRoutingMode.RoundRobinPartition).create();
        producer.close();
        this.admin.topics().deletePartitionedTopic(topicName.toString());
        log.info("-- Exiting {} test --", (Object)this.methodName);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test(timeOut=30000L)
    public void testCustomPartitionProducer() throws Exception {
        PulsarClient pulsarClient = this.newPulsarClient(this.lookupUrl.toString(), 0);
        TopicName topicName = null;
        Producer producer = null;
        Consumer consumer = null;
        int MESSAGE_COUNT = 16;
        try {
            log.info("-- Starting {} test --", (Object)this.methodName);
            int numPartitions = 4;
            topicName = TopicName.get((String)("persistent://my-property/my-ns/my-partitionedtopic1-" + System.currentTimeMillis()));
            this.admin.topics().createPartitionedTopic(topicName.toString(), numPartitions);
            producer = pulsarClient.newProducer().topic(topicName.toString()).messageRouter((MessageRouter)new AlwaysTwoMessageRouter()).create();
            consumer = pulsarClient.newConsumer().topic(new String[]{topicName.toString()}).subscriptionName("my-partitioned-subscriber").subscribe();
            for (int i = 0; i < 16; ++i) {
                String message = "my-message-" + i;
                producer.newMessage().key(String.valueOf(i)).value((Object)message.getBytes()).send();
            }
            HashSet messageSet = Sets.newHashSet();
            for (int i = 0; i < 16; ++i) {
                Message msg = consumer.receive(5, TimeUnit.SECONDS);
                Assert.assertNotNull((Object)msg, (String)"Message should not be null");
                consumer.acknowledge(msg);
                String receivedMessage = new String(msg.getData());
                log.debug("Received message: [{}]", (Object)receivedMessage);
                String expectedMessage = "my-message-" + i;
                this.testMessageOrderAndDuplicates(messageSet, receivedMessage, expectedMessage);
            }
        }
        finally {
            producer.close();
            consumer.unsubscribe();
            consumer.close();
            pulsarClient.close();
            this.admin.topics().deletePartitionedTopic(topicName.toString());
            log.info("-- Exiting {} test --", (Object)this.methodName);
        }
    }

    @Test(timeOut=30000L)
    public void testSinglePartitionProducer() throws Exception {
        log.info("-- Starting {} test --", (Object)this.methodName);
        PulsarClient pulsarClient = this.newPulsarClient(this.lookupUrl.toString(), 0);
        int numPartitions = 4;
        TopicName topicName = TopicName.get((String)("persistent://my-property/my-ns/my-partitionedtopic2-" + System.currentTimeMillis()));
        this.admin.topics().createPartitionedTopic(topicName.toString(), numPartitions);
        Producer producer = pulsarClient.newProducer().topic(topicName.toString()).messageRoutingMode(MessageRoutingMode.SinglePartition).create();
        Consumer consumer = pulsarClient.newConsumer().topic(new String[]{topicName.toString()}).subscriptionName("my-partitioned-subscriber").subscribe();
        for (int i = 0; i < 10; ++i) {
            String message = "my-message-" + i;
            producer.send((Object)message.getBytes());
        }
        HashSet messageSet = Sets.newHashSet();
        for (int i = 0; i < 10; ++i) {
            Message msg = consumer.receive(5, TimeUnit.SECONDS);
            Assert.assertNotNull((Object)msg, (String)"Message should not be null");
            consumer.acknowledge(msg);
            String receivedMessage = new String(msg.getData());
            log.debug("Received message: [{}]", (Object)receivedMessage);
            String expectedMessage = "my-message-" + i;
            this.testMessageOrderAndDuplicates(messageSet, receivedMessage, expectedMessage);
        }
        producer.close();
        consumer.unsubscribe();
        consumer.close();
        pulsarClient.close();
        this.admin.topics().deletePartitionedTopic(topicName.toString());
        log.info("-- Exiting {} test --", (Object)this.methodName);
    }

    @Test(timeOut=30000L)
    public void testKeyBasedProducer() throws Exception {
        String message;
        int i;
        log.info("-- Starting {} test --", (Object)this.methodName);
        PulsarClient pulsarClient = this.newPulsarClient(this.lookupUrl.toString(), 0);
        int numPartitions = 4;
        TopicName topicName = TopicName.get((String)("persistent://my-property/my-ns/my-partitionedtopic3-" + System.currentTimeMillis()));
        String dummyKey1 = "dummykey1";
        String dummyKey2 = "dummykey2";
        this.admin.topics().createPartitionedTopic(topicName.toString(), numPartitions);
        Producer producer = pulsarClient.newProducer().topic(topicName.toString()).create();
        Consumer consumer = pulsarClient.newConsumer().topic(new String[]{topicName.toString()}).subscriptionName("my-partitioned-subscriber").subscribe();
        for (i = 0; i < 5; ++i) {
            message = "my-message-" + i;
            producer.newMessage().key(dummyKey1).value((Object)message.getBytes()).send();
        }
        for (i = 5; i < 10; ++i) {
            message = "my-message-" + i;
            producer.newMessage().key(dummyKey2).value((Object)message.getBytes()).send();
        }
        HashSet messageSet = Sets.newHashSet();
        for (int i2 = 0; i2 < 10; ++i2) {
            Message msg = consumer.receive(5, TimeUnit.SECONDS);
            Assert.assertNotNull((Object)msg, (String)"Message should not be null");
            consumer.acknowledge(msg);
            String receivedMessage = new String(msg.getData());
            log.debug("Received message: [{}]", (Object)receivedMessage);
            this.testKeyBasedOrder(messageSet, receivedMessage);
        }
        producer.close();
        consumer.unsubscribe();
        consumer.close();
        pulsarClient.close();
        this.admin.topics().deletePartitionedTopic(topicName.toString());
        log.info("-- Exiting {} test --", (Object)this.methodName);
    }

    private void testKeyBasedOrder(Set<String> messageSet, String message) {
        int index = Integer.parseInt(message.substring(message.lastIndexOf(45) + 1));
        if (index != 0 && index != 5) {
            Assert.assertTrue((boolean)messageSet.contains("my-message-" + (index - 1)), (String)("Message my-message-" + (index - 1) + " should come before my-message-" + index));
        }
        Assert.assertTrue((boolean)messageSet.add(message), (String)("Received duplicate message " + message));
    }

    @Test(timeOut=100000L)
    public void testPauseAndResume() throws Exception {
        log.info("-- Starting {} test --", (Object)this.methodName);
        PulsarClient pulsarClient = this.newPulsarClient(this.lookupUrl.toString(), 0);
        int numPartitions = 2;
        String topicName = TopicName.get((String)("persistent://my-property/my-ns/my-partitionedtopic-pr-" + System.currentTimeMillis())).toString();
        this.admin.topics().createPartitionedTopic(topicName, numPartitions);
        int receiverQueueSize = 20;
        int numMessages = receiverQueueSize * numPartitions;
        AtomicReference<CountDownLatch> latch = new AtomicReference<CountDownLatch>(new CountDownLatch(numMessages));
        AtomicInteger received = new AtomicInteger();
        Consumer consumer = pulsarClient.newConsumer().receiverQueueSize(receiverQueueSize).topic(new String[]{topicName}).subscriptionName("my-partitioned-subscriber").messageListener((MessageListener & Serializable)(c1, msg) -> {
            Assert.assertNotNull((Object)msg, (String)"Message cannot be null");
            String receivedMessage = new String(msg.getData());
            log.debug("Received message [{}] in the listener", (Object)receivedMessage);
            c1.acknowledgeAsync(msg);
            received.incrementAndGet();
            ((CountDownLatch)latch.get()).countDown();
        }).subscribe();
        Producer producer = pulsarClient.newProducer().topic(topicName).create();
        consumer.pause();
        for (int i = 0; i < numMessages * 2; ++i) {
            producer.send((Object)("my-message-" + i).getBytes());
        }
        log.info("Waiting for message listener to ack " + numMessages + " messages");
        Assert.assertTrue((boolean)latch.get().await(numMessages, TimeUnit.SECONDS), (String)"Timed out waiting for message listener acks");
        log.info("Giving message listener an opportunity to receive messages while paused");
        Thread.sleep(2000L);
        Assert.assertEquals((int)received.intValue(), (int)numMessages, (String)"Consumer received messages while paused");
        latch.set(new CountDownLatch(numMessages));
        consumer.resume();
        log.info("Waiting for message listener to ack all messages");
        Assert.assertTrue((boolean)latch.get().await(numMessages, TimeUnit.SECONDS), (String)"Timed out waiting for message listener acks");
        consumer.close();
        producer.close();
        pulsarClient.close();
        log.info("-- Exiting {} test --", (Object)this.methodName);
    }

    @Test(timeOut=30000L)
    public void testPauseAndResumeWithUnloading() throws Exception {
        int i;
        String topicName = "persistent://my-property/my-ns/pause-and-resume-with-unloading" + System.currentTimeMillis();
        String subName = "sub";
        int receiverQueueSize = 20;
        int numPartitions = 2;
        int numMessages = 40;
        PulsarClient pulsarClient = this.newPulsarClient(this.lookupUrl.toString(), 0);
        this.admin.topics().createPartitionedTopic(topicName, 2);
        AtomicReference<CountDownLatch> latch = new AtomicReference<CountDownLatch>(new CountDownLatch(40));
        AtomicInteger received = new AtomicInteger();
        Consumer consumer = pulsarClient.newConsumer().topic(new String[]{topicName}).subscriptionName("sub").receiverQueueSize(20).messageListener((MessageListener & Serializable)(c1, msg) -> {
            Assert.assertNotNull((Object)msg, (String)"Message cannot be null");
            c1.acknowledgeAsync(msg);
            received.incrementAndGet();
            ((CountDownLatch)latch.get()).countDown();
        }).subscribe();
        consumer.pause();
        Producer producer = pulsarClient.newProducer().topic(topicName).enableBatching(false).create();
        for (i = 0; i < 80; ++i) {
            producer.send((Object)("my-message-" + i).getBytes());
        }
        Assert.assertTrue((boolean)latch.get().await(40L, TimeUnit.SECONDS), (String)"Timed out waiting for message listener acks");
        for (i = 0; i < 2; ++i) {
            String partition = TopicName.get((String)topicName).getPartition(i).toString();
            this.admin.topics().unload(partition);
        }
        Thread.sleep(2000L);
        Assert.assertEquals((int)received.intValue(), (int)40, (String)"Consumer received messages while paused");
        latch.set(new CountDownLatch(40));
        consumer.resume();
        Assert.assertTrue((boolean)latch.get().await(40L, TimeUnit.SECONDS), (String)"Timed out waiting for message listener acks");
        consumer.unsubscribe();
        producer.close();
        pulsarClient.close();
        this.admin.topics().deletePartitionedTopic(topicName, true);
    }

    @Test(timeOut=30000L)
    public void testInvalidSequence() throws Exception {
        log.info("-- Starting {} test --", (Object)this.methodName);
        int numPartitions = 4;
        TopicName topicName = TopicName.get((String)("persistent://my-property/my-ns/my-partitionedtopic4-" + System.currentTimeMillis()));
        this.admin.topics().createPartitionedTopic(topicName.toString(), numPartitions);
        Consumer consumer = this.pulsarClient.newConsumer().topic(new String[]{topicName.toString()}).subscriptionName("my-subscriber-name").subscribe();
        Producer producer = this.pulsarClient.newProducer().topic(topicName.toString()).enableBatching(false).messageRoutingMode(MessageRoutingMode.SinglePartition).create();
        try {
            TypedMessageBuilderImpl mb = (TypedMessageBuilderImpl)producer.newMessage().value((Object)"InvalidMessage".getBytes());
            consumer.acknowledge(mb.getMessage());
        }
        catch (PulsarClientException.InvalidMessageException invalidMessageException) {
            // empty catch block
        }
        consumer.close();
        try {
            consumer.receive();
            Assert.fail((String)"Should fail");
        }
        catch (PulsarClientException.AlreadyClosedException alreadyClosedException) {
            // empty catch block
        }
        try {
            consumer.unsubscribe();
            Assert.fail((String)"Should fail");
        }
        catch (PulsarClientException.AlreadyClosedException alreadyClosedException) {
            // empty catch block
        }
        producer.close();
        try {
            producer.send((Object)"message".getBytes());
            Assert.fail((String)"Should fail");
        }
        catch (PulsarClientException.AlreadyClosedException alreadyClosedException) {
            // empty catch block
        }
        this.admin.topics().deletePartitionedTopic(topicName.toString());
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test(timeOut=30000L)
    public void testSillyUser() throws Exception {
        int numPartitions = 4;
        TopicName topicName = TopicName.get((String)("persistent://my-property/my-ns/my-partitionedtopic5-" + System.currentTimeMillis()));
        this.admin.topics().createPartitionedTopic(topicName.toString(), numPartitions);
        Producer producer = null;
        Consumer consumer = null;
        try {
            this.pulsarClient.newProducer().messageRouter(null);
            Assert.fail((String)"should fail");
        }
        catch (NullPointerException nullPointerException) {
            // empty catch block
        }
        try {
            this.pulsarClient.newProducer().messageRoutingMode(null);
            Assert.fail((String)"should fail");
        }
        catch (NullPointerException nullPointerException) {
            // empty catch block
        }
        try {
            producer = this.pulsarClient.newProducer().topic(topicName.toString()).enableBatching(false).messageRoutingMode(MessageRoutingMode.SinglePartition).create();
            consumer = this.pulsarClient.newConsumer().topic(new String[]{topicName.toString()}).subscriptionName("my-sub").subscribe();
            producer.send((Object)"message1".getBytes());
            producer.send((Object)"message2".getBytes());
            consumer.receive();
            Message msg2 = consumer.receive();
            consumer.acknowledgeCumulative(msg2);
        }
        finally {
            producer.close();
            consumer.unsubscribe();
            consumer.close();
        }
        this.admin.topics().deletePartitionedTopic(topicName.toString());
    }

    @Test(timeOut=30000L)
    public void testDeletePartitionedTopic() throws Exception {
        int numPartitions = 4;
        TopicName topicName = TopicName.get((String)("persistent://my-property/my-ns/my-partitionedtopic6-" + System.currentTimeMillis()));
        this.admin.topics().createPartitionedTopic(topicName.toString(), numPartitions);
        Producer producer = this.pulsarClient.newProducer().topic(topicName.toString()).create();
        Consumer consumer = this.pulsarClient.newConsumer().topic(new String[]{topicName.toString()}).subscriptionName("my-sub").subscribe();
        consumer.unsubscribe();
        consumer.close();
        producer.close();
        this.admin.topics().deletePartitionedTopic(topicName.toString());
        Producer producer1 = this.pulsarClient.newProducer().topic(topicName.toString()).create();
        if (producer1 instanceof PartitionedProducerImpl) {
            Assert.fail((String)"should fail since partitioned topic was deleted");
        }
    }

    @Test(timeOut=30000L)
    public void testAsyncPartitionedProducerConsumer() throws Exception {
        log.info("-- Starting {} test --", (Object)this.methodName);
        int totalMsg = 100;
        HashSet produceMsgs = Sets.newHashSet();
        HashSet consumeMsgs = Sets.newHashSet();
        int numPartitions = 4;
        TopicName topicName = TopicName.get((String)("persistent://my-property/my-ns/my-partitionedtopic1-" + System.currentTimeMillis()));
        this.admin.topics().createPartitionedTopic(topicName.toString(), numPartitions);
        Producer producer = this.pulsarClient.newProducer().topic(topicName.toString()).enableBatching(false).messageRoutingMode(MessageRoutingMode.RoundRobinPartition).create();
        Consumer consumer = this.pulsarClient.newConsumer().topic(new String[]{topicName.toString()}).subscriptionName("my-partitioned-subscriber").subscriptionType(SubscriptionType.Shared).subscribe();
        for (int i = 0; i < 100; ++i) {
            String message = "my-message-" + i;
            produceMsgs.add(message);
            producer.send((Object)message.getBytes());
        }
        log.info(" start receiving messages :");
        CountDownLatch latch = new CountDownLatch(100);
        this.receiveAsync((Consumer<byte[]>)consumer, 100, 0, latch, consumeMsgs, this.executor);
        latch.await();
        Assert.assertEquals((int)produceMsgs.size(), (int)100);
        produceMsgs.removeAll(consumeMsgs);
        Assert.assertTrue((boolean)produceMsgs.isEmpty());
        producer.close();
        consumer.unsubscribe();
        consumer.close();
        this.admin.topics().deletePartitionedTopic(topicName.toString());
        log.info("-- Exiting {} test --", (Object)this.methodName);
    }

    @Test(timeOut=30000L)
    public void testAsyncPartitionedProducerConsumerQueueSizeOne() throws Exception {
        log.info("-- Starting {} test --", (Object)this.methodName);
        PulsarClient pulsarClient = this.newPulsarClient(this.lookupUrl.toString(), 0);
        int totalMsg = 100;
        HashSet produceMsgs = Sets.newHashSet();
        HashSet consumeMsgs = Sets.newHashSet();
        int numPartitions = 4;
        TopicName topicName = TopicName.get((String)("persistent://my-property/my-ns/my-partitionedtopic1-" + System.currentTimeMillis()));
        this.admin.topics().createPartitionedTopic(topicName.toString(), numPartitions);
        Producer producer = pulsarClient.newProducer().topic(topicName.toString()).enableBatching(false).messageRoutingMode(MessageRoutingMode.RoundRobinPartition).create();
        Consumer consumer = pulsarClient.newConsumer().topic(new String[]{topicName.toString()}).subscriptionName("my-partitioned-subscriber").receiverQueueSize(1).subscribe();
        for (int i = 0; i < 100; ++i) {
            String message = "my-message-" + i;
            produceMsgs.add(message);
            producer.send((Object)message.getBytes());
        }
        log.info(" start receiving messages :");
        CountDownLatch latch = new CountDownLatch(100);
        this.receiveAsync((Consumer<byte[]>)consumer, 100, 0, latch, consumeMsgs, this.executor);
        latch.await();
        Assert.assertEquals((int)produceMsgs.size(), (int)100);
        produceMsgs.removeAll(consumeMsgs);
        Assert.assertTrue((boolean)produceMsgs.isEmpty());
        producer.close();
        consumer.unsubscribe();
        consumer.close();
        pulsarClient.close();
        this.admin.topics().deletePartitionedTopic(topicName.toString());
        log.info("-- Exiting {} test --", (Object)this.methodName);
    }

    @Test(timeOut=30000L)
    public void testFairDistributionForPartitionConsumers() throws Exception {
        int i;
        log.info("-- Starting {} test --", (Object)this.methodName);
        PulsarClient pulsarClient = this.newPulsarClient(this.lookupUrl.toString(), 0);
        int numPartitions = 2;
        String topicName = "persistent://my-property/my-ns/my-topic-" + System.currentTimeMillis();
        String producer1Msg = "producer1";
        String producer2Msg = "producer2";
        int queueSize = 10;
        this.admin.topics().createPartitionedTopic(topicName, 2);
        Producer producer1 = pulsarClient.newProducer().topic(topicName + "-partition-0").messageRoutingMode(MessageRoutingMode.RoundRobinPartition).create();
        Producer producer2 = pulsarClient.newProducer().topic(topicName + "-partition-1").messageRoutingMode(MessageRoutingMode.RoundRobinPartition).create();
        Consumer consumer = pulsarClient.newConsumer().topic(new String[]{topicName}).subscriptionName("my-partitioned-subscriber").receiverQueueSize(10).subscribe();
        int partition2Msgs = 0;
        for (i = 0; i < 9; ++i) {
            producer1.send((Object)("producer1-" + i).getBytes());
        }
        Thread.sleep(1000L);
        for (i = 0; i < 5; ++i) {
            producer2.send((Object)("producer2-" + i).getBytes());
        }
        int produceMsgInPartition1AfterNumberOfConsumeMessages = 2;
        for (int i2 = 0; i2 < 30; ++i2) {
            Message msg = consumer.receive();
            partition2Msgs += new String(msg.getData()).startsWith("producer2") ? 1 : 0;
            if (i2 < produceMsgInPartition1AfterNumberOfConsumeMessages) continue;
            producer1.send((Object)"producer1".getBytes());
            Thread.sleep(100L);
        }
        Assert.assertTrue((partition2Msgs >= 4 ? 1 : 0) != 0);
        producer1.close();
        producer2.close();
        consumer.unsubscribe();
        consumer.close();
        pulsarClient.close();
        this.admin.topics().deletePartitionedTopic(topicName);
        log.info("-- Exiting {} test --", (Object)this.methodName);
    }

    private void receiveAsync(Consumer<byte[]> consumer, int totalMessage, int currentMessage, CountDownLatch latch, Set<String> consumeMsg, ExecutorService executor) throws PulsarClientException {
        if (currentMessage < totalMessage) {
            CompletableFuture future = consumer.receiveAsync();
            future.handle((msg, exception) -> {
                if (exception == null) {
                    consumeMsg.add(new String(msg.getData()));
                    try {
                        consumer.acknowledge(msg);
                    }
                    catch (PulsarClientException e1) {
                        Assert.fail((String)"message acknowledge failed", (Throwable)e1);
                    }
                    executor.execute(() -> {
                        try {
                            this.receiveAsync(consumer, totalMessage, currentMessage + 1, latch, consumeMsg, executor);
                        }
                        catch (PulsarClientException e) {
                            Assert.fail((String)"message receive failed", (Throwable)e);
                        }
                    });
                    latch.countDown();
                }
                return null;
            });
        }
    }

    @Test
    public void testGetPartitionsForTopic() throws Exception {
        int numPartitions = 4;
        String topic = "persistent://my-property/my-ns/my-partitionedtopic1-" + System.currentTimeMillis();
        this.admin.topics().createPartitionedTopic(topic, numPartitions);
        ArrayList<String> expectedPartitions = new ArrayList<String>();
        for (int i = 0; i < numPartitions; ++i) {
            expectedPartitions.add(topic + "-partition-" + i);
        }
        Assert.assertEquals((Collection)((Collection)this.pulsarClient.getPartitionsForTopic(topic).join()), expectedPartitions);
        String nonPartitionedTopic = "persistent://my-property/my-ns/my-non-partitionedtopic1";
        Assert.assertEquals((Collection)((Collection)this.pulsarClient.getPartitionsForTopic(nonPartitionedTopic).join()), Collections.singletonList(nonPartitionedTopic));
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test
    public void testMessageIdForSubscribeToSinglePartition() throws Exception {
        PulsarClient pulsarClient = this.newPulsarClient(this.lookupUrl.toString(), 0);
        TopicName topicName = null;
        TopicName partition2TopicName = null;
        Producer producer = null;
        Consumer consumer1 = null;
        Consumer consumer2 = null;
        int numPartitions = 4;
        int totalMessages = 30;
        try {
            int i;
            log.info("-- Starting {} test --", (Object)this.methodName);
            topicName = TopicName.get((String)("persistent://my-property/my-ns/my-topic-" + System.currentTimeMillis()));
            partition2TopicName = topicName.getPartition(2);
            this.admin.topics().createPartitionedTopic(topicName.toString(), 4);
            producer = pulsarClient.newProducer().topic(topicName.toString()).messageRouter((MessageRouter)new AlwaysTwoMessageRouter()).create();
            consumer1 = pulsarClient.newConsumer().topic(new String[]{topicName.toString()}).subscriptionName("subscriber-partitioned").subscriptionInitialPosition(SubscriptionInitialPosition.Earliest).subscriptionType(SubscriptionType.Exclusive).subscribe();
            consumer2 = pulsarClient.newConsumer().topic(new String[]{partition2TopicName.toString()}).subscriptionName("subscriber-single").subscriptionInitialPosition(SubscriptionInitialPosition.Earliest).subscriptionType(SubscriptionType.Exclusive).subscribe();
            for (int i2 = 0; i2 < 30; ++i2) {
                String message = "my-message-" + i2;
                producer.newMessage().key(String.valueOf(i2)).value((Object)message.getBytes()).send();
            }
            producer.flush();
            for (i = 0; i < 30; ++i) {
                Message msg = consumer1.receive(5, TimeUnit.SECONDS);
                Assert.assertEquals((int)((MessageIdImpl)((TopicMessageIdImpl)msg.getMessageId()).getInnerMessageId()).getPartitionIndex(), (int)2);
                consumer1.acknowledge(msg);
            }
            for (i = 0; i < 30; ++i) {
                Message msg = consumer2.receive(5, TimeUnit.SECONDS);
                Assert.assertEquals((int)((MessageIdImpl)msg.getMessageId()).getPartitionIndex(), (int)2);
                consumer2.acknowledge(msg);
            }
        }
        finally {
            producer.close();
            consumer1.unsubscribe();
            consumer1.close();
            consumer2.unsubscribe();
            consumer2.close();
            pulsarClient.close();
            this.admin.topics().deletePartitionedTopic(topicName.toString());
            log.info("-- Exiting {} test --", (Object)this.methodName);
        }
    }

    @Test(timeOut=30000L)
    public void testAutoUpdatePartitionsForProducerConsumer() throws Exception {
        int i;
        log.info("-- Starting {} test --", (Object)this.methodName);
        PulsarClient pulsarClient = this.newPulsarClient(this.lookupUrl.toString(), 0);
        int numPartitions = 2;
        String topicName = "persistent://my-property/my-ns/my-topic-" + System.currentTimeMillis();
        String producerMsg = "producerMsg";
        int totalMessages = 30;
        this.admin.topics().createPartitionedTopic(topicName, 2);
        Producer producer = pulsarClient.newProducer().topic(topicName).messageRoutingMode(MessageRoutingMode.RoundRobinPartition).enableBatching(false).autoUpdatePartitions(true).create();
        Consumer consumer = pulsarClient.newConsumer().topic(new String[]{topicName}).subscriptionName("my-partitioned-subscriber").subscriptionType(SubscriptionType.Shared).autoUpdatePartitions(true).subscribe();
        for (int i2 = 0; i2 < 30; ++i2) {
            producer.send((Object)("producerMsg first round message index: " + i2).getBytes());
        }
        int messageSet = 0;
        Message message = consumer.receive();
        do {
            ++messageSet;
            consumer.acknowledge(message);
            log.info("Consumer acknowledged : " + new String(message.getData()));
        } while ((message = consumer.receive(200, TimeUnit.MILLISECONDS)) != null);
        Assert.assertEquals((int)messageSet, (int)30);
        this.admin.topics().updatePartitionedTopic(topicName, 3);
        log.info("trigger partitionsAutoUpdateTimerTask for producer");
        Timeout timeout = ((PartitionedProducerImpl)producer).getPartitionsAutoUpdateTimeout();
        timeout.task().run(timeout);
        Thread.sleep(200L);
        for (i = 0; i < 30; ++i) {
            producer.send((Object)("producerMsg second round message index: " + i).getBytes());
        }
        messageSet = 0;
        message = consumer.receive();
        do {
            ++messageSet;
            consumer.acknowledge(message);
            log.info("Consumer acknowledged : " + new String(message.getData()));
        } while ((message = consumer.receive(200, TimeUnit.MILLISECONDS)) != null);
        Assert.assertEquals((int)messageSet, (int)20);
        log.info("trigger partitionsAutoUpdateTimerTask for consumer");
        timeout = ((MultiTopicsConsumerImpl)consumer).getPartitionsAutoUpdateTimeout();
        timeout.task().run(timeout);
        Thread.sleep(200L);
        messageSet = 0;
        message = consumer.receive();
        do {
            ++messageSet;
            consumer.acknowledge(message);
            log.info("Consumer acknowledged : " + new String(message.getData()));
        } while ((message = consumer.receive(200, TimeUnit.MILLISECONDS)) != null);
        Assert.assertEquals((int)messageSet, (int)10);
        for (i = 0; i < 30; ++i) {
            producer.send((Object)("producerMsg third round message index: " + i).getBytes());
        }
        messageSet = 0;
        message = consumer.receive();
        do {
            ++messageSet;
            consumer.acknowledge(message);
            log.info("Consumer acknowledged : " + new String(message.getData()));
        } while ((message = consumer.receive(200, TimeUnit.MILLISECONDS)) != null);
        Assert.assertEquals((int)messageSet, (int)30);
        pulsarClient.close();
        this.admin.topics().deletePartitionedTopic(topicName);
        log.info("-- Exiting {} test --", (Object)this.methodName);
    }

    private static class AlwaysTwoMessageRouter
    implements MessageRouter {
        private AlwaysTwoMessageRouter() {
        }

        public int choosePartition(Message<?> msg, TopicMetadata metadata) {
            return 2;
        }
    }
}

