package org.apache.pulsar.broker.service;

import com.google.common.collect.Sets;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.UUID;
import org.apache.pulsar.broker.service.persistent.PersistentTopic;
import org.apache.pulsar.client.admin.PulsarAdmin;
import org.apache.pulsar.client.admin.PulsarAdminException;
import org.apache.pulsar.client.api.SubscriptionType;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.policies.data.AutoSubscriptionCreationOverride;
import org.apache.pulsar.common.policies.data.DelayedDeliveryPolicies;
import org.apache.pulsar.common.policies.data.DispatchRate;
import org.apache.pulsar.common.policies.data.InactiveTopicDeleteMode;
import org.apache.pulsar.common.policies.data.InactiveTopicPolicies;
import org.apache.pulsar.common.policies.data.OffloadPoliciesImpl;
import org.apache.pulsar.common.policies.data.OffloadedReadPriority;
import org.apache.pulsar.common.policies.data.PersistencePolicies;
import org.apache.pulsar.common.policies.data.PublishRate;
import org.apache.pulsar.common.policies.data.RetentionPolicies;
import org.apache.pulsar.common.policies.data.SubscribeRate;
import org.apache.pulsar.common.policies.data.impl.BacklogQuotaImpl;
import org.awaitility.Awaitility;
import org.testng.Assert;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;

@Test(groups = {"broker"})
/* loaded from: input_file:org/apache/pulsar/broker/service/ReplicatorTopicPoliciesTest.class */
public class ReplicatorTopicPoliciesTest extends ReplicatorTestBase {

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/pulsar/broker/service/ReplicatorTopicPoliciesTest$ThrowingConsumer.class */
    /**
     * Consumer-style callback whose body is allowed to throw any {@link Throwable}.
     *
     * <p>Used as a lambda target by {@code untilRemoteClustersAsserted}, so that assertion lambdas can
     * call methods declaring checked exceptions without wrapping them.
     *
     * @param <I> type of the value handed to the callback
     */
    @FunctionalInterface
    public interface ThrowingConsumer<I> {
        /**
         * Runs the callback against the supplied value.
         *
         * @param i the value to operate on
         * @throws Throwable anything raised by the callback body
         */
        void apply(I i) throws Throwable;
    }

    /**
     * Sets the default number of namespace bundles to 1 on {@code config1}, {@code config2} and
     * {@code config3}, then delegates to the base-class setup.
     *
     * @throws Exception if the base-class setup fails
     */
    @Override // org.apache.pulsar.broker.service.ReplicatorTestBase
    @BeforeClass(alwaysRun = true, timeOut = 300000)
    public void setup() throws Exception {
        // Same setting on every config object, applied before the base-class setup is invoked.
        Arrays.asList(config1, config2, config3)
                .forEach(clusterConfig -> clusterConfig.setDefaultNumberOfNamespaceBundles(1));
        super.setup();
    }

    /**
     * Class-level teardown hook; does nothing beyond delegating to the base-class cleanup.
     *
     * @throws Exception if the base-class cleanup fails
     */
    @Override // org.apache.pulsar.broker.service.ReplicatorTestBase
    @AfterClass(alwaysRun = true, timeOut = 300000)
    public void cleanup() throws Exception {
        super.cleanup();
    }

    /**
     * Checks visibility of a backlog quota on {@code admin2} and {@code admin3} after it is written
     * through {@code admin1}.
     *
     * <p>Written via {@code topicPolicies()}, the remote quota maps must stay empty. Written via
     * {@code topicPolicies(true)}, both remote maps must eventually contain the quota, and must be empty
     * again once it is removed.
     *
     * @throws Exception if {@code init} or any admin call fails
     */
    @Test
    public void testReplicateQuotaTopicPolicies() throws Exception {
        final String namespace = "pulsar/partitionedNs-" + UUID.randomUUID();
        final String topic = "persistent://" + namespace + "/topic" + UUID.randomUUID();
        init(namespace, topic);

        final BacklogQuotaImpl quota = new BacklogQuotaImpl();
        quota.setLimitSize(1L);
        quota.setLimitTime(2);

        // Phase 1: written through topicPolicies() -> remote maps stay empty.
        admin1.topicPolicies().setBacklogQuota(topic, quota);
        untilRemoteClustersAsserted(remote ->
                Assert.assertEquals(remote.topicPolicies().getBacklogQuotaMap(topic).size(), 0));

        // Phase 2: written through topicPolicies(true) -> both remotes eventually hold the quota.
        admin1.topicPolicies(true).setBacklogQuota(topic, quota);
        untilRemoteClustersAsserted(remote ->
                Assert.assertTrue(remote.topicPolicies(true).getBacklogQuotaMap(topic).containsValue(quota)));

        // Phase 3: removal is observed on both remotes as well.
        admin1.topicPolicies(true).removeBacklogQuota(topic);
        untilRemoteClustersAsserted(remote ->
                Assert.assertEquals(remote.topicPolicies(true).getBacklogQuotaMap(topic).size(), 0));
    }

    /**
     * Checks visibility of a message TTL on {@code admin2} and {@code admin3} after it is written
     * through {@code admin1}.
     *
     * <p>A TTL set via {@code topicPolicies()} must read as {@code null} remotely; a TTL set via
     * {@code topicPolicies(true)} must eventually be read back as 10 and read {@code null} after removal.
     *
     * @throws Exception if {@code init} or any admin call fails
     */
    @Test
    public void testReplicateMessageTTLPolicies() throws Exception {
        final String namespace = "pulsar/partitionedNs-" + UUID.randomUUID();
        final String topic = "persistent://" + namespace + "/topic" + UUID.randomUUID();
        final int ttl = 10;
        init(namespace, topic);

        admin1.topicPolicies().setMessageTTL(topic, ttl);
        untilRemoteClustersAsserted(remote -> Assert.assertNull(remote.topicPolicies().getMessageTTL(topic)));

        admin1.topicPolicies(true).setMessageTTL(topic, ttl);
        // ignoreExceptions(): presumably tolerates intValue() failing while the getter still returns null.
        Awaitility.await().ignoreExceptions().untilAsserted(
                () -> Assert.assertEquals(admin2.topicPolicies(true).getMessageTTL(topic).intValue(), ttl));
        Awaitility.await().ignoreExceptions().untilAsserted(
                () -> Assert.assertEquals(admin3.topicPolicies(true).getMessageTTL(topic).intValue(), ttl));

        admin1.topicPolicies(true).removeMessageTTL(topic);
        untilRemoteClustersAsserted(remote -> Assert.assertNull(remote.topicPolicies(true).getMessageTTL(topic)));
    }

