/*
 * Decompiled with CFR 0.152.
 */
package org.apache.pulsar.client.api;

import com.google.common.base.Function;
import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
import java.lang.reflect.Method;
import java.net.URL;
import java.util.HashSet;
import java.util.LinkedList;
import java.util.Optional;
import java.util.Random;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
import org.apache.pulsar.broker.loadbalance.impl.SimpleLoadManagerImpl;
import org.apache.pulsar.client.admin.BrokerStats;
import org.apache.pulsar.client.admin.PulsarAdmin;
import org.apache.pulsar.client.admin.PulsarAdminException;
import org.apache.pulsar.client.api.ClientBuilder;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.Reader;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.impl.BatchMessageIdImpl;
import org.apache.pulsar.client.impl.MessageIdImpl;
import org.apache.pulsar.common.policies.data.ClusterData;
import org.apache.pulsar.common.policies.data.ConsumerStats;
import org.apache.pulsar.common.policies.data.PublisherStats;
import org.apache.pulsar.common.policies.data.RetentionPolicies;
import org.apache.pulsar.common.policies.data.SubscriptionStats;
import org.apache.pulsar.common.policies.data.TenantInfo;
import org.apache.pulsar.common.policies.data.TopicStats;
import org.apache.pulsar.zookeeper.LocalBookkeeperEnsemble;
import org.mockito.Mockito;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testng.Assert;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;

@Test(groups={"quarantine"})
public class ClientDeduplicationFailureTest {
    private static final Logger log = LoggerFactory.getLogger(ClientDeduplicationFailureTest.class);
    LocalBookkeeperEnsemble bkEnsemble;
    ServiceConfiguration config;
    URL url;
    PulsarService pulsar;
    PulsarAdmin admin;
    PulsarClient pulsarClient;
    BrokerStats brokerStatsClient;
    final String tenant = "external-repl-prop";
    String primaryHost;

    /**
     * Per-test fixture. Starts a {@link LocalBookkeeperEnsemble}, boots a {@link PulsarService}
     * pointed at the ensemble's ZooKeeper port, then creates the admin client, the {@code "use"}
     * cluster, a Pulsar client and the test tenant.
     *
     * @param method the test method about to run; only its name is used, for logging
     * @throws Exception if any component fails to start or any admin call fails
     */
    @BeforeMethod(timeOut = 300000L, alwaysRun = true)
    void setup(Method method) throws Exception {
        log.info("--- Setting up method {} ---", method.getName());

        // Local ZooKeeper/BookKeeper ensemble the broker will connect to.
        bkEnsemble = new LocalBookkeeperEnsemble(3, 0, () -> 0);
        bkEnsemble.start();

        // Broker configuration: ephemeral ports (0), load balancing features switched off.
        config = Mockito.spy(new ServiceConfiguration());
        config.setClusterName("use");
        config.setWebServicePort(Optional.of(0));
        config.setZookeeperServers("127.0.0.1:" + bkEnsemble.getZookeeperPort());
        config.setBrokerShutdownTimeoutMs(0L);
        config.setBrokerServicePort(Optional.of(0));
        config.setLoadManagerClassName(SimpleLoadManagerImpl.class.getName());
        config.setTlsAllowInsecureConnection(true);
        config.setAdvertisedAddress("localhost");
        config.setLoadBalancerSheddingEnabled(false);
        config.setLoadBalancerAutoBundleSplitEnabled(false);
        config.setLoadBalancerEnabled(false);
        config.setLoadBalancerAutoUnloadSplitBundlesEnabled(false);
        config.setAllowAutoTopicCreationType("non-partitioned");

        pulsar = new PulsarService(config);
        pulsar.start();

        // Admin client and cluster registration against the broker's web service address.
        String webServiceAddress = pulsar.getWebServiceAddress();
        url = new URL(webServiceAddress);
        admin = PulsarAdmin.builder().serviceHttpUrl(webServiceAddress).build();
        brokerStatsClient = admin.brokerStats();
        primaryHost = pulsar.getWebServiceAddress();
        admin.clusters().createCluster(config.getClusterName(), new ClusterData(url.toString()));

        // Drop any client left over from a previous run before building a new one.
        if (pulsarClient != null) {
            pulsarClient.shutdown();
        }
        pulsarClient = PulsarClient.builder()
                .serviceUrl(pulsar.getBrokerServiceUrl())
                .maxBackoffInterval(1L, TimeUnit.SECONDS)
                .build();

        TenantInfo tenantInfo = new TenantInfo();
        tenantInfo.setAllowedClusters(Sets.newHashSet(Lists.newArrayList("use")));
        admin.tenants().createTenant(tenant, tenantInfo);
    }

