package org.apache.pulsar.client.api;

import java.lang.invoke.SerializedLambda;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.Iterator;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testng.Assert;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;

/* loaded from: input_file:org/apache/pulsar/client/api/DeadLetterTopicTest.class */
public class DeadLetterTopicTest extends ProducerConsumerBase {
    private static final Logger log = LoggerFactory.getLogger((Class<?>) DeadLetterTopicTest.class);

    @Override // org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest
    @BeforeMethod
    protected void setup() throws Exception {
        super.internalSetup();
        super.producerBaseSetup();
    }

    @Override // org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest
    @AfterMethod(alwaysRun = true)
    protected void cleanup() throws Exception {
        super.internalCleanup();
    }

    public void testDeadLetterTopic() throws Exception {
        Consumer subscribe = this.pulsarClient.newConsumer(Schema.BYTES).topic("persistent://my-property/my-ns/dead-letter-topic").subscriptionName("my-subscription").subscriptionType(SubscriptionType.Shared).ackTimeout(1L, TimeUnit.SECONDS).deadLetterPolicy(DeadLetterPolicy.builder().maxRedeliverCount(2).build()).receiverQueueSize(100).subscriptionInitialPosition(SubscriptionInitialPosition.Earliest).subscribe();
        PulsarClient newPulsarClient = newPulsarClient(this.lookupUrl.toString(), 0);
        Consumer subscribe2 = newPulsarClient.newConsumer(Schema.BYTES).topic("persistent://my-property/my-ns/dead-letter-topic-my-subscription-DLQ").subscriptionName("my-subscription").subscriptionInitialPosition(SubscriptionInitialPosition.Earliest).subscribe();
        Producer create = this.pulsarClient.newProducer(Schema.BYTES).topic("persistent://my-property/my-ns/dead-letter-topic").create();
        for (int i = 0; i < 100; i++) {
            create.send(String.format("Hello Pulsar [%d]", Integer.valueOf(i)).getBytes());
        }
        create.close();
        int i2 = 0;
        do {
            Message receive = subscribe.receive();
            log.info("consumer received message : {} {}", receive.getMessageId(), new String(receive.getData()));
            i2++;
        } while (i2 < 300);
        int i3 = 0;
        do {
            Message<?> receive2 = subscribe2.receive();
            log.info("dead letter consumer received message : {} {}", receive2.getMessageId(), new String(receive2.getData()));
            subscribe2.acknowledge(receive2);
            i3++;
        } while (i3 < 100);
        subscribe2.close();
        subscribe.close();
        Consumer subscribe3 = this.pulsarClient.newConsumer(Schema.BYTES).topic("persistent://my-property/my-ns/dead-letter-topic").subscriptionName("my-subscription").subscriptionType(SubscriptionType.Shared).subscriptionInitialPosition(SubscriptionInitialPosition.Earliest).subscribe();
        Message receive3 = subscribe3.receive(3, TimeUnit.SECONDS);
        if (receive3 != null) {
            log.info("check consumer received message : {} {}", receive3.getMessageId(), new String(receive3.getData()));
        }
        Assert.assertNull(receive3);
        subscribe3.close();
        newPulsarClient.close();
    }