    /**
     * Checks visibility of a subscribe rate on the remote admins after it is written through
     * {@code admin1}: {@code null} when set via {@code topicPolicies()}, equal to the written value when
     * set via {@code topicPolicies(true)}, and {@code null} again after removal.
     *
     * @throws Exception if {@code init} or any admin call fails
     */
    @Test
    public void testReplicateSubscribeRatePolicies() throws Exception {
        final String namespace = "pulsar/partitionedNs-" + UUID.randomUUID();
        final String topic = "persistent://" + namespace + "/topic" + UUID.randomUUID();
        init(namespace, topic);
        final SubscribeRate rate = new SubscribeRate(100, 10000);

        admin1.topicPolicies().setSubscribeRate(topic, rate);
        untilRemoteClustersAsserted(remote -> Assert.assertNull(remote.topicPolicies().getSubscribeRate(topic)));

        admin1.topicPolicies(true).setSubscribeRate(topic, rate);
        untilRemoteClustersAsserted(
                remote -> Assert.assertEquals(remote.topicPolicies(true).getSubscribeRate(topic), rate));

        admin1.topicPolicies(true).removeSubscribeRate(topic);
        untilRemoteClustersAsserted(
                remote -> Assert.assertNull(remote.topicPolicies(true).getSubscribeRate(topic)));
    }

    /**
     * Checks visibility of a max-message-size policy on the remote admins after it is written through
     * {@code admin1}: {@code null} when set via {@code topicPolicies()}, 1000 when set via
     * {@code topicPolicies(true)}, and {@code null} again after removal.
     *
     * @throws Exception if {@code init} or any admin call fails
     */
    @Test
    public void testReplicateMaxMessageSizePolicies() throws Exception {
        final String namespace = "pulsar/partitionedNs-" + UUID.randomUUID();
        final String topic = "persistent://" + namespace + "/topic" + UUID.randomUUID();
        final int maxSize = 1000;
        init(namespace, topic);

        admin1.topicPolicies().setMaxMessageSize(topic, maxSize);
        untilRemoteClustersAsserted(
                remote -> Assert.assertNull(remote.topicPolicies().getMaxMessageSize(topic)));

        admin1.topicPolicies(true).setMaxMessageSize(topic, maxSize);
        untilRemoteClustersAsserted(
                remote -> Assert.assertEquals(remote.topicPolicies(true).getMaxMessageSize(topic), maxSize));

        admin1.topicPolicies(true).removeMaxMessageSize(topic);
        untilRemoteClustersAsserted(
                remote -> Assert.assertNull(remote.topicPolicies(true).getMaxMessageSize(topic)));
    }

    /**
     * Checks visibility of a publish rate on the remote admins after it is written through
     * {@code admin1}: {@code null} when set via {@code topicPolicies()}, equal to the written value when
     * set via {@code topicPolicies(true)}, and {@code null} again after removal.
     *
     * @throws Exception if {@code init} or any admin call fails
     */
    @Test
    public void testReplicatePublishRatePolicies() throws Exception {
        final String namespace = "pulsar/partitionedNs-" + UUID.randomUUID();
        final String topic = "persistent://" + namespace + "/topic" + UUID.randomUUID();
        init(namespace, topic);
        final PublishRate rate = new PublishRate(100, 10000L);

        admin1.topicPolicies().setPublishRate(topic, rate);
        untilRemoteClustersAsserted(remote -> Assert.assertNull(remote.topicPolicies().getPublishRate(topic)));

        admin1.topicPolicies(true).setPublishRate(topic, rate);
        untilRemoteClustersAsserted(
                remote -> Assert.assertEquals(remote.topicPolicies(true).getPublishRate(topic), rate));

        admin1.topicPolicies(true).removePublishRate(topic);
        untilRemoteClustersAsserted(remote -> Assert.assertNull(remote.topicPolicies(true).getPublishRate(topic)));
    }

    /**
     * Checks visibility of a deduplication-snapshot interval on the remote admins after it is written
     * through {@code admin1}: {@code null} when set via {@code topicPolicies()}, 100 when set via
     * {@code topicPolicies(true)}, and {@code null} again after removal.
     *
     * @throws Exception if {@code init} or any admin call fails
     */
    @Test
    public void testReplicateDeduplicationSnapshotIntervalPolicies() throws Exception {
        final String namespace = "pulsar/partitionedNs-" + UUID.randomUUID();
        final String topic = "persistent://" + namespace + "/topic" + UUID.randomUUID();
        final int interval = 100;
        init(namespace, topic);

        admin1.topicPolicies().setDeduplicationSnapshotInterval(topic, interval);
        untilRemoteClustersAsserted(
                remote -> Assert.assertNull(remote.topicPolicies().getDeduplicationSnapshotInterval(topic)));

        admin1.topicPolicies(true).setDeduplicationSnapshotInterval(topic, interval);
        untilRemoteClustersAsserted(remote ->
                Assert.assertEquals(remote.topicPolicies(true).getDeduplicationSnapshotInterval(topic), interval));

        admin1.topicPolicies(true).removeDeduplicationSnapshotInterval(topic);
        untilRemoteClustersAsserted(
                remote -> Assert.assertNull(remote.topicPolicies(true).getDeduplicationSnapshotInterval(topic)));
    }

    /**
     * Waits, with Awaitility's default settings, until the given assertion passes against
     * {@code admin2} and then against {@code admin3}.
     *
     * @param throwingConsumer assertion to run against each remote admin client
     */
    private void untilRemoteClustersAsserted(ThrowingConsumer<PulsarAdmin> throwingConsumer) {
        // Sequential on purpose: admin3 is only polled once admin2 has satisfied the assertion.
        for (PulsarAdmin remoteAdmin : Arrays.asList(admin2, admin3)) {
            Awaitility.await().untilAsserted(() -> throwingConsumer.apply(remoteAdmin));
        }
    }

