package org.apache.pulsar.compaction;

import java.util.ArrayList;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl;
import org.apache.bookkeeper.mledger.impl.PositionImpl;
import org.apache.pulsar.broker.BrokerTestUtil;
import org.apache.pulsar.broker.service.Topic;
import org.apache.pulsar.broker.service.persistent.PersistentTopic;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.ProducerBuilder;
import org.apache.pulsar.client.api.ProducerConsumerBase;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.impl.BatchMessageIdImpl;
import org.apache.pulsar.client.impl.MessageIdImpl;
import org.apache.pulsar.client.impl.ReaderImpl;
import org.apache.pulsar.common.util.FutureUtil;
import org.awaitility.Awaitility;
import org.testng.Assert;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.DataProvider;
import org.testng.annotations.Test;

@Test(groups = {"broker-impl"})
/* loaded from: input_file:org/apache/pulsar/compaction/GetLastMessageIdCompactedTest.class */
public class GetLastMessageIdCompactedTest extends ProducerConsumerBase {
    @Override // org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest
    @BeforeClass
    protected void setup() throws Exception {
        super.internalSetup();
        super.producerBaseSetup();
    }

    @Override // org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest
    @AfterClass
    protected void cleanup() throws Exception {
        super.internalCleanup();
    }

    /* JADX INFO: Access modifiers changed from: protected */
    @Override // org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest
    public void doInitConf() throws Exception {
        super.doInitConf();
        this.conf.setBrokerServiceCompactionMonitorIntervalInSeconds(Integer.MAX_VALUE);
        this.conf.setRetentionCheckIntervalInSeconds(Integer.MAX_VALUE);
    }

    private MessageIdImpl getLastMessageIdByTopic(String str) throws Exception {
        return (MessageIdImpl) ((Topic) ((Optional) this.pulsar.getBrokerService().getTopic(str, false).get()).get()).getLastMessageId().get();
    }

    private void triggerCompactionAndWait(String str) throws Exception {
        PersistentTopic persistentTopic = (PersistentTopic) ((Optional) this.pulsar.getBrokerService().getTopic(str, false).get()).get();
        persistentTopic.triggerCompaction();
        Awaitility.await().untilAsserted(() -> {
            PositionImpl lastConfirmedEntry = persistentTopic.getManagedLedger().getLastConfirmedEntry();
            PositionImpl markDeletedPosition = persistentTopic.getSubscription("__compaction").getCursor().getMarkDeletedPosition();
            Assert.assertEquals(markDeletedPosition.getLedgerId(), lastConfirmedEntry.getLedgerId());
            Assert.assertEquals(markDeletedPosition.getEntryId(), lastConfirmedEntry.getEntryId());
        });
    }

    private void triggerLedgerSwitch(String str) throws Exception {
        this.admin.topics().unload(str);
        Awaitility.await().until(() -> {
            CompletableFuture topic = this.pulsar.getBrokerService().getTopic(str, false);
            if (!topic.isDone() || topic.isCompletedExceptionally()) {
                return false;
            }
            Optional optional = (Optional) topic.join();
            if (optional.isPresent()) {
                return Boolean.valueOf(((PersistentTopic) optional.get()).getManagedLedger().getState() == ManagedLedgerImpl.State.LedgerOpened);
            }
            return false;
        });
    }

    private void clearAllTheLedgersOutdated(String str) throws Exception {
        ManagedLedgerImpl managedLedger = ((PersistentTopic) ((Optional) this.pulsar.getBrokerService().getTopic(str, false).get()).get()).getManagedLedger();
        Awaitility.await().atMost(10L, TimeUnit.SECONDS).until(() -> {
            CompletableFuture completableFuture = new CompletableFuture();
            managedLedger.trimConsumedLedgersInBackground(completableFuture);
            completableFuture.join();
            return Boolean.valueOf(managedLedger.getLedgersInfo().size() == 1);
        });
    }

    @Test
    public void testGetLastMessageIdWhenLedgerEmpty() throws Exception {
        String str = "persistent://public/default/" + BrokerTestUtil.newUniqueName("tp");
        Consumer<String> createConsumer = createConsumer(str, "sub");
        MessageIdImpl lastMessageId = createConsumer.getLastMessageId();
        Assert.assertEquals(lastMessageId.getLedgerId(), -1L);
        Assert.assertEquals(lastMessageId.getEntryId(), -1L);
        createConsumer.close();
        this.admin.topics().delete(str, false);
    }

