package org.apache.pulsar.client.api;

import com.google.common.collect.Sets;
import java.io.IOException;
import java.lang.invoke.SerializedLambda;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Random;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.curator.shaded.com.google.common.collect.Lists;
import org.apache.pulsar.broker.service.Topic;
import org.apache.pulsar.broker.service.persistent.PersistentStickyKeyDispatcherMultipleConsumers;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.common.schema.KeyValue;
import org.apache.pulsar.common.util.Murmur3_32Hash;
import org.awaitility.Awaitility;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testng.Assert;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.DataProvider;
import org.testng.annotations.Test;

@Test(groups = {"flaky"})
/* loaded from: input_file:org/apache/pulsar/client/api/KeySharedSubscriptionTest.class */
public class KeySharedSubscriptionTest extends ProducerConsumerBase {
    // Class-wide SLF4J logger.
    private static final Logger log = LoggerFactory.getLogger(KeySharedSubscriptionTest.class);
    // Fixed set of ten message keys ("0" through "9") iterated by the sticky hash-range tests.
    private static final List<String> keys = Arrays.asList("0", "1", "2", "3", "4", "5", "6", "7", "8", "9");
    // Shared random source, seeded from System.nanoTime(), used to pick message keys.
    private static final Random random = new Random(System.nanoTime());
    // Exclusive upper bound handed to random.nextInt(...) when a test generates a random key.
    private static final int NUMBER_OF_KEYS = 300;

    /* loaded from: input_file:org/apache/pulsar/client/api/KeySharedSubscriptionTest$EncKeyReader.class */
    private static class EncKeyReader implements CryptoKeyReader {
        EncryptionKeyInfo keyInfo;

        private EncKeyReader() {
            this.keyInfo = new EncryptionKeyInfo();
        }

        public EncryptionKeyInfo getPublicKey(String str, Map<String, String> map) {
            String str2 = "./src/test/resources/certificate/public-key." + str;
            if (!Files.isReadable(Paths.get(str2, new String[0]))) {
                Assert.fail("Certificate file " + str2 + " is not present or not readable.");
                return null;
            }
            try {
                this.keyInfo.setKey(Files.readAllBytes(Paths.get(str2, new String[0])));
                return this.keyInfo;
            } catch (IOException e) {
                Assert.fail("Failed to read certificate from " + str2);
                return null;
            }
        }

        public EncryptionKeyInfo getPrivateKey(String str, Map<String, String> map) {
            String str2 = "./src/test/resources/certificate/private-key." + str;
            if (!Files.isReadable(Paths.get(str2, new String[0]))) {
                Assert.fail("Certificate file " + str2 + " is not present or not readable.");
                return null;
            }
            try {
                this.keyInfo.setKey(Files.readAllBytes(Paths.get(str2, new String[0])));
                return this.keyInfo;
            } catch (IOException e) {
                Assert.fail("Failed to read certificate from " + str2);
                return null;
            }
        }
    }

    /**
     * Data provider {@code "batch"}: supplies the two boolean values, {@code false} first.
     *
     * @return a one-dimensional array holding {@code false} and {@code true}
     */
    @DataProvider(name = "batch")
    public Object[] batchProvider() {
        final Object[] flags = {Boolean.FALSE, Boolean.TRUE};
        return flags;
    }

    /**
     * Data provider {@code "partitioned"}: one row per boolean value, {@code false} first.
     *
     * @return two single-column rows, {@code {false}} and {@code {true}}
     */
    @DataProvider(name = "partitioned")
    public Object[][] partitionedProvider() {
        // Must be created as Object[][]: an Object[] expression is not assignable to the return type.
        return new Object[][] {
            {false},
            {true},
        };
    }

    /**
     * Data provider {@code "data"}: every combination of topic domain
     * ({@code "persistent"}, {@code "non-persistent"}) and a boolean flag.
     *
     * @return four rows of {@code {String, Boolean}}
     */
    @DataProvider(name = "data")
    public Object[][] dataProvider() {
        // Must be created as Object[][]: an Object[] expression is not assignable to the return type.
        return new Object[][] {
            {"persistent", false},
            {"persistent", true},
            {"non-persistent", false},
            {"non-persistent", true},
        };
    }

    /**
     * Per-test initialisation: runs the inherited {@code internalSetup()} and
     * {@code producerBaseSetup()}, then sets {@code subscriptionKeySharedUseConsistentHashing}
     * to {@code true} on {@code conf}.
     *
     * @throws Exception if either inherited setup step fails
     */
    @Override // org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest
    @BeforeMethod(alwaysRun = true)
    protected void setup() throws Exception {
        super.internalSetup();
        super.producerBaseSetup();
        // NOTE(review): the flag is written after the inherited setup has run — confirm it is
        // still picked up at that point.
        this.conf.setSubscriptionKeySharedUseConsistentHashing(true);
    }

    /**
     * Per-test teardown: delegates to the inherited {@code internalCleanup()}.
     * Runs even when the test method failed ({@code alwaysRun = true}).
     *
     * @throws Exception if the inherited cleanup fails
     */
    @Override // org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest
    @AfterMethod(alwaysRun = true)
    protected void cleanup() throws Exception {
        super.internalCleanup();
    }

    /**
     * Publishes 1000 messages with random keys to a fresh topic that has three consumers created
     * through {@code createConsumer(topic)}, then hands all three consumers and the expected
     * total to {@code receiveAndCheckDistribution}.
     *
     * @param str topic domain placed in front of {@code "://"} (rows come from the "data" provider)
     * @param z flag forwarded to {@code createProducer}; presumably toggles producer batching
     * @throws PulsarClientException if a client operation fails
     */
    @Test(dataProvider = "data")
    public void testSendAndReceiveWithHashRangeAutoSplitStickyKeyConsumerSelector(String str, boolean z) throws PulsarClientException {
        this.conf.setSubscriptionKeySharedEnable(true);
        final String topic = str + "://public/default/key_shared-" + UUID.randomUUID();
        final int totalMessages = 1000;

        // Each consumer gets its own variable so that all three are checked and all three are
        // closed; resources are released in reverse order of declaration.
        try (Consumer<Integer> consumer1 = createConsumer(topic);
                Consumer<Integer> consumer2 = createConsumer(topic);
                Consumer<Integer> consumer3 = createConsumer(topic);
                Producer<Integer> producer = createProducer(topic, z)) {
            for (int i = 0; i < totalMessages; i++) {
                producer.newMessage()
                        .key(String.valueOf(random.nextInt(NUMBER_OF_KEYS)))
                        .value(i)
                        .send();
            }
            receiveAndCheckDistribution(Lists.newArrayList(consumer1, consumer2, consumer3), totalMessages);
        }
    }

