package org.apache.pulsar.broker.loadbalance.extensions;

import java.time.Duration;
import java.util.Collection;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import org.apache.pulsar.broker.PulsarServerException;
import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.pulsar.broker.loadbalance.LoadManager;
import org.apache.pulsar.broker.loadbalance.ResourceUnit;
import org.apache.pulsar.broker.loadbalance.extensions.BrokerRegistryImpl;
import org.apache.pulsar.broker.loadbalance.extensions.data.BrokerLookupData;
import org.apache.pulsar.common.naming.ServiceUnitId;
import org.apache.pulsar.common.stats.Metrics;
import org.apache.pulsar.common.util.FutureUtil;
import org.apache.pulsar.metadata.api.Notification;
import org.apache.pulsar.metadata.api.NotificationType;
import org.apache.pulsar.policies.data.loadbalancer.LoadManagerReport;
import org.apache.pulsar.zookeeper.LocalBookkeeperEnsemble;
import org.awaitility.Awaitility;
import org.mockito.Mockito;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testcontainers.shaded.org.awaitility.reflect.WhiteboxImpl;
import org.testng.Assert;
import org.testng.annotations.AfterClass;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;

@Test(groups = {"broker"})
/* loaded from: input_file:org/apache/pulsar/broker/loadbalance/extensions/BrokerRegistryTest.class */
public class BrokerRegistryTest {
    private static final Logger log = LoggerFactory.getLogger(BrokerRegistryTest.class);

    /** Every PulsarService created through createPulsarService(); closed and cleared by cleanUp(). */
    private final List<PulsarService> pulsarServices = new CopyOnWriteArrayList<>();

    /** Every registry created through createBrokerRegistryImpl(); closed and cleared by cleanUp(). */
    private final List<BrokerRegistryImpl> brokerRegistries = new CopyOnWriteArrayList<>();

    /** Thread pool created in setup() and shut down in shutdown(). */
    private ExecutorService executor;

    /** Local ZooKeeper/BookKeeper ensemble started in setup(); its ZooKeeper port backs the metadata store URL. */
    private LocalBookkeeperEnsemble bkEnsemble;

    /* loaded from: input_file:org/apache/pulsar/broker/loadbalance/extensions/BrokerRegistryTest$MockLoadManager.class */
    /**
     * Inert {@link LoadManager} used by the brokers created in this test (it is installed through
     * {@code setLoadManagerClassName} in {@code createPulsarService}). Every operation is either a
     * no-op or returns a fixed placeholder value.
     */
    public static class MockLoadManager implements LoadManager {

        // ---- lifecycle: all no-ops ----

        public void initialize(PulsarService pulsar) {
        }

        public void start() throws PulsarServerException {
        }

        public void stop() throws PulsarServerException {
        }

        public void disableBroker() throws Exception {
        }

        // ---- queries: fixed placeholder answers ----

        /** Always reports a non-centralized load manager. */
        public boolean isCentralized() {
            return false;
        }

        /** Never selects a broker. */
        public Optional<ResourceUnit> getLeastLoaded(ServiceUnitId serviceUnit) throws Exception {
            return Optional.empty();
        }

        public Set<String> getAvailableBrokers() throws Exception {
            return null;
        }

        public CompletableFuture<Set<String>> getAvailableBrokersAsync() {
            return null;
        }

        public List<Metrics> getLoadBalancingMetrics() {
            return null;
        }

        public LoadManagerReport generateLoadReport() throws Exception {
            return null;
        }

        public String setNamespaceBundleAffinity(String bundle, String broker) {
            return null;
        }

        // ---- load reporting and balancing actions: all no-ops ----

        public void setLoadReportForceUpdateFlag() {
        }

        public void writeLoadReportOnZookeeper() throws Exception {
        }

        public void writeResourceQuotasToZooKeeper() throws Exception {
        }

        public void doLoadShedding() {
        }

