package org.apache.pulsar.client.impl;

import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import java.lang.reflect.Field;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Optional;
import java.util.Random;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import org.apache.bookkeeper.mledger.impl.ManagedCursorImpl;
import org.apache.bookkeeper.mledger.impl.PositionImpl;
import org.apache.commons.lang3.RandomUtils;
import org.apache.pulsar.broker.service.persistent.PersistentTopic;
import org.apache.pulsar.client.api.ClientBuilder;
import org.apache.pulsar.client.api.CompressionType;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.MessageRoutingMode;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.ProducerBuilder;
import org.apache.pulsar.client.api.ProducerConsumerBase;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.SizeUnit;
import org.apache.pulsar.client.impl.MessageImpl;
import org.apache.pulsar.client.impl.ProducerImpl;
import org.apache.pulsar.common.api.proto.MessageMetadata;
import org.apache.pulsar.common.policies.data.PublisherStats;
import org.apache.pulsar.common.protocol.ByteBufPair;
import org.apache.pulsar.common.protocol.Commands;
import org.apache.pulsar.common.util.FutureUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testng.Assert;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.DataProvider;
import org.testng.annotations.Test;

@Test(groups = {"broker-impl"})
/* loaded from: input_file:org/apache/pulsar/client/impl/MessageChunkingTest.class */
public class MessageChunkingTest extends ProducerConsumerBase {
    private static final Logger log = LoggerFactory.getLogger(MessageChunkingTest.class);

    /**
     * Per-test fixture setup: runs the inherited {@code internalSetup()} followed by the
     * inherited {@code producerBaseSetup()} before every test method.
     *
     * @throws Exception if either inherited setup step fails
     */
    @Override // org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest
    @BeforeMethod
    protected void setup() throws Exception {
        super.internalSetup();
        super.producerBaseSetup();
    }

    /**
     * Per-test fixture teardown: delegates to the inherited {@code internalCleanup()}.
     * Registered with {@code alwaysRun = true} on the {@code @AfterMethod} annotation.
     *
     * @throws Exception if the inherited cleanup fails
     */
    @Override // org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest
    @AfterMethod(alwaysRun = true)
    protected void cleanup() throws Exception {
        super.internalCleanup();
    }

    /**
     * Data provider that runs a test once with ack receipts enabled and once with them disabled.
     *
     * @return two single-element rows: {@code {true}} and {@code {false}}
     */
    @DataProvider(name = "ackReceiptEnabled")
    public Object[][] ackReceiptEnabled() {
        // Must be a genuine two-dimensional array: one inner array per test invocation.
        return new Object[][] {{true}, {false}};
    }

    /**
     * Checks that building a producer with both chunking and batching enabled is rejected
     * with an {@link IllegalArgumentException}.
     *
     * @throws Exception if producer creation fails with any other exception type
     */
    @Test
    public void testInvalidConfig() throws Exception {
        final String topicName = "persistent://my-property/my-ns/my-topic1";
        try {
            ProducerBuilder<byte[]> conflictingBuilder = this.pulsarClient.newProducer()
                    .topic(topicName)
                    .enableChunking(true)
                    .enableBatching(true);
            conflictingBuilder.create();
            Assert.fail("producer creation should have fail");
        } catch (IllegalArgumentException expected) {
            // Expected outcome: the chunking + batching combination is refused.
        }
    }