    /**
     * Publishes 1000 random keys, each key twice in a row, to a topic with three consumers and
     * then hands the consumers and the expected total of 2000 to
     * {@code receiveAndCheckDistribution}.
     *
     * <p>When {@code z} is {@code false} every send is awaited before the next one is issued;
     * otherwise the sends are left in flight and completed by the final {@code flush()}.
     *
     * @param str topic domain placed in front of {@code "://"} (rows come from the "data" provider)
     * @param z flag forwarded to {@code createProducer}; presumably toggles producer batching
     * @throws Exception if a client operation or an awaited send fails
     */
    @Test(dataProvider = "data")
    public void testSendAndReceiveWithBatching(String str, boolean z) throws Exception {
        this.conf.setSubscriptionKeySharedEnable(true);
        final String topic = str + "://public/default/key_shared-" + UUID.randomUUID();
        final int distinctSends = 1000;

        // Each consumer gets its own variable so that all three are checked and all three are
        // closed; resources are released in reverse order of declaration.
        try (Consumer<Integer> consumer1 = createConsumer(topic);
                Consumer<Integer> consumer2 = createConsumer(topic);
                Consumer<Integer> consumer3 = createConsumer(topic);
                Producer<Integer> producer = createProducer(topic, z)) {
            for (int i = 0; i < distinctSends; i++) {
                String key = String.valueOf(random.nextInt(NUMBER_OF_KEYS));

                CompletableFuture<?> first = producer.newMessage().key(key).value(i).sendAsync();
                if (!z) {
                    first.get();
                }

                CompletableFuture<?> second = producer.newMessage().key(key).value(i).sendAsync();
                if (!z) {
                    second.get();
                }
            }
            producer.flush();

            receiveAndCheckDistribution(Lists.newArrayList(consumer1, consumer2, consumer3), 2 * distinctSends);
        }
    }

    /**
     * Subscribes three consumers with the sticky hash ranges {@code [0, 20000]},
     * {@code [20001, 40000]} and {@code [40001, 65535]}, sends each entry of {@code keys} ten
     * times, and passes the per-consumer message counts predicted from
     * {@code Murmur3_32Hash(key) % 65536} to {@code receiveAndCheck}.
     *
     * @param z flag forwarded to {@code createProducer}; presumably toggles producer batching
     * @throws PulsarClientException if a client operation fails
     */
    @Test(dataProvider = "batch")
    public void testSendAndReceiveWithHashRangeExclusiveStickyKeyConsumerSelector(boolean z) throws PulsarClientException {
        this.conf.setSubscriptionKeySharedEnable(true);
        final String topic = "persistent://public/default/key_shared_exclusive-" + UUID.randomUUID();

        // One variable per range: the expected counts below must be paired with the consumer
        // that owns the matching range.
        try (Consumer<Integer> lowRangeConsumer =
                        createConsumer(topic, KeySharedPolicy.stickyHashRange().ranges(Range.of(0, 20000)));
                Consumer<Integer> midRangeConsumer =
                        createConsumer(topic, KeySharedPolicy.stickyHashRange().ranges(Range.of(20001, 40000)));
                Consumer<Integer> highRangeConsumer =
                        createConsumer(topic, KeySharedPolicy.stickyHashRange().ranges(Range.of(40001, 65535)));
                Producer<Integer> producer = createProducer(topic, z)) {
            int expectedLow = 0;
            int expectedMid = 0;
            int expectedHigh = 0;

            for (int round = 0; round < 10; round++) {
                for (String key : keys) {
                    int slot = Murmur3_32Hash.getInstance().makeHash(key.getBytes()) % 65536;
                    if (slot <= 20000) {
                        expectedLow++;
                    } else if (slot <= 40000) {
                        expectedMid++;
                    } else {
                        expectedHigh++;
                    }
                    producer.newMessage().key(key).value(round).send();
                }
            }

            List<KeyValue<Consumer<Integer>, Integer>> expectations = new ArrayList<>();
            expectations.add(new KeyValue<>(lowRangeConsumer, expectedLow));
            expectations.add(new KeyValue<>(midRangeConsumer, expectedMid));
            expectations.add(new KeyValue<>(highRangeConsumer, expectedHigh));
            receiveAndCheck(expectations);
        }
    }

    /**
     * Sends 1000 random-key messages to three consumers and checks them with
     * {@code receiveAndCheckDistribution}; then closes the first two consumers, sends 10 more
     * messages and runs the same check with only the third consumer and an expected total of 10.
     *
     * @param str topic domain placed in front of {@code "://"} (rows come from the "data" provider)
     * @param z flag forwarded to {@code createProducer}; presumably toggles producer batching
     * @throws PulsarClientException if a client operation fails
     * @throws InterruptedException if the pause between the two phases is interrupted
     */
    @Test(dataProvider = "data")
    public void testConsumerCrashSendAndReceiveWithHashRangeAutoSplitStickyKeyConsumerSelector(String str, boolean z) throws PulsarClientException, InterruptedException {
        this.conf.setSubscriptionKeySharedEnable(true);
        final String topic = str + "://public/default/key_shared_consumer_crash-" + UUID.randomUUID();
        final int firstBatch = 1000;
        final int secondBatch = 10;

        // Each consumer gets its own variable so the two "crashing" consumers are really two
        // different subscriptions; resources are released in reverse order of declaration.
        try (Consumer<Integer> consumer1 = createConsumer(topic);
                Consumer<Integer> consumer2 = createConsumer(topic);
                Consumer<Integer> consumer3 = createConsumer(topic);
                Producer<Integer> producer = createProducer(topic, z)) {
            for (int i = 0; i < firstBatch; i++) {
                producer.newMessage()
                        .key(String.valueOf(random.nextInt(NUMBER_OF_KEYS)))
                        .value(i)
                        .send();
            }
            receiveAndCheckDistribution(Lists.newArrayList(consumer1, consumer2, consumer3), firstBatch);

            // Pause before dropping consumers — presumably lets pending acknowledgements go
            // out first; TODO confirm.
            Thread.sleep(1000L);

            // Simulate two consumers going away; they are closed again, harmlessly, on exit.
            consumer1.close();
            consumer2.close();

            for (int i = 0; i < secondBatch; i++) {
                producer.newMessage()
                        .key(String.valueOf(random.nextInt(NUMBER_OF_KEYS)))
                        .value(i)
                        .send();
            }
            receiveAndCheckDistribution(Lists.newArrayList(consumer3), secondBatch);
        }
    }

    /**
     * Sends 100 messages without a key to a topic with three consumers and passes all three
     * consumers to {@code receive}.
     *
     * @param str topic domain placed in front of {@code "://"} (rows come from the "data" provider)
     * @param z flag forwarded to {@code createProducer}; presumably toggles producer batching
     * @throws PulsarClientException if a client operation fails
     */
    @Test(dataProvider = "data")
    public void testNonKeySendAndReceiveWithHashRangeAutoSplitStickyKeyConsumerSelector(String str, boolean z) throws PulsarClientException {
        this.conf.setSubscriptionKeySharedEnable(true);
        final String topic = str + "://public/default/key_shared_none_key-" + UUID.randomUUID();

        // Each consumer gets its own variable so that all three are drained and all three are
        // closed; resources are released in reverse order of declaration.
        try (Consumer<Integer> consumer1 = createConsumer(topic);
                Consumer<Integer> consumer2 = createConsumer(topic);
                Consumer<Integer> consumer3 = createConsumer(topic);
                Producer<Integer> producer = createProducer(topic, z)) {
            for (int i = 0; i < 100; i++) {
                producer.newMessage().value(i).send();
            }
            receive(Lists.newArrayList(consumer1, consumer2, consumer3));
        }
    }

