package org.apache.pulsar.broker.service;

import com.google.common.collect.Sets;
import com.scurrilous.circe.checksum.Crc32cIntChecksum;
import io.netty.buffer.ByteBuf;
import java.lang.reflect.Field;
import java.lang.reflect.Method;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Optional;
import java.util.Set;
import java.util.TreeSet;
import java.util.UUID;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import org.apache.bookkeeper.mledger.AsyncCallbacks;
import org.apache.bookkeeper.mledger.Entry;
import org.apache.bookkeeper.mledger.ManagedCursor;
import org.apache.bookkeeper.mledger.ManagedLedgerException;
import org.apache.bookkeeper.mledger.impl.ManagedLedgerFactoryImpl;
import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl;
import org.apache.pulsar.broker.BrokerTestUtil;
import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
import org.apache.pulsar.broker.service.BrokerServiceException;
import org.apache.pulsar.broker.service.ReplicatorTestBase;
import org.apache.pulsar.broker.service.persistent.PersistentReplicator;
import org.apache.pulsar.broker.service.persistent.PersistentTopic;
import org.apache.pulsar.client.admin.PulsarAdmin;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.MessageRoutingMode;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.RawMessage;
import org.apache.pulsar.client.api.RawReader;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.api.SubscriptionType;
import org.apache.pulsar.client.api.schema.GenericRecord;
import org.apache.pulsar.client.api.schema.proto.Test;
import org.apache.pulsar.client.impl.ProducerImpl;
import org.apache.pulsar.client.impl.ProducerInterceptors;
import org.apache.pulsar.client.impl.PulsarClientImpl;
import org.apache.pulsar.client.impl.conf.ProducerConfigurationData;
import org.apache.pulsar.common.naming.NamespaceName;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.policies.data.BacklogQuota;
import org.apache.pulsar.common.policies.data.ClusterData;
import org.apache.pulsar.common.policies.data.RetentionPolicies;
import org.apache.pulsar.common.protocol.Commands;
import org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap;
import org.apache.pulsar.schema.Schemas;
import org.awaitility.Awaitility;
import org.mockito.ArgumentMatchers;
import org.mockito.Mockito;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testng.Assert;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.DataProvider;
import org.testng.annotations.Test;
import org.testng.collections.Lists;

@Test(groups = {"broker"})
/* loaded from: input_file:org/apache/pulsar/broker/service/ReplicatorTest.class */
public class ReplicatorTest extends ReplicatorTestBase {
    protected String methodName;
    private static final Logger log = LoggerFactory.getLogger(ReplicatorTest.class);

    @BeforeMethod(alwaysRun = true)
    public void beforeMethod(Method method) throws Exception {
        this.methodName = method.getName();
        this.admin1.namespaces().removeBacklogQuota("pulsar/ns");
        this.admin1.namespaces().removeBacklogQuota("pulsar/ns1");
        this.admin1.namespaces().removeBacklogQuota("pulsar/global/ns");
    }

    @Override // org.apache.pulsar.broker.service.ReplicatorTestBase
    @BeforeClass(alwaysRun = true, timeOut = 300000)
    public void setup() throws Exception {
        super.setup();
    }

    @Override // org.apache.pulsar.broker.service.ReplicatorTestBase
    @AfterClass(alwaysRun = true, timeOut = 300000)
    public void cleanup() throws Exception {
        super.cleanup();
    }

    /* JADX WARN: Type inference failed for: r0v1, types: [java.lang.Object[], java.lang.Object[][]] */
    @DataProvider(name = "partitionedTopic")
    public Object[][] partitionedTopicProvider() {
        return new Object[]{new Object[]{Boolean.TRUE}, new Object[]{Boolean.FALSE}};
    }

    @Test
    public void testConfigChange() throws Exception {
        log.info("--- Starting ReplicatorTest::testConfigChange ---");
        List newArrayList = Lists.newArrayList();
        for (int i = 0; i < 10; i++) {
            final TopicName topicName = TopicName.get(BrokerTestUtil.newUniqueName("persistent://pulsar/ns/topic-" + i));
            newArrayList.add(this.executor.submit(new Callable<Void>() { // from class: org.apache.pulsar.broker.service.ReplicatorTest.1
                /* JADX WARN: Can't rename method to resolve collision */
                @Override // java.util.concurrent.Callable
                public Void call() throws Exception {
                    ReplicatorTestBase.MessageProducer messageProducer = new ReplicatorTestBase.MessageProducer(ReplicatorTest.this.url1, topicName);
                    try {
                        ReplicatorTest.log.info("--- Starting producer --- " + ReplicatorTest.this.url1);
                        ReplicatorTestBase.MessageConsumer messageConsumer = new ReplicatorTestBase.MessageConsumer(ReplicatorTest.this.url1, topicName);
                        try {
                            ReplicatorTest.log.info("--- Starting Consumer --- " + ReplicatorTest.this.url1);
                            messageProducer.produce(2);
                            messageConsumer.receive(2);
                            if (Collections.singletonList(messageConsumer).get(0) != null) {
                                messageConsumer.close();
                            }
                            return null;
                        } catch (Throwable th) {
                            if (Collections.singletonList(messageConsumer).get(0) != null) {
                                messageConsumer.close();
                            }
                            throw th;
                        }
                    } finally {
                        if (Collections.singletonList(messageProducer).get(0) != null) {
                            messageProducer.close();
                        }
                    }
                }
            }));
        }
        Iterator it = newArrayList.iterator();
        while (it.hasNext()) {
            try {
                ((Future) it.next()).get();
            } catch (Exception e) {
                log.error("exception in getting future result ", e);
                Assert.fail(String.format("replication test failed with %s exception", e.getMessage()));
            }
        }
        Thread.sleep(1000L);
        ConcurrentOpenHashMap replicationClients = this.ns1.getReplicationClients();
        ConcurrentOpenHashMap replicationClients2 = this.ns2.getReplicationClients();
        ConcurrentOpenHashMap replicationClients3 = this.ns3.getReplicationClients();
        Assert.assertNotNull(replicationClients.get("r2"));
        Assert.assertNotNull(replicationClients.get("r3"));
        Assert.assertNotNull(replicationClients2.get("r1"));
        Assert.assertNotNull(replicationClients2.get("r3"));
        Assert.assertNotNull(replicationClients3.get("r1"));
        Assert.assertNotNull(replicationClients3.get("r2"));
        this.admin1.namespaces().setNamespaceReplicationClusters("pulsar/ns", Sets.newHashSet(new String[]{"r1"}));
        Thread.sleep(1000L);
        Assert.assertNotNull(replicationClients.get("r2"));
        Assert.assertNotNull(replicationClients.get("r3"));
        Assert.assertNotNull(replicationClients2.get("r1"));
        Assert.assertNotNull(replicationClients2.get("r3"));
        Assert.assertNotNull(replicationClients3.get("r1"));
        Assert.assertNotNull(replicationClients3.get("r2"));
        this.admin1.namespaces().setNamespaceReplicationClusters("pulsar/ns", Sets.newHashSet(new String[]{"r1", "r2", "r3"}));
        Thread.sleep(1000L);
        Assert.assertNotNull(replicationClients.get("r2"));
        Assert.assertNotNull(replicationClients.get("r3"));
        Assert.assertNotNull(replicationClients2.get("r1"));
        Assert.assertNotNull(replicationClients2.get("r3"));
        Assert.assertNotNull(replicationClients3.get("r1"));
        Assert.assertNotNull(replicationClients3.get("r2"));
    }

    @Test(timeOut = 10000)
    public void activeBrokerParse() throws Exception {
        this.pulsar1.getConfiguration().setAuthorizationEnabled(true);
        String str = "activeCLuster2";
        this.admin2.clusters().createCluster("activeCLuster2", ClusterData.builder().serviceUrl(String.format("%s,localhost:1234,localhost:5678", this.pulsar2.getWebServiceAddress())).build());
        Awaitility.await().until(() -> {
            return Boolean.valueOf(this.admin2.clusters().getCluster(str) != null);
        });
        Assert.assertEquals((String) this.admin1.brokers().getActiveBrokers("activeCLuster2").get(0), this.url2.toString().replace("http://", ""));
        this.pulsar1.getConfiguration().setAuthorizationEnabled(false);
    }