        public void doNamespaceBundleSplit() throws Exception {
        }
    }

    /**
     * Creates the shared thread pool and starts the local ZooKeeper/BookKeeper ensemble once for
     * the whole class.
     *
     * @throws Exception if the ensemble fails to start
     */
    @BeforeClass
    void setup() throws Exception {
        LinkedBlockingQueue<Runnable> pendingTasks = new LinkedBlockingQueue<>();
        executor = new ThreadPoolExecutor(5, 20, 30L, TimeUnit.SECONDS, pendingTasks);

        bkEnsemble = new LocalBookkeeperEnsemble(3, 0, () -> 0);
        bkEnsemble.start();
    }

    /**
     * Builds a Mockito spy around a new, not yet started {@link PulsarService} that points at the
     * local ensemble's ZooKeeper, and records it so that {@code cleanUp()} can close it.
     *
     * @return the spied, unstarted service
     */
    private PulsarService createPulsarService() {
        ServiceConfiguration config = new ServiceConfiguration();

        // Identity and endpoints.
        config.setClusterName("use");
        config.setAdvertisedAddress("localhost");
        config.setWebServicePort(Optional.of(0));
        config.setBrokerServicePort(Optional.of(0));
        config.setMetadataStoreUrl("zk:127.0.0.1:" + bkEnsemble.getZookeeperPort());

        // Load balancing is switched off and backed by the inert mock.
        config.setLoadBalancerEnabled(false);
        config.setLoadManagerClassName(MockLoadManager.class.getName());

        config.setBrokerShutdownTimeoutMs(0L);

        PulsarService spiedPulsar = Mockito.spy(new PulsarService(config));
        pulsarServices.add(spiedPulsar);
        return spiedPulsar;
    }

    /**
     * Wraps a new {@link BrokerRegistryImpl} for the given service in a Mockito spy and records it
     * so that {@code cleanUp()} can close it.
     *
     * @param pulsarService the broker the registry belongs to
     * @return the spied registry (not started)
     */
    private BrokerRegistryImpl createBrokerRegistryImpl(PulsarService pulsarService) {
        BrokerRegistryImpl spiedRegistry = Mockito.spy(new BrokerRegistryImpl(pulsarService));
        brokerRegistries.add(spiedRegistry);
        return spiedRegistry;
    }

    /**
     * Tears down the class-level fixtures created by {@code setup()}.
     *
     * <p>Because of {@code alwaysRun = true} this method also runs when {@code setup()} failed
     * part-way, so either field may still be {@code null}; the guards keep a secondary
     * {@link NullPointerException} from hiding the original setup failure. The {@code finally}
     * ensures the ensemble is stopped even if shutting the executor down throws.
     *
     * @throws Exception if stopping the ensemble fails
     */
    @AfterClass(alwaysRun = true)
    void shutdown() throws Exception {
        try {
            if (this.executor != null) {
                this.executor.shutdownNow();
            }
        } finally {
            if (this.bkEnsemble != null) {
                this.bkEnsemble.stop();
            }
        }
    }

    /**
     * Closes every started broker registry and every Pulsar service created by the test that just
     * ran, then empties both tracking lists.
     *
     * <p>A failure while closing one resource does not stop the remaining resources from being
     * closed, and the lists are always cleared, so nothing leaks into the next test method. The
     * first failure is rethrown (wrapped in a {@link RuntimeException}) once everything has been
     * attempted; later failures are attached to it as suppressed exceptions.
     */
    @AfterMethod(alwaysRun = true)
    void cleanUp() {
        RuntimeException firstFailure = null;

        log.info("Cleaning up the broker registry...");
        for (BrokerRegistryImpl registry : this.brokerRegistries) {
            try {
                if (registry.isStarted()) {
                    registry.close();
                }
            } catch (Exception e) {
                // Cleanup boundary: remember the failure and keep closing the rest.
                firstFailure = recordCleanupFailure(firstFailure, e);
            }
        }
        this.brokerRegistries.clear();

        log.info("Cleaning up the pulsar services...");
        for (PulsarService pulsar : this.pulsarServices) {
            try {
                pulsar.close();
            } catch (Exception e) {
                firstFailure = recordCleanupFailure(firstFailure, e);
            }
        }
        this.pulsarServices.clear();

        if (firstFailure != null) {
            throw firstFailure;
        }
    }

