package org.apache.pulsar.client.api;

import com.google.common.collect.Sets;
import java.lang.invoke.SerializedLambda;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.commons.lang3.RandomStringUtils;
import org.apache.commons.lang3.RandomUtils;
import org.apache.pulsar.client.api.interceptor.ProducerInterceptor;
import org.apache.pulsar.client.impl.MessageImpl;
import org.apache.pulsar.client.impl.TopicMessageImpl;
import org.apache.pulsar.common.api.proto.KeyValue;
import org.apache.pulsar.common.policies.data.SchemaCompatibilityStrategy;
import org.apache.pulsar.common.schema.SchemaType;
import org.awaitility.Awaitility;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testng.Assert;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.DataProvider;
import org.testng.annotations.Test;

@Test(groups = {"broker-api"})
/* loaded from: input_file:org/apache/pulsar/client/api/InterceptorsTest.class */
public class InterceptorsTest extends ProducerConsumerBase {
    private static final Logger log = LoggerFactory.getLogger(InterceptorsTest.class);

    /* renamed from: org.apache.pulsar.client.api.InterceptorsTest$1BaseInterceptor, reason: invalid class name */
    /* loaded from: input_file:org/apache/pulsar/client/api/InterceptorsTest$1BaseInterceptor.class */
    abstract class C1BaseInterceptor implements ProducerInterceptor {
        private static final String set = "set";
        private String tag;
        final /* synthetic */ Map val$ackCallback;

        private C1BaseInterceptor(String str, Map map) {
            this.val$ackCallback = map;
            this.tag = str;
        }

        public void close() {
        }

        public Message beforeSend(Producer producer, Message message) {
            ((MessageImpl) message).getMessageBuilder().addProperty().setKey(this.tag).setValue(set);
            return message;
        }

