package org.apache.pulsar.broker.loadbalance.extensions;

import java.nio.charset.StandardCharsets;
import java.time.Duration;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Optional;
import lombok.Generated;
import org.apache.bookkeeper.util.PortManager;
import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.pulsar.client.admin.PulsarAdmin;
import org.apache.pulsar.common.policies.data.ClusterData;
import org.apache.pulsar.common.policies.data.TenantInfo;
import org.apache.pulsar.metadata.api.GetResult;
import org.apache.pulsar.metadata.api.extended.MetadataStoreExtended;
import org.apache.pulsar.zookeeper.LocalBookkeeperEnsemble;
import org.awaitility.Awaitility;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testng.Assert;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;

/**
 * Integration test for {@link BrokerRegistry} that runs a {@link PulsarService} configured with
 * {@link ExtensibleLoadManagerImpl} against a {@link LocalBookkeeperEnsemble}.
 *
 * <p>Both tests share the single broker started in {@link #setup()}, so each test leaves the
 * broker registered when it finishes.
 */
@Test(groups = {"broker"})
public class BrokerRegistryIntegrationTest {

    @Generated
    private static final Logger log = LoggerFactory.getLogger(BrokerRegistryIntegrationTest.class);

    /** Cluster name used for the broker configuration, the cluster and the tenant. */
    private static final String CLUSTER_NAME = "test";
    /** Upper bound for every Awaitility condition in this class. */
    private static final Duration AWAIT_TIMEOUT = Duration.ofSeconds(3);
    /** How long to wait after {@code unregister()} before checking nothing was re-created. */
    private static final long UNREGISTER_SETTLE_MILLIS = 3000L;
    /** Closing the broker for longer than this fails the cleanup. */
    private static final long MAX_BROKER_CLOSE_MILLIS = 5000L;

    private final int zkPort = PortManager.nextFreePort();
    // NOTE(review): the first argument is presumably the number of bookies - confirm.
    private final LocalBookkeeperEnsemble bk = new LocalBookkeeperEnsemble(2, this.zkPort, PortManager::nextFreePort);
    private PulsarService pulsar;
    private BrokerRegistry brokerRegistry;
    private String brokerMetadataPath;

    /**
     * Starts the ensemble and the broker, creates the cluster, the {@code public} tenant and the
     * {@code public/default} namespace, and resolves the broker registry under test.
     *
     * @throws Exception if the ensemble, the broker or any admin call fails
     */
    @BeforeClass
    protected void setup() throws Exception {
        this.bk.start();
        this.pulsar = new PulsarService(brokerConfig());
        this.pulsar.start();

        PulsarAdmin admin = this.pulsar.getAdminClient();
        admin.clusters().createCluster(CLUSTER_NAME, ClusterData.builder().build());
        admin.tenants().createTenant("public",
                TenantInfo.builder().allowedClusters(Collections.singleton(CLUSTER_NAME)).build());
        admin.namespaces().createNamespace("public/default");

        ExtensibleLoadManagerWrapper loadManager =
                (ExtensibleLoadManagerWrapper) this.pulsar.getLoadManager().get();
        this.brokerRegistry = loadManager.get().getBrokerRegistry();
        this.brokerMetadataPath = "/loadbalance/brokers/" + this.pulsar.getBrokerId();
    }

    /**
     * Closes the broker and stops the ensemble, failing if closing the broker took longer than
     * {@link #MAX_BROKER_CLOSE_MILLIS}.
     *
     * @throws Exception if closing the broker or stopping the ensemble fails
     */
    @AfterClass(alwaysRun = true)
    protected void cleanup() throws Exception {
        // nanoTime is monotonic, so the measurement is not affected by wall-clock adjustments.
        final long startNanos = System.nanoTime();
        final long closeMillis;
        try {
            if (this.pulsar != null) {
                this.pulsar.close();
            }
            closeMillis = Duration.ofNanos(System.nanoTime() - startNanos).toMillis();
        } finally {
            // Always stop the ensemble, even when closing the broker throws.
            this.bk.stop();
        }
        if (closeMillis > MAX_BROKER_CLOSE_MILLIS) {
            throw new RuntimeException("Broker took " + closeMillis + "ms to close");
        }
    }

