package org.apache.pulsar.broker.service;

import com.google.common.collect.Sets;
import io.netty.util.concurrent.FastThreadLocalThread;
import java.lang.reflect.Field;
import java.lang.reflect.Method;
import java.time.Duration;
import java.util.Arrays;
import java.util.Optional;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.BiFunction;
import org.apache.bookkeeper.mledger.AsyncCallbacks;
import org.apache.bookkeeper.mledger.ManagedLedgerException;
import org.apache.bookkeeper.mledger.impl.ManagedCursorImpl;
import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl;
import org.apache.pulsar.broker.BrokerTestUtil;
import org.apache.pulsar.broker.service.persistent.GeoPersistentReplicator;
import org.apache.pulsar.broker.service.persistent.PersistentReplicator;
import org.apache.pulsar.broker.service.persistent.PersistentTopic;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.ProducerBuilder;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.impl.ProducerBuilderImpl;
import org.apache.pulsar.client.impl.ProducerImpl;
import org.apache.pulsar.client.impl.PulsarClientImpl;
import org.apache.pulsar.client.impl.conf.ProducerConfigurationData;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.policies.data.TopicStats;
import org.apache.pulsar.common.util.FutureUtil;
import org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap;
import org.awaitility.Awaitility;
import org.awaitility.reflect.WhiteboxImpl;
import org.mockito.Mockito;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testng.Assert;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;

@Test(groups = {"broker"})
/* loaded from: input_file:org/apache/pulsar/broker/service/OneWayReplicatorTest.class */
public class OneWayReplicatorTest extends OneWayReplicatorTestBase {
    private static final Logger log = LoggerFactory.getLogger(OneWayReplicatorTest.class);

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:org/apache/pulsar/broker/service/OneWayReplicatorTest$CursorCloseSignal.class */
    /**
     * Pair of one-shot gates handed back by {@code makeCursorClosingDelay}.
     *
     * <p>{@link #startClose()} releases {@code startCloseSignal} and {@link #startCallback()} releases
     * {@code startCallbackSignal}; each call is a single {@link CountDownLatch#countDown()}.
     */
    public static class CursorCloseSignal {
        /** Latch counted down by {@link #startClose()}. */
        CountDownLatch startCloseSignal;
        /** Latch counted down by {@link #startCallback()}. */
        CountDownLatch startCallbackSignal;

        /**
         * @param countDownLatch  latch stored as {@code startCloseSignal}
         * @param countDownLatch2 latch stored as {@code startCallbackSignal}
         */
        public CursorCloseSignal(CountDownLatch countDownLatch, CountDownLatch countDownLatch2) {
            startCloseSignal = countDownLatch;
            startCallbackSignal = countDownLatch2;
        }

        /** Counts down the close gate once. */
        void startClose() {
            startCloseSignal.countDown();
        }

