package org.apache.pulsar.broker.service;

import com.google.common.collect.Sets;
import java.lang.reflect.Method;
import java.net.URL;
import java.util.Collections;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.TimeUnit;
import org.apache.pulsar.broker.BrokerTestUtil;
import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
import org.apache.pulsar.broker.loadbalance.extensions.ExtensibleLoadManagerImpl;
import org.apache.pulsar.broker.loadbalance.impl.ModularLoadManagerImpl;
import org.apache.pulsar.broker.service.persistent.PersistentTopic;
import org.apache.pulsar.client.admin.PulsarAdmin;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.MessageRoutingMode;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.SubscriptionType;
import org.apache.pulsar.common.policies.data.ClusterData;
import org.apache.pulsar.common.policies.data.ClusterPolicies;
import org.apache.pulsar.common.policies.data.TenantInfoImpl;
import org.apache.pulsar.socks5.auth.DefaultPasswordAuthImpl;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testng.Assert;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.DataProvider;
import org.testng.annotations.Factory;
import org.testng.annotations.Test;

@Test(groups = {"cluster-migration"})
/* loaded from: input_file:org/apache/pulsar/broker/service/ClusterMigrationTest.class */
public class ClusterMigrationTest {
    private static final Logger log = LoggerFactory.getLogger(ClusterMigrationTest.class);
    // Name of the test method about to run; assigned in beforeMethod().
    protected String methodName;
    // Namespace that setup() creates through all four admin clients.
    String namespace = "pulsar/migrationNs";
    // Second namespace that setup() creates alongside `namespace`.
    String namespaceNotToMigrate = "pulsar/notToMigrateNs";
    // One TestBroker per cluster; setup() names the clusters "r1" .. "r4".
    TestBroker broker1;
    TestBroker broker2;
    TestBroker broker3;
    TestBroker broker4;
    // Per-broker handles populated in setup(): plain web-service URL, TLS web-service URL,
    // the broker's PulsarService, and an admin client bound to the plain URL.
    URL url1;
    URL urlTls1;
    PulsarService pulsar1;
    PulsarAdmin admin1;
    URL url2;
    URL urlTls2;
    PulsarService pulsar2;
    PulsarAdmin admin2;
    URL url3;
    URL urlTls3;
    PulsarService pulsar3;
    PulsarAdmin admin3;
    URL url4;
    URL urlTls4;
    PulsarService pulsar4;
    PulsarAdmin admin4;
    // Load-manager implementation class handed to every TestBroker; set by the @Factory constructor.
    String loadManagerClassName;

    /* loaded from: input_file:org/apache/pulsar/broker/service/ClusterMigrationTest$TestBroker.class */
    /**
     * A broker harness built on {@link MockedPulsarServiceBaseTest} that is started from its
     * constructor, uses the given cluster name and load manager, and additionally enables TLS
     * listeners on ephemeral ports.
     */
    static class TestBroker extends MockedPulsarServiceBaseTest {
        private final String clusterName;
        private final String loadManagerClassName;

        /**
         * Stores the configuration and immediately starts the broker via {@link #setup()}.
         *
         * @param clusterName          cluster name passed to {@code setupWithClusterName}
         * @param loadManagerClassName load-manager class name written into the broker configuration
         * @throws Exception if the broker fails to start
         */
        public TestBroker(String clusterName, String loadManagerClassName) throws Exception {
            this.clusterName = clusterName;
            this.loadManagerClassName = loadManagerClassName;
            setup();
        }

        /** Starts the broker under the cluster name supplied at construction time. */
        @Override
        protected void setup() throws Exception {
            super.setupWithClusterName(this.clusterName);
        }

        /**
         * Extends the base configuration with the chosen load manager and with TLS settings
         * (ephemeral TLS ports plus the trust-cert, broker-cert and broker-key file paths).
         */
        @Override
        public void doInitConf() throws Exception {
            super.doInitConf();
            this.conf.setLoadManagerClassName(this.loadManagerClassName);

            // Port 0 is passed for both TLS listeners.
            final Optional<Integer> ephemeralPort = Optional.of(0);
            this.conf.setWebServicePortTls(ephemeralPort);
            this.conf.setBrokerServicePortTls(ephemeralPort);

            this.conf.setTlsTrustCertsFilePath(CA_CERT_FILE_PATH);
            this.conf.setTlsCertificateFilePath(BROKER_CERT_FILE_PATH);
            this.conf.setTlsKeyFilePath(BROKER_KEY_FILE_PATH);
        }

        /** @return the {@code pulsar} field inherited from the base class */
        public PulsarService getPulsarService() {
            return this.pulsar;
        }

        /** @return the {@code configClusterName} field inherited from the base class */
        public String getClusterName() {
            return this.configClusterName;
        }

        /** Stops the broker by delegating to {@code stopBroker()}. */
        public void stop() throws Exception {
            stopBroker();
        }

        /** Releases the harness by delegating to {@code internalCleanup()}. */
        @Override
        protected void cleanup() throws Exception {
            internalCleanup();
        }

        /** Restarts the broker by delegating to {@code restartBroker()}. */
        public void restart() throws Exception {
            restartBroker();
        }
    }

    /**
     * Supplies parameter rows for tests bound to the
     * {@code "NamespaceMigrationTopicSubscriptionTypes"} data provider.
     *
     * <p>Each row is a {@link SubscriptionType} followed by two boolean flags. The consuming tests
     * are outside this excerpt, so the meaning of the flags is defined there.
     *
     * @return three rows, all using {@link SubscriptionType#Shared}
     */
    @DataProvider(name = "NamespaceMigrationTopicSubscriptionTypes")
    public Object[][] namespaceMigrationSubscriptionTypes() {
        // Must be a genuine two-dimensional array: an Object[] is not assignable to Object[][].
        return new Object[][] {
                {SubscriptionType.Shared, true, false},
                {SubscriptionType.Shared, false, true},
                {SubscriptionType.Shared, true, true},
        };
    }

    /**
     * Supplies the load-manager implementations the test class is instantiated with (see the
     * {@code @Factory} constructor): one row per implementation class name.
     *
     * @return one single-element row each for {@link ModularLoadManagerImpl} and
     *         {@link ExtensibleLoadManagerImpl}
     */
    @DataProvider(name = "loadManagerClassName")
    public static Object[][] loadManagerClassName() {
        // Must be a genuine two-dimensional array: an Object[] is not assignable to Object[][].
        return new Object[][] {
                {ModularLoadManagerImpl.class.getName()},
                {ExtensibleLoadManagerImpl.class.getName()},
        };
    }

    /**
     * Creates one test instance per row of the {@code "loadManagerClassName"} data provider.
     *
     * @param loadManagerClassName load-manager class name later handed to every {@link TestBroker}
     */
    @Factory(dataProvider = "loadManagerClassName")
    public ClusterMigrationTest(String loadManagerClassName) {
        this.loadManagerClassName = loadManagerClassName;
    }