    /**
     * Publishes five payloads (0, 10, 20, 30 and 40 characters) through a chunking, LZ4-compressed
     * producer while the broker max message size is set to 5, then verifies that:
     * <ul>
     *   <li>the consumer receives every payload, in order and without duplicates;</li>
     *   <li>the first publisher's chunked-message rate is positive after a rate update;</li>
     *   <li>after acknowledging every message, the cursor's mark-delete position is immediately
     *       before its read position;</li>
     *   <li>the consumer's available permits equal the entry id of the read position.</li>
     * </ul>
     *
     * @param z whether ack receipts are enabled on the consumer
     * @throws Exception on any client/broker failure
     */
    @Test(dataProvider = "ackReceiptEnabled")
    public void testLargeMessage(boolean z) throws Exception {
        log.info("-- Starting {} test --", this.methodName);
        this.conf.setMaxMessageSize(5);
        final int totalMessages = 5;
        final String topicName = "persistent://my-property/my-ns/my-topic1";

        // try-with-resources so the consumer and producer are released even when an assertion fails.
        // The downcast is required: getAvailablePermits() is read from the ConsumerImpl below.
        try (ConsumerImpl<byte[]> consumer = (ConsumerImpl<byte[]>) this.pulsarClient.newConsumer()
                     .topic(topicName)
                     .subscriptionName("my-subscriber-name")
                     .isAckReceiptEnabled(z)
                     .acknowledgmentGroupTime(0L, TimeUnit.SECONDS)
                     .subscribe();
             Producer<byte[]> producer = this.pulsarClient.newProducer()
                     .topic(topicName)
                     .compressionType(CompressionType.LZ4)
                     .enableChunking(true)
                     .enableBatching(false)
                     .create()) {
            PersistentTopic topic = (PersistentTopic) this.pulsar.getBrokerService()
                    .getTopicIfExists(topicName).get().get();

            ArrayList<String> published = new ArrayList<>();
            for (int i = 0; i < totalMessages; i++) {
                String payload = createMessagePayload(i * 10);
                published.add(payload);
                producer.send(payload.getBytes(StandardCharsets.UTF_8));
            }

            HashSet<String> seen = new HashSet<>();
            ArrayList<MessageId> receivedIds = new ArrayList<>();
            for (int i = 0; i < totalMessages; i++) {
                Message<byte[]> msg = consumer.receive(5, TimeUnit.SECONDS);
                // receive(timeout) returns null on timeout; fail with a clear message instead of an NPE.
                Assert.assertNotNull(msg, "timed out waiting for message " + i);
                String received = new String(msg.getData(), StandardCharsets.UTF_8);
                log.info("[{}] - Published [{}] Received message: [{}]", i, published.get(i), received);
                testMessageOrderAndDuplicates(seen, received, published.get(i));
                receivedIds.add(msg.getMessageId());
            }

            this.pulsar.getBrokerService().updateRates();
            PublisherStats publisherStats =
                    (PublisherStats) topic.getStats(false, false, false).publishers.get(0);
            Assert.assertTrue(publisherStats.getChunkedMessageRate() > 0.0d);

            ManagedCursorImpl cursor =
                    (ManagedCursorImpl) topic.getManagedLedger().getCursors().iterator().next();
            PositionImpl readPosition = cursor.getReadPosition();

            for (MessageId id : receivedIds) {
                consumer.acknowledge(id);
            }

            // Give the acks time to reach the cursor before asserting on it.
            retryStrategically(
                    ignored -> cursor.getMarkDeletedPosition().getNext().equals(readPosition), 5, 200L);
            Assert.assertEquals(readPosition, cursor.getMarkDeletedPosition().getNext());
            Assert.assertEquals(readPosition.getEntryId(), consumer.getAvailablePermits());
        }
        log.info("-- Exiting {} test --", this.methodName);
    }

    /**
     * Sends one 20-byte random value with a 10-byte random ordering key through a chunking
     * producer (broker max message size set to 5) and verifies that the consumer receives the
     * same value and the same ordering key.
     *
     * @throws Exception on any client/broker failure
     */
    @Test
    public void testChunkingWithOrderingKey() throws Exception {
        this.conf.setMaxMessageSize(5);
        final String topicName = "persistent://my-property/my-ns/testChunkingWithOrderingKey";

        // Resources are closed in reverse order (producer first, then consumer); a failure in
        // close() is attached as a suppressed exception instead of replacing the test failure.
        try (Consumer<byte[]> consumer = this.pulsarClient.newConsumer()
                     .topic(topicName)
                     .subscriptionName("my-subscriber-name")
                     .acknowledgmentGroupTime(0L, TimeUnit.SECONDS)
                     .subscribe();
             Producer<byte[]> producer = this.pulsarClient.newProducer()
                     .topic(topicName)
                     .enableChunking(true)
                     .enableBatching(false)
                     .create()) {
            byte[] value = RandomUtils.nextBytes(20);
            byte[] orderingKey = RandomUtils.nextBytes(10);
            producer.newMessage().value(value).orderingKey(orderingKey).send();

            Message<byte[]> msg = consumer.receive();
            Assert.assertEquals(msg.getData(), value);
            Assert.assertEquals(msg.getOrderingKey(), orderingKey);
        }
    }

