package org.apache.pulsar.client.api;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import lombok.Generated;
import org.apache.pulsar.client.impl.BatchMessageIdImpl;
import org.apache.pulsar.client.impl.MessageIdImpl;
import org.apache.pulsar.common.util.FutureUtil;
import org.awaitility.Awaitility;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testng.Assert;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.DataProvider;
import org.testng.annotations.Test;

@Test(groups = {"broker-api"})
/* loaded from: input_file:org/apache/pulsar/client/api/ClientDeduplicationTest.class */
public class ClientDeduplicationTest extends ProducerConsumerBase {

    @Generated
    private static final Logger log = LoggerFactory.getLogger(ClientDeduplicationTest.class);

    /* JADX WARN: Type inference failed for: r0v1, types: [java.lang.Object[], java.lang.Object[][]] */
    @DataProvider
    public static Object[][] batchingTypes() {
        return new Object[]{new Object[]{BatcherBuilder.DEFAULT}, new Object[]{BatcherBuilder.KEY_BASED}};
    }

    @Override // org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest
    @BeforeClass
    protected void setup() throws Exception {
        super.internalSetup();
        super.producerBaseSetup();
    }

    @Override // org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest
    @AfterClass(alwaysRun = true)
    protected void cleanup() throws Exception {
        super.internalCleanup();
    }

    @Test(priority = -1)
    public void testNamespaceDeduplicationApi() throws Exception {
        Assert.assertNull(this.admin.namespaces().getDeduplicationStatus("my-property/my-ns"));
        this.admin.namespaces().setDeduplicationStatus("my-property/my-ns", true);
        Awaitility.await().untilAsserted(() -> {
            Assert.assertTrue(this.admin.namespaces().getDeduplicationStatus("my-property/my-ns").booleanValue());
        });
        this.admin.namespaces().setDeduplicationStatus("my-property/my-ns", false);
        Awaitility.await().untilAsserted(() -> {
            Assert.assertFalse(this.admin.namespaces().getDeduplicationStatus("my-property/my-ns").booleanValue());
        });
        this.admin.namespaces().removeDeduplicationStatus("my-property/my-ns");
        Awaitility.await().untilAsserted(() -> {
            Assert.assertNull(this.admin.namespaces().getDeduplicationStatus("my-property/my-ns"));
        });
    }

    @Test
    public void testProducerSequenceAfterReconnect() throws Exception {
        this.admin.namespaces().setDeduplicationStatus("my-property/my-ns", true);
        ProducerBuilder producerName = this.pulsarClient.newProducer().topic("persistent://my-property/my-ns/testProducerSequenceAfterReconnect").producerName("my-producer-name");
        Producer create = producerName.create();
        Assert.assertEquals(create.getLastSequenceId(), -1L);
        for (int i = 0; i < 10; i++) {
            create.send(("my-message-" + i).getBytes());
            Assert.assertEquals(create.getLastSequenceId(), i);
        }
        create.close();
        Producer create2 = producerName.create();
        Assert.assertEquals(create2.getLastSequenceId(), 9L);
        for (int i2 = 10; i2 < 20; i2++) {
            create2.send(("my-message-" + i2).getBytes());
            Assert.assertEquals(create2.getLastSequenceId(), i2);
        }
        create2.close();
    }

    @Test
    public void testProducerSequenceAfterRestart() throws Exception {
        this.admin.namespaces().setDeduplicationStatus("my-property/my-ns", true);
        ProducerBuilder producerName = this.pulsarClient.newProducer().topic("persistent://my-property/my-ns/testProducerSequenceAfterRestart").producerName("my-producer-name");
        Producer create = producerName.create();
        Assert.assertEquals(create.getLastSequenceId(), -1L);
        for (int i = 0; i < 10; i++) {
            create.send(("my-message-" + i).getBytes());
            Assert.assertEquals(create.getLastSequenceId(), i);
        }
        create.close();
        restartBroker();
        Producer create2 = producerName.create();
        Assert.assertEquals(create2.getLastSequenceId(), 9L);
        for (int i2 = 10; i2 < 20; i2++) {
            create2.send(("my-message-" + i2).getBytes());
            Assert.assertEquals(create2.getLastSequenceId(), i2);
        }
        create2.close();
    }

