package org.apache.pulsar.broker.service;

import com.google.common.collect.Sets;
import java.util.Collections;
import java.util.LinkedHashSet;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import org.apache.pulsar.broker.BrokerTestUtil;
import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.broker.service.persistent.PersistentTopic;
import org.apache.pulsar.broker.service.persistent.ReplicatedSubscriptionsController;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.api.transaction.Transaction;
import org.apache.pulsar.common.naming.NamespaceName;
import org.apache.pulsar.common.naming.SystemTopicNames;
import org.apache.pulsar.common.partition.PartitionedTopicMetadata;
import org.apache.pulsar.metadata.api.MetadataStoreException;
import org.awaitility.Awaitility;
import org.testng.Assert;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;

/* loaded from: input_file:org/apache/pulsar/broker/service/TransactionalReplicateSubscriptionTest.class */
/**
 * Checks that a replicated subscription keeps working when the messages are published inside
 * transactions: half of the messages are consumed on cluster {@code r1}, then the consumer is
 * moved to cluster {@code r2} using the same subscription name and drains the topic there.
 */
public class TransactionalReplicateSubscriptionTest extends ReplicatorTestBase {

    /** Subscription shared by the consumers on both clusters. */
    private static final String SUBSCRIPTION_NAME = "s1";

    /** Total number of messages published by the test, split evenly over two transactions. */
    private static final int MESSAGE_COUNT = 20;

    /** Number of partitions used for the transaction coordinator assign topic. */
    private static final int TRANSACTION_COORDINATOR_PARTITIONS = 16;

    /** Timeout, in seconds, of each transaction opened by the test. */
    private static final long TRANSACTION_TIMEOUT_SECONDS = 5L;

    /** Timeout, in seconds, of a single consumer {@code receive} call. */
    private static final int RECEIVE_TIMEOUT_SECONDS = 2;

    /**
     * Runs the base setup, then creates the system namespace and the transaction coordinator
     * assign topic on the first cluster.
     */
    @Override
    @BeforeClass(timeOut = 300000)
    public void setup() throws Exception {
        super.setup();
        admin1.namespaces().createNamespace(NamespaceName.SYSTEM_NAMESPACE.toString());
        createTransactionCoordinatorAssign(TRANSACTION_COORDINATOR_PARTITIONS, pulsar1);
    }

    /** Delegates to the base class cleanup. */
    @Override
    @AfterClass(alwaysRun = true, timeOut = 300000)
    public void cleanup() throws Exception {
        super.cleanup();
    }

    /** Applies the base defaults and additionally enables the transaction coordinator in {@code config1}. */
    @Override
    public void setConfig1DefaultValue() {
        super.setConfig1DefaultValue();
        config1.setTransactionCoordinatorEnabled(true);
    }

    /**
     * Creates the partitioned-topic metadata for
     * {@link SystemTopicNames#TRANSACTION_COORDINATOR_ASSIGN}.
     *
     * @param numPartitionsOfTC number of partitions to register for the topic
     * @param pulsarService the broker whose metadata resources are used to create the topic
     * @throws MetadataStoreException if the metadata store rejects the creation
     */
    protected void createTransactionCoordinatorAssign(int numPartitionsOfTC, PulsarService pulsarService)
            throws MetadataStoreException {
        pulsarService.getPulsarResources()
                .getNamespaceResources()
                .getPartitionedTopicResources()
                .createPartitionedTopic(SystemTopicNames.TRANSACTION_COORDINATOR_ASSIGN,
                        new PartitionedTopicMetadata(numPartitionsOfTC));
    }

