package org.apache.pulsar.broker.service;

import com.google.common.collect.Lists;
import java.lang.invoke.SerializedLambda;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.commons.collections.CollectionUtils;
import org.apache.pulsar.broker.service.persistent.PersistentSubscription;
import org.apache.pulsar.broker.service.persistent.PersistentTopic;
import org.apache.pulsar.client.admin.PulsarAdminException;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.ConsumerBuilder;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.MessageRoutingMode;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.SubscriptionType;
import org.apache.pulsar.client.api.schema.proto.Test;
import org.apache.pulsar.client.impl.ConsumerImpl;
import org.apache.pulsar.common.api.proto.CommandSubscribe;
import org.apache.pulsar.common.policies.data.ConsumerStats;
import org.apache.pulsar.common.policies.data.SubscriptionStats;
import org.apache.pulsar.common.util.FutureUtil;
import org.awaitility.Awaitility;
import org.eclipse.jetty.util.BlockingArrayQueue;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testng.Assert;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;

@Test(groups = {"broker"})
/* loaded from: input_file:org/apache/pulsar/broker/service/PersistentQueueE2ETest.class */
public class PersistentQueueE2ETest extends BrokerTestBase {
    private static final Logger log = LoggerFactory.getLogger(PersistentQueueE2ETest.class);

    @Override // org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest
    @BeforeClass
    public void setup() throws Exception {
        super.baseSetup();
    }

    @Override // org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest
    @AfterClass(alwaysRun = true)
    public void cleanup() throws Exception {
        super.internalCleanup();
    }

    private void deleteTopic(String str) {
        try {
            this.admin.topics().delete(str);
        } catch (PulsarAdminException e) {
        }
    }

    @Test
    public void testSimpleConsumerEvents() throws Exception {
        Consumer subscribe = this.pulsarClient.newConsumer().topic(new String[]{"persistent://prop/use/ns-abc/shared-topic1"}).subscriptionName("sub1").subscriptionType(SubscriptionType.Shared).subscribe();
        PulsarClient newPulsarClient = newPulsarClient(this.lookupUrl.toString(), 0);
        Consumer subscribe2 = newPulsarClient.newConsumer().topic(new String[]{"persistent://prop/use/ns-abc/shared-topic1"}).subscriptionName("sub1").subscriptionType(SubscriptionType.Shared).subscribe();
        PersistentTopic persistentTopic = (PersistentTopic) this.pulsar.getBrokerService().getTopicReference("persistent://prop/use/ns-abc/shared-topic1").get();
        PersistentSubscription subscription = persistentTopic.getSubscription("sub1");
        Assert.assertNotNull(persistentTopic);
        Assert.assertNotNull(subscription);
        Assert.assertTrue(subscription.getDispatcher().isConsumerConnected());
        Assert.assertEquals(subscription.getDispatcher().getType(), CommandSubscribe.SubType.Shared);
        ArrayList newArrayListWithCapacity = Lists.newArrayListWithCapacity(200);
        Producer create = this.pulsarClient.newProducer().topic("persistent://prop/use/ns-abc/shared-topic1").enableBatching(false).messageRoutingMode(MessageRoutingMode.SinglePartition).create();
        for (int i = 0; i < 200; i++) {
            newArrayListWithCapacity.add(create.sendAsync(("my-message-" + i).getBytes()));
        }
        FutureUtil.waitForAll(newArrayListWithCapacity).get();
        rolloverPerIntervalStats();
        Assert.assertEquals(subscription.getNumberOfEntriesInBacklog(false), 200L);
        Thread.sleep(100L);
        Consumer consumer = subscribe;
        while (true) {
            try {
                consumer.acknowledge(consumer.receive(1, TimeUnit.SECONDS));
            } catch (PulsarClientException e) {
                if (!consumer.equals(subscribe)) {
                    rolloverPerIntervalStats();
                    Thread.sleep(100L);
                    Assert.assertEquals(subscription.getNumberOfEntriesInBacklog(false), 0L);
                    subscribe.unsubscribe();
                    Assert.fail("should fail");
                    subscribe.close();
                    create.send("message".getBytes());
                    subscribe2.acknowledgeCumulative(subscribe2.receive());
                    Assert.fail("Should fail");
                    subscribe2.unsubscribe();
                    Thread.sleep(100L);
                    Assert.assertNull(persistentTopic.getSubscription("sub1"));
                    create.close();
                    subscribe2.close();
                    newPulsarClient.close();
                    deleteTopic("persistent://prop/use/ns-abc/shared-topic1");
                }
                subscribe.close();
                consumer = subscribe2;
            }
        }
        rolloverPerIntervalStats();
        Thread.sleep(100L);
        Assert.assertEquals(subscription.getNumberOfEntriesInBacklog(false), 0L);
        try {
            subscribe.unsubscribe();
            Assert.fail("should fail");
        } catch (PulsarClientException e2) {
        }
        subscribe.close();
        create.send("message".getBytes());
        try {
            subscribe2.acknowledgeCumulative(subscribe2.receive());
            Assert.fail("Should fail");
        } catch (PulsarClientException e3) {
            Assert.assertTrue(e3 instanceof PulsarClientException.InvalidConfigurationException);
        }
        try {
            subscribe2.unsubscribe();
        } catch (PulsarClientException e4) {
            Assert.fail("Should not fail");
        }
        Thread.sleep(100L);
        Assert.assertNull(persistentTopic.getSubscription("sub1"));
        create.close();
        subscribe2.close();
        newPulsarClient.close();
        deleteTopic("persistent://prop/use/ns-abc/shared-topic1");
    }

