package org.apache.pulsar.client.api;

import java.lang.invoke.SerializedLambda;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Random;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import lombok.Generated;
import org.apache.avro.reflect.Nullable;
import org.apache.pulsar.client.impl.ConsumerBuilderImpl;
import org.awaitility.Awaitility;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testng.Assert;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.DataProvider;
import org.testng.annotations.Test;

@Test(groups = {"broker-impl"})
/* loaded from: input_file:org/apache/pulsar/client/api/DeadLetterTopicTest.class */
public class DeadLetterTopicTest extends ProducerConsumerBase {
    // SLF4J logger shared by every test in this class; used to trace the ids and payloads of received messages.
    private static final Logger log = LoggerFactory.getLogger(DeadLetterTopicTest.class);

    /* loaded from: input_file:org/apache/pulsar/client/api/DeadLetterTopicTest$Foo.class */
    /**
     * Two-field value type used as the "old" Avro schema in the schema-evolution test.
     *
     * <p>Both fields carry Avro's {@code @Nullable}, and equality, hashing and the string
     * form are all derived from the two getters.
     */
    public static class Foo {

        @Nullable
        private String field1;

        @Nullable
        private String field2;

        @Generated
        public Foo() {
        }

        @Generated
        public String getField1() {
            return field1;
        }

        @Generated
        public String getField2() {
            return field2;
        }

        @Generated
        public void setField1(String value) {
            field1 = value;
        }

        @Generated
        public void setField2(String value) {
            field2 = value;
        }

        /**
         * Two instances are equal when the other side accepts this one via
         * {@link #canEqual(Object)} and both fields match (null matches only null).
         */
        @Override
        @Generated
        public boolean equals(Object obj) {
            if (this == obj) {
                return true;
            }
            if (!(obj instanceof Foo)) {
                return false;
            }
            final Foo other = (Foo) obj;
            return other.canEqual(this)
                    && java.util.Objects.equals(getField1(), other.getField1())
                    && java.util.Objects.equals(getField2(), other.getField2());
        }

        /** Hook that lets a subclass refuse equality with instances of this type. */
        @Generated
        protected boolean canEqual(Object obj) {
            return obj instanceof Foo;
        }

        /** Combines the field hashes with multiplier 59, using 43 in place of a null field. */
        @Override
        @Generated
        public int hashCode() {
            int result = 1;
            for (String value : new String[] {getField1(), getField2()}) {
                result = result * 59 + (value == null ? 43 : value.hashCode());
            }
            return result;
        }

        @Override
        @Generated
        public String toString() {
            return new StringBuilder("DeadLetterTopicTest.Foo(field1=")
                    .append(getField1())
                    .append(", field2=")
                    .append(getField2())
                    .append(")")
                    .toString();
        }
    }

    /* loaded from: input_file:org/apache/pulsar/client/api/DeadLetterTopicTest$FooV2.class */
    /**
     * Three-field value type used as the "new" Avro schema in the schema-evolution test;
     * it has the same first two fields as {@code Foo} plus {@code field3}.
     *
     * <p>All fields carry Avro's {@code @Nullable}, and equality, hashing and the string
     * form are all derived from the three getters.
     */
    public static class FooV2 {

        @Nullable
        private String field1;

        @Nullable
        private String field2;

        @Nullable
        private String field3;

        @Generated
        public FooV2() {
        }

        @Generated
        public String getField1() {
            return field1;
        }

        @Generated
        public String getField2() {
            return field2;
        }

        @Generated
        public String getField3() {
            return field3;
        }

        @Generated
        public void setField1(String value) {
            field1 = value;
        }

        @Generated
        public void setField2(String value) {
            field2 = value;
        }

        @Generated
        public void setField3(String value) {
            field3 = value;
        }

        /**
         * Two instances are equal when the other side accepts this one via
         * {@link #canEqual(Object)} and all three fields match (null matches only null).
         */
        @Override
        @Generated
        public boolean equals(Object obj) {
            if (this == obj) {
                return true;
            }
            if (!(obj instanceof FooV2)) {
                return false;
            }
            final FooV2 other = (FooV2) obj;
            return other.canEqual(this)
                    && java.util.Objects.equals(getField1(), other.getField1())
                    && java.util.Objects.equals(getField2(), other.getField2())
                    && java.util.Objects.equals(getField3(), other.getField3());
        }

        /** Hook that lets a subclass refuse equality with instances of this type. */
        @Generated
        protected boolean canEqual(Object obj) {
            return obj instanceof FooV2;
        }

        /** Combines the field hashes with multiplier 59, using 43 in place of a null field. */
        @Override
        @Generated
        public int hashCode() {
            int result = 1;
            for (String value : new String[] {getField1(), getField2(), getField3()}) {
                result = result * 59 + (value == null ? 43 : value.hashCode());
            }
            return result;
        }

        @Override
        @Generated
        public String toString() {
            return new StringBuilder("DeadLetterTopicTest.FooV2(field1=")
                    .append(getField1())
                    .append(", field2=")
                    .append(getField2())
                    .append(", field3=")
                    .append(getField3())
                    .append(")")
                    .toString();
        }
    }

    /**
     * Per-test setup: caps the configured maximum message size at 5120 bytes and then
     * runs the base-class setup steps. The cap is smaller than the 10240-character
     * payloads this class builds for its large-message case.
     */
    @BeforeMethod(alwaysRun = true)
    @Override // org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest
    protected void setup() throws Exception {
        final int maxMessageSizeBytes = 5 * 1024;
        conf.setMaxMessageSize(maxMessageSizeBytes);
        super.internalSetup();
        super.producerBaseSetup();
    }

    /** Per-test teardown: delegates to the base-class cleanup; runs even when the test failed. */
    @AfterMethod(alwaysRun = true)
    @Override // org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest
    protected void cleanup() throws Exception {
        super.internalCleanup();
    }

    /**
     * Builds a string of random decimal digits.
     *
     * @param i number of digits to generate; zero or a negative value yields an empty string
     * @return a string of exactly {@code i} characters, each in {@code '0'..'9'}
     */
    private String createMessagePayload(int i) {
        final Random digits = new Random();
        final StringBuilder payload = new StringBuilder();
        int remaining = i;
        while (remaining > 0) {
            payload.append(digits.nextInt(10));
            remaining--;
        }
        return payload.toString();
    }

