package org.apache.pulsar.client.api;

import java.lang.reflect.Field;
import java.lang.reflect.Method;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Optional;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.bookkeeper.client.LedgerHandle;
import org.apache.bookkeeper.mledger.impl.ManagedCursorImpl;
import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl;
import org.apache.bookkeeper.mledger.impl.PositionImpl;
import org.apache.pulsar.broker.BrokerTestUtil;
import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.broker.service.BrokerService;
import org.apache.pulsar.broker.service.PulsarChannelInitializer;
import org.apache.pulsar.broker.service.ServerCnx;
import org.apache.pulsar.broker.service.nonpersistent.NonPersistentSubscription;
import org.apache.pulsar.broker.service.nonpersistent.NonPersistentTopic;
import org.apache.pulsar.broker.service.persistent.PersistentTopic;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.impl.ConsumerImpl;
import org.apache.pulsar.client.impl.MessageIdImpl;
import org.apache.pulsar.common.api.proto.CommandFlow;
import org.apache.pulsar.common.policies.data.ManagedLedgerInternalStats;
import org.apache.pulsar.common.policies.data.SubscriptionStats;
import org.awaitility.Awaitility;
import org.awaitility.reflect.WhiteboxImpl;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testng.Assert;
import org.testng.AssertJUnit;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.DataProvider;
import org.testng.annotations.Test;

@Test(groups = {"broker-api"})
/* loaded from: input_file:org/apache/pulsar/client/api/NonDurableSubscriptionTest.class */
public class NonDurableSubscriptionTest extends ProducerConsumerBase {
    private static final Logger log = LoggerFactory.getLogger(NonDurableSubscriptionTest.class);
    private final AtomicInteger numFlow = new AtomicInteger(0);

    /**
     * Per-test setup: disables topic-level policies and the system topic, sets the subscription
     * expiration time to one minute, then delegates to the base-class setup routines.
     *
     * @throws Exception if the base-class setup fails
     */
    @Override // org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest
    @BeforeMethod
    protected void setup() throws Exception {
        // Configuration is applied before delegating to the base-class setup.
        conf.setTopicLevelPoliciesEnabled(false);
        conf.setSystemTopicEnabled(false);
        conf.setSubscriptionExpirationTimeMinutes(1);

        super.internalSetup();
        super.producerBaseSetup();
    }

    /**
     * Per-test teardown; runs even when the test or its setup failed ({@code alwaysRun = true})
     * and delegates to the base-class cleanup.
     *
     * @throws Exception if the base-class cleanup fails
     */
    @Override // org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest
    @AfterMethod(alwaysRun = true)
    protected void cleanup() throws Exception {
        super.internalCleanup();
    }

    /**
     * Installs a channel-initializer factory whose server connections count every handled flow
     * command in {@code numFlow}, after delegating to the default flow handling.
     *
     * @param brokerService the broker service to customize
     * @return the same {@code brokerService} instance that was passed in
     */
    @Override // org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest
    protected BrokerService customizeNewBrokerService(BrokerService brokerService) {
        brokerService.setPulsarChannelInitializerFactory((service, channelOptions) ->
                new PulsarChannelInitializer(service, channelOptions) {
                    protected ServerCnx newServerCnx(PulsarService cnxService, String ignoredName) throws Exception {
                        return new ServerCnx(cnxService) {
                            protected void handleFlow(CommandFlow flow) {
                                // Let the broker process the command first, then record it.
                                super.handleFlow(flow);
                                NonDurableSubscriptionTest.this.numFlow.incrementAndGet();
                            }
                        };
                    }
                });
        return brokerService;
    }

