package org.apache.pulsar.broker.service;

import java.util.Arrays;
import java.util.HashSet;
import java.util.Optional;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
import org.apache.pulsar.broker.BrokerTestUtil;
import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.pulsar.broker.service.persistent.PersistentTopic;
import org.apache.pulsar.client.admin.PulsarAdminException;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.policies.data.RetentionPolicies;
import org.apache.pulsar.zookeeper.LocalBookkeeperEnsemble;
import org.apache.pulsar.zookeeper.ZookeeperServerTest;
import org.awaitility.Awaitility;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testng.Assert;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;

@Test(groups = {"broker"})
/* loaded from: input_file:org/apache/pulsar/broker/service/DisabledCreateTopicToRemoteClusterForReplicationTest.class */
public class DisabledCreateTopicToRemoteClusterForReplicationTest extends OneWayReplicatorTestBase {
    private static final Logger log = LoggerFactory.getLogger(DisabledCreateTopicToRemoteClusterForReplicationTest.class);

    @Override // org.apache.pulsar.broker.service.OneWayReplicatorTestBase
    @BeforeClass(alwaysRun = true, timeOut = 300000)
    public void setup() throws Exception {
        super.setup();
        this.admin1.namespaces().setRetention("public/default", new RetentionPolicies(300, 1024L));
        this.admin2.namespaces().setRetention("public/default", new RetentionPolicies(300, 1024L));
        this.admin1.namespaces().setRetention("public/ns1", new RetentionPolicies(300, 1024L));
        this.admin2.namespaces().setRetention("public/ns1", new RetentionPolicies(300, 1024L));
    }

    @Override // org.apache.pulsar.broker.service.OneWayReplicatorTestBase
    @AfterClass(alwaysRun = true, timeOut = 300000)
    public void cleanup() throws Exception {
        super.cleanup();
    }

    /* JADX INFO: Access modifiers changed from: protected */
    @Override // org.apache.pulsar.broker.service.OneWayReplicatorTestBase
    public void setConfigDefaults(ServiceConfiguration serviceConfiguration, String str, LocalBookkeeperEnsemble localBookkeeperEnsemble, ZookeeperServerTest zookeeperServerTest) {
        super.setConfigDefaults(serviceConfiguration, str, localBookkeeperEnsemble, zookeeperServerTest);
        serviceConfiguration.setCreateTopicToRemoteClusterForReplication(false);
        serviceConfiguration.setReplicationStartAt("earliest");
    }

    @Test
    public void testCreatePartitionedTopicWithNsReplication() throws Exception {
        String str = "public/" + UUID.randomUUID().toString().replace("-", "");
        this.admin1.namespaces().createNamespace(str);
        this.admin2.namespaces().createNamespace(str);
        this.admin1.namespaces().setRetention(str, new RetentionPolicies(3600, -1L));
        this.admin2.namespaces().setRetention(str, new RetentionPolicies(3600, -1L));
        String newUniqueName = BrokerTestUtil.newUniqueName("persistent://" + str + "/tp_");
        String topicName = TopicName.get(newUniqueName).getPartition(0).toString();
        this.admin1.topics().createPartitionedTopic(newUniqueName, 1);
        this.admin1.namespaces().setNamespaceReplicationClusters(str, new HashSet(Arrays.asList("r1", "r2")));
        Producer create = this.client1.newProducer(Schema.STRING).topic(newUniqueName).create();
        create.send("msg-1");
        create.close();
        Awaitility.await().untilAsserted(() -> {
            Assert.assertFalse(((PersistentTopic) ((Optional) this.broker1.getTopic(topicName, false).join()).get()).getReplicators().isEmpty());
        });
        try {
            this.admin2.topics().getPartitionedTopicMetadata(newUniqueName);
            Assert.fail("Expected a not found ex");
        } catch (PulsarAdminException.NotFoundException e) {
        }
        this.admin2.topics().createPartitionedTopic(newUniqueName, 1);
        Consumer subscribe = this.client2.newConsumer(Schema.STRING).topic(new String[]{newUniqueName}).isAckReceiptEnabled(true).subscriptionName("s1").subscribe();
        Assert.assertEquals((String) subscribe.receive(10, TimeUnit.SECONDS).getValue(), "msg-1");
        subscribe.close();
        this.admin1.namespaces().setNamespaceReplicationClusters(str, new HashSet(Arrays.asList("r1")));
        Awaitility.await().untilAsserted(() -> {
            Assert.assertTrue(((PersistentTopic) ((Optional) this.broker1.getTopic(topicName, false).join()).get()).getReplicators().isEmpty());
        });
        this.admin1.topics().deletePartitionedTopic(newUniqueName, false);
        this.admin2.topics().deletePartitionedTopic(newUniqueName, false);
        this.admin1.namespaces().deleteNamespace(str);
        this.admin2.namespaces().deleteNamespace(str);
    }

