/*
 * Decompiled with CFR 0.152.
 */
package org.apache.pulsar.client.api;

import java.util.ArrayList;
import java.util.concurrent.TimeUnit;
import org.apache.pulsar.client.admin.PulsarAdminException;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.ProducerConsumerBase;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.api.SubscriptionInitialPosition;
import org.apache.pulsar.client.api.SubscriptionType;
import org.testng.Assert;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;

@Test(groups={"broker-api"})
public class ExposeMessageRedeliveryCountTest
extends ProducerConsumerBase {
    @Override
    @BeforeMethod
    protected void setup() throws Exception {
        super.internalSetup();
        super.producerBaseSetup();
    }

    @Override
    @AfterMethod(alwaysRun=true)
    protected void cleanup() throws Exception {
        super.internalCleanup();
    }

    @Test(timeOut=30000L)
    public void testRedeliveryCount() throws PulsarClientException {
        Message message;
        int redeliveryCount;
        String topic = "persistent://my-property/my-ns/redeliveryCount";
        Consumer consumer = this.pulsarClient.newConsumer(Schema.BYTES).topic(new String[]{"persistent://my-property/my-ns/redeliveryCount"}).subscriptionName("my-subscription").subscriptionType(SubscriptionType.Shared).ackTimeout(1L, TimeUnit.SECONDS).receiverQueueSize(100).subscriptionInitialPosition(SubscriptionInitialPosition.Earliest).subscribe();
        Producer producer = this.pulsarClient.newProducer(Schema.BYTES).topic("persistent://my-property/my-ns/redeliveryCount").create();
        producer.send((Object)"Hello Pulsar".getBytes());
        do {
            message = consumer.receive();
            message.getProperties();
        } while ((redeliveryCount = message.getRedeliveryCount()) <= 2);
        consumer.acknowledge(message);
        Assert.assertEquals((int)3, (int)redeliveryCount);
        producer.close();
        consumer.close();
    }

    @Test(timeOut=30000L)
    public void testRedeliveryCountWithPartitionedTopic() throws PulsarClientException, PulsarAdminException {
        Message message;
        int redeliveryCount;
        String topic = "persistent://my-property/my-ns/redeliveryCount.partitioned";
        this.admin.topics().createPartitionedTopic("persistent://my-property/my-ns/redeliveryCount.partitioned", 3);
        Consumer consumer = this.pulsarClient.newConsumer(Schema.BYTES).topic(new String[]{"persistent://my-property/my-ns/redeliveryCount.partitioned"}).subscriptionName("my-subscription").subscriptionType(SubscriptionType.Shared).ackTimeout(1L, TimeUnit.SECONDS).receiverQueueSize(100).subscriptionInitialPosition(SubscriptionInitialPosition.Earliest).subscribe();
        Producer producer = this.pulsarClient.newProducer(Schema.BYTES).topic("persistent://my-property/my-ns/redeliveryCount.partitioned").create();
        producer.send((Object)"Hello Pulsar".getBytes());
        do {
            message = consumer.receive();
            message.getProperties();
        } while ((redeliveryCount = message.getRedeliveryCount()) <= 2);
        consumer.acknowledge(message);
        Assert.assertEquals((int)3, (int)redeliveryCount);
        producer.close();
        consumer.close();
        this.admin.topics().deletePartitionedTopic("persistent://my-property/my-ns/redeliveryCount.partitioned");
    }

    @Test(timeOut=30000L)
    public void testRedeliveryCountWhenConsumerDisconnected() throws PulsarClientException {
        Message msg;
        int i;
        String topic = "persistent://my-property/my-ns/testRedeliveryCountWhenConsumerDisconnected";
        Consumer consumer0 = this.pulsarClient.newConsumer(Schema.STRING).topic(new String[]{topic}).subscriptionName("s1").subscriptionType(SubscriptionType.Shared).subscribe();
        Consumer consumer1 = this.pulsarClient.newConsumer(Schema.STRING).topic(new String[]{topic}).subscriptionName("s1").subscriptionType(SubscriptionType.Shared).subscribe();
        Producer producer = this.pulsarClient.newProducer(Schema.STRING).topic(topic).enableBatching(true).batchingMaxMessages(5).batchingMaxPublishDelay(1L, TimeUnit.SECONDS).create();
        int messages = 10;
        for (int i2 = 0; i2 < 10; ++i2) {
            producer.send((Object)("my-message-" + i2));
        }
        ArrayList<Message> receivedMessagesForConsumer0 = new ArrayList<Message>();
        ArrayList<Message> receivedMessagesForConsumer1 = new ArrayList<Message>();
        for (i = 0; i < 10 && (msg = consumer0.receive(1, TimeUnit.SECONDS)) != null; ++i) {
            receivedMessagesForConsumer0.add(msg);
        }
        for (i = 0; i < 10 && (msg = consumer1.receive(1, TimeUnit.SECONDS)) != null; ++i) {
            receivedMessagesForConsumer1.add(msg);
        }
        Assert.assertEquals((int)(receivedMessagesForConsumer0.size() + receivedMessagesForConsumer1.size()), (int)10);
        consumer0.close();
        for (i = 0; i < receivedMessagesForConsumer0.size(); ++i) {
            Assert.assertEquals((int)consumer1.receive().getRedeliveryCount(), (int)1);
        }
    }
}