    /**
     * Publishes ten messages to a topic with a non-durable exclusive subscription, receives and
     * acknowledges the first five, closes the consumer's client connection, and then expects the
     * remaining five messages to be delivered in order.
     *
     * <p>Both the producer and the consumer are closed on every exit path, including assertion
     * failures.
     *
     * @throws Exception if any client operation fails
     */
    @Test
    public void testNonDurableSubscription() throws Exception {
        final String topic = "persistent://my-property/my-ns/nonDurable-topic1";
        final int messageCount = 10;
        final int ackedBeforeReconnect = 5;

        try (Producer<String> producer = pulsarClient.newProducer(Schema.STRING).topic(topic).create();
                // The cast is required: subscribe() is declared to return Consumer<String>.
                ConsumerImpl<String> consumer = (ConsumerImpl<String>) pulsarClient.newConsumer(Schema.STRING)
                        .topic(topic)
                        .readCompacted(true)
                        .subscriptionMode(SubscriptionMode.NonDurable)
                        .subscriptionType(SubscriptionType.Exclusive)
                        .subscriptionName("my-nonDurable-subscriber")
                        .subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
                        .subscribe()) {
            for (int i = 0; i < messageCount; i++) {
                producer.send("message" + i);
            }

            for (int i = 0; i < ackedBeforeReconnect; i++) {
                Message<String> message = consumer.receive();
                Assert.assertNotNull(message);
                Assert.assertEquals(message.getValue(), "message" + i);
                consumer.acknowledge(message);
            }

            // Drop the underlying connection; the consumer must keep receiving afterwards.
            consumer.getClientCnx().close();

            for (int i = ackedBeforeReconnect; i < messageCount; i++) {
                Message<String> message = consumer.receive();
                Assert.assertNotNull(message);
                Assert.assertEquals(message.getValue(), "message" + i);
            }
        }
    }

    /**
     * Checks how durable and non-durable subscriptions interact when they share a name.
     *
     * <ol>
     *   <li>A durable subscription is created and its consumer closed; subscribing to the same
     *       name in non-durable mode must then fail with
     *       {@link PulsarClientException.NotAllowedException}.</li>
     *   <li>A non-durable shared subscription is created; subscribing to the same name in durable
     *       mode is attempted, and a {@code NotAllowedException} from that attempt is tolerated
     *       (success is not treated as a failure either).</li>
     * </ol>
     *
     * @throws Exception if any client operation fails unexpectedly
     */
    @Test
    public void testSameSubscriptionNameForDurableAndNonDurableSubscription() throws Exception {
        final String topic = "persistent://my-property/my-ns/same-sub-name-topic";

        // Part 1: durable first, then non-durable with the same name.
        Consumer<String> durableFirst = pulsarClient.newConsumer(Schema.STRING)
                .topic(topic)
                .readCompacted(true)
                .subscriptionMode(SubscriptionMode.Durable)
                .subscriptionType(SubscriptionType.Exclusive)
                .subscriptionName("mix-subscription")
                .subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
                .subscribe();
        durableFirst.close();

        try (Consumer<String> unexpected = pulsarClient.newConsumer(Schema.STRING)
                .topic(topic)
                .readCompacted(true)
                .subscriptionMode(SubscriptionMode.NonDurable)
                .subscriptionType(SubscriptionType.Exclusive)
                .subscriptionName("mix-subscription")
                .subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
                .subscribe()) {
            Assert.fail("should fail since durable subscription already exist.");
        } catch (PulsarClientException.NotAllowedException expected) {
            // Expected: the durable subscription with this name already exists.
        }

        // Part 2: non-durable first, then durable with the same name.
        try (Producer<String> producer = pulsarClient.newProducer(Schema.STRING).topic(topic).create();
                Consumer<String> nonDurableFirst = pulsarClient.newConsumer(Schema.STRING)
                        .topic(topic)
                        .subscriptionMode(SubscriptionMode.NonDurable)
                        .subscriptionType(SubscriptionType.Shared)
                        .subscriptionName("mix-subscription-01")
                        .receiverQueueSize(1)
                        .subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
                        .subscribe()) {
            try (Consumer<String> durableSecond = pulsarClient.newConsumer(Schema.STRING)
                    .topic(topic)
                    .subscriptionMode(SubscriptionMode.Durable)
                    .subscriptionType(SubscriptionType.Shared)
                    .subscriptionName("mix-subscription-01")
                    .receiverQueueSize(1)
                    .startMessageIdInclusive()
                    .subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
                    .subscribe()) {
                // Nothing to verify here; the consumer is closed right away.
            } catch (PulsarClientException.NotAllowedException tolerated) {
                // Tolerated: the broker may reject the durable subscription.
            }
        }
    }

