package org.apache.pulsar.broker.loadbalance.extensions.reporter;

import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import org.apache.commons.lang.reflect.FieldUtils;
import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.pulsar.broker.loadbalance.extensions.channel.ServiceUnitState;
import org.apache.pulsar.broker.loadbalance.extensions.channel.ServiceUnitStateData;
import org.apache.pulsar.broker.loadbalance.extensions.data.TopBundlesLoadData;
import org.apache.pulsar.broker.loadbalance.extensions.models.TopKBundles;
import org.apache.pulsar.broker.loadbalance.extensions.store.LoadDataStore;
import org.apache.pulsar.broker.resources.LocalPoliciesResources;
import org.apache.pulsar.broker.resources.NamespaceResources;
import org.apache.pulsar.broker.resources.PulsarResources;
import org.apache.pulsar.broker.service.BrokerService;
import org.apache.pulsar.broker.service.PulsarStats;
import org.apache.pulsar.common.naming.NamespaceName;
import org.apache.pulsar.metadata.api.MetadataStoreException;
import org.apache.pulsar.policies.data.loadbalancer.NamespaceBundleStats;
import org.mockito.ArgumentMatchers;
import org.mockito.Mockito;
import org.testcontainers.shaded.org.awaitility.Awaitility;
import org.testng.Assert;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;

@Test(groups = {"broker"})
/* loaded from: input_file:org/apache/pulsar/broker/loadbalance/extensions/reporter/TopBundleLoadDataReporterTest.class */
public class TopBundleLoadDataReporterTest {
    PulsarService pulsar;
    LoadDataStore store;
    BrokerService brokerService;
    PulsarStats pulsarStats;
    Map<String, NamespaceBundleStats> bundleStats;
    ServiceConfiguration config;
    private NamespaceResources.IsolationPolicyResources isolationPolicyResources;
    private PulsarResources pulsarResources;
    private LocalPoliciesResources localPoliciesResources;
    String bundle1 = "my-tenant/my-namespace1/0x00000000_0x0FFFFFFF";
    String bundle2 = "my-tenant/my-namespace2/0x00000000_0x0FFFFFFF";
    String bundle = this.bundle1;
    String broker = "broker-1";

    @BeforeMethod
    void setup() throws MetadataStoreException {
        this.config = new ServiceConfiguration();
        this.config.setLoadBalancerDebugModeEnabled(true);
        this.pulsar = (PulsarService) Mockito.mock(PulsarService.class);
        this.store = (LoadDataStore) Mockito.mock(LoadDataStore.class);
        this.brokerService = (BrokerService) Mockito.mock(BrokerService.class);
        this.pulsarStats = (PulsarStats) Mockito.mock(PulsarStats.class);
        this.pulsarResources = (PulsarResources) Mockito.mock(PulsarResources.class);
        this.isolationPolicyResources = (NamespaceResources.IsolationPolicyResources) Mockito.mock(NamespaceResources.IsolationPolicyResources.class);
        NamespaceResources namespaceResources = (NamespaceResources) Mockito.mock(NamespaceResources.class);
        this.localPoliciesResources = (LocalPoliciesResources) Mockito.mock(LocalPoliciesResources.class);
        ((PulsarService) Mockito.doReturn(this.brokerService).when(this.pulsar)).getBrokerService();
        ((PulsarService) Mockito.doReturn(this.config).when(this.pulsar)).getConfiguration();
        ((BrokerService) Mockito.doReturn(this.pulsarStats).when(this.brokerService)).getPulsarStats();
        ((LoadDataStore) Mockito.doReturn(CompletableFuture.completedFuture(null)).when(this.store)).pushAsync((String) ArgumentMatchers.any(), ArgumentMatchers.any());
        ((LoadDataStore) Mockito.doReturn(CompletableFuture.completedFuture(null)).when(this.store)).removeAsync((String) ArgumentMatchers.any());
        ((PulsarService) Mockito.doReturn(this.pulsarResources).when(this.pulsar)).getPulsarResources();
        ((PulsarResources) Mockito.doReturn(namespaceResources).when(this.pulsarResources)).getNamespaceResources();
        ((NamespaceResources) Mockito.doReturn(this.isolationPolicyResources).when(namespaceResources)).getIsolationPolicies();
        ((NamespaceResources.IsolationPolicyResources) Mockito.doReturn(Optional.empty()).when(this.isolationPolicyResources)).getIsolationDataPolicies((String) ArgumentMatchers.any());
        ((PulsarResources) Mockito.doReturn(this.localPoliciesResources).when(this.pulsarResources)).getLocalPolicies();
        ((LocalPoliciesResources) Mockito.doReturn(Optional.empty()).when(this.localPoliciesResources)).getLocalPolicies((NamespaceName) ArgumentMatchers.any());
        this.bundleStats = new HashMap();
        NamespaceBundleStats namespaceBundleStats = new NamespaceBundleStats();
        namespaceBundleStats.msgRateIn = 500.0d;
        this.bundleStats.put(this.bundle1, namespaceBundleStats);
        NamespaceBundleStats namespaceBundleStats2 = new NamespaceBundleStats();
        namespaceBundleStats2.msgRateIn = 10000.0d;
        this.bundleStats.put(this.bundle2, namespaceBundleStats2);
        ((BrokerService) Mockito.doReturn(this.bundleStats).when(this.brokerService)).getBundleStats();
    }