    /**
     * Subscribes three consumers with the sticky hash ranges {@code [0, 20000]},
     * {@code [20001, 40000]} and {@code [40001, 65535]}, sends 100 messages without a key, and
     * tells {@code receiveAndCheck} to expect all 100 on the consumer whose range contains
     * {@code Murmur3_32Hash("NONE_KEY") % 65536}.
     *
     * @param z flag forwarded to {@code createProducer}; presumably toggles producer batching
     * @throws PulsarClientException if a client operation fails
     */
    @Test(dataProvider = "batch")
    public void testNonKeySendAndReceiveWithHashRangeExclusiveStickyKeyConsumerSelector(boolean z) throws PulsarClientException {
        this.conf.setSubscriptionKeySharedEnable(true);
        final String topic = "persistent://public/default/key_shared_none_key_exclusive-" + UUID.randomUUID();
        final int totalMessages = 100;

        // One variable per range: the branch below must select the consumer that owns the slot.
        try (Consumer<Integer> lowRangeConsumer =
                        createConsumer(topic, KeySharedPolicy.stickyHashRange().ranges(Range.of(0, 20000)));
                Consumer<Integer> midRangeConsumer =
                        createConsumer(topic, KeySharedPolicy.stickyHashRange().ranges(Range.of(20001, 40000)));
                Consumer<Integer> highRangeConsumer =
                        createConsumer(topic, KeySharedPolicy.stickyHashRange().ranges(Range.of(40001, 65535)));
                Producer<Integer> producer = createProducer(topic, z)) {
            for (int i = 0; i < totalMessages; i++) {
                producer.newMessage().value(i).send();
            }

            int slot = Murmur3_32Hash.getInstance().makeHash("NONE_KEY".getBytes()) % 65536;
            Consumer<Integer> expectedReceiver;
            if (slot <= 20000) {
                expectedReceiver = lowRangeConsumer;
            } else if (slot <= 40000) {
                expectedReceiver = midRangeConsumer;
            } else {
                expectedReceiver = highRangeConsumer;
            }

            List<KeyValue<Consumer<Integer>, Integer>> expectations = new ArrayList<>();
            expectations.add(new KeyValue<>(expectedReceiver, totalMessages));
            receiveAndCheck(expectations);
        }
    }

    /**
     * Sends 1000 messages that all carry the key {@code "any key"} but a random ordering key,
     * to a topic with three consumers, then hands the consumers and the expected total to
     * {@code receiveAndCheckDistribution}.
     *
     * @param z flag forwarded to {@code createProducer}; presumably toggles producer batching
     * @throws PulsarClientException if a client operation fails
     */
    @Test(dataProvider = "batch")
    public void testOrderingKeyWithHashRangeAutoSplitStickyKeyConsumerSelector(boolean z) throws PulsarClientException {
        this.conf.setSubscriptionKeySharedEnable(true);
        final String topic = "persistent://public/default/key_shared_ordering_key-" + UUID.randomUUID();
        final int totalMessages = 1000;

        // Each consumer gets its own variable so that all three are checked and all three are
        // closed; resources are released in reverse order of declaration.
        try (Consumer<Integer> consumer1 = createConsumer(topic);
                Consumer<Integer> consumer2 = createConsumer(topic);
                Consumer<Integer> consumer3 = createConsumer(topic);
                Producer<Integer> producer = createProducer(topic, z)) {
            for (int i = 0; i < totalMessages; i++) {
                producer.newMessage()
                        .key("any key")
                        .orderingKey(String.valueOf(random.nextInt(NUMBER_OF_KEYS)).getBytes())
                        .value(i)
                        .send();
            }
            receiveAndCheckDistribution(Lists.newArrayList(consumer1, consumer2, consumer3), totalMessages);
        }
    }

    /**
     * Subscribes three consumers with the sticky hash ranges {@code [0, 20000]},
     * {@code [20001, 40000]} and {@code [40001, 65535]}, then sends each entry of {@code keys}
     * ten times as the ordering key (the message key is always {@code "any key"}). The
     * per-consumer counts predicted from {@code Murmur3_32Hash(orderingKey) % 65536} are
     * passed to {@code receiveAndCheck}.
     *
     * @param z flag forwarded to {@code createProducer}; presumably toggles producer batching
     * @throws PulsarClientException if a client operation fails
     */
    @Test(dataProvider = "batch")
    public void testOrderingKeyWithHashRangeExclusiveStickyKeyConsumerSelector(boolean z) throws PulsarClientException {
        this.conf.setSubscriptionKeySharedEnable(true);
        final String topic = "persistent://public/default/key_shared_exclusive_ordering_key-" + UUID.randomUUID();

        // One variable per range: the expected counts below must be paired with the consumer
        // that owns the matching range.
        try (Consumer<Integer> lowRangeConsumer =
                        createConsumer(topic, KeySharedPolicy.stickyHashRange().ranges(Range.of(0, 20000)));
                Consumer<Integer> midRangeConsumer =
                        createConsumer(topic, KeySharedPolicy.stickyHashRange().ranges(Range.of(20001, 40000)));
                Consumer<Integer> highRangeConsumer =
                        createConsumer(topic, KeySharedPolicy.stickyHashRange().ranges(Range.of(40001, 65535)));
                Producer<Integer> producer = createProducer(topic, z)) {
            int expectedLow = 0;
            int expectedMid = 0;
            int expectedHigh = 0;

            for (int round = 0; round < 10; round++) {
                for (String orderingKey : keys) {
                    int slot = Murmur3_32Hash.getInstance().makeHash(orderingKey.getBytes()) % 65536;
                    if (slot <= 20000) {
                        expectedLow++;
                    } else if (slot <= 40000) {
                        expectedMid++;
                    } else {
                        expectedHigh++;
                    }
                    producer.newMessage()
                            .key("any key")
                            .orderingKey(orderingKey.getBytes())
                            .value(round)
                            .send();
                }
            }

            List<KeyValue<Consumer<Integer>, Integer>> expectations = new ArrayList<>();
            expectations.add(new KeyValue<>(lowRangeConsumer, expectedLow));
            expectations.add(new KeyValue<>(midRangeConsumer, expectedMid));
            expectations.add(new KeyValue<>(highRangeConsumer, expectedHigh));
            receiveAndCheck(expectations);
        }
    }

    /**
     * With {@code subscriptionKeySharedEnable} switched off on {@code conf}, subscribing with
     * {@link SubscriptionType#Key_Shared} is expected to throw
     * {@link PulsarClientException.NotAllowedException}.
     *
     * @throws PulsarClientException the expected rejection of the subscribe call
     */
    @Test(expectedExceptions = {PulsarClientException.NotAllowedException.class})
    public void testDisableKeySharedSubscription() throws PulsarClientException {
        this.conf.setSubscriptionKeySharedEnable(false);

        final String topic = "persistent://public/default/key_shared_disabled";
        final String subscription = "key_shared";

        this.pulsarClient.newConsumer()
                .topic(topic)
                .subscriptionName(subscription)
                .subscriptionType(SubscriptionType.Key_Shared)
                .ackTimeout(10L, TimeUnit.SECONDS)
                .subscribe();
    }

    /**
     * Sends ten messages, receives them back, and checks that calling
     * {@code acknowledgeCumulative} on the tenth one throws
     * {@link PulsarClientException.InvalidConfigurationException}.
     *
     * @throws PulsarClientException if any other client operation fails
     */
    @Test
    public void testCannotUseAcknowledgeCumulative() throws PulsarClientException {
        this.conf.setSubscriptionKeySharedEnable(true);
        final String topic = "persistent://public/default/key_shared_ack_cumulative-" + UUID.randomUUID();
        final int totalMessages = 10;

        // try-with-resources closes the consumer and then the producer on every exit path,
        // including a failed assertion.
        try (Producer<Integer> producer = createProducer(topic, false);
                Consumer<Integer> consumer = createConsumer(topic)) {
            for (int i = 0; i < totalMessages; i++) {
                producer.send(i);
            }

            for (int i = 0; i < totalMessages; i++) {
                Message<Integer> received = consumer.receive();
                if (i == totalMessages - 1) {
                    try {
                        consumer.acknowledgeCumulative(received);
                        Assert.fail("should have failed");
                    } catch (PulsarClientException.InvalidConfigurationException ignored) {
                        // Expected: this is the outcome the test is looking for.
                    }
                }
            }
        }
    }

