package org.apache.pulsar.client.api;

import com.google.common.collect.Sets;
import io.netty.util.Timeout;
import io.netty.util.concurrent.DefaultThreadFactory;
import java.lang.invoke.SerializedLambda;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.impl.MultiTopicsConsumerImpl;
import org.apache.pulsar.client.impl.PartitionedProducerImpl;
import org.apache.pulsar.common.naming.TopicName;
import org.awaitility.Awaitility;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testng.Assert;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;

@Test(groups = {"broker-api"})
/* loaded from: input_file:org/apache/pulsar/client/api/PartitionedProducerConsumerTest.class */
public class PartitionedProducerConsumerTest extends ProducerConsumerBase {
    private static final Logger log = LoggerFactory.getLogger(PartitionedProducerConsumerTest.class);
    private ExecutorService executor;

    /* loaded from: input_file:org/apache/pulsar/client/api/PartitionedProducerConsumerTest$AlwaysTwoMessageRouter.class */
    /**
     * Message router used by the tests in this class that ignores both the message and the
     * topic metadata and always selects the partition with index 2.
     */
    private static class AlwaysTwoMessageRouter implements MessageRouter {
        /** Index of the single partition this router ever returns. */
        private static final int FIXED_PARTITION_INDEX = 2;

        private AlwaysTwoMessageRouter() {
            // No state to initialise.
        }

        /**
         * Picks the target partition for a message.
         *
         * @param message the message being routed (not inspected)
         * @param topicMetadata metadata of the target topic (not inspected)
         * @return always {@code 2}
         */
        public int choosePartition(Message<?> message, TopicMetadata topicMetadata) {
            return FIXED_PARTITION_INDEX;
        }
    }

    /* loaded from: input_file:org/apache/pulsar/client/api/PartitionedProducerConsumerTest$RouterWithTopicName.class */
    /**
     * Message router that remembers the topic name reported by the most recently routed message
     * in the static {@link #topicName} field and always selects the partition with index 2.
     */
    private static class RouterWithTopicName implements MessageRouter {
        /** Topic name of the last message passed to this router; {@code null} until a message is routed. */
        static String topicName = null;

        private RouterWithTopicName() {
            // No per-instance state.
        }

        /**
         * Records the message's topic name and picks the target partition.
         *
         * @param message the message being routed; its topic name is stored in {@link #topicName}
         * @param topicMetadata metadata of the target topic (not inspected)
         * @return always {@code 2}
         */
        public int choosePartition(Message<?> message, TopicMetadata topicMetadata) {
            final String observedTopic = message.getTopicName();
            topicName = observedTopic;
            return 2;
        }
    }

    /**
     * Class-level fixture: delegates to the base class's {@code internalSetup()} and
     * {@code producerBaseSetup()}, then creates the single-thread executor used by the
     * asynchronous receive tests.
     *
     * @throws Exception if either base-class setup step fails
     */
    @Override // org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest
    @BeforeClass
    protected void setup() throws Exception {
        super.internalSetup();
        super.producerBaseSetup();
        final DefaultThreadFactory workerThreads = new DefaultThreadFactory("PartitionedProducerConsumerTest");
        this.executor = Executors.newFixedThreadPool(1, workerThreads);
    }

    /**
     * Class-level teardown: delegates to the base class's {@code internalCleanup()} and stops
     * the executor created in {@link #setup()}.
     *
     * <p>Because this method is annotated with {@code alwaysRun = true} it is also invoked when
     * {@link #setup()} failed, in which case the executor may never have been created. The
     * executor is therefore shut down in a {@code finally} block and only when it exists, so
     * that a failing base-class cleanup cannot leak the worker thread and a missing executor
     * cannot raise a {@link NullPointerException}.
     *
     * @throws Exception if the base-class cleanup fails
     */
    @Override // org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest
    @AfterClass(alwaysRun = true)
    protected void cleanup() throws Exception {
        try {
            super.internalCleanup();
        } finally {
            if (this.executor != null) {
                this.executor.shutdownNow();
            }
        }
    }

    /**
     * Sends ten unbatched messages to a four-partition topic with round-robin routing and checks
     * that a consumer subscribed to the partitioned topic receives ten distinct payloads.
     *
     * @throws Exception if any admin or client operation fails
     */
    @Test(timeOut = 30000)
    public void testRoundRobinProducer() throws Exception {
        log.info("-- Starting {} test --", this.methodName);
        final PulsarClient client = newPulsarClient(this.lookupUrl.toString(), 0);
        final String topic = TopicName.get(
                "persistent://my-property/my-ns/my-partitionedtopic1-" + System.currentTimeMillis()).toString();
        this.admin.topics().createPartitionedTopic(topic, 4);

        final Producer<byte[]> producer = client.newProducer()
                .topic(topic)
                .enableBatching(false)
                .messageRoutingMode(MessageRoutingMode.RoundRobinPartition)
                .create();
        final Consumer<byte[]> consumer = client.newConsumer()
                .topic(topic)
                .subscriptionName("my-partitioned-subscriber")
                .subscribe();
        // The consumer must report the partitioned topic name, not an individual partition.
        Assert.assertEquals(consumer.getTopic(), topic);

        final int messageCount = 10;
        for (int sent = 0; sent < messageCount; sent++) {
            final String payload = "my-message-" + sent;
            producer.send(payload.getBytes());
        }

        final Set<String> seenPayloads = new HashSet<>();
        for (int remaining = messageCount; remaining > 0; remaining--) {
            final Message<byte[]> delivered = consumer.receive(5, TimeUnit.SECONDS);
            Assert.assertNotNull(delivered, "Message should not be null");
            consumer.acknowledge(delivered);
            final String text = new String(delivered.getData());
            log.debug("Received message: [{}]", text);
            final boolean firstDelivery = seenPayloads.add(text);
            Assert.assertTrue(firstDelivery, "Message " + text + " already received");
        }

        producer.close();
        consumer.unsubscribe();
        consumer.close();
        client.close();
        this.admin.topics().deletePartitionedTopic(topic);
        log.info("-- Exiting {} test --", this.methodName);
    }

    /**
     * Creates a four-partition topic whose name ends with a run of special characters, then
     * verifies that a round-robin producer can be created on it and closed again.
     *
     * @throws Exception if any admin or client operation fails
     */
    @Test(timeOut = 30000)
    public void testPartitionedTopicNameWithSpecialCharacter() throws Exception {
        log.info("-- Starting {} test --", this.methodName);
        final String specialCharacters = "! * ' ( ) ; : @ & = + $ , \\ ? % # [ ]";
        final String topic = TopicName.get(
                "persistent://my-property/my-ns/my-partitionedtopic1-" + System.currentTimeMillis()
                        + specialCharacters).toString();
        this.admin.topics().createPartitionedTopic(topic, 4);

        // Creating the producer is the actual check; it is closed right away.
        final Producer<byte[]> producer = this.pulsarClient.newProducer()
                .topic(topic)
                .messageRoutingMode(MessageRoutingMode.RoundRobinPartition)
                .create();
        producer.close();

        this.admin.topics().deletePartitionedTopic(topic);
        log.info("-- Exiting {} test --", this.methodName);
    }