    @Test(timeOut = 20000)
    public void testDeadLetterTopicHasOriginalInfo() throws Exception {
        Consumer subscribe = this.pulsarClient.newConsumer(Schema.BYTES).topic("persistent://my-property/my-ns/dead-letter-topic").subscriptionName("my-subscription").subscriptionType(SubscriptionType.Shared).ackTimeout(1L, TimeUnit.SECONDS).deadLetterPolicy(DeadLetterPolicy.builder().maxRedeliverCount(1).build()).subscriptionInitialPosition(SubscriptionInitialPosition.Earliest).subscribe();
        PulsarClient newPulsarClient = newPulsarClient(this.lookupUrl.toString(), 0);
        Consumer subscribe2 = newPulsarClient.newConsumer(Schema.BYTES).topic("persistent://my-property/my-ns/dead-letter-topic-my-subscription-DLQ").subscriptionName("my-subscription").subscriptionInitialPosition(SubscriptionInitialPosition.Earliest).subscribe();
        Producer create = this.pulsarClient.newProducer(Schema.BYTES).topic("persistent://my-property/my-ns/dead-letter-topic").create();
        HashSet hashSet = new HashSet();
        for (int i = 0; i < 10; i++) {
            hashSet.add(create.send(String.format("Hello Pulsar [%d]", Integer.valueOf(i)).getBytes()).toString());
        }
        create.close();
        int i2 = 0;
        do {
            subscribe.receive();
            i2++;
        } while (i2 < 20);
        int i3 = 0;
        do {
            Message<?> receive = subscribe2.receive();
            Assert.assertEquals(receive.getProperties().get("REAL_TOPIC"), "persistent://my-property/my-ns/dead-letter-topic");
            Assert.assertTrue(hashSet.contains(receive.getProperties().get("ORIGIN_MESSAGE_IDY_TIME")));
            subscribe2.acknowledge(receive);
            i3++;
        } while (i3 < 10);
        Assert.assertEquals(i3, 10);
        subscribe2.close();
        subscribe.close();
        newPulsarClient.close();
    }

    @Test(timeOut = 30000)
    public void testDuplicatedMessageSendToDeadLetterTopic() throws Exception {
        ArrayList arrayList = new ArrayList();
        AtomicInteger atomicInteger = new AtomicInteger(0);
        ExecutorService newFixedThreadPool = Executors.newFixedThreadPool(3);
        for (int i = 0; i < 3; i++) {
            newFixedThreadPool.execute(() -> {
                try {
                    arrayList.add(this.pulsarClient.newConsumer(Schema.STRING).topic("persistent://my-property/my-ns/dead-letter-topic-DuplicatedMessage").subscriptionName("my-subscription-DuplicatedMessage").subscriptionType(SubscriptionType.Shared).ackTimeout(1001L, TimeUnit.MILLISECONDS).deadLetterPolicy(DeadLetterPolicy.builder().maxRedeliverCount(1).deadLetterTopic("persistent://my-property/my-ns/dead-letter-topic-DuplicatedMessage-DLQ").build()).negativeAckRedeliveryDelay(1001L, TimeUnit.MILLISECONDS).receiverQueueSize(100).subscriptionInitialPosition(SubscriptionInitialPosition.Earliest).messageListener((consumer, message) -> {
                        atomicInteger.getAndIncrement();
                    }).subscribe());
                } catch (PulsarClientException e) {
                    Assert.fail();
                }
            });
        }
        Producer create = this.pulsarClient.newProducer(Schema.STRING).topic("persistent://my-property/my-ns/dead-letter-topic-DuplicatedMessage").create();
        for (int i2 = 0; i2 < 10; i2++) {
            create.send(String.format("Message [%d]", Integer.valueOf(i2)));
        }
        Consumer subscribe = this.pulsarClient.newConsumer(Schema.STRING).topic("persistent://my-property/my-ns/dead-letter-topic-DuplicatedMessage-DLQ").subscriptionName("my-subscription-DuplicatedMessage-DLQ").subscriptionInitialPosition(SubscriptionInitialPosition.Earliest).subscribe();
        int i3 = 0;
        while (true) {
            Message<?> receive = subscribe.receive(10, TimeUnit.SECONDS);
            if (receive == null) {
                break;
            }
            subscribe.acknowledge(receive);
            i3++;
        }
        Assert.assertEquals(atomicInteger.get(), 20);
        Assert.assertEquals(i3, 10);
        newFixedThreadPool.shutdownNow();
        create.close();
        subscribe.close();
        Iterator it = arrayList.iterator();
        while (it.hasNext()) {
            ((Consumer) it.next()).close();
        }
    }