    /**
     * Starts ten listener-based Key_Shared consumers (each on its own {@link PulsarClient},
     * receiver queue size 1) whose listener sleeps for 10 seconds when it sees the key
     * {@code "slowKey"}. One slow-key message is sent first, followed by 1000 random-key
     * messages; after a 5 second wait the number of messages handled by the listeners must be
     * within 300 of 900.
     *
     * @param z flag forwarded to {@code createProducer}; presumably toggles producer batching
     * @throws Exception if a client operation fails or a wait is interrupted
     */
    @Test(dataProvider = "batch")
    public void testMakingProgressWithSlowerConsumer(boolean z) throws Exception {
        this.conf.setSubscriptionKeySharedEnable(true);
        final String topic = "testMakingProgressWithSlowerConsumer-" + UUID.randomUUID();
        final String slowKey = "slowKey";
        final int totalMessages = 1000;

        List<PulsarClient> clients = new ArrayList<>();
        try {
            AtomicInteger receivedMessages = new AtomicInteger();

            for (int i = 0; i < 10; i++) {
                PulsarClient client = PulsarClient.builder().serviceUrl(this.brokerUrl.toString()).build();
                clients.add(client);

                client.newConsumer(Schema.INT32)
                        .topic(topic)
                        .subscriptionName("key_shared")
                        .subscriptionType(SubscriptionType.Key_Shared)
                        .receiverQueueSize(1)
                        .messageListener((consumer, message) -> {
                            try {
                                if (slowKey.equals(message.getKey())) {
                                    // Stall this listener so the consumer that got the slow key stops progressing.
                                    Thread.sleep(10000L);
                                }
                                receivedMessages.incrementAndGet();
                                consumer.acknowledge(message);
                            } catch (Exception e) {
                                e.printStackTrace();
                            }
                        })
                        .subscribe();
            }

            try (Producer<Integer> producer = createProducer(topic, z)) {
                // The slow key goes first so that one listener is already stalled.
                producer.newMessage().key(slowKey).value(-1).send();

                for (int i = 0; i < totalMessages; i++) {
                    producer.newMessage()
                            .key(String.valueOf(random.nextInt(NUMBER_OF_KEYS)))
                            .value(i)
                            .send();
                }

                Thread.sleep(5000L);

                Assert.assertEquals(receivedMessages.get(), totalMessages * 0.9d, totalMessages * 0.3d);
            }
        } finally {
            for (PulsarClient client : clients) {
                client.close();
            }
        }
    }

    /**
     * Sends ten messages while only one consumer is subscribed, adds a second consumer, sends
     * ten more, closes the first consumer, and then expects the second consumer to receive the
     * values {@code 0..19} in order.
     *
     * @throws Exception if a client operation fails
     */
    @Test
    public void testOrderingWhenAddingConsumers() throws Exception {
        this.conf.setSubscriptionKeySharedEnable(true);
        final String topic = "testOrderingWhenAddingConsumers-" + UUID.randomUUID();

        try (Producer<Integer> producer = createProducer(topic, false);
                Consumer<Integer> firstConsumer = createConsumer(topic)) {
            for (int i = 0; i < 10; i++) {
                producer.newMessage()
                        .key(String.valueOf(random.nextInt(NUMBER_OF_KEYS)))
                        .value(i)
                        .send();
            }

            // The second consumer joins only after the first ten messages were published.
            try (Consumer<Integer> secondConsumer = createConsumer(topic)) {
                for (int i = 10; i < 20; i++) {
                    producer.newMessage()
                            .key(String.valueOf(random.nextInt(NUMBER_OF_KEYS)))
                            .value(i)
                            .send();
                }

                // Drop the first consumer; everything is now expected on the second one. The
                // enclosing try-with-resources closes it a second time on exit.
                firstConsumer.close();

                for (int expected = 0; expected < 20; expected++) {
                    Message<Integer> received = secondConsumer.receive();
                    Assert.assertEquals(received.getValue().intValue(), expected);
                    secondConsumer.acknowledge(received);
                }
            }
        }
    }

    /**
     * Checks that the subscription cursor does not read far ahead in the topic while two
     * Key_Shared consumers with a small receiver queue are attached and nothing is consumed.
     *
     * <p>Ten messages are published with only the first consumer attached; a second consumer
     * then joins and 990 more messages are published asynchronously. One second after the
     * producer is flushed, the cursor read position's entry id must still be below 1000.
     *
     * @throws Exception if producing, subscribing or the broker lookup fails
     */
    @Test
    public void testReadAheadWhenAddingConsumers() throws Exception {
        this.conf.setSubscriptionKeySharedEnable(true);
        String topic = "testReadAheadWhenAddingConsumers-" + UUID.randomUUID();

        try (Producer<Integer> producer = createProducer(topic, false);
                Consumer<Integer> firstConsumer = this.pulsarClient.newConsumer(Schema.INT32)
                        .topic(topic)
                        .subscriptionName("key_shared")
                        .subscriptionType(SubscriptionType.Key_Shared)
                        .receiverQueueSize(10)
                        .subscribe()) {
            for (int i = 0; i < 10; i++) {
                producer.newMessage().key(String.valueOf(random.nextInt(NUMBER_OF_KEYS))).value(i).send();
            }

            // Both consumers stay attached (and idle) for the rest of the test; neither of
            // them ever calls receive().
            try (Consumer<Integer> secondConsumer = this.pulsarClient.newConsumer(Schema.INT32)
                    .topic(topic)
                    .subscriptionName("key_shared")
                    .subscriptionType(SubscriptionType.Key_Shared)
                    .receiverQueueSize(10)
                    .subscribe()) {
                for (int i = 10; i < 1000; i++) {
                    producer.newMessage().key(String.valueOf(random.nextInt(NUMBER_OF_KEYS))).value(i).sendAsync();
                }
                producer.flush();
                Thread.sleep(1000L);

                Topic topicRef = (Topic) ((Optional) this.pulsar.getBrokerService().getTopicIfExists(topic).get()).get();
                Assert.assertTrue(topicRef.getSubscription("key_shared").getCursor().getReadPosition().getEntryId() < 1000);
            }
        }
    }