    /** Returns the exception to rethrow after cleanup: wraps the first cause, suppresses later ones. */
    private static RuntimeException recordCleanupFailure(RuntimeException firstFailure, Exception cause) {
        if (firstFailure == null) {
            return new RuntimeException(cause);
        }
        firstFailure.addSuppressed(cause);
        return firstFailure;
    }

    /**
     * Starts three brokers and their registries and verifies that:
     * <ul>
     *   <li>the listener on the first registry is notified about each broker that registers,</li>
     *   <li>all registries report the same set of available brokers and lookup-data keys,</li>
     *   <li>looking up the second broker from the first registry returns data matching the second
     *       {@link PulsarService}, and</li>
     *   <li>after the first registry unregisters, the second registry eventually sees two brokers.</li>
     * </ul>
     *
     * @throws Exception if a broker or registry fails to start, or a future fails
     */
    @Test(timeOut = 30000)
    public void testRegisterAndLookup() throws Exception {
        PulsarService pulsar1 = createPulsarService();
        PulsarService pulsar2 = createPulsarService();
        PulsarService pulsar3 = createPulsarService();
        pulsar1.start();
        pulsar2.start();
        pulsar3.start();
        BrokerRegistryImpl registry1 = createBrokerRegistryImpl(pulsar1);
        BrokerRegistryImpl registry2 = createBrokerRegistryImpl(pulsar2);
        BrokerRegistryImpl registry3 = createBrokerRegistryImpl(pulsar3);

        // The set is written from the listener callback and read by Awaitility, which evaluates
        // assertions on its own polling thread by default, so it must be safe for concurrent use.
        Set<String> notifiedBrokers = ConcurrentHashMap.newKeySet();
        registry1.addListener((brokerId, type) -> notifiedBrokers.add(brokerId));

        registry1.start();
        registry2.start();
        Awaitility.await().atMost(Duration.ofSeconds(5L))
                .untilAsserted(() -> Assert.assertEquals(notifiedBrokers.size(), 2));
        Assert.assertEquals(registry1.getAvailableBrokersAsync().get().size(), 2);
        Assert.assertEquals(registry2.getAvailableBrokersAsync().get().size(), 2);

        registry3.start();
        Assert.assertEquals(registry3.getAvailableBrokersAsync().get().size(), 3);
        Awaitility.await().atMost(Duration.ofSeconds(5L))
                .untilAsserted(() -> Assert.assertEquals(notifiedBrokers.size(), 3));

        // Every registry must agree with what the listener observed.
        Assert.assertEquals(notifiedBrokers, new HashSet<>(registry1.getAvailableBrokersAsync().get()));
        Assert.assertEquals(notifiedBrokers, new HashSet<>(registry2.getAvailableBrokersAsync().get()));
        Assert.assertEquals(notifiedBrokers, new HashSet<>(registry3.getAvailableBrokersAsync().get()));
        Assert.assertEquals(notifiedBrokers, registry1.getAvailableBrokerLookupDataAsync().get().keySet());
        Assert.assertEquals(notifiedBrokers, registry2.getAvailableBrokerLookupDataAsync().get().keySet());
        Assert.assertEquals(notifiedBrokers, registry3.getAvailableBrokerLookupDataAsync().get().keySet());

        // Lookup data for broker 2, as seen from registry 1, must mirror broker 2's own settings.
        Optional<BrokerLookupData> lookupResult = registry1.lookupAsync(registry2.getBrokerId()).get();
        Assert.assertTrue(lookupResult.isPresent());
        BrokerLookupData lookupData = lookupResult.get();
        Assert.assertEquals(lookupData.getWebServiceUrl(), pulsar2.getSafeWebServiceAddress());
        Assert.assertEquals(lookupData.getWebServiceUrlTls(), pulsar2.getWebServiceAddressTls());
        Assert.assertEquals(lookupData.getPulsarServiceUrl(), pulsar2.getBrokerServiceUrl());
        Assert.assertEquals(lookupData.getPulsarServiceUrlTls(), pulsar2.getBrokerServiceUrlTls());
        Assert.assertEquals(lookupData.advertisedListeners(), pulsar2.getAdvertisedListeners());
        Assert.assertEquals(lookupData.protocols(), pulsar2.getProtocolDataToAdvertise());
        Assert.assertEquals(lookupData.persistentTopicsEnabled(),
                pulsar2.getConfiguration().isEnablePersistentTopics());
        Assert.assertEquals(lookupData.nonPersistentTopicsEnabled(),
                pulsar2.getConfiguration().isEnableNonPersistentTopics());
        Assert.assertEquals(lookupData.brokerVersion(), pulsar2.getBrokerVersion());

        registry1.unregister();
        Awaitility.await()
                .untilAsserted(() -> Assert.assertEquals(registry2.getAvailableBrokersAsync().get().size(), 2));
    }