    @Test
    public void testReplayOnConsumerDisconnect() throws Exception {
        ArrayList newArrayListWithCapacity = Lists.newArrayListWithCapacity(100);
        BlockingArrayQueue blockingArrayQueue = new BlockingArrayQueue(100);
        Consumer subscribe = this.pulsarClient.newConsumer().topic(new String[]{"persistent://prop/use/ns-abc/shared-topic3"}).subscriptionName("sub3").subscriptionType(SubscriptionType.Shared).messageListener((consumer, message) -> {
            try {
                consumer.acknowledge(message);
                blockingArrayQueue.add(new String(message.getData()));
            } catch (Exception e) {
                Assert.fail("Should not fail");
            }
        }).subscribe();
        PulsarClient newPulsarClient = newPulsarClient(this.lookupUrl.toString(), 0);
        Consumer subscribe2 = newPulsarClient.newConsumer().topic(new String[]{"persistent://prop/use/ns-abc/shared-topic3"}).subscriptionName("sub3").subscriptionType(SubscriptionType.Shared).messageListener((consumer2, message2) -> {
        }).subscribe();
        ArrayList newArrayListWithCapacity2 = Lists.newArrayListWithCapacity(200);
        Producer create = this.pulsarClient.newProducer().topic("persistent://prop/use/ns-abc/shared-topic3").create();
        for (int i = 0; i < 100; i++) {
            String str = "msg-" + i;
            newArrayListWithCapacity2.add(create.sendAsync(str.getBytes()));
            newArrayListWithCapacity.add(str);
        }
        FutureUtil.waitForAll(newArrayListWithCapacity2).get();
        create.close();
        subscribe2.close();
        for (int i2 = 0; i2 < 10 && blockingArrayQueue.size() < 100; i2++) {
            Thread.sleep(100L);
        }
        Assert.assertTrue(CollectionUtils.subtract(newArrayListWithCapacity, blockingArrayQueue).isEmpty());
        subscribe.close();
        newPulsarClient.close();
        deleteTopic("persistent://prop/use/ns-abc/shared-topic3");
    }

