package org.apache.pulsar.client.impl;

import com.google.common.collect.Sets;
import java.lang.invoke.SerializedLambda;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.pulsar.broker.BrokerTestUtil;
import org.apache.pulsar.broker.service.StickyKeyConsumerSelector;
import org.apache.pulsar.broker.service.Topic;
import org.apache.pulsar.client.api.BatcherBuilder;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.ConsumerBuilder;
import org.apache.pulsar.client.api.KeySharedPolicy;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.ProducerConsumerBase;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.Range;
import org.apache.pulsar.client.api.SubscriptionType;
import org.apache.pulsar.tests.KeySharedImplementationType;
import org.assertj.core.api.Assertions;
import org.awaitility.Awaitility;
import org.testng.Assert;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.DataProvider;
import org.testng.annotations.Factory;
import org.testng.annotations.Test;

@Test(groups = {"broker-impl"})
/* loaded from: input_file:org/apache/pulsar/client/impl/KeySharedSubscriptionMaxUnackedMessagesTest.class */
public class KeySharedSubscriptionMaxUnackedMessagesTest extends ProducerConsumerBase {
    private final KeySharedImplementationType implementationType;

    /* loaded from: input_file:org/apache/pulsar/client/impl/KeySharedSubscriptionMaxUnackedMessagesTest$KeySharedSelectorType.class */
    enum KeySharedSelectorType {
        AutoSplit_ConsistentHashing(true),
        AutoSplit_Classic(true),
        Sticky(false);

        final boolean autoSplit;

        KeySharedSelectorType(boolean z) {
            this.autoSplit = z;
        }
    }

    @Factory
    public static Object[] createTestInstances() {
        return KeySharedImplementationType.generateTestInstances(KeySharedSubscriptionMaxUnackedMessagesTest::new);
    }

    public KeySharedSubscriptionMaxUnackedMessagesTest() {
        this(KeySharedImplementationType.DEFAULT);
    }

    public KeySharedSubscriptionMaxUnackedMessagesTest(KeySharedImplementationType keySharedImplementationType) {
        this.implementationType = keySharedImplementationType;
    }

    @Override // org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest
    @BeforeMethod
    protected void setup() throws Exception {
        this.conf.setSubscriptionKeySharedUseClassicPersistentImplementation(this.implementationType.classic);
        this.conf.setSubscriptionSharedUseClassicPersistentImplementation(this.implementationType.classic);
        this.conf.setMaxUnackedMessagesPerConsumer(10);
        super.internalSetup();
        super.producerBaseSetup();
    }

    @Override // org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest
    @AfterMethod(alwaysRun = true)
    protected void cleanup() throws Exception {
        super.internalCleanup();
    }

    /* JADX WARN: Multi-variable type inference failed */
    /* JADX WARN: Type inference failed for: r1v1, types: [java.lang.Object[], java.lang.Object[][]] */
    @DataProvider
    public Object[][] subType() {
        return this.implementationType.prependImplementationTypeToData(new Object[]{new Object[]{SubscriptionType.Shared, null}, new Object[]{SubscriptionType.Key_Shared, KeySharedSelectorType.AutoSplit_ConsistentHashing}, new Object[]{SubscriptionType.Key_Shared, KeySharedSelectorType.AutoSplit_Classic}, new Object[]{SubscriptionType.Key_Shared, KeySharedSelectorType.Sticky}});
    }

