package org.apache.pulsar.broker.loadbalance.extensions.reporter;

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import org.apache.commons.lang.reflect.FieldUtils;
import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.pulsar.broker.loadbalance.BrokerHostUsage;
import org.apache.pulsar.broker.loadbalance.extensions.channel.ServiceUnitState;
import org.apache.pulsar.broker.loadbalance.extensions.channel.ServiceUnitStateData;
import org.apache.pulsar.broker.loadbalance.extensions.data.BrokerLoadData;
import org.apache.pulsar.broker.loadbalance.extensions.store.LoadDataStore;
import org.apache.pulsar.broker.loadbalance.impl.LoadManagerShared;
import org.apache.pulsar.broker.service.BrokerService;
import org.apache.pulsar.broker.service.PulsarStats;
import org.apache.pulsar.broker.stats.BrokerStats;
import org.apache.pulsar.client.util.ExecutorProvider;
import org.apache.pulsar.policies.data.loadbalancer.ResourceUsage;
import org.apache.pulsar.policies.data.loadbalancer.SystemResourceUsage;
import org.mockito.ArgumentMatchers;
import org.mockito.MockedStatic;
import org.mockito.Mockito;
import org.testcontainers.shaded.org.awaitility.Awaitility;
import org.testng.Assert;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;

/**
 * Unit tests for {@link BrokerLoadDataReporter}.
 *
 * <p>The reporter is exercised against Mockito mocks of {@link PulsarService}, {@link BrokerService},
 * {@link PulsarStats} and {@link LoadDataStore}; system resource usage is supplied by statically mocking
 * {@link LoadManagerShared#getSystemResourceUsage}. Because of the class-level {@code @Test} annotation,
 * every public method of this class is a test case.
 */
@Test(groups = {"broker"})
public class BrokerLoadDataReporterTest {
    PulsarService pulsar;
    LoadDataStore store;
    BrokerService brokerService;
    PulsarStats pulsarStats;
    ServiceConfiguration config;
    BrokerStats brokerStats;
    SystemResourceUsage usage;
    String broker = "broker1";
    String bundle = "bundle1";
    ScheduledExecutorService executor;

    /**
     * Builds a fresh set of mocks, a single-threaded load-manager executor, canned broker stats and a
     * canned {@link SystemResourceUsage} before every test.
     */
    @BeforeMethod
    void setup() {
        config = new ServiceConfiguration();
        config.setLoadBalancerDebugModeEnabled(true);

        pulsar = Mockito.mock(PulsarService.class);
        store = Mockito.mock(LoadDataStore.class);
        brokerService = Mockito.mock(BrokerService.class);
        pulsarStats = Mockito.mock(PulsarStats.class);
        executor = Executors.newSingleThreadScheduledExecutor(
                new ExecutorProvider.ExtendedThreadFactory("pulsar-load-manager"));

        Mockito.doReturn(brokerService).when(pulsar).getBrokerService();
        Mockito.doReturn(config).when(pulsar).getConfiguration();
        Mockito.doReturn(executor).when(pulsar).getLoadManagerExecutor();

        brokerStats = new BrokerStats(0);
        brokerStats.topics = 6;
        brokerStats.bundleCount = 5;
        brokerStats.msgRateIn = 3.0d;
        brokerStats.msgRateOut = 4.0d;
        brokerStats.msgThroughputIn = 1.0d;
        brokerStats.msgThroughputOut = 2.0d;
        Mockito.doReturn(pulsarStats).when(brokerService).getPulsarStats();
        Mockito.doReturn(brokerStats).when(pulsarStats).getBrokerStats();

        // Both store operations complete immediately so the reporter never blocks on the mock.
        Mockito.doReturn(CompletableFuture.completedFuture(null))
                .when(store).pushAsync((String) ArgumentMatchers.any(), ArgumentMatchers.any());
        Mockito.doReturn(CompletableFuture.completedFuture(null))
                .when(store).removeAsync((String) ArgumentMatchers.any());

        usage = new SystemResourceUsage();
        usage.setCpu(new ResourceUsage(1.0d, 100.0d));
        usage.setMemory(new ResourceUsage(800.0d, 200.0d));
        usage.setDirectMemory(new ResourceUsage(2.0d, 100.0d));
        usage.setBandwidthIn(new ResourceUsage(3.0d, 100.0d));
        usage.setBandwidthOut(new ResourceUsage(4.0d, 100.0d));
    }

