package org.apache.pulsar.broker.service;

import com.google.common.collect.Sets;
import java.lang.reflect.Method;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import org.apache.pulsar.broker.BrokerTestUtil;
import org.apache.pulsar.broker.loadbalance.extensions.ExtensibleLoadManagerImpl;
import org.apache.pulsar.broker.loadbalance.impl.ModularLoadManagerImpl;
import org.apache.pulsar.broker.service.ReplicatorTestBase;
import org.apache.pulsar.client.api.MessageRoutingMode;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.impl.ConsumerImpl;
import org.apache.pulsar.client.impl.ProducerImpl;
import org.apache.pulsar.common.naming.TopicName;
import org.awaitility.Awaitility;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testng.Assert;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.DataProvider;
import org.testng.annotations.Factory;
import org.testng.annotations.Test;

@Test(groups = {"broker-impl"})
/* loaded from: input_file:org/apache/pulsar/broker/service/ReplicatorGlobalNSTest.class */
public class ReplicatorGlobalNSTest extends ReplicatorTestBase {
    private static final Logger log = LoggerFactory.getLogger(ReplicatorGlobalNSTest.class);
    protected String methodName;

    /* JADX WARN: Type inference failed for: r0v1, types: [java.lang.Object[], java.lang.Object[][]] */
    @DataProvider(name = "loadManagerClassName")
    public static Object[][] loadManagerClassName() {
        return new Object[]{new Object[]{ModularLoadManagerImpl.class.getName()}, new Object[]{ExtensibleLoadManagerImpl.class.getName()}};
    }

    @Factory(dataProvider = "loadManagerClassName")
    public ReplicatorGlobalNSTest(String str) {
        this.loadManagerClassName = str;
    }

    @BeforeMethod
    public void beforeMethod(Method method) {
        this.methodName = method.getName();
    }

    @Override // org.apache.pulsar.broker.service.ReplicatorTestBase
    @BeforeClass(timeOut = 300000)
    public void setup() throws Exception {
        super.setup();
    }

    @Override // org.apache.pulsar.broker.service.ReplicatorTestBase
    @AfterClass(alwaysRun = true, timeOut = 300000)
    public void cleanup() throws Exception {
        super.cleanup();
    }

    @Test(priority = Integer.MAX_VALUE)
    public void testRemoveLocalClusterOnGlobalNamespace() throws Exception {
        log.info("--- Starting ReplicatorTest::testRemoveLocalClusterOnGlobalNamespace ---");
        this.admin1.namespaces().createNamespace("pulsar/global/removeClusterTest");
        this.admin1.namespaces().setNamespaceReplicationClusters("pulsar/global/removeClusterTest", Sets.newHashSet(new String[]{"r1", "r2", "r3"}));
        PulsarClient build = PulsarClient.builder().serviceUrl(this.url1.toString()).statsInterval(0L, TimeUnit.SECONDS).build();
        try {
            build = PulsarClient.builder().serviceUrl(this.url2.toString()).statsInterval(0L, TimeUnit.SECONDS).build();
            try {
                ProducerImpl create = build.newProducer().topic("persistent://pulsar/global/removeClusterTest/topic").enableBatching(false).messageRoutingMode(MessageRoutingMode.SinglePartition).create();
                ConsumerImpl subscribe = build.newConsumer().topic(new String[]{"persistent://pulsar/global/removeClusterTest/topic"}).subscriptionName("sub1").subscribe();
                ConsumerImpl subscribe2 = build.newConsumer().topic(new String[]{"persistent://pulsar/global/removeClusterTest/topic"}).subscriptionName("sub1").subscribe();
                this.admin1.namespaces().setNamespaceReplicationClusters("pulsar/global/removeClusterTest", Sets.newHashSet(new String[]{"r2", "r3"}));
                Awaitility.await().atMost(1L, TimeUnit.MINUTES).untilAsserted(() -> {
                    Assert.assertFalse(this.pulsar1.getBrokerService().getTopics().containsKey("persistent://pulsar/global/removeClusterTest/topic"));
                    Assert.assertFalse(create.isConnected());
                    Assert.assertFalse(subscribe.isConnected());
                    Assert.assertTrue(subscribe2.isConnected());
                });
                if (Collections.singletonList(build).get(0) != null) {
                    build.close();
                }
            } finally {
                if (Collections.singletonList(build).get(0) != null) {
                    build.close();
                }
            }
        } catch (Throwable th) {
            if (Collections.singletonList(build).get(0) != null) {
                build.close();
            }
            throw th;
        }
    }

