package org.apache.pulsar.broker.service;

import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Optional;
import java.util.Set;
import java.util.TreeSet;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import org.apache.bookkeeper.mledger.PositionFactory;
import org.apache.pulsar.broker.service.persistent.PersistentSubscription;
import org.apache.pulsar.broker.service.persistent.PersistentTopic;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.impl.MessageIdImpl;
import org.apache.pulsar.common.policies.data.ManagedLedgerInternalStats;
import org.apache.pulsar.common.policies.data.Policies;
import org.apache.pulsar.common.policies.data.TopicPolicies;
import org.apache.pulsar.common.util.FutureUtil;
import org.awaitility.Awaitility;
import org.mockito.Mockito;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testng.Assert;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;

/**
 * Broker tests for message time-to-live (TTL) handling.
 *
 * <p>The class-level setup starts the broker with a default TTL of one second and with
 * inactive-topic deletion disabled. The two {@code __SUB_FILTER} tests restart the broker
 * with a five second default TTL so that they can observe the backlog both before and after
 * the TTL elapses.
 */
@Test(groups = {"broker"})
public class MessageTTLTest extends BrokerTestBase {
    private static final Logger log = LoggerFactory.getLogger(MessageTTLTest.class);

    private static final String NAMESPACE = "prop/ns-abc";
    private static final String TOPIC_PREFIX = "persistent://" + NAMESPACE + "/";

    /** Subscription name shared by the "ignored subscription" tests. */
    private static final String FILTERED_SUBSCRIPTION = "__SUB_FILTER";

    /** Default TTL, in seconds, used by the tests that restart the broker. */
    private static final int RESTART_TTL_SECONDS = 5;

    /** Number of messages published by the tests that restart the broker. */
    private static final int BACKLOG_MESSAGES = 10;

    /**
     * Starts the broker with a one second default TTL and inactive-topic deletion disabled.
     *
     * @throws Exception if the base setup fails
     */
    @Override
    @BeforeClass
    protected void setup() throws Exception {
        this.conf.setTtlDurationDefaultInSeconds(1);
        this.conf.setBrokerDeleteInactiveTopicsEnabled(false);
        super.baseSetup();
    }

    /**
     * Tears down the broker started by {@link #setup()} (or by a test that restarted it).
     *
     * @throws Exception if the base cleanup fails
     */
    @Override
    @AfterClass(alwaysRun = true)
    protected void cleanup() throws Exception {
        super.internalCleanup();
    }

    /**
     * Publishes messages, unloads the topic, and verifies that the expiry check still moves
     * the subscription's mark-delete position up to the last published message.
     *
     * @throws Exception on any client, admin or assertion failure
     */
    @Test
    public void testMessageExpiryAfterTopicUnload() throws Exception {
        final String topicName = TOPIC_PREFIX + "testttl";
        final String subscriptionName = "ttl-sub-1";
        final int numMessages = 50;

        // Create the subscription up front so the published messages form its backlog.
        this.pulsarClient.newConsumer().topic(topicName).subscriptionName(subscriptionName).subscribe().close();

        final MessageIdImpl firstMessageId;
        final MessageIdImpl lastMessageId;
        // try-with-resources closes the producer even when a send or a wait fails.
        try (Producer<byte[]> producer =
                this.pulsarClient.newProducer().topic(topicName).enableBatching(false).create()) {
            List<CompletableFuture<MessageId>> sendFutures = new ArrayList<>(numMessages);
            for (int i = 0; i < numMessages; i++) {
                sendFutures.add(producer.sendAsync(("my-message-" + i).getBytes()));
            }
            FutureUtil.waitForAll(sendFutures).get();
            firstMessageId = (MessageIdImpl) sendFutures.get(0).get();
            lastMessageId = (MessageIdImpl) sendFutures.get(sendFutures.size() - 1).get();
        }

        this.admin.topics().unload(topicName);
        // NOTE(review): the stats result is unused; presumably this call is here to make the
        // broker load the topic again after the unload -- confirm before removing.
        this.admin.topics().getStats(topicName);

        String positionBeforeExpiry = markDeletePosition(topicName, subscriptionName);
        log.info("markDeletePosition before expire {}", positionBeforeExpiry);
        Assert.assertEquals(positionBeforeExpiry,
                PositionFactory.create(firstMessageId.getLedgerId(), -1L).toString());

        String expectedPositionAfterExpiry =
                PositionFactory.create(lastMessageId.getLedgerId(), lastMessageId.getEntryId()).toString();
        Awaitility.await().timeout(30L, TimeUnit.SECONDS).pollDelay(3L, TimeUnit.SECONDS).untilAsserted(() -> {
            runMessageExpiryCheck();
            log.info("***** run message expiry now");
            String positionAfterExpiry = markDeletePosition(topicName, subscriptionName);
            log.info("markDeletePosition after expire {}", positionAfterExpiry);
            Assert.assertEquals(positionAfterExpiry, expectedPositionAfterExpiry);
        });
    }