    /**
     * Checks visibility of persistence policies on the remote admins after they are written through
     * {@code admin1}: {@code null} when set via {@code topicPolicies()}, equal to the written value when
     * set via {@code topicPolicies(true)}, and {@code null} again after removal.
     *
     * @throws Exception if {@code init} or any admin call fails
     */
    @Test
    public void testReplicatePersistentPolicies() throws Exception {
        final String namespace = "pulsar/partitionedNs-" + UUID.randomUUID();
        final String topic = "persistent://" + namespace + "/topic" + UUID.randomUUID();
        init(namespace, topic);
        final PersistencePolicies persistence = new PersistencePolicies(5, 3, 2, 1000.0d);

        admin1.topicPolicies().setPersistence(topic, persistence);
        untilRemoteClustersAsserted(remote -> Assert.assertNull(remote.topicPolicies().getPersistence(topic)));

        admin1.topicPolicies(true).setPersistence(topic, persistence);
        untilRemoteClustersAsserted(
                remote -> Assert.assertEquals(remote.topicPolicies(true).getPersistence(topic), persistence));

        admin1.topicPolicies(true).removePersistence(topic);
        untilRemoteClustersAsserted(remote -> Assert.assertNull(remote.topicPolicies(true).getPersistence(topic)));
    }

    /**
     * Checks visibility of the deduplication status on the remote admins after it is written through
     * {@code admin1}: {@code null} when set via {@code topicPolicies()}, {@code true} when set via
     * {@code topicPolicies(true)}, and {@code null} again after removal.
     *
     * @throws Exception if {@code init} or any admin call fails
     */
    @Test
    public void testReplicateDeduplicationStatusPolicies() throws Exception {
        final String namespace = "pulsar/partitionedNs-" + UUID.randomUUID();
        final String topic = "persistent://" + namespace + "/topic" + UUID.randomUUID();
        init(namespace, topic);

        admin1.topicPolicies().setDeduplicationStatus(topic, true);
        untilRemoteClustersAsserted(
                remote -> Assert.assertNull(remote.topicPolicies().getDeduplicationStatus(topic)));

        admin1.topicPolicies(true).setDeduplicationStatus(topic, true);
        // ignoreExceptions(): presumably tolerates booleanValue() failing while the getter still returns null.
        Awaitility.await().ignoreExceptions().untilAsserted(
                () -> Assert.assertTrue(admin2.topicPolicies(true).getDeduplicationStatus(topic).booleanValue()));
        Awaitility.await().ignoreExceptions().untilAsserted(
                () -> Assert.assertTrue(admin3.topicPolicies(true).getDeduplicationStatus(topic).booleanValue()));

        admin1.topicPolicies(true).removeDeduplicationStatus(topic);
        untilRemoteClustersAsserted(
                remote -> Assert.assertNull(remote.topicPolicies(true).getDeduplicationStatus(topic)));
    }

    /**
     * Checks visibility of a max-producers limit on the remote admins after it is written through
     * {@code admin1}: {@code null} when set via {@code topicPolicies()}, 100 when set via
     * {@code topicPolicies(true)}, and {@code null} again after removal.
     *
     * @throws Exception if {@code init} or any admin call fails
     */
    @Test
    public void testReplicatorMaxProducer() throws Exception {
        final String namespace = "pulsar/partitionedNs-" + UUID.randomUUID();
        final String topic = "persistent://" + namespace + "/topic" + UUID.randomUUID();
        final int limit = 100;
        init(namespace, topic);

        admin1.topicPolicies().setMaxProducers(topic, limit);
        untilRemoteClustersAsserted(remote -> Assert.assertNull(remote.topicPolicies().getMaxProducers(topic)));

        admin1.topicPolicies(true).setMaxProducers(topic, limit);
        // ignoreExceptions(): presumably tolerates intValue() failing while the getter still returns null.
        Awaitility.await().ignoreExceptions().untilAsserted(
                () -> Assert.assertEquals(admin2.topicPolicies(true).getMaxProducers(topic).intValue(), limit));
        Awaitility.await().ignoreExceptions().untilAsserted(
                () -> Assert.assertEquals(admin3.topicPolicies(true).getMaxProducers(topic).intValue(), limit));

        admin1.topicPolicies(true).removeMaxProducers(topic);
        untilRemoteClustersAsserted(remote -> Assert.assertNull(remote.topicPolicies(true).getMaxProducers(topic)));
    }

    /**
     * Checks visibility of a max-consumers-per-subscription limit after it is written through
     * {@code admin1}: {@code null} on the remote admins when set via {@code topicPolicies()}; 100 on
     * {@code admin2}, {@code admin3} and {@code admin1} itself when set via {@code topicPolicies(true)};
     * and {@code null} on the remote admins again after removal.
     *
     * @throws Exception if {@code init} or any admin call fails
     */
    @Test
    public void testReplicatorMaxConsumerPerSubPolicies() throws Exception {
        final String namespace = "pulsar/partitionedNs-" + UUID.randomUUID();
        final String topic = "persistent://" + namespace + "/topic" + UUID.randomUUID();
        final int limit = 100;
        init(namespace, topic);

        admin1.topicPolicies().setMaxConsumersPerSubscription(topic, limit);
        untilRemoteClustersAsserted(
                remote -> Assert.assertNull(remote.topicPolicies().getMaxConsumersPerSubscription(topic)));

        admin1.topicPolicies(true).setMaxConsumersPerSubscription(topic, limit);
        // ignoreExceptions(): presumably tolerates intValue() failing while the getter still returns null.
        Awaitility.await().ignoreExceptions().untilAsserted(() -> Assert.assertEquals(
                admin2.topicPolicies(true).getMaxConsumersPerSubscription(topic).intValue(), limit));
        Awaitility.await().ignoreExceptions().untilAsserted(() -> Assert.assertEquals(
                admin3.topicPolicies(true).getMaxConsumersPerSubscription(topic).intValue(), limit));
        // The originating admin is checked too, this time without ignoring exceptions.
        Awaitility.await().untilAsserted(() -> Assert.assertEquals(
                admin1.topicPolicies(true).getMaxConsumersPerSubscription(topic).intValue(), limit));

        admin1.topicPolicies(true).removeMaxConsumersPerSubscription(topic);
        untilRemoteClustersAsserted(
                remote -> Assert.assertNull(remote.topicPolicies(true).getMaxConsumersPerSubscription(topic)));
    }

