package org.apache.pulsar.client.processor;

import com.google.common.collect.Sets;
import io.netty.buffer.ByteBuf;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.TimeUnit;
import org.apache.pulsar.broker.service.persistent.PersistentTopic;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.MessageRouter;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.ProducerConsumerBase;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.api.SubscriptionInitialPosition;
import org.apache.pulsar.client.api.TopicMetadata;
import org.apache.pulsar.client.processor.CustomBatchFormat;
import org.apache.pulsar.common.policies.data.ClusterData;
import org.apache.pulsar.common.policies.data.TenantInfoImpl;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testng.Assert;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.DataProvider;
import org.testng.annotations.Test;

@Test(groups = {"broker-impl"})
/* loaded from: input_file:org/apache/pulsar/client/processor/MessagePayloadProcessorTest.class */
public class MessagePayloadProcessorTest extends ProducerConsumerBase {
    private static final Logger log = LoggerFactory.getLogger(MessagePayloadProcessorTest.class);

    @Override // org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest
    @BeforeClass
    protected void setup() throws Exception {
        super.internalSetup();
        this.admin.clusters().createCluster("test", ClusterData.builder().serviceUrl(this.pulsar.getWebServiceAddress()).build());
        this.admin.tenants().createTenant("public", new TenantInfoImpl(Sets.newHashSet(new String[]{"appid"}), Sets.newHashSet(new String[]{"test"})));
        this.admin.namespaces().createNamespace("public/default", Sets.newHashSet(new String[]{"test"}));
    }

    @Override // org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest
    @AfterClass
    protected void cleanup() throws Exception {
        super.internalCleanup();
    }

    /* JADX WARN: Type inference failed for: r0v1, types: [java.lang.Object[], java.lang.Object[][]] */
    @DataProvider
    public static Object[][] config() {
        return new Object[]{new Object[]{1, true, 1}, new Object[]{1, true, 4}, new Object[]{1, false, 1}, new Object[]{3, false, 1}};
    }

    /* JADX WARN: Type inference failed for: r0v1, types: [java.lang.Object[], java.lang.Object[][]] */
    @DataProvider
    public static Object[][] customBatchConfig() {
        return new Object[]{new Object[]{10, 1}, new Object[]{10, 4}};
    }

    /* JADX WARN: Finally extract failed */
    @Test(dataProvider = "config")
    public void testDefaultProcessor(int i, boolean z, int i2) throws Exception {
        String str = "testDefaultProcessor-" + i + "-" + z + "-" + i2;
        if (i > 1) {
            this.admin.topics().createPartitionedTopic(str, i);
        }
        Producer create = this.pulsarClient.newProducer(Schema.STRING).topic(str).enableBatching(z).batchingMaxMessages(i2).batchingMaxPublishDelay(100L, TimeUnit.MILLISECONDS).messageRouter(new MessageRouter() { // from class: org.apache.pulsar.client.processor.MessagePayloadProcessorTest.1
            int i = 0;

            public int choosePartition(Message<?> message, TopicMetadata topicMetadata) {
                int i3 = this.i;
                this.i = i3 + 1;
                return i3 % topicMetadata.numPartitions();
            }
        }).create();
        for (int i3 = 0; i3 < 10; i3++) {
            try {
                String str2 = "msg-" + i3;
                create.sendAsync(str2).whenComplete((messageId, th) -> {
                    if (th == null) {
                        log.info("Send {} to {} {}", new Object[]{str2, str, messageId});
                    } else {
                        log.error("Failed to send {}: {}", str2, th.getMessage());
                    }
                });
            } finally {
                if (Collections.singletonList(create).get(0) != null) {
                    create.close();
                }
            }
        }
        DefaultProcessorWithRefCnt defaultProcessorWithRefCnt = new DefaultProcessorWithRefCnt();
        Consumer subscribe = this.pulsarClient.newConsumer(Schema.STRING).topic(new String[]{str}).subscriptionName("sub").subscriptionInitialPosition(SubscriptionInitialPosition.Earliest).messagePayloadProcessor(defaultProcessorWithRefCnt).subscribe();
        try {
            ArrayList arrayList = new ArrayList();
            for (int i4 = 0; i4 < 10; i4++) {
                Message receive = subscribe.receive(1, TimeUnit.SECONDS);
                Assert.assertNotNull(receive);
                arrayList.add(receive.getValue());
                subscribe.acknowledge(receive.getMessageId());
            }
            if (i > 1) {
                Collections.sort(arrayList);
            }
            for (int i5 = 0; i5 < 10; i5++) {
                Assert.assertEquals((String) arrayList.get(i5), "msg-" + i5);
            }
            if (z) {
                Assert.assertEquals(defaultProcessorWithRefCnt.getTotalRefCnt(), 2 * ((10 / i2) + (10 % i2 == 0 ? 0 : 1)));
            } else {
                Assert.assertEquals(defaultProcessorWithRefCnt.getTotalRefCnt(), 20);
            }
            if (Collections.singletonList(subscribe).get(0) != null) {
                subscribe.close();
            }
        } catch (Throwable th2) {
            if (Collections.singletonList(subscribe).get(0) != null) {
                subscribe.close();
            }
            throw th2;
        }
    }

    @Test
    public void testCustomBatchFormat() {
        ArrayList<List> arrayList = new ArrayList();
        arrayList.add(Collections.emptyList());
        arrayList.add(Collections.singletonList("java"));
        arrayList.add(Arrays.asList("hello", "world", "java"));
        for (List list : arrayList) {
            ByteBuf serialize = CustomBatchFormat.serialize(list);
            CustomBatchFormat.Metadata readMetadata = CustomBatchFormat.readMetadata(serialize);
            ArrayList arrayList2 = new ArrayList();
            for (int i = 0; i < readMetadata.getNumMessages(); i++) {
                arrayList2.add(Schema.STRING.decode(CustomBatchFormat.readMessage(serialize)));
            }
            Assert.assertEquals(arrayList2, list);
            Assert.assertEquals(arrayList2.size(), list.size());
            Assert.assertEquals(serialize.refCnt(), 1);
            serialize.release();
        }
    }

    @Test(dataProvider = "customBatchConfig")
    public void testCustomProcessor(int i, int i2) throws Exception {
        String str = "persistent://public/default/testCustomProcessor-" + i + "-" + i2;
        Consumer subscribe = this.pulsarClient.newConsumer(Schema.STRING).topic(new String[]{str}).subscriptionName("sub").subscriptionInitialPosition(SubscriptionInitialPosition.Earliest).messagePayloadProcessor(new CustomBatchPayloadProcessor()).subscribe();
        try {
            PersistentTopic persistentTopic = (PersistentTopic) ((Optional) this.pulsar.getBrokerService().getTopicIfExists(str).get()).orElse(null);
            Assert.assertNotNull(persistentTopic);
            CustomBatchProducer customBatchProducer = new CustomBatchProducer(persistentTopic, i2);
            for (int i3 = 0; i3 < i; i3++) {
                customBatchProducer.sendAsync("msg-" + i3);
            }
            customBatchProducer.flush();
            for (int i4 = 0; i4 < i; i4++) {
                Message receive = subscribe.receive(1, TimeUnit.SECONDS);
                Assert.assertNotNull(receive);
                Assert.assertEquals((String) receive.getValue(), "msg-" + i4);
                subscribe.acknowledge(receive.getMessageId());
            }
        } finally {
            if (Collections.singletonList(subscribe).get(0) != null) {
                subscribe.close();
            }
        }
    }
}
