/*
 * Decompiled with CFR 0.152.
 */
package org.apache.pulsar.broker.service;

import com.google.common.collect.Sets;
import com.scurrilous.circe.checksum.Crc32cIntChecksum;
import io.netty.buffer.ByteBuf;
import java.lang.reflect.Field;
import java.lang.reflect.Method;
import java.util.Collections;
import java.util.List;
import java.util.Set;
import java.util.TreeSet;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import org.apache.bookkeeper.mledger.AsyncCallbacks;
import org.apache.bookkeeper.mledger.Entry;
import org.apache.bookkeeper.mledger.ManagedCursor;
import org.apache.bookkeeper.mledger.ManagedLedgerException;
import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl;
import org.apache.pulsar.broker.BrokerTestUtil;
import org.apache.pulsar.broker.service.AbstractReplicator;
import org.apache.pulsar.broker.service.BrokerService;
import org.apache.pulsar.broker.service.BrokerServiceException;
import org.apache.pulsar.broker.service.Replicator;
import org.apache.pulsar.broker.service.ReplicatorTestBase;
import org.apache.pulsar.broker.service.persistent.PersistentReplicator;
import org.apache.pulsar.broker.service.persistent.PersistentTopic;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.MessageRoutingMode;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.RawMessage;
import org.apache.pulsar.client.api.RawReader;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.api.TypedMessageBuilder;
import org.apache.pulsar.client.impl.ProducerImpl;
import org.apache.pulsar.client.impl.ProducerInterceptors;
import org.apache.pulsar.client.impl.PulsarClientImpl;
import org.apache.pulsar.client.impl.conf.ProducerConfigurationData;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.policies.data.BacklogQuota;
import org.apache.pulsar.common.policies.data.ClusterData;
import org.apache.pulsar.common.policies.data.ReplicatorStats;
import org.apache.pulsar.common.protocol.Commands;
import org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap;
import org.awaitility.Awaitility;
import org.mockito.ArgumentMatchers;
import org.mockito.Mockito;
import org.mockito.verification.VerificationMode;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testng.Assert;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.DataProvider;
import org.testng.annotations.Test;
import org.testng.collections.Lists;

