/*
 * Decompiled with CFR 0.152.
 */
package org.apache.pulsar.client.api;

import java.io.Serializable;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.DeadLetterPolicy;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.MessageListener;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.ProducerConsumerBase;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.api.SubscriptionInitialPosition;
import org.apache.pulsar.client.api.SubscriptionType;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testng.Assert;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;

@Test(groups={"flaky"})
public class DeadLetterTopicTest
extends ProducerConsumerBase {
    private static final Logger log = LoggerFactory.getLogger(DeadLetterTopicTest.class);

    @Override
    @BeforeMethod(alwaysRun=true)
    protected void setup() throws Exception {
        super.internalSetup();
        super.producerBaseSetup();
    }

    @Override
    @AfterMethod(alwaysRun=true)
    protected void cleanup() throws Exception {
        super.internalCleanup();
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test(groups={"quarantine"})
    public void testDeadLetterTopic() throws Exception {
        String topic = "persistent://my-property/my-ns/dead-letter-topic";
        int maxRedeliveryCount = 2;
        int sendMessages = 100;
        Consumer consumer = this.pulsarClient.newConsumer(Schema.BYTES).topic(new String[]{"persistent://my-property/my-ns/dead-letter-topic"}).subscriptionName("my-subscription").subscriptionType(SubscriptionType.Shared).ackTimeout(1L, TimeUnit.SECONDS).deadLetterPolicy(DeadLetterPolicy.builder().maxRedeliverCount(2).build()).receiverQueueSize(100).subscriptionInitialPosition(SubscriptionInitialPosition.Earliest).subscribe();
        PulsarClient newPulsarClient = this.newPulsarClient(this.lookupUrl.toString(), 0);
        try {
            Consumer deadLetterConsumer = newPulsarClient.newConsumer(Schema.BYTES).topic(new String[]{"persistent://my-property/my-ns/dead-letter-topic-my-subscription-DLQ"}).subscriptionName("my-subscription").subscriptionInitialPosition(SubscriptionInitialPosition.Earliest).subscribe();
            Producer producer = this.pulsarClient.newProducer(Schema.BYTES).topic("persistent://my-property/my-ns/dead-letter-topic").create();
            for (int i = 0; i < 100; ++i) {
                producer.send((Object)String.format("Hello Pulsar [%d]", i).getBytes());
            }
            producer.close();
            int totalReceived = 0;
            do {
                Message message = consumer.receive();
                log.info("consumer received message : {} {}", (Object)message.getMessageId(), (Object)new String(message.getData()));
            } while (++totalReceived < 300);
            int totalInDeadLetter = 0;
            do {
                Message message = deadLetterConsumer.receive();
                log.info("dead letter consumer received message : {} {}", (Object)message.getMessageId(), (Object)new String(message.getData()));
                deadLetterConsumer.acknowledge(message);
            } while (++totalInDeadLetter < 100);
            deadLetterConsumer.close();
            consumer.close();
            Consumer checkConsumer = this.pulsarClient.newConsumer(Schema.BYTES).topic(new String[]{"persistent://my-property/my-ns/dead-letter-topic"}).subscriptionName("my-subscription").subscriptionType(SubscriptionType.Shared).subscriptionInitialPosition(SubscriptionInitialPosition.Earliest).subscribe();
            Message checkMessage = checkConsumer.receive(3, TimeUnit.SECONDS);
            if (checkMessage != null) {
                log.info("check consumer received message : {} {}", (Object)checkMessage.getMessageId(), (Object)new String(checkMessage.getData()));
            }
            Assert.assertNull((Object)checkMessage);
            checkConsumer.close();
        }
        finally {
            if (Collections.singletonList(newPulsarClient).get(0) != null) {
                newPulsarClient.close();
            }
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test(timeOut=20000L)
    public void testDeadLetterTopicHasOriginalInfo() throws Exception {
        String topic = "persistent://my-property/my-ns/dead-letter-topic";
        boolean maxRedeliveryCount = true;
        int sendMessages = 10;
        Consumer consumer = this.pulsarClient.newConsumer(Schema.BYTES).topic(new String[]{"persistent://my-property/my-ns/dead-letter-topic"}).subscriptionName("my-subscription").subscriptionType(SubscriptionType.Shared).ackTimeout(1L, TimeUnit.SECONDS).deadLetterPolicy(DeadLetterPolicy.builder().maxRedeliverCount(1).build()).subscriptionInitialPosition(SubscriptionInitialPosition.Earliest).subscribe();
        PulsarClient newPulsarClient = this.newPulsarClient(this.lookupUrl.toString(), 0);
        try {
            Consumer deadLetterConsumer = newPulsarClient.newConsumer(Schema.BYTES).topic(new String[]{"persistent://my-property/my-ns/dead-letter-topic-my-subscription-DLQ"}).subscriptionName("my-subscription").subscriptionInitialPosition(SubscriptionInitialPosition.Earliest).subscribe();
            Producer producer = this.pulsarClient.newProducer(Schema.BYTES).topic("persistent://my-property/my-ns/dead-letter-topic").create();
            HashSet<String> messageIds = new HashSet<String>();
            for (int i = 0; i < 10; ++i) {
                MessageId messageId = producer.send((Object)String.format("Hello Pulsar [%d]", i).getBytes());
                messageIds.add(messageId.toString());
            }
            producer.close();
            int totalReceived = 0;
            do {
                consumer.receive();
            } while (++totalReceived < 20);
            int totalInDeadLetter = 0;
            do {
                Message message = deadLetterConsumer.receive();
                Assert.assertEquals((String)((String)message.getProperties().get("REAL_TOPIC")), (String)"persistent://my-property/my-ns/dead-letter-topic");
                Assert.assertTrue((boolean)messageIds.contains(message.getProperties().get("ORIGIN_MESSAGE_IDY_TIME")));
                deadLetterConsumer.acknowledge(message);
            } while (++totalInDeadLetter < 10);
            Assert.assertEquals((int)totalInDeadLetter, (int)10);
            deadLetterConsumer.close();
            consumer.close();
        }
        finally {
            if (Collections.singletonList(newPulsarClient).get(0) != null) {
                newPulsarClient.close();
            }
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test(timeOut=30000L)
    public void testDuplicatedMessageSendToDeadLetterTopic() throws Exception {
        String topic = "persistent://my-property/my-ns/dead-letter-topic-DuplicatedMessage";
        boolean maxRedeliveryCount = true;
        int messageCount = 10;
        int consumerCount = 3;
        ArrayList consumers = new ArrayList();
        AtomicInteger totalReceived = new AtomicInteger(0);
        ExecutorService executor = Executors.newFixedThreadPool(3);
        try {
            Message message;
            for (int i = 0; i < 3; ++i) {
                executor.execute(() -> {
                    try {
                        Consumer consumer = this.pulsarClient.newConsumer(Schema.STRING).topic(new String[]{"persistent://my-property/my-ns/dead-letter-topic-DuplicatedMessage"}).subscriptionName("my-subscription-DuplicatedMessage").subscriptionType(SubscriptionType.Shared).ackTimeout(1001L, TimeUnit.MILLISECONDS).deadLetterPolicy(DeadLetterPolicy.builder().maxRedeliverCount(1).deadLetterTopic("persistent://my-property/my-ns/dead-letter-topic-DuplicatedMessage-DLQ").build()).negativeAckRedeliveryDelay(1001L, TimeUnit.MILLISECONDS).receiverQueueSize(100).subscriptionInitialPosition(SubscriptionInitialPosition.Earliest).messageListener((MessageListener & Serializable)(consumer1, msg) -> totalReceived.getAndIncrement()).subscribe();
                        consumers.add(consumer);
                    }
                    catch (PulsarClientException e) {
                        Assert.fail();
                    }
                });
            }
            Producer producer = this.pulsarClient.newProducer(Schema.STRING).topic("persistent://my-property/my-ns/dead-letter-topic-DuplicatedMessage").create();
            for (int i = 0; i < 10; ++i) {
                producer.send((Object)String.format("Message [%d]", i));
            }
            Consumer deadLetterConsumer = this.pulsarClient.newConsumer(Schema.STRING).topic(new String[]{"persistent://my-property/my-ns/dead-letter-topic-DuplicatedMessage-DLQ"}).subscriptionName("my-subscription-DuplicatedMessage-DLQ").subscriptionInitialPosition(SubscriptionInitialPosition.Earliest).subscribe();
            int totalInDeadLetter = 0;
            while ((message = deadLetterConsumer.receive(10, TimeUnit.SECONDS)) != null) {
                deadLetterConsumer.acknowledge(message);
                ++totalInDeadLetter;
            }
            Assert.assertEquals((int)totalReceived.get(), (int)20);
            Assert.assertEquals((int)totalInDeadLetter, (int)10);
            producer.close();
            deadLetterConsumer.close();
            for (Consumer consumer : consumers) {
                consumer.close();
            }
        }
        finally {
            if (Collections.singletonList(executor).get(0) != null) {
                executor.shutdownNow();
            }
        }
    }

    @Test(enabled=false)
    public void testDeadLetterTopicWithMultiTopic() throws Exception {
        String topic1 = "persistent://my-property/my-ns/dead-letter-topic-1";
        String topic2 = "persistent://my-property/my-ns/dead-letter-topic-2";
        int maxRedeliveryCount = 2;
        int sendMessages = 100;
        Consumer consumer = this.pulsarClient.newConsumer(Schema.BYTES).topic(new String[]{"persistent://my-property/my-ns/dead-letter-topic-1", "persistent://my-property/my-ns/dead-letter-topic-2"}).subscriptionName("my-subscription").subscriptionType(SubscriptionType.Shared).ackTimeout(1L, TimeUnit.SECONDS).deadLetterPolicy(DeadLetterPolicy.builder().maxRedeliverCount(2).build()).receiverQueueSize(100).subscriptionInitialPosition(SubscriptionInitialPosition.Earliest).subscribe();
        Consumer deadLetterConsumer = this.pulsarClient.newConsumer(Schema.BYTES).topic(new String[]{"persistent://my-property/my-ns/dead-letter-topic-1-my-subscription-DLQ", "persistent://my-property/my-ns/dead-letter-topic-2-my-subscription-DLQ"}).subscriptionName("my-subscription").subscriptionInitialPosition(SubscriptionInitialPosition.Earliest).subscribe();
        Producer producer1 = this.pulsarClient.newProducer(Schema.BYTES).topic("persistent://my-property/my-ns/dead-letter-topic-1").create();
        Producer producer2 = this.pulsarClient.newProducer(Schema.BYTES).topic("persistent://my-property/my-ns/dead-letter-topic-2").create();
        for (int i = 0; i < sendMessages; ++i) {
            producer1.send((Object)String.format("Hello Pulsar [%d]", i).getBytes());
            producer2.send((Object)String.format("Hello Pulsar [%d]", i).getBytes());
        }
        sendMessages *= 2;
        producer1.close();
        producer2.close();
        int totalReceived = 0;
        do {
            Message message = consumer.receive();
            log.info("consumer received message : {} {} - total = {}", new Object[]{message.getMessageId(), new String(message.getData()), ++totalReceived});
        } while (totalReceived < sendMessages * 3);
        int totalInDeadLetter = 0;
        do {
            Message message = deadLetterConsumer.receive();
            log.info("dead letter consumer received message : {} {}", (Object)message.getMessageId(), (Object)new String(message.getData()));
            deadLetterConsumer.acknowledge(message);
        } while (++totalInDeadLetter < sendMessages);
        deadLetterConsumer.close();
        consumer.close();
        Consumer checkConsumer = this.pulsarClient.newConsumer(Schema.BYTES).topic(new String[]{"persistent://my-property/my-ns/dead-letter-topic-1", "persistent://my-property/my-ns/dead-letter-topic-2"}).subscriptionName("my-subscription").subscriptionType(SubscriptionType.Shared).subscriptionInitialPosition(SubscriptionInitialPosition.Earliest).subscribe();
        Message checkMessage = checkConsumer.receive(3, TimeUnit.SECONDS);
        if (checkMessage != null) {
            log.info("check consumer received message : {} {}", (Object)checkMessage.getMessageId(), (Object)new String(checkMessage.getData()));
        }
        Assert.assertNull((Object)checkMessage);
        checkConsumer.close();
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test(groups={"quarantine"})
    public void testDeadLetterTopicByCustomTopicName() throws Exception {
        String topic = "persistent://my-property/my-ns/dead-letter-topic";
        int maxRedeliveryCount = 2;
        int sendMessages = 100;
        Consumer consumer = this.pulsarClient.newConsumer(Schema.BYTES).topic(new String[]{"persistent://my-property/my-ns/dead-letter-topic"}).subscriptionName("my-subscription").subscriptionType(SubscriptionType.Shared).ackTimeout(1L, TimeUnit.SECONDS).receiverQueueSize(100).deadLetterPolicy(DeadLetterPolicy.builder().maxRedeliverCount(2).deadLetterTopic("persistent://my-property/my-ns/dead-letter-custom-topic-my-subscription-custom-DLQ").build()).subscriptionInitialPosition(SubscriptionInitialPosition.Earliest).subscribe();
        PulsarClient newPulsarClient = this.newPulsarClient(this.lookupUrl.toString(), 0);
        try {
            Consumer deadLetterConsumer = newPulsarClient.newConsumer(Schema.BYTES).topic(new String[]{"persistent://my-property/my-ns/dead-letter-custom-topic-my-subscription-custom-DLQ"}).subscriptionName("my-subscription").subscribe();
            Producer producer = this.pulsarClient.newProducer(Schema.BYTES).topic("persistent://my-property/my-ns/dead-letter-topic").create();
            for (int i = 0; i < 100; ++i) {
                producer.send((Object)String.format("Hello Pulsar [%d]", i).getBytes());
            }
            producer.close();
            int totalReceived = 0;
            do {
                Message message = consumer.receive();
                log.info("consumer received message : {} {}", (Object)message.getMessageId(), (Object)new String(message.getData()));
            } while (++totalReceived < 300);
            int totalInDeadLetter = 0;
            do {
                Message message = deadLetterConsumer.receive();
                log.info("dead letter consumer received message : {} {}", (Object)message.getMessageId(), (Object)new String(message.getData()));
                deadLetterConsumer.acknowledge(message);
            } while (++totalInDeadLetter < 100);
            deadLetterConsumer.close();
            consumer.close();
            PulsarClient newPulsarClient1 = this.newPulsarClient(this.lookupUrl.toString(), 0);
            try {
                Consumer checkConsumer = newPulsarClient1.newConsumer(Schema.BYTES).topic(new String[]{"persistent://my-property/my-ns/dead-letter-topic"}).subscriptionName("my-subscription").subscriptionType(SubscriptionType.Shared).subscriptionInitialPosition(SubscriptionInitialPosition.Earliest).subscribe();
                Message checkMessage = checkConsumer.receive(3, TimeUnit.SECONDS);
                if (checkMessage != null) {
                    log.info("check consumer received message : {} {}", (Object)checkMessage.getMessageId(), (Object)new String(checkMessage.getData()));
                }
                Assert.assertNull((Object)checkMessage);
                checkConsumer.close();
            }
            finally {
                if (Collections.singletonList(newPulsarClient1).get(0) != null) {
                    newPulsarClient1.close();
                }
            }
        }
        finally {
            if (Collections.singletonList(newPulsarClient).get(0) != null) {
                newPulsarClient.close();
            }
        }
    }

    @Test(timeOut=200000L)
    public void testDeadLetterWithoutConsumerReceiveImmediately() throws PulsarClientException, InterruptedException {
        String topic = "persistent://my-property/my-ns/dead-letter-topic-without-consumer-receive-immediately";
        Consumer consumer = this.pulsarClient.newConsumer().topic(new String[]{"persistent://my-property/my-ns/dead-letter-topic-without-consumer-receive-immediately"}).subscriptionType(SubscriptionType.Shared).subscriptionName("my-subscription").ackTimeout(1L, TimeUnit.SECONDS).deadLetterPolicy(DeadLetterPolicy.builder().maxRedeliverCount(1).build()).subscribe();
        Producer producer = this.pulsarClient.newProducer().topic("persistent://my-property/my-ns/dead-letter-topic-without-consumer-receive-immediately").create();
        producer.send((Object)"a message".getBytes());
        Thread.sleep(5000L);
        Message msg = consumer.receive(1, TimeUnit.SECONDS);
        Assert.assertNotNull((Object)msg);
    }
}