    /**
     * Checks that a string message key is preserved on messages moved to the dead letter topic.
     *
     * <p>Publishes 100 messages keyed {@code "test-key"}, receives them on a Shared
     * subscription (1 s ack timeout, {@code maxRedeliverCount(1)}) without ever
     * acknowledging, waits for 200 deliveries in total, and then expects 100 messages
     * with the same key on the {@code -my-subscription-DLQ} topic.
     *
     * <p>The dead letter consumer uses its own client, which is closed exactly once
     * whether the test passes or fails.
     */
    @Test
    public void testDeadLetterTopicWithMessageKey() throws Exception {
        final String topic = "persistent://my-property/my-ns/dead-letter-topic";
        final String deadLetterTopic = "persistent://my-property/my-ns/dead-letter-topic-my-subscription-DLQ";
        final String messageKey = "test-key";
        final int sendMessages = 100;
        final int maxRedeliveryCount = 1;

        Consumer<byte[]> consumer = pulsarClient.newConsumer(Schema.BYTES)
                .topic(topic)
                .subscriptionName("my-subscription")
                .subscriptionType(SubscriptionType.Shared)
                .ackTimeout(1L, TimeUnit.SECONDS)
                .deadLetterPolicy(DeadLetterPolicy.builder().maxRedeliverCount(maxRedeliveryCount).build())
                .receiverQueueSize(100)
                .subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
                .subscribe();

        try (PulsarClient deadLetterClient = newPulsarClient(lookupUrl.toString(), 0)) {
            Consumer<byte[]> deadLetterConsumer = deadLetterClient.newConsumer(Schema.BYTES)
                    .topic(deadLetterTopic)
                    .subscriptionName("my-subscription")
                    .subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
                    .subscribe();

            Producer<byte[]> producer = pulsarClient.newProducer(Schema.BYTES).topic(topic).create();
            for (int i = 0; i < sendMessages; i++) {
                producer.newMessage()
                        .key(messageKey)
                        .value(String.format("Hello Pulsar [%d]", i).getBytes())
                        .send();
            }
            producer.close();

            // Never acknowledge: every message must arrive (maxRedeliveryCount + 1) times.
            final int expectedDeliveries = sendMessages * (maxRedeliveryCount + 1);
            for (int received = 0; received < expectedDeliveries; received++) {
                Message<byte[]> message = consumer.receive();
                log.info("consumer received message : {} {}", message.getMessageId(), new String(message.getData()));
            }

            for (int received = 0; received < sendMessages; received++) {
                Message<byte[]> message = deadLetterConsumer.receive();
                Assert.assertEquals(message.getKey(), messageKey);
                log.info("dead letter consumer received message : {} {}", message.getMessageId(),
                        new String(message.getData()));
                deadLetterConsumer.acknowledge(message);
            }

            deadLetterConsumer.close();
            consumer.close();
        }
    }

    /**
     * Checks that a binary message key ({@code keyBytes}) is preserved on messages moved
     * to the dead letter topic.
     *
     * <p>Publishes 100 messages keyed with the bytes {@code 1,2,3,4}, receives them on a
     * Shared subscription (1 s ack timeout, {@code maxRedeliverCount(1)}) without ever
     * acknowledging, waits for 200 deliveries in total, and then expects 100 messages
     * with the same key bytes on the {@code -my-subscription-DLQ} topic.
     *
     * <p>The dead letter consumer uses its own client, which is closed exactly once
     * whether the test passes or fails.
     */
    @Test
    public void testDeadLetterTopicWithBinaryMessageKey() throws Exception {
        final String topic = "persistent://my-property/my-ns/dead-letter-topic";
        final String deadLetterTopic = "persistent://my-property/my-ns/dead-letter-topic-my-subscription-DLQ";
        final byte[] messageKey = {1, 2, 3, 4};
        final int sendMessages = 100;
        final int maxRedeliveryCount = 1;

        Consumer<byte[]> consumer = pulsarClient.newConsumer(Schema.BYTES)
                .topic(topic)
                .subscriptionName("my-subscription")
                .subscriptionType(SubscriptionType.Shared)
                .ackTimeout(1L, TimeUnit.SECONDS)
                .deadLetterPolicy(DeadLetterPolicy.builder().maxRedeliverCount(maxRedeliveryCount).build())
                .receiverQueueSize(100)
                .subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
                .subscribe();

        try (PulsarClient deadLetterClient = newPulsarClient(lookupUrl.toString(), 0)) {
            Consumer<byte[]> deadLetterConsumer = deadLetterClient.newConsumer(Schema.BYTES)
                    .topic(deadLetterTopic)
                    .subscriptionName("my-subscription")
                    .subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
                    .subscribe();

            Producer<byte[]> producer = pulsarClient.newProducer(Schema.BYTES).topic(topic).create();
            for (int i = 0; i < sendMessages; i++) {
                producer.newMessage()
                        .keyBytes(messageKey)
                        .value(String.format("Hello Pulsar [%d]", i).getBytes())
                        .send();
            }
            producer.close();

            // Never acknowledge: every message must arrive (maxRedeliveryCount + 1) times.
            final int expectedDeliveries = sendMessages * (maxRedeliveryCount + 1);
            for (int received = 0; received < expectedDeliveries; received++) {
                Message<byte[]> message = consumer.receive();
                log.info("consumer received message : {} {}", message.getMessageId(), new String(message.getData()));
            }

            for (int received = 0; received < sendMessages; received++) {
                Message<byte[]> message = deadLetterConsumer.receive();
                Assert.assertEquals(message.getKeyBytes(), messageKey);
                log.info("dead letter consumer received message : {} {}", message.getMessageId(),
                        new String(message.getData()));
                deadLetterConsumer.acknowledge(message);
            }

            deadLetterConsumer.close();
            consumer.close();
        }
    }

    /**
     * Checks that the ordering key is preserved on messages moved to the dead letter topic.
     *
     * <p>Publishes 100 messages whose ordering key is the bytes {@code 1,2,3,4}, receives
     * them on a Shared subscription (1 s ack timeout, {@code maxRedeliverCount(1)})
     * without ever acknowledging, waits for 200 deliveries in total, and then expects
     * 100 messages with the same ordering key on the {@code -my-subscription-DLQ} topic.
     *
     * <p>The dead letter consumer uses its own client, which is closed exactly once
     * whether the test passes or fails.
     */
    @Test
    public void testDeadLetterTopicMessagesWithOrderingKey() throws Exception {
        final String topic = "persistent://my-property/my-ns/dead-letter-topic";
        final String deadLetterTopic = "persistent://my-property/my-ns/dead-letter-topic-my-subscription-DLQ";
        final byte[] orderingKey = {1, 2, 3, 4};
        final int sendMessages = 100;
        final int maxRedeliveryCount = 1;

        Consumer<byte[]> consumer = pulsarClient.newConsumer(Schema.BYTES)
                .topic(topic)
                .subscriptionName("my-subscription")
                .subscriptionType(SubscriptionType.Shared)
                .ackTimeout(1L, TimeUnit.SECONDS)
                .deadLetterPolicy(DeadLetterPolicy.builder().maxRedeliverCount(maxRedeliveryCount).build())
                .receiverQueueSize(100)
                .subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
                .subscribe();

        try (PulsarClient deadLetterClient = newPulsarClient(lookupUrl.toString(), 0)) {
            Consumer<byte[]> deadLetterConsumer = deadLetterClient.newConsumer(Schema.BYTES)
                    .topic(deadLetterTopic)
                    .subscriptionName("my-subscription")
                    .subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
                    .subscribe();

            Producer<byte[]> producer = pulsarClient.newProducer(Schema.BYTES).topic(topic).create();
            for (int i = 0; i < sendMessages; i++) {
                producer.newMessage()
                        .orderingKey(orderingKey)
                        .value(String.format("Hello Pulsar [%d]", i).getBytes())
                        .send();
            }
            producer.close();

            // Never acknowledge: every message must arrive (maxRedeliveryCount + 1) times.
            final int expectedDeliveries = sendMessages * (maxRedeliveryCount + 1);
            for (int received = 0; received < expectedDeliveries; received++) {
                Message<byte[]> message = consumer.receive();
                log.info("consumer received message : {} {}", message.getMessageId(), new String(message.getData()));
            }

            for (int received = 0; received < sendMessages; received++) {
                Message<byte[]> message = deadLetterConsumer.receive();
                Assert.assertEquals(message.getOrderingKey(), orderingKey);
                log.info("dead letter consumer received message : {} {}", message.getMessageId(),
                        new String(message.getData()));
                deadLetterConsumer.acknowledge(message);
            }

            deadLetterConsumer.close();
            consumer.close();
        }
    }

