package org.apache.pulsar.broker.service;

import com.google.common.collect.Lists;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import org.apache.pulsar.broker.service.persistent.PersistentSubscription;
import org.apache.pulsar.broker.service.persistent.PersistentTopic;
import org.apache.pulsar.client.admin.PulsarAdminException;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.api.SubscriptionType;
import org.apache.pulsar.client.impl.BatchMessageIdImpl;
import org.apache.pulsar.client.impl.MessageIdImpl;
import org.apache.pulsar.common.util.RelativeTimeUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testng.Assert;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;

/* loaded from: input_file:org/apache/pulsar/broker/service/SubscriptionSeekTest.class */
public class SubscriptionSeekTest extends BrokerTestBase {
    private static final Logger log = LoggerFactory.getLogger((Class<?>) SubscriptionSeekTest.class);

    @Override // org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest
    @BeforeClass
    protected void setup() throws Exception {
        super.baseSetup();
        this.conf.setAcknowledgmentAtBatchIndexLevelEnabled(true);
    }

    @Override // org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest
    @AfterClass(alwaysRun = true)
    protected void cleanup() throws Exception {
        super.internalCleanup();
    }

    @Test
    public void testSeek() throws Exception {
        Producer<byte[]> create = this.pulsarClient.newProducer().topic("persistent://prop/use/ns-abc/testSeek").create();
        Consumer<byte[]> subscribe = this.pulsarClient.newConsumer().topic("persistent://prop/use/ns-abc/testSeek").subscriptionName("my-subscription").receiverQueueSize(0).subscribe();
        PersistentTopic persistentTopic = (PersistentTopic) this.pulsar.getBrokerService().getTopicReference("persistent://prop/use/ns-abc/testSeek").get();
        Assert.assertNotNull(persistentTopic);
        Assert.assertEquals(persistentTopic.getProducers().size(), 1);
        Assert.assertEquals(persistentTopic.getSubscriptions().size(), 1L);
        ArrayList arrayList = new ArrayList();
        for (int i = 0; i < 10; i++) {
            arrayList.add(create.send(("my-message-" + i).getBytes()));
        }
        PersistentSubscription subscription = persistentTopic.getSubscription("my-subscription");
        Assert.assertEquals(subscription.getNumberOfEntriesInBacklog(false), 10L);
        subscribe.seek(MessageId.latest);
        Assert.assertEquals(subscription.getNumberOfEntriesInBacklog(false), 0L);
        Thread.sleep(500L);
        subscribe.seek(MessageId.earliest);
        Assert.assertEquals(subscription.getNumberOfEntriesInBacklog(false), 10L);
        Thread.sleep(500L);
        subscribe.seek((MessageId) arrayList.get(5));
        Assert.assertEquals(subscription.getNumberOfEntriesInBacklog(false), 5L);
        MessageIdImpl messageIdImpl = (MessageIdImpl) arrayList.get(5);
        MessageIdImpl messageIdImpl2 = new MessageIdImpl(messageIdImpl.getLedgerId() - 1, messageIdImpl.getEntryId(), messageIdImpl.getPartitionIndex());
        MessageIdImpl messageIdImpl3 = new MessageIdImpl(messageIdImpl.getLedgerId() + 1, messageIdImpl.getEntryId(), messageIdImpl.getPartitionIndex());
        log.info("MessageId {}: beforeEarliest: {}, afterLatest: {}", messageIdImpl, messageIdImpl2, messageIdImpl3);
        Thread.sleep(500L);
        subscribe.seek((MessageId) messageIdImpl2);
        Assert.assertEquals(subscription.getNumberOfEntriesInBacklog(false), 10L);
        Thread.sleep(500L);
        subscribe.seek((MessageId) messageIdImpl3);
        Assert.assertEquals(subscription.getNumberOfEntriesInBacklog(false), 0L);
    }

    @Test
    public void testSeekForBatch() throws Exception {
        Producer create = this.pulsarClient.newProducer(Schema.STRING).enableBatching(true).batchingMaxMessages(3).batchingMaxPublishDelay(100L, TimeUnit.MILLISECONDS).topic("persistent://prop/use/ns-abcd/testSeekForBatch").create();
        ArrayList<MessageId> arrayList = new ArrayList();
        ArrayList arrayList2 = new ArrayList();
        ArrayList arrayList3 = new ArrayList();
        for (int i = 0; i < 10; i++) {
            String str = "my-message-" + i;
            arrayList3.add(str);
            arrayList2.add(create.sendAsync(str));
        }
        Iterator it = arrayList2.iterator();
        while (it.hasNext()) {
            arrayList.add((MessageId) ((CompletableFuture) it.next()).get());
        }
        create.close();
        Consumer subscribe = this.pulsarClient.newConsumer(Schema.STRING).topic("persistent://prop/use/ns-abcd/testSeekForBatch").subscriptionName("my-subscription-batch").startMessageIdInclusive().subscribe();
        PersistentTopic persistentTopic = (PersistentTopic) this.pulsar.getBrokerService().getTopicReference("persistent://prop/use/ns-abcd/testSeekForBatch").get();
        Assert.assertNotNull(persistentTopic);
        Assert.assertEquals(persistentTopic.getSubscriptions().size(), 1L);
        subscribe.seek(MessageId.earliest);
        Assert.assertEquals((String) subscribe.receive().getValue(), (String) arrayList3.get(0));
        subscribe.seek(MessageId.latest);
        Assert.assertNull(subscribe.receive(1, TimeUnit.SECONDS));
        for (MessageId messageId : arrayList) {
            subscribe.seek(messageId);
            Assert.assertEquals(subscribe.receive().getMessageId(), messageId);
        }
    }