    /**
     * Verifies that {@code checkInactiveSubscriptions()} on a non-persistent topic keeps a
     * subscription while its consumer is connected, and removes it once the consumer is closed
     * and the subscription's {@code lastActive} timestamp has been moved five minutes into the
     * past via reflection.
     *
     * @throws Exception if any admin, client or reflection call fails
     */
    @Test(timeOut = 10000)
    public void testDeleteInactiveNonPersistentSubscription() throws Exception {
        final String subscriptionName = "my-subscriber";
        final String topic = "non-persistent://my-property/my-ns/topic-" + UUID.randomUUID();
        admin.topics().createNonPartitionedTopic(topic);

        Consumer<String> consumer = pulsarClient.newConsumer(Schema.STRING)
                .topic(topic)
                .subscriptionName(subscriptionName)
                .subscribe();

        NonPersistentTopic brokerTopic =
                (NonPersistentTopic) pulsar.getBrokerService().getTopicIfExists(topic).get().get();
        NonPersistentSubscription sub = brokerTopic.getSubscription(subscriptionName);
        Assert.assertNotNull(sub);
        Assert.assertNotNull(sub.getDispatcher());
        AssertJUnit.assertTrue(sub.getDispatcher().isConsumerConnected());
        Assert.assertFalse(sub.isReplicated());

        // With a connected consumer the subscription must survive the inactivity check.
        brokerTopic.checkInactiveSubscriptions();
        Thread.sleep(500L);
        Assert.assertNotNull(brokerTopic.getSubscription(subscriptionName));

        consumer.close();
        Thread.sleep(500L);

        // Backdate the private lastActive field so the subscription looks long inactive.
        Field lastActiveField = NonPersistentSubscription.class.getDeclaredField("lastActive");
        lastActiveField.setAccessible(true);
        long fiveMinutesAgo = System.currentTimeMillis() - TimeUnit.MINUTES.toMillis(5L);
        lastActiveField.set(brokerTopic.getSubscription(subscriptionName), Long.valueOf(fiveMinutesAgo));

        brokerTopic.checkInactiveSubscriptions();
        Thread.sleep(500L);
        AssertJUnit.assertNull(brokerTopic.getSubscription(subscriptionName));
    }

    /* JADX WARN: Type inference failed for: r0v2, types: [java.lang.Object[], java.lang.Object[][]] */
    @DataProvider(name = "subscriptionTypes")
    public static Object[][] subscriptionTypes() {
        ?? r0 = new Object[SubscriptionType.values().length];
        int i = 0;
        for (SubscriptionType subscriptionType : SubscriptionType.values()) {
            int i2 = i;
            i++;
            Object[] objArr = new Object[1];
            objArr[0] = subscriptionType;
            r0[i2] = objArr;
        }
        return r0;
    }

    /**
     * For the given subscription type, publishes fifteen messages to a topic with a non-durable
     * subscription and checks in-order delivery across two disruptions: messages 0-4 are received
     * and acknowledged, the client connection is closed, messages 5-9 are received, the broker is
     * restarted, and messages 10-14 are received.
     *
     * <p>Both the producer and the consumer are closed on every exit path, including assertion
     * failures.
     *
     * @param subscriptionType the subscription type under test (from the data provider)
     * @throws Exception if any client or broker operation fails
     */
    @Test(dataProvider = "subscriptionTypes")
    public void testNonDurableSubscriptionRecovery(SubscriptionType subscriptionType) throws Exception {
        log.info("testing {}", subscriptionType);
        final String topic = "persistent://my-property/my-ns/nonDurable-sub-recorvery-" + subscriptionType;

        try (Producer<String> producer = pulsarClient.newProducer(Schema.STRING).topic(topic).create();
                // The cast is required: subscribe() is declared to return Consumer<String>.
                ConsumerImpl<String> consumer = (ConsumerImpl<String>) pulsarClient.newConsumer(Schema.STRING)
                        .topic(topic)
                        .subscriptionMode(SubscriptionMode.NonDurable)
                        .subscriptionType(subscriptionType)
                        .subscriptionName("my-nonDurable-subscriber")
                        .subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
                        .subscribe()) {
            for (int i = 0; i < 15; i++) {
                producer.send("message" + i);
            }

            for (int i = 0; i < 5; i++) {
                Message<String> message = consumer.receive();
                Assert.assertNotNull(message);
                Assert.assertEquals(message.getValue(), "message" + i);
                consumer.acknowledge(message);
            }

            // First disruption: drop the client connection.
            consumer.getClientCnx().close();

            for (int i = 5; i < 10; i++) {
                Message<String> message = consumer.receive();
                Assert.assertNotNull(message);
                Assert.assertEquals(message.getValue(), "message" + i);
            }

            // Second disruption: restart the broker.
            restartBroker();

            for (int i = 10; i < 15; i++) {
                Message<String> message = consumer.receive();
                Assert.assertNotNull(message);
                Assert.assertEquals(message.getValue(), "message" + i);
            }
        }
    }