    /**
     * Supplies the single boolean argument of {@code testDeadLetterTopic}: first
     * {@code false}, then {@code true}.
     *
     * @return one row per invocation, each holding a single {@code Boolean}
     */
    @DataProvider(name = "produceLargeMessages")
    public Object[][] produceLargeMessages() {
        // Must be a true two-dimensional array; a plain Object[] does not satisfy the return type.
        return new Object[][] {{false}, {true}};
    }

    /**
     * End-to-end dead letter check, run once with small batched messages and once with
     * large chunked ones.
     *
     * <p>Publishes 100 messages keyed by their index, receives them on a Shared
     * subscription (1 s ack timeout, {@code maxRedeliverCount(2)}) without ever
     * acknowledging, and waits for 300 deliveries. It then expects exactly the 100
     * original payloads on the {@code -my-subscription-DLQ} topic, and finally verifies
     * that a fresh consumer on the original subscription receives nothing within 3 s.
     *
     * <p>The dead letter consumer uses its own client, which is closed exactly once
     * whether the test passes or fails.
     *
     * @param z {@code true} to send 10240-character random payloads with chunking enabled
     *          and batching disabled; {@code false} to send short text payloads with
     *          batching enabled and chunking disabled
     */
    @Test(dataProvider = "produceLargeMessages")
    public void testDeadLetterTopic(boolean z) throws Exception {
        final boolean largeMessages = z;
        final String topic = "persistent://my-property/my-ns/dead-letter-topic";
        final String deadLetterTopic = "persistent://my-property/my-ns/dead-letter-topic-my-subscription-DLQ";
        final int sendMessages = 100;
        final int maxRedeliveryCount = 2;

        Consumer<byte[]> consumer = pulsarClient.newConsumer(Schema.BYTES)
                .topic(topic)
                .subscriptionName("my-subscription")
                .subscriptionType(SubscriptionType.Shared)
                .ackTimeout(1L, TimeUnit.SECONDS)
                .deadLetterPolicy(DeadLetterPolicy.builder().maxRedeliverCount(maxRedeliveryCount).build())
                .receiverQueueSize(100)
                .subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
                .subscribe();

        try (PulsarClient deadLetterClient = newPulsarClient(lookupUrl.toString(), 0)) {
            Consumer<byte[]> deadLetterConsumer = deadLetterClient.newConsumer(Schema.BYTES)
                    .topic(deadLetterTopic)
                    .subscriptionName("my-subscription")
                    .subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
                    .subscribe();

            Producer<byte[]> producer = pulsarClient.newProducer(Schema.BYTES)
                    .topic(topic)
                    .enableChunking(largeMessages)
                    .enableBatching(!largeMessages)
                    .create();

            // Remember what was sent under each key so the dead letter payloads can be matched.
            HashMap<Integer, String> expectedPayloads = new HashMap<>();
            for (int i = 0; i < sendMessages; i++) {
                String payload = largeMessages
                        ? createMessagePayload(10240)
                        : String.format("Hello Pulsar [%d]", i);
                producer.newMessage().key(String.valueOf(i)).value(payload.getBytes()).send();
                expectedPayloads.put(i, payload);
            }
            producer.close();

            // Never acknowledge: every message must arrive (maxRedeliveryCount + 1) times.
            final int expectedDeliveries = sendMessages * (maxRedeliveryCount + 1);
            for (int received = 0; received < expectedDeliveries; received++) {
                Message<byte[]> message = consumer.receive(5, TimeUnit.SECONDS);
                Assert.assertNotNull(message, "The consumer should be able to receive messages.");
                log.info("consumer received message : {}", message.getMessageId());
            }

            for (int received = 0; received < sendMessages; received++) {
                Message<byte[]> message = deadLetterConsumer.receive(5, TimeUnit.SECONDS);
                Assert.assertNotNull(message, "the deadLetterConsumer should receive messages.");
                Integer key = Integer.parseInt(message.getKey());
                Assert.assertEquals(new String(message.getData()), expectedPayloads.get(key));
                expectedPayloads.remove(key);
                log.info("dead letter consumer received message : {}", message.getMessageId());
                deadLetterConsumer.acknowledge(message);
            }
            Assert.assertTrue(expectedPayloads.isEmpty());

            deadLetterConsumer.close();
            consumer.close();

            // Nothing should be left for the original subscription.
            Consumer<byte[]> checkConsumer = pulsarClient.newConsumer(Schema.BYTES)
                    .topic(topic)
                    .subscriptionName("my-subscription")
                    .subscriptionType(SubscriptionType.Shared)
                    .subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
                    .subscribe();
            Message<byte[]> leftover = checkConsumer.receive(3, TimeUnit.SECONDS);
            if (leftover != null) {
                log.info("check consumer received message : {} {}", leftover.getMessageId(),
                        new String(leftover.getData()));
            }
            Assert.assertNull(leftover);
            checkConsumer.close();
        }
    }

    /**
     * Checks that dead-lettered messages carry their origin in message properties.
     *
     * <p>Publishes 10 messages and records the string form of each returned message id,
     * receives 20 deliveries on a Shared subscription (1 s ack timeout,
     * {@code maxRedeliverCount(1)}) without acknowledging, and then expects 10 messages
     * on the {@code -my-subscription-DLQ} topic whose {@code REAL_TOPIC} property is the
     * source topic and whose {@code ORIGIN_MESSAGE_IDY_TIME} property is one of the
     * recorded ids.
     *
     * <p>The dead letter consumer uses its own client, which is closed exactly once
     * whether the test passes or fails.
     */
    @Test(timeOut = 20000)
    public void testDeadLetterTopicHasOriginalInfo() throws Exception {
        final String topic = "persistent://my-property/my-ns/dead-letter-topic";
        final String deadLetterTopic = "persistent://my-property/my-ns/dead-letter-topic-my-subscription-DLQ";
        final int sendMessages = 10;
        final int maxRedeliveryCount = 1;

        Consumer<byte[]> consumer = pulsarClient.newConsumer(Schema.BYTES)
                .topic(topic)
                .subscriptionName("my-subscription")
                .subscriptionType(SubscriptionType.Shared)
                .ackTimeout(1L, TimeUnit.SECONDS)
                .deadLetterPolicy(DeadLetterPolicy.builder().maxRedeliverCount(maxRedeliveryCount).build())
                .subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
                .subscribe();

        try (PulsarClient deadLetterClient = newPulsarClient(lookupUrl.toString(), 0)) {
            Consumer<byte[]> deadLetterConsumer = deadLetterClient.newConsumer(Schema.BYTES)
                    .topic(deadLetterTopic)
                    .subscriptionName("my-subscription")
                    .subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
                    .subscribe();

            Producer<byte[]> producer = pulsarClient.newProducer(Schema.BYTES).topic(topic).create();
            HashSet<String> sentMessageIds = new HashSet<>();
            for (int i = 0; i < sendMessages; i++) {
                MessageId id = producer.send(String.format("Hello Pulsar [%d]", i).getBytes());
                sentMessageIds.add(id.toString());
            }
            producer.close();

            // Never acknowledge: every message must arrive (maxRedeliveryCount + 1) times.
            final int expectedDeliveries = sendMessages * (maxRedeliveryCount + 1);
            for (int received = 0; received < expectedDeliveries; received++) {
                consumer.receive();
            }

            int deadLetterCount = 0;
            while (deadLetterCount < sendMessages) {
                Message<byte[]> message = deadLetterConsumer.receive();
                Assert.assertEquals(message.getProperties().get("REAL_TOPIC"), topic);
                Assert.assertTrue(sentMessageIds.contains(message.getProperties().get("ORIGIN_MESSAGE_IDY_TIME")));
                deadLetterConsumer.acknowledge(message);
                deadLetterCount++;
            }
            Assert.assertEquals(deadLetterCount, sendMessages);

            deadLetterConsumer.close();
            consumer.close();
        }
    }