    @Test(enabled = false)
    public void testRoundRobinBatchDistribution() throws Exception {
        AtomicInteger atomicInteger = new AtomicInteger(0);
        AtomicInteger atomicInteger2 = new AtomicInteger(0);
        AtomicInteger atomicInteger3 = new AtomicInteger(0);
        CountDownLatch countDownLatch = new CountDownLatch(411);
        ConsumerBuilder subscriptionType = this.pulsarClient.newConsumer().topic(new String[]{"persistent://prop/use/ns-abc/shared-topic5"}).subscriptionName("sub5").receiverQueueSize(10).subscriptionType(SubscriptionType.Shared);
        Consumer subscribe = subscriptionType.clone().messageListener((consumer, message) -> {
            try {
                atomicInteger.incrementAndGet();
                consumer.acknowledge(message);
                countDownLatch.countDown();
            } catch (Exception e) {
                Assert.fail("Should not fail");
            }
        }).subscribe();
        Consumer subscribe2 = subscriptionType.clone().messageListener((consumer2, message2) -> {
            try {
                atomicInteger2.incrementAndGet();
                consumer2.acknowledge(message2);
                countDownLatch.countDown();
            } catch (Exception e) {
                Assert.fail("Should not fail");
            }
        }).subscribe();
        Consumer subscribe3 = subscriptionType.clone().messageListener((consumer3, message3) -> {
            try {
                atomicInteger.incrementAndGet();
                consumer3.acknowledge(message3);
                countDownLatch.countDown();
            } catch (Exception e) {
                Assert.fail("Should not fail");
            }
        }).subscribe();
        ArrayList newArrayListWithCapacity = Lists.newArrayListWithCapacity(137);
        Producer create = this.pulsarClient.newProducer().topic("persistent://prop/use/ns-abc/shared-topic5").create();
        for (int i = 0; i < 411; i++) {
            newArrayListWithCapacity.add(create.sendAsync(("msg-" + i).getBytes()));
        }
        FutureUtil.waitForAll(newArrayListWithCapacity).get();
        create.close();
        countDownLatch.await(1L, TimeUnit.SECONDS);
        Assert.assertTrue(CollectionUtils.subtract(Lists.newArrayList(new Integer[]{140, 140, 131}), Lists.newArrayList(new Integer[]{Integer.valueOf(atomicInteger.get()), Integer.valueOf(atomicInteger2.get()), Integer.valueOf(atomicInteger3.get())})).isEmpty());
        subscribe.close();
        subscribe2.close();
        subscribe3.close();
        deleteTopic("persistent://prop/use/ns-abc/shared-topic5");
    }

    @Test(timeOut = 300000)
    public void testSharedSingleAckedNormalTopic() throws Exception {
        String str = "persistent://prop/use/ns-abc/topic-test1";
        String str2 = "my-shared-subscription-test1";
        String str3 = "my-message-test1-";
        Producer create = this.pulsarClient.newProducer().topic(str).create();
        PersistentTopic persistentTopic = (PersistentTopic) this.pulsar.getBrokerService().getTopicReference(str).get();
        Assert.assertNotNull(persistentTopic);
        Assert.assertEquals(persistentTopic.getProducers().size(), 1);
        Consumer subscribe = this.pulsarClient.newConsumer().topic(new String[]{str}).subscriptionName(str2).receiverQueueSize(10).subscriptionType(SubscriptionType.Shared).subscribe();
        PulsarClient newPulsarClient = newPulsarClient(this.lookupUrl.toString(), 0);
        Consumer subscribe2 = newPulsarClient.newConsumer().topic(new String[]{str}).subscriptionName(str2).receiverQueueSize(10).subscriptionType(SubscriptionType.Shared).subscribe();
        for (int i = 0; i < 50; i++) {
            String str4 = str3 + i;
            create.send(str4.getBytes());
            log.info("Producer produced " + str4);
        }
        int i2 = 0;
        int i3 = 0;
        Message receive = subscribe.receive();
        Message receive2 = subscribe2.receive();
        while (true) {
            if (receive != null) {
                log.info("Consumer 1 Received: " + new String(receive.getData()));
                i2++;
            }
            if (receive2 != null) {
                log.info("Consumer 2 Received: " + new String(receive2.getData()));
                i3++;
            }
            receive = subscribe.receive(10000, TimeUnit.MILLISECONDS);
            receive2 = subscribe2.receive(10000, TimeUnit.MILLISECONDS);
            if (receive == null && receive2 == null) {
                break;
            }
        }
        log.info("Total receives = " + (i3 + i2));
        Assert.assertEquals(i3 + i2, 50);
        log.info("Consumer 1 closed");
        subscribe.close();
        for (int i4 = 0; i4 < 50; i4++) {
            Message receive3 = subscribe2.receive(100, TimeUnit.MILLISECONDS);
            if (receive3 == null) {
                log.info("Consumer 2 - No Message in Incoming Message Queue, will try again");
            } else {
                log.info("Consumer 2 Received: " + new String(receive3.getData()));
                i3++;
            }
        }
        newPulsarClient.close();
        log.info("Total receives by Consumer 2 = " + i3);
        Assert.assertEquals(i3, 50);
    }