    /**
     * Two registries whose brokers report the same broker id: the first starts normally, the
     * second must fail to start with a {@link PulsarServerException} whose message mentions
     * {@code LockBusyException}.
     *
     * @throws Exception if a broker or the first registry fails to start
     */
    @Test
    public void testRegisterFailWithSameBrokerId() throws Exception {
        PulsarService firstPulsar = createPulsarService();
        PulsarService secondPulsar = createPulsarService();
        firstPulsar.start();
        secondPulsar.start();

        // Make the second (spied) broker report the first broker's id.
        Mockito.doReturn(firstPulsar.getBrokerId()).when(secondPulsar).getBrokerId();

        BrokerRegistryImpl firstRegistry = createBrokerRegistryImpl(firstPulsar);
        BrokerRegistryImpl secondRegistry = createBrokerRegistryImpl(secondPulsar);
        firstRegistry.start();

        Exception startFailure = null;
        try {
            secondRegistry.start();
        } catch (Exception e) {
            startFailure = e;
        }
        if (startFailure == null) {
            // The duplicate registration unexpectedly succeeded.
            Assert.fail();
        }
        log.info("Broker registry start failed.", startFailure);
        Assert.assertTrue(startFailure instanceof PulsarServerException);
        Assert.assertTrue(startFailure.getMessage().contains("LockBusyException"));
    }

