/*
 * Decompiled with CFR 0.152.
 */
package org.apache.pulsar.client.api;

import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.bookkeeper.common.concurrent.FutureUtils;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.pulsar.broker.service.Topic;
import org.apache.pulsar.client.api.CryptoKeyReader;
import org.apache.pulsar.client.api.EncryptionKeyInfo;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.ProducerBuilder;
import org.apache.pulsar.client.api.ProducerConsumerBase;
import org.apache.pulsar.client.api.Reader;
import org.apache.pulsar.client.api.ReaderBuilder;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.impl.BatchMessageIdImpl;
import org.apache.pulsar.client.impl.MessageIdImpl;
import org.apache.pulsar.client.impl.MessageImpl;
import org.apache.pulsar.client.impl.MultiTopicsReaderImpl;
import org.apache.pulsar.client.impl.ReaderImpl;
import org.apache.pulsar.client.impl.TopicMessageIdImpl;
import org.apache.pulsar.client.impl.TopicMessageImpl;
import org.apache.pulsar.common.policies.data.PartitionedTopicStats;
import org.apache.pulsar.common.policies.data.TopicStats;
import org.apache.pulsar.common.util.RelativeTimeUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testng.Assert;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.DataProvider;
import org.testng.annotations.Test;

@Test(groups={"flaky"})
public class TopicReaderTest
extends ProducerConsumerBase {
    private static final Logger log = LoggerFactory.getLogger(TopicReaderTest.class);

    @Override
    @BeforeClass
    protected void setup() throws Exception {
        super.internalSetup();
        super.producerBaseSetup();
    }

    @Override
    @AfterClass(alwaysRun=true)
    protected void cleanup() throws Exception {
        super.internalCleanup();
    }

    @DataProvider
    public static Object[][] variationsForExpectedPos() {
        return new Object[][]{{true, true, 10}, {true, false, 10}, {false, true, 10}, {false, false, 10}, {true, true, 100}, {true, false, 100}, {false, true, 100}, {false, false, 100}};
    }

    @DataProvider
    public static Object[][] variationsForResetOnLatestMsg() {
        return new Object[][]{{true, 20}, {false, 20}};
    }

    @DataProvider
    public static Object[][] variationsForHasMessageAvailable() {
        return new Object[][]{{true, true}, {true, false}, {false, true}, {false, false}};
    }

    @Test
    public void testSimpleReader() throws Exception {
        Reader reader = this.pulsarClient.newReader().topic("persistent://my-property/my-ns/testSimpleReader").startMessageId(MessageId.earliest).create();
        Producer producer = this.pulsarClient.newProducer().topic("persistent://my-property/my-ns/testSimpleReader").create();
        for (int i = 0; i < 10; ++i) {
            String message = "my-message-" + i;
            producer.send((Object)message.getBytes());
        }
        Message msg = null;
        HashSet messageSet = Sets.newHashSet();
        for (int i = 0; i < 10; ++i) {
            msg = reader.readNext(1, TimeUnit.SECONDS);
            String receivedMessage = new String(msg.getData());
            log.debug("Received message: [{}]", (Object)receivedMessage);
            String expectedMessage = "my-message-" + i;
            this.testMessageOrderAndDuplicates(messageSet, receivedMessage, expectedMessage);
        }
        reader.close();
        producer.close();
    }

    @Test
    public void testSimpleMultiReader() throws Exception {
        String topic = "persistent://my-property/my-ns/testSimpleMultiReader";
        this.admin.topics().createPartitionedTopic(topic, 3);
        Reader reader = this.pulsarClient.newReader().topic(topic).startMessageId(MessageId.earliest).create();
        Producer producer = this.pulsarClient.newProducer().topic(topic).create();
        for (int i = 0; i < 10; ++i) {
            String message = "my-message-" + i;
            producer.send((Object)message.getBytes());
        }
        Message msg = null;
        HashSet messageSet = Sets.newHashSet();
        for (int i = 0; i < 10; ++i) {
            msg = reader.readNext(1, TimeUnit.SECONDS);
            String receivedMessage = new String(msg.getData());
            Assert.assertTrue((boolean)messageSet.add(receivedMessage));
        }
        reader.close();
        producer.close();
    }

    @Test
    public void testReaderAfterMessagesWerePublished() throws Exception {
        Producer producer = this.pulsarClient.newProducer().topic("persistent://my-property/my-ns/testReaderAfterMessagesWerePublished").create();
        for (int i = 0; i < 10; ++i) {
            String message = "my-message-" + i;
            producer.send((Object)message.getBytes());
        }
        Reader reader = this.pulsarClient.newReader().topic("persistent://my-property/my-ns/testReaderAfterMessagesWerePublished").startMessageId(MessageId.earliest).create();
        Message msg = null;
        HashSet messageSet = Sets.newHashSet();
        for (int i = 0; i < 10; ++i) {
            msg = reader.readNext(1, TimeUnit.SECONDS);
            String receivedMessage = new String(msg.getData());
            log.debug("Received message: [{}]", (Object)receivedMessage);
            String expectedMessage = "my-message-" + i;
            this.testMessageOrderAndDuplicates(messageSet, receivedMessage, expectedMessage);
        }
        reader.close();
        producer.close();
    }

    @Test
    public void testMultiReaderAfterMessagesWerePublished() throws Exception {
        String topic = "persistent://my-property/my-ns/testMultiReaderAfterMessagesWerePublished";
        this.admin.topics().createPartitionedTopic(topic, 3);
        Producer producer = this.pulsarClient.newProducer().topic(topic).create();
        for (int i = 0; i < 10; ++i) {
            String message = "my-message-" + i;
            producer.send((Object)message.getBytes());
        }
        Reader reader = this.pulsarClient.newReader().topic(topic).startMessageId(MessageId.earliest).create();
        Message msg = null;
        HashSet messageSet = Sets.newHashSet();
        for (int i = 0; i < 10; ++i) {
            msg = reader.readNext(1, TimeUnit.SECONDS);
            String receivedMessage = new String(msg.getData());
            Assert.assertTrue((boolean)messageSet.add(receivedMessage));
        }
        reader.close();
        producer.close();
    }

    @Test
    public void testMultipleReaders() throws Exception {
        Producer producer = this.pulsarClient.newProducer().topic("persistent://my-property/my-ns/testMultipleReaders").create();
        for (int i = 0; i < 10; ++i) {
            String message = "my-message-" + i;
            producer.send((Object)message.getBytes());
        }
        Reader reader1 = this.pulsarClient.newReader().topic("persistent://my-property/my-ns/testMultipleReaders").startMessageId(MessageId.earliest).create();
        Reader reader2 = this.pulsarClient.newReader().topic("persistent://my-property/my-ns/testMultipleReaders").startMessageId(MessageId.earliest).create();
        Message msg = null;
        HashSet messageSet1 = Sets.newHashSet();
        for (int i = 0; i < 10; ++i) {
            msg = reader1.readNext(1, TimeUnit.SECONDS);
            String receivedMessage = new String(msg.getData());
            log.debug("Received message: [{}]", (Object)receivedMessage);
            String expectedMessage = "my-message-" + i;
            this.testMessageOrderAndDuplicates(messageSet1, receivedMessage, expectedMessage);
        }
        HashSet messageSet2 = Sets.newHashSet();
        for (int i = 0; i < 10; ++i) {
            msg = reader2.readNext(1, TimeUnit.SECONDS);
            String receivedMessage = new String(msg.getData());
            log.debug("Received message: [{}]", (Object)receivedMessage);
            String expectedMessage = "my-message-" + i;
            this.testMessageOrderAndDuplicates(messageSet2, receivedMessage, expectedMessage);
        }
        reader1.close();
        reader2.close();
        producer.close();
    }

    @Test
    public void testMultiMultipleReaders() throws Exception {
        String topic = "persistent://my-property/my-ns/testMultiMultipleReaders";
        this.admin.topics().createPartitionedTopic("persistent://my-property/my-ns/testMultiMultipleReaders", 3);
        Producer producer = this.pulsarClient.newProducer().topic("persistent://my-property/my-ns/testMultiMultipleReaders").create();
        for (int i = 0; i < 10; ++i) {
            String message = "my-message-" + i;
            producer.send((Object)message.getBytes());
        }
        Reader reader1 = this.pulsarClient.newReader().topic("persistent://my-property/my-ns/testMultiMultipleReaders").startMessageId(MessageId.earliest).create();
        Reader reader2 = this.pulsarClient.newReader().topic("persistent://my-property/my-ns/testMultiMultipleReaders").startMessageId(MessageId.earliest).create();
        Message msg = null;
        HashSet messageSet1 = Sets.newHashSet();
        for (int i = 0; i < 10; ++i) {
            msg = reader1.readNext(1, TimeUnit.SECONDS);
            String receivedMessage = new String(msg.getData());
            Assert.assertTrue((boolean)messageSet1.add(receivedMessage));
        }
        HashSet messageSet2 = Sets.newHashSet();
        for (int i = 0; i < 10; ++i) {
            msg = reader2.readNext(1, TimeUnit.SECONDS);
            String receivedMessage = new String(msg.getData());
            Assert.assertTrue((boolean)messageSet2.add(receivedMessage));
        }
        reader1.close();
        reader2.close();
        producer.close();
    }

    @Test
    public void testTopicStats() throws Exception {
        String topicName = "persistent://my-property/my-ns/testTopicStats";
        Reader reader1 = this.pulsarClient.newReader().topic(topicName).startMessageId(MessageId.earliest).create();
        Reader reader2 = this.pulsarClient.newReader().topic(topicName).startMessageId(MessageId.earliest).create();
        TopicStats stats = this.admin.topics().getStats(topicName);
        Assert.assertEquals((int)stats.subscriptions.size(), (int)2);
        reader1.close();
        stats = this.admin.topics().getStats(topicName);
        Assert.assertEquals((int)stats.subscriptions.size(), (int)1);
        reader2.close();
        stats = this.admin.topics().getStats(topicName);
        Assert.assertEquals((int)stats.subscriptions.size(), (int)0);
    }

    @Test
    public void testMultiTopicStats() throws Exception {
        String topicName = "persistent://my-property/my-ns/testMultiTopicStats";
        this.admin.topics().createPartitionedTopic(topicName, 3);
        Reader reader1 = this.pulsarClient.newReader().topic(topicName).startMessageId(MessageId.earliest).create();
        Reader reader2 = this.pulsarClient.newReader().topic(topicName).startMessageId(MessageId.earliest).create();
        PartitionedTopicStats stats = this.admin.topics().getPartitionedStats(topicName, true);
        Assert.assertEquals((int)stats.subscriptions.size(), (int)2);
        reader1.close();
        stats = this.admin.topics().getPartitionedStats(topicName, true);
        Assert.assertEquals((int)stats.subscriptions.size(), (int)1);
        reader2.close();
        stats = this.admin.topics().getPartitionedStats(topicName, true);
        Assert.assertEquals((int)stats.subscriptions.size(), (int)0);
    }

    @Test(dataProvider="variationsForResetOnLatestMsg")
    public void testReaderOnLatestMessage(boolean startInclusive, int numOfMessages) throws Exception {
        String topicName = "persistent://my-property/my-ns/ReaderOnLatestMessage";
        int halfOfMsgs = numOfMessages / 2;
        Producer producer = this.pulsarClient.newProducer().topic("persistent://my-property/my-ns/ReaderOnLatestMessage").create();
        for (int i = 0; i < halfOfMsgs; ++i) {
            producer.send((Object)String.format("my-message-%d", i).getBytes());
        }
        ReaderBuilder readerBuilder = this.pulsarClient.newReader().topic("persistent://my-property/my-ns/ReaderOnLatestMessage").startMessageId(MessageId.latest);
        if (startInclusive) {
            readerBuilder.startMessageIdInclusive();
        }
        Reader reader = readerBuilder.create();
        for (int i = halfOfMsgs; i < numOfMessages; ++i) {
            producer.send((Object)String.format("my-message-%d", i).getBytes());
        }
        HashSet messageSet = Sets.newHashSet();
        for (int i = halfOfMsgs; i < numOfMessages; ++i) {
            Message message = reader.readNext();
            String receivedMessage = new String(message.getData());
            String expectedMessage = String.format("my-message-%d", i);
            this.testMessageOrderAndDuplicates(messageSet, receivedMessage, expectedMessage);
        }
        Assert.assertTrue((boolean)reader.isConnected());
        Assert.assertEquals((int)((ReaderImpl)reader).getConsumer().numMessagesInQueue(), (int)0);
        Assert.assertEquals((int)messageSet.size(), (int)halfOfMsgs);
        reader.close();
        producer.close();
    }

    @Test(dataProvider="variationsForResetOnLatestMsg")
    public void testMultiReaderOnLatestMessage(boolean startInclusive, int numOfMessages) throws Exception {
        String topicName = "persistent://my-property/my-ns/testMultiReaderOnLatestMessage" + System.currentTimeMillis();
        this.admin.topics().createPartitionedTopic(topicName, 3);
        int halfOfMsgs = numOfMessages / 2;
        Producer producer = this.pulsarClient.newProducer().topic(topicName).create();
        HashSet<byte[]> oldMessage = new HashSet<byte[]>();
        for (int i = 0; i < halfOfMsgs; ++i) {
            byte[] message = String.format("my-message-%d", i).getBytes();
            producer.send((Object)message);
            oldMessage.add(message);
        }
        ReaderBuilder readerBuilder = this.pulsarClient.newReader().topic(topicName).startMessageId(MessageId.latest);
        if (startInclusive) {
            readerBuilder.startMessageIdInclusive();
        }
        Reader reader = readerBuilder.create();
        for (int i = halfOfMsgs; i < numOfMessages; ++i) {
            producer.send((Object)String.format("my-message-%d", i).getBytes());
        }
        HashSet messageSet = Sets.newHashSet();
        for (int i = halfOfMsgs; i < numOfMessages; ++i) {
            Message message = reader.readNext();
            Assert.assertFalse((boolean)oldMessage.contains(message));
            String receivedMessage = new String(message.getData());
            Assert.assertTrue((boolean)messageSet.add(receivedMessage));
        }
        Assert.assertTrue((boolean)reader.isConnected());
        Assert.assertEquals((int)((MultiTopicsReaderImpl)reader).getMultiTopicsConsumer().numMessagesInQueue(), (int)0);
        Assert.assertEquals((int)messageSet.size(), (int)halfOfMsgs);
        producer.close();
        reader.close();
    }

    @Test
    public void testReaderOnSpecificMessage() throws Exception {
        Producer producer = this.pulsarClient.newProducer().topic("persistent://my-property/my-ns/testReaderOnSpecificMessage").create();
        ArrayList<MessageId> messageIds = new ArrayList<MessageId>();
        for (int i = 0; i < 10; ++i) {
            String message = "my-message-" + i;
            messageIds.add(producer.send((Object)message.getBytes()));
        }
        Reader reader = this.pulsarClient.newReader().topic("persistent://my-property/my-ns/testReaderOnSpecificMessage").startMessageId((MessageId)messageIds.get(4)).create();
        Message msg = null;
        HashSet messageSet = Sets.newHashSet();
        for (int i = 5; i < 10; ++i) {
            msg = reader.readNext(1, TimeUnit.SECONDS);
            String receivedMessage = new String(msg.getData());
            log.debug("Received message: [{}]", (Object)receivedMessage);
            String expectedMessage = "my-message-" + i;
            this.testMessageOrderAndDuplicates(messageSet, receivedMessage, expectedMessage);
        }
        reader.close();
        producer.close();
    }

    @Test
    public void testReaderOnSpecificMessageWithBatches() throws Exception {
        Producer producer = this.pulsarClient.newProducer().topic("persistent://my-property/my-ns/testReaderOnSpecificMessageWithBatches").enableBatching(true).batchingMaxPublishDelay(100L, TimeUnit.MILLISECONDS).create();
        for (int i = 0; i < 10; ++i) {
            String message = "my-message-" + i;
            producer.sendAsync((Object)message.getBytes());
        }
        producer.send((Object)"my-message-10".getBytes());
        Reader reader1 = this.pulsarClient.newReader().topic("persistent://my-property/my-ns/testReaderOnSpecificMessageWithBatches").startMessageId(MessageId.earliest).create();
        Object lastMessageId = null;
        for (int i = 0; i < 5; ++i) {
            Message msg = reader1.readNext();
            lastMessageId = msg.getMessageId();
        }
        Assert.assertEquals(lastMessageId.getClass(), BatchMessageIdImpl.class);
        System.out.println("CREATING READER ON MSG ID: " + lastMessageId);
        Reader reader2 = this.pulsarClient.newReader().topic("persistent://my-property/my-ns/testReaderOnSpecificMessageWithBatches").startMessageId((MessageId)lastMessageId).create();
        for (int i = 5; i < 11; ++i) {
            Message msg = reader2.readNext(1, TimeUnit.SECONDS);
            String receivedMessage = new String(msg.getData());
            log.debug("Received message: [{}]", (Object)receivedMessage);
            String expectedMessage = "my-message-" + i;
            Assert.assertEquals((String)receivedMessage, (String)expectedMessage);
        }
        producer.close();
    }

    @Test
    public void testECDSAEncryption() throws Exception {
        log.info("-- Starting {} test --", (Object)this.methodName);
        int totalMsg = 10;
        HashSet messageSet = Sets.newHashSet();
        class EncKeyReader
        implements CryptoKeyReader {
            final EncryptionKeyInfo keyInfo = new EncryptionKeyInfo();

            EncKeyReader() {
            }

            public EncryptionKeyInfo getPublicKey(String keyName, Map<String, String> keyMeta) {
                String CERT_FILE_PATH = "./src/test/resources/certificate/public-key." + keyName;
                if (Files.isReadable(Paths.get(CERT_FILE_PATH, new String[0]))) {
                    try {
                        this.keyInfo.setKey(Files.readAllBytes(Paths.get(CERT_FILE_PATH, new String[0])));
                        return this.keyInfo;
                    }
                    catch (IOException e) {
                        Assert.fail((String)("Failed to read certificate from " + CERT_FILE_PATH));
                    }
                } else {
                    Assert.fail((String)("Certificate file " + CERT_FILE_PATH + " is not present or not readable."));
                }
                return null;
            }

            public EncryptionKeyInfo getPrivateKey(String keyName, Map<String, String> keyMeta) {
                String CERT_FILE_PATH = "./src/test/resources/certificate/private-key." + keyName;
                if (Files.isReadable(Paths.get(CERT_FILE_PATH, new String[0]))) {
                    try {
                        this.keyInfo.setKey(Files.readAllBytes(Paths.get(CERT_FILE_PATH, new String[0])));
                        return this.keyInfo;
                    }
                    catch (IOException e) {
                        Assert.fail((String)("Failed to read certificate from " + CERT_FILE_PATH));
                    }
                } else {
                    Assert.fail((String)("Certificate file " + CERT_FILE_PATH + " is not present or not readable."));
                }
                return null;
            }
        }
        Reader reader = this.pulsarClient.newReader().topic("persistent://my-property/my-ns/test-reader-myecdsa-topic1").startMessageId(MessageId.latest).cryptoKeyReader((CryptoKeyReader)new EncKeyReader()).create();
        Producer producer = this.pulsarClient.newProducer().topic("persistent://my-property/my-ns/test-reader-myecdsa-topic1").addEncryptionKey("client-ecdsa.pem").cryptoKeyReader((CryptoKeyReader)new EncKeyReader()).create();
        for (int i = 0; i < 10; ++i) {
            String message = "my-message-" + i;
            producer.send((Object)message.getBytes());
        }
        Message msg = null;
        for (int i = 0; i < 10; ++i) {
            msg = reader.readNext(5, TimeUnit.SECONDS);
            String receivedMessage = new String(msg.getData());
            log.debug("Received message: [{}]", (Object)receivedMessage);
            String expectedMessage = "my-message-" + i;
            this.testMessageOrderAndDuplicates(messageSet, receivedMessage, expectedMessage);
        }
        producer.close();
        reader.close();
        log.info("-- Exiting {} test --", (Object)this.methodName);
    }

    @Test
    public void testMultiReaderECDSAEncryption() throws Exception {
        log.info("-- Starting {} test --", (Object)this.methodName);
        int totalMsg = 10;
        HashSet messageSet = Sets.newHashSet();
        String topic = "persistent://my-property/my-ns/test-multi-reader-myecdsa-topic1";
        this.admin.topics().createPartitionedTopic(topic, 3);
        class EncKeyReader
        implements CryptoKeyReader {
            final EncryptionKeyInfo keyInfo = new EncryptionKeyInfo();

            EncKeyReader() {
            }

            public EncryptionKeyInfo getPublicKey(String keyName, Map<String, String> keyMeta) {
                String CERT_FILE_PATH = "./src/test/resources/certificate/public-key." + keyName;
                if (Files.isReadable(Paths.get(CERT_FILE_PATH, new String[0]))) {
                    try {
                        this.keyInfo.setKey(Files.readAllBytes(Paths.get(CERT_FILE_PATH, new String[0])));
                        return this.keyInfo;
                    }
                    catch (IOException e) {
                        Assert.fail((String)("Failed to read certificate from " + CERT_FILE_PATH));
                    }
                } else {
                    Assert.fail((String)("Certificate file " + CERT_FILE_PATH + " is not present or not readable."));
                }
                return null;
            }

            public EncryptionKeyInfo getPrivateKey(String keyName, Map<String, String> keyMeta) {
                String CERT_FILE_PATH = "./src/test/resources/certificate/private-key." + keyName;
                if (Files.isReadable(Paths.get(CERT_FILE_PATH, new String[0]))) {
                    try {
                        this.keyInfo.setKey(Files.readAllBytes(Paths.get(CERT_FILE_PATH, new String[0])));
                        return this.keyInfo;
                    }
                    catch (IOException e) {
                        Assert.fail((String)("Failed to read certificate from " + CERT_FILE_PATH));
                    }
                } else {
                    Assert.fail((String)("Certificate file " + CERT_FILE_PATH + " is not present or not readable."));
                }
                return null;
            }
        }
        Reader reader = this.pulsarClient.newReader().topic(topic).startMessageId(MessageId.latest).cryptoKeyReader((CryptoKeyReader)new EncKeyReader()).create();
        Producer producer = this.pulsarClient.newProducer().topic(topic).addEncryptionKey("client-ecdsa.pem").cryptoKeyReader((CryptoKeyReader)new EncKeyReader()).create();
        for (int i = 0; i < 10; ++i) {
            String message = "my-message-" + i;
            producer.send((Object)message.getBytes());
        }
        Message msg = null;
        for (int i = 0; i < 10; ++i) {
            msg = reader.readNext(5, TimeUnit.SECONDS);
            String receivedMessage = new String(msg.getData());
            Assert.assertTrue((boolean)messageSet.add(receivedMessage), (String)("Received duplicate message " + receivedMessage));
        }
        producer.close();
        reader.close();
    }

    @Test
    public void testDefaultCryptoKeyReader() throws Exception {
        int i;
        int i2;
        String topic = "persistent://my-property/my-ns/test-reader-default-crypto-key-reader" + System.currentTimeMillis();
        String ecdsaPublicKeyFile = "file:./src/test/resources/certificate/public-key.client-ecdsa.pem";
        String ecdsaPrivateKeyFile = "file:./src/test/resources/certificate/private-key.client-ecdsa.pem";
        String ecdsaPublicKeyData = "data:application/x-pem-file;base64,LS0tLS1CRUdJTiBQVUJMSUMgS0VZLS0tLS0KTUlIS01JR2pCZ2NxaGtqT1BRSUJNSUdYQWdFQk1Cd0dCeXFHU000OUFRRUNFUUQvLy8vOS8vLy8vLy8vLy8vLwovLy8vTURzRUVQLy8vLzMvLy8vLy8vLy8vLy8vLy93RUVPaDFlY0VRZWZROTJDU1pQQ3p1WHRNREZRQUFEZzFOCmFXNW5hSFZoVVhVTXdEcEVjOUEyZVFRaEJCWWY5MUtMaVpzdERDaGdmS1VzVzRiUFdzZzVXNi9yRThBdG9wTGQKN1hxREFoRUEvLy8vL2dBQUFBQjFvdzBia0RpaEZRSUJBUU1pQUFUcktqNlJQSEdQTktjWktJT2NjTjR0Z0VOTQpuMWR6S2pMck1aVGtKNG9BYVE9PQotLS0tLUVORCBQVUJMSUMgS0VZLS0tLS0K";
        String ecdsaPrivateKeyData = "data:application/x-pem-file;base64,LS0tLS1CRUdJTiBFQyBQQVJBTUVURVJTLS0tLS0KTUlHWEFnRUJNQndHQnlxR1NNNDlBUUVDRVFELy8vLzkvLy8vLy8vLy8vLy8vLy8vTURzRUVQLy8vLzMvLy8vLwovLy8vLy8vLy8vd0VFT2gxZWNFUWVmUTkyQ1NaUEN6dVh0TURGUUFBRGcxTmFXNW5hSFZoVVhVTXdEcEVjOUEyCmVRUWhCQllmOTFLTGlac3REQ2hnZktVc1c0YlBXc2c1VzYvckU4QXRvcExkN1hxREFoRUEvLy8vL2dBQUFBQjEKb3cwYmtEaWhGUUlCQVE9PQotLS0tLUVORCBFQyBQQVJBTUVURVJTLS0tLS0KLS0tLS1CRUdJTiBFQyBQUklWQVRFIEtFWS0tLS0tCk1JSFlBZ0VCQkJEZXU5aGM4a092TDNwbCtMWVNqTHE5b0lHYU1JR1hBZ0VCTUJ3R0J5cUdTTTQ5QVFFQ0VRRC8KLy8vOS8vLy8vLy8vLy8vLy8vLy9NRHNFRVAvLy8vMy8vLy8vLy8vLy8vLy8vL3dFRU9oMWVjRVFlZlE5MkNTWgpQQ3p1WHRNREZRQUFEZzFOYVc1bmFIVmhVWFVNd0RwRWM5QTJlUVFoQkJZZjkxS0xpWnN0RENoZ2ZLVXNXNGJQCldzZzVXNi9yRThBdG9wTGQ3WHFEQWhFQS8vLy8vZ0FBQUFCMW93MGJrRGloRlFJQkFhRWtBeUlBQk9zcVBwRTgKY1k4MHB4a29nNXh3M2kyQVEweWZWM01xTXVzeGxPUW5pZ0JwCi0tLS0tRU5EIEVDIFBSSVZBVEUgS0VZLS0tLS0K";
        String rsaPublicKeyFile = "file:./src/test/resources/certificate/public-key.client-rsa.pem";
        String rsaPrivateKeyFile = "file:./src/test/resources/certificate/private-key.client-rsa.pem";
        String rsaPublicKeyData = "data:application/x-pem-file;base64,LS0tLS1CRUdJTiBQVUJMSUMgS0VZLS0tLS0KTUlJQklqQU5CZ2txaGtpRzl3MEJBUUVGQUFPQ0FROEFNSUlCQ2dLQ0FRRUF0S1d3Z3FkblRZck9DditqMU1rVApXZlNIMHdDc0haWmNhOXdBVzNxUDR1dWhsQnZuYjEwSmNGZjVaanpQOUJTWEsrdEhtSTh1b04zNjh2RXY2eWhVClJITTR5dVhxekN4enVBd2tRU28zOXJ6WDhQR0M3cWRqQ043TERKM01ucWlCSXJVc1NhRVAxd3JOc0Ixa0krbzkKRVIxZTVPL3VFUEFvdFA5MzNoSFEwSjJoTUVla0hxTDdzQmxKOThoNk5tc2ljRWFVa2FyZGswVE9YcmxrakMrYwpNZDhaYkdTY1BxSTlNMzhibW4zT0x4RlRuMXZ0aHB2blhMdkNtRzRNKzZ4dFl0RCtucGNWUFp3MWkxUjkwZk1zCjdwcFpuUmJ2OEhjL0RGZE9LVlFJZ2FtNkNEZG5OS2dXN2M3SUJNclAwQUVtMzdIVHUwTFNPalAyT0hYbHZ2bFEKR1FJREFRQUIKLS0tLS1FTkQgUFVCTElDIEtFWS0tLS0tCg==";
        String rsaPrivateKeyData = "data:application/x-pem-file;base64,LS0tLS1CRUdJTiBSU0EgUFJJVkFURSBLRVktLS0tLQpNSUlFb3dJQkFBS0NBUUVBdEtXd2dxZG5UWXJPQ3YrajFNa1RXZlNIMHdDc0haWmNhOXdBVzNxUDR1dWhsQnZuCmIxMEpjRmY1Wmp6UDlCU1hLK3RIbUk4dW9OMzY4dkV2NnloVVJITTR5dVhxekN4enVBd2tRU28zOXJ6WDhQR0MKN3FkakNON0xESjNNbnFpQklyVXNTYUVQMXdyTnNCMWtJK285RVIxZTVPL3VFUEFvdFA5MzNoSFEwSjJoTUVlawpIcUw3c0JsSjk4aDZObXNpY0VhVWthcmRrMFRPWHJsa2pDK2NNZDhaYkdTY1BxSTlNMzhibW4zT0x4RlRuMXZ0Cmhwdm5YTHZDbUc0TSs2eHRZdEQrbnBjVlBadzFpMVI5MGZNczdwcFpuUmJ2OEhjL0RGZE9LVlFJZ2FtNkNEZG4KTktnVzdjN0lCTXJQMEFFbTM3SFR1MExTT2pQMk9IWGx2dmxRR1FJREFRQUJBb0lCQUFhSkZBaTJDN3UzY05yZgpBc3RZOXZWRExvTEl2SEZabGtCa3RqS1pEWW1WSXNSYitoU0NWaXdWVXJXTEw2N1I2K0l2NGVnNERlVE9BeDAwCjhwbmNYS2daVHcyd0liMS9RalIvWS9SamxhQzhsa2RtUldsaTd1ZE1RQ1pWc3lodVNqVzZQajd2cjhZRTR3b2oKRmhOaWp4RUdjZjl3V3JtTUpyemRuVFdRaVhCeW8rZVR2VVE5QlBnUEdyUmpzTVptVGtMeUFWSmZmMkRmeE81YgpJV0ZEWURKY3lZQU1DSU1RdTd2eXMvSTUwb3U2aWxiMUNPNlFNNlo3S3BQZU9vVkZQd3R6Ymg4Y2Y5eE04VU5TCmo2Si9KbWRXaGdJMzRHUzNOQTY4eFRRNlBWN3pqbmhDYytpY2NtM0pLeXpHWHdhQXBBWitFb2NlLzlqNFdLbXUKNUI0emlSMENnWUVBM2wvOU9IYmwxem15VityUnhXT0lqL2kyclR2SHp3Qm5iblBKeXVlbUw1Vk1GZHBHb2RRMwp2d0h2eVFtY0VDUlZSeG1Yb2pRNFF1UFBIczNxcDZ3RUVGUENXeENoTFNUeGxVYzg1U09GSFdVMk85OWpWN3pJCjcrSk9wREsvTXN0c3g5bkhnWGR1SkYrZ2xURnRBM0xIOE9xeWx6dTJhRlBzcHJ3S3VaZjk0UThDZ1lFQXovWngKYWtFRytQRU10UDVZUzI4Y1g1WGZqc0lYL1YyNkZzNi9zSDE2UWpVSUVkZEU1VDRmQ3Vva3hDalNpd1VjV2htbApwSEVKNVM1eHAzVllSZklTVzNqUlczcXN0SUgxdHBaaXBCNitTMHpUdUptTEpiQTNJaVdFZzJydE10N1gxdUp2CkEvYllPcWUwaE9QVHVYdVpkdFZaMG5NVEtrN0dHOE82VmtCSTdGY0NnWUVBa0RmQ21zY0pnczdKYWhsQldIbVgKekg5cHdlbStTUEtqSWMvNE5CNk4rZGdpa3gyUHAwNWhwUC9WaWhVd1lJdWZ2cy9MTm9nVllOUXJ0SGVwVW5yTgoyK1RtYkhiWmdOU3YxTGR4dDgyVWZCN3kwRnV0S3U2bGhtWEh5TmVjaG8zRmk4c2loMFYwYWlTV21ZdUhmckFICkdhaXNrRVpLbzFpaVp2UVhKSXg5TzJNQ2dZQVRCZjByOWhUWU10eXh0YzZIMy9zZGQwMUM5dGhROGdEeTB5alAKMFRxYzBkTVNKcm9EcW1JV2tvS1lldzkvYmhGQTRMVzVUQ25Xa0NBUGJIbU50RzRmZGZiWXdta0gvaGRuQTJ5MApqS2RscGZwOEdYZVVGQUdIR3gxN0ZBM3NxRnZnS1VoMGVXRWdSSFVMN3ZkUU1WRkJnSlM5M283elFNOTRmTGdQCjZjT0I4d0tCZ0ZjR1Y0R2pJMld3OWNpbGxhQzU1NE12b1NqZjhCLyswNGtYekRPaDhpWUlJek85RVVpbDFqaksKSnZ4cDRobkx6VEtXYnV4M01FV3F1ckxrWWFzNkdwS0JqdytpTk9DYXI2WWRxV0dWcU0zUlV4N1BUVWFad2tLeApVZFA2M0lmWTdpWkNJVC9RYnlIUXZJVWUyTWFpVm5IK3VseGRrSzZZNWU3Z3hjYmNrSUg0Ci0tLS0tRU5EIFJTQSBQUklWQVRFIEtFWS0tLS0tCg==";
        int numMsg = 10;
        HashMap privateKeyFileMap = Maps.newHashMap();
        privateKeyFileMap.put("client-ecdsa.pem", "file:./src/test/resources/certificate/private-key.client-ecdsa.pem");
        privateKeyFileMap.put("client-rsa.pem", "file:./src/test/resources/certificate/private-key.client-rsa.pem");
        HashMap privateKeyDataMap = Maps.newHashMap();
        privateKeyDataMap.put("client-ecdsa.pem", "data:application/x-pem-file;base64,LS0tLS1CRUdJTiBFQyBQQVJBTUVURVJTLS0tLS0KTUlHWEFnRUJNQndHQnlxR1NNNDlBUUVDRVFELy8vLzkvLy8vLy8vLy8vLy8vLy8vTURzRUVQLy8vLzMvLy8vLwovLy8vLy8vLy8vd0VFT2gxZWNFUWVmUTkyQ1NaUEN6dVh0TURGUUFBRGcxTmFXNW5hSFZoVVhVTXdEcEVjOUEyCmVRUWhCQllmOTFLTGlac3REQ2hnZktVc1c0YlBXc2c1VzYvckU4QXRvcExkN1hxREFoRUEvLy8vL2dBQUFBQjEKb3cwYmtEaWhGUUlCQVE9PQotLS0tLUVORCBFQyBQQVJBTUVURVJTLS0tLS0KLS0tLS1CRUdJTiBFQyBQUklWQVRFIEtFWS0tLS0tCk1JSFlBZ0VCQkJEZXU5aGM4a092TDNwbCtMWVNqTHE5b0lHYU1JR1hBZ0VCTUJ3R0J5cUdTTTQ5QVFFQ0VRRC8KLy8vOS8vLy8vLy8vLy8vLy8vLy9NRHNFRVAvLy8vMy8vLy8vLy8vLy8vLy8vL3dFRU9oMWVjRVFlZlE5MkNTWgpQQ3p1WHRNREZRQUFEZzFOYVc1bmFIVmhVWFVNd0RwRWM5QTJlUVFoQkJZZjkxS0xpWnN0RENoZ2ZLVXNXNGJQCldzZzVXNi9yRThBdG9wTGQ3WHFEQWhFQS8vLy8vZ0FBQUFCMW93MGJrRGloRlFJQkFhRWtBeUlBQk9zcVBwRTgKY1k4MHB4a29nNXh3M2kyQVEweWZWM01xTXVzeGxPUW5pZ0JwCi0tLS0tRU5EIEVDIFBSSVZBVEUgS0VZLS0tLS0K");
        privateKeyDataMap.put("client-rsa.pem", "data:application/x-pem-file;base64,LS0tLS1CRUdJTiBSU0EgUFJJVkFURSBLRVktLS0tLQpNSUlFb3dJQkFBS0NBUUVBdEtXd2dxZG5UWXJPQ3YrajFNa1RXZlNIMHdDc0haWmNhOXdBVzNxUDR1dWhsQnZuCmIxMEpjRmY1Wmp6UDlCU1hLK3RIbUk4dW9OMzY4dkV2NnloVVJITTR5dVhxekN4enVBd2tRU28zOXJ6WDhQR0MKN3FkakNON0xESjNNbnFpQklyVXNTYUVQMXdyTnNCMWtJK285RVIxZTVPL3VFUEFvdFA5MzNoSFEwSjJoTUVlawpIcUw3c0JsSjk4aDZObXNpY0VhVWthcmRrMFRPWHJsa2pDK2NNZDhaYkdTY1BxSTlNMzhibW4zT0x4RlRuMXZ0Cmhwdm5YTHZDbUc0TSs2eHRZdEQrbnBjVlBadzFpMVI5MGZNczdwcFpuUmJ2OEhjL0RGZE9LVlFJZ2FtNkNEZG4KTktnVzdjN0lCTXJQMEFFbTM3SFR1MExTT2pQMk9IWGx2dmxRR1FJREFRQUJBb0lCQUFhSkZBaTJDN3UzY05yZgpBc3RZOXZWRExvTEl2SEZabGtCa3RqS1pEWW1WSXNSYitoU0NWaXdWVXJXTEw2N1I2K0l2NGVnNERlVE9BeDAwCjhwbmNYS2daVHcyd0liMS9RalIvWS9SamxhQzhsa2RtUldsaTd1ZE1RQ1pWc3lodVNqVzZQajd2cjhZRTR3b2oKRmhOaWp4RUdjZjl3V3JtTUpyemRuVFdRaVhCeW8rZVR2VVE5QlBnUEdyUmpzTVptVGtMeUFWSmZmMkRmeE81YgpJV0ZEWURKY3lZQU1DSU1RdTd2eXMvSTUwb3U2aWxiMUNPNlFNNlo3S3BQZU9vVkZQd3R6Ymg4Y2Y5eE04VU5TCmo2Si9KbWRXaGdJMzRHUzNOQTY4eFRRNlBWN3pqbmhDYytpY2NtM0pLeXpHWHdhQXBBWitFb2NlLzlqNFdLbXUKNUI0emlSMENnWUVBM2wvOU9IYmwxem15VityUnhXT0lqL2kyclR2SHp3Qm5iblBKeXVlbUw1Vk1GZHBHb2RRMwp2d0h2eVFtY0VDUlZSeG1Yb2pRNFF1UFBIczNxcDZ3RUVGUENXeENoTFNUeGxVYzg1U09GSFdVMk85OWpWN3pJCjcrSk9wREsvTXN0c3g5bkhnWGR1SkYrZ2xURnRBM0xIOE9xeWx6dTJhRlBzcHJ3S3VaZjk0UThDZ1lFQXovWngKYWtFRytQRU10UDVZUzI4Y1g1WGZqc0lYL1YyNkZzNi9zSDE2UWpVSUVkZEU1VDRmQ3Vva3hDalNpd1VjV2htbApwSEVKNVM1eHAzVllSZklTVzNqUlczcXN0SUgxdHBaaXBCNitTMHpUdUptTEpiQTNJaVdFZzJydE10N1gxdUp2CkEvYllPcWUwaE9QVHVYdVpkdFZaMG5NVEtrN0dHOE82VmtCSTdGY0NnWUVBa0RmQ21zY0pnczdKYWhsQldIbVgKekg5cHdlbStTUEtqSWMvNE5CNk4rZGdpa3gyUHAwNWhwUC9WaWhVd1lJdWZ2cy9MTm9nVllOUXJ0SGVwVW5yTgoyK1RtYkhiWmdOU3YxTGR4dDgyVWZCN3kwRnV0S3U2bGhtWEh5TmVjaG8zRmk4c2loMFYwYWlTV21ZdUhmckFICkdhaXNrRVpLbzFpaVp2UVhKSXg5TzJNQ2dZQVRCZjByOWhUWU10eXh0YzZIMy9zZGQwMUM5dGhROGdEeTB5alAKMFRxYzBkTVNKcm9EcW1JV2tvS1lldzkvYmhGQTRMVzVUQ25Xa0NBUGJIbU50RzRmZGZiWXdta0gvaGRuQTJ5MApqS2RscGZwOEdYZVVGQUdIR3gxN0ZBM3NxRnZnS1VoMGVXRWdSSFVMN3ZkUU1WRkJnSlM5M283elFNOTRmTGdQCjZjT0I4d0tCZ0ZjR1Y0R2pJMld3OWNpbGxhQzU1NE12b1NqZjhCLyswNGtYekRPaDhpWUlJek85RVVpbDFqaksKSnZ4cDRobkx6VEtXYnV4M01FV3F1ckxrWWFzNkdwS0JqdytpTk9DYXI2WWRxV0dWcU0zUlV4N1BUVWFad2tLeApVZFA2M0lmWTdpWkNJVC9RYnlIUXZJVWUyTWFpVm5IK3VseGRrSzZZNWU3Z3hjYmNrSUg0Ci0tLS0tRU5EIFJTQSBQUklWQVRFIEtFWS0tLS0tCg==");
        Reader reader1 = this.pulsarClient.newReader().topic(topic).startMessageId(MessageId.latest).defaultCryptoKeyReader("file:./src/test/resources/certificate/private-key.client-ecdsa.pem").create();
        Reader reader2 = this.pulsarClient.newReader().topic(topic).startMessageId(MessageId.latest).defaultCryptoKeyReader("data:application/x-pem-file;base64,LS0tLS1CRUdJTiBFQyBQQVJBTUVURVJTLS0tLS0KTUlHWEFnRUJNQndHQnlxR1NNNDlBUUVDRVFELy8vLzkvLy8vLy8vLy8vLy8vLy8vTURzRUVQLy8vLzMvLy8vLwovLy8vLy8vLy8vd0VFT2gxZWNFUWVmUTkyQ1NaUEN6dVh0TURGUUFBRGcxTmFXNW5hSFZoVVhVTXdEcEVjOUEyCmVRUWhCQllmOTFLTGlac3REQ2hnZktVc1c0YlBXc2c1VzYvckU4QXRvcExkN1hxREFoRUEvLy8vL2dBQUFBQjEKb3cwYmtEaWhGUUlCQVE9PQotLS0tLUVORCBFQyBQQVJBTUVURVJTLS0tLS0KLS0tLS1CRUdJTiBFQyBQUklWQVRFIEtFWS0tLS0tCk1JSFlBZ0VCQkJEZXU5aGM4a092TDNwbCtMWVNqTHE5b0lHYU1JR1hBZ0VCTUJ3R0J5cUdTTTQ5QVFFQ0VRRC8KLy8vOS8vLy8vLy8vLy8vLy8vLy9NRHNFRVAvLy8vMy8vLy8vLy8vLy8vLy8vL3dFRU9oMWVjRVFlZlE5MkNTWgpQQ3p1WHRNREZRQUFEZzFOYVc1bmFIVmhVWFVNd0RwRWM5QTJlUVFoQkJZZjkxS0xpWnN0RENoZ2ZLVXNXNGJQCldzZzVXNi9yRThBdG9wTGQ3WHFEQWhFQS8vLy8vZ0FBQUFCMW93MGJrRGloRlFJQkFhRWtBeUlBQk9zcVBwRTgKY1k4MHB4a29nNXh3M2kyQVEweWZWM01xTXVzeGxPUW5pZ0JwCi0tLS0tRU5EIEVDIFBSSVZBVEUgS0VZLS0tLS0K").create();
        Reader reader3 = this.pulsarClient.newReader().topic(topic).startMessageId(MessageId.latest).defaultCryptoKeyReader((Map)privateKeyFileMap).create();
        Reader reader4 = this.pulsarClient.newReader().topic(topic).startMessageId(MessageId.latest).defaultCryptoKeyReader((Map)privateKeyDataMap).create();
        Producer producer1 = this.pulsarClient.newProducer().topic(topic).addEncryptionKey("client-ecdsa.pem").defaultCryptoKeyReader("file:./src/test/resources/certificate/public-key.client-ecdsa.pem").create();
        Producer producer2 = this.pulsarClient.newProducer().topic(topic).addEncryptionKey("client-ecdsa.pem").defaultCryptoKeyReader("data:application/x-pem-file;base64,LS0tLS1CRUdJTiBQVUJMSUMgS0VZLS0tLS0KTUlIS01JR2pCZ2NxaGtqT1BRSUJNSUdYQWdFQk1Cd0dCeXFHU000OUFRRUNFUUQvLy8vOS8vLy8vLy8vLy8vLwovLy8vTURzRUVQLy8vLzMvLy8vLy8vLy8vLy8vLy93RUVPaDFlY0VRZWZROTJDU1pQQ3p1WHRNREZRQUFEZzFOCmFXNW5hSFZoVVhVTXdEcEVjOUEyZVFRaEJCWWY5MUtMaVpzdERDaGdmS1VzVzRiUFdzZzVXNi9yRThBdG9wTGQKN1hxREFoRUEvLy8vL2dBQUFBQjFvdzBia0RpaEZRSUJBUU1pQUFUcktqNlJQSEdQTktjWktJT2NjTjR0Z0VOTQpuMWR6S2pMck1aVGtKNG9BYVE9PQotLS0tLUVORCBQVUJMSUMgS0VZLS0tLS0K").create();
        for (i2 = 0; i2 < 10; ++i2) {
            producer1.send((Object)("my-message-" + i2).getBytes());
        }
        for (i2 = 10; i2 < 20; ++i2) {
            producer2.send((Object)("my-message-" + i2).getBytes());
        }
        producer1.close();
        producer2.close();
        for (Reader reader : Lists.newArrayList((Object[])new Reader[]{reader1, reader2})) {
            for (i = 0; i < 20; ++i) {
                MessageImpl msg = (MessageImpl)reader.readNext(5, TimeUnit.SECONDS);
                msg.getEncryptionCtx().orElseThrow(() -> new IllegalStateException("encryption-ctx not present for encrypted message"));
                Assert.assertEquals((String)new String(msg.getData()), (String)("my-message-" + i));
            }
        }
        reader1.close();
        reader2.close();
        Producer producer3 = this.pulsarClient.newProducer().topic(topic).addEncryptionKey("client-rsa.pem").defaultCryptoKeyReader("file:./src/test/resources/certificate/public-key.client-rsa.pem").create();
        Producer producer4 = this.pulsarClient.newProducer().topic(topic).addEncryptionKey("client-rsa.pem").defaultCryptoKeyReader("data:application/x-pem-file;base64,LS0tLS1CRUdJTiBQVUJMSUMgS0VZLS0tLS0KTUlJQklqQU5CZ2txaGtpRzl3MEJBUUVGQUFPQ0FROEFNSUlCQ2dLQ0FRRUF0S1d3Z3FkblRZck9DditqMU1rVApXZlNIMHdDc0haWmNhOXdBVzNxUDR1dWhsQnZuYjEwSmNGZjVaanpQOUJTWEsrdEhtSTh1b04zNjh2RXY2eWhVClJITTR5dVhxekN4enVBd2tRU28zOXJ6WDhQR0M3cWRqQ043TERKM01ucWlCSXJVc1NhRVAxd3JOc0Ixa0krbzkKRVIxZTVPL3VFUEFvdFA5MzNoSFEwSjJoTUVla0hxTDdzQmxKOThoNk5tc2ljRWFVa2FyZGswVE9YcmxrakMrYwpNZDhaYkdTY1BxSTlNMzhibW4zT0x4RlRuMXZ0aHB2blhMdkNtRzRNKzZ4dFl0RCtucGNWUFp3MWkxUjkwZk1zCjdwcFpuUmJ2OEhjL0RGZE9LVlFJZ2FtNkNEZG5OS2dXN2M3SUJNclAwQUVtMzdIVHUwTFNPalAyT0hYbHZ2bFEKR1FJREFRQUIKLS0tLS1FTkQgUFVCTElDIEtFWS0tLS0tCg==").create();
        for (i = 20; i < 30; ++i) {
            producer3.send((Object)("my-message-" + i).getBytes());
        }
        for (i = 30; i < 40; ++i) {
            producer4.send((Object)("my-message-" + i).getBytes());
        }
        producer3.close();
        producer4.close();
        for (Reader reader : Lists.newArrayList((Object[])new Reader[]{reader3, reader4})) {
            for (int i3 = 0; i3 < 40; ++i3) {
                MessageImpl msg = (MessageImpl)reader.readNext(5, TimeUnit.SECONDS);
                msg.getEncryptionCtx().orElseThrow(() -> new IllegalStateException("encryption-ctx not present for encrypted message"));
                Assert.assertEquals((String)new String(msg.getData()), (String)("my-message-" + i3));
            }
        }
        reader3.close();
        reader4.close();
    }

    @Test
    public void testSimpleReaderReachEndOfTopic() throws Exception {
        String expectedMessage;
        Reader reader = this.pulsarClient.newReader().topic("persistent://my-property/my-ns/testSimpleReaderReachEndOfTopic").startMessageId(MessageId.earliest).create();
        Producer producer = this.pulsarClient.newProducer().topic("persistent://my-property/my-ns/testSimpleReaderReachEndOfTopic").create();
        Assert.assertFalse((boolean)reader.hasMessageAvailable());
        for (int i = 0; i < 100; ++i) {
            String message = "my-message-" + i;
            producer.send((Object)message.getBytes());
        }
        MessageImpl msg = null;
        HashSet messageSet = Sets.newHashSet();
        int index = 0;
        while (reader.hasMessageAvailable()) {
            msg = (MessageImpl)reader.readNext(1, TimeUnit.SECONDS);
            String receivedMessage = new String(msg.getData());
            log.debug("Received message: [{}]", (Object)receivedMessage);
            expectedMessage = "my-message-" + index++;
            this.testMessageOrderAndDuplicates(messageSet, receivedMessage, expectedMessage);
        }
        Assert.assertEquals((int)index, (int)100);
        Assert.assertNull((Object)reader.readNext(1, TimeUnit.SECONDS));
        for (int i = 100; i < 200; ++i) {
            String message = "my-message-" + i;
            producer.send((Object)message.getBytes());
        }
        while (reader.hasMessageAvailable()) {
            msg = (MessageImpl)reader.readNext(1, TimeUnit.SECONDS);
            String receivedMessage = new String(msg.getData());
            log.debug("Received message: [{}]", (Object)receivedMessage);
            expectedMessage = "my-message-" + index++;
            this.testMessageOrderAndDuplicates(messageSet, receivedMessage, expectedMessage);
        }
        Assert.assertEquals((int)index, (int)200);
        Assert.assertNull((Object)reader.readNext(1, TimeUnit.SECONDS));
        reader.close();
        producer.close();
    }

    @Test
    public void testSimpleMultiReaderReachEndOfTopic() throws Exception {
        String topic = "persistent://my-property/my-ns/testSimpleMultiReaderReachEndOfTopic";
        this.admin.topics().createPartitionedTopic(topic, 3);
        Reader reader = this.pulsarClient.newReader().topic(topic).startMessageId(MessageId.earliest).create();
        Producer producer = this.pulsarClient.newProducer().topic(topic).create();
        Assert.assertFalse((boolean)reader.hasMessageAvailable());
        for (int i = 0; i < 100; ++i) {
            String message = "my-message-" + i;
            producer.send((Object)message.getBytes());
        }
        TopicMessageImpl msg = null;
        HashSet messageSet = Sets.newHashSet();
        int index = 0;
        while (reader.hasMessageAvailable()) {
            msg = (TopicMessageImpl)reader.readNext(1, TimeUnit.SECONDS);
            String receivedMessage = new String(msg.getData());
            ++index;
            Assert.assertTrue((boolean)messageSet.add(receivedMessage), (String)("Received duplicate message " + receivedMessage));
        }
        Assert.assertEquals((int)index, (int)100);
        Assert.assertNull((Object)reader.readNext(1, TimeUnit.SECONDS));
        for (int i = 100; i < 200; ++i) {
            String message = "my-message-" + i;
            producer.send((Object)message.getBytes());
        }
        while (reader.hasMessageAvailable()) {
            msg = (TopicMessageImpl)reader.readNext(1, TimeUnit.SECONDS);
            String receivedMessage = new String(msg.getData());
            ++index;
            Assert.assertTrue((boolean)messageSet.add(receivedMessage), (String)("Received duplicate message " + receivedMessage));
        }
        Assert.assertEquals((int)index, (int)200);
        Assert.assertNull((Object)reader.readNext(1, TimeUnit.SECONDS));
        reader.close();
        producer.close();
    }

    @Test
    public void testReaderReachEndOfTopicOnMessageWithBatches() throws Exception {
        Reader reader = this.pulsarClient.newReader().topic("persistent://my-property/my-ns/testReaderReachEndOfTopicOnMessageWithBatches").startMessageId(MessageId.earliest).create();
        Producer producer = this.pulsarClient.newProducer().topic("persistent://my-property/my-ns/testReaderReachEndOfTopicOnMessageWithBatches").enableBatching(true).batchingMaxPublishDelay(100L, TimeUnit.MILLISECONDS).create();
        Assert.assertFalse((boolean)reader.hasMessageAvailable());
        for (int i = 0; i < 100; ++i) {
            String message = "my-message-" + i;
            producer.sendAsync((Object)message.getBytes());
        }
        producer.send((Object)"my-message-10".getBytes());
        MessageId lastMessageId = null;
        int index = 0;
        Assert.assertTrue((boolean)reader.hasMessageAvailable());
        if (reader.hasMessageAvailable()) {
            Message msg = reader.readNext();
            lastMessageId = msg.getMessageId();
            Assert.assertEquals(lastMessageId.getClass(), BatchMessageIdImpl.class);
            while (msg != null) {
                ++index;
                msg = reader.readNext(100, TimeUnit.MILLISECONDS);
            }
            Assert.assertEquals((int)index, (int)101);
        }
        Assert.assertFalse((boolean)reader.hasMessageAvailable());
        reader.close();
        producer.close();
    }

    @Test
    public void testMultiReaderReachEndOfTopicOnMessageWithBatches() throws Exception {
        String topic = "persistent://my-property/my-ns/testMultiReaderReachEndOfTopicOnMessageWithBatches";
        this.admin.topics().createPartitionedTopic(topic, 3);
        Reader reader = this.pulsarClient.newReader().topic(topic).startMessageId(MessageId.earliest).create();
        Producer producer = this.pulsarClient.newProducer().topic(topic).enableBatching(true).batchingMaxPublishDelay(100L, TimeUnit.MILLISECONDS).create();
        Assert.assertFalse((boolean)reader.hasMessageAvailable());
        for (int i = 0; i < 100; ++i) {
            String message = "my-message-" + i;
            producer.sendAsync((Object)message.getBytes());
        }
        producer.send((Object)"my-message-10".getBytes());
        MessageId lastMessageId = null;
        int index = 0;
        Assert.assertTrue((boolean)reader.hasMessageAvailable());
        if (reader.hasMessageAvailable()) {
            Message msg = reader.readNext();
            lastMessageId = msg.getMessageId();
            Assert.assertEquals(lastMessageId.getClass(), TopicMessageIdImpl.class);
            while (msg != null) {
                ++index;
                msg = reader.readNext(100, TimeUnit.MILLISECONDS);
            }
            Assert.assertEquals((int)index, (int)101);
        }
        Assert.assertFalse((boolean)reader.hasMessageAvailable());
        reader.close();
        producer.close();
    }

    @Test
    public void testMessageAvailableAfterRestart() throws Exception {
        String topic = "persistent://my-property/use/my-ns/testMessageAvailableAfterRestart";
        String content = "my-message-1";
        this.pulsarClient.newConsumer().topic(new String[]{topic}).subscriptionName("sub1").subscribe().close();
        try (Reader reader = this.pulsarClient.newReader().topic(topic).startMessageId(MessageId.earliest).create();){
            Assert.assertFalse((boolean)reader.hasMessageAvailable());
        }
        var4_4 = null;
        try (Producer producer = this.pulsarClient.newProducer().topic(topic).create();){
            producer.send((Object)content.getBytes());
        }
        catch (Throwable throwable) {
            var4_4 = throwable;
            throw throwable;
        }
        reader = this.pulsarClient.newReader().topic(topic).startMessageId(MessageId.earliest).create();
        var4_4 = null;
        try {
            Assert.assertTrue((boolean)reader.hasMessageAvailable());
        }
        catch (Throwable throwable) {
            var4_4 = throwable;
            throw throwable;
        }
        finally {
            if (reader != null) {
                if (var4_4 != null) {
                    try {
                        reader.close();
                    }
                    catch (Throwable throwable) {
                        var4_4.addSuppressed(throwable);
                    }
                } else {
                    reader.close();
                }
            }
        }
        ((Topic)this.pulsar.getBrokerService().getTopicReference(topic).get()).close(false).get();
        reader = this.pulsarClient.newReader().topic(topic).startMessageId(MessageId.earliest).create();
        var4_4 = null;
        try {
            Assert.assertTrue((boolean)reader.hasMessageAvailable());
            String readOut = new String(reader.readNext().getData());
            Assert.assertEquals((String)content, (String)readOut);
            Assert.assertFalse((boolean)reader.hasMessageAvailable());
        }
        catch (Throwable throwable) {
            var4_4 = throwable;
            throw throwable;
        }
        finally {
            if (reader != null) {
                if (var4_4 != null) {
                    try {
                        reader.close();
                    }
                    catch (Throwable throwable) {
                        var4_4.addSuppressed(throwable);
                    }
                } else {
                    reader.close();
                }
            }
        }
    }

    @Test
    public void testMultiReaderMessageAvailableAfterRestart() throws Exception {
        String topic = "persistent://my-property/use/my-ns/testMessageAvailableAfterRestart2";
        String content = "my-message-1";
        this.admin.topics().createPartitionedTopic(topic, 3);
        this.pulsarClient.newConsumer().topic(new String[]{topic}).subscriptionName("sub2").subscribe().close();
        try (Reader reader = this.pulsarClient.newReader().topic(topic).startMessageId(MessageId.earliest).create();){
            Assert.assertFalse((boolean)reader.hasMessageAvailable());
        }
        var4_4 = null;
        try (Producer producer = this.pulsarClient.newProducer().topic(topic).create();){
            producer.send((Object)content.getBytes());
        }
        catch (Throwable throwable) {
            var4_4 = throwable;
            throw throwable;
        }
        reader = this.pulsarClient.newReader().topic(topic).startMessageId(MessageId.earliest).create();
        var4_4 = null;
        try {
            Assert.assertTrue((boolean)reader.hasMessageAvailable());
        }
        catch (Throwable throwable) {
            var4_4 = throwable;
            throw throwable;
        }
        finally {
            if (reader != null) {
                if (var4_4 != null) {
                    try {
                        reader.close();
                    }
                    catch (Throwable throwable) {
                        var4_4.addSuppressed(throwable);
                    }
                } else {
                    reader.close();
                }
            }
        }
        this.pulsar.getBrokerService().getTopics().keys().forEach(topicName -> {
            try {
                ((Topic)this.pulsar.getBrokerService().getTopicReference(topicName).get()).close(false).get();
            }
            catch (Exception e) {
                Assert.fail();
            }
        });
        reader = this.pulsarClient.newReader().topic(topic).startMessageId(MessageId.earliest).create();
        var4_4 = null;
        try {
            Assert.assertTrue((boolean)reader.hasMessageAvailable());
            String readOut = new String(reader.readNext().getData());
            Assert.assertEquals((String)content, (String)readOut);
            Assert.assertFalse((boolean)reader.hasMessageAvailable());
        }
        catch (Throwable throwable) {
            var4_4 = throwable;
            throw throwable;
        }
        finally {
            if (reader != null) {
                if (var4_4 != null) {
                    try {
                        reader.close();
                    }
                    catch (Throwable throwable) {
                        var4_4.addSuppressed(throwable);
                    }
                } else {
                    reader.close();
                }
            }
        }
    }

    @Test(dataProvider="variationsForHasMessageAvailable")
    public void testHasMessageAvailable(boolean enableBatch, boolean startInclusive) throws Exception {
        String topicName = "persistent://my-property/my-ns/HasMessageAvailable";
        int numOfMessage = 100;
        ProducerBuilder producerBuilder = this.pulsarClient.newProducer().topic("persistent://my-property/my-ns/HasMessageAvailable");
        if (enableBatch) {
            producerBuilder.enableBatching(true).batchingMaxMessages(10);
        } else {
            producerBuilder.enableBatching(false);
        }
        Producer producer = producerBuilder.create();
        CountDownLatch latch = new CountDownLatch(100);
        List allIds = Collections.synchronizedList(new ArrayList());
        for (int i = 0; i < 100; ++i) {
            producer.sendAsync((Object)String.format("msg num %d", i).getBytes()).whenComplete((mid, e) -> {
                if (e != null) {
                    Assert.fail();
                } else {
                    allIds.add(mid);
                }
                latch.countDown();
            });
        }
        latch.await();
        allIds.sort(null);
        for (MessageId messageId : allIds) {
            Reader reader = startInclusive ? this.pulsarClient.newReader().topic("persistent://my-property/my-ns/HasMessageAvailable").startMessageId(messageId).startMessageIdInclusive().create() : this.pulsarClient.newReader().topic("persistent://my-property/my-ns/HasMessageAvailable").startMessageId(messageId).create();
            if (startInclusive) {
                Assert.assertTrue((boolean)reader.hasMessageAvailable());
            } else if (messageId != allIds.get(allIds.size() - 1)) {
                Assert.assertTrue((boolean)reader.hasMessageAvailable());
            } else {
                Assert.assertFalse((boolean)reader.hasMessageAvailable());
            }
            reader.close();
        }
        producer.close();
    }

    @Test(timeOut=20000L)
    public void testHasMessageAvailableWithBatch() throws Exception {
        String topicName = "persistent://my-property/my-ns/testHasMessageAvailableWithBatch";
        int numOfMessage = 10;
        Producer producer = this.pulsarClient.newProducer().enableBatching(true).batchingMaxMessages(10).batchingMaxPublishDelay(2L, TimeUnit.SECONDS).topic("persistent://my-property/my-ns/testHasMessageAvailableWithBatch").create();
        MessageIdImpl messageId = (MessageIdImpl)producer.send((Object)"msg".getBytes());
        Assert.assertTrue((boolean)(messageId instanceof MessageIdImpl));
        ReaderImpl reader = (ReaderImpl)this.pulsarClient.newReader().topic("persistent://my-property/my-ns/testHasMessageAvailableWithBatch").startMessageId((MessageId)messageId).startMessageIdInclusive().create();
        MessageIdImpl lastMsgId = (MessageIdImpl)reader.getConsumer().getLastMessageId();
        Assert.assertTrue((boolean)(messageId instanceof BatchMessageIdImpl));
        Assert.assertEquals((long)lastMsgId.getLedgerId(), (long)messageId.getLedgerId());
        Assert.assertEquals((long)lastMsgId.getEntryId(), (long)messageId.getEntryId());
        reader.close();
        CountDownLatch latch = new CountDownLatch(10);
        List<MessageId> allIds = Collections.synchronizedList(new ArrayList());
        for (int i = 0; i < 10; ++i) {
            producer.sendAsync((Object)String.format("msg num %d", i).getBytes()).whenComplete((mid, e) -> {
                if (e != null) {
                    Assert.fail();
                } else {
                    allIds.add(mid);
                }
                latch.countDown();
            });
        }
        producer.flush();
        latch.await();
        producer.close();
        for (MessageId id : allIds) {
            MessageId lastMessageId;
            reader = (ReaderImpl)this.pulsarClient.newReader().topic("persistent://my-property/my-ns/testHasMessageAvailableWithBatch").startMessageId(id).startMessageIdInclusive().create();
            if (id instanceof BatchMessageIdImpl) {
                lastMessageId = reader.getConsumer().getLastMessageId();
                Assert.assertTrue((boolean)(lastMessageId instanceof BatchMessageIdImpl));
                log.info("id {} instance of BatchMessageIdImpl", (Object)id);
            } else {
                Assert.assertTrue((boolean)(id instanceof MessageIdImpl));
                lastMessageId = reader.getConsumer().getLastMessageId();
                Assert.assertTrue((boolean)(lastMessageId instanceof MessageIdImpl));
                log.info("id {} instance of MessageIdImpl", (Object)id);
            }
            reader.close();
        }
        producer = this.pulsarClient.newProducer().enableBatching(false).topic("persistent://my-property/my-ns/testHasMessageAvailableWithBatch").create();
        messageId = (MessageIdImpl)producer.send((Object)"non-batch".getBytes());
        Assert.assertFalse((boolean)(messageId instanceof BatchMessageIdImpl));
        Assert.assertTrue((boolean)(messageId instanceof MessageIdImpl));
        reader = (ReaderImpl)this.pulsarClient.newReader().topic("persistent://my-property/my-ns/testHasMessageAvailableWithBatch").startMessageId((MessageId)messageId).create();
        MessageId lastMessageId = reader.getConsumer().getLastMessageId();
        Assert.assertFalse((boolean)(lastMessageId instanceof BatchMessageIdImpl));
        Assert.assertTrue((boolean)(lastMessageId instanceof MessageIdImpl));
        Assert.assertEquals((Object)lastMessageId, (Object)messageId);
        producer.close();
        reader.close();
    }

    @Test
    public void testReaderNonDurableIsAbleToSeekRelativeTime() throws Exception {
        int numOfMessage = 10;
        String topicName = "persistent://my-property/my-ns/ReaderNonDurableIsAbleToSeekRelativeTime";
        Producer producer = this.pulsarClient.newProducer().topic("persistent://my-property/my-ns/ReaderNonDurableIsAbleToSeekRelativeTime").create();
        for (int i = 0; i < 10; ++i) {
            producer.send((Object)String.format("msg num %d", i).getBytes());
        }
        Reader reader = this.pulsarClient.newReader().topic("persistent://my-property/my-ns/ReaderNonDurableIsAbleToSeekRelativeTime").startMessageId(MessageId.earliest).create();
        Assert.assertTrue((boolean)reader.hasMessageAvailable());
        reader.seek(RelativeTimeUtil.parseRelativeTimeInSeconds((String)"-1m"));
        Assert.assertTrue((boolean)reader.hasMessageAvailable());
        reader.close();
        producer.close();
    }

    @Test
    public void testMultiReaderNonDurableIsAbleToSeekRelativeTime() throws Exception {
        int numOfMessage = 10;
        String topicName = "persistent://my-property/my-ns/ReaderNonDurableIsAbleToSeekRelativeTime";
        this.admin.topics().createPartitionedTopic("persistent://my-property/my-ns/ReaderNonDurableIsAbleToSeekRelativeTime", 3);
        Producer producer = this.pulsarClient.newProducer().topic("persistent://my-property/my-ns/ReaderNonDurableIsAbleToSeekRelativeTime").create();
        for (int i = 0; i < 10; ++i) {
            producer.send((Object)String.format("msg num %d", i).getBytes());
        }
        Reader reader = this.pulsarClient.newReader().topic("persistent://my-property/my-ns/ReaderNonDurableIsAbleToSeekRelativeTime").startMessageId(MessageId.earliest).create();
        Assert.assertTrue((boolean)reader.hasMessageAvailable());
        reader.seek(RelativeTimeUtil.parseRelativeTimeInSeconds((String)"-1m"));
        Assert.assertTrue((boolean)reader.hasMessageAvailable());
        reader.close();
        producer.close();
    }

    @Test
    public void testReaderIsAbleToSeekWithTimeOnBeginningOfTopic() throws Exception {
        String topicName = "persistent://my-property/my-ns/ReaderSeekWithTimeOnBeginningOfTopic";
        int numOfMessage = 10;
        Producer producer = this.pulsarClient.newProducer().topic("persistent://my-property/my-ns/ReaderSeekWithTimeOnBeginningOfTopic").create();
        for (int i = 0; i < 10; ++i) {
            producer.send((Object)String.format("msg num %d", i).getBytes());
        }
        Reader reader = this.pulsarClient.newReader().topic("persistent://my-property/my-ns/ReaderSeekWithTimeOnBeginningOfTopic").startMessageId(MessageId.earliest).create();
        Assert.assertTrue((boolean)reader.hasMessageAvailable());
        HashSet messageSetA = Sets.newHashSet();
        for (int i = 0; i < 10; ++i) {
            Message message = reader.readNext();
            String receivedMessage = new String(message.getData());
            String expectedMessage = String.format("msg num %d", i);
            this.testMessageOrderAndDuplicates(messageSetA, receivedMessage, expectedMessage);
        }
        Assert.assertFalse((boolean)reader.hasMessageAvailable());
        reader.seek(RelativeTimeUtil.parseRelativeTimeInSeconds((String)"-1m"));
        HashSet messageSetB = Sets.newHashSet();
        for (int i = 0; i < 10; ++i) {
            Message message = reader.readNext();
            String receivedMessage = new String(message.getData());
            String expectedMessage = String.format("msg num %d", i);
            this.testMessageOrderAndDuplicates(messageSetB, receivedMessage, expectedMessage);
        }
        Assert.assertTrue((boolean)reader.isConnected());
        Assert.assertFalse((boolean)reader.hasMessageAvailable());
        Assert.assertEquals((int)((ReaderImpl)reader).getConsumer().numMessagesInQueue(), (int)0);
        reader.close();
        producer.close();
    }

    @Test
    public void testMultiReaderIsAbleToSeekWithTimeOnBeginningOfTopic() throws Exception {
        String topicName = "persistent://my-property/my-ns/MultiReaderSeekWithTimeOnBeginningOfTopic";
        int numOfMessage = 10;
        this.admin.topics().createPartitionedTopic("persistent://my-property/my-ns/MultiReaderSeekWithTimeOnBeginningOfTopic", 3);
        Producer producer = this.pulsarClient.newProducer().topic("persistent://my-property/my-ns/MultiReaderSeekWithTimeOnBeginningOfTopic").create();
        for (int i = 0; i < 10; ++i) {
            producer.send((Object)String.format("msg num %d", i).getBytes());
        }
        Reader reader = this.pulsarClient.newReader().topic("persistent://my-property/my-ns/MultiReaderSeekWithTimeOnBeginningOfTopic").startMessageId(MessageId.earliest).create();
        Assert.assertTrue((boolean)reader.hasMessageAvailable());
        HashSet messageSetA = Sets.newHashSet();
        for (int i = 0; i < 10; ++i) {
            Message message = reader.readNext();
            String receivedMessage = new String(message.getData());
            Assert.assertTrue((boolean)messageSetA.add(receivedMessage), (String)("Received duplicate message " + receivedMessage));
        }
        Assert.assertFalse((boolean)reader.hasMessageAvailable());
        reader.seek(RelativeTimeUtil.parseRelativeTimeInSeconds((String)"-1m"));
        HashSet messageSetB = Sets.newHashSet();
        for (int i = 0; i < 10; ++i) {
            Message message = reader.readNext();
            String receivedMessage = new String(message.getData());
            Assert.assertTrue((boolean)messageSetB.add(receivedMessage), (String)("Received duplicate message " + receivedMessage));
        }
        Assert.assertTrue((boolean)reader.isConnected());
        Assert.assertFalse((boolean)reader.hasMessageAvailable());
        Assert.assertEquals((int)((MultiTopicsReaderImpl)reader).getMultiTopicsConsumer().numMessagesInQueue(), (int)0);
        reader.close();
        producer.close();
    }

    @Test
    public void testReaderIsAbleToSeekWithMessageIdOnMiddleOfTopic() throws Exception {
        String topicName = "persistent://my-property/my-ns/ReaderSeekWithMessageIdOnMiddleOfTopic";
        int numOfMessage = 100;
        int halfMessages = 50;
        Producer producer = this.pulsarClient.newProducer().topic("persistent://my-property/my-ns/ReaderSeekWithMessageIdOnMiddleOfTopic").create();
        for (int i = 0; i < 100; ++i) {
            producer.send((Object)String.format("msg num %d", i).getBytes());
        }
        Reader reader = this.pulsarClient.newReader().topic("persistent://my-property/my-ns/ReaderSeekWithMessageIdOnMiddleOfTopic").startMessageId(MessageId.earliest).create();
        Assert.assertTrue((boolean)reader.hasMessageAvailable());
        MessageId midmessageToSeek = null;
        HashSet messageSetA = Sets.newHashSet();
        for (int i = 0; i < 100; ++i) {
            Message message = reader.readNext();
            String receivedMessage = new String(message.getData());
            String expectedMessage = String.format("msg num %d", i);
            this.testMessageOrderAndDuplicates(messageSetA, receivedMessage, expectedMessage);
            if (i != 50) continue;
            midmessageToSeek = message.getMessageId();
        }
        Assert.assertFalse((boolean)reader.hasMessageAvailable());
        reader.seek(midmessageToSeek);
        HashSet messageSetB = Sets.newHashSet();
        for (int i = 51; i < 100; ++i) {
            Message message = reader.readNext();
            String receivedMessage = new String(message.getData());
            String expectedMessage = String.format("msg num %d", i);
            this.testMessageOrderAndDuplicates(messageSetB, receivedMessage, expectedMessage);
        }
        Assert.assertTrue((boolean)reader.isConnected());
        Assert.assertFalse((boolean)reader.hasMessageAvailable());
        Assert.assertEquals((int)((ReaderImpl)reader).getConsumer().numMessagesInQueue(), (int)0);
        reader.close();
        producer.close();
    }

    @Test
    public void testReaderIsAbleToSeekWithTimeOnMiddleOfTopic() throws Exception {
        String topicName = "persistent://my-property/my-ns/ReaderIsAbleToSeekWithTimeOnMiddleOfTopic";
        int numOfMessage = 10;
        int halfMessages = 5;
        Producer producer = this.pulsarClient.newProducer().topic("persistent://my-property/my-ns/ReaderIsAbleToSeekWithTimeOnMiddleOfTopic").create();
        long l = System.currentTimeMillis();
        for (int i = 0; i < 10; ++i) {
            producer.send((Object)String.format("msg num %d", i).getBytes());
            Thread.sleep(100L);
        }
        Reader reader = this.pulsarClient.newReader().topic("persistent://my-property/my-ns/ReaderIsAbleToSeekWithTimeOnMiddleOfTopic").startMessageId(MessageId.earliest).create();
        int plusTime = 600;
        reader.seek(l + (long)plusTime);
        HashSet messageSet = Sets.newHashSet();
        for (int i = 6; i < 10; ++i) {
            Message message = reader.readNext();
            String receivedMessage = new String(message.getData());
            String expectedMessage = String.format("msg num %d", i);
            this.testMessageOrderAndDuplicates(messageSet, receivedMessage, expectedMessage);
        }
        reader.close();
        producer.close();
    }

    @Test
    public void testMultiReaderIsAbleToSeekWithTimeOnMiddleOfTopic() throws Exception {
        String topicName = "persistent://my-property/my-ns/testMultiReaderIsAbleToSeekWithTimeOnMiddleOfTopic" + System.currentTimeMillis();
        int numOfMessage = 10;
        int halfMessages = 5;
        this.admin.topics().createPartitionedTopic(topicName, 3);
        Producer producer = this.pulsarClient.newProducer().topic(topicName).create();
        long halfTime = 0L;
        for (int i = 0; i < 10; ++i) {
            if (i == 5) {
                halfTime = System.currentTimeMillis();
            }
            producer.send((Object)String.format("msg num %d", i).getBytes());
        }
        Assert.assertTrue((halfTime != 0L ? 1 : 0) != 0);
        Reader reader = this.pulsarClient.newReader().topic(topicName).startMessageId(MessageId.earliest).create();
        reader.seek(halfTime);
        HashSet messageSet = Sets.newHashSet();
        for (int i = 6; i < 10; ++i) {
            Message message = reader.readNext(10, TimeUnit.SECONDS);
            String receivedMessage = new String(message.getData());
            Assert.assertTrue((boolean)messageSet.add(receivedMessage), (String)("Received duplicate message " + receivedMessage));
        }
        reader.close();
        producer.close();
    }

    @Test(dataProvider="variationsForExpectedPos")
    public void testReaderStartMessageIdAtExpectedPos(boolean batching, boolean startInclusive, int numOfMessages) throws Exception {
        String topicName = "persistent://my-property/my-ns/ReaderStartMessageIdAtExpectedPos";
        int resetIndex = new Random().nextInt(numOfMessages);
        int firstMessage = startInclusive ? resetIndex : resetIndex + 1;
        Producer producer = this.pulsarClient.newProducer().topic("persistent://my-property/my-ns/ReaderStartMessageIdAtExpectedPos").enableBatching(batching).create();
        CountDownLatch latch = new CountDownLatch(numOfMessages);
        AtomicReference resetPos = new AtomicReference();
        for (int i = 0; i < numOfMessages; ++i) {
            int j = i;
            ((CompletableFuture)producer.sendAsync((Object)String.format("msg num %d", i).getBytes()).thenCompose(messageId -> FutureUtils.value((Object)Pair.of((Object)j, (Object)messageId)))).whenComplete((p, e) -> {
                if (e != null) {
                    Assert.fail((String)("send msg failed due to " + e.getMessage()));
                } else if ((Integer)p.getLeft() == resetIndex) {
                    resetPos.set(p.getRight());
                }
                latch.countDown();
            });
        }
        latch.await();
        ReaderBuilder readerBuilder = this.pulsarClient.newReader().topic("persistent://my-property/my-ns/ReaderStartMessageIdAtExpectedPos").startMessageId((MessageId)resetPos.get());
        if (startInclusive) {
            readerBuilder.startMessageIdInclusive();
        }
        Reader reader = readerBuilder.create();
        HashSet messageSet = Sets.newHashSet();
        for (int i = firstMessage; i < numOfMessages; ++i) {
            Message message = reader.readNext();
            String receivedMessage = new String(message.getData());
            String expectedMessage = String.format("msg num %d", i);
            this.testMessageOrderAndDuplicates(messageSet, receivedMessage, expectedMessage);
        }
        Assert.assertTrue((boolean)reader.isConnected());
        Assert.assertEquals((int)((ReaderImpl)reader).getConsumer().numMessagesInQueue(), (int)0);
        Assert.assertEquals((int)messageSet.size(), (int)(numOfMessages - firstMessage));
        reader.close();
        producer.close();
    }

    @Test
    public void testReaderBuilderConcurrentCreate() throws Exception {
        int i;
        String topicName = "persistent://my-property/my-ns/testReaderBuilderConcurrentCreate_";
        int numTopic = 30;
        ReaderBuilder builder = this.pulsarClient.newReader().startMessageId(MessageId.earliest);
        ArrayList readers = Lists.newArrayListWithExpectedSize((int)numTopic);
        ArrayList producers = Lists.newArrayListWithExpectedSize((int)numTopic);
        for (i = 0; i < numTopic; ++i) {
            producers.add(this.pulsarClient.newProducer().topic(topicName + i).create());
        }
        for (i = 0; i < numTopic; ++i) {
            readers.add(builder.clone().topic(topicName + i).createAsync());
        }
        for (i = 0; i < numTopic; ++i) {
            Assert.assertEquals((String)((Reader)((CompletableFuture)readers.get(i)).get()).getTopic(), (String)(topicName + i));
            ((Reader)((CompletableFuture)readers.get(i)).get()).close();
            ((Producer)producers.get(i)).close();
        }
    }

    @Test(timeOut=10000L)
    public void testMultiReaderBuilderConcurrentCreate() throws Exception {
        int i;
        String topicName = "persistent://my-property/my-ns/testMultiReaderBuilderConcurrentCreate_";
        int numTopic = 30;
        ReaderBuilder builder = this.pulsarClient.newReader().startMessageId(MessageId.earliest);
        ArrayList readers = Lists.newArrayListWithExpectedSize((int)numTopic);
        ArrayList producers = Lists.newArrayListWithExpectedSize((int)numTopic);
        for (i = 0; i < numTopic; ++i) {
            this.admin.topics().createPartitionedTopic(topicName + i, 3);
            producers.add(this.pulsarClient.newProducer().topic(topicName + i).create());
        }
        for (i = 0; i < numTopic; ++i) {
            readers.add(builder.clone().topic(topicName + i).createAsync());
        }
        for (i = 0; i < numTopic; ++i) {
            Assert.assertTrue((boolean)((Reader)((CompletableFuture)readers.get(i)).get()).getTopic().startsWith("MultiTopicsConsumer-"));
            ((Reader)((CompletableFuture)readers.get(i)).get()).close();
            ((Producer)producers.get(i)).close();
        }
    }

    @Test
    public void testReaderStartInMiddleOfBatch() throws Exception {
        String topicName = "persistent://my-property/my-ns/ReaderStartInMiddleOfBatch";
        int numOfMessage = 100;
        Producer producer = this.pulsarClient.newProducer().topic("persistent://my-property/my-ns/ReaderStartInMiddleOfBatch").enableBatching(true).batchingMaxMessages(10).create();
        CountDownLatch latch = new CountDownLatch(100);
        List<MessageId> allIds = Collections.synchronizedList(new ArrayList());
        for (int i = 0; i < 100; ++i) {
            producer.sendAsync((Object)String.format("msg num %d", i).getBytes()).whenComplete((mid, e) -> {
                if (e != null) {
                    Assert.fail();
                } else {
                    allIds.add(mid);
                }
                latch.countDown();
            });
        }
        latch.await();
        for (MessageId id : allIds) {
            Reader reader = this.pulsarClient.newReader().topic("persistent://my-property/my-ns/ReaderStartInMiddleOfBatch").startMessageId(id).startMessageIdInclusive().create();
            MessageId idGot = reader.readNext().getMessageId();
            Assert.assertEquals((Object)idGot, (Object)id);
            reader.close();
        }
        producer.close();
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test
    public void testHasMessageAvailableOnEmptyTopic() throws Exception {
        String topic = this.newTopicName();
        Reader r1 = this.pulsarClient.newReader(Schema.STRING).topic(topic).startMessageId(MessageId.earliest).create();
        try {
            Reader r2 = this.pulsarClient.newReader(Schema.STRING).topic(topic).startMessageId(MessageId.latest).create();
            try {
                Reader r2Inclusive = this.pulsarClient.newReader(Schema.STRING).topic(topic).startMessageId(MessageId.latest).startMessageIdInclusive().create();
                try {
                    Assert.assertFalse((boolean)r1.hasMessageAvailable());
                    Assert.assertFalse((boolean)r2.hasMessageAvailable());
                    Assert.assertFalse((boolean)r2Inclusive.hasMessageAvailable());
                    Producer producer = this.pulsarClient.newProducer(Schema.STRING).topic(topic).create();
                    try {
                        producer.send((Object)"hello-1");
                        Assert.assertTrue((boolean)r1.hasMessageAvailable());
                        Assert.assertTrue((boolean)r2.hasMessageAvailable());
                        Assert.assertTrue((boolean)r2Inclusive.hasMessageAvailable());
                        Reader r3 = this.pulsarClient.newReader(Schema.STRING).topic(topic).startMessageId(MessageId.latest).create();
                        try {
                            Assert.assertFalse((boolean)r3.hasMessageAvailable());
                            producer.send((Object)"hello-2");
                            Assert.assertTrue((boolean)r1.hasMessageAvailable());
                            Assert.assertTrue((boolean)r2.hasMessageAvailable());
                            Assert.assertTrue((boolean)r2Inclusive.hasMessageAvailable());
                            Assert.assertTrue((boolean)r3.hasMessageAvailable());
                        }
                        finally {
                            if (Collections.singletonList(r3).get(0) != null) {
                                r3.close();
                            }
                        }
                    }
                    finally {
                        if (Collections.singletonList(producer).get(0) != null) {
                            producer.close();
                        }
                    }
                }
                finally {
                    if (Collections.singletonList(r2Inclusive).get(0) != null) {
                        r2Inclusive.close();
                    }
                }
            }
            finally {
                if (Collections.singletonList(r2).get(0) != null) {
                    r2.close();
                }
            }
        }
        finally {
            if (Collections.singletonList(r1).get(0) != null) {
                r1.close();
            }
        }
    }
}