    /**
     * Checks that removing the first consumer of a Key_Shared subscription unblocks a
     * consumer that joined later.
     *
     * <p>Consumer "c1" is attached while messages 0..9 are published, consumer "c2" joins and
     * messages 10..19 follow. While "c1" is still attached and has acknowledged nothing, "c2"
     * must not receive anything within 100 ms. After "c1" is closed, "c2" must receive
     * values 0..19 in order.
     *
     * @throws Exception if producing, consuming or closing a client resource fails
     */
    @Test
    public void testRemoveFirstConsumer() throws Exception {
        this.conf.setSubscriptionKeySharedEnable(true);
        String topic = "testReadAheadWhenAddingConsumers-" + UUID.randomUUID();

        try (Producer<Integer> producer = createProducer(topic, false);
                Consumer<Integer> c1 = this.pulsarClient.newConsumer(Schema.INT32)
                        .topic(topic)
                        .subscriptionName("key_shared")
                        .subscriptionType(SubscriptionType.Key_Shared)
                        .receiverQueueSize(10)
                        .consumerName("c1")
                        .subscribe()) {
            for (int i = 0; i < 10; i++) {
                producer.newMessage().key(String.valueOf(random.nextInt(NUMBER_OF_KEYS))).value(i).send();
            }

            try (Consumer<Integer> c2 = this.pulsarClient.newConsumer(Schema.INT32)
                    .topic(topic)
                    .subscriptionName("key_shared")
                    .subscriptionType(SubscriptionType.Key_Shared)
                    .receiverQueueSize(10)
                    .consumerName("c2")
                    .subscribe()) {
                for (int i = 10; i < 20; i++) {
                    producer.newMessage().key(String.valueOf(random.nextInt(NUMBER_OF_KEYS))).value(i).send();
                }

                // With c1 still attached and nothing acknowledged, c2 is expected to stay empty.
                Assert.assertNull(c2.receive(100, TimeUnit.MILLISECONDS));

                // Removing c1 must release all twenty messages to c2, in publish order.
                c1.close();

                for (int i = 0; i < 20; i++) {
                    Message<Integer> msg = c2.receive();
                    Assert.assertEquals(msg.getValue().intValue(), i);
                    c2.acknowledge(msg);
                }
            }
        }
    }

    /**
     * Exercises sticky hash-range assignment on one subscription: consumers whose ranges
     * overlap an already connected consumer must be rejected with
     * {@link PulsarClientException.ConsumerAssignException}, while ranges that became free
     * after a consumer closed can be claimed again.
     *
     * @throws PulsarClientException if a subscribe or close that is expected to succeed fails
     */
    @Test
    public void testHashRangeConflict() throws PulsarClientException {
        this.conf.setSubscriptionKeySharedEnable(true);
        final String subscription = "test";
        final String topic = "persistent://public/default/testHashRangeConflict-" + UUID.randomUUID().toString();

        Consumer<String> outerRanges =
                createFixedHashRangesConsumer(topic, subscription, Range.of(0, 99), Range.of(400, 65535));
        Assert.assertTrue(outerRanges.isConnected());
        Consumer<String> middleRange = createFixedHashRangesConsumer(topic, subscription, Range.of(100, 399));
        Assert.assertTrue(middleRange.isConnected());

        PersistentStickyKeyDispatcherMultipleConsumers dispatcher = ((Topic) this.pulsar.getBrokerService().getTopicReference(topic).get()).getSubscription("test").getDispatcher();
        Assert.assertEquals(dispatcher.getConsumers().size(), 2);

        // The whole hash space is taken, so any further range has to be refused.
        for (Range conflicting : Arrays.asList(Range.of(0, 65535), Range.of(1, 1))) {
            try {
                createFixedHashRangesConsumer(topic, subscription, conflicting);
                Assert.fail("Should failed with conflict range.");
            } catch (PulsarClientException.ConsumerAssignException expected) {
                // Rejection is the outcome under test.
            }
        }
        Assert.assertEquals(dispatcher.getConsumers().size(), 2);

        outerRanges.close();
        Assert.assertEquals(dispatcher.getConsumers().size(), 1);

        // Only [100, 399] is still owned; every range touching it must keep failing.
        for (Range conflicting : Arrays.asList(Range.of(0, 65535), Range.of(50, 100), Range.of(399, 500))) {
            try {
                createFixedHashRangesConsumer(topic, subscription, conflicting);
                Assert.fail("Should failed with conflict range.");
            } catch (PulsarClientException.ConsumerAssignException expected) {
                // Rejection is the outcome under test.
            }
        }

        // Ranges released by the closed consumer are available again.
        Consumer<String> upperRange = createFixedHashRangesConsumer(topic, subscription, Range.of(400, 600));
        Assert.assertTrue(upperRange.isConnected());
        Consumer<String> lowerRange = createFixedHashRangesConsumer(topic, subscription, Range.of(50, 99));
        Assert.assertTrue(lowerRange.isConnected());
        Assert.assertEquals(dispatcher.getConsumers().size(), 3);

        middleRange.close();
        upperRange.close();
        lowerRange.close();
        Assert.assertFalse(dispatcher.isConsumerConnected());
    }

    /**
     * Publishes ten LZ4-compressed messages and checks that a Key_Shared consumer receives
     * all ten of them.
     *
     * <p>Producer and consumer are managed by try-with-resources so they are closed even
     * when a receive or an assertion fails.
     *
     * @throws Exception if producing, consuming or closing a client resource fails
     */
    @Test
    public void testWithMessageCompression() throws Exception {
        String topic = "testWithMessageCompression" + UUID.randomUUID().toString();
        try (Producer<byte[]> producer = this.pulsarClient.newProducer()
                        .topic(topic)
                        .compressionType(CompressionType.LZ4)
                        .create();
                Consumer<byte[]> consumer = this.pulsarClient.newConsumer()
                        .topic(topic)
                        .subscriptionName("test")
                        .subscriptionType(SubscriptionType.Key_Shared)
                        .subscribe()) {
            for (int i = 0; i < 10; i++) {
                producer.send(("Hello Pulsar > " + i).getBytes());
            }

            List<Message<byte[]>> received = new ArrayList<>();
            for (int i = 0; i < 10; i++) {
                Message<byte[]> msg = consumer.receive();
                received.add(msg);
                consumer.acknowledge(msg);
            }
            Assert.assertEquals(received.size(), 10);
        }
    }

    /**
     * Publishes 1000 keyed messages to a topic with three Key_Shared consumers and delegates
     * to {@code receiveAndCheckDistribution} to verify key stickiness and an even spread.
     *
     * <p>All three consumers are distinct and each is closed on exit, together with the
     * producer, whether or not the check succeeds.
     *
     * @throws PulsarClientException if subscribing, producing, consuming or closing fails
     */
    @Test
    public void testAttachKeyToMessageMetadata() throws PulsarClientException {
        this.conf.setSubscriptionKeySharedEnable(true);
        String topic = "persistent://public/default/key_shared-" + UUID.randomUUID();

        try (Consumer<Integer> consumer1 = createConsumer(topic);
                Consumer<Integer> consumer2 = createConsumer(topic);
                Consumer<Integer> consumer3 = createConsumer(topic);
                Producer<Integer> producer = this.pulsarClient.newProducer(Schema.INT32).topic(topic).create()) {
            for (int i = 0; i < 1000; i++) {
                producer.newMessage().key(String.valueOf(random.nextInt(NUMBER_OF_KEYS))).value(i).send();
            }
            receiveAndCheckDistribution(Lists.newArrayList(consumer1, consumer2, consumer3), 1000);
        }
    }