    @Test(timeOut = 30000)
    public void testConcurrentReplicator() throws Exception {
        log.info("--- Starting ReplicatorTest::testConcurrentReplicator ---");
        String newUniqueName = BrokerTestUtil.newUniqueName("pulsar/concurrent");
        this.admin1.namespaces().createNamespace(newUniqueName);
        this.admin1.namespaces().setNamespaceReplicationClusters(newUniqueName, Sets.newHashSet(new String[]{"r1", "r2"}));
        TopicName topicName = TopicName.get(BrokerTestUtil.newUniqueName("persistent://" + newUniqueName + "/topic"));
        PulsarClient build = PulsarClient.builder().serviceUrl(this.url1.toString()).statsInterval(0L, TimeUnit.SECONDS).build();
        try {
            build.newProducer().topic(topicName.toString()).enableBatching(false).messageRoutingMode(MessageRoutingMode.SinglePartition).create().close();
            PersistentTopic persistentTopic = (PersistentTopic) this.pulsar1.getBrokerService().getOrCreateTopic(topicName.toString()).get();
            PulsarClientImpl pulsarClientImpl = (PulsarClientImpl) Mockito.spy(this.pulsar1.getBrokerService().getReplicationClient("r3", this.pulsar1.getBrokerService().pulsar().getPulsarResources().getClusterResources().getCluster("r3")));
            Method declaredMethod = PersistentTopic.class.getDeclaredMethod("startReplicator", String.class);
            declaredMethod.setAccessible(true);
            Field declaredField = BrokerService.class.getDeclaredField("replicationClients");
            declaredField.setAccessible(true);
            ((ConcurrentOpenHashMap) declaredField.get(this.pulsar1.getBrokerService())).put("r3", pulsarClientImpl);
            this.admin1.namespaces().setNamespaceReplicationClusters(newUniqueName, Sets.newHashSet(new String[]{"r1", "r2", "r3"}));
            ExecutorService newFixedThreadPool = Executors.newFixedThreadPool(5);
            for (int i = 0; i < 5; i++) {
                try {
                    newFixedThreadPool.submit(() -> {
                        try {
                            declaredMethod.invoke(persistentTopic, "r3");
                        } catch (Exception e) {
                            Assert.fail("setting replicator failed", e);
                        }
                    });
                } catch (Throwable th) {
                    if (Collections.singletonList(newFixedThreadPool).get(0) != null) {
                        newFixedThreadPool.shutdownNow();
                    }
                    throw th;
                }
            }
            Thread.sleep(3000L);
            ((PulsarClientImpl) Mockito.verify(pulsarClientImpl, Mockito.times(1))).createProducerAsync((ProducerConfigurationData) Mockito.any(ProducerConfigurationData.class), (Schema) Mockito.any(Schema.class), (ProducerInterceptors) ArgumentMatchers.eq((Object) null));
            if (Collections.singletonList(newFixedThreadPool).get(0) != null) {
                newFixedThreadPool.shutdownNow();
            }
        } finally {
            if (Collections.singletonList(build).get(0) != null) {
                build.close();
            }
        }
    }

    /* JADX WARN: Type inference failed for: r0v1, types: [java.lang.Object[], java.lang.Object[][]] */
    @DataProvider(name = "namespace")
    public Object[][] namespaceNameProvider() {
        return new Object[]{new Object[]{"pulsar/ns"}, new Object[]{"pulsar/global/ns"}};
    }

    /* JADX WARN: Finally extract failed */
    @Test(dataProvider = "namespace")
    public void testReplication(String str) throws Exception {
        log.info("--- Starting ReplicatorTest::testReplication ---");
        TopicName topicName = TopicName.get(BrokerTestUtil.newUniqueName("persistent://" + str + "/repltopic"));
        ReplicatorTestBase.MessageProducer messageProducer = new ReplicatorTestBase.MessageProducer(this.url1, topicName);
        try {
            log.info("--- Starting producer --- " + this.url1);
            ReplicatorTestBase.MessageProducer messageProducer2 = new ReplicatorTestBase.MessageProducer(this.url2, topicName);
            try {
                log.info("--- Starting producer --- " + this.url2);
                messageProducer = new ReplicatorTestBase.MessageProducer(this.url3, topicName);
                try {
                    log.info("--- Starting producer --- " + this.url3);
                    ReplicatorTestBase.MessageConsumer messageConsumer = new ReplicatorTestBase.MessageConsumer(this.url1, topicName);
                    try {
                        log.info("--- Starting Consumer --- " + this.url1);
                        ReplicatorTestBase.MessageConsumer messageConsumer2 = new ReplicatorTestBase.MessageConsumer(this.url2, topicName);
                        try {
                            log.info("--- Starting Consumer --- " + this.url2);
                            ReplicatorTestBase.MessageConsumer messageConsumer3 = new ReplicatorTestBase.MessageConsumer(this.url3, topicName);
                            try {
                                log.info("--- Starting Consumer --- " + this.url3);
                                messageProducer.produce(2);
                                messageConsumer.receive(2);
                                messageConsumer2.receive(2);
                                messageConsumer3.receive(2);
                                messageProducer2.produce(2);
                                messageConsumer.receive(2);
                                messageConsumer2.receive(2);
                                messageConsumer3.receive(2);
                                messageProducer.produce(2);
                                messageConsumer.receive(2);
                                messageConsumer2.receive(2);
                                messageConsumer3.receive(2);
                                messageProducer.produce(1);
                                messageProducer2.produce(1);
                                messageConsumer.receive(1);
                                messageConsumer2.receive(1);
                                messageConsumer3.receive(1);
                                messageConsumer.receive(1);
                                messageConsumer2.receive(1);
                                messageConsumer3.receive(1);
                                if (Collections.singletonList(messageConsumer3).get(0) != null) {
                                    messageConsumer3.close();
                                }
                                if (Collections.singletonList(messageConsumer2).get(0) != null) {
                                    messageConsumer2.close();
                                }
                                if (Collections.singletonList(messageConsumer).get(0) != null) {
                                    messageConsumer.close();
                                }
                                if (Collections.singletonList(messageProducer).get(0) != null) {
                                    messageProducer.close();
                                }
                                if (Collections.singletonList(messageProducer2).get(0) != null) {
                                    messageProducer2.close();
                                }
                            } catch (Throwable th) {
                                if (Collections.singletonList(messageConsumer3).get(0) != null) {
                                    messageConsumer3.close();
                                }
                                throw th;
                            }
                        } catch (Throwable th2) {
                            if (Collections.singletonList(messageConsumer2).get(0) != null) {
                                messageConsumer2.close();
                            }
                            throw th2;
                        }
                    } catch (Throwable th3) {
                        if (Collections.singletonList(messageConsumer).get(0) != null) {
                            messageConsumer.close();
                        }
                        throw th3;
                    }
                } finally {
                    if (Collections.singletonList(messageProducer).get(0) != null) {
                        messageProducer.close();
                    }
                }
            } finally {
                if (Collections.singletonList(messageProducer2).get(0) != null) {
                    messageProducer2.close();
                }
            }
        } catch (Throwable th4) {
            if (Collections.singletonList(messageProducer).get(0) != null) {
                messageProducer.close();
            }
            throw th4;
        }
    }

