package org.apache.pulsar.broker.loadbalance;

import com.beust.jcommander.internal.Maps;
import com.google.common.collect.BoundType;
import com.google.common.collect.Range;
import com.google.common.collect.Sets;
import com.google.common.hash.Hashing;
import java.lang.reflect.Field;
import java.net.URL;
import java.util.Collections;
import java.util.HashSet;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import org.apache.bookkeeper.util.ZkUtils;
import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.pulsar.broker.loadbalance.impl.LoadManagerShared;
import org.apache.pulsar.broker.loadbalance.impl.ModularLoadManagerImpl;
import org.apache.pulsar.broker.loadbalance.impl.ModularLoadManagerWrapper;
import org.apache.pulsar.client.admin.PulsarAdmin;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.common.naming.NamespaceBundle;
import org.apache.pulsar.common.naming.NamespaceBundleFactory;
import org.apache.pulsar.common.naming.NamespaceBundles;
import org.apache.pulsar.common.naming.NamespaceName;
import org.apache.pulsar.common.policies.data.ClusterData;
import org.apache.pulsar.common.policies.data.FailureDomain;
import org.apache.pulsar.common.policies.data.TenantInfoImpl;
import org.apache.pulsar.common.util.ObjectMapperFactory;
import org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap;
import org.apache.pulsar.common.util.collections.ConcurrentOpenHashSet;
import org.apache.pulsar.zookeeper.LocalBookkeeperEnsemble;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.ZooDefs;
import org.apache.zookeeper.ZooKeeper;
import org.awaitility.Awaitility;
import org.testng.Assert;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;

@Test(groups = {"broker"})
/* loaded from: input_file:org/apache/pulsar/broker/loadbalance/AntiAffinityNamespaceGroupTest.class */
public class AntiAffinityNamespaceGroupTest {
    // Local ZooKeeper/BookKeeper ensemble; started in setup() and stopped in shutdown().
    private LocalBookkeeperEnsemble bkEnsemble;
    // HTTP URL, broker instance and admin client of the first broker (all created in setup()).
    private URL url1;
    private PulsarService pulsar1;
    private PulsarAdmin admin1;
    // HTTP URL, broker instance and admin client of the second broker (all created in setup()).
    private URL url2;
    private PulsarService pulsar2;
    private PulsarAdmin admin2;
    // "localhost:<http port>" of pulsar1 and pulsar2 respectively, as formatted in setup().
    private String primaryHost;
    private String secondaryHost;
    // Bundle factory built on pulsar1 with CRC32 hashing; used by makeBundle().
    private NamespaceBundleFactory nsFactory;
    // Read reflectively in setup() from the "loadManager" field of each broker's load manager object.
    private ModularLoadManagerImpl primaryLoadManager;
    private ModularLoadManagerImpl secondaryLoadManager;
    // Thread pool created in setup() and shut down in shutdown().
    private ExecutorService executor;

    /**
     * Reads a field declared directly on the runtime class of {@code obj}, bypassing access checks.
     *
     * @param obj the instance to read from; its own class (not its superclasses) is searched
     * @param str the declared name of the field
     * @return the current value of the field on {@code obj}
     * @throws Exception if the field does not exist on that class or cannot be read
     */
    private static Object getField(Object obj, String str) throws Exception {
        final Class<?> owner = obj.getClass();
        final Field target = owner.getDeclaredField(str);
        // The fields of interest are private, so accessibility has to be forced.
        target.setAccessible(true);
        final Object value = target.get(obj);
        return value;
    }

