package org.apache.pulsar.client.impl;

import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Set;
import java.util.concurrent.Future;
import org.apache.pulsar.broker.service.BrokerTestBase;
import org.apache.pulsar.client.admin.PulsarAdminException;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.MessageRoutingMode;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.common.policies.data.TopicType;
import org.apache.pulsar.tests.EnumValuesDataProvider;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testng.Assert;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;

@Test
/* loaded from: input_file:org/apache/pulsar/client/impl/MessageIdTest.class */
public class MessageIdTest extends BrokerTestBase {
    private static final Logger log = LoggerFactory.getLogger(MessageIdTest.class);

    @Override // org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest
    @BeforeMethod
    public void setup() throws Exception {
        baseSetup();
    }

    @Override // org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest
    @AfterMethod(alwaysRun = true)
    protected void cleanup() throws Exception {
        internalCleanup();
    }

    @Test(timeOut = 10000, dataProviderClass = EnumValuesDataProvider.class, dataProvider = "values")
    public void producerSendAsync(TopicType topicType) throws PulsarClientException, PulsarAdminException {
        String str = "producerSendAsync-" + topicType;
        String str2 = "persistent://prop/cluster/namespace/topic-" + str;
        String str3 = "my-subscription-" + str;
        String str4 = "my-message-" + str + "-";
        if (topicType == TopicType.PARTITIONED) {
            this.admin.topics().createPartitionedTopic(str2, 3);
        }
        Producer create = this.pulsarClient.newProducer().topic(str2).enableBatching(false).messageRoutingMode(MessageRoutingMode.SinglePartition).create();
        Consumer subscribe = this.pulsarClient.newConsumer().topic(new String[]{str2}).subscriptionName(str3).subscribe();
        ArrayList arrayList = new ArrayList();
        for (int i = 0; i < 30; i++) {
            arrayList.add(create.sendAsync((str4 + i).getBytes()));
        }
        HashSet hashSet = new HashSet();
        MessageId messageId = null;
        Iterator it = arrayList.iterator();
        while (it.hasNext()) {
            try {
                MessageId messageId2 = (MessageIdImpl) ((Future) it.next()).get();
                if (messageId != null) {
                    Assert.assertTrue(messageId2.compareTo(messageId) > 0, "Message Ids should be in ascending order");
                }
                hashSet.add(messageId2);
                messageId = messageId2;
            } catch (Exception e) {
                Assert.fail("Failed to publish message", e);
            }
        }
        log.info("Message IDs = {}", hashSet);
        Assert.assertEquals(hashSet.size(), 30, "Not all messages published successfully");
        for (int i2 = 0; i2 < 30; i2++) {
            Message receive = subscribe.receive();
            Assert.assertEquals(new String(receive.getData()), str4 + i2);
            Assert.assertTrue(hashSet.remove(receive.getMessageId()), "Failed to receive message");
        }
        log.info("Remaining message IDs = {}", hashSet);
        Assert.assertEquals(hashSet.size(), 0, "Not all messages received successfully");
        subscribe.unsubscribe();
    }

    @Test(timeOut = 10000, dataProviderClass = EnumValuesDataProvider.class, dataProvider = "values")
    public void producerSend(TopicType topicType) throws PulsarClientException, PulsarAdminException {
        String str = "producerSend-" + topicType;
        String str2 = "persistent://prop/cluster/namespace/topic-" + str;
        String str3 = "my-subscription-" + str;
        String str4 = "my-message-" + str + "-";
        if (topicType == TopicType.PARTITIONED) {
            this.admin.topics().createPartitionedTopic(str2, 7);
        }
        Producer create = this.pulsarClient.newProducer().enableBatching(false).topic(str2).create();
        Consumer subscribe = this.pulsarClient.newConsumer().topic(new String[]{str2}).subscriptionName(str3).subscribe();
        HashSet hashSet = new HashSet();
        for (int i = 0; i < 30; i++) {
            hashSet.add(create.send((str4 + i).getBytes()));
        }
        log.info("Message IDs = {}", hashSet);
        Assert.assertEquals(hashSet.size(), 30, "Not all messages published successfully");
        for (int i2 = 0; i2 < 30; i2++) {
            Assert.assertTrue(hashSet.remove(subscribe.receive().getMessageId()), "Failed to receive Message");
        }
        log.info("Remaining message IDs = {}", hashSet);
        Assert.assertEquals(hashSet.size(), 0, "Not all messages received successfully");
        subscribe.unsubscribe();
    }
}