    /**
     * Stops the per-test executor.
     *
     * <p>{@code shutdownNow()} is used rather than {@code shutdown()} because a scheduled executor keeps
     * running already-queued delayed tasks after a plain {@code shutdown()} by default; any tombstone
     * task still pending would otherwise outlive the test that scheduled it.
     */
    @AfterMethod
    void shutdown() {
        if (executor != null) {
            executor.shutdownNow();
        }
    }

    /**
     * Checks that {@code generateLoadData()} yields the same {@link BrokerLoadData} as one updated
     * directly with the stubbed usage and the values stored in {@code brokerStats} by {@link #setup()}.
     *
     * @throws IllegalAccessException if a reflective field write is rejected
     */
    public void testGenerate() throws IllegalAccessException {
        try (MockedStatic<LoadManagerShared> mockLoadManager = Mockito.mockStatic(LoadManagerShared.class)) {
            mockLoadManager
                    .when(() -> LoadManagerShared.getSystemResourceUsage((BrokerHostUsage) ArgumentMatchers.any()))
                    .thenReturn(usage);
            Mockito.doReturn(0L).when(pulsarStats).getUpdatedAt();

            BrokerLoadDataReporter target = new BrokerLoadDataReporter(pulsar, "", store);

            BrokerLoadData expected = new BrokerLoadData();
            expected.update(usage, 1.0d, 2.0d, 3.0d, 4.0d, 5, 6, config);
            // updatedAt is zeroed on both sides so it cannot make the equality check fail.
            FieldUtils.writeDeclaredField(expected, "updatedAt", 0L, true);

            BrokerLoadData actual = target.generateLoadData();
            FieldUtils.writeDeclaredField(actual, "updatedAt", 0L, true);

            Assert.assertEquals(actual, expected);
        }
    }

    /**
     * Checks how many times {@code reportAsync} pushes to the store as the reporter's internal
     * {@code localData} / {@code lastData} fields are manipulated through reflection.
     *
     * @throws IllegalAccessException if a reflective field read is rejected
     */
    public void testReport() throws IllegalAccessException {
        try (MockedStatic<LoadManagerShared> mockLoadManager = Mockito.mockStatic(LoadManagerShared.class)) {
            mockLoadManager
                    .when(() -> LoadManagerShared.getSystemResourceUsage((BrokerHostUsage) ArgumentMatchers.any()))
                    .thenReturn(usage);

            BrokerLoadDataReporter target = new BrokerLoadDataReporter(pulsar, "broker-1", store);
            BrokerLoadData localData = (BrokerLoadData) FieldUtils.readDeclaredField(target, "localData", true);
            localData.setReportedAt(System.currentTimeMillis());
            BrokerLoadData lastData = (BrokerLoadData) FieldUtils.readDeclaredField(target, "lastData", true);
            lastData.update(usage, 1.0d, 2.0d, 3.0d, 4.0d, 5, 6, config);

            // Just reported and lastData matches the stubbed stats: reportAsync(false) must not push.
            target.reportAsync(false);
            Mockito.verify(store, Mockito.never())
                    .pushAsync((String) ArgumentMatchers.any(), ArgumentMatchers.any());

            // reportAsync(true) is expected to push even though nothing changed.
            target.reportAsync(true);
            Mockito.verify(store, Mockito.times(1))
                    .pushAsync(ArgumentMatchers.eq("broker-1"), ArgumentMatchers.any());

            // A further reportAsync(false) right afterwards must not add a push.
            target.reportAsync(false);
            Mockito.verify(store, Mockito.times(1))
                    .pushAsync(ArgumentMatchers.eq("broker-1"), ArgumentMatchers.any());

            // With reportedAt reset to 0, reportAsync(false) is expected to push again.
            localData.setReportedAt(0L);
            target.reportAsync(false);
            Mockito.verify(store, Mockito.times(2))
                    .pushAsync(ArgumentMatchers.eq("broker-1"), ArgumentMatchers.any());

            // With lastData's first metric changed to 10000, reportAsync(false) is expected to push again.
            lastData.update(usage, 10000.0d, 2.0d, 3.0d, 4.0d, 5, 6, config);
            target.reportAsync(false);
            Mockito.verify(store, Mockito.times(3))
                    .pushAsync(ArgumentMatchers.eq("broker-1"), ArgumentMatchers.any());
        }
    }