    /**
     * Creates a five-partition topic, subscribes with a non-durable subscription, attempts one
     * receive with a one-second timeout, closes the consumer, and expects exactly five flow
     * commands to have been counted in {@code numFlow}.
     *
     * @throws Exception if any admin or client operation fails
     */
    @Test
    public void testFlowCountForMultiTopics() throws Exception {
        final String topic = "persistent://my-property/my-ns/test-flow-count";
        final int partitionCount = 5;
        admin.topics().createPartitionedTopic(topic, partitionCount);

        // Start counting from zero for this test.
        numFlow.set(0);

        Consumer<String> consumer = pulsarClient.newConsumer(Schema.STRING)
                .topic(topic)
                .subscriptionName("my-nonDurable-subscriber")
                .subscriptionMode(SubscriptionMode.NonDurable)
                .subscribe();
        consumer.receive(1, TimeUnit.SECONDS);
        consumer.close();

        Assert.assertEquals(numFlow.get(), partitionCount);
    }

    /**
     * Waits until the broker returns a non-null topic for the given name, then triggers
     * {@code trimConsumedLedgersInBackground} on its managed ledger and blocks until the future
     * passed to that call completes.
     *
     * @param str the topic name
     */
    private void trimLedgers(String str) {
        // Wait for the topic to be available from the broker service.
        Awaitility.await().untilAsserted(() -> {
            Assert.assertNotNull((PersistentTopic) pulsar.getBrokerService().getTopic(str, false).join().get());
        });
        PersistentTopic persistentTopic =
                (PersistentTopic) pulsar.getBrokerService().getTopic(str, false).join().get();
        // Explicit downcast to the implementation type used by this helper.
        ManagedLedgerImpl managedLedger = (ManagedLedgerImpl) persistentTopic.getManagedLedger();
        CompletableFuture<Void> trimTask = new CompletableFuture<>();
        managedLedger.trimConsumedLedgersInBackground(trimTask);
        trimTask.join();
    }

    /**
     * Forces the topic's managed ledger onto a new current ledger by reflectively invoking the
     * non-public {@code ledgerClosed(LedgerHandle)} and {@code createLedgerAfterClosed()} methods
     * of {@link ManagedLedgerImpl}, then waits until the {@code currentLedger} field holds a
     * ledger with a different id.
     *
     * @param str the topic name
     * @throws Exception if the reflective lookup or invocation fails
     */
    private void switchLedgerManually(String str) throws Exception {
        Method ledgerClosed = ManagedLedgerImpl.class.getDeclaredMethod("ledgerClosed", LedgerHandle.class);
        Method createLedgerAfterClosed = ManagedLedgerImpl.class.getDeclaredMethod("createLedgerAfterClosed");
        ledgerClosed.setAccessible(true);
        createLedgerAfterClosed.setAccessible(true);

        // Wait for the topic to be available from the broker service.
        Awaitility.await().untilAsserted(() -> {
            Assert.assertNotNull((PersistentTopic) pulsar.getBrokerService().getTopic(str, false).join().get());
        });
        PersistentTopic persistentTopic =
                (PersistentTopic) pulsar.getBrokerService().getTopic(str, false).join().get();
        // Explicit downcast to the implementation type whose private methods are invoked below.
        ManagedLedgerImpl managedLedger = (ManagedLedgerImpl) persistentTopic.getManagedLedger();

        LedgerHandle previousLedger = (LedgerHandle) WhiteboxImpl.getInternalState(managedLedger, "currentLedger");
        ledgerClosed.invoke(managedLedger, previousLedger);
        createLedgerAfterClosed.invoke(managedLedger);

        // The switch is complete once the current ledger id differs from the previous one.
        Awaitility.await().untilAsserted(() -> {
            LedgerHandle currentLedger = (LedgerHandle) WhiteboxImpl.getInternalState(managedLedger, "currentLedger");
            Assert.assertNotEquals(Long.valueOf(previousLedger.getId()), Long.valueOf(currentLedger.getId()));
        });
    }