    /**
     * Starts four brokers (clusters "r1" .. "r4"), opens an admin client for each, and provisions
     * cluster metadata, the tenant and both namespaces.
     *
     * <p>Clusters are paired: r1/r3 know about each other, and r2/r4 know about each other. Within
     * each pair the first admin creates the namespaces with explicit replication clusters and the
     * second admin creates them with defaults.
     *
     * @throws Exception if any broker fails to start or any admin call fails
     */
    @BeforeMethod(alwaysRun = true, timeOut = 300000)
    public void setup() throws Exception {
        log.info("--- Starting ReplicatorTestBase::setup ---");

        // Tenant that owns both namespaces; it must match the "pulsar/" prefix of `namespace` and
        // `namespaceNotToMigrate`. Previously this value was read from an unrelated SOCKS5
        // password constant that merely happened to hold the same string.
        final String tenant = "pulsar";

        this.broker1 = new TestBroker("r1", this.loadManagerClassName);
        this.broker2 = new TestBroker("r2", this.loadManagerClassName);
        this.broker3 = new TestBroker("r3", this.loadManagerClassName);
        this.broker4 = new TestBroker("r4", this.loadManagerClassName);

        this.pulsar1 = this.broker1.getPulsarService();
        this.url1 = new URL(this.pulsar1.getWebServiceAddress());
        this.urlTls1 = new URL(this.pulsar1.getWebServiceAddressTls());
        this.admin1 = PulsarAdmin.builder().serviceHttpUrl(this.url1.toString()).build();

        this.pulsar2 = this.broker2.getPulsarService();
        this.url2 = new URL(this.pulsar2.getWebServiceAddress());
        this.urlTls2 = new URL(this.pulsar2.getWebServiceAddressTls());
        this.admin2 = PulsarAdmin.builder().serviceHttpUrl(this.url2.toString()).build();

        this.pulsar3 = this.broker3.getPulsarService();
        this.url3 = new URL(this.pulsar3.getWebServiceAddress());
        this.urlTls3 = new URL(this.pulsar3.getWebServiceAddressTls());
        this.admin3 = PulsarAdmin.builder().serviceHttpUrl(this.url3.toString()).build();

        this.pulsar4 = this.broker4.getPulsarService();
        this.url4 = new URL(this.pulsar4.getWebServiceAddress());
        this.urlTls4 = new URL(this.pulsar4.getWebServiceAddressTls());
        this.admin4 = PulsarAdmin.builder().serviceHttpUrl(this.url4.toString()).build();

        // Register cluster metadata: r1 and r3 are registered on brokers 1 and 3,
        // r2 and r4 on brokers 2 and 4.
        this.admin1.clusters().createCluster("r1", newClusterData(this.url1, this.urlTls1, this.pulsar1));
        this.admin3.clusters().createCluster("r1", newClusterData(this.url1, this.urlTls1, this.pulsar1));
        this.admin2.clusters().createCluster("r2", newClusterData(this.url2, this.urlTls2, this.pulsar2));
        this.admin4.clusters().createCluster("r2", newClusterData(this.url2, this.urlTls2, this.pulsar2));
        this.admin1.clusters().createCluster("r3", newClusterData(this.url3, this.urlTls3, this.pulsar3));
        this.admin3.clusters().createCluster("r3", newClusterData(this.url3, this.urlTls3, this.pulsar3));
        this.admin2.clusters().createCluster("r4", newClusterData(this.url4, this.urlTls4, this.pulsar4));
        this.admin4.clusters().createCluster("r4", newClusterData(this.url4, this.urlTls4, this.pulsar4));

        provisionTenantAndNamespaces(this.admin1, this.admin3, tenant, "r1", "r3");
        provisionTenantAndNamespaces(this.admin2, this.admin4, tenant, "r2", "r4");

        // Sanity-check that every broker reports its own cluster endpoints.
        Assert.assertEquals(this.admin1.clusters().getCluster("r1").getServiceUrl(), this.url1.toString());
        Assert.assertEquals(this.admin2.clusters().getCluster("r2").getServiceUrl(), this.url2.toString());
        Assert.assertEquals(this.admin3.clusters().getCluster("r3").getServiceUrl(), this.url3.toString());
        Assert.assertEquals(this.admin4.clusters().getCluster("r4").getServiceUrl(), this.url4.toString());
        Assert.assertEquals(this.admin1.clusters().getCluster("r1").getBrokerServiceUrl(),
                this.pulsar1.getBrokerServiceUrl());
        Assert.assertEquals(this.admin2.clusters().getCluster("r2").getBrokerServiceUrl(),
                this.pulsar2.getBrokerServiceUrl());
        Assert.assertEquals(this.admin3.clusters().getCluster("r3").getBrokerServiceUrl(),
                this.pulsar3.getBrokerServiceUrl());
        Assert.assertEquals(this.admin4.clusters().getCluster("r4").getBrokerServiceUrl(),
                this.pulsar4.getBrokerServiceUrl());

        Thread.sleep(100L);
        log.info("--- ReplicatorTestBase::setup completed ---");
    }

    /** Builds cluster metadata from a broker's plain/TLS web URLs and its plain/TLS broker service URLs. */
    private static ClusterData newClusterData(URL webUrl, URL webUrlTls, PulsarService pulsar) {
        return ClusterData.builder()
                .serviceUrl(webUrl.toString())
                .serviceUrlTls(webUrlTls.toString())
                .brokerServiceUrl(pulsar.getBrokerServiceUrl())
                .brokerServiceUrlTls(pulsar.getBrokerServiceUrlTls())
                .build();
    }

    /** Builds tenant info with the fixed admin roles appid1..appid3 and the given allowed clusters. */
    private static TenantInfoImpl newTenantInfo(String... allowedClusters) {
        return new TenantInfoImpl(Sets.newHashSet("appid1", "appid2", "appid3"), Sets.newHashSet(allowedClusters));
    }

    /**
     * Creates or updates the tenant through both admins, then creates both namespaces: through
     * {@code primary} with explicit replication clusters, and through {@code peer} with defaults.
     */
    private void provisionTenantAndNamespaces(PulsarAdmin primary, PulsarAdmin peer, String tenant,
                                              String... clusters) throws Exception {
        updateTenantInfo(primary, tenant, newTenantInfo(clusters));
        updateTenantInfo(peer, tenant, newTenantInfo(clusters));
        for (String ns : new String[] {this.namespace, this.namespaceNotToMigrate}) {
            primary.namespaces().createNamespace(ns, Sets.newHashSet(clusters));
            peer.namespaces().createNamespace(ns);
            primary.namespaces().setNamespaceReplicationClusters(ns, Sets.newHashSet(clusters));
        }
    }

    /**
     * Creates the tenant if the admin endpoint does not list it yet, otherwise updates it.
     *
     * @param pulsarAdmin    admin client of the target cluster
     * @param tenant         tenant name
     * @param tenantInfoImpl tenant definition to create or apply
     * @throws Exception if any admin call fails
     */
    protected void updateTenantInfo(PulsarAdmin pulsarAdmin, String tenant, TenantInfoImpl tenantInfoImpl)
            throws Exception {
        final boolean alreadyExists = pulsarAdmin.tenants().getTenants().contains(tenant);
        if (!alreadyExists) {
            pulsarAdmin.tenants().createTenant(tenant, tenantInfoImpl);
            return;
        }
        pulsarAdmin.tenants().updateTenant(tenant, tenantInfoImpl);
    }

    /**
     * Closes all admin clients and tears down all brokers.
     *
     * <p>Because this runs with {@code alwaysRun = true}, it can execute after a {@code setup()}
     * that failed part-way, leaving some fields {@code null}. Every resource is therefore
     * null-checked and released independently, so one failure neither hides the original setup
     * error behind a {@link NullPointerException} nor leaves the remaining brokers running.
     *
     * @throws Exception the first failure met while releasing resources, with any later failures
     *                   attached as suppressed exceptions
     */
    @AfterMethod(alwaysRun = true, timeOut = 300000)
    protected void cleanup() throws Exception {
        log.info("--- Shutting down ---");
        Exception firstFailure = null;

        for (PulsarAdmin admin : new PulsarAdmin[] {this.admin1, this.admin2, this.admin3, this.admin4}) {
            if (admin == null) {
                continue;
            }
            try {
                admin.close();
            } catch (Exception e) {
                if (firstFailure == null) {
                    firstFailure = e;
                } else {
                    firstFailure.addSuppressed(e);
                }
            }
        }

        for (TestBroker broker : new TestBroker[] {this.broker1, this.broker2, this.broker3, this.broker4}) {
            if (broker == null) {
                continue;
            }
            try {
                broker.cleanup();
            } catch (Exception e) {
                if (firstFailure == null) {
                    firstFailure = e;
                } else {
                    firstFailure.addSuppressed(e);
                }
            }
        }

        if (firstFailure != null) {
            throw firstFailure;
        }
    }

    /**
     * Records the name of the test method that is about to run.
     *
     * @param method the test method supplied by TestNG
     */
    @BeforeMethod(alwaysRun = true)
    public void beforeMethod(Method method) throws Exception {
        final String upcomingTest = method.getName();
        this.methodName = upcomingTest;
    }

