/*
 * Decompiled with CFR 0.152.
 */
package org.apache.pulsar.broker.resourcegroup;

import java.util.HashSet;
import java.util.concurrent.TimeUnit;
import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
import org.apache.pulsar.broker.resourcegroup.ResourceGroup;
import org.apache.pulsar.broker.resourcegroup.ResourceGroupService;
import org.apache.pulsar.broker.resourcegroup.ResourceQuotaCalculator;
import org.apache.pulsar.broker.resourcegroup.ResourceUsageTopicTransportManager;
import org.apache.pulsar.broker.service.resource.usage.NetworkUsage;
import org.apache.pulsar.broker.service.resource.usage.ResourceUsage;
import org.apache.pulsar.client.admin.PulsarAdminException;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.policies.data.ClusterData;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testng.Assert;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;

/**
 * Tests for {@link ResourceGroupService}: create/update/get/delete of resource groups,
 * tenant and namespace registration, usage reporting, quota calculation, and a rough
 * timing measurement of the hot-path operations.
 *
 * <p>The service under test is wired with a dummy {@link ResourceQuotaCalculator} whose
 * answers are deterministic, so the assertions do not depend on real traffic.
 */
public class ResourceGroupServiceTest extends MockedPulsarServiceBaseTest {
    private static final Logger log = LoggerFactory.getLogger(ResourceGroupServiceTest.class);

    /** Publish interval handed to the broker configuration in {@link #prepareData()}. */
    private static final int PUBLISH_INTERVAL_SECS = 500;

    /**
     * Period of the dummy calculator's "need to report" answer: it says {@code true} once
     * in every this-many evaluations. Also used as the number of usage-report rounds
     * driven by {@link #testResourceGroupOps()}.
     */
    private static final int MAX_USAGE_REPORT_SUPPRESS_ROUNDS = 5;

    private ResourceGroupService rgs;

    /** Number of times the dummy calculator's {@code computeLocalQuota} has been invoked. */
    int numAnonymousQuotaCalculations;

    /**
     * Starts the mocked broker, prepares the cluster data, and builds the
     * {@link ResourceGroupService} under test around a dummy quota calculator.
     *
     * @throws Exception if the mocked broker setup or data preparation fails
     */
    @Override
    @BeforeClass
    protected void setup() throws Exception {
        super.internalSetup();
        prepareData();

        ResourceQuotaCalculator dummyQuotaCalc = new ResourceQuotaCalculator() {
            private long numLocalReportsEvaluated;

            // Answers true once per MAX_USAGE_REPORT_SUPPRESS_ROUNDS evaluations
            // (on the 4th, 9th, 14th, ... call); all arguments are ignored.
            public boolean needToReportLocalUsage(long currentBytesUsed, long lastReportedBytes,
                                                  long currentMessagesUsed, long lastReportedMessages,
                                                  long lastReportTimeMSecsSinceEpoch) {
                ++numLocalReportsEvaluated;
                return numLocalReportsEvaluated % MAX_USAGE_REPORT_SUPPRESS_ROUNDS
                        == MAX_USAGE_REPORT_SUPPRESS_ROUNDS - 1;
            }

            // Returns the running invocation count as the "quota", so every returned value is
            // positive and never exceeds numAnonymousQuotaCalculations.
            public long computeLocalQuota(long confUsage, long myUsage, long[] allUsages) {
                return ++numAnonymousQuotaCalculations;
            }
        };

        ResourceUsageTopicTransportManager transportMgr = new ResourceUsageTopicTransportManager(pulsar);
        rgs = new ResourceGroupService(pulsar, TimeUnit.MILLISECONDS, transportMgr, dummyQuotaCalc);
    }

    /**
     * Logs how many quota calculations the dummy calculator served, then tears down the
     * mocked broker.
     *
     * @throws Exception if the mocked broker cleanup fails
     */
    @Override
    @AfterClass(alwaysRun = true)
    protected void cleanup() throws Exception {
        log.info("numAnonymousQuotaCalculations={}", numAnonymousQuotaCalculations);
        super.internalCleanup();
    }

