package org.apache.pulsar.client.impl;

import io.netty.buffer.ByteBuf;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import lombok.Generated;
import org.apache.pulsar.broker.BrokerTestUtil;
import org.apache.pulsar.client.api.CompressionType;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.ProducerConsumerBase;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.api.interceptor.ProducerInterceptor;
import org.apache.pulsar.common.protocol.ByteBufPair;
import org.apache.pulsar.common.util.FutureUtil;
import org.awaitility.Awaitility;
import org.mockito.ArgumentMatchers;
import org.mockito.MockedStatic;
import org.mockito.Mockito;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testng.Assert;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.DataProvider;
import org.testng.annotations.Test;

@Test(groups = {"broker-api"})
/* loaded from: input_file:org/apache/pulsar/client/impl/ProducerMemoryLeakTest.class */
public class ProducerMemoryLeakTest extends ProducerConsumerBase {

    @Generated
    private static final Logger log = LoggerFactory.getLogger(ProducerMemoryLeakTest.class);

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/pulsar/client/impl/ProducerMemoryLeakTest$MsgPayloadTouchableMessageBuilder.class */
    public static class MsgPayloadTouchableMessageBuilder<T> extends TypedMessageBuilderImpl {
        public volatile ByteBuf payload;

        public <T> MsgPayloadTouchableMessageBuilder(ProducerBase producerBase, Schema<T> schema) {
            super(producerBase, schema);
        }

        public Message<T> getMessage() {
            MessageImpl message = super.getMessage();
            this.payload = message.getPayload();
            this.payload.retain();
            return message;
        }

        public void release() {
            this.payload.release();
        }
    }

    @Override // org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest
    @BeforeClass(alwaysRun = true)
    protected void setup() throws Exception {
        super.internalSetup();
        super.producerBaseSetup();
    }

    @Override // org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest
    @AfterClass(alwaysRun = true)
    protected void cleanup() throws Exception {
        super.internalCleanup();
    }

    @Test
    public void testSendQueueIsFull() throws Exception {
        String newUniqueName = BrokerTestUtil.newUniqueName("persistent://public/default/tp_");
        this.admin.topics().createNonPartitionedTopic(newUniqueName);
        ProducerImpl create = this.pulsarClient.newProducer(Schema.STRING).blockIfQueueFull(false).maxPendingMessages(1).enableBatching(true).topic(newUniqueName).create();
        ArrayList arrayList = new ArrayList();
        for (int i = 0; i < 100; i++) {
            arrayList.add(newMessage(create));
        }
        CompletableFuture completableFuture = null;
        Iterator it = arrayList.iterator();
        while (it.hasNext()) {
            completableFuture = ((MsgPayloadTouchableMessageBuilder) it.next()).value("msg-1").sendAsync();
        }
        try {
            completableFuture.join();
        } catch (Exception e) {
            Assert.assertTrue(FutureUtil.unwrapCompletionException(e) instanceof PulsarClientException.ProducerQueueIsFullError);
        }
        create.close();
        for (int i2 = 0; i2 < arrayList.size(); i2++) {
            MsgPayloadTouchableMessageBuilder msgPayloadTouchableMessageBuilder = (MsgPayloadTouchableMessageBuilder) arrayList.get(i2);
            Assert.assertEquals(msgPayloadTouchableMessageBuilder.payload.refCnt(), 1);
            msgPayloadTouchableMessageBuilder.release();
            Assert.assertEquals(msgPayloadTouchableMessageBuilder.payload.refCnt(), 0);
        }
        this.admin.topics().delete(newUniqueName);
    }

    /* JADX WARN: Type inference failed for: r0v1, types: [java.lang.Object[], java.lang.Object[][]] */
    @DataProvider(name = "maxMessageSizeAndCompressions")
    public Object[][] maxMessageSizeAndCompressions() {
        return new Object[]{new Object[]{1, CompressionType.NONE}, new Object[]{5, CompressionType.NONE}, new Object[]{1, CompressionType.LZ4}, new Object[]{6, CompressionType.LZ4}};
    }

