package org.apache.pulsar.client.impl;

import io.netty.buffer.Unpooled;
import java.util.Collections;
import java.util.Optional;
import java.util.concurrent.TimeUnit;
import org.apache.pulsar.broker.service.Topic;
import org.apache.pulsar.broker.service.persistent.PersistentTopic;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.ProducerConsumerBase;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.common.api.proto.MessageMetadata;
import org.apache.pulsar.common.protocol.Commands;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testng.Assert;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;

@Test(groups = {"broker-impl"})
/* loaded from: input_file:org/apache/pulsar/client/impl/MessageChunkingDeduplicationTest.class */
public class MessageChunkingDeduplicationTest extends ProducerConsumerBase {
    private static final Logger log = LoggerFactory.getLogger(MessageChunkingDeduplicationTest.class);

    @Override // org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest
    @BeforeClass
    protected void setup() throws Exception {
        this.conf.setBrokerDeduplicationEnabled(true);
        super.internalSetup();
        super.producerBaseSetup();
    }

    @Override // org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest
    @AfterClass(alwaysRun = true)
    protected void cleanup() throws Exception {
        super.internalCleanup();
    }

    @Test
    public void testSendChunkMessageWithSameSequenceID() throws Exception {
        Consumer subscribe = this.pulsarClient.newConsumer(Schema.STRING).subscriptionName("test-sub").topic(new String[]{"persistent://my-property/my-ns/testSendChunkMessageWithSameSequenceID"}).subscribe();
        try {
            Producer create = this.pulsarClient.newProducer(Schema.STRING).producerName("test-producer").topic("persistent://my-property/my-ns/testSendChunkMessageWithSameSequenceID").enableChunking(true).enableBatching(false).create();
            try {
                String join = String.join("", Collections.nCopies(6000 * 1000, "a"));
                create.newMessage().value(join).sequenceId(10L).send();
                Message receive = subscribe.receive(10, TimeUnit.SECONDS);
                Assert.assertNotNull(receive);
                Assert.assertTrue(receive.getMessageId() instanceof ChunkMessageIdImpl);
                Assert.assertEquals((String) receive.getValue(), join);
                create.newMessage().value(join).sequenceId(10L).send();
                Assert.assertNull(subscribe.receive(3, TimeUnit.SECONDS));
                if (Collections.singletonList(create).get(0) != null) {
                    create.close();
                }
            } catch (Throwable th) {
                if (Collections.singletonList(create).get(0) != null) {
                    create.close();
                }
                throw th;
            }
        } finally {
            if (Collections.singletonList(subscribe).get(0) != null) {
                subscribe.close();
            }
        }
    }

    @Test
    public void testDeduplicateChunksInSingleChunkMessages() throws Exception {
        Consumer subscribe = this.pulsarClient.newConsumer(Schema.STRING).subscriptionName("test-sub").topic(new String[]{"persistent://my-property/my-ns/testDeduplicateChunksInSingleChunkMessage"}).subscribe();
        try {
            PersistentTopic persistentTopic = (PersistentTopic) ((Optional) this.pulsar.getBrokerService().getTopicIfExists("persistent://my-property/my-ns/testDeduplicateChunksInSingleChunkMessage").get()).orElse(null);
            Assert.assertNotNull(persistentTopic);
            sendChunk(persistentTopic, "test-producer", 1L, 0, 2);
            sendChunk(persistentTopic, "test-producer", 1L, 1, 2);
            sendChunk(persistentTopic, "test-producer", 1L, 1, 2);
            Assert.assertEquals(subscribe.receive(15, TimeUnit.SECONDS).getData().length, 2);
            sendChunk(persistentTopic, "test-producer", 2L, 0, 3);
            sendChunk(persistentTopic, "test-producer", 2L, 1, 3);
            sendChunk(persistentTopic, "test-producer", 2L, 1, 3);
            sendChunk(persistentTopic, "test-producer", 2L, 2, 3);
            Assert.assertEquals(subscribe.receive(20, TimeUnit.SECONDS).getData().length, 3);
            if (Collections.singletonList(subscribe).get(0) != null) {
                subscribe.close();
            }
        } catch (Throwable th) {
            if (Collections.singletonList(subscribe).get(0) != null) {
                subscribe.close();
            }
            throw th;
        }
    }

    protected static void sendChunk(PersistentTopic persistentTopic, final String str, final long j, final Integer num, final Integer num2) {
        MessageMetadata messageMetadata = new MessageMetadata();
        messageMetadata.setProducerName(str);
        messageMetadata.setSequenceId(j);
        messageMetadata.setPublishTime(System.currentTimeMillis());
        if (num != null && num2 != null) {
            messageMetadata.setUuid(str + "-" + j);
            messageMetadata.setChunkId(num.intValue());
            messageMetadata.setNumChunksFromMsg(num2.intValue());
            messageMetadata.setTotalChunkMsgSize(num2.intValue());
        }
        persistentTopic.publishMessage(Commands.serializeMetadataAndPayload(Commands.ChecksumType.Crc32c, messageMetadata, Unpooled.wrappedBuffer("a".getBytes())), new Topic.PublishContext() { // from class: org.apache.pulsar.client.impl.MessageChunkingDeduplicationTest.1
            public boolean isChunked() {
                return num != null;
            }

            public String getProducerName() {
                return str;
            }

            public long getSequenceId() {
                return j;
            }

            public void completed(Exception exc, long j2, long j3) {
                String str2 = str + "-" + j;
                if (num != null) {
                    str2 = str2 + "-" + num + "-" + num2;
                }
                if (exc == null) {
                    MessageChunkingDeduplicationTest.log.info("Sent {} to ({}, {})", new Object[]{str2, Long.valueOf(j2), Long.valueOf(j3)});
                } else {
                    MessageChunkingDeduplicationTest.log.error("Failed to send {}: {}", str2, exc.getMessage());
                }
            }
        });
    }
}