    /**
     * Checks dead-lettering when the source consumer uses {@code AUTO_CONSUME} and the
     * topic holds messages written with two Avro schemas.
     *
     * <p>Publishes 10 messages, alternating between {@code Foo} (even indexes) and
     * {@code FooV2} (odd indexes), with every field set to the decimal index. A Shared
     * {@code AUTO_CONSUME} subscription (1 s ack timeout, {@code maxRedeliverCount(1)})
     * receives 20 deliveries without acknowledging. A {@code FooV2} consumer on the
     * {@code -my-subscription-DLQ} topic must then read 10 messages whose fields agree
     * with each other and whose {@code REAL_TOPIC} / {@code ORIGIN_MESSAGE_IDY_TIME}
     * properties point back at the source messages.
     *
     * <p>The dead letter consumer uses its own client, which is closed whether the test
     * passes or fails.
     */
    @Test(timeOut = 20000)
    public void testAutoConsumeSchemaDeadLetter() throws Exception {
        final String topic = "persistent://my-property/my-ns/dead-letter-topic";
        final String deadLetterTopic = "persistent://my-property/my-ns/dead-letter-topic-my-subscription-DLQ";
        final int sendMessages = 10;
        final int maxRedeliveryCount = 1;

        admin.topics().createNonPartitionedTopic(topic);

        try (PulsarClient deadLetterClient = newPulsarClient(lookupUrl.toString(), 0)) {
            Consumer<FooV2> deadLetterConsumer = deadLetterClient.newConsumer(Schema.AVRO(FooV2.class))
                    .topic(deadLetterTopic)
                    .subscriptionName("my-subscription")
                    .subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
                    .subscribe();

            Producer<byte[]> producer = pulsarClient.newProducer(Schema.AUTO_PRODUCE_BYTES())
                    .topic(topic)
                    .create();

            HashSet<String> sentMessageIds = new HashSet<>();
            for (int i = 0; i < sendMessages; i++) {
                // The fields are Strings, so the index has to be converted explicitly.
                final String value = String.valueOf(i);
                if (i % 2 == 0) {
                    Foo foo = new Foo();
                    foo.setField1(value);
                    foo.setField2(value);
                    sentMessageIds.add(producer.newMessage(Schema.AVRO(Foo.class)).value(foo).send().toString());
                } else {
                    FooV2 fooV2 = new FooV2();
                    fooV2.setField1(value);
                    fooV2.setField2(value);
                    fooV2.setField3(value);
                    sentMessageIds.add(producer.newMessage(Schema.AVRO(FooV2.class)).value(fooV2).send().toString());
                }
            }

            // Subscribed only after publishing; Earliest makes it see all ten messages.
            Consumer<?> consumer = pulsarClient.newConsumer(Schema.AUTO_CONSUME())
                    .topic(topic)
                    .subscriptionName("my-subscription")
                    .subscriptionType(SubscriptionType.Shared)
                    .ackTimeout(1L, TimeUnit.SECONDS)
                    .deadLetterPolicy(DeadLetterPolicy.builder().maxRedeliverCount(maxRedeliveryCount).build())
                    .subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
                    .subscribe();
            producer.close();

            // Never acknowledge: every message must arrive (maxRedeliveryCount + 1) times.
            final int expectedDeliveries = sendMessages * (maxRedeliveryCount + 1);
            for (int received = 0; received < expectedDeliveries; received++) {
                consumer.receive();
            }

            int deadLetterCount = 0;
            for (int i = 0; i < sendMessages; i++) {
                Message<FooV2> message = deadLetterConsumer.receive();
                FooV2 value = message.getValue();
                Assert.assertNotNull(value.getField1());
                Assert.assertEquals(value.getField2(), value.getField1());
                // Messages written as Foo have no field3, so it may legitimately be null.
                Assert.assertTrue(value.getField3() == null || value.getField1().equals(value.getField3()));
                Assert.assertEquals(message.getProperties().get("REAL_TOPIC"), topic);
                Assert.assertTrue(sentMessageIds.contains(message.getProperties().get("ORIGIN_MESSAGE_IDY_TIME")));
                deadLetterConsumer.acknowledge(message);
                deadLetterCount++;
            }
            Assert.assertEquals(deadLetterCount, sendMessages);

            deadLetterConsumer.close();
            consumer.close();
        }
    }