    /**
     * Brings up the fixture used by every test method: a local ZooKeeper/BookKeeper ensemble and
     * two brokers in cluster {@code "use"}, each with its own admin client and load manager handle.
     *
     * @throws Exception if the ensemble or either broker fails to start
     */
    @BeforeMethod
    void setup() throws Exception {
        this.executor = new ThreadPoolExecutor(5, 20, 30L, TimeUnit.SECONDS, new LinkedBlockingQueue<>());
        this.bkEnsemble = new LocalBookkeeperEnsemble(3, 0, () -> 0);
        this.bkEnsemble.start();

        // First broker.
        ServiceConfiguration primaryConfig = newBrokerConfiguration();
        primaryConfig.setLoadBalancerEnabled(true);
        // NOTE(review): the cluster entry is written before the broker starts, so its serviceUrl is
        // built from the configured web port (0), not from the port the broker eventually binds.
        createCluster(this.bkEnsemble.getZkClient(), primaryConfig);
        this.pulsar1 = new PulsarService(primaryConfig);
        this.pulsar1.start();
        this.primaryHost = String.format("%s:%d", "localhost", this.pulsar1.getListenPortHTTP().get());
        this.url1 = new URL("http://127.0.0.1:" + this.pulsar1.getListenPortHTTP().get());
        this.admin1 = PulsarAdmin.builder().serviceHttpUrl(this.url1.toString()).build();

        // Second broker.
        ServiceConfiguration secondaryConfig = newBrokerConfiguration();
        this.pulsar2 = new PulsarService(secondaryConfig);
        this.pulsar2.start();
        this.secondaryHost = String.format("%s:%d", "localhost", this.pulsar2.getListenPortHTTP().get());
        // Use the port the broker actually listens on (as for url1); the configured port is 0.
        this.url2 = new URL("http://127.0.0.1:" + this.pulsar2.getListenPortHTTP().get());
        this.admin2 = PulsarAdmin.builder().serviceHttpUrl(this.url2.toString()).build();

        this.primaryLoadManager = (ModularLoadManagerImpl) getField(this.pulsar1.getLoadManager().get(), "loadManager");
        this.secondaryLoadManager = (ModularLoadManagerImpl) getField(this.pulsar2.getLoadManager().get(), "loadManager");
        this.nsFactory = new NamespaceBundleFactory(this.pulsar1, Hashing.crc32());

        // Do not hand control to the test until both brokers report the Started state.
        Awaitility.await().untilAsserted(() -> {
            Assert.assertEquals(this.pulsar1.getState(), PulsarService.State.Started);
            Assert.assertEquals(this.pulsar2.getState(), PulsarService.State.Started);
        });
    }

    /** Builds the configuration shared by both brokers; requires {@code bkEnsemble} to be started. */
    private ServiceConfiguration newBrokerConfiguration() {
        ServiceConfiguration config = new ServiceConfiguration();
        config.setLoadManagerClassName(ModularLoadManagerImpl.class.getName());
        config.setClusterName("use");
        config.setWebServicePort(Optional.of(0));
        config.setMetadataStoreUrl("zk:127.0.0.1:" + this.bkEnsemble.getZookeeperPort());
        config.setBrokerShutdownTimeoutMs(0L);
        config.setLoadBalancerOverrideBrokerNicSpeedGbps(Optional.of(Double.valueOf(1.0d)));
        config.setBrokerServicePort(Optional.of(0));
        config.setFailureDomainsEnabled(true);
        config.setAdvertisedAddress("localhost");
        config.setLoadBalancerBrokerOverloadedThresholdPercentage(400);
        return config;
    }

    /**
     * Tears down everything {@code setup()} created.
     *
     * <p>Because this runs with {@code alwaysRun = true}, it is also invoked when {@code setup()}
     * failed part-way, so every resource is null-checked, and the ensemble is stopped in a
     * {@code finally} block so it is released even if closing a client or broker throws.
     *
     * @throws Exception if closing any resource fails
     */
    @AfterMethod(alwaysRun = true)
    void shutdown() throws Exception {
        try {
            if (this.executor != null) {
                this.executor.shutdownNow();
            }
            if (this.admin1 != null) {
                this.admin1.close();
            }
            if (this.admin2 != null) {
                this.admin2.close();
            }
            if (this.pulsar2 != null) {
                this.pulsar2.close();
            }
            if (this.pulsar1 != null) {
                this.pulsar1.close();
            }
        } finally {
            if (this.bkEnsemble != null) {
                this.bkEnsemble.stop();
            }
        }
    }

