/*
 * Decompiled with CFR 0.152.
 */
package org.apache.pulsar.client.api.v1;

import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
import java.io.IOException;
import java.io.Serializable;
import java.lang.reflect.Field;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
import org.apache.bookkeeper.mledger.impl.EntryCacheImpl;
import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl;
import org.apache.bookkeeper.mledger.impl.PositionImpl;
import org.apache.pulsar.broker.service.persistent.PersistentTopic;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.ConsumerBuilder;
import org.apache.pulsar.client.api.ConsumerCryptoFailureAction;
import org.apache.pulsar.client.api.CryptoKeyReader;
import org.apache.pulsar.client.api.EncryptionKeyInfo;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.MessageListener;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.ProducerBuilder;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.api.SubscriptionType;
import org.apache.pulsar.client.api.TypedMessageBuilder;
import org.apache.pulsar.client.api.v1.V1_ProducerConsumerBase;
import org.apache.pulsar.client.impl.ConsumerImpl;
import org.apache.pulsar.client.impl.MessageIdImpl;
import org.apache.pulsar.client.impl.TypedMessageBuilderImpl;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.util.FutureUtil;
import org.awaitility.Awaitility;
import org.mockito.Mockito;
import org.mockito.verification.VerificationMode;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testng.Assert;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.DataProvider;
import org.testng.annotations.Test;