    /**
     * Checks visibility of a max-unacked-messages-on-consumer limit on the remote admins after it is
     * written through {@code admin1}: {@code null} when set via {@code topicPolicies()}, 100 when set via
     * {@code topicPolicies(true)}, and {@code null} again after removal.
     *
     * @throws Exception if {@code init} or any admin call fails
     */
    @Test
    public void testReplicateMaxUnackedMsgPerConsumer() throws Exception {
        final String namespace = "pulsar/partitionedNs-" + UUID.randomUUID();
        final String topic = "persistent://" + namespace + "/topic" + UUID.randomUUID();
        final int limit = 100;
        init(namespace, topic);

        admin1.topicPolicies().setMaxUnackedMessagesOnConsumer(topic, limit);
        untilRemoteClustersAsserted(
                remote -> Assert.assertNull(remote.topicPolicies().getMaxUnackedMessagesOnConsumer(topic)));

        admin1.topicPolicies(true).setMaxUnackedMessagesOnConsumer(topic, limit);
        // ignoreExceptions(): presumably tolerates intValue() failing while the getter still returns null.
        Awaitility.await().ignoreExceptions().untilAsserted(() -> Assert.assertEquals(
                admin2.topicPolicies(true).getMaxUnackedMessagesOnConsumer(topic).intValue(), limit));
        Awaitility.await().ignoreExceptions().untilAsserted(() -> Assert.assertEquals(
                admin3.topicPolicies(true).getMaxUnackedMessagesOnConsumer(topic).intValue(), limit));

        admin1.topicPolicies(true).removeMaxUnackedMessagesOnConsumer(topic);
        untilRemoteClustersAsserted(
                remote -> Assert.assertNull(remote.topicPolicies(true).getMaxUnackedMessagesOnConsumer(topic)));
    }

    /**
     * Checks visibility of retention policies after they are written through {@code admin1}:
     * {@code null} on the remote admins when set via {@code topicPolicies()}; equal to the written value
     * on {@code admin2}, {@code admin3} and {@code admin1} when set via {@code topicPolicies(true)}; and
     * {@code null} on the remote admins again after removal.
     *
     * @throws Exception if {@code init} or any admin call fails
     */
    @Test
    public void testReplicatorTopicPolicies() throws Exception {
        final String namespace = "pulsar/partitionedNs-" + UUID.randomUUID();
        final String topic = "persistent://" + namespace + "/topic" + UUID.randomUUID();
        init(namespace, topic);
        final RetentionPolicies retention = new RetentionPolicies(1, 1L);

        admin1.topicPolicies().setRetention(topic, retention);
        untilRemoteClustersAsserted(remote -> Assert.assertNull(remote.topicPolicies().getRetention(topic)));

        admin1.topicPolicies(true).setRetention(topic, retention);
        untilRemoteClustersAsserted(
                remote -> Assert.assertEquals(remote.topicPolicies(true).getRetention(topic), retention));
        // The originating admin must report the same value.
        Awaitility.await().untilAsserted(
                () -> Assert.assertEquals(admin1.topicPolicies(true).getRetention(topic), retention));

        admin1.topicPolicies(true).removeRetention(topic);
        untilRemoteClustersAsserted(remote -> Assert.assertNull(remote.topicPolicies(true).getRetention(topic)));
    }

    /**
     * Checks visibility of the enabled-subscription-types policy on {@code admin2} and {@code admin3}
     * after it is written through {@code admin1}.
     *
     * <p>Set via {@code topicPolicies()}, the remote getters must return {@code null}. Set via
     * {@code topicPolicies(true)}, both remotes must eventually return the same set; after removal they
     * must return an empty set.
     *
     * @throws Exception if {@code init} or any admin call fails
     */
    @Test
    public void testReplicateSubscriptionTypesPolicies() throws Exception {
        final String namespace = "pulsar/partitionedNs-" + UUID.randomUUID();
        final String topic = "persistent://" + namespace + "/topic" + UUID.randomUUID();
        init(namespace, topic);

        // Parameterized (was a raw HashSet) so the setter calls are type-checked rather than unchecked.
        final HashSet<SubscriptionType> enabledTypes = new HashSet<>();
        enabledTypes.add(SubscriptionType.Shared);

        admin1.topicPolicies().setSubscriptionTypesEnabled(topic, enabledTypes);
        Awaitility.await().untilAsserted(
                () -> Assert.assertNull(admin2.topicPolicies().getSubscriptionTypesEnabled(topic)));
        Awaitility.await().untilAsserted(
                () -> Assert.assertNull(admin3.topicPolicies().getSubscriptionTypesEnabled(topic)));

        admin1.topicPolicies(true).setSubscriptionTypesEnabled(topic, enabledTypes);
        Awaitility.await().untilAsserted(() -> Assert.assertEquals(
                admin2.topicPolicies(true).getSubscriptionTypesEnabled(topic), enabledTypes));
        Awaitility.await().untilAsserted(() -> Assert.assertEquals(
                admin3.topicPolicies(true).getSubscriptionTypesEnabled(topic), enabledTypes));

        // After removal the getter is expected to yield an empty set rather than null.
        admin1.topicPolicies(true).removeSubscriptionTypesEnabled(topic);
        Awaitility.await().untilAsserted(() -> Assert.assertEquals(
                admin2.topicPolicies(true).getSubscriptionTypesEnabled(topic), Collections.emptySet()));
        Awaitility.await().untilAsserted(() -> Assert.assertEquals(
                admin3.topicPolicies(true).getSubscriptionTypesEnabled(topic), Collections.emptySet()));
    }