    @Test(timeOut = 30000)
    public void testProducerDeduplication() throws Exception {
        this.admin.namespaces().setDeduplicationStatus("my-property/my-ns", true);
        ProducerBuilder sendTimeout = this.pulsarClient.newProducer().topic("persistent://my-property/my-ns/testProducerDeduplication").producerName("my-producer-name").sendTimeout(0, TimeUnit.SECONDS);
        Producer create = sendTimeout.create();
        Assert.assertEquals(create.getLastSequenceId(), -1L);
        Consumer subscribe = this.pulsarClient.newConsumer().topic(new String[]{"persistent://my-property/my-ns/testProducerDeduplication"}).subscriptionName("my-subscription").subscribe();
        create.newMessage().value("my-message-0".getBytes()).sequenceId(0L).send();
        create.newMessage().value("my-message-1".getBytes()).sequenceId(1L).send();
        create.newMessage().value("my-message-2".getBytes()).sequenceId(2L).send();
        create.newMessage().value("my-message-1".getBytes()).sequenceId(1L).send();
        create.newMessage().value("my-message-2".getBytes()).sequenceId(2L).send();
        create.close();
        for (int i = 0; i < 3; i++) {
            Message receive = subscribe.receive();
            Assert.assertEquals(new String(receive.getData()), "my-message-" + i);
            subscribe.acknowledge(receive);
        }
        Assert.assertNull(subscribe.receive(1, TimeUnit.SECONDS));
        restartBroker();
        Producer create2 = sendTimeout.create();
        Assert.assertEquals(create2.getLastSequenceId(), 2L);
        create2.newMessage().value("my-message-1".getBytes()).sequenceId(1L).send();
        create2.newMessage().value("my-message-2".getBytes()).sequenceId(2L).send();
        Assert.assertNull(subscribe.receive(1, TimeUnit.SECONDS));
        create2.close();
    }

    @Test(timeOut = 30000, dataProvider = "batchingTypes")
    public void testProducerDeduplicationWithDiscontinuousSequenceId(BatcherBuilder batcherBuilder) throws Exception {
        String str = "persistent://my-property/my-ns/testProducerDeduplicationWithDiscontinuousSequenceId-" + System.currentTimeMillis();
        this.admin.namespaces().setDeduplicationStatus("my-property/my-ns", true);
        ProducerBuilder sendTimeout = this.pulsarClient.newProducer().topic(str).producerName("my-producer-name").enableBatching(true).batcherBuilder(batcherBuilder).batchingMaxMessages(10).batchingMaxPublishDelay(1L, TimeUnit.HOURS).sendTimeout(0, TimeUnit.SECONDS);
        Producer create = sendTimeout.create();
        Assert.assertEquals(create.getLastSequenceId(), -1L);
        Consumer subscribe = this.pulsarClient.newConsumer().topic(new String[]{str}).subscriptionName("my-subscription").subscribe();
        create.newMessage().value("my-message-0".getBytes()).sequenceId(2L).sendAsync();
        create.newMessage().value("my-message-1".getBytes()).sequenceId(3L).sendAsync();
        create.newMessage().value("my-message-2".getBytes()).sequenceId(5L).sendAsync();
        create.flush();
        create.newMessage().value("my-message-0".getBytes()).sequenceId(2L).sendAsync();
        create.newMessage().value("my-message-1".getBytes()).sequenceId(4L).sendAsync();
        create.newMessage().value("my-message-3".getBytes()).sequenceId(6L).sendAsync();
        create.flush();
        for (int i = 0; i < 4; i++) {
            Message receive = subscribe.receive(3, TimeUnit.SECONDS);
            Assert.assertNotNull(receive);
            Assert.assertEquals(new String(receive.getData()), "my-message-" + i);
            subscribe.acknowledge(receive);
        }
        Assert.assertNull(subscribe.receive(1, TimeUnit.SECONDS));
        create.close();
        restartBroker();
        Producer create2 = sendTimeout.create();
        Assert.assertEquals(create2.getLastSequenceId(), 6L);
        create2.newMessage().value("my-message-1".getBytes()).sequenceId(2L).sendAsync();
        create2.newMessage().value("my-message-2".getBytes()).sequenceId(4L).sendAsync();
        create2.flush();
        Assert.assertNull(subscribe.receive(1, TimeUnit.SECONDS));
        create2.close();
    }

    @Test(timeOut = 30000)
    public void testProducerDeduplicationNonBatchAsync() throws Exception {
        this.admin.namespaces().setDeduplicationStatus("my-property/my-ns", true);
        ProducerBuilder sendTimeout = this.pulsarClient.newProducer().topic("persistent://my-property/my-ns/testProducerDeduplicationNonBatchAsync").producerName("my-producer-name").enableBatching(false).sendTimeout(0, TimeUnit.SECONDS);
        Producer create = sendTimeout.create();
        Assert.assertEquals(create.getLastSequenceId(), -1L);
        Consumer subscribe = this.pulsarClient.newConsumer().topic(new String[]{"persistent://my-property/my-ns/testProducerDeduplicationNonBatchAsync"}).subscriptionName("my-subscription").subscribe();
        create.newMessage().value("my-message-0".getBytes()).sequenceId(2L).sendAsync();
        create.newMessage().value("my-message-1".getBytes()).sequenceId(3L).sendAsync();
        create.newMessage().value("my-message-2".getBytes()).sequenceId(5L).sendAsync();
        create.newMessage().value("my-message-1".getBytes()).sequenceId(2L).sendAsync();
        create.newMessage().value("my-message-2".getBytes()).sequenceId(4L).sendAsync();
        create.close();
        for (int i = 0; i < 3; i++) {
            Message receive = subscribe.receive();
            Assert.assertEquals(new String(receive.getData()), "my-message-" + i);
            subscribe.acknowledge(receive);
        }
        Assert.assertNull(subscribe.receive(1, TimeUnit.SECONDS));
        restartBroker();
        Producer create2 = sendTimeout.create();
        Assert.assertEquals(create2.getLastSequenceId(), 5L);
        create2.newMessage().value("my-message-1".getBytes()).sequenceId(2L).sendAsync();
        create2.newMessage().value("my-message-2".getBytes()).sequenceId(4L).sendAsync();
        Assert.assertNull(subscribe.receive(1, TimeUnit.SECONDS));
        create2.close();
    }