    /**
     * Tears down everything {@code setup} created: Pulsar client, admin client, broker and the
     * local bookkeeper ensemble, in that order.
     *
     * <p>Because this method runs with {@code alwaysRun = true}, it may execute after a
     * {@code setup} that failed part-way through. Each resource is therefore null-checked, and the
     * teardown steps are chained with {@code finally} so that a failure while closing one resource
     * does not prevent the remaining ones from being released.
     *
     * @throws Exception the first failure raised while closing a resource
     */
    @AfterMethod(alwaysRun = true)
    void shutdown() throws Exception {
        log.info("--- Shutting down ---");
        try {
            if (this.pulsarClient != null) {
                this.pulsarClient.close();
            }
        } finally {
            try {
                if (this.admin != null) {
                    this.admin.close();
                }
            } finally {
                try {
                    if (this.pulsar != null) {
                        this.pulsar.close();
                    }
                } finally {
                    if (this.bkEnsemble != null) {
                        this.bkEnsemble.stop();
                    }
                }
            }
        }
    }

    /**
     * Publishes continuously from a background {@link ProducerThread} with namespace
     * deduplication enabled while the bookie is stopped and restarted five times, then reads the
     * topic back from the earliest position and asserts that the sequence ids start at 1 and
     * increase by exactly one per message up to the last id the producer thread reported.
     *
     * <p>A final message with value {@code "end"} marks the end of the stream for the reader.
     *
     * @throws Exception on any admin, client or bookkeeper failure
     */
    @Test(timeOut = 300000L, groups = {"quarantine"})
    public void testClientDeduplicationCorrectnessWithFailure() throws Exception {
        final String replNamespace = "external-repl-prop/dedup";
        final String sourceTopic = "persistent://external-repl-prop/dedup/my-topic1";
        final String producerName = "test-producer-1";

        this.admin.namespaces().createNamespace(replNamespace);
        Set<String> clusters = Sets.newHashSet(Lists.newArrayList("use"));
        this.admin.namespaces().setNamespaceReplicationClusters(replNamespace, clusters);
        this.admin.namespaces().setDeduplicationStatus(replNamespace, true);
        this.admin.namespaces().setRetention(replNamespace, new RetentionPolicies(-1, -1));

        Producer<String> producer = this.pulsarClient.newProducer(Schema.STRING)
                .blockIfQueueFull(true)
                .sendTimeout(0, TimeUnit.SECONDS)
                .topic(sourceTopic)
                .producerName(producerName)
                .create();
        ProducerThread producerThread = new ProducerThread(producer);
        producerThread.start();

        // Wait until the broker reports our producer and some data has been stored.
        MockedPulsarServiceBaseTest.retryStrategically(test -> {
            try {
                TopicStats stats = this.admin.topics().getStats(sourceTopic);
                return stats.publishers.size() == 1
                        && stats.publishers.get(0).getProducerName().equals(producerName)
                        && stats.storageSize > 0L;
            } catch (PulsarAdminException e) {
                return false;
            }
        }, 5, 200L);

        TopicStats topicStats = this.admin.topics().getStats(sourceTopic);
        Assert.assertEquals(topicStats.publishers.size(), 1);
        Assert.assertEquals(topicStats.publishers.get(0).getProducerName(), producerName);
        Assert.assertTrue(topicStats.storageSize > 0L);

        // Bounce the bookie several times while the producer thread keeps publishing.
        Random random = new Random();
        for (int i = 0; i < 5; ++i) {
            log.info("Stopping BK...");
            this.bkEnsemble.stopBK();
            Thread.sleep(1000 + random.nextInt(500));
            log.info("Starting BK...");
            this.bkEnsemble.startBK();
        }

        producerThread.stop();
        // Sentinel message that tells the reader loop below where the stream ends.
        producer.newMessage().sequenceId(producerThread.getLastSeqId() + 1L).value("end").send();
        producer.close();

        Message<String> prevMessage = null;
        int count = 0;
        // try-with-resources: the reader is released even when an assertion below fails.
        try (Reader<String> reader = this.pulsarClient.newReader(Schema.STRING)
                .startMessageId(MessageId.earliest)
                .topic(sourceTopic)
                .create()) {
            Message<String> message;
            while ((message = reader.readNext(5, TimeUnit.SECONDS)) != null) {
                if ("end".equals(message.getValue())) {
                    // prevMessage is still null if the sentinel is the first message read; avoid an
                    // NPE here so the assertNotNull below reports the real problem.
                    log.info("Last seq Id received: {}",
                            prevMessage == null ? null : prevMessage.getSequenceId());
                    break;
                }
                if (prevMessage == null) {
                    Assert.assertEquals(message.getSequenceId(), 1L);
                } else {
                    Assert.assertEquals(message.getSequenceId(), prevMessage.getSequenceId() + 1L);
                }
                prevMessage = message;
                ++count;
            }
        }

        log.info("# of messages read: {}", count);
        Assert.assertNotNull(prevMessage);
        Assert.assertEquals(prevMessage.getSequenceId(), producerThread.getLastSeqId());
    }