    /**
     * Creates a reader from the earliest position, publishes one message, switches and trims
     * ledgers, and then checks that {@code hasMessageAvailable()} agrees with what
     * {@code readNext} actually returns: {@code false} when nothing is read, {@code true} (and
     * payload {@code "1"}) when a message is read.
     *
     * @throws Exception if any admin or client operation fails
     */
    @Test
    public void testHasMessageAvailableIfIncomingQueueNotEmpty() throws Exception {
        final String topic = BrokerTestUtil.newUniqueName("persistent://public/default/tp");
        Reader<String> reader = pulsarClient.newReader(Schema.STRING)
                .topic(topic)
                .receiverQueueSize(1)
                .subscriptionName("non-durable-cursor")
                .startMessageId(MessageIdImpl.earliest)
                .create();
        Producer<String> producer = pulsarClient.newProducer(Schema.STRING).topic(topic).create();
        producer.send("1");

        switchLedgerManually(topic);
        trimLedgers(topic);

        // Sample the flag first, then see what the reader really delivers.
        boolean reportedAvailable = reader.hasMessageAvailable();
        Message<String> message = reader.readNext(2, TimeUnit.SECONDS);
        if (message != null) {
            log.info("receive msg: {}", message.getValue());
            AssertJUnit.assertTrue(reportedAvailable);
            Assert.assertEquals(message.getValue(), "1");
        } else {
            Assert.assertFalse(reportedAvailable);
        }

        // Cleanup.
        reader.close();
        producer.close();
        admin.topics().delete(topic);
    }

    /**
     * Verifies where a reader's cursor read position is initialized for a range of start message
     * ids. Five messages are published with a topic unload after each send, the ledger id of
     * every sent message is recorded, and the managed ledger's {@code currentLedger} is read
     * reflectively. Each case then creates a reader with a specific start id and asserts the
     * ledger id of the resulting cursor read position (the entry id is always expected to be 0).
     *
     * @throws Exception if any admin or client operation fails
     */
    @Test
    public void testInitReaderAtSpecifiedPosition() throws Exception {
        final String topic = BrokerTestUtil.newUniqueName("persistent://public/default/tp");
        admin.topics().createNonPartitionedTopic(topic);
        admin.topics().createSubscription(topic, "s0", MessageId.earliest);

        // Record the ledger id of each message; the topic is unloaded after every send.
        List<Long> ledgers = new ArrayList<>();
        try (Producer<String> producer = pulsarClient.newProducer(Schema.STRING).topic(topic).create()) {
            for (int i = 0; i < 5; i++) {
                MessageIdImpl sent = (MessageIdImpl) producer.send("1");
                ledgers.add(sent.getLedgerId());
                admin.topics().unload(topic);
            }
        }

        PersistentTopic persistentTopic =
                (PersistentTopic) pulsar.getBrokerService().getTopic(topic, false).join().get();
        LedgerHandle currentLedger =
                (LedgerHandle) WhiteboxImpl.getInternalState(persistentTopic.getManagedLedger(), "currentLedger");
        final long currentLedgerId = currentLedger.getId();
        log.info("currentLedger: {}", currentLedgerId);

        final long firstLedgerId = ledgers.get(0);
        final long middleLedgerId = ledgers.get(2);
        final long afterMiddleLedgerId = ledgers.get(3);
        final long lastLedgerId = ledgers.get(ledgers.size() - 1);

        // Start id before the first recorded ledger.
        assertReaderStartsAtLedger(topic, 1, new MessageIdImpl(firstLedgerId - 1, -1L, -1), firstLedgerId);
        assertReaderStartsAtLedger(topic, 2, new MessageIdImpl(firstLedgerId - 1, Long.MAX_VALUE, -1), firstLedgerId);
        // Start id beyond the current ledger.
        assertReaderStartsAtLedger(topic, 3, new MessageIdImpl(currentLedgerId + 1, -1L, -1), currentLedgerId);
        assertReaderStartsAtLedger(topic, 4, new MessageIdImpl(currentLedgerId + 1, Long.MAX_VALUE, -1), currentLedgerId);
        assertReaderStartsAtLedger(topic, 5, new MessageIdImpl(currentLedgerId + 1, Long.MAX_VALUE, -1), currentLedgerId);
        // Start id in the last recorded ledger.
        assertReaderStartsAtLedger(topic, 6, new MessageIdImpl(lastLedgerId, -1L, -1), lastLedgerId);
        assertReaderStartsAtLedger(topic, 7, new MessageIdImpl(lastLedgerId, Long.MAX_VALUE, -1), currentLedgerId);
        // Start id in a middle ledger.
        assertReaderStartsAtLedger(topic, 8, new MessageIdImpl(middleLedgerId, 0L, -1), middleLedgerId);
        assertReaderStartsAtLedger(topic, 9, new MessageIdImpl(middleLedgerId, Long.MAX_VALUE, -1), afterMiddleLedgerId);
        assertReaderStartsAtLedger(topic, 10, new MessageIdImpl(middleLedgerId, 0L, -1), middleLedgerId);

        // Cleanup.
        admin.topics().delete(topic, false);
    }