    @Test(dataProvider = "subType", timeOut = 30000)
    public void testCanRecoverConsumptionWhenLiftMaxUnAckedMessagesRestriction(KeySharedImplementationType keySharedImplementationType, SubscriptionType subscriptionType, KeySharedSelectorType keySharedSelectorType) throws PulsarClientException {
        if (keySharedSelectorType == KeySharedSelectorType.AutoSplit_Classic) {
            this.conf.setSubscriptionKeySharedUseConsistentHashing(false);
        }
        String newUniqueName = BrokerTestUtil.newUniqueName("broker-close-test");
        ConcurrentHashMap concurrentHashMap = new ConcurrentHashMap();
        Set newConcurrentHashSet = Sets.newConcurrentHashSet();
        Set newConcurrentHashSet2 = Sets.newConcurrentHashSet();
        AtomicLong atomicLong = new AtomicLong();
        AtomicBoolean atomicBoolean = new AtomicBoolean(false);
        if (subscriptionType == SubscriptionType.Key_Shared) {
            ConsumerBuilder subscriptionType2 = this.pulsarClient.newConsumer().topic(new String[]{newUniqueName}).subscriptionName("sub-1").subscriptionType(subscriptionType);
            if (subscriptionType == SubscriptionType.Key_Shared) {
                if (keySharedSelectorType.autoSplit) {
                    subscriptionType2.keySharedPolicy(KeySharedPolicy.autoSplitHashRange());
                } else {
                    subscriptionType2.keySharedPolicy(KeySharedPolicy.stickyHashRange().ranges(new Range[]{Range.of(0, 65535)}));
                }
            }
            subscriptionType2.subscribe().close();
        }
        ArrayList arrayList = new ArrayList();
        Range[] rangeArr = null;
        if (subscriptionType == SubscriptionType.Key_Shared && !keySharedSelectorType.autoSplit) {
            rangeArr = splitRange(getSelector(newUniqueName, "sub-1").getKeyHashRange(), 3);
        }
        for (int i = 0; i < 3; i++) {
            ConsumerBuilder messageListener = this.pulsarClient.newConsumer().topic(new String[]{newUniqueName}).consumerName("consumer-" + i).subscriptionName("sub-1").subscriptionType(subscriptionType).messageListener((consumer, message) -> {
                atomicLong.set(System.currentTimeMillis());
                newConcurrentHashSet2.add(message.getMessageId());
                if (!atomicBoolean.get()) {
                    ((List) concurrentHashMap.computeIfAbsent(consumer, consumer -> {
                        return Collections.synchronizedList(new ArrayList());
                    })).add(message.getMessageId());
                    return;
                }
                try {
                    consumer.acknowledge(message);
                } catch (PulsarClientException e) {
                    throw new RuntimeException((Throwable) e);
                }
            });
            if (subscriptionType == SubscriptionType.Key_Shared) {
                if (keySharedSelectorType.autoSplit) {
                    messageListener.keySharedPolicy(KeySharedPolicy.autoSplitHashRange());
                } else {
                    messageListener.keySharedPolicy(KeySharedPolicy.stickyHashRange().ranges(new Range[]{rangeArr[i]}));
                }
            }
            arrayList.add(messageListener.subscribe());
        }
        String[] strArr = new String[3];
        for (int i2 = 0; i2 < 3; i2++) {
            strArr[i2] = subscriptionType == SubscriptionType.Key_Shared ? generateKeyForConsumer(getSelector(newUniqueName, "sub-1"), ((Consumer) arrayList.get(i2)).getConsumerName()) : "key-" + i2;
        }
        Producer create = this.pulsarClient.newProducer().topic(newUniqueName).enableBatching(true).batchingMaxPublishDelay(1L, TimeUnit.MILLISECONDS).batchingMaxMessages(9).batcherBuilder(BatcherBuilder.KEY_BASED).create();
        for (int i3 = 0; i3 < 1000; i3++) {
            CompletableFuture sendAsync = create.newMessage().key(strArr[i3 % 3]).value(("message-" + i3).getBytes(StandardCharsets.UTF_8)).sendAsync();
            Objects.requireNonNull(newConcurrentHashSet);
            sendAsync.thenAccept((v1) -> {
                r1.add(v1);
            });
        }
        create.flush();
        waitUntilLastActiveTimeNoLongerGetsUpdated(atomicLong);
        atomicBoolean.set(true);
        for (Map.Entry entry : concurrentHashMap.entrySet()) {
            ((Consumer) entry.getKey()).acknowledge((List) entry.getValue());
        }
        atomicLong.set(System.currentTimeMillis());
        waitUntilLastActiveTimeNoLongerGetsUpdated(atomicLong);
        logTopicStats(newUniqueName);
        Assert.assertEquals(1000, newConcurrentHashSet.size());
        Assertions.assertThat(newConcurrentHashSet2).containsExactlyInAnyOrderElementsOf(newConcurrentHashSet);
        create.close();
        Iterator it = arrayList.iterator();
        while (it.hasNext()) {
            ((Consumer) it.next()).close();
        }
    }

