/*
 * Decompiled with CFR 0.152.
 */
package org.apache.pulsar.broker.service;

import com.google.common.collect.Lists;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import org.apache.bookkeeper.mledger.impl.PositionImpl;
import org.apache.pulsar.broker.service.BrokerTestBase;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.common.policies.data.PersistentTopicInternalStats;
import org.apache.pulsar.common.util.FutureUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testng.Assert;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;

/**
 * Broker test checking that message TTL expiry still advances a subscription's
 * mark-delete position after the topic has been unloaded.
 *
 * <p>The broker is started with a default TTL of one second and with deletion of
 * inactive topics disabled (see {@link #setup()}).
 */
@Test(groups = "broker")
public class MessageTTLTest
extends BrokerTestBase {
    private static final Logger log = LoggerFactory.getLogger(MessageTTLTest.class);

    /**
     * Ledger id the assertions expect the topic's messages to be stored in.
     * NOTE(review): hard-coded; presumably this is the ledger id a freshly started
     * test broker hands out for this topic - confirm, or derive it from the id of
     * the first published message instead.
     */
    private static final long EXPECTED_LEDGER_ID = 3L;

    /**
     * Configures a one-second default TTL, disables deletion of inactive topics,
     * and then runs the base-class setup.
     */
    @Override
    @BeforeClass
    protected void setup() throws Exception {
        this.conf.setTtlDurationDefaultInSeconds(1);
        this.conf.setBrokerDeleteInactiveTopicsEnabled(false);
        super.baseSetup();
    }

    /** Delegates to the base-class cleanup; runs even if the test failed. */
    @Override
    @AfterClass(alwaysRun = true)
    protected void cleanup() throws Exception {
        super.internalCleanup();
    }

    /**
     * Publishes messages to a topic that has an idle subscription, unloads the topic,
     * waits for twice the configured TTL, triggers the expiry check, and asserts that
     * the subscription's mark-delete position moved from entry {@code -1} to the last
     * published entry.
     *
     * @throws Exception if any client, admin, or wait operation fails
     */
    @Test
    public void testMessageExpiryAfterTopicUnload() throws Exception {
        final int numMsgs = 50;
        final String topicName = "persistent://prop/ns-abc/testttl";
        final String subscriptionName = "ttl-sub-1";

        // Subscribe and immediately close so the subscription exists but consumes nothing.
        this.pulsarClient.newConsumer()
                .topic(topicName)
                .subscriptionName(subscriptionName)
                .subscribe()
                .close();

        // try-with-resources closes the producer even when a send fails.
        try (Producer<byte[]> producer = this.pulsarClient.newProducer()
                .topic(topicName)
                .enableBatching(false)
                .create()) {
            List<CompletableFuture<?>> sendFutures = new ArrayList<>(numMsgs);
            for (int i = 0; i < numMsgs; ++i) {
                byte[] message = ("my-message-" + i).getBytes(StandardCharsets.UTF_8);
                sendFutures.add(producer.sendAsync(message));
            }
            // Block until every send has completed; a failed send surfaces here.
            CompletableFuture.allOf(sendFutures.toArray(new CompletableFuture<?>[0])).get();
        }

        this.admin.topics().unload(topicName);
        // Result intentionally discarded.
        // NOTE(review): presumably this call is here to get the topic loaded again
        // after the unload - confirm.
        this.admin.topics().getStats(topicName);

        PersistentTopicInternalStats internalStatsBeforeExpire =
                this.admin.topics().getInternalStats(topicName);
        PersistentTopicInternalStats.CursorStats statsBeforeExpire =
                internalStatsBeforeExpire.cursors.get(subscriptionName);
        log.info("markDeletePosition before expire {}", statsBeforeExpire.markDeletePosition);
        Assert.assertEquals(statsBeforeExpire.markDeletePosition,
                PositionImpl.get(EXPECTED_LEDGER_ID, -1L).toString());

        // Sleep for twice the configured TTL (seconds -> milliseconds, times two).
        Thread.sleep(2000L * this.conf.getTtlDurationDefaultInSeconds());
        log.info("***** run message expiry now");
        this.runMessageExpiryCheck();

        PersistentTopicInternalStats internalStatsAfterExpire =
                this.admin.topics().getInternalStats(topicName);
        PersistentTopicInternalStats.CursorStats statsAfterExpire =
                internalStatsAfterExpire.cursors.get(subscriptionName);
        log.info("markDeletePosition after expire {}", statsAfterExpire.markDeletePosition);
        Assert.assertEquals(statsAfterExpire.markDeletePosition,
                PositionImpl.get(EXPECTED_LEDGER_ID, numMsgs - 1).toString());
    }
}