    /**
     * Creates a reader named {@code "s" + caseNumber} at {@code startMessageId} and asserts that
     * the matching cursor's read position is entry 0 of {@code expectedLedgerId}. The reader is
     * closed afterwards, whether or not the assertions pass.
     *
     * @param topic the topic to read from
     * @param caseNumber the case index, used for the subscription name and log output
     * @param startMessageId the start message id handed to the reader builder
     * @param expectedLedgerId the ledger id the cursor read position is expected to point at
     * @throws Exception if any admin or client operation fails
     */
    private void assertReaderStartsAtLedger(String topic, int caseNumber, MessageId startMessageId,
                                            long expectedLedgerId) throws Exception {
        final String cursorName = "s" + caseNumber;
        log.info("start test {}", cursorName);
        try (Reader<String> reader = pulsarClient.newReader(Schema.STRING)
                .topic(topic)
                .subscriptionName(cursorName)
                .receiverQueueSize(0)
                .startMessageId(startMessageId)
                .create()) {
            ManagedLedgerInternalStats.CursorStats cursorStats =
                    admin.topics().getInternalStats(topic).cursors.get(cursorName);
            log.info("cursor{} readPosition: {}, markDeletedPosition: {}", caseNumber,
                    cursorStats.readPosition, cursorStats.markDeletePosition);
            PositionImpl readPosition = parseReadPosition(cursorStats);
            Assert.assertEquals(readPosition.getLedgerId(), expectedLedgerId);
            Assert.assertEquals(readPosition.getEntryId(), 0L);
        }
    }

    /**
     * Parses the {@code readPosition} string of the given cursor stats, splitting it on
     * {@code ":"} and using the first two parts as ledger id and entry id.
     *
     * @param cursorStats cursor stats whose {@code readPosition} is parsed
     * @return the parsed position
     */
    private PositionImpl parseReadPosition(ManagedLedgerInternalStats.CursorStats cursorStats) {
        String[] parts = cursorStats.readPosition.split(":");
        long ledgerId = Long.parseLong(parts[0]);
        long entryId = Long.parseLong(parts[1]);
        return PositionImpl.get(ledgerId, entryId);
    }