    /**
     * Checks visibility of a max-consumers limit on the remote admins after it is written through
     * {@code admin1}: {@code null} when set via {@code topicPolicies()}, 100 when set via
     * {@code topicPolicies(true)}, and {@code null} again after removal.
     *
     * @throws Exception if {@code init} or any admin call fails
     */
    @Test
    public void testReplicateMaxConsumers() throws Exception {
        final String namespace = "pulsar/partitionedNs-" + UUID.randomUUID();
        final String topic = "persistent://" + namespace + "/topic" + UUID.randomUUID();
        final int limit = 100;
        init(namespace, topic);

        admin1.topicPolicies().setMaxConsumers(topic, limit);
        untilRemoteClustersAsserted(remote -> Assert.assertNull(remote.topicPolicies().getMaxConsumers(topic)));

        admin1.topicPolicies(true).setMaxConsumers(topic, limit);
        // ignoreExceptions(): presumably tolerates intValue() failing while the getter still returns null.
        Awaitility.await().ignoreExceptions().untilAsserted(
                () -> Assert.assertEquals(admin2.topicPolicies(true).getMaxConsumers(topic).intValue(), limit));
        Awaitility.await().ignoreExceptions().untilAsserted(
                () -> Assert.assertEquals(admin3.topicPolicies(true).getMaxConsumers(topic).intValue(), limit));

        admin1.topicPolicies(true).removeMaxConsumers(topic);
        untilRemoteClustersAsserted(remote -> Assert.assertNull(remote.topicPolicies(true).getMaxConsumers(topic)));
    }

    /**
     * Checks visibility of a message dispatch rate on {@code admin2} and {@code admin3} after it is
     * written through {@code admin1}: {@code null} when set via {@code topicPolicies()}, equal to the
     * written value when set via {@code topicPolicies(true)}, and {@code null} again after removal.
     *
     * @throws Exception if {@code init} or any admin call fails
     */
    @Test
    public void testReplicatorMessageDispatchRatePolicies() throws Exception {
        final String namespace = "pulsar/partitionedNs-" + UUID.randomUUID();
        final String topic = "persistent://" + namespace + "/topic" + UUID.randomUUID();
        init(namespace, topic);

        // The message rate was previously set twice (1, then 2), which overwrote the first value and
        // left the byte rate unset; the second call now sets the byte rate so all fields are distinct.
        final DispatchRate dispatchRate = DispatchRate.builder()
                .dispatchThrottlingRateInMsg(1)
                .dispatchThrottlingRateInByte(2L)
                .ratePeriodInSecond(3)
                .relativeToPublishRate(true)
                .build();

        admin1.topicPolicies().setDispatchRate(topic, dispatchRate);
        Awaitility.await().untilAsserted(
                () -> Assert.assertNull(admin2.topicPolicies().getDispatchRate(topic)));
        Awaitility.await().untilAsserted(
                () -> Assert.assertNull(admin3.topicPolicies().getDispatchRate(topic)));

        admin1.topicPolicies(true).setDispatchRate(topic, dispatchRate);
        Awaitility.await().untilAsserted(
                () -> Assert.assertEquals(admin2.topicPolicies(true).getDispatchRate(topic), dispatchRate));
        Awaitility.await().untilAsserted(
                () -> Assert.assertEquals(admin3.topicPolicies(true).getDispatchRate(topic), dispatchRate));

        admin1.topicPolicies(true).removeDispatchRate(topic);
        Awaitility.await().untilAsserted(
                () -> Assert.assertNull(admin2.topicPolicies(true).getDispatchRate(topic)));
        Awaitility.await().untilAsserted(
                () -> Assert.assertNull(admin3.topicPolicies(true).getDispatchRate(topic)));
    }

    /**
     * Checks visibility of a delayed-delivery policy on the remote admins after it is written through
     * {@code admin1}: {@code null} when set via {@code topicPolicies()}, equal to the written value when
     * set via {@code topicPolicies(true)}, and {@code null} again after removal.
     *
     * @throws Exception if {@code init} or any admin call fails
     */
    @Test
    public void testReplicateDelayedDelivery() throws Exception {
        final String namespace = "pulsar/partitionedNs-" + UUID.randomUUID();
        final String topic = "persistent://" + namespace + "/topic" + UUID.randomUUID();
        init(namespace, topic);
        final DelayedDeliveryPolicies delayedDelivery = DelayedDeliveryPolicies.builder()
                .active(true)
                .tickTime(10000L)
                .build();

        admin1.topicPolicies().setDelayedDeliveryPolicy(topic, delayedDelivery);
        untilRemoteClustersAsserted(
                remote -> Assert.assertNull(remote.topicPolicies().getDelayedDeliveryPolicy(topic)));

        admin1.topicPolicies(true).setDelayedDeliveryPolicy(topic, delayedDelivery);
        // Exceptions raised while polling are ignored in this phase only.
        Awaitility.await().ignoreExceptions().untilAsserted(() -> Assert.assertEquals(
                admin2.topicPolicies(true).getDelayedDeliveryPolicy(topic), delayedDelivery));
        Awaitility.await().ignoreExceptions().untilAsserted(() -> Assert.assertEquals(
                admin3.topicPolicies(true).getDelayedDeliveryPolicy(topic), delayedDelivery));

        admin1.topicPolicies(true).removeDelayedDeliveryPolicy(topic);
        untilRemoteClustersAsserted(
                remote -> Assert.assertNull(remote.topicPolicies(true).getDelayedDeliveryPolicy(topic)));
    }

    /**
     * Checks visibility of inactive-topic policies on the remote admins after they are written through
     * {@code admin1}: {@code null} when set via {@code topicPolicies()}, equal to the written value when
     * set via {@code topicPolicies(true)}, and {@code null} again after removal.
     *
     * @throws Exception if {@code init} or any admin call fails
     */
    @Test
    public void testReplicatorInactiveTopicPolicies() throws Exception {
        final String namespace = "pulsar/partitionedNs-" + UUID.randomUUID();
        final String topic = "persistent://" + namespace + "/topic" + UUID.randomUUID();
        init(namespace, topic);
        final InactiveTopicPolicies inactivePolicies =
                new InactiveTopicPolicies(InactiveTopicDeleteMode.delete_when_no_subscriptions, 1, true);

        admin1.topicPolicies().setInactiveTopicPolicies(topic, inactivePolicies);
        untilRemoteClustersAsserted(
                remote -> Assert.assertNull(remote.topicPolicies().getInactiveTopicPolicies(topic)));

        admin1.topicPolicies(true).setInactiveTopicPolicies(topic, inactivePolicies);
        untilRemoteClustersAsserted(remote -> Assert.assertEquals(
                remote.topicPolicies(true).getInactiveTopicPolicies(topic), inactivePolicies));

        admin1.topicPolicies(true).removeInactiveTopicPolicies(topic);
        untilRemoteClustersAsserted(
                remote -> Assert.assertNull(remote.topicPolicies(true).getInactiveTopicPolicies(topic)));
    }