    /* JADX WARN: Finally extract failed */
    @Test(invocationCount = Test.TestMessage.NESTEDFIELD_FIELD_NUMBER)
    public void testReplicationWithSchema() throws Exception {
        this.config1.setBrokerDeduplicationEnabled(true);
        this.config2.setBrokerDeduplicationEnabled(true);
        this.config3.setBrokerDeduplicationEnabled(true);
        PulsarClient client = this.pulsar1.getClient();
        PulsarClient client2 = this.pulsar2.getClient();
        PulsarClient client3 = this.pulsar3.getClient();
        TopicName topicName = TopicName.get(BrokerTestUtil.newUniqueName("persistent://pulsar/ns/testReplicationWithSchema"));
        Producer create = client.newProducer(Schema.AVRO(Schemas.PersonOne.class)).topic(topicName.toString()).enableBatching(false).create();
        try {
            Producer create2 = client.newProducer(Schema.AVRO(Schemas.PersonThree.class)).topic(topicName.toString()).enableBatching(false).create();
            try {
                this.admin1.topics().createSubscription(topicName.toString(), "my-sub", MessageId.earliest);
                this.admin2.topics().createSubscription(topicName.toString(), "my-sub", MessageId.earliest);
                this.admin3.topics().createSubscription(topicName.toString(), "my-sub", MessageId.earliest);
                for (int i = 0; i < 500; i++) {
                    create.sendAsync(new Schemas.PersonOne(i));
                }
                for (int i2 = 500; i2 < 1000; i2++) {
                    create2.sendAsync(new Schemas.PersonThree(i2, "name-" + i2));
                }
                Awaitility.await().untilAsserted(() -> {
                    Assert.assertTrue(this.admin1.topics().getInternalStats(topicName.toString()).schemaLedgers.size() > 0);
                    Assert.assertTrue(this.admin2.topics().getInternalStats(topicName.toString()).schemaLedgers.size() > 0);
                    Assert.assertTrue(this.admin3.topics().getInternalStats(topicName.toString()).schemaLedgers.size() > 0);
                });
                Consumer subscribe = client.newConsumer(Schema.AUTO_CONSUME()).topic(new String[]{topicName.toString()}).subscriptionName("my-sub").subscribe();
                try {
                    Consumer subscribe2 = client2.newConsumer(Schema.AUTO_CONSUME()).topic(new String[]{topicName.toString()}).subscriptionName("my-sub").subscribe();
                    try {
                        Consumer subscribe3 = client3.newConsumer(Schema.AUTO_CONSUME()).topic(new String[]{topicName.toString()}).subscriptionName("my-sub").subscribe();
                        int i3 = -1;
                        for (int i4 = 0; i4 < 1000; i4++) {
                            try {
                                Message receive = subscribe.receive();
                                Message receive2 = subscribe2.receive();
                                Message receive3 = subscribe3.receive();
                                Assert.assertTrue((receive == null || receive2 == null || receive3 == null) ? false : true);
                                GenericRecord genericRecord = (GenericRecord) receive.getValue();
                                GenericRecord genericRecord2 = (GenericRecord) receive2.getValue();
                                GenericRecord genericRecord3 = (GenericRecord) receive3.getValue();
                                int intValue = ((Integer) genericRecord.getField("id")).intValue();
                                int intValue2 = ((Integer) genericRecord2.getField("id")).intValue();
                                int intValue3 = ((Integer) genericRecord3.getField("id")).intValue();
                                log.info("Received ids, id1: {}, id2: {}, id3: {}, lastId: {}", new Object[]{Integer.valueOf(intValue), Integer.valueOf(intValue2), Integer.valueOf(intValue3), Integer.valueOf(i3)});
                                Assert.assertTrue(intValue == intValue2 && intValue2 == intValue3);
                                Assert.assertTrue(intValue > i3);
                                i3 = intValue;
                                subscribe.acknowledge(receive);
                                subscribe2.acknowledge(receive2);
                                subscribe3.acknowledge(receive3);
                            } catch (Throwable th) {
                                if (Collections.singletonList(subscribe3).get(0) != null) {
                                    subscribe3.close();
                                }
                                throw th;
                            }
                        }
                        create = client.newProducer().topic(topicName.toString()).enableBatching(false).create();
                        try {
                            byte[] bytes = "Bytes".getBytes();
                            create.send(bytes);
                            Assert.assertEquals(((GenericRecord) subscribe.receive().getValue()).getNativeObject(), bytes);
                            Assert.assertEquals(((GenericRecord) subscribe2.receive().getValue()).getNativeObject(), bytes);
                            Assert.assertEquals(((GenericRecord) subscribe3.receive().getValue()).getNativeObject(), bytes);
                            if (Collections.singletonList(create).get(0) != null) {
                                create.close();
                            }
                            if (Collections.singletonList(subscribe3).get(0) != null) {
                                subscribe3.close();
                            }
                            if (Collections.singletonList(subscribe2).get(0) != null) {
                                subscribe2.close();
                            }
                            if (Collections.singletonList(subscribe).get(0) != null) {
                                subscribe.close();
                            }
                            if (Collections.singletonList(create2).get(0) != null) {
                                create2.close();
                            }
                        } finally {
                            if (Collections.singletonList(create).get(0) != null) {
                                create.close();
                            }
                        }
                    } catch (Throwable th2) {
                        if (Collections.singletonList(subscribe2).get(0) != null) {
                            subscribe2.close();
                        }
                        throw th2;
                    }
                } catch (Throwable th3) {
                    if (Collections.singletonList(subscribe).get(0) != null) {
                        subscribe.close();
                    }
                    throw th3;
                }
            } finally {
                if (Collections.singletonList(create2).get(0) != null) {
                    create2.close();
                }
            }
        } catch (Throwable th4) {
            if (Collections.singletonList(create).get(0) != null) {
                create.close();
            }
            throw th4;
        }
    }

    /* JADX WARN: Finally extract failed */
    @org.testng.annotations.Test
    public void testReplicationOverrides() throws Exception {
        log.info("--- Starting ReplicatorTest::testReplicationOverrides ---");
        for (int i = 0; i < 10; i++) {
            TopicName topicName = TopicName.get(BrokerTestUtil.newUniqueName("persistent://pulsar/ns/repltopic"));
            ReplicatorTestBase.MessageProducer messageProducer = new ReplicatorTestBase.MessageProducer(this.url1, topicName);
            try {
                log.info("--- Starting producer --- " + this.url1);
                ReplicatorTestBase.MessageProducer messageProducer2 = new ReplicatorTestBase.MessageProducer(this.url2, topicName);
                try {
                    log.info("--- Starting producer --- " + this.url2);
                    messageProducer2 = new ReplicatorTestBase.MessageProducer(this.url3, topicName);
                    try {
                        log.info("--- Starting producer --- " + this.url3);
                        ReplicatorTestBase.MessageConsumer messageConsumer = new ReplicatorTestBase.MessageConsumer(this.url1, topicName);
                        try {
                            log.info("--- Starting Consumer --- " + this.url1);
                            ReplicatorTestBase.MessageConsumer messageConsumer2 = new ReplicatorTestBase.MessageConsumer(this.url2, topicName);
                            try {
                                log.info("--- Starting Consumer --- " + this.url2);
                                messageConsumer2 = new ReplicatorTestBase.MessageConsumer(this.url3, topicName);
                                try {
                                    log.info("--- Starting Consumer --- " + this.url3);
                                    messageProducer.produce(1, messageProducer.newMessage().disableReplication());
                                    messageConsumer.receive(1);
                                    Assert.assertTrue(messageConsumer2.drained());
                                    Assert.assertTrue(messageConsumer2.drained());
                                    messageProducer.produce(1, messageProducer.newMessage().replicationClusters(Lists.newArrayList(new String[]{"r1", "r3"})));
                                    messageConsumer.receive(1);
                                    Assert.assertTrue(messageConsumer2.drained());
                                    messageConsumer2.receive(1);
                                    messageProducer.produce(1);
                                    messageConsumer.receive(1);
                                    messageConsumer2.receive(1);
                                    messageConsumer2.receive(1);
                                    Assert.assertTrue(messageConsumer.drained());
                                    Assert.assertTrue(messageConsumer2.drained());
                                    Assert.assertTrue(messageConsumer2.drained());
                                    if (Collections.singletonList(messageConsumer2).get(0) != null) {
                                        messageConsumer2.close();
                                    }
                                    if (Collections.singletonList(messageConsumer2).get(0) != null) {
                                        messageConsumer2.close();
                                    }
                                    if (Collections.singletonList(messageConsumer).get(0) != null) {
                                        messageConsumer.close();
                                    }
                                    if (Collections.singletonList(messageProducer2).get(0) != null) {
                                        messageProducer2.close();
                                    }
                                    if (Collections.singletonList(messageProducer2).get(0) != null) {
                                        messageProducer2.close();
                                    }
                                    if (Collections.singletonList(messageProducer).get(0) != null) {
                                        messageProducer.close();
                                    }
                                } finally {
                                    if (Collections.singletonList(messageConsumer2).get(0) != null) {
                                        messageConsumer2.close();
                                    }
                                }
                            } finally {
                            }
                        } catch (Throwable th) {
                            throw th;
                        }
                    } finally {
                        if (Collections.singletonList(messageProducer2).get(0) != null) {
                            messageProducer2.close();
                        }
                    }
                } finally {
                }
            } catch (Throwable th2) {
                throw th2;
            }
        }
    }

    @org.testng.annotations.Test
    public void testFailures() {
        log.info("--- Starting ReplicatorTest::testFailures ---");
        try {
            new ReplicatorTestBase.MessageConsumer(this.url2, TopicName.get(BrokerTestUtil.newUniqueName("persistent://pulsar/ns/res-cons-id-")), "pulsar.repl.").close();
        } catch (Exception e) {
        }
    }