    /**
     * Publishes sixteen keyed messages through {@link AlwaysTwoMessageRouter} to a four-partition
     * topic and verifies, via {@code testMessageOrderAndDuplicates}, that the consumer receives
     * them in publish order without duplicates.
     *
     * <p>Cleanup runs in a {@code finally} block and skips every resource that was never
     * created, so a failure early in the test is reported as-is instead of being replaced by a
     * {@link NullPointerException} raised while cleaning up.
     *
     * @throws Exception if any admin or client operation fails
     */
    @Test(timeOut = 30000)
    public void testCustomPartitionProducer() throws Exception {
        PulsarClient newPulsarClient = newPulsarClient(this.lookupUrl.toString(), 0);
        TopicName topicName = null;
        boolean topicCreated = false;
        Producer<byte[]> producer = null;
        Consumer<byte[]> consumer = null;
        try {
            log.info("-- Starting {} test --", this.methodName);
            topicName = TopicName.get("persistent://my-property/my-ns/my-partitionedtopic1-" + System.currentTimeMillis());
            this.admin.topics().createPartitionedTopic(topicName.toString(), 4);
            topicCreated = true;
            producer = newPulsarClient.newProducer()
                    .topic(topicName.toString())
                    .messageRouter(new AlwaysTwoMessageRouter())
                    .create();
            consumer = newPulsarClient.newConsumer()
                    .topic(topicName.toString())
                    .subscriptionName("my-partitioned-subscriber")
                    .subscribe();
            for (int i = 0; i < 16; i++) {
                producer.newMessage().key(String.valueOf(i)).value(("my-message-" + i).getBytes()).send();
            }
            Set<String> received = new HashSet<>();
            for (int i = 0; i < 16; i++) {
                Message<byte[]> message = consumer.receive(5, TimeUnit.SECONDS);
                Assert.assertNotNull(message, "Message should not be null");
                consumer.acknowledge(message);
                String payload = new String(message.getData());
                log.debug("Received message: [{}]", payload);
                testMessageOrderAndDuplicates(received, payload, "my-message-" + i);
            }
        } finally {
            // Only release what was actually acquired; otherwise an NPE here hides the real failure.
            if (producer != null) {
                producer.close();
            }
            if (consumer != null) {
                consumer.unsubscribe();
                consumer.close();
            }
            newPulsarClient.close();
            if (topicCreated) {
                this.admin.topics().deletePartitionedTopic(topicName.toString());
            }
            log.info("-- Exiting {} test --", this.methodName);
        }
    }

    /**
     * Sends ten messages to a four-partition topic using {@code SinglePartition} routing and
     * verifies, via {@code testMessageOrderAndDuplicates}, that the consumer receives them in
     * publish order without duplicates.
     *
     * @throws Exception if any admin or client operation fails
     */
    @Test(timeOut = 30000)
    public void testSinglePartitionProducer() throws Exception {
        log.info("-- Starting {} test --", this.methodName);
        final PulsarClient client = newPulsarClient(this.lookupUrl.toString(), 0);
        final String topic = TopicName.get(
                "persistent://my-property/my-ns/my-partitionedtopic2-" + System.currentTimeMillis()).toString();
        this.admin.topics().createPartitionedTopic(topic, 4);

        final Producer<byte[]> producer = client.newProducer()
                .topic(topic)
                .messageRoutingMode(MessageRoutingMode.SinglePartition)
                .create();
        final Consumer<byte[]> consumer = client.newConsumer()
                .topic(topic)
                .subscriptionName("my-partitioned-subscriber")
                .subscribe();

        final int messageCount = 10;
        for (int sent = 0; sent < messageCount; sent++) {
            final String payload = "my-message-" + sent;
            producer.send(payload.getBytes());
        }

        final Set<String> seenPayloads = new HashSet<>();
        for (int expectedIndex = 0; expectedIndex < messageCount; expectedIndex++) {
            final Message<byte[]> delivered = consumer.receive(5, TimeUnit.SECONDS);
            Assert.assertNotNull(delivered, "Message should not be null");
            consumer.acknowledge(delivered);
            final String text = new String(delivered.getData());
            log.debug("Received message: [{}]", text);
            final String expectedText = "my-message-" + expectedIndex;
            testMessageOrderAndDuplicates(seenPayloads, text, expectedText);
        }

        producer.close();
        consumer.unsubscribe();
        consumer.close();
        client.close();
        this.admin.topics().deletePartitionedTopic(topic);
        log.info("-- Exiting {} test --", this.methodName);
    }

    /**
     * Sends messages 0-4 with key {@code dummykey1} and messages 5-9 with key {@code dummykey2}
     * to a four-partition topic, then checks each received message with
     * {@link #testKeyBasedOrder(Set, String)}.
     *
     * @throws Exception if any admin or client operation fails
     */
    @Test(timeOut = 30000)
    public void testKeyBasedProducer() throws Exception {
        log.info("-- Starting {} test --", this.methodName);
        final PulsarClient client = newPulsarClient(this.lookupUrl.toString(), 0);
        final String topic = TopicName.get(
                "persistent://my-property/my-ns/my-partitionedtopic3-" + System.currentTimeMillis()).toString();
        this.admin.topics().createPartitionedTopic(topic, 4);

        final Producer<byte[]> producer = client.newProducer().topic(topic).create();
        final Consumer<byte[]> consumer = client.newConsumer()
                .topic(topic)
                .subscriptionName("my-partitioned-subscriber")
                .subscribe();

        final int messageCount = 10;
        for (int index = 0; index < messageCount; index++) {
            // First half of the messages share one key, second half the other.
            final String key = index < 5 ? "dummykey1" : "dummykey2";
            final byte[] body = ("my-message-" + index).getBytes();
            producer.newMessage().key(key).value(body).send();
        }

        final Set<String> seenPayloads = new HashSet<>();
        for (int remaining = messageCount; remaining > 0; remaining--) {
            final Message<byte[]> delivered = consumer.receive(5, TimeUnit.SECONDS);
            Assert.assertNotNull(delivered, "Message should not be null");
            consumer.acknowledge(delivered);
            final String text = new String(delivered.getData());
            log.debug("Received message: [{}]", text);
            testKeyBasedOrder(seenPayloads, text);
        }

        producer.close();
        consumer.unsubscribe();
        consumer.close();
        client.close();
        this.admin.topics().deletePartitionedTopic(topic);
        log.info("-- Exiting {} test --", this.methodName);
    }