    /**
     * Verifies that the broker is listed again after its metadata node is deleted directly from
     * the metadata store, and that it is not listed again after an explicit {@code unregister()}.
     *
     * @throws Exception if a metadata or registry operation fails
     */
    @Test
    public void testRecoverFromNodeDeletion() throws Exception {
        awaitBrokerRegistered();

        // Wait for the delete to complete so the check below runs against the post-delete state
        // and a failed delete is reported instead of being silently dropped.
        this.pulsar.getLocalMetadataStore().delete(this.brokerMetadataPath, Optional.empty()).get();
        awaitBrokerRegistered();

        this.brokerRegistry.unregister();
        try {
            // A fixed wait is needed here: the assertion is that nothing shows up during it.
            Thread.sleep(UNREGISTER_SETTLE_MILLIS);
            Assert.assertTrue(this.brokerRegistry.getAvailableBrokersAsync().get().isEmpty());
        } finally {
            // Restore the registration even on failure so the other test is not affected.
            this.brokerRegistry.registerAsync().get();
        }
        Assert.assertEquals(this.brokerRegistry.getAvailableBrokersAsync().get(),
                List.of(this.pulsar.getBrokerId()));
    }

    /**
     * Verifies that registering an already registered broker bumps the version of its metadata
     * node while leaving the stored value unchanged.
     *
     * @throws Exception if a metadata or registry operation fails
     */
    @Test
    public void testRegisterAgain() throws Exception {
        awaitBrokerRegistered();

        final MetadataStoreExtended store = this.pulsar.getLocalMetadataStore();
        final GetResult oldResult = store.get(this.brokerMetadataPath).get().orElseThrow();
        log.info("Old result: {} {}", new String(oldResult.getValue(), StandardCharsets.UTF_8),
                oldResult.getStat().getVersion());

        this.brokerRegistry.registerAsync().get();

        Awaitility.await().atMost(AWAIT_TIMEOUT).untilAsserted(() -> {
            GetResult newResult = store.get(this.brokerMetadataPath).get().orElseThrow();
            log.info("New result: {} {}", new String(newResult.getValue(), StandardCharsets.UTF_8),
                    newResult.getStat().getVersion());
            Assert.assertTrue(newResult.getStat().getVersion() > oldResult.getStat().getVersion());
            Assert.assertEquals(newResult.getValue(), oldResult.getValue());
        });
    }

    /**
     * Builds the configuration of the broker under test.
     *
     * @return a configuration pointing at this test's ZooKeeper, using
     *         {@link ExtensibleLoadManagerImpl} as the load manager
     */
    protected ServiceConfiguration brokerConfig() {
        final ServiceConfiguration config = new ServiceConfiguration();
        config.setClusterName(CLUSTER_NAME);
        config.setAdvertisedAddress("localhost");
        config.setBrokerServicePort(Optional.of(0));
        config.setWebServicePort(Optional.of(0));
        config.setMetadataStoreUrl("zk:127.0.0.1:" + this.bk.getZookeeperPort());
        config.setManagedLedgerDefaultWriteQuorum(1);
        config.setManagedLedgerDefaultAckQuorum(1);
        config.setManagedLedgerDefaultEnsembleSize(1);
        config.setDefaultNumberOfNamespaceBundles(16);
        config.setLoadManagerClassName(ExtensibleLoadManagerImpl.class.getName());
        config.setLoadBalancerDebugModeEnabled(true);
        config.setBrokerShutdownTimeoutMs(100L);
        return config;
    }

    /** Waits until the registry lists exactly this broker as available. */
    private void awaitBrokerRegistered() {
        Awaitility.await().atMost(AWAIT_TIMEOUT).untilAsserted(() ->
                Assert.assertEquals(this.brokerRegistry.getAvailableBrokersAsync().join(),
                        List.of(this.pulsar.getBrokerId())));
    }
}