    /**
     * Verifies that producers and consumers connected through cluster "r1" end up on cluster "r2"
     * once "r1" is marked as migrated to "r2".
     *
     * <p>Flow: publish a backlog on r1, attach a producer directly on r2, flag r1 as migrated,
     * then check that the r1 producer moves to r2, that the backlog can still be drained through
     * the r1 client, that new subscriptions and producers land on r2, and that this still holds
     * after broker 1 is restarted.
     *
     * <p>NOTE(review): this method was recovered from bytecode in which both clients shared a
     * single local variable. That leaked the first client and sent every later call to r2,
     * including the re-subscription to "s1", whose "test1" backlog only exists on r1. Which client
     * each call uses below was reconstructed from the producer names ("cluster1-*" vs
     * "cluster2-*") and from where the asserted state lives — confirm against the upstream test.
     *
     * @throws Exception on any client, admin or assertion failure
     */
    @Test
    public void testClusterMigration() throws Exception {
        log.info("--- Starting ReplicatorTest::testClusterMigration ---");
        final String topicName = BrokerTestUtil.newUniqueName("persistent://" + this.namespace + "/migrationTopic");
        final int n = 5;

        try (PulsarClient client1 = PulsarClient.builder().serviceUrl(this.url1.toString())
                .statsInterval(0L, TimeUnit.SECONDS).build()) {
            // cluster-1 producer/consumer
            Producer<byte[]> producer1 = client1.newProducer().topic(topicName).enableBatching(false)
                    .producerName("cluster1-1").messageRoutingMode(MessageRoutingMode.SinglePartition).create();
            Consumer<byte[]> consumer1 = client1.newConsumer().topic(topicName)
                    .subscriptionType(SubscriptionType.Shared).subscriptionName("s1").subscribe();
            AbstractTopic topic1 = (AbstractTopic) this.pulsar1.getBrokerService()
                    .getTopic(topicName, false).getNow(null).get();
            MockedPulsarServiceBaseTest.retryStrategically(ignored -> !topic1.getProducers().isEmpty(), 5, 500L);
            MockedPulsarServiceBaseTest.retryStrategically(ignored -> !topic1.getSubscriptions().isEmpty(), 5, 500L);
            Assert.assertFalse(topic1.getProducers().isEmpty());
            Assert.assertFalse(topic1.getSubscriptions().isEmpty());

            // Close the consumer so the messages below stay as backlog on subscription "s1".
            consumer1.close();
            for (int i = 0; i < n; i++) {
                producer1.send("test1".getBytes());
            }

            try (PulsarClient client2 = PulsarClient.builder().serviceUrl(this.url2.toString())
                    .statsInterval(0L, TimeUnit.SECONDS).build()) {
                // cluster-2 producer
                Producer<byte[]> producer2 = client2.newProducer().topic(topicName).enableBatching(false)
                        .producerName("cluster2-1").messageRoutingMode(MessageRoutingMode.SinglePartition).create();
                AbstractTopic topic2 = (AbstractTopic) this.pulsar2.getBrokerService()
                        .getTopic(topicName, false).getNow(null).get();
                Assert.assertFalse(topic2.getProducers().isEmpty());

                // Mark r1 as migrated to r2.
                ClusterPolicies.ClusterUrl migratedUrl = new ClusterPolicies.ClusterUrl(
                        this.pulsar2.getWebServiceAddress(), this.pulsar2.getWebServiceAddressTls(),
                        this.pulsar2.getBrokerServiceUrl(), (String) null);
                this.admin1.clusters().updateClusterMigration("r1", true, migratedUrl);
                Assert.assertEquals(this.admin1.clusters().getClusterMigration("r1").getMigratedClusterUrl(),
                        migratedUrl);

                MockedPulsarServiceBaseTest.retryStrategically(ignored -> {
                    try {
                        topic1.checkClusterMigration().get();
                        return true;
                    } catch (Exception e) {
                        // Migration check not passing yet; retry.
                        return false;
                    }
                }, 10, 500L);
                topic1.checkClusterMigration().get();

                log.info("before sending message");
                Thread.sleep(1000L);
                producer1.sendAsync("test1".getBytes());

                // The producer must be gone from the topic on r1.
                MockedPulsarServiceBaseTest.retryStrategically(ignored -> topic1.getProducers().isEmpty(), 10, 500L);
                log.info("before asserting");
                Assert.assertTrue(topic1.getProducers().isEmpty());

                // A further producer created through the r1 client should show up on r2.
                Producer<byte[]> producer3 = client1.newProducer().topic(topicName).enableBatching(false)
                        .producerName("cluster1-2").messageRoutingMode(MessageRoutingMode.SinglePartition).create();
                MockedPulsarServiceBaseTest.retryStrategically(ignored -> topic2.getProducers().size() == 3, 10, 500L);
                Assert.assertEquals(topic2.getProducers().size(), 3);

                // Drain the "test1" backlog that was published to r1 before the migration.
                consumer1 = client1.newConsumer().topic(topicName).subscriptionName("s1").subscribe();
                for (int i = 0; i < n; i++) {
                    Message<byte[]> msg = consumer1.receive();
                    Assert.assertEquals(msg.getData(), "test1".getBytes());
                    consumer1.acknowledge(msg);
                }

                // Once the backlog is consumed the subscription is expected on r2.
                MockedPulsarServiceBaseTest.retryStrategically(ignored -> !topic2.getSubscriptions().isEmpty(),
                        10, 500L);
                Assert.assertFalse(topic2.getSubscriptions().isEmpty());

                topic1.checkClusterMigration().get();
                topic1.getReplicators().forEach((name, replicator) -> Assert.assertFalse(replicator.isConnected()));
                Assert.assertTrue(topic1.getSubscriptions().isEmpty());

                // A brand-new subscription must also end up on r2.
                Consumer<byte[]> consumer2 = client1.newConsumer().topic(topicName)
                        .subscriptionType(SubscriptionType.Shared).subscriptionName("s2").subscribe();
                MockedPulsarServiceBaseTest.retryStrategically(ignored -> topic2.getSubscription("s2") != null,
                        10, 500L);
                Assert.assertFalse(topic2.getSubscription("s2").getConsumers().isEmpty());

                // A new subscription on the migrated topic is expected on r2 straight away.
                Consumer<byte[]> consumerM = client1.newConsumer().topic(topicName)
                        .subscriptionType(SubscriptionType.Shared).subscriptionName("sM").subscribe();
                Assert.assertFalse(this.pulsar2.getBrokerService().getTopicReference(topicName).get()
                        .getSubscription("sM").getConsumers().isEmpty());
                consumerM.close();

                // Same check for a topic that is first created after the migration flag was set.
                final String newTopicName = topicName + "-new";
                consumerM = client1.newConsumer().topic(newTopicName)
                        .subscriptionType(SubscriptionType.Shared).subscriptionName("sM").subscribe();
                MockedPulsarServiceBaseTest.retryStrategically(
                        ignored -> this.pulsar2.getBrokerService().getTopicReference(newTopicName).isPresent(),
                        5, 100L);
                this.pulsar2.getBrokerService().getTopicReference(newTopicName).get().checkClusterMigration().get();
                MockedPulsarServiceBaseTest.retryStrategically(
                        ignored -> this.pulsar2.getBrokerService().getTopicReference(newTopicName).isPresent()
                                && this.pulsar2.getBrokerService().getTopicReference(newTopicName).get()
                                        .getSubscription("sM").getConsumers().isEmpty(),
                        5, 100L);
                Assert.assertFalse(this.pulsar2.getBrokerService().getTopicReference(newTopicName).get()
                        .getSubscription("sM").getConsumers().isEmpty());
                consumerM.close();

                // Publish through all three producers and consume through both subscriptions.
                for (int i = 0; i < n; i++) {
                    producer1.send("test2".getBytes());
                    producer2.send("test2".getBytes());
                    producer3.send("test2".getBytes());
                }
                log.info("Successfully published messages by migrated producers");
                for (int i = 0; i < n * 3; i++) {
                    Assert.assertEquals(consumer1.receive(2, TimeUnit.SECONDS).getData(), "test2".getBytes());
                    Assert.assertEquals(consumer2.receive(2, TimeUnit.SECONDS).getData(), "test2".getBytes());
                }

                // A different topic in the same namespace must be served by r2 as well.
                final String diffTopic = BrokerTestUtil.newUniqueName(
                        "persistent://" + this.namespace + "/migrationTopic");
                Consumer<byte[]> consumerDiff = client1.newConsumer().topic(diffTopic)
                        .subscriptionType(SubscriptionType.Shared).subscriptionName("s1-d").subscribe();
                Producer<byte[]> producerDiff = client1.newProducer().topic(diffTopic).enableBatching(false)
                        .producerName("cluster1-d").messageRoutingMode(MessageRoutingMode.SinglePartition).create();
                Assert.assertNotNull((AbstractTopic) this.pulsar2.getBrokerService()
                        .getTopic(diffTopic, false).getNow(null).get());
                for (int i = 0; i < n; i++) {
                    producerDiff.send("diff".getBytes());
                    Assert.assertEquals(consumerDiff.receive(2, TimeUnit.SECONDS).getData(), "diff".getBytes());
                }

                // The expectations must still hold after broker 1 has been restarted.
                this.broker1.restart();
                Producer<byte[]> producer4 = client1.newProducer().topic(topicName).enableBatching(false)
                        .producerName("cluster1-4").messageRoutingMode(MessageRoutingMode.SinglePartition).create();
                Consumer<byte[]> consumer3 = client1.newConsumer().topic(topicName)
                        .subscriptionType(SubscriptionType.Shared).subscriptionName("s3").subscribe();
                MockedPulsarServiceBaseTest.retryStrategically(ignored -> topic2.getProducers().size() == 4, 10, 500L);
                Assert.assertEquals(topic2.getProducers().size(), 4);
                MockedPulsarServiceBaseTest.retryStrategically(ignored -> topic2.getSubscription("s3") != null,
                        10, 500L);
                Assert.assertFalse(topic2.getSubscription("s3").getConsumers().isEmpty());
                for (int i = 0; i < n; i++) {
                    producer4.send("test3".getBytes());
                    Assert.assertEquals(consumer1.receive(2, TimeUnit.SECONDS).getData(), "test3".getBytes());
                    Assert.assertEquals(consumer2.receive(2, TimeUnit.SECONDS).getData(), "test3".getBytes());
                    Assert.assertEquals(consumer3.receive(2, TimeUnit.SECONDS).getData(), "test3".getBytes());
                }

                log.info("Successfully consumed messages by migrated consumers");
            }
        }
    }

