package org.apache.pulsar.broker.loadbalance;

import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Optional;
import java.util.UUID;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.Phaser;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import lombok.Generated;
import org.apache.pulsar.broker.MultiBrokerTestZKBaseTest;
import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.client.admin.Lookup;
import org.apache.pulsar.client.admin.PulsarAdmin;
import org.apache.pulsar.client.admin.PulsarAdminException;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.impl.LookupService;
import org.apache.pulsar.client.impl.LookupTopicResult;
import org.apache.pulsar.client.impl.PulsarClientImpl;
import org.apache.pulsar.common.naming.TopicName;
import org.awaitility.Awaitility;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testng.Assert;
import org.testng.annotations.Test;

@Test(groups = {"broker"})
/* loaded from: input_file:org/apache/pulsar/broker/loadbalance/MultiBrokerLeaderElectionTest.class */
public class MultiBrokerLeaderElectionTest extends MultiBrokerTestZKBaseTest {

    @Generated
    private static final Logger log = LoggerFactory.getLogger(MultiBrokerLeaderElectionTest.class);

    public MultiBrokerLeaderElectionTest() {
        this.isTcpLookup = true;
    }

    @Override // org.apache.pulsar.broker.MultiBrokerBaseTest
    protected int numberOfAdditionalBrokers() {
        return 9;
    }

    @Test
    public void shouldElectOneLeader() {
        int i = 0;
        Iterator<PulsarService> it = getAllBrokers().iterator();
        while (it.hasNext()) {
            if (it.next().getLeaderElectionService().isLeader()) {
                i++;
            }
        }
        Assert.assertEquals(i, 1);
    }

    @Test
    public void shouldAllBrokersKnowTheLeader() {
        Awaitility.await().untilAsserted(() -> {
            for (PulsarService pulsarService : getAllBrokers()) {
                Assert.assertTrue(pulsarService.getLeaderElectionService().getCurrentLeader().isPresent(), "Leader wasn't known on broker " + pulsarService.getBrokerServiceUrl());
            }
        });
    }

    @Test
    public void shouldAllBrokersBeAbleToGetTheLeader() {
        Awaitility.await().untilAsserted(() -> {
            LeaderBroker leaderBroker = null;
            for (PulsarService pulsarService : getAllBrokers()) {
                Optional optional = (Optional) pulsarService.getLeaderElectionService().readCurrentLeader().get(1L, TimeUnit.SECONDS);
                Assert.assertTrue(optional.isPresent(), "Leader wasn't known on broker " + pulsarService.getBrokerServiceUrl());
                if (leaderBroker != null) {
                    Assert.assertEquals(optional.get(), leaderBroker, "Different leader on broker " + pulsarService.getBrokerServiceUrl());
                } else {
                    leaderBroker = (LeaderBroker) optional.get();
                }
            }
        });
    }