    @Test(priority = 2147483646)
    public void testConfigChange() throws Exception {
        log.info("--- Starting ReplicatorTest::testConfigChange ---");
        ArrayList arrayList = new ArrayList();
        for (int i = 0; i < 10; i++) {
            final TopicName topicName = TopicName.get(BrokerTestUtil.newUniqueName("persistent://pulsar/ns/topic-" + i));
            arrayList.add(this.executor.submit(new Callable<Void>() { // from class: org.apache.pulsar.broker.service.ReplicatorGlobalNSTest.1
                /* JADX WARN: Can't rename method to resolve collision */
                @Override // java.util.concurrent.Callable
                public Void call() throws Exception {
                    ReplicatorTestBase.MessageProducer messageProducer = new ReplicatorTestBase.MessageProducer(ReplicatorGlobalNSTest.this.url1, topicName);
                    try {
                        ReplicatorGlobalNSTest.log.info("--- Starting producer --- " + ReplicatorGlobalNSTest.this.url1);
                        ReplicatorTestBase.MessageConsumer messageConsumer = new ReplicatorTestBase.MessageConsumer(ReplicatorGlobalNSTest.this.url1, topicName);
                        try {
                            ReplicatorGlobalNSTest.log.info("--- Starting Consumer --- " + ReplicatorGlobalNSTest.this.url1);
                            messageProducer.produce(2);
                            messageConsumer.receive(2);
                            if (Collections.singletonList(messageConsumer).get(0) != null) {
                                messageConsumer.close();
                            }
                            return null;
                        } catch (Throwable th) {
                            if (Collections.singletonList(messageConsumer).get(0) != null) {
                                messageConsumer.close();
                            }
                            throw th;
                        }
                    } finally {
                        if (Collections.singletonList(messageProducer).get(0) != null) {
                            messageProducer.close();
                        }
                    }
                }
            }));
        }
        Iterator it = arrayList.iterator();
        while (it.hasNext()) {
            try {
                ((Future) it.next()).get();
            } catch (Exception e) {
                log.error("exception in getting future result ", e);
                Assert.fail(String.format("replication test failed with %s exception", e.getMessage()));
            }
        }
        Thread.sleep(1000L);
        Map replicationClients = this.ns1.getReplicationClients();
        Map replicationClients2 = this.ns2.getReplicationClients();
        Map replicationClients3 = this.ns3.getReplicationClients();
        Assert.assertNotNull(replicationClients.get("r2"));
        Assert.assertNotNull(replicationClients.get("r3"));
        Assert.assertNotNull(replicationClients2.get("r1"));
        Assert.assertNotNull(replicationClients2.get("r3"));
        Assert.assertNotNull(replicationClients3.get("r1"));
        Assert.assertNotNull(replicationClients3.get("r2"));
        this.admin1.namespaces().setNamespaceReplicationClusters("pulsar/ns", Sets.newHashSet(new String[]{"r1"}));
        Thread.sleep(1000L);
        Assert.assertNotNull(replicationClients.get("r2"));
        Assert.assertNotNull(replicationClients.get("r3"));
        Assert.assertNotNull(replicationClients2.get("r1"));
        Assert.assertNotNull(replicationClients2.get("r3"));
        Assert.assertNotNull(replicationClients3.get("r1"));
        Assert.assertNotNull(replicationClients3.get("r2"));
        this.admin1.namespaces().setNamespaceReplicationClusters("pulsar/ns", Sets.newHashSet(new String[]{"r1", "r2", "r3"}));
        Thread.sleep(1000L);
        Assert.assertNotNull(replicationClients.get("r2"));
        Assert.assertNotNull(replicationClients.get("r3"));
        Assert.assertNotNull(replicationClients2.get("r1"));
        Assert.assertNotNull(replicationClients2.get("r3"));
        Assert.assertNotNull(replicationClients3.get("r1"));
        Assert.assertNotNull(replicationClients3.get("r2"));
    }
}