    @org.testng.annotations.Test(timeOut = 30000)
    public void testReplicatePeekAndSkip() throws Exception {
        TopicName topicName = TopicName.get(BrokerTestUtil.newUniqueName("persistent://pulsar/ns/peekAndSeekTopic"));
        ReplicatorTestBase.MessageProducer messageProducer = new ReplicatorTestBase.MessageProducer(this.url1, topicName);
        try {
            ReplicatorTestBase.MessageConsumer messageConsumer = new ReplicatorTestBase.MessageConsumer(this.url3, topicName);
            try {
                messageProducer.produce(2);
                PersistentTopic persistentTopic = (PersistentTopic) this.pulsar1.getBrokerService().getTopicReference(topicName.toString()).get();
                PersistentReplicator persistentReplicator = (PersistentReplicator) persistentTopic.getReplicators().get((String) persistentTopic.getReplicators().keys().get(0));
                persistentReplicator.skipMessages(2);
                Assert.assertNull((Entry) persistentReplicator.peekNthMessage(1).get(50L, TimeUnit.MILLISECONDS));
                if (Collections.singletonList(messageConsumer).get(0) != null) {
                    messageConsumer.close();
                }
            } catch (Throwable th) {
                if (Collections.singletonList(messageConsumer).get(0) != null) {
                    messageConsumer.close();
                }
                throw th;
            }
        } finally {
            if (Collections.singletonList(messageProducer).get(0) != null) {
                messageProducer.close();
            }
        }
    }

    @org.testng.annotations.Test(timeOut = 30000)
    public void testReplicatorClearBacklog() throws Exception {
        TreeSet treeSet = new TreeSet();
        TopicName topicName = TopicName.get(BrokerTestUtil.newUniqueName("persistent://pulsar/ns/clearBacklogTopic"));
        treeSet.add(topicName.toString());
        ReplicatorTestBase.MessageProducer messageProducer = new ReplicatorTestBase.MessageProducer(this.url1, topicName);
        try {
            ReplicatorTestBase.MessageConsumer messageConsumer = new ReplicatorTestBase.MessageConsumer(this.url3, topicName);
            try {
                messageProducer.produce(2);
                PersistentTopic persistentTopic = (PersistentTopic) this.pulsar1.getBrokerService().getTopicReference(topicName.toString()).get();
                PersistentReplicator persistentReplicator = (PersistentReplicator) Mockito.spy((Replicator) persistentTopic.getReplicators().get((String) persistentTopic.getReplicators().keys().get(0)));
                persistentReplicator.readEntriesFailed(new ManagedLedgerException.InvalidCursorPositionException("failed"), (Object) null);
                persistentReplicator.clearBacklog().get();
                Thread.sleep(100L);
                persistentReplicator.updateRates();
                persistentReplicator.expireMessages(1);
                Assert.assertEquals(persistentReplicator.getStats().getReplicationBacklog(), 0L);
                if (Collections.singletonList(messageConsumer).get(0) != null) {
                    messageConsumer.close();
                }
            } catch (Throwable th) {
                if (Collections.singletonList(messageConsumer).get(0) != null) {
                    messageConsumer.close();
                }
                throw th;
            }
        } finally {
            if (Collections.singletonList(messageProducer).get(0) != null) {
                messageProducer.close();
            }
        }
    }

    @org.testng.annotations.Test(timeOut = 30000)
    public void testResetCursorNotFail() throws Exception {
        log.info("--- Starting ReplicatorTest::testResetCursorNotFail ---");
        TopicName topicName = TopicName.get(BrokerTestUtil.newUniqueName("persistent://pulsar/ns/resetrepltopic"));
        ReplicatorTestBase.MessageProducer messageProducer = new ReplicatorTestBase.MessageProducer(this.url1, topicName);
        try {
            log.info("--- Starting producer --- " + this.url1);
            ReplicatorTestBase.MessageConsumer messageConsumer = new ReplicatorTestBase.MessageConsumer(this.url1, topicName);
            try {
                log.info("--- Starting Consumer --- " + this.url1);
                messageProducer.produce(2);
                messageConsumer.receive(2);
                this.admin1.topics().resetCursor(topicName.toString(), "sub-id", System.currentTimeMillis());
                if (Collections.singletonList(messageConsumer).get(0) != null) {
                    messageConsumer.close();
                }
            } catch (Throwable th) {
                if (Collections.singletonList(messageConsumer).get(0) != null) {
                    messageConsumer.close();
                }
                throw th;
            }
        } finally {
            if (Collections.singletonList(messageProducer).get(0) != null) {
                messageProducer.close();
            }
        }
    }

    /* JADX WARN: Finally extract failed */
    @org.testng.annotations.Test
    public void testReplicationForBatchMessages() throws Exception {
        log.info("--- Starting ReplicatorTest::testReplicationForBatchMessages ---");
        TopicName topicName = TopicName.get(BrokerTestUtil.newUniqueName("persistent://pulsar/ns/repltopicbatch"));
        ReplicatorTestBase.MessageProducer messageProducer = new ReplicatorTestBase.MessageProducer(this.url1, topicName, true);
        try {
            log.info("--- Starting producer --- " + this.url1);
            messageProducer = new ReplicatorTestBase.MessageProducer(this.url2, topicName, true);
            try {
                log.info("--- Starting producer --- " + this.url2);
                ReplicatorTestBase.MessageProducer messageProducer2 = new ReplicatorTestBase.MessageProducer(this.url3, topicName, true);
                try {
                    log.info("--- Starting producer --- " + this.url3);
                    ReplicatorTestBase.MessageConsumer messageConsumer = new ReplicatorTestBase.MessageConsumer(this.url1, topicName);
                    try {
                        log.info("--- Starting Consumer --- " + this.url1);
                        ReplicatorTestBase.MessageConsumer messageConsumer2 = new ReplicatorTestBase.MessageConsumer(this.url2, topicName);
                        try {
                            log.info("--- Starting Consumer --- " + this.url2);
                            ReplicatorTestBase.MessageConsumer messageConsumer3 = new ReplicatorTestBase.MessageConsumer(this.url3, topicName);
                            try {
                                log.info("--- Starting Consumer --- " + this.url3);
                                messageProducer.produceBatch(10);
                                messageConsumer.receive(10);
                                messageConsumer2.receive(10);
                                messageConsumer3.receive(10);
                                messageProducer.produceBatch(10);
                                messageConsumer.receive(10);
                                messageConsumer2.receive(10);
                                messageConsumer3.receive(10);
                                if (Collections.singletonList(messageConsumer3).get(0) != null) {
                                    messageConsumer3.close();
                                }
                                if (Collections.singletonList(messageConsumer2).get(0) != null) {
                                    messageConsumer2.close();
                                }
                                if (Collections.singletonList(messageConsumer).get(0) != null) {
                                    messageConsumer.close();
                                }
                                if (Collections.singletonList(messageProducer2).get(0) != null) {
                                    messageProducer2.close();
                                }
                                if (Collections.singletonList(messageProducer).get(0) != null) {
                                    messageProducer.close();
                                }
                            } catch (Throwable th) {
                                if (Collections.singletonList(messageConsumer3).get(0) != null) {
                                    messageConsumer3.close();
                                }
                                throw th;
                            }
                        } catch (Throwable th2) {
                            if (Collections.singletonList(messageConsumer2).get(0) != null) {
                                messageConsumer2.close();
                            }
                            throw th2;
                        }
                    } catch (Throwable th3) {
                        if (Collections.singletonList(messageConsumer).get(0) != null) {
                            messageConsumer.close();
                        }
                        throw th3;
                    }
                } finally {
                    if (Collections.singletonList(messageProducer2).get(0) != null) {
                        messageProducer2.close();
                    }
                }
            } finally {
                if (Collections.singletonList(messageProducer).get(0) != null) {
                    messageProducer.close();
                }
            }
        } catch (Throwable th4) {
            if (Collections.singletonList(messageProducer).get(0) != null) {
                messageProducer.close();
            }
            throw th4;
        }
    }

