/*
 * Decompiled with CFR 0.152.
 */
package org.apache.pulsar.broker.service;

import com.google.common.collect.Lists;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.LinkedList;
import java.util.List;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;
import org.apache.pulsar.broker.service.BrokerTestBase;
import org.apache.pulsar.broker.service.Consumer;
import org.apache.pulsar.broker.service.persistent.PersistentSubscription;
import org.apache.pulsar.broker.service.persistent.PersistentTopic;
import org.apache.pulsar.client.admin.PulsarAdminException;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.MessageRoutingMode;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.Reader;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.api.SubscriptionType;
import org.apache.pulsar.client.impl.BatchMessageIdImpl;
import org.apache.pulsar.client.impl.MessageIdImpl;
import org.apache.pulsar.client.impl.TopicMessageIdImpl;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.util.RelativeTimeUtil;
import org.awaitility.Awaitility;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testng.Assert;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;

@Test(groups={"broker"})
public class SubscriptionSeekTest
extends BrokerTestBase {
    private static final Logger log = LoggerFactory.getLogger(SubscriptionSeekTest.class);

    /**
     * Runs the base-class setup once for this test class and then enables
     * batch-index-level acknowledgment on the configuration object.
     */
    @BeforeClass
    @Override
    protected void setup() throws Exception {
        super.baseSetup();
        // NOTE(review): the flag is set after baseSetup() has already run; presumably the
        // configuration object is shared and read lazily -- confirm.
        conf.setAcknowledgmentAtBatchIndexLevelEnabled(true);
    }

    /**
     * Delegates teardown to the base class. {@code alwaysRun = true} makes TestNG invoke
     * this method even when earlier configuration or test methods failed.
     */
    @AfterClass(alwaysRun = true)
    @Override
    protected void cleanup() throws Exception {
        super.internalCleanup();
    }

    /**
     * Checks the subscription backlog after seeking by message id.
     *
     * <p>Ten messages are published, then the consumer seeks to {@code latest} (backlog 0),
     * {@code earliest} (backlog 10), the sixth published id (backlog 5), an id whose ledger id
     * is one lower than a published one (backlog 10) and an id whose ledger id is one higher
     * (backlog 0).
     *
     * @throws Exception if any client or broker call fails
     */
    @Test
    public void testSeek() throws Exception {
        final String topicName = "persistent://prop/use/ns-abc/testSeek";
        final String subscriptionName = "my-subscription";

        // try-with-resources releases the producer and consumer even when an assertion fails.
        try (Producer<byte[]> producer = pulsarClient.newProducer().topic(topicName).create();
                org.apache.pulsar.client.api.Consumer<byte[]> consumer = pulsarClient.newConsumer()
                        .topic(topicName)
                        .subscriptionName(subscriptionName)
                        .receiverQueueSize(0)
                        .subscribe()) {
            PersistentTopic topicRef =
                    (PersistentTopic) pulsar.getBrokerService().getTopicReference(topicName).get();
            Assert.assertNotNull(topicRef);
            Assert.assertEquals(topicRef.getProducers().size(), 1);
            Assert.assertEquals(topicRef.getSubscriptions().size(), 1L);

            List<MessageId> messageIds = new ArrayList<>();
            for (int i = 0; i < 10; i++) {
                messageIds.add(producer.send(("my-message-" + i).getBytes()));
            }

            PersistentSubscription sub = topicRef.getSubscription(subscriptionName);
            Assert.assertEquals(sub.getNumberOfEntriesInBacklog(false), 10L);

            consumer.seek(MessageId.latest);
            Assert.assertEquals(sub.getNumberOfEntriesInBacklog(false), 0L);

            // Every following seek is issued only once the consumer reports being connected.
            Awaitility.await().until(consumer::isConnected);
            consumer.seek(MessageId.earliest);
            Assert.assertEquals(sub.getNumberOfEntriesInBacklog(false), 10L);

            Awaitility.await().until(consumer::isConnected);
            consumer.seek(messageIds.get(5));
            Assert.assertEquals(sub.getNumberOfEntriesInBacklog(false), 5L);

            // Build two ids around a real one by shifting only the ledger id by -1 / +1.
            MessageIdImpl messageId = (MessageIdImpl) messageIds.get(5);
            MessageIdImpl beforeEarliest = new MessageIdImpl(
                    messageId.getLedgerId() - 1L, messageId.getEntryId(), messageId.getPartitionIndex());
            MessageIdImpl afterLatest = new MessageIdImpl(
                    messageId.getLedgerId() + 1L, messageId.getEntryId(), messageId.getPartitionIndex());
            log.info("MessageId {}: beforeEarliest: {}, afterLatest: {}", messageId, beforeEarliest, afterLatest);

            Awaitility.await().until(consumer::isConnected);
            consumer.seek(beforeEarliest);
            Assert.assertEquals(sub.getNumberOfEntriesInBacklog(false), 10L);

            Awaitility.await().until(consumer::isConnected);
            consumer.seek(afterLatest);
            Assert.assertEquals(sub.getNumberOfEntriesInBacklog(false), 0L);
        }
    }

    /**
     * Checks consumer-side seek on a topic whose messages were published in batches.
     *
     * <p>Ten string messages are sent asynchronously with batching enabled (max 3 messages per
     * batch). A consumer created with {@code startMessageIdInclusive()} then seeks to
     * {@code earliest} (first message expected), {@code latest} (nothing expected within one
     * second) and to each published id in turn (that exact id expected next).
     *
     * @throws Exception if any client or broker call fails
     */
    @Test
    public void testSeekForBatch() throws Exception {
        final String topicName = "persistent://prop/use/ns-abcd/testSeekForBatch";
        final String subscriptionName = "my-subscription-batch";

        List<String> messages = new ArrayList<>();
        List<MessageId> messageIds = new ArrayList<>();

        // The producer is closed as soon as every pending send has completed, or on failure.
        try (Producer<String> producer = pulsarClient.newProducer(Schema.STRING)
                .enableBatching(true)
                .batchingMaxMessages(3)
                .batchingMaxPublishDelay(100L, TimeUnit.MILLISECONDS)
                .topic(topicName)
                .create()) {
            List<CompletableFuture<MessageId>> pendingSends = new ArrayList<>();
            for (int i = 0; i < 10; i++) {
                String message = "my-message-" + i;
                messages.add(message);
                pendingSends.add(producer.sendAsync(message));
            }
            for (CompletableFuture<MessageId> pendingSend : pendingSends) {
                messageIds.add(pendingSend.get());
            }
        }

        try (org.apache.pulsar.client.api.Consumer<String> consumer = pulsarClient.newConsumer(Schema.STRING)
                .topic(topicName)
                .subscriptionName(subscriptionName)
                .startMessageIdInclusive()
                .subscribe()) {
            PersistentTopic topicRef =
                    (PersistentTopic) pulsar.getBrokerService().getTopicReference(topicName).get();
            Assert.assertNotNull(topicRef);
            Assert.assertEquals(topicRef.getSubscriptions().size(), 1L);

            consumer.seek(MessageId.earliest);
            Message<String> firstAfterEarliest = consumer.receive();
            Assert.assertEquals(firstAfterEarliest.getValue(), messages.get(0));

            consumer.seek(MessageId.latest);
            Message<String> afterLatest = consumer.receive(1, TimeUnit.SECONDS);
            Assert.assertNull(afterLatest);

            // Seeking to any published id must make that very message the next one received.
            for (MessageId messageId : messageIds) {
                consumer.seek(messageId);
                MessageId receivedId = consumer.receive().getMessageId();
                Assert.assertEquals(receivedId, messageId);
            }
        }
    }

    /**
     * Checks cursor reset through the admin API on a topic whose messages were published in
     * batches.
     *
     * <p>After each {@code resetCursor} call the test waits until the consumer reports being
     * connected and then asserts which message (if any) arrives next:
     * <ul>
     *   <li>{@code earliest}: the first message;</li>
     *   <li>{@code latest}: nothing within one second;</li>
     *   <li>first id with flag {@code true}: the second message; with {@code false}: the first;</li>
     *   <li>last id with flag {@code true}: nothing within one second; with {@code false}: the
     *       last message;</li>
     *   <li>{@code BatchMessageIdImpl(-1, -1, -1, 10)} with flag {@code true}: the first message.</li>
     * </ul>
     */
    @Test
    public void testSeekForBatchByAdmin() throws PulsarClientException, ExecutionException, InterruptedException, PulsarAdminException {
        final String topicName = "persistent://prop/use/ns-abcd/testSeekForBatchByAdmin-" + UUID.randomUUID().toString();
        final String subscriptionName = "my-subscription-batch";

        List<String> messages = new ArrayList<>();
        List<MessageId> messageIds = new ArrayList<>();

        // The producer is closed as soon as every pending send has completed, or on failure.
        try (Producer<String> producer = pulsarClient.newProducer(Schema.STRING)
                .enableBatching(true)
                .batchingMaxMessages(3)
                .batchingMaxPublishDelay(100L, TimeUnit.MILLISECONDS)
                .topic(topicName)
                .create()) {
            List<CompletableFuture<MessageId>> pendingSends = new ArrayList<>();
            for (int i = 0; i < 10; i++) {
                String message = "my-message-" + i;
                messages.add(message);
                pendingSends.add(producer.sendAsync(message));
            }
            for (CompletableFuture<MessageId> pendingSend : pendingSends) {
                messageIds.add(pendingSend.get());
            }
        }

        final MessageId firstId = messageIds.get(0);
        final MessageId lastId = messageIds.get(messageIds.size() - 1);

        try (org.apache.pulsar.client.api.Consumer<String> consumer = pulsarClient.newConsumer(Schema.STRING)
                .topic(topicName)
                .subscriptionName(subscriptionName)
                .subscribe()) {
            admin.topics().resetCursor(topicName, subscriptionName, MessageId.earliest);
            Awaitility.await().until(consumer::isConnected);
            Message<String> afterEarliest = consumer.receive();
            Assert.assertEquals(afterEarliest.getValue(), messages.get(0));

            admin.topics().resetCursor(topicName, subscriptionName, MessageId.latest);
            Awaitility.await().until(consumer::isConnected);
            Message<String> afterLatest = consumer.receive(1, TimeUnit.SECONDS);
            Assert.assertNull(afterLatest);

            admin.topics().resetCursor(topicName, subscriptionName, firstId, true);
            Awaitility.await().until(consumer::isConnected);
            Message<String> received = consumer.receive();
            Assert.assertEquals(received.getMessageId(), messageIds.get(1));

            admin.topics().resetCursor(topicName, subscriptionName, firstId, false);
            Awaitility.await().until(consumer::isConnected);
            received = consumer.receive();
            Assert.assertEquals(received.getMessageId(), firstId);

            admin.topics().resetCursor(topicName, subscriptionName, lastId, true);
            Awaitility.await().until(consumer::isConnected);
            received = consumer.receive(1, TimeUnit.SECONDS);
            Assert.assertNull(received);

            admin.topics().resetCursor(topicName, subscriptionName, lastId, false);
            Awaitility.await().until(consumer::isConnected);
            received = consumer.receive();
            Assert.assertEquals(received.getMessageId(), lastId);

            admin.topics().resetCursor(topicName, subscriptionName, new BatchMessageIdImpl(-1L, -1L, -1, 10), true);
            Awaitility.await().until(consumer::isConnected);
            received = consumer.receive();
            Assert.assertEquals(received.getMessageId(), firstId);
        }
    }

    /**
     * Issues the same admin {@code resetCursor} call from four threads at once.
     *
     * <p>Any {@link PulsarAdminException} raised by one of the threads is collected; after all
     * threads have finished, each collected exception must carry a message containing
     * {@code "Failed to fence subscription"}. The test passes when no exception is raised at all.
     *
     * @throws Exception if any client or broker call on the main thread fails
     */
    @Test
    public void testConcurrentResetCursor() throws Exception {
        final String topicName = "persistent://prop/use/ns-abc/testConcurrentReset_" + System.currentTimeMillis();
        final String subscriptionName = "test-sub-name";
        final int resetThreadCount = 4;

        // try-with-resources releases the producer even when an assertion fails.
        try (Producer<byte[]> producer = pulsarClient.newProducer().topic(topicName).create()) {
            admin.topics().createSubscription(topicName, subscriptionName, MessageId.earliest);

            PersistentTopic topicRef =
                    (PersistentTopic) pulsar.getBrokerService().getTopicReference(topicName).get();
            Assert.assertNotNull(topicRef);
            Assert.assertEquals(topicRef.getProducers().size(), 1);

            final List<MessageId> messageIds = new ArrayList<>();
            for (int i = 0; i < 10; i++) {
                messageIds.add(producer.send(("my-message-" + i).getBytes()));
            }

            // Written by the worker threads, read by this thread only after join().
            final List<PulsarAdminException> exceptions = new LinkedList<>();

            List<Thread> resetCursorThreads = new ArrayList<>();
            for (int i = 0; i < resetThreadCount; i++) {
                resetCursorThreads.add(new Thread(() -> {
                    try {
                        admin.topics().resetCursor(topicName, subscriptionName, messageIds.get(3));
                    } catch (PulsarAdminException e) {
                        // LinkedList is not thread-safe; guard concurrent appends.
                        synchronized (exceptions) {
                            exceptions.add(e);
                        }
                    }
                }));
            }
            for (Thread thread : resetCursorThreads) {
                thread.start();
            }
            for (Thread thread : resetCursorThreads) {
                thread.join();
            }

            for (PulsarAdminException exception : exceptions) {
                log.error("Meet Exception", exception);
                Assert.assertTrue(exception.getMessage().contains("Failed to fence subscription"));
            }
        }
    }

    /**
     * Checks that seeking to {@code MessageId.latest} on a consumer of a two-partition topic
     * does not raise a {@link PulsarClientException}.
     *
     * @throws Exception if topic creation, subscription or consumer close fails
     */
    @Test
    public void testSeekOnPartitionedTopic() throws Exception {
        final String topicName = "persistent://prop/use/ns-abc/testSeekPartitions";
        admin.topics().createPartitionedTopic(topicName, 2);

        // try-with-resources releases the consumer even when the test fails.
        try (org.apache.pulsar.client.api.Consumer<byte[]> consumer = pulsarClient.newConsumer()
                .topic(topicName)
                .subscriptionName("my-subscription")
                .subscribe()) {
            try {
                consumer.seek(MessageId.latest);
            } catch (PulsarClientException e) {
                // Pass the cause along so the failure report shows why the seek was rejected.
                Assert.fail("Should not have exception", e);
            }
        }
    }

    /**
     * Checks the subscription backlog after seeking by timestamp.
     *
     * <p>Ten messages are published; seeking to the current time must leave a backlog of 0,
     * and seeking to 100 seconds before that time must restore the backlog of 10.
     *
     * @throws Exception if any client or broker call fails
     */
    @Test
    public void testSeekTime() throws Exception {
        final String topicName = "persistent://prop/use/ns-abc/testSeekTime";
        final String subscriptionName = "my-subscription";
        final String resetTimeStr = "100s";
        final long resetTimeInMillis =
                TimeUnit.SECONDS.toMillis(RelativeTimeUtil.parseRelativeTimeInSeconds(resetTimeStr));

        // try-with-resources releases the producer and consumer even when an assertion fails.
        try (Producer<byte[]> producer = pulsarClient.newProducer().topic(topicName).create();
                org.apache.pulsar.client.api.Consumer<byte[]> consumer = pulsarClient.newConsumer()
                        .topic(topicName)
                        .subscriptionName(subscriptionName)
                        .receiverQueueSize(0)
                        .subscribe()) {
            PersistentTopic topicRef =
                    (PersistentTopic) pulsar.getBrokerService().getTopicReference(topicName).get();
            Assert.assertNotNull(topicRef);
            Assert.assertEquals(topicRef.getProducers().size(), 1);
            Assert.assertEquals(topicRef.getSubscriptions().size(), 1L);

            PersistentSubscription sub = topicRef.getSubscription(subscriptionName);
            for (int i = 0; i < 10; i++) {
                producer.send(("my-message-" + i).getBytes());
            }
            Assert.assertEquals(sub.getNumberOfEntriesInBacklog(false), 10L);

            final long currentTimestamp = System.currentTimeMillis();
            consumer.seek(currentTimestamp);
            Assert.assertEquals(sub.getNumberOfEntriesInBacklog(false), 0L);

            // The second seek is issued only once the consumer reports being connected.
            Awaitility.await().until(consumer::isConnected);
            consumer.seek(currentTimestamp - resetTimeInMillis);
            Assert.assertEquals(sub.getNumberOfEntriesInBacklog(false), 10L);
        }
    }

    /**
     * Checks function-based seek on a four-partition topic where the function returns a
     * different kind of target per partition.
     *
     * <p>First every partition is sought to the current time and nothing may arrive within one
     * second. Then partition 0 is sought to {@code latest}, partition 1 to {@code earliest},
     * partition 2 to the current time and partition 3 to a timestamp 999999 ms earlier. The
     * number of messages drained afterwards must equal the full share of partitions 1 and 3
     * ({@code msgNum / partitionNum} each) and nothing from partitions 0 and 2.
     *
     * @throws Exception if any client or broker call fails
     */
    @Test
    public void testSeekTimeByFunction() throws Exception {
        final String topicName = "persistent://prop/use/ns-abc/test" + UUID.randomUUID();
        final int partitionNum = 4;
        final int msgNum = 20;
        admin.topics().createPartitionedTopic(topicName, partitionNum);
        creatProducerAndSendMsg(topicName, msgNum);

        // try-with-resources releases the consumer even when an assertion fails.
        try (org.apache.pulsar.client.api.Consumer<String> consumer = pulsarClient.newConsumer(Schema.STRING)
                .startMessageIdInclusive()
                .topic(topicName)
                .subscriptionName("my-sub")
                .subscribe()) {
            final long now = System.currentTimeMillis();
            consumer.seek(topic -> now);
            Assert.assertNull(consumer.receive(1, TimeUnit.SECONDS));

            consumer.seek(topic -> {
                switch (TopicName.get(topic).getPartitionIndex()) {
                    case 0:
                        return MessageId.latest;
                    case 1:
                        return MessageId.earliest;
                    case 2:
                        return now;
                    case 3:
                        return now - 999999L;
                    default:
                        return null;
                }
            });

            int count = 0;
            while (consumer.receive(1, TimeUnit.SECONDS) != null) {
                count++;
            }

            final int msgNumInPartition0 = 0;
            final int msgNumInPartition1 = msgNum / partitionNum;
            final int msgNumInPartition2 = 0;
            final int msgNumInPartition3 = msgNum / partitionNum;
            Assert.assertEquals(count,
                    msgNumInPartition0 + msgNumInPartition1 + msgNumInPartition2 + msgNumInPartition3);
        }
    }

    /**
     * Checks the summed subscription backlog of a two-partition topic after seeking by
     * timestamp.
     *
     * <p>Ten messages are published; the backlog summed over both partitions must be 10, drop
     * to 0 after seeking to the current time, and return to 10 after seeking to 100 seconds
     * before that time.
     *
     * @throws Exception if any client or broker call fails
     */
    @Test
    public void testSeekTimeOnPartitionedTopic() throws Exception {
        final String topicName = "persistent://prop/use/ns-abc/testSeekTimePartitions";
        final String subscriptionName = "my-subscription";
        final String resetTimeStr = "100s";
        final int partitions = 2;
        final long resetTimeInMillis =
                TimeUnit.SECONDS.toMillis(RelativeTimeUtil.parseRelativeTimeInSeconds(resetTimeStr));
        admin.topics().createPartitionedTopic(topicName, partitions);

        // try-with-resources releases the producer and consumer even when an assertion fails.
        try (Producer<byte[]> producer = pulsarClient.newProducer().topic(topicName).create();
                org.apache.pulsar.client.api.Consumer<byte[]> consumer = pulsarClient.newConsumer()
                        .topic(topicName)
                        .subscriptionName(subscriptionName)
                        .subscribe()) {
            List<PersistentSubscription> subs = new ArrayList<>();
            for (int i = 0; i < partitions; i++) {
                PersistentTopic topicRef = (PersistentTopic) pulsar.getBrokerService()
                        .getTopicReference(topicName + "-partition-" + i).get();
                Assert.assertNotNull(topicRef);
                Assert.assertEquals(topicRef.getProducers().size(), 1);
                Assert.assertEquals(topicRef.getSubscriptions().size(), 1L);
                PersistentSubscription sub = topicRef.getSubscription(subscriptionName);
                Assert.assertNotNull(sub);
                subs.add(sub);
            }

            for (int i = 0; i < 10; i++) {
                producer.send(("my-message-" + i).getBytes());
            }
            Assert.assertEquals(sumEntriesInBacklog(subs), 10L);

            final long currentTimestamp = System.currentTimeMillis();
            consumer.seek(currentTimestamp);
            Assert.assertEquals(sumEntriesInBacklog(subs), 0L);

            // The second seek is issued only once the consumer reports being connected.
            Awaitility.await().until(consumer::isConnected);
            consumer.seek(currentTimestamp - resetTimeInMillis);
            Assert.assertEquals(sumEntriesInBacklog(subs), 10L);
        }
    }

    /** Adds up {@code getNumberOfEntriesInBacklog(false)} over the given subscriptions. */
    private static long sumEntriesInBacklog(List<PersistentSubscription> subscriptions) {
        long total = 0L;
        for (PersistentSubscription subscription : subscriptions) {
            total += subscription.getNumberOfEntriesInBacklog(false);
        }
        return total;
    }

    /**
     * With two consumers on a {@code Shared} subscription, checks that after one of them seeks,
     * the subscription again holds two consumers and none of them reports a
     * {@code connectedSince} value that was seen before the seek.
     *
     * @throws Exception if any client or broker call fails
     */
    @Test
    public void testShouldCloseAllConsumersForMultipleConsumerDispatcherWhenSeek() throws Exception {
        final String topicName = "persistent://prop/use/ns-abc/testShouldCloseAllConsumersForMultipleConsumerDispatcherWhenSeek";
        final String subscriptionName = "my-subscription";

        // try-with-resources releases both client consumers even when an assertion fails.
        try (org.apache.pulsar.client.api.Consumer<byte[]> consumer1 = pulsarClient.newConsumer()
                        .topic(topicName)
                        .subscriptionType(SubscriptionType.Shared)
                        .subscriptionName(subscriptionName)
                        .subscribe();
                org.apache.pulsar.client.api.Consumer<byte[]> consumer2 = pulsarClient.newConsumer()
                        .topic(topicName)
                        .subscriptionType(SubscriptionType.Shared)
                        .subscriptionName(subscriptionName)
                        .subscribe()) {
            PersistentTopic topicRef =
                    (PersistentTopic) pulsar.getBrokerService().getTopicReference(topicName).get();
            Assert.assertNotNull(topicRef);
            Assert.assertEquals(topicRef.getSubscriptions().size(), 1L);

            List<Consumer> consumers = topicRef.getSubscription(subscriptionName).getConsumers();
            Assert.assertEquals(consumers.size(), 2);

            // Remember when each broker-side consumer connected, to detect reconnects later.
            Set<String> connectedSinceSet = new HashSet<>();
            for (Consumer consumer : consumers) {
                connectedSinceSet.add(consumer.getStats().getConnectedSince());
            }
            Assert.assertEquals(connectedSinceSet.size(), 2);

            consumer1.seek(MessageId.earliest);
            Awaitility.await().until(consumer1::isConnected);

            consumers = topicRef.getSubscription(subscriptionName).getConsumers();
            Assert.assertEquals(consumers.size(), 2);
            for (Consumer consumer : consumers) {
                Assert.assertFalse(connectedSinceSet.contains(consumer.getStats().getConnectedSince()));
            }
        }
    }

    /**
     * With two consumers on a {@code Failover} subscription, checks that after one of them
     * seeks, the subscription again holds two consumers and at least one of them still reports
     * a {@code connectedSince} value that was seen before the seek.
     *
     * @throws Exception if any client or broker call fails
     */
    @Test
    public void testOnlyCloseActiveConsumerForSingleActiveConsumerDispatcherWhenSeek() throws Exception {
        final String topicName = "persistent://prop/use/ns-abc/testOnlyCloseActiveConsumerForSingleActiveConsumerDispatcherWhenSeek";
        final String subscriptionName = "my-subscription";

        // try-with-resources releases both client consumers even when an assertion fails.
        try (org.apache.pulsar.client.api.Consumer<byte[]> consumer1 = pulsarClient.newConsumer()
                        .topic(topicName)
                        .subscriptionType(SubscriptionType.Failover)
                        .subscriptionName(subscriptionName)
                        .subscribe();
                org.apache.pulsar.client.api.Consumer<byte[]> consumer2 = pulsarClient.newConsumer()
                        .topic(topicName)
                        .subscriptionType(SubscriptionType.Failover)
                        .subscriptionName(subscriptionName)
                        .subscribe()) {
            PersistentTopic topicRef =
                    (PersistentTopic) pulsar.getBrokerService().getTopicReference(topicName).get();
            Assert.assertNotNull(topicRef);
            Assert.assertEquals(topicRef.getSubscriptions().size(), 1L);

            List<Consumer> consumers = topicRef.getSubscription(subscriptionName).getConsumers();
            Assert.assertEquals(consumers.size(), 2);

            // Remember when each broker-side consumer connected, to detect reconnects later.
            Set<String> connectedSinceSet = new HashSet<>();
            for (Consumer consumer : consumers) {
                connectedSinceSet.add(consumer.getStats().getConnectedSince());
            }
            Assert.assertEquals(connectedSinceSet.size(), 2);

            consumer1.seek(MessageId.earliest);
            Awaitility.await().until(consumer1::isConnected);

            consumers = topicRef.getSubscription(subscriptionName).getConsumers();
            Assert.assertEquals(consumers.size(), 2);

            boolean hasConsumerNotDisconnected = false;
            for (Consumer consumer : consumers) {
                if (connectedSinceSet.contains(consumer.getStats().getConnectedSince())) {
                    hasConsumerNotDisconnected = true;
                }
            }
            Assert.assertTrue(hasConsumerNotDisconnected);
        }
    }

    /**
     * Checks function-based seek by message id on a four-partition topic.
     *
     * <p>The seek function maps partition 0 to the id in the middle of that partition,
     * partition 1 to its last id, partition 2 to its first id, and returns {@code null} for
     * partition 3. The set of ids drained afterwards must have the expected size, contain the
     * three seek targets, and contain none of the partition-0 ids that precede the middle one.
     *
     * @throws Exception if any client or broker call fails
     */
    @Test
    public void testSeekByFunction() throws Exception {
        final String topicName = "persistent://prop/use/ns-abc/test" + UUID.randomUUID();
        final int partitionNum = 4;
        final int msgNum = 160;
        admin.topics().createPartitionedTopic(topicName, partitionNum);
        creatProducerAndSendMsg(topicName, msgNum);

        // try-with-resources releases the consumer and readers even when an assertion fails.
        try (org.apache.pulsar.client.api.Consumer<String> consumer = pulsarClient.newConsumer(Schema.STRING)
                .startMessageIdInclusive()
                .topic(topicName)
                .subscriptionName("my-sub")
                .subscribe()) {
            final TopicName partitionedTopic = TopicName.get(topicName);

            // Read every id of partition 0 so that a middle id and the ids before it are known.
            final List<MessageId> partition0Ids = new ArrayList<>();
            try (Reader<String> reader = pulsarClient.newReader(Schema.STRING)
                    .startMessageId(MessageId.earliest)
                    .topic(partitionedTopic.getPartition(0).toString())
                    .create()) {
                while (reader.hasMessageAvailable()) {
                    partition0Ids.add(reader.readNext().getMessageId());
                }
            }
            final int middleIndex = partition0Ids.size() / 2;
            final MessageId middleMsgIdInPartition0 = partition0Ids.get(middleIndex);
            // subList's end index is exclusive, so this covers every id before the middle one.
            final List<MessageId> msgNotIn = partition0Ids.subList(0, middleIndex);

            final MessageId lastMsgInPartition1 =
                    admin.topics().getLastMessageId(partitionedTopic.getPartition(1).toString());

            final MessageId firstMsgInPartition2;
            try (Reader<String> reader = pulsarClient.newReader(Schema.STRING)
                    .startMessageId(MessageId.earliest)
                    .topic(partitionedTopic.getPartition(2).toString())
                    .create()) {
                firstMsgInPartition2 = reader.readNext().getMessageId();
            }

            consumer.seek(topic -> {
                int index = TopicName.get(topic).getPartitionIndex();
                if (index == 0) {
                    return middleMsgIdInPartition0;
                }
                if (index == 1) {
                    return lastMsgInPartition1;
                }
                if (index == 2) {
                    return firstMsgInPartition2;
                }
                return null;
            });

            Set<MessageId> received = new HashSet<>();
            Message<String> message;
            while ((message = consumer.receive(2, TimeUnit.SECONDS)) != null) {
                TopicMessageIdImpl topicMessageId = (TopicMessageIdImpl) message.getMessageId();
                received.add(topicMessageId.getInnerMessageId());
            }

            // The expected total has no term for partition 3, whose function result was null.
            final int expectedFromPartition0 = partition0Ids.size() / 2;
            final int expectedFromPartition1 = 1;
            final int expectedFromPartition2 = msgNum / partitionNum;
            Assert.assertEquals(received.size(),
                    expectedFromPartition0 + expectedFromPartition1 + expectedFromPartition2);
            Assert.assertTrue(received.contains(middleMsgIdInPartition0));
            Assert.assertTrue(received.contains(lastMsgInPartition1));
            Assert.assertTrue(received.contains(firstMsgInPartition2));
            for (MessageId messageId : msgNotIn) {
                Assert.assertFalse(received.contains(messageId));
            }
        }
    }

    /**
     * Creates a non-batching, round-robin-partitioned string producer on {@code topic}, sends
     * {@code msgNum} messages with payloads {@code "msg0"}, {@code "msg1"}, ... and closes the
     * producer.
     *
     * @param topic  topic to publish to
     * @param msgNum number of messages to send
     * @return the ids of the sent messages, in send order
     * @throws Exception if the producer cannot be created, a send fails, or close fails
     */
    private List<MessageId> creatProducerAndSendMsg(String topic, int msgNum) throws Exception {
        List<MessageId> messageIds = new ArrayList<>();
        // try-with-resources closes the producer even when one of the sends throws.
        try (Producer<String> producer = pulsarClient.newProducer(Schema.STRING)
                .enableBatching(false)
                .messageRoutingMode(MessageRoutingMode.RoundRobinPartition)
                .topic(topic)
                .create()) {
            for (int i = 0; i < msgNum; i++) {
                messageIds.add(producer.send("msg" + i));
            }
        }
        return messageIds;
    }

    /**
     * Checks function-based seek on a consumer subscribed to two three-partition topics.
     *
     * <p>For each topic, partitions 0 and 2 are sought to their last message id and every
     * other partition to {@code earliest}. The number of messages drained afterwards must be
     * {@code (1 + msgNum / partitionNum + 1) * 2}.
     *
     * @throws Exception if any client or broker call fails
     */
    @Test
    public void testSeekByFunctionAndMultiTopic() throws Exception {
        final String topicName = "persistent://prop/use/ns-abc/test" + UUID.randomUUID();
        final String topicName2 = "persistent://prop/use/ns-abc/test" + UUID.randomUUID();
        final int partitionNum = 3;
        final int msgNum = 15;
        admin.topics().createPartitionedTopic(topicName, partitionNum);
        admin.topics().createPartitionedTopic(topicName2, partitionNum);
        creatProducerAndSendMsg(topicName, msgNum);
        creatProducerAndSendMsg(topicName2, msgNum);

        final TopicName topic = TopicName.get(topicName);
        final TopicName topic2 = TopicName.get(topicName2);
        final MessageId msgIdInTopic1Partition0 = admin.topics().getLastMessageId(topic.getPartition(0).toString());
        final MessageId msgIdInTopic1Partition2 = admin.topics().getLastMessageId(topic.getPartition(2).toString());
        final MessageId msgIdInTopic2Partition0 = admin.topics().getLastMessageId(topic2.getPartition(0).toString());
        final MessageId msgIdInTopic2Partition2 = admin.topics().getLastMessageId(topic2.getPartition(2).toString());

        // try-with-resources releases the consumer even when an assertion fails.
        try (org.apache.pulsar.client.api.Consumer<String> consumer = pulsarClient.newConsumer(Schema.STRING)
                .startMessageIdInclusive()
                .topics(Arrays.asList(topicName, topicName2))
                .subscriptionName("my-sub")
                .subscribe()) {
            consumer.seek(partitionedTopic -> {
                if (partitionedTopic.equals(topic.getPartition(0).toString())) {
                    return msgIdInTopic1Partition0;
                }
                if (partitionedTopic.equals(topic.getPartition(2).toString())) {
                    return msgIdInTopic1Partition2;
                }
                if (partitionedTopic.equals(topic2.getPartition(0).toString())) {
                    return msgIdInTopic2Partition0;
                }
                if (partitionedTopic.equals(topic2.getPartition(2).toString())) {
                    return msgIdInTopic2Partition2;
                }
                return MessageId.earliest;
            });

            int count = 0;
            while (consumer.receive(2, TimeUnit.SECONDS) != null) {
                count++;
            }

            final int msgInTopic1Partition0 = 1;
            final int msgInTopic1Partition1 = msgNum / partitionNum;
            final int msgInTopic1Partition2 = 1;
            Assert.assertEquals(count, (msgInTopic1Partition0 + msgInTopic1Partition1 + msgInTopic1Partition2) * 2);
        }
    }

    /**
     * Checks how function-based seek reacts to unusable functions.
     *
     * <ul>
     *   <li>A {@code null} function passed to {@code seek} must raise a
     *       {@link PulsarClientException} whose message contains {@code "Function must be set"}.</li>
     *   <li>A function returning {@code null} passed to {@code seekAsync} must complete normally.</li>
     *   <li>A function returning a plain {@code Object} passed to {@code seekAsync} must fail
     *       with a cause that is a {@link PulsarClientException} whose message contains
     *       {@code "Only support seek by messageId or timestamp"}.</li>
     * </ul>
     *
     * @throws Exception if any client or broker call fails unexpectedly
     */
    @Test
    public void testExceptionBySeekFunction() throws Exception {
        final String topicName = "persistent://prop/use/ns-abc/test" + UUID.randomUUID();
        creatProducerAndSendMsg(topicName, 10);

        // try-with-resources releases the consumer even when an assertion fails.
        try (org.apache.pulsar.client.api.Consumer<byte[]> consumer = pulsarClient.newConsumer()
                .topic(topicName)
                .subscriptionName("my-sub")
                .subscribe()) {
            try {
                consumer.seek((Function<String, Object>) null);
                // AssertionError is not an Exception, so the catch below cannot swallow it.
                Assert.fail("should fail");
            } catch (Exception e) {
                Assert.assertTrue(e instanceof PulsarClientException);
                Assert.assertTrue(e.getMessage().contains("Function must be set"));
            }

            Assert.assertNull(consumer.seekAsync(topic -> null).get());

            try {
                Assert.assertNull(consumer.seekAsync(topic -> new Object()).get());
                Assert.fail("should fail");
            } catch (Exception e) {
                Assert.assertTrue(e.getCause() instanceof PulsarClientException);
                Assert.assertTrue(e.getCause().getMessage().contains("Only support seek by messageId or timestamp"));
            }
        }
    }
}

