package org.apache.pulsar.client.api;

import com.google.common.util.concurrent.Uninterruptibles;
import java.lang.invoke.SerializedLambda;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.pulsar.client.util.ExecutorProvider;
import org.apache.pulsar.common.naming.TopicName;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testng.Assert;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;

@Test(groups = {"broker-api"})
/* loaded from: input_file:org/apache/pulsar/client/api/MessageListenerExecutorTest.class */
public class MessageListenerExecutorTest extends ProducerConsumerBase {
    private static final Logger log = LoggerFactory.getLogger(MessageListenerExecutorTest.class);

    @Override // org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest
    @BeforeClass(alwaysRun = true)
    protected void setup() throws Exception {
        super.internalSetup();
        super.producerBaseSetup();
    }

    @Override // org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest
    @AfterClass(alwaysRun = true)
    protected void cleanup() throws Exception {
        super.internalCleanup();
    }

    /* JADX INFO: Access modifiers changed from: protected */
    @Override // org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest
    public void customizeNewPulsarClientBuilder(ClientBuilder clientBuilder) {
        clientBuilder.listenerThreads(1);
    }

    @Test
    public void testConsumerMessageListenerExecutorIsolation() throws Exception {
        log.info("-- Starting {} test --", this.methodName);
        ExecutorService newCachedThreadPool = Executors.newCachedThreadPool();
        try {
            ArrayList arrayList = new ArrayList();
            long j = 10000;
            int i = 0;
            while (i < 5) {
                arrayList.add(startConsumeAndComputeMaxConsumeDelay("persistent://my-property/my-ns/testConsumerMessageListenerDisableIsolation-" + i, "my-sub-testConsumerMessageListenerDisableIsolation-" + i, i == 0 ? Duration.ofMillis(10000L) : Duration.ofMillis(0L), false, newCachedThreadPool));
                i++;
            }
            Assert.assertTrue(arrayList.stream().map((v0) -> {
                return v0.join();
            }).allMatch(l -> {
                return l.longValue() > j;
            }));
            ArrayList arrayList2 = new ArrayList();
            int i2 = 0;
            while (i2 < 5) {
                arrayList2.add(startConsumeAndComputeMaxConsumeDelay("persistent://my-property/my-ns/testConsumerMessageListenerEnableIsolation-" + i2, "my-sub-testConsumerMessageListenerEnableIsolation-" + i2, i2 == 0 ? Duration.ofMillis(10000L) : Duration.ofMillis(0L), true, newCachedThreadPool));
                i2++;
            }
            Assert.assertTrue(((Long) ((CompletableFuture) arrayList2.get(0)).join()).longValue() > 10000);
            Assert.assertTrue(arrayList2.stream().skip(1L).map((v0) -> {
                return v0.join();
            }).allMatch(l2 -> {
                return l2.longValue() < 1000;
            }));
            log.info("-- Exiting {} test --", this.methodName);
            if (Collections.singletonList(newCachedThreadPool).get(0) != null) {
                newCachedThreadPool.shutdownNow();
            }
        } catch (Throwable th) {
            if (Collections.singletonList(newCachedThreadPool).get(0) != null) {
                newCachedThreadPool.shutdownNow();
            }
            throw th;
        }
    }