    /**
     * Checks that messages are not duplicated on the dead letter topic when several
     * consumers share the subscription.
     *
     * <p>Three consumers are subscribed in parallel on a Shared subscription (1001 ms ack
     * timeout and negative-ack delay, {@code maxRedeliverCount(1)}, explicit dead letter
     * topic); their listeners only count deliveries and never acknowledge. After 10
     * messages are published, the dead letter topic is drained until it stays silent for
     * 10 s. The test expects 20 listener deliveries in total and exactly 10 dead letters.
     *
     * <p>The executor is shut down once, after the whole test body has run; shutting it
     * down inside the submission loop would reject every task after the first.
     */
    @Test(timeOut = 30000)
    public void testDuplicatedMessageSendToDeadLetterTopic() throws Exception {
        final String topic = "persistent://my-property/my-ns/dead-letter-topic-DuplicatedMessage";
        final String deadLetterTopic = "persistent://my-property/my-ns/dead-letter-topic-DuplicatedMessage-DLQ";
        final int maxRedeliveryCount = 1;
        final int messageCount = 10;
        final int consumerCount = 3;

        // Filled from the pool threads, so every access is guarded by the list's own monitor.
        final ArrayList<Consumer<String>> consumers = new ArrayList<>();
        final AtomicInteger totalReceived = new AtomicInteger(0);
        final ExecutorService executor = Executors.newFixedThreadPool(consumerCount);
        try {
            for (int i = 0; i < consumerCount; i++) {
                executor.execute(() -> {
                    try {
                        Consumer<String> consumer = pulsarClient.newConsumer(Schema.STRING)
                                .topic(topic)
                                .subscriptionName("my-subscription-DuplicatedMessage")
                                .subscriptionType(SubscriptionType.Shared)
                                .ackTimeout(1001L, TimeUnit.MILLISECONDS)
                                .deadLetterPolicy(DeadLetterPolicy.builder()
                                        .maxRedeliverCount(maxRedeliveryCount)
                                        .deadLetterTopic(deadLetterTopic)
                                        .build())
                                .negativeAckRedeliveryDelay(1001L, TimeUnit.MILLISECONDS)
                                .receiverQueueSize(100)
                                .subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
                                // Count only; the message is deliberately never acknowledged.
                                .messageListener((listenerConsumer, message) -> totalReceived.getAndIncrement())
                                .subscribe();
                        synchronized (consumers) {
                            consumers.add(consumer);
                        }
                    } catch (PulsarClientException e) {
                        // NOTE(review): this runs on a pool thread, so the failure does not by
                        // itself fail the test; the count assertions below are the actual gate.
                        Assert.fail("Failed to subscribe a consumer", e);
                    }
                });
            }

            Producer<String> producer = pulsarClient.newProducer(Schema.STRING).topic(topic).create();
            for (int i = 0; i < messageCount; i++) {
                producer.send(String.format("Message [%d]", i));
            }

            Consumer<String> deadLetterConsumer = pulsarClient.newConsumer(Schema.STRING)
                    .topic(deadLetterTopic)
                    .subscriptionName("my-subscription-DuplicatedMessage-DLQ")
                    .subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
                    .subscribe();

            // Drain until the dead letter topic has been quiet for 10 seconds.
            int deadLetterCount = 0;
            Message<String> deadLetter = deadLetterConsumer.receive(10, TimeUnit.SECONDS);
            while (deadLetter != null) {
                deadLetterConsumer.acknowledge(deadLetter);
                deadLetterCount++;
                deadLetter = deadLetterConsumer.receive(10, TimeUnit.SECONDS);
            }

            Assert.assertEquals(totalReceived.get(), messageCount * (maxRedeliveryCount + 1));
            Assert.assertEquals(deadLetterCount, messageCount);

            producer.close();
            deadLetterConsumer.close();
            synchronized (consumers) {
                for (Consumer<String> consumer : consumers) {
                    consumer.close();
                }
            }
        } finally {
            executor.shutdownNow();
        }
    }

    /**
     * Disabled test: dead-lettering for one consumer subscribed to two topics.
     *
     * <p>Publishes 100 messages to each of two topics, receives 600 deliveries on a Shared
     * subscription (1 s ack timeout, {@code maxRedeliverCount(2)}) without acknowledging,
     * expects 200 messages across the two per-topic {@code -my-subscription-DLQ} topics,
     * and finally verifies that a fresh consumer on the original subscription receives
     * nothing within 3 s.
     */
    @Test(enabled = false)
    public void testDeadLetterTopicWithMultiTopic() throws Exception {
        final String firstTopic = "persistent://my-property/my-ns/dead-letter-topic-1";
        final String secondTopic = "persistent://my-property/my-ns/dead-letter-topic-2";
        final String firstDeadLetterTopic = "persistent://my-property/my-ns/dead-letter-topic-1-my-subscription-DLQ";
        final String secondDeadLetterTopic = "persistent://my-property/my-ns/dead-letter-topic-2-my-subscription-DLQ";
        final int messagesPerTopic = 100;
        final int maxRedeliveryCount = 2;

        Consumer<byte[]> consumer = pulsarClient.newConsumer(Schema.BYTES)
                .topic(firstTopic, secondTopic)
                .subscriptionName("my-subscription")
                .subscriptionType(SubscriptionType.Shared)
                .ackTimeout(1L, TimeUnit.SECONDS)
                .deadLetterPolicy(DeadLetterPolicy.builder().maxRedeliverCount(maxRedeliveryCount).build())
                .receiverQueueSize(100)
                .subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
                .subscribe();

        Consumer<byte[]> deadLetterConsumer = pulsarClient.newConsumer(Schema.BYTES)
                .topic(firstDeadLetterTopic, secondDeadLetterTopic)
                .subscriptionName("my-subscription")
                .subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
                .subscribe();

        Producer<byte[]> firstProducer = pulsarClient.newProducer(Schema.BYTES).topic(firstTopic).create();
        Producer<byte[]> secondProducer = pulsarClient.newProducer(Schema.BYTES).topic(secondTopic).create();
        for (int i = 0; i < messagesPerTopic; i++) {
            firstProducer.send(String.format("Hello Pulsar [%d]", i).getBytes());
            secondProducer.send(String.format("Hello Pulsar [%d]", i).getBytes());
        }
        final int sentInTotal = messagesPerTopic * 2;
        firstProducer.close();
        secondProducer.close();

        // Never acknowledge: every message is expected (maxRedeliveryCount + 1) times.
        final int expectedDeliveries = sentInTotal * 3;
        for (int total = 1; total <= expectedDeliveries; total++) {
            Message<byte[]> message = consumer.receive();
            log.info("consumer received message : {} {} - total = {}",
                    message.getMessageId(), new String(message.getData()), total);
        }

        for (int received = 0; received < sentInTotal; received++) {
            Message<byte[]> message = deadLetterConsumer.receive();
            log.info("dead letter consumer received message : {} {}", message.getMessageId(),
                    new String(message.getData()));
            deadLetterConsumer.acknowledge(message);
        }

        deadLetterConsumer.close();
        consumer.close();

        // Nothing should be left for the original subscription.
        Consumer<byte[]> checkConsumer = pulsarClient.newConsumer(Schema.BYTES)
                .topic(firstTopic, secondTopic)
                .subscriptionName("my-subscription")
                .subscriptionType(SubscriptionType.Shared)
                .subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
                .subscribe();
        Message<byte[]> leftover = checkConsumer.receive(3, TimeUnit.SECONDS);
        if (leftover != null) {
            log.info("check consumer received message : {} {}", leftover.getMessageId(),
                    new String(leftover.getData()));
        }
        Assert.assertNull(leftover);
        checkConsumer.close();
    }