    /**
     * Publishes five messages, unloads the topic and trims ledgers, then asserts that only one
     * ledger remains and that it is not the ledger of the last sent message. A reader started at
     * the id of the fourth message is then expected to read nothing, to report a backlog of zero,
     * and to have a mark-delete position at or after the last sent message.
     *
     * <p>The producer and the reader are closed on every exit path.
     *
     * @throws Exception if any admin or client operation fails
     */
    @Test
    public void testReaderInitAtDeletedPosition() throws Exception {
        final String topic = BrokerTestUtil.newUniqueName("persistent://public/default/tp");
        admin.topics().createNonPartitionedTopic(topic);

        try (Producer<String> producer = pulsarClient.newProducer(Schema.STRING).topic(topic).create()) {
            producer.send("1");
            producer.send("2");
            producer.send("3");
            // send() is declared to return MessageId; ledger/entry ids need the implementation type.
            final MessageIdImpl fourthMessageId = (MessageIdImpl) producer.send("4");
            final MessageIdImpl lastMessageId = (MessageIdImpl) producer.send("5");

            admin.topics().unload(topic);
            trimLedgers(topic);
            List<ManagedLedgerInternalStats.LedgerInfo> ledgers = admin.topics().getInternalStats(topic).ledgers;
            Assert.assertEquals(ledgers.size(), 1);
            Assert.assertNotEquals(Long.valueOf(ledgers.get(0).ledgerId), Long.valueOf(lastMessageId.getLedgerId()));

            // Start a reader at a position whose ledger is no longer listed.
            MessageIdImpl startMessageId =
                    new MessageIdImpl(fourthMessageId.getLedgerId(), fourthMessageId.getEntryId(), -1);
            try (Reader<String> reader = pulsarClient.newReader(Schema.STRING)
                    .topic(topic)
                    .subscriptionName("s1")
                    .startMessageId(startMessageId)
                    .create()) {
                Assert.assertNull(reader.readNext(2, TimeUnit.SECONDS));

                Awaitility.await().untilAsserted(() -> {
                    SubscriptionStats subscriptionStats =
                            admin.topics().getStats(topic, true, true, true).getSubscriptions().get("s1");
                    log.info("backlog size: {}", subscriptionStats.getMsgBacklog());
                    Assert.assertEquals(subscriptionStats.getMsgBacklog(), 0L);

                    ManagedLedgerInternalStats.CursorStats cursorStats =
                            admin.topics().getInternalStats(topic).cursors.get("s1");
                    String[] parts = cursorStats.markDeletePosition.split(":");
                    PositionImpl actualMarkDelete =
                            PositionImpl.get(Long.parseLong(parts[0]), Long.parseLong(parts[1]));
                    PositionImpl expectedMarkDelete =
                            PositionImpl.get(lastMessageId.getLedgerId(), lastMessageId.getEntryId());
                    log.info("Expected mark deleted position: {}", expectedMarkDelete);
                    log.info("Actual mark deleted position: {}", cursorStats.markDeletePosition);
                    AssertJUnit.assertTrue(actualMarkDelete.compareTo(expectedMarkDelete) >= 0);
                });
            }
        }

        // Cleanup.
        admin.topics().delete(topic, false);
    }