    /**
     * Checks that dispatching to Key_Shared consumers resumes after message expiry has run.
     *
     * <p>With a default TTL of three seconds, 1000 messages are published. The first consumer
     * receives two messages and acknowledges only the second. Two further consumers on the
     * same subscription are each expected to receive nothing within one second. After
     * {@code checkMessageExpiry()} has been triggered on the topic and another 1000 messages
     * are published, both later consumers must receive a message within one second.
     *
     * @throws Exception if producing, consuming, the broker lookup or closing a resource fails
     */
    @Test
    public void testContinueDispatchMessagesWhenMessageTTL() throws Exception {
        final int ttlSeconds = 3;
        final int totalMessages = 1000;
        this.conf.setTtlDurationDefaultInSeconds(ttlSeconds);
        String topic = "persistent://public/default/key_shared-" + UUID.randomUUID();

        try (Consumer<Integer> consumer1 = this.pulsarClient.newConsumer(Schema.INT32)
                        .topic(topic)
                        .subscriptionName("my-sub")
                        .receiverQueueSize(10)
                        .subscriptionType(SubscriptionType.Key_Shared)
                        .subscribe();
                Producer<Integer> producer = this.pulsarClient.newProducer(Schema.INT32).topic(topic).create()) {
            for (int i = 0; i < totalMessages; i++) {
                producer.newMessage().key(String.valueOf(random.nextInt(NUMBER_OF_KEYS))).value(i).send();
            }

            // The first received message is deliberately left unacknowledged; only the
            // second one is acked.
            consumer1.receive();
            consumer1.acknowledge(consumer1.receive());

            try (Consumer<Integer> consumer2 = this.pulsarClient.newConsumer(Schema.INT32)
                    .topic(topic)
                    .subscriptionName("my-sub")
                    .subscriptionType(SubscriptionType.Key_Shared)
                    .subscribe()) {
                Message<Integer> received = null;
                try {
                    received = consumer2.receive(1, TimeUnit.SECONDS);
                } catch (PulsarClientException ignored) {
                    // A failed timed receive counts as "nothing delivered" for this check.
                }
                Assert.assertNull(received);

                try (Consumer<Integer> consumer3 = this.pulsarClient.newConsumer(Schema.INT32)
                        .topic(topic)
                        .subscriptionName("my-sub")
                        .subscriptionType(SubscriptionType.Key_Shared)
                        .subscribe()) {
                    try {
                        received = consumer3.receive(1, TimeUnit.SECONDS);
                    } catch (PulsarClientException ignored) {
                        // A failed timed receive counts as "nothing delivered" for this check.
                    }
                    Assert.assertNull(received);

                    Optional<Topic> topicRef = this.pulsar.getBrokerService().getTopic(topic, false).get();
                    Assert.assertTrue(topicRef.isPresent());
                    Thread.sleep((ttlSeconds - 1) * 1000L);
                    topicRef.get().checkMessageExpiry();

                    for (int i = 0; i < totalMessages; i++) {
                        producer.newMessage().key(String.valueOf(random.nextInt(NUMBER_OF_KEYS))).value(i).send();
                    }

                    // After expiry has been checked, both later consumers must be served again.
                    Assert.assertNotNull(consumer2.receive(1, TimeUnit.SECONDS));
                    Assert.assertNotNull(consumer3.receive(1, TimeUnit.SECONDS));
                }
            }
        }
    }

    /**
     * Checks per-key ordering when messages are consumed through a {@code MessageListener}
     * on a client configured with eight listener threads.
     *
     * <p>1000 messages are published round-robin over three keys. The listener sleeps a
     * random 0-4 ms before recording each message. Once all messages have been recorded,
     * the values seen for each key must be strictly increasing.
     *
     * @param z whether to create the topic as a partitioned topic with three partitions
     * @throws Exception if topic creation, producing, consuming or closing a resource fails
     */
    @Test(dataProvider = "partitioned")
    public void testOrderingWithConsumerListener(boolean z) throws Exception {
        String topic = "persistent://public/default/key_shared-" + UUID.randomUUID();
        if (z) {
            this.admin.topics().createPartitionedTopic(topic, 3);
        }
        final int totalMessages = 1000;
        List<Message<Integer>> received = Collections.synchronizedList(new ArrayList<>(totalMessages));
        Random delayRandom = new Random();

        // Resources close in reverse order: producer, consumer, then the client itself.
        try (PulsarClient client = PulsarClient.builder()
                        .serviceUrl(this.lookupUrl.toString())
                        .listenerThreads(8)
                        .build();
                Consumer<Integer> consumer = client.newConsumer(Schema.INT32)
                        .topic(topic)
                        .subscriptionName("my-sub")
                        .subscriptionType(SubscriptionType.Key_Shared)
                        .messageListener((listenerConsumer, message) -> {
                            try {
                                Thread.sleep(delayRandom.nextInt(5));
                                received.add(message);
                            } catch (InterruptedException e) {
                                // Keep the interrupt visible to the listener thread's owner
                                // instead of silently dropping it.
                                Thread.currentThread().interrupt();
                            }
                        })
                        .subscribe();
                Producer<Integer> producer = client.newProducer(Schema.INT32).topic(topic).create()) {
            String[] keyPool = {"key-1", "key-2", "key-3"};
            for (int i = 0; i < totalMessages; i++) {
                producer.newMessage().key(keyPool[i % 3]).value(i).send();
            }

            Awaitility.await().untilAsserted(() -> {
                Assert.assertEquals(received.size(), totalMessages);
            });

            Map<String, Integer> lastValueByKey = new HashMap<>();
            for (Message<Integer> message : received) {
                String key = message.getKey();
                Integer value = message.getValue();
                Integer previous = lastValueByKey.get(key);
                if (previous != null) {
                    Assert.assertTrue(value.intValue() > previous.intValue());
                }
                lastValueByKey.put(key, value);
                consumer.acknowledge(message);
            }
        }
    }

    /**
     * Checks Key_Shared dispatch when producer and consumers are configured with an
     * {@code EncKeyReader}: 100 keyed messages must be split between two consumers, each
     * consumer must receive at least one, and values seen for a key must be strictly
     * increasing in the order the messages were collected.
     *
     * <p>Each consumer is drained until a three-second timed receive returns {@code null}.
     *
     * @throws Exception if subscribing, producing, consuming or closing a resource fails
     */
    @Test
    public void testKeySharedConsumerWithEncrypted() throws Exception {
        final int totalMessages = 100;
        String topic = "persistent://public/default/key_shared-" + UUID.randomUUID();

        try (Consumer<Integer> consumer1 = this.pulsarClient.newConsumer(Schema.INT32)
                        .topic(topic)
                        .subscriptionName("my-sub")
                        .subscriptionType(SubscriptionType.Key_Shared)
                        .cryptoKeyReader(new EncKeyReader())
                        .subscribe();
                Consumer<Integer> consumer2 = this.pulsarClient.newConsumer(Schema.INT32)
                        .topic(topic)
                        .subscriptionName("my-sub")
                        .subscriptionType(SubscriptionType.Key_Shared)
                        .cryptoKeyReader(new EncKeyReader())
                        .subscribe()) {
            List<Consumer<Integer>> consumers = Lists.newArrayList(consumer1, consumer2);

            try (Producer<Integer> producer = this.pulsarClient.newProducer(Schema.INT32)
                    .topic(topic)
                    .cryptoKeyReader(new EncKeyReader())
                    .create()) {
                for (int i = 0; i < totalMessages; i++) {
                    producer.newMessage().key(String.valueOf(random.nextInt(NUMBER_OF_KEYS))).value(i).send();
                }

                List<Message<Integer>> received = new ArrayList<>(totalMessages);
                int[] receivedPerConsumer = new int[consumers.size()];
                for (int i = 0; i < consumers.size(); i++) {
                    while (true) {
                        Message<Integer> msg = consumers.get(i).receive(3, TimeUnit.SECONDS);
                        if (msg == null) {
                            // Timed receive came back empty: this consumer is drained.
                            break;
                        }
                        received.add(msg);
                        receivedPerConsumer[i]++;
                    }
                }

                Assert.assertEquals(received.size(), totalMessages);
                Assert.assertEquals(receivedPerConsumer[0] + receivedPerConsumer[1], totalMessages);
                Assert.assertTrue(receivedPerConsumer[0] > 0);
                Assert.assertTrue(receivedPerConsumer[1] > 0);

                Map<String, Integer> lastValueByKey = new HashMap<>();
                for (Message<Integer> msg : received) {
                    Integer previous = lastValueByKey.get(msg.getKey());
                    if (lastValueByKey.containsKey(msg.getKey())) {
                        Assert.assertTrue(msg.getValue().intValue() > previous.intValue());
                    }
                    lastValueByKey.put(msg.getKey(), msg.getValue());
                }
            }
        }
    }