    @Test
    public void testEnableTopicReplication() throws Exception {
        String newUniqueName = BrokerTestUtil.newUniqueName("persistent://" + "public/ns1" + "/tp_");
        String topicName = TopicName.get(newUniqueName).getPartition(0).toString();
        this.admin1.topics().createPartitionedTopic(newUniqueName, 1);
        this.admin1.topics().setReplicationClusters(newUniqueName, Arrays.asList("r1", "r2"));
        Producer create = this.client1.newProducer(Schema.STRING).topic(newUniqueName).create();
        create.send("msg-1");
        create.close();
        Awaitility.await().untilAsserted(() -> {
            Assert.assertFalse(((PersistentTopic) ((Optional) this.broker1.getTopic(topicName, false).join()).get()).getReplicators().isEmpty());
        });
        try {
            this.admin2.topics().getPartitionedTopicMetadata(newUniqueName);
            Assert.fail("Expected a not found ex");
        } catch (PulsarAdminException.NotFoundException e) {
        }
        this.admin2.topics().createPartitionedTopic(newUniqueName, 1);
        waitReplicatorStarted(topicName);
        this.admin1.topics().setReplicationClusters(newUniqueName, Arrays.asList("r1"));
        Awaitility.await().untilAsserted(() -> {
            Assert.assertTrue(((PersistentTopic) ((Optional) this.broker1.getTopic(topicName, false).join()).get()).getReplicators().isEmpty());
        });
        this.admin1.topics().deletePartitionedTopic(newUniqueName, false);
        this.admin2.topics().deletePartitionedTopic(newUniqueName, false);
    }

    @Test
    public void testNonPartitionedTopic() throws Exception {
        String newUniqueName = BrokerTestUtil.newUniqueName("persistent://" + "public/ns1" + "/tp_");
        this.admin1.topics().createNonPartitionedTopic(newUniqueName);
        this.admin1.topics().setReplicationClusters(newUniqueName, Arrays.asList("r1", "r2"));
        Producer create = this.client1.newProducer(Schema.STRING).topic(newUniqueName).create();
        create.send("msg-1");
        create.close();
        Awaitility.await().untilAsserted(() -> {
            Assert.assertFalse(((PersistentTopic) ((Optional) this.broker1.getTopic(newUniqueName, false).join()).get()).getReplicators().isEmpty());
        });
        try {
            this.admin2.topics().getPartitionedTopicMetadata(newUniqueName);
            Assert.fail("Expected a not found ex");
        } catch (PulsarAdminException.NotFoundException e) {
        }
        this.admin2.topics().createNonPartitionedTopic(newUniqueName);
        waitReplicatorStarted(newUniqueName);
        this.admin1.topics().setReplicationClusters(newUniqueName, Arrays.asList("r1"));
        Awaitility.await().untilAsserted(() -> {
            Assert.assertTrue(((PersistentTopic) ((Optional) this.broker1.getTopic(newUniqueName, false).join()).get()).getReplicators().isEmpty());
        });
        this.admin1.topics().delete(newUniqueName, false);
        this.admin2.topics().delete(newUniqueName, false);
    }
}