    @org.testng.annotations.Test(timeOut = 30000)
    public void testDeleteReplicatorFailure() throws Exception {
        log.info("--- Starting ReplicatorTest::testDeleteReplicatorFailure ---");
        String newUniqueName = BrokerTestUtil.newUniqueName("persistent://pulsar/ns/repltopicbatch");
        ReplicatorTestBase.MessageProducer messageProducer = new ReplicatorTestBase.MessageProducer(this.url1, TopicName.get(newUniqueName));
        try {
            PersistentTopic persistentTopic = (PersistentTopic) this.pulsar1.getBrokerService().getTopicReference(newUniqueName).get();
            String str = (String) persistentTopic.getReplicators().keys().get(0);
            ManagedLedgerImpl managedLedger = persistentTopic.getManagedLedger();
            final CountDownLatch countDownLatch = new CountDownLatch(1);
            managedLedger.asyncDeleteCursor("pulsar.repl." + str, new AsyncCallbacks.DeleteCursorCallback() { // from class: org.apache.pulsar.broker.service.ReplicatorTest.2
                public void deleteCursorComplete(Object obj) {
                    countDownLatch.countDown();
                }

                public void deleteCursorFailed(ManagedLedgerException managedLedgerException, Object obj) {
                    countDownLatch.countDown();
                }
            }, (Object) null);
            countDownLatch.await();
            Method declaredMethod = PersistentTopic.class.getDeclaredMethod("removeReplicator", String.class);
            declaredMethod.setAccessible(true);
            ((CompletableFuture) declaredMethod.invoke(persistentTopic, str)).thenApply(r5 -> {
                Assert.assertNull(persistentTopic.getPersistentReplicator(str));
                return null;
            });
            if (Collections.singletonList(messageProducer).get(0) != null) {
                messageProducer.close();
            }
        } catch (Throwable th) {
            if (Collections.singletonList(messageProducer).get(0) != null) {
                messageProducer.close();
            }
            throw th;
        }
    }

    @org.testng.annotations.Test(priority = Test.TestMessage.NESTEDFIELD_FIELD_NUMBER, timeOut = 30000)
    public void testReplicatorProducerClosing() throws Exception {
        log.info("--- Starting ReplicatorTest::testDeleteReplicatorFailure ---");
        String newUniqueName = BrokerTestUtil.newUniqueName("persistent://pulsar/ns/repltopicbatch");
        ReplicatorTestBase.MessageProducer messageProducer = new ReplicatorTestBase.MessageProducer(this.url1, TopicName.get(newUniqueName));
        try {
            PersistentTopic persistentTopic = (PersistentTopic) this.pulsar1.getBrokerService().getTopicReference(newUniqueName).get();
            Replicator persistentReplicator = persistentTopic.getPersistentReplicator((String) persistentTopic.getReplicators().keys().get(0));
            this.pulsar2.close();
            this.pulsar2 = null;
            this.pulsar3.close();
            this.pulsar3 = null;
            persistentReplicator.disconnect(false);
            Thread.sleep(100L);
            Field declaredField = AbstractReplicator.class.getDeclaredField("producer");
            declaredField.setAccessible(true);
            Assert.assertNull((ProducerImpl) declaredField.get(persistentReplicator));
            if (Collections.singletonList(messageProducer).get(0) != null) {
                messageProducer.close();
            }
        } catch (Throwable th) {
            if (Collections.singletonList(messageProducer).get(0) != null) {
                messageProducer.close();
            }
            throw th;
        }
    }

    @org.testng.annotations.Test(priority = Test.TestMessage.NESTEDFIELD_FIELD_NUMBER, timeOut = 30000)
    public void testReplicatorProducerName() throws Exception {
        log.info("--- Starting ReplicatorTest::testReplicatorProducerName ---");
        String newUniqueName = BrokerTestUtil.newUniqueName("persistent://pulsar/ns/testReplicatorProducerName");
        ReplicatorTestBase.MessageProducer messageProducer = new ReplicatorTestBase.MessageProducer(this.url1, TopicName.get(newUniqueName));
        try {
            Awaitility.await().untilAsserted(() -> {
                Assert.assertTrue(this.pulsar2.getBrokerService().getTopicReference(newUniqueName).isPresent());
            });
            Optional topicReference = this.pulsar2.getBrokerService().getTopicReference(newUniqueName);
            Assert.assertTrue(topicReference.isPresent());
            Awaitility.await().untilAsserted(() -> {
                Assert.assertTrue(((Set) ((Topic) topicReference.get()).getProducers().values().stream().map((v0) -> {
                    return v0.getRemoteCluster();
                }).collect(Collectors.toSet())).contains("r1"));
            });
            if (Collections.singletonList(messageProducer).get(0) != null) {
                messageProducer.close();
            }
        } catch (Throwable th) {
            if (Collections.singletonList(messageProducer).get(0) != null) {
                messageProducer.close();
            }
            throw th;
        }
    }

    @org.testng.annotations.Test(priority = Test.TestMessage.NESTEDFIELD_FIELD_NUMBER, timeOut = 30000)
    public void testReplicatorProducerNameWithUserDefinedReplicatorPrefix() throws Exception {
        log.info("--- Starting ReplicatorTest::testReplicatorProducerNameWithUserDefinedReplicatorPrefix ---");
        String newUniqueName = BrokerTestUtil.newUniqueName("persistent://pulsar/ns/testReplicatorProducerNameWithUserDefinedReplicatorPrefix");
        TopicName topicName = TopicName.get(newUniqueName);
        this.pulsar1.getConfiguration().setReplicatorPrefix("user-defined-prefix");
        this.pulsar2.getConfiguration().setReplicatorPrefix("user-defined-prefix");
        this.pulsar3.getConfiguration().setReplicatorPrefix("user-defined-prefix");
        ReplicatorTestBase.MessageProducer messageProducer = new ReplicatorTestBase.MessageProducer(this.url1, topicName);
        try {
            Awaitility.await().untilAsserted(() -> {
                Assert.assertTrue(this.pulsar2.getBrokerService().getTopicReference(newUniqueName).isPresent());
            });
            Optional topicReference = this.pulsar2.getBrokerService().getTopicReference(newUniqueName);
            Assert.assertTrue(topicReference.isPresent());
            Assert.assertTrue(((Set) ((Topic) topicReference.get()).getProducers().values().stream().map((v0) -> {
                return v0.getRemoteCluster();
            }).collect(Collectors.toSet())).contains("r1"));
            if (Collections.singletonList(messageProducer).get(0) != null) {
                messageProducer.close();
            }
        } catch (Throwable th) {
            if (Collections.singletonList(messageProducer).get(0) != null) {
                messageProducer.close();
            }
            throw th;
        }
    }

    @org.testng.annotations.Test(timeOut = 60000, priority = -1)
    public void testResumptionAfterBacklogRelaxed() throws Exception {
        List<BacklogQuota.RetentionPolicy> newArrayList = Lists.newArrayList();
        newArrayList.add(BacklogQuota.RetentionPolicy.producer_exception);
        newArrayList.add(BacklogQuota.RetentionPolicy.producer_request_hold);
        for (BacklogQuota.RetentionPolicy retentionPolicy : newArrayList) {
            this.admin1.namespaces().setBacklogQuota("pulsar/ns1", BacklogQuota.builder().limitSize(1048576L).retentionPolicy(retentionPolicy).build());
            Thread.sleep(200L);
            TopicName topicName = TopicName.get(BrokerTestUtil.newUniqueName("persistent://pulsar/ns1/%s-" + retentionPolicy));
            ReplicatorTestBase.MessageProducer messageProducer = new ReplicatorTestBase.MessageProducer(this.url1, topicName);
            try {
                ReplicatorTestBase.MessageConsumer messageConsumer = new ReplicatorTestBase.MessageConsumer(this.url2, topicName);
                try {
                    Replicator persistentReplicator = ((PersistentTopic) this.pulsar1.getBrokerService().getTopicReference(topicName.toString()).get()).getPersistentReplicator("r2");
                    messageProducer.produce(1);
                    Thread.sleep(500L);
                    this.admin1.namespaces().setBacklogQuota("pulsar/ns1", BacklogQuota.builder().limitSize(1L).retentionPolicy(retentionPolicy).build());
                    Thread.sleep(6000L);
                    Assert.assertEquals(persistentReplicator.getStats().replicationBacklog, 0L);
                    messageProducer.produce(1);
                    Thread.sleep(500L);
                    Assert.assertEquals(persistentReplicator.getStats().replicationBacklog, 1L);
                    messageConsumer.receive(1);
                    messageConsumer.receive(1);
                    for (int i = 0; i < 10 && persistentReplicator.getStats().replicationBacklog > 0; i++) {
                        if (i != 10 - 1) {
                            Thread.sleep(100L);
                        }
                    }
                    Assert.assertEquals(persistentReplicator.getStats().replicationBacklog, 0L);
                    if (Collections.singletonList(messageConsumer).get(0) != null) {
                        messageConsumer.close();
                    }
                    if (Collections.singletonList(messageProducer).get(0) != null) {
                        messageProducer.close();
                    }
                } finally {
                }
            } catch (Throwable th) {
                if (Collections.singletonList(messageProducer).get(0) != null) {
                    messageProducer.close();
                }
                throw th;
            }
        }
    }

