/*
 * Decompiled with CFR 0.152.
 */
package org.apache.pulsar.client.impl;

import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Future;
import org.apache.pulsar.broker.service.BrokerTestBase;
import org.apache.pulsar.client.admin.PulsarAdminException;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.MessageRoutingMode;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.impl.MessageIdImpl;
import org.apache.pulsar.client.impl.TopicMessageIdImpl;
import org.apache.pulsar.common.policies.data.TopicType;
import org.apache.pulsar.tests.EnumValuesDataProvider;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testng.Assert;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;

/**
 * Checks that the message IDs returned to a producer on publish are the same IDs that a
 * consumer subsequently receives. Each test is parameterized by {@link TopicType} through the
 * {@code "values"} data provider of {@link EnumValuesDataProvider}.
 */
@Test
public class MessageIdTest extends BrokerTestBase {
    private static final Logger log = LoggerFactory.getLogger(MessageIdTest.class);

    /** Delegates to {@code baseSetup()} before every test method. */
    @Override
    @BeforeMethod
    public void setup() throws Exception {
        this.baseSetup();
    }

    /** Delegates to {@code internalCleanup()} after every test method, even when the test failed. */
    @Override
    @AfterMethod(alwaysRun = true)
    protected void cleanup() throws Exception {
        this.internalCleanup();
    }

    /**
     * Publishes messages with {@code sendAsync} and verifies that the returned IDs are strictly
     * ascending, that every publish yielded a distinct ID, and that the consumer receives the
     * payloads in order with exactly those IDs.
     *
     * @param topicType kind of topic to run against; a 3-partition topic is created for
     *                  {@link TopicType#PARTITIONED}
     * @throws PulsarClientException if a client operation (create, subscribe, receive,
     *                               unsubscribe) fails
     * @throws PulsarAdminException  if creating the partitioned topic fails
     */
    @Test(timeOut = 10000L, dataProviderClass = EnumValuesDataProvider.class, dataProvider = "values")
    public void producerSendAsync(TopicType topicType) throws PulsarClientException, PulsarAdminException {
        final String key = "producerSendAsync-" + topicType;
        final String topicName = "persistent://prop/cluster/namespace/topic-" + key;
        final String subscriptionName = "my-subscription-" + key;
        final String messagePrefix = "my-message-" + key + "-";
        final int numberOfMessages = 30;
        if (topicType == TopicType.PARTITIONED) {
            int numberOfPartitions = 3;
            this.admin.topics().createPartitionedTopic(topicName, numberOfPartitions);
        }

        // NOTE(review): SinglePartition routing presumably keeps all messages on one partition so
        // that the ascending-order check below is meaningful -- confirm.
        Producer<byte[]> producer = this.pulsarClient.newProducer()
                .topic(topicName)
                .enableBatching(false)
                .messageRoutingMode(MessageRoutingMode.SinglePartition)
                .create();
        Consumer<byte[]> consumer = this.pulsarClient.newConsumer()
                .topic(topicName)
                .subscriptionName(subscriptionName)
                .subscribe();

        // Fire off every publish first; the results are collected afterwards.
        List<CompletableFuture<MessageId>> futures = new ArrayList<>(numberOfMessages);
        for (int i = 0; i < numberOfMessages; i++) {
            String message = messagePrefix + i;
            futures.add(producer.sendAsync(message.getBytes(StandardCharsets.UTF_8)));
        }

        Set<MessageIdImpl> messageIds = new HashSet<>();
        MessageIdImpl previousMessageId = null;
        for (Future<MessageId> future : futures) {
            try {
                MessageIdImpl currentMessageId = (MessageIdImpl) future.get();
                if (previousMessageId != null) {
                    Assert.assertTrue(currentMessageId.compareTo(previousMessageId) > 0,
                            "Message Ids should be in ascending order");
                }
                messageIds.add(currentMessageId);
                previousMessageId = currentMessageId;
            } catch (Exception e) {
                // Assertion failures are Errors and pass through; this only reports publish problems.
                Assert.fail("Failed to publish message", e);
            }
        }

        // A set smaller than the number of sends would mean some publish returned a duplicate ID.
        log.info("Message IDs = {}", messageIds);
        Assert.assertEquals(messageIds.size(), numberOfMessages, "Not all messages published successfully");

        for (int i = 0; i < numberOfMessages; i++) {
            Message<byte[]> message = consumer.receive();
            Assert.assertEquals(new String(message.getData(), StandardCharsets.UTF_8), messagePrefix + i);
            MessageId messageId = unwrapIfPartitioned(topicType, message.getMessageId());
            Assert.assertTrue(messageIds.remove(messageId), "Failed to receive message");
        }

        log.info("Remaining message IDs = {}", messageIds);
        Assert.assertEquals(messageIds.size(), 0, "Not all messages received successfully");
        consumer.unsubscribe();
    }

    /**
     * Publishes messages with the blocking {@code send} and verifies that every publish yielded
     * a distinct ID and that each received message carries one of those IDs exactly once.
     *
     * @param topicType kind of topic to run against; a 7-partition topic is created for
     *                  {@link TopicType#PARTITIONED}
     * @throws PulsarClientException if a client operation (create, subscribe, send, receive,
     *                               unsubscribe) fails
     * @throws PulsarAdminException  if creating the partitioned topic fails
     */
    @Test(timeOut = 10000L, dataProviderClass = EnumValuesDataProvider.class, dataProvider = "values")
    public void producerSend(TopicType topicType) throws PulsarClientException, PulsarAdminException {
        final String key = "producerSend-" + topicType;
        final String topicName = "persistent://prop/cluster/namespace/topic-" + key;
        final String subscriptionName = "my-subscription-" + key;
        final String messagePrefix = "my-message-" + key + "-";
        final int numberOfMessages = 30;
        if (topicType == TopicType.PARTITIONED) {
            int numberOfPartitions = 7;
            this.admin.topics().createPartitionedTopic(topicName, numberOfPartitions);
        }

        Producer<byte[]> producer = this.pulsarClient.newProducer()
                .enableBatching(false)
                .topic(topicName)
                .create();
        Consumer<byte[]> consumer = this.pulsarClient.newConsumer()
                .topic(topicName)
                .subscriptionName(subscriptionName)
                .subscribe();

        Set<MessageId> messageIds = new HashSet<>();
        for (int i = 0; i < numberOfMessages; i++) {
            String message = messagePrefix + i;
            messageIds.add(producer.send(message.getBytes(StandardCharsets.UTF_8)));
        }

        // A set smaller than the number of sends would mean some publish returned a duplicate ID.
        log.info("Message IDs = {}", messageIds);
        Assert.assertEquals(messageIds.size(), numberOfMessages, "Not all messages published successfully");

        for (int i = 0; i < numberOfMessages; i++) {
            MessageId messageId = unwrapIfPartitioned(topicType, consumer.receive().getMessageId());
            Assert.assertTrue(messageIds.remove(messageId), "Failed to receive Message");
        }

        log.info("Remaining message IDs = {}", messageIds);
        Assert.assertEquals(messageIds.size(), 0, "Not all messages received successfully");
        consumer.unsubscribe();
    }

    /**
     * Returns the ID to compare against the producer-side IDs: for a partitioned topic the
     * received ID is cast to {@link TopicMessageIdImpl} and its inner ID is returned; otherwise
     * the ID is returned unchanged.
     */
    private static MessageId unwrapIfPartitioned(TopicType topicType, MessageId messageId) {
        if (topicType == TopicType.PARTITIONED) {
            return ((TopicMessageIdImpl) messageId).getInnerMessageId();
        }
        return messageId;
    }
}