    @Test
    public void testSeekForBatchByAdmin() throws PulsarClientException, ExecutionException, InterruptedException, PulsarAdminException {
        String str = "persistent://prop/use/ns-abcd/testSeekForBatchByAdmin-" + UUID.randomUUID().toString();
        Producer create = this.pulsarClient.newProducer(Schema.STRING).enableBatching(true).batchingMaxMessages(3).batchingMaxPublishDelay(100L, TimeUnit.MILLISECONDS).topic(str).create();
        ArrayList arrayList = new ArrayList();
        ArrayList arrayList2 = new ArrayList();
        ArrayList arrayList3 = new ArrayList();
        for (int i = 0; i < 10; i++) {
            String str2 = "my-message-" + i;
            arrayList3.add(str2);
            arrayList2.add(create.sendAsync(str2));
        }
        Iterator it = arrayList2.iterator();
        while (it.hasNext()) {
            arrayList.add((MessageId) ((CompletableFuture) it.next()).get());
        }
        create.close();
        Consumer subscribe = this.pulsarClient.newConsumer(Schema.STRING).topic(str).subscriptionName("my-subscription-batch").subscribe();
        this.admin.topics().resetCursor(str, "my-subscription-batch", MessageId.earliest);
        Thread.sleep(1000L);
        Assert.assertEquals((String) subscribe.receive().getValue(), (String) arrayList3.get(0));
        this.admin.topics().resetCursor(str, "my-subscription-batch", MessageId.latest);
        Thread.sleep(1000L);
        Assert.assertNull(subscribe.receive(1, TimeUnit.SECONDS));
        this.admin.topics().resetCursor(str, "my-subscription-batch", (MessageId) arrayList.get(0), true);
        Thread.sleep(1000L);
        Assert.assertEquals(subscribe.receive().getMessageId(), arrayList.get(1));
        this.admin.topics().resetCursor(str, "my-subscription-batch", (MessageId) arrayList.get(0), false);
        Thread.sleep(1000L);
        Assert.assertEquals(subscribe.receive().getMessageId(), arrayList.get(0));
        this.admin.topics().resetCursor(str, "my-subscription-batch", (MessageId) arrayList.get(arrayList.size() - 1), true);
        Thread.sleep(1000L);
        Assert.assertNull(subscribe.receive(1, TimeUnit.SECONDS));
        this.admin.topics().resetCursor(str, "my-subscription-batch", (MessageId) arrayList.get(arrayList.size() - 1), false);
        Thread.sleep(1000L);
        Assert.assertEquals(subscribe.receive().getMessageId(), arrayList.get(arrayList.size() - 1));
        this.admin.topics().resetCursor(str, "my-subscription-batch", new BatchMessageIdImpl(-1L, -1L, -1, 10), true);
        Thread.sleep(1000L);
        Assert.assertEquals(subscribe.receive().getMessageId(), arrayList.get(0));
    }

    @Test
    public void testConcurrentResetCursor() throws Exception {
        final String str = "persistent://prop/use/ns-abc/testConcurrentReset_" + System.currentTimeMillis();
        Producer<byte[]> create = this.pulsarClient.newProducer().topic(str).create();
        this.admin.topics().createSubscription(str, "test-sub-name", MessageId.earliest);
        PersistentTopic persistentTopic = (PersistentTopic) this.pulsar.getBrokerService().getTopicReference(str).get();
        Assert.assertNotNull(persistentTopic);
        Assert.assertEquals(persistentTopic.getProducers().size(), 1);
        final ArrayList arrayList = new ArrayList();
        for (int i = 0; i < 10; i++) {
            arrayList.add(create.send(("my-message-" + i).getBytes()));
        }
        final LinkedList newLinkedList = Lists.newLinkedList();
        LinkedList newLinkedList2 = Lists.newLinkedList();
        for (int i2 = 0; i2 < 4; i2++) {
            newLinkedList2.add(new Thread() { // from class: org.apache.pulsar.broker.service.SubscriptionSeekTest.1ResetCursorThread
                @Override // java.lang.Thread, java.lang.Runnable
                public void run() {
                    try {
                        SubscriptionSeekTest.this.admin.topics().resetCursor(str, "test-sub-name", (MessageId) arrayList.get(3));
                    } catch (PulsarAdminException e) {
                        newLinkedList.add(e);
                    }
                }
            });
        }
        for (int i3 = 0; i3 < 4; i3++) {
            ((C1ResetCursorThread) newLinkedList2.get(i3)).start();
        }
        for (int i4 = 0; i4 < 4; i4++) {
            ((C1ResetCursorThread) newLinkedList2.get(i4)).join();
        }
        for (int i5 = 0; i5 < newLinkedList.size(); i5++) {
            log.error("Meet Exception", (Throwable) newLinkedList.get(i5));
            Assert.assertTrue(((PulsarAdminException) newLinkedList.get(i5)).getMessage().contains("Failed to fence subscription"));
        }
    }