    /**
     * Checks visibility of a subscription dispatch rate on the remote admins after it is written
     * through {@code admin1}: {@code null} when set via {@code topicPolicies()}, equal to the written
     * value when set via {@code topicPolicies(true)}, and {@code null} again after removal.
     *
     * @throws Exception if {@code init} or any admin call fails
     */
    @Test
    public void testReplicatorSubscriptionDispatchRatePolicies() throws Exception {
        final String namespace = "pulsar/partitionedNs-" + UUID.randomUUID();
        final String topic = "persistent://" + namespace + "/topic" + UUID.randomUUID();
        init(namespace, topic);
        final DispatchRate subscriptionRate = DispatchRate.builder()
                .dispatchThrottlingRateInMsg(1)
                .ratePeriodInSecond(1)
                .dispatchThrottlingRateInByte(1L)
                .relativeToPublishRate(true)
                .build();

        admin1.topicPolicies().setSubscriptionDispatchRate(topic, subscriptionRate);
        untilRemoteClustersAsserted(
                remote -> Assert.assertNull(remote.topicPolicies().getSubscriptionDispatchRate(topic)));

        admin1.topicPolicies(true).setSubscriptionDispatchRate(topic, subscriptionRate);
        untilRemoteClustersAsserted(remote -> Assert.assertEquals(
                remote.topicPolicies(true).getSubscriptionDispatchRate(topic), subscriptionRate));

        admin1.topicPolicies(true).removeSubscriptionDispatchRate(topic);
        untilRemoteClustersAsserted(
                remote -> Assert.assertNull(remote.topicPolicies(true).getSubscriptionDispatchRate(topic)));
    }

    /**
     * Checks visibility of a replicator dispatch rate on the remote admins after it is written through
     * {@code admin1}: {@code null} when set via {@code topicPolicies()}, equal to the written value when
     * set via {@code topicPolicies(true)}, and {@code null} again after removal.
     *
     * @throws Exception if {@code init} or any admin call fails
     */
    @Test
    public void testReplicateReplicatorDispatchRatePolicies() throws Exception {
        final String namespace = "pulsar/partitionedNs-" + UUID.randomUUID();
        final String topic = "persistent://" + namespace + "/topic" + UUID.randomUUID();
        init(namespace, topic);
        final DispatchRate replicatorRate = DispatchRate.builder()
                .dispatchThrottlingRateInMsg(1)
                .ratePeriodInSecond(1)
                .dispatchThrottlingRateInByte(1L)
                .relativeToPublishRate(true)
                .build();

        admin1.topicPolicies().setReplicatorDispatchRate(topic, replicatorRate);
        untilRemoteClustersAsserted(
                remote -> Assert.assertNull(remote.topicPolicies().getReplicatorDispatchRate(topic)));

        admin1.topicPolicies(true).setReplicatorDispatchRate(topic, replicatorRate);
        untilRemoteClustersAsserted(remote -> Assert.assertEquals(
                remote.topicPolicies(true).getReplicatorDispatchRate(topic), replicatorRate));

        admin1.topicPolicies(true).removeReplicatorDispatchRate(topic);
        untilRemoteClustersAsserted(
                remote -> Assert.assertNull(remote.topicPolicies(true).getReplicatorDispatchRate(topic)));
    }

    /**
     * Checks visibility of a max-unacked-messages-on-subscription limit on the remote admins after it
     * is written through {@code admin1}: {@code null} when set via {@code topicPolicies()}, 100 when set
     * via {@code topicPolicies(true)}, and {@code null} again after removal.
     *
     * @throws Exception if {@code init} or any admin call fails
     */
    @Test
    public void testReplicateMaxUnackedMsgPerSub() throws Exception {
        final String namespace = "pulsar/partitionedNs-" + UUID.randomUUID();
        final String topic = "persistent://" + namespace + "/topic" + UUID.randomUUID();
        final int limit = 100;
        init(namespace, topic);

        admin1.topicPolicies().setMaxUnackedMessagesOnSubscription(topic, limit);
        untilRemoteClustersAsserted(
                remote -> Assert.assertNull(remote.topicPolicies().getMaxUnackedMessagesOnSubscription(topic)));

        admin1.topicPolicies(true).setMaxUnackedMessagesOnSubscription(topic, limit);
        // ignoreExceptions(): presumably tolerates intValue() failing while the getter still returns null.
        Awaitility.await().ignoreExceptions().untilAsserted(() -> Assert.assertEquals(
                admin2.topicPolicies(true).getMaxUnackedMessagesOnSubscription(topic).intValue(), limit));
        Awaitility.await().ignoreExceptions().untilAsserted(() -> Assert.assertEquals(
                admin3.topicPolicies(true).getMaxUnackedMessagesOnSubscription(topic).intValue(), limit));

        admin1.topicPolicies(true).removeMaxUnackedMessagesOnSubscription(topic);
        untilRemoteClustersAsserted(
                remote -> Assert.assertNull(remote.topicPolicies(true).getMaxUnackedMessagesOnSubscription(topic)));
    }

    /**
     * Checks visibility of a compaction threshold on the remote admins after it is written through
     * {@code admin1}: {@code null} when set via {@code topicPolicies()}, 1 when set via
     * {@code topicPolicies(true)}, and {@code null} again after removal.
     *
     * @throws Exception if {@code init} or any admin call fails
     */
    @Test
    public void testReplicatorCompactionThresholdPolicies() throws Exception {
        final String namespace = "pulsar/partitionedNs-" + UUID.randomUUID();
        final String topic = "persistent://" + namespace + "/topic" + UUID.randomUUID();
        final long threshold = 1L;
        init(namespace, topic);

        admin1.topicPolicies().setCompactionThreshold(topic, threshold);
        untilRemoteClustersAsserted(
                remote -> Assert.assertNull(remote.topicPolicies().getCompactionThreshold(topic)));

        admin1.topicPolicies(true).setCompactionThreshold(topic, threshold);
        untilRemoteClustersAsserted(
                remote -> Assert.assertEquals(remote.topicPolicies(true).getCompactionThreshold(topic), threshold));

        admin1.topicPolicies(true).removeCompactionThreshold(topic);
        untilRemoteClustersAsserted(
                remote -> Assert.assertNull(remote.topicPolicies(true).getCompactionThreshold(topic)));
    }

