/*
 * Decompiled with CFR 0.152.
 */
package org.apache.pulsar.client.api;

import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
import com.google.common.util.concurrent.RateLimiter;
import com.google.gson.JsonArray;
import com.google.gson.JsonObject;
import java.io.Serializable;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.pulsar.client.admin.PulsarAdminException;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.ConsumerBuilder;
import org.apache.pulsar.client.api.ConsumerStats;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.MessageListener;
import org.apache.pulsar.client.api.MessageRoutingMode;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.ProducerBuilder;
import org.apache.pulsar.client.api.ProducerConsumerBase;
import org.apache.pulsar.client.api.ProducerStats;
import org.apache.pulsar.client.api.PulsarClientException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testng.Assert;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.DataProvider;
import org.testng.annotations.Test;

@Test(groups={"broker-api"})
public class SimpleProducerConsumerStatTest
extends ProducerConsumerBase {
    private static final Logger log = LoggerFactory.getLogger(SimpleProducerConsumerStatTest.class);

    /**
     * Runs before every test method: invokes the inherited stats-test setup followed by the
     * inherited producer base setup (both are defined outside this file).
     *
     * @throws Exception if either inherited setup step fails
     */
    @Override
    @BeforeMethod
    protected void setup() throws Exception {
        // Neither method is overridden in this class, so these resolve to the inherited versions.
        internalSetupForStatsTest();
        producerBaseSetup();
    }

    /**
     * Runs after every test method (even when the test failed, see {@code alwaysRun}) and
     * delegates to the inherited cleanup routine.
     *
     * @throws Exception if the inherited cleanup fails
     */
    @Override
    @AfterMethod(alwaysRun=true)
    protected void cleanup() throws Exception {
        // Not overridden in this class, so this resolves to the inherited implementation.
        internalCleanup();
    }

    /**
     * Supplies the {@code batchMessageDelayMs} argument for tests using the {@code "batch"}
     * provider: one row with {@code 0} and one row with {@code 1000}.
     *
     * @return two single-element rows
     */
    @DataProvider(name="batch")
    public Object[][] batchMessageDelayMsProvider() {
        Object[][] rows = new Object[2][];
        rows[0] = new Object[]{0};
        rows[1] = new Object[]{1000};
        return rows;
    }

    /**
     * Supplies {@code (batchMessageDelayMs, ackTimeoutSec)} pairs for tests using the
     * {@code "batch_with_timeout"} provider: every combination of delay {@code 0}/{@code 1000}
     * with timeout {@code 0}/{@code 2}, in that nesting order.
     *
     * @return four two-element rows
     */
    @DataProvider(name="batch_with_timeout")
    public Object[][] ackTimeoutSecProvider() {
        int[] batchDelays = {0, 1000};
        int[] ackTimeouts = {0, 2};
        Object[][] rows = new Object[batchDelays.length * ackTimeouts.length][];
        int next = 0;
        for (int delay : batchDelays) {
            for (int timeout : ackTimeouts) {
                rows[next++] = new Object[]{delay, timeout};
            }
        }
        return rows;
    }

    /**
     * Publishes 11 messages with blocking sends, reads them back with blocking receives,
     * acknowledges them cumulatively and finally compares the client-side producer and
     * consumer statistics.
     *
     * @param batchMessageDelayMs batching publish delay in milliseconds; when {@code 0} the
     *     producer builder's batching settings are left untouched
     * @param ackTimeoutSec consumer ack timeout in seconds; when {@code 0} no ack timeout is set
     * @throws Exception if any client operation fails
     */
    @Test(dataProvider="batch_with_timeout")
    public void testSyncProducerAndConsumer(int batchMessageDelayMs, int ackTimeoutSec) throws Exception {
        log.info("-- Starting {} test --", this.methodName);
        final String topic = "persistent://my-property/tp1/my-ns/my-topic1";

        ConsumerBuilder<byte[]> consumerBuilder = this.pulsarClient.newConsumer().topic(topic).subscriptionName("my-subscriber-name");
        // Ack counts are only verified when an ack timeout is configured.
        // NOTE(review): the flag name suggests the cumulative-ack counter depends on the
        // ack-timeout task being enabled - confirm against the client implementation.
        boolean isAckTimeoutTaskEnabledForCumulativeAck = ackTimeoutSec > 0;
        if (isAckTimeoutTaskEnabledForCumulativeAck) {
            consumerBuilder.ackTimeout((long)ackTimeoutSec, TimeUnit.SECONDS);
        }
        Consumer<byte[]> consumer = consumerBuilder.subscribe();

        ProducerBuilder<byte[]> producerBuilder = this.pulsarClient.newProducer().topic(topic);
        if (batchMessageDelayMs != 0) {
            producerBuilder.enableBatching(true).batchingMaxPublishDelay((long)batchMessageDelayMs, TimeUnit.MILLISECONDS).batchingMaxMessages(5);
        }
        Producer<byte[]> producer = producerBuilder.create();

        int numMessages = 11;
        for (int i = 0; i < numMessages; ++i) {
            String message = "my-message-" + i;
            producer.send(message.getBytes(StandardCharsets.UTF_8));
        }

        Message<byte[]> msg = null;
        Set<String> messageSet = Sets.newHashSet();
        for (int i = 0; i < numMessages; ++i) {
            msg = consumer.receive(5, TimeUnit.SECONDS);
            // A timed-out receive yields null; fail with a clear message instead of an NPE.
            Assert.assertNotNull(msg, "Timed out waiting for message " + i);
            String receivedMessage = new String(msg.getData(), StandardCharsets.UTF_8);
            log.info("Received message: [{}]", receivedMessage);
            String expectedMessage = "my-message-" + i;
            this.testMessageOrderAndDuplicates(messageSet, receivedMessage, expectedMessage);
        }

        // Acknowledge everything up to and including the last received message.
        consumer.acknowledgeCumulative(msg);
        Thread.sleep(2000L);
        consumer.close();
        producer.close();
        this.validatingLogInfo(consumer, producer, isAckTimeoutTaskEnabledForCumulativeAck);
        log.info("-- Exiting {} test --", this.methodName);
    }

    /**
     * Publishes 50 messages with {@code sendAsync}, waits for every send future, reads the
     * messages back with blocking receives, acknowledges them cumulatively via the async API
     * and finally compares the client-side producer and consumer statistics.
     *
     * @param batchMessageDelayMs batching publish delay in milliseconds; when {@code 0}
     *     batching is explicitly disabled
     * @param ackTimeoutSec consumer ack timeout in seconds; when {@code 0} no ack timeout is set
     * @throws Exception if any client operation fails
     */
    @Test(dataProvider="batch_with_timeout")
    public void testAsyncProducerAndAsyncAck(int batchMessageDelayMs, int ackTimeoutSec) throws Exception {
        log.info("-- Starting {} test --", this.methodName);
        final String topic = "persistent://my-property/tp1/my-ns/my-topic2";

        ConsumerBuilder<byte[]> consumerBuilder = this.pulsarClient.newConsumer().topic(topic).subscriptionName("my-subscriber-name");
        if (ackTimeoutSec > 0) {
            consumerBuilder.ackTimeout((long)ackTimeoutSec, TimeUnit.SECONDS);
        }
        Consumer<byte[]> consumer = consumerBuilder.subscribe();

        ProducerBuilder<byte[]> producerBuilder = this.pulsarClient.newProducer().topic(topic).messageRoutingMode(MessageRoutingMode.SinglePartition);
        if (batchMessageDelayMs != 0) {
            producerBuilder.enableBatching(true).batchingMaxPublishDelay((long)batchMessageDelayMs, TimeUnit.MILLISECONDS).batchingMaxMessages(5);
        } else {
            producerBuilder.enableBatching(false);
        }
        Producer<byte[]> producer = producerBuilder.create();

        List<Future<MessageId>> futures = Lists.newArrayList();
        int numMessages = 50;
        for (int i = 0; i < numMessages; ++i) {
            String message = "my-message-" + i;
            futures.add(producer.sendAsync(message.getBytes(StandardCharsets.UTF_8)));
        }
        log.info("Waiting for async publish to complete");
        for (Future<MessageId> future : futures) {
            future.get();
        }

        Message<byte[]> msg = null;
        Set<String> messageSet = Sets.newHashSet();
        for (int i = 0; i < numMessages; ++i) {
            msg = consumer.receive(5, TimeUnit.SECONDS);
            // A timed-out receive yields null; fail with a clear message instead of an NPE.
            Assert.assertNotNull(msg, "Timed out waiting for message " + i);
            String receivedMessage = new String(msg.getData(), StandardCharsets.UTF_8);
            String expectedMessage = "my-message-" + i;
            this.testMessageOrderAndDuplicates(messageSet, receivedMessage, expectedMessage);
        }

        // Acknowledge everything up to and including the last received message.
        CompletableFuture<Void> ackFuture = consumer.acknowledgeCumulativeAsync(msg);
        log.info("Waiting for async ack to complete");
        ackFuture.get();
        Thread.sleep(2000L);
        consumer.close();
        producer.close();
        // Ack counts are only compared for the non-batched run with an ack timeout configured.
        this.validatingLogInfo(consumer, producer, batchMessageDelayMs == 0 && ackTimeoutSec > 0);
        log.info("-- Exiting {} test --", this.methodName);
    }

    /**
     * Publishes 101 messages with {@code sendAsync}, waits for every send future, reads the
     * messages back one at a time with {@code receiveAsync}, acknowledges them cumulatively via
     * the async API and finally compares the client-side producer and consumer statistics.
     *
     * @param batchMessageDelayMs batching publish delay in milliseconds; when {@code 0}
     *     batching is explicitly disabled
     * @param ackTimeoutSec consumer ack timeout in seconds; when {@code 0} no ack timeout is set
     * @throws Exception if any client operation fails
     */
    @Test(dataProvider="batch_with_timeout")
    public void testAsyncProducerAndReceiveAsyncAndAsyncAck(int batchMessageDelayMs, int ackTimeoutSec) throws Exception {
        log.info("-- Starting {} test --", this.methodName);
        final String topic = "persistent://my-property/tp1/my-ns/my-topic2";

        ConsumerBuilder<byte[]> consumerBuilder = this.pulsarClient.newConsumer().topic(topic).subscriptionName("my-subscriber-name");
        if (ackTimeoutSec > 0) {
            consumerBuilder.ackTimeout((long)ackTimeoutSec, TimeUnit.SECONDS);
        }
        Consumer<byte[]> consumer = consumerBuilder.subscribe();

        ProducerBuilder<byte[]> producerBuilder = this.pulsarClient.newProducer().topic(topic).messageRoutingMode(MessageRoutingMode.SinglePartition);
        if (batchMessageDelayMs != 0) {
            producerBuilder.enableBatching(true).batchingMaxPublishDelay((long)batchMessageDelayMs, TimeUnit.MILLISECONDS).batchingMaxMessages(5);
        } else {
            producerBuilder.enableBatching(false);
        }
        Producer<byte[]> producer = producerBuilder.create();

        List<Future<MessageId>> futures = Lists.newArrayList();
        int numMessages = 101;
        for (int i = 0; i < numMessages; ++i) {
            String message = "my-message-" + i;
            futures.add(producer.sendAsync(message.getBytes(StandardCharsets.UTF_8)));
        }
        log.info("Waiting for async publish to complete");
        for (Future<MessageId> future : futures) {
            future.get();
        }

        Message<byte[]> msg = null;
        Set<String> messageSet = Sets.newHashSet();
        for (int i = 0; i < numMessages; ++i) {
            CompletableFuture<Message<byte[]>> futureMsg = consumer.receiveAsync();
            // Short pause between issuing the receive and waiting on it, as in the original flow.
            Thread.sleep(10L);
            msg = futureMsg.get();
            String receivedMessage = new String(msg.getData(), StandardCharsets.UTF_8);
            log.info("Received message: [{}]", receivedMessage);
            String expectedMessage = "my-message-" + i;
            this.testMessageOrderAndDuplicates(messageSet, receivedMessage, expectedMessage);
        }

        // Acknowledge everything up to and including the last received message.
        CompletableFuture<Void> ackFuture = consumer.acknowledgeCumulativeAsync(msg);
        log.info("Waiting for async ack to complete");
        ackFuture.get();
        Thread.sleep(5000L);
        consumer.close();
        producer.close();
        // Ack counts are only compared for the non-batched run with an ack timeout configured.
        this.validatingLogInfo(consumer, producer, batchMessageDelayMs == 0 && ackTimeoutSec > 0);
        log.info("-- Exiting {} test --", this.methodName);
    }

    /**
     * Subscribes with a message listener that acknowledges each message asynchronously and
     * counts down a latch, publishes 100 messages with {@code sendAsync}, waits for the latch
     * and finally compares the client-side producer and consumer statistics (including ack
     * counts).
     *
     * @param batchMessageDelayMs batching publish delay in milliseconds; when {@code 0} the
     *     producer builder's batching settings are left untouched
     * @throws Exception if any client operation fails
     */
    @Test(dataProvider="batch", timeOut=100000L)
    public void testMessageListener(int batchMessageDelayMs) throws Exception {
        log.info("-- Starting {} test --", this.methodName);
        final String topic = "persistent://my-property/tp1/my-ns/my-topic3";
        int numMessages = 100;
        CountDownLatch latch = new CountDownLatch(numMessages);

        Consumer<byte[]> consumer = this.pulsarClient.newConsumer()
                .topic(topic)
                .subscriptionName("my-subscriber-name")
                .ackTimeout(100L, TimeUnit.SECONDS)
                .messageListener((consumer1, msg) -> {
                    Assert.assertNotNull(msg, "Message cannot be null");
                    String receivedMessage = new String(msg.getData(), StandardCharsets.UTF_8);
                    log.debug("Received message [{}] in the listener", receivedMessage);
                    consumer1.acknowledgeAsync(msg);
                    latch.countDown();
                })
                .subscribe();

        ProducerBuilder<byte[]> producerBuilder = this.pulsarClient.newProducer().topic(topic);
        if (batchMessageDelayMs != 0) {
            producerBuilder.enableBatching(true).batchingMaxPublishDelay((long)batchMessageDelayMs, TimeUnit.MILLISECONDS).batchingMaxMessages(5);
        }
        Producer<byte[]> producer = producerBuilder.create();

        List<Future<MessageId>> futures = Lists.newArrayList();
        for (int i = 0; i < numMessages; ++i) {
            String message = "my-message-" + i;
            futures.add(producer.sendAsync(message.getBytes(StandardCharsets.UTF_8)));
        }
        log.info("Waiting for async publish to complete");
        for (Future<MessageId> future : futures) {
            future.get();
        }

        Thread.sleep(5000L);
        log.info("Waiting for message listener to ack all messages");
        // numMessages doubles as the latch timeout, expressed in seconds.
        Assert.assertTrue(latch.await(numMessages, TimeUnit.SECONDS), "Timed out waiting for message listener acks");
        consumer.close();
        producer.close();
        this.validatingLogInfo(consumer, producer, true);
        log.info("-- Exiting {} test --", this.methodName);
    }

    /**
     * Stops the broker, issues one async send with a one-second send timeout and expects the
     * send future to fail. After the broker is restarted it checks that nothing is delivered to
     * the consumer and that the stats record exactly one failed send and no sent, received or
     * acknowledged messages.
     *
     * @param batchMessageDelayMs when non-zero, batching is enabled with a publish delay of
     *     twice this many milliseconds
     * @throws Exception if any client or broker operation fails unexpectedly
     */
    @Test(dataProvider="batch")
    public void testSendTimeout(int batchMessageDelayMs) throws Exception {
        log.info("-- Starting {} test --", this.methodName);
        final String topic = "persistent://my-property/tp1/my-ns/my-topic5";

        Consumer<byte[]> consumer = this.pulsarClient.newConsumer().topic(topic).subscriptionName("my-subscriber-name").subscribe();
        ProducerBuilder<byte[]> producerBuilder = this.pulsarClient.newProducer().topic(topic).sendTimeout(1, TimeUnit.SECONDS);
        if (batchMessageDelayMs != 0) {
            producerBuilder.enableBatching(true).batchingMaxPublishDelay(2L * (long)batchMessageDelayMs, TimeUnit.MILLISECONDS).batchingMaxMessages(5);
        }
        Producer<byte[]> producer = producerBuilder.create();
        final String message = "my-message";

        this.stopBroker();
        Future<MessageId> future = producer.sendAsync(message.getBytes(StandardCharsets.UTF_8));
        try {
            future.get();
            Assert.fail("Send operation should have failed");
        }
        catch (ExecutionException expected) {
            // Expected: the send was issued while the broker was stopped.
            log.debug("Send failed as expected", expected);
        }
        this.startBroker();

        // The failed message must not show up once the broker is back.
        Message<byte[]> msg = consumer.receive(3, TimeUnit.SECONDS);
        Assert.assertNull(msg);
        consumer.close();
        producer.close();
        Thread.sleep(1000L);

        ConsumerStats cStat = consumer.getStats();
        ProducerStats pStat = producer.getStats();
        Assert.assertEquals(pStat.getTotalMsgsSent(), 0L);
        Assert.assertEquals(pStat.getTotalSendFailed(), 1L);
        Assert.assertEquals(cStat.getTotalMsgsReceived(), 0L);
        Assert.assertEquals(cStat.getTotalMsgsReceived(), cStat.getTotalAcksSent());
        log.info("-- Exiting {} test --", this.methodName);
    }

    /**
     * Runs a background loop for about two seconds that publishes batched messages at a
     * rate-limited 17 msg/s and asynchronously receives and acknowledges them, then asserts that
     * the topic's reported {@code msgRateOut} exceeds {@code produceRate / batchSize}.
     *
     * <p>This method carries no method-level {@code @Test}; NOTE(review): it is presumably picked
     * up through the class-level annotation - confirm.
     *
     * @throws PulsarClientException if a client operation fails
     * @throws InterruptedException if the test thread is interrupted while waiting
     * @throws PulsarAdminException if reading the topic stats fails
     */
    public void testBatchMessagesRateOut() throws PulsarClientException, InterruptedException, PulsarAdminException {
        log.info("-- Starting {} test --", this.methodName);
        final String topicName = "persistent://my-property/cluster/my-ns/testBatchMessagesRateOut";
        final double produceRate = 17.0;
        final int batchSize = 5;

        Consumer<byte[]> consumer = this.pulsarClient.newConsumer().topic(topicName).subscriptionName("my-subscriber-name").subscribe();
        Producer<byte[]> producer = this.pulsarClient.newProducer().topic(topicName).batchingMaxMessages(batchSize).enableBatching(true).batchingMaxPublishDelay(2L, TimeUnit.SECONDS).create();

        AtomicBoolean runTest = new AtomicBoolean(true);
        Thread publisher = new Thread(() -> {
            RateLimiter limiter = RateLimiter.create(produceRate);
            while (runTest.get()) {
                limiter.acquire();
                producer.sendAsync("Hello World".getBytes(StandardCharsets.UTF_8));
                consumer.receiveAsync().thenAccept(consumer::acknowledgeAsync);
            }
        });
        publisher.start();
        try {
            Thread.sleep(2000L);
        } finally {
            // Always signal the loop to stop, even if the sleep is interrupted.
            runTest.set(false);
        }
        // Wait (bounded) for the loop to exit so it cannot race with the teardown below.
        publisher.join(TimeUnit.SECONDS.toMillis(5L));

        this.pulsar.getBrokerService().updateRates();
        double actualRate = this.admin.topics().getStats(topicName).msgRateOut;
        Assert.assertTrue(actualRate > produceRate / batchSize);

        consumer.unsubscribe();
        producer.close();
        log.info("-- Exiting {} test --", this.methodName);
    }

    /**
     * Waits one second, then cross-checks the client-side statistics of a producer/consumer
     * pair: messages and bytes sent must equal messages and bytes received, and every sent
     * message must have a producer-side ack.
     *
     * @param consumer consumer whose stats are read
     * @param producer producer whose stats are read
     * @param verifyAckCount when {@code true}, additionally require that the consumer sent as
     *     many acks as it received messages
     * @throws InterruptedException if the initial wait is interrupted
     */
    private void validatingLogInfo(Consumer<?> consumer, Producer<?> producer, boolean verifyAckCount) throws InterruptedException {
        TimeUnit.SECONDS.sleep(1L);
        ConsumerStats consumerStats = consumer.getStats();
        ProducerStats producerStats = producer.getStats();

        Assert.assertEquals(producerStats.getTotalMsgsSent(), consumerStats.getTotalMsgsReceived());
        Assert.assertEquals(producerStats.getTotalBytesSent(), consumerStats.getTotalBytesReceived());
        Assert.assertEquals(producerStats.getTotalMsgsSent(), producerStats.getTotalAcksReceived());

        if (!verifyAckCount) {
            return;
        }
        Assert.assertEquals(consumerStats.getTotalMsgsReceived(), consumerStats.getTotalAcksSent());
    }

    /**
     * Publishes 120 messages, refreshes the broker rates and then scans the broker metrics
     * returned by the admin API for any {@code brk_AddEntryLatencyBuckets*} value greater than
     * zero. The test fails if no such value is found.
     *
     * @throws Exception if a client, broker or admin operation fails
     */
    @Test
    public void testAddBrokerLatencyStats() throws Exception {
        log.info("-- Starting {} test --", this.methodName);
        final String latencyMetric = "brk_AddEntryLatencyBuckets";
        Producer<byte[]> producer = this.pulsarClient.newProducer().topic("persistent://my-property/tp1/my-ns/my-topic1").create();
        try {
            int numMessages = 120;
            for (int i = 0; i < numMessages; ++i) {
                String message = "my-message-" + i;
                producer.send(message.getBytes(StandardCharsets.UTF_8));
            }

            this.pulsar.getBrokerService().updateRates();
            JsonArray metrics = this.admin.brokerStats().getMetrics();
            boolean latencyCaptured = false;
            for (int i = 0; i < metrics.size(); ++i) {
                try {
                    JsonObject entry = metrics.get(i).getAsJsonObject();
                    // Cheap textual pre-filter before walking the individual keys.
                    if (!entry.get("metrics").toString().contains(latencyMetric)) {
                        continue;
                    }
                    JsonObject stat = entry.get("metrics").getAsJsonObject();
                    for (String key : stat.keySet()) {
                        if (key.startsWith(latencyMetric) && stat.get(key).getAsDouble() > 0.0) {
                            latencyCaptured = true;
                        }
                    }
                    log.info("{}", stat);
                }
                catch (RuntimeException e) {
                    // Best effort: entries with an unexpected shape are skipped, but leave a trace.
                    log.debug("Skipping metrics entry {} that could not be parsed", i, e);
                }
            }
            Assert.assertTrue(latencyCaptured);
        } finally {
            // Close the producer even when the assertion above fails.
            producer.close();
        }
        log.info("-- Exiting {} test --", this.methodName);
    }
}