    private Producer<String> createProducer(boolean z, String str) throws Exception {
        ProducerBuilder enableBatching = this.pulsarClient.newProducer(Schema.STRING).topic(str).enableBatching(z);
        if (z) {
            enableBatching.batchingMaxBytes(Integer.MAX_VALUE).batchingMaxPublishDelay(3L, TimeUnit.HOURS).batchingMaxBytes(Integer.MAX_VALUE);
        }
        return enableBatching.create();
    }

    private Consumer<String> createConsumer(String str, String str2) throws Exception {
        return this.pulsarClient.newConsumer(Schema.STRING).topic(new String[]{str}).subscriptionName(str2).receiverQueueSize(1).readCompacted(true).subscribe();
    }

    @Test
    public void testGetLastMessageIdWhenNoNonEmptyLedgerExists() throws Exception {
        String str = "persistent://public/default/" + BrokerTestUtil.newUniqueName("tp");
        ReaderImpl create = this.pulsarClient.newReader(Schema.STRING).topic(str).subscriptionName("sub").receiverQueueSize(1).startMessageId(MessageId.earliest).readCompacted(false).create();
        Producer<String> createProducer = createProducer(false, str);
        createProducer.newMessage().key("k0").value("v0").sendAsync().get();
        create.readNext();
        triggerLedgerSwitch(str);
        clearAllTheLedgersOutdated(str);
        MessageIdImpl lastMessageId = create.getConsumer().getLastMessageId();
        Assert.assertEquals(lastMessageId.getLedgerId(), -1L);
        Assert.assertEquals(lastMessageId.getEntryId(), -1L);
        create.close();
        createProducer.close();
        this.admin.topics().delete(str, false);
    }

    /* JADX WARN: Type inference failed for: r0v1, types: [java.lang.Object[], java.lang.Object[][]] */
    @DataProvider(name = "enabledBatch")
    public Object[][] enabledBatch() {
        return new Object[]{new Object[]{true}, new Object[]{false}};
    }

    @Test(dataProvider = "enabledBatch")
    public void testGetLastMessageIdBeforeCompaction(boolean z) throws Exception {
        String str = "persistent://public/default/" + BrokerTestUtil.newUniqueName("tp");
        Consumer<String> createConsumer = createConsumer(str, "sub");
        Producer<String> createProducer = createProducer(z, str);
        ArrayList arrayList = new ArrayList();
        arrayList.add(createProducer.newMessage().key("k0").value("v0").sendAsync());
        arrayList.add(createProducer.newMessage().key("k0").value("v1").sendAsync());
        arrayList.add(createProducer.newMessage().key("k0").value("v2").sendAsync());
        createProducer.flush();
        arrayList.add(createProducer.newMessage().key("k1").value("v0").sendAsync());
        arrayList.add(createProducer.newMessage().key("k1").value("v1").sendAsync());
        arrayList.add(createProducer.newMessage().key("k1").value("v2").sendAsync());
        createProducer.flush();
        FutureUtil.waitForAll(arrayList).join();
        BatchMessageIdImpl lastMessageIdByTopic = getLastMessageIdByTopic(str);
        BatchMessageIdImpl batchMessageIdImpl = (MessageIdImpl) createConsumer.getLastMessageId();
        Assert.assertEquals(batchMessageIdImpl.getLedgerId(), lastMessageIdByTopic.getLedgerId());
        Assert.assertEquals(batchMessageIdImpl.getEntryId(), lastMessageIdByTopic.getEntryId());
        if (z) {
            BatchMessageIdImpl batchMessageIdImpl2 = lastMessageIdByTopic;
            BatchMessageIdImpl batchMessageIdImpl3 = batchMessageIdImpl;
            Assert.assertEquals(batchMessageIdImpl3.getBatchSize(), batchMessageIdImpl2.getBatchSize());
            Assert.assertEquals(batchMessageIdImpl3.getBatchIndex(), batchMessageIdImpl2.getBatchIndex());
        }
        createConsumer.close();
        createProducer.close();
        this.admin.topics().delete(str, false);
    }