    /**
     * Publishes five chunked payloads, receives them once without acknowledging, waits for the
     * consumer's un-acked message tracker to become empty (the consumer uses a 5-second ack
     * timeout), and then verifies that the same five payloads are delivered again in order.
     * Finally the last message is acknowledged cumulatively and the cursor's mark-delete position
     * is expected to sit immediately before its read position.
     *
     * @param z whether ack receipts are enabled on the consumer
     * @throws Exception on any client/broker failure
     */
    @Test(dataProvider = "ackReceiptEnabled")
    public void testLargeMessageAckTimeOut(boolean z) throws Exception {
        log.info("-- Starting {} test --", this.methodName);
        this.conf.setMaxMessageSize(5);
        final int totalMessages = 5;
        final String topicName = "persistent://my-property/my-ns/my-topic1";

        // try-with-resources so the consumer and producer are released even when an assertion fails.
        // The downcast is required: getUnAckedMessageTracker() is read from the ConsumerImpl below.
        try (ConsumerImpl<byte[]> consumer = (ConsumerImpl<byte[]>) this.pulsarClient.newConsumer()
                     .topic(topicName)
                     .subscriptionName("my-subscriber-name")
                     .acknowledgmentGroupTime(0L, TimeUnit.SECONDS)
                     .isAckReceiptEnabled(z)
                     .ackTimeout(5L, TimeUnit.SECONDS)
                     .subscribe();
             Producer<byte[]> producer = this.pulsarClient.newProducer()
                     .topic(topicName)
                     .enableChunking(true)
                     .enableBatching(false)
                     .create()) {
            PersistentTopic topic = (PersistentTopic) this.pulsar.getBrokerService()
                    .getTopicIfExists(topicName).get().get();

            ArrayList<String> published = new ArrayList<>();
            for (int i = 0; i < totalMessages; i++) {
                String payload = createMessagePayload(i * 10);
                published.add(payload);
                producer.send(payload.getBytes(StandardCharsets.UTF_8));
            }

            // First pass: consume everything but deliberately do not acknowledge.
            HashSet<String> seen = new HashSet<>();
            for (int i = 0; i < totalMessages; i++) {
                Message<byte[]> msg = consumer.receive(5, TimeUnit.SECONDS);
                Assert.assertNotNull(msg, "timed out waiting for message " + i);
                String received = new String(msg.getData(), StandardCharsets.UTF_8);
                log.info("Received message: [{}]", received);
                testMessageOrderAndDuplicates(seen, received, published.get(i));
            }

            // Wait (up to 10 retries, 1 s apart) until the un-acked tracker holds nothing.
            retryStrategically(
                    ignored -> consumer.getUnAckedMessageTracker().messageIdPartitionMap.isEmpty(),
                    10, TimeUnit.SECONDS.toMillis(1L));

            // Second pass: the same payloads must arrive again, in the same order.
            seen.clear();
            MessageId lastMessageId = null;
            for (int i = 0; i < totalMessages; i++) {
                Message<byte[]> msg = consumer.receive(5, TimeUnit.SECONDS);
                Assert.assertNotNull(msg, "timed out waiting for redelivered message " + i);
                lastMessageId = msg.getMessageId();
                String received = new String(msg.getData(), StandardCharsets.UTF_8);
                log.info("Received message: [{}]", received);
                testMessageOrderAndDuplicates(seen, received, published.get(i));
            }

            ManagedCursorImpl cursor =
                    (ManagedCursorImpl) topic.getManagedLedger().getCursors().iterator().next();
            PositionImpl readPosition = cursor.getReadPosition();

            consumer.acknowledgeCumulative(lastMessageId);

            // Give the cumulative ack time to reach the cursor before asserting on it.
            retryStrategically(
                    ignored -> cursor.getMarkDeletedPosition().getNext().equals(readPosition), 5, 200L);
            Assert.assertEquals(readPosition, cursor.getMarkDeletedPosition().getNext());
        }
        log.info("-- Exiting {} test --", this.methodName);
    }

    /**
     * Creates a chunking producer, stops the broker, and expects a subsequent send of a
     * 100-character payload to fail with {@link PulsarClientException.TimeoutException}.
     *
     * @throws Exception on any unexpected client/broker failure
     */
    @Test
    public void testPublishWithFailure() throws Exception {
        log.info("-- Starting {} test --", this.methodName);
        this.conf.setMaxMessageSize(5);
        final String topicName = "persistent://my-property/my-ns/my-topic1";

        Producer<byte[]> chunkingProducer = this.pulsarClient.newProducer()
                .topic(topicName)
                .enableChunking(true)
                .enableBatching(false)
                .create();

        stopBroker();

        try {
            byte[] payload = createMessagePayload(100).getBytes();
            chunkingProducer.send(payload);
            Assert.fail("should have failed with timeout exception");
        } catch (PulsarClientException.TimeoutException expected) {
            // Expected outcome: the broker is down, so the send times out.
        }
        chunkingProducer.close();
    }