    /* JADX WARN: Finally extract failed */
    @org.testng.annotations.Test(timeOut = 15000)
    public void testCloseReplicatorStartProducer() throws Exception {
        TopicName topicName = TopicName.get(BrokerTestUtil.newUniqueName("persistent://pulsar/ns1/closeCursor"));
        ReplicatorTestBase.MessageProducer messageProducer = new ReplicatorTestBase.MessageProducer(this.url1, topicName);
        try {
            ReplicatorTestBase.MessageConsumer messageConsumer = new ReplicatorTestBase.MessageConsumer(this.url1, topicName);
            try {
                ReplicatorTestBase.MessageConsumer messageConsumer2 = new ReplicatorTestBase.MessageConsumer(this.url2, topicName);
                try {
                    PersistentReplicator persistentReplicator = ((PersistentTopic) this.pulsar1.getBrokerService().getTopicReference(topicName.toString()).get()).getPersistentReplicator("r2");
                    Field declaredField = PersistentReplicator.class.getDeclaredField("cursor");
                    declaredField.setAccessible(true);
                    ManagedCursor managedCursor = (ManagedCursor) declaredField.get(persistentReplicator);
                    managedCursor.close();
                    messageProducer.produce(10);
                    try {
                        managedCursor.readEntriesOrWait(10);
                        Assert.fail("It should have failed");
                    } catch (Exception e) {
                        Assert.assertEquals(e.getClass(), ManagedLedgerException.CursorAlreadyClosedException.class);
                    }
                    persistentReplicator.readEntriesFailed(new ManagedLedgerException.CursorAlreadyClosedException("Cursor already closed exception"), (Object) null);
                    Thread.sleep(100L);
                    Field declaredField2 = AbstractReplicator.class.getDeclaredField("producer");
                    declaredField2.setAccessible(true);
                    Assert.assertNull((ProducerImpl) declaredField2.get(persistentReplicator));
                    if (Collections.singletonList(messageConsumer2).get(0) != null) {
                        messageConsumer2.close();
                    }
                    if (Collections.singletonList(messageConsumer).get(0) != null) {
                        messageConsumer.close();
                    }
                } catch (Throwable th) {
                    if (Collections.singletonList(messageConsumer2).get(0) != null) {
                        messageConsumer2.close();
                    }
                    throw th;
                }
            } catch (Throwable th2) {
                if (Collections.singletonList(messageConsumer).get(0) != null) {
                    messageConsumer.close();
                }
                throw th2;
            }
        } finally {
            if (Collections.singletonList(messageProducer).get(0) != null) {
                messageProducer.close();
            }
        }
    }

    @org.testng.annotations.Test(timeOut = 30000)
    public void verifyChecksumAfterReplication() throws Exception {
        String newUniqueName = BrokerTestUtil.newUniqueName("persistent://pulsar/ns/checksumAfterReplication");
        PulsarClient build = PulsarClient.builder().serviceUrl(this.url1.toString()).build();
        try {
            Producer create = build.newProducer().topic(newUniqueName).enableBatching(false).messageRoutingMode(MessageRoutingMode.SinglePartition).create();
            build = PulsarClient.builder().serviceUrl(this.url2.toString()).build();
            try {
                RawReader rawReader = (RawReader) RawReader.create(build, newUniqueName, "sub").get();
                create.send("Hello".getBytes());
                ByteBuf headersAndPayload = ((RawMessage) rawReader.readNextAsync().get()).getHeadersAndPayload();
                Assert.assertTrue(Commands.hasChecksum(headersAndPayload));
                Assert.assertEquals(Commands.readChecksum(headersAndPayload), Crc32cIntChecksum.computeChecksum(headersAndPayload));
                create.close();
                rawReader.closeAsync().get();
                if (Collections.singletonList(build).get(0) != null) {
                    build.close();
                }
            } finally {
                if (Collections.singletonList(build).get(0) != null) {
                    build.close();
                }
            }
        } catch (Throwable th) {
            if (Collections.singletonList(build).get(0) != null) {
                build.close();
            }
            throw th;
        }
    }

    @org.testng.annotations.Test
    public void testReplicatorWithPartitionedTopic() throws Exception {
        String str = "pulsar/partitionedNs-" + UUID.randomUUID();
        String str2 = "persistent://" + str + "/partTopic" + UUID.randomUUID();
        this.admin1.namespaces().createNamespace(str);
        this.admin1.namespaces().setNamespaceReplicationClusters(str, Sets.newHashSet(new String[]{"r1", "r2", "r3"}));
        this.admin1.topics().createPartitionedTopic(str2, 3);
        Awaitility.await().untilAsserted(() -> {
            Assert.assertNotNull(this.admin2.topics().getPartitionedTopicList(str));
        });
        Awaitility.await().untilAsserted(() -> {
            Assert.assertEquals((String) this.admin2.topics().getPartitionedTopicList(str).get(0), str2);
        });
        Assert.assertEquals(this.admin1.topics().getList(str).size(), 3);
        Awaitility.await().untilAsserted(() -> {
            Assert.assertNotNull(this.admin3.topics().getPartitionedTopicList(str));
        });
        Awaitility.await().untilAsserted(() -> {
            Assert.assertEquals((String) this.admin3.topics().getPartitionedTopicList(str).get(0), str2);
        });
        this.admin2.topics().updatePartitionedTopic(str2, 5);
        Assert.assertEquals(this.admin2.topics().getPartitionedTopicMetadata(str2).partitions, 5);
        Assert.assertEquals((int) this.admin2.topics().getList(str).stream().filter(str3 -> {
            return !str3.contains("__change_events");
        }).count(), 5);
        this.admin3.topics().updatePartitionedTopic(str2, 6);
        Assert.assertEquals(this.admin3.topics().getPartitionedTopicMetadata(str2).partitions, 6);
        Assert.assertEquals(this.admin3.topics().getList(str).stream().filter(str4 -> {
            return !str4.contains("__change_events");
        }).count(), 6L);
        this.admin1.topics().updatePartitionedTopic(str2, 7);
        Assert.assertEquals(this.admin1.topics().getPartitionedTopicMetadata(str2).partitions, 7);
        Assert.assertEquals(this.admin2.topics().getPartitionedTopicMetadata(str2).partitions, 7);
        Assert.assertEquals(this.admin3.topics().getPartitionedTopicMetadata(str2).partitions, 7);
        Assert.assertEquals(this.admin1.topics().getList(str).stream().filter(str5 -> {
            return !str5.contains("__change_events");
        }).count(), 7L);
        Assert.assertEquals(this.admin2.topics().getList(str).stream().filter(str6 -> {
            return !str6.contains("__change_events");
        }).count(), 7L);
        Assert.assertEquals(this.admin3.topics().getList(str).stream().filter(str7 -> {
            return !str7.contains("__change_events");
        }).count(), 7L);
    }