    @Test(dataProvider = "enabledBatch")
    public void testGetLastMessageIdAfterCompaction(boolean z) throws Exception {
        String str = "persistent://public/default/" + BrokerTestUtil.newUniqueName("tp");
        Consumer<String> createConsumer = createConsumer(str, "sub");
        Producer<String> createProducer = createProducer(z, str);
        ArrayList arrayList = new ArrayList();
        arrayList.add(createProducer.newMessage().key("k0").value("v0").sendAsync());
        arrayList.add(createProducer.newMessage().key("k0").value("v1").sendAsync());
        arrayList.add(createProducer.newMessage().key("k0").value("v2").sendAsync());
        createProducer.flush();
        arrayList.add(createProducer.newMessage().key("k1").value("v0").sendAsync());
        arrayList.add(createProducer.newMessage().key("k1").value("v1").sendAsync());
        arrayList.add(createProducer.newMessage().key("k1").value("v2").sendAsync());
        createProducer.flush();
        FutureUtil.waitForAll(arrayList).join();
        triggerCompactionAndWait(str);
        BatchMessageIdImpl lastMessageIdByTopic = getLastMessageIdByTopic(str);
        BatchMessageIdImpl batchMessageIdImpl = (MessageIdImpl) createConsumer.getLastMessageId();
        Assert.assertEquals(batchMessageIdImpl.getLedgerId(), lastMessageIdByTopic.getLedgerId());
        Assert.assertEquals(batchMessageIdImpl.getEntryId(), lastMessageIdByTopic.getEntryId());
        if (z) {
            BatchMessageIdImpl batchMessageIdImpl2 = lastMessageIdByTopic;
            BatchMessageIdImpl batchMessageIdImpl3 = batchMessageIdImpl;
            Assert.assertEquals(batchMessageIdImpl3.getBatchSize(), batchMessageIdImpl2.getBatchSize());
            Assert.assertEquals(batchMessageIdImpl3.getBatchIndex(), batchMessageIdImpl2.getBatchIndex());
        }
        createConsumer.close();
        createProducer.close();
        this.admin.topics().delete(str, false);
    }

    @Test(dataProvider = "enabledBatch")
    public void testGetLastMessageIdAfterCompactionEndWithNullMsg(boolean z) throws Exception {
        String str = "persistent://public/default/" + BrokerTestUtil.newUniqueName("tp");
        Consumer<String> createConsumer = createConsumer(str, "sub");
        Producer<String> createProducer = createProducer(z, str);
        ArrayList arrayList = new ArrayList();
        arrayList.add(createProducer.newMessage().key("k0").value("v0").sendAsync());
        arrayList.add(createProducer.newMessage().key("k0").value("v1").sendAsync());
        arrayList.add(createProducer.newMessage().key("k0").value("v2").sendAsync());
        createProducer.flush();
        arrayList.add(createProducer.newMessage().key("k1").value("v0").sendAsync());
        arrayList.add(createProducer.newMessage().key("k1").value("v1").sendAsync());
        arrayList.add(createProducer.newMessage().key("k1").value((Object) null).sendAsync());
        arrayList.add(createProducer.newMessage().key("k2").value("v0").sendAsync());
        arrayList.add(createProducer.newMessage().key("k2").value("v1").sendAsync());
        arrayList.add(createProducer.newMessage().key("k2").value((Object) null).sendAsync());
        createProducer.flush();
        FutureUtil.waitForAll(arrayList).join();
        triggerCompactionAndWait(str);
        BatchMessageIdImpl batchMessageIdImpl = (MessageIdImpl) ((CompletableFuture) arrayList.get(2)).get();
        BatchMessageIdImpl batchMessageIdImpl2 = (MessageIdImpl) createConsumer.getLastMessageId();
        Assert.assertEquals(batchMessageIdImpl2.getLedgerId(), batchMessageIdImpl.getLedgerId());
        Assert.assertEquals(batchMessageIdImpl2.getEntryId(), batchMessageIdImpl.getEntryId());
        if (z) {
            BatchMessageIdImpl batchMessageIdImpl3 = batchMessageIdImpl;
            BatchMessageIdImpl batchMessageIdImpl4 = batchMessageIdImpl2;
            Assert.assertEquals(batchMessageIdImpl4.getBatchSize(), batchMessageIdImpl3.getBatchSize());
            Assert.assertEquals(batchMessageIdImpl4.getBatchIndex(), batchMessageIdImpl3.getBatchIndex());
        }
        createConsumer.close();
        createProducer.close();
        this.admin.topics().delete(str, false);
    }