    public void testZeroUpdatedAt() {
        ((PulsarStats) Mockito.doReturn(0L).when(this.pulsarStats)).getUpdatedAt();
        Assert.assertNull(new TopBundleLoadDataReporter(this.pulsar, "", this.store).generateLoadData());
    }

    public void testGenerateLoadData() throws IllegalAccessException {
        ((PulsarStats) Mockito.doReturn(1L).when(this.pulsarStats)).getUpdatedAt();
        this.config.setLoadBalancerMaxNumberOfBundlesInBundleLoadReport(2);
        TopBundleLoadDataReporter topBundleLoadDataReporter = new TopBundleLoadDataReporter(this.pulsar, "", this.store);
        TopKBundles topKBundles = new TopKBundles(this.pulsar);
        topKBundles.update(this.bundleStats, 2);
        Assert.assertEquals(topBundleLoadDataReporter.generateLoadData(), topKBundles.getLoadData());
        this.config.setLoadBalancerMaxNumberOfBundlesInBundleLoadReport(1);
        FieldUtils.writeDeclaredField(topBundleLoadDataReporter, "lastBundleStatsUpdatedAt", 0L, true);
        TopKBundles topKBundles2 = new TopKBundles(this.pulsar);
        topKBundles2.update(this.bundleStats, 1);
        Assert.assertEquals(topBundleLoadDataReporter.generateLoadData(), topKBundles2.getLoadData());
        this.config.setLoadBalancerMaxNumberOfBundlesInBundleLoadReport(0);
        FieldUtils.writeDeclaredField(topBundleLoadDataReporter, "lastBundleStatsUpdatedAt", 0L, true);
        TopKBundles topKBundles3 = new TopKBundles(this.pulsar);
        topKBundles3.update(this.bundleStats, 0);
        Assert.assertEquals(topBundleLoadDataReporter.generateLoadData(), topKBundles3.getLoadData());
        ((BrokerService) Mockito.doReturn(new HashMap()).when(this.brokerService)).getBundleStats();
        FieldUtils.writeDeclaredField(topBundleLoadDataReporter, "lastBundleStatsUpdatedAt", 0L, true);
        Assert.assertEquals(topBundleLoadDataReporter.generateLoadData(), new TopKBundles(this.pulsar).getLoadData());
    }

    public void testReportForce() {
        TopBundleLoadDataReporter topBundleLoadDataReporter = new TopBundleLoadDataReporter(this.pulsar, this.broker, this.store);
        topBundleLoadDataReporter.reportAsync(false);
        ((LoadDataStore) Mockito.verify(this.store, Mockito.times(0))).pushAsync((String) ArgumentMatchers.any(), ArgumentMatchers.any());
        topBundleLoadDataReporter.reportAsync(true);
        ((LoadDataStore) Mockito.verify(this.store, Mockito.times(1))).pushAsync(this.broker, new TopBundlesLoadData());
    }

    public void testReport() {
        this.pulsar.getConfiguration().setLoadBalancerMaxNumberOfBundlesInBundleLoadReport(1);
        TopBundleLoadDataReporter topBundleLoadDataReporter = new TopBundleLoadDataReporter(this.pulsar, this.broker, this.store);
        ((PulsarStats) Mockito.doReturn(1L).when(this.pulsarStats)).getUpdatedAt();
        TopKBundles topKBundles = new TopKBundles(this.pulsar);
        topKBundles.update(this.bundleStats, 1);
        topBundleLoadDataReporter.reportAsync(false);
        ((LoadDataStore) Mockito.verify(this.store, Mockito.times(1))).pushAsync(this.broker, topKBundles.getLoadData());
    }