    @Test(dataProvider = "maxMessageSizeAndCompressions")
    public void testSendMessageSizeExceeded(int i, CompressionType compressionType) throws Exception {
        String newUniqueName = BrokerTestUtil.newUniqueName("persistent://public/default/tp_");
        this.admin.topics().createNonPartitionedTopic(newUniqueName);
        ProducerImpl create = this.pulsarClient.newProducer(Schema.STRING).topic(newUniqueName).compressionType(compressionType).enableBatching(false).create();
        create.getConnectionHandler().setMaxMessageSize(i);
        MsgPayloadTouchableMessageBuilder newMessage = newMessage(create);
        MockedStatic mockStatic = Mockito.mockStatic(ByteBufPair.class);
        try {
            List<ByteBufPair> synchronizedList = Collections.synchronizedList(new ArrayList());
            mockStatic.when(() -> {
                ByteBufPair.get((ByteBuf) ArgumentMatchers.any(ByteBuf.class), (ByteBuf) ArgumentMatchers.any(ByteBuf.class));
            }).then(invocationOnMock -> {
                ByteBufPair byteBufPair = (ByteBufPair) invocationOnMock.callRealMethod();
                synchronizedList.add(byteBufPair);
                byteBufPair.retain();
                return byteBufPair;
            });
            try {
                newMessage.value("msg-1").send();
                Assert.fail("expected an error that reached the max message size");
            } catch (Exception e) {
                Assert.assertTrue(FutureUtil.unwrapCompletionException(e) instanceof PulsarClientException.InvalidMessageException);
            }
            create.close();
            Awaitility.await().untilAsserted(() -> {
                Assert.assertEquals(create.getPendingQueueSize(), 0);
            });
            if (i == 1) {
                Assert.assertEquals(synchronizedList.size(), 0);
            } else {
                Assert.assertEquals(synchronizedList.size(), 1);
                if (compressionType == CompressionType.NONE) {
                    Assert.assertEquals(newMessage.payload.refCnt(), 2);
                } else {
                    Assert.assertEquals(newMessage.payload.refCnt(), 1);
                }
                for (ByteBufPair byteBufPair : synchronizedList) {
                    Assert.assertEquals(byteBufPair.refCnt(), 1);
                    byteBufPair.release();
                    Assert.assertEquals(byteBufPair.refCnt(), 0);
                }
            }
            Assert.assertEquals(newMessage.payload.refCnt(), 1);
            newMessage.release();
            Assert.assertEquals(newMessage.payload.refCnt(), 0);
            if (mockStatic != null) {
                mockStatic.close();
            }
            Assert.assertEquals(newMessage.payload.refCnt(), 0);
            this.admin.topics().delete(newUniqueName);
        } catch (Throwable th) {
            if (mockStatic != null) {
                try {
                    mockStatic.close();
                } catch (Throwable th2) {
                    th.addSuppressed(th2);
                }
            }
            throw th;
        }
    }

    /* JADX WARN: Type inference failed for: r0v1, types: [java.lang.Object[], java.lang.Object[][]] */
    @DataProvider(name = "maxMessageSizes")
    public Object[][] maxMessageSizes() {
        return new Object[]{new Object[]{1}, new Object[]{3}, new Object[]{26}};
    }

    @Test(dataProvider = "maxMessageSizes")
    public void testBatchedSendMessageSizeExceeded(int i) throws Exception {
        String newUniqueName = BrokerTestUtil.newUniqueName("persistent://public/default/tp_");
        this.admin.topics().createNonPartitionedTopic(newUniqueName);
        ProducerImpl create = this.pulsarClient.newProducer(Schema.STRING).topic(newUniqueName).enableBatching(true).compressionType(CompressionType.NONE).create();
        ClientCnx clientCnx = create.getClientCnx();
        create.getConnectionHandler().setMaxMessageSize(i);
        MsgPayloadTouchableMessageBuilder newMessage = newMessage(create);
        MsgPayloadTouchableMessageBuilder newMessage2 = newMessage(create);
        newMessage.value("msg-1").sendAsync();
        try {
            newMessage2.value("msg-1").send();
            if (i != 26) {
                Assert.fail("expected an error that reached the max message size");
            }
        } catch (Exception e) {
            Assert.assertTrue(FutureUtil.unwrapCompletionException(e) instanceof PulsarClientException.InvalidMessageException);
        }
        create.close();
        Awaitility.await().untilAsserted(() -> {
            Assert.assertEquals(create.getPendingQueueSize(), 0);
        });
        Assert.assertEquals(newMessage.payload.refCnt(), 1);
        Assert.assertEquals(newMessage2.payload.refCnt(), 1);
        clientCnx.ctx().close();
        newMessage.release();
        newMessage2.release();
        Assert.assertEquals(newMessage.payload.refCnt(), 0);
        Assert.assertEquals(newMessage2.payload.refCnt(), 0);
        this.admin.topics().delete(newUniqueName);
    }