    /**
     * Verifies that a producer disconnected from migrated cluster "r1" is not moved to "r2" while
     * r1 still has a replication backlog towards "r3", and that it does move once that backlog
     * has drained.
     *
     * <p>Flow: attach clients to r1 and r3, stop broker 3 so that messages published on r1 pile
     * up as replication backlog, flag r1 as migrated to r2, assert that r2 still sees only its
     * own producer, restart broker 3, wait for the backlog to clear, then assert that r2 sees two
     * producers.
     *
     * <p>NOTE(review): this method was recovered from bytecode in which the r1 and r3 clients
     * shared one local variable. The r1 client was overwritten (and leaked) before its first use,
     * and the "cluster1-1" producer ended up on the r3 client. Which client each call uses below
     * was reconstructed from the producer names and from the two otherwise identical "s1"
     * subscriptions — confirm against the upstream test.
     *
     * @throws Exception on any client, admin or assertion failure
     */
    @Test
    public void testClusterMigrationWithReplicationBacklog() throws Exception {
        log.info("--- Starting ReplicatorTest::testClusterMigrationWithReplicationBacklog ---");
        final String topicName = BrokerTestUtil.newUniqueName("persistent://" + this.namespace + "/migrationTopic");
        final int n = 5;

        try (PulsarClient client1 = PulsarClient.builder().serviceUrl(this.url1.toString())
                     .statsInterval(0L, TimeUnit.SECONDS).build();
             PulsarClient client3 = PulsarClient.builder().serviceUrl(this.url3.toString())
                     .statsInterval(0L, TimeUnit.SECONDS).build()) {
            // cluster-1 producer/consumer
            Producer<byte[]> producer1 = client1.newProducer().topic(topicName).enableBatching(false)
                    .producerName("cluster1-1").messageRoutingMode(MessageRoutingMode.SinglePartition).create();
            Consumer<byte[]> consumer1 = client1.newConsumer().topic(topicName)
                    .subscriptionType(SubscriptionType.Shared).subscriptionName("s1").subscribe();
            // cluster-3 consumer; it only needs to exist, so the handle is not kept.
            client3.newConsumer().topic(topicName)
                    .subscriptionType(SubscriptionType.Shared).subscriptionName("s1").subscribe();

            AbstractTopic topic1 = (AbstractTopic) this.pulsar1.getBrokerService()
                    .getTopic(topicName, false).getNow(null).get();
            MockedPulsarServiceBaseTest.retryStrategically(ignored -> !topic1.getProducers().isEmpty(), 5, 500L);
            MockedPulsarServiceBaseTest.retryStrategically(ignored -> !topic1.getSubscriptions().isEmpty(), 5, 500L);
            Assert.assertFalse(topic1.getProducers().isEmpty());
            Assert.assertFalse(topic1.getSubscriptions().isEmpty());
            consumer1.close();

            MockedPulsarServiceBaseTest.retryStrategically(ignored -> topic1.getReplicators().size() == 1, 10, 3000L);
            Assert.assertEquals(topic1.getReplicators().size(), 1);

            // Stop the replication target so that published messages accumulate as backlog.
            this.broker3.stop();
            MockedPulsarServiceBaseTest.retryStrategically(ignored -> this.broker3.getPulsarService() == null,
                    10, 1000L);
            Assert.assertNull(this.pulsar3.getBrokerService());

            for (int i = 0; i < n; i++) {
                producer1.send("test1".getBytes());
            }
            MockedPulsarServiceBaseTest.retryStrategically(ignored -> topic1.isReplicationBacklogExist(), 10, 1000L);
            Assert.assertTrue(topic1.isReplicationBacklogExist());

            try (PulsarClient client2 = PulsarClient.builder().serviceUrl(this.url2.toString())
                    .statsInterval(0L, TimeUnit.SECONDS).build()) {
                // cluster-2 producer; it only needs to be connected, so the handle is not kept.
                client2.newProducer().topic(topicName).enableBatching(false)
                        .producerName("cluster2-1").messageRoutingMode(MessageRoutingMode.SinglePartition).create();
                AbstractTopic topic2 = (AbstractTopic) this.pulsar2.getBrokerService()
                        .getTopic(topicName, false).getNow(null).get();
                log.info("name of topic 2 - {}", topic2.getName());
                Assert.assertFalse(topic2.getProducers().isEmpty());
                MockedPulsarServiceBaseTest.retryStrategically(ignored -> topic2.getReplicators().size() == 1,
                        10, 2000L);
                log.info("replicators should be ready");

                // Mark r1 as migrated to r2.
                this.admin1.clusters().updateClusterMigration("r1", true, new ClusterPolicies.ClusterUrl(
                        this.pulsar2.getWebServiceAddress(), this.pulsar2.getWebServiceAddressTls(),
                        this.pulsar2.getBrokerServiceUrl(), this.pulsar2.getBrokerServiceUrlTls()));
                log.info("update cluster migration called");

                MockedPulsarServiceBaseTest.retryStrategically(ignored -> {
                    try {
                        topic1.checkClusterMigration().get();
                        return true;
                    } catch (Exception e) {
                        // Migration check not passing yet; retry.
                        return false;
                    }
                }, 10, 500L);
                topic1.checkClusterMigration().get();

                producer1.sendAsync("test1".getBytes());

                // The producer must be gone from the topic on r1 ...
                MockedPulsarServiceBaseTest.retryStrategically(ignored -> topic1.getProducers().isEmpty(), 10, 500L);
                Assert.assertTrue(topic1.getProducers().isEmpty());
                // ... but must not have appeared on r2 while the replication backlog exists.
                Assert.assertEquals(topic2.getProducers().size(), 1);

                // Bring the replication target back so that the backlog can drain.
                this.broker3.restart();
                MockedPulsarServiceBaseTest.retryStrategically(ignored -> this.broker3.getPulsarService() != null,
                        10, 1000L);
                Assert.assertNotNull(this.broker3.getPulsarService());
                this.pulsar3 = this.broker3.getPulsarService();

                MockedPulsarServiceBaseTest.retryStrategically(ignored -> !topic1.isReplicationBacklogExist(),
                        10, 1000L);
                Assert.assertFalse(topic1.isReplicationBacklogExist());

                // With the backlog cleared, the r1 producer is expected on r2.
                topic1.checkClusterMigration().get();
                MockedPulsarServiceBaseTest.retryStrategically(ignored -> topic2.getProducers().size() == 2, 10, 500L);
                Assert.assertEquals(topic2.getProducers().size(), 2);
            }
        }
    }