    @Test(enabled = false)
    public void testDeadLetterTopicWithMultiTopic() throws Exception {
        Consumer subscribe = this.pulsarClient.newConsumer(Schema.BYTES).topic("persistent://my-property/my-ns/dead-letter-topic-1", "persistent://my-property/my-ns/dead-letter-topic-2").subscriptionName("my-subscription").subscriptionType(SubscriptionType.Shared).ackTimeout(1L, TimeUnit.SECONDS).deadLetterPolicy(DeadLetterPolicy.builder().maxRedeliverCount(2).build()).receiverQueueSize(100).subscriptionInitialPosition(SubscriptionInitialPosition.Earliest).subscribe();
        Consumer subscribe2 = this.pulsarClient.newConsumer(Schema.BYTES).topic("persistent://my-property/my-ns/dead-letter-topic-1-my-subscription-DLQ", "persistent://my-property/my-ns/dead-letter-topic-2-my-subscription-DLQ").subscriptionName("my-subscription").subscriptionInitialPosition(SubscriptionInitialPosition.Earliest).subscribe();
        Producer create = this.pulsarClient.newProducer(Schema.BYTES).topic("persistent://my-property/my-ns/dead-letter-topic-1").create();
        Producer create2 = this.pulsarClient.newProducer(Schema.BYTES).topic("persistent://my-property/my-ns/dead-letter-topic-2").create();
        for (int i = 0; i < 100; i++) {
            create.send(String.format("Hello Pulsar [%d]", Integer.valueOf(i)).getBytes());
            create2.send(String.format("Hello Pulsar [%d]", Integer.valueOf(i)).getBytes());
        }
        int i2 = 100 * 2;
        create.close();
        create2.close();
        int i3 = 0;
        do {
            Message receive = subscribe.receive();
            i3++;
            log.info("consumer received message : {} {} - total = {}", receive.getMessageId(), new String(receive.getData()), Integer.valueOf(i3));
        } while (i3 < i2 * 3);
        int i4 = 0;
        do {
            Message<?> receive2 = subscribe2.receive();
            log.info("dead letter consumer received message : {} {}", receive2.getMessageId(), new String(receive2.getData()));
            subscribe2.acknowledge(receive2);
            i4++;
        } while (i4 < i2);
        subscribe2.close();
        subscribe.close();
        Consumer subscribe3 = this.pulsarClient.newConsumer(Schema.BYTES).topic("persistent://my-property/my-ns/dead-letter-topic-1", "persistent://my-property/my-ns/dead-letter-topic-2").subscriptionName("my-subscription").subscriptionType(SubscriptionType.Shared).subscriptionInitialPosition(SubscriptionInitialPosition.Earliest).subscribe();
        Message receive3 = subscribe3.receive(3, TimeUnit.SECONDS);
        if (receive3 != null) {
            log.info("check consumer received message : {} {}", receive3.getMessageId(), new String(receive3.getData()));
        }
        Assert.assertNull(receive3);
        subscribe3.close();
    }