    /**
     * (Disabled.) Starts 25 chunking producers that each publish one 45-character payload from a
     * worker thread, while the consumer is limited to a single pending chunked message with
     * auto-ack of the oldest chunked message when the queue is full. The test then drains the
     * consumer until a 1-second receive times out and asserts that the number of distinct payloads
     * received is not 25.
     *
     * @throws Exception on any client/broker failure
     */
    @Test(enabled = false)
    public void testMaxPendingChunkMessages() throws Exception {
        log.info("-- Starting {} test --", this.methodName);
        this.conf.setMaxMessageSize(10);
        final int totalProducers = 25;
        final String topicName = "persistent://my-property/my-ns/maxPending";

        ExecutorService executor = Executors.newFixedThreadPool(totalProducers);
        try {
            Consumer<byte[]> consumer = this.pulsarClient.newConsumer()
                    .topic(topicName)
                    .subscriptionName("my-subscriber-name")
                    .acknowledgmentGroupTime(0L, TimeUnit.SECONDS)
                    .maxPendingChunkedMessage(1)
                    .autoAckOldestChunkedMessageOnQueueFull(true)
                    .ackTimeout(5L, TimeUnit.SECONDS)
                    .subscribe();
            ProducerBuilder<byte[]> producerBuilder = this.pulsarClient.newProducer().topic(topicName);

            // Each worker hands its send future back through the executor's own Future, so no
            // collection is mutated from several threads and every send is accounted for
            // before waitForAll() is invoked.
            ArrayList<Future<CompletableFuture<MessageId>>> submissions = new ArrayList<>(totalProducers);
            for (int i = 0; i < totalProducers; i++) {
                Producer<byte[]> producer =
                        producerBuilder.enableChunking(true).enableBatching(false).create();
                submissions.add(executor.submit(() -> {
                    return producer.sendAsync(createMessagePayload(45).getBytes(StandardCharsets.UTF_8));
                }));
            }
            ArrayList<CompletableFuture<MessageId>> sendFutures = new ArrayList<>(totalProducers);
            for (Future<CompletableFuture<MessageId>> submission : submissions) {
                sendFutures.add(submission.get());
            }
            FutureUtil.waitForAll(sendFutures).get();

            HashSet<String> distinctPayloads = new HashSet<>();
            for (int i = 0; i < totalProducers; i++) {
                Message<byte[]> msg = consumer.receive(1, TimeUnit.SECONDS);
                if (msg == null) {
                    // Nothing more arrived within the timeout: stop draining.
                    break;
                }
                String received = new String(msg.getData(), StandardCharsets.UTF_8);
                log.info("Received message: [{}]", received);
                distinctPayloads.add(received);
                consumer.acknowledge(msg);
            }
            Assert.assertNotEquals(distinctPayloads.size(), totalProducers);
        } finally {
            executor.shutdownNow();
        }
    }

    /**
     * With the broker max message size set to 5, checks that creating a producer with both
     * chunking and batching enabled is rejected with an {@link IllegalArgumentException}.
     *
     * @throws Exception if producer creation fails with any other exception type
     */
    @Test
    public void testInvalidUseCaseForChunking() throws Exception {
        log.info("-- Starting {} test --", this.methodName);
        this.conf.setMaxMessageSize(5);
        final String topicName = "persistent://my-property/my-ns/my-topic1";
        try {
            ProducerBuilder<byte[]> conflictingBuilder = this.pulsarClient.newProducer()
                    .topic(topicName)
                    .enableChunking(true)
                    .enableBatching(true);
            conflictingBuilder.create();
            Assert.fail("it should have failied because chunking can't be used with batching enabled");
        } catch (IllegalArgumentException expected) {
            // Expected outcome: the chunking + batching combination is refused.
        }
        log.info("-- Exiting {} test --", this.methodName);
    }