    /**
     * Subscribes a String-schema Key_Shared consumer that uses a sticky hash-range policy.
     *
     * @param str topic to subscribe to
     * @param str2 subscription name
     * @param rangeArr hash ranges passed to the sticky policy
     * @return the subscribed consumer
     * @throws PulsarClientException if the subscribe call fails
     */
    private Consumer<String> createFixedHashRangesConsumer(String str, String str2, Range... rangeArr) throws PulsarClientException {
        ConsumerBuilder<String> builder = this.pulsarClient.newConsumer(Schema.STRING)
                .topic(str)
                .subscriptionType(SubscriptionType.Key_Shared)
                .subscriptionName(str2);
        KeySharedPolicy stickyRanges = KeySharedPolicy.stickyHashRange().ranges(rangeArr);
        return builder.keySharedPolicy(stickyRanges).subscribe();
    }

    /**
     * Creates an INT32 producer on the given topic with at most 2001 pending messages.
     *
     * @param str topic to produce to
     * @param z {@code true} to enable batching with the key-based batcher, {@code false}
     *     to disable batching
     * @return the created producer
     * @throws PulsarClientException if producer creation fails
     */
    private Producer<Integer> createProducer(String str, boolean z) throws PulsarClientException {
        if (!z) {
            return this.pulsarClient.newProducer(Schema.INT32)
                    .topic(str)
                    .maxPendingMessages(2001)
                    .enableBatching(false)
                    .create();
        }
        return this.pulsarClient.newProducer(Schema.INT32)
                .topic(str)
                .enableBatching(true)
                .maxPendingMessages(2001)
                .batcherBuilder(BatcherBuilder.KEY_BASED)
                .create();
    }

    /**
     * Subscribes an INT32 Key_Shared consumer without supplying a key-shared policy.
     *
     * @param str topic to subscribe to
     * @return the subscribed consumer
     * @throws PulsarClientException if the subscribe call fails
     */
    private Consumer<Integer> createConsumer(String str) throws PulsarClientException {
        final KeySharedPolicy noExplicitPolicy = null;
        return createConsumer(str, noExplicitPolicy);
    }

    /**
     * Subscribes an INT32 consumer to the "key_shared" subscription in Key_Shared mode with
     * a three-second ack timeout.
     *
     * @param str topic to subscribe to
     * @param keySharedPolicy policy to apply to the builder, or {@code null} to leave the
     *     builder's policy untouched
     * @return the subscribed consumer
     * @throws PulsarClientException if the subscribe call fails
     */
    private Consumer<Integer> createConsumer(String str, KeySharedPolicy keySharedPolicy) throws PulsarClientException {
        ConsumerBuilder<Integer> builder = this.pulsarClient.newConsumer(Schema.INT32);
        builder.topic(str)
                .subscriptionName("key_shared")
                .subscriptionType(SubscriptionType.Key_Shared)
                .ackTimeout(3L, TimeUnit.SECONDS);
        if (keySharedPolicy == null) {
            return builder.subscribe();
        }
        builder.keySharedPolicy(keySharedPolicy);
        return builder.subscribe();
    }

    /**
     * Drains every consumer in turn (until a 100 ms timed receive returns {@code null}),
     * acknowledging each message and asserting that every message key is only ever seen on
     * one consumer.
     *
     * @param list consumers to drain, in order
     * @throws PulsarClientException if a receive or acknowledge call fails
     */
    private void receive(List<Consumer<?>> list) throws PulsarClientException {
        Map<String, Consumer<?>> ownerByKey = new HashMap<>();
        for (Consumer<?> current : list) {
            Message<?> msg;
            while ((msg = current.receive(100, TimeUnit.MILLISECONDS)) != null) {
                current.acknowledge(msg);
                if (!msg.hasKey()) {
                    continue;
                }
                // The first consumer to see a key becomes its owner; later sightings must match.
                Consumer<?> owner = ownerByKey.putIfAbsent(msg.getKey(), current);
                if (owner != null) {
                    Assert.assertEquals(current, owner);
                }
            }
        }
    }

    /**
     * Drains every consumer in turn (until a 100 ms timed receive returns {@code null}),
     * acknowledging each message, and asserts three things: each key (ordering key when
     * present, otherwise message key) is only seen on one consumer; the total number of
     * messages equals {@code i}; and each consumer's share is within 40% of the
     * integer-divided average.
     *
     * @param list consumers to drain, in order
     * @param i total number of messages expected across all consumers
     * @throws PulsarClientException if a receive or acknowledge call fails
     */
    private void receiveAndCheckDistribution(List<Consumer<?>> list, int i) throws PulsarClientException {
        Map<String, Consumer<?>> ownerByKey = new HashMap<>();
        Map<Consumer<?>, Integer> countByConsumer = new HashMap<>();
        int totalReceived = 0;

        for (Consumer<?> current : list) {
            int receivedHere = 0;
            Message<?> msg;
            while ((msg = current.receive(100, TimeUnit.MILLISECONDS)) != null) {
                totalReceived++;
                receivedHere++;
                current.acknowledge(msg);
                if (!msg.hasKey() && !msg.hasOrderingKey()) {
                    continue;
                }
                String key = msg.hasOrderingKey() ? new String(msg.getOrderingKey()) : msg.getKey();
                Consumer<?> owner = ownerByKey.putIfAbsent(key, current);
                if (owner != null) {
                    Assert.assertEquals(current, owner);
                }
            }
            countByConsumer.put(current, receivedHere);
        }

        // Integer division first, then widened to double.
        int averageShare = totalReceived / list.size();
        double expectedPerConsumer = averageShare;
        Assert.assertEquals(i, totalReceived);
        for (Integer count : countByConsumer.values()) {
            Assert.assertEquals(count.intValue(), expectedPerConsumer, expectedPerConsumer * 0.4d);
        }
    }

