/*
 * Decompiled with CFR 0.152.
 */
package org.apache.pulsar.client.api;

import com.google.common.collect.Lists;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.pulsar.client.admin.PulsarAdminException;
import org.apache.pulsar.client.api.ClientBuilder;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.ProducerConsumerBase;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.api.SubscriptionInitialPosition;
import org.apache.pulsar.client.impl.ClientBuilderImpl;
import org.apache.pulsar.client.impl.PulsarClientImpl;
import org.apache.pulsar.client.impl.conf.ClientConfigurationData;
import org.mockito.AdditionalAnswers;
import org.mockito.ArgumentMatchers;
import org.mockito.MockSettings;
import org.mockito.Mockito;
import org.mockito.verification.VerificationMode;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testng.Assert;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;

@Test(groups={"broker"})
public class MultiTopicsConsumerTest
extends ProducerConsumerBase {
    private static final Logger log = LoggerFactory.getLogger(MultiTopicsConsumerTest.class);
    private ScheduledExecutorService internalExecutorServiceDelegate;

    @Override
    @BeforeMethod(alwaysRun=true)
    protected void setup() throws Exception {
        super.internalSetup();
        super.producerBaseSetup();
    }

    @Override
    @AfterMethod(alwaysRun=true)
    protected void cleanup() throws Exception {
        super.internalCleanup();
    }

    @Override
    protected PulsarClient createNewPulsarClient(ClientBuilder clientBuilder) throws PulsarClientException {
        ClientConfigurationData conf = ((ClientBuilderImpl)clientBuilder).getClientConfigurationData();
        return new PulsarClientImpl(conf){
            {
                ScheduledExecutorService internalExecutorService = (ScheduledExecutorService)super.getInternalExecutorService();
                MultiTopicsConsumerTest.this.internalExecutorServiceDelegate = (ScheduledExecutorService)Mockito.mock(ScheduledExecutorService.class, (MockSettings)Mockito.withSettings().defaultAnswer(AdditionalAnswers.delegatesTo((Object)internalExecutorService)));
            }

            public ExecutorService getInternalExecutorService() {
                return MultiTopicsConsumerTest.this.internalExecutorServiceDelegate;
            }
        };
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test
    public void testMultiTopicsConsumerCloses() throws Exception {
        String topicNameBase = "persistent://my-property/my-ns/my-topic-consumer-closes-";
        Producer producer1 = this.pulsarClient.newProducer().topic(topicNameBase + "1").enableBatching(false).create();
        try {
            Producer producer2 = this.pulsarClient.newProducer().topic(topicNameBase + "2").enableBatching(false).create();
            try {
                Producer producer3 = this.pulsarClient.newProducer().topic(topicNameBase + "3").enableBatching(false).create();
                try {
                    Consumer consumer = this.pulsarClient.newConsumer().topics((List)Lists.newArrayList((Object[])new String[]{topicNameBase + "1", topicNameBase + "2", topicNameBase + "3"})).subscriptionInitialPosition(SubscriptionInitialPosition.Earliest).receiverQueueSize(1).subscriptionName(this.methodName).subscribe();
                    Thread.sleep(1000L);
                    consumer.close();
                    Thread.sleep(1000L);
                    ((ScheduledExecutorService)Mockito.verify((Object)this.internalExecutorServiceDelegate, (VerificationMode)Mockito.times((int)0))).schedule((Runnable)ArgumentMatchers.any(Runnable.class), ArgumentMatchers.anyLong(), (TimeUnit)((Object)ArgumentMatchers.any()));
                }
                finally {
                    if (Collections.singletonList(producer3).get(0) != null) {
                        producer3.close();
                    }
                }
            }
            finally {
                if (Collections.singletonList(producer2).get(0) != null) {
                    producer2.close();
                }
            }
        }
        finally {
            if (Collections.singletonList(producer1).get(0) != null) {
                producer1.close();
            }
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test
    public void testShouldMaintainOrderForIndividualTopicInMultiTopicsConsumer() throws PulsarAdminException, PulsarClientException, ExecutionException, InterruptedException, TimeoutException {
        String topicName = this.newTopicName();
        int numPartitions = 2;
        int numMessages = 100000;
        this.admin.topics().createPartitionedTopic(topicName, numPartitions);
        Producer[] producers = new Producer[numPartitions];
        for (int i = 0; i < numPartitions; ++i) {
            producers[i] = this.pulsarClient.newProducer(Schema.INT64).topic(topicName + "-partition-" + i).enableBatching(true).maxPendingMessages(30000).maxPendingMessagesAcrossPartitions(60000).batchingMaxMessages(10000).batchingMaxPublishDelay(5L, TimeUnit.SECONDS).batchingMaxBytes(0x400000).blockIfQueueFull(true).create();
        }
        Consumer consumer = this.pulsarClient.newConsumer(Schema.INT64).topic(new String[]{topicName}).subscriptionInitialPosition(SubscriptionInitialPosition.Earliest).receiverQueueSize(numMessages).subscriptionName(this.methodName).subscribe();
        try {
            int receivedCount;
            long sequenceNumber = 1L;
            for (int i = 0; i < numMessages; ++i) {
                Producer[] producerArray = producers;
                int n = producerArray.length;
                for (int j = 0; j < n; ++j) {
                    Producer producer = producerArray[j];
                    producer.newMessage().value((Object)sequenceNumber).sendAsync();
                }
                ++sequenceNumber;
            }
            for (Producer producer : producers) {
                producer.close();
            }
            HashMap<String, AtomicLong> receivedSequences = new HashMap<String, AtomicLong>();
            for (receivedCount = 0; receivedCount < numPartitions * numMessages; ++receivedCount) {
                Message message = (Message)consumer.receiveAsync().get(5L, TimeUnit.SECONDS);
                consumer.acknowledge(message);
                AtomicLong receivedSequenceCounter = receivedSequences.computeIfAbsent(message.getTopicName(), k -> new AtomicLong(1L));
                Assert.assertEquals((long)((Long)message.getValue()), (long)receivedSequenceCounter.getAndIncrement());
            }
            Assert.assertEquals((int)(numPartitions * numMessages), (int)receivedCount);
        }
        finally {
            if (Collections.singletonList(consumer).get(0) != null) {
                consumer.close();
            }
        }
    }
}