    @Test(timeOut = 60000)
    public void testCancelReadRequestOnLastDisconnect() throws Exception {
        String str = "persistent://prop/use/ns-abc/topic-testCancelReadRequestOnLastDisconnect";
        String str2 = "my-message-testCancelReadRequestOnLastDisconnect-";
        Producer create = this.pulsarClient.newProducer().topic(str).create();
        PersistentTopic persistentTopic = (PersistentTopic) this.pulsar.getBrokerService().getTopicReference(str).get();
        Assert.assertNotNull(persistentTopic);
        Assert.assertEquals(persistentTopic.getProducers().size(), 1);
        ConsumerBuilder subscriptionType = this.pulsarClient.newConsumer().topic(new String[]{str}).subscriptionName("my-shared-subscription-testCancelReadRequestOnLastDisconnect").receiverQueueSize(1000).subscriptionType(SubscriptionType.Shared);
        Consumer subscribe = subscriptionType.subscribe();
        Consumer subscribe2 = subscriptionType.subscribe();
        for (int i = 0; i < 10; i++) {
            String str3 = str2 + i;
            create.send(str3.getBytes());
            log.info("Producer produced " + str3);
        }
        int i2 = 0;
        int i3 = 0;
        Message receive = subscribe.receive();
        Message receive2 = subscribe2.receive();
        while (true) {
            if (receive != null) {
                log.info("Consumer 1 Received: " + new String(receive.getData()));
                i2++;
                subscribe.acknowledge(receive);
            }
            if (receive2 != null) {
                log.info("Consumer 2 Received: " + new String(receive2.getData()));
                i3++;
                subscribe2.acknowledge(receive2);
            }
            receive = subscribe.receive(5000, TimeUnit.MILLISECONDS);
            receive2 = subscribe2.receive(5000, TimeUnit.MILLISECONDS);
            if (receive == null && receive2 == null) {
                break;
            }
        }
        log.info("Total receives = " + (i3 + i2));
        Assert.assertEquals(i3 + i2, 10);
        log.info("Consumer 1 closed");
        log.info("Consumer 2 closed");
        subscribe.close();
        subscribe2.close();
        for (int i4 = 10; i4 < 20; i4++) {
            String str4 = str2 + i4;
            create.send(str4.getBytes());
            log.info("Producer produced " + str4);
        }
        Consumer subscribe3 = subscriptionType.subscribe();
        int i5 = 0;
        Message receive3 = subscribe3.receive();
        while (true) {
            Message message = receive3;
            if (message == null) {
                log.info("Total receives by Consumer 2 = " + i3);
                Assert.assertEquals(i5, 10);
                return;
            } else {
                log.info("Consumer 1 Received: " + new String(message.getData()));
                i5++;
                receive3 = subscribe3.receive(5000, TimeUnit.MILLISECONDS);
            }
        }
    }