    /**
     * Checks how the max-subscriptions-per-topic policy is seen from the other clusters.
     *
     * <p>Each phase applies a change through {@code admin1} and then hands an assertion to
     * {@code untilRemoteClustersAsserted}: a value set via {@code topicPolicies()} must stay absent,
     * a value set via {@code topicPolicies(true)} must be readable, and its removal must clear it.
     */
    @Test
    public void testReplicateMaxSubscriptionsPerTopic() throws Exception {
        final String namespace = "pulsar/partitionedNs-" + UUID.randomUUID();
        final String topicName = "persistent://" + namespace + "/topic" + UUID.randomUUID();
        final int maxSubscriptions = 1024;
        init(namespace, topicName);

        // Set through topicPolicies(): expected to remain unset elsewhere.
        admin1.topicPolicies().setMaxSubscriptionsPerTopic(topicName, maxSubscriptions);
        untilRemoteClustersAsserted(
                remote -> Assert.assertNull(remote.topicPolicies().getMaxSubscriptionsPerTopic(topicName)));

        // Set through topicPolicies(true): expected to be readable elsewhere.
        admin1.topicPolicies(true).setMaxSubscriptionsPerTopic(topicName, maxSubscriptions);
        untilRemoteClustersAsserted(remote -> Assert.assertEquals(
                remote.topicPolicies(true).getMaxSubscriptionsPerTopic(topicName), maxSubscriptions));

        // Removed through topicPolicies(true): expected to be cleared elsewhere.
        admin1.topicPolicies(true).removeMaxSubscriptionsPerTopic(topicName);
        untilRemoteClustersAsserted(
                remote -> Assert.assertNull(remote.topicPolicies(true).getMaxSubscriptionsPerTopic(topicName)));
    }

    /**
     * Checks how offload policies set on a topic are seen from the other clusters.
     *
     * <p>Policies set via {@code admin1.topicPolicies()} are expected to stay absent when read through
     * {@code admin2} and {@code admin3}. Policies set via {@code admin1.topicPolicies(true)} are
     * expected to become readable through both and to compare equal to the object that was sent, and
     * removing them is expected to clear them again.
     *
     * <p>Both {@code setOffloadPolicies} calls tolerate a failure as long as it is a
     * {@code PulsarAdminException.ServerSideErrorException}.
     */
    @Test
    public void testReplicatorOffloadPolicies() throws Exception {
        final String namespace = "pulsar/partitionedNs-" + UUID.randomUUID();
        final String topicName = "persistent://" + namespace + "/topic" + UUID.randomUUID();
        init(namespace, topicName);

        final OffloadPoliciesImpl offloadPolicies = OffloadPoliciesImpl.create(
                "s3", "region", "bucket", "endpoint",
                (String) null, (String) null, (String) null, (String) null,
                8, 9, 10L, 10L, (Long) null, OffloadedReadPriority.BOOKKEEPER_FIRST);

        // Set through topicPolicies(). A failure is accepted only when it is a server-side error.
        // NOTE(review): presumably the broker rejects the request because no "s3" offload driver is
        // available in the test environment - confirm.
        try {
            admin1.topicPolicies().setOffloadPolicies(topicName, offloadPolicies);
        } catch (Exception localSetFailure) {
            Assert.assertTrue(localSetFailure instanceof PulsarAdminException.ServerSideErrorException);
        }
        for (PulsarAdmin remote : List.of(admin2, admin3)) {
            Awaitility.await().untilAsserted(
                    () -> Assert.assertNull(remote.topicPolicies().getOffloadPolicies(topicName)));
        }

        // Set through topicPolicies(true), with the same tolerance for a server-side error.
        try {
            admin1.topicPolicies(true).setOffloadPolicies(topicName, offloadPolicies);
        } catch (Exception globalSetFailure) {
            Assert.assertTrue(globalSetFailure instanceof PulsarAdminException.ServerSideErrorException);
        }
        for (PulsarAdmin remote : List.of(admin2, admin3)) {
            Awaitility.await().untilAsserted(() -> Assert.assertEquals(
                    remote.topicPolicies(true).getOffloadPolicies(topicName), offloadPolicies));
        }

        // Removal through topicPolicies(true) must clear the policies on the other clusters.
        admin1.topicPolicies(true).removeOffloadPolicies(topicName);
        for (PulsarAdmin remote : List.of(admin2, admin3)) {
            Awaitility.await().untilAsserted(
                    () -> Assert.assertNull(remote.topicPolicies(true).getOffloadPolicies(topicName)));
        }
    }

    /**
     * Checks which replicators partition 0 of the topic holds on cluster 1 as the topic-level
     * replication clusters are set and then removed.
     *
     * <p>After setting the topic's replication clusters to r1 and r2 the only replicator key must be
     * {@code r2}; after removing the topic-level setting the keys must be {@code r2} and {@code r3}.
     */
    @Test
    public void testRemoveReplicationClusters() throws Exception {
        final String namespace = "pulsar/partitionedNs-" + UUID.randomUUID();
        final String topicName = "persistent://" + namespace + "/topic" + UUID.randomUUID();
        init(namespace, topicName);

        admin1.topics().setReplicationClusters(topicName, Arrays.asList("r1", "r2"));

        final String firstPartition = topicName + "-partition-0";
        final PersistentTopic partitionZero =
                (PersistentTopic) pulsar1.getBrokerService().getTopicReference(firstPartition).get();
        Assert.assertNotNull(partitionZero);

        // With the topic-level setting in place only r2 is expected as a replicator key.
        Awaitility.await().untilAsserted(() -> {
            List<String> replicatorClusters =
                    partitionZero.getReplicators().keySet().stream().sorted().toList();
            Assert.assertEquals(replicatorClusters.size(), 1);
            Assert.assertEquals(replicatorClusters.toString(), "[r2]");
        });

        admin1.topics().removeReplicationClusters(topicName);

        // NOTE(review): presumably the topic now falls back to the namespace's r1/r2/r3 setting
        // made in init(), which would explain the additional r3 replicator - confirm.
        Awaitility.await().untilAsserted(() -> {
            List<String> replicatorClusters =
                    partitionZero.getReplicators().keySet().stream().sorted().toList();
            Assert.assertEquals(replicatorClusters.size(), 2);
            Assert.assertEquals(replicatorClusters.toString(), "[r2, r3]");
        });
    }