    /**
     * Asserts per-key ordering for a received {@code my-message-N} payload and records it.
     *
     * <p>Unless {@code N} is 0 or 5, the payload {@code my-message-(N-1)} must already be in
     * {@code set}. The payload itself must not have been recorded before.
     *
     * @param set payloads received so far; the new payload is added to it
     * @param str the received payload, expected to end in {@code -N} with a decimal {@code N}
     */
    private void testKeyBasedOrder(Set<String> set, String str) {
        final int index = Integer.parseInt(str.substring(str.lastIndexOf('-') + 1));
        final boolean startsSequence = index == 0 || index == 5;
        if (!startsSequence) {
            final int previous = index - 1;
            Assert.assertTrue(set.contains("my-message-" + previous),
                    "Message my-message-" + previous + " should come before my-message-" + index);
        }
        final boolean firstDelivery = set.add(str);
        Assert.assertTrue(firstDelivery, "Received duplicate message " + str);
    }

    /**
     * Exercises {@code pause()} / {@code resume()} on a listener-based consumer of a
     * two-partition topic with a receiver queue size of 20.
     *
     * <p>The consumer is paused, 80 messages are published, and the test asserts that the
     * listener sees exactly 40 of them while paused (presumably the messages already permitted
     * by the per-partition receiver queues; confirm against the client implementation). After
     * {@code resume()} it waits for 40 further listener invocations.
     *
     * @throws Exception if any admin or client operation fails
     */
    @Test(timeOut = 100000)
    public void testPauseAndResume() throws Exception {
        log.info("-- Starting {} test --", this.methodName);
        final PulsarClient client = newPulsarClient(this.lookupUrl.toString(), 0);
        final String topic = TopicName.get(
                "persistent://my-property/my-ns/my-partitionedtopic-pr-" + System.currentTimeMillis()).toString();
        this.admin.topics().createPartitionedTopic(topic, 2);

        final int receiverQueueSize = 20;
        final int expectedPerPhase = receiverQueueSize * 2;
        final AtomicReference<CountDownLatch> latchRef =
                new AtomicReference<>(new CountDownLatch(expectedPerPhase));
        final AtomicInteger listenerCalls = new AtomicInteger();

        final Consumer<byte[]> pausedConsumer = client.newConsumer()
                .receiverQueueSize(receiverQueueSize)
                .topic(topic)
                .subscriptionName("my-partitioned-subscriber")
                .messageListener((source, msg) -> {
                    Assert.assertNotNull(msg, "Message cannot be null");
                    log.debug("Received message [{}] in the listener", new String(msg.getData()));
                    source.acknowledgeAsync(msg);
                    listenerCalls.incrementAndGet();
                    latchRef.get().countDown();
                })
                .subscribe();
        final Producer<byte[]> producer = client.newProducer().topic(topic).create();

        pausedConsumer.pause();
        final int totalToSend = expectedPerPhase * 2;
        for (int sent = 0; sent < totalToSend; sent++) {
            producer.send(("my-message-" + sent).getBytes());
        }

        log.info("Waiting for message listener to ack " + expectedPerPhase + " messages");
        final boolean firstPhaseDone = latchRef.get().await(expectedPerPhase, TimeUnit.SECONDS);
        Assert.assertTrue(firstPhaseDone, "Timed out waiting for message listener acks");

        log.info("Giving message listener an opportunity to receive messages while paused");
        Thread.sleep(2000L);
        Assert.assertEquals(listenerCalls.intValue(), expectedPerPhase, "Consumer received messages while paused");

        // Fresh latch for the messages delivered after resuming.
        latchRef.set(new CountDownLatch(expectedPerPhase));
        pausedConsumer.resume();
        log.info("Waiting for message listener to ack all messages");
        final boolean secondPhaseDone = latchRef.get().await(expectedPerPhase, TimeUnit.SECONDS);
        Assert.assertTrue(secondPhaseDone, "Timed out waiting for message listener acks");

        pausedConsumer.close();
        producer.close();
        client.close();
        log.info("-- Exiting {} test --", this.methodName);
    }

    /**
     * Variant of the pause/resume test that unloads both partitions of the topic while the
     * consumer is paused.
     *
     * <p>With a receiver queue size of 20 on a two-partition topic, 80 unbatched messages are
     * published after {@code pause()}. The test waits for 40 listener invocations, unloads each
     * partition, asserts that the count is still 40 two seconds later, and then expects 40 more
     * invocations after {@code resume()}.
     *
     * @throws Exception if any admin or client operation fails
     */
    @Test(timeOut = 30000)
    public void testPauseAndResumeWithUnloading() throws Exception {
        final String topic = "persistent://my-property/my-ns/pause-and-resume-with-unloading" + System.currentTimeMillis();
        final PulsarClient client = newPulsarClient(this.lookupUrl.toString(), 0);
        final int partitions = 2;
        this.admin.topics().createPartitionedTopic(topic, partitions);

        final int expectedPerPhase = 40;
        final AtomicReference<CountDownLatch> latchRef =
                new AtomicReference<>(new CountDownLatch(expectedPerPhase));
        final AtomicInteger listenerCalls = new AtomicInteger();

        final Consumer<byte[]> pausedConsumer = client.newConsumer()
                .topic(topic)
                .subscriptionName("sub")
                .receiverQueueSize(20)
                .messageListener((source, msg) -> {
                    Assert.assertNotNull(msg, "Message cannot be null");
                    source.acknowledgeAsync(msg);
                    listenerCalls.incrementAndGet();
                    latchRef.get().countDown();
                })
                .subscribe();
        pausedConsumer.pause();

        final Producer<byte[]> producer = client.newProducer().topic(topic).enableBatching(false).create();
        for (int sent = 0; sent < 80; sent++) {
            producer.send(("my-message-" + sent).getBytes());
        }

        final boolean firstPhaseDone = latchRef.get().await(40L, TimeUnit.SECONDS);
        Assert.assertTrue(firstPhaseDone, "Timed out waiting for message listener acks");

        // Unload every partition while the consumer is still paused.
        for (int partition = 0; partition < partitions; partition++) {
            final String partitionName = TopicName.get(topic).getPartition(partition).toString();
            this.admin.topics().unload(partitionName);
        }

        Thread.sleep(2000L);
        Assert.assertEquals(listenerCalls.intValue(), expectedPerPhase, "Consumer received messages while paused");

        latchRef.set(new CountDownLatch(expectedPerPhase));
        pausedConsumer.resume();
        final boolean secondPhaseDone = latchRef.get().await(40L, TimeUnit.SECONDS);
        Assert.assertTrue(secondPhaseDone, "Timed out waiting for message listener acks");

        pausedConsumer.unsubscribe();
        producer.close();
        client.close();
        this.admin.topics().deletePartitionedTopic(topic, true);
    }