    /**
     * Exercises producer deduplication across a bookie outage using two subscriptions.
     *
     * <ol>
     *   <li>Messages 0-9 are published and consumed on {@code sub1}.</li>
     *   <li>The bookie is stopped; async sends 10-19 and two sync sends of sequence id 10 are all
     *       expected to fail.</li>
     *   <li>The bookie is restarted; messages 20-29 are published and consumed on {@code sub1}.</li>
     *   <li>The background consumer on {@code sub2} must have received exactly 0-9 followed by
     *       20-29, and the last message id reported by {@code consumer1} must match the last
     *       message it received.</li>
     * </ol>
     *
     * @throws Exception on any admin, client or bookkeeper failure
     */
    @Test(timeOut = 300000L)
    public void testClientDeduplicationWithBkFailure() throws Exception {
        final String replNamespace = "external-repl-prop/dedup";
        final String sourceTopic = "persistent://external-repl-prop/dedup/my-topic1";
        final String subscriptionName1 = "sub1";
        final String subscriptionName2 = "sub2";
        final String consumerName1 = "test-consumer-1";
        final String consumerName2 = "test-consumer-2";

        // Appended to by the background consumer thread and read by the test thread, so a
        // thread-safe list is required for the reads below to observe the writes.
        final CopyOnWriteArrayList<Message<String>> msgRecvd = new CopyOnWriteArrayList<>();

        this.admin.namespaces().createNamespace(replNamespace);
        Set<String> clusters = Sets.newHashSet(Lists.newArrayList("use"));
        this.admin.namespaces().setNamespaceReplicationClusters(replNamespace, clusters);
        this.admin.namespaces().setDeduplicationStatus(replNamespace, true);

        Producer<String> producer = this.pulsarClient.newProducer(Schema.STRING)
                .topic(sourceTopic)
                .producerName("test-producer-1")
                .create();
        Consumer<String> consumer1 = this.pulsarClient.newConsumer(Schema.STRING)
                .topic(sourceTopic)
                .consumerName(consumerName1)
                .subscriptionName(subscriptionName1)
                .subscribe();
        Consumer<String> consumer2 = this.pulsarClient.newConsumer(Schema.STRING)
                .topic(sourceTopic)
                .consumerName(consumerName2)
                .subscriptionName(subscriptionName2)
                .subscribe();

        // Drains sub2 in the background until receive() fails (e.g. on interrupt).
        Thread thread = new Thread(() -> {
            try {
                while (true) {
                    Message<String> msg = consumer2.receive();
                    msgRecvd.add(msg);
                    consumer2.acknowledge(msg);
                }
            } catch (PulsarClientException e) {
                log.error("Failed to consume message: {}", e, e);
            }
        });
        thread.start();

        try {
            // Wait until both consumers are visible in the topic stats.
            MockedPulsarServiceBaseTest.retryStrategically(test -> {
                try {
                    TopicStats topicStats = this.admin.topics().getStats(sourceTopic);
                    boolean c1 = topicStats != null
                            && topicStats.subscriptions.get(subscriptionName1) != null
                            && topicStats.subscriptions.get(subscriptionName1).consumers.size() == 1
                            && topicStats.subscriptions.get(subscriptionName1).consumers.get(0)
                                    .consumerName.equals(consumerName1);
                    boolean c2 = topicStats != null
                            && topicStats.subscriptions.get(subscriptionName2) != null
                            && topicStats.subscriptions.get(subscriptionName2).consumers.size() == 1
                            && topicStats.subscriptions.get(subscriptionName2).consumers.get(0)
                                    .consumerName.equals(consumerName2);
                    return c1 && c2;
                } catch (PulsarAdminException e) {
                    return false;
                }
            }, 5, 200L);

            TopicStats topicStats1 = this.admin.topics().getStats(sourceTopic);
            Assert.assertNotNull(topicStats1);
            Assert.assertNotNull(topicStats1.subscriptions.get(subscriptionName1));
            Assert.assertEquals(topicStats1.subscriptions.get(subscriptionName1).consumers.size(), 1);
            Assert.assertEquals(
                    topicStats1.subscriptions.get(subscriptionName1).consumers.get(0).consumerName,
                    consumerName1);

            TopicStats topicStats2 = this.admin.topics().getStats(sourceTopic);
            Assert.assertNotNull(topicStats2);
            Assert.assertNotNull(topicStats2.subscriptions.get(subscriptionName2));
            Assert.assertEquals(topicStats2.subscriptions.get(subscriptionName2).consumers.size(), 1);
            Assert.assertEquals(
                    topicStats2.subscriptions.get(subscriptionName2).consumers.get(0).consumerName,
                    consumerName2);

            // Phase 1: bookie up, messages 0-9 go through.
            for (int i = 0; i < 10; ++i) {
                producer.newMessage().sequenceId(i).value("foo-" + i).send();
            }
            for (int i = 0; i < 10; ++i) {
                Message<String> msg = consumer1.receive();
                consumer1.acknowledge(msg);
                Assert.assertEquals(msg.getValue(), "foo-" + i);
                Assert.assertEquals(msg.getSequenceId(), (long) i);
            }

            // Phase 2: bookie down, every publish is expected to fail.
            log.info("Stopping BK...");
            this.bkEnsemble.stopBK();

            LinkedList<CompletableFuture<MessageId>> futures = new LinkedList<>();
            for (int i = 10; i < 20; ++i) {
                final int finalI = i;
                CompletableFuture<MessageId> future =
                        producer.newMessage().sequenceId(i).value("foo-" + i).sendAsync();
                // Success is unexpected while the bookie is down, hence the error level.
                future.thenRun(() -> log.error("message: {} successful", finalI))
                        .exceptionally(throwable -> {
                            log.info("message: {} failed: {}", finalI, throwable, throwable);
                            return null;
                        });
                futures.add(future);
            }
            for (CompletableFuture<MessageId> future : futures) {
                try {
                    future.join();
                    Assert.fail("send unexpectedly succeeded while the bookie was stopped");
                } catch (CompletionException expected) {
                    // Expected: the send must fail while the bookie is down.
                } catch (Exception e) {
                    Assert.fail("unexpected exception type while waiting for send", e);
                }
            }

            // The same sequence id is sent twice on purpose; both attempts must fail.
            try {
                producer.newMessage().sequenceId(10L).value("foo-10").send();
                Assert.fail("send unexpectedly succeeded while the bookie was stopped");
            } catch (PulsarClientException expected) {
                // Expected: the bookie is still down.
            }
            try {
                producer.newMessage().sequenceId(10L).value("foo-10").send();
                Assert.fail("send unexpectedly succeeded while the bookie was stopped");
            } catch (PulsarClientException expected) {
                // Expected: the bookie is still down.
            }

            // Phase 3: bookie back up, messages 20-29 go through.
            log.info("Starting BK...");
            this.bkEnsemble.startBK();

            for (int i = 20; i < 30; ++i) {
                producer.newMessage().sequenceId(i).value("foo-" + i).send();
            }
            MessageId lastMessageId = null;
            for (int i = 20; i < 30; ++i) {
                Message<String> msg = consumer1.receive();
                lastMessageId = msg.getMessageId();
                consumer1.acknowledge(msg);
                Assert.assertEquals(msg.getValue(), "foo-" + i);
                Assert.assertEquals(msg.getSequenceId(), (long) i);
            }

            // The background consumer must have seen 0-9 and 20-29, and nothing from 10-19.
            MockedPulsarServiceBaseTest.retryStrategically(test -> msgRecvd.size() >= 20, 5, 200L);
            Assert.assertEquals(msgRecvd.size(), 20);
            for (int i = 0; i < 10; ++i) {
                Assert.assertEquals(msgRecvd.get(i).getValue(), "foo-" + i);
                Assert.assertEquals(msgRecvd.get(i).getSequenceId(), (long) i);
            }
            for (int i = 10; i < 20; ++i) {
                Assert.assertEquals(msgRecvd.get(i).getValue(), "foo-" + (i + 10));
                Assert.assertEquals(msgRecvd.get(i).getSequenceId(), (long) (i + 10));
            }

            BatchMessageIdImpl batchMessageId = (BatchMessageIdImpl) lastMessageId;
            MessageIdImpl messageId = (MessageIdImpl) consumer1.getLastMessageId();
            Assert.assertEquals(messageId.getLedgerId(), batchMessageId.getLedgerId());
            Assert.assertEquals(messageId.getEntryId(), batchMessageId.getEntryId());
        } finally {
            // Stop the background consumer on failure paths too, not only on success.
            thread.interrupt();
        }
    }