    /**
     * Checks dead-lettering to an explicitly named dead letter topic.
     *
     * <p>Publishes 100 messages, receives 300 deliveries on a Shared subscription (1 s ack
     * timeout, {@code maxRedeliverCount(2)}, custom {@code deadLetterTopic}) without
     * acknowledging, expects 100 messages on the custom dead letter topic, and finally
     * verifies through a third client that a fresh consumer on the original subscription
     * receives nothing within 3 s.
     *
     * <p>The two extra clients are held in separate variables and each is closed exactly
     * once, on success as well as on failure.
     */
    @Test(groups = {"quarantine"})
    public void testDeadLetterTopicByCustomTopicName() throws Exception {
        final String topic = "persistent://my-property/my-ns/dead-letter-topic";
        final String deadLetterTopic =
                "persistent://my-property/my-ns/dead-letter-custom-topic-my-subscription-custom-DLQ";
        final int sendMessages = 100;
        final int maxRedeliveryCount = 2;

        Consumer<byte[]> consumer = pulsarClient.newConsumer(Schema.BYTES)
                .topic(topic)
                .subscriptionName("my-subscription")
                .subscriptionType(SubscriptionType.Shared)
                .ackTimeout(1L, TimeUnit.SECONDS)
                .receiverQueueSize(100)
                .deadLetterPolicy(DeadLetterPolicy.builder()
                        .maxRedeliverCount(maxRedeliveryCount)
                        .deadLetterTopic(deadLetterTopic)
                        .build())
                .subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
                .subscribe();

        try (PulsarClient deadLetterClient = newPulsarClient(lookupUrl.toString(), 0)) {
            // No initial position is set here: the subscription exists before anything is dead-lettered.
            Consumer<byte[]> deadLetterConsumer = deadLetterClient.newConsumer(Schema.BYTES)
                    .topic(deadLetterTopic)
                    .subscriptionName("my-subscription")
                    .subscribe();

            Producer<byte[]> producer = pulsarClient.newProducer(Schema.BYTES).topic(topic).create();
            for (int i = 0; i < sendMessages; i++) {
                producer.send(String.format("Hello Pulsar [%d]", i).getBytes());
            }
            producer.close();

            // Never acknowledge: every message must arrive (maxRedeliveryCount + 1) times.
            final int expectedDeliveries = sendMessages * (maxRedeliveryCount + 1);
            for (int received = 0; received < expectedDeliveries; received++) {
                Message<byte[]> message = consumer.receive();
                log.info("consumer received message : {} {}", message.getMessageId(), new String(message.getData()));
            }

            for (int received = 0; received < sendMessages; received++) {
                Message<byte[]> message = deadLetterConsumer.receive();
                log.info("dead letter consumer received message : {} {}", message.getMessageId(),
                        new String(message.getData()));
                deadLetterConsumer.acknowledge(message);
            }

            deadLetterConsumer.close();
            consumer.close();

            // Nothing should be left for the original subscription.
            try (PulsarClient checkClient = newPulsarClient(lookupUrl.toString(), 0)) {
                Consumer<byte[]> checkConsumer = checkClient.newConsumer(Schema.BYTES)
                        .topic(topic)
                        .subscriptionName("my-subscription")
                        .subscriptionType(SubscriptionType.Shared)
                        .subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
                        .subscribe();
                Message<byte[]> leftover = checkConsumer.receive(3, TimeUnit.SECONDS);
                if (leftover != null) {
                    log.info("check consumer received message : {} {}", leftover.getMessageId(),
                            new String(leftover.getData()));
                }
                Assert.assertNull(leftover);
                checkConsumer.close();
            }
        }
    }

    /**
     * Checks that a message is still receivable when the consumer does not call
     * {@code receive} right away.
     *
     * <p>Subscribes with a 1 s ack timeout and {@code maxRedeliverCount(1)}, publishes one
     * message, waits 5 s without receiving, and then expects a receive with a 1 s timeout
     * to return a message.
     */
    @Test(timeOut = 200000)
    public void testDeadLetterWithoutConsumerReceiveImmediately() throws PulsarClientException, InterruptedException {
        Consumer<byte[]> consumer = pulsarClient.newConsumer()
                .topic("persistent://my-property/my-ns/dead-letter-topic-without-consumer-receive-immediately")
                .subscriptionType(SubscriptionType.Shared)
                .subscriptionName("my-subscription")
                .ackTimeout(1L, TimeUnit.SECONDS)
                .deadLetterPolicy(DeadLetterPolicy.builder().maxRedeliverCount(1).build())
                .subscribe();

        Producer<byte[]> producer = pulsarClient.newProducer()
                .topic("persistent://my-property/my-ns/dead-letter-topic-without-consumer-receive-immediately")
                .create();
        producer.send("a message".getBytes());

        // Leave the message sitting unreceived for several ack-timeout periods.
        Thread.sleep(5000L);

        Message<byte[]> received = consumer.receive(1, TimeUnit.SECONDS);
        Assert.assertNotNull(received);
    }

    /**
     * Exercises dead-lettering on a two-partition topic consumed through a Key_Shared subscription.
     *
     * <p>Flow: create the partitioned topic, subscribe with a dead letter policy of
     * {@code maxRedeliverCount(2)} and a 1 second ack timeout, subscribe to the two per-partition
     * DLQ topics, publish one message, receive it three times without acknowledging, and then
     * require that exactly one message in total shows up across the two DLQ topics. Finally a fresh
     * Key_Shared consumer on the same subscription must receive nothing within three seconds.
     *
     * @throws Exception if any admin or client call fails
     */
    @Test
    public void testDeadLetterTopicUnderPartitionedTopicWithKeyShareType() throws Exception {
        final String topic = "persistent://my-property/my-ns/dead-letter-topic-with-partitioned-topic";
        final String subscription = "my-subscription";
        final int partitionCount = 2;
        final int maxRedeliveryCount = 2;
        final int sendMessages = 1;

        this.admin.topics().createPartitionedTopic(topic, partitionCount);

        Consumer<byte[]> consumer = this.pulsarClient.newConsumer(Schema.BYTES)
                .topic(topic)
                .subscriptionName(subscription)
                .subscriptionType(SubscriptionType.Key_Shared)
                .keySharedPolicy(KeySharedPolicy.autoSplitHashRange())
                .ackTimeout(1L, TimeUnit.SECONDS)
                .deadLetterPolicy(DeadLetterPolicy.builder().maxRedeliverCount(maxRedeliveryCount).build())
                .receiverQueueSize(100)
                .subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
                .subscribe();
        Consumer<byte[]> deadLetterConsumer0 = this.pulsarClient.newConsumer(Schema.BYTES)
                .topic(topic + "-partition-0-my-subscription-DLQ")
                .subscriptionName(subscription)
                .subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
                .subscribe();
        Consumer<byte[]> deadLetterConsumer1 = this.pulsarClient.newConsumer(Schema.BYTES)
                .topic(topic + "-partition-1-my-subscription-DLQ")
                .subscriptionName(subscription)
                .subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
                .subscribe();

        Producer<byte[]> producer = this.pulsarClient.newProducer(Schema.BYTES).topic(topic).create();
        for (int seq = 0; seq < sendMessages; seq++) {
            producer.send(String.format("Hello Pulsar [%d]", seq).getBytes());
        }
        producer.close();

        // Original delivery plus every redelivery; nothing is acknowledged here.
        final int expectedDeliveries = sendMessages * (maxRedeliveryCount + 1);
        for (int delivered = 0; delivered < expectedDeliveries; delivered++) {
            Message<byte[]> message = consumer.receive();
            log.info("consumer received message : {} {}", message.getMessageId(), new String(message.getData()));
        }

        // One counter spans both DLQ partitions. Each partition is polled at least once,
        // so a surplus message on the second partition would push the total past sendMessages.
        int totalInDeadLetter = 0;
        while (true) {
            Message<byte[]> dead = deadLetterConsumer0.receive(3, TimeUnit.SECONDS);
            if (dead == null) {
                break;
            }
            log.info("dead letter consumer received message : {} {}", dead.getMessageId(), new String(dead.getData()));
            deadLetterConsumer0.acknowledge(dead);
            totalInDeadLetter++;
            if (totalInDeadLetter >= sendMessages) {
                break;
            }
        }
        while (true) {
            Message<byte[]> dead = deadLetterConsumer1.receive(3, TimeUnit.SECONDS);
            if (dead == null) {
                break;
            }
            log.info("dead letter consumer received message : {} {}", dead.getMessageId(), new String(dead.getData()));
            deadLetterConsumer1.acknowledge(dead);
            totalInDeadLetter++;
            if (totalInDeadLetter >= sendMessages) {
                break;
            }
        }
        Assert.assertEquals(totalInDeadLetter, sendMessages);

        deadLetterConsumer0.close();
        deadLetterConsumer1.close();
        consumer.close();

        // A new consumer on the same subscription must have nothing left to read.
        Consumer<byte[]> checkConsumer = this.pulsarClient.newConsumer(Schema.BYTES)
                .topic(topic)
                .subscriptionName(subscription)
                .subscriptionType(SubscriptionType.Key_Shared)
                .subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
                .subscribe();
        Message<byte[]> leftover = checkConsumer.receive(3, TimeUnit.SECONDS);
        if (leftover != null) {
            log.info("check consumer received message : {} {}", leftover.getMessageId(), new String(leftover.getData()));
        }
        Assert.assertNull(leftover);
        checkConsumer.close();
    }