    @Test
    public void testUnackedCountWithRedeliveries() throws Exception {
        Producer create = this.pulsarClient.newProducer().topic("persistent://prop/use/ns-abc/testUnackedCountWithRedeliveries").create();
        ConsumerBuilder acknowledgmentGroupTime = this.pulsarClient.newConsumer().topic(new String[]{"persistent://prop/use/ns-abc/testUnackedCountWithRedeliveries"}).subscriptionName("sub3").receiverQueueSize(10).subscriptionType(SubscriptionType.Shared).acknowledgmentGroupTime(0L, TimeUnit.SECONDS);
        ConsumerImpl subscribe = acknowledgmentGroupTime.subscribe();
        for (int i = 0; i < 10; i++) {
            create.send(("hello-" + i).getBytes());
        }
        HashSet hashSet = new HashSet();
        for (int i2 = 0; i2 < 10; i2++) {
            hashSet.add(subscribe.receive().getMessageId());
        }
        Consumer subscribe2 = acknowledgmentGroupTime.subscribe();
        subscribe.redeliverUnacknowledgedMessages(hashSet);
        for (int i3 = 0; i3 < 10; i3++) {
            subscribe2.receive();
        }
        Iterator it = hashSet.iterator();
        while (it.hasNext()) {
            subscribe.acknowledge((MessageId) it.next());
        }
        Awaitility.await().untilAsserted(() -> {
            SubscriptionStats subscriptionStats = (SubscriptionStats) this.admin.topics().getStats("persistent://prop/use/ns-abc/testUnackedCountWithRedeliveries").getSubscriptions().get("sub3");
            Assert.assertEquals(subscriptionStats.getMsgBacklog(), 0L);
            Iterator it2 = subscriptionStats.getConsumers().iterator();
            while (it2.hasNext()) {
                Assert.assertEquals(((ConsumerStats) it2.next()).getUnackedMessages(), 0);
            }
        });
        create.close();
        subscribe.close();
        subscribe2.close();
        deleteTopic("persistent://prop/use/ns-abc/testUnackedCountWithRedeliveries");
    }

