/*
 * Decompiled with CFR 0.152.
 */
package org.apache.pulsar.broker.service;

import java.util.Collections;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import org.apache.bookkeeper.mledger.ManagedLedgerConfig;
import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl;
import org.apache.pulsar.broker.service.BrokerTestBase;
import org.apache.pulsar.broker.service.Topic;
import org.apache.pulsar.broker.service.persistent.PersistentTopic;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.Producer;
import org.awaitility.Awaitility;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testng.Assert;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.Test;

@Test(groups={"broker"})
public class ConsumedLedgersTrimTest
extends BrokerTestBase {
    private static final Logger LOG = LoggerFactory.getLogger(ConsumedLedgersTrimTest.class);

    @Override
    protected void setup() throws Exception {
    }

    @Override
    @AfterMethod(alwaysRun=true)
    protected void cleanup() throws Exception {
        super.internalCleanup();
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test
    public void TestConsumedLedgersTrim() throws Exception {
        this.conf.setRetentionCheckIntervalInSeconds(1);
        super.baseSetup();
        String topicName = "persistent://prop/ns-abc/TestConsumedLedgersTrim";
        String subscriptionName = "my-subscriber-name";
        Producer producer = this.pulsarClient.newProducer().topic("persistent://prop/ns-abc/TestConsumedLedgersTrim").producerName("producer-name").create();
        try {
            Consumer consumer = this.pulsarClient.newConsumer().topic(new String[]{"persistent://prop/ns-abc/TestConsumedLedgersTrim"}).subscriptionName("my-subscriber-name").subscribe();
            try {
                Topic topicRef = (Topic)this.pulsar.getBrokerService().getTopicReference("persistent://prop/ns-abc/TestConsumedLedgersTrim").get();
                Assert.assertNotNull((Object)topicRef);
                PersistentTopic persistentTopic = (PersistentTopic)this.pulsar.getBrokerService().getOrCreateTopic("persistent://prop/ns-abc/TestConsumedLedgersTrim").get();
                ManagedLedgerConfig managedLedgerConfig = persistentTopic.getManagedLedger().getConfig();
                managedLedgerConfig.setRetentionSizeInMB(1L);
                managedLedgerConfig.setRetentionTime(1, TimeUnit.SECONDS);
                managedLedgerConfig.setMaxEntriesPerLedger(2);
                managedLedgerConfig.setMinimumRolloverTime(1, TimeUnit.MILLISECONDS);
                int msgNum = 10;
                for (int i = 0; i < msgNum; ++i) {
                    producer.send((Object)new byte[0x100000]);
                }
                ManagedLedgerImpl managedLedger = (ManagedLedgerImpl)persistentTopic.getManagedLedger();
                Assert.assertEquals((int)managedLedger.getLedgersInfoAsList().size(), (int)(msgNum / 2));
                Thread.sleep(1200L);
                Assert.assertEquals((int)managedLedger.getLedgersInfoAsList().size(), (int)(msgNum / 2));
                for (int i = 0; i < msgNum; ++i) {
                    Message msg = consumer.receive(2, TimeUnit.SECONDS);
                    Assert.assertNotNull((Object)msg);
                    consumer.acknowledge(msg);
                }
                Assert.assertEquals((int)managedLedger.getLedgersInfoAsList().size(), (int)(msgNum / 2));
                Thread.sleep(1500L);
                Assert.assertEquals((int)managedLedger.getLedgersInfoAsList().size(), (int)1);
            }
            finally {
                if (Collections.singletonList(consumer).get(0) != null) {
                    consumer.close();
                }
            }
        }
        finally {
            if (Collections.singletonList(producer).get(0) != null) {
                producer.close();
            }
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test
    public void testConsumedLedgersTrimNoSubscriptions() throws Exception {
        this.conf.setRetentionCheckIntervalInSeconds(1);
        this.conf.setBrokerDeleteInactiveTopicsEnabled(false);
        super.baseSetup();
        String topicName = "persistent://prop/ns-abc/TestConsumedLedgersTrimNoSubscriptions";
        Producer producer = this.pulsarClient.newProducer().topic("persistent://prop/ns-abc/TestConsumedLedgersTrimNoSubscriptions").producerName("producer-name").create();
        try {
            PersistentTopic persistentTopic = (PersistentTopic)this.pulsar.getBrokerService().getOrCreateTopic("persistent://prop/ns-abc/TestConsumedLedgersTrimNoSubscriptions").get();
            ManagedLedgerConfig managedLedgerConfig = persistentTopic.getManagedLedger().getConfig();
            managedLedgerConfig.setRetentionSizeInMB(-1L);
            managedLedgerConfig.setRetentionTime(1, TimeUnit.SECONDS);
            managedLedgerConfig.setMaxEntriesPerLedger(1000);
            managedLedgerConfig.setMinimumRolloverTime(1, TimeUnit.MILLISECONDS);
            MessageId initialMessageId = (MessageId)persistentTopic.getLastMessageId().get();
            LOG.info("lastmessageid " + initialMessageId);
            int msgNum = 7;
            for (int i = 0; i < msgNum; ++i) {
                producer.send((Object)new byte[0x100000]);
            }
            ManagedLedgerImpl managedLedger = (ManagedLedgerImpl)persistentTopic.getManagedLedger();
            Assert.assertEquals((int)managedLedger.getLedgersInfoAsList().size(), (int)1);
            MessageId messageIdBeforeRestart = this.pulsar.getAdminClient().topics().getLastMessageId("persistent://prop/ns-abc/TestConsumedLedgersTrimNoSubscriptions");
            LOG.info("messageIdBeforeRestart " + messageIdBeforeRestart);
            Assert.assertNotEquals((Object)messageIdBeforeRestart, (Object)initialMessageId);
            this.restartBroker();
            Awaitility.await().ignoreExceptions().untilAsserted(() -> Assert.assertNotNull(((Optional)this.pulsar.getBrokerService().getTopicIfExists("persistent://prop/ns-abc/TestConsumedLedgersTrimNoSubscriptions").get(3L, TimeUnit.SECONDS)).get()));
            this.pulsar.getAdminClient().topics().getStats("persistent://prop/ns-abc/TestConsumedLedgersTrimNoSubscriptions");
            MessageId messageIdAfterRestart = this.pulsar.getAdminClient().topics().getLastMessageId("persistent://prop/ns-abc/TestConsumedLedgersTrimNoSubscriptions");
            LOG.info("lastmessageid " + messageIdAfterRestart);
            Assert.assertEquals((Object)messageIdAfterRestart, (Object)messageIdBeforeRestart);
            persistentTopic = (PersistentTopic)this.pulsar.getBrokerService().getOrCreateTopic("persistent://prop/ns-abc/TestConsumedLedgersTrimNoSubscriptions").get();
            managedLedgerConfig = persistentTopic.getManagedLedger().getConfig();
            managedLedgerConfig.setRetentionSizeInMB(-1L);
            managedLedgerConfig.setRetentionTime(1, TimeUnit.SECONDS);
            managedLedgerConfig.setMaxEntriesPerLedger(1);
            managedLedgerConfig.setMinimumRolloverTime(1, TimeUnit.MILLISECONDS);
            managedLedger = (ManagedLedgerImpl)persistentTopic.getManagedLedger();
            Assert.assertEquals((int)managedLedger.getLedgersInfoAsList().size(), (int)2);
            Thread.sleep(3000L);
            CompletableFuture f = new CompletableFuture();
            managedLedger.trimConsumedLedgersInBackground(f);
            f.join();
            MessageId messageIdAfterTrim = this.pulsar.getAdminClient().topics().getLastMessageId("persistent://prop/ns-abc/TestConsumedLedgersTrimNoSubscriptions");
            LOG.info("lastmessageid " + messageIdAfterTrim);
            Assert.assertEquals((Object)messageIdAfterTrim, (Object)MessageId.earliest);
        }
        finally {
            if (Collections.singletonList(producer).get(0) != null) {
                producer.close();
            }
        }
    }
}

