/*
 * Decompiled with CFR 0.152.
 */
package org.apache.pulsar.client.impl;

import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
import org.apache.pulsar.client.admin.PulsarAdminException;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.MessageRoutingMode;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.ProducerBuilder;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.Range;
import org.apache.pulsar.client.api.Reader;
import org.apache.pulsar.client.api.ReaderBuilder;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.impl.MultiTopicsReaderImpl;
import org.apache.pulsar.client.impl.ReaderImpl;
import org.apache.pulsar.client.impl.TypedMessageBuilderImpl;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.policies.data.ClusterData;
import org.apache.pulsar.common.policies.data.RetentionPolicies;
import org.apache.pulsar.common.policies.data.TenantInfo;
import org.apache.pulsar.common.policies.data.TenantInfoImpl;
import org.apache.pulsar.common.policies.data.TopicStats;
import org.apache.pulsar.common.util.Murmur3_32Hash;
import org.apache.pulsar.schema.Schemas;
import org.awaitility.Awaitility;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testng.Assert;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;

@Test(groups={"broker-impl"})
public class ReaderTest
extends MockedPulsarServiceBaseTest {
    private static final Logger log = LoggerFactory.getLogger(ReaderTest.class);
    private static final String subscription = "reader-sub";

    /**
     * Runs the base-class setup, then creates the {@code test} cluster, the {@code my-property}
     * tenant and the {@code my-property/my-ns} namespace that the tests in this class publish to.
     */
    @Override
    @BeforeMethod
    protected void setup() throws Exception {
        super.internalSetup();

        ClusterData clusterData = ClusterData.builder()
                .serviceUrl(this.pulsar.getWebServiceAddress())
                .build();
        this.admin.clusters().createCluster("test", clusterData);

        Set<String> adminRoles = Sets.newHashSet("appid1", "appid2");
        Set<String> allowedClusters = Sets.newHashSet("test");
        TenantInfo tenantInfo = new TenantInfoImpl(adminRoles, allowedClusters);
        this.admin.tenants().createTenant("my-property", tenantInfo);

        Set<String> namespaceClusters = Sets.newHashSet("test");
        this.admin.namespaces().createNamespace("my-property/my-ns", namespaceClusters);
    }

    /**
     * Delegates teardown to the base class. Registered with {@code alwaysRun=true} so that it is
     * still invoked after a failed test method.
     */
    @Override
    @AfterMethod(alwaysRun=true)
    protected void cleanup() throws Exception {
        super.internalCleanup();
    }

    /**
     * Publishes {@code count} keyed messages to {@code topic} and blocks until the last send
     * future has completed.
     *
     * <p>Message {@code i} carries the key {@code "key" + i} and the payload
     * {@code "my-message-" + i}.
     *
     * @param topic       topic to publish to
     * @param count       number of messages to publish; {@code 0} publishes nothing
     * @param enableBatch whether the producer batches messages (batch size is {@code count})
     * @return the set of keys that were published
     * @throws Exception if the producer cannot be created or a send fails
     */
    private Set<String> publishMessages(String topic, int count, boolean enableBatch) throws Exception {
        Set<String> keys = new HashSet<>();
        ProducerBuilder<byte[]> builder = this.pulsarClient.newProducer();
        builder.messageRoutingMode(MessageRoutingMode.SinglePartition);
        builder.maxPendingMessages(count);
        // With a one-day publish delay a batch only leaves the client when it is full or flushed.
        builder.batchingMaxPublishDelay(1L, TimeUnit.DAYS);
        builder.topic(topic);
        builder.enableBatching(enableBatch);
        if (enableBatch) {
            builder.batchingMaxMessages(count);
        }
        try (Producer<byte[]> producer = builder.create()) {
            Future<MessageId> lastFuture = null;
            for (int i = 0; i < count; ++i) {
                String key = "key" + i;
                byte[] data = ("my-message-" + i).getBytes(StandardCharsets.UTF_8);
                lastFuture = producer.newMessage().key(key).value(data).sendAsync();
                keys.add(key);
            }
            producer.flush();
            // Nothing was sent when count == 0, so there is no future to wait on.
            if (lastFuture != null) {
                lastFuture.get();
            }
        }
        return keys;
    }

    /** Runs the shared read-back scenario against a topic published without batching. */
    @Test
    public void testReadMessageWithoutBatching() throws Exception {
        this.testReadMessages("persistent://my-property/my-ns/my-reader-topic", false);
    }

    /**
     * With batching disabled, a reader started at {@code MessageId.latest} with
     * {@code startMessageIdInclusive()} must see exactly one message and then report that no
     * further message is available.
     */
    @Test
    public void testReadMessageWithoutBatchingWithMessageInclusive() throws Exception {
        String topic = "persistent://my-property/my-ns/my-reader-topic-inclusive";
        Set<String> keys = this.publishMessages(topic, 10, false);

        // try-with-resources: the reader is released even when an assertion fails.
        try (Reader<byte[]> reader = this.pulsarClient.newReader()
                .topic(topic)
                .startMessageId(MessageId.latest)
                .startMessageIdInclusive()
                .readerName(subscription)
                .create()) {
            Assert.assertTrue(reader.hasMessageAvailable());
            Assert.assertTrue(keys.remove(reader.readNext().getKey()));
            Assert.assertFalse(reader.hasMessageAvailable());
        }
    }

    /** Runs the shared read-back scenario against a topic published with batching enabled. */
    @Test
    public void testReadMessageWithBatching() throws Exception {
        this.testReadMessages("persistent://my-property/my-ns/my-reader-topic-with-batching", true);
    }

    /**
     * With batching enabled, a reader started at {@code MessageId.latest} with
     * {@code startMessageIdInclusive()} must deliver only the last published message
     * ({@code key9}), leaving the other nine keys unread.
     */
    @Test
    public void testReadMessageWithBatchingWithMessageInclusive() throws Exception {
        String topic = "persistent://my-property/my-ns/my-reader-topic-with-batching-inclusive";
        Set<String> keys = this.publishMessages(topic, 10, true);

        // try-with-resources: the reader is released even when an assertion fails.
        try (Reader<byte[]> reader = this.pulsarClient.newReader()
                .topic(topic)
                .startMessageId(MessageId.latest)
                .startMessageIdInclusive()
                .readerName(subscription)
                .create()) {
            while (reader.hasMessageAvailable()) {
                Assert.assertTrue(keys.remove(reader.readNext().getKey()));
            }
            // Exactly one key (the last one) was consumed out of the ten published.
            Assert.assertEquals(keys.size(), 9);
            Assert.assertFalse(keys.contains("key9"));
            Assert.assertFalse(reader.hasMessageAvailable());
        }
    }

    /**
     * Publishes ten keyed messages to {@code topic}, checks that a reader started at
     * {@code MessageId.earliest} sees every key exactly once, and that a second reader started at
     * {@code MessageId.latest} has nothing to read.
     *
     * @param topic       topic to publish to and read from
     * @param enableBatch whether the messages are published in a batch
     * @throws Exception if publishing or reading fails
     */
    private void testReadMessages(String topic, boolean enableBatch) throws Exception {
        int numKeys = 10;
        Set<String> keys = this.publishMessages(topic, numKeys, enableBatch);

        try (Reader<byte[]> reader = this.pulsarClient.newReader()
                .topic(topic)
                .startMessageId(MessageId.earliest)
                .readerName(subscription)
                .create()) {
            while (reader.hasMessageAvailable()) {
                Message<byte[]> message = reader.readNext();
                // remove() returning false would mean a duplicate or an unknown key.
                Assert.assertTrue(keys.remove(message.getKey()));
            }
            Assert.assertTrue(keys.isEmpty());
        }

        try (Reader<byte[]> readLatest = this.pulsarClient.newReader()
                .topic(topic)
                .startMessageId(MessageId.latest)
                .readerName(subscription + "latest")
                .create()) {
            Assert.assertFalse(readLatest.hasMessageAvailable());
        }
    }

    /**
     * Exercises {@code Reader.seek(Function)} on a non-partitioned topic: seeking to the current
     * timestamp yields nothing, while seeking to an earlier timestamp or to
     * {@code MessageId.earliest} replays all published messages.
     */
    @Test
    public void testMultiTopicSeekByFunction() throws Exception {
        String topicName = "persistent://my-property/my-ns/test" + UUID.randomUUID();
        int msgNum = 10;
        this.publishMessages(topicName, msgNum, false);

        try (Reader<byte[]> reader = this.pulsarClient.newReader()
                .startMessageIdInclusive()
                .startMessageId(MessageId.latest)
                .topic(topicName)
                .subscriptionName("my-sub")
                .create()) {
            long now = System.currentTimeMillis();

            // Seek by timestamp to "now": everything was published earlier, so nothing is readable.
            reader.seek(topic -> now);
            Assert.assertNull(reader.readNext(1, TimeUnit.SECONDS));

            // Seek by timestamp to 999999 ms before "now": all messages are replayed.
            reader.seek(topic -> {
                Assert.assertFalse(TopicName.get(topic).isPartitioned());
                return now - 999999L;
            });
            Assert.assertEquals(countReadableMessages(reader), msgNum);

            // Seek by message id to the earliest position: all messages are replayed again.
            reader.seek(topic -> {
                Assert.assertFalse(TopicName.get(topic).isPartitioned());
                return MessageId.earliest;
            });
            Assert.assertEquals(countReadableMessages(reader), msgNum);
        }
    }

    /** Reads from {@code reader} until a one-second read returns nothing; returns the message count. */
    private static int countReadableMessages(Reader<?> reader) throws PulsarClientException {
        int count = 0;
        while (reader.readNext(1, TimeUnit.SECONDS) != null) {
            ++count;
        }
        return count;
    }

    /**
     * Publishes to a single partition ({@code -partition-0}) of a four-partition topic and checks
     * that a reader opened directly on that partition sees every published key exactly once.
     */
    @Test
    public void testReadFromPartition() throws Exception {
        String topic = "persistent://my-property/my-ns/testReadFromPartition";
        String partition0 = topic + "-partition-0";
        this.admin.topics().createPartitionedTopic(topic, 4);

        int numKeys = 10;
        Set<String> keys = this.publishMessages(partition0, numKeys, false);

        // try-with-resources: the reader is released even when an assertion fails.
        try (Reader<byte[]> reader = this.pulsarClient.newReader()
                .topic(partition0)
                .startMessageId(MessageId.earliest)
                .create()) {
            while (reader.hasMessageAvailable()) {
                Message<byte[]> message = reader.readNext();
                Assert.assertTrue(keys.remove(message.getKey()));
            }
            Assert.assertTrue(keys.isEmpty());
        }
    }

    /**
     * Checks {@code startMessageFromRollbackDuration}: ten messages are published with a publish
     * time five hours in the past and ten more with a publish time one hour in the past. A reader
     * rolling back two hours must receive only the ten newer messages, starting with the first of
     * them, and must report no further message after a broker restart.
     */
    @Test
    public void testReaderWithTimeLong() throws Exception {
        String ns = "my-property/my-ns";
        String topic = "persistent://" + ns + "/testReadFromPartition";
        this.admin.namespaces().setRetention(ns, new RetentionPolicies(-1, -1));

        int totalMsg = 10;
        try (Producer<byte[]> producer = this.pulsarClient.newProducer()
                .topic(topic)
                .enableBatching(false)
                .create()) {
            // Older generation: publish time forced to five hours ago.
            long oldMsgPublishTime = System.currentTimeMillis() - TimeUnit.HOURS.toMillis(5L);
            for (int i = 0; i < totalMsg; ++i) {
                TypedMessageBuilderImpl<byte[]> oldMsg = (TypedMessageBuilderImpl<byte[]>) producer.newMessage()
                        .value(("old" + i).getBytes(StandardCharsets.UTF_8));
                oldMsg.getMetadataBuilder()
                        .setPublishTime(oldMsgPublishTime)
                        .setSequenceId((long) i)
                        .setProducerName(producer.getProducerName())
                        .setReplicatedFrom("us-west1");
                oldMsg.send();
            }

            // Newer generation: publish time forced to one hour ago; remember the first message id.
            long newMsgPublishTime = System.currentTimeMillis() - TimeUnit.HOURS.toMillis(1L);
            MessageId firstMsgId = null;
            for (int i = 0; i < totalMsg; ++i) {
                TypedMessageBuilderImpl<byte[]> newMsg = (TypedMessageBuilderImpl<byte[]>) producer.newMessage()
                        .value(("new" + i).getBytes(StandardCharsets.UTF_8));
                newMsg.getMetadataBuilder()
                        .setPublishTime(newMsgPublishTime)
                        .setProducerName(producer.getProducerName())
                        .setReplicatedFrom("us-west1");
                MessageId msgId = newMsg.send();
                if (firstMsgId == null) {
                    firstMsgId = msgId;
                }
            }

            try (Reader<byte[]> reader = this.pulsarClient.newReader()
                    .topic(topic)
                    .startMessageFromRollbackDuration(2L, TimeUnit.HOURS)
                    .create()) {
                List<MessageId> receivedMessageIds = Lists.newArrayList();
                Message<byte[]> msg;
                while (reader.hasMessageAvailable() && (msg = reader.readNext(1, TimeUnit.SECONDS)) != null) {
                    log.info("msg.getMessageId()={}, data={}", msg.getMessageId(),
                            new String(msg.getData(), StandardCharsets.UTF_8));
                    receivedMessageIds.add(msg.getMessageId());
                }
                Assert.assertEquals(receivedMessageIds.size(), totalMsg);
                Assert.assertEquals(receivedMessageIds.get(0), firstMsgId);

                // Everything was already consumed, so nothing may reappear after the restart.
                this.restartBroker();
                Assert.assertFalse(reader.hasMessageAvailable());
            }
        }
    }

    /**
     * Closing a reader must remove both its subscription and its cursor from the topic: with two
     * readers open the topic reports two of each, and each {@code close()} lowers both counts by
     * one.
     */
    @Test
    public void testRemoveSubscriptionForReaderNeedRemoveCursor() throws IOException, PulsarAdminException {
        String topic = "persistent://my-property/my-ns/testRemoveSubscriptionForReaderNeedRemoveCursor";

        // Resources are closed in reverse order (reader2, then reader1). The explicit close()
        // calls below are what the test measures; the implicit ones are only a safety net.
        try (Reader<byte[]> reader1 = this.pulsarClient.newReader()
                     .topic(topic)
                     .startMessageId(MessageId.earliest)
                     .create();
             Reader<byte[]> reader2 = this.pulsarClient.newReader()
                     .topic(topic)
                     .startMessageId(MessageId.earliest)
                     .create()) {
            assertSubscriptionAndCursorCount(topic, 2);

            reader1.close();
            assertSubscriptionAndCursorCount(topic, 1);

            reader2.close();
            assertSubscriptionAndCursorCount(topic, 0);
        }
    }

    /** Asserts that {@code topic} reports exactly {@code expected} subscriptions and cursors. */
    private void assertSubscriptionAndCursorCount(String topic, int expected) throws PulsarAdminException {
        Assert.assertEquals(this.admin.topics().getStats(topic).getSubscriptions().size(), expected);
        Assert.assertEquals(this.admin.topics().getInternalStats(topic, false).cursors.size(), expected);
    }

    /**
     * A reader opened at {@code MessageId.latest} (inclusive) on a topic that has never been
     * published to must report that no message is available.
     */
    @Test
    public void testReaderHasMessageAvailable() throws Exception {
        String topic = "persistent://my-property/my-ns/testReaderHasMessageAvailable" + System.currentTimeMillis();
        try (Reader<String> reader = this.pulsarClient.newReader(Schema.STRING)
                .topic(topic)
                .startMessageId(MessageId.latest)
                .startMessageIdInclusive()
                .create()) {
            Assert.assertFalse(reader.hasMessageAvailable());
        }
    }

    /**
     * Verifies {@code ReaderBuilder.keyHashRange}: three invalid range configurations must be
     * rejected with {@link IllegalArgumentException}, and a reader restricted to
     * {@code [0, 32768]} must receive exactly the messages whose key hashes into that range.
     *
     * <p>The expected slot of a key is computed here as {@code murmur3_32(key) % 65536}.
     */
    @Test
    public void testKeyHashRangeReader() throws IOException {
        List<String> keys = Arrays.asList("0", "1", "2", "3", "4", "5", "6", "7", "8", "9");
        String topic = "persistent://my-property/my-ns/testKeyHashRangeReader";

        // Each invalid configuration is built inside the try so that an exception thrown by
        // Range.of(...) itself is also accepted. A reader that is unexpectedly created is closed.
        try (Reader<byte[]> unexpected = this.pulsarClient.newReader()
                .topic(topic)
                .startMessageId(MessageId.earliest)
                .keyHashRange(Range.of(0, 10000), Range.of(8000, 12000))
                .create()) {
            Assert.fail("should failed with unexpected key hash range");
        } catch (IllegalArgumentException e) {
            log.error("Create key hash range failed", e);
        }
        try (Reader<byte[]> unexpected = this.pulsarClient.newReader()
                .topic(topic)
                .startMessageId(MessageId.earliest)
                .keyHashRange(Range.of(30000, 20000))
                .create()) {
            Assert.fail("should failed with unexpected key hash range");
        } catch (IllegalArgumentException e) {
            log.error("Create key hash range failed", e);
        }
        try (Reader<byte[]> unexpected = this.pulsarClient.newReader()
                .topic(topic)
                .startMessageId(MessageId.earliest)
                .keyHashRange(Range.of(80000, 90000))
                .create()) {
            Assert.fail("should failed with unexpected key hash range");
        } catch (IllegalArgumentException e) {
            log.error("Create key hash range failed", e);
        }

        try (Reader<String> reader = this.pulsarClient.newReader(Schema.STRING)
                     .topic(topic)
                     .startMessageId(MessageId.earliest)
                     .keyHashRange(Range.of(0, 32768))
                     .create();
             Producer<String> producer = this.pulsarClient.newProducer(Schema.STRING)
                     .topic(topic)
                     .enableBatching(false)
                     .create()) {
            int expectedMessages = 0;
            for (String key : keys) {
                int slot = Murmur3_32Hash.getInstance().makeHash(key.getBytes(StandardCharsets.UTF_8)) % 65536;
                if (slot <= 32768) {
                    ++expectedMessages;
                }
                // The payload equals the key so the slot can be recomputed on the receiving side.
                producer.newMessage().key(key).value(key).send();
                log.info("Publish message to slot {}", slot);
            }

            List<String> receivedMessages = new ArrayList<>();
            Message<String> msg;
            while ((msg = reader.readNext(1, TimeUnit.SECONDS)) != null) {
                receivedMessages.add(msg.getValue());
            }

            Assert.assertTrue(expectedMessages > 0);
            Assert.assertEquals(receivedMessages.size(), expectedMessages);
            for (String received : receivedMessages) {
                log.info("Receive message {}", received);
                Assert.assertTrue(keys.contains(received));
                // Check the key's hash slot rather than its numeric value: the keys are "0".."9",
                // so comparing the parsed value against 32768 could never fail.
                int slot = Murmur3_32Hash.getInstance().makeHash(received.getBytes(StandardCharsets.UTF_8)) % 65536;
                Assert.assertTrue(slot <= 32768);
            }
        }
    }

    /** Runs the subscription-name scenario first with a role prefix configured, then without. */
    @Test
    public void testReaderSubName() throws Exception {
        for (boolean setPrefix : new boolean[] {true, false}) {
            this.doTestReaderSubName(setPrefix);
        }
    }

    /**
     * Checks that an explicit {@code subscriptionName} is the subscription the reader's
     * consumer(s) actually use, for both a non-partitioned topic and a three-partition topic.
     *
     * @param setPrefix when {@code true}, a {@code subscriptionRolePrefix} is configured as well;
     *                  the assertions require the explicit subscription name to be used regardless
     * @throws Exception if topic creation or reader creation fails
     */
    private void doTestReaderSubName(boolean setPrefix) throws Exception {
        String subName = "my-sub-name";

        // Non-partitioned topic: the reader is backed by a single consumer.
        String topic = "persistent://my-property/my-ns/testReaderSubName" + System.currentTimeMillis();
        ReaderBuilder<String> builder = this.pulsarClient.newReader(Schema.STRING)
                .subscriptionName(subName)
                .topic(topic)
                .startMessageId(MessageId.earliest);
        if (setPrefix) {
            builder = builder.subscriptionRolePrefix(subName + System.currentTimeMillis());
        }
        try (Reader<String> reader = builder.create()) {
            ReaderImpl<String> readerImpl = (ReaderImpl<String>) reader;
            Assert.assertEquals(readerImpl.getConsumer().getSubscription(), subName);
        }

        // Partitioned topic: every per-partition consumer must use the same subscription name.
        String topic2 = "persistent://my-property/my-ns/testReaderSubName2" + System.currentTimeMillis();
        this.admin.topics().createPartitionedTopic(topic2, 3);
        builder = this.pulsarClient.newReader(Schema.STRING)
                .subscriptionName(subName)
                .topic(topic2)
                .startMessageId(MessageId.earliest);
        if (setPrefix) {
            builder = builder.subscriptionRolePrefix(subName + System.currentTimeMillis());
        }
        try (Reader<String> reader = builder.create()) {
            MultiTopicsReaderImpl<String> multiTopicsReader = (MultiTopicsReaderImpl<String>) reader;
            multiTopicsReader.getMultiTopicsConsumer().getConsumers()
                    .forEach(consumerImpl -> Assert.assertEquals(consumerImpl.getSubscription(), subName));
        }
    }

    /**
     * Two readers may not share a subscription name at the same time: the second {@code create()}
     * must fail with {@code ConsumerBusyException}. Once the first reader is closed, the name can
     * be reused.
     */
    @Test
    public void testSameSubName() throws Exception {
        String topic = "persistent://my-property/my-ns/testSameSubName";
        String subName = "my-sub-name";

        try (Reader<String> reader = this.pulsarClient.newReader(Schema.STRING)
                .subscriptionName(subName)
                .topic(topic)
                .startMessageId(MessageId.earliest)
                .create()) {
            try (Reader<String> ignored = this.pulsarClient.newReader(Schema.STRING)
                    .subscriptionName(subName)
                    .topic(topic)
                    .startMessageId(MessageId.earliest)
                    .create()) {
                Assert.fail("should fail");
            } catch (PulsarClientException e) {
                Assert.assertTrue(e instanceof PulsarClientException.ConsumerBusyException);
                Assert.assertTrue(e.getMessage().contains("Exclusive consumer is already connected"));
            }
        }

        // The first reader is closed at this point, so the same subscription name must work again.
        this.pulsarClient.newReader(Schema.STRING)
                .subscriptionName(subName)
                .topic(topic)
                .startMessageId(MessageId.earliest)
                .create()
                .close();
    }

    /**
     * Publishes one AVRO message, waits until it sits in the reader's incoming queue, then reads
     * it through the async API ({@code hasMessageAvailableAsync} followed by
     * {@code readNextAsync}) and decodes the value inside the completion callback. The test
     * passes when exactly one value has been decoded before the 30 s timeout.
     */
    @Test(timeOut=30000L)
    public void testAvoidUsingIoThreadToGetValueOfMessage() throws Exception {
        String topic = "persistent://my-property/my-ns/testAvoidUsingIoThreadToGetValueOfMessage";

        try (Producer<Schemas.PersonOne> producer = this.pulsarClient
                .newProducer(Schema.AVRO(Schemas.PersonOne.class))
                .topic(topic)
                .create()) {
            producer.send(new Schemas.PersonOne(1));

            try (Reader<Schemas.PersonOne> reader = this.pulsarClient
                    .newReader(Schema.AVRO(Schemas.PersonOne.class))
                    .topic(topic)
                    .startMessageId(MessageId.earliest)
                    .create()) {
                CountDownLatch latch = new CountDownLatch(1);
                // Written by the callback thread, read here only after latch.await() returns.
                List<Schemas.PersonOne> received = new ArrayList<>(1);

                // Wait until the message has been prefetched into the consumer's incoming queue.
                Awaitility.await().untilAsserted(() -> Assert.assertTrue(
                        ((ReaderImpl<Schemas.PersonOne>) reader).getConsumer().incomingMessages.size() > 0));

                reader.hasMessageAvailableAsync().whenComplete((has, e) -> {
                    if (e == null && has) {
                        CompletableFuture<Message<Schemas.PersonOne>> future = reader.readNextAsync();
                        // NOTE(review): untilAsserted() only retries while the runnable throws, and
                        // future::isDone never throws, so this returns immediately instead of
                        // waiting for completion. Left unchanged: switching to until(future::isDone)
                        // would block this callback thread, and whether that is safe depends on
                        // which thread completes the future -- confirm before changing.
                        Awaitility.await().pollInterval(1L, TimeUnit.MILLISECONDS).untilAsserted(future::isDone);
                        future.whenComplete((msg, ex) -> {
                            if (ex == null) {
                                received.add(msg.getValue());
                            }
                            latch.countDown();
                        });
                    } else {
                        latch.countDown();
                    }
                });

                latch.await();
                Assert.assertEquals(received.size(), 1);
            }
        }
    }

    /**
     * On a non-persistent topic, two open readers must show up as two subscriptions and closing
     * them must bring the count back to zero. A regular consumer subscription, by contrast, is
     * expected to remain (count of one) after the consumer has been closed.
     */
    @Test(timeOut=10000L)
    public void removeNonPersistentTopicReaderTest() throws Exception {
        String topic = "non-persistent://my-property/my-ns/non-topic";

        // try-with-resources: both readers are released even when the first wait times out.
        try (Reader<byte[]> reader = this.pulsarClient.newReader()
                     .topic(topic)
                     .startMessageId(MessageId.earliest)
                     .create();
             Reader<byte[]> reader2 = this.pulsarClient.newReader()
                     .topic(topic)
                     .startMessageId(MessageId.earliest)
                     .create()) {
            Awaitility.await().pollDelay(3L, TimeUnit.SECONDS)
                    .until(() -> logAndCountSubscriptions(topic) == 2);
        }

        // Both readers are closed now; their subscriptions must disappear.
        Awaitility.await().until(() -> logAndCountSubscriptions(topic) == 0);

        Consumer<byte[]> consumer = this.pulsarClient.newConsumer()
                .topic(topic)
                .subscriptionName("sub")
                .subscribe();
        consumer.close();
        Awaitility.await().pollDelay(3L, TimeUnit.SECONDS)
                .until(() -> logAndCountSubscriptions(topic) == 1);
    }

    /** Fetches the stats of {@code topic}, logs the subscription count and returns it. */
    private int logAndCountSubscriptions(String topic) throws PulsarAdminException {
        TopicStats topicStats = this.admin.topics().getStats(topic);
        int subscriptions = topicStats.getSubscriptions().size();
        log.info("subscriptions size: {}", subscriptions);
        return subscriptions;
    }
}

