/*
 * Decompiled with CFR 0.152.
 */
package org.apache.pulsar.client.api;

import java.util.concurrent.TimeUnit;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.ProducerBuilder;
import org.apache.pulsar.client.api.ProducerConsumerBase;
import org.awaitility.Awaitility;
import org.testng.Assert;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;

@Test(groups={"flaky"})
public class ClientDeduplicationTest
extends ProducerConsumerBase {
    @Override
    @BeforeClass
    protected void setup() throws Exception {
        super.internalSetup();
        super.producerBaseSetup();
    }

    @Override
    @AfterClass(alwaysRun=true)
    protected void cleanup() throws Exception {
        super.internalCleanup();
    }

    @Test
    public void testNamespaceDeduplicationApi() throws Exception {
        String namespace = "my-property/my-ns";
        Assert.assertNull((Object)this.admin.namespaces().getDeduplicationStatus("my-property/my-ns"));
        this.admin.namespaces().setDeduplicationStatus("my-property/my-ns", true);
        Awaitility.await().untilAsserted(() -> Assert.assertTrue((boolean)this.admin.namespaces().getDeduplicationStatus("my-property/my-ns")));
        this.admin.namespaces().setDeduplicationStatus("my-property/my-ns", false);
        Awaitility.await().untilAsserted(() -> Assert.assertFalse((boolean)this.admin.namespaces().getDeduplicationStatus("my-property/my-ns")));
        this.admin.namespaces().removeDeduplicationStatus("my-property/my-ns");
        Awaitility.await().untilAsserted(() -> Assert.assertNull((Object)this.admin.namespaces().getDeduplicationStatus("my-property/my-ns")));
    }

    @Test
    public void testProducerSequenceAfterReconnect() throws Exception {
        String message;
        int i;
        String topic = "persistent://my-property/my-ns/testProducerSequenceAfterReconnect";
        this.admin.namespaces().setDeduplicationStatus("my-property/my-ns", true);
        ProducerBuilder producerBuilder = this.pulsarClient.newProducer().topic("persistent://my-property/my-ns/testProducerSequenceAfterReconnect").producerName("my-producer-name");
        Producer producer = producerBuilder.create();
        Assert.assertEquals((long)producer.getLastSequenceId(), (long)-1L);
        for (i = 0; i < 10; ++i) {
            message = "my-message-" + i;
            producer.send((Object)message.getBytes());
            Assert.assertEquals((long)producer.getLastSequenceId(), (long)i);
        }
        producer.close();
        producer = producerBuilder.create();
        Assert.assertEquals((long)producer.getLastSequenceId(), (long)9L);
        for (i = 10; i < 20; ++i) {
            message = "my-message-" + i;
            producer.send((Object)message.getBytes());
            Assert.assertEquals((long)producer.getLastSequenceId(), (long)i);
        }
        producer.close();
    }

    @Test
    public void testProducerSequenceAfterRestart() throws Exception {
        String message;
        int i;
        String topic = "persistent://my-property/my-ns/testProducerSequenceAfterRestart";
        this.admin.namespaces().setDeduplicationStatus("my-property/my-ns", true);
        ProducerBuilder producerBuilder = this.pulsarClient.newProducer().topic(topic).producerName("my-producer-name");
        Producer producer = producerBuilder.create();
        Assert.assertEquals((long)producer.getLastSequenceId(), (long)-1L);
        for (i = 0; i < 10; ++i) {
            message = "my-message-" + i;
            producer.send((Object)message.getBytes());
            Assert.assertEquals((long)producer.getLastSequenceId(), (long)i);
        }
        producer.close();
        this.restartBroker();
        producer = producerBuilder.create();
        Assert.assertEquals((long)producer.getLastSequenceId(), (long)9L);
        for (i = 10; i < 20; ++i) {
            message = "my-message-" + i;
            producer.send((Object)message.getBytes());
            Assert.assertEquals((long)producer.getLastSequenceId(), (long)i);
        }
        producer.close();
    }

    @Test(timeOut=30000L)
    public void testProducerDeduplication() throws Exception {
        String topic = "persistent://my-property/my-ns/testProducerDeduplication";
        this.admin.namespaces().setDeduplicationStatus("my-property/my-ns", true);
        ProducerBuilder producerBuilder = this.pulsarClient.newProducer().topic(topic).producerName("my-producer-name").sendTimeout(0, TimeUnit.SECONDS);
        Producer producer = producerBuilder.create();
        Assert.assertEquals((long)producer.getLastSequenceId(), (long)-1L);
        Consumer consumer = this.pulsarClient.newConsumer().topic(new String[]{topic}).subscriptionName("my-subscription").subscribe();
        producer.newMessage().value((Object)"my-message-0".getBytes()).sequenceId(0L).send();
        producer.newMessage().value((Object)"my-message-1".getBytes()).sequenceId(1L).send();
        producer.newMessage().value((Object)"my-message-2".getBytes()).sequenceId(2L).send();
        producer.newMessage().value((Object)"my-message-1".getBytes()).sequenceId(1L).send();
        producer.newMessage().value((Object)"my-message-2".getBytes()).sequenceId(2L).send();
        producer.close();
        for (int i = 0; i < 3; ++i) {
            Message msg = consumer.receive();
            Assert.assertEquals((String)new String(msg.getData()), (String)("my-message-" + i));
            consumer.acknowledge(msg);
        }
        Message msg = consumer.receive(1, TimeUnit.SECONDS);
        Assert.assertNull((Object)msg);
        this.restartBroker();
        producer = producerBuilder.create();
        Assert.assertEquals((long)producer.getLastSequenceId(), (long)2L);
        producer.newMessage().value((Object)"my-message-1".getBytes()).sequenceId(1L).send();
        producer.newMessage().value((Object)"my-message-2".getBytes()).sequenceId(2L).send();
        msg = consumer.receive(1, TimeUnit.SECONDS);
        Assert.assertNull((Object)msg);
        producer.close();
    }

    @Test(timeOut=30000L)
    public void testProducerDeduplicationWithDiscontinuousSequenceId() throws Exception {
        String topic = "persistent://my-property/my-ns/testProducerDeduplicationWithDiscontinuousSequenceId";
        this.admin.namespaces().setDeduplicationStatus("my-property/my-ns", true);
        ProducerBuilder producerBuilder = this.pulsarClient.newProducer().topic(topic).producerName("my-producer-name").enableBatching(true).batchingMaxMessages(10).sendTimeout(0, TimeUnit.SECONDS);
        Producer producer = producerBuilder.create();
        Assert.assertEquals((long)producer.getLastSequenceId(), (long)-1L);
        Consumer consumer = this.pulsarClient.newConsumer().topic(new String[]{topic}).subscriptionName("my-subscription").subscribe();
        producer.newMessage().value((Object)"my-message-0".getBytes()).sequenceId(2L).sendAsync();
        producer.newMessage().value((Object)"my-message-1".getBytes()).sequenceId(3L).sendAsync();
        producer.newMessage().value((Object)"my-message-2".getBytes()).sequenceId(5L).sendAsync();
        producer.flush();
        producer.newMessage().value((Object)"my-message-0".getBytes()).sequenceId(2L).sendAsync();
        producer.newMessage().value((Object)"my-message-1".getBytes()).sequenceId(4L).sendAsync();
        producer.newMessage().value((Object)"my-message-3".getBytes()).sequenceId(6L).sendAsync();
        producer.flush();
        for (int i = 0; i < 4; ++i) {
            Message msg = consumer.receive();
            Assert.assertEquals((String)new String(msg.getData()), (String)("my-message-" + i));
            consumer.acknowledge(msg);
        }
        Message msg = consumer.receive(1, TimeUnit.SECONDS);
        Assert.assertNull((Object)msg);
        producer.close();
        this.restartBroker();
        producer = producerBuilder.create();
        Assert.assertEquals((long)producer.getLastSequenceId(), (long)6L);
        producer.newMessage().value((Object)"my-message-1".getBytes()).sequenceId(2L).sendAsync();
        producer.newMessage().value((Object)"my-message-2".getBytes()).sequenceId(4L).sendAsync();
        producer.flush();
        msg = consumer.receive(1, TimeUnit.SECONDS);
        Assert.assertNull((Object)msg);
        producer.close();
    }

    @Test(timeOut=30000L)
    public void testProducerDeduplicationNonBatchAsync() throws Exception {
        String topic = "persistent://my-property/my-ns/testProducerDeduplicationNonBatchAsync";
        this.admin.namespaces().setDeduplicationStatus("my-property/my-ns", true);
        ProducerBuilder producerBuilder = this.pulsarClient.newProducer().topic(topic).producerName("my-producer-name").enableBatching(false).sendTimeout(0, TimeUnit.SECONDS);
        Producer producer = producerBuilder.create();
        Assert.assertEquals((long)producer.getLastSequenceId(), (long)-1L);
        Consumer consumer = this.pulsarClient.newConsumer().topic(new String[]{topic}).subscriptionName("my-subscription").subscribe();
        producer.newMessage().value((Object)"my-message-0".getBytes()).sequenceId(2L).sendAsync();
        producer.newMessage().value((Object)"my-message-1".getBytes()).sequenceId(3L).sendAsync();
        producer.newMessage().value((Object)"my-message-2".getBytes()).sequenceId(5L).sendAsync();
        producer.newMessage().value((Object)"my-message-1".getBytes()).sequenceId(2L).sendAsync();
        producer.newMessage().value((Object)"my-message-2".getBytes()).sequenceId(4L).sendAsync();
        producer.close();
        for (int i = 0; i < 3; ++i) {
            Message msg = consumer.receive();
            Assert.assertEquals((String)new String(msg.getData()), (String)("my-message-" + i));
            consumer.acknowledge(msg);
        }
        Message msg = consumer.receive(1, TimeUnit.SECONDS);
        Assert.assertNull((Object)msg);
        this.restartBroker();
        producer = producerBuilder.create();
        Assert.assertEquals((long)producer.getLastSequenceId(), (long)5L);
        producer.newMessage().value((Object)"my-message-1".getBytes()).sequenceId(2L).sendAsync();
        producer.newMessage().value((Object)"my-message-2".getBytes()).sequenceId(4L).sendAsync();
        msg = consumer.receive(1, TimeUnit.SECONDS);
        Assert.assertNull((Object)msg);
        producer.close();
    }
}