        /** Counts down the callback gate once. */
        void startCallback() {
            startCallbackSignal.countDown();
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:org/apache/pulsar/broker/service/OneWayReplicatorTest$SpyCursor.class */
    /**
     * Mutable holder pairing a managed cursor with a second cursor reference (the Mockito spy created in
     * {@code spyCursor(...)}). Equality, hashing and the string form are all derived from the two fields.
     */
    public static class SpyCursor {
        ManagedCursorImpl original;
        ManagedCursorImpl spy;

        /**
         * @param managedCursorImpl  value stored in {@code original}
         * @param managedCursorImpl2 value stored in {@code spy}
         */
        public SpyCursor(ManagedCursorImpl managedCursorImpl, ManagedCursorImpl managedCursorImpl2) {
            original = managedCursorImpl;
            spy = managedCursorImpl2;
        }

        /** @return the current {@code original} reference (may be {@code null}) */
        public ManagedCursorImpl getOriginal() {
            return original;
        }

        /** @return the current {@code spy} reference (may be {@code null}) */
        public ManagedCursorImpl getSpy() {
            return spy;
        }

        public void setOriginal(ManagedCursorImpl managedCursorImpl) {
            original = managedCursorImpl;
        }

        public void setSpy(ManagedCursorImpl managedCursorImpl) {
            spy = managedCursorImpl;
        }

        /**
         * Two holders are equal when both are {@code SpyCursor}s, the other side accepts this one via
         * {@link #canEqual(Object)}, and both field pairs match (null only matches null).
         */
        @Override
        public boolean equals(Object obj) {
            if (this == obj) {
                return true;
            }
            if (!(obj instanceof SpyCursor)) {
                return false;
            }
            SpyCursor other = (SpyCursor) obj;
            if (!other.canEqual(this)) {
                return false;
            }
            // Short-circuit keeps the original evaluation order: "original" first, then "spy".
            return sameCursor(getOriginal(), other.getOriginal()) && sameCursor(getSpy(), other.getSpy());
        }

        /** Null-tolerant comparison that delegates to {@code left.equals(right)} when {@code left} is set. */
        private static boolean sameCursor(ManagedCursorImpl left, ManagedCursorImpl right) {
            if (left == null) {
                return right == null;
            }
            return left.equals(right);
        }

        /** Hook letting the other side of {@link #equals(Object)} veto the comparison. */
        protected boolean canEqual(Object obj) {
            return obj instanceof SpyCursor;
        }

        /** Combines both field hashes with multiplier 59; a {@code null} field contributes 43. */
        @Override
        public int hashCode() {
            int result = 1;
            ManagedCursorImpl first = getOriginal();
            result = result * 59 + (first == null ? 43 : first.hashCode());
            ManagedCursorImpl second = getSpy();
            result = result * 59 + (second == null ? 43 : second.hashCode());
            return result;
        }

        @Override
        public String toString() {
            return "OneWayReplicatorTest.SpyCursor(original=" + getOriginal() + ", spy=" + getSpy() + ")";
        }
    }

    /**
     * Class-level fixture that simply delegates to the base-class {@code setup()}.
     *
     * <p>Declared with {@code alwaysRun = true} and a {@code timeOut} of 300000 (TestNG timeouts are in
     * milliseconds).
     *
     * @throws Exception whatever the base-class setup throws
     */
    @Override // org.apache.pulsar.broker.service.OneWayReplicatorTestBase
    @BeforeClass(alwaysRun = true, timeOut = 300000)
    public void setup() throws Exception {
        super.setup();
    }

    /**
     * Class-level teardown that simply delegates to the base-class {@code cleanup()}.
     *
     * <p>Declared with {@code alwaysRun = true} and a {@code timeOut} of 300000 (TestNG timeouts are in
     * milliseconds).
     *
     * @throws Exception whatever the base-class cleanup throws
     */
    @Override // org.apache.pulsar.broker.service.OneWayReplicatorTestBase
    @AfterClass(alwaysRun = true, timeOut = 300000)
    public void cleanup() throws Exception {
        super.cleanup();
    }

    /**
     * Polls with Awaitility (default timeout) until the topic, as seen through {@code pulsar2}, has no
     * producers and either has no replicators or its {@code "r2"} replicator reports not connected.
     *
     * @param str fully qualified topic (or partition) name to look up
     */
    private void waitReplicatorStopped(String str) {
        Awaitility.await().untilAsserted(() -> {
            Optional<?> firstLookup = pulsar2.getBrokerService().getTopic(str, false).get();
            Assert.assertTrue(firstLookup.isPresent());
            PersistentTopic topic = (PersistentTopic) firstLookup.get();
            Assert.assertTrue(topic.getProducers().isEmpty());

            // NOTE(review): this second lookup also goes to pulsar2 and its result is only checked for
            // presence; the replicator check below still uses the topic from the first lookup.
            // Confirm whether pulsar1 was intended here.
            Optional<?> secondLookup = pulsar2.getBrokerService().getTopic(str, false).get();
            Assert.assertTrue(secondLookup.isPresent());

            boolean replicatorStopped = topic.getReplicators().isEmpty()
                    || !((Replicator) topic.getReplicators().get("r2")).isConnected();
            Assert.assertTrue(replicatorStopped);
        });
    }

    /**
     * Reflectively replaces the private {@code producer} field of a replicator and returns the value it
     * held before.
     *
     * <p>The read of the previous value and the write of the new one both happen while holding the
     * replicator's monitor, so the returned producer is exactly the one that was replaced.
     *
     * @param abstractReplicator replicator whose {@code producer} field is overwritten
     * @param producerImpl       producer to install
     * @return the producer that was installed before the call (may be {@code null})
     * @throws Exception if the field cannot be found or accessed reflectively
     */
    private ProducerImpl overrideProducerForReplicator(AbstractReplicator abstractReplicator,
                                                       ProducerImpl producerImpl) throws Exception {
        Field producerField = AbstractReplicator.class.getDeclaredField("producer");
        producerField.setAccessible(true);
        synchronized (abstractReplicator) {
            // Read under the same lock as the write; reading outside it could return a stale producer
            // if the field is replaced between the read and the write.
            ProducerImpl previous = (ProducerImpl) producerField.get(abstractReplicator);
            producerField.set(abstractReplicator, producerImpl);
            return previous;
        }
    }

    /**
     * Publishes one message through {@code client1}, receives it through {@code client2}, and then checks
     * that the topic stats read via {@code admin2} list at least one publisher or replication entry.
     *
     * <p>The producer and consumer are closed even when an assertion fails, and a receive timeout is
     * reported as an assertion failure rather than a {@code NullPointerException}.
     */
    @Test(timeOut = 45000)
    public void testReplicatorProducerStatInTopic() throws Exception {
        final String topicName = BrokerTestUtil.newUniqueName("persistent://public/default/tp_");
        final String subscriptionName = "subscribe_1";
        final byte[] payload = "test".getBytes(java.nio.charset.StandardCharsets.UTF_8);

        admin1.topics().createNonPartitionedTopic(topicName);
        admin2.topics().createNonPartitionedTopic(topicName);
        admin1.topics().createSubscription(topicName, subscriptionName, MessageId.earliest);
        admin2.topics().createSubscription(topicName, subscriptionName, MessageId.earliest);

        // Resources close in reverse declaration order: consumer first, then producer.
        try (Producer<byte[]> producer = client1.newProducer().topic(topicName).create();
                Consumer<byte[]> consumer = client2.newConsumer().topic(topicName)
                        .subscriptionName(subscriptionName).subscribe()) {
            producer.newMessage().value(payload).send();
            pulsar1.getBrokerService().checkReplicationPolicies();

            org.apache.pulsar.client.api.Message<byte[]> replicated = consumer.receive(10, TimeUnit.SECONDS);
            Assert.assertNotNull(replicated, "no message arrived on the second cluster within 10 seconds");
            Assert.assertEquals(replicated.getValue(), payload);

            TopicStats remoteStats = admin2.topics().getStats(topicName);
            Assert.assertTrue(remoteStats.getPublishers().size() + remoteStats.getReplication().size() > 0);
        }

        cleanupTopics(() -> {
            admin1.topics().delete(topicName);
            admin2.topics().delete(topicName);
        });
    }

    /**
     * Creates a {@code STRING}-schema producer via {@code client1} and a {@code STRING}-schema consumer
     * via {@code client2} on the same topic, then waits for the replicator to start. The test passes if
     * none of these steps throws.
     */
    @Test(timeOut = 45000)
    public void testCreateRemoteConsumerFirst() throws Exception {
        final String topicName = BrokerTestUtil.newUniqueName("persistent://public/default/tp_");

        Producer<String> localProducer = client1.newProducer(Schema.STRING).topic(topicName).create();
        Consumer<String> remoteConsumer = client2.newConsumer(Schema.STRING)
                .topic(topicName)
                .subscriptionName("s1")
                .subscribe();
        waitReplicatorStarted(topicName);

        localProducer.close();
        remoteConsumer.close();
        cleanupTopics(() -> {
            admin1.topics().delete(topicName);
            admin2.topics().delete(topicName);
        });
    }

    /**
     * Replaces the replicator's internal producer with a mock whose {@code closeAsync()} fails while a
     * flag is set, unloads the topic, clears the flag, and then verifies that:
     * <ul>
     *   <li>a new topic instance and a new replicator instance are served for the same name;</li>
     *   <li>the old replicator and the original producer end up disconnected;</li>
     *   <li>the new replicator is connected.</li>
     * </ul>
     */
    @Test(timeOut = 45000)
    public void testTopicCloseWhenInternalProducerCloseErrorOnce() throws Exception {
        final String topicName = BrokerTestUtil.newUniqueName("persistent://public/default/tp_");
        admin1.topics().createNonPartitionedTopic(topicName);
        waitReplicatorStarted(topicName);
        final PersistentTopic oldTopic =
                (PersistentTopic) pulsar1.getBrokerService().getTopic(topicName, false).join().get();
        final PersistentReplicator oldReplicator =
                (PersistentReplicator) oldTopic.getReplicators().values().iterator().next();

        // While the flag is true the mock's closeAsync() fails; afterwards it delegates to the real producer.
        final AtomicBoolean closeShouldFail = new AtomicBoolean(true);
        final ProducerImpl<?> mockProducer = Mockito.mock(ProducerImpl.class);
        final AtomicReference<ProducerImpl<?>> realProducer = new AtomicReference<>();
        Mockito.doAnswer(invocation -> {
            if (closeShouldFail.get()) {
                return CompletableFuture.failedFuture(new Exception("mocked ex"));
            }
            return realProducer.get().closeAsync();
        }).when(mockProducer).closeAsync();
        realProducer.set(overrideProducerForReplicator(oldReplicator, mockProducer));

        admin1.topics().unload(topicName);
        closeShouldFail.set(false);

        final AtomicReference<PersistentTopic> newTopic = new AtomicReference<>();
        final AtomicReference<PersistentReplicator> newReplicator = new AtomicReference<>();
        Awaitility.await().untilAsserted(() -> {
            newTopic.set((PersistentTopic) pulsar1.getBrokerService().getTopic(topicName, false).join().get());
            newReplicator.set((PersistentReplicator) newTopic.get().getReplicators().values().iterator().next());
            // After the unload the broker must hand out fresh instances.
            Assert.assertNotEquals(newTopic.get(), oldTopic);
            Assert.assertNotEquals(newReplicator.get(), oldReplicator);
        });
        Awaitility.await().untilAsserted(() -> {
            Assert.assertFalse(oldReplicator.isConnected());
            Assert.assertFalse(realProducer.get().isConnected());
            Assert.assertTrue(newReplicator.get().isConnected());
        });

        cleanupTopics(() -> {
            admin1.topics().delete(topicName);
            admin2.topics().delete(topicName);
        });
    }

    /**
     * Swaps the replication client that {@code pulsar1}'s broker service holds for {@code pulsar2}'s
     * cluster with a Mockito spy, so that every producer the client creates is passed through
     * {@code biFunction} before it is handed back to the caller of {@code createAsync()}.
     *
     * <p>The decorator is invoked on a freshly started {@link FastThreadLocalThread}; if it throws an
     * {@link Exception}, the returned future completes exceptionally with it.
     *
     * @param biFunction decorator receiving the producer configuration and the producer that was really
     *                   created; its result (or exception) becomes the outcome of {@code createAsync()}
     * @return a task that puts the original client back into the broker's {@code replicationClients} map
     * @throws Exception if the warm-up topic cannot be created or cleaned up
     */
    private Runnable injectMockReplicatorProducerBuilder(
            BiFunction<ProducerConfigurationData, ProducerImpl, ProducerImpl> biFunction) throws Exception {
        final String remoteCluster = pulsar2.getConfig().getClusterName();
        final BrokerService brokerService = pulsar1.getBrokerService();

        // Create a throwaway topic and wait for its replicator. Presumably this makes the broker create
        // its replication client for the remote cluster, which is looked up below — TODO confirm.
        final String warmUpTopic = BrokerTestUtil.newUniqueName("persistent://public/default/tp_");
        admin1.topics().createNonPartitionedTopic(warmUpTopic);
        waitReplicatorStarted(warmUpTopic);
        cleanupTopics(() -> {
            admin1.topics().delete(warmUpTopic);
            admin2.topics().delete(warmUpTopic);
        });

        // Replace the real client with a spy in the broker's internal map.
        final ConcurrentOpenHashMap<String, PulsarClient> replicationClients =
                WhiteboxImpl.getInternalState(brokerService, "replicationClients");
        final PulsarClientImpl realClient = (PulsarClientImpl) replicationClients.get(remoteCluster);
        final PulsarClient spiedClient = Mockito.spy(realClient);
        Assert.assertTrue(replicationClients.remove(remoteCluster, realClient));
        Assert.assertNull(replicationClients.putIfAbsent(remoteCluster, spiedClient));

        // The two lambdas are nested, so their parameters need distinct names.
        Mockito.doAnswer(newProducerCall -> {
            Schema<?> schema = (Schema<?>) newProducerCall.getArguments()[0];
            ProducerBuilderImpl<?> realBuilder = (ProducerBuilderImpl<?>) realClient.newProducer(schema);
            ProducerBuilder<?> spiedBuilder = Mockito.spy(realBuilder);
            Mockito.doAnswer(createAsyncCall -> {
                CompletableFuture<ProducerImpl> decorated = new CompletableFuture<>();
                realBuilder.createAsync().whenComplete((created, failure) -> {
                    if (failure != null) {
                        decorated.completeExceptionally(failure);
                        return;
                    }
                    ProducerImpl createdImpl = (ProducerImpl) created;
                    // The decorator runs on its own thread, not on the thread that completed creation.
                    new FastThreadLocalThread(() -> {
                        try {
                            decorated.complete(biFunction.apply(realBuilder.getConf(), createdImpl));
                        } catch (Exception e) {
                            decorated.completeExceptionally(e);
                        }
                    }).start();
                });
                return decorated;
            }).when(spiedBuilder).createAsync();
            return spiedBuilder;
        }).when(spiedClient).newProducer(Mockito.any(Schema.class));

        // Undo task: put the real client back in place of the spy.
        return () -> {
            Assert.assertTrue(replicationClients.remove(remoteCluster, spiedClient));
            Assert.assertNull(replicationClients.putIfAbsent(remoteCluster, realClient));
        };
    }

    /**
     * Replaces the named cursor of the topic's managed ledger with a Mockito spy of itself.
     *
     * <p>The real cursor is removed from the ledger's cursor container and deactivated, then the spy is
     * registered through the ledger's non-public {@code addCursor(ManagedCursorImpl)} method via
     * reflection.
     *
     * @param persistentTopic topic whose managed ledger owns the cursor
     * @param str             name of the cursor to replace
     * @return holder with the real cursor and the spy that now stands in for it
     * @throws Exception if the reflective call to {@code addCursor} fails
     */
    private SpyCursor spyCursor(PersistentTopic persistentTopic, String str) throws Exception {
        // Explicit downcasts: the test needs the implementation classes (deactivateCursor, addCursor).
        // The casts are no-ops if the accessors are already declared with these types.
        ManagedLedgerImpl ledger = (ManagedLedgerImpl) persistentTopic.getManagedLedger();
        ManagedCursorImpl realCursor = (ManagedCursorImpl) ledger.getCursors().get(str);
        ManagedCursorImpl spiedCursor = Mockito.spy(realCursor);

        // Detach the real cursor before registering the spy.
        ledger.getCursors().removeCursor(str);
        ledger.deactivateCursor(realCursor);

        Method addCursor = ManagedLedgerImpl.class.getDeclaredMethod("addCursor", ManagedCursorImpl.class);
        addCursor.setAccessible(true);
        addCursor.invoke(ledger, spiedCursor);
        return new SpyCursor(realCursor, spiedCursor);
    }

    /**
     * Stubs {@code asyncClose} on the spy so that closing the cursor can be paused at two points:
     * <ol>
     *   <li>the stub blocks until the "start close" latch is released before delegating to the original
     *       cursor's {@code asyncClose};</li>
     *   <li>the caller's close callback (success or failure) is invoked on a separate thread, and only
     *       after the "start callback" latch is released.</li>
     * </ol>
     *
     * @param spyCursor holder whose {@code spy} is stubbed and whose {@code original} performs the close
     * @return the two latches wrapped in a {@link CursorCloseSignal}
     * @throws Exception declared for callers; the stubbing itself throws nothing checked
     */
    private CursorCloseSignal makeCursorClosingDelay(SpyCursor spyCursor) throws Exception {
        final CountDownLatch startCloseSignal = new CountDownLatch(1);
        final CountDownLatch startCallbackSignal = new CountDownLatch(1);
        Mockito.doAnswer(invocation -> {
            final AsyncCallbacks.CloseCallback callerCallback =
                    (AsyncCallbacks.CloseCallback) invocation.getArguments()[0];
            final Object callerCtx = invocation.getArguments()[1];
            AsyncCallbacks.CloseCallback delayedCallback = new AsyncCallbacks.CloseCallback() {
                @Override
                public void closeComplete(Object ctx) {
                    runOnceReleased(startCallbackSignal, () -> callerCallback.closeComplete(ctx));
                }

                @Override
                public void closeFailed(ManagedLedgerException exception, Object ctx) {
                    runOnceReleased(startCallbackSignal, () -> callerCallback.closeFailed(exception, ctx));
                }
            };
            // Hold the close itself until the test releases the first gate.
            startCloseSignal.await();
            spyCursor.original.asyncClose(delayedCallback, callerCtx);
            return null;
        }).when(spyCursor.spy).asyncClose(Mockito.any(AsyncCallbacks.CloseCallback.class), Mockito.any());
        return new CursorCloseSignal(startCloseSignal, startCallbackSignal);
    }

    /**
     * Starts a new thread that waits for {@code gate} and then runs {@code action}. If the wait is
     * interrupted the interrupt flag is restored and {@code action} is not run.
     */
    private static void runOnceReleased(CountDownLatch gate, Runnable action) {
        new FastThreadLocalThread(() -> {
            try {
                gate.await();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                log.warn("Interrupted while waiting to deliver the delayed cursor close callback", e);
                return;
            }
            action.run();
        }).start();
    }

    /**
     * Closes a topic while its replicator is still failing to create its producer.
     *
     * <p>Steps:
     * <ol>
     *   <li>inject a decorator that closes the producer and throws for the first six creations on this
     *       topic;</li>
     *   <li>delay the replication cursor's close behind two latches;</li>
     *   <li>once at least six attempts were counted, start {@code close(true)} on the topic and wait for
     *       the replicator state to read {@code "Stopped"} or {@code "Terminated"};</li>
     *   <li>release both latches, wait for the close to finish, and verify that the topic on
     *       {@code pulsar2} has no producers and the replicator is not connected.</li>
     * </ol>
     *
     * <p>The injected spy client is removed in a {@code finally} block so a failure here cannot leak it.
     */
    @Test(timeOut = 120000)
    public void testConcurrencyOfUnloadBundleAndRecreateProducer() throws Exception {
        final String topicName = BrokerTestUtil.newUniqueName("persistent://public/default/tp_");
        final AtomicInteger createAttempts = new AtomicInteger();
        final Runnable removeInjection = injectMockReplicatorProducerBuilder((conf, createdProducer) -> {
            if (topicName.equals(conf.getTopicName()) && createAttempts.incrementAndGet() <= 6) {
                log.info("Retry create replicator.producer count: {}", createAttempts);
                createdProducer.closeAsync();
                throw new RuntimeException("mock error");
            }
            return createdProducer;
        });

        try {
            admin1.topics().createNonPartitionedTopic(topicName);
            final PersistentTopic persistentTopic =
                    (PersistentTopic) pulsar1.getBrokerService().getTopic(topicName, false).join().get();
            final PersistentReplicator replicator =
                    (PersistentReplicator) persistentTopic.getReplicators().values().iterator().next();
            Assert.assertFalse(replicator.isConnected());

            final String replicationCursorName = "pulsar.repl." + pulsar2.getConfig().getClusterName();
            final CursorCloseSignal closeSignal =
                    makeCursorClosingDelay(spyCursor(persistentTopic, replicationCursorName));

            Awaitility.await().pollInterval(Duration.ofMillis(100L)).atMost(Duration.ofSeconds(60L))
                    .untilAsserted(() -> Assert.assertTrue(createAttempts.get() >= 6,
                            "count of retry to create producer is " + createAttempts.get()));

            final CompletableFuture<Void> closeFuture = persistentTopic.close(true);
            Awaitility.await().atMost(Duration.ofSeconds(30L)).untilAsserted(() -> {
                String state = String.valueOf(replicator.getState());
                Assert.assertTrue(state.equals("Stopped") || state.equals("Terminated"));
            });

            // Fixed pause before the cursor close is allowed to proceed.
            Thread.sleep(4000L);
            log.info("Replicator.state: {}", replicator.getState());
            closeSignal.startClose();
            closeSignal.startCallback();
            closeFuture.join();

            Awaitility.await().pollInterval(Duration.ofSeconds(1L)).untilAsserted(() -> {
                PersistentTopic remoteTopic =
                        (PersistentTopic) pulsar2.getBrokerService().getTopic(topicName, false).join().get();
                Assert.assertEquals(remoteTopic.getProducers().size(), 0);
                Assert.assertFalse(replicator.isConnected());
            });
        } finally {
            // Always restore the real replication client, even when an assertion above failed.
            removeInjection.run();
        }

        cleanupTopics(() -> {
            admin1.topics().delete(topicName);
            admin2.topics().delete(topicName);
        });
    }

    /**
     * Creates a two-partition topic via {@code admin1}, sets its topic-level replication clusters to
     * {@code r1, r2}, and asserts that {@code admin2} reports two partitions. It then narrows the clusters
     * to {@code r1} and waits for the replicator of each partition to stop before deleting the topic on
     * both sides.
     */
    @Test
    public void testPartitionedTopicLevelReplication() throws Exception {
        final String partitionedTopic = BrokerTestUtil.newUniqueName("persistent://public/ns1/tp_");
        final String firstPartition = TopicName.get(partitionedTopic).getPartition(0).toString();
        final String secondPartition = TopicName.get(partitionedTopic).getPartition(1).toString();

        admin1.topics().createPartitionedTopic(partitionedTopic, 2);
        admin1.topics().setReplicationClusters(partitionedTopic, Arrays.asList("r1", "r2"));
        int remotePartitions = admin2.topics().getPartitionedTopicMetadata(partitionedTopic).partitions;
        Assert.assertEquals(remotePartitions, 2);

        admin1.topics().setReplicationClusters(partitionedTopic, Arrays.asList("r1"));
        waitReplicatorStopped(firstPartition);
        waitReplicatorStopped(secondPartition);

        admin1.topics().deletePartitionedTopic(partitionedTopic);
        admin2.topics().deletePartitionedTopic(partitionedTopic);
    }

    /**
     * Same flow as the basic topic-level replication test, but the two-partition topic is created on both
     * sides before replication to {@code r1, r2} is enabled. While replication is on, the topic is grown
     * to three partitions via {@code admin2}; replication is then narrowed to {@code r1}, and the
     * replicators of partitions 0 and 1 are awaited to stop.
     */
    @Test
    public void testPartitionedTopicLevelReplicationRemoteTopicExist() throws Exception {
        final String partitionedTopic = BrokerTestUtil.newUniqueName("persistent://public/ns1/tp_");
        final String firstPartition = TopicName.get(partitionedTopic).getPartition(0).toString();
        final String secondPartition = TopicName.get(partitionedTopic).getPartition(1).toString();

        admin1.topics().createPartitionedTopic(partitionedTopic, 2);
        admin2.topics().createPartitionedTopic(partitionedTopic, 2);
        admin1.topics().setReplicationClusters(partitionedTopic, Arrays.asList("r1", "r2"));
        Awaitility.await().untilAsserted(() ->
                Assert.assertEquals(admin2.topics().getPartitionedTopicMetadata(partitionedTopic).partitions, 2));

        admin2.topics().updatePartitionedTopic(partitionedTopic, 3);
        Awaitility.await().untilAsserted(() ->
                Assert.assertEquals(admin2.topics().getPartitionedTopicMetadata(partitionedTopic).partitions, 3));

        admin1.topics().setReplicationClusters(partitionedTopic, Arrays.asList("r1"));
        waitReplicatorStopped(firstPartition);
        waitReplicatorStopped(secondPartition);

        admin1.topics().deletePartitionedTopic(partitionedTopic);
        admin2.topics().deletePartitionedTopic(partitionedTopic);
    }

    /**
     * Creates the same partitioned topic with three partitions via {@code admin2} and two via
     * {@code admin1}, then expects enabling replication to {@code r1, r2} to fail with a message
     * containing {@code "with different partitions"}. Afterwards the {@code admin2} side must still have
     * three partitions and the {@code admin1} side exactly one replication cluster.
     */
    @Test
    public void testPartitionedTopicLevelReplicationRemoteConflictTopicExist() throws Exception {
        final String partitionedTopic = BrokerTestUtil.newUniqueName("persistent://public/ns1/tp_");
        admin2.topics().createPartitionedTopic(partitionedTopic, 3);
        admin1.topics().createPartitionedTopic(partitionedTopic, 2);

        try {
            admin1.topics().setReplicationClusters(partitionedTopic, Arrays.asList("r1", "r2"));
            // Assert.fail throws an Error, so it is not swallowed by the catch below.
            Assert.fail("Expected error due to a conflict partitioned topic already exists.");
        } catch (Exception e) {
            Throwable rootCause = FutureUtil.unwrapCompletionException(e);
            Assert.assertTrue(rootCause.getMessage().contains("with different partitions"));
        }

        int remotePartitions = admin2.topics().getPartitionedTopicMetadata(partitionedTopic).partitions;
        Assert.assertEquals(remotePartitions, 3);
        Assert.assertEquals(admin1.topics().getReplicationClusters(partitionedTopic, true).size(), 1);

        admin1.topics().deletePartitionedTopic(partitionedTopic);
        admin2.topics().deletePartitionedTopic(partitionedTopic);
    }

    /**
     * Namespace-level variant of {@code testConcurrencyOfUnloadBundleAndRecreateProducer}: the topic is
     * created in a fresh namespace first, and replication to {@code r1, r2} is enabled on that namespace
     * only after the failing producer decorator has been injected.
     *
     * <p>The test then delays the replication cursor's close, starts {@code close(true)} on the topic once
     * at least six producer creations were counted, waits for the replicator state to read
     * {@code "Disconnected"} or {@code "Terminated"}, releases the close, and verifies that the topic on
     * {@code pulsar2} has no producers and the replicator is not connected.
     *
     * <p>The injected spy client is removed in a {@code finally} block so a failure here cannot leak it.
     */
    @Test
    public void testConcurrencyOfUnloadBundleAndRecreateProducer2() throws Exception {
        final String namespace = "public/" + UUID.randomUUID().toString().replaceAll("-", "");
        final String topicName = BrokerTestUtil.newUniqueName("persistent://" + namespace + "/tp_");
        admin1.namespaces().createNamespace(namespace);
        admin2.namespaces().createNamespace(namespace);
        admin1.topics().createNonPartitionedTopic(topicName);
        final PersistentTopic persistentTopic =
                (PersistentTopic) pulsar1.getBrokerService().getTopic(topicName, false).join().get();

        final AtomicInteger createAttempts = new AtomicInteger();
        final Runnable removeInjection = injectMockReplicatorProducerBuilder((conf, createdProducer) -> {
            if (topicName.equals(conf.getTopicName()) && createAttempts.incrementAndGet() <= 6) {
                log.info("Retry create replicator.producer count: {}", createAttempts);
                createdProducer.closeAsync();
                throw new RuntimeException("mock error");
            }
            return createdProducer;
        });

        try {
            admin1.namespaces().setNamespaceReplicationClusters(namespace, Sets.newHashSet("r1", "r2"));
            final AtomicReference<PersistentReplicator> replicatorRef = new AtomicReference<>();
            Awaitility.await().untilAsserted(() -> {
                Assert.assertFalse(persistentTopic.getReplicators().isEmpty());
                replicatorRef.set(
                        (PersistentReplicator) persistentTopic.getReplicators().values().iterator().next());
                Assert.assertFalse(replicatorRef.get().isConnected());
            });

            final String replicationCursorName = "pulsar.repl." + pulsar2.getConfig().getClusterName();
            final CursorCloseSignal closeSignal =
                    makeCursorClosingDelay(spyCursor(persistentTopic, replicationCursorName));

            Awaitility.await().pollInterval(Duration.ofMillis(100L)).atMost(Duration.ofSeconds(60L))
                    .untilAsserted(() -> Assert.assertTrue(createAttempts.get() >= 6));

            final CompletableFuture<Void> closeFuture = persistentTopic.close(true);
            Awaitility.await().atMost(Duration.ofSeconds(30L)).untilAsserted(() -> {
                String state = String.valueOf(replicatorRef.get().getState());
                // Progress message inside a polling loop: INFO, not ERROR.
                log.info("replicator state: {}", state);
                // NOTE(review): the sibling test accepts "Stopped" where this one accepts
                // "Disconnected" — confirm which names the replicator's state type really defines.
                Assert.assertTrue(state.equals("Disconnected") || state.equals("Terminated"));
            });

            // Fixed pause before the cursor close is allowed to proceed.
            Thread.sleep(4000L);
            log.info("Replicator.state: {}", replicatorRef.get().getState());
            closeSignal.startClose();
            closeSignal.startCallback();
            closeFuture.join();

            Awaitility.await().pollInterval(Duration.ofSeconds(1L)).untilAsserted(() -> {
                PersistentTopic remoteTopic =
                        (PersistentTopic) pulsar2.getBrokerService().getTopic(topicName, false).join().get();
                Assert.assertEquals(remoteTopic.getProducers().size(), 0);
                Assert.assertFalse(replicatorRef.get().isConnected());
            });
        } finally {
            // Always restore the real replication client, even when an assertion above failed.
            removeInjection.run();
        }

        cleanupTopics(namespace, () -> {
            admin1.topics().delete(topicName);
            admin2.topics().delete(topicName);
        });
        admin1.namespaces().setNamespaceReplicationClusters(namespace, Sets.newHashSet("r1"));
        admin1.namespaces().deleteNamespace(namespace);
        admin2.namespaces().deleteNamespace(namespace);
    }

    /**
     * Makes a non-forced topic close fail by registering a mocked broker-side producer whose
     * {@code disconnect()} returns a failed future, then verifies that the topic stays usable:
     * <ul>
     *   <li>the client producer is still connected;</li>
     *   <li>the topic exposes a different replicator instance than before the failed close;</li>
     *   <li>the old replicator and its producer are not connected;</li>
     *   <li>the new replicator and its producer are connected.</li>
     * </ul>
     */
    @Test
    public void testUnFenceTopicToReuse() throws Exception {
        final String topicName = BrokerTestUtil.newUniqueName("persistent://public/default/tp");
        final Producer<String> clientProducer = client1.newProducer(Schema.STRING).topic(topicName).create();
        waitReplicatorStarted(topicName);

        // The mock must be the broker-side Producer (it has disconnect() and lives in the topic's
        // producer map); the simple name "Producer" in this file refers to the client API type.
        final String mockProducerName = UUID.randomUUID().toString();
        final org.apache.pulsar.broker.service.Producer failingProducer =
                Mockito.mock(org.apache.pulsar.broker.service.Producer.class);
        Mockito.doAnswer(invocation -> CompletableFuture.failedFuture(new RuntimeException("mocked error")))
                .when(failingProducer).disconnect();

        final PersistentTopic persistentTopic =
                (PersistentTopic) pulsar1.getBrokerService().getTopic(topicName, false).join().get();
        persistentTopic.getProducers().put(mockProducerName, failingProducer);
        final GeoPersistentReplicator oldReplicator =
                (GeoPersistentReplicator) persistentTopic.getReplicators().values().iterator().next();

        try {
            persistentTopic.close(false).join();
            // Assert.fail throws an Error, so it is not swallowed by the catch below.
            Assert.fail("Expected close fails due to a producer close fails");
        } catch (Exception e) {
            log.info("Expected error: {}", e.getMessage());
        }

        Awaitility.await().untilAsserted(() -> {
            Assert.assertTrue(clientProducer.isConnected());
            GeoPersistentReplicator newReplicator =
                    (GeoPersistentReplicator) persistentTopic.getReplicators().values().iterator().next();
            Assert.assertNotEquals(oldReplicator, newReplicator);
            Assert.assertFalse(oldReplicator.isConnected());
            Assert.assertFalse(oldReplicator.producer != null && oldReplicator.producer.isConnected());
            Assert.assertTrue(newReplicator.isConnected());
            Assert.assertTrue(newReplicator.producer != null && newReplicator.producer.isConnected());
        });

        // Remove the injected producer before closing the real one and deleting the topic.
        persistentTopic.getProducers().remove(mockProducerName, failingProducer);
        clientProducer.close();
        cleanupTopics(() -> {
            admin1.topics().delete(topicName);
            admin2.topics().delete(topicName);
        });
    }

    /**
     * Verifies replication on a new non-partitioned topic, restricts its topic-level clusters to
     * {@code r1} on the first side and {@code r2} on the second, deletes the topic via both admins, and
     * asserts that neither side's topic resources still report it as existing.
     */
    @Test
    public void testDeleteNonPartitionedTopic() throws Exception {
        final String topicName = BrokerTestUtil.newUniqueName("persistent://public/default/tp_");
        admin1.topics().createNonPartitionedTopic(topicName);
        verifyReplicationWorks(topicName);

        setTopicLevelClusters(topicName, Arrays.asList("r1"), admin1, pulsar1);
        setTopicLevelClusters(topicName, Arrays.asList("r2"), admin2, pulsar2);

        admin1.topics().delete(topicName);
        admin2.topics().delete(topicName);

        boolean existsOnFirst = pulsar1.getPulsarResources().getTopicResources()
                .persistentTopicExists(TopicName.get(topicName)).join();
        Assert.assertFalse(existsOnFirst);
        boolean existsOnSecond = pulsar2.getPulsarResources().getTopicResources()
                .persistentTopicExists(TopicName.get(topicName)).join();
        Assert.assertFalse(existsOnSecond);
    }

    /**
     * Verifies replication on a new two-partition topic, restricts its topic-level clusters to {@code r1}
     * on the first side and {@code r2} on the second, and deletes it. The second-side delete is skipped
     * when {@code usingGlobalZK} is set. The partitioned-topic metadata must then be gone on both sides;
     * when {@code usingGlobalZK} is not set, each individual partition must be gone on both sides too.
     */
    @Test
    public void testDeletePartitionedTopic() throws Exception {
        final String topicName = BrokerTestUtil.newUniqueName("persistent://public/default/tp_");
        admin1.topics().createPartitionedTopic(topicName, 2);
        verifyReplicationWorks(topicName);

        setTopicLevelClusters(topicName, Arrays.asList("r1"), admin1, pulsar1);
        setTopicLevelClusters(topicName, Arrays.asList("r2"), admin2, pulsar2);

        admin1.topics().deletePartitionedTopic(topicName);
        if (!usingGlobalZK) {
            admin2.topics().deletePartitionedTopic(topicName);
        }

        Assert.assertFalse(pulsar1.getPulsarResources().getNamespaceResources().getPartitionedTopicResources()
                .partitionedTopicExists(TopicName.get(topicName)));
        Assert.assertFalse(pulsar2.getPulsarResources().getNamespaceResources().getPartitionedTopicResources()
                .partitionedTopicExists(TopicName.get(topicName)));

        if (!usingGlobalZK) {
            // Same order as before: partition 0 on both sides, then partition 1 on both sides.
            for (int partition = 0; partition < 2; partition++) {
                boolean existsOnFirst = pulsar1.getPulsarResources().getTopicResources()
                        .persistentTopicExists(TopicName.get(topicName).getPartition(partition)).join();
                Assert.assertFalse(existsOnFirst);
                boolean existsOnSecond = pulsar2.getPulsarResources().getTopicResources()
                        .persistentTopicExists(TopicName.get(topicName).getPartition(partition)).join();
                Assert.assertFalse(existsOnSecond);
            }
        }
    }

    /**
     * After restricting the topic-level clusters to {@code r1} on the first side and {@code r2} on the
     * second, grows the topic from two to three partitions via {@code admin1} and checks, after a fixed
     * three-second pause, that {@code admin2} still reports two partitions.
     */
    @Test
    public void testNoExpandTopicPartitionsWhenDisableTopicLevelReplication() throws Exception {
        final String topicName = BrokerTestUtil.newUniqueName("persistent://public/default/tp_");
        admin1.topics().createPartitionedTopic(topicName, 2);
        verifyReplicationWorks(topicName);

        setTopicLevelClusters(topicName, Arrays.asList("r1"), admin1, pulsar1);
        setTopicLevelClusters(topicName, Arrays.asList("r2"), admin2, pulsar2);

        admin1.topics().updatePartitionedTopic(topicName, 3);
        int localPartitions = admin1.topics().getPartitionedTopicMetadata(topicName).partitions;
        Assert.assertEquals(localPartitions, 3);

        // Fixed pause, then confirm the other side's partition count is unchanged.
        Thread.sleep(3000L);
        int remotePartitions = admin2.topics().getPartitionedTopicMetadata(topicName).partitions;
        Assert.assertEquals(remotePartitions, 2);

        cleanupTopics(() -> {
            admin1.topics().deletePartitionedTopic(topicName, false);
            admin2.topics().deletePartitionedTopic(topicName, false);
        });
    }

    /**
     * Grows a replicated two-partition topic to three partitions via {@code admin1} and waits until
     * {@code admin2} reports three partitions as well. No topic-level cluster override is applied in
     * this test.
     */
    @Test
    public void testExpandTopicPartitionsOnNamespaceLevelReplication() throws Exception {
        final String topicName = BrokerTestUtil.newUniqueName("persistent://public/default/tp_");
        admin1.topics().createPartitionedTopic(topicName, 2);
        verifyReplicationWorks(topicName);

        admin1.topics().updatePartitionedTopic(topicName, 3);
        int localPartitions = admin1.topics().getPartitionedTopicMetadata(topicName).partitions;
        Assert.assertEquals(localPartitions, 3);
        Awaitility.await().untilAsserted(() ->
                Assert.assertEquals(admin2.topics().getPartitionedTopicMetadata(topicName).partitions, 3));

        cleanupTopics(() -> {
            admin1.topics().deletePartitionedTopic(topicName, false);
            admin2.topics().deletePartitionedTopic(topicName, false);
        });
    }
}