    /**
     * For every (consumer, expected count) pair: receives the expected number of messages,
     * acknowledging only every other one, then receives half that number again as
     * redeliveries (all acknowledged), and finally asserts that nothing further arrives
     * within 100 ms. Within each phase, values for the same key must be strictly
     * increasing. After all pairs are processed, asserts that no key was seen by more than
     * one consumer.
     *
     * @param list pairs of consumer and the (even) number of messages it should receive
     * @throws PulsarClientException if a receive or acknowledge call fails
     * @throws IllegalArgumentException if an expected count is odd
     */
    private void receiveAndCheck(List<KeyValue<Consumer<Integer>, Integer>> list) throws PulsarClientException {
        Map<Consumer<Integer>, Set<String>> keysByConsumer = new HashMap<>();

        for (KeyValue<Consumer<Integer>, Integer> check : list) {
            final int expected = check.getValue();
            if (expected % 2 != 0) {
                throw new IllegalArgumentException();
            }
            final Consumer<Integer> consumer = check.getKey();
            int receivedCount = 0;

            // Phase 1: first delivery, acknowledging only the even-indexed messages.
            Map<String, Message<Integer>> lastByKey = new HashMap<>();
            for (int index = 0; index < expected; index++) {
                Message<Integer> msg = consumer.receive();
                if (index % 2 == 0) {
                    consumer.acknowledge(msg);
                }
                String key = msg.hasOrderingKey() ? new String(msg.getOrderingKey()) : msg.getKey();
                log.info("[{}] Receive message key: {} value: {} messageId: {}",
                        consumer.getConsumerName(), key, msg.getValue(), msg.getMessageId());
                Message<Integer> previous = lastByKey.get(key);
                if (previous == null) {
                    Assert.assertNotNull(msg);
                } else {
                    Assert.assertTrue(msg.getValue().compareTo(previous.getValue()) > 0);
                }
                lastByKey.put(key, msg);
                keysByConsumer.computeIfAbsent(consumer, unused -> Sets.<String>newHashSet()).add(key);
                receivedCount++;
            }
            Assert.assertEquals(expected, receivedCount);

            // Phase 2: the unacknowledged half is expected to come back.
            final int redeliveries = expected / 2;
            log.info("[{}] Consumer wait for {} messages redelivery ...", check, redeliveries);
            Map<String, Message<Integer>> lastRedeliveredByKey = new HashMap<>();
            for (int index = 0; index < redeliveries; index++) {
                Message<Integer> msg = consumer.receive();
                receivedCount++;
                consumer.acknowledge(msg);
                String key = msg.hasOrderingKey() ? new String(msg.getOrderingKey()) : msg.getKey();
                log.info("[{}] Receive redeliver message key: {} value: {} messageId: {}",
                        consumer.getConsumerName(), key, msg.getValue(), msg.getMessageId());
                Message<Integer> previous = lastRedeliveredByKey.get(key);
                if (previous == null) {
                    Assert.assertNotNull(msg);
                } else {
                    Assert.assertTrue(msg.getValue().compareTo(previous.getValue()) > 0);
                }
                lastRedeliveredByKey.put(key, msg);
            }

            // Phase 3: nothing more may arrive; a failed timed receive is treated as "nothing".
            Message<Integer> surplus = null;
            try {
                surplus = consumer.receive(100, TimeUnit.MILLISECONDS);
            } catch (PulsarClientException ignored) {
                // Deliberately ignored, leaving surplus as null.
            }
            Assert.assertNull(surplus, "redeliver too many messages.");
            Assert.assertEquals(expected + redeliveries, receivedCount);
        }

        Set<String> seenKeys = Sets.newHashSet();
        for (Set<String> consumerKeys : keysByConsumer.values()) {
            for (String key : consumerKeys) {
                Assert.assertTrue(seenKeys.add(key), "Key " + key + "is distributed to multiple consumers.");
            }
        }
    }

    /**
     * Synthetic lambda-deserialization hook as rendered by the decompiler.
     *
     * <p>Looks at the implementation method name carried by {@code serializedLambda}, picks one
     * of two known listener lambdas, checks that the serialized descriptor fields match the
     * expected functional interface and implementation signature, and returns a
     * {@code (consumer, message)} lambda rebuilt from the captured arguments.
     *
     * <p>NOTE(review): {@code boolean z = -1}, {@code case SHARED_VALUE:} and {@code case true:}
     * are not valid Java and look like decompiler artifacts of an int-based switch; this
     * method presumably would be regenerated by the compiler rather than kept in source.
     * Confirm before relying on this text.
     *
     * @param serializedLambda serialized form of the lambda to rebuild
     * @return the rebuilt lambda when the descriptor matches one of the two known lambdas
     * @throws IllegalArgumentException if no known lambda matches
     */
    private static /* synthetic */ Object $deserializeLambda$(SerializedLambda serializedLambda) {
        String implMethodName = serializedLambda.getImplMethodName();
        boolean z = -1;
        // First switch: map the implementation method name (hash, then equals) to a selector.
        switch (implMethodName.hashCode()) {
            case 93634218:
                if (implMethodName.equals("lambda$testOrderingWithConsumerListener$eeb59354$1")) {
                    z = true;
                    break;
                }
                break;
            case 276582495:
                if (implMethodName.equals("lambda$testMakingProgressWithSlowerConsumer$28846641$1")) {
                    z = false;
                    break;
                }
                break;
        }
        // Second switch: validate the descriptor for the selected lambda and rebuild it.
        switch (z) {
            case SHARED_VALUE:
                // Listener captured with a String and an AtomicInteger: sleeps 10 s when the
                // message key equals the captured string, then counts and acknowledges.
                if (serializedLambda.getImplMethodKind() == 6 && serializedLambda.getFunctionalInterfaceClass().equals("org/apache/pulsar/client/api/MessageListener") && serializedLambda.getFunctionalInterfaceMethodName().equals("received") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Lorg/apache/pulsar/client/api/Consumer;Lorg/apache/pulsar/client/api/Message;)V") && serializedLambda.getImplClass().equals("org/apache/pulsar/client/api/KeySharedSubscriptionTest") && serializedLambda.getImplMethodSignature().equals("(Ljava/lang/String;Ljava/util/concurrent/atomic/AtomicInteger;Lorg/apache/pulsar/client/api/Consumer;Lorg/apache/pulsar/client/api/Message;)V")) {
                    String str = (String) serializedLambda.getCapturedArg(0);
                    AtomicInteger atomicInteger = (AtomicInteger) serializedLambda.getCapturedArg(1);
                    return (consumer, message) -> {
                        try {
                            if (str.equals(message.getKey())) {
                                Thread.sleep(10000L);
                            }
                            atomicInteger.incrementAndGet();
                            consumer.acknowledge(message);
                        } catch (Exception e) {
                            e.printStackTrace();
                        }
                    };
                }
                break;
            case true:
                // Listener captured with a Random and a List: sleeps a random 0-4 ms and then
                // appends the message to the list; an interrupt is silently dropped.
                if (serializedLambda.getImplMethodKind() == 6 && serializedLambda.getFunctionalInterfaceClass().equals("org/apache/pulsar/client/api/MessageListener") && serializedLambda.getFunctionalInterfaceMethodName().equals("received") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Lorg/apache/pulsar/client/api/Consumer;Lorg/apache/pulsar/client/api/Message;)V") && serializedLambda.getImplClass().equals("org/apache/pulsar/client/api/KeySharedSubscriptionTest") && serializedLambda.getImplMethodSignature().equals("(Ljava/util/Random;Ljava/util/List;Lorg/apache/pulsar/client/api/Consumer;Lorg/apache/pulsar/client/api/Message;)V")) {
                    Random random2 = (Random) serializedLambda.getCapturedArg(0);
                    List list = (List) serializedLambda.getCapturedArg(1);
                    return (consumer2, message2) -> {
                        try {
                            Thread.sleep(random2.nextInt(5));
                            list.add(message2);
                        } catch (InterruptedException e) {
                        }
                    };
                }
                break;
        }
        // No known lambda matched the serialized descriptor.
        throw new IllegalArgumentException("Invalid lambda deserialization");
    }
}