    @Test
    public void testClusterMigrationWithResourceCreated() throws Exception {
        log.info("--- Starting testClusterMigrationWithResourceCreated ---");
        String str = "pulsar2" + "/migration";
        String clusterName = this.pulsar2.getConfig().getClusterName();
        String clusterName2 = this.pulsar1.getConfig().getClusterName();
        this.admin1.clusters().createCluster(clusterName, ClusterData.builder().serviceUrl(this.url2.toString()).serviceUrlTls(this.urlTls2.toString()).brokerServiceUrl(this.pulsar2.getBrokerServiceUrl()).brokerServiceUrlTls(this.pulsar2.getBrokerServiceUrlTls()).build());
        this.admin2.clusters().createCluster(clusterName2, ClusterData.builder().serviceUrl(this.url1.toString()).serviceUrlTls(this.urlTls1.toString()).brokerServiceUrl(this.pulsar1.getBrokerServiceUrl()).brokerServiceUrlTls(this.pulsar1.getBrokerServiceUrlTls()).build());
        this.admin1.tenants().createTenant("pulsar2", new TenantInfoImpl(Sets.newHashSet(new String[]{"appid1", "appid2", "appid3"}), Sets.newHashSet(new String[]{"r1", clusterName})));
        this.admin2.tenants().createTenant("pulsar2", new TenantInfoImpl(Sets.newHashSet(new String[]{"appid1", "appid2", "appid3"}), Sets.newHashSet(new String[]{"r1", clusterName})));
        this.admin1.namespaces().createNamespace(str, Sets.newHashSet(new String[]{"r1", clusterName}));
        String newUniqueName = BrokerTestUtil.newUniqueName("persistent://" + str + "/migrationTopic");
        this.broker1.getPulsarService().getConfig().setClusterMigrationAutoResourceCreation(true);
        PulsarClient build = PulsarClient.builder().serviceUrl(this.url1.toString()).statsInterval(0L, TimeUnit.SECONDS).build();
        try {
            Producer create = build.newProducer().topic(newUniqueName).enableBatching(false).producerName("cluster1-1").messageRoutingMode(MessageRoutingMode.SinglePartition).create();
            this.admin1.topics().createSubscription(newUniqueName, "s1", MessageId.earliest);
            this.admin1.topics().createSubscription(newUniqueName, "s2", MessageId.earliest);
            this.admin1.clusters().updateClusterMigration("r1", true, new ClusterPolicies.ClusterUrl(this.pulsar2.getWebServiceAddress(), this.pulsar2.getWebServiceAddressTls(), this.pulsar2.getBrokerServiceUrl(), this.pulsar2.getBrokerServiceUrlTls()));
            PersistentTopic persistentTopic = (PersistentTopic) ((Optional) this.pulsar1.getBrokerService().getTopic(newUniqueName, false).getNow(null)).get();
            MockedPulsarServiceBaseTest.retryStrategically(r3 -> {
                try {
                    persistentTopic.checkClusterMigration().get();
                    return true;
                } catch (Exception e) {
                    return false;
                }
            }, 10, 500L);
            Assert.assertNotNull(this.admin2.tenants().getTenantInfo("pulsar2"));
            Assert.assertNotNull(this.admin2.namespaces().getPolicies(str));
            List subscriptions = this.admin2.topics().getSubscriptions(newUniqueName);
            Assert.assertTrue(subscriptions.contains("s1"));
            Assert.assertTrue(subscriptions.contains("s2"));
            for (int i = 0; i < 5; i++) {
                create.send("test1".getBytes());
            }
            Consumer subscribe = build.newConsumer().topic(new String[]{newUniqueName}).subscriptionName("s1").subscribe();
            for (int i2 = 0; i2 < 5; i2++) {
                Assert.assertNotNull(subscribe.receive());
            }
            subscribe.close();
            create.close();
            String str2 = newUniqueName + "-new";
            build.newProducer().topic(str2).enableBatching(false).producerName("cluster1-1").messageRoutingMode(MessageRoutingMode.SinglePartition).create();
            MockedPulsarServiceBaseTest.retryStrategically(r6 -> {
                try {
                    ((Optional) this.pulsar2.getBrokerService().getTopic(str2, false).getNow(null)).get();
                    return true;
                } catch (Exception e) {
                    return false;
                }
            }, 10, 500L);
            PersistentTopic persistentTopic2 = (PersistentTopic) ((Optional) this.pulsar2.getBrokerService().getTopic(str2, false).getNow(null)).get();
            MockedPulsarServiceBaseTest.retryStrategically(r32 -> {
                try {
                    return !persistentTopic2.getProducers().isEmpty();
                } catch (Exception e) {
                    return false;
                }
            }, 10, 500L);
            Assert.assertFalse(persistentTopic2.getProducers().isEmpty());
            build.newConsumer().topic(new String[]{str2}).subscriptionName("s1").subscribe();
            MockedPulsarServiceBaseTest.retryStrategically(r4 -> {
                try {
                    return !persistentTopic2.getSubscription("s1").getConsumers().isEmpty();
                } catch (Exception e) {
                    return false;
                }
            }, 10, 500L);
            Assert.assertFalse(persistentTopic2.getSubscription("s1").getConsumers().isEmpty());
            build.close();
            if (Collections.singletonList(build).get(0) != null) {
                build.close();
            }
        } catch (Throwable th) {
            if (Collections.singletonList(build).get(0) != null) {
                build.close();
            }
            throw th;
        }
    }