    @Test
    public void testSendAfterClosedProducer() throws Exception {
        String newUniqueName = BrokerTestUtil.newUniqueName("persistent://public/default/tp_");
        this.admin.topics().createNonPartitionedTopic(newUniqueName);
        ProducerImpl create = this.pulsarClient.newProducer(Schema.STRING).topic(newUniqueName).create();
        MsgPayloadTouchableMessageBuilder newMessage = newMessage(create);
        create.close();
        try {
            newMessage.value("msg-1").send();
            Assert.fail("expected an error that the producer has closed");
        } catch (Exception e) {
            Assert.assertTrue(FutureUtil.unwrapCompletionException(e) instanceof PulsarClientException.AlreadyClosedException);
        }
        Awaitility.await().untilAsserted(() -> {
            Assert.assertEquals(create.getPendingQueueSize(), 0);
        });
        Assert.assertEquals(newMessage.payload.refCnt(), 1);
        newMessage.release();
        Assert.assertEquals(newMessage.payload.refCnt(), 0);
        this.admin.topics().delete(newUniqueName);
    }

    /* JADX WARN: Type inference failed for: r0v1, types: [java.lang.Object[], java.lang.Object[][]] */
    @DataProvider
    public Object[][] failedInterceptAt() {
        return new Object[]{new Object[]{"close"}, new Object[]{"eligible"}, new Object[]{"beforeSend"}, new Object[]{"onSendAcknowledgement"}};
    }

    @Test(dataProvider = "failedInterceptAt")
    public void testInterceptorError(final String str) throws Exception {
        String newUniqueName = BrokerTestUtil.newUniqueName("persistent://public/default/tp_");
        this.admin.topics().createNonPartitionedTopic(newUniqueName);
        ProducerImpl create = this.pulsarClient.newProducer(Schema.STRING).topic(newUniqueName).intercept(new ProducerInterceptor[]{new ProducerInterceptor() { // from class: org.apache.pulsar.client.impl.ProducerMemoryLeakTest.1
            public void close() {
                if (str.equals("close")) {
                    throw new RuntimeException("Mocked error");
                }
            }

            public boolean eligible(Message message) {
                if (str.equals("eligible")) {
                    throw new RuntimeException("Mocked error");
                }
                return false;
            }

            public Message beforeSend(Producer producer, Message message) {
                if (str.equals("beforeSend")) {
                    throw new RuntimeException("Mocked error");
                }
                return message;
            }

            public void onSendAcknowledgement(Producer producer, Message message, MessageId messageId, Throwable th) {
                if (str.equals("onSendAcknowledgement")) {
                    throw new RuntimeException("Mocked error");
                }
            }
        }}).create();
        MsgPayloadTouchableMessageBuilder newMessage = newMessage(create);
        try {
            newMessage.value("msg-1").sendAsync().get(3L, TimeUnit.SECONDS);
        } catch (Exception e) {
            Assert.assertTrue(e.getMessage().contains("Mocked"));
        }
        create.close();
        Assert.assertEquals(newMessage.payload.refCnt(), 1);
        newMessage.release();
        Assert.assertEquals(newMessage.payload.refCnt(), 0);
        this.admin.topics().delete(newUniqueName);
    }

    private <T> MsgPayloadTouchableMessageBuilder<T> newMessage(ProducerImpl<T> producerImpl) {
        return new MsgPayloadTouchableMessageBuilder<>(producerImpl, producerImpl.schema);
    }
}