        public void onSendAcknowledgement(Producer producer, Message message, MessageId messageId, Throwable th) {
            if (set.equals(message.getProperties().get(this.tag))) {
                ((List) this.val$ackCallback.computeIfAbsent(messageId, messageId2 -> {
                    return new ArrayList();
                })).add(this.tag);
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* renamed from: org.apache.pulsar.client.api.InterceptorsTest$1ReaderInterceptorImpl, reason: invalid class name */
    /* loaded from: input_file:org/apache/pulsar/client/api/InterceptorsTest$1ReaderInterceptorImpl.class */
    public class C1ReaderInterceptorImpl implements ReaderInterceptor<byte[]> {
        final int maxReadCount;
        final AtomicInteger beforeReadCount = new AtomicInteger();
        final AtomicInteger newPartition = new AtomicInteger(0);
        final AtomicBoolean encounterException = new AtomicBoolean(false);

        C1ReaderInterceptorImpl(int i) {
            this.maxReadCount = i;
        }

        public void close() {
        }

        public Message<byte[]> beforeRead(Reader<byte[]> reader, Message<byte[]> message) {
            if (this.beforeReadCount.get() > this.maxReadCount) {
                this.encounterException.set(true);
                throw new RuntimeException("The read count exceed maxReadCount - " + this.maxReadCount);
            }
            this.beforeReadCount.incrementAndGet();
            return message;
        }

        public void onPartitionsChange(String str, int i) {
            this.newPartition.set(i);
        }
    }

    @Override // org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest
    @BeforeMethod
    protected void setup() throws Exception {
        super.internalSetup();
        super.producerBaseSetup();
    }

    @Override // org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest
    @AfterMethod(alwaysRun = true)
    protected void cleanup() throws Exception {
        super.internalCleanup();
    }

    /* JADX WARN: Type inference failed for: r0v1, types: [java.lang.Object[], java.lang.Object[][]] */
    @DataProvider(name = "receiverQueueSize")
    public Object[][] getReceiverQueueSize() {
        return new Object[]{new Object[]{0}, new Object[]{1000}};
    }

    /* JADX WARN: Type inference failed for: r0v1, types: [java.lang.Object[], java.lang.Object[][]] */
    @DataProvider(name = "topicPartition")
    public Object[][] getTopicPartition() {
        return new Object[]{new Object[]{0}, new Object[]{3}};
    }

    /* JADX WARN: Type inference failed for: r0v1, types: [java.lang.Object[], java.lang.Object[][]] */
    @DataProvider(name = "topics")
    public Object[][] getTopics() {
        return new Object[]{new Object[]{List.of("persistent://my-property/my-ns/my-topic")}, new Object[]{List.of("persistent://my-property/my-ns/my-topic", "persistent://my-property/my-ns/my-topic1")}};
    }

    @Test
    public void testProducerInterceptor() throws Exception {
        HashMap hashMap = new HashMap();
        String str = "my-property/my-ns" + RandomUtils.nextInt(999, 1999);
        this.admin.namespaces().createNamespace(str, Sets.newHashSet(new String[]{"test"}));
        this.admin.namespaces().setSchemaCompatibilityStrategy(str, SchemaCompatibilityStrategy.ALWAYS_COMPATIBLE);
        C1BaseInterceptor c1BaseInterceptor = new C1BaseInterceptor("int1", hashMap) { // from class: org.apache.pulsar.client.api.InterceptorsTest.1
            final /* synthetic */ Map val$ackCallback;

            {
                this.val$ackCallback = hashMap;
            }

            public boolean eligible(Message message) {
                return true;
            }
        };
        C1BaseInterceptor c1BaseInterceptor2 = new C1BaseInterceptor("int2", hashMap) { // from class: org.apache.pulsar.client.api.InterceptorsTest.2
            final /* synthetic */ Map val$ackCallback;

            {
                this.val$ackCallback = hashMap;
            }

            public boolean eligible(Message message) {
                return SchemaType.STRING.equals(((MessageImpl) message).getSchemaInternal().getSchemaInfo().getType());
            }
        };
        C1BaseInterceptor c1BaseInterceptor3 = new C1BaseInterceptor("int3", hashMap) { // from class: org.apache.pulsar.client.api.InterceptorsTest.3
            final /* synthetic */ Map val$ackCallback;

            {
                this.val$ackCallback = hashMap;
            }

            public boolean eligible(Message message) {
                return SchemaType.INT32.equals(((MessageImpl) message).getSchemaInternal().getSchemaInfo().getType());
            }
        };
        Producer create = this.pulsarClient.newProducer(Schema.STRING).topic("persistent://" + str + "/my-topic").intercept(new ProducerInterceptor[]{c1BaseInterceptor, c1BaseInterceptor2, c1BaseInterceptor3}).create();
        MessageId send = create.newMessage().property("STR", "Y").value("Hello Pulsar!").send();
        Assert.assertEquals((Collection) hashMap.get(send), Arrays.asList(c1BaseInterceptor.tag, c1BaseInterceptor2.tag));
        log.info("Send result messageId: {}", send);
        MessageId send2 = create.newMessage(Schema.INT32).property("INT", "Y").value(18).send();
        Assert.assertEquals((Collection) hashMap.get(send2), Arrays.asList(c1BaseInterceptor.tag, c1BaseInterceptor3.tag));
        log.info("Send result messageId: {}", send2);
        create.close();
    }

    @Test
    public void testProducerInterceptorsWithExceptions() throws PulsarClientException {
        Producer create = this.pulsarClient.newProducer(Schema.STRING).topic("persistent://my-property/my-ns/my-topic").intercept(new ProducerInterceptor[]{new ProducerInterceptor<String>() { // from class: org.apache.pulsar.client.api.InterceptorsTest.4
            public void close() {
            }

            public Message<String> beforeSend(Producer<String> producer, Message<String> message) {
                throw new IllegalArgumentException();
            }

            public void onSendAcknowledgement(Producer<String> producer, Message<String> message, MessageId messageId, Throwable th) {
                throw new IllegalArgumentException();
            }
        }}).create();
        Assert.assertNotNull(create.newMessage().value("Hello Pulsar!").send());
        create.close();
    }

    @Test
    public void testProducerInterceptorsWithErrors() throws PulsarClientException {
        Producer create = this.pulsarClient.newProducer(Schema.STRING).topic("persistent://my-property/my-ns/my-topic").intercept(new ProducerInterceptor[]{new ProducerInterceptor<String>() { // from class: org.apache.pulsar.client.api.InterceptorsTest.5
            public void close() {
            }

            public Message<String> beforeSend(Producer<String> producer, Message<String> message) {
                throw new AbstractMethodError();
            }

            public void onSendAcknowledgement(Producer<String> producer, Message<String> message, MessageId messageId, Throwable th) {
                throw new AbstractMethodError();
            }
        }}).create();
        Assert.assertNotNull(create.newMessage().value("Hello Pulsar!").send());
        create.close();
    }

    @Test
    public void testConsumerInterceptorWithErrors() throws PulsarClientException {
        ConsumerInterceptor<String> consumerInterceptor = new ConsumerInterceptor<String>() { // from class: org.apache.pulsar.client.api.InterceptorsTest.6
            public void close() {
                throw new AbstractMethodError();
            }

            public Message<String> beforeConsume(Consumer<String> consumer, Message<String> message) {
                throw new AbstractMethodError();
            }

            public void onAcknowledge(Consumer<String> consumer, MessageId messageId, Throwable th) {
                throw new AbstractMethodError();
            }

            public void onAcknowledgeCumulative(Consumer<String> consumer, MessageId messageId, Throwable th) {
                throw new AbstractMethodError();
            }

            public void onNegativeAcksSend(Consumer<String> consumer, Set<MessageId> set) {
                throw new AbstractMethodError();
            }

            public void onAckTimeoutSend(Consumer<String> consumer, Set<MessageId> set) {
                throw new AbstractMethodError();
            }
        };
        Consumer subscribe = this.pulsarClient.newConsumer(Schema.STRING).topic(new String[]{"persistent://my-property/my-ns/my-topic-exception"}).subscriptionType(SubscriptionType.Shared).intercept(new ConsumerInterceptor[]{consumerInterceptor}).subscriptionName("my-subscription-ack-timeout").ackTimeout(3L, TimeUnit.SECONDS).subscribe();
        Consumer subscribe2 = this.pulsarClient.newConsumer(Schema.STRING).topic(new String[]{"persistent://my-property/my-ns/my-topic-exception"}).subscriptionType(SubscriptionType.Shared).intercept(new ConsumerInterceptor[]{consumerInterceptor}).subscriptionName("my-subscription-negative").subscribe();
        Producer create = this.pulsarClient.newProducer(Schema.STRING).topic("persistent://my-property/my-ns/my-topic-exception").create();
        create.newMessage().value("Hello Pulsar!").send();
        Assert.assertEquals((String) subscribe.receive().getValue(), "Hello Pulsar!");
        Message receive = subscribe.receive();
        Assert.assertEquals((String) receive.getValue(), "Hello Pulsar!");
        subscribe.acknowledge(receive);
        Message receive2 = subscribe2.receive();
        Assert.assertEquals((String) receive2.getValue(), "Hello Pulsar!");
        subscribe2.negativeAcknowledge(receive2);
        Message receive3 = subscribe2.receive();
        Assert.assertEquals((String) receive3.getValue(), "Hello Pulsar!");
        subscribe2.acknowledge(receive3);
        create.close();
        subscribe.close();
        subscribe2.close();
    }

    @Test(dataProvider = "receiverQueueSize")
    public void testConsumerInterceptorWithSingleTopicSubscribe(Integer num) throws Exception {
        ConsumerInterceptor<String> consumerInterceptor = new ConsumerInterceptor<String>() { // from class: org.apache.pulsar.client.api.InterceptorsTest.7
            public void close() {
            }

            public Message<String> beforeConsume(Consumer<String> consumer, Message<String> message) {
                MessageImpl messageImpl = (MessageImpl) message;
                messageImpl.getMessageBuilder().addProperty().setKey("beforeConsumer").setValue("1");
                return messageImpl;
            }

            public void onAcknowledge(Consumer<String> consumer, MessageId messageId, Throwable th) {
                InterceptorsTest.log.info("onAcknowledge messageId: {}", messageId, th);
            }

            public void onAcknowledgeCumulative(Consumer<String> consumer, MessageId messageId, Throwable th) {
                InterceptorsTest.log.info("onAcknowledgeCumulative messageIds: {}", messageId, th);
            }

            public void onNegativeAcksSend(Consumer<String> consumer, Set<MessageId> set) {
            }

            public void onAckTimeoutSend(Consumer<String> consumer, Set<MessageId> set) {
            }
        };
        Consumer subscribe = this.pulsarClient.newConsumer(Schema.STRING).topic(new String[]{"persistent://my-property/my-ns/my-topic"}).subscriptionType(SubscriptionType.Shared).intercept(new ConsumerInterceptor[]{consumerInterceptor}).subscriptionName("my-subscription").receiverQueueSize(num.intValue()).subscribe();
        Producer create = this.pulsarClient.newProducer(Schema.STRING).topic("persistent://my-property/my-ns/my-topic").enableBatching(false).create();
        create.newMessage().value("Hello Pulsar!").send();
        MessageImpl receive = subscribe.receive();
        boolean z = false;
        Iterator it = receive.getMessageBuilder().getPropertiesList().iterator();
        while (it.hasNext()) {
            if ("beforeConsumer".equals(((KeyValue) it.next()).getKey())) {
                z = true;
            }
        }
        Assert.assertTrue(z);
        subscribe.acknowledge(receive);
        create.newMessage().value("Hello Pulsar!").send();
        MessageImpl messageImpl = (Message) subscribe.receiveAsync().get();
        boolean z2 = false;
        Iterator it2 = messageImpl.getMessageBuilder().getPropertiesList().iterator();
        while (it2.hasNext()) {
            if ("beforeConsumer".equals(((KeyValue) it2.next()).getKey())) {
                z2 = true;
            }
        }
        Assert.assertTrue(z2);
        subscribe.acknowledge(messageImpl);
        subscribe.close();
        CompletableFuture completableFuture = new CompletableFuture();
        Consumer subscribe2 = this.pulsarClient.newConsumer(Schema.STRING).topic(new String[]{"persistent://my-property/my-ns/my-topic"}).subscriptionType(SubscriptionType.Shared).intercept(new ConsumerInterceptor[]{consumerInterceptor}).subscriptionName("my-subscription").receiverQueueSize(num.intValue()).messageListener((consumer, message) -> {
            try {
                consumer.acknowledge(message);
            } catch (Exception e) {
                Assert.fail("Failed to acknowledge", e);
            }
            completableFuture.complete(message);
        }).subscribe();
        create.newMessage().value("Hello Pulsar!").send();
        boolean z3 = false;
        Iterator it3 = ((Message) completableFuture.get()).getMessageBuilder().getPropertiesList().iterator();
        while (it3.hasNext()) {
            if ("beforeConsumer".equals(((KeyValue) it3.next()).getKey())) {
                z3 = true;
            }
        }
        Assert.assertTrue(z3);
        create.close();
        subscribe2.close();
    }

    @Test
    public void testConsumerInterceptorWithMultiTopicSubscribe() throws PulsarClientException {
        ConsumerInterceptor<String> consumerInterceptor = new ConsumerInterceptor<String>() { // from class: org.apache.pulsar.client.api.InterceptorsTest.8
            public void close() {
            }

            public Message<String> beforeConsume(Consumer<String> consumer, Message<String> message) {
                ((TopicMessageImpl) message).getMessage().getMessageBuilder().addProperty().setKey("beforeConsumer").setValue("1");
                return message;
            }

            public void onAcknowledge(Consumer<String> consumer, MessageId messageId, Throwable th) {
                InterceptorsTest.log.info("onAcknowledge messageId: {}", messageId, th);
            }

            public void onAcknowledgeCumulative(Consumer<String> consumer, MessageId messageId, Throwable th) {
                InterceptorsTest.log.info("onAcknowledgeCumulative messageIds: {}", messageId, th);
            }

            public void onNegativeAcksSend(Consumer<String> consumer, Set<MessageId> set) {
            }

            public void onAckTimeoutSend(Consumer<String> consumer, Set<MessageId> set) {
            }
        };
        Producer create = this.pulsarClient.newProducer(Schema.STRING).topic("persistent://my-property/my-ns/my-topic").create();
        Producer create2 = this.pulsarClient.newProducer(Schema.STRING).topic("persistent://my-property/my-ns/my-topic1").create();
        Consumer subscribe = this.pulsarClient.newConsumer(Schema.STRING).topic(new String[]{"persistent://my-property/my-ns/my-topic", "persistent://my-property/my-ns/my-topic1"}).subscriptionType(SubscriptionType.Shared).intercept(new ConsumerInterceptor[]{consumerInterceptor}).subscriptionName("my-subscription").subscribe();
        create.newMessage().value("Hello Pulsar!").send();
        create2.newMessage().value("Hello Pulsar!").send();
        int i = 0;
        for (int i2 = 0; i2 < 2; i2++) {
            Message receive = i2 % 2 == 0 ? subscribe.receive() : (Message) subscribe.receiveAsync().join();
            Iterator it = ((TopicMessageImpl) receive).getMessage().getMessageBuilder().getPropertiesList().iterator();
            while (it.hasNext()) {
                if ("beforeConsumer".equals(((KeyValue) it.next()).getKey())) {
                    i++;
                }
            }
            Assert.assertEquals(i, i2 + 1);
            subscribe.acknowledge(receive);
        }
        Assert.assertEquals(2, i);
        create.close();
        create2.close();
        subscribe.close();
    }

    @Test
    public void testConsumerInterceptorWithPatternTopicSubscribe() throws PulsarClientException {
        ConsumerInterceptor<String> consumerInterceptor = new ConsumerInterceptor<String>() { // from class: org.apache.pulsar.client.api.InterceptorsTest.9
            public void close() {
            }

            public Message<String> beforeConsume(Consumer<String> consumer, Message<String> message) {
                ((TopicMessageImpl) message).getMessage().getMessageBuilder().addProperty().setKey("beforeConsumer").setValue("1");
                return message;
            }

            public void onAcknowledge(Consumer<String> consumer, MessageId messageId, Throwable th) {
                InterceptorsTest.log.info("onAcknowledge messageId: {}", messageId, th);
            }

            public void onAcknowledgeCumulative(Consumer<String> consumer, MessageId messageId, Throwable th) {
                InterceptorsTest.log.info("onAcknowledgeCumulative messageIds: {}", messageId, th);
            }

            public void onNegativeAcksSend(Consumer<String> consumer, Set<MessageId> set) {
            }

            public void onAckTimeoutSend(Consumer<String> consumer, Set<MessageId> set) {
            }
        };
        Producer create = this.pulsarClient.newProducer(Schema.STRING).topic("persistent://my-property/my-ns/my-topic").create();
        Producer create2 = this.pulsarClient.newProducer(Schema.STRING).topic("persistent://my-property/my-ns/my-topic1").create();
        Consumer subscribe = this.pulsarClient.newConsumer(Schema.STRING).topicsPattern("persistent://my-property/my-ns/my-.*").subscriptionType(SubscriptionType.Shared).intercept(new ConsumerInterceptor[]{consumerInterceptor}).subscriptionName("my-subscription").subscribe();
        create.newMessage().value("Hello Pulsar!").send();
        create2.newMessage().value("Hello Pulsar!").send();
        int i = 0;
        for (int i2 = 0; i2 < 2; i2++) {
            TopicMessageImpl receive = subscribe.receive();
            Iterator it = receive.getMessage().getMessageBuilder().getPropertiesList().iterator();
            while (it.hasNext()) {
                if ("beforeConsumer".equals(((KeyValue) it.next()).getKey())) {
                    i++;
                }
            }
            subscribe.acknowledge(receive);
        }
        Assert.assertEquals(2, i);
        create.close();
        create2.close();
        subscribe.close();
    }

    @Test
    public void testConsumerInterceptorForAcknowledgeCumulative() throws PulsarClientException {
        final ArrayList arrayList = new ArrayList();
        Consumer subscribe = this.pulsarClient.newConsumer(Schema.STRING).topic(new String[]{"persistent://my-property/my-ns/my-topic"}).subscriptionType(SubscriptionType.Failover).intercept(new ConsumerInterceptor[]{new ConsumerInterceptor<String>() { // from class: org.apache.pulsar.client.api.InterceptorsTest.10
            public void close() {
            }

            public Message<String> beforeConsume(Consumer<String> consumer, Message<String> message) {
                MessageImpl messageImpl = (MessageImpl) message;
                messageImpl.getMessageBuilder().addProperty().setKey("beforeConsumer").setValue("1");
                return messageImpl;
            }

            public void onAcknowledge(Consumer<String> consumer, MessageId messageId, Throwable th) {
                InterceptorsTest.log.info("onAcknowledge messageId: {}", messageId, th);
            }

            public void onAcknowledgeCumulative(Consumer<String> consumer, MessageId messageId, Throwable th) {
                Assert.assertEquals(arrayList.stream().filter(messageId2 -> {
                    return messageId2.compareTo(messageId) <= 0;
                }).count(), 100L);
                arrayList.clear();
                InterceptorsTest.log.info("onAcknowledgeCumulative messageIds: {}", messageId, th);
            }

            public void onNegativeAcksSend(Consumer<String> consumer, Set<MessageId> set) {
            }

            public void onAckTimeoutSend(Consumer<String> consumer, Set<MessageId> set) {
            }
        }}).subscriptionName("my-subscription").subscribe();
        Producer create = this.pulsarClient.newProducer(Schema.STRING).topic("persistent://my-property/my-ns/my-topic").create();
        for (int i = 0; i < 100; i++) {
            create.newMessage().value("Hello Pulsar!").send();
        }
        int i2 = 0;
        for (int i3 = 0; i3 < 100; i3++) {
            MessageImpl receive = subscribe.receive();
            Iterator it = receive.getMessageBuilder().getPropertiesList().iterator();
            while (it.hasNext()) {
                if ("beforeConsumer".equals(((KeyValue) it.next()).getKey())) {
                    i2++;
                }
            }
            arrayList.add(receive.getMessageId());
            if (i3 == 99) {
                subscribe.acknowledgeCumulative(receive);
            }
        }
        Assert.assertEquals(100, i2);
        create.close();
        subscribe.close();
    }

    @Test(dataProvider = "topics")
    public void testConsumerInterceptorForNegativeAcksSend(List<String> list) throws PulsarClientException, InterruptedException {
        final CountDownLatch countDownLatch = new CountDownLatch(50);
        Consumer subscribe = this.pulsarClient.newConsumer(Schema.STRING).topics(list).subscriptionType(SubscriptionType.Failover).intercept(new ConsumerInterceptor[]{new ConsumerInterceptor<String>() { // from class: org.apache.pulsar.client.api.InterceptorsTest.11
            public void close() {
            }

            public Message<String> beforeConsume(Consumer<String> consumer, Message<String> message) {
                return message;
            }

            public void onAcknowledge(Consumer<String> consumer, MessageId messageId, Throwable th) {
            }

            public void onAcknowledgeCumulative(Consumer<String> consumer, MessageId messageId, Throwable th) {
            }

            public void onNegativeAcksSend(Consumer<String> consumer, Set<MessageId> set) {
                Assert.assertTrue(countDownLatch.getCount() > 0);
                CountDownLatch countDownLatch2 = countDownLatch;
                set.forEach(messageId -> {
                    countDownLatch2.countDown();
                });
            }

            public void onAckTimeoutSend(Consumer<String> consumer, Set<MessageId> set) {
            }
        }}).negativeAckRedeliveryDelay(100L, TimeUnit.MILLISECONDS).subscriptionName("my-subscription").subscribe();
        Producer create = this.pulsarClient.newProducer(Schema.STRING).topic(list.get(0)).create();
        for (int i = 0; i < 100; i++) {
            create.send("Mock message");
        }
        for (int i2 = 0; i2 < 100; i2++) {
            Message receive = subscribe.receive();
            if (i2 % 2 == 0) {
                subscribe.negativeAcknowledge(receive);
            } else {
                subscribe.acknowledge(receive);
            }
        }
        countDownLatch.await();
        Assert.assertEquals(countDownLatch.getCount(), 0L);
        create.close();
        subscribe.close();
    }

    @Test(dataProvider = "topics")
    public void testConsumerInterceptorForAckTimeoutSend(List<String> list) throws PulsarClientException, InterruptedException {
        final CountDownLatch countDownLatch = new CountDownLatch(50);
        ConsumerInterceptor<String> consumerInterceptor = new ConsumerInterceptor<String>() { // from class: org.apache.pulsar.client.api.InterceptorsTest.12
            public void close() {
            }

            public Message<String> beforeConsume(Consumer<String> consumer, Message<String> message) {
                return message;
            }

            public void onAcknowledge(Consumer<String> consumer, MessageId messageId, Throwable th) {
            }

            public void onAcknowledgeCumulative(Consumer<String> consumer, MessageId messageId, Throwable th) {
            }

            public void onNegativeAcksSend(Consumer<String> consumer, Set<MessageId> set) {
            }

            public void onAckTimeoutSend(Consumer<String> consumer, Set<MessageId> set) {
                Assert.assertTrue(countDownLatch.getCount() > 0);
                CountDownLatch countDownLatch2 = countDownLatch;
                set.forEach(messageId -> {
                    countDownLatch2.countDown();
                });
            }
        };
        Producer create = this.pulsarClient.newProducer(Schema.STRING).topic(list.get(0)).create();
        Consumer subscribe = this.pulsarClient.newConsumer(Schema.STRING).topics(list).subscriptionName("foo").intercept(new ConsumerInterceptor[]{consumerInterceptor}).ackTimeout(2L, TimeUnit.SECONDS).subscribe();
        for (int i = 0; i < 100; i++) {
            create.send("Mock message");
        }
        for (int i2 = 0; i2 < 100; i2++) {
            Message receive = subscribe.receive();
            if (i2 % 2 == 0) {
                subscribe.acknowledge(receive);
            }
        }
        countDownLatch.await();
        Assert.assertEquals(countDownLatch.getCount(), 0L);
        create.close();
        subscribe.close();
    }

    /* JADX WARN: Finally extract failed */
    @Test(timeOut = 30000, dataProvider = "topicPartition")
    public void testReaderInterceptor(int i) throws Exception {
        String str = "reader-interceptor-" + i + "-" + RandomStringUtils.randomAlphabetic(5);
        if (i > 0) {
            this.admin.topics().createPartitionedTopic(str, i);
        }
        Producer<byte[]> create = this.pulsarClient.newProducer().topic(str).create();
        try {
            int i2 = 10;
            C1ReaderInterceptorImpl c1ReaderInterceptorImpl = new C1ReaderInterceptorImpl(Integer.MAX_VALUE);
            C1ReaderInterceptorImpl c1ReaderInterceptorImpl2 = new C1ReaderInterceptorImpl(10 * 2);
            Reader<byte[]> create2 = this.pulsarClient.newReader().topic(str).startMessageId(MessageId.earliest).intercept(new ReaderInterceptor[]{c1ReaderInterceptorImpl, c1ReaderInterceptorImpl2}).autoUpdatePartitionsInterval(1, TimeUnit.SECONDS).create();
            try {
                C1ReaderInterceptorImpl c1ReaderInterceptorImpl3 = new C1ReaderInterceptorImpl(10 * 2);
                AtomicInteger atomicInteger = new AtomicInteger();
                Reader create3 = this.pulsarClient.newReader().topic(str).startMessageId(MessageId.earliest).intercept(new ReaderInterceptor[]{c1ReaderInterceptorImpl3}).autoUpdatePartitionsInterval(1, TimeUnit.SECONDS).readerListener((reader, message) -> {
                    atomicInteger.incrementAndGet();
                }).create();
                try {
                    produceAndConsume(10, create, create2);
                    Assert.assertEquals(c1ReaderInterceptorImpl.beforeReadCount.get(), 10);
                    Assert.assertEquals(c1ReaderInterceptorImpl2.beforeReadCount.get(), 10);
                    Awaitility.await().pollInterval(1L, TimeUnit.SECONDS).untilAsserted(() -> {
                        Assert.assertEquals(atomicInteger.get(), i2);
                    });
                    Assert.assertEquals(c1ReaderInterceptorImpl3.beforeReadCount.get(), 10);
                    if (i > 0) {
                        int i3 = i + 3;
                        this.admin.topics().updatePartitionedTopic(str, i3);
                        Awaitility.await().pollInterval(1L, TimeUnit.SECONDS).untilAsserted(() -> {
                            Assert.assertEquals(c1ReaderInterceptorImpl.newPartition.get(), i3);
                            Assert.assertEquals(c1ReaderInterceptorImpl2.newPartition.get(), i3);
                            Assert.assertEquals(c1ReaderInterceptorImpl3.newPartition.get(), i3);
                        });
                        produceAndConsume(10, create, create2);
                        Assert.assertEquals(c1ReaderInterceptorImpl.beforeReadCount.get(), 10 * 2);
                        Assert.assertEquals(c1ReaderInterceptorImpl2.beforeReadCount.get(), 10 * 2);
                        Awaitility.await().pollInterval(1L, TimeUnit.SECONDS).untilAsserted(() -> {
                            Assert.assertEquals(atomicInteger.get(), i2 * 2);
                        });
                        Assert.assertEquals(c1ReaderInterceptorImpl3.beforeReadCount.get(), 10 * 2);
                    }
                    produceAndConsume(10 * 2, create, create2);
                    Assert.assertFalse(c1ReaderInterceptorImpl.encounterException.get());
                    Assert.assertTrue(c1ReaderInterceptorImpl2.encounterException.get());
                    Assert.assertTrue(c1ReaderInterceptorImpl3.encounterException.get());
                    Assert.assertNull(create2.readNext(3, TimeUnit.SECONDS));
                    if (Collections.singletonList(create3).get(0) != null) {
                        create3.close();
                    }
                    if (Collections.singletonList(create2).get(0) != null) {
                        create2.close();
                    }
                } catch (Throwable th) {
                    if (Collections.singletonList(create3).get(0) != null) {
                        create3.close();
                    }
                    throw th;
                }
            } catch (Throwable th2) {
                if (Collections.singletonList(create2).get(0) != null) {
                    create2.close();
                }
                throw th2;
            }
        } finally {
            if (Collections.singletonList(create).get(0) != null) {
                create.close();
            }
        }
    }

    private void produceAndConsume(int i, Producer<byte[]> producer, Reader<byte[]> reader) throws PulsarClientException {
        for (int i2 = 0; i2 < i; i2++) {
            producer.newMessage().value(("msg - " + i2).getBytes()).send();
        }
        for (int i3 = 0; i3 < i; i3++) {
            Assert.assertNotNull(reader.readNext());
        }
    }

    private static /* synthetic */ Object $deserializeLambda$(SerializedLambda serializedLambda) {
        String implMethodName = serializedLambda.getImplMethodName();
        boolean z = -1;
        switch (implMethodName.hashCode()) {
            case -1618603993:
                if (implMethodName.equals("lambda$testConsumerInterceptorWithSingleTopicSubscribe$8709f49b$1")) {
                    z = false;
                    break;
                }
                break;
            case -245324278:
                if (implMethodName.equals("lambda$testReaderInterceptor$5450ccc5$1")) {
                    z = true;
                    break;
                }
                break;
        }
        switch (z) {
            case SHARED_VALUE:
                if (serializedLambda.getImplMethodKind() == 6 && serializedLambda.getFunctionalInterfaceClass().equals("org/apache/pulsar/client/api/MessageListener") && serializedLambda.getFunctionalInterfaceMethodName().equals("received") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Lorg/apache/pulsar/client/api/Consumer;Lorg/apache/pulsar/client/api/Message;)V") && serializedLambda.getImplClass().equals("org/apache/pulsar/client/api/InterceptorsTest") && serializedLambda.getImplMethodSignature().equals("(Ljava/util/concurrent/CompletableFuture;Lorg/apache/pulsar/client/api/Consumer;Lorg/apache/pulsar/client/api/Message;)V")) {
                    CompletableFuture completableFuture = (CompletableFuture) serializedLambda.getCapturedArg(0);
                    return (consumer, message) -> {
                        try {
                            consumer.acknowledge(message);
                        } catch (Exception e) {
                            Assert.fail("Failed to acknowledge", e);
                        }
                        completableFuture.complete(message);
                    };
                }
                break;
            case true:
                if (serializedLambda.getImplMethodKind() == 6 && serializedLambda.getFunctionalInterfaceClass().equals("org/apache/pulsar/client/api/ReaderListener") && serializedLambda.getFunctionalInterfaceMethodName().equals("received") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Lorg/apache/pulsar/client/api/Reader;Lorg/apache/pulsar/client/api/Message;)V") && serializedLambda.getImplClass().equals("org/apache/pulsar/client/api/InterceptorsTest") && serializedLambda.getImplMethodSignature().equals("(Ljava/util/concurrent/atomic/AtomicInteger;Lorg/apache/pulsar/client/api/Reader;Lorg/apache/pulsar/client/api/Message;)V")) {
                    AtomicInteger atomicInteger = (AtomicInteger) serializedLambda.getCapturedArg(0);
                    return (reader, message2) -> {
                        atomicInteger.incrementAndGet();
                    };
                }
                break;
        }
        throw new IllegalArgumentException("Invalid lambda deserialization");
    }
}