    /**
     * Hand-crafts a single chunk (chunk id 0 of a declared 2-chunk message, uuid "123") and pushes
     * it through the producer's {@code processOpSendMsg}, so the consumer ends up holding one
     * incomplete chunked message. The consumer's expiry for incomplete chunked messages is then
     * set to 1 ms, and after a 10 ms sleep {@code removeExpireIncompleteChunkedMessages()} is
     * expected to empty {@code chunkedMessagesMap}.
     *
     * @throws Exception on reflection errors or any client/broker failure
     */
    @Test
    public void testExpireIncompleteChunkMessage() throws Exception {
        final String topicName = "persistent://prop/use/ns-abc/expireMsg";

        // Implementation types are required: the test reaches into ProducerImpl/ConsumerImpl internals.
        ProducerImpl<byte[]> producer = (ProducerImpl<byte[]>) this.pulsarClient.newProducer()
                .topic(topicName)
                .enableBatching(false)
                .messageRoutingMode(MessageRoutingMode.SinglePartition)
                .sendTimeout(10, TimeUnit.MINUTES)
                .create();
        // producerId is read reflectively from the ProducerImpl field of the same name.
        Field producerIdField = ProducerImpl.class.getDeclaredField("producerId");
        producerIdField.setAccessible(true);
        long producerId = (Long) producerIdField.get(producer);
        producer.cnx().registerProducer(producerId, producer);
        ConsumerImpl<byte[]> consumer = (ConsumerImpl<byte[]>) this.pulsarClient.newConsumer()
                .topic(topicName)
                .subscriptionName("my-sub")
                .subscribe();

        try {
            TypedMessageBuilderImpl<byte[]> chunkBuilder = (TypedMessageBuilderImpl<byte[]>) producer
                    .newMessage()
                    .value("message-1".getBytes(StandardCharsets.UTF_8));
            ByteBuf payload = Unpooled.wrappedBuffer(chunkBuilder.getContent());
            MessageMetadata metadata = chunkBuilder.getMetadataBuilder();
            // Declare two chunks but only ever send chunk 0, leaving the message incomplete.
            metadata.setProducerName("test").setSequenceId(1L).setPublishTime(10L).setUuid("123")
                    .setNumChunksFromMsg(2).setChunkId(0).setTotalChunkMsgSize(100);
            ByteBufPair sendCommand = Commands.newSend(
                    producerId, 1L, 1, Commands.ChecksumType.Crc32c, metadata, payload);
            MessageImpl<byte[]> chunkMessage = (MessageImpl<byte[]>) chunkBuilder.getMessage();
            chunkMessage.setSchemaState(MessageImpl.SchemaState.Ready);
            producer.processOpSendMsg(
                    ProducerImpl.OpSendMsg.create(chunkMessage, sendCommand, 1L, (SendCallback) null));

            retryStrategically(ignored -> consumer.chunkedMessagesMap.size() > 0, 5, 500L);
            Assert.assertEquals(consumer.chunkedMessagesMap.size(), 1L);

            consumer.expireTimeOfIncompleteChunkedMessageMillis = 1L;
            // Sleep past the 1 ms expiry so the pending chunk qualifies for removal.
            Thread.sleep(10L);
            consumer.removeExpireIncompleteChunkedMessages();
            Assert.assertEquals(consumer.chunkedMessagesMap.size(), 0L);
        } finally {
            // Same close order as before (producer, then consumer), now also on failure.
            try {
                producer.close();
            } finally {
                consumer.close();
            }
        }
    }

    /**
     * With a producer limited to 10 pending messages and a broker max message size of 5, sends a
     * 1-character payload successfully and then expects a 100-character payload to be rejected
     * with {@link PulsarClientException.ProducerQueueIsFullError} (presumably because it needs
     * more chunks than there are pending-message permits — TODO confirm). After the failure, the
     * client's memory usage must be back to 0 and all 10 semaphore permits must be available.
     *
     * @throws Exception on any unexpected client/broker failure
     */
    @Test
    public void testChunksEnqueueFailed() throws Exception {
        log.info("-- Starting {} test --", this.methodName);
        this.conf.setMaxMessageSize(5);
        final int maxPendingMessages = 10;

        // The memory limit controller is exposed by the client implementation class.
        final MemoryLimitController memoryLimit =
                ((PulsarClientImpl) this.pulsarClient).getMemoryLimitController();
        Assert.assertEquals(memoryLimit.currentUsage(), 0L);

        try (Producer<byte[]> producer = this.pulsarClient.newProducer()
                .topic("persistent://my-property/my-ns/test-chunks-enqueue-failed")
                .maxPendingMessages(maxPendingMessages)
                .enableChunking(true)
                .enableBatching(false)
                .create()) {
            // Checked on the interface-typed reference so the assertion is meaningful.
            Assert.assertTrue(producer instanceof ProducerImpl);
            Semaphore semaphore = ((ProducerImpl<byte[]>) producer).getSemaphore().orElse(null);
            Assert.assertNotNull(semaphore);
            Assert.assertEquals(semaphore.availablePermits(), maxPendingMessages);

            producer.send(createMessagePayload(1).getBytes(StandardCharsets.UTF_8));
            try {
                producer.send(createMessagePayload(100).getBytes(StandardCharsets.UTF_8));
                Assert.fail("It should fail with ProducerQueueIsFullError");
            } catch (PulsarClientException e) {
                Assert.assertTrue(e instanceof PulsarClientException.ProducerQueueIsFullError);
                // Everything reserved for the rejected message must have been given back.
                Assert.assertEquals(memoryLimit.currentUsage(), 0L);
                Assert.assertEquals(semaphore.availablePermits(), maxPendingMessages);
            }
        }
    }