    /**
     * Checks how the auto-subscription-creation override is seen from the other clusters.
     *
     * <p>An override set via {@code admin1.topicPolicies()} is expected to stay absent when read
     * through {@code admin2} and {@code admin3}. An override set via {@code admin1.topicPolicies(true)}
     * is expected to become readable through both with auto-creation allowed, and to vanish again
     * after removal.
     */
    @Test
    public void testReplicateAutoSubscriptionCreation() throws Exception {
        final String namespace = "pulsar/partitionedNs-" + UUID.randomUUID();
        final String topicName = "persistent://" + namespace + "/topic" + UUID.randomUUID();
        init(namespace, topicName);

        final AutoSubscriptionCreationOverride allowAutoCreation =
                AutoSubscriptionCreationOverride.builder().allowAutoSubscriptionCreation(true).build();

        // Set through topicPolicies(): the other clusters must keep reporting no override.
        admin1.topicPolicies().setAutoSubscriptionCreation(topicName, allowAutoCreation);
        for (PulsarAdmin remote : List.of(admin2, admin3)) {
            Awaitility.await().untilAsserted(() -> Assert.assertNull(
                    remote.topicPolicies().getAutoSubscriptionCreation(topicName, false)));
        }

        // Set through topicPolicies(true): the override must become visible on the other clusters.
        // ignoreExceptions() keeps polling while the getter still returns null and the call on it throws.
        admin1.topicPolicies(true).setAutoSubscriptionCreation(topicName, allowAutoCreation);
        for (PulsarAdmin remote : List.of(admin2, admin3)) {
            Awaitility.await().ignoreExceptions().untilAsserted(() -> Assert.assertEquals(
                    remote.topicPolicies(true).getAutoSubscriptionCreation(topicName, false)
                            .isAllowAutoSubscriptionCreation(),
                    true));
        }

        // Removal through topicPolicies(true) must clear the override on the other clusters.
        admin1.topicPolicies(true).removeAutoSubscriptionCreation(topicName);
        for (PulsarAdmin remote : List.of(admin2, admin3)) {
            Awaitility.await().untilAsserted(() -> Assert.assertNull(
                    remote.topicPolicies(true).getAutoSubscriptionCreation(topicName, false)));
        }
    }

    /**
     * Prepares the namespace and partitioned topic that every test in this class works on.
     *
     * <p>Using {@code admin1}, this creates the namespace for the cluster names configured on the three
     * brokers, sets its replication clusters to r1, r2 and r3, and creates the topic with 3 partitions.
     * It then waits until {@code admin2} and {@code admin3} list the topic, creates and closes a
     * producer for it on every cluster, and finally looks up the topic's policies through each
     * broker's topic-policies service.
     *
     * @param str  name of the namespace to create
     * @param str2 fully qualified name of the partitioned topic to create inside that namespace
     * @throws Exception if any admin or client call fails, or a wait times out
     */
    private void init(String str, String str2) throws Exception {
        String cluster1 = this.pulsar1.getConfig().getClusterName();
        String cluster2 = this.pulsar2.getConfig().getClusterName();
        String cluster3 = this.pulsar3.getConfig().getClusterName();
        this.admin1.namespaces().createNamespace(str, Sets.newHashSet(cluster1, cluster2, cluster3));
        this.admin1.namespaces().setNamespaceReplicationClusters(str, Sets.newHashSet("r1", "r2", "r3"));
        this.admin1.topics().createPartitionedTopic(str2, 3);

        awaitFirstPartitionedTopic(this.admin2, str, str2);
        // The three partitions must be listed as topics of the namespace on the creating cluster.
        Assert.assertEquals(this.admin1.topics().getList(str).size(), 3);
        awaitFirstPartitionedTopic(this.admin3, str, str2);

        // Create and immediately close a producer for the topic on every cluster.
        // NOTE(review): presumably this makes each broker load the topic - confirm.
        this.pulsar1.getClient().newProducer().topic(str2).create().close();
        this.pulsar2.getClient().newProducer().topic(str2).create().close();
        this.pulsar3.getClient().newProducer().topic(str2).create().close();

        // The looked-up policies are discarded.
        // NOTE(review): presumably these calls only warm up each broker's policy cache - confirm.
        TopicPolicyTestUtils.getTopicPolicies(this.pulsar1.getTopicPoliciesService(), TopicName.get(str2));
        TopicPolicyTestUtils.getTopicPolicies(this.pulsar2.getTopicPoliciesService(), TopicName.get(str2));
        TopicPolicyTestUtils.getTopicPolicies(this.pulsar3.getTopicPoliciesService(), TopicName.get(str2));
    }

    /**
     * Waits until {@code admin} reports {@code topic} as the first partitioned topic of {@code namespace}.
     *
     * <p>A listing that is still empty is turned into an assertion failure so that Awaitility keeps
     * polling. Calling {@code get(0)} on it directly would throw {@code IndexOutOfBoundsException},
     * which {@code untilAsserted} does not retry and which would therefore abort the wait at once.
     */
    private void awaitFirstPartitionedTopic(PulsarAdmin admin, String namespace, String topic) {
        Awaitility.await().untilAsserted(() -> {
            List<String> partitionedTopics = admin.topics().getPartitionedTopicList(namespace);
            Assert.assertNotNull(partitionedTopics);
            Assert.assertFalse(partitionedTopics.isEmpty(),
                    "no partitioned topic listed yet for namespace " + namespace);
            Assert.assertEquals(partitionedTopics.get(0), topic);
        });
    }
}