    /**
     * Measures (and logs) the wall-clock cost of three operations: incrementing usage stats
     * directly on a resource group, incrementing usage through the service, and looking up
     * a resource group by name. This test makes no assertions; it only reports timings.
     *
     * @throws PulsarAdminException if any resource-group operation is rejected
     */
    @Test
    public void measureOpsTime() throws PulsarAdminException {
        final int numPerfTestIterations = 10000000;
        final String rgName = "measureRGIncStatTime";
        final String tenantName = "SomeTenant";
        final String namespaceName = "SomeNameSpace";
        // Fetched once: values() allocates a fresh array per call, which would otherwise be
        // charged to the operation being timed.
        final ResourceGroup.ResourceGroupMonitoringClass[] monClasses =
                ResourceGroup.ResourceGroupMonitoringClass.values();

        org.apache.pulsar.common.policies.data.ResourceGroup rgConfig =
                new org.apache.pulsar.common.policies.data.ResourceGroup();
        ResourceGroup.BytesAndMessagesCount stats = new ResourceGroup.BytesAndMessagesCount();

        rgs.resourceGroupCreate(rgName, rgConfig);
        ResourceGroup rg = rgs.resourceGroupGet(rgName);

        // 1. Direct stat increments on the resource group.
        long mSecsStart = System.currentTimeMillis();
        for (int ix = 0; ix < numPerfTestIterations; ++ix) {
            for (ResourceGroup.ResourceGroupMonitoringClass monClass : monClasses) {
                rg.incrementLocalUsageStats(monClass, stats);
            }
        }
        long diffMsecs = System.currentTimeMillis() - mSecsStart;
        log.info("{} iterations of incrementLocalUsageStats on retRG in {} msecs ({} usecs for each)",
                numPerfTestIterations, diffMsecs, usecsPerIteration(diffMsecs, numPerfTestIterations));

        // 2. Usage increments routed through the service by tenant/namespace.
        rgs.registerTenant(rgName, tenantName);
        rgs.registerNameSpace(rgName, namespaceName);
        mSecsStart = System.currentTimeMillis();
        for (int ix = 0; ix < numPerfTestIterations; ++ix) {
            for (ResourceGroup.ResourceGroupMonitoringClass monClass : monClasses) {
                rgs.incrementUsage(tenantName, namespaceName, monClass, stats);
            }
        }
        diffMsecs = System.currentTimeMillis() - mSecsStart;
        log.info("{} iterations of incrementUsage on RGS in {} msecs ({} usecs for each)",
                numPerfTestIterations, diffMsecs, usecsPerIteration(diffMsecs, numPerfTestIterations));
        rgs.unRegisterTenant(rgName, tenantName);
        rgs.unRegisterNameSpace(rgName, namespaceName);

        // 3. Lookups by name; the result is intentionally discarded.
        mSecsStart = System.currentTimeMillis();
        for (int ix = 0; ix < numPerfTestIterations; ++ix) {
            rgs.resourceGroupGet(rg.resourceGroupName);
        }
        diffMsecs = System.currentTimeMillis() - mSecsStart;
        log.info("{} iterations of GET on RGS in {} msecs ({} usecs for each)",
                numPerfTestIterations, diffMsecs, usecsPerIteration(diffMsecs, numPerfTestIterations));

        rgs.resourceGroupDelete(rgName);
    }