    private Range[] splitRange(Range range, int i) {
        Range[] rangeArr = new Range[i];
        int start = range.getStart();
        for (int i2 = 0; i2 < i; i2++) {
            int min = Math.min(start + (range.size() / i), range.getEnd());
            rangeArr[i2] = Range.of(start, min);
            start = min + 1;
        }
        return rangeArr;
    }

    private String generateKeyForConsumer(StickyKeyConsumerSelector stickyKeyConsumerSelector, String str) {
        int i = 0;
        while (!Thread.currentThread().isInterrupted()) {
            int i2 = i;
            i++;
            String str2 = "key" + i2;
            org.apache.pulsar.broker.service.Consumer select = stickyKeyConsumerSelector.select(str2.getBytes(StandardCharsets.UTF_8));
            if (select != null && select.consumerName().equals(str)) {
                return str2;
            }
        }
        return null;
    }

    private static void waitUntilLastActiveTimeNoLongerGetsUpdated(AtomicLong atomicLong) {
        Awaitility.await().pollInterval(100L, TimeUnit.MILLISECONDS).until(() -> {
            return Boolean.valueOf(System.currentTimeMillis() - atomicLong.get() > TimeUnit.SECONDS.toMillis(1L));
        });
    }

    private StickyKeyConsumerSelector getSelector(String str, String str2) {
        return ((Topic) ((Optional) this.pulsar.getBrokerService().getTopicIfExists(str).get()).get()).getSubscription(str2).getDispatcher().getSelector();
    }

    private static /* synthetic */ Object $deserializeLambda$(SerializedLambda serializedLambda) {
        String implMethodName = serializedLambda.getImplMethodName();
        boolean z = -1;
        switch (implMethodName.hashCode()) {
            case -900009204:
                if (implMethodName.equals("lambda$testCanRecoverConsumptionWhenLiftMaxUnAckedMessagesRestriction$dabf892c$1")) {
                    z = false;
                    break;
                }
                break;
        }
        switch (z) {
            case SHARED_VALUE:
                if (serializedLambda.getImplMethodKind() == 6 && serializedLambda.getFunctionalInterfaceClass().equals("org/apache/pulsar/client/api/MessageListener") && serializedLambda.getFunctionalInterfaceMethodName().equals("received") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Lorg/apache/pulsar/client/api/Consumer;Lorg/apache/pulsar/client/api/Message;)V") && serializedLambda.getImplClass().equals("org/apache/pulsar/client/impl/KeySharedSubscriptionMaxUnackedMessagesTest") && serializedLambda.getImplMethodSignature().equals("(Ljava/util/concurrent/atomic/AtomicLong;Ljava/util/Set;Ljava/util/concurrent/atomic/AtomicBoolean;Ljava/util/Map;Lorg/apache/pulsar/client/api/Consumer;Lorg/apache/pulsar/client/api/Message;)V")) {
                    AtomicLong atomicLong = (AtomicLong) serializedLambda.getCapturedArg(0);
                    Set set = (Set) serializedLambda.getCapturedArg(1);
                    AtomicBoolean atomicBoolean = (AtomicBoolean) serializedLambda.getCapturedArg(2);
                    Map map = (Map) serializedLambda.getCapturedArg(3);
                    return (consumer, message) -> {
                        atomicLong.set(System.currentTimeMillis());
                        set.add(message.getMessageId());
                        if (!atomicBoolean.get()) {
                            ((List) map.computeIfAbsent(consumer, consumer -> {
                                return Collections.synchronizedList(new ArrayList());
                            })).add(message.getMessageId());
                            return;
                        }
                        try {
                            consumer.acknowledge(message);
                        } catch (PulsarClientException e) {
                            throw new RuntimeException((Throwable) e);
                        }
                    };
                }
                break;
        }
        throw new IllegalArgumentException("Invalid lambda deserialization");
    }
}
