package org.apache.pulsar.broker.intercept;

import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.concurrent.TimeUnit;
import org.apache.pulsar.broker.testcontext.PulsarTestContext;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.ProducerConsumerBase;
import org.apache.pulsar.client.impl.ConsumerImpl;
import org.apache.pulsar.common.nar.NarClassLoader;
import org.awaitility.Awaitility;
import org.mockito.Mockito;
import org.testng.Assert;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;

/* loaded from: input_file:org/apache/pulsar/broker/intercept/ExceptionsBrokerInterceptorTest.class */
public class ExceptionsBrokerInterceptorTest extends ProducerConsumerBase {
    private String interceptorName = "exception_interceptor";

    @Override // org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest
    @BeforeMethod
    public void setup() throws Exception {
        this.conf.setSystemTopicEnabled(false);
        this.conf.setTopicLevelPoliciesEnabled(false);
        this.conf.setDisableBrokerInterceptors(false);
        this.enableBrokerInterceptor = true;
        super.internalSetup();
        super.producerBaseSetup();
    }

    @Override // org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest
    protected void cleanup() throws Exception {
        super.internalCleanup();
    }

    @Override // org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest
    protected void customizeMainPulsarTestContextBuilder(PulsarTestContext.Builder builder) {
        HashMap hashMap = new HashMap();
        hashMap.put(this.interceptorName, new BrokerInterceptorWithClassLoader(new ExceptionsBrokerInterceptor(), (NarClassLoader) Mockito.mock(NarClassLoader.class)));
        builder.brokerInterceptor(new BrokerInterceptors(hashMap));
    }

    @Test
    public void testMessageAckedExceptions() throws Exception {
        int i = 10;
        this.admin.topics().createNonPartitionedTopic("persistent://public/default/test");
        BrokerInterceptors brokerInterceptor = this.pulsar.getBrokerInterceptor();
        Assert.assertNotNull(brokerInterceptor);
        BrokerInterceptorWithClassLoader brokerInterceptorWithClassLoader = (BrokerInterceptorWithClassLoader) brokerInterceptor.getInterceptors().get(this.interceptorName);
        Assert.assertNotNull(brokerInterceptorWithClassLoader);
        BrokerInterceptor interceptor = brokerInterceptorWithClassLoader.getInterceptor();
        Assert.assertTrue(interceptor instanceof ExceptionsBrokerInterceptor);
        Producer create = this.pulsarClient.newProducer().topic("persistent://public/default/test").create();
        ConsumerImpl subscribe = this.pulsarClient.newConsumer().topic(new String[]{"persistent://public/default/test"}).subscriptionName("test-sub").acknowledgmentGroupTime(0L, TimeUnit.MILLISECONDS).isAckReceiptEnabled(true).subscribe();
        Awaitility.await().until(() -> {
            return Boolean.valueOf(((ExceptionsBrokerInterceptor) interceptor).getProducerCount().get() == 1);
        });
        Awaitility.await().until(() -> {
            return Boolean.valueOf(((ExceptionsBrokerInterceptor) interceptor).getConsumerCount().get() == 1);
        });
        for (int i2 = 0; i2 < 10; i2++) {
            create.send("test".getBytes(StandardCharsets.UTF_8));
        }
        int i3 = 0;
        while (true) {
            Message receive = subscribe.receive(3, TimeUnit.SECONDS);
            if (receive == null) {
                Assert.assertEquals(i3, 10);
                Awaitility.await().until(() -> {
                    return Boolean.valueOf(((ExceptionsBrokerInterceptor) interceptor).getMessageAckCount().get() == i);
                });
                Assert.assertEquals(subscribe.getClientCnx().getDuplicatedResponseCount(), 0L);
                return;
            }
            i3++;
            subscribe.acknowledge(receive);
        }
    }
}