    /**
     * Exercises the resource-group lifecycle: duplicate create and unknown update/delete are
     * rejected, updates are reflected in the stored per-class config, a group with registered
     * tenant/namespace cannot be deleted, usage reporting yields both answers within one
     * suppression period, and quotas come from the dummy calculator after recalculation.
     *
     * @throws PulsarAdminException if an operation expected to succeed is rejected
     * @throws InterruptedException declared for signature compatibility
     */
    @Test
    public void testResourceGroupOps() throws PulsarAdminException, InterruptedException {
        final String rgName = "testRG";
        final String randomRgName = "Something";
        final int initialNumQuotaCalculations = numAnonymousQuotaCalculations;

        org.apache.pulsar.common.policies.data.ResourceGroup rgConfig =
                new org.apache.pulsar.common.policies.data.ResourceGroup();
        rgConfig.setPublishRateInBytes(15000L);
        rgConfig.setPublishRateInMsgs(100);
        rgConfig.setDispatchRateInBytes(40000L);
        rgConfig.setDispatchRateInMsgs(500);

        rgs.resourceGroupCreate(rgName, rgConfig);
        // Creating the same group twice must fail.
        Assert.assertThrows(PulsarAdminException.class, () -> rgs.resourceGroupCreate(rgName, rgConfig));

        // Updating a group that was never created must fail.
        org.apache.pulsar.common.policies.data.ResourceGroup randomConfig =
                new org.apache.pulsar.common.policies.data.ResourceGroup();
        Assert.assertThrows(PulsarAdminException.class, () -> rgs.resourceGroupUpdate(randomRgName, randomConfig));

        // Change every rate, push the update, and make sure no extra group appeared.
        rgConfig.setPublishRateInBytes(rgConfig.getPublishRateInBytes() * 10L);
        rgConfig.setPublishRateInMsgs(rgConfig.getPublishRateInMsgs() * 10);
        rgConfig.setDispatchRateInBytes(rgConfig.getDispatchRateInBytes() / 10L);
        rgConfig.setDispatchRateInMsgs(rgConfig.getDispatchRateInMsgs() / 10);
        rgs.resourceGroupUpdate(rgName, rgConfig);
        Assert.assertEquals((long) rgs.getNumResourceGroups(), 1L);

        Assert.assertNull(rgs.resourceGroupGet(randomRgName));
        ResourceGroup retRG = rgs.resourceGroupGet(rgName);
        Assert.assertNotNull(retRG);

        // The stored per-class config must mirror the updated values.
        ResourceGroup.PerMonitoringClassFields monClassFields =
                retRG.monitoringClassFields[ResourceGroup.ResourceGroupMonitoringClass.Publish.ordinal()];
        Assert.assertEquals((long) monClassFields.configValuesPerPeriod.bytes,
                (long) rgConfig.getPublishRateInBytes());
        Assert.assertEquals((long) monClassFields.configValuesPerPeriod.messages,
                (long) rgConfig.getPublishRateInMsgs());
        monClassFields =
                retRG.monitoringClassFields[ResourceGroup.ResourceGroupMonitoringClass.Dispatch.ordinal()];
        Assert.assertEquals((long) monClassFields.configValuesPerPeriod.bytes,
                (long) rgConfig.getDispatchRateInBytes());
        Assert.assertEquals((long) monClassFields.configValuesPerPeriod.messages,
                (long) rgConfig.getDispatchRateInMsgs());

        // Deleting an unknown group must fail and leave the existing one in place.
        Assert.assertThrows(PulsarAdminException.class, () -> rgs.resourceGroupDelete(randomRgName));
        Assert.assertEquals((long) rgs.getNumResourceGroups(), 1L);

        final String someRandomTopic = "persistent://FakeTenant/FakeNameSpace/FakeTopic";
        TopicName topic = TopicName.get(someRandomTopic);
        String tenantName = topic.getTenant();
        String namespaceName = topic.getNamespacePortion();
        rgs.registerTenant(rgName, tenantName);
        rgs.registerNameSpace(rgName, namespaceName);

        // A group that still has a tenant/namespace registered must not be deletable.
        Assert.assertThrows(PulsarAdminException.class, () -> rgs.resourceGroupDelete(rgName));

        ResourceUsage usage = new ResourceUsage();
        for (ResourceGroup.ResourceGroupMonitoringClass monClass
                : ResourceGroup.ResourceGroupMonitoringClass.values()) {
            NetworkUsage nwUsage = monClass == ResourceGroup.ResourceGroupMonitoringClass.Publish
                    ? usage.setPublish()
                    : usage.setDispatch();

            // Over one full suppression period both answers are expected. Presumably
            // setUsageInMonitoredEntity consults the dummy calculator's
            // needToReportLocalUsage -- confirm against ResourceGroup.
            HashSet<Boolean> reportDecisions = new HashSet<>();
            for (int round = 0; round < MAX_USAGE_REPORT_SUPPRESS_ROUNDS; ++round) {
                reportDecisions.add(retRG.setUsageInMonitoredEntity(monClass, nwUsage));
            }
            Assert.assertTrue(reportDecisions.contains(true));
            Assert.assertTrue(reportDecisions.contains(false));
        }

        rgs.unRegisterTenant(rgName, tenantName);
        rgs.unRegisterNameSpace(rgName, namespaceName);

        ResourceGroup.BytesAndMessagesCount publishQuota = rgs.getPublishRateLimiters(rgName);
        int numQuotaCalcsDuringTest = numAnonymousQuotaCalculations - initialNumQuotaCalculations;
        if (numQuotaCalcsDuringTest == 0) {
            // No calculation has happened since this test started, so the limiter is
            // expected to still carry the configured rates.
            Assert.assertEquals((long) publishQuota.messages, (long) rgConfig.getPublishRateInMsgs());
            Assert.assertEquals((long) publishQuota.bytes, (long) rgConfig.getPublishRateInBytes());
        }

        // After recalculation the quota is bounded by the dummy calculator's invocation
        // count, since that calculator returns its running count.
        rgs.calculateQuotaForAllResourceGroups();
        publishQuota = rgs.getPublishRateLimiters(rgName);
        Assert.assertTrue(publishQuota.messages > 0L
                && publishQuota.messages <= (long) numAnonymousQuotaCalculations);
        Assert.assertTrue(publishQuota.bytes > 0L
                && publishQuota.bytes <= (long) numAnonymousQuotaCalculations);

        rgs.resourceGroupDelete(rgName);
        Assert.assertThrows(PulsarAdminException.class, () -> rgs.getPublishRateLimiters(rgName));
        Assert.assertEquals((long) rgs.getNumResourceGroups(), 0L);
    }

    /**
     * Sets the resource-usage publish interval on the broker configuration and creates the
     * {@code test} cluster pointing at the mocked broker's web service address.
     *
     * @throws PulsarAdminException if the cluster cannot be created
     */
    private void prepareData() throws PulsarAdminException {
        conf.setResourceUsageTransportPublishIntervalInSecs(PUBLISH_INTERVAL_SECS);
        admin.clusters().createCluster("test",
                ClusterData.builder().serviceUrl(pulsar.getWebServiceAddress()).build());
    }

    /** Converts a total elapsed time in milliseconds to microseconds per iteration. */
    private static float usecsPerIteration(long elapsedMsecs, int iterations) {
        return 1000.0f * elapsedMsecs / iterations;
    }
}

