/*
 * Decompiled with CFR 0.152.
 */
package org.apache.pulsar.client.api;

import com.google.common.collect.Sets;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.ConsumerBuilder;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.ProducerBuilder;
import org.apache.pulsar.client.api.ProducerConsumerBase;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.api.SubscriptionType;
import org.apache.pulsar.client.impl.ConsumerImpl;
import org.apache.pulsar.client.impl.MessageIdImpl;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testng.Assert;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.DataProvider;
import org.testng.annotations.Test;

@Test(groups={"broker-api"})
public class ConsumerRedeliveryTest
extends ProducerConsumerBase {
    private static final Logger log = LoggerFactory.getLogger(ConsumerRedeliveryTest.class);

    /**
     * Class-level fixture: adjusts the managed-ledger cache eviction frequency on the shared
     * configuration and then delegates to the base-class setup routines.
     *
     * @throws Exception if either base-class setup step fails
     */
    @BeforeClass
    @Override
    protected void setup() throws Exception {
        // Applied before internalSetup(); presumably the configuration is read while the
        // base class initialises its services -- confirm against the base class.
        conf.setManagedLedgerCacheEvictionFrequency(0.1);

        super.internalSetup();
        super.producerBaseSetup();
    }

    /**
     * Class-level teardown: delegates to the base-class cleanup routine. Registered with
     * {@code alwaysRun = true} so it executes even when earlier configuration or test
     * methods have failed.
     *
     * @throws Exception if the base-class cleanup fails
     */
    @AfterClass(alwaysRun = true)
    @Override
    protected void cleanup() throws Exception {
        super.internalCleanup();
    }

    /**
     * TestNG data provider that runs a test once with ack receipts enabled and once with
     * them disabled.
     *
     * @return two single-element rows: {@code true}, then {@code false}
     */
    @DataProvider(name = "ackReceiptEnabled")
    public Object[][] ackReceiptEnabled() {
        final Object[][] rows = {
            {Boolean.TRUE},
            {Boolean.FALSE},
        };
        return rows;
    }

    /**
     * Checks that messages redelivered on a {@code Shared} subscription arrive in
     * non-decreasing ledger-id order.
     *
     * <p>The test limits ledgers to two entries each, publishes 100 messages, acknowledges
     * every other one, and asks for the unacknowledged half to be redelivered. The ordering
     * is verified twice: once on the consumer that requested the redelivery, and once on a
     * fresh consumer that subscribes after the first one has been closed.
     *
     * @param ackReceiptEnabled whether the consumers are built with ack receipts enabled
     * @throws Exception if any client operation fails
     */
    @Test(dataProvider = "ackReceiptEnabled")
    public void testOrderedRedelivery(boolean ackReceiptEnabled) throws Exception {
        final int totalMsgs = 100;
        // Every second message is left unacknowledged, so half of them come back.
        final int unackedMsgs = totalMsgs / 2;
        String topic = "persistent://my-property/my-ns/redelivery-" + System.currentTimeMillis();

        // Tiny ledgers with immediate rollover, so the messages span many ledgers.
        conf.setManagedLedgerMaxEntriesPerLedger(2);
        conf.setManagedLedgerMinLedgerRolloverTimeMinutes(0);

        try (Producer<byte[]> producer = pulsarClient.newProducer()
                .topic(topic)
                .producerName("my-producer-name")
                .create()) {
            ConsumerBuilder<byte[]> consumerBuilder = pulsarClient.newConsumer()
                    .topic(topic)
                    .subscriptionName("s1")
                    .subscriptionType(SubscriptionType.Shared)
                    .isAckReceiptEnabled(ackReceiptEnabled);

            // consumer1 is closed at the end of this scope (also on failure), before
            // consumer2 subscribes.
            try (ConsumerImpl<byte[]> consumer1 = (ConsumerImpl<byte[]>) consumerBuilder.subscribe()) {
                for (int i = 0; i < totalMsgs; i++) {
                    String message = "my-message-" + i;
                    producer.send(message.getBytes());
                }

                int consumedCount = 0;
                Set<MessageId> messageIds = new HashSet<>();
                for (int i = 0; i < totalMsgs; i++) {
                    Message<byte[]> message = consumer1.receive(5, TimeUnit.SECONDS);
                    // A timed receive returns null on timeout; fail clearly instead of
                    // dereferencing null in the "keep unacked" branch.
                    Assert.assertNotNull(message, "timed out waiting for message #" + i);
                    if (consumedCount % 2 == 0) {
                        consumer1.acknowledge(message);
                    } else {
                        messageIds.add(message.getMessageId());
                    }
                    consumedCount++;
                }
                Assert.assertEquals(consumedCount, totalMsgs);

                consumer1.redeliverUnacknowledgedMessages(messageIds);
                assertLedgerIdsNonDecreasing(consumer1, unackedMsgs);
            }

            // The messages are still unacknowledged, so a new consumer on the same
            // subscription is expected to receive them again.
            try (Consumer<byte[]> consumer2 = consumerBuilder.subscribe()) {
                assertLedgerIdsNonDecreasing(consumer2, unackedMsgs);
            }
        }
    }

    /**
     * Receives {@code expectedMessages} messages and asserts that the ledger id never
     * decreases from one message to the next.
     *
     * @param consumer consumer to receive from
     * @param expectedMessages number of messages that must arrive
     * @throws PulsarClientException if receiving fails
     */
    private static void assertLedgerIdsNonDecreasing(Consumer<byte[]> consumer, int expectedMessages)
            throws PulsarClientException {
        MessageIdImpl lastMsgId = null;
        for (int i = 0; i < expectedMessages; i++) {
            Message<byte[]> message = consumer.receive(5, TimeUnit.SECONDS);
            Assert.assertNotNull(message, "timed out waiting for redelivered message #" + i);
            MessageIdImpl msgId = (MessageIdImpl) message.getMessageId();
            if (lastMsgId != null) {
                Assert.assertTrue(lastMsgId.getLedgerId() <= msgId.getLedgerId(),
                        "lastMsgId: " + lastMsgId + " -- msgId: " + msgId);
            }
            lastMsgId = msgId;
        }
    }

    /**
     * Checks that messages delivered through {@code receiveAsync()} and never acknowledged
     * are delivered again once the 3-second ack timeout configured on the consumer applies.
     *
     * <p>Ten {@code receiveAsync()} futures are registered before ten messages are published
     * in batches of up to five. After all futures complete without the messages being
     * acknowledged, the same number of messages must be received a second time; those are
     * acknowledged.
     *
     * @param ackReceiptEnabled toggles both ack receipts and batch-index acknowledgment on
     *     the consumer
     * @throws PulsarClientException if a client operation fails
     * @throws ExecutionException if a {@code receiveAsync()} future completes exceptionally
     * @throws InterruptedException if the thread is interrupted while waiting on a future
     */
    @Test(dataProvider = "ackReceiptEnabled")
    public void testUnAckMessageRedeliveryWithReceiveAsync(boolean ackReceiptEnabled)
            throws PulsarClientException, ExecutionException, InterruptedException {
        final int messages = 10;
        String topic = "persistent://my-property/my-ns/async-unack-redelivery";

        // Resources close in reverse declaration order (producer, then consumer), on both
        // the success and the failure path.
        try (Consumer<String> consumer = pulsarClient.newConsumer(Schema.STRING)
                     .topic(topic)
                     .subscriptionName("s1")
                     .isAckReceiptEnabled(ackReceiptEnabled)
                     .enableBatchIndexAcknowledgment(ackReceiptEnabled)
                     .ackTimeout(3L, TimeUnit.SECONDS)
                     .subscribe();
             Producer<String> producer = pulsarClient.newProducer(Schema.STRING)
                     .topic(topic)
                     .enableBatching(true)
                     .batchingMaxMessages(5)
                     .batchingMaxPublishDelay(1L, TimeUnit.SECONDS)
                     .create()) {

            // Register the pending receives first, so the messages complete these futures.
            ArrayList<CompletableFuture<Message<String>>> futures = new ArrayList<>(messages);
            for (int i = 0; i < messages; i++) {
                futures.add(consumer.receiveAsync());
            }

            for (int i = 0; i < messages; i++) {
                producer.sendAsync("my-message-" + i);
            }

            int messageReceived = 0;
            for (CompletableFuture<Message<String>> future : futures) {
                // Deliberately not acknowledged: these must come back after the ack timeout.
                Message<String> message = future.get();
                Assert.assertNotNull(message);
                messageReceived++;
            }
            Assert.assertEquals(messageReceived, messages);

            for (int i = 0; i < messages; i++) {
                // Bounded wait (well above the 3s ack timeout) so a missing redelivery
                // fails the assertion instead of blocking the test forever.
                Message<String> message = consumer.receive(10, TimeUnit.SECONDS);
                Assert.assertNotNull(message, "timed out waiting for redelivered message #" + i);
                messageReceived++;
                consumer.acknowledge(message);
            }
            Assert.assertEquals(messageReceived, 2 * messages);
        }
    }

    /**
     * Checks how many messages two {@code Shared} consumers with a receiver queue of 10
     * end up holding when the producer publishes batches.
     *
     * <p>The producer flushes one batch of {@code batchSize} messages followed by a batch of
     * {@code queueSize} messages. The first consumer is expected to hold exactly
     * {@code batchSize} incoming messages; a second consumer that subscribes afterwards is
     * expected to hold exactly {@code queueSize}.
     *
     * @throws Exception if any client operation or the retry helper fails
     */
    @Test
    public void testConsumerWithPermitReceiveBatchMessages() throws Exception {
        log.info("-- Starting {} test --", methodName);

        final int queueSize = 10;
        final int batchSize = 100;
        String subName = "my-subscriber-name";
        String topicName = "permitReceiveBatchMessages" + UUID.randomUUID().toString();

        // try-with-resources so neither consumer nor the producer outlives the test,
        // even when an assertion fails.
        try (ConsumerImpl<byte[]> consumer1 = (ConsumerImpl<byte[]>) pulsarClient.newConsumer()
                     .topic(topicName)
                     .receiverQueueSize(queueSize)
                     .subscriptionType(SubscriptionType.Shared)
                     .subscriptionName(subName)
                     .subscribe();
             Producer<byte[]> producer = pulsarClient.newProducer()
                     .topic(topicName)
                     .enableBatching(true)
                     .batchingMaxPublishDelay(2000L, TimeUnit.MILLISECONDS)
                     .batchingMaxMessages(batchSize)
                     .create()) {

            // First batch: batchSize messages, larger than the receiver queue.
            for (int i = 0; i < batchSize; i++) {
                String message = "my-message-" + i;
                producer.sendAsync(message.getBytes());
            }
            producer.flush();

            // Second batch: exactly one receiver queue's worth of messages.
            for (int i = 0; i < queueSize; i++) {
                String message = "my-message-" + i;
                producer.sendAsync(message.getBytes());
            }
            producer.flush();

            ConsumerRedeliveryTest.retryStrategically(
                    test -> consumer1.getTotalIncomingMessages() == batchSize, 5, 2000L);
            Assert.assertEquals(consumer1.getTotalIncomingMessages(), batchSize);

            try (ConsumerImpl<byte[]> consumer2 = (ConsumerImpl<byte[]>) pulsarClient.newConsumer()
                    .topic(topicName)
                    .receiverQueueSize(queueSize)
                    .subscriptionType(SubscriptionType.Shared)
                    .subscriptionName(subName)
                    .subscribe()) {
                ConsumerRedeliveryTest.retryStrategically(
                        test -> consumer2.getTotalIncomingMessages() == queueSize, 5, 2000L);
                Assert.assertEquals(consumer2.getTotalIncomingMessages(), queueSize);
            }
        }

        log.info("-- Exiting {} test --", methodName);
    }
}