    @Test(timeOut = 30000)
    public void testKeyBasedBatchingOrder() throws Exception {
        Message receive;
        this.admin.namespaces().setDeduplicationStatus("my-property/my-ns", true);
        Consumer subscribe = this.pulsarClient.newConsumer(Schema.STRING).topic(new String[]{"persistent://my-property/my-ns/test-key-based-batching-order"}).subscriptionName("sub").subscribe();
        Producer create = this.pulsarClient.newProducer(Schema.STRING).topic("persistent://my-property/my-ns/test-key-based-batching-order").batcherBuilder(BatcherBuilder.KEY_BASED).batchingMaxMessages(100).batchingMaxBytes(5242880).batchingMaxPublishDelay(1L, TimeUnit.HOURS).create();
        ArrayList arrayList = new ArrayList();
        arrayList.add(create.newMessage().key("A").value("msg-0").sequenceId(0L).sendAsync());
        arrayList.add(create.newMessage().key("B").value("msg-1").sequenceId(1L).sendAsync());
        arrayList.add(create.newMessage().key("B").value("msg-2").sequenceId(2L).sendAsync());
        arrayList.add(create.newMessage().key("A").value("msg-3").sequenceId(3L).sendAsync());
        arrayList.add(create.newMessage().key("A").value("msg-4").sequenceId(4L).sendAsync());
        create.flush();
        FutureUtil.waitForAll(arrayList);
        List list = (List) arrayList.stream().map((v0) -> {
            return v0.join();
        }).collect(Collectors.toList());
        for (int i = 0; i < list.size(); i++) {
            log.info("Send msg-{} to {}", Integer.valueOf(i), list.get(i));
        }
        ArrayList arrayList2 = new ArrayList();
        for (int i2 = 0; i2 < 5 && (receive = subscribe.receive(3, TimeUnit.SECONDS)) != null; i2++) {
            log.info("Received {}, key: {}, seq id: {}, msg id: {}", new Object[]{receive.getValue(), receive.getKey(), Long.valueOf(receive.getSequenceId()), receive.getMessageId()});
            Assert.assertNotNull(receive);
            arrayList2.add(Long.valueOf(receive.getSequenceId()));
        }
        Assert.assertEquals(arrayList2, Arrays.asList(1L, 2L, 0L, 3L, 4L));
        for (int i3 = 0; i3 < 5; i3++) {
            MessageIdImpl send = create.newMessage().value("msg").sequenceId(i3).send();
            Assert.assertTrue(send instanceof MessageIdImpl);
            Assert.assertFalse(send instanceof BatchMessageIdImpl);
            MessageIdImpl messageIdImpl = send;
            Assert.assertEquals(messageIdImpl.getLedgerId(), -1L);
            Assert.assertEquals(messageIdImpl.getEntryId(), -1L);
        }
        subscribe.close();
        create.close();
    }

    @Test
    public void testUpdateSequenceIdInSyncCodeSegment() throws Exception {
        int i = 200;
        ExecutorService newFixedThreadPool = Executors.newFixedThreadPool(5);
        this.conf.setBrokerDeduplicationEnabled(true);
        Producer create = this.pulsarClient.newProducer().topic("persistent://my-property/my-ns/testUpdateSequenceIdInSyncCodeSegment").producerName("producer").sendTimeout(0, TimeUnit.SECONDS).create();
        Consumer subscribe = this.pulsarClient.newConsumer().topic(new String[]{"persistent://my-property/my-ns/testUpdateSequenceIdInSyncCodeSegment"}).subscriptionType(SubscriptionType.Exclusive).subscriptionName("subscription").subscribe();
        CountDownLatch countDownLatch = new CountDownLatch(5);
        for (int i2 = 0; i2 < 5; i2++) {
            newFixedThreadPool.submit(() -> {
                for (int i3 = 0; i3 < i; i3++) {
                    try {
                        try {
                            create.newMessage().sendAsync();
                        } catch (Exception e) {
                            log.error("Failed to send/ack messages with transaction.", e);
                            countDownLatch.countDown();
                            return;
                        }
                    } finally {
                        countDownLatch.countDown();
                    }
                }
            });
        }
        countDownLatch.await();
        for (int i3 = 0; i3 < 5 * 200; i3++) {
            Assert.assertNotNull(subscribe.receive(5, TimeUnit.SECONDS));
        }
    }
}