    /**
     * Exercises migration of cluster "r1" and/or of {@code namespace} towards the second cluster.
     *
     * <p>Steps, in the order they are asserted below:
     * <ol>
     *   <li>Through the cluster-1 client, attach a producer and an "s1" consumer to one topic in
     *       {@code namespace} and one topic in {@code namespaceNotToMigrate}, close the consumers and publish
     *       five messages to each topic.</li>
     *   <li>Through the cluster-2 client, attach one "green" producer to each of the two topic names.</li>
     *   <li>Call {@code updateClusterMigration("r1", z, ...)} with broker 2's URLs and
     *       {@code updateMigrationState(namespace, z2)}.</li>
     *   <li>Check that the cluster-1 topic of {@code namespace} ends up without producers and subscriptions
     *       while the matching cluster-2 topic gains them; the topic of {@code namespaceNotToMigrate} is
     *       expected to follow only when {@code z} is {@code true}.</li>
     *   <li>Check that subscriptions and producers created afterwards through the cluster-1 client (also after
     *       a restart of broker 1) show up on the cluster-2 topic and can exchange messages.</li>
     * </ol>
     *
     * <p>{@code MockedPulsarServiceBaseTest.retryStrategically(predicate, retries, sleepMillis)} is used as a
     * bounded poll; its return value is ignored and the following assertion is what decides the outcome
     * (NOTE(review): polling semantics are assumed from usage, the helper is not visible here).
     *
     * <p>NOTE(review): apart from the two "green" producers, every client operation after the migration switch
     * goes through the cluster-1 client. This is required by the assertions that the cluster-2 topic of
     * {@code namespaceNotToMigrate} has no "s1"/"s2" subscription when {@code z} is {@code false}; confirm the
     * routing of the remaining calls against the intended scenario.
     *
     * @param subscriptionType subscription type applied to the consumers that set one explicitly
     * @param z                flag passed to {@code updateClusterMigration} for cluster "r1"
     * @param z2               flag passed to {@code updateMigrationState} for {@code namespace}
     * @throws Exception if any client, admin or broker call fails
     */
    @Test(dataProvider = "NamespaceMigrationTopicSubscriptionTypes")
    public void testNamespaceMigration(SubscriptionType subscriptionType, boolean z, boolean z2) throws Exception {
        log.info("--- Starting Test::testNamespaceMigration ---");
        // Readable aliases for the data-provider flags.
        final boolean clusterMigrated = z;
        final boolean namespaceMigrated = z2;
        final int messageCount = 5;
        final String topicName =
                BrokerTestUtil.newUniqueName("persistent://" + this.namespace + "/migrationTopic");
        final String otherTopicName =
                BrokerTestUtil.newUniqueName("persistent://" + this.namespaceNotToMigrate + "/migrationTopic");

        // Each client lives in its own try-with-resources so both are closed exactly once on every exit path.
        try (PulsarClient client1 = PulsarClient.builder().serviceUrl(this.url1.toString())
                .statsInterval(0L, TimeUnit.SECONDS).build()) {
            // Cluster 1, namespace under test: producer + consumer.
            Producer<byte[]> blueProducer = client1.newProducer().topic(topicName).enableBatching(false)
                    .producerName("blue-producer-ns1-1")
                    .messageRoutingMode(MessageRoutingMode.SinglePartition).create();
            Consumer<byte[]> blueConsumer = client1.newConsumer().topic(topicName)
                    .subscriptionType(subscriptionType).subscriptionName("s1").subscribe();
            AbstractTopic blueTopic = (AbstractTopic) this.pulsar1.getBrokerService()
                    .getTopic(topicName, false).getNow(null).get();
            MockedPulsarServiceBaseTest.retryStrategically(
                    ignored -> !blueTopic.getProducers().isEmpty(), 5, 500L);
            MockedPulsarServiceBaseTest.retryStrategically(
                    ignored -> !blueTopic.getSubscriptions().isEmpty(), 5, 500L);
            Assert.assertFalse(blueTopic.getProducers().isEmpty());
            Assert.assertFalse(blueTopic.getSubscriptions().isEmpty());

            // Cluster 1, namespace that is not migrated on its own: producer + consumer.
            Producer<byte[]> blueOtherProducer = client1.newProducer().topic(otherTopicName)
                    .enableBatching(false).producerName("blue-producer-ns2-1")
                    .messageRoutingMode(MessageRoutingMode.SinglePartition).create();
            Consumer<byte[]> blueOtherConsumer = client1.newConsumer().topic(otherTopicName)
                    .subscriptionType(subscriptionType).subscriptionName("s1").subscribe();
            AbstractTopic blueOtherTopic = (AbstractTopic) this.pulsar1.getBrokerService()
                    .getTopic(otherTopicName, false).getNow(null).get();
            MockedPulsarServiceBaseTest.retryStrategically(
                    ignored -> !blueOtherTopic.getProducers().isEmpty(), 5, 500L);
            MockedPulsarServiceBaseTest.retryStrategically(
                    ignored -> !blueOtherTopic.getSubscriptions().isEmpty(), 5, 500L);
            Assert.assertFalse(blueOtherTopic.getProducers().isEmpty());
            Assert.assertFalse(blueOtherTopic.getSubscriptions().isEmpty());

            // Close the consumers first so the messages below stay unconsumed on "s1".
            blueConsumer.close();
            blueOtherConsumer.close();
            for (int i = 0; i < messageCount; i++) {
                blueProducer.send("test1".getBytes());
                blueOtherProducer.send("test1".getBytes());
            }

            try (PulsarClient client2 = PulsarClient.builder().serviceUrl(this.url2.toString())
                    .statsInterval(0L, TimeUnit.SECONDS).build()) {
                // Cluster 2 ("green") producers; getNow(null).get() fails fast if broker 2 has no such topic.
                Producer<byte[]> greenProducer = client2.newProducer().topic(topicName).enableBatching(false)
                        .producerName("green-producer-ns1-1")
                        .messageRoutingMode(MessageRoutingMode.SinglePartition).create();
                AbstractTopic greenTopic = (AbstractTopic) this.pulsar2.getBrokerService()
                        .getTopic(topicName, false).getNow(null).get();
                Assert.assertFalse(greenTopic.getProducers().isEmpty());
                Producer<byte[]> greenOtherProducer = client2.newProducer().topic(otherTopicName)
                        .enableBatching(false).producerName("cluster2-nm1")
                        .messageRoutingMode(MessageRoutingMode.SinglePartition).create();
                AbstractTopic greenOtherTopic = (AbstractTopic) this.pulsar2.getBrokerService()
                        .getTopic(otherTopicName, false).getNow(null).get();
                Assert.assertFalse(greenOtherTopic.getProducers().isEmpty());

                // Switch migration on, pointing cluster "r1" at broker 2's endpoints.
                ClusterPolicies.ClusterUrl migratedUrl = new ClusterPolicies.ClusterUrl(
                        this.pulsar2.getWebServiceAddress(), this.pulsar2.getWebServiceAddressTls(),
                        this.pulsar2.getBrokerServiceUrl(), this.pulsar2.getBrokerServiceUrlTls());
                this.admin1.clusters().updateClusterMigration("r1", clusterMigrated, migratedUrl);
                this.admin1.namespaces().updateMigrationState(this.namespace, namespaceMigrated);

                MockedPulsarServiceBaseTest.retryStrategically(ignored -> {
                    try {
                        blueTopic.checkClusterMigration().get();
                        if (clusterMigrated) {
                            blueOtherTopic.checkClusterMigration().get();
                        }
                        return true;
                    } catch (Exception e) {
                        // A failed check only means "not ready yet"; the poll tries again.
                        return false;
                    }
                }, 10, 500L);
                // Run the checks once more outside the poll so a persistent failure surfaces as an exception.
                blueTopic.checkClusterMigration().get();
                if (clusterMigrated) {
                    blueOtherTopic.checkClusterMigration().get();
                }

                log.info("before sending message");
                // NOTE(review): fixed pause before publishing; presumably lets the brokers apply the new state.
                Thread.sleep(1000L);
                blueProducer.sendAsync("test1".getBytes());
                blueOtherProducer.sendAsync("test1".getBytes());

                // The cluster-1 topic of the migrated namespace must lose its producers.
                MockedPulsarServiceBaseTest.retryStrategically(
                        ignored -> blueTopic.getProducers().isEmpty(), 10, 500L);
                Assert.assertTrue(blueTopic.getProducers().isEmpty());
                if (clusterMigrated) {
                    MockedPulsarServiceBaseTest.retryStrategically(
                            ignored -> blueOtherTopic.getProducers().isEmpty(), 10, 500L);
                    Assert.assertTrue(blueOtherTopic.getProducers().isEmpty());
                } else {
                    MockedPulsarServiceBaseTest.retryStrategically(
                            ignored -> !blueOtherTopic.getProducers().isEmpty(), 10, 500L);
                    Assert.assertFalse(blueOtherTopic.getProducers().isEmpty());
                }

                // A new producer created through the cluster-1 client is expected on the cluster-2 topic.
                Producer<byte[]> blueProducer2 = client1.newProducer().topic(topicName).enableBatching(false)
                        .producerName("blue-producer-ns1-2")
                        .messageRoutingMode(MessageRoutingMode.SinglePartition).create();
                MockedPulsarServiceBaseTest.retryStrategically(
                        ignored -> greenTopic.getProducers().size() == 3, 10, 500L);
                Assert.assertEquals(greenTopic.getProducers().size(), 3);
                if (clusterMigrated) {
                    MockedPulsarServiceBaseTest.retryStrategically(
                            ignored -> greenOtherTopic.getProducers().size() == 2, 10, 500L);
                    Assert.assertEquals(greenOtherTopic.getProducers().size(), 2);
                } else {
                    MockedPulsarServiceBaseTest.retryStrategically(
                            ignored -> greenOtherTopic.getProducers().size() == 1, 10, 500L);
                    Assert.assertEquals(greenOtherTopic.getProducers().size(), 1);
                }

                // Re-attach to "s1" through the cluster-1 client and read the messages published in step 1.
                Consumer<byte[]> s1Consumer = client1.newConsumer().topic(topicName)
                        .subscriptionName("s1").subscribe();
                Consumer<byte[]> s1OtherConsumer = client1.newConsumer().topic(otherTopicName)
                        .subscriptionName("s1").subscribe();
                for (int i = 0; i < messageCount; i++) {
                    Message<byte[]> msg = s1Consumer.receive();
                    Assert.assertEquals(msg.getData(), "test1".getBytes());
                    s1Consumer.acknowledge(msg);
                    Message<byte[]> otherMsg = s1OtherConsumer.receive();
                    Assert.assertEquals(otherMsg.getData(), "test1".getBytes());
                    s1OtherConsumer.acknowledge(otherMsg);
                }

                // Once drained, the subscription is expected on the cluster-2 topic.
                MockedPulsarServiceBaseTest.retryStrategically(
                        ignored -> !greenTopic.getSubscriptions().isEmpty(), 10, 500L);
                Assert.assertFalse(greenTopic.getSubscriptions().isEmpty());
                if (clusterMigrated) {
                    MockedPulsarServiceBaseTest.retryStrategically(
                            ignored -> !greenOtherTopic.getSubscriptions().isEmpty(), 10, 500L);
                    Assert.assertFalse(greenOtherTopic.getSubscriptions().isEmpty());
                } else {
                    MockedPulsarServiceBaseTest.retryStrategically(
                            ignored -> greenOtherTopic.getSubscriptions().isEmpty(), 10, 500L);
                    Assert.assertTrue(greenOtherTopic.getSubscriptions().isEmpty());
                }

                blueTopic.checkClusterMigration().get();
                if (clusterMigrated) {
                    blueOtherTopic.checkClusterMigration().get();
                }

                // Cluster-1 side of a migrated topic: replicators disconnected, no subscriptions left.
                blueTopic.getReplicators().forEach(
                        (cluster, replicator) -> Assert.assertFalse(replicator.isConnected()));
                Assert.assertTrue(blueTopic.getSubscriptions().isEmpty());
                if (clusterMigrated) {
                    blueOtherTopic.getReplicators().forEach(
                            (cluster, replicator) -> Assert.assertFalse(replicator.isConnected()));
                    Assert.assertTrue(blueOtherTopic.getSubscriptions().isEmpty());
                } else {
                    blueOtherTopic.getReplicators().forEach(
                            (cluster, replicator) -> Assert.assertTrue(replicator.isConnected()));
                    Assert.assertFalse(blueOtherTopic.getSubscriptions().isEmpty());
                }

                // Brand-new subscription "s2" requested through the cluster-1 client.
                Consumer<byte[]> s2Consumer = client1.newConsumer().topic(topicName)
                        .subscriptionType(subscriptionType).subscriptionName("s2").subscribe();
                client1.newConsumer().topic(otherTopicName)
                        .subscriptionType(subscriptionType).subscriptionName("s2").subscribe();
                MockedPulsarServiceBaseTest.retryStrategically(
                        ignored -> greenTopic.getSubscription("s2") != null, 10, 500L);
                Assert.assertFalse(greenTopic.getSubscription("s2").getConsumers().isEmpty());
                if (clusterMigrated) {
                    MockedPulsarServiceBaseTest.retryStrategically(
                            ignored -> greenOtherTopic.getSubscription("s2") != null, 10, 500L);
                    Assert.assertFalse(greenOtherTopic.getSubscription("s2").getConsumers().isEmpty());
                } else {
                    MockedPulsarServiceBaseTest.retryStrategically(
                            ignored -> greenOtherTopic.getSubscription("s2") == null, 10, 500L);
                }

                // A new subscription on the migrated topic must be served by broker 2 right away.
                Consumer<byte[]> migratedConsumer = client1.newConsumer().topic(topicName)
                        .subscriptionType(subscriptionType).subscriptionName("sM").subscribe();
                Assert.assertFalse(this.pulsar2.getBrokerService().getTopicReference(topicName).get()
                        .getSubscription("sM").getConsumers().isEmpty());
                migratedConsumer.close();

                // Same check for a topic that did not exist before the migration switch.
                final String newTopicName = topicName + "-new";
                Consumer<byte[]> newTopicConsumer = client1.newConsumer().topic(newTopicName)
                        .subscriptionType(subscriptionType).subscriptionName("sM").subscribe();
                MockedPulsarServiceBaseTest.retryStrategically(
                        ignored -> this.pulsar1.getBrokerService().getTopicReference(newTopicName).isPresent(),
                        5, 100L);
                // Broker 2's reference is dereferenced next, so wait for it as well.
                MockedPulsarServiceBaseTest.retryStrategically(
                        ignored -> this.pulsar2.getBrokerService().getTopicReference(newTopicName).isPresent(),
                        5, 100L);
                this.pulsar2.getBrokerService().getTopicReference(newTopicName).get()
                        .checkClusterMigration().get();
                // Poll for the state asserted below: at least one consumer on "sM" at broker 2.
                MockedPulsarServiceBaseTest.retryStrategically(
                        ignored -> this.pulsar2.getBrokerService().getTopicReference(newTopicName)
                                .map(loaded -> loaded.getSubscription("sM"))
                                .map(sub -> !sub.getConsumers().isEmpty())
                                .orElse(false),
                        5, 100L);
                Assert.assertFalse(this.pulsar2.getBrokerService().getTopicReference(newTopicName).get()
                        .getSubscription("sM").getConsumers().isEmpty());
                newTopicConsumer.close();

                // All three producers publish; both "s1" and "s2" must see every message.
                for (int i = 0; i < messageCount; i++) {
                    blueProducer.send("test2".getBytes());
                    blueProducer2.send("test2".getBytes());
                    greenProducer.send("test2".getBytes());
                }
                log.info("Successfully published messages by migrated producers");
                for (int i = 0; i < messageCount * 3; i++) {
                    Assert.assertEquals(s1Consumer.receive(2, TimeUnit.SECONDS).getData(), "test2".getBytes());
                    Assert.assertEquals(s2Consumer.receive(2, TimeUnit.SECONDS).getData(), "test2".getBytes());
                }

                // A different, new topic in the same namespace must also be found on broker 2.
                final String diffTopicName =
                        BrokerTestUtil.newUniqueName("persistent://" + this.namespace + "/migrationTopic");
                Consumer<byte[]> diffConsumer = client1.newConsumer().topic(diffTopicName)
                        .subscriptionType(subscriptionType).subscriptionName("s1-d").subscribe();
                Producer<byte[]> diffProducer = client1.newProducer().topic(diffTopicName)
                        .enableBatching(false).producerName("cluster1-d")
                        .messageRoutingMode(MessageRoutingMode.SinglePartition).create();
                Assert.assertNotNull((AbstractTopic) this.pulsar2.getBrokerService()
                        .getTopic(diffTopicName, false).getNow(null).get());
                for (int i = 0; i < messageCount; i++) {
                    diffProducer.send("diff".getBytes());
                    Assert.assertEquals(diffConsumer.receive(2, TimeUnit.SECONDS).getData(), "diff".getBytes());
                }

                // Restart broker 1 and verify that new clients still land on the cluster-2 topic.
                this.broker1.restart();
                Producer<byte[]> postRestartProducer = client1.newProducer().topic(topicName)
                        .enableBatching(false).producerName("cluster1-4")
                        .messageRoutingMode(MessageRoutingMode.SinglePartition).create();
                Consumer<byte[]> s3Consumer = client1.newConsumer().topic(topicName)
                        .subscriptionType(subscriptionType).subscriptionName("s3").subscribe();
                MockedPulsarServiceBaseTest.retryStrategically(
                        ignored -> greenTopic.getProducers().size() == 4, 10, 500L);
                Assert.assertEquals(greenTopic.getProducers().size(), 4);
                MockedPulsarServiceBaseTest.retryStrategically(
                        ignored -> greenTopic.getSubscription("s3") != null, 10, 500L);
                Assert.assertFalse(greenTopic.getSubscription("s3").getConsumers().isEmpty());
                for (int i = 0; i < messageCount; i++) {
                    postRestartProducer.send("test3".getBytes());
                    Assert.assertEquals(s1Consumer.receive(2, TimeUnit.SECONDS).getData(), "test3".getBytes());
                    Assert.assertEquals(s2Consumer.receive(2, TimeUnit.SECONDS).getData(), "test3".getBytes());
                    Assert.assertEquals(s3Consumer.receive(2, TimeUnit.SECONDS).getData(), "test3".getBytes());
                }
                log.info("Successfully consumed messages by migrated consumers");

                s1Consumer.close();
                s2Consumer.close();
                s1OtherConsumer.close();
                blueProducer.close();
                blueProducer2.close();
                blueOtherProducer.close();
                greenProducer.close();
                greenOtherProducer.close();
                // client2, then client1, are closed by the enclosing try-with-resources statements.
            }
        }
    }