    @Test(dataProvider = "enabledBatch")
    public void testGetLastMessageIdAfterCompactionEndWithNullMsg2(boolean z) throws Exception {
        String str = "persistent://public/default/" + BrokerTestUtil.newUniqueName("tp");
        Consumer<String> createConsumer = createConsumer(str, "sub");
        Producer<String> createProducer = createProducer(z, str);
        ArrayList arrayList = new ArrayList();
        arrayList.add(createProducer.newMessage().key("k0").value("v0").sendAsync());
        arrayList.add(createProducer.newMessage().key("k0").value("v1").sendAsync());
        createProducer.flush();
        arrayList.add(createProducer.newMessage().key("k1").value("v0").sendAsync());
        arrayList.add(createProducer.newMessage().key("k1").value("v1").sendAsync());
        arrayList.add(createProducer.newMessage().key("k1").value("v2").sendAsync());
        arrayList.add(createProducer.newMessage().key("k2").value("v0").sendAsync());
        arrayList.add(createProducer.newMessage().key("k2").value("v1").sendAsync());
        arrayList.add(createProducer.newMessage().key("k2").value((Object) null).sendAsync());
        createProducer.flush();
        FutureUtil.waitForAll(arrayList).join();
        triggerCompactionAndWait(str);
        BatchMessageIdImpl batchMessageIdImpl = (MessageIdImpl) ((CompletableFuture) arrayList.get(4)).get();
        BatchMessageIdImpl batchMessageIdImpl2 = (MessageIdImpl) createConsumer.getLastMessageId();
        Assert.assertEquals(batchMessageIdImpl2.getLedgerId(), batchMessageIdImpl.getLedgerId());
        Assert.assertEquals(batchMessageIdImpl2.getEntryId(), batchMessageIdImpl.getEntryId());
        if (z) {
            BatchMessageIdImpl batchMessageIdImpl3 = batchMessageIdImpl;
            BatchMessageIdImpl batchMessageIdImpl4 = batchMessageIdImpl2;
            Assert.assertEquals(batchMessageIdImpl4.getBatchSize(), batchMessageIdImpl3.getBatchSize());
            Assert.assertEquals(batchMessageIdImpl4.getBatchIndex(), batchMessageIdImpl3.getBatchIndex());
        }
        createConsumer.close();
        createProducer.close();
        this.admin.topics().delete(str, false);
    }

    @Test(dataProvider = "enabledBatch")
    public void testGetLastMessageIdAfterCompactionAllNullMsg(boolean z) throws Exception {
        String str = "persistent://public/default/" + BrokerTestUtil.newUniqueName("tp");
        Consumer<String> createConsumer = createConsumer(str, "sub");
        Producer<String> createProducer = createProducer(z, str);
        ArrayList arrayList = new ArrayList();
        arrayList.add(createProducer.newMessage().key("k0").value("v0").sendAsync());
        arrayList.add(createProducer.newMessage().key("k0").value((Object) null).sendAsync());
        createProducer.flush();
        arrayList.add(createProducer.newMessage().key("k1").value("v0").sendAsync());
        arrayList.add(createProducer.newMessage().key("k1").value((Object) null).sendAsync());
        arrayList.add(createProducer.newMessage().key("k2").value("v0").sendAsync());
        arrayList.add(createProducer.newMessage().key("k2").value((Object) null).sendAsync());
        createProducer.flush();
        FutureUtil.waitForAll(arrayList).join();
        triggerCompactionAndWait(str);
        MessageIdImpl lastMessageId = createConsumer.getLastMessageId();
        Assert.assertFalse(lastMessageId instanceof BatchMessageIdImpl);
        Assert.assertEquals(lastMessageId.getLedgerId(), -1L);
        Assert.assertEquals(lastMessageId.getEntryId(), -1L);
        createConsumer.close();
        createProducer.close();
        this.admin.topics().delete(str, false);
    }
}