@Test(groups={"quarantine"})
public class ReplicatorTest
extends ReplicatorTestBase {
    protected String methodName;
    private static final Logger log = LoggerFactory.getLogger(ReplicatorTest.class);

    @BeforeMethod(alwaysRun=true)
    public void beforeMethod(Method m) throws Exception {
        this.methodName = m.getName();
        this.admin1.namespaces().removeBacklogQuota("pulsar/ns");
        this.admin1.namespaces().removeBacklogQuota("pulsar/ns1");
        this.admin1.namespaces().removeBacklogQuota("pulsar/global/ns");
    }

    @Override
    @BeforeClass(alwaysRun=true, timeOut=300000L)
    public void setup() throws Exception {
        super.setup();
    }

    @Override
    @AfterClass(alwaysRun=true, timeOut=300000L)
    public void cleanup() throws Exception {
        super.cleanup();
    }

    @DataProvider(name="partitionedTopic")
    public Object[][] partitionedTopicProvider() {
        return new Object[][]{{Boolean.TRUE}, {Boolean.FALSE}};
    }

    @Test
    public void testConfigChange() throws Exception {
        log.info("--- Starting ReplicatorTest::testConfigChange ---");
        List results = Lists.newArrayList();
        for (int i = 0; i < 10; ++i) {
            final TopicName dest = TopicName.get((String)BrokerTestUtil.newUniqueName("persistent://pulsar/ns/topic-" + i));
            results.add(this.executor.submit(new Callable<Void>(){

                /*
                 * WARNING - Removed try catching itself - possible behaviour change.
                 */
                @Override
                public Void call() throws Exception {
                    ReplicatorTestBase.MessageProducer producer = new ReplicatorTestBase.MessageProducer(ReplicatorTest.this.url1, dest);
                    try {
                        log.info("--- Starting producer --- " + ReplicatorTest.this.url1);
                        ReplicatorTestBase.MessageConsumer consumer = new ReplicatorTestBase.MessageConsumer(ReplicatorTest.this.url1, dest);
                        try {
                            log.info("--- Starting Consumer --- " + ReplicatorTest.this.url1);
                            producer.produce(2);
                            consumer.receive(2);
                            Void void_ = null;
                            if (Collections.singletonList(consumer).get(0) != null) {
                                consumer.close();
                            }
                            return void_;
                        }
                        catch (Throwable throwable) {
                            if (Collections.singletonList(consumer).get(0) != null) {
                                consumer.close();
                            }
                            throw throwable;
                        }
                    }
                    finally {
                        if (Collections.singletonList(producer).get(0) != null) {
                            producer.close();
                        }
                    }
                }
            }));
        }
        for (Future result : results) {
            try {
                result.get();
            }
            catch (Exception e) {
                log.error("exception in getting future result ", (Throwable)e);
                Assert.fail((String)String.format("replication test failed with %s exception", e.getMessage()));
            }
        }
        Thread.sleep(1000L);
        ConcurrentOpenHashMap replicationClients1 = this.ns1.getReplicationClients();
        ConcurrentOpenHashMap replicationClients2 = this.ns2.getReplicationClients();
        ConcurrentOpenHashMap replicationClients3 = this.ns3.getReplicationClients();
        Assert.assertNotNull((Object)replicationClients1.get((Object)"r2"));
        Assert.assertNotNull((Object)replicationClients1.get((Object)"r3"));
        Assert.assertNotNull((Object)replicationClients2.get((Object)"r1"));
        Assert.assertNotNull((Object)replicationClients2.get((Object)"r3"));
        Assert.assertNotNull((Object)replicationClients3.get((Object)"r1"));
        Assert.assertNotNull((Object)replicationClients3.get((Object)"r2"));
        this.admin1.namespaces().setNamespaceReplicationClusters("pulsar/ns", (Set)Sets.newHashSet((Object[])new String[]{"r1"}));
        Thread.sleep(1000L);
        Assert.assertNotNull((Object)replicationClients1.get((Object)"r2"));
        Assert.assertNotNull((Object)replicationClients1.get((Object)"r3"));
        Assert.assertNotNull((Object)replicationClients2.get((Object)"r1"));
        Assert.assertNotNull((Object)replicationClients2.get((Object)"r3"));
        Assert.assertNotNull((Object)replicationClients3.get((Object)"r1"));
        Assert.assertNotNull((Object)replicationClients3.get((Object)"r2"));
        this.admin1.namespaces().setNamespaceReplicationClusters("pulsar/ns", (Set)Sets.newHashSet((Object[])new String[]{"r1", "r2", "r3"}));
        Thread.sleep(1000L);
        Assert.assertNotNull((Object)replicationClients1.get((Object)"r2"));
        Assert.assertNotNull((Object)replicationClients1.get((Object)"r3"));
        Assert.assertNotNull((Object)replicationClients2.get((Object)"r1"));
        Assert.assertNotNull((Object)replicationClients2.get((Object)"r3"));
        Assert.assertNotNull((Object)replicationClients3.get((Object)"r1"));
        Assert.assertNotNull((Object)replicationClients3.get((Object)"r2"));
    }

    @Test(timeOut=10000L)
    public void activeBrokerParse() throws Exception {
        this.pulsar1.getConfiguration().setAuthorizationEnabled(true);
        ClusterData cluster2Data = new ClusterData();
        String cluster2ServiceUrls = String.format("%s,localhost:1234,localhost:5678", this.pulsar2.getWebServiceAddress());
        cluster2Data.setServiceUrl(cluster2ServiceUrls);
        String cluster2 = "activeCLuster2";
        this.admin2.clusters().createCluster(cluster2, cluster2Data);
        Awaitility.await().until(() -> this.admin2.clusters().getCluster(cluster2) != null);
        List list = this.admin1.brokers().getActiveBrokers(cluster2);
        Assert.assertEquals((String)((String)list.get(0)), (String)this.url2.toString().replace("http://", ""));
        this.pulsar1.getConfiguration().setAuthorizationEnabled(false);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test(timeOut=30000L)
    public void testConcurrentReplicator() throws Exception {
        log.info("--- Starting ReplicatorTest::testConcurrentReplicator ---");
        String namespace = BrokerTestUtil.newUniqueName("pulsar/concurrent");
        this.admin1.namespaces().createNamespace(namespace);
        this.admin1.namespaces().setNamespaceReplicationClusters(namespace, (Set)Sets.newHashSet((Object[])new String[]{"r1", "r2"}));
        TopicName topicName = TopicName.get((String)BrokerTestUtil.newUniqueName("persistent://" + namespace + "/topic"));
        PulsarClient client1 = PulsarClient.builder().serviceUrl(this.url1.toString()).statsInterval(0L, TimeUnit.SECONDS).build();
        try {
            Producer producer = client1.newProducer().topic(topicName.toString()).enableBatching(false).messageRoutingMode(MessageRoutingMode.SinglePartition).create();
            producer.close();
            PersistentTopic topic = (PersistentTopic)this.pulsar1.getBrokerService().getOrCreateTopic(topicName.toString()).get();
            PulsarClientImpl pulsarClient = (PulsarClientImpl)Mockito.spy((Object)((PulsarClientImpl)this.pulsar1.getBrokerService().getReplicationClient("r3")));
            Method startRepl = PersistentTopic.class.getDeclaredMethod("startReplicator", String.class);
            startRepl.setAccessible(true);
            Field replClientField = BrokerService.class.getDeclaredField("replicationClients");
            replClientField.setAccessible(true);
            ConcurrentOpenHashMap replicationClients = (ConcurrentOpenHashMap)replClientField.get(this.pulsar1.getBrokerService());
            replicationClients.put((Object)"r3", (Object)pulsarClient);
            this.admin1.namespaces().setNamespaceReplicationClusters(namespace, (Set)Sets.newHashSet((Object[])new String[]{"r1", "r2", "r3"}));
            ExecutorService executor = Executors.newFixedThreadPool(5);
            try {
                for (int i = 0; i < 5; ++i) {
                    executor.submit(() -> {
                        try {
                            startRepl.invoke((Object)topic, "r3");
                        }
                        catch (Exception e) {
                            Assert.fail((String)"setting replicator failed", (Throwable)e);
                        }
                    });
                }
                Thread.sleep(3000L);
                ((PulsarClientImpl)Mockito.verify((Object)pulsarClient, (VerificationMode)Mockito.times((int)1))).createProducerAsync((ProducerConfigurationData)Mockito.any(ProducerConfigurationData.class), (Schema)Mockito.any(Schema.class), (ProducerInterceptors)ArgumentMatchers.eq(null));
            }
            finally {
                if (Collections.singletonList(executor).get(0) != null) {
                    executor.shutdownNow();
                }
            }
        }
        finally {
            if (Collections.singletonList(client1).get(0) != null) {
                client1.close();
            }
        }
    }

    @DataProvider(name="namespace")
    public Object[][] namespaceNameProvider() {
        return new Object[][]{{"pulsar/ns"}, {"pulsar/global/ns"}};
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test(dataProvider="namespace")
    public void testReplication(String namespace) throws Exception {
        log.info("--- Starting ReplicatorTest::testReplication ---");
        TopicName dest = TopicName.get((String)BrokerTestUtil.newUniqueName("persistent://" + namespace + "/repltopic"));
        ReplicatorTestBase.MessageProducer producer1 = new ReplicatorTestBase.MessageProducer(this.url1, dest);
        try {
            log.info("--- Starting producer --- " + this.url1);
            ReplicatorTestBase.MessageProducer producer2 = new ReplicatorTestBase.MessageProducer(this.url2, dest);
            try {
                log.info("--- Starting producer --- " + this.url2);
                ReplicatorTestBase.MessageProducer producer3 = new ReplicatorTestBase.MessageProducer(this.url3, dest);
                try {
                    log.info("--- Starting producer --- " + this.url3);
                    ReplicatorTestBase.MessageConsumer consumer1 = new ReplicatorTestBase.MessageConsumer(this.url1, dest);
                    try {
                        log.info("--- Starting Consumer --- " + this.url1);
                        ReplicatorTestBase.MessageConsumer consumer2 = new ReplicatorTestBase.MessageConsumer(this.url2, dest);
                        try {
                            log.info("--- Starting Consumer --- " + this.url2);
                            ReplicatorTestBase.MessageConsumer consumer3 = new ReplicatorTestBase.MessageConsumer(this.url3, dest);
                            try {
                                log.info("--- Starting Consumer --- " + this.url3);
                                producer1.produce(2);
                                consumer1.receive(2);
                                consumer2.receive(2);
                                consumer3.receive(2);
                                producer2.produce(2);
                                consumer1.receive(2);
                                consumer2.receive(2);
                                consumer3.receive(2);
                                producer3.produce(2);
                                consumer1.receive(2);
                                consumer2.receive(2);
                                consumer3.receive(2);
                                producer1.produce(1);
                                producer2.produce(1);
                                consumer1.receive(1);
                                consumer2.receive(1);
                                consumer3.receive(1);
                                consumer1.receive(1);
                                consumer2.receive(1);
                                consumer3.receive(1);
                            }
                            finally {
                                if (Collections.singletonList(consumer3).get(0) != null) {
                                    consumer3.close();
                                }
                            }
                        }
                        finally {
                            if (Collections.singletonList(consumer2).get(0) != null) {
                                consumer2.close();
                            }
                        }
                    }
                    finally {
                        if (Collections.singletonList(consumer1).get(0) != null) {
                            consumer1.close();
                        }
                    }
                }
                finally {
                    if (Collections.singletonList(producer3).get(0) != null) {
                        producer3.close();
                    }
                }
            }
            finally {
                if (Collections.singletonList(producer2).get(0) != null) {
                    producer2.close();
                }
            }
        }
        finally {
            if (Collections.singletonList(producer1).get(0) != null) {
                producer1.close();
            }
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test
    public void testReplicationOverrides() throws Exception {
        log.info("--- Starting ReplicatorTest::testReplicationOverrides ---");
        for (int i = 0; i < 10; ++i) {
            TopicName dest = TopicName.get((String)BrokerTestUtil.newUniqueName("persistent://pulsar/ns/repltopic"));
            ReplicatorTestBase.MessageProducer producer1 = new ReplicatorTestBase.MessageProducer(this.url1, dest);
            try {
                log.info("--- Starting producer --- " + this.url1);
                ReplicatorTestBase.MessageProducer producer2 = new ReplicatorTestBase.MessageProducer(this.url2, dest);
                try {
                    log.info("--- Starting producer --- " + this.url2);
                    ReplicatorTestBase.MessageProducer producer3 = new ReplicatorTestBase.MessageProducer(this.url3, dest);
                    try {
                        log.info("--- Starting producer --- " + this.url3);
                        ReplicatorTestBase.MessageConsumer consumer1 = new ReplicatorTestBase.MessageConsumer(this.url1, dest);
                        try {
                            log.info("--- Starting Consumer --- " + this.url1);
                            ReplicatorTestBase.MessageConsumer consumer2 = new ReplicatorTestBase.MessageConsumer(this.url2, dest);
                            try {
                                log.info("--- Starting Consumer --- " + this.url2);
                                ReplicatorTestBase.MessageConsumer consumer3 = new ReplicatorTestBase.MessageConsumer(this.url3, dest);
                                try {
                                    log.info("--- Starting Consumer --- " + this.url3);
                                    producer1.produce(1, (TypedMessageBuilder<byte[]>)producer1.newMessage().disableReplication());
                                    consumer1.receive(1);
                                    Assert.assertTrue((boolean)consumer2.drained());
                                    Assert.assertTrue((boolean)consumer3.drained());
                                    producer1.produce(1, (TypedMessageBuilder<byte[]>)producer1.newMessage().replicationClusters(Lists.newArrayList((Object[])new String[]{"r1", "r3"})));
                                    consumer1.receive(1);
                                    Assert.assertTrue((boolean)consumer2.drained());
                                    consumer3.receive(1);
                                    producer1.produce(1);
                                    consumer1.receive(1);
                                    consumer2.receive(1);
                                    consumer3.receive(1);
                                    Assert.assertTrue((boolean)consumer1.drained());
                                    Assert.assertTrue((boolean)consumer2.drained());
                                    Assert.assertTrue((boolean)consumer3.drained());
                                    continue;
                                }
                                finally {
                                    if (Collections.singletonList(consumer3).get(0) != null) {
                                        consumer3.close();
                                    }
                                }
                            }
                            finally {
                                if (Collections.singletonList(consumer2).get(0) != null) {
                                    consumer2.close();
                                }
                            }
                        }
                        finally {
                            if (Collections.singletonList(consumer1).get(0) != null) {
                                consumer1.close();
                            }
                        }
                    }
                    finally {
                        if (Collections.singletonList(producer3).get(0) != null) {
                            producer3.close();
                        }
                    }
                }
                finally {
                    if (Collections.singletonList(producer2).get(0) != null) {
                        producer2.close();
                    }
                }
            }
            finally {
                if (Collections.singletonList(producer1).get(0) != null) {
                    producer1.close();
                }
            }
        }
    }

    @Test
    public void testFailures() {
        log.info("--- Starting ReplicatorTest::testFailures ---");
        try {
            TopicName dest = TopicName.get((String)BrokerTestUtil.newUniqueName("persistent://pulsar/ns/res-cons-id-"));
            ReplicatorTestBase.MessageConsumer consumer = new ReplicatorTestBase.MessageConsumer(this.url2, dest, "pulsar.repl.");
            consumer.close();
        }
        catch (Exception exception) {
            // empty catch block
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test(timeOut=30000L)
    public void testReplicatePeekAndSkip() throws Exception {
        TopicName dest = TopicName.get((String)BrokerTestUtil.newUniqueName("persistent://pulsar/ns/peekAndSeekTopic"));
        ReplicatorTestBase.MessageProducer producer1 = new ReplicatorTestBase.MessageProducer(this.url1, dest);
        try {
            ReplicatorTestBase.MessageConsumer consumer1 = new ReplicatorTestBase.MessageConsumer(this.url3, dest);
            try {
                producer1.produce(2);
                PersistentTopic topic = (PersistentTopic)this.pulsar1.getBrokerService().getTopicReference(dest.toString()).get();
                PersistentReplicator replicator = (PersistentReplicator)topic.getReplicators().get(topic.getReplicators().keys().get(0));
                replicator.skipMessages(2);
                CompletableFuture result = replicator.peekNthMessage(1);
                Entry entry = (Entry)result.get(50L, TimeUnit.MILLISECONDS);
                Assert.assertNull((Object)entry);
            }
            finally {
                if (Collections.singletonList(consumer1).get(0) != null) {
                    consumer1.close();
                }
            }
        }
        finally {
            if (Collections.singletonList(producer1).get(0) != null) {
                producer1.close();
            }
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test(timeOut=30000L)
    public void testReplicatorClearBacklog() throws Exception {
        TreeSet<String> testDests = new TreeSet<String>();
        TopicName dest = TopicName.get((String)BrokerTestUtil.newUniqueName("persistent://pulsar/ns/clearBacklogTopic"));
        testDests.add(dest.toString());
        ReplicatorTestBase.MessageProducer producer1 = new ReplicatorTestBase.MessageProducer(this.url1, dest);
        try {
            ReplicatorTestBase.MessageConsumer consumer1 = new ReplicatorTestBase.MessageConsumer(this.url3, dest);
            try {
                producer1.produce(2);
                PersistentTopic topic = (PersistentTopic)this.pulsar1.getBrokerService().getTopicReference(dest.toString()).get();
                PersistentReplicator replicator = (PersistentReplicator)Mockito.spy((Object)topic.getReplicators().get(topic.getReplicators().keys().get(0)));
                replicator.readEntriesFailed((ManagedLedgerException)new ManagedLedgerException.InvalidCursorPositionException("failed"), null);
                replicator.clearBacklog().get();
                Thread.sleep(100L);
                replicator.updateRates();
                replicator.expireMessages(1);
                ReplicatorStats status = replicator.getStats();
                Assert.assertEquals((long)status.replicationBacklog, (long)0L);
            }
            finally {
                if (Collections.singletonList(consumer1).get(0) != null) {
                    consumer1.close();
                }
            }
        }
        finally {
            if (Collections.singletonList(producer1).get(0) != null) {
                producer1.close();
            }
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test(timeOut=30000L)
    public void testResetCursorNotFail() throws Exception {
        log.info("--- Starting ReplicatorTest::testResetCursorNotFail ---");
        TopicName dest = TopicName.get((String)BrokerTestUtil.newUniqueName("persistent://pulsar/ns/resetrepltopic"));
        ReplicatorTestBase.MessageProducer producer1 = new ReplicatorTestBase.MessageProducer(this.url1, dest);
        try {
            log.info("--- Starting producer --- " + this.url1);
            ReplicatorTestBase.MessageConsumer consumer1 = new ReplicatorTestBase.MessageConsumer(this.url1, dest);
            try {
                log.info("--- Starting Consumer --- " + this.url1);
                producer1.produce(2);
                consumer1.receive(2);
                this.admin1.topics().resetCursor(dest.toString(), "sub-id", System.currentTimeMillis());
            }
            finally {
                if (Collections.singletonList(consumer1).get(0) != null) {
                    consumer1.close();
                }
            }
        }
        finally {
            if (Collections.singletonList(producer1).get(0) != null) {
                producer1.close();
            }
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test
    public void testReplicationForBatchMessages() throws Exception {
        log.info("--- Starting ReplicatorTest::testReplicationForBatchMessages ---");
        TopicName dest = TopicName.get((String)BrokerTestUtil.newUniqueName("persistent://pulsar/ns/repltopicbatch"));
        ReplicatorTestBase.MessageProducer producer1 = new ReplicatorTestBase.MessageProducer(this.url1, dest, true);
        try {
            log.info("--- Starting producer --- " + this.url1);
            ReplicatorTestBase.MessageProducer producer2 = new ReplicatorTestBase.MessageProducer(this.url2, dest, true);
            try {
                log.info("--- Starting producer --- " + this.url2);
                ReplicatorTestBase.MessageProducer producer3 = new ReplicatorTestBase.MessageProducer(this.url3, dest, true);
                try {
                    log.info("--- Starting producer --- " + this.url3);
                    ReplicatorTestBase.MessageConsumer consumer1 = new ReplicatorTestBase.MessageConsumer(this.url1, dest);
                    try {
                        log.info("--- Starting Consumer --- " + this.url1);
                        ReplicatorTestBase.MessageConsumer consumer2 = new ReplicatorTestBase.MessageConsumer(this.url2, dest);
                        try {
                            log.info("--- Starting Consumer --- " + this.url2);
                            ReplicatorTestBase.MessageConsumer consumer3 = new ReplicatorTestBase.MessageConsumer(this.url3, dest);
                            try {
                                log.info("--- Starting Consumer --- " + this.url3);
                                producer1.produceBatch(10);
                                consumer1.receive(10);
                                consumer2.receive(10);
                                consumer3.receive(10);
                                producer2.produceBatch(10);
                                consumer1.receive(10);
                                consumer2.receive(10);
                                consumer3.receive(10);
                            }
                            finally {
                                if (Collections.singletonList(consumer3).get(0) != null) {
                                    consumer3.close();
                                }
                            }
                        }
                        finally {
                            if (Collections.singletonList(consumer2).get(0) != null) {
                                consumer2.close();
                            }
                        }
                    }
                    finally {
                        if (Collections.singletonList(consumer1).get(0) != null) {
                            consumer1.close();
                        }
                    }
                }
                finally {
                    if (Collections.singletonList(producer3).get(0) != null) {
                        producer3.close();
                    }
                }
            }
            finally {
                if (Collections.singletonList(producer2).get(0) != null) {
                    producer2.close();
                }
            }
        }
        finally {
            if (Collections.singletonList(producer1).get(0) != null) {
                producer1.close();
            }
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test(timeOut=30000L)
    public void testDeleteReplicatorFailure() throws Exception {
        log.info("--- Starting ReplicatorTest::testDeleteReplicatorFailure ---");
        String topicName = BrokerTestUtil.newUniqueName("persistent://pulsar/ns/repltopicbatch");
        TopicName dest = TopicName.get((String)topicName);
        ReplicatorTestBase.MessageProducer producer1 = new ReplicatorTestBase.MessageProducer(this.url1, dest);
        try {
            PersistentTopic topic = (PersistentTopic)this.pulsar1.getBrokerService().getTopicReference(topicName).get();
            String replicatorClusterName = (String)topic.getReplicators().keys().get(0);
            ManagedLedgerImpl ledger = (ManagedLedgerImpl)topic.getManagedLedger();
            final CountDownLatch latch = new CountDownLatch(1);
            ledger.asyncDeleteCursor("pulsar.repl." + replicatorClusterName, new AsyncCallbacks.DeleteCursorCallback(){

                public void deleteCursorComplete(Object ctx) {
                    latch.countDown();
                }

                public void deleteCursorFailed(ManagedLedgerException exception, Object ctx) {
                    latch.countDown();
                }
            }, null);
            latch.await();
            Method removeReplicator = PersistentTopic.class.getDeclaredMethod("removeReplicator", String.class);
            removeReplicator.setAccessible(true);
            CompletableFuture result = (CompletableFuture)removeReplicator.invoke((Object)topic, replicatorClusterName);
            result.thenApply(v -> {
                Assert.assertNull((Object)topic.getPersistentReplicator(replicatorClusterName));
                return null;
            });
        }
        finally {
            if (Collections.singletonList(producer1).get(0) != null) {
                producer1.close();
            }
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test(priority=5, timeOut=30000L)
    public void testReplicatorProducerClosing() throws Exception {
        log.info("--- Starting ReplicatorTest::testDeleteReplicatorFailure ---");
        String topicName = BrokerTestUtil.newUniqueName("persistent://pulsar/ns/repltopicbatch");
        TopicName dest = TopicName.get((String)topicName);
        ReplicatorTestBase.MessageProducer producer1 = new ReplicatorTestBase.MessageProducer(this.url1, dest);
        try {
            PersistentTopic topic = (PersistentTopic)this.pulsar1.getBrokerService().getTopicReference(topicName).get();
            String replicatorClusterName = (String)topic.getReplicators().keys().get(0);
            Replicator replicator = topic.getPersistentReplicator(replicatorClusterName);
            this.pulsar2.close();
            this.pulsar2 = null;
            this.pulsar3.close();
            this.pulsar3 = null;
            replicator.disconnect(false);
            Thread.sleep(100L);
            Field field = AbstractReplicator.class.getDeclaredField("producer");
            field.setAccessible(true);
            ProducerImpl producer = (ProducerImpl)field.get(replicator);
            Assert.assertNull((Object)producer);
        }
        finally {
            if (Collections.singletonList(producer1).get(0) != null) {
                producer1.close();
            }
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test(timeOut=60000L, priority=-1)
    public void testResumptionAfterBacklogRelaxed() throws Exception {
        List policies = Lists.newArrayList();
        policies.add(BacklogQuota.RetentionPolicy.producer_exception);
        policies.add(BacklogQuota.RetentionPolicy.producer_request_hold);
        for (BacklogQuota.RetentionPolicy policy : policies) {
            this.admin1.namespaces().setBacklogQuota("pulsar/ns1", new BacklogQuota(0x100000L, policy));
            Thread.sleep(200L);
            TopicName dest = TopicName.get((String)BrokerTestUtil.newUniqueName("persistent://pulsar/ns1/%s-" + policy));
            ReplicatorTestBase.MessageProducer producer1 = new ReplicatorTestBase.MessageProducer(this.url1, dest);
            try {
                ReplicatorTestBase.MessageConsumer consumer2 = new ReplicatorTestBase.MessageConsumer(this.url2, dest);
                try {
                    PersistentTopic topic = (PersistentTopic)this.pulsar1.getBrokerService().getTopicReference(dest.toString()).get();
                    Replicator replicator = topic.getPersistentReplicator("r2");
                    producer1.produce(1);
                    Thread.sleep(500L);
                    this.admin1.namespaces().setBacklogQuota("pulsar/ns1", new BacklogQuota(1L, policy));
                    Thread.sleep(6000L);
                    Assert.assertEquals((long)replicator.getStats().replicationBacklog, (long)0L);
                    producer1.produce(1);
                    Thread.sleep(500L);
                    Assert.assertEquals((long)replicator.getStats().replicationBacklog, (long)1L);
                    consumer2.receive(1);
                    consumer2.receive(1);
                    int retry = 10;
                    for (int i = 0; i < retry && replicator.getStats().replicationBacklog > 0L; ++i) {
                        if (i == retry - 1) continue;
                        Thread.sleep(100L);
                    }
                    Assert.assertEquals((long)replicator.getStats().replicationBacklog, (long)0L);
                }
                finally {
                    if (Collections.singletonList(consumer2).get(0) == null) continue;
                    consumer2.close();
                }
            }
            finally {
                if (Collections.singletonList(producer1).get(0) == null) continue;
                producer1.close();
            }
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test(timeOut=15000L)
    public void testCloseReplicatorStartProducer() throws Exception {
        TopicName dest = TopicName.get((String)BrokerTestUtil.newUniqueName("persistent://pulsar/ns1/closeCursor"));
        ReplicatorTestBase.MessageProducer producer1 = new ReplicatorTestBase.MessageProducer(this.url1, dest);
        try {
            ReplicatorTestBase.MessageConsumer consumer1 = new ReplicatorTestBase.MessageConsumer(this.url1, dest);
            try {
                ReplicatorTestBase.MessageConsumer consumer2 = new ReplicatorTestBase.MessageConsumer(this.url2, dest);
                try {
                    PersistentTopic topic = (PersistentTopic)this.pulsar1.getBrokerService().getTopicReference(dest.toString()).get();
                    PersistentReplicator replicator = (PersistentReplicator)topic.getPersistentReplicator("r2");
                    Field cursorField = PersistentReplicator.class.getDeclaredField("cursor");
                    cursorField.setAccessible(true);
                    ManagedCursor cursor = (ManagedCursor)cursorField.get(replicator);
                    cursor.close();
                    producer1.produce(10);
                    try {
                        cursor.readEntriesOrWait(10);
                        Assert.fail((String)"It should have failed");
                    }
                    catch (Exception e) {
                        Assert.assertEquals(e.getClass(), ManagedLedgerException.CursorAlreadyClosedException.class);
                    }
                    replicator.readEntriesFailed((ManagedLedgerException)new ManagedLedgerException.CursorAlreadyClosedException("Cursor already closed exception"), null);
                    Thread.sleep(100L);
                    Field producerField = AbstractReplicator.class.getDeclaredField("producer");
                    producerField.setAccessible(true);
                    ProducerImpl replicatorProducer = (ProducerImpl)producerField.get(replicator);
                    Assert.assertNull((Object)replicatorProducer);
                }
                finally {
                    if (Collections.singletonList(consumer2).get(0) != null) {
                        consumer2.close();
                    }
                }
            }
            finally {
                if (Collections.singletonList(consumer1).get(0) != null) {
                    consumer1.close();
                }
            }
        }
        finally {
            if (Collections.singletonList(producer1).get(0) != null) {
                producer1.close();
            }
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test(timeOut=30000L)
    public void verifyChecksumAfterReplication() throws Exception {
        String topicName = BrokerTestUtil.newUniqueName("persistent://pulsar/ns/checksumAfterReplication");
        PulsarClient c1 = PulsarClient.builder().serviceUrl(this.url1.toString()).build();
        try {
            Producer p1 = c1.newProducer().topic(topicName).enableBatching(false).messageRoutingMode(MessageRoutingMode.SinglePartition).create();
            PulsarClient c2 = PulsarClient.builder().serviceUrl(this.url2.toString()).build();
            try {
                RawReader reader2 = (RawReader)RawReader.create((PulsarClient)c2, (String)topicName, (String)"sub").get();
                p1.send((Object)"Hello".getBytes());
                RawMessage msg = (RawMessage)reader2.readNextAsync().get();
                ByteBuf b = msg.getHeadersAndPayload();
                Assert.assertTrue((boolean)Commands.hasChecksum((ByteBuf)b));
                int parsedChecksum = Commands.readChecksum((ByteBuf)b);
                int computedChecksum = Crc32cIntChecksum.computeChecksum((ByteBuf)b);
                Assert.assertEquals((int)parsedChecksum, (int)computedChecksum);
                p1.close();
                reader2.closeAsync().get();
            }
            finally {
                if (Collections.singletonList(c2).get(0) != null) {
                    c2.close();
                }
            }
        }
        finally {
            if (Collections.singletonList(c1).get(0) != null) {
                c1.close();
            }
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test(dataProvider="partitionedTopic")
    public void testReplicatorOnPartitionedTopic(boolean isPartitionedTopic) throws Exception {
        log.info("--- Starting ReplicatorTest::{} --- ", (Object)this.methodName);
        String namespace = BrokerTestUtil.newUniqueName("pulsar/partitionedNs-" + isPartitionedTopic);
        String persistentTopicName = BrokerTestUtil.newUniqueName("persistent://" + namespace + "/partTopic-" + isPartitionedTopic);
        String nonPersistentTopicName = BrokerTestUtil.newUniqueName("non-persistent://" + namespace + "/partTopic-" + isPartitionedTopic);
        BrokerService brokerService = this.pulsar1.getBrokerService();
        this.admin1.namespaces().createNamespace(namespace);
        this.admin1.namespaces().setNamespaceReplicationClusters(namespace, (Set)Sets.newHashSet((Object[])new String[]{"r1", "r2", "r3"}));
        if (isPartitionedTopic) {
            this.admin1.topics().createPartitionedTopic(persistentTopicName, 5);
            this.admin1.topics().createPartitionedTopic(nonPersistentTopicName, 5);
        }
        PulsarClient client = PulsarClient.builder().serviceUrl(this.url1.toString()).build();
        try {
            client.newProducer().topic("persistent://" + namespace + "/dummyTopic").enableBatching(false).messageRoutingMode(MessageRoutingMode.SinglePartition).create();
            try {
                brokerService.getOrCreateTopic(persistentTopicName).get();
                if (isPartitionedTopic) {
                    Assert.fail((String)"Topic creation fails with partitioned topic as replicator init fails");
                }
            }
            catch (Exception e) {
                if (!isPartitionedTopic) {
                    Assert.fail((String)"Topic creation should not fail without any partitioned topic");
                }
                Assert.assertTrue((boolean)(e.getCause() instanceof BrokerServiceException.NamingException));
            }
            try {
                brokerService.getOrCreateTopic(nonPersistentTopicName).get();
                if (isPartitionedTopic) {
                    Assert.fail((String)"Topic creation fails with partitioned topic as replicator init fails");
                }
            }
            catch (Exception e) {
                if (!isPartitionedTopic) {
                    Assert.fail((String)"Topic creation should not fail without any partitioned topic");
                }
                Assert.assertTrue((boolean)(e.getCause() instanceof BrokerServiceException.NamingException));
            }
        }
        finally {
            if (Collections.singletonList(client).get(0) != null) {
                client.close();
            }
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test
    public void testReplicatedCluster() throws Exception {
        log.info("--- Starting ReplicatorTest::testReplicatedCluster ---");
        String namespace = "pulsar/global/repl";
        String topicName = BrokerTestUtil.newUniqueName("persistent://pulsar/global/repl/topic1");
        this.admin1.namespaces().createNamespace("pulsar/global/repl");
        this.admin1.namespaces().setNamespaceReplicationClusters("pulsar/global/repl", (Set)Sets.newHashSet((Object[])new String[]{"r1", "r2", "r3"}));
        this.admin1.topics().createPartitionedTopic(topicName, 4);
        PulsarClient client1 = PulsarClient.builder().serviceUrl(this.url1.toString()).statsInterval(0L, TimeUnit.SECONDS).build();
        try {
            PulsarClient client2 = PulsarClient.builder().serviceUrl(this.url2.toString()).statsInterval(0L, TimeUnit.SECONDS).build();
            try {
                Producer producer1 = client1.newProducer().topic(topicName).create();
                Consumer consumer1 = client1.newConsumer().topic(new String[]{topicName}).subscriptionName("s1").subscribe();
                Consumer consumer2 = client2.newConsumer().topic(new String[]{topicName}).subscriptionName("s1").subscribe();
                byte[] value = "test".getBytes();
                TypedMessageBuilder msg = producer1.newMessage().replicationClusters(Lists.newArrayList((Object[])new String[]{"r1"})).value((Object)value);
                msg.send();
                Assert.assertEquals((byte[])((byte[])consumer1.receive().getValue()), (byte[])value);
                Message msg2 = consumer2.receive(1, TimeUnit.SECONDS);
                if (msg2 != null) {
                    Assert.fail((String)"msg should have not been replicated to remote cluster");
                }
                consumer1.close();
                consumer2.close();
                producer1.close();
            }
            finally {
                if (Collections.singletonList(client2).get(0) != null) {
                    client2.close();
                }
            }
        }
        finally {
            if (Collections.singletonList(client1).get(0) != null) {
                client1.close();
            }
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test
    public void testUpdateGlobalTopicPartition() throws Exception {
        log.info("--- Starting ReplicatorTest::testUpdateGlobalTopicPartition ---");
        String cluster1 = this.pulsar1.getConfig().getClusterName();
        String cluster2 = this.pulsar2.getConfig().getClusterName();
        String namespace = BrokerTestUtil.newUniqueName("pulsar/ns");
        String topicName = BrokerTestUtil.newUniqueName("persistent://" + namespace + "/topic1");
        int startPartitions = 4;
        int newPartitions = 8;
        String subscriberName = "sub1";
        this.admin1.namespaces().createNamespace(namespace, (Set)Sets.newHashSet((Object[])new String[]{cluster1, cluster2}));
        this.admin1.topics().createPartitionedTopic(topicName, startPartitions);
        PulsarClient client1 = PulsarClient.builder().serviceUrl(this.url1.toString()).statsInterval(0L, TimeUnit.SECONDS).build();
        try {
            PulsarClient client2 = PulsarClient.builder().serviceUrl(this.url2.toString()).statsInterval(0L, TimeUnit.SECONDS).build();
            try {
                Consumer consumer1 = client1.newConsumer().topic(new String[]{topicName}).subscriptionName("sub1").subscribe();
                Consumer consumer2 = client2.newConsumer().topic(new String[]{topicName}).subscriptionName("sub1").subscribe();
                this.admin1.topics().updatePartitionedTopic(topicName, newPartitions);
                Assert.assertEquals((int)this.admin1.topics().getPartitionedTopicMetadata((String)topicName).partitions, (int)newPartitions);
                Producer producer1 = client1.newProducer().topic(topicName).create();
                Producer producer2 = client2.newProducer().topic(topicName).create();
                for (int i = startPartitions; i < newPartitions; ++i) {
                    String partitionedTopic = topicName + "-partition-" + i;
                    Assert.assertEquals((int)this.admin1.topics().getSubscriptions(partitionedTopic).size(), (int)1);
                    Assert.assertEquals((int)this.admin2.topics().getSubscriptions(partitionedTopic).size(), (int)1);
                }
                producer1.close();
                producer2.close();
                consumer1.close();
                consumer2.close();
            }
            finally {
                if (Collections.singletonList(client2).get(0) != null) {
                    client2.close();
                }
            }
        }
        finally {
            if (Collections.singletonList(client1).get(0) != null) {
                client1.close();
            }
        }
    }

    @DataProvider(name="topicPrefix")
    public static Object[][] topicPrefix() {
        return new Object[][]{{"persistent://", "/persistent"}, {"non-persistent://", "/non-persistent"}};
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test(dataProvider="topicPrefix")
    public void testTopicReplicatedAndProducerCreate(String topicPrefix, String topicName) throws Exception {
        log.info("--- Starting ReplicatorTest::testTopicReplicatedAndProducerCreate ---");
        String cluster1 = this.pulsar1.getConfig().getClusterName();
        String cluster2 = this.pulsar2.getConfig().getClusterName();
        String namespace = BrokerTestUtil.newUniqueName("pulsar/ns");
        String partitionedTopicName = BrokerTestUtil.newUniqueName(topicPrefix + namespace + topicName + "-partitioned");
        String nonPartitionedTopicName = BrokerTestUtil.newUniqueName(topicPrefix + namespace + topicName + "-non-partitioned");
        int startPartitions = 4;
        this.admin1.namespaces().createNamespace(namespace, (Set)Sets.newHashSet((Object[])new String[]{cluster1, cluster2}));
        this.admin1.namespaces().setNamespaceReplicationClusters(namespace, (Set)Sets.newHashSet((Object[])new String[]{"r1", "r2", "r3"}));
        this.admin1.topics().createPartitionedTopic(partitionedTopicName, 4);
        this.admin1.topics().createNonPartitionedTopic(nonPartitionedTopicName);
        PulsarClient client1 = PulsarClient.builder().serviceUrl(this.url1.toString()).statsInterval(0L, TimeUnit.SECONDS).build();
        try {
            PulsarClient client2 = PulsarClient.builder().serviceUrl(this.url2.toString()).statsInterval(0L, TimeUnit.SECONDS).build();
            try {
                Producer persistentProducer1 = client1.newProducer().topic(partitionedTopicName).create();
                Producer persistentProducer2 = client2.newProducer().topic(partitionedTopicName).create();
                Assert.assertNotNull((Object)persistentProducer1.send((Object)"test".getBytes()));
                Assert.assertNotNull((Object)persistentProducer2.send((Object)"test".getBytes()));
                Producer nonPersistentProducer1 = client1.newProducer().topic(nonPartitionedTopicName).create();
                Producer nonPersistentProducer2 = client2.newProducer().topic(nonPartitionedTopicName).create();
                Assert.assertNotNull((Object)nonPersistentProducer1.send((Object)"test".getBytes()));
                Assert.assertNotNull((Object)nonPersistentProducer2.send((Object)"test".getBytes()));
                persistentProducer1.close();
                persistentProducer2.close();
                nonPersistentProducer1.close();
                nonPersistentProducer2.close();
            }
            finally {
                if (Collections.singletonList(client2).get(0) != null) {
                    client2.close();
                }
            }
        }
        finally {
            if (Collections.singletonList(client1).get(0) != null) {
                client1.close();
            }
        }
    }
}