    /**
     * Exercises ledger trimming when only a non-durable cursor remains on the topic.
     *
     * <p>A reader (non-durable cursor) and a durable subscription are created, five messages are
     * published and three are read. After unloading the topic and deleting the durable
     * subscription, ledgers are trimmed; the test then asserts that a single ledger remains which
     * is not the ledger of the last sent message, that the reader's backlog is zero and that its
     * mark-delete position is at or after the last sent message. Finally the reader is drained,
     * the entry counters of the managed ledger and cursor are checked before and after two more
     * messages are published, and the cursor's {@code messagesConsumedCounter} is verified before
     * and after those two messages are read.
     *
     * <p>The reader and the producer are closed on every exit path.
     *
     * @throws Exception if any admin or client operation fails
     */
    @Test
    public void testTrimLedgerIfNoDurableCursor() throws Exception {
        final String nonDurableCursor = "non-durable-cursor";
        final String durableCursor = "durable-cursor";
        final String topic = BrokerTestUtil.newUniqueName("persistent://public/default/tp");
        admin.topics().createNonPartitionedTopic(topic);

        try (Reader<String> reader = pulsarClient.newReader(Schema.STRING)
                .topic(topic)
                .receiverQueueSize(1)
                .subscriptionName(nonDurableCursor)
                .startMessageId(MessageIdImpl.earliest)
                .create()) {
            // Create the durable subscription and immediately disconnect its consumer.
            pulsarClient.newConsumer(Schema.STRING)
                    .topic(topic)
                    .receiverQueueSize(1)
                    .subscriptionName(durableCursor)
                    .subscribe()
                    .close();

            try (Producer<String> producer = pulsarClient.newProducer(Schema.STRING).topic(topic).create()) {
                producer.send("1");
                producer.send("2");
                producer.send("3");
                producer.send("4");
                // send() is declared to return MessageId; ledger/entry ids need the implementation type.
                final MessageIdImpl lastMessageId = (MessageIdImpl) producer.send("5");

                Assert.assertEquals(reader.readNext(2, TimeUnit.SECONDS).getValue(), "1");
                Assert.assertEquals(reader.readNext(2, TimeUnit.SECONDS).getValue(), "2");
                Assert.assertEquals(reader.readNext(2, TimeUnit.SECONDS).getValue(), "3");

                admin.topics().unload(topic);
                Thread.sleep(3000L);

                // Remove the only durable cursor, then trim.
                admin.topics().deleteSubscription(topic, durableCursor);
                trimLedgers(topic);
                List<ManagedLedgerInternalStats.LedgerInfo> ledgers = admin.topics().getInternalStats(topic).ledgers;
                Assert.assertEquals(ledgers.size(), 1);
                Assert.assertNotEquals(Long.valueOf(ledgers.get(0).ledgerId),
                        Long.valueOf(lastMessageId.getLedgerId()));

                Awaitility.await().untilAsserted(() -> {
                    SubscriptionStats subscriptionStats =
                            admin.topics().getStats(topic, true, true, true).getSubscriptions().get(nonDurableCursor);
                    log.info("backlog size: {}", subscriptionStats.getMsgBacklog());
                    Assert.assertEquals(subscriptionStats.getMsgBacklog(), 0L);

                    ManagedLedgerInternalStats.CursorStats cursorStats =
                            admin.topics().getInternalStats(topic).cursors.get(nonDurableCursor);
                    String[] parts = cursorStats.markDeletePosition.split(":");
                    PositionImpl actualMarkDelete =
                            PositionImpl.get(Long.parseLong(parts[0]), Long.parseLong(parts[1]));
                    PositionImpl expectedMarkDelete =
                            PositionImpl.get(lastMessageId.getLedgerId(), lastMessageId.getEntryId());
                    log.info("Expected mark deleted position: {}", expectedMarkDelete);
                    log.info("Actual mark deleted position: {}", cursorStats.markDeletePosition);
                    Assert.assertTrue(actualMarkDelete.compareTo(expectedMarkDelete) >= 0);
                });

                // Drain whatever the reader still has buffered before checking the counters.
                for (Message<String> leftover = reader.readNext(2, TimeUnit.SECONDS);
                        leftover != null;
                        leftover = reader.readNext(2, TimeUnit.SECONDS)) {
                    log.info("clear msg: {}", leftover.getValue());
                }

                PersistentTopic persistentTopic =
                        (PersistentTopic) pulsar.getBrokerService().getTopic(topic, false).join().get();
                // Explicit downcasts to the implementation types whose counters are inspected.
                final ManagedLedgerImpl managedLedger = (ManagedLedgerImpl) persistentTopic.getManagedLedger();
                final ManagedCursorImpl cursor = (ManagedCursorImpl) managedLedger.getCursors().get(nonDurableCursor);
                Assert.assertEquals(cursor.getNumberOfEntries(), 0L);
                Assert.assertEquals(managedLedger.getNumberOfEntries(), 0L);

                producer.send("6");
                producer.send("7");
                Awaitility.await().untilAsserted(() -> {
                    Assert.assertEquals(managedLedger.getNumberOfEntries(), 2L);
                    Assert.assertEquals(cursor.getNumberOfEntries(), 1L);
                });

                // Nothing has been consumed through this cursor yet according to the internal stats.
                Assert.assertEquals(
                        admin.topics().getInternalStats(topic).cursors.get(nonDurableCursor).messagesConsumedCounter,
                        0L);
                Assert.assertEquals(reader.readNext(2, TimeUnit.SECONDS).getValue(), "6");
                Assert.assertEquals(reader.readNext(2, TimeUnit.SECONDS).getValue(), "7");
                Awaitility.await().untilAsserted(() -> {
                    Assert.assertEquals(
                            admin.topics().getInternalStats(topic).cursors.get(nonDurableCursor).messagesConsumedCounter,
                            2L);
                });
            }
        }

        // Cleanup.
        admin.topics().delete(topic, false);
    }
}