    /**
     * Checks how a partitioned consumer and producer react to out-of-sequence calls:
     * acknowledging a message that was built but never sent (an
     * {@code InvalidMessageException} is tolerated but not required), and using a consumer or
     * producer after it has been closed (an {@code AlreadyClosedException} is required).
     *
     * @throws Exception if any admin or client operation fails unexpectedly
     */
    @Test(timeOut = 30000)
    public void testInvalidSequence() throws Exception {
        log.info("-- Starting {} test --", this.methodName);
        final String topic = TopicName.get(
                "persistent://my-property/my-ns/my-partitionedtopic4-" + System.currentTimeMillis()).toString();
        this.admin.topics().createPartitionedTopic(topic, 4);

        final Consumer<byte[]> consumer = this.pulsarClient.newConsumer()
                .topic(topic)
                .subscriptionName("my-subscriber-name")
                .subscribe();
        final Producer<byte[]> producer = this.pulsarClient.newProducer()
                .topic(topic)
                .enableBatching(false)
                .messageRoutingMode(MessageRoutingMode.SinglePartition)
                .create();

        try {
            consumer.acknowledge(producer.newMessage().value("InvalidMessage".getBytes()).getMessage());
        } catch (PulsarClientException.InvalidMessageException ignored) {
            // Rejecting the unsent message is acceptable; the call is not required to fail.
        }

        consumer.close();
        try {
            consumer.receive();
            Assert.fail("Should fail");
        } catch (PulsarClientException.AlreadyClosedException expected) {
            // receive() on a closed consumer must be rejected.
        }
        try {
            consumer.unsubscribe();
            Assert.fail("Should fail");
        } catch (PulsarClientException.AlreadyClosedException expected) {
            // unsubscribe() on a closed consumer must be rejected.
        }

        producer.close();
        try {
            producer.send("message".getBytes());
            Assert.fail("Should fail");
        } catch (PulsarClientException.AlreadyClosedException expected) {
            // send() on a closed producer must be rejected.
        }

        this.admin.topics().deletePartitionedTopic(topic);
    }

    /**
     * Checks client behaviour for misuse of the builder and consumer APIs on a four-partition
     * topic: passing {@code null} to {@code messageRouter} / {@code messageRoutingMode} must
     * raise a {@link NullPointerException}, and a cumulative acknowledge of the second of two
     * received messages must complete without throwing.
     *
     * <p>The producer and consumer are released in a {@code finally} block that skips whatever
     * was never created, so a failure while creating them is reported as-is rather than being
     * replaced by a {@link NullPointerException} from the cleanup code. As before, the topic is
     * deleted only when the test body succeeded.
     *
     * @throws Exception if any admin or client operation fails unexpectedly
     */
    @Test(timeOut = 30000)
    public void testSillyUser() throws Exception {
        TopicName topicName = TopicName.get("persistent://my-property/my-ns/my-partitionedtopic5-" + System.currentTimeMillis());
        this.admin.topics().createPartitionedTopic(topicName.toString(), 4);
        Producer<byte[]> producer = null;
        Consumer<byte[]> consumer = null;
        try {
            this.pulsarClient.newProducer().messageRouter((MessageRouter) null);
            Assert.fail("should fail");
        } catch (NullPointerException expected) {
            // A null router must be rejected by the builder.
        }
        try {
            this.pulsarClient.newProducer().messageRoutingMode((MessageRoutingMode) null);
            Assert.fail("should fail");
        } catch (NullPointerException expected) {
            // A null routing mode must be rejected by the builder.
        }
        try {
            producer = this.pulsarClient.newProducer()
                    .topic(topicName.toString())
                    .enableBatching(false)
                    .messageRoutingMode(MessageRoutingMode.SinglePartition)
                    .create();
            consumer = this.pulsarClient.newConsumer()
                    .topic(topicName.toString())
                    .subscriptionName("my-sub")
                    .subscribe();
            producer.send("message1".getBytes());
            producer.send("message2".getBytes());
            // The first message is received but deliberately not acknowledged on its own.
            consumer.receive();
            consumer.acknowledgeCumulative(consumer.receive());
        } finally {
            // Only release what was actually acquired; otherwise an NPE here hides the real failure.
            if (producer != null) {
                producer.close();
            }
            if (consumer != null) {
                consumer.unsubscribe();
                consumer.close();
            }
        }
        this.admin.topics().deletePartitionedTopic(topicName.toString());
    }

    /**
     * Creates and deletes a four-partition topic, then verifies that a producer created on the
     * same name afterwards is not a {@link PartitionedProducerImpl}.
     *
     * <p>The producer created for that final check is closed in a {@code finally} block;
     * previously it was never closed and stayed open on the shared client.
     *
     * @throws Exception if any admin or client operation fails
     */
    @Test(timeOut = 30000)
    public void testDeletePartitionedTopic() throws Exception {
        TopicName topicName = TopicName.get("persistent://my-property/my-ns/my-partitionedtopic6-" + System.currentTimeMillis());
        this.admin.topics().createPartitionedTopic(topicName.toString(), 4);
        Producer<byte[]> producer = this.pulsarClient.newProducer().topic(topicName.toString()).create();
        Consumer<byte[]> consumer = this.pulsarClient.newConsumer()
                .topic(topicName.toString())
                .subscriptionName("my-sub")
                .subscribe();
        consumer.unsubscribe();
        consumer.close();
        producer.close();
        this.admin.topics().deletePartitionedTopic(topicName.toString());

        Producer<byte[]> producerAfterDelete = this.pulsarClient.newProducer().topic(topicName.toString()).create();
        try {
            if (producerAfterDelete instanceof PartitionedProducerImpl) {
                Assert.fail("should fail since partitioned topic was deleted");
            }
        } finally {
            // Release the producer regardless of the outcome of the check.
            producerAfterDelete.close();
        }
    }

    /**
     * Publishes 100 unbatched messages round-robin to a four-partition topic and consumes them
     * through {@link #receiveAsync} on a {@code Shared} subscription, then verifies that every
     * published payload was received.
     *
     * @throws Exception if any admin or client operation fails
     */
    @Test(timeOut = 30000)
    public void testAsyncPartitionedProducerConsumer() throws Exception {
        log.info("-- Starting {} test --", this.methodName);
        final int messageCount = 100;
        final Set<String> published = new HashSet<>();
        final Set<String> consumed = new HashSet<>();
        final String topic = TopicName.get(
                "persistent://my-property/my-ns/my-partitionedtopic1-" + System.currentTimeMillis()).toString();
        this.admin.topics().createPartitionedTopic(topic, 4);

        final Producer<byte[]> producer = this.pulsarClient.newProducer()
                .topic(topic)
                .enableBatching(false)
                .messageRoutingMode(MessageRoutingMode.RoundRobinPartition)
                .create();
        final Consumer<byte[]> consumer = this.pulsarClient.newConsumer()
                .topic(topic)
                .subscriptionName("my-partitioned-subscriber")
                .subscriptionType(SubscriptionType.Shared)
                .subscribe();

        for (int index = 0; index < messageCount; index++) {
            final String payload = "my-message-" + index;
            published.add(payload);
            producer.send(payload.getBytes());
        }

        log.info(" start receiving messages :");
        final CountDownLatch allReceived = new CountDownLatch(messageCount);
        receiveAsync(consumer, messageCount, 0, allReceived, consumed, this.executor);
        allReceived.await();

        Assert.assertEquals(published.size(), messageCount);
        // Whatever is left after removing the consumed payloads was never delivered.
        published.removeAll(consumed);
        Assert.assertTrue(published.isEmpty());

        producer.close();
        consumer.unsubscribe();
        consumer.close();
        this.admin.topics().deletePartitionedTopic(topic);
        log.info("-- Exiting {} test --", this.methodName);
    }