    /**
     * Checks that a dead letter policy carrying {@code initialSubscriptionName("init-sub")} results
     * in a DLQ topic that already has that subscription, and that all dead-lettered messages can be
     * read through it.
     *
     * <p>Flow: subscribe with {@code maxRedeliverCount(1)} and a 1 second ack timeout, publish 100
     * messages, receive up to 200 deliveries without acknowledging, wait (up to 10 seconds) until the
     * admin API lists both the DLQ topic and the {@code init-sub} subscription, then read and
     * acknowledge 100 messages from the DLQ through a second client.
     *
     * <p>All clients, consumers and producers are managed by try-with-resources so they are released
     * on every exit path, and a failure in {@code close()} is attached as a suppressed exception
     * instead of replacing the original test failure.
     *
     * @throws Exception if any admin or client call fails
     */
    @Test
    public void testDeadLetterTopicWithInitialSubscription() throws Exception {
        final String topic = "persistent://my-property/my-ns/dead-letter-topic";
        final String dlqTopic = "persistent://my-property/my-ns/dead-letter-topic-my-subscription-DLQ";
        final String dlqInitialSub = "init-sub";
        final int maxRedeliveryCount = 1;
        final int sendMessages = 100;

        try (Consumer<byte[]> consumer = this.pulsarClient.newConsumer(Schema.BYTES)
                     .topic(topic)
                     .subscriptionName("my-subscription")
                     .subscriptionType(SubscriptionType.Shared)
                     .ackTimeout(1L, TimeUnit.SECONDS)
                     .deadLetterPolicy(DeadLetterPolicy.builder()
                             .maxRedeliverCount(maxRedeliveryCount)
                             .initialSubscriptionName(dlqInitialSub)
                             .build())
                     .receiverQueueSize(100)
                     .subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
                     .subscribe();
             // Separate client connection used only for reading the DLQ.
             PulsarClient dlqClient = newPulsarClient(this.lookupUrl.toString(), 0)) {

            try (Producer<byte[]> producer = this.pulsarClient.newProducer(Schema.BYTES).topic(topic).create()) {
                for (int i = 0; i < sendMessages; i++) {
                    producer.send(String.format("Hello Pulsar [%d]", i).getBytes());
                }
            }

            // Never acknowledge: each message is delivered maxRedeliveryCount + 1 times.
            final int expectedDeliveries = sendMessages * (maxRedeliveryCount + 1);
            for (int received = 0; received < expectedDeliveries; received++) {
                Message<byte[]> message = consumer.receive(3, TimeUnit.SECONDS);
                if (message == null) {
                    break;
                }
                log.info("consumer received message : {} {}", message.getMessageId(), new String(message.getData()));
            }

            Awaitility.await().atMost(Duration.ofSeconds(10L)).pollInterval(Duration.ofSeconds(1L)).untilAsserted(() -> {
                Assert.assertTrue(this.admin.namespaces().getTopics("my-property/my-ns").contains(dlqTopic));
                Assert.assertTrue(this.admin.topics().getSubscriptions(dlqTopic).contains(dlqInitialSub));
            });

            // No initial position is set on this consumer; it attaches to the existing "init-sub".
            try (Consumer<byte[]> deadLetterConsumer = dlqClient.newConsumer(Schema.BYTES)
                         .topic(dlqTopic)
                         .subscriptionName(dlqInitialSub)
                         .subscribe()) {
                for (int i = 0; i < sendMessages; i++) {
                    Message<byte[]> message = deadLetterConsumer.receive(10, TimeUnit.SECONDS);
                    Assert.assertNotNull(message, "Dead letter consumer can not receive messages.");
                    log.info("dead letter consumer received message : {} {}", message.getMessageId(), new String(message.getData()));
                    deadLetterConsumer.acknowledge(message);
                }
            }
        }
    }

    /**
     * Starts an asynchronous receive loop on {@code consumer} and returns the future tracking it.
     *
     * <p>The loop stops when a receive call times out after three seconds, when a
     * {@link PulsarClientException} is raised (logged, not rethrown), or once the shared counter
     * reaches {@code i * (i2 + 1)}. Messages are counted but never acknowledged.
     *
     * @param consumer      consumer to poll
     * @param atomicInteger counter incremented once per received message; may be shared by several loops
     * @param i             first factor of the stop threshold
     * @param i2            the threshold is {@code i * (i2 + 1)}
     * @return a future that completes when the loop ends
     */
    private CompletableFuture<Void> consumerReceiveForDLQ(Consumer<byte[]> consumer, AtomicInteger atomicInteger, int i, int i2) {
        final int expectedDeliveries = i * (i2 + 1);
        return CompletableFuture.runAsync(() -> {
            try {
                while (true) {
                    Message<byte[]> delivered = consumer.receive(3, TimeUnit.SECONDS);
                    if (delivered == null) {
                        // Timed out: nothing more to consume.
                        return;
                    }
                    log.info("consumer received message : {} {}", delivered.getMessageId(), new String(delivered.getData()));
                    atomicInteger.incrementAndGet();
                    if (atomicInteger.get() >= expectedDeliveries) {
                        return;
                    }
                }
            } catch (PulsarClientException e) {
                log.info("fail while receiving messages: {}", e.getMessage());
            }
        });
    }