    /**
     * Exercises migration of cluster "r1" and/or of {@code namespace} while the topics still carry a
     * replication backlog towards the third cluster.
     *
     * <p>Steps, in the order they are asserted below:
     * <ol>
     *   <li>Attach producers and "s1" consumers to one topic in {@code namespace} and one in
     *       {@code namespaceNotToMigrate} through the cluster-1 client, plus "s1" consumers on the same topic
     *       names through the cluster-3 client, and wait until each cluster-1 topic has exactly one
     *       replicator.</li>
     *   <li>Stop broker 3 and publish five messages per topic so that
     *       {@code isReplicationBacklogExist()} becomes {@code true} on both cluster-1 topics.</li>
     *   <li>Attach "green" producers through the cluster-2 client, then call
     *       {@code updateClusterMigration("r1", z, ...)} and {@code updateMigrationState(namespace, z2)}.</li>
     *   <li>Check that the cluster-1 topic of {@code namespace} loses its producers while the cluster-2 topic
     *       still has only one producer.</li>
     *   <li>Restart broker 3, wait until the backlog is gone on both cluster-1 topics and check that the
     *       cluster-2 topic of {@code namespace} then has two producers; the topic of
     *       {@code namespaceNotToMigrate} is expected to have two only when {@code z} is {@code true}.</li>
     * </ol>
     *
     * <p>{@code MockedPulsarServiceBaseTest.retryStrategically(predicate, retries, sleepMillis)} is used as a
     * bounded poll; its return value is ignored and the following assertion decides the outcome
     * (NOTE(review): polling semantics are assumed from usage, the helper is not visible here).
     *
     * @param subscriptionType subscription type applied to every consumer created here
     * @param z                flag passed to {@code updateClusterMigration} for cluster "r1"
     * @param z2               flag passed to {@code updateMigrationState} for {@code namespace}
     * @throws Exception if any client, admin or broker call fails
     */
    @Test(dataProvider = "NamespaceMigrationTopicSubscriptionTypes")
    public void testNamespaceMigrationWithReplicationBacklog(SubscriptionType subscriptionType, boolean z, boolean z2) throws Exception {
        log.info("--- Starting ReplicatorTest::testNamespaceMigrationWithReplicationBacklog ---");
        // Readable aliases for the data-provider flags.
        final boolean clusterMigrated = z;
        final boolean namespaceMigrated = z2;
        final int messageCount = 5;
        final String topicName =
                BrokerTestUtil.newUniqueName("persistent://" + this.namespace + "/migrationTopic");
        final String otherTopicName =
                BrokerTestUtil.newUniqueName("persistent://" + this.namespaceNotToMigrate + "/migrationTopic");

        // One variable per client; try-with-resources closes each of them exactly once on every exit path.
        try (PulsarClient client1 = PulsarClient.builder().serviceUrl(this.url1.toString())
                     .statsInterval(0L, TimeUnit.SECONDS).build();
             PulsarClient client3 = PulsarClient.builder().serviceUrl(this.url3.toString())
                     .statsInterval(0L, TimeUnit.SECONDS).build()) {
            // Cluster 1 producers and consumers for both namespaces.
            Producer<byte[]> blueProducer = client1.newProducer().topic(topicName).enableBatching(false)
                    .producerName("blue-producer-ns1-1")
                    .messageRoutingMode(MessageRoutingMode.SinglePartition).create();
            Consumer<byte[]> blueConsumer = client1.newConsumer().topic(topicName)
                    .subscriptionType(subscriptionType).subscriptionName("s1").subscribe();
            Producer<byte[]> blueOtherProducer = client1.newProducer().topic(otherTopicName)
                    .enableBatching(false).producerName("blue-producer-ns1-1")
                    .messageRoutingMode(MessageRoutingMode.SinglePartition).create();
            Consumer<byte[]> blueOtherConsumer = client1.newConsumer().topic(otherTopicName)
                    .subscriptionType(subscriptionType).subscriptionName("s1").subscribe();
            // Cluster 3 consumers; they stay open until client3 is closed.
            client3.newConsumer().topic(topicName)
                    .subscriptionType(subscriptionType).subscriptionName("s1").subscribe();
            client3.newConsumer().topic(otherTopicName)
                    .subscriptionType(subscriptionType).subscriptionName("s1").subscribe();

            AbstractTopic blueTopic = (AbstractTopic) this.pulsar1.getBrokerService()
                    .getTopic(topicName, false).getNow(null).get();
            MockedPulsarServiceBaseTest.retryStrategically(
                    ignored -> !blueTopic.getProducers().isEmpty(), 5, 500L);
            MockedPulsarServiceBaseTest.retryStrategically(
                    ignored -> !blueTopic.getSubscriptions().isEmpty(), 5, 500L);
            Assert.assertFalse(blueTopic.getProducers().isEmpty());
            Assert.assertFalse(blueTopic.getSubscriptions().isEmpty());

            AbstractTopic blueOtherTopic = (AbstractTopic) this.pulsar1.getBrokerService()
                    .getTopic(otherTopicName, false).getNow(null).get();
            MockedPulsarServiceBaseTest.retryStrategically(
                    ignored -> !blueOtherTopic.getProducers().isEmpty(), 5, 500L);
            MockedPulsarServiceBaseTest.retryStrategically(
                    ignored -> !blueOtherTopic.getSubscriptions().isEmpty(), 5, 500L);
            Assert.assertFalse(blueOtherTopic.getProducers().isEmpty());
            Assert.assertFalse(blueOtherTopic.getSubscriptions().isEmpty());

            blueConsumer.close();
            blueOtherConsumer.close();

            // Each cluster-1 topic must have exactly one replicator before broker 3 goes away.
            MockedPulsarServiceBaseTest.retryStrategically(
                    ignored -> blueTopic.getReplicators().size() == 1, 10, 3000L);
            Assert.assertEquals(blueTopic.getReplicators().size(), 1);
            MockedPulsarServiceBaseTest.retryStrategically(
                    ignored -> blueOtherTopic.getReplicators().size() == 1, 10, 3000L);
            Assert.assertEquals(blueOtherTopic.getReplicators().size(), 1);

            // Take broker 3 down so that replication cannot make progress.
            this.broker3.stop();
            MockedPulsarServiceBaseTest.retryStrategically(
                    ignored -> this.broker3.getPulsarService() == null, 10, 1000L);
            Assert.assertNull(this.pulsar3.getBrokerService());

            for (int i = 0; i < messageCount; i++) {
                blueProducer.send("test1".getBytes());
                blueOtherProducer.send("test1".getBytes());
            }
            MockedPulsarServiceBaseTest.retryStrategically(
                    ignored -> blueTopic.isReplicationBacklogExist(), 10, 1000L);
            Assert.assertTrue(blueTopic.isReplicationBacklogExist());
            MockedPulsarServiceBaseTest.retryStrategically(
                    ignored -> blueOtherTopic.isReplicationBacklogExist(), 10, 1000L);
            Assert.assertTrue(blueOtherTopic.isReplicationBacklogExist());

            try (PulsarClient client2 = PulsarClient.builder().serviceUrl(this.url2.toString())
                    .statsInterval(0L, TimeUnit.SECONDS).build()) {
                // Cluster 2 ("green") producers; getNow(null).get() fails fast if broker 2 has no such topic.
                Producer<byte[]> greenProducer = client2.newProducer().topic(topicName).enableBatching(false)
                        .producerName("green-producer-ns1-1")
                        .messageRoutingMode(MessageRoutingMode.SinglePartition).create();
                AbstractTopic greenTopic = (AbstractTopic) this.pulsar2.getBrokerService()
                        .getTopic(topicName, false).getNow(null).get();
                Producer<byte[]> greenOtherProducer = client2.newProducer().topic(otherTopicName)
                        .enableBatching(false).producerName("green-producer-ns2-1")
                        .messageRoutingMode(MessageRoutingMode.SinglePartition).create();
                AbstractTopic greenOtherTopic = (AbstractTopic) this.pulsar2.getBrokerService()
                        .getTopic(otherTopicName, false).getNow(null).get();
                log.info("name of topic 2 - {}", greenTopic.getName());
                Assert.assertFalse(greenTopic.getProducers().isEmpty());
                MockedPulsarServiceBaseTest.retryStrategically(
                        ignored -> greenTopic.getReplicators().size() == 1, 10, 2000L);
                log.info("replicators should be ready");

                // Switch migration on, pointing cluster "r1" at broker 2's endpoints.
                ClusterPolicies.ClusterUrl migratedUrl = new ClusterPolicies.ClusterUrl(
                        this.pulsar2.getWebServiceAddress(), this.pulsar2.getWebServiceAddressTls(),
                        this.pulsar2.getBrokerServiceUrl(), this.pulsar2.getBrokerServiceUrlTls());
                this.admin1.clusters().updateClusterMigration("r1", clusterMigrated, migratedUrl);
                this.admin1.namespaces().updateMigrationState(this.namespace, namespaceMigrated);
                Assert.assertEquals(this.admin1.namespaces().getPolicies(this.namespace).migrated,
                        namespaceMigrated);
                log.info("update cluster migration called");

                MockedPulsarServiceBaseTest.retryStrategically(ignored -> {
                    try {
                        blueTopic.checkClusterMigration().get();
                        if (clusterMigrated) {
                            blueOtherTopic.checkClusterMigration().get();
                        }
                        return true;
                    } catch (Exception e) {
                        // A failed check only means "not ready yet"; the poll tries again.
                        return false;
                    }
                }, 10, 500L);
                // Run the checks once more outside the poll so a persistent failure surfaces as an exception.
                blueTopic.checkClusterMigration().get();
                if (clusterMigrated) {
                    blueOtherTopic.checkClusterMigration().get();
                }

                blueProducer.sendAsync("test1".getBytes());
                blueOtherProducer.sendAsync("test1".getBytes());

                // Producers leave the cluster-1 topic of the migrated namespace.
                MockedPulsarServiceBaseTest.retryStrategically(
                        ignored -> blueTopic.getProducers().isEmpty(), 10, 500L);
                Assert.assertTrue(blueTopic.getProducers().isEmpty());
                if (clusterMigrated) {
                    MockedPulsarServiceBaseTest.retryStrategically(
                            ignored -> blueOtherTopic.getProducers().isEmpty(), 10, 500L);
                    Assert.assertTrue(blueOtherTopic.getProducers().isEmpty());
                } else {
                    MockedPulsarServiceBaseTest.retryStrategically(
                            ignored -> !blueOtherTopic.getProducers().isEmpty(), 10, 500L);
                    Assert.assertFalse(blueOtherTopic.getProducers().isEmpty());
                }

                // While the backlog exists, only the green producer is attached to the cluster-2 topic.
                Assert.assertEquals(greenTopic.getProducers().size(), 1);

                // Bring broker 3 back and refresh the cached service handle.
                this.broker3.restart();
                MockedPulsarServiceBaseTest.retryStrategically(
                        ignored -> this.broker3.getPulsarService() != null, 10, 1000L);
                Assert.assertNotNull(this.broker3.getPulsarService());
                this.pulsar3 = this.broker3.getPulsarService();

                // The backlog must be gone on both cluster-1 topics.
                MockedPulsarServiceBaseTest.retryStrategically(
                        ignored -> !blueTopic.isReplicationBacklogExist(), 10, 1000L);
                Assert.assertFalse(blueTopic.isReplicationBacklogExist());
                MockedPulsarServiceBaseTest.retryStrategically(
                        ignored -> !blueOtherTopic.isReplicationBacklogExist(), 10, 1000L);
                Assert.assertFalse(blueOtherTopic.isReplicationBacklogExist());

                blueTopic.checkClusterMigration().get();
                blueOtherTopic.checkClusterMigration().get();

                // With the backlog drained, the cluster-2 topic gains the second producer.
                MockedPulsarServiceBaseTest.retryStrategically(
                        ignored -> greenTopic.getProducers().size() == 2, 10, 500L);
                Assert.assertEquals(greenTopic.getProducers().size(), 2);
                if (clusterMigrated) {
                    MockedPulsarServiceBaseTest.retryStrategically(
                            ignored -> greenOtherTopic.getProducers().size() == 2, 10, 500L);
                    Assert.assertEquals(greenOtherTopic.getProducers().size(), 2);
                } else {
                    MockedPulsarServiceBaseTest.retryStrategically(
                            ignored -> greenOtherTopic.getProducers().size() == 1, 10, 500L);
                    Assert.assertEquals(greenOtherTopic.getProducers().size(), 1);
                }

                // blueConsumer and blueOtherConsumer were already closed above; only producers remain.
                blueProducer.close();
                blueOtherProducer.close();
                greenProducer.close();
                greenOtherProducer.close();
                // client2, then client3 and client1, are closed by the try-with-resources statements.
            }
        }
    }
}