    /**
     * Same scenario as the asynchronous produce/consume test, but with a dedicated client and a
     * consumer whose receiver queue size is 1: 100 unbatched messages are published round-robin
     * to a four-partition topic and all of them must be received through {@link #receiveAsync}.
     *
     * @throws Exception if any admin or client operation fails
     */
    @Test(timeOut = 30000)
    public void testAsyncPartitionedProducerConsumerQueueSizeOne() throws Exception {
        log.info("-- Starting {} test --", this.methodName);
        final PulsarClient client = newPulsarClient(this.lookupUrl.toString(), 0);
        final int messageCount = 100;
        final Set<String> published = new HashSet<>();
        final Set<String> consumed = new HashSet<>();
        final String topic = TopicName.get(
                "persistent://my-property/my-ns/my-partitionedtopic1-" + System.currentTimeMillis()).toString();
        this.admin.topics().createPartitionedTopic(topic, 4);

        final Producer<byte[]> producer = client.newProducer()
                .topic(topic)
                .enableBatching(false)
                .messageRoutingMode(MessageRoutingMode.RoundRobinPartition)
                .create();
        final Consumer<byte[]> consumer = client.newConsumer()
                .topic(topic)
                .subscriptionName("my-partitioned-subscriber")
                .receiverQueueSize(1)
                .subscribe();

        for (int index = 0; index < messageCount; index++) {
            final String payload = "my-message-" + index;
            published.add(payload);
            producer.send(payload.getBytes());
        }

        log.info(" start receiving messages :");
        final CountDownLatch allReceived = new CountDownLatch(messageCount);
        receiveAsync(consumer, messageCount, 0, allReceived, consumed, this.executor);
        allReceived.await();

        Assert.assertEquals(published.size(), messageCount);
        // Whatever is left after removing the consumed payloads was never delivered.
        published.removeAll(consumed);
        Assert.assertTrue(published.isEmpty());

        producer.close();
        consumer.unsubscribe();
        consumer.close();
        client.close();
        this.admin.topics().deletePartitionedTopic(topic);
        log.info("-- Exiting {} test --", this.methodName);
    }

    /**
     * Checks that a consumer of a two-partition topic is not starved of messages from the second
     * partition while the first partition keeps receiving new messages.
     *
     * <p>Nine messages are published directly to partition 0 and, one second later, five to
     * partition 1. The consumer (receiver queue size 10) then receives 30 messages; from the
     * third receive onwards another message is published to partition 0 after each receive. At
     * least four of the 30 received payloads must start with {@code producer2}.
     *
     * @throws Exception if any admin or client operation fails
     */
    @Test(timeOut = 30000)
    public void testFairDistributionForPartitionConsumers() throws Exception {
        log.info("-- Starting {} test --", this.methodName);
        final PulsarClient client = newPulsarClient(this.lookupUrl.toString(), 0);
        final String topic = "persistent://my-property/my-ns/my-topic-" + System.currentTimeMillis();
        this.admin.topics().createPartitionedTopic(topic, 2);

        final Producer<byte[]> firstPartitionProducer = client.newProducer()
                .topic(topic + "-partition-0")
                .messageRoutingMode(MessageRoutingMode.RoundRobinPartition)
                .create();
        final Producer<byte[]> secondPartitionProducer = client.newProducer()
                .topic(topic + "-partition-1")
                .messageRoutingMode(MessageRoutingMode.RoundRobinPartition)
                .create();
        final Consumer<byte[]> consumer = client.newConsumer()
                .topic(topic)
                .subscriptionName("my-partitioned-subscriber")
                .receiverQueueSize(10)
                .subscribe();

        for (int index = 0; index < 9; index++) {
            firstPartitionProducer.send(("producer1-" + index).getBytes());
        }
        Thread.sleep(1000L);
        for (int index = 0; index < 5; index++) {
            secondPartitionProducer.send(("producer2-" + index).getBytes());
        }

        int fromSecondProducer = 0;
        for (int round = 0; round < 30; round++) {
            final String body = new String(consumer.receive().getData());
            if (body.startsWith("producer2")) {
                fromSecondProducer++;
            }
            if (round >= 2) {
                // Keep the first partition busy while the consumer drains.
                firstPartitionProducer.send("producer1".getBytes());
                Thread.sleep(100L);
            }
        }
        Assert.assertTrue(fromSecondProducer >= 4);

        firstPartitionProducer.close();
        secondPartitionProducer.close();
        consumer.unsubscribe();
        consumer.close();
        client.close();
        this.admin.topics().deletePartitionedTopic(topic);
        log.info("-- Exiting {} test --", this.methodName);
    }

    /**
     * Receives messages one at a time through {@code receiveAsync()}, chaining the next receive
     * from the completion callback of the previous one.
     *
     * <p>For each successfully received message the payload is added to {@code set}, the message
     * is acknowledged, the next receive is scheduled on {@code executorService} and
     * {@code countDownLatch} is counted down. If a receive completes exceptionally the chain
     * stops and the latch is not counted down for that message.
     *
     * @param consumer consumer to receive from
     * @param i total number of messages to receive
     * @param i2 number of receives already issued; nothing happens once it reaches {@code i}
     * @param countDownLatch latch counted down once per received message
     * @param set collects the received payloads as strings
     * @param executorService executor on which the follow-up receive is scheduled
     * @throws PulsarClientException declared for the recursive call made from the scheduled task
     */
    private void receiveAsync(Consumer<byte[]> consumer, int i, int i2, CountDownLatch countDownLatch, Set<String> set, ExecutorService executorService) throws PulsarClientException {
        if (i2 >= i) {
            return;
        }
        final int nextIndex = i2 + 1;
        consumer.receiveAsync().handle((received, failure) -> {
            if (failure == null) {
                set.add(new String(received.getData()));
                try {
                    consumer.acknowledge(received);
                } catch (PulsarClientException ackError) {
                    Assert.fail("message acknowledge failed", ackError);
                }
                final Runnable continuation = () -> {
                    try {
                        receiveAsync(consumer, i, nextIndex, countDownLatch, set, executorService);
                    } catch (PulsarClientException receiveError) {
                        Assert.fail("message receive failed", receiveError);
                    }
                };
                // Schedule the next receive before signalling this one, as the original did.
                executorService.execute(continuation);
                countDownLatch.countDown();
            }
            return null;
        });
    }