    @Test
    public void testSeekOnPartitionedTopic() throws Exception {
        this.admin.topics().createPartitionedTopic("persistent://prop/use/ns-abc/testSeekPartitions", 2);
        try {
            this.pulsarClient.newConsumer().topic("persistent://prop/use/ns-abc/testSeekPartitions").subscriptionName("my-subscription").subscribe().seek(MessageId.latest);
        } catch (PulsarClientException e) {
            Assert.fail("Should not have exception");
        }
    }

    @Test
    public void testSeekTime() throws Exception {
        long millis = TimeUnit.SECONDS.toMillis(RelativeTimeUtil.parseRelativeTimeInSeconds("100s"));
        Producer<byte[]> create = this.pulsarClient.newProducer().topic("persistent://prop/use/ns-abc/testSeekTime").create();
        Consumer<byte[]> subscribe = this.pulsarClient.newConsumer().topic("persistent://prop/use/ns-abc/testSeekTime").subscriptionName("my-subscription").receiverQueueSize(0).subscribe();
        PersistentTopic persistentTopic = (PersistentTopic) this.pulsar.getBrokerService().getTopicReference("persistent://prop/use/ns-abc/testSeekTime").get();
        Assert.assertNotNull(persistentTopic);
        Assert.assertEquals(persistentTopic.getProducers().size(), 1);
        Assert.assertEquals(persistentTopic.getSubscriptions().size(), 1L);
        PersistentSubscription subscription = persistentTopic.getSubscription("my-subscription");
        for (int i = 0; i < 10; i++) {
            create.send(("my-message-" + i).getBytes());
        }
        Assert.assertEquals(subscription.getNumberOfEntriesInBacklog(false), 10L);
        long currentTimeMillis = System.currentTimeMillis();
        subscribe.seek(currentTimeMillis);
        Assert.assertEquals(subscription.getNumberOfEntriesInBacklog(false), 0L);
        Thread.sleep(1000L);
        subscribe.seek(currentTimeMillis - millis);
        Assert.assertEquals(subscription.getNumberOfEntriesInBacklog(false), 10L);
    }

    @Test
    public void testSeekTimeOnPartitionedTopic() throws Exception {
        long millis = TimeUnit.SECONDS.toMillis(RelativeTimeUtil.parseRelativeTimeInSeconds("100s"));
        this.admin.topics().createPartitionedTopic("persistent://prop/use/ns-abc/testSeekTimePartitions", 2);
        Producer<byte[]> create = this.pulsarClient.newProducer().topic("persistent://prop/use/ns-abc/testSeekTimePartitions").create();
        Consumer<byte[]> subscribe = this.pulsarClient.newConsumer().topic("persistent://prop/use/ns-abc/testSeekTimePartitions").subscriptionName("my-subscription").subscribe();
        ArrayList arrayList = new ArrayList();
        for (int i = 0; i < 2; i++) {
            PersistentTopic persistentTopic = (PersistentTopic) this.pulsar.getBrokerService().getTopicReference("persistent://prop/use/ns-abc/testSeekTimePartitions-partition-" + i).get();
            Assert.assertNotNull(persistentTopic);
            Assert.assertEquals(persistentTopic.getProducers().size(), 1);
            Assert.assertEquals(persistentTopic.getSubscriptions().size(), 1L);
            PersistentSubscription subscription = persistentTopic.getSubscription("my-subscription");
            Assert.assertNotNull(subscription);
            arrayList.add(subscription);
        }
        for (int i2 = 0; i2 < 10; i2++) {
            create.send(("my-message-" + i2).getBytes());
        }
        long j = 0;
        Iterator it = arrayList.iterator();
        while (it.hasNext()) {
            j += ((PersistentSubscription) it.next()).getNumberOfEntriesInBacklog(false);
        }
        Assert.assertEquals(j, 10L);
        long j2 = 0;
        long currentTimeMillis = System.currentTimeMillis();
        subscribe.seek(currentTimeMillis);
        Iterator it2 = arrayList.iterator();
        while (it2.hasNext()) {
            j2 += ((PersistentSubscription) it2.next()).getNumberOfEntriesInBacklog(false);
        }
        Assert.assertEquals(j2, 0L);
        Thread.sleep(1000L);
        subscribe.seek(currentTimeMillis - millis);
        long j3 = 0;
        Iterator it3 = arrayList.iterator();
        while (it3.hasNext()) {
            j3 += ((PersistentSubscription) it3.next()).getNumberOfEntriesInBacklog(false);
        }
        Assert.assertEquals(j3, 10L);
    }