    private CompletableFuture<Long> startConsumeAndComputeMaxConsumeDelay(String str, String str2, Duration duration, boolean z, ExecutorService executorService) throws Exception {
        CountDownLatch countDownLatch = new CountDownLatch(2);
        TopicName topicName = TopicName.get(str);
        this.admin.topics().createPartitionedTopic(topicName.toString(), 50);
        AtomicLong atomicLong = new AtomicLong(-1L);
        ConsumerBuilder messageListener = this.pulsarClient.newConsumer(Schema.INT64).topic(new String[]{topicName.toString()}).subscriptionName(str2).messageListener((consumer, message) -> {
            Assert.assertNotNull(message, "Message cannot be null");
            log.debug("Received message [{}] in the listener", message.getValue());
            consumer.acknowledgeAsync(message);
            atomicLong.set(Math.max(atomicLong.get(), System.currentTimeMillis() - ((Long) message.getValue()).longValue()));
            if (duration.toMillis() > 0) {
                Uninterruptibles.sleepUninterruptibly(duration);
            }
            countDownLatch.countDown();
        });
        ExecutorService newSingleThreadExecutor = Executors.newSingleThreadExecutor(new ExecutorProvider.ExtendedThreadFactory(str2 + "listener-executor-", true));
        if (z) {
            messageListener.messageListenerExecutor((message2, runnable) -> {
                newSingleThreadExecutor.execute(runnable);
            });
        }
        Consumer subscribe = messageListener.subscribe();
        Producer create = this.pulsarClient.newProducer(Schema.INT64).topic(topicName.toString()).create();
        ArrayList arrayList = new ArrayList();
        for (int i = 0; i < 2; i++) {
            arrayList.add(create.sendAsync(Long.valueOf(System.currentTimeMillis())));
        }
        log.info("Waiting for async publish to complete");
        Iterator it = arrayList.iterator();
        while (it.hasNext()) {
            ((Future) it.next()).get();
        }
        CompletableFuture<Long> completableFuture = new CompletableFuture<>();
        CompletableFuture.runAsync(() -> {
            try {
                countDownLatch.await();
            } catch (InterruptedException e) {
                throw new RuntimeException(e);
            }
        }, executorService).whenCompleteAsync((r9, th) -> {
            completableFuture.complete(Long.valueOf(atomicLong.get()));
            try {
                create.close();
                subscribe.close();
                newSingleThreadExecutor.shutdownNow();
            } catch (PulsarClientException e) {
                throw new RuntimeException((Throwable) e);
            }
        });
        return completableFuture;
    }

    private static /* synthetic */ Object $deserializeLambda$(SerializedLambda serializedLambda) {
        String implMethodName = serializedLambda.getImplMethodName();
        boolean z = -1;
        switch (implMethodName.hashCode()) {
            case -1625479785:
                if (implMethodName.equals("lambda$startConsumeAndComputeMaxConsumeDelay$96c386f5$1")) {
                    z = false;
                    break;
                }
                break;
        }
        switch (z) {
            case SHARED_VALUE:
                if (serializedLambda.getImplMethodKind() == 6 && serializedLambda.getFunctionalInterfaceClass().equals("org/apache/pulsar/client/api/MessageListener") && serializedLambda.getFunctionalInterfaceMethodName().equals("received") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Lorg/apache/pulsar/client/api/Consumer;Lorg/apache/pulsar/client/api/Message;)V") && serializedLambda.getImplClass().equals("org/apache/pulsar/client/api/MessageListenerExecutorTest") && serializedLambda.getImplMethodSignature().equals("(Ljava/util/concurrent/atomic/AtomicLong;Ljava/time/Duration;Ljava/util/concurrent/CountDownLatch;Lorg/apache/pulsar/client/api/Consumer;Lorg/apache/pulsar/client/api/Message;)V")) {
                    AtomicLong atomicLong = (AtomicLong) serializedLambda.getCapturedArg(0);
                    Duration duration = (Duration) serializedLambda.getCapturedArg(1);
                    CountDownLatch countDownLatch = (CountDownLatch) serializedLambda.getCapturedArg(2);
                    return (consumer, message) -> {
                        Assert.assertNotNull(message, "Message cannot be null");
                        log.debug("Received message [{}] in the listener", message.getValue());
                        consumer.acknowledgeAsync(message);
                        atomicLong.set(Math.max(atomicLong.get(), System.currentTimeMillis() - ((Long) message.getValue()).longValue()));
                        if (duration.toMillis() > 0) {
                            Uninterruptibles.sleepUninterruptibly(duration);
                        }
                        countDownLatch.countDown();
                    };
                }
                break;
        }
        throw new IllegalArgumentException("Invalid lambda deserialization");
    }
}