@Test(groups={"flaky"})
public class V1_ProducerConsumerTest
extends V1_ProducerConsumerBase {
    private static final Logger log = LoggerFactory.getLogger(V1_ProducerConsumerTest.class);
    private static final long BATCHING_MAX_PUBLISH_DELAY_THRESHOLD = 1L;

    @Override
    @BeforeMethod(alwaysRun=true)
    protected void setup() throws Exception {
        super.internalSetup();
        super.producerBaseSetup();
    }

    @Override
    @AfterMethod(alwaysRun=true)
    protected void cleanup() throws Exception {
        super.internalCleanup();
    }

    @DataProvider(name="batch")
    public Object[][] codecProvider() {
        return new Object[][]{{0}, {1000}};
    }

    @Test(dataProvider="batch")
    public void testSyncProducerAndConsumer(int batchMessageDelayMs) throws Exception {
        log.info("-- Starting {} test --", (Object)this.methodName);
        Consumer consumer = this.pulsarClient.newConsumer(Schema.STRING).topic(new String[]{"persistent://my-property/use/my-ns/my-topic1"}).subscriptionName("my-subscriber-name").subscriptionType(SubscriptionType.Exclusive).subscribe();
        ProducerBuilder producerBuilder = this.pulsarClient.newProducer(Schema.STRING).topic("persistent://my-property/use/my-ns/my-topic1");
        if (batchMessageDelayMs != 0) {
            producerBuilder.enableBatching(true).batchingMaxPublishDelay((long)batchMessageDelayMs, TimeUnit.MILLISECONDS).batchingMaxMessages(5);
        } else {
            producerBuilder.enableBatching(false);
        }
        Producer producer = producerBuilder.create();
        for (int i = 0; i < 10; ++i) {
            producer.send((Object)("my-message-" + i));
        }
        Message msg = null;
        HashSet messageSet = Sets.newHashSet();
        for (int i = 0; i < 10; ++i) {
            msg = consumer.receive(5, TimeUnit.SECONDS);
            String receivedMessage = (String)msg.getValue();
            log.debug("Received message: [{}]", (Object)receivedMessage);
            String expectedMessage = "my-message-" + i;
            this.testMessageOrderAndDuplicates(messageSet, receivedMessage, expectedMessage);
        }
        consumer.acknowledgeCumulative(msg);
        consumer.close();
        log.info("-- Exiting {} test --", (Object)this.methodName);
    }

    @Test(dataProvider="batch")
    public void testAsyncProducerAndAsyncAck(int batchMessageDelayMs) throws Exception {
        log.info("-- Starting {} test --", (Object)this.methodName);
        Consumer consumer = this.pulsarClient.newConsumer().topic(new String[]{"persistent://my-property/use/my-ns/my-topic2"}).subscriptionName("my-subscriber-name").subscriptionType(SubscriptionType.Exclusive).subscribe();
        Producer producer = this.pulsarClient.newProducer().topic("persistent://my-property/use/my-ns/my-topic2").batchingMaxMessages(5).batchingMaxPublishDelay(1L, TimeUnit.MILLISECONDS).enableBatching(batchMessageDelayMs != 0).create();
        ArrayList futures = Lists.newArrayList();
        for (int i = 0; i < 10; ++i) {
            String message = "my-message-" + i;
            CompletableFuture future = producer.sendAsync((Object)message.getBytes());
            futures.add(future);
        }
        log.info("Waiting for async publish to complete");
        for (Future future : futures) {
            future.get();
        }
        Message msg = null;
        HashSet messageSet = Sets.newHashSet();
        for (int i = 0; i < 10; ++i) {
            msg = consumer.receive(5, TimeUnit.SECONDS);
            String receivedMessage = new String(msg.getData());
            log.info("Received message: [{}]", (Object)receivedMessage);
            String expectedMessage = "my-message-" + i;
            this.testMessageOrderAndDuplicates(messageSet, receivedMessage, expectedMessage);
        }
        CompletableFuture ackFuture = consumer.acknowledgeCumulativeAsync(msg);
        log.info("Waiting for async ack to complete");
        ackFuture.get();
        consumer.close();
        log.info("-- Exiting {} test --", (Object)this.methodName);
    }

    @Test(dataProvider="batch", timeOut=100000L)
    public void testMessageListener(int batchMessageDelayMs) throws Exception {
        log.info("-- Starting {} test --", (Object)this.methodName);
        int numMessages = 100;
        CountDownLatch latch = new CountDownLatch(numMessages);
        Consumer consumer = this.pulsarClient.newConsumer().topic(new String[]{"persistent://my-property/use/my-ns/my-topic3"}).subscriptionName("my-subscriber-name").subscriptionType(SubscriptionType.Exclusive).messageListener((MessageListener & Serializable)(c, msg) -> {
            Assert.assertNotNull((Object)msg, (String)"Message cannot be null");
            String receivedMessage = new String(msg.getData());
            log.debug("Received message [{}] in the listener", (Object)receivedMessage);
            c.acknowledgeAsync(msg);
            latch.countDown();
        }).subscribe();
        Producer producer = this.pulsarClient.newProducer().topic("persistent://my-property/use/my-ns/my-topic3").batchingMaxMessages(5).batchingMaxPublishDelay(1L, TimeUnit.MILLISECONDS).enableBatching(batchMessageDelayMs != 0).create();
        ArrayList futures = Lists.newArrayList();
        for (int i = 0; i < numMessages; ++i) {
            String message = "my-message-" + i;
            CompletableFuture future = producer.sendAsync((Object)message.getBytes());
            futures.add(future);
        }
        log.info("Waiting for async publish to complete");
        for (Future future : futures) {
            future.get();
        }
        log.info("Waiting for message listener to ack all messages");
        Assert.assertTrue((boolean)latch.await(numMessages, TimeUnit.SECONDS), (String)"Timed out waiting for message listener acks");
        consumer.close();
        log.info("-- Exiting {} test --", (Object)this.methodName);
    }

    @Test(dataProvider="batch")
    public void testBackoffAndReconnect(int batchMessageDelayMs) throws Exception {
        int i;
        log.info("-- Starting {} test --", (Object)this.methodName);
        Consumer consumer = this.pulsarClient.newConsumer().topic(new String[]{"persistent://my-property/use/my-ns/my-topic4"}).subscriptionName("my-subscriber-name").subscriptionType(SubscriptionType.Exclusive).startMessageIdInclusive().subscribe();
        Producer producer = this.pulsarClient.newProducer().topic("persistent://my-property/use/my-ns/my-topic4").batchingMaxMessages(5).batchingMaxPublishDelay(1L, TimeUnit.MILLISECONDS).enableBatching(batchMessageDelayMs != 0).create();
        CompletionStage lastFuture = null;
        for (int i2 = 0; i2 < 10; ++i2) {
            lastFuture = producer.sendAsync((Object)("my-message-" + i2).getBytes()).thenApply(msgId -> {
                log.info("Published message id: {}", msgId);
                return msgId;
            });
        }
        lastFuture.get();
        Message msg = null;
        for (i = 0; i < 10; ++i) {
            msg = consumer.receive(5, TimeUnit.SECONDS);
            log.info("Received: [{}]", (Object)new String(msg.getData()));
        }
        log.info("-- Restarting broker --");
        this.restartBroker();
        msg = null;
        log.info("Receiving duplicate messages..");
        for (i = 0; i < 10; ++i) {
            msg = consumer.receive(5, TimeUnit.SECONDS);
            log.info("Received: [{}]", (Object)new String(msg.getData()));
            Assert.assertNotNull((Object)msg, (String)"Message cannot be null");
        }
        consumer.acknowledgeCumulative(msg);
        consumer.close();
        log.info("-- Exiting {} test --", (Object)this.methodName);
    }

    @Test(dataProvider="batch")
    public void testSendTimeout(int batchMessageDelayMs) throws Exception {
        log.info("-- Starting {} test --", (Object)this.methodName);
        Consumer consumer = this.pulsarClient.newConsumer().topic(new String[]{"persistent://my-property/use/my-ns/my-topic5"}).subscriptionName("my-subscriber-name").subscriptionType(SubscriptionType.Exclusive).subscribe();
        Producer producer = this.pulsarClient.newProducer().topic("persistent://my-property/use/my-ns/my-topic5").batchingMaxMessages(5).batchingMaxPublishDelay(2L, TimeUnit.MILLISECONDS).enableBatching(batchMessageDelayMs != 0).sendTimeout(1, TimeUnit.SECONDS).create();
        String message = "my-message";
        this.stopBroker();
        CompletableFuture future = producer.sendAsync((Object)"my-message".getBytes());
        try {
            future.get();
            Assert.fail((String)"Send operation should have failed");
        }
        catch (ExecutionException executionException) {
            // empty catch block
        }
        this.startBroker();
        Message msg = consumer.receive(3, TimeUnit.SECONDS);
        Assert.assertNull((Object)msg);
        consumer.close();
        log.info("-- Exiting {} test --", (Object)this.methodName);
    }

    @Test
    public void testInvalidSequence() throws Exception {
        log.info("-- Starting {} test --", (Object)this.methodName);
        PulsarClient client1 = PulsarClient.builder().serviceUrl(this.pulsar.getWebServiceAddress()).build();
        client1.close();
        try {
            client1.newConsumer().topic(new String[]{"persistent://my-property/use/my-ns/my-topic6"}).subscriptionName("my-subscriber-name").subscriptionType(SubscriptionType.Exclusive).subscribe();
            Assert.fail((String)"Should fail");
        }
        catch (PulsarClientException e) {
            Assert.assertTrue((boolean)(e instanceof PulsarClientException.AlreadyClosedException));
        }
        try {
            client1.newProducer().topic("persistent://my-property/use/my-ns/my-topic6").create();
            Assert.fail((String)"Should fail");
        }
        catch (PulsarClientException e) {
            Assert.assertTrue((boolean)(e instanceof PulsarClientException.AlreadyClosedException));
        }
        Consumer consumer = this.pulsarClient.newConsumer().topic(new String[]{"persistent://my-property/use/my-ns/my-topic6"}).subscriptionName("my-subscriber-name").subscriptionType(SubscriptionType.Exclusive).subscribe();
        Producer producer = this.pulsarClient.newProducer().topic("persistent://my-property/use/my-ns/my-topic6").create();
        try {
            TypedMessageBuilder builder = producer.newMessage().value((Object)"InvalidMessage".getBytes());
            Message msg = ((TypedMessageBuilderImpl)builder).getMessage();
            consumer.acknowledge(msg);
        }
        catch (PulsarClientException.InvalidMessageException invalidMessageException) {
            // empty catch block
        }
        consumer.close();
        try {
            consumer.receive();
            Assert.fail((String)"Should fail");
        }
        catch (PulsarClientException.AlreadyClosedException alreadyClosedException) {
            // empty catch block
        }
        try {
            consumer.unsubscribe();
            Assert.fail((String)"Should fail");
        }
        catch (PulsarClientException.AlreadyClosedException alreadyClosedException) {
            // empty catch block
        }
        producer.close();
        try {
            producer.send((Object)"message".getBytes());
            Assert.fail((String)"Should fail");
        }
        catch (PulsarClientException.AlreadyClosedException alreadyClosedException) {
            // empty catch block
        }
    }

    @Test
    public void testSillyUser() throws Exception {
        try {
            PulsarClient.builder().serviceUrl("invalid://url").build();
            Assert.fail((String)"should fail");
        }
        catch (PulsarClientException e) {
            Assert.assertTrue((boolean)(e instanceof PulsarClientException.InvalidServiceURL));
        }
        try {
            this.pulsarClient.newProducer().sendTimeout(-1, TimeUnit.SECONDS);
            Assert.fail((String)"should fail");
        }
        catch (IllegalArgumentException e) {
            // empty catch block
        }
        try {
            this.pulsarClient.newProducer().topic("invalid://topic").create();
            Assert.fail((String)"should fail");
        }
        catch (PulsarClientException e) {
            Assert.assertTrue((boolean)(e instanceof PulsarClientException.InvalidTopicNameException));
        }
        try {
            this.pulsarClient.newConsumer().messageListener(null);
            Assert.fail((String)"should fail");
        }
        catch (NullPointerException nullPointerException) {
            // empty catch block
        }
        try {
            this.pulsarClient.newConsumer().subscriptionType(null);
            Assert.fail((String)"should fail");
        }
        catch (NullPointerException nullPointerException) {
            // empty catch block
        }
        try {
            this.pulsarClient.newConsumer().receiverQueueSize(-1);
            Assert.fail((String)"should fail");
        }
        catch (IllegalArgumentException illegalArgumentException) {
            // empty catch block
        }
        try {
            this.pulsarClient.newConsumer().topic(new String[]{"persistent://my-property/use/my-ns/my-topic7"}).subscriptionName(null).subscribe();
            Assert.fail((String)"Should fail");
        }
        catch (IllegalArgumentException illegalArgumentException) {
            // empty catch block
        }
        try {
            this.pulsarClient.newConsumer().topic(new String[]{"persistent://my-property/use/my-ns/my-topic7"}).subscriptionName("").subscribe();
            Assert.fail((String)"Should fail");
        }
        catch (IllegalArgumentException illegalArgumentException) {
            // empty catch block
        }
        try {
            this.pulsarClient.newConsumer().topic(new String[]{"invalid://topic7"}).subscriptionName(null).subscribe();
            Assert.fail((String)"Should fail");
        }
        catch (IllegalArgumentException illegalArgumentException) {
            // empty catch block
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test(dataProvider="batch")
    public void testConcurrentConsumerReceiveWhileReconnect(int batchMessageDelayMs) throws Exception {
        int recvQueueSize = 100;
        int numConsumersThreads = 10;
        String topic = "persistent://my-property/use/my-ns/my-topic-" + UUID.randomUUID().toString();
        String subName = UUID.randomUUID().toString();
        final Consumer consumer = this.pulsarClient.newConsumer().topic(new String[]{topic}).subscriptionName(subName).startMessageIdInclusive().receiverQueueSize(100).subscribe();
        ExecutorService executor = Executors.newCachedThreadPool();
        try {
            Message msg;
            final CyclicBarrier barrier = new CyclicBarrier(11);
            for (int i = 0; i < 10; ++i) {
                executor.submit(new Callable<Void>(){

                    @Override
                    public Void call() throws Exception {
                        barrier.await();
                        consumer.receive();
                        return null;
                    }
                });
            }
            barrier.await();
            Thread.sleep(100L);
            this.restartBroker();
            Thread.sleep(2000L);
            Producer producer = this.pulsarClient.newProducer().topic(topic).batchingMaxPublishDelay(1L, TimeUnit.MILLISECONDS).batchingMaxMessages(5).enableBatching(batchMessageDelayMs != 0).create();
            for (int i = 0; i < 100; ++i) {
                String message = "my-message-" + i;
                producer.send((Object)message.getBytes());
            }
            ConsumerImpl consumerImpl = (ConsumerImpl)consumer;
            Awaitility.await().untilAsserted(() -> Assert.assertEquals((int)consumerImpl.getAvailablePermits(), (int)10));
            Assert.assertEquals((int)consumerImpl.numMessagesInQueue(), (int)90);
            barrier.reset();
            for (int i = 0; i < 10; ++i) {
                executor.submit(new Callable<Void>(){

                    @Override
                    public Void call() throws Exception {
                        barrier.await();
                        consumer.receive();
                        return null;
                    }
                });
            }
            barrier.await();
            Awaitility.await().untilAsserted(() -> Assert.assertEquals((int)consumerImpl.getAvailablePermits(), (int)20));
            Assert.assertEquals((int)consumerImpl.numMessagesInQueue(), (int)80);
            while ((msg = consumer.receive(1, TimeUnit.SECONDS)) != null) {
            }
            Assert.assertEquals((int)consumerImpl.getAvailablePermits(), (int)0);
            Assert.assertEquals((int)consumerImpl.numMessagesInQueue(), (int)0);
            barrier.reset();
            for (int i = 0; i < 10; ++i) {
                executor.submit(new Callable<Void>(){

                    @Override
                    public Void call() throws Exception {
                        barrier.await();
                        consumer.receive();
                        return null;
                    }
                });
            }
            barrier.await();
            Thread.sleep(100L);
            this.restartBroker();
            Awaitility.await().untilAsserted(() -> Assert.assertEquals((int)consumerImpl.getAvailablePermits(), (int)10));
            Assert.assertEquals((int)consumerImpl.numMessagesInQueue(), (int)90);
            consumer.close();
        }
        finally {
            if (Collections.singletonList(executor).get(0) != null) {
                executor.shutdownNow();
            }
        }
    }

    @Test
    public void testSendBigMessageSize() throws Exception {
        log.info("-- Starting {} test --", (Object)this.methodName);
        String topic = "persistent://my-property/use/my-ns/bigMsg";
        Producer producer = this.pulsarClient.newProducer().topic("persistent://my-property/use/my-ns/bigMsg").create();
        producer.newMessage().value((Object)new byte[0x500000]);
        try {
            producer.send((Object)new byte[0x500001]);
            Assert.fail((String)"Should have thrown exception");
        }
        catch (PulsarClientException.InvalidMessageException invalidMessageException) {
            // empty catch block
        }
    }

    @Test(groups={"quarantine"})
    public void testActiveAndInActiveConsumerEntryCacheBehavior() throws Exception {
        int i;
        int i2;
        log.info("-- Starting {} test --", (Object)this.methodName);
        long batchMessageDelayMs = 100L;
        int receiverSize = 10;
        String topicName = "cache-topic-" + UUID.randomUUID().toString();
        String sub1 = "faster-sub1";
        String sub2 = "slower-sub2";
        Consumer subscriber1 = this.pulsarClient.newConsumer().topic(new String[]{"persistent://my-property/use/my-ns/" + topicName}).subscriptionName("faster-sub1").subscriptionType(SubscriptionType.Shared).receiverQueueSize(10).subscribe();
        String topic = "persistent://my-property/use/my-ns/" + topicName;
        Producer producer = this.pulsarClient.newProducer().topic(topic).enableBatching(true).batchingMaxPublishDelay(100L, TimeUnit.MILLISECONDS).batchingMaxMessages(5).create();
        PersistentTopic topicRef = (PersistentTopic)this.pulsar.getBrokerService().getTopicReference(topic).get();
        ManagedLedgerImpl ledger = (ManagedLedgerImpl)topicRef.getManagedLedger();
        Field cacheField = ManagedLedgerImpl.class.getDeclaredField("entryCache");
        cacheField.setAccessible(true);
        Field modifiersField = Field.class.getDeclaredField("modifiers");
        modifiersField.setAccessible(true);
        modifiersField.setInt(cacheField, cacheField.getModifiers() & 0xFFFFFFEF);
        EntryCacheImpl entryCache = (EntryCacheImpl)Mockito.spy((Object)((EntryCacheImpl)cacheField.get(ledger)));
        cacheField.set(ledger, entryCache);
        Message msg = null;
        for (i2 = 0; i2 < 30; ++i2) {
            String message = "my-message-" + i2;
            producer.send((Object)message.getBytes());
        }
        for (i2 = 0; i2 < 30; ++i2) {
            msg = subscriber1.receive(5, TimeUnit.SECONDS);
            subscriber1.acknowledge(msg);
        }
        ((EntryCacheImpl)Mockito.verify((Object)entryCache, (VerificationMode)Mockito.atLeastOnce())).invalidateEntries((PositionImpl)Mockito.any());
        Thread.sleep(1000L);
        producer.send((Object)"message".getBytes());
        msg = subscriber1.receive(5, TimeUnit.SECONDS);
        Consumer subscriber2 = this.pulsarClient.newConsumer().topic(new String[]{"persistent://my-property/use/my-ns/" + topicName}).subscriptionName("slower-sub2").subscriptionType(SubscriptionType.Shared).subscribe();
        int moreMessages = 10;
        for (i = 0; i < 20; ++i) {
            String message = "my-message-" + i;
            producer.send((Object)message.getBytes());
        }
        for (i = 0; i < 20; ++i) {
            msg = subscriber1.receive(5, TimeUnit.SECONDS);
            subscriber1.acknowledge(msg);
        }
        Thread.sleep(1000L);
        producer.send((Object)"message".getBytes());
        msg = subscriber1.receive(5, TimeUnit.SECONDS);
        V1_ProducerConsumerTest.retryStrategically(test -> entryCache.getSize() > 0L, 10, 100L);
        Assert.assertTrue((entryCache.getSize() != 0L ? 1 : 0) != 0);
        subscriber2.close();
        V1_ProducerConsumerTest.retryStrategically(test -> entryCache.getSize() == 0L, 5, 100L);
        Assert.assertEquals((long)entryCache.getSize(), (long)0L);
        subscriber1.close();
        log.info("-- Exiting {} test --", (Object)this.methodName);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test(timeOut=2000L)
    public void testAsyncProducerAndConsumer() throws Exception {
        log.info("-- Starting {} test --", (Object)this.methodName);
        int totalMsg = 100;
        HashSet produceMsgs = Sets.newHashSet();
        HashSet consumeMsgs = Sets.newHashSet();
        Consumer consumer = this.pulsarClient.newConsumer().topic(new String[]{"persistent://my-property/use/my-ns/my-topic1"}).subscriptionName("my-subscriber-name").subscriptionType(SubscriptionType.Exclusive).subscribe();
        Producer producer = this.pulsarClient.newProducer().topic("persistent://my-property/use/my-ns/my-topic1").create();
        for (int i = 0; i < 100; ++i) {
            String message = "my-message-" + i;
            producer.send((Object)message.getBytes());
            produceMsgs.add(message);
        }
        log.info(" start receiving messages :");
        CountDownLatch latch = new CountDownLatch(100);
        ExecutorService executor = Executors.newFixedThreadPool(1);
        try {
            this.receiveAsync((Consumer<byte[]>)consumer, 100, 0, latch, consumeMsgs, executor);
            latch.await();
            Assert.assertEquals((int)produceMsgs.size(), (int)100);
            produceMsgs.removeAll(consumeMsgs);
            Assert.assertTrue((boolean)produceMsgs.isEmpty());
            producer.close();
            consumer.close();
            log.info("-- Exiting {} test --", (Object)this.methodName);
        }
        finally {
            if (Collections.singletonList(executor).get(0) != null) {
                executor.shutdownNow();
            }
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test(timeOut=2000L)
    public void testAsyncProducerAndConsumerWithZeroQueueSize() throws Exception {
        log.info("-- Starting {} test --", (Object)this.methodName);
        int totalMsg = 100;
        HashSet produceMsgs = Sets.newHashSet();
        HashSet consumeMsgs = Sets.newHashSet();
        Consumer consumer = this.pulsarClient.newConsumer().topic(new String[]{"persistent://my-property/use/my-ns/my-topic1"}).subscriptionName("my-subscriber-name").subscriptionType(SubscriptionType.Exclusive).subscribe();
        Producer producer = this.pulsarClient.newProducer().topic("persistent://my-property/use/my-ns/my-topic1").create();
        for (int i = 0; i < 100; ++i) {
            String message = "my-message-" + i;
            producer.send((Object)message.getBytes());
            produceMsgs.add(message);
        }
        log.info(" start receiving messages :");
        CountDownLatch latch = new CountDownLatch(100);
        ExecutorService executor = Executors.newFixedThreadPool(1);
        try {
            this.receiveAsync((Consumer<byte[]>)consumer, 100, 0, latch, consumeMsgs, executor);
            latch.await();
            Assert.assertEquals((int)produceMsgs.size(), (int)100);
            produceMsgs.removeAll(consumeMsgs);
            Assert.assertTrue((boolean)produceMsgs.isEmpty());
            producer.close();
            consumer.close();
            log.info("-- Exiting {} test --", (Object)this.methodName);
        }
        finally {
            if (Collections.singletonList(executor).get(0) != null) {
                executor.shutdownNow();
            }
        }
    }

    @Test
    public void testSendCallBack() throws Exception {
        log.info("-- Starting {} test --", (Object)this.methodName);
        int totalMsg = 100;
        Producer producer = this.pulsarClient.newProducer().topic("persistent://my-property/use/my-ns/my-topic1").enableBatching(false).create();
        for (int i = 0; i < 100; ++i) {
            String message = "my-message-" + i;
            AtomicInteger msgLength = new AtomicInteger();
            CompletionStage future = producer.sendAsync((Object)message.getBytes()).handle((r, ex) -> {
                if (ex != null) {
                    log.error("Message send failed:", ex);
                } else {
                    msgLength.set(message.length());
                }
                return null;
            });
            ((CompletableFuture)future).get();
            Assert.assertEquals((int)message.getBytes().length, (int)msgLength.get());
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test(timeOut=30000L)
    public void testSharedConsumerAckDifferentConsumer() throws Exception {
        log.info("-- Starting {} test --", (Object)this.methodName);
        ConsumerBuilder cb = this.pulsarClient.newConsumer().topic(new String[]{"persistent://my-property/use/my-ns/my-topic1"}).subscriptionName("my-subscriber-name").subscriptionType(SubscriptionType.Shared).acknowledgmentGroupTime(0L, TimeUnit.SECONDS).receiverQueueSize(1);
        Consumer consumer1 = cb.subscribe();
        Consumer consumer2 = cb.subscribe();
        Producer producer = this.pulsarClient.newProducer().topic("persistent://my-property/use/my-ns/my-topic1").create();
        for (int i = 0; i < 10; ++i) {
            String message = "my-message-" + i;
            producer.send((Object)message.getBytes());
        }
        Message msg = null;
        HashSet consumerMsgSet1 = Sets.newHashSet();
        HashSet consumerMsgSet2 = Sets.newHashSet();
        for (int i = 0; i < 5; ++i) {
            msg = consumer1.receive();
            consumerMsgSet1.add(msg);
            msg = consumer2.receive();
            consumerMsgSet2.add(msg);
        }
        consumerMsgSet1.forEach(m -> {
            try {
                consumer2.acknowledge(m);
            }
            catch (PulsarClientException e) {
                Assert.fail();
            }
        });
        consumerMsgSet2.forEach(m -> {
            try {
                consumer1.acknowledge(m);
            }
            catch (PulsarClientException e) {
                Assert.fail();
            }
        });
        consumer1.redeliverUnacknowledgedMessages();
        consumer2.redeliverUnacknowledgedMessages();
        try {
            if (consumer1.receive(100, TimeUnit.MILLISECONDS) != null || consumer2.receive(100, TimeUnit.MILLISECONDS) != null) {
                Assert.fail();
            }
        }
        finally {
            consumer1.close();
            consumer2.close();
        }
        log.info("-- Exiting {} test --", (Object)this.methodName);
    }

    private void receiveAsync(Consumer<byte[]> consumer, int totalMessage, int currentMessage, CountDownLatch latch, Set<String> consumeMsg, ExecutorService executor) throws PulsarClientException {
        if (currentMessage < totalMessage) {
            CompletableFuture future = consumer.receiveAsync();
            future.handle((msg, exception) -> {
                if (exception == null) {
                    consumeMsg.add(new String(msg.getData()));
                    try {
                        consumer.acknowledge(msg);
                    }
                    catch (PulsarClientException e1) {
                        Assert.fail((String)"message acknowledge failed", (Throwable)e1);
                    }
                    executor.execute(() -> {
                        try {
                            this.receiveAsync(consumer, totalMessage, currentMessage + 1, latch, consumeMsg, executor);
                        }
                        catch (PulsarClientException e) {
                            Assert.fail((String)"message receive failed", (Throwable)e);
                        }
                    });
                    latch.countDown();
                }
                return null;
            });
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test
    public void testConsumerBlockingWithUnAckedMessages() throws Exception {
        log.info("-- Starting {} test --", (Object)this.methodName);
        int unAckedMessages = this.pulsar.getConfiguration().getMaxUnackedMessagesPerConsumer();
        try {
            int unAckedMessagesBufferSize = 500;
            int receiverQueueSize = 10;
            int totalProducedMsgs = 600;
            this.pulsar.getConfiguration().setMaxUnackedMessagesPerConsumer(500);
            Consumer consumer = this.pulsarClient.newConsumer().topic(new String[]{"persistent://my-property/use/my-ns/unacked-topic"}).subscriptionName("subscriber-1").receiverQueueSize(10).subscriptionType(SubscriptionType.Shared).subscribe();
            Producer producer = this.pulsarClient.newProducer().topic("persistent://my-property/use/my-ns/unacked-topic").create();
            for (int i = 0; i < 600; ++i) {
                String message = "my-message-" + i;
                producer.send((Object)message.getBytes());
            }
            Message msg = null;
            ArrayList messages = Lists.newArrayList();
            for (int i = 0; i < 600 && (msg = consumer.receive(1, TimeUnit.SECONDS)) != null; ++i) {
                messages.add(msg);
                log.info("Received message: " + new String(msg.getData()));
            }
            Assert.assertEquals((int)messages.size(), (int)500);
            messages.forEach(m -> {
                try {
                    consumer.acknowledge(m);
                }
                catch (PulsarClientException e) {
                    Assert.fail((String)"ack failed", (Throwable)e);
                }
            });
            int remainingMessages = 600 - messages.size();
            for (int i = 0; i < remainingMessages; ++i) {
                msg = consumer.receive(1, TimeUnit.SECONDS);
                if (msg == null) continue;
                messages.add(msg);
                log.info("Received message: " + new String(msg.getData()));
            }
            Assert.assertEquals((int)600, (int)messages.size());
            producer.close();
            consumer.close();
            log.info("-- Exiting {} test --", (Object)this.methodName);
        }
        catch (Exception e) {
            Assert.fail();
        }
        finally {
            this.pulsar.getConfiguration().setMaxUnackedMessagesPerConsumer(unAckedMessages);
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test
    public void testConsumerBlockingWithUnAckedMessagesMultipleIteration() throws Exception {
        log.info("-- Starting {} test --", (Object)this.methodName);
        int unAckedMessages = this.pulsar.getConfiguration().getMaxUnackedMessagesPerConsumer();
        try {
            int unAckedMessagesBufferSize = 500;
            int receiverQueueSize = 10;
            int totalProducedMsgs = 1500;
            int totalReceiveIteration = 3;
            this.pulsar.getConfiguration().setMaxUnackedMessagesPerConsumer(500);
            Consumer consumer = this.pulsarClient.newConsumer().topic(new String[]{"persistent://my-property/use/my-ns/unacked-topic"}).subscriptionName("subscriber-1").receiverQueueSize(10).subscriptionType(SubscriptionType.Shared).subscribe();
            Producer producer = this.pulsarClient.newProducer().topic("persistent://my-property/use/my-ns/unacked-topic").create();
            for (int i = 0; i < 1500; ++i) {
                String message = "my-message-" + i;
                producer.send((Object)message.getBytes());
            }
            int totalReceivedMessages = 0;
            for (int j = 0; j < 3; ++j) {
                Message msg = null;
                ArrayList messages = Lists.newArrayList();
                for (int i = 0; i < 1500 && (msg = consumer.receive(1, TimeUnit.SECONDS)) != null; ++i) {
                    messages.add(msg);
                    log.info("Received message: " + new String(msg.getData()));
                }
                Assert.assertEquals((int)messages.size(), (int)500);
                messages.forEach(m -> {
                    try {
                        consumer.acknowledge(m);
                    }
                    catch (PulsarClientException e) {
                        Assert.fail((String)"ack failed", (Throwable)e);
                    }
                });
                totalReceivedMessages += messages.size();
            }
            Assert.assertEquals((int)totalReceivedMessages, (int)1500);
            producer.close();
            consumer.close();
            log.info("-- Exiting {} test --", (Object)this.methodName);
        }
        catch (Exception e) {
            Assert.fail();
        }
        finally {
            this.pulsar.getConfiguration().setMaxUnackedMessagesPerConsumer(unAckedMessages);
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test
    public void testMutlipleSharedConsumerBlockingWithUnAckedMessages() throws Exception {
        log.info("-- Starting {} test --", (Object)this.methodName);
        int unAckedMessages = this.pulsar.getConfiguration().getMaxUnackedMessagesPerConsumer();
        try {
            int maxUnackedMessages = 20;
            int receiverQueueSize = 10;
            int totalProducedMsgs = 100;
            int totalReceiveMessages = 0;
            this.pulsar.getConfiguration().setMaxUnackedMessagesPerConsumer(20);
            Consumer consumer1 = this.pulsarClient.newConsumer().topic(new String[]{"persistent://my-property/use/my-ns/unacked-topic"}).subscriptionName("subscriber-1").receiverQueueSize(10).subscriptionType(SubscriptionType.Shared).subscribe();
            PulsarClient newPulsarClient = this.newPulsarClient(this.lookupUrl.toString(), 0);
            try {
                int i;
                Consumer consumer2 = newPulsarClient.newConsumer().topic(new String[]{"persistent://my-property/use/my-ns/unacked-topic"}).subscriptionName("subscriber-1").receiverQueueSize(10).subscriptionType(SubscriptionType.Shared).subscribe();
                Producer producer = this.pulsarClient.newProducer().topic("persistent://my-property/use/my-ns/unacked-topic").create();
                for (int i2 = 0; i2 < 100; ++i2) {
                    String message = "my-message-" + i2;
                    producer.send((Object)message.getBytes());
                }
                Message msg = null;
                ArrayList messages = Lists.newArrayList();
                for (i = 0; i < 100 && (msg = consumer1.receive(1, TimeUnit.SECONDS)) != null; ++i) {
                    messages.add(msg);
                    ++totalReceiveMessages;
                    log.info("Received message: " + new String(msg.getData()));
                }
                Assert.assertEquals((int)messages.size(), (int)20);
                messages.clear();
                for (i = 0; i < 80 && (msg = consumer2.receive(1, TimeUnit.SECONDS)) != null; ++i) {
                    messages.add(msg);
                    ++totalReceiveMessages;
                    log.info("Received message: " + new String(msg.getData()));
                }
                Assert.assertEquals((int)messages.size(), (int)20);
                messages.forEach(m -> {
                    try {
                        consumer2.acknowledge(m);
                    }
                    catch (PulsarClientException e) {
                        Assert.fail((String)"shouldn't have failed ", (Throwable)e);
                    }
                });
                messages.clear();
                for (i = 0; i < 60 && (msg = consumer2.receive(1, TimeUnit.SECONDS)) != null; ++i) {
                    messages.add(msg);
                    ++totalReceiveMessages;
                    consumer2.acknowledge(msg);
                    log.info("Received message: " + new String(msg.getData()));
                }
                Assert.assertEquals((int)100, (int)totalReceiveMessages);
                producer.close();
                consumer1.close();
                consumer2.close();
                log.info("-- Exiting {} test --", (Object)this.methodName);
            }
            finally {
                if (Collections.singletonList(newPulsarClient).get(0) != null) {
                    newPulsarClient.close();
                }
            }
        }
        catch (Exception e) {
            Assert.fail();
        }
        finally {
            this.pulsar.getConfiguration().setMaxUnackedMessagesPerConsumer(unAckedMessages);
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test
    public void testShouldNotBlockConsumerIfRedeliverBeforeReceive() throws Exception {
        log.info("-- Starting {} test --", (Object)this.methodName);
        int unAckedMessages = this.pulsar.getConfiguration().getMaxUnackedMessagesPerConsumer();
        int totalReceiveMsg = 0;
        try {
            Message msg;
            int i;
            int receiverQueueSize = 20;
            int totalProducedMsgs = 100;
            ConsumerImpl consumer = (ConsumerImpl)this.pulsarClient.newConsumer().topic(new String[]{"persistent://my-property/use/my-ns/unacked-topic"}).subscriptionName("subscriber-1").receiverQueueSize(20).ackTimeout(1L, TimeUnit.SECONDS).subscriptionType(SubscriptionType.Shared).subscribe();
            Producer producer = this.pulsarClient.newProducer().topic("persistent://my-property/use/my-ns/unacked-topic").enableBatching(false).create();
            for (i = 0; i < 100; ++i) {
                String message = "my-message-" + i;
                producer.send((Object)message.getBytes());
            }
            Thread.sleep(1000L);
            Assert.assertEquals((int)consumer.numMessagesInQueue(), (int)20);
            Thread.sleep(2000L);
            Assert.assertEquals((int)consumer.numMessagesInQueue(), (int)20);
            for (i = 0; i < 100 && (msg = consumer.receive(1, TimeUnit.SECONDS)) != null; ++i) {
                consumer.acknowledge(msg);
                ++totalReceiveMsg;
                log.info("Received message: " + new String(msg.getData()));
            }
            Assert.assertEquals((int)100, (int)totalReceiveMsg);
            producer.close();
            consumer.close();
            log.info("-- Exiting {} test --", (Object)this.methodName);
        }
        catch (Exception e) {
            Assert.fail();
        }
        finally {
            this.pulsar.getConfiguration().setMaxUnackedMessagesPerConsumer(unAckedMessages);
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test
    public void testUnackBlockRedeliverMessages() throws Exception {
        log.info("-- Starting {} test --", (Object)this.methodName);
        int unAckedMessages = this.pulsar.getConfiguration().getMaxUnackedMessagesPerConsumer();
        int totalReceiveMsg = 0;
        try {
            int unAckedMessagesBufferSize = 20;
            int receiverQueueSize = 10;
            int totalProducedMsgs = 100;
            this.pulsar.getConfiguration().setMaxUnackedMessagesPerConsumer(20);
            ConsumerImpl consumer = (ConsumerImpl)this.pulsarClient.newConsumer().topic(new String[]{"persistent://my-property/use/my-ns/unacked-topic"}).subscriptionName("subscriber-1").receiverQueueSize(10).subscriptionType(SubscriptionType.Shared).subscribe();
            Producer producer = this.pulsarClient.newProducer().topic("persistent://my-property/use/my-ns/unacked-topic").create();
            for (int i = 0; i < 100; ++i) {
                String message = "my-message-" + i;
                producer.send((Object)message.getBytes());
            }
            Message msg = null;
            ArrayList messages = Lists.newArrayList();
            for (int i = 0; i < 100 && (msg = consumer.receive(1, TimeUnit.SECONDS)) != null; ++i) {
                messages.add(msg);
                ++totalReceiveMsg;
                log.info("Received message: " + new String(msg.getData()));
            }
            consumer.redeliverUnacknowledgedMessages();
            Thread.sleep(1000L);
            int alreadyConsumedMessages = messages.size();
            messages.clear();
            for (int i = 0; i < 100 && (msg = consumer.receive(1, TimeUnit.SECONDS)) != null; ++i) {
                consumer.acknowledge(msg);
                ++totalReceiveMsg;
                log.info("Received message: " + new String(msg.getData()));
            }
            Assert.assertEquals((int)(100 + alreadyConsumedMessages), (int)totalReceiveMsg);
            producer.close();
            consumer.close();
            log.info("-- Exiting {} test --", (Object)this.methodName);
        }
        catch (Exception e) {
            Assert.fail();
        }
        finally {
            this.pulsar.getConfiguration().setMaxUnackedMessagesPerConsumer(unAckedMessages);
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test(dataProvider="batch")
    public void testUnackedBlockAtBatch(int batchMessageDelayMs) throws Exception {
        log.info("-- Starting {} test --", (Object)this.methodName);
        int unAckedMessages = this.pulsar.getConfiguration().getMaxUnackedMessagesPerConsumer();
        try {
            int i;
            int maxUnackedMessages = 20;
            int receiverQueueSize = 10;
            int totalProducedMsgs = 100;
            int totalReceiveMessages = 0;
            this.pulsar.getConfiguration().setMaxUnackedMessagesPerConsumer(20);
            Consumer consumer1 = this.pulsarClient.newConsumer().topic(new String[]{"persistent://my-property/use/my-ns/unacked-topic"}).subscriptionName("subscriber-1").subscriptionType(SubscriptionType.Shared).receiverQueueSize(10).subscribe();
            Producer producer = this.pulsarClient.newProducer().topic("persistent://my-property/use/my-ns/unacked-topic").enableBatching(batchMessageDelayMs != 0).batchingMaxPublishDelay(1L, TimeUnit.MILLISECONDS).batchingMaxMessages(5).create();
            ArrayList futures = Lists.newArrayList();
            for (int i2 = 0; i2 < 100; ++i2) {
                String message = "my-message-" + i2;
                futures.add(producer.sendAsync((Object)message.getBytes()));
            }
            FutureUtil.waitForAll((List)futures).get();
            Message msg = null;
            ArrayList messages = Lists.newArrayList();
            for (i = 0; i < 100 && (msg = consumer1.receive(1, TimeUnit.SECONDS)) != null; ++i) {
                messages.add(msg);
                ++totalReceiveMessages;
                log.info("Received message: " + new String(msg.getData()));
            }
            Assert.assertNotEquals((Object)messages.size(), (Object)100);
            messages.forEach(m -> {
                try {
                    consumer1.acknowledge(m);
                }
                catch (PulsarClientException e) {
                    Assert.fail((String)"shouldn't have failed ", (Throwable)e);
                }
            });
            messages.clear();
            for (i = 0; i < 100 && (msg = consumer1.receive(1, TimeUnit.SECONDS)) != null; ++i) {
                messages.add(msg);
                ++totalReceiveMessages;
                consumer1.acknowledge(msg);
                log.info("Received message: " + new String(msg.getData()));
            }
            Assert.assertEquals((int)100, (int)totalReceiveMessages);
            producer.close();
            consumer1.close();
            log.info("-- Exiting {} test --", (Object)this.methodName);
        }
        catch (Exception e) {
            Assert.fail();
        }
        finally {
            this.pulsar.getConfiguration().setMaxUnackedMessagesPerConsumer(unAckedMessages);
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test
    public void testBlockUnackConsumerAckByDifferentConsumer() throws Exception {
        log.info("-- Starting {} test --", (Object)this.methodName);
        int unAckedMessages = this.pulsar.getConfiguration().getMaxUnackedMessagesPerConsumer();
        try {
            int i;
            int maxUnackedMessages = 20;
            int receiverQueueSize = 10;
            int totalProducedMsgs = 100;
            int totalReceiveMessages = 0;
            this.pulsar.getConfiguration().setMaxUnackedMessagesPerConsumer(20);
            ConsumerBuilder consumerBuilder = this.pulsarClient.newConsumer().topic(new String[]{"persistent://my-property/use/my-ns/unacked-topic"}).subscriptionName("subscriber-1").receiverQueueSize(10).subscriptionType(SubscriptionType.Shared);
            Consumer consumer1 = consumerBuilder.subscribe();
            Consumer consumer2 = consumerBuilder.subscribe();
            Producer producer = this.pulsarClient.newProducer().topic("persistent://my-property/use/my-ns/unacked-topic").create();
            for (int i2 = 0; i2 < 100; ++i2) {
                String message = "my-message-" + i2;
                producer.send((Object)message.getBytes());
            }
            Message msg = null;
            ArrayList messages = Lists.newArrayList();
            for (i = 0; i < 100 && (msg = consumer1.receive(1, TimeUnit.SECONDS)) != null; ++i) {
                messages.add(msg);
                ++totalReceiveMessages;
                log.info("Received message: " + new String(msg.getData()));
            }
            Assert.assertEquals((int)messages.size(), (int)20);
            messages.forEach(m -> {
                try {
                    consumer2.acknowledge(m);
                }
                catch (PulsarClientException e) {
                    Assert.fail((String)"shouldn't have failed ", (Throwable)e);
                }
            });
            for (i = 0; i < 100 && (msg = consumer1.receive(1, TimeUnit.SECONDS)) != null; ++i) {
                ++totalReceiveMessages;
                consumer2.acknowledge(msg);
                log.info("Received message: " + new String(msg.getData()));
            }
            for (i = 0; i < 100 && (msg = consumer2.receive(1, TimeUnit.SECONDS)) != null; ++i) {
                ++totalReceiveMessages;
                log.info("Received message: " + new String(msg.getData()));
            }
            Assert.assertEquals((int)100, (int)totalReceiveMessages);
            producer.close();
            consumer1.close();
            consumer2.close();
            log.info("-- Exiting {} test --", (Object)this.methodName);
        }
        catch (Exception e) {
            Assert.fail();
        }
        finally {
            this.pulsar.getConfiguration().setMaxUnackedMessagesPerConsumer(unAckedMessages);
        }
    }

    @Test
    public void testEnabledChecksumClient() throws Exception {
        log.info("-- Starting {} test --", (Object)this.methodName);
        int totalMsg = 10;
        Consumer consumer = this.pulsarClient.newConsumer().topic(new String[]{"persistent://my-property/use/my-ns/my-topic1"}).subscriptionName("my-subscriber-name").subscribe();
        int batchMessageDelayMs = 300;
        Producer producer = this.pulsarClient.newProducer().topic("persistent://my-property/use/my-ns/my-topic1").enableBatching(true).batchingMaxPublishDelay(300L, TimeUnit.MILLISECONDS).batchingMaxMessages(5).create();
        for (int i = 0; i < 10; ++i) {
            String message = "my-message-" + i;
            producer.send((Object)message.getBytes());
        }
        Message msg = null;
        HashSet messageSet = Sets.newHashSet();
        for (int i = 0; i < 10; ++i) {
            msg = consumer.receive(5, TimeUnit.SECONDS);
            String receivedMessage = new String(msg.getData());
            log.debug("Received message: [{}]", (Object)receivedMessage);
            String expectedMessage = "my-message-" + i;
            this.testMessageOrderAndDuplicates(messageSet, receivedMessage, expectedMessage);
        }
        consumer.acknowledgeCumulative(msg);
        consumer.close();
        log.info("-- Exiting {} test --", (Object)this.methodName);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test
    public void testBlockUnackedConsumerRedeliverySpecificMessagesProduceWithPause() throws Exception {
        log.info("-- Starting {} test --", (Object)this.methodName);
        int unAckedMessages = this.pulsar.getConfiguration().getMaxUnackedMessagesPerConsumer();
        try {
            int unAckedMessagesBufferSize = 10;
            int receiverQueueSize = 20;
            int totalProducedMsgs = 20;
            this.pulsar.getConfiguration().setMaxUnackedMessagesPerConsumer(10);
            ConsumerImpl consumer = (ConsumerImpl)this.pulsarClient.newConsumer().topic(new String[]{"persistent://my-property/use/my-ns/unacked-topic"}).subscriptionName("subscriber-1").subscriptionType(SubscriptionType.Shared).receiverQueueSize(20).subscribe();
            Producer producer = this.pulsarClient.newProducer().topic("persistent://my-property/use/my-ns/unacked-topic").create();
            for (int i = 0; i < 20; ++i) {
                String message = "my-message-" + i;
                producer.send((Object)message.getBytes());
                Thread.sleep(10L);
            }
            Message msg = null;
            ArrayList messages1 = Lists.newArrayList();
            for (int i = 0; i < 20 && (msg = consumer.receive(1, TimeUnit.SECONDS)) != null; ++i) {
                messages1.add(msg);
                log.info("Received message: " + new String(msg.getData()));
            }
            Assert.assertEquals((int)messages1.size(), (int)10);
            Set redeliveryMessages = messages1.stream().map(m -> (MessageIdImpl)m.getMessageId()).collect(Collectors.toSet());
            consumer.redeliverUnacknowledgedMessages((Set)Sets.newHashSet(redeliveryMessages));
            Thread.sleep(1000L);
            HashSet messages2 = Sets.newHashSet();
            for (int i = 0; i < 20 && (msg = consumer.receive(1, TimeUnit.SECONDS)) != null; ++i) {
                messages2.add((MessageIdImpl)msg.getMessageId());
                log.info("Received message: " + new String(msg.getData()));
            }
            Assert.assertEquals((int)messages1.size(), (int)messages2.size());
            messages2.removeAll(redeliveryMessages);
            Assert.assertEquals((int)messages2.size(), (int)0);
            producer.close();
            consumer.close();
            log.info("-- Exiting {} test --", (Object)this.methodName);
        }
        catch (Exception e) {
            Assert.fail();
        }
        finally {
            this.pulsar.getConfiguration().setMaxUnackedMessagesPerConsumer(unAckedMessages);
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test
    public void testBlockUnackedConsumerRedeliverySpecificMessagesCloseConsumerWhileProduce() throws Exception {
        log.info("-- Starting {} test --", (Object)this.methodName);
        int unAckedMessages = this.pulsar.getConfiguration().getMaxUnackedMessagesPerConsumer();
        try {
            int unAckedMessagesBufferSize = 10;
            int receiverQueueSize = 20;
            int totalProducedMsgs = 50;
            this.pulsar.getConfiguration().setMaxUnackedMessagesPerConsumer(10);
            ConsumerImpl consumer = (ConsumerImpl)this.pulsarClient.newConsumer().topic(new String[]{"persistent://my-property/use/my-ns/unacked-topic"}).subscriptionName("subscriber-1").receiverQueueSize(20).subscriptionType(SubscriptionType.Shared).subscribe();
            consumer.close();
            Producer producer = this.pulsarClient.newProducer().topic("persistent://my-property/use/my-ns/unacked-topic").create();
            for (int i = 0; i < 50; ++i) {
                String message = "my-message-" + i;
                producer.send((Object)message.getBytes());
                Thread.sleep(10L);
            }
            consumer = (ConsumerImpl)this.pulsarClient.newConsumer().topic(new String[]{"persistent://my-property/use/my-ns/unacked-topic"}).receiverQueueSize(20).subscriptionName("subscriber-1").subscriptionType(SubscriptionType.Shared).subscribe();
            Message msg = null;
            ArrayList messages1 = Lists.newArrayList();
            for (int i = 0; i < 50 && (msg = consumer.receive(1, TimeUnit.SECONDS)) != null; ++i) {
                messages1.add(msg);
                log.info("Received message: " + new String(msg.getData()));
            }
            Assert.assertEquals((int)messages1.size(), (int)20);
            Set redeliveryMessages = messages1.stream().map(m -> (MessageIdImpl)m.getMessageId()).collect(Collectors.toSet());
            consumer.redeliverUnacknowledgedMessages((Set)Sets.newHashSet(redeliveryMessages));
            Thread.sleep(1000L);
            HashSet messages2 = Sets.newHashSet();
            for (int i = 0; i < 50 && (msg = consumer.receive(1, TimeUnit.SECONDS)) != null; ++i) {
                messages2.add((MessageIdImpl)msg.getMessageId());
                log.info("Received message: " + new String(msg.getData()));
            }
            Assert.assertEquals((int)messages1.size(), (int)messages2.size());
            messages2.removeAll(redeliveryMessages);
            Assert.assertEquals((int)messages2.size(), (int)0);
            producer.close();
            consumer.close();
            log.info("-- Exiting {} test --", (Object)this.methodName);
        }
        catch (Exception e) {
            Assert.fail();
        }
        finally {
            this.pulsar.getConfiguration().setMaxUnackedMessagesPerConsumer(unAckedMessages);
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test
    public void testPriorityConsumer() throws Exception {
        log.info("-- Starting {} test --", (Object)this.methodName);
        PulsarClient newPulsarClient = this.newPulsarClient(this.lookupUrl.toString(), 0);
        try {
            Consumer consumer1 = newPulsarClient.newConsumer().topic(new String[]{"persistent://my-property/use/my-ns/my-topic2"}).subscriptionName("my-subscriber-name").subscriptionType(SubscriptionType.Shared).priorityLevel(1).receiverQueueSize(5).subscribe();
            PulsarClient newPulsarClient1 = this.newPulsarClient(this.lookupUrl.toString(), 0);
            try {
                Consumer consumer2 = newPulsarClient1.newConsumer().topic(new String[]{"persistent://my-property/use/my-ns/my-topic2"}).subscriptionName("my-subscriber-name").subscriptionType(SubscriptionType.Shared).priorityLevel(1).receiverQueueSize(5).subscribe();
                PulsarClient newPulsarClient2 = this.newPulsarClient(this.lookupUrl.toString(), 0);
                try {
                    Consumer consumer3 = newPulsarClient2.newConsumer().topic(new String[]{"persistent://my-property/use/my-ns/my-topic2"}).subscriptionName("my-subscriber-name").subscriptionType(SubscriptionType.Shared).priorityLevel(1).receiverQueueSize(5).subscribe();
                    PulsarClient newPulsarClient3 = this.newPulsarClient(this.lookupUrl.toString(), 0);
                    try {
                        int i;
                        CompletableFuture future;
                        String message;
                        Consumer consumer4 = newPulsarClient3.newConsumer().topic(new String[]{"persistent://my-property/use/my-ns/my-topic2"}).subscriptionName("my-subscriber-name").subscriptionType(SubscriptionType.Shared).priorityLevel(2).receiverQueueSize(5).subscribe();
                        Producer producer = this.pulsarClient.newProducer().topic("persistent://my-property/use/my-ns/my-topic2").create();
                        ArrayList futures = Lists.newArrayList();
                        for (int i2 = 0; i2 < 15; ++i2) {
                            message = "my-message-" + i2;
                            future = producer.sendAsync((Object)message.getBytes());
                            futures.add(future);
                        }
                        log.info("Waiting for async publish to complete");
                        for (Future future2 : futures) {
                            future2.get();
                        }
                        for (i = 0; i < 20; ++i) {
                            consumer1.receive(100, TimeUnit.MILLISECONDS);
                            consumer2.receive(100, TimeUnit.MILLISECONDS);
                        }
                        for (i = 0; i < 5; ++i) {
                            message = "my-message-" + i;
                            future = producer.sendAsync((Object)message.getBytes());
                            futures.add(future);
                        }
                        Assert.assertNull((Object)consumer4.receive(100, TimeUnit.MILLISECONDS));
                        producer.close();
                        consumer1.close();
                        consumer2.close();
                        consumer3.close();
                        consumer4.close();
                        log.info("-- Exiting {} test --", (Object)this.methodName);
                    }
                    finally {
                        if (Collections.singletonList(newPulsarClient3).get(0) != null) {
                            newPulsarClient3.close();
                        }
                    }
                }
                finally {
                    if (Collections.singletonList(newPulsarClient2).get(0) != null) {
                        newPulsarClient2.close();
                    }
                }
            }
            finally {
                if (Collections.singletonList(newPulsarClient1).get(0) != null) {
                    newPulsarClient1.close();
                }
            }
        }
        finally {
            if (Collections.singletonList(newPulsarClient).get(0) != null) {
                newPulsarClient.close();
            }
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test
    public void testSharedSamePriorityConsumer() throws Exception {
        log.info("-- Starting {} test --", (Object)this.methodName);
        int queueSize = 5;
        int maxUnAckMsgs = this.pulsar.getConfiguration().getMaxConcurrentLookupRequest();
        this.pulsar.getConfiguration().setMaxUnackedMessagesPerConsumer(5);
        Consumer c1 = this.pulsarClient.newConsumer().topic(new String[]{"persistent://my-property/use/my-ns/my-topic2"}).subscriptionName("my-subscriber-name").subscriptionType(SubscriptionType.Shared).receiverQueueSize(5).subscribe();
        PulsarClient newPulsarClient = this.newPulsarClient(this.lookupUrl.toString(), 0);
        try {
            Message msg;
            int i;
            Consumer c2 = newPulsarClient.newConsumer().topic(new String[]{"persistent://my-property/use/my-ns/my-topic2"}).subscriptionName("my-subscriber-name").subscriptionType(SubscriptionType.Shared).receiverQueueSize(5).subscribe();
            Producer producer = this.pulsarClient.newProducer().topic("persistent://my-property/use/my-ns/my-topic2").enableBatching(false).create();
            ArrayList futures = Lists.newArrayList();
            int totalPublishMessages = 500;
            for (int i2 = 0; i2 < 500; ++i2) {
                String message = "my-message-" + i2;
                CompletableFuture future = producer.sendAsync((Object)message.getBytes());
                futures.add(future);
            }
            log.info("Waiting for async publish to complete");
            for (Future future : futures) {
                future.get();
            }
            ArrayList messages = Lists.newArrayList();
            for (i = 0; i < 500 && (msg = c1.receive(500, TimeUnit.MILLISECONDS)) != null; ++i) {
                messages.add(msg);
            }
            for (i = 0; i < 500 && (msg = c2.receive(500, TimeUnit.MILLISECONDS)) != null; ++i) {
                messages.add(msg);
            }
            Assert.assertEquals((int)10, (int)messages.size());
            PulsarClient newPulsarClient1 = this.newPulsarClient(this.lookupUrl.toString(), 0);
            try {
                Consumer c3 = newPulsarClient1.newConsumer().topic(new String[]{"persistent://my-property/use/my-ns/my-topic2"}).subscriptionName("my-subscriber-name").subscriptionType(SubscriptionType.Shared).receiverQueueSize(5).subscribe();
                PulsarClient newPulsarClient2 = this.newPulsarClient(this.lookupUrl.toString(), 0);
                try {
                    Consumer c4 = newPulsarClient2.newConsumer().topic(new String[]{"persistent://my-property/use/my-ns/my-topic2"}).subscriptionName("my-subscriber-name").subscriptionType(SubscriptionType.Shared).receiverQueueSize(5).subscribe();
                    PulsarClient newPulsarClient3 = this.newPulsarClient(this.lookupUrl.toString(), 0);
                    try {
                        Message msg2;
                        int i3;
                        Consumer c5 = newPulsarClient3.newConsumer().topic(new String[]{"persistent://my-property/use/my-ns/my-topic2"}).subscriptionName("my-subscriber-name").subscriptionType(SubscriptionType.Shared).receiverQueueSize(5).subscribe();
                        for (i3 = 0; i3 < 500 && (msg2 = c4.receive(500, TimeUnit.MILLISECONDS)) != null; ++i3) {
                            messages.add(msg2);
                        }
                        for (i3 = 0; i3 < 500 && (msg2 = c5.receive(500, TimeUnit.MILLISECONDS)) != null; ++i3) {
                            messages.add(msg2);
                        }
                        for (i3 = 0; i3 < 500 && (msg2 = c3.receive(500, TimeUnit.MILLISECONDS)) != null; ++i3) {
                            messages.add(msg2);
                            c3.acknowledge(msg2);
                        }
                        Assert.assertEquals((int)messages.size(), (int)500);
                        producer.close();
                        c1.close();
                        c2.close();
                        c3.close();
                        c4.close();
                        c5.close();
                        this.pulsar.getConfiguration().setMaxUnackedMessagesPerConsumer(maxUnAckMsgs);
                        log.info("-- Exiting {} test --", (Object)this.methodName);
                    }
                    finally {
                        if (Collections.singletonList(newPulsarClient3).get(0) != null) {
                            newPulsarClient3.close();
                        }
                    }
                }
                finally {
                    if (Collections.singletonList(newPulsarClient2).get(0) != null) {
                        newPulsarClient2.close();
                    }
                }
            }
            finally {
                if (Collections.singletonList(newPulsarClient1).get(0) != null) {
                    newPulsarClient1.close();
                }
            }
        }
        finally {
            if (Collections.singletonList(newPulsarClient).get(0) != null) {
                newPulsarClient.close();
            }
        }
    }

    @Test
    public void testRedeliveryFailOverConsumer() throws Exception {
        int i;
        log.info("-- Starting {} test --", (Object)this.methodName);
        int receiverQueueSize = 10;
        ConsumerImpl consumer = (ConsumerImpl)this.pulsarClient.newConsumer().topic(new String[]{"persistent://my-property/use/my-ns/unacked-topic"}).subscriptionName("subscriber-1").receiverQueueSize(10).subscriptionType(SubscriptionType.Failover).acknowledgmentGroupTime(0L, TimeUnit.SECONDS).subscribe();
        Producer producer = this.pulsarClient.newProducer().topic("persistent://my-property/use/my-ns/unacked-topic").create();
        int consumeMsgInParts = 4;
        for (int i2 = 0; i2 < 10; ++i2) {
            String message = "my-message-" + i2;
            producer.send((Object)message.getBytes());
            Thread.sleep(10L);
        }
        Message msg = null;
        ArrayList messages1 = Lists.newArrayList();
        for (i = 0; i < consumeMsgInParts && (msg = consumer.receive(1, TimeUnit.SECONDS)) != null; ++i) {
            messages1.add(msg);
            consumer.acknowledge(msg);
            log.info("Received message: " + new String(msg.getData()));
        }
        Assert.assertEquals((int)messages1.size(), (int)consumeMsgInParts);
        consumer.redeliverUnacknowledgedMessages();
        messages1.clear();
        for (i = 0; i < consumeMsgInParts && (msg = consumer.receive(1, TimeUnit.SECONDS)) != null; ++i) {
            messages1.add(msg);
            consumer.acknowledge(msg);
            log.info("Received message: " + new String(msg.getData()));
        }
        Assert.assertEquals((int)messages1.size(), (int)consumeMsgInParts);
        consumer.redeliverUnacknowledgedMessages();
        for (i = 0; i < 10; ++i) {
            String message = "my-message-" + i;
            producer.send((Object)message.getBytes());
            Thread.sleep(100L);
        }
        int remainingMsgs = 20 - 2 * consumeMsgInParts;
        messages1.clear();
        for (int i3 = 0; i3 < remainingMsgs && (msg = consumer.receive(1, TimeUnit.SECONDS)) != null; ++i3) {
            messages1.add(msg);
            consumer.acknowledge(msg);
            log.info("Received message: " + new String(msg.getData()));
        }
        Assert.assertEquals((int)messages1.size(), (int)remainingMsgs);
        producer.close();
        consumer.close();
        log.info("-- Exiting {} test --", (Object)this.methodName);
    }

    @Test(timeOut=5000L)
    public void testFailReceiveAsyncOnConsumerClose() throws Exception {
        log.info("-- Starting {} test --", (Object)this.methodName);
        Consumer consumer = this.pulsarClient.newConsumer().topic(new String[]{"persistent://my-property/use/my-ns/failAsyncReceive-1"}).subscriptionName("my-subscriber-name").subscribe();
        consumer.close();
        try {
            consumer.receiveAsync().get(1L, TimeUnit.SECONDS);
            Assert.fail((String)"it should have failed because consumer is already closed");
        }
        catch (ExecutionException e) {
            Assert.assertTrue((boolean)(e.getCause() instanceof PulsarClientException.AlreadyClosedException));
        }
        int numPartitions = 4;
        TopicName topicName = TopicName.get((String)"persistent://my-property/use/my-ns/failAsyncReceive-2");
        this.admin.topics().createPartitionedTopic(topicName.toString(), numPartitions);
        Consumer partitionedConsumer = this.pulsarClient.newConsumer().topic(new String[]{topicName.toString()}).subscriptionName("my-partitioned-subscriber").subscribe();
        partitionedConsumer.close();
        try {
            partitionedConsumer.receiveAsync().get(1L, TimeUnit.SECONDS);
            Assert.fail((String)"it should have failed because consumer is already closed");
        }
        catch (ExecutionException e) {
            Assert.assertTrue((boolean)(e.getCause() instanceof PulsarClientException.AlreadyClosedException));
        }
        log.info("-- Exiting {} test --", (Object)this.methodName);
    }

    @Test
    public void testECDSAEncryption() throws Exception {
        log.info("-- Starting {} test --", (Object)this.methodName);
        int totalMsg = 10;
        HashSet messageSet = Sets.newHashSet();
        class EncKeyReader
        implements CryptoKeyReader {
            EncryptionKeyInfo keyInfo = new EncryptionKeyInfo();

            EncKeyReader() {
            }

            public EncryptionKeyInfo getPublicKey(String keyName, Map<String, String> keyMeta) {
                String CERT_FILE_PATH = "./src/test/resources/certificate/public-key." + keyName;
                if (Files.isReadable(Paths.get(CERT_FILE_PATH, new String[0]))) {
                    try {
                        this.keyInfo.setKey(Files.readAllBytes(Paths.get(CERT_FILE_PATH, new String[0])));
                        return this.keyInfo;
                    }
                    catch (IOException e) {
                        Assert.fail((String)("Failed to read certificate from " + CERT_FILE_PATH));
                    }
                } else {
                    Assert.fail((String)("Certificate file " + CERT_FILE_PATH + " is not present or not readable."));
                }
                return null;
            }

            public EncryptionKeyInfo getPrivateKey(String keyName, Map<String, String> keyMeta) {
                String CERT_FILE_PATH = "./src/test/resources/certificate/private-key." + keyName;
                if (Files.isReadable(Paths.get(CERT_FILE_PATH, new String[0]))) {
                    try {
                        this.keyInfo.setKey(Files.readAllBytes(Paths.get(CERT_FILE_PATH, new String[0])));
                        return this.keyInfo;
                    }
                    catch (IOException e) {
                        Assert.fail((String)("Failed to read certificate from " + CERT_FILE_PATH));
                    }
                } else {
                    Assert.fail((String)("Certificate file " + CERT_FILE_PATH + " is not present or not readable."));
                }
                return null;
            }
        }
        Consumer consumer = this.pulsarClient.newConsumer().topic(new String[]{"persistent://my-property/use/my-ns/myecdsa-topic1"}).subscriptionName("my-subscriber-name").subscriptionType(SubscriptionType.Exclusive).cryptoKeyReader((CryptoKeyReader)new EncKeyReader()).subscribe();
        Producer producer = this.pulsarClient.newProducer().topic("persistent://my-property/use/my-ns/myecdsa-topic1").addEncryptionKey("client-ecdsa.pem").cryptoKeyReader((CryptoKeyReader)new EncKeyReader()).create();
        for (int i = 0; i < 10; ++i) {
            String message = "my-message-" + i;
            producer.send((Object)message.getBytes());
        }
        Message msg = null;
        for (int i = 0; i < 10; ++i) {
            msg = consumer.receive(5, TimeUnit.SECONDS);
            String receivedMessage = new String(msg.getData());
            log.debug("Received message: [{}]", (Object)receivedMessage);
            String expectedMessage = "my-message-" + i;
            this.testMessageOrderAndDuplicates(messageSet, receivedMessage, expectedMessage);
        }
        consumer.acknowledgeCumulative(msg);
        consumer.close();
        log.info("-- Exiting {} test --", (Object)this.methodName);
    }

    @Test
    public void testRSAEncryption() throws Exception {
        String message;
        int i;
        log.info("-- Starting {} test --", (Object)this.methodName);
        int totalMsg = 10;
        HashSet messageSet = Sets.newHashSet();
        class EncKeyReader
        implements CryptoKeyReader {
            EncryptionKeyInfo keyInfo = new EncryptionKeyInfo();

            EncKeyReader() {
            }

            public EncryptionKeyInfo getPublicKey(String keyName, Map<String, String> keyMeta) {
                String CERT_FILE_PATH = "./src/test/resources/certificate/public-key." + keyName;
                if (Files.isReadable(Paths.get(CERT_FILE_PATH, new String[0]))) {
                    try {
                        this.keyInfo.setKey(Files.readAllBytes(Paths.get(CERT_FILE_PATH, new String[0])));
                        return this.keyInfo;
                    }
                    catch (IOException e) {
                        Assert.fail((String)("Failed to read certificate from " + CERT_FILE_PATH));
                    }
                } else {
                    Assert.fail((String)("Certificate file " + CERT_FILE_PATH + " is not present or not readable."));
                }
                return null;
            }

            public EncryptionKeyInfo getPrivateKey(String keyName, Map<String, String> keyMeta) {
                String CERT_FILE_PATH = "./src/test/resources/certificate/private-key." + keyName;
                if (Files.isReadable(Paths.get(CERT_FILE_PATH, new String[0]))) {
                    try {
                        this.keyInfo.setKey(Files.readAllBytes(Paths.get(CERT_FILE_PATH, new String[0])));
                        return this.keyInfo;
                    }
                    catch (IOException e) {
                        Assert.fail((String)("Failed to read certificate from " + CERT_FILE_PATH));
                    }
                } else {
                    Assert.fail((String)("Certificate file " + CERT_FILE_PATH + " is not present or not readable."));
                }
                return null;
            }
        }
        Consumer consumer = this.pulsarClient.newConsumer().topic(new String[]{"persistent://my-property/use/my-ns/myrsa-topic1"}).subscriptionName("my-subscriber-name").subscriptionType(SubscriptionType.Exclusive).cryptoKeyReader((CryptoKeyReader)new EncKeyReader()).subscribe();
        Producer producer = this.pulsarClient.newProducer().topic("persistent://my-property/use/my-ns/myrsa-topic1").addEncryptionKey("client-rsa.pem").cryptoKeyReader((CryptoKeyReader)new EncKeyReader()).create();
        Producer producer2 = this.pulsarClient.newProducer().topic("persistent://my-property/use/my-ns/myrsa-topic1").addEncryptionKey("client-rsa.pem").cryptoKeyReader((CryptoKeyReader)new EncKeyReader()).create();
        for (i = 0; i < 10; ++i) {
            message = "my-message-" + i;
            producer.send((Object)message.getBytes());
        }
        for (i = 10; i < 20; ++i) {
            message = "my-message-" + i;
            producer2.send((Object)message.getBytes());
        }
        Message msg = null;
        for (int i2 = 0; i2 < 20; ++i2) {
            msg = consumer.receive(5, TimeUnit.SECONDS);
            String receivedMessage = new String(msg.getData());
            log.debug("Received message: [{}]", (Object)receivedMessage);
            String expectedMessage = "my-message-" + i2;
            this.testMessageOrderAndDuplicates(messageSet, receivedMessage, expectedMessage);
        }
        consumer.acknowledgeCumulative(msg);
        consumer.close();
        log.info("-- Exiting {} test --", (Object)this.methodName);
    }

    @Test
    public void testEncryptionFailure() throws Exception {
        class EncKeyReader
        implements CryptoKeyReader {
            EncryptionKeyInfo keyInfo = new EncryptionKeyInfo();

            EncKeyReader() {
            }

            public EncryptionKeyInfo getPublicKey(String keyName, Map<String, String> keyMeta) {
                String CERT_FILE_PATH = "./src/test/resources/certificate/public-key." + keyName;
                if (Files.isReadable(Paths.get(CERT_FILE_PATH, new String[0]))) {
                    try {
                        this.keyInfo.setKey(Files.readAllBytes(Paths.get(CERT_FILE_PATH, new String[0])));
                        return this.keyInfo;
                    }
                    catch (IOException e) {
                        log.error("Failed to read certificate from {}", (Object)CERT_FILE_PATH);
                    }
                }
                return null;
            }

            public EncryptionKeyInfo getPrivateKey(String keyName, Map<String, String> keyMeta) {
                String CERT_FILE_PATH = "./src/test/resources/certificate/private-key." + keyName;
                if (Files.isReadable(Paths.get(CERT_FILE_PATH, new String[0]))) {
                    try {
                        this.keyInfo.setKey(Files.readAllBytes(Paths.get(CERT_FILE_PATH, new String[0])));
                        return this.keyInfo;
                    }
                    catch (IOException e) {
                        log.error("Failed to read certificate from {}", (Object)CERT_FILE_PATH);
                    }
                }
                return null;
            }
        }
        log.info("-- Starting {} test --", (Object)this.methodName);
        int totalMsg = 10;
        Message msg = null;
        HashSet messageSet = Sets.newHashSet();
        Consumer consumer = this.pulsarClient.newConsumer(Schema.STRING).topic(new String[]{"persistent://my-property/use/my-ns/myenc-topic1"}).subscriptionName("my-subscriber-name").subscriptionType(SubscriptionType.Exclusive).subscribe();
        try {
            this.pulsarClient.newProducer(Schema.STRING).topic("persistent://my-property/use/myenc-ns/myenc-topic1").enableBatching(false).addEncryptionKey("client-non-existant-rsa.pem").cryptoKeyReader((CryptoKeyReader)new EncKeyReader()).create();
            Assert.fail((String)"Producer creation should not suceed if failing to read key");
        }
        catch (Exception exception) {
            // empty catch block
        }
        Producer producer = this.pulsarClient.newProducer(Schema.STRING).topic("persistent://my-property/use/my-ns/myenc-topic1").enableBatching(false).addEncryptionKey("client-rsa.pem").cryptoKeyReader((CryptoKeyReader)new EncKeyReader()).create();
        for (int i = 0; i < 10; ++i) {
            producer.send((Object)("my-message-" + i));
        }
        msg = consumer.receive(5, TimeUnit.SECONDS);
        Assert.assertNull((Object)msg, (String)"Receive should have failed with no keyreader");
        consumer.close();
        consumer = this.pulsarClient.newConsumer(Schema.STRING).topic(new String[]{"persistent://my-property/use/my-ns/myenc-topic1"}).subscriptionName("my-subscriber-name").subscriptionType(SubscriptionType.Exclusive).cryptoFailureAction(ConsumerCryptoFailureAction.CONSUME).subscribe();
        int msgNum = 0;
        try {
            msg = consumer.receive(5, TimeUnit.SECONDS);
            String receivedMessage = (String)msg.getValue();
            String expectedMessage = "my-message-" + msgNum++;
            Assert.assertNotEquals((Object)receivedMessage, (Object)expectedMessage, (String)("Received encrypted message " + receivedMessage + " should not match the expected message " + expectedMessage));
            consumer.acknowledgeCumulative(msg);
        }
        catch (Exception e) {
            Assert.fail((String)"Failed to receive message even aftet ConsumerCryptoFailureAction.CONSUME is set.");
        }
        consumer.close();
        consumer = this.pulsarClient.newConsumer(Schema.STRING).topic(new String[]{"persistent://my-property/use/my-ns/myenc-topic1"}).subscriptionName("my-subscriber-name").subscriptionType(SubscriptionType.Exclusive).cryptoKeyReader((CryptoKeyReader)new EncKeyReader()).cryptoFailureAction(ConsumerCryptoFailureAction.FAIL).subscribe();
        for (int i = msgNum; i < 9; ++i) {
            msg = consumer.receive(5, TimeUnit.SECONDS);
            String receivedMessage = (String)msg.getValue();
            log.debug("Received message: [{}]", (Object)receivedMessage);
            String expectedMessage = "my-message-" + i;
            this.testMessageOrderAndDuplicates(messageSet, receivedMessage, expectedMessage);
        }
        consumer.acknowledgeCumulative(msg);
        consumer.close();
        consumer.close();
        consumer = this.pulsarClient.newConsumer(Schema.STRING).topic(new String[]{"persistent://my-property/use/my-ns/myenc-topic1"}).subscriptionName("my-subscriber-name").subscriptionType(SubscriptionType.Exclusive).cryptoFailureAction(ConsumerCryptoFailureAction.DISCARD).subscribe();
        msg = consumer.receive(5, TimeUnit.SECONDS);
        Assert.assertNull((Object)msg, (String)"Message received even after ConsumerCryptoFailureAction.DISCARD is set.");
        log.info("-- Exiting {} test --", (Object)this.methodName);
    }
}