    @org.testng.annotations.Test(dataProvider = "partitionedTopic")
    public void testReplicatorOnPartitionedTopic(boolean z) throws Exception {
        log.info("--- Starting ReplicatorTest::{} --- ", this.methodName);
        String newUniqueName = BrokerTestUtil.newUniqueName("pulsar/partitionedNs-" + z);
        String newUniqueName2 = BrokerTestUtil.newUniqueName("persistent://" + newUniqueName + "/partTopic-" + z);
        String newUniqueName3 = BrokerTestUtil.newUniqueName("non-persistent://" + newUniqueName + "/partTopic-" + z);
        BrokerService brokerService = this.pulsar1.getBrokerService();
        this.admin1.namespaces().createNamespace(newUniqueName);
        this.admin1.namespaces().setNamespaceReplicationClusters(newUniqueName, Sets.newHashSet(new String[]{"r1", "r2", "r3"}));
        if (z) {
            this.admin1.topics().createPartitionedTopic(newUniqueName2, 5);
            this.admin1.topics().createPartitionedTopic(newUniqueName3, 5);
        }
        PulsarClient build = PulsarClient.builder().serviceUrl(this.url1.toString()).build();
        try {
            build.newProducer().topic("persistent://" + newUniqueName + "/dummyTopic").enableBatching(false).messageRoutingMode(MessageRoutingMode.SinglePartition).create();
            try {
                brokerService.getOrCreateTopic(newUniqueName2).get();
                if (z) {
                    Assert.fail("Topic creation fails with partitioned topic as replicator init fails");
                }
            } catch (Exception e) {
                if (!z) {
                    Assert.fail("Topic creation should not fail without any partitioned topic");
                }
                Assert.assertTrue(e.getCause() instanceof BrokerServiceException.NamingException);
            }
            try {
                brokerService.getOrCreateTopic(newUniqueName3).get();
                if (z) {
                    Assert.fail("Topic creation fails with partitioned topic as replicator init fails");
                }
            } catch (Exception e2) {
                if (!z) {
                    Assert.fail("Topic creation should not fail without any partitioned topic");
                }
                Assert.assertTrue(e2.getCause() instanceof BrokerServiceException.NamingException);
            }
        } finally {
            if (Collections.singletonList(build).get(0) != null) {
                build.close();
            }
        }
    }

    @org.testng.annotations.Test
    public void testReplicatedCluster() throws Exception {
        log.info("--- Starting ReplicatorTest::testReplicatedCluster ---");
        String newUniqueName = BrokerTestUtil.newUniqueName("persistent://pulsar/global/repl/topic1");
        this.admin1.namespaces().createNamespace("pulsar/global/repl");
        this.admin1.namespaces().setNamespaceReplicationClusters("pulsar/global/repl", Sets.newHashSet(new String[]{"r1", "r2", "r3"}));
        this.admin1.topics().createPartitionedTopic(newUniqueName, 4);
        PulsarClient build = PulsarClient.builder().serviceUrl(this.url1.toString()).statsInterval(0L, TimeUnit.SECONDS).build();
        try {
            build = PulsarClient.builder().serviceUrl(this.url2.toString()).statsInterval(0L, TimeUnit.SECONDS).build();
            try {
                Producer create = build.newProducer().topic(newUniqueName).create();
                Consumer subscribe = build.newConsumer().topic(new String[]{newUniqueName}).subscriptionName("s1").subscribe();
                Consumer subscribe2 = build.newConsumer().topic(new String[]{newUniqueName}).subscriptionName("s1").subscribe();
                byte[] bytes = "test".getBytes();
                create.newMessage().replicationClusters(Lists.newArrayList(new String[]{"r1"})).value(bytes).send();
                Assert.assertEquals((byte[]) subscribe.receive().getValue(), bytes);
                if (subscribe2.receive(1, TimeUnit.SECONDS) != null) {
                    Assert.fail("msg should have not been replicated to remote cluster");
                }
                subscribe.close();
                subscribe2.close();
                create.close();
                if (Collections.singletonList(build).get(0) != null) {
                    build.close();
                }
            } finally {
                if (Collections.singletonList(build).get(0) != null) {
                    build.close();
                }
            }
        } catch (Throwable th) {
            if (Collections.singletonList(build).get(0) != null) {
                build.close();
            }
            throw th;
        }
    }

    @org.testng.annotations.Test
    public void testUpdateGlobalTopicPartition() throws Exception {
        log.info("--- Starting ReplicatorTest::testUpdateGlobalTopicPartition ---");
        String clusterName = this.pulsar1.getConfig().getClusterName();
        String clusterName2 = this.pulsar2.getConfig().getClusterName();
        String newUniqueName = BrokerTestUtil.newUniqueName("pulsar/ns");
        String newUniqueName2 = BrokerTestUtil.newUniqueName("persistent://" + newUniqueName + "/topic1");
        this.admin1.namespaces().createNamespace(newUniqueName, Sets.newHashSet(new String[]{clusterName, clusterName2}));
        this.admin1.topics().createPartitionedTopic(newUniqueName2, 4);
        PulsarClient build = PulsarClient.builder().serviceUrl(this.url1.toString()).statsInterval(0L, TimeUnit.SECONDS).build();
        try {
            build = PulsarClient.builder().serviceUrl(this.url2.toString()).statsInterval(0L, TimeUnit.SECONDS).build();
            try {
                Consumer subscribe = build.newConsumer().topic(new String[]{newUniqueName2}).subscriptionName("sub1").subscribe();
                Consumer subscribe2 = build.newConsumer().topic(new String[]{newUniqueName2}).subscriptionName("sub1").subscribe();
                this.admin1.topics().updatePartitionedTopic(newUniqueName2, 8);
                Assert.assertEquals(this.admin1.topics().getPartitionedTopicMetadata(newUniqueName2).partitions, 8);
                Producer create = build.newProducer().topic(newUniqueName2).create();
                Producer create2 = build.newProducer().topic(newUniqueName2).create();
                for (int i = 4; i < 8; i++) {
                    String str = newUniqueName2 + "-partition-" + i;
                    Assert.assertEquals(this.admin1.topics().getSubscriptions(str).size(), 1);
                    Assert.assertEquals(this.admin2.topics().getSubscriptions(str).size(), 1);
                }
                create.close();
                create2.close();
                subscribe.close();
                subscribe2.close();
                if (Collections.singletonList(build).get(0) != null) {
                    build.close();
                }
            } finally {
                if (Collections.singletonList(build).get(0) != null) {
                    build.close();
                }
            }
        } catch (Throwable th) {
            if (Collections.singletonList(build).get(0) != null) {
                build.close();
            }
            throw th;
        }
    }

    /* JADX WARN: Type inference failed for: r0v1, types: [java.lang.Object[], java.lang.Object[][]] */
    @DataProvider(name = "topicPrefix")
    public static Object[][] topicPrefix() {
        return new Object[]{new Object[]{"persistent://", "/persistent"}, new Object[]{"non-persistent://", "/non-persistent"}};
    }

    @org.testng.annotations.Test(dataProvider = "topicPrefix")
    public void testTopicReplicatedAndProducerCreate(String str, String str2) throws Exception {
        log.info("--- Starting ReplicatorTest::testTopicReplicatedAndProducerCreate ---");
        String clusterName = this.pulsar1.getConfig().getClusterName();
        String clusterName2 = this.pulsar2.getConfig().getClusterName();
        String newUniqueName = BrokerTestUtil.newUniqueName("pulsar/ns");
        String newUniqueName2 = BrokerTestUtil.newUniqueName(str + newUniqueName + str2 + "-partitioned");
        String newUniqueName3 = BrokerTestUtil.newUniqueName(str + newUniqueName + str2 + "-non-partitioned");
        this.admin1.namespaces().createNamespace(newUniqueName, Sets.newHashSet(new String[]{clusterName, clusterName2}));
        this.admin1.namespaces().setNamespaceReplicationClusters(newUniqueName, Sets.newHashSet(new String[]{"r1", "r2", "r3"}));
        this.admin1.topics().createPartitionedTopic(newUniqueName2, 4);
        this.admin1.topics().createNonPartitionedTopic(newUniqueName3);
        PulsarClient build = PulsarClient.builder().serviceUrl(this.url1.toString()).statsInterval(0L, TimeUnit.SECONDS).build();
        try {
            build = PulsarClient.builder().serviceUrl(this.url2.toString()).statsInterval(0L, TimeUnit.SECONDS).build();
            try {
                Producer create = build.newProducer().topic(newUniqueName2).create();
                Producer create2 = build.newProducer().topic(newUniqueName2).create();
                Assert.assertNotNull(create.send("test".getBytes()));
                Assert.assertNotNull(create2.send("test".getBytes()));
                Producer create3 = build.newProducer().topic(newUniqueName3).create();
                Producer create4 = build.newProducer().topic(newUniqueName3).create();
                Assert.assertNotNull(create3.send("test".getBytes()));
                Assert.assertNotNull(create4.send("test".getBytes()));
                create.close();
                create2.close();
                create3.close();
                create4.close();
                if (Collections.singletonList(build).get(0) != null) {
                    build.close();
                }
            } finally {
                if (Collections.singletonList(build).get(0) != null) {
                    build.close();
                }
            }
        } catch (Throwable th) {
            if (Collections.singletonList(build).get(0) != null) {
                build.close();
            }
            throw th;
        }
    }