    /**
     * Sets a memory limit of 10000 bytes on the supplied client builder.
     * NOTE(review): presumably invoked by the base test class while it builds the shared
     * {@code pulsarClient} — confirm against MockedPulsarServiceBaseTest.
     *
     * @param clientBuilder the builder to configure
     */
    @Override // org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest
    protected void customizeNewPulsarClientBuilder(ClientBuilder clientBuilder) {
        clientBuilder.memoryLimit(10000L, SizeUnit.BYTES);
    }

    /**
     * Publishes five chunked 10-character messages and exercises {@code seek} to the second
     * message id on two subscriptions:
     * <ul>
     *   <li>"inclusive-seek" (created with {@code startMessageIdInclusive()}) is expected to
     *       redeliver messages 1..4;</li>
     *   <li>"default-seek" is expected to deliver messages 2..4.</li>
     * </ul>
     *
     * @throws PulsarClientException on any client failure
     */
    @Test
    public void testSeekChunkMessages() throws PulsarClientException {
        log.info("-- Starting {} test --", this.methodName);
        this.conf.setMaxMessageSize(5);
        final int totalMessages = 5;
        final String topicName = "persistent://my-property/my-ns/test-seek-chunk";

        Producer<byte[]> producer = this.pulsarClient.newProducer()
                .topic(topicName)
                .enableChunking(true)
                .enableBatching(false)
                .create();
        Consumer<byte[]> inclusiveConsumer = this.pulsarClient.newConsumer()
                .topic(topicName)
                .subscriptionName("inclusive-seek")
                .startMessageIdInclusive()
                .subscribe();
        Consumer<byte[]> defaultConsumer = this.pulsarClient.newConsumer()
                .topic(topicName)
                .subscriptionName("default-seek")
                .subscribe();

        int sent = 0;
        while (sent < totalMessages) {
            producer.send(createMessagePayload(10).getBytes());
            sent++;
        }

        // Record the message ids in delivery order; they are the seek targets/expectations below.
        ArrayList<MessageId> messageIds = new ArrayList<>();
        for (int idx = 0; idx < totalMessages; idx++) {
            Message<byte[]> msg = inclusiveConsumer.receive(5, TimeUnit.SECONDS);
            String text = new String(msg.getData());
            log.info("[{}] - Received message: [{}]", idx, text);
            messageIds.add(msg.getMessageId());
        }

        MessageId seekTarget = messageIds.get(1);

        // Inclusive subscription: delivery restarts at the seek target itself.
        inclusiveConsumer.seek(seekTarget);
        for (MessageId expectedId : messageIds.subList(1, totalMessages)) {
            Assert.assertEquals(expectedId, inclusiveConsumer.receive(5, TimeUnit.SECONDS).getMessageId());
        }

        // Default subscription: delivery restarts after the seek target.
        defaultConsumer.seek(seekTarget);
        for (MessageId expectedId : messageIds.subList(2, totalMessages)) {
            Assert.assertEquals(expectedId, defaultConsumer.receive(5, TimeUnit.SECONDS).getMessageId());
        }

        inclusiveConsumer.close();
        defaultConsumer.close();
        producer.close();
        log.info("-- Exiting {} test --", this.methodName);
    }

    /**
     * Builds a payload string made of {@code i} random decimal digits.
     *
     * @param i number of digits to generate; zero or a negative value yields an empty string
     * @return a string of {@code i} characters, each in the range '0'..'9'
     */
    private String createMessagePayload(int i) {
        final Random digitSource = new Random();
        final StringBuilder payload = new StringBuilder();
        int remaining = i;
        while (remaining > 0) {
            payload.append(digitSource.nextInt(10));
            remaining--;
        }
        return payload.toString();
    }
}