    /**
     * Verifies {@code getPartitionsForTopic}: for a four-partition topic it must return the four
     * {@code <topic>-partition-N} names in order, and for a topic name that was not created as
     * partitioned it must return a single-element list containing that name.
     *
     * @throws Exception if any admin or client operation fails
     */
    @Test
    public void testGetPartitionsForTopic() throws Exception {
        final int partitionCount = 4;
        final String partitionedTopic = "persistent://my-property/my-ns/my-partitionedtopic1-" + System.currentTimeMillis();
        this.admin.topics().createPartitionedTopic(partitionedTopic, partitionCount);

        final Collection<String> expectedPartitions = new ArrayList<>();
        for (int partition = 0; partition < partitionCount; partition++) {
            expectedPartitions.add(partitionedTopic + "-partition-" + partition);
        }
        final Collection<String> actualPartitions = this.pulsarClient.getPartitionsForTopic(partitionedTopic).join();
        Assert.assertEquals(actualPartitions, expectedPartitions);

        final String plainTopic = "persistent://my-property/my-ns/my-non-partitionedtopic1";
        final Collection<String> plainResult = this.pulsarClient.getPartitionsForTopic(plainTopic).join();
        Assert.assertEquals(plainResult, Collections.singletonList(plainTopic));
    }

    /**
     * Routes 30 keyed messages to partition 2 of a four-partition topic and verifies the
     * partition index carried by the message ids, both for a consumer subscribed to the whole
     * partitioned topic and for a consumer subscribed directly to partition 2.
     *
     * <p>Changes compared with the previous version: each timed {@code receive} result is checked
     * for {@code null} so a timeout surfaces as an assertion failure instead of a
     * {@link NullPointerException}, and cleanup runs in a {@code finally} block that skips
     * resources that were never created so the original failure is not masked.
     *
     * @throws Exception if any admin or client operation fails
     */
    @Test
    public void testMessageIdForSubscribeToSinglePartition() throws Exception {
        PulsarClient newPulsarClient = newPulsarClient(this.lookupUrl.toString(), 0);
        TopicName topicName = null;
        boolean topicCreated = false;
        Producer<byte[]> producer = null;
        Consumer<byte[]> consumer = null;
        Consumer<byte[]> consumer2 = null;
        try {
            log.info("-- Starting {} test --", this.methodName);
            topicName = TopicName.get("persistent://my-property/my-ns/my-topic-" + System.currentTimeMillis());
            TopicName partition = topicName.getPartition(2);
            this.admin.topics().createPartitionedTopic(topicName.toString(), 4);
            topicCreated = true;
            producer = newPulsarClient.newProducer()
                    .topic(topicName.toString())
                    .messageRouter(new AlwaysTwoMessageRouter())
                    .create();
            consumer = newPulsarClient.newConsumer()
                    .topic(topicName.toString())
                    .subscriptionName("subscriber-partitioned")
                    .subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
                    .subscriptionType(SubscriptionType.Exclusive)
                    .subscribe();
            consumer2 = newPulsarClient.newConsumer()
                    .topic(partition.toString())
                    .subscriptionName("subscriber-single")
                    .subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
                    .subscriptionType(SubscriptionType.Exclusive)
                    .subscribe();
            for (int i = 0; i < 30; i++) {
                producer.newMessage().key(String.valueOf(i)).value(("my-message-" + i).getBytes()).send();
            }
            producer.flush();
            // Consumer on the partitioned topic: the partition index is read from the inner message id.
            for (int i = 0; i < 30; i++) {
                Message<byte[]> receive = consumer.receive(5, TimeUnit.SECONDS);
                Assert.assertNotNull(receive, "Message should not be null");
                Assert.assertEquals(receive.getMessageId().getInnerMessageId().getPartitionIndex(), 2);
                consumer.acknowledge(receive);
            }
            // Consumer on the single partition: the partition index is read from the message id itself.
            for (int i = 0; i < 30; i++) {
                Message<byte[]> receive2 = consumer2.receive(5, TimeUnit.SECONDS);
                Assert.assertNotNull(receive2, "Message should not be null");
                Assert.assertEquals(receive2.getMessageId().getPartitionIndex(), 2);
                consumer2.acknowledge(receive2);
            }
        } finally {
            // Only release what was actually acquired; otherwise an NPE here hides the real failure.
            if (producer != null) {
                producer.close();
            }
            if (consumer != null) {
                consumer.unsubscribe();
                consumer.close();
            }
            if (consumer2 != null) {
                consumer2.unsubscribe();
                consumer2.close();
            }
            newPulsarClient.close();
            if (topicCreated) {
                this.admin.topics().deletePartitionedTopic(topicName.toString());
            }
            log.info("-- Exiting {} test --", this.methodName);
        }
    }