    /* JADX WARN: Finally extract failed */
    @Test(timeOut = 120000)
    public void shouldProvideConsistentAnswerToTopicLookupsUsingAdminApi() throws PulsarAdminException, ExecutionException, InterruptedException {
        String str = "public/ns" + UUID.randomUUID();
        this.admin.namespaces().createNamespace(str, 256);
        String str2 = "persistent://" + str + "/lookuptest-";
        List list = (List) IntStream.range(0, 500).mapToObj(i -> {
            return str2 + i;
        }).collect(Collectors.toList());
        List<PulsarAdmin> allAdmins = getAllAdmins();
        ExecutorService newFixedThreadPool = Executors.newFixedThreadPool(allAdmins.size());
        try {
            ArrayList arrayList = new ArrayList();
            Phaser phaser = new Phaser(1);
            try {
                for (PulsarAdmin pulsarAdmin : allAdmins) {
                    phaser.register();
                    Lookup lookups = pulsarAdmin.lookups();
                    log.info("Doing lookup to broker {}", pulsarAdmin.getServiceUrl());
                    arrayList.add(newFixedThreadPool.submit(() -> {
                        return (List) list.stream().map(str3 -> {
                            phaser.arriveAndAwaitAdvance();
                            try {
                                return lookups.lookupTopic(str3);
                            } catch (PulsarAdminException e) {
                                log.error("Error looking up topic {} in {}", str3, pulsarAdmin.getServiceUrl());
                                throw new RuntimeException((Throwable) e);
                            }
                        }).collect(Collectors.toList());
                    }));
                }
                phaser.arriveAndDeregister();
                List list2 = null;
                Iterator it = arrayList.iterator();
                while (it.hasNext()) {
                    List list3 = (List) ((Future) it.next()).get();
                    if (list2 == null) {
                        list2 = list3;
                    } else {
                        Assert.assertEquals(list3, list2, "The lookup results weren't consistent.");
                    }
                }
                if (Collections.singletonList(phaser).get(0) != null) {
                    phaser.forceTermination();
                }
            } catch (Throwable th) {
                if (Collections.singletonList(phaser).get(0) != null) {
                    phaser.forceTermination();
                }
                throw th;
            }
        } finally {
            if (Collections.singletonList(newFixedThreadPool).get(0) != null) {
                newFixedThreadPool.shutdownNow();
            }
        }
    }

    /* JADX WARN: Finally extract failed */
    @Test(timeOut = 60000)
    public void shouldProvideConsistentAnswerToTopicLookupsUsingClient() throws PulsarAdminException, ExecutionException, InterruptedException {
        String str = "public/ns" + UUID.randomUUID();
        this.admin.namespaces().createNamespace(str, 256);
        String str2 = "persistent://" + str + "/lookuptest-";
        List list = (List) IntStream.range(0, 500).mapToObj(i -> {
            return str2 + i;
        }).collect(Collectors.toList());
        List<PulsarClient> allClients = getAllClients();
        ExecutorService newFixedThreadPool = Executors.newFixedThreadPool(allClients.size());
        try {
            ArrayList arrayList = new ArrayList();
            Phaser phaser = new Phaser(1);
            try {
                Iterator<PulsarClient> it = allClients.iterator();
                while (it.hasNext()) {
                    PulsarClientImpl pulsarClientImpl = (PulsarClient) it.next();
                    phaser.register();
                    String serviceUrl = pulsarClientImpl.getConfiguration().getServiceUrl();
                    LookupService lookup = pulsarClientImpl.getLookup();
                    log.info("Doing lookup to broker {}", serviceUrl);
                    arrayList.add(newFixedThreadPool.submit(() -> {
                        return (List) list.stream().map(str3 -> {
                            phaser.arriveAndAwaitAdvance();
                            try {
                                InetSocketAddress logicalAddress = ((LookupTopicResult) lookup.getBroker(TopicName.get(str3)).get()).getLogicalAddress();
                                return logicalAddress.getHostString() + ":" + logicalAddress.getPort();
                            } catch (InterruptedException | ExecutionException e) {
                                log.error("Error looking up topic {} in {}", str3, serviceUrl);
                                throw new RuntimeException(e);
                            }
                        }).collect(Collectors.toList());
                    }));
                }
                phaser.arriveAndDeregister();
                List list2 = null;
                Iterator it2 = arrayList.iterator();
                while (it2.hasNext()) {
                    List list3 = (List) ((Future) it2.next()).get();
                    if (list2 == null) {
                        list2 = list3;
                    } else {
                        Assert.assertEquals(list3, list2, "The lookup results weren't consistent.");
                    }
                }
                if (Collections.singletonList(phaser).get(0) != null) {
                    phaser.forceTermination();
                }
            } catch (Throwable th) {
                if (Collections.singletonList(phaser).get(0) != null) {
                    phaser.forceTermination();
                }
                throw th;
            }
        } finally {
            if (Collections.singletonList(newFixedThreadPool).get(0) != null) {
                newFixedThreadPool.shutdownNow();
            }
        }
    }
}