    /**
     * Publishes two transactional batches on cluster {@code r1}, consumes the first half there,
     * then switches the consumer to cluster {@code r2} and expects the number of distinct received
     * payloads to match the number of payloads sent.
     */
    @Test
    public void testReplicatedSubscribeAndSwitchToStandbyClusterWithTransaction() throws Exception {
        final String namespace = BrokerTestUtil.newUniqueName("pulsar/ns_");
        final String topicName = BrokerTestUtil.newUniqueName("persistent://" + namespace + "/tp_");
        final Set<String> sentMessages = new LinkedHashSet<>();
        final Set<String> receivedMessages = Collections.synchronizedSet(new LinkedHashSet<>());
        final int halfCount = MESSAGE_COUNT / 2;

        admin1.namespaces().createNamespace(namespace);
        admin1.namespaces().setNamespaceReplicationClusters(namespace, Sets.newHashSet("r1", "r2"));
        admin1.topics().createNonPartitionedTopic(topicName);
        admin1.topics().createSubscription(topicName, SUBSCRIPTION_NAME, MessageId.earliest, true);
        final PersistentTopic topic1 = getPersistentTopic(pulsar1, topicName);

        // try-with-resources so that clients, producer and consumers are released even when an
        // assertion fails half-way through the test.
        try (PulsarClient client1 = PulsarClient.builder()
                     .serviceUrl(url1.toString())
                     .enableTransaction(true)
                     .build();
             Producer<String> producer = client1.newProducer(Schema.STRING)
                     .topic(topicName)
                     .enableBatching(false)
                     .create()) {

            // The cluster-1 consumer is scoped to its own block: it is closed before the
            // cluster-2 consumer subscribes with the same subscription name.
            try (Consumer<String> consumer1 = client1.newConsumer(Schema.STRING)
                    .topic(topicName)
                    .subscriptionName(SUBSCRIPTION_NAME)
                    .replicateSubscriptionState(true)
                    .subscribe()) {

                publishInTransaction(client1, producer, 0, halfCount, sentMessages);

                Awaitility.await().untilAsserted(() -> {
                    Map<String, ? extends Replicator> replicators = topic1.getReplicators();
                    Assert.assertTrue(replicators != null && replicators.size() == 1,
                            "Replicator should started");
                    Assert.assertTrue(replicators.values().iterator().next().isConnected(),
                            "Replicator should be connected");
                    Assert.assertTrue(topic1.getReplicatedSubscriptionController().get()
                            .getLastCompletedSnapshotId().isPresent(), "One snapshot should be finished");
                });

                final PersistentTopic topic2 = getPersistentTopic(pulsar2, topicName);
                Awaitility.await().untilAsserted(() ->
                        Assert.assertTrue(topic2.getReplicatedSubscriptionController().isPresent(),
                                "Replicated subscription controller should created"));

                publishInTransaction(client1, producer, halfCount, MESSAGE_COUNT, sentMessages);

                // Consume only the first half on cluster 1; the rest is left for cluster 2.
                for (int i = 0; i < halfCount; i++) {
                    Message<String> message = consumer1.receive(RECEIVE_TIMEOUT_SECONDS, TimeUnit.SECONDS);
                    Assert.assertNotNull(message, "Should not receive null.");
                    receivedMessages.add(message.getValue());
                    consumer1.acknowledge(message);
                }

                Awaitility.await().untilAsserted(() ->
                        Assert.assertNotNull(topic2.getSubscriptions().get(SUBSCRIPTION_NAME),
                                "Subscription should created"));
            }

            try (PulsarClient client2 = PulsarClient.builder().serviceUrl(url2.toString()).build();
                 Consumer<?> consumer2 = client2.newConsumer(Schema.AUTO_CONSUME())
                         .topic(topicName)
                         .subscriptionName(SUBSCRIPTION_NAME)
                         .replicateSubscriptionState(true)
                         .subscribe()) {
                Awaitility.await().untilAsserted(() -> {
                    // Drain everything currently available, then compare the distinct payload
                    // counts; the assertion is retried until it holds or Awaitility times out.
                    Message<?> message;
                    while ((message = consumer2.receive(RECEIVE_TIMEOUT_SECONDS, TimeUnit.SECONDS)) != null) {
                        receivedMessages.add(message.getValue().toString());
                        consumer2.acknowledge(message);
                    }
                    Assert.assertEquals(receivedMessages.size(), sentMessages.size());
                });
            }
        }
    }

    /**
     * Looks up an already existing topic on the given broker without creating it.
     *
     * @param pulsarService broker to query
     * @param topicName fully qualified topic name
     * @return the topic, cast to {@link PersistentTopic}
     */
    private static PersistentTopic getPersistentTopic(PulsarService pulsarService, String topicName) {
        Optional<Topic> topic = pulsarService.getBrokerService().getTopic(topicName, false).join();
        Assert.assertTrue(topic.isPresent(), "Topic should exist: " + topicName);
        return (PersistentTopic) topic.get();
    }

    /**
     * Opens a transaction, publishes the decimal string of every index in
     * {@code [fromInclusive, toExclusive)} within it, records each payload in {@code sentMessages}
     * and commits the transaction.
     *
     * @param client transaction-enabled client used to open the transaction
     * @param producer producer used to publish the payloads
     * @param fromInclusive first index to publish
     * @param toExclusive index at which publishing stops
     * @param sentMessages collection that receives every published payload
     * @throws Exception if opening the transaction, publishing or committing fails
     */
    private static void publishInTransaction(PulsarClient client, Producer<String> producer,
                                             int fromInclusive, int toExclusive,
                                             Set<String> sentMessages) throws Exception {
        Transaction transaction = client.newTransaction()
                .withTransactionTimeout(TRANSACTION_TIMEOUT_SECONDS, TimeUnit.SECONDS)
                .build()
                .get();
        for (int i = fromInclusive; i < toExclusive; i++) {
            String payload = String.valueOf(i);
            producer.newMessage(transaction).value(payload).send();
            sentMessages.add(payload);
        }
        transaction.commit().get();
    }
}