    /**
     * Verifies partition auto-update for a producer and a consumer when a topic grows from two
     * to three partitions.
     *
     * <p>Sequence asserted by the test:
     * <ol>
     *   <li>30 messages are published and all 30 are consumed.</li>
     *   <li>The topic is updated to three partitions and the producer's auto-update task is run
     *       by hand; 30 more messages are published and only 20 are consumed (presumably because
     *       the consumer has not yet picked up the new partition).</li>
     *   <li>The consumer's auto-update task is run by hand and the remaining 10 messages are
     *       consumed.</li>
     *   <li>30 more messages are published and all 30 are consumed.</li>
     * </ol>
     *
     * <p>The implementation-typed producer and consumer are obtained with explicit downcasts,
     * and the repeated receive/acknowledge loop lives in
     * {@link #receiveAndAckUntilIdle(Consumer)}.
     *
     * @throws Exception if any admin or client operation fails
     */
    @Test(timeOut = 30000)
    public void testAutoUpdatePartitionsForProducerConsumer() throws Exception {
        log.info("-- Starting {} test --", this.methodName);
        PulsarClient newPulsarClient = newPulsarClient(this.lookupUrl.toString(), 0);
        String topic = "persistent://my-property/my-ns/my-topic-" + System.currentTimeMillis();
        this.admin.topics().createPartitionedTopic(topic, 2);
        PartitionedProducerImpl<byte[]> producer = (PartitionedProducerImpl<byte[]>) newPulsarClient.newProducer()
                .topic(topic)
                .messageRoutingMode(MessageRoutingMode.RoundRobinPartition)
                .enableBatching(false)
                .autoUpdatePartitions(true)
                .create();
        MultiTopicsConsumerImpl<byte[]> consumer = (MultiTopicsConsumerImpl<byte[]>) newPulsarClient.newConsumer()
                .topic(topic)
                .subscriptionName("my-partitioned-subscriber")
                .subscriptionType(SubscriptionType.Shared)
                .autoUpdatePartitions(true)
                .subscribe();

        // Round 1: two partitions known to both sides.
        for (int i = 0; i < 30; i++) {
            producer.send(("producerMsg first round message index: " + i).getBytes());
        }
        Assert.assertEquals(receiveAndAckUntilIdle(consumer), 30);

        // Grow the topic and let only the producer learn about the new partition.
        this.admin.topics().updatePartitionedTopic(topic, 3);
        log.info("trigger partitionsAutoUpdateTimerTask for producer");
        Timeout producerUpdate = producer.getPartitionsAutoUpdateTimeout();
        producerUpdate.task().run(producerUpdate);
        Thread.sleep(200L);

        // Round 2: the consumer is expected to see only 20 of the 30 messages.
        for (int i = 0; i < 30; i++) {
            producer.send(("producerMsg second round message index: " + i).getBytes());
        }
        Assert.assertEquals(receiveAndAckUntilIdle(consumer), 20);

        // Let the consumer learn about the new partition; the outstanding 10 messages must arrive.
        log.info("trigger partitionsAutoUpdateTimerTask for consumer");
        Timeout consumerUpdate = consumer.getPartitionsAutoUpdateTimeout();
        consumerUpdate.task().run(consumerUpdate);
        Thread.sleep(200L);
        Assert.assertEquals(receiveAndAckUntilIdle(consumer), 10);

        // Round 3: three partitions known to both sides.
        for (int i = 0; i < 30; i++) {
            producer.send(("producerMsg third round message index: " + i).getBytes());
        }
        Assert.assertEquals(receiveAndAckUntilIdle(consumer), 30);

        newPulsarClient.close();
        this.admin.topics().deletePartitionedTopic(topic);
        log.info("-- Exiting {} test --", this.methodName);
    }

    /**
     * Blocks for one message, then keeps receiving with a 200 ms timeout until a receive times
     * out, acknowledging and logging every message on the way.
     *
     * @param consumer consumer to drain
     * @return number of messages received and acknowledged (always at least one)
     * @throws PulsarClientException if receiving or acknowledging fails
     */
    private int receiveAndAckUntilIdle(Consumer<byte[]> consumer) throws PulsarClientException {
        int acknowledged = 0;
        // The first receive has no timeout: at least one message is expected in every round.
        Message<byte[]> current = consumer.receive();
        do {
            acknowledged++;
            consumer.acknowledge(current);
            log.info("Consumer acknowledged : " + new String(current.getData()));
            current = consumer.receive(200, TimeUnit.MILLISECONDS);
        } while (current != null);
        return acknowledged;
    }

    /**
     * Sends a single keyed message through a producer configured with
     * {@link RouterWithTopicName} on a freshly created 4-partition topic, then asserts that
     * {@code RouterWithTopicName.topicName} equals the full name of that topic.
     *
     * <p>Cleanup runs in a {@code finally} block and is guarded: the producer is closed only if
     * it was created, and the topic is deleted only if it was created. This way a failure during
     * setup surfaces as itself instead of being replaced by a secondary
     * {@link NullPointerException} or a "topic not found" error raised during cleanup.
     *
     * @throws Exception if topic administration, producing, or cleanup fails
     */
    @Test
    public void testCustomPartitionedProducer() throws Exception {
        PulsarClient newPulsarClient = newPulsarClient(this.lookupUrl.toString(), 0);
        String topic = null;
        boolean topicCreated = false;
        Producer<byte[]> producer = null;
        try {
            log.info("-- Starting {} test --", this.methodName);
            topic = TopicName.get("persistent://my-property/my-ns/my-partitionedtopic1-" + System.currentTimeMillis()).toString();
            this.admin.topics().createPartitionedTopic(topic, 4);
            topicCreated = true;
            producer = newPulsarClient.newProducer().topic(topic).messageRouter(new RouterWithTopicName()).create();

            // One message is enough: the assertion below only checks the value the router recorded.
            final int messageIndex = 0;
            producer.newMessage().key(String.valueOf(messageIndex)).value(("my-message-" + messageIndex).getBytes()).send();

            Assert.assertEquals(RouterWithTopicName.topicName, topic);
        } finally {
            // Close producer and client before deleting the topic, mirroring the original order.
            if (producer != null) {
                producer.close();
            }
            newPulsarClient.close();
            if (topicCreated) {
                this.admin.topics().deletePartitionedTopic(topic);
            }
            log.info("-- Exiting {} test --", this.methodName);
        }
    }

    /**
     * Checks that producer and consumer interceptors are told about a partition-count change.
     *
     * <p>A 4-partition topic is created, a producer and a consumer are attached (both polling for
     * partition updates every second), and each registers an interceptor whose
     * {@code onPartitionsChange} callback adds the reported partition count to a counter. The
     * topic is then expanded to 9 partitions and the test waits until both counters equal 9.
     *
     * <p>The wait is bounded to 30 seconds (the previous bound of 10000 seconds would stall the
     * suite for hours on a missed notification), and all resources are released in a
     * {@code finally} block so a failed wait or assertion does not leak the client or the topic.
     *
     * @throws Exception if topic administration, client operations, or cleanup fails
     */
    @Test
    public void testPartitionedTopicInterceptor() throws Exception {
        log.info("-- Starting {} test --", this.methodName);
        PulsarClient newPulsarClient = newPulsarClient(this.lookupUrl.toString(), 0);
        final String topic = TopicName.get("persistent://my-property/my-ns/interceptor-partitionedtopic1-" + System.currentTimeMillis()).toString();
        final int initialPartitions = 4;
        final int expandedPartitions = initialPartitions + 5;
        // Counters start at 0 and receive the reported partition count via addAndGet, so they
        // match expandedPartitions only if each interceptor is notified exactly once.
        final AtomicInteger producerReportedPartitions = new AtomicInteger(0);
        final AtomicInteger consumerReportedPartitions = new AtomicInteger(0);

        boolean topicCreated = false;
        Producer<byte[]> producer = null;
        Consumer<byte[]> consumer = null;
        try {
            this.admin.topics().createPartitionedTopic(topic, initialPartitions);
            topicCreated = true;

            producer = newPulsarClient.newProducer().topic(topic).enableBatching(false).autoUpdatePartitions(true).autoUpdatePartitionsInterval(1, TimeUnit.SECONDS).intercept(new ProducerInterceptor<byte[]>() {
                @Override
                public void close() {
                }

                @Override
                public Message<byte[]> beforeSend(Producer<byte[]> producer, Message<byte[]> message) {
                    return message;
                }

                @Override
                public void onSendAcknowledgement(Producer<byte[]> producer, Message<byte[]> message, MessageId messageId, Throwable th) {
                }

                @Override
                public void onPartitionsChange(String topicName, int partitions) {
                    producerReportedPartitions.addAndGet(partitions);
                }
            }).messageRoutingMode(MessageRoutingMode.RoundRobinPartition).create();

            consumer = newPulsarClient.newConsumer().topic(topic).subscriptionName("my-partitioned-subscriber").autoUpdatePartitionsInterval(1, TimeUnit.SECONDS).intercept(new ConsumerInterceptor<byte[]>() {
                @Override
                public void close() {
                }

                @Override
                public Message<byte[]> beforeConsume(Consumer<byte[]> consumer, Message<byte[]> message) {
                    return message;
                }

                @Override
                public void onAcknowledge(Consumer<byte[]> consumer, MessageId messageId, Throwable th) {
                }

                @Override
                public void onAcknowledgeCumulative(Consumer<byte[]> consumer, MessageId messageId, Throwable th) {
                }

                @Override
                public void onNegativeAcksSend(Consumer<byte[]> consumer, Set<MessageId> set) {
                }

                @Override
                public void onAckTimeoutSend(Consumer<byte[]> consumer, Set<MessageId> set) {
                }

                @Override
                public void onPartitionsChange(String topicName, int partitions) {
                    consumerReportedPartitions.addAndGet(partitions);
                }
            }).subscribe();

            this.admin.topics().updatePartitionedTopic(topic, expandedPartitions);
            Awaitility.await().atMost(30L, TimeUnit.SECONDS).until(() -> {
                return Boolean.valueOf(producerReportedPartitions.get() == expandedPartitions
                        && consumerReportedPartitions.get() == expandedPartitions);
            });
            Assert.assertEquals(producerReportedPartitions.get(), expandedPartitions);
            Assert.assertEquals(consumerReportedPartitions.get(), expandedPartitions);
        } finally {
            // Same release order as before: producer, consumer, client, then the topic itself.
            if (producer != null) {
                producer.close();
            }
            if (consumer != null) {
                consumer.unsubscribe();
                consumer.close();
            }
            newPulsarClient.close();
            if (topicCreated) {
                this.admin.topics().deletePartitionedTopic(topic);
            }
            log.info("-- Exiting {} test --", this.methodName);
        }
    }