    /**
     * Background publisher used by the tests: once started it sends {@code "foo-<n>"} messages
     * with explicit sequence ids 1, 2, 3, ... as fast as {@code sendAsync} allows until
     * {@link #stop()} is called. Each successfully acknowledged send increments a counter that is
     * exposed through {@link #getLastSeqId()}.
     */
    private static class ProducerThread
    implements Runnable {
        /** How long {@link #stop()} waits for the thread to exit and for the last send to complete. */
        private static final long STOP_TIMEOUT_SECONDS = 60L;

        private volatile boolean isRunning = false;
        private final Thread thread;
        private final Producer<String> producer;
        /** Next sequence id to send; touched only by the producing thread. */
        private long i = 1L;
        /** Number of sends that completed successfully. */
        private final AtomicLong atomicLong = new AtomicLong(0L);
        /**
         * Future of the most recent {@code sendAsync}. Written by the producing thread; read by
         * {@link #stop()} only after joining that thread, which makes the write visible.
         */
        private CompletableFuture<MessageId> lastMessageFuture;

        /**
         * @param producer the producer used for every send; it is not closed by this class
         */
        public ProducerThread(Producer<String> producer) {
            this.producer = producer;
            this.thread = new Thread(this);
        }

        /** Publishing loop; runs until {@link #stop()} clears the running flag. */
        @Override
        public void run() {
            while (this.isRunning) {
                this.lastMessageFuture = this.producer.newMessage().sequenceId(this.i).value("foo-" + this.i).sendAsync();
                this.lastMessageFuture.thenAccept(messageId -> this.atomicLong.incrementAndGet()).exceptionally(ex -> {
                    log.info("publish exception:", ex);
                    return null;
                });
                ++this.i;
            }
            log.info("done Producing! Last send: {}", this.i);
        }

        /** Sets the running flag and starts the background thread. */
        public void start() {
            this.isRunning = true;
            this.thread.start();
        }

        /**
         * Stops the publishing loop and waits for the last in-flight message to complete.
         *
         * <p>The producing thread is joined first, so no further send can be issued after this
         * method inspects {@code lastMessageFuture}, and the value read is the final one. If no
         * message was ever sent there is nothing to wait for.
         *
         * @throws RuntimeException if the thread does not exit or the last send does not complete
         *         within the timeout, if the last send failed, or if the caller is interrupted
         */
        public void stop() {
            this.isRunning = false;
            log.info("Waiting for last message to complete");
            try {
                this.thread.join(TimeUnit.SECONDS.toMillis(STOP_TIMEOUT_SECONDS));
            }
            catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new RuntimeException(e);
            }
            if (this.thread.isAlive()) {
                throw new RuntimeException("Producer thread hasn't stopped within timeout!");
            }
            CompletableFuture<MessageId> last = this.lastMessageFuture;
            if (last != null) {
                try {
                    last.get(STOP_TIMEOUT_SECONDS, TimeUnit.SECONDS);
                }
                catch (TimeoutException e) {
                    throw new RuntimeException("Last message hasn't completed within timeout!", e);
                }
                catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    throw new RuntimeException(e);
                }
                catch (Exception e) {
                    throw new RuntimeException(e);
                }
            }
            log.info("Producer Thread stopped!");
        }

        /**
         * @return the number of sends acknowledged so far; the tests use it as the last
         *         sequence id, which presumably relies on every send eventually succeeding
         */
        public long getLastSeqId() {
            return this.atomicLong.get();
        }
    }
}