    /**
     * Same scenario as the single-consumer initial-subscription test, but with two consumers sharing
     * the {@code my-subscription} Shared subscription, each configured with the same dead letter
     * policy ({@code maxRedeliverCount(1)}, {@code initialSubscriptionName("init-sub")}).
     *
     * <p>Flow: publish 100 messages, let both consumers receive concurrently without acknowledging
     * (bounded to 10 seconds), wait until the admin API lists the DLQ topic and its {@code init-sub}
     * subscription, then read and acknowledge 100 messages from the DLQ through a second client.
     *
     * <p>All clients, consumers and producers are managed by try-with-resources so they are released
     * on every exit path, and a failure in {@code close()} is attached as a suppressed exception
     * instead of replacing the original test failure.
     *
     * @throws Exception if any admin or client call fails, or the concurrent receive phase times out
     */
    @Test
    public void testDeadLetterTopicWithInitialSubscriptionAndMultiConsumers() throws Exception {
        final String topic = "persistent://my-property/my-ns/dead-letter-topic";
        final String dlqTopic = "persistent://my-property/my-ns/dead-letter-topic-my-subscription-DLQ";
        final String subscriptionName = "my-subscription";
        final String dlqInitialSub = "init-sub";
        final int maxRedeliveryCount = 1;
        final int sendMessages = 100;

        try (Consumer<byte[]> firstConsumer = this.pulsarClient.newConsumer(Schema.BYTES)
                     .topic(topic)
                     .subscriptionName(subscriptionName)
                     .subscriptionType(SubscriptionType.Shared)
                     .ackTimeout(1L, TimeUnit.SECONDS)
                     .deadLetterPolicy(DeadLetterPolicy.builder()
                             .maxRedeliverCount(maxRedeliveryCount)
                             .initialSubscriptionName(dlqInitialSub)
                             .build())
                     .receiverQueueSize(20)
                     .subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
                     .subscribe();
             Consumer<byte[]> secondConsumer = this.pulsarClient.newConsumer(Schema.BYTES)
                     .topic(topic)
                     .subscriptionName(subscriptionName)
                     .subscriptionType(SubscriptionType.Shared)
                     .ackTimeout(1L, TimeUnit.SECONDS)
                     .deadLetterPolicy(DeadLetterPolicy.builder()
                             .maxRedeliverCount(maxRedeliveryCount)
                             .initialSubscriptionName(dlqInitialSub)
                             .build())
                     .receiverQueueSize(20)
                     .subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
                     .subscribe();
             // Separate client connection used only for reading the DLQ.
             PulsarClient dlqClient = newPulsarClient(this.lookupUrl.toString(), 0)) {

            try (Producer<byte[]> producer = this.pulsarClient.newProducer(Schema.BYTES).topic(topic).create()) {
                for (int i = 0; i < sendMessages; i++) {
                    producer.send(String.format("Hello Pulsar [%d]", i).getBytes());
                }
            }

            // Both receive loops share one counter and stop once it reaches
            // sendMessages * (maxRedeliveryCount + 1), or when a receive times out.
            AtomicInteger totalReceived = new AtomicInteger(0);
            CompletableFuture.allOf(
                    consumerReceiveForDLQ(firstConsumer, totalReceived, sendMessages, maxRedeliveryCount),
                    consumerReceiveForDLQ(secondConsumer, totalReceived, sendMessages, maxRedeliveryCount))
                    .get(10L, TimeUnit.SECONDS);

            Awaitility.await().atMost(Duration.ofSeconds(10L)).pollInterval(Duration.ofSeconds(1L)).untilAsserted(() -> {
                Assert.assertTrue(this.admin.namespaces().getTopics("my-property/my-ns").contains(dlqTopic));
                Assert.assertTrue(this.admin.topics().getSubscriptions(dlqTopic).contains(dlqInitialSub));
            });

            try (Consumer<byte[]> deadLetterConsumer = dlqClient.newConsumer(Schema.BYTES)
                         .topic(dlqTopic)
                         .subscriptionName(dlqInitialSub)
                         .subscribe()) {
                for (int i = 0; i < sendMessages; i++) {
                    Message<byte[]> message = deadLetterConsumer.receive(10, TimeUnit.SECONDS);
                    Assert.assertNotNull(message, "Dead letter consumer can not receive messages.");
                    log.info("dead letter consumer received message : {} {}", message.getMessageId(), new String(message.getData()));
                    deadLetterConsumer.acknowledge(message);
                }
            }
        }
    }

    /**
     * Verifies that a {@link DeadLetterPolicy} set on a consumer builder is still present, and equal
     * to the one supplied, after {@code loadConf} is called with an empty configuration map.
     *
     * @throws Exception if building the consumer builder or loading the configuration fails
     */
    @Test
    public void testDeadLetterPolicyDeserialize() throws Exception {
        // newConsumer(Schema) is declared to return the builder interface; the implementation type
        // is needed for loadConf/getConf, so narrow explicitly and keep the type parameter.
        ConsumerBuilderImpl<String> consumerBuilder =
                (ConsumerBuilderImpl<String>) this.pulsarClient.newConsumer(Schema.STRING);
        DeadLetterPolicy policy = DeadLetterPolicy.builder()
                .deadLetterTopic("a")
                .retryLetterTopic("a")
                .initialSubscriptionName("a")
                .maxRedeliverCount(1)
                .build();
        consumerBuilder.deadLetterPolicy(policy);

        // Loading an empty map must not drop or alter the policy configured above.
        consumerBuilder.loadConf(new HashMap<>());

        Assert.assertEquals(consumerBuilder.getConf().getDeadLetterPolicy(), policy);
    }

    /**
     * Lambda deserialization hook: maps a {@link SerializedLambda} back to a live lambda instance.
     *
     * <p>Only one implementation method name is recognised
     * ({@code lambda$testDuplicatedMessageSendToDeadLetterTopic$7c7b6d72$1}). When the serialized
     * form's method kind, functional interface, interface method and signatures all match the
     * expected values, a listener lambda is returned that increments the captured
     * {@link AtomicInteger}. Any other input ends in an {@link IllegalArgumentException}.
     *
     * <p>NOTE(review): the method is marked synthetic, and {@code boolean z = -1} /
     * {@code case SHARED_VALUE} are not well-typed for a boolean selector - this looks like
     * decompiler output of a compiler-generated method rather than hand-written code. Confirm before
     * editing; it presumably should not exist in hand-maintained source.
     *
     * @param serializedLambda serialized description of the lambda to restore
     * @return the restored lambda
     * @throws IllegalArgumentException if the serialized form does not match the known lambda
     */
    private static /* synthetic */ Object $deserializeLambda$(SerializedLambda serializedLambda) {
        String implMethodName = serializedLambda.getImplMethodName();
        // Stage 1: translate the implementation method name into a selector
        // (hash first, then an equals() check to rule out hash collisions).
        boolean z = -1;
        switch (implMethodName.hashCode()) {
            case 931004142:
                if (implMethodName.equals("lambda$testDuplicatedMessageSendToDeadLetterTopic$7c7b6d72$1")) {
                    z = false;
                    break;
                }
                break;
        }
        // Stage 2: for the selected lambda, validate every recorded attribute before rebuilding it.
        switch (z) {
            case SHARED_VALUE:
                if (serializedLambda.getImplMethodKind() == 6 && serializedLambda.getFunctionalInterfaceClass().equals("org/apache/pulsar/client/api/MessageListener") && serializedLambda.getFunctionalInterfaceMethodName().equals("received") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Lorg/apache/pulsar/client/api/Consumer;Lorg/apache/pulsar/client/api/Message;)V") && serializedLambda.getImplClass().equals("org/apache/pulsar/client/api/DeadLetterTopicTest") && serializedLambda.getImplMethodSignature().equals("(Ljava/util/concurrent/atomic/AtomicInteger;Lorg/apache/pulsar/client/api/Consumer;Lorg/apache/pulsar/client/api/Message;)V")) {
                    // The single captured argument is the counter the listener increments.
                    AtomicInteger atomicInteger = (AtomicInteger) serializedLambda.getCapturedArg(0);
                    return (consumer, message) -> {
                        atomicInteger.getAndIncrement();
                    };
                }
                break;
        }
        // Unknown method name or mismatching attributes.
        throw new IllegalArgumentException("Invalid lambda deserialization");
    }
}