    @Test
    public void testTombstone() throws IllegalAccessException {
        TopBundleLoadDataReporter topBundleLoadDataReporter = (TopBundleLoadDataReporter) Mockito.spy(new TopBundleLoadDataReporter(this.pulsar, this.broker, this.store));
        topBundleLoadDataReporter.handleEvent(this.bundle, new ServiceUnitStateData(ServiceUnitState.Assigning, this.broker, 1L), (Throwable) null);
        ((LoadDataStore) Mockito.verify(this.store, Mockito.times(0))).removeAsync((String) ArgumentMatchers.eq(this.broker));
        ((TopBundleLoadDataReporter) Mockito.verify(topBundleLoadDataReporter, Mockito.times(0))).tombstone();
        topBundleLoadDataReporter.handleEvent(this.bundle, new ServiceUnitStateData(ServiceUnitState.Deleted, this.broker, 1L), (Throwable) null);
        ((LoadDataStore) Mockito.verify(this.store, Mockito.times(0))).removeAsync((String) ArgumentMatchers.eq(this.broker));
        ((TopBundleLoadDataReporter) Mockito.verify(topBundleLoadDataReporter, Mockito.times(0))).tombstone();
        topBundleLoadDataReporter.handleEvent(this.bundle, new ServiceUnitStateData(ServiceUnitState.Init, this.broker, 1L), (Throwable) null);
        ((LoadDataStore) Mockito.verify(this.store, Mockito.times(0))).removeAsync((String) ArgumentMatchers.eq(this.broker));
        ((TopBundleLoadDataReporter) Mockito.verify(topBundleLoadDataReporter, Mockito.times(0))).tombstone();
        topBundleLoadDataReporter.handleEvent(this.bundle, new ServiceUnitStateData(ServiceUnitState.Free, this.broker, 1L), (Throwable) null);
        ((LoadDataStore) Mockito.verify(this.store, Mockito.times(0))).removeAsync((String) ArgumentMatchers.eq(this.broker));
        ((TopBundleLoadDataReporter) Mockito.verify(topBundleLoadDataReporter, Mockito.times(0))).tombstone();
        topBundleLoadDataReporter.handleEvent(this.bundle, new ServiceUnitStateData(ServiceUnitState.Releasing, "broker-2", this.broker, 1L), new RuntimeException());
        ((LoadDataStore) Mockito.verify(this.store, Mockito.times(0))).removeAsync((String) ArgumentMatchers.eq(this.broker));
        ((TopBundleLoadDataReporter) Mockito.verify(topBundleLoadDataReporter, Mockito.times(0))).tombstone();
        topBundleLoadDataReporter.handleEvent(this.bundle, new ServiceUnitStateData(ServiceUnitState.Releasing, "broker-2", this.broker, 1L), (Throwable) null);
        Awaitility.waitAtMost(3L, TimeUnit.SECONDS).untilAsserted(() -> {
            ((TopBundleLoadDataReporter) Mockito.verify(topBundleLoadDataReporter, Mockito.times(1))).tombstone();
            ((LoadDataStore) Mockito.verify(this.store, Mockito.times(1))).removeAsync((String) ArgumentMatchers.eq(this.broker));
        });
        topBundleLoadDataReporter.handleEvent(this.bundle, new ServiceUnitStateData(ServiceUnitState.Releasing, "broker-2", this.broker, 1L), (Throwable) null);
        Awaitility.waitAtMost(3L, TimeUnit.SECONDS).untilAsserted(() -> {
            ((TopBundleLoadDataReporter) Mockito.verify(topBundleLoadDataReporter, Mockito.times(2))).tombstone();
            ((LoadDataStore) Mockito.verify(this.store, Mockito.times(1))).removeAsync((String) ArgumentMatchers.eq(this.broker));
        });
        FieldUtils.writeDeclaredField(topBundleLoadDataReporter, "tombstoneDelayInMillis", 0, true);
        topBundleLoadDataReporter.handleEvent(this.bundle, new ServiceUnitStateData(ServiceUnitState.Splitting, "broker-2", this.broker, 1L), (Throwable) null);
        Awaitility.waitAtMost(3L, TimeUnit.SECONDS).untilAsserted(() -> {
            ((TopBundleLoadDataReporter) Mockito.verify(topBundleLoadDataReporter, Mockito.times(3))).tombstone();
            ((LoadDataStore) Mockito.verify(this.store, Mockito.times(2))).removeAsync((String) ArgumentMatchers.eq(this.broker));
        });
        topBundleLoadDataReporter.handleEvent(this.bundle, new ServiceUnitStateData(ServiceUnitState.Owned, this.broker, 1L), (Throwable) null);
        Awaitility.waitAtMost(3L, TimeUnit.SECONDS).untilAsserted(() -> {
            ((TopBundleLoadDataReporter) Mockito.verify(topBundleLoadDataReporter, Mockito.times(4))).tombstone();
            ((LoadDataStore) Mockito.verify(this.store, Mockito.times(3))).removeAsync((String) ArgumentMatchers.eq(this.broker));
        });
    }
}
