/*
 * Decompiled with CFR 0.152.
 */
package org.apache.pulsar.broker.service;

import com.google.common.collect.Sets;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Collections;
import java.util.LinkedHashSet;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import org.apache.bookkeeper.mledger.Position;
import org.apache.pulsar.broker.BrokerTestUtil;
import org.apache.pulsar.broker.service.ReplicatorTestBase;
import org.apache.pulsar.broker.service.persistent.PersistentTopic;
import org.apache.pulsar.broker.service.persistent.ReplicatedSubscriptionsController;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.MessageRoutingMode;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.common.policies.data.PartitionedTopicStats;
import org.apache.pulsar.common.policies.data.SubscriptionStats;
import org.apache.pulsar.common.policies.data.TopicStats;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testng.Assert;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;

@Test(groups={"broker"})
public class ReplicatorSubscriptionTest
extends ReplicatorTestBase {
    private static final Logger log = LoggerFactory.getLogger(ReplicatorSubscriptionTest.class);

    /**
     * Class-level fixture setup: delegates entirely to {@link ReplicatorTestBase#setup()}.
     * Registered with TestNG as a {@code @BeforeClass} method with a timeout of 300000
     * (TestNG timeouts are expressed in milliseconds).
     *
     * @throws Exception whatever the base-class setup throws
     */
    @Override
    @BeforeClass(timeOut=300000L)
    public void setup() throws Exception {
        super.setup();
    }

    /**
     * Class-level fixture teardown: delegates entirely to {@link ReplicatorTestBase#cleanup()}.
     * {@code alwaysRun=true} asks TestNG to run it even when earlier configuration or tests failed.
     *
     * @throws Exception whatever the base-class cleanup throws
     */
    @Override
    @AfterClass(alwaysRun=true, timeOut=300000L)
    public void cleanup() throws Exception {
        super.cleanup();
    }

    /**
     * Publishes six messages through {@code client1}, consumes the first three on the
     * subscription via {@code client1}, waits two snapshot intervals, then drains the same
     * subscription via {@code client2}. The union of everything received (duplicates are
     * tolerated) must equal the sent messages, in the same order.
     *
     * @throws Exception on any admin, client or assertion failure
     */
    @Test
    public void testReplicatedSubscriptionAcrossTwoRegions() throws Exception {
        String namespace = BrokerTestUtil.newUniqueName("pulsar/replicatedsubscription");
        String topicName = "persistent://" + namespace + "/mytopic";
        String subscriptionName = "cluster-subscription";
        boolean allowDuplicates = true;
        boolean replicateSubscriptionState = true;

        admin1.namespaces().createNamespace(namespace);
        admin1.namespaces().setNamespaceReplicationClusters(namespace, Sets.newHashSet("r1", "r2"));

        try (PulsarClient client1 = PulsarClient.builder()
                .serviceUrl(url1.toString())
                .statsInterval(0L, TimeUnit.SECONDS)
                .build()) {
            // Create the subscription up front on the first client ...
            createReplicatedSubscription(client1, topicName, subscriptionName, replicateSubscriptionState);

            try (PulsarClient client2 = PulsarClient.builder()
                    .serviceUrl(url2.toString())
                    .statsInterval(0L, TimeUnit.SECONDS)
                    .build()) {
                // ... and on the second one, before any message is published.
                createReplicatedSubscription(client2, topicName, subscriptionName, replicateSubscriptionState);

                // Insertion-ordered so that the final comparison can check ordering.
                Set<String> sentMessages = new LinkedHashSet<>();
                try (Producer<byte[]> producer = client1.newProducer()
                        .topic(topicName)
                        .enableBatching(false)
                        .messageRoutingMode(MessageRoutingMode.SinglePartition)
                        .create()) {
                    int numMessages = 6;
                    for (int i = 0; i < numMessages; i++) {
                        String body = "message" + i;
                        producer.send(body.getBytes(StandardCharsets.UTF_8));
                        sentMessages.add(body);
                    }
                }

                Set<String> receivedMessages = new LinkedHashSet<>();

                // Consume only the first half through client1.
                try (Consumer<byte[]> consumer1 = client1.newConsumer()
                        .topic(topicName)
                        .subscriptionName(subscriptionName)
                        .replicateSubscriptionState(replicateSubscriptionState)
                        .subscribe()) {
                    readMessages(consumer1, receivedMessages, 3, allowDuplicates);
                }

                // Wait two snapshot intervals before switching to the other client.
                Thread.sleep(2 * config1.getReplicatedSubscriptionsSnapshotFrequencyMillis());

                // Drain whatever is left (maxMessages == -1 means "no limit") through client2.
                try (Consumer<byte[]> consumer2 = client2.newConsumer()
                        .topic(topicName)
                        .subscriptionName(subscriptionName)
                        .replicateSubscriptionState(replicateSubscriptionState)
                        .subscribe()) {
                    readMessages(consumer2, receivedMessages, -1, allowDuplicates);
                }

                // Compared as lists so that element order is part of the check.
                Assert.assertEquals(new ArrayList<>(sentMessages), new ArrayList<>(receivedMessages),
                        "Sent and received messages don't match.");
            }
        }
    }

    /**
     * Checks that snapshot activity stops when nothing is published and resumes with new traffic.
     * <ol>
     *   <li>Publish ten messages via {@code client1} and wait two snapshot intervals.</li>
     *   <li>Record the topic's last position and the last completed snapshot id, wait another
     *       two intervals, and assert that neither changed.</li>
     *   <li>Publish ten messages via {@code client2}, wait again, and assert that both changed.</li>
     * </ol>
     *
     * @throws Exception on any admin, client or assertion failure
     */
    @Test
    public void testReplicationSnapshotStopWhenNoTraffic() throws Exception {
        String namespace = BrokerTestUtil.newUniqueName("pulsar/replicatedsubscription");
        String topicName = "persistent://" + namespace + "/mytopic";
        String subscriptionName = "cluster-subscription";

        admin1.namespaces().createNamespace(namespace);
        admin1.namespaces().setNamespaceReplicationClusters(namespace, Sets.newHashSet("r1", "r2"));

        try (PulsarClient client1 = PulsarClient.builder()
                .serviceUrl(url1.toString())
                .statsInterval(0L, TimeUnit.SECONDS)
                .build()) {
            createReplicatedSubscription(client1, topicName, subscriptionName, true);

            try (PulsarClient client2 = PulsarClient.builder()
                    .serviceUrl(url2.toString())
                    .statsInterval(0L, TimeUnit.SECONDS)
                    .build()) {

                // Initial traffic through the first client.
                try (Producer<String> producer = client1.newProducer(Schema.STRING)
                        .topic(topicName)
                        .create()) {
                    for (int i = 0; i < 10; i++) {
                        producer.send("hello-" + i);
                    }
                }

                Thread.sleep(2 * config1.getReplicatedSubscriptionsSnapshotFrequencyMillis());

                // Baseline taken after the first wait.
                PersistentTopic t1 = (PersistentTopic) pulsar1.getBrokerService()
                        .getTopic(topicName, false).get().get();
                ReplicatedSubscriptionsController rsc1 = t1.getReplicatedSubscriptionController().get();
                Position p1 = t1.getLastPosition();
                String snapshot1 = rsc1.getLastCompletedSnapshotId().get();

                // NOTE(review): t2 is resolved through pulsar1, exactly like t1, so the "2" checks
                // below re-inspect the same broker. If the intent was to inspect the second broker
                // this lookup presumably should go through the other broker instance - confirm
                // before changing, since that alters what the test exercises.
                PersistentTopic t2 = (PersistentTopic) pulsar1.getBrokerService()
                        .getTopic(topicName, false).get().get();
                ReplicatedSubscriptionsController rsc2 = t2.getReplicatedSubscriptionController().get();
                Position p2 = t2.getLastPosition();
                String snapshot2 = rsc2.getLastCompletedSnapshotId().get();

                Thread.sleep(2 * config1.getReplicatedSubscriptionsSnapshotFrequencyMillis());

                // No traffic in between: position and snapshot id must be unchanged.
                Assert.assertEquals(t1.getLastPosition(), p1);
                Assert.assertEquals(rsc1.getLastCompletedSnapshotId().get(), snapshot1);
                Assert.assertEquals(t2.getLastPosition(), p2);
                Assert.assertEquals(rsc2.getLastCompletedSnapshotId().get(), snapshot2);

                // New traffic through the second client: both values must move.
                try (Producer<String> producer2 = client2.newProducer(Schema.STRING)
                        .topic(topicName)
                        .create()) {
                    for (int i = 0; i < 10; i++) {
                        producer2.send("hello-" + i);
                    }

                    Thread.sleep(2 * config1.getReplicatedSubscriptionsSnapshotFrequencyMillis());

                    Assert.assertNotEquals(t1.getLastPosition(), p1);
                    Assert.assertNotEquals(rsc1.getLastCompletedSnapshotId().get(), snapshot1);
                    Assert.assertNotEquals(t2.getLastPosition(), p2);
                    Assert.assertNotEquals(rsc2.getLastCompletedSnapshotId().get(), snapshot2);
                }
            }
        }
    }

    /**
     * Exercises {@code setReplicatedSubscriptionStatus} on a non-partitioned topic.
     * <ol>
     *   <li>Create the subscription as replicated through both clients and check the stats.</li>
     *   <li>Disable replication through {@code admin1}, then {@code admin2}, checking after each
     *       call that only the targeted side reports the change; unload the topic through
     *       {@code admin1} and check the flag is still off.</li>
     *   <li>With replication off, publish 20 messages and require that each client's consumer
     *       independently receives all 20 without duplicates.</li>
     *   <li>Re-enable replication on both sides, publish two batches of 10 with a consumer
     *       switch in between, and require that the union of received messages equals the sent
     *       ones while each consumer saw fewer than 20 messages.</li>
     * </ol>
     *
     * @throws Exception on any admin, client or assertion failure
     */
    @Test(timeOut = 30000L)
    public void testReplicatedSubscriptionRestApi1() throws Exception {
        String namespace = BrokerTestUtil.newUniqueName("pulsar/replicatedsubscription");
        String topicName = "persistent://" + namespace + "/topic-rest-api1";
        String subName = "sub";
        // Duplicates are tolerated in the phases where replication is enabled.
        boolean allowDuplicates = true;

        admin1.namespaces().createNamespace(namespace);
        admin1.namespaces().setNamespaceReplicationClusters(namespace, Sets.newHashSet("r1", "r2"));

        try (PulsarClient client1 = PulsarClient.builder()
                .serviceUrl(url1.toString())
                .statsInterval(0L, TimeUnit.SECONDS)
                .build()) {
            createReplicatedSubscription(client1, topicName, subName, true);

            try (PulsarClient client2 = PulsarClient.builder()
                    .serviceUrl(url2.toString())
                    .statsInterval(0L, TimeUnit.SECONDS)
                    .build()) {
                createReplicatedSubscription(client2, topicName, subName, true);

                TopicStats stats = admin1.topics().getStats(topicName);
                Assert.assertTrue(isSubscriptionReplicated(stats, subName));

                // Disable through admin1: admin1 sees it off, admin2 still sees it on.
                admin1.topics().setReplicatedSubscriptionStatus(topicName, subName, false);
                stats = admin1.topics().getStats(topicName);
                Assert.assertFalse(isSubscriptionReplicated(stats, subName));
                stats = admin2.topics().getStats(topicName);
                Assert.assertTrue(isSubscriptionReplicated(stats, subName));

                // Disable through admin2 as well.
                admin2.topics().setReplicatedSubscriptionStatus(topicName, subName, false);
                stats = admin2.topics().getStats(topicName);
                Assert.assertFalse(isSubscriptionReplicated(stats, subName));

                // The flag must still be off after the topic has been unloaded.
                admin1.topics().unload(topicName);
                // Fixed pause after the unload; presumably gives the topic time to settle - TODO confirm.
                Thread.sleep(1000L);
                stats = admin1.topics().getStats(topicName);
                Assert.assertFalse(isSubscriptionReplicated(stats, subName));

                int numMessages = 20;
                int halfBatch = numMessages / 2;
                Set<String> sentMessages = new LinkedHashSet<>();
                Set<String> receivedMessages = new LinkedHashSet<>();

                try (Producer<byte[]> producer = client1.newProducer()
                        .topic(topicName)
                        .enableBatching(false)
                        .create()) {
                    publishMessages(producer, 0, numMessages, sentMessages);
                }

                // Replication is off: each side must deliver the full set, duplicate-free.
                try (Consumer<byte[]> consumer1 = client1.newConsumer()
                        .topic(topicName)
                        .subscriptionName(subName)
                        .subscribe()) {
                    readMessages(consumer1, receivedMessages, numMessages, false);
                    Assert.assertEquals(receivedMessages, sentMessages);
                }

                try (Consumer<byte[]> consumer2 = client2.newConsumer()
                        .topic(topicName)
                        .subscriptionName(subName)
                        .subscribe()) {
                    receivedMessages.clear();
                    readMessages(consumer2, receivedMessages, numMessages, false);
                    Assert.assertEquals(receivedMessages, sentMessages);
                }

                // Re-enable through admin1: admin1 sees it on, admin2 still sees it off.
                admin1.topics().setReplicatedSubscriptionStatus(topicName, subName, true);
                stats = admin1.topics().getStats(topicName);
                Assert.assertTrue(isSubscriptionReplicated(stats, subName));
                stats = admin2.topics().getStats(topicName);
                Assert.assertFalse(isSubscriptionReplicated(stats, subName));

                // Re-enable through admin2 as well.
                admin2.topics().setReplicatedSubscriptionStatus(topicName, subName, true);
                stats = admin2.topics().getStats(topicName);
                Assert.assertTrue(isSubscriptionReplicated(stats, subName));

                sentMessages.clear();
                receivedMessages.clear();

                // First batch, consumed through client1.
                try (Producer<byte[]> producer = client1.newProducer()
                        .topic(topicName)
                        .enableBatching(false)
                        .create()) {
                    publishMessages(producer, 0, halfBatch, sentMessages);
                }
                Thread.sleep(2 * config1.getReplicatedSubscriptionsSnapshotFrequencyMillis());

                int numReceivedMessages1;
                try (Consumer<byte[]> consumer1 = client1.newConsumer()
                        .topic(topicName)
                        .subscriptionName(subName)
                        .subscribe()) {
                    numReceivedMessages1 = readMessages(consumer1, receivedMessages, halfBatch, allowDuplicates);
                }

                // Second batch, then drain everything remaining through client2.
                try (Producer<byte[]> producer = client1.newProducer()
                        .topic(topicName)
                        .enableBatching(false)
                        .create()) {
                    publishMessages(producer, halfBatch, halfBatch, sentMessages);
                }
                Thread.sleep(2 * config1.getReplicatedSubscriptionsSnapshotFrequencyMillis());

                int numReceivedMessages2;
                try (Consumer<byte[]> consumer2 = client2.newConsumer()
                        .topic(topicName)
                        .subscriptionName(subName)
                        .subscribe()) {
                    numReceivedMessages2 = readMessages(consumer2, receivedMessages, -1, allowDuplicates);
                }

                Assert.assertEquals(receivedMessages, sentMessages);
                Assert.assertTrue(numReceivedMessages1 < numMessages,
                        String.format("numReceivedMessages1 (%d) should be less than %d",
                                numReceivedMessages1, numMessages));
                Assert.assertTrue(numReceivedMessages2 < numMessages,
                        String.format("numReceivedMessages2 (%d) should be less than %d",
                                numReceivedMessages2, numMessages));
            }
        }
    }

    /**
     * Reads the replicated flag of one subscription out of a topic-stats snapshot.
     *
     * @param stats        stats previously fetched for the topic
     * @param subscription name of the subscription to look up
     * @return the value of {@code isReplicated()} for that subscription
     * @throws NullPointerException if the stats contain no entry for {@code subscription}
     */
    private static boolean isSubscriptionReplicated(TopicStats stats, String subscription) {
        return stats.getSubscriptions().get(subscription).isReplicated();
    }

    /**
     * Exercises {@code setReplicatedSubscriptionStatus} on a partitioned topic (two partitions).
     * <ol>
     *   <li>Create the subscription as replicated through both clients and check that every
     *       partition reports it as replicated.</li>
     *   <li>Disable replication through {@code admin1}, then {@code admin2}, checking every
     *       partition after each call.</li>
     *   <li>With replication off, publish 20 messages and require that each client's consumer
     *       independently receives all 20 without duplicates.</li>
     *   <li>Re-enable replication on both sides, publish two batches of 10 with a consumer
     *       switch in between, and require that the union of received messages equals the sent
     *       ones while each consumer saw fewer than 20 messages.</li>
     * </ol>
     *
     * @throws Exception on any admin, client or assertion failure
     */
    @Test(timeOut = 30000L)
    public void testReplicatedSubscriptionRestApi2() throws Exception {
        String namespace = BrokerTestUtil.newUniqueName("pulsar/replicatedsubscription");
        String topicName = "persistent://" + namespace + "/topic-rest-api2";
        String subName = "sub";
        // Duplicates are tolerated in the phases where replication is enabled.
        boolean allowDuplicates = true;

        admin1.namespaces().createNamespace(namespace);
        admin1.namespaces().setNamespaceReplicationClusters(namespace, Sets.newHashSet("r1", "r2"));
        admin1.topics().createPartitionedTopic(topicName, 2);

        try (PulsarClient client1 = PulsarClient.builder()
                .serviceUrl(url1.toString())
                .statsInterval(0L, TimeUnit.SECONDS)
                .build()) {
            createReplicatedSubscription(client1, topicName, subName, true);

            try (PulsarClient client2 = PulsarClient.builder()
                    .serviceUrl(url2.toString())
                    .statsInterval(0L, TimeUnit.SECONDS)
                    .build()) {
                createReplicatedSubscription(client2, topicName, subName, true);

                PartitionedTopicStats partitionedStats = admin1.topics().getPartitionedStats(topicName, true);
                assertReplicatedOnAllPartitions(partitionedStats, subName, true);

                // Disable through admin1 and verify every partition as seen by admin1.
                admin1.topics().setReplicatedSubscriptionStatus(topicName, subName, false);
                partitionedStats = admin1.topics().getPartitionedStats(topicName, true);
                assertReplicatedOnAllPartitions(partitionedStats, subName, false);

                // Disable through admin2 and verify every partition as seen by admin2.
                admin2.topics().setReplicatedSubscriptionStatus(topicName, subName, false);
                partitionedStats = admin2.topics().getPartitionedStats(topicName, true);
                assertReplicatedOnAllPartitions(partitionedStats, subName, false);

                int numMessages = 20;
                int halfBatch = numMessages / 2;
                Set<String> sentMessages = new LinkedHashSet<>();
                Set<String> receivedMessages = new LinkedHashSet<>();

                try (Producer<byte[]> producer = client1.newProducer()
                        .topic(topicName)
                        .enableBatching(false)
                        .messageRoutingMode(MessageRoutingMode.SinglePartition)
                        .create()) {
                    publishMessages(producer, 0, numMessages, sentMessages);
                }

                // Replication is off: each side must deliver the full set, duplicate-free.
                try (Consumer<byte[]> consumer1 = client1.newConsumer()
                        .topic(topicName)
                        .subscriptionName(subName)
                        .subscribe()) {
                    readMessages(consumer1, receivedMessages, numMessages, false);
                    Assert.assertEquals(receivedMessages, sentMessages);
                }

                try (Consumer<byte[]> consumer2 = client2.newConsumer()
                        .topic(topicName)
                        .subscriptionName(subName)
                        .subscribe()) {
                    receivedMessages.clear();
                    readMessages(consumer2, receivedMessages, numMessages, false);
                    Assert.assertEquals(receivedMessages, sentMessages);
                }

                // Re-enable through admin1, then admin2, verifying every partition each time.
                admin1.topics().setReplicatedSubscriptionStatus(topicName, subName, true);
                partitionedStats = admin1.topics().getPartitionedStats(topicName, true);
                assertReplicatedOnAllPartitions(partitionedStats, subName, true);

                admin2.topics().setReplicatedSubscriptionStatus(topicName, subName, true);
                partitionedStats = admin2.topics().getPartitionedStats(topicName, true);
                assertReplicatedOnAllPartitions(partitionedStats, subName, true);

                sentMessages.clear();
                receivedMessages.clear();

                // First batch, consumed through client1.
                try (Producer<byte[]> producer = client1.newProducer()
                        .topic(topicName)
                        .enableBatching(false)
                        .messageRoutingMode(MessageRoutingMode.SinglePartition)
                        .create()) {
                    publishMessages(producer, 0, halfBatch, sentMessages);
                }
                Thread.sleep(2 * config1.getReplicatedSubscriptionsSnapshotFrequencyMillis());

                int numReceivedMessages1;
                try (Consumer<byte[]> consumer1 = client1.newConsumer()
                        .topic(topicName)
                        .subscriptionName(subName)
                        .subscribe()) {
                    numReceivedMessages1 = readMessages(consumer1, receivedMessages, halfBatch, allowDuplicates);
                }

                // Second batch, then drain everything remaining through client2.
                try (Producer<byte[]> producer = client1.newProducer()
                        .topic(topicName)
                        .enableBatching(false)
                        .messageRoutingMode(MessageRoutingMode.SinglePartition)
                        .create()) {
                    publishMessages(producer, halfBatch, halfBatch, sentMessages);
                }
                Thread.sleep(2 * config1.getReplicatedSubscriptionsSnapshotFrequencyMillis());

                int numReceivedMessages2;
                try (Consumer<byte[]> consumer2 = client2.newConsumer()
                        .topic(topicName)
                        .subscriptionName(subName)
                        .subscribe()) {
                    numReceivedMessages2 = readMessages(consumer2, receivedMessages, -1, allowDuplicates);
                }

                Assert.assertEquals(receivedMessages, sentMessages);
                Assert.assertTrue(numReceivedMessages1 < numMessages,
                        String.format("numReceivedMessages1 (%d) should be less than %d",
                                numReceivedMessages1, numMessages));
                Assert.assertTrue(numReceivedMessages2 < numMessages,
                        String.format("numReceivedMessages2 (%d) should be less than %d",
                                numReceivedMessages2, numMessages));
            }
        }
    }

    /**
     * Asserts that every partition in the given stats reports the expected replicated flag
     * for one subscription.
     *
     * @param partitionedStats per-partition stats previously fetched for the topic
     * @param subscription     name of the subscription to look up in each partition
     * @param expected         the value {@code isReplicated()} must have on every partition
     */
    private static void assertReplicatedOnAllPartitions(PartitionedTopicStats partitionedStats,
                                                        String subscription, boolean expected) {
        for (TopicStats stats : partitionedStats.getPartitions().values()) {
            boolean replicated = stats.getSubscriptions().get(subscription).isReplicated();
            if (expected) {
                Assert.assertTrue(replicated);
            } else {
                Assert.assertFalse(replicated);
            }
        }
    }

    /**
     * Sends {@code numMessages} messages with bodies {@code "msg<i>"}, for {@code i} running
     * from {@code startIndex} (inclusive) to {@code startIndex + numMessages} (exclusive), and
     * records each body in {@code sentMessages} after it has been sent.
     *
     * @param producer     producer used to send the UTF-8 encoded bodies
     * @param startIndex   index used for the first message body
     * @param numMessages  how many messages to send
     * @param sentMessages set that receives every body that was sent
     * @throws PulsarClientException if a send fails
     */
    void publishMessages(Producer<byte[]> producer, int startIndex, int numMessages, Set<String> sentMessages) throws PulsarClientException {
        int endIndex = startIndex + numMessages;
        for (int i = startIndex; i < endIndex; i++) {
            String msg = "msg" + i;
            // Pass the byte[] directly: Producer<byte[]>.send takes byte[], not Object.
            producer.send(msg.getBytes(StandardCharsets.UTF_8));
            sentMessages.add(msg);
        }
    }

    /**
     * Receives and acknowledges messages until either {@code maxMessages} have been read or a
     * receive call (2-second timeout) returns {@code null}. Each body is decoded as UTF-8 and
     * added to {@code messages}.
     *
     * @param consumer        consumer to read from
     * @param messages        set that collects the decoded bodies
     * @param maxMessages     upper bound on messages to read; {@code -1} means no bound
     * @param allowDuplicates when {@code false}, a body already present in {@code messages}
     *                        fails the test
     * @return the number of messages received, duplicates included
     * @throws PulsarClientException if receiving or acknowledging fails
     */
    int readMessages(Consumer<byte[]> consumer, Set<String> messages, int maxMessages, boolean allowDuplicates) throws PulsarClientException {
        final boolean unbounded = maxMessages == -1;
        int received = 0;
        while (received < maxMessages || unbounded) {
            Message<byte[]> next = consumer.receive(2, TimeUnit.SECONDS);
            if (next == null) {
                // Nothing arrived within the timeout: treat the subscription as drained.
                break;
            }
            received++;
            String payload = new String(next.getValue(), StandardCharsets.UTF_8);
            if (!allowDuplicates) {
                Assert.assertFalse(messages.contains(payload), "Duplicate message '" + payload + "' detected.");
            }
            messages.add(payload);
            consumer.acknowledge(next);
        }
        return received;
    }

    /**
     * Creates the subscription on the topic by subscribing a consumer with the requested
     * {@code replicateSubscriptionState} setting and closing that consumer straight away.
     *
     * @param pulsarClient               client used to subscribe
     * @param topicName                  topic to subscribe to
     * @param subscriptionName           name of the subscription
     * @param replicateSubscriptionState value passed to the consumer builder
     * @throws PulsarClientException if subscribing or closing fails
     */
    void createReplicatedSubscription(PulsarClient pulsarClient, String topicName, String subscriptionName, boolean replicateSubscriptionState) throws PulsarClientException {
        Consumer<byte[]> shortLivedConsumer = pulsarClient.newConsumer()
                .topic(new String[]{topicName})
                .subscriptionName(subscriptionName)
                .replicateSubscriptionState(replicateSubscriptionState)
                .subscribe();
        // The consumer is only needed for the side effect of subscribing.
        shortLivedConsumer.close();
    }
}