    /**
     * Writes the cluster metadata node for the configuration's cluster under {@code /admin/clusters/}.
     *
     * @param zooKeeper            client used to create the node (parents are created as needed)
     * @param serviceConfiguration supplies the cluster name, advertised address and web service port
     * @throws Exception if serialization or the ZooKeeper write fails
     */
    private void createCluster(ZooKeeper zooKeeper, ServiceConfiguration serviceConfiguration) throws Exception {
        final String clusterPath = "/admin/clusters/" + serviceConfiguration.getClusterName();
        final String serviceUrl = "http://" + serviceConfiguration.getAdvertisedAddress()
                + ":" + serviceConfiguration.getWebServicePort().get();
        final ClusterData clusterData = ClusterData.builder().serviceUrl(serviceUrl).build();
        final byte[] payload = ObjectMapperFactory.getThreadLocal().writeValueAsBytes(clusterData);
        ZkUtils.createFullPathOptimistic(
                zooKeeper, clusterPath, payload, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
    }

    /**
     * Empty test body: it makes no calls and no assertions of its own, so the only work done
     * when it runs is the {@code setup()} / {@code shutdown()} fixture around it.
     */
    @Test
    public void testClusterDomain() {
    }

    /**
     * Exercises {@code LoadManagerShared.filterAntiAffinityGroupOwnedBrokers} with a
     * broker-to-failure-domain map: four brokers split over two domains, five namespaces in one
     * anti-affinity group, placed one at a time while checking the remaining candidates.
     *
     * @throws Exception if tenant/namespace creation through the admin client fails
     */
    @Test
    public void testAntiAffinityNamespaceFilteringWithDomain() throws Exception {
        final String namespacePrefix = "my-tenant/use/my-ns";
        final String fullBundleRange = "/0x00000000_0xffffffff";
        final int totalBrokers = 4;

        this.pulsar1.getConfiguration().setFailureDomainsEnabled(true);
        this.admin1.tenants().createTenant("my-tenant",
                new TenantInfoImpl(Sets.newHashSet("appid1", "appid2"), Sets.newHashSet("use")));
        for (int i = 0; i < 5; i++) {
            String namespace = namespacePrefix + i;
            this.admin1.namespaces().createNamespace(namespace);
            this.admin1.namespaces().setNamespaceAntiAffinityGroup(namespace, "my-antiaffinity");
        }

        // brokerName-0/1 live in domain-0, brokerName-2/3 in domain-1.
        Set<String> brokers = Sets.newHashSet();
        Map<String, String> brokerToDomainMap = Maps.newHashMap();
        brokers.add("brokerName-0");
        brokerToDomainMap.put("brokerName-0", "domain-0");
        brokers.add("brokerName-1");
        brokerToDomainMap.put("brokerName-1", "domain-0");
        brokers.add("brokerName-2");
        brokerToDomainMap.put("brokerName-2", "domain-1");
        brokers.add("brokerName-3");
        brokerToDomainMap.put("brokerName-3", "domain-1");

        Set<String> candidates = Sets.newHashSet();
        // Explicit type arguments are required: without them the chained build() yields <Object, Object>.
        ConcurrentOpenHashMap<String, ConcurrentOpenHashMap<String, ConcurrentOpenHashSet<String>>> brokerToNamespaceToBundleRange =
                ConcurrentOpenHashMap.<String, ConcurrentOpenHashMap<String, ConcurrentOpenHashSet<String>>>newBuilder().build();

        Assert.assertEquals(brokers.size(), totalBrokers);
        candidates.addAll(brokers);

        // ns0: nothing is placed yet, so no broker is filtered out (the full broker set is passed here).
        LoadManagerShared.filterAntiAffinityGroupOwnedBrokers(this.pulsar1, namespacePrefix + "0" + fullBundleRange,
                brokers, brokerToNamespaceToBundleRange, brokerToDomainMap);
        Assert.assertEquals(brokers.size(), totalBrokers);
        selectBrokerForNamespace(brokerToNamespaceToBundleRange, "brokerName-0",
                namespacePrefix + "0", namespacePrefix + "0" + fullBundleRange);

        // ns1: with ns0 in domain-0, only the two domain-1 brokers are expected to remain.
        candidates.addAll(brokers);
        LoadManagerShared.filterAntiAffinityGroupOwnedBrokers(this.pulsar1, namespacePrefix + "1" + fullBundleRange,
                candidates, brokerToNamespaceToBundleRange, brokerToDomainMap);
        Assert.assertEquals(candidates.size(), 2);
        candidates.forEach(broker -> Assert.assertEquals(brokerToDomainMap.get(broker), "domain-1"));
        selectBrokerForNamespace(brokerToNamespaceToBundleRange, "brokerName-2",
                namespacePrefix + "1", namespacePrefix + "1" + fullBundleRange);

        // ns2: each domain now holds one namespace; the two still-empty brokers are expected.
        candidates.addAll(brokers);
        LoadManagerShared.filterAntiAffinityGroupOwnedBrokers(this.pulsar1, namespacePrefix + "2" + fullBundleRange,
                candidates, brokerToNamespaceToBundleRange, brokerToDomainMap);
        Assert.assertEquals(candidates.size(), 2);
        Assert.assertTrue(candidates.contains("brokerName-1"));
        Assert.assertTrue(candidates.contains("brokerName-3"));
        selectBrokerForNamespace(brokerToNamespaceToBundleRange, "brokerName-1",
                namespacePrefix + "2", namespacePrefix + "2" + fullBundleRange);

        // ns3: only brokerName-3 is still without a namespace of the group.
        candidates.addAll(brokers);
        LoadManagerShared.filterAntiAffinityGroupOwnedBrokers(this.pulsar1, namespacePrefix + "3" + fullBundleRange,
                candidates, brokerToNamespaceToBundleRange, brokerToDomainMap);
        Assert.assertEquals(candidates.size(), 1);
        Assert.assertTrue(candidates.contains("brokerName-3"));
        selectBrokerForNamespace(brokerToNamespaceToBundleRange, "brokerName-3",
                namespacePrefix + "3", namespacePrefix + "3" + fullBundleRange);

        // ns4: every broker already hosts one namespace of the group; all four are expected back.
        candidates.addAll(brokers);
        LoadManagerShared.filterAntiAffinityGroupOwnedBrokers(this.pulsar1, namespacePrefix + "4" + fullBundleRange,
                candidates, brokerToNamespaceToBundleRange, brokerToDomainMap);
        Assert.assertEquals(candidates.size(), totalBrokers);
    }

    /**
     * Exercises {@code LoadManagerShared.filterAntiAffinityGroupOwnedBrokers} with no
     * broker-to-domain map ({@code null}): three brokers, namespaces of one anti-affinity group
     * placed one at a time while checking which brokers remain as candidates.
     *
     * @throws Exception if tenant/namespace creation through the admin client fails
     */
    @Test
    public void testAntiAffinityNamespaceFilteringWithoutDomain() throws Exception {
        final String namespacePrefix = "my-tenant/use/my-ns";
        final String fullBundleRange = "/0x00000000_0xffffffff";
        final Map<String, String> noDomains = null;

        this.admin1.tenants().createTenant("my-tenant",
                new TenantInfoImpl(Sets.newHashSet("appid1", "appid2"), Sets.newHashSet("use")));
        for (int i = 0; i < 5; i++) {
            String namespace = namespacePrefix + i;
            this.admin1.namespaces().createNamespace(namespace);
            this.admin1.namespaces().setNamespaceAntiAffinityGroup(namespace, "my-antiaffinity");
        }

        Set<String> brokers = Sets.newHashSet();
        Set<String> candidates = Sets.newHashSet();
        // Explicit type arguments are required: without them the chained build() yields <Object, Object>.
        ConcurrentOpenHashMap<String, ConcurrentOpenHashMap<String, ConcurrentOpenHashSet<String>>> brokerToNamespaceToBundleRange =
                ConcurrentOpenHashMap.<String, ConcurrentOpenHashMap<String, ConcurrentOpenHashSet<String>>>newBuilder().build();
        brokers.add("broker-0");
        brokers.add("broker-1");
        brokers.add("broker-2");
        candidates.addAll(brokers);

        // ns0: nothing is placed yet, so no broker is filtered out (the full broker set is passed here).
        LoadManagerShared.filterAntiAffinityGroupOwnedBrokers(this.pulsar1, namespacePrefix + "0" + fullBundleRange,
                brokers, brokerToNamespaceToBundleRange, noDomains);
        Assert.assertEquals(brokers.size(), 3);
        selectBrokerForNamespace(brokerToNamespaceToBundleRange, "broker-0",
                namespacePrefix + "0", namespacePrefix + "0" + fullBundleRange);

        // ns1: broker-0 already hosts ns0, so broker-1 and broker-2 are expected.
        candidates.addAll(brokers);
        LoadManagerShared.filterAntiAffinityGroupOwnedBrokers(this.pulsar1, namespacePrefix + "1" + fullBundleRange,
                candidates, brokerToNamespaceToBundleRange, noDomains);
        Assert.assertEquals(candidates.size(), 2);
        Assert.assertTrue(candidates.contains("broker-1"));
        Assert.assertTrue(candidates.contains("broker-2"));
        selectBrokerForNamespace(brokerToNamespaceToBundleRange, "broker-1",
                namespacePrefix + "1", namespacePrefix + "1" + fullBundleRange);

        // ns2: only broker-2 is still without a namespace of the group.
        candidates.addAll(brokers);
        LoadManagerShared.filterAntiAffinityGroupOwnedBrokers(this.pulsar1, namespacePrefix + "2" + fullBundleRange,
                candidates, brokerToNamespaceToBundleRange, noDomains);
        Assert.assertEquals(candidates.size(), 1);
        Assert.assertTrue(candidates.contains("broker-2"));
        selectBrokerForNamespace(brokerToNamespaceToBundleRange, "broker-2",
                namespacePrefix + "2", namespacePrefix + "2" + fullBundleRange);

        // ns3: every broker already hosts one namespace of the group; all three are expected back.
        candidates.addAll(brokers);
        LoadManagerShared.filterAntiAffinityGroupOwnedBrokers(this.pulsar1, namespacePrefix + "3" + fullBundleRange,
                candidates, brokerToNamespaceToBundleRange, noDomains);
        Assert.assertEquals(candidates.size(), 3);
    }

    /**
     * Records in the given ownership map that a broker holds one bundle of one namespace.
     *
     * <p>A fresh namespace-to-bundles map containing only this entry is {@code put} under the
     * broker key, so anything previously recorded for that broker is presumably replaced rather
     * than merged.
     *
     * @param concurrentOpenHashMap broker -> namespace -> bundle-range map to update
     * @param str                   broker name used as the outer key
     * @param str2                  namespace name used as the inner key
     * @param str3                  bundle name stored in the namespace's bundle set
     */
    private void selectBrokerForNamespace(ConcurrentOpenHashMap<String, ConcurrentOpenHashMap<String, ConcurrentOpenHashSet<String>>> concurrentOpenHashMap, String str, String str2, String str3) {
        ConcurrentOpenHashSet<String> bundles = ConcurrentOpenHashSet.<String>newBuilder().build();
        bundles.add(str3);
        ConcurrentOpenHashMap<String, ConcurrentOpenHashSet<String>> namespaceToBundles =
                ConcurrentOpenHashMap.<String, ConcurrentOpenHashSet<String>>newBuilder().build();
        namespaceToBundles.put(str2, bundles);
        concurrentOpenHashMap.put(str, namespaceToBundles);
    }

    /**
     * Puts each broker in its own failure domain, creates two namespaces in the same
     * anti-affinity group, and checks that the primary load manager assigns their bundles to
     * different brokers.
     *
     * @throws Exception if any admin call or the reflective cache check fails
     */
    @Test
    public void testBrokerSelectionForAntiAffinityGroup() throws Exception {
        final String cluster = this.pulsar1.getConfiguration().getClusterName();
        final String tenant = "tenant-" + UUID.randomUUID().toString();
        final String firstNamespace = tenant + "/" + cluster + "/ns1";
        final String secondNamespace = tenant + "/" + cluster + "/ns2";

        // One failure domain per broker.
        FailureDomain primaryDomain = FailureDomain.builder().brokers(Collections.singleton(this.primaryHost)).build();
        this.admin1.clusters().createFailureDomain(cluster, "domain1", primaryDomain);
        FailureDomain secondaryDomain = FailureDomain.builder().brokers(Collections.singleton(this.secondaryHost)).build();
        this.admin1.clusters().createFailureDomain(cluster, "domain2", secondaryDomain);

        this.admin1.tenants().createTenant(tenant, new TenantInfoImpl((Set<String>) null, Sets.newHashSet(cluster)));
        this.admin1.namespaces().createNamespace(firstNamespace);
        this.admin1.namespaces().createNamespace(secondNamespace);
        this.admin1.namespaces().setNamespaceAntiAffinityGroup(firstNamespace, "group");
        this.admin1.namespaces().setNamespaceAntiAffinityGroup(secondNamespace, "group");

        // Wait until both load managers have a non-empty broker-to-failure-domain map.
        Awaitility.await().untilAsserted(() -> {
            Assert.assertTrue(isLoadManagerUpdatedDomainCache(this.primaryLoadManager));
            Assert.assertTrue(isLoadManagerUpdatedDomainCache(this.secondaryLoadManager));
        });

        String brokerForFirst = (String) this.primaryLoadManager
                .selectBrokerForAssignment(makeBundle(tenant, cluster, "ns1")).get();
        String brokerForSecond = (String) this.primaryLoadManager
                .selectBrokerForAssignment(makeBundle(tenant, cluster, "ns2")).get();
        Assert.assertNotEquals(brokerForFirst, brokerForSecond);
    }

    /**
     * Exercises {@code LoadManagerShared.shouldAntiAffinityNamespaceUnload} for a bundle of
     * {@code my-ns0} owned by {@code broker-0}, while namespaces of the same anti-affinity group
     * are progressively recorded on the other candidate brokers.
     *
     * @throws Exception if tenant/namespace creation through the admin client fails
     */
    @Test
    public void testLoadSheddingUtilWithAntiAffinityNamespace() throws Exception {
        final String namespacePrefix = "my-tenant/use/my-ns";
        final String fullBundleRange = "/0x00000000_0xffffffff";
        final String shedNamespace = namespacePrefix + "0";
        // The same bundle name is recorded for every broker below, whichever namespace key is used.
        final String assignedBundle = shedNamespace + fullBundleRange;

        this.admin1.tenants().createTenant("my-tenant",
                new TenantInfoImpl(Sets.newHashSet("appid1", "appid2"), Sets.newHashSet("use")));
        for (int i = 0; i < 5; i++) {
            String namespace = namespacePrefix + i;
            this.admin1.namespaces().createNamespace(namespace);
            this.admin1.namespaces().setNamespaceAntiAffinityGroup(namespace, "my-antiaffinity");
        }

        Set<String> candidates = Sets.newHashSet("broker-0", "broker-1", "broker-2");
        // Explicit type arguments are required: without them the chained build() yields <Object, Object>.
        ConcurrentOpenHashMap<String, ConcurrentOpenHashMap<String, ConcurrentOpenHashSet<String>>> brokerToNamespaceToBundleRange =
                ConcurrentOpenHashMap.<String, ConcurrentOpenHashMap<String, ConcurrentOpenHashSet<String>>>newBuilder().build();

        // Only broker-0 hosts a namespace of the group: unloading is expected to be allowed.
        selectBrokerForNamespace(brokerToNamespaceToBundleRange, "broker-0", namespacePrefix + "0", assignedBundle);
        Assert.assertTrue(LoadManagerShared.shouldAntiAffinityNamespaceUnload(shedNamespace, fullBundleRange,
                "broker-0", this.pulsar1, brokerToNamespaceToBundleRange, candidates));

        // broker-1 now hosts one too, broker-2 is still free: still expected to be allowed.
        selectBrokerForNamespace(brokerToNamespaceToBundleRange, "broker-1", namespacePrefix + "1", assignedBundle);
        Assert.assertTrue(LoadManagerShared.shouldAntiAffinityNamespaceUnload(shedNamespace, fullBundleRange,
                "broker-0", this.pulsar1, brokerToNamespaceToBundleRange, candidates));

        // Every candidate hosts a namespace of the group: unloading is expected to be refused.
        selectBrokerForNamespace(brokerToNamespaceToBundleRange, "broker-2", namespacePrefix + "2", assignedBundle);
        Assert.assertFalse(LoadManagerShared.shouldAntiAffinityNamespaceUnload(shedNamespace, fullBundleRange,
                "broker-0", this.pulsar1, brokerToNamespaceToBundleRange, candidates));
    }

    /**
     * Creates a producer on a topic in {@code my-ns0}, refreshes broker rates and load data, and
     * checks that the first broker's load manager allows unloading that namespace's bundle.
     *
     * <p>Client and producer are managed by try-with-resources so both are closed (producer
     * first) even when the assertion fails.
     *
     * @throws Exception if any admin/client call fails
     */
    @Test
    public void testLoadSheddingWithAntiAffinityNamespace() throws Exception {
        this.admin1.tenants().createTenant("my-tenant",
                new TenantInfoImpl(Sets.newHashSet("appid1", "appid2"), Sets.newHashSet("use")));
        for (int i = 0; i < 5; i++) {
            String namespace = "my-tenant/use/my-ns" + i;
            this.admin1.namespaces().createNamespace(namespace);
            this.admin1.namespaces().setNamespaceAntiAffinityGroup(namespace, "my-antiaffinity");
        }

        // The producer is never written to; presumably it exists so the topic's bundle is
        // loaded on a broker before the load data is refreshed — TODO confirm.
        try (PulsarClient client = PulsarClient.builder().serviceUrl(this.pulsar1.getSafeWebServiceAddress()).build();
             Producer<byte[]> producer = client.newProducer().topic("persistent://my-tenant/use/my-ns0/my-topic1").create()) {
            ModularLoadManagerImpl loadManager = ((ModularLoadManagerWrapper) this.pulsar1.getLoadManager().get()).getLoadManager();
            this.pulsar1.getBrokerService().updateRates();
            loadManager.updateAll();
            Assert.assertTrue(loadManager.shouldAntiAffinityNamespaceUnload("my-tenant/use/my-ns0", "0x00000000_0xffffffff", this.primaryHost));
        }
    }

    /**
     * Reports whether the given load manager's private {@code brokerToFailureDomainMap} holds
     * at least one entry.
     *
     * @param modularLoadManagerImpl the load manager instance to inspect reflectively
     * @return {@code true} when the map is non-empty
     * @throws Exception if the field cannot be located or read
     */
    private boolean isLoadManagerUpdatedDomainCache(ModularLoadManagerImpl modularLoadManagerImpl) throws Exception {
        final Field domainMapField = ModularLoadManagerImpl.class.getDeclaredField("brokerToFailureDomainMap");
        domainMapField.setAccessible(true);
        final Map<?, ?> brokerToDomain = (Map<?, ?>) domainMapField.get(modularLoadManagerImpl);
        final boolean populated = !brokerToDomain.isEmpty();
        return populated;
    }

    /**
     * Builds the bundle covering the full hash range of a namespace, using the test's bundle factory.
     *
     * @param str  first component passed to {@code NamespaceName.get} (callers pass the tenant)
     * @param str2 second component passed to {@code NamespaceName.get} (callers pass the cluster)
     * @param str3 third component passed to {@code NamespaceName.get} (callers pass the local namespace name)
     * @return the bundle spanning {@code [FULL_LOWER_BOUND, FULL_UPPER_BOUND]}, both ends closed
     */
    private NamespaceBundle makeBundle(String str, String str2, String str3) {
        final NamespaceName namespace = NamespaceName.get(str, str2, str3);
        final Range<Long> fullRange = Range.range(
                NamespaceBundles.FULL_LOWER_BOUND, BoundType.CLOSED,
                NamespaceBundles.FULL_UPPER_BOUND, BoundType.CLOSED);
        return this.nsFactory.getBundle(namespace, fullRange);
    }
}
