/*
 * Decompiled with CFR 0.152.
 */
package org.apache.pulsar.broker.service;

import com.google.common.collect.Sets;
import java.io.Serializable;
import java.lang.reflect.Method;
import java.util.Collections;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.pulsar.broker.service.Replicator;
import org.apache.pulsar.broker.service.ReplicatorTestBase;
import org.apache.pulsar.broker.service.persistent.DispatchRateLimiter;
import org.apache.pulsar.broker.service.persistent.PersistentTopic;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.MessageListener;
import org.apache.pulsar.client.api.MessageRoutingMode;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.common.policies.data.DispatchRate;
import org.awaitility.Awaitility;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testng.Assert;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.DataProvider;
import org.testng.annotations.Test;

@Test(groups={"quarantine"})
public class ReplicatorRateLimiterTest
extends ReplicatorTestBase {
    protected String methodName;
    private static final Logger log = LoggerFactory.getLogger(ReplicatorRateLimiterTest.class);

    /**
     * Remembers the name of the test method that is about to run; the tests in this class include
     * it in their start-up log line.
     *
     * @param m the test method about to be invoked
     * @throws Exception declared by the signature; the body only reads the method name
     */
    @BeforeMethod
    public void beforeMethod(Method m) throws Exception {
        methodName = m.getName();
    }

    /**
     * Class-level fixture setup: delegates to {@link ReplicatorTestBase#setup()}.
     *
     * <p>Also invoked directly by {@link #testReplicatorRatePriority()} to rebuild the fixture after
     * changing the broker configuration.
     *
     * @throws Exception if the base-class setup fails
     */
    @Override
    @BeforeClass(timeOut = 300_000L)
    public void setup() throws Exception {
        super.setup();
    }

    /**
     * Class-level fixture teardown: delegates to {@link ReplicatorTestBase#cleanup()}.
     *
     * <p>Registered with {@code alwaysRun} so that it executes even when earlier configuration or
     * test methods failed.
     *
     * @throws Exception if the base-class cleanup fails
     */
    @Override
    @AfterClass(alwaysRun = true, timeOut = 300_000L)
    public void cleanup() throws Exception {
        super.cleanup();
    }

    /**
     * Supplies one parameter row per {@link DispatchRateType}, in the order
     * {@code messageRate}, {@code byteRate}.
     *
     * @return a two-row table, each row holding a single {@link DispatchRateType}
     */
    @DataProvider(name = "dispatchRateType")
    public Object[][] dispatchRateProvider() {
        Object[] messageRateRow = {DispatchRateType.messageRate};
        Object[] byteRateRow = {DispatchRateType.byteRate};
        return new Object[][] {messageRateRow, byteRateRow};
    }

    /**
     * Verifies the precedence of replicator dispatch-rate settings: a topic-level policy wins over a
     * namespace-level policy, which in turn wins over the broker-level configuration.
     *
     * <p>The fixture is re-created via {@code cleanup()}/{@code setup()} with a broker-level rate of
     * 100 msg / 200 bytes. Namespace- and topic-level rates are then set and removed, and after each
     * step the rate reported by the topic's first replicator is checked.
     *
     * @throws Exception if fixture setup, an admin call or a client operation fails
     */
    @Test
    public void testReplicatorRatePriority() throws Exception {
        // NOTE(review): the config1 changes below are not reverted when this test ends; confirm that
        // no other test in this class depends on the previous values.
        cleanup();
        config1.setSystemTopicEnabled(true);
        config1.setTopicLevelPoliciesEnabled(true);
        config1.setDispatchThrottlingRatePerReplicatorInMsg(100);
        config1.setDispatchThrottlingRatePerReplicatorInByte(200L);
        setup();

        final String namespace = "pulsar/replicatorchange-" + System.currentTimeMillis();
        final String topicName = "persistent://" + namespace + "/ratechange";

        admin1.namespaces().createNamespace(namespace);
        admin1.namespaces().setNamespaceReplicationClusters(namespace, Sets.newHashSet("r1", "r2"));

        try (PulsarClient client1 = PulsarClient.builder()
                .serviceUrl(url1.toString())
                .statsInterval(0L, TimeUnit.SECONDS)
                .build()) {
            // Creating (and closing) a producer is enough to bring the topic into existence.
            client1.newProducer().topic(topicName).create().close();
            PersistentTopic topic =
                    (PersistentTopic) pulsar1.getBrokerService().getOrCreateTopic(topicName).get();

            // 1. No policy set: the broker-level configuration applies.
            Assert.assertTrue(((Replicator) topic.getReplicators().values().get(0)).getRateLimiter().isPresent());
            Assert.assertEquals(firstReplicatorRateLimiter(topic).getDispatchRateOnMsg(), 100L);
            Assert.assertEquals(firstReplicatorRateLimiter(topic).getDispatchRateOnByte(), 200L);

            // 2. Namespace-level policy overrides the broker-level configuration.
            DispatchRate nsDispatchRate = DispatchRate.builder()
                    .dispatchThrottlingRateInMsg(50)
                    .dispatchThrottlingRateInByte(60L)
                    .ratePeriodInSecond(60)
                    .build();
            admin1.namespaces().setReplicatorDispatchRate(namespace, nsDispatchRate);
            Awaitility.await().untilAsserted(() ->
                    Assert.assertEquals(admin1.namespaces().getReplicatorDispatchRate(namespace), nsDispatchRate));
            // The admin read-back succeeding does not mean the replicator has picked the policy up
            // yet, so the limiter itself is polled as well.
            awaitFirstReplicatorRate(topic, 50L, 60L);

            // 3. Topic-level policy overrides the namespace-level policy.
            DispatchRate topicRate = DispatchRate.builder()
                    .dispatchThrottlingRateInMsg(10)
                    .dispatchThrottlingRateInByte(20L)
                    .ratePeriodInSecond(30)
                    .build();
            admin1.topics().setReplicatorDispatchRate(topicName, topicRate);
            Awaitility.await().untilAsserted(() ->
                    Assert.assertEquals(admin1.topics().getReplicatorDispatchRate(topicName), topicRate));
            awaitFirstReplicatorRate(topic, 10L, 20L);

            // 4. Changing the namespace-level policy must not displace the topic-level policy.
            DispatchRate nsDispatchRate2 = DispatchRate.builder()
                    .dispatchThrottlingRateInMsg(500)
                    .dispatchThrottlingRateInByte(600L)
                    .ratePeriodInSecond(700)
                    .build();
            admin1.namespaces().setReplicatorDispatchRate(namespace, nsDispatchRate2);
            Awaitility.await().untilAsserted(() ->
                    Assert.assertEquals(admin1.namespaces().getReplicatorDispatchRate(namespace), nsDispatchRate2));
            Assert.assertEquals(firstReplicatorRateLimiter(topic).getDispatchRateOnMsg(), 10L);
            Assert.assertEquals(firstReplicatorRateLimiter(topic).getDispatchRateOnByte(), 20L);

            // 5. Removing the topic-level policy exposes the (updated) namespace-level policy.
            admin1.topics().removeReplicatorDispatchRate(topicName);
            Awaitility.await().untilAsserted(() ->
                    Assert.assertNull(admin1.topics().getReplicatorDispatchRate(topicName)));
            awaitFirstReplicatorRate(topic, 500L, 600L);

            // 6. Removing the namespace-level policy falls back to the broker-level configuration.
            admin1.namespaces().setReplicatorDispatchRate(namespace, null);
            awaitFirstReplicatorRate(topic, 100L, 200L);
        }
    }

    /** Returns the rate limiter of the first replicator of {@code topic}; the limiter must be present. */
    private static DispatchRateLimiter firstReplicatorRateLimiter(PersistentTopic topic) {
        return (DispatchRateLimiter) ((Replicator) topic.getReplicators().values().get(0)).getRateLimiter().get();
    }

    /** Polls until the first replicator of {@code topic} reports the given message and byte rates. */
    private static void awaitFirstReplicatorRate(PersistentTopic topic, long msgRate, long byteRate) {
        Awaitility.await().untilAsserted(() -> {
            // Assert presence first so that a missing limiter is retried rather than surfacing as
            // a NoSuchElementException from Optional.get().
            Assert.assertTrue(((Replicator) topic.getReplicators().values().get(0)).getRateLimiter().isPresent());
            DispatchRateLimiter limiter = firstReplicatorRateLimiter(topic);
            Assert.assertEquals(limiter.getDispatchRateOnMsg(), msgRate);
            Assert.assertEquals(limiter.getDispatchRateOnByte(), byteRate);
        });
    }

    /**
     * Verifies that a namespace-level replicator dispatch rate set while the topic is already loaded
     * is picked up by the topic's replicator: first a message-rate limit, then a byte-rate limit.
     *
     * <p>The replicator is polled with Awaitility (default timeout) instead of a fixed number of
     * short sleeps, matching how {@code testReplicatorRatePriority} waits for policy changes.
     *
     * @throws Exception if an admin call or a client operation fails
     */
    @Test
    public void testReplicatorRateLimiterDynamicallyChange() throws Exception {
        log.info("--- Starting ReplicatorTest::{} --- ", methodName);

        final String namespace = "pulsar/replicatorchange-" + System.currentTimeMillis();
        final String topicName = "persistent://" + namespace + "/ratechange";

        admin1.namespaces().createNamespace(namespace);
        admin1.namespaces().setNamespaceReplicationClusters(namespace, Sets.newHashSet("r1", "r2"));

        try (PulsarClient client1 = PulsarClient.builder()
                .serviceUrl(url1.toString())
                .statsInterval(0L, TimeUnit.SECONDS)
                .build()) {
            // Creating (and closing) a producer is enough to bring the topic into existence.
            client1.newProducer()
                    .topic(topicName)
                    .enableBatching(false)
                    .messageRoutingMode(MessageRoutingMode.SinglePartition)
                    .create()
                    .close();
            PersistentTopic topic =
                    (PersistentTopic) pulsar1.getBrokerService().getOrCreateTopic(topicName).get();

            // Precondition: nothing has configured a replicator rate yet.
            Assert.assertFalse(((Replicator) topic.getReplicators().values().get(0)).getRateLimiter().isPresent());

            // 1. Set a message-rate limit and wait for the replicator to adopt it.
            final int messageRate = 100;
            DispatchRate dispatchRateMsg = DispatchRate.builder()
                    .dispatchThrottlingRateInMsg(messageRate)
                    .dispatchThrottlingRateInByte(-1L)
                    .ratePeriodInSecond(360)
                    .build();
            admin1.namespaces().setReplicatorDispatchRate(namespace, dispatchRateMsg);
            Awaitility.await().untilAsserted(() -> {
                Replicator replicator = (Replicator) topic.getReplicators().values().get(0);
                // Presence is asserted first so that a not-yet-created limiter is retried.
                Assert.assertTrue(replicator.getRateLimiter().isPresent());
                Assert.assertEquals(
                        ((DispatchRateLimiter) replicator.getRateLimiter().get()).getDispatchRateOnMsg(),
                        (long) messageRate);
            });

            // 2. Switch to a byte-rate limit and wait for the replicator to adopt it.
            final long byteRate = 500L;
            DispatchRate dispatchRateByte = DispatchRate.builder()
                    .dispatchThrottlingRateInMsg(-1)
                    .dispatchThrottlingRateInByte(byteRate)
                    .ratePeriodInSecond(360)
                    .build();
            admin1.namespaces().setReplicatorDispatchRate(namespace, dispatchRateByte);
            Awaitility.await().untilAsserted(() -> {
                Replicator replicator = (Replicator) topic.getReplicators().values().get(0);
                Assert.assertTrue(replicator.getRateLimiter().isPresent());
                Assert.assertEquals(
                        ((DispatchRateLimiter) replicator.getRateLimiter().get()).getDispatchRateOnByte(),
                        byteRate);
            });

            Assert.assertEquals(admin1.namespaces().getReplicatorDispatchRate(namespace), dispatchRateByte);
        }
    }

    /**
     * Verifies that replication is throttled: with a replicator limit of 100 (messages or bytes,
     * depending on {@code dispatchRateType}) per 360-second period, a consumer on the second cluster
     * must have received fewer than twice the limit after 500 messages were produced on the first.
     *
     * @param dispatchRateType whether the limit is expressed in messages or in bytes
     * @throws Exception if an admin call or a client operation fails
     */
    @Test(dataProvider = "dispatchRateType")
    public void testReplicatorRateLimiterMessageNotReceivedAllMessages(DispatchRateType dispatchRateType)
            throws Exception {
        log.info("--- Starting ReplicatorTest::{} --- ", methodName);

        final String namespace =
                "pulsar/replicatorbyteandmsg-" + dispatchRateType.toString() + "-" + System.currentTimeMillis();
        final String topicName = "persistent://" + namespace + "/notReceivedAll";

        admin1.namespaces().createNamespace(namespace);
        admin1.namespaces().setNamespaceReplicationClusters(namespace, Sets.newHashSet("r1", "r2"));

        final int messageRate = 100;
        final boolean limitByMessages = dispatchRateType == DispatchRateType.messageRate;
        // Only one dimension is limited; -1 leaves the other one unrestricted.
        DispatchRate dispatchRate = limitByMessages
                ? DispatchRate.builder()
                        .dispatchThrottlingRateInMsg(messageRate)
                        .dispatchThrottlingRateInByte(-1L)
                        .ratePeriodInSecond(360)
                        .build()
                : DispatchRate.builder()
                        .dispatchThrottlingRateInMsg(-1)
                        .dispatchThrottlingRateInByte(messageRate)
                        .ratePeriodInSecond(360)
                        .build();
        admin1.namespaces().setReplicatorDispatchRate(namespace, dispatchRate);

        // try-with-resources closes producer and clients even when an assertion fails.
        try (PulsarClient client1 = PulsarClient.builder()
                        .serviceUrl(url1.toString())
                        .statsInterval(0L, TimeUnit.SECONDS)
                        .build();
                Producer<byte[]> producer = client1.newProducer()
                        .topic(topicName)
                        .enableBatching(false)
                        .messageRoutingMode(MessageRoutingMode.SinglePartition)
                        .create()) {
            PersistentTopic topic =
                    (PersistentTopic) pulsar1.getBrokerService().getOrCreateTopic(topicName).get();

            // Wait for the replicator to expose a limiter carrying the configured rate.
            Awaitility.await().untilAsserted(() -> {
                Replicator replicator = (Replicator) topic.getReplicators().values().get(0);
                // Presence is asserted first so that a not-yet-created limiter is retried.
                Assert.assertTrue(replicator.getRateLimiter().isPresent());
                DispatchRateLimiter limiter = (DispatchRateLimiter) replicator.getRateLimiter().get();
                long configuredRate =
                        limitByMessages ? limiter.getDispatchRateOnMsg() : limiter.getDispatchRateOnByte();
                Assert.assertEquals(configuredRate, (long) messageRate);
            });

            final AtomicInteger totalReceived = new AtomicInteger(0);
            try (PulsarClient client2 = PulsarClient.builder()
                            .serviceUrl(url2.toString())
                            .statsInterval(0L, TimeUnit.SECONDS)
                            .build();
                    Consumer<byte[]> consumer = client2.newConsumer()
                            .topic(topicName)
                            .subscriptionName("sub2-in-cluster2")
                            .messageListener((c1, msg) -> {
                                Assert.assertNotNull(msg, "Message cannot be null");
                                String receivedMessage = new String(msg.getData());
                                log.debug("Received message [{}] in the listener", receivedMessage);
                                totalReceived.incrementAndGet();
                            })
                            .subscribe()) {
                // Produce well above the limit.
                final int numMessages = 500;
                for (int i = 0; i < numMessages; i++) {
                    producer.send(new byte[80]);
                }

                log.info("Received message number: [{}]", totalReceived.get());
                Assert.assertTrue(totalReceived.get() < messageRate * 2);
            }
        }
    }

    /**
     * Verifies both sides of a 100-messages-per-360-seconds replicator limit: a batch of 50 messages
     * is replicated completely, and after 200 further messages the remote consumer has received
     * exactly 100 messages in total.
     *
     * @throws Exception if an admin call or a client operation fails
     */
    @Test
    public void testReplicatorRateLimiterMessageReceivedAllMessages() throws Exception {
        log.info("--- Starting ReplicatorTest::{} --- ", methodName);

        final String namespace = "pulsar/replicatormsg-" + System.currentTimeMillis();
        final String topicName = "persistent://" + namespace + "/notReceivedAll";

        admin1.namespaces().createNamespace(namespace);
        admin1.namespaces().setNamespaceReplicationClusters(namespace, Sets.newHashSet("r1", "r2"));

        final int messageRate = 100;
        DispatchRate dispatchRate = DispatchRate.builder()
                .dispatchThrottlingRateInMsg(messageRate)
                .dispatchThrottlingRateInByte(-1L)
                .ratePeriodInSecond(360)
                .build();
        admin1.namespaces().setReplicatorDispatchRate(namespace, dispatchRate);

        // try-with-resources closes producer and clients even when an assertion fails.
        try (PulsarClient client1 = PulsarClient.builder()
                        .serviceUrl(url1.toString())
                        .statsInterval(0L, TimeUnit.SECONDS)
                        .build();
                Producer<byte[]> producer = client1.newProducer()
                        .topic(topicName)
                        .enableBatching(false)
                        .messageRoutingMode(MessageRoutingMode.SinglePartition)
                        .create()) {
            PersistentTopic topic =
                    (PersistentTopic) pulsar1.getBrokerService().getOrCreateTopic(topicName).get();

            // Wait for the replicator to expose a limiter carrying the configured message rate.
            Awaitility.await().untilAsserted(() -> {
                Replicator replicator = (Replicator) topic.getReplicators().values().get(0);
                // Presence is asserted first so that a not-yet-created limiter is retried.
                Assert.assertTrue(replicator.getRateLimiter().isPresent());
                Assert.assertEquals(
                        ((DispatchRateLimiter) replicator.getRateLimiter().get()).getDispatchRateOnMsg(),
                        (long) messageRate);
            });

            final AtomicInteger totalReceived = new AtomicInteger(0);
            try (PulsarClient client2 = PulsarClient.builder()
                            .serviceUrl(url2.toString())
                            .statsInterval(0L, TimeUnit.SECONDS)
                            .build();
                    Consumer<byte[]> consumer = client2.newConsumer()
                            .topic(topicName)
                            .subscriptionName("sub2-in-cluster2")
                            .messageListener((c1, msg) -> {
                                Assert.assertNotNull(msg, "Message cannot be null");
                                String receivedMessage = new String(msg.getData());
                                log.debug("Received message [{}] in the listener", receivedMessage);
                                totalReceived.incrementAndGet();
                            })
                            .subscribe()) {
                // 1. Stay below the limit: every message must arrive. Poll rather than sleep a
                //    fixed interval, since replication latency is not bounded by one second.
                final int firstBatch = 50;
                for (int i = 0; i < firstBatch; i++) {
                    producer.send(new byte[80]);
                }
                Awaitility.await().untilAsserted(() -> Assert.assertEquals(totalReceived.get(), firstBatch));
                log.info("Received message number: [{}]", totalReceived.get());

                // 2. Exceed the limit: only the remaining quota may be replicated.
                final int secondBatch = 200;
                for (int i = 0; i < secondBatch; i++) {
                    producer.send(new byte[80]);
                }
                Awaitility.await().untilAsserted(() -> Assert.assertEquals(totalReceived.get(), messageRate));
                // Settle window: give any message beyond the quota time to show up before the
                // final check, so an overshoot is not missed.
                Thread.sleep(1000L);
                log.info("Received message number: [{}]", totalReceived.get());
                Assert.assertEquals(totalReceived.get(), messageRate);
            }
        }
    }

    /** Selects which dimension of the replicator dispatch rate a parameterized test limits. */
    enum DispatchRateType {
        /** The limit is expressed as a number of messages. */
        messageRate,
        /** The limit is expressed as a number of bytes. */
        byteRate
    }
}