    /**
     * Feeds service-unit state events into {@code handleEvent} and checks which of them lead to
     * {@code tombstone()} being invoked and to {@code removeAsync} being called on the store.
     *
     * @throws IllegalAccessException if a reflective field access is rejected
     * @throws InterruptedException declared for compatibility with the original signature
     */
    @Test
    public void testTombstone() throws IllegalAccessException, InterruptedException {
        BrokerLoadDataReporter target = Mockito.spy(new BrokerLoadDataReporter(pulsar, broker, store));

        // None of these events is expected to trigger a tombstone.
        target.handleEvent(bundle, new ServiceUnitStateData(ServiceUnitState.Assigning, broker, 1L),
                (Throwable) null);
        verifyNoTombstone(target);

        target.handleEvent(bundle, new ServiceUnitStateData(ServiceUnitState.Deleted, broker, 1L),
                (Throwable) null);
        verifyNoTombstone(target);

        target.handleEvent(bundle, new ServiceUnitStateData(ServiceUnitState.Init, broker, 1L),
                (Throwable) null);
        verifyNoTombstone(target);

        target.handleEvent(bundle, new ServiceUnitStateData(ServiceUnitState.Free, broker, 1L),
                (Throwable) null);
        verifyNoTombstone(target);

        // A Releasing event delivered together with an error must be ignored as well.
        target.handleEvent(bundle, new ServiceUnitStateData(ServiceUnitState.Releasing, "broker-2", broker, 1L),
                new RuntimeException());
        verifyNoTombstone(target);

        // The same Releasing event without an error triggers the first tombstone and store removal.
        target.handleEvent(bundle, new ServiceUnitStateData(ServiceUnitState.Releasing, "broker-2", broker, 1L),
                (Throwable) null);
        awaitTombstoned(target, 1, 1);

        // A second tombstone() call happens, but no second removeAsync is expected yet
        // (presumably suppressed by the reporter's tombstone delay -- confirm against the reporter).
        target.handleEvent(bundle, new ServiceUnitStateData(ServiceUnitState.Releasing, "broker-2", broker, 1L),
                (Throwable) null);
        awaitTombstoned(target, 2, 1);

        // With the delay field forced to 0, each following tombstone also removes from the store.
        FieldUtils.writeDeclaredField(target, "tombstoneDelayInMillis", 0, true);

        target.handleEvent(bundle, new ServiceUnitStateData(ServiceUnitState.Splitting, "broker-2", broker, 1L),
                (Throwable) null);
        awaitTombstoned(target, 3, 2);

        target.handleEvent(bundle, new ServiceUnitStateData(ServiceUnitState.Owned, broker, 1L),
                (Throwable) null);
        awaitTombstoned(target, 4, 3);
    }

    /** Verifies that neither {@code tombstone()} nor {@code removeAsync(broker)} has been invoked so far. */
    private void verifyNoTombstone(BrokerLoadDataReporter reporter) {
        Mockito.verify(store, Mockito.never()).removeAsync(ArgumentMatchers.eq(broker));
        Mockito.verify(reporter, Mockito.never()).tombstone();
    }

    /**
     * Waits up to three seconds until {@code tombstone()} and {@code removeAsync(broker)} have been invoked
     * exactly the given number of times and the reporter's {@code localData} equals a new
     * {@link BrokerLoadData}.
     */
    private void awaitTombstoned(BrokerLoadDataReporter reporter, int tombstoneCalls, int removeCalls) {
        Awaitility.waitAtMost(3L, TimeUnit.SECONDS).untilAsserted(() -> {
            Mockito.verify(reporter, Mockito.times(tombstoneCalls)).tombstone();
            Mockito.verify(store, Mockito.times(removeCalls)).removeAsync(ArgumentMatchers.eq(broker));
            Assert.assertEquals(
                    (BrokerLoadData) FieldUtils.readDeclaredField(reporter, "localData", true),
                    new BrokerLoadData());
        });
    }
}