    /**
     * Walks a registry through its state transitions (Init, Registered, Started after unregister,
     * Registered again, Closed) and verifies that, once closed, the listener list is empty and
     * every query method as well as {@code addListener} fails with an
     * {@link IllegalStateException}.
     *
     * @throws Exception if the broker or the registry fails to start, register or close
     */
    @Test
    public void testCloseRegister() throws Exception {
        PulsarService pulsar = createPulsarService();
        pulsar.start();
        BrokerRegistryImpl registry = createBrokerRegistryImpl(pulsar);
        Assert.assertEquals(getState(registry), BrokerRegistryImpl.State.Init);

        registry.start();
        Assert.assertEquals(getState(registry), BrokerRegistryImpl.State.Registered);

        registry.addListener((brokerId, type) -> {
            // Ignored: only the presence of the listener matters here.
        });
        Assert.assertTrue(registry.isStarted());
        Assert.assertFalse(((List<?>) WhiteboxImpl.getInternalState(registry, "listeners")).isEmpty());

        registry.unregister();
        Assert.assertEquals(getState(registry), BrokerRegistryImpl.State.Started);

        registry.register();
        Assert.assertEquals(getState(registry), BrokerRegistryImpl.State.Registered);

        registry.close();
        Assert.assertFalse(registry.isStarted());
        Assert.assertEquals(getState(registry), BrokerRegistryImpl.State.Closed);
        Assert.assertTrue(((List<?>) WhiteboxImpl.getInternalState(registry, "listeners")).isEmpty());

        // Assert.fail() throws an AssertionError, which the catch (Exception) blocks below do not
        // swallow, so an unexpectedly successful call still fails the test.
        try {
            registry.getAvailableBrokersAsync().get();
            Assert.fail();
        } catch (Exception e) {
            log.info("Failed to getAvailableBrokersAsync.", e);
            Assert.assertTrue(FutureUtil.unwrapCompletionException(e) instanceof IllegalStateException);
        }
        try {
            registry.getAvailableBrokerLookupDataAsync().get();
            Assert.fail();
        } catch (Exception e) {
            log.info("Failed to getAvailableBrokerLookupDataAsync.", e);
            Assert.assertTrue(FutureUtil.unwrapCompletionException(e) instanceof IllegalStateException);
        }
        try {
            registry.lookupAsync("test").get();
            Assert.fail();
        } catch (Exception e) {
            log.info("Failed to lookupAsync.", e);
            Assert.assertTrue(FutureUtil.unwrapCompletionException(e) instanceof IllegalStateException);
        }
        try {
            registry.addListener((brokerId, type) -> {
                // Ignored: registration itself is expected to be rejected.
            });
            Assert.fail();
        } catch (Exception e) {
            // Previously logged as "Failed to lookupAsync.", a copy-paste slip from the block above.
            log.info("Failed to addListener.", e);
            Assert.assertTrue(FutureUtil.unwrapCompletionException(e) instanceof IllegalStateException);
        }
    }

    /**
     * Checks which notification paths {@code BrokerRegistryImpl.isVerifiedNotification} accepts:
     * the root, a sibling path sharing the prefix, and the brokers root itself are rejected, while
     * paths below {@code /loadbalance/brokers/} are accepted.
     */
    @Test
    public void testIsVerifiedNotification() {
        String[] rejectedPaths = {
            "/",
            "/loadbalance/brokersxyz",
            "/loadbalance/brokers",
        };
        for (String path : rejectedPaths) {
            Notification notification = new Notification(NotificationType.Created, path);
            Assert.assertFalse(BrokerRegistryImpl.isVerifiedNotification(notification));
        }

        String[] acceptedPaths = {
            "/loadbalance/brokers/brokerId",
            "/loadbalance/brokers/brokerId/xyz",
        };
        for (String path : acceptedPaths) {
            Notification notification = new Notification(NotificationType.Created, path);
            Assert.assertTrue(BrokerRegistryImpl.isVerifiedNotification(notification));
        }
    }

    /** Verifies that {@code BrokerRegistryImpl.keyPath} places a broker id under {@code /loadbalance/brokers/}. */
    @Test
    public void testKeyPath() {
        String actualPath = BrokerRegistryImpl.keyPath("brokerId");
        String expectedPath = "/loadbalance/brokers/brokerId";
        Assert.assertEquals(actualPath, expectedPath);
    }

    /**
     * Reads the registry's internal state through reflection, selecting the field by its type
     * ({@code BrokerRegistryImpl.State}) rather than by name.
     *
     * @param brokerRegistryImpl the registry to inspect
     * @return the current value of the registry's state field
     */
    public BrokerRegistryImpl.State getState(BrokerRegistryImpl brokerRegistryImpl) {
        Class<BrokerRegistryImpl.State> stateType = BrokerRegistryImpl.State.class;
        BrokerRegistryImpl.State currentState =
                (BrokerRegistryImpl.State) WhiteboxImpl.getInternalState(brokerRegistryImpl, stateType);
        return currentState;
    }
}