    /**
     * Verifies that both a namespace policy update and a topic policy update invoke
     * {@code checkMessageExpiry()} on the topic.
     *
     * @throws Exception on any client, admin or assertion failure
     */
    @Test
    public void testTTLPoliciesUpdate() throws Exception {
        final String topicName = TOPIC_PREFIX + "testTTLPoliciesUpdate";

        // The producer exists only to make the broker create the topic; it is closed on exit.
        try (Producer<byte[]> producer = this.pulsarClient.newProducer().topic(topicName).create()) {
            Topic topic = this.pulsar.getBrokerService().getTopicReference(topicName)
                    .orElseThrow(() -> new AssertionError("Topic reference not found for " + topicName));
            PersistentTopic topicSpy = Mockito.spy((PersistentTopic) topic);

            // Start from the namespace's current policies and change only the TTL.
            Policies policies = this.admin.namespaces().getPolicies(NAMESPACE);
            policies.message_ttl_in_seconds = 10;
            topicSpy.onPoliciesUpdate(policies);
            Mockito.verify(topicSpy, Mockito.times(1)).checkMessageExpiry();

            TopicPolicies topicPolicies = new TopicPolicies();
            topicPolicies.setMessageTTLInSeconds(5);
            topicSpy.onUpdate(topicPolicies);
            Mockito.verify(topicSpy, Mockito.times(2)).checkMessageExpiry();
        }
    }

    /**
     * Restarts the broker with {@code __SUB_FILTER} registered as an additional system cursor
     * name and verifies that the subscription's backlog is still intact after the TTL elapsed.
     *
     * @throws Exception on any client, admin or assertion failure
     */
    @Test
    public void testTtlFilteredByIgnoreSubscriptions() throws Exception {
        cleanup();
        Set<String> ignoredSubscriptions = new HashSet<>();
        ignoredSubscriptions.add(FILTERED_SUBSCRIPTION);
        this.conf.setAdditionalSystemCursorNames(ignoredSubscriptions);
        this.conf.setTtlDurationDefaultInSeconds(RESTART_TTL_SECONDS);
        try {
            super.baseSetup();
            PersistentSubscription subscription =
                    publishAndRunExpiry(TOPIC_PREFIX + "testTTLFilteredByIgnoreSubscriptions");
            // Nothing may have expired: the subscription is on the ignore list.
            Assert.assertEquals(subscription.getNumberOfEntriesInBacklog(false), (long) BACKLOG_MESSAGES);
        } finally {
            // Restore the setting even when an assertion fails so it cannot leak into
            // tests that run afterwards.
            this.conf.setAdditionalSystemCursorNames(new TreeSet<>());
        }
    }

    /**
     * Restarts the broker without any additional system cursor names and verifies that the
     * {@code __SUB_FILTER} subscription's backlog is emptied once the TTL elapsed.
     *
     * @throws Exception on any client, admin or assertion failure
     */
    @Test
    public void testTtlWithoutIgnoreSubscriptions() throws Exception {
        cleanup();
        this.conf.setTtlDurationDefaultInSeconds(RESTART_TTL_SECONDS);
        this.conf.setBrokerDeleteInactiveTopicsEnabled(false);
        super.baseSetup();
        PersistentSubscription subscription =
                publishAndRunExpiry(TOPIC_PREFIX + "testTTLWithoutIgnoreSubscriptions");
        Assert.assertEquals(subscription.getNumberOfEntriesInBacklog(false), 0L);
    }

    /** Returns the mark-delete position that the topic's internal stats report for a cursor. */
    private String markDeletePosition(String topicName, String subscriptionName) throws Exception {
        ManagedLedgerInternalStats.CursorStats cursorStats =
                this.admin.topics().getInternalStats(topicName).cursors.get(subscriptionName);
        return cursorStats.markDeletePosition;
    }

    /** Sends {@code count} unbatched string messages to the topic and closes the producer. */
    private void publishStringMessages(String topicName, int count) throws Exception {
        try (Producer<String> producer = this.pulsarClient.newProducer(Schema.STRING)
                .enableBatching(false).topic(topicName).create()) {
            for (int i = 0; i < count; i++) {
                producer.send("my-message-" + i);
            }
        }
    }

    /**
     * Creates the {@code __SUB_FILTER} subscription, publishes the backlog, and runs the expiry
     * check once before and once after the TTL elapsed, returning the subscription for the
     * caller's final assertion.
     */
    private PersistentSubscription publishAndRunExpiry(String topicName) throws Exception {
        this.pulsarClient.newConsumer(Schema.STRING).topic(topicName)
                .subscriptionName(FILTERED_SUBSCRIPTION).subscribe().close();
        publishStringMessages(topicName, BACKLOG_MESSAGES);

        Optional<Topic> topicReference = this.pulsar.getBrokerService().getTopicReference(topicName);
        Assert.assertTrue(topicReference.isPresent());
        Topic topic = topicReference.get();
        PersistentSubscription subscription =
                (PersistentSubscription) topic.getSubscription(FILTERED_SUBSCRIPTION);

        // One second short of the TTL: an expiry run must not remove anything yet.
        Thread.sleep((RESTART_TTL_SECONDS - 1) * 1000L);
        topic.checkMessageExpiry();
        Thread.sleep(1000L);
        Assert.assertEquals(subscription.getNumberOfEntriesInBacklog(false), (long) BACKLOG_MESSAGES);

        // Now past the TTL: run expiry again and give it a bounded amount of time to finish.
        Thread.sleep(2000L);
        topic.checkMessageExpiry();
        retryStrategically(ignored -> subscription.getNumberOfEntriesInBacklog(false) == 0, 5, 200L);
        return subscription;
    }
}