    @org.testng.annotations.Test
    public void testCleanupTopic() throws Exception {
        String clusterName = this.pulsar1.getConfig().getClusterName();
        String clusterName2 = this.pulsar2.getConfig().getClusterName();
        String str = "pulsar/ns-" + System.nanoTime();
        String str2 = "persistent://" + str + "/cleanTopic";
        String str3 = str + "/persistent/cleanTopic";
        this.admin1.namespaces().createNamespace(str, Sets.newHashSet(new String[]{clusterName, clusterName2}));
        PulsarClient build = PulsarClient.builder().serviceUrl(this.url1.toString()).statsInterval(0L, TimeUnit.SECONDS).build();
        this.config1.setTopicLoadTimeoutSeconds(3L);
        this.config2.setTopicLoadTimeoutSeconds(3L);
        ManagedLedgerFactoryImpl managedLedgerFactory = this.pulsar1.getManagedLedgerClientFactory().getManagedLedgerFactory();
        Field declaredField = ManagedLedgerFactoryImpl.class.getDeclaredField("ledgers");
        declaredField.setAccessible(true);
        ConcurrentHashMap concurrentHashMap = (ConcurrentHashMap) declaredField.get(managedLedgerFactory);
        CompletableFuture completableFuture = new CompletableFuture();
        concurrentHashMap.put(str3, completableFuture);
        try {
            Assert.fail("consumer should fail due to topic loading failure");
        } catch (Exception e) {
        }
        CompletableFuture completableFuture2 = null;
        for (int i = 0; i < 5; i++) {
            completableFuture2 = (CompletableFuture) this.pulsar1.getBrokerService().getTopics().get(str2);
            if (completableFuture2 == null) {
                Thread.sleep(i * 1000);
            }
        }
        try {
            completableFuture2.get();
            Assert.fail("topic creation should fail");
        } catch (Exception e2) {
        }
        CompletableFuture completableFuture3 = completableFuture2;
        MockedPulsarServiceBaseTest.retryStrategically(r7 -> {
            return this.pulsar1.getBrokerService().getTopic(str2, false) != completableFuture3;
        }, 5, 1000L);
        Assert.assertNotEquals(completableFuture3, this.pulsar1.getBrokerService().getTopics().get(str2));
        try {
            Assert.fail("consumer should fail due to topic loading failure");
        } catch (Exception e3) {
        }
        completableFuture.complete(managedLedgerFactory.open(str3 + "-2"));
        ((Consumer) build.newConsumer().topic(new String[]{str2}).subscriptionName("my-subscriber-name").subscriptionType(SubscriptionType.Shared).subscribeAsync().get(2 * 3, TimeUnit.SECONDS)).close();
    }

    @org.testng.annotations.Test
    public void createPartitionedTopicTest() throws Exception {
        String clusterName = this.pulsar1.getConfig().getClusterName();
        String clusterName2 = this.pulsar2.getConfig().getClusterName();
        String clusterName3 = this.pulsar3.getConfig().getClusterName();
        String newUniqueName = BrokerTestUtil.newUniqueName("pulsar/ns");
        String newUniqueName2 = BrokerTestUtil.newUniqueName("persistent://" + newUniqueName + "/partitioned");
        String newUniqueName3 = BrokerTestUtil.newUniqueName("persistent://" + newUniqueName + "/non-partitioned");
        String newUniqueName4 = BrokerTestUtil.newUniqueName("non-persistent://" + newUniqueName + "/partitioned");
        String newUniqueName5 = BrokerTestUtil.newUniqueName("non-persistent://" + newUniqueName + "/non-partitioned");
        this.admin1.namespaces().createNamespace(newUniqueName, Sets.newHashSet(new String[]{clusterName, clusterName2, clusterName3}));
        this.admin1.namespaces().setNamespaceReplicationClusters(newUniqueName, Sets.newHashSet(new String[]{"r1", "r2", "r3"}));
        this.admin1.topics().createPartitionedTopic(newUniqueName2, 3);
        this.admin1.topics().createPartitionedTopic(newUniqueName4, 3);
        this.admin1.topics().createNonPartitionedTopic(newUniqueName3);
        this.admin1.topics().createNonPartitionedTopic(newUniqueName5);
        List partitionedTopicList = this.admin1.topics().getPartitionedTopicList(newUniqueName);
        Assert.assertTrue(partitionedTopicList.contains(newUniqueName2));
        Assert.assertTrue(partitionedTopicList.contains(newUniqueName4));
        List<String> newArrayList = Lists.newArrayList(new String[]{newUniqueName3, newUniqueName5});
        TopicName topicName = TopicName.get(newUniqueName2);
        for (int i = 0; i < 3; i++) {
            newArrayList.add(topicName.getPartition(i).toString());
        }
        checkListContainExpectedTopic(this.admin1, newUniqueName, newArrayList);
        checkListContainExpectedTopic(this.admin2, newUniqueName, newArrayList);
        checkListContainExpectedTopic(this.admin3, newUniqueName, newArrayList);
    }

    @org.testng.annotations.Test
    public void testDoNotReplicateSystemTopic() throws Exception {
        String newUniqueName = BrokerTestUtil.newUniqueName("pulsar/ns");
        this.admin1.namespaces().createNamespace(newUniqueName, Sets.newHashSet(new String[]{"r1", "r2", "r3"}));
        String topicName = TopicName.get("persistent", NamespaceName.get(newUniqueName), "testDoesNotReplicateSystemTopic").toString();
        String topicName2 = TopicName.get("persistent", NamespaceName.get(newUniqueName), "__change_events").toString();
        this.admin1.topics().createNonPartitionedTopic(topicName);
        this.admin1.topics().setRetention(topicName, new RetentionPolicies(10, 10));
        this.admin2.topics().setRetention(topicName, new RetentionPolicies(20, 20));
        this.admin3.topics().setRetention(topicName, new RetentionPolicies(30, 30));
        Awaitility.await().untilAsserted(() -> {
            Assert.assertEquals(this.admin1.topics().getStats(topicName2).getReplication().size(), 0);
            Assert.assertEquals(this.admin2.topics().getStats(topicName2).getReplication().size(), 0);
            Assert.assertEquals(this.admin3.topics().getStats(topicName2).getReplication().size(), 0);
        });
        Awaitility.await().untilAsserted(() -> {
            Assert.assertEquals(this.admin1.topics().getRetention(topicName).getRetentionSizeInMB(), 10L);
            Assert.assertEquals(this.admin2.topics().getRetention(topicName).getRetentionSizeInMB(), 20L);
            Assert.assertEquals(this.admin3.topics().getRetention(topicName).getRetentionSizeInMB(), 30L);
        });
    }

    @org.testng.annotations.Test
    public void testLookupAnotherCluster() throws Exception {
        log.info("--- Starting ReplicatorTest::testLookupAnotherCluster ---");
        this.admin1.namespaces().createNamespace("pulsar/r2/cross-cluster-ns");
        TopicName topicName = TopicName.get("persistent://pulsar/r2/cross-cluster-ns/topic");
        PulsarClient build = PulsarClient.builder().serviceUrl(this.url1.toString()).statsInterval(0L, TimeUnit.SECONDS).build();
        try {
            build.newProducer().topic(topicName.toString()).enableBatching(false).messageRoutingMode(MessageRoutingMode.SinglePartition).create().close();
            if (Collections.singletonList(build).get(0) != null) {
                build.close();
            }
        } catch (Throwable th) {
            if (Collections.singletonList(build).get(0) != null) {
                build.close();
            }
            throw th;
        }
    }

    private void checkListContainExpectedTopic(PulsarAdmin pulsarAdmin, String str, List<String> list) {
        ArrayList arrayList = new ArrayList();
        Awaitility.await().atMost(2L, TimeUnit.SECONDS).until(() -> {
            arrayList.clear();
            arrayList.addAll((Collection) pulsarAdmin.topics().getList(str).stream().filter(str2 -> {
                return !str2.contains("__change_events");
            }).collect(Collectors.toList()));
            return Boolean.valueOf(arrayList.size() == list.size());
        });
        Iterator<String> it = list.iterator();
        while (it.hasNext()) {
            Assert.assertTrue(arrayList.contains(it.next()));
        }
    }
}