    /**
     * Synthetic lambda-deserialization hook: maps a {@link SerializedLambda} back to a message
     * listener lambda based on its implementation method name.
     *
     * <p>Two implementation method names are recognised,
     * {@code lambda$testPauseAndResume$26154963$1} and
     * {@code lambda$testPauseAndResumeWithUnloading$26154963$1}. For either one, the functional
     * interface, its method name and signature, the implementing class and the implementation
     * signature are all verified before a listener is rebuilt from the two captured arguments
     * (an {@link AtomicInteger} and an {@link AtomicReference}).
     *
     * <p>NOTE(review): this method is marked synthetic and its body looks like decompiler output
     * ({@code boolean z = -1}, a {@code switch} over a boolean, {@code case SHARED_VALUE}); it is
     * presumably not meant to be maintained by hand. Confirm against the original source before
     * editing it.
     *
     * @param serializedLambda the serialized form of the lambda being restored
     * @return a listener lambda that acknowledges the message asynchronously, increments the
     *         captured counter and counts down the latch held by the captured reference
     * @throws IllegalArgumentException if the serialized lambda matches neither known listener
     */
    private static /* synthetic */ Object $deserializeLambda$(SerializedLambda serializedLambda) {
        String implMethodName = serializedLambda.getImplMethodName();
        // First switch: resolve the method name to a selector (hash code first, then equals).
        boolean z = -1;
        switch (implMethodName.hashCode()) {
            case -213005378:
                if (implMethodName.equals("lambda$testPauseAndResume$26154963$1")) {
                    z = false;
                    break;
                }
                break;
            case 1212572667:
                if (implMethodName.equals("lambda$testPauseAndResumeWithUnloading$26154963$1")) {
                    z = true;
                    break;
                }
                break;
        }
        // Second switch: validate the lambda's shape for the selected case and rebuild it.
        switch (z) {
            case SHARED_VALUE:
                if (serializedLambda.getImplMethodKind() == 6 && serializedLambda.getFunctionalInterfaceClass().equals("org/apache/pulsar/client/api/MessageListener") && serializedLambda.getFunctionalInterfaceMethodName().equals("received") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Lorg/apache/pulsar/client/api/Consumer;Lorg/apache/pulsar/client/api/Message;)V") && serializedLambda.getImplClass().equals("org/apache/pulsar/client/api/PartitionedProducerConsumerTest") && serializedLambda.getImplMethodSignature().equals("(Ljava/util/concurrent/atomic/AtomicInteger;Ljava/util/concurrent/atomic/AtomicReference;Lorg/apache/pulsar/client/api/Consumer;Lorg/apache/pulsar/client/api/Message;)V")) {
                    AtomicInteger atomicInteger = (AtomicInteger) serializedLambda.getCapturedArg(0);
                    AtomicReference atomicReference = (AtomicReference) serializedLambda.getCapturedArg(1);
                    // This variant also logs the received payload at debug level.
                    return (consumer, message) -> {
                        Assert.assertNotNull(message, "Message cannot be null");
                        log.debug("Received message [{}] in the listener", new String(message.getData()));
                        consumer.acknowledgeAsync(message);
                        atomicInteger.incrementAndGet();
                        ((CountDownLatch) atomicReference.get()).countDown();
                    };
                }
                break;
            case true:
                if (serializedLambda.getImplMethodKind() == 6 && serializedLambda.getFunctionalInterfaceClass().equals("org/apache/pulsar/client/api/MessageListener") && serializedLambda.getFunctionalInterfaceMethodName().equals("received") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Lorg/apache/pulsar/client/api/Consumer;Lorg/apache/pulsar/client/api/Message;)V") && serializedLambda.getImplClass().equals("org/apache/pulsar/client/api/PartitionedProducerConsumerTest") && serializedLambda.getImplMethodSignature().equals("(Ljava/util/concurrent/atomic/AtomicInteger;Ljava/util/concurrent/atomic/AtomicReference;Lorg/apache/pulsar/client/api/Consumer;Lorg/apache/pulsar/client/api/Message;)V")) {
                    AtomicInteger atomicInteger2 = (AtomicInteger) serializedLambda.getCapturedArg(0);
                    AtomicReference atomicReference2 = (AtomicReference) serializedLambda.getCapturedArg(1);
                    // Same listener as above, minus the debug log line.
                    return (consumer2, message2) -> {
                        Assert.assertNotNull(message2, "Message cannot be null");
                        consumer2.acknowledgeAsync(message2);
                        atomicInteger2.incrementAndGet();
                        ((CountDownLatch) atomicReference2.get()).countDown();
                    };
                }
                break;
        }
        // No known lambda matched, or the shape checks failed.
        throw new IllegalArgumentException("Invalid lambda deserialization");
    }
}
