/*
 * Decompiled with CFR 0.152.
 */
package org.apache.pulsar.broker.stats;

import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.Sets;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.TimeUnit;
import org.apache.pulsar.broker.service.persistent.PersistentSubscription;
import org.apache.pulsar.broker.service.persistent.PersistentTopic;
import org.apache.pulsar.client.admin.PulsarAdminException;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.ProducerConsumerBase;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.SubscriptionType;
import org.apache.pulsar.common.policies.data.ConsumerStats;
import org.apache.pulsar.common.policies.data.SubscriptionStats;
import org.apache.pulsar.common.policies.data.TopicStats;
import org.apache.pulsar.common.policies.data.stats.ConsumerStatsImpl;
import org.apache.pulsar.common.util.ObjectMapperFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testng.Assert;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;

@Test(groups={"broker"})
public class ConsumerStatsTest
extends ProducerConsumerBase {
    private static final Logger log = LoggerFactory.getLogger(ConsumerStatsTest.class);

    @Override
    @BeforeMethod
    protected void setup() throws Exception {
        this.conf.setMaxUnackedMessagesPerConsumer(0);
        super.internalSetup();
        super.producerBaseSetup();
    }

    @Override
    @AfterMethod(alwaysRun=true)
    protected void cleanup() throws Exception {
        super.internalCleanup();
    }

    @Test
    public void testConsumerStatsOnZeroMaxUnackedMessagesPerConsumer() throws PulsarClientException, InterruptedException, PulsarAdminException {
        Assert.assertEquals((int)this.pulsar.getConfiguration().getMaxUnackedMessagesPerConsumer(), (int)0);
        String topicName = "persistent://my-property/my-ns/testConsumerStatsOnZeroMaxUnackedMessagesPerConsumer";
        Consumer consumer = this.pulsarClient.newConsumer().topic(new String[]{"persistent://my-property/my-ns/testConsumerStatsOnZeroMaxUnackedMessagesPerConsumer"}).subscriptionType(SubscriptionType.Shared).ackTimeout(1L, TimeUnit.SECONDS).subscriptionName("sub").subscribe();
        Producer producer = this.pulsarClient.newProducer().topic("persistent://my-property/my-ns/testConsumerStatsOnZeroMaxUnackedMessagesPerConsumer").create();
        int messages = 10;
        for (int i = 0; i < 10; ++i) {
            producer.send((Object)("message-" + i).getBytes());
        }
        int received = 0;
        for (int i = 0; i < 10; ++i) {
            consumer.receive();
            ++received;
        }
        Assert.assertEquals((int)received, (int)10);
        received = 0;
        TopicStats stats = this.admin.topics().getStats("persistent://my-property/my-ns/testConsumerStatsOnZeroMaxUnackedMessagesPerConsumer");
        Assert.assertEquals((int)stats.getSubscriptions().size(), (int)1);
        Assert.assertEquals((int)((SubscriptionStats)stats.getSubscriptions().entrySet().iterator().next().getValue()).getConsumers().size(), (int)1);
        Assert.assertFalse((boolean)((ConsumerStats)((SubscriptionStats)stats.getSubscriptions().entrySet().iterator().next().getValue()).getConsumers().get(0)).isBlockedConsumerOnUnackedMsgs());
        Assert.assertEquals((int)((ConsumerStats)((SubscriptionStats)stats.getSubscriptions().entrySet().iterator().next().getValue()).getConsumers().get(0)).getUnackedMessages(), (int)10);
        for (int i = 0; i < 10; ++i) {
            consumer.acknowledge(consumer.receive());
            ++received;
        }
        Assert.assertEquals((int)received, (int)10);
        Thread.sleep(2000L);
        stats = this.admin.topics().getStats("persistent://my-property/my-ns/testConsumerStatsOnZeroMaxUnackedMessagesPerConsumer");
        Assert.assertFalse((boolean)((ConsumerStats)((SubscriptionStats)stats.getSubscriptions().entrySet().iterator().next().getValue()).getConsumers().get(0)).isBlockedConsumerOnUnackedMsgs());
        Assert.assertEquals((int)((ConsumerStats)((SubscriptionStats)stats.getSubscriptions().entrySet().iterator().next().getValue()).getConsumers().get(0)).getUnackedMessages(), (int)0);
    }

    @Test
    public void testAckStatsOnPartitionedTopicForExclusiveSubscription() throws PulsarAdminException, PulsarClientException, InterruptedException {
        int i;
        String topic = "persistent://my-property/my-ns/testAckStatsOnPartitionedTopicForExclusiveSubscription";
        this.admin.topics().createPartitionedTopic("persistent://my-property/my-ns/testAckStatsOnPartitionedTopicForExclusiveSubscription", 3);
        Consumer consumer = this.pulsarClient.newConsumer().topic(new String[]{"persistent://my-property/my-ns/testAckStatsOnPartitionedTopicForExclusiveSubscription"}).subscriptionType(SubscriptionType.Exclusive).subscriptionName("sub").subscribe();
        Producer producer = this.pulsarClient.newProducer().topic("persistent://my-property/my-ns/testAckStatsOnPartitionedTopicForExclusiveSubscription").create();
        int messages = 10;
        for (int i2 = 0; i2 < 10; ++i2) {
            producer.send((Object)("message-" + i2).getBytes());
        }
        int received = 0;
        for (i = 0; i < 10; ++i) {
            consumer.acknowledge(consumer.receive());
            ++received;
        }
        Assert.assertEquals((int)10, (int)received);
        Thread.sleep(2000L);
        for (i = 0; i < 3; ++i) {
            TopicStats stats = this.admin.topics().getStats("persistent://my-property/my-ns/testAckStatsOnPartitionedTopicForExclusiveSubscription-partition-" + i);
            Assert.assertEquals((int)stats.getSubscriptions().size(), (int)1);
            Assert.assertEquals((int)((SubscriptionStats)stats.getSubscriptions().entrySet().iterator().next().getValue()).getConsumers().size(), (int)1);
            Assert.assertEquals((int)((ConsumerStats)((SubscriptionStats)stats.getSubscriptions().entrySet().iterator().next().getValue()).getConsumers().get(0)).getUnackedMessages(), (int)0);
        }
    }

    @Test
    public void testUpdateStatsForActiveConsumerAndSubscription() throws Exception {
        String topicName = "persistent://prop/use/ns-abc/testUpdateStatsForActiveConsumerAndSubscription";
        this.pulsarClient.newConsumer().topic(new String[]{"persistent://prop/use/ns-abc/testUpdateStatsForActiveConsumerAndSubscription"}).subscriptionType(SubscriptionType.Shared).subscriptionName("my-subscription").subscribe();
        PersistentTopic topicRef = (PersistentTopic)this.pulsar.getBrokerService().getTopicReference("persistent://prop/use/ns-abc/testUpdateStatsForActiveConsumerAndSubscription").get();
        Assert.assertNotNull((Object)topicRef);
        Assert.assertEquals((long)topicRef.getSubscriptions().size(), (long)1L);
        List consumers = ((PersistentSubscription)topicRef.getSubscriptions().get((Object)"my-subscription")).getConsumers();
        Assert.assertEquals((int)consumers.size(), (int)1);
        ConsumerStatsImpl consumerStats = new ConsumerStatsImpl();
        consumerStats.msgOutCounter = 10L;
        consumerStats.bytesOutCounter = 1280L;
        ((org.apache.pulsar.broker.service.Consumer)consumers.get(0)).updateStats(consumerStats);
        ConsumerStatsImpl updatedStats = ((org.apache.pulsar.broker.service.Consumer)consumers.get(0)).getStats();
        Assert.assertEquals((long)updatedStats.getMsgOutCounter(), (long)10L);
        Assert.assertEquals((long)updatedStats.getBytesOutCounter(), (long)1280L);
    }

    @Test
    public void testConsumerStatsOutput() throws Exception {
        HashSet allowedFields = Sets.newHashSet((Object[])new String[]{"msgRateOut", "msgThroughputOut", "bytesOutCounter", "msgOutCounter", "msgRateRedeliver", "chunkedMessageRate", "consumerName", "availablePermits", "unackedMessages", "avgMessagesPerEntry", "blockedConsumerOnUnackedMsgs", "readPositionWhenJoining", "lastAckedTimestamp", "lastConsumedTimestamp", "keyHashRanges", "metadata", "address", "connectedSince", "clientVersion"});
        String topicName = "persistent://prop/use/ns-abc/testConsumerStatsOutput";
        String subName = "my-subscription";
        Consumer consumer = this.pulsarClient.newConsumer().topic(new String[]{"persistent://prop/use/ns-abc/testConsumerStatsOutput"}).subscriptionType(SubscriptionType.Shared).subscriptionName("my-subscription").subscribe();
        TopicStats stats = this.admin.topics().getStats("persistent://prop/use/ns-abc/testConsumerStatsOutput");
        ObjectMapper mapper = ObjectMapperFactory.create();
        JsonNode node = mapper.readTree(mapper.writer().writeValueAsString(((SubscriptionStats)stats.getSubscriptions().get("my-subscription")).getConsumers().get(0)));
        Iterator itr = node.fieldNames();
        while (itr.hasNext()) {
            String field = (String)itr.next();
            Assert.assertTrue((boolean)allowedFields.contains(field), (String)(field + " should not be exposed"));
        }
        consumer.close();
    }
}