    private static /* synthetic */ Object $deserializeLambda$(SerializedLambda serializedLambda) {
        String implMethodName = serializedLambda.getImplMethodName();
        boolean z = -1;
        switch (implMethodName.hashCode()) {
            case -1595459366:
                if (implMethodName.equals("lambda$testRoundRobinBatchDistribution$f79bc5e5$1")) {
                    z = 3;
                    break;
                }
                break;
            case -1148802644:
                if (implMethodName.equals("lambda$testReplayOnConsumerDisconnect$802e92fd$1")) {
                    z = 2;
                    break;
                }
                break;
            case -924823001:
                if (implMethodName.equals("lambda$testReplayOnConsumerDisconnect$eb11ebcf$1")) {
                    z = true;
                    break;
                }
                break;
            case -406985662:
                if (implMethodName.equals("lambda$testRoundRobinBatchDistribution$35103567$1")) {
                    z = 4;
                    break;
                }
                break;
            case 710709901:
                if (implMethodName.equals("lambda$testRoundRobinBatchDistribution$cfcd8b45$1")) {
                    z = false;
                    break;
                }
                break;
        }
        switch (z) {
            case SHARED_VALUE:
                if (serializedLambda.getImplMethodKind() == 6 && serializedLambda.getFunctionalInterfaceClass().equals("org/apache/pulsar/client/api/MessageListener") && serializedLambda.getFunctionalInterfaceMethodName().equals("received") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Lorg/apache/pulsar/client/api/Consumer;Lorg/apache/pulsar/client/api/Message;)V") && serializedLambda.getImplClass().equals("org/apache/pulsar/broker/service/PersistentQueueE2ETest") && serializedLambda.getImplMethodSignature().equals("(Ljava/util/concurrent/atomic/AtomicInteger;Ljava/util/concurrent/CountDownLatch;Lorg/apache/pulsar/client/api/Consumer;Lorg/apache/pulsar/client/api/Message;)V")) {
                    AtomicInteger atomicInteger = (AtomicInteger) serializedLambda.getCapturedArg(0);
                    CountDownLatch countDownLatch = (CountDownLatch) serializedLambda.getCapturedArg(1);
                    return (consumer2, message2) -> {
                        try {
                            atomicInteger.incrementAndGet();
                            consumer2.acknowledge(message2);
                            countDownLatch.countDown();
                        } catch (Exception e) {
                            Assert.fail("Should not fail");
                        }
                    };
                }
                break;
            case true:
                if (serializedLambda.getImplMethodKind() == 6 && serializedLambda.getFunctionalInterfaceClass().equals("org/apache/pulsar/client/api/MessageListener") && serializedLambda.getFunctionalInterfaceMethodName().equals("received") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Lorg/apache/pulsar/client/api/Consumer;Lorg/apache/pulsar/client/api/Message;)V") && serializedLambda.getImplClass().equals("org/apache/pulsar/broker/service/PersistentQueueE2ETest") && serializedLambda.getImplMethodSignature().equals("(Ljava/util/List;Lorg/apache/pulsar/client/api/Consumer;Lorg/apache/pulsar/client/api/Message;)V")) {
                    List list = (List) serializedLambda.getCapturedArg(0);
                    return (consumer, message) -> {
                        try {
                            consumer.acknowledge(message);
                            list.add(new String(message.getData()));
                        } catch (Exception e) {
                            Assert.fail("Should not fail");
                        }
                    };
                }
                break;
            case true:
                if (serializedLambda.getImplMethodKind() == 6 && serializedLambda.getFunctionalInterfaceClass().equals("org/apache/pulsar/client/api/MessageListener") && serializedLambda.getFunctionalInterfaceMethodName().equals("received") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Lorg/apache/pulsar/client/api/Consumer;Lorg/apache/pulsar/client/api/Message;)V") && serializedLambda.getImplClass().equals("org/apache/pulsar/broker/service/PersistentQueueE2ETest") && serializedLambda.getImplMethodSignature().equals("(Lorg/apache/pulsar/client/api/Consumer;Lorg/apache/pulsar/client/api/Message;)V")) {
                    return (consumer22, message22) -> {
                    };
                }
                break;
            case Test.TestMessage.INTFIELD_FIELD_NUMBER /* 3 */:
                if (serializedLambda.getImplMethodKind() == 6 && serializedLambda.getFunctionalInterfaceClass().equals("org/apache/pulsar/client/api/MessageListener") && serializedLambda.getFunctionalInterfaceMethodName().equals("received") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Lorg/apache/pulsar/client/api/Consumer;Lorg/apache/pulsar/client/api/Message;)V") && serializedLambda.getImplClass().equals("org/apache/pulsar/broker/service/PersistentQueueE2ETest") && serializedLambda.getImplMethodSignature().equals("(Ljava/util/concurrent/atomic/AtomicInteger;Ljava/util/concurrent/CountDownLatch;Lorg/apache/pulsar/client/api/Consumer;Lorg/apache/pulsar/client/api/Message;)V")) {
                    AtomicInteger atomicInteger2 = (AtomicInteger) serializedLambda.getCapturedArg(0);
                    CountDownLatch countDownLatch2 = (CountDownLatch) serializedLambda.getCapturedArg(1);
                    return (consumer3, message3) -> {
                        try {
                            atomicInteger2.incrementAndGet();
                            consumer3.acknowledge(message3);
                            countDownLatch2.countDown();
                        } catch (Exception e) {
                            Assert.fail("Should not fail");
                        }
                    };
                }
                break;
            case Test.TestMessage.TESTENUM_FIELD_NUMBER /* 4 */:
                if (serializedLambda.getImplMethodKind() == 6 && serializedLambda.getFunctionalInterfaceClass().equals("org/apache/pulsar/client/api/MessageListener") && serializedLambda.getFunctionalInterfaceMethodName().equals("received") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Lorg/apache/pulsar/client/api/Consumer;Lorg/apache/pulsar/client/api/Message;)V") && serializedLambda.getImplClass().equals("org/apache/pulsar/broker/service/PersistentQueueE2ETest") && serializedLambda.getImplMethodSignature().equals("(Ljava/util/concurrent/atomic/AtomicInteger;Ljava/util/concurrent/CountDownLatch;Lorg/apache/pulsar/client/api/Consumer;Lorg/apache/pulsar/client/api/Message;)V")) {
                    AtomicInteger atomicInteger3 = (AtomicInteger) serializedLambda.getCapturedArg(0);
                    CountDownLatch countDownLatch3 = (CountDownLatch) serializedLambda.getCapturedArg(1);
                    return (consumer32, message32) -> {
                        try {
                            atomicInteger3.incrementAndGet();
                            consumer32.acknowledge(message32);
                            countDownLatch3.countDown();
                        } catch (Exception e) {
                            Assert.fail("Should not fail");
                        }
                    };
                }
                break;
        }
        throw new IllegalArgumentException("Invalid lambda deserialization");
    }
}