    @Test
    public void testShouldCloseAllConsumersForMultipleConsumerDispatcherWhenSeek() throws Exception {
        Consumer<byte[]> subscribe = this.pulsarClient.newConsumer().topic("persistent://prop/use/ns-abc/testShouldCloseAllConsumersForMultipleConsumerDispatcherWhenSeek").subscriptionType(SubscriptionType.Shared).subscriptionName("my-subscription").subscribe();
        this.pulsarClient.newConsumer().topic("persistent://prop/use/ns-abc/testShouldCloseAllConsumersForMultipleConsumerDispatcherWhenSeek").subscriptionType(SubscriptionType.Shared).subscriptionName("my-subscription").subscribe();
        PersistentTopic persistentTopic = (PersistentTopic) this.pulsar.getBrokerService().getTopicReference("persistent://prop/use/ns-abc/testShouldCloseAllConsumersForMultipleConsumerDispatcherWhenSeek").get();
        Assert.assertNotNull(persistentTopic);
        Assert.assertEquals(persistentTopic.getSubscriptions().size(), 1L);
        List consumers = ((PersistentSubscription) persistentTopic.getSubscriptions().get("my-subscription")).getConsumers();
        Assert.assertEquals(consumers.size(), 2);
        HashSet hashSet = new HashSet();
        Iterator it = consumers.iterator();
        while (it.hasNext()) {
            hashSet.add(((Consumer) it.next()).getStats().getConnectedSince());
        }
        Assert.assertEquals(hashSet.size(), 2);
        subscribe.seek(MessageId.earliest);
        Thread.sleep(1000L);
        List consumers2 = ((PersistentSubscription) persistentTopic.getSubscriptions().get("my-subscription")).getConsumers();
        Assert.assertEquals(consumers2.size(), 2);
        Iterator it2 = consumers2.iterator();
        while (it2.hasNext()) {
            Assert.assertFalse(hashSet.contains(((Consumer) it2.next()).getStats().getConnectedSince()));
        }
    }

    @Test
    public void testOnlyCloseActiveConsumerForSingleActiveConsumerDispatcherWhenSeek() throws Exception {
        Consumer<byte[]> subscribe = this.pulsarClient.newConsumer().topic("persistent://prop/use/ns-abc/testOnlyCloseActiveConsumerForSingleActiveConsumerDispatcherWhenSeek").subscriptionType(SubscriptionType.Failover).subscriptionName("my-subscription").subscribe();
        this.pulsarClient.newConsumer().topic("persistent://prop/use/ns-abc/testOnlyCloseActiveConsumerForSingleActiveConsumerDispatcherWhenSeek").subscriptionType(SubscriptionType.Failover).subscriptionName("my-subscription").subscribe();
        PersistentTopic persistentTopic = (PersistentTopic) this.pulsar.getBrokerService().getTopicReference("persistent://prop/use/ns-abc/testOnlyCloseActiveConsumerForSingleActiveConsumerDispatcherWhenSeek").get();
        Assert.assertNotNull(persistentTopic);
        Assert.assertEquals(persistentTopic.getSubscriptions().size(), 1L);
        List consumers = ((PersistentSubscription) persistentTopic.getSubscriptions().get("my-subscription")).getConsumers();
        Assert.assertEquals(consumers.size(), 2);
        HashSet hashSet = new HashSet();
        Iterator it = consumers.iterator();
        while (it.hasNext()) {
            hashSet.add(((Consumer) it.next()).getStats().getConnectedSince());
        }
        Assert.assertEquals(hashSet.size(), 2);
        subscribe.seek(MessageId.earliest);
        Thread.sleep(1000L);
        List consumers2 = ((PersistentSubscription) persistentTopic.getSubscriptions().get("my-subscription")).getConsumers();
        Assert.assertEquals(consumers2.size(), 2);
        boolean z = false;
        Iterator it2 = consumers2.iterator();
        while (it2.hasNext()) {
            if (hashSet.contains(((Consumer) it2.next()).getStats().getConnectedSince())) {
                z = true;
            }
        }
        Assert.assertTrue(z);
    }
}
