/*
 * Decompiled with CFR 0.152.
 */
package org.apache.pulsar.client.impl;

import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import java.lang.reflect.Field;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Optional;
import java.util.Random;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import org.apache.bookkeeper.mledger.impl.ManagedCursorImpl;
import org.apache.bookkeeper.mledger.impl.PositionImpl;
import org.apache.pulsar.broker.service.persistent.PersistentTopic;
import org.apache.pulsar.client.api.CompressionType;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.MessageRoutingMode;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.ProducerBuilder;
import org.apache.pulsar.client.api.ProducerConsumerBase;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.impl.ConsumerImpl;
import org.apache.pulsar.client.impl.MessageImpl;
import org.apache.pulsar.client.impl.ProducerImpl;
import org.apache.pulsar.client.impl.TypedMessageBuilderImpl;
import org.apache.pulsar.common.api.proto.MessageMetadata;
import org.apache.pulsar.common.policies.data.PublisherStats;
import org.apache.pulsar.common.protocol.ByteBufPair;
import org.apache.pulsar.common.protocol.Commands;
import org.apache.pulsar.common.util.FutureUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testng.Assert;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.DataProvider;
import org.testng.annotations.Test;

@Test(groups={"broker-impl"})
public class MessageChunkingTest
extends ProducerConsumerBase {
    private static final Logger log = LoggerFactory.getLogger(MessageChunkingTest.class);

    @Override
    @BeforeMethod
    protected void setup() throws Exception {
        super.internalSetup();
        super.producerBaseSetup();
    }

    @Override
    @AfterMethod(alwaysRun=true)
    protected void cleanup() throws Exception {
        super.internalCleanup();
    }

    @DataProvider(name="ackReceiptEnabled")
    public Object[][] ackReceiptEnabled() {
        return new Object[][]{{true}, {false}};
    }

    @Test
    public void testInvalidConfig() throws Exception {
        String topicName = "persistent://my-property/my-ns/my-topic1";
        ProducerBuilder producerBuilder = this.pulsarClient.newProducer().topic("persistent://my-property/my-ns/my-topic1");
        try {
            Producer producer = producerBuilder.enableChunking(true).enableBatching(true).create();
            Assert.fail((String)"producer creation should have fail");
        }
        catch (IllegalArgumentException illegalArgumentException) {
            // empty catch block
        }
    }

    @Test(dataProvider="ackReceiptEnabled")
    public void testLargeMessage(boolean ackReceiptEnabled) throws Exception {
        log.info("-- Starting {} test --", (Object)this.methodName);
        this.conf.setMaxMessageSize(5);
        int totalMessages = 5;
        String topicName = "persistent://my-property/my-ns/my-topic1";
        Consumer consumer = this.pulsarClient.newConsumer().topic(new String[]{"persistent://my-property/my-ns/my-topic1"}).subscriptionName("my-subscriber-name").isAckReceiptEnabled(ackReceiptEnabled).acknowledgmentGroupTime(0L, TimeUnit.SECONDS).subscribe();
        ProducerBuilder producerBuilder = this.pulsarClient.newProducer().topic("persistent://my-property/my-ns/my-topic1");
        Producer producer = producerBuilder.compressionType(CompressionType.LZ4).enableChunking(true).enableBatching(false).create();
        PersistentTopic topic = (PersistentTopic)((Optional)this.pulsar.getBrokerService().getTopicIfExists("persistent://my-property/my-ns/my-topic1").get()).get();
        ArrayList publishedMessages = Lists.newArrayList();
        for (int i = 0; i < 5; ++i) {
            String message = this.createMessagePayload(i * 10);
            publishedMessages.add(message);
            producer.send((Object)message.getBytes());
        }
        Message msg = null;
        HashSet messageSet = Sets.newHashSet();
        ArrayList msgIds = Lists.newArrayList();
        for (int i = 0; i < 5; ++i) {
            msg = consumer.receive(5, TimeUnit.SECONDS);
            String receivedMessage = new String(msg.getData());
            log.info("[{}] - Published [{}] Received message: [{}]", new Object[]{i, publishedMessages.get(i), receivedMessage});
            String expectedMessage = (String)publishedMessages.get(i);
            this.testMessageOrderAndDuplicates(messageSet, receivedMessage, expectedMessage);
            msgIds.add(msg.getMessageId());
        }
        this.pulsar.getBrokerService().updateRates();
        PublisherStats producerStats = (PublisherStats)topic.getStats((boolean)false, (boolean)false).publishers.get(0);
        Assert.assertTrue((producerStats.chunkedMessageRate > 0.0 ? 1 : 0) != 0);
        ManagedCursorImpl mcursor = (ManagedCursorImpl)topic.getManagedLedger().getCursors().iterator().next();
        PositionImpl readPosition = (PositionImpl)mcursor.getReadPosition();
        for (MessageId msgId : msgIds) {
            consumer.acknowledge(msgId);
        }
        MessageChunkingTest.retryStrategically(test -> mcursor.getMarkDeletedPosition().getNext().equals(readPosition), 5, 200L);
        Assert.assertEquals((Object)readPosition, (Object)mcursor.getMarkDeletedPosition().getNext());
        Assert.assertEquals((long)readPosition.getEntryId(), (long)((ConsumerImpl)consumer).getAvailablePermits());
        consumer.close();
        producer.close();
        log.info("-- Exiting {} test --", (Object)this.methodName);
    }

    @Test(dataProvider="ackReceiptEnabled")
    public void testLargeMessageAckTimeOut(boolean ackReceiptEnabled) throws Exception {
        log.info("-- Starting {} test --", (Object)this.methodName);
        this.conf.setMaxMessageSize(5);
        int totalMessages = 5;
        String topicName = "persistent://my-property/my-ns/my-topic1";
        ConsumerImpl consumer = (ConsumerImpl)this.pulsarClient.newConsumer().topic(new String[]{"persistent://my-property/my-ns/my-topic1"}).subscriptionName("my-subscriber-name").acknowledgmentGroupTime(0L, TimeUnit.SECONDS).isAckReceiptEnabled(ackReceiptEnabled).ackTimeout(5L, TimeUnit.SECONDS).subscribe();
        ProducerBuilder producerBuilder = this.pulsarClient.newProducer().topic("persistent://my-property/my-ns/my-topic1");
        Producer producer = producerBuilder.enableChunking(true).enableBatching(false).create();
        PersistentTopic topic = (PersistentTopic)((Optional)this.pulsar.getBrokerService().getTopicIfExists("persistent://my-property/my-ns/my-topic1").get()).get();
        ArrayList publishedMessages = Lists.newArrayList();
        for (int i = 0; i < 5; ++i) {
            String message = this.createMessagePayload(i * 10);
            publishedMessages.add(message);
            producer.send((Object)message.getBytes());
        }
        Message msg = null;
        HashSet messageSet = Sets.newHashSet();
        for (int i = 0; i < 5; ++i) {
            msg = consumer.receive(5, TimeUnit.SECONDS);
            String receivedMessage = new String(msg.getData());
            log.info("Received message: [{}]", (Object)receivedMessage);
            String expectedMessage = (String)publishedMessages.get(i);
            this.testMessageOrderAndDuplicates(messageSet, receivedMessage, expectedMessage);
        }
        MessageChunkingTest.retryStrategically(test -> consumer.getUnAckedMessageTracker().messageIdPartitionMap.isEmpty(), 10, TimeUnit.SECONDS.toMillis(1L));
        msg = null;
        messageSet.clear();
        MessageId lastMsgId = null;
        for (int i = 0; i < 5; ++i) {
            msg = consumer.receive(5, TimeUnit.SECONDS);
            lastMsgId = msg.getMessageId();
            String receivedMessage = new String(msg.getData());
            log.info("Received message: [{}]", (Object)receivedMessage);
            String expectedMessage = (String)publishedMessages.get(i);
            this.testMessageOrderAndDuplicates(messageSet, receivedMessage, expectedMessage);
        }
        ManagedCursorImpl mcursor = (ManagedCursorImpl)topic.getManagedLedger().getCursors().iterator().next();
        PositionImpl readPosition = (PositionImpl)mcursor.getReadPosition();
        consumer.acknowledgeCumulative(lastMsgId);
        MessageChunkingTest.retryStrategically(test -> mcursor.getMarkDeletedPosition().getNext().equals(readPosition), 5, 200L);
        Assert.assertEquals((Object)readPosition, (Object)mcursor.getMarkDeletedPosition().getNext());
        consumer.close();
        producer.close();
        log.info("-- Exiting {} test --", (Object)this.methodName);
    }

    @Test
    public void testPublishWithFailure() throws Exception {
        log.info("-- Starting {} test --", (Object)this.methodName);
        this.conf.setMaxMessageSize(5);
        String topicName = "persistent://my-property/my-ns/my-topic1";
        ProducerBuilder producerBuilder = this.pulsarClient.newProducer().topic("persistent://my-property/my-ns/my-topic1");
        Producer producer = producerBuilder.enableChunking(true).enableBatching(false).create();
        this.stopBroker();
        try {
            producer.send((Object)this.createMessagePayload(100).getBytes());
            Assert.fail((String)"should have failed with timeout exception");
        }
        catch (PulsarClientException.TimeoutException timeoutException) {
            // empty catch block
        }
        producer.close();
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test(enabled=false)
    public void testMaxPendingChunkMessages() throws Exception {
        log.info("-- Starting {} test --", (Object)this.methodName);
        this.conf.setMaxMessageSize(10);
        int totalMessages = 25;
        String topicName = "persistent://my-property/my-ns/maxPending";
        int totalProducers = 25;
        ExecutorService executor = Executors.newFixedThreadPool(25);
        try {
            ConsumerImpl consumer = (ConsumerImpl)this.pulsarClient.newConsumer().topic(new String[]{"persistent://my-property/my-ns/maxPending"}).subscriptionName("my-subscriber-name").acknowledgmentGroupTime(0L, TimeUnit.SECONDS).maxPendingChunkedMessage(1).autoAckOldestChunkedMessageOnQueueFull(true).ackTimeout(5L, TimeUnit.SECONDS).subscribe();
            ProducerBuilder producerBuilder = this.pulsarClient.newProducer().topic("persistent://my-property/my-ns/maxPending");
            Producer[] producers = new Producer[25];
            int totalPublishedMessages = 25;
            ArrayList futures = Lists.newArrayList();
            int i = 0;
            while (i < 25) {
                producers[i] = producerBuilder.enableChunking(true).enableBatching(false).create();
                int index = i++;
                executor.submit(() -> futures.add(producers[index].sendAsync((Object)this.createMessagePayload(45).getBytes())));
            }
            FutureUtil.waitForAll((List)futures).get();
            PersistentTopic topic = (PersistentTopic)((Optional)this.pulsar.getBrokerService().getTopicIfExists("persistent://my-property/my-ns/maxPending").get()).get();
            Message msg = null;
            HashSet messageSet = Sets.newHashSet();
            for (int i2 = 0; i2 < 25 && (msg = consumer.receive(1, TimeUnit.SECONDS)) != null; ++i2) {
                String receivedMessage = new String(msg.getData());
                log.info("Received message: [{}]", (Object)receivedMessage);
                messageSet.add(receivedMessage);
                consumer.acknowledge(msg);
            }
            Assert.assertNotEquals((Object)messageSet.size(), (Object)totalPublishedMessages);
        }
        finally {
            if (Collections.singletonList(executor).get(0) != null) {
                executor.shutdownNow();
            }
        }
    }

    @Test
    public void testInvalidUseCaseForChunking() throws Exception {
        log.info("-- Starting {} test --", (Object)this.methodName);
        this.conf.setMaxMessageSize(5);
        String topicName = "persistent://my-property/my-ns/my-topic1";
        ProducerBuilder producerBuilder = this.pulsarClient.newProducer().topic("persistent://my-property/my-ns/my-topic1");
        try {
            Producer producer = producerBuilder.enableChunking(true).enableBatching(true).create();
            Assert.fail((String)"it should have failied because chunking can't be used with batching enabled");
        }
        catch (IllegalArgumentException illegalArgumentException) {
            // empty catch block
        }
        log.info("-- Exiting {} test --", (Object)this.methodName);
    }

    @Test
    public void testExpireIncompleteChunkMessage() throws Exception {
        String topicName = "persistent://prop/use/ns-abc/expireMsg";
        ProducerImpl producer = (ProducerImpl)this.pulsarClient.newProducer().topic("persistent://prop/use/ns-abc/expireMsg").enableBatching(false).messageRoutingMode(MessageRoutingMode.SinglePartition).sendTimeout(10, TimeUnit.MINUTES).create();
        Field producerIdField = ProducerImpl.class.getDeclaredField("producerId");
        producerIdField.setAccessible(true);
        long producerId = (Long)producerIdField.get(producer);
        producer.cnx().registerProducer(producerId, producer);
        ConsumerImpl consumer = (ConsumerImpl)this.pulsarClient.newConsumer().topic(new String[]{"persistent://prop/use/ns-abc/expireMsg"}).subscriptionName("my-sub").subscribe();
        TypedMessageBuilderImpl msg = (TypedMessageBuilderImpl)producer.newMessage().value((Object)"message-1".getBytes());
        ByteBuf payload = Unpooled.wrappedBuffer((ByteBuffer)msg.getContent());
        MessageMetadata msgMetadata = msg.getMetadataBuilder();
        msgMetadata.setProducerName("test").setSequenceId(1L).setPublishTime(10L).setUuid("123").setNumChunksFromMsg(2).setChunkId(0).setTotalChunkMsgSize(100);
        ByteBufPair cmd = Commands.newSend((long)producerId, (long)1L, (int)1, (Commands.ChecksumType)Commands.ChecksumType.Crc32c, (MessageMetadata)msgMetadata, (ByteBuf)payload);
        MessageImpl msgImpl = (MessageImpl)msg.getMessage();
        msgImpl.setSchemaState(MessageImpl.SchemaState.Ready);
        ProducerImpl.OpSendMsg op = ProducerImpl.OpSendMsg.create((MessageImpl)msgImpl, (ByteBufPair)cmd, (long)1L, null);
        producer.processOpSendMsg(op);
        MessageChunkingTest.retryStrategically(test -> consumer.chunkedMessagesMap.size() > 0L, 5, 500L);
        Assert.assertEquals((long)consumer.chunkedMessagesMap.size(), (long)1L);
        consumer.expireTimeOfIncompleteChunkedMessageMillis = 1L;
        Thread.sleep(10L);
        consumer.removeExpireIncompleteChunkedMessages();
        Assert.assertEquals((long)consumer.chunkedMessagesMap.size(), (long)0L);
        producer.close();
        consumer.close();
        producer = null;
    }

    private String createMessagePayload(int size) {
        StringBuilder str = new StringBuilder();
        Random rand = new Random();
        for (int i = 0; i < size; ++i) {
            str.append(rand.nextInt(10));
        }
        return str.toString();
    }
}