    public void testDeadLetterTopicByCustomTopicName() throws Exception {
        Consumer subscribe = this.pulsarClient.newConsumer(Schema.BYTES).topic("persistent://my-property/my-ns/dead-letter-topic").subscriptionName("my-subscription").subscriptionType(SubscriptionType.Shared).ackTimeout(1L, TimeUnit.SECONDS).receiverQueueSize(100).deadLetterPolicy(DeadLetterPolicy.builder().maxRedeliverCount(2).deadLetterTopic("persistent://my-property/my-ns/dead-letter-custom-topic-my-subscription-custom-DLQ").build()).subscriptionInitialPosition(SubscriptionInitialPosition.Earliest).subscribe();
        PulsarClient newPulsarClient = newPulsarClient(this.lookupUrl.toString(), 0);
        Consumer subscribe2 = newPulsarClient.newConsumer(Schema.BYTES).topic("persistent://my-property/my-ns/dead-letter-custom-topic-my-subscription-custom-DLQ").subscriptionName("my-subscription").subscribe();
        Producer create = this.pulsarClient.newProducer(Schema.BYTES).topic("persistent://my-property/my-ns/dead-letter-topic").create();
        for (int i = 0; i < 100; i++) {
            create.send(String.format("Hello Pulsar [%d]", Integer.valueOf(i)).getBytes());
        }
        create.close();
        int i2 = 0;
        do {
            Message receive = subscribe.receive();
            log.info("consumer received message : {} {}", receive.getMessageId(), new String(receive.getData()));
            i2++;
        } while (i2 < 300);
        int i3 = 0;
        do {
            Message<?> receive2 = subscribe2.receive();
            log.info("dead letter consumer received message : {} {}", receive2.getMessageId(), new String(receive2.getData()));
            subscribe2.acknowledge(receive2);
            i3++;
        } while (i3 < 100);
        subscribe2.close();
        subscribe.close();
        PulsarClient newPulsarClient2 = newPulsarClient(this.lookupUrl.toString(), 0);
        Consumer subscribe3 = newPulsarClient2.newConsumer(Schema.BYTES).topic("persistent://my-property/my-ns/dead-letter-topic").subscriptionName("my-subscription").subscriptionType(SubscriptionType.Shared).subscriptionInitialPosition(SubscriptionInitialPosition.Earliest).subscribe();
        Message receive3 = subscribe3.receive(3, TimeUnit.SECONDS);
        if (receive3 != null) {
            log.info("check consumer received message : {} {}", receive3.getMessageId(), new String(receive3.getData()));
        }
        Assert.assertNull(receive3);
        newPulsarClient.close();
        newPulsarClient2.close();
        subscribe3.close();
    }

    @Test(timeOut = 200000)
    public void testDeadLetterWithoutConsumerReceiveImmediately() throws PulsarClientException, InterruptedException {
        Consumer<byte[]> subscribe = this.pulsarClient.newConsumer().topic("persistent://my-property/my-ns/dead-letter-topic-without-consumer-receive-immediately").subscriptionType(SubscriptionType.Shared).subscriptionName("my-subscription").ackTimeout(1L, TimeUnit.SECONDS).deadLetterPolicy(DeadLetterPolicy.builder().maxRedeliverCount(1).build()).subscribe();
        this.pulsarClient.newProducer().topic("persistent://my-property/my-ns/dead-letter-topic-without-consumer-receive-immediately").create().send("a message".getBytes());
        Thread.sleep(5000L);
        Assert.assertNotNull(subscribe.receive(1, TimeUnit.SECONDS));
    }

    private static /* synthetic */ Object $deserializeLambda$(SerializedLambda serializedLambda) {
        String implMethodName = serializedLambda.getImplMethodName();
        boolean z = -1;
        switch (implMethodName.hashCode()) {
            case 2100808264:
                if (implMethodName.equals("lambda$null$9fce800d$1")) {
                    z = false;
                    break;
                }
                break;
        }
        switch (z) {
            case false:
                if (serializedLambda.getImplMethodKind() == 6 && serializedLambda.getFunctionalInterfaceClass().equals("org/apache/pulsar/client/api/MessageListener") && serializedLambda.getFunctionalInterfaceMethodName().equals("received") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Lorg/apache/pulsar/client/api/Consumer;Lorg/apache/pulsar/client/api/Message;)V") && serializedLambda.getImplClass().equals("org/apache/pulsar/client/api/DeadLetterTopicTest") && serializedLambda.getImplMethodSignature().equals("(Ljava/util/concurrent/atomic/AtomicInteger;Lorg/apache/pulsar/client/api/Consumer;Lorg/apache/pulsar/client/api/Message;)V")) {
                    AtomicInteger atomicInteger = (AtomicInteger) serializedLambda.getCapturedArg(0);
                    return (consumer, message) -> {
                        atomicInteger.getAndIncrement();
                    };
                }
                break;
        }
        throw new IllegalArgumentException("Invalid lambda deserialization");
    }
}
