/*
 * Decompiled with CFR 0.152.
 */
package org.apache.pulsar.broker.service.nonpersistent;

import org.apache.pulsar.broker.service.BrokerTestBase;
import org.apache.pulsar.broker.service.nonpersistent.NonPersistentTopic;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.api.SubscriptionType;
import org.apache.pulsar.common.policies.data.stats.NonPersistentTopicStatsImpl;
import org.testng.Assert;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;

/**
 * Broker-side test for {@link NonPersistentTopic} statistics.
 *
 * <p>Each test method runs between {@link #setup()} and {@link #cleanup()}, which delegate to
 * the {@link BrokerTestBase} lifecycle helpers.
 */
@Test(groups = {"broker"})
public class NonPersistentTopicTest extends BrokerTestBase {

    /**
     * Prepares the test environment before every test method by delegating to
     * {@code BrokerTestBase.baseSetup()}.
     *
     * @throws Exception if the base setup fails
     */
    @Override
    @BeforeMethod(alwaysRun = true)
    protected void setup() throws Exception {
        super.baseSetup();
    }

    /**
     * Tears the test environment down after every test method by delegating to
     * {@code BrokerTestBase.internalCleanup()}.
     *
     * @throws Exception if the base cleanup fails
     */
    @Override
    @AfterMethod(alwaysRun = true)
    protected void cleanup() throws Exception {
        super.internalCleanup();
    }

    /**
     * Verifies that the topic-level in/out counters are accumulative.
     *
     * <p>The counters must be zero before any traffic, strictly positive after one message has
     * been published and received on both subscriptions, and unchanged once both consumers have
     * unsubscribed and the producer has been closed and removed from the topic.
     *
     * @throws Exception if any client or broker operation fails
     */
    @Test
    public void testAccumulativeStats() throws Exception {
        final String topicName = "non-persistent://prop/ns-abc/aTopic";
        final String sharedSubName = "shared";
        final String failoverSubName = "failOver";

        Consumer<String> consumer1 = pulsarClient.newConsumer(Schema.STRING)
                .topic(topicName)
                .subscriptionType(SubscriptionType.Shared)
                .subscriptionName(sharedSubName)
                .subscribe();
        Consumer<String> consumer2 = pulsarClient.newConsumer(Schema.STRING)
                .topic(topicName)
                .subscriptionType(SubscriptionType.Failover)
                .subscriptionName(failoverSubName)
                .subscribe();
        Producer<String> producer = pulsarClient.newProducer(Schema.STRING)
                .topic(topicName)
                .create();

        NonPersistentTopic topic =
                (NonPersistentTopic) pulsar.getBrokerService().getTopicReference(topicName).get();

        // Nothing has been published yet, so every counter starts at zero.
        NonPersistentTopicStatsImpl stats = topic.getStats(false, false);
        Assert.assertEquals(stats.getBytesInCounter(), 0L);
        Assert.assertEquals(stats.getMsgInCounter(), 0L);
        Assert.assertEquals(stats.getBytesOutCounter(), 0L);
        Assert.assertEquals(stats.getMsgOutCounter(), 0L);

        // Publish a single message and receive it on both subscriptions.
        producer.newMessage().value("test").eventTime(5L).send();

        Message<String> msg = consumer1.receive();
        Assert.assertNotNull(msg);
        msg = consumer2.receive();
        Assert.assertNotNull(msg);

        // Traffic in both directions must now be reflected in the counters.
        NonPersistentTopicStatsImpl statsBeforeUnsubscribe = topic.getStats(false, false);
        Assert.assertTrue(statsBeforeUnsubscribe.getBytesInCounter() > 0L);
        Assert.assertTrue(statsBeforeUnsubscribe.getMsgInCounter() > 0L);
        Assert.assertTrue(statsBeforeUnsubscribe.getBytesOutCounter() > 0L);
        Assert.assertTrue(statsBeforeUnsubscribe.getMsgOutCounter() > 0L);

        // Detach every consumer and producer from the topic.
        consumer1.unsubscribe();
        consumer2.unsubscribe();
        producer.close();
        topic.getProducers().values().forEach(topic::removeProducer);
        Assert.assertEquals(topic.getProducers().size(), 0);

        // The counters must keep the values observed while the clients were attached.
        NonPersistentTopicStatsImpl statsAfterUnsubscribe = topic.getStats(false, false);
        Assert.assertEquals(statsAfterUnsubscribe.getBytesInCounter(),
                statsBeforeUnsubscribe.getBytesInCounter());
        Assert.assertEquals(statsAfterUnsubscribe.getMsgInCounter(),
                statsBeforeUnsubscribe.getMsgInCounter());
        Assert.assertEquals(statsAfterUnsubscribe.getBytesOutCounter(),
                statsBeforeUnsubscribe.getBytesOutCounter());
        Assert.assertEquals(statsAfterUnsubscribe.getMsgOutCounter(),
                statsBeforeUnsubscribe.getMsgOutCounter());
    }
}

