/*
 * Decompiled with CFR 0.152.
 */
package org.apache.pulsar.client.impl;

import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
import org.apache.pulsar.client.admin.PulsarAdminException;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.MessageRoutingMode;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.ProducerBuilder;
import org.apache.pulsar.client.api.Range;
import org.apache.pulsar.client.api.Reader;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.impl.PulsarClientImpl;
import org.apache.pulsar.client.impl.TypedMessageBuilderImpl;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.policies.data.ClusterData;
import org.apache.pulsar.common.policies.data.PersistentTopicInternalStats;
import org.apache.pulsar.common.policies.data.Policies;
import org.apache.pulsar.common.policies.data.RetentionPolicies;
import org.apache.pulsar.common.policies.data.TenantInfo;
import org.apache.pulsar.common.util.FutureUtil;
import org.apache.pulsar.common.util.Murmur3_32Hash;
import org.awaitility.Awaitility;
import org.testng.Assert;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;

@Test(groups={"flaky"})
public class MultiTopicsReaderTest
extends MockedPulsarServiceBaseTest {
    /** Name passed to {@code readerName(...)} by several tests in this class when they create a reader. */
    private static final String subscription = "reader-multi-topics-sub";

    /**
     * Runs before every test method: starts the mocked broker via {@code internalSetup()} and then
     * provisions the "test" cluster, the "my-property" tenant and the "my-property/my-ns" namespace
     * that the tests in this class publish to and read from.
     *
     * @throws Exception if the broker cannot be started or any admin call fails
     */
    @Override
    @BeforeMethod(alwaysRun = true)
    protected void setup() throws Exception {
        super.internalSetup();

        ClusterData cluster = new ClusterData(this.pulsar.getWebServiceAddress());
        this.admin.clusters().createCluster("test", cluster);

        TenantInfo tenant = new TenantInfo(Sets.newHashSet("appid1", "appid2"), Sets.newHashSet("test"));
        this.admin.tenants().createTenant("my-property", tenant);

        // The namespace replicates to the single "test" cluster and is created with a (-1, -1) retention policy.
        Policies namespacePolicies = new Policies();
        namespacePolicies.replication_clusters = Sets.newHashSet("test");
        namespacePolicies.retention_policies = new RetentionPolicies(-1, -1);
        this.admin.namespaces().createNamespace("my-property/my-ns", namespacePolicies);
    }

    /**
     * Runs after every test method (even when the test failed) and delegates the teardown to
     * {@code internalCleanup()} of the base class.
     *
     * @throws Exception if the base-class cleanup fails
     */
    @Override
    @AfterMethod(alwaysRun = true)
    protected void cleanup() throws Exception {
        super.internalCleanup();
    }

    /**
     * Creates a fresh 3-partition topic and runs the shared read scenario of
     * {@code testReadMessages} with batching disabled.
     */
    @Test(timeOut = 30000L)
    public void testReadMessageWithoutBatching() throws Exception {
        final String partitionedTopic =
                "persistent://my-property/my-ns/my-reader-topic" + UUID.randomUUID();
        this.admin.topics().createPartitionedTopic(partitionedTopic, 3);
        this.testReadMessages(partitionedTopic, false);
    }

    /**
     * Publishes 10 non-batched messages to a 3-partition topic, then opens a reader positioned at
     * {@code MessageId.latest} with {@code startMessageIdInclusive()} and asserts that exactly one
     * message per partition (3 in total) is read before the reader reports no more messages.
     *
     * <p>The reader is managed by try-with-resources so it is closed even when an assertion fails.
     */
    @Test(timeOut = 20000L)
    public void testReadMessageWithoutBatchingWithMessageInclusive() throws Exception {
        String topic = "persistent://my-property/my-ns/my-reader-topic-inclusive" + UUID.randomUUID();
        int topicNum = 3;
        this.admin.topics().createPartitionedTopic(topic, topicNum);
        Set<String> keys = this.publishMessages(topic, 10, false);

        try (Reader<byte[]> reader = this.pulsarClient.newReader()
                .topic(topic)
                .startMessageId(MessageId.latest)
                .startMessageIdInclusive()
                .readerName(subscription)
                .create()) {
            int count = 0;
            while (reader.hasMessageAvailable()) {
                Message<byte[]> message = reader.readNext(5, TimeUnit.SECONDS);
                // Fail with a readable message instead of a NullPointerException when the read times out.
                Assert.assertNotNull(message, "readNext timed out although hasMessageAvailable() returned true");
                // Only messages whose key was actually published by this test are counted.
                if (keys.remove(message.getKey())) {
                    ++count;
                }
            }
            Assert.assertEquals(count, topicNum);
            Assert.assertFalse(reader.hasMessageAvailable());
        }
    }

    /**
     * Creates a fresh 3-partition topic and runs the shared read scenario of
     * {@code testReadMessages} with batching enabled.
     */
    @Test(timeOut = 10000L)
    public void testReadMessageWithBatching() throws Exception {
        final String partitionedTopic =
                "persistent://my-property/my-ns/my-reader-topic-with-batching" + UUID.randomUUID();
        this.admin.topics().createPartitionedTopic(partitionedTopic, 3);
        this.testReadMessages(partitionedTopic, true);
    }

    /**
     * Publishes 15 batched messages to a 3-partition topic, then opens a reader positioned at
     * {@code MessageId.latest} with {@code startMessageIdInclusive()}. The test asserts that exactly
     * three keys were consumed and that those keys are "key12", "key13" and "key14" (presumably the
     * last message of each partition under round-robin routing).
     *
     * <p>The reader is managed by try-with-resources so it is closed even when an assertion fails.
     */
    @Test(timeOut = 10000L)
    public void testReadMessageWithBatchingWithMessageInclusive() throws Exception {
        String topic = "persistent://my-property/my-ns/my-reader-topic-with-batching-inclusive" + UUID.randomUUID();
        int topicNum = 3;
        int msgNum = 15;
        this.admin.topics().createPartitionedTopic(topic, topicNum);
        Set<String> keys = this.publishMessages(topic, msgNum, true);

        try (Reader<byte[]> reader = this.pulsarClient.newReader()
                .topic(topic)
                .startMessageId(MessageId.latest)
                .startMessageIdInclusive()
                .readerName(subscription)
                .create()) {
            while (reader.hasMessageAvailable()) {
                Message<byte[]> message = reader.readNext(2, TimeUnit.SECONDS);
                // Fail with a readable message instead of a NullPointerException when the read times out.
                Assert.assertNotNull(message, "readNext timed out although hasMessageAvailable() returned true");
                keys.remove(message.getKey());
            }
            // Every key that was read has been removed, so only the unread ones remain.
            Assert.assertEquals(keys.size(), msgNum - topicNum);
            Assert.assertFalse(keys.contains("key14"));
            Assert.assertFalse(keys.contains("key13"));
            Assert.assertFalse(keys.contains("key12"));
            Assert.assertFalse(reader.hasMessageAvailable());
        }
    }

    /**
     * Verifies a reader created with {@code startMessageFromRollbackDuration(2, HOURS)} on a
     * 3-partition topic.
     *
     * <p>Ten messages are published with a publish time five hours in the past and ten more with a
     * publish time one hour in the past. The reader is expected to return exactly ten messages, and
     * after the broker is restarted it must report that no further message is available.
     *
     * <p>Producer and reader are managed by try-with-resources so they are closed even when an
     * assertion fails; the reader is closed before the producer, as before.
     */
    @Test(timeOut = 10000L)
    public void testReaderWithTimeLong() throws Exception {
        String ns = "my-property/my-ns";
        String topic = "persistent://" + ns + "/testReadFromPartition" + UUID.randomUUID();
        this.admin.topics().createPartitionedTopic(topic, 3);
        this.admin.namespaces().setRetention(ns, new RetentionPolicies(-1, -1));

        int totalMsg = 10;
        try (Producer<byte[]> producer = this.pulsarClient.newProducer()
                .topic(topic)
                .enableBatching(false)
                .create()) {
            // (1) Publish messages whose metadata claims they were published five hours ago.
            long oldMsgPublishTime = System.currentTimeMillis() - TimeUnit.HOURS.toMillis(5L);
            for (int i = 0; i < totalMsg; ++i) {
                TypedMessageBuilderImpl<byte[]> oldMsg =
                        (TypedMessageBuilderImpl<byte[]>) producer.newMessage().value(("old" + i).getBytes());
                oldMsg.getMetadataBuilder()
                        .setPublishTime(oldMsgPublishTime)
                        .setSequenceId((long) i)
                        .setProducerName(producer.getProducerName())
                        .setReplicatedFrom("us-west1");
                oldMsg.send();
            }

            // (2) Publish messages whose metadata claims they were published one hour ago.
            long newMsgPublishTime = System.currentTimeMillis() - TimeUnit.HOURS.toMillis(1L);
            for (int i = 0; i < totalMsg; ++i) {
                TypedMessageBuilderImpl<byte[]> newMsg =
                        (TypedMessageBuilderImpl<byte[]>) producer.newMessage().value(("new" + i).getBytes());
                newMsg.getMetadataBuilder()
                        .setPublishTime(newMsgPublishTime)
                        .setProducerName(producer.getProducerName())
                        .setReplicatedFrom("us-west1");
                newMsg.send();
            }

            // (3) Read from two hours back: only the second set of messages is expected.
            try (Reader<byte[]> reader = this.pulsarClient.newReader()
                    .topic(topic)
                    .startMessageFromRollbackDuration(2L, TimeUnit.HOURS)
                    .create()) {
                List<MessageId> receivedMessageIds = new ArrayList<>();
                Message<byte[]> received;
                while (reader.hasMessageAvailable()
                        && (received = reader.readNext(1, TimeUnit.SECONDS)) != null) {
                    receivedMessageIds.add(received.getMessageId());
                }
                Assert.assertEquals(receivedMessageIds.size(), totalMsg);

                this.restartBroker();
                Assert.assertFalse(reader.hasMessageAvailable());
            }
        }
    }

    /**
     * Verifies that closing a reader removes its subscription and its cursor on every partition.
     *
     * <p>Two readers are opened on a 3-partition topic; the subscription count of the topic and the
     * cursor count of each partition are expected to go 2 -> 1 -> 0 as the readers are closed.
     *
     * <p>The explicit {@code close()} calls are the behaviour under test. The try-with-resources
     * statement is only a safety net that releases the readers when an assertion fails; on the
     * success path it closes each reader a second time, exactly as the previous cleanup code did.
     */
    @Test(timeOut = 10000L)
    public void testRemoveSubscriptionForReaderNeedRemoveCursor() throws IOException, PulsarAdminException {
        String topic = "persistent://my-property/my-ns/testRemoveSubscriptionForReaderNeedRemoveCursor" + UUID.randomUUID();
        this.admin.topics().createPartitionedTopic(topic, 3);

        try (Reader<byte[]> reader1 = this.pulsarClient.newReader()
                     .topic(topic)
                     .startMessageId(MessageId.earliest)
                     .create();
             Reader<byte[]> reader2 = this.pulsarClient.newReader()
                     .topic(topic)
                     .startMessageId(MessageId.earliest)
                     .create()) {
            this.assertSubscriptionAndCursorCount(topic, 2);

            reader1.close();
            this.assertSubscriptionAndCursorCount(topic, 1);

            reader2.close();
            this.assertSubscriptionAndCursorCount(topic, 0);
        }
    }

    /**
     * Asserts that {@code topic} has {@code expected} subscriptions and that every partition of it
     * has {@code expected} cursors in its internal stats.
     */
    private void assertSubscriptionAndCursorCount(String topic, int expected) throws PulsarAdminException {
        Assert.assertEquals(this.admin.topics().getSubscriptions(topic).size(), expected);
        for (PersistentTopicInternalStats partitionStats
                : this.admin.topics().getPartitionedInternalStats(topic).partitions.values()) {
            Assert.assertEquals(partitionStats.cursors.size(), expected);
        }
    }

    /**
     * Creates a 3-partition topic and publishes 100 non-batched messages to it.
     *
     * <p>NOTE(review): despite its name this test creates no reader, performs no seek and contains
     * no assertion; it only passes or fails on whether publishing succeeds.
     */
    @Test(timeOut = 10000L)
    public void testMultiReaderSeek() throws Exception {
        final String partitionedTopic =
                "persistent://my-property/my-ns/testKeyHashRangeReader" + UUID.randomUUID();
        this.admin.topics().createPartitionedTopic(partitionedTopic, 3);
        this.publishMessages(partitionedTopic, 100, false);
    }

    /**
     * Verifies {@code Reader.seek(Function)} on a 4-partition topic holding 20 messages.
     *
     * <p>First every partition is sought to the current timestamp and the reader is expected to
     * return nothing. Then each partition is sought to a different position chosen by its partition
     * index, and the number of messages read afterwards is compared with the expected total.
     *
     * <p>The reader is managed by try-with-resources so it is closed when the test finishes or fails.
     */
    @Test
    public void testMultiTopicSeekByFunction() throws Exception {
        String topicName = "persistent://my-property/my-ns/test" + UUID.randomUUID();
        int partitionNum = 4;
        int msgNum = 20;
        this.admin.topics().createPartitionedTopic(topicName, partitionNum);
        this.publishMessages(topicName, msgNum, false);

        try (Reader<byte[]> reader = this.pulsarClient.newReader()
                .startMessageIdInclusive()
                .startMessageId(MessageId.latest)
                .topic(topicName)
                .subscriptionName("my-sub")
                .create()) {
            long now = System.currentTimeMillis();

            // Seeking every partition to "now" must leave nothing to read.
            reader.seek(topic -> now);
            Assert.assertNull(reader.readNext(1, TimeUnit.SECONDS));

            // Seek each partition to a different kind of position (message id or timestamp).
            reader.seek(topic -> {
                TopicName name = TopicName.get(topic);
                switch (name.getPartitionIndex()) {
                    case 0:
                        return MessageId.latest;
                    case 1:
                        return MessageId.earliest;
                    case 2:
                        return now;
                    case 3:
                        return now - 999999L;
                    default:
                        return null;
                }
            });

            int count = 0;
            while (reader.readNext(1, TimeUnit.SECONDS) != null) {
                ++count;
            }

            // Partitions 0 (latest) and 2 (now) are expected to yield nothing; partitions 1 (earliest)
            // and 3 (timestamp in the past) are expected to yield their full share of the messages.
            int msgNumInPartition0 = 0;
            int msgNumInPartition1 = msgNum / partitionNum;
            int msgNumInPartition2 = 0;
            int msgNumInPartition3 = msgNum / partitionNum;
            Assert.assertEquals(count,
                    msgNumInPartition0 + msgNumInPartition1 + msgNumInPartition2 + msgNumInPartition3);
        }
    }

    /**
     * Verifies that a {@code RuntimeException} thrown by the function passed to
     * {@code Reader.seek(Function)} reaches the caller with its original message.
     *
     * <p>The test fails explicitly when {@code seek} returns without throwing; previously that case
     * passed silently. The reader is managed by try-with-resources so it is always closed.
     */
    @Test
    public void testMultiTopicSeekByFunctionWithException() throws Exception {
        String topicName = "persistent://my-property/my-ns/test" + UUID.randomUUID();
        int partitionNum = 4;
        int msgNum = 20;
        this.admin.topics().createPartitionedTopic(topicName, partitionNum);
        this.publishMessages(topicName, msgNum, false);

        try (Reader<byte[]> reader = this.pulsarClient.newReader()
                .startMessageIdInclusive()
                .startMessageId(MessageId.latest)
                .topic(topicName)
                .subscriptionName("my-sub")
                .create()) {
            long now = System.currentTimeMillis();

            // Seeking every partition to "now" must leave nothing to read.
            reader.seek(topic -> now);
            Assert.assertNull(reader.readNext(1, TimeUnit.SECONDS));

            try {
                reader.seek(topic -> {
                    TopicName name = TopicName.get(topic);
                    switch (name.getPartitionIndex()) {
                        case 0:
                            throw new RuntimeException("test");
                        case 1:
                            return MessageId.latest;
                        case 2:
                            return MessageId.earliest;
                        case 3:
                            return now - 999999L;
                        default:
                            return null;
                    }
                });
                // Assert.fail throws an AssertionError, which the catch clause below does not intercept.
                Assert.fail("seek should have propagated the RuntimeException thrown by the function");
            } catch (RuntimeException e) {
                Assert.assertEquals(e.getMessage(), "test");
            }
        }
    }

    /**
     * Verifies a single reader subscribed to three topics at once; the second topic is created as
     * a 3-partition topic, the other two are not created explicitly.
     *
     * <p>Ten messages are sent to each topic, and the reader must return all of them. While the
     * reader is open the client must report exactly one consumer, and after it is closed the
     * consumer count must eventually drop to zero.
     *
     * <p>Producers and reader are closed in a {@code finally} block so they are released even when
     * an assertion fails.
     */
    @Test(timeOut = 20000L)
    public void testMultiTopic() throws Exception {
        String topic = "persistent://my-property/my-ns/topic" + UUID.randomUUID();
        String topic2 = "persistent://my-property/my-ns/topic2" + UUID.randomUUID();
        this.admin.topics().createPartitionedTopic(topic2, 3);
        String topic3 = "persistent://my-property/my-ns/topic3" + UUID.randomUUID();
        List<String> topics = Arrays.asList(topic, topic2, topic3);
        PulsarClientImpl client = (PulsarClientImpl) this.pulsarClient;

        Reader<String> reader = this.pulsarClient.newReader(Schema.STRING)
                .startMessageId(MessageId.earliest)
                .topics(topics)
                .readerName("my-reader")
                .create();
        List<Producer<String>> producerList = new ArrayList<>();
        try {
            for (String topicName : topics) {
                producerList.add(this.pulsarClient.newProducer(Schema.STRING).topic(topicName).create());
            }

            int msgNum = 10;
            Set<String> messages = new HashSet<>();
            for (int i = 0; i < producerList.size(); ++i) {
                Producer<String> producer = producerList.get(i);
                for (int j = 0; j < msgNum; ++j) {
                    // The producer index is prefixed so payloads are unique across topics.
                    String msg = i + "msg" + j;
                    producer.send(msg);
                    messages.add(msg);
                }
            }

            while (reader.hasMessageAvailable()) {
                Message<String> received = reader.readNext(5, TimeUnit.SECONDS);
                // Fail with a readable message instead of a NullPointerException when the read times out.
                Assert.assertNotNull(received, "readNext timed out although hasMessageAvailable() returned true");
                messages.remove(received.getValue());
            }
            Assert.assertEquals(messages.size(), 0);
            Assert.assertEquals(client.consumersCount(), 1);
        } finally {
            try {
                for (Producer<String> producer : producerList) {
                    producer.close();
                }
            } finally {
                reader.close();
            }
        }

        // The consumer count is polled, so it is not required to be zero immediately after close().
        Awaitility.await().untilAsserted(() -> Assert.assertEquals(client.consumersCount(), 0));
    }

    /**
     * Verifies readers created with {@code keyHashRange(...)} on a 3-partition topic.
     *
     * <p>First, three invalid range configurations (overlapping ranges, a range whose end precedes
     * its start, and a range starting at 80000) must be rejected with {@code IllegalArgumentException}.
     * Then a reader restricted to the range [0, 32768] must receive exactly those of the keys "0".."9"
     * whose hash slot ({@code Murmur3_32Hash % 65536}) is at most 32768.
     *
     * <p>The final loop checks the hash slot of every received message. It previously parsed the
     * message value ("0".."9") as an integer and compared that to 32768, which could never fail.
     * Reader and producer are managed by try-with-resources (producer closed first, as before).
     */
    @Test(timeOut = 10000L)
    public void testKeyHashRangeReader() throws Exception {
        final int hashRangeSize = 65536;
        final int rangeUpperBound = 32768;
        List<String> keys = Arrays.asList("0", "1", "2", "3", "4", "5", "6", "7", "8", "9");
        String topic = "persistent://my-property/my-ns/testKeyHashRangeReader" + UUID.randomUUID();
        this.admin.topics().createPartitionedTopic(topic, 3);

        // Range construction stays inside each try block because it may itself throw.
        try {
            this.pulsarClient.newReader()
                    .topic(topic)
                    .startMessageId(MessageId.earliest)
                    .keyHashRange(Range.of(0, 10000), Range.of(8000, 12000))
                    .create();
            Assert.fail("should failed with unexpected key hash range");
        } catch (IllegalArgumentException expected) {
            // Expected: the two ranges overlap.
        }
        try {
            this.pulsarClient.newReader()
                    .topic(topic)
                    .startMessageId(MessageId.earliest)
                    .keyHashRange(Range.of(30000, 20000))
                    .create();
            Assert.fail("should failed with unexpected key hash range");
        } catch (IllegalArgumentException expected) {
            // Expected: the range end is smaller than its start.
        }
        try {
            this.pulsarClient.newReader()
                    .topic(topic)
                    .startMessageId(MessageId.earliest)
                    .keyHashRange(Range.of(80000, 90000))
                    .create();
            Assert.fail("should failed with unexpected key hash range");
        } catch (IllegalArgumentException expected) {
            // Expected: presumably the range exceeds the supported hash range.
        }

        try (Reader<String> reader = this.pulsarClient.newReader(Schema.STRING)
                     .topic(topic)
                     .startMessageId(MessageId.earliest)
                     .keyHashRange(Range.of(0, rangeUpperBound))
                     .create();
             Producer<String> producer = this.pulsarClient.newProducer(Schema.STRING)
                     .topic(topic)
                     .enableBatching(false)
                     .create()) {
            int expectedMessages = 0;
            for (String key : keys) {
                int slot = Murmur3_32Hash.getInstance().makeHash(key.getBytes()) % hashRangeSize;
                if (slot <= rangeUpperBound) {
                    ++expectedMessages;
                }
                // Key and value are identical, so a received value identifies the key that was used.
                producer.newMessage().key(key).value(key).send();
            }

            List<String> receivedMessages = new ArrayList<>();
            Message<String> msg;
            while ((msg = reader.readNext(1, TimeUnit.SECONDS)) != null) {
                receivedMessages.add(msg.getValue());
            }

            Assert.assertTrue(expectedMessages > 0);
            Assert.assertEquals(receivedMessages.size(), expectedMessages);
            for (String received : receivedMessages) {
                int slot = Murmur3_32Hash.getInstance().makeHash(received.getBytes()) % hashRangeSize;
                Assert.assertTrue(slot <= rangeUpperBound,
                        "received key " + received + " with hash slot " + slot + " outside the reader's range");
            }
        }
    }

    /**
     * Shared read scenario: publishes 9 keyed messages to {@code topic}, reads them all with a
     * reader started at {@code MessageId.earliest}, and asserts that every published key was seen.
     * While that reader is still open, a second reader started at {@code MessageId.latest} must
     * report that no message is available.
     *
     * <p>Both readers are managed by try-with-resources; previously neither was ever closed.
     *
     * @param topic       topic to publish to and read from
     * @param enableBatch whether the producer batches the messages
     * @throws Exception if publishing or reading fails
     */
    private void testReadMessages(String topic, boolean enableBatch) throws Exception {
        int numKeys = 9;
        Set<String> keys = this.publishMessages(topic, numKeys, enableBatch);

        try (Reader<byte[]> reader = this.pulsarClient.newReader()
                .topic(topic)
                .startMessageId(MessageId.earliest)
                .readerName(subscription)
                .create()) {
            while (reader.hasMessageAvailable()) {
                Message<byte[]> message = reader.readNext(5, TimeUnit.SECONDS);
                // Fail with a readable message instead of a NullPointerException when the read times out.
                Assert.assertNotNull(message, "readNext timed out although hasMessageAvailable() returned true");
                keys.remove(message.getKey());
            }
            Assert.assertEquals(keys.size(), 0);

            try (Reader<byte[]> readLatest = this.pulsarClient.newReader()
                    .topic(topic)
                    .startMessageId(MessageId.latest)
                    .readerName(subscription + "latest")
                    .create()) {
                Assert.assertFalse(readLatest.hasMessageAvailable());
            }
        }
    }

    /**
     * Publishes {@code count} messages with keys "key0".."key(count-1)" and payloads
     * "my-message-0".."my-message-(count-1)" to {@code topic}, using round-robin partition routing.
     *
     * <p>With batching enabled the messages are sent asynchronously with a batch size limit of
     * {@code count}; otherwise they are sent one at a time with a single pending message allowed.
     * The producer is flushed and all asynchronous sends are awaited before it is closed.
     *
     * @param topic       topic to publish to
     * @param count       number of messages to publish
     * @param enableBatch whether the producer batches the messages
     * @return a mutable set containing every key that was published
     * @throws Exception if the producer cannot be created or a send fails
     */
    private Set<String> publishMessages(String topic, int count, boolean enableBatch) throws Exception {
        ProducerBuilder<byte[]> producerBuilder = this.pulsarClient.newProducer()
                .messageRoutingMode(MessageRoutingMode.RoundRobinPartition)
                .batchingMaxPublishDelay(1L, TimeUnit.DAYS)
                .topic(topic)
                .enableBatching(enableBatch);
        if (enableBatch) {
            producerBuilder.batchingMaxMessages(count);
        } else {
            producerBuilder.maxPendingMessages(1);
        }

        Set<String> publishedKeys = new HashSet<>();
        List<CompletableFuture<MessageId>> pendingSends = new ArrayList<>();
        try (Producer<byte[]> producer = producerBuilder.create()) {
            for (int index = 0; index < count; index++) {
                String key = "key" + index;
                byte[] payload = ("my-message-" + index).getBytes();
                if (enableBatch) {
                    pendingSends.add(producer.newMessage().key(key).value(payload).sendAsync());
                } else {
                    producer.newMessage().key(key).value(payload).send();
                }
                publishedKeys.add(key);
            }
            producer.flush();
            // Empty in the non-batched case; otherwise waits for every asynchronous send.
            FutureUtil.waitForAll(pendingSends).get();
        }
        return publishedKeys;
    }
}

