package org.apache.pulsar.client.api;

import com.google.common.base.Predicate;
import com.google.common.collect.Sets;
import java.lang.reflect.Field;
import java.util.Collections;
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.impl.ConsumerImpl;
import org.apache.pulsar.client.impl.MultiTopicsConsumerImpl;
import org.reflections.ReflectionUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testng.Assert;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;

@Test(groups = {"broker-api"})
/* loaded from: input_file:org/apache/pulsar/client/api/RetryTopicTest.class */
public class RetryTopicTest extends ProducerConsumerBase {
    private static final Logger log = LoggerFactory.getLogger(RetryTopicTest.class);

    @Override // org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest
    @BeforeMethod
    protected void setup() throws Exception {
        super.internalSetup();
        super.producerBaseSetup();
    }

    @Override // org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest
    @AfterMethod(alwaysRun = true)
    protected void cleanup() throws Exception {
        super.internalCleanup();
    }

    @Test
    public void testRetryTopic() throws Exception {
        Consumer subscribe = this.pulsarClient.newConsumer(Schema.BYTES).topic(new String[]{"persistent://my-property/my-ns/retry-topic"}).subscriptionName("my-subscription").subscriptionType(SubscriptionType.Shared).enableRetry(true).deadLetterPolicy(DeadLetterPolicy.builder().maxRedeliverCount(2).build()).receiverQueueSize(100).subscriptionInitialPosition(SubscriptionInitialPosition.Earliest).subscribe();
        PulsarClient newPulsarClient = newPulsarClient(this.lookupUrl.toString(), 0);
        try {
            Consumer subscribe2 = newPulsarClient.newConsumer(Schema.BYTES).topic(new String[]{"persistent://my-property/my-ns/retry-topic-my-subscription-DLQ"}).subscriptionName("my-subscription").subscriptionInitialPosition(SubscriptionInitialPosition.Earliest).subscribe();
            Producer create = this.pulsarClient.newProducer(Schema.BYTES).topic("persistent://my-property/my-ns/retry-topic").create();
            for (int i = 0; i < 100; i++) {
                create.send(String.format("Hello Pulsar [%d]", Integer.valueOf(i)).getBytes());
            }
            create.close();
            int i2 = 0;
            do {
                Message receive = subscribe.receive();
                log.info("consumer received message : {} {}", receive.getMessageId(), new String(receive.getData()));
                subscribe.reconsumeLater(receive, 1L, TimeUnit.SECONDS);
                i2++;
            } while (i2 < 300);
            int i3 = 0;
            do {
                Message receive2 = subscribe2.receive();
                log.info("dead letter consumer received message : {} {}", receive2.getMessageId(), new String(receive2.getData()));
                subscribe2.acknowledge(receive2);
                i3++;
            } while (i3 < 100);
            subscribe2.close();
            subscribe.close();
            Consumer subscribe3 = this.pulsarClient.newConsumer(Schema.BYTES).topic(new String[]{"persistent://my-property/my-ns/retry-topic"}).subscriptionName("my-subscription").subscriptionType(SubscriptionType.Shared).subscriptionInitialPosition(SubscriptionInitialPosition.Earliest).subscribe();
            Message receive3 = subscribe3.receive(3, TimeUnit.SECONDS);
            if (receive3 != null) {
                log.info("check consumer received message : {} {}", receive3.getMessageId(), new String(receive3.getData()));
            }
            Assert.assertNull(receive3);
            subscribe3.close();
            if (Collections.singletonList(newPulsarClient).get(0) != null) {
                newPulsarClient.close();
            }
        } catch (Throwable th) {
            if (Collections.singletonList(newPulsarClient).get(0) != null) {
                newPulsarClient.close();
            }
            throw th;
        }
    }

    @Test
    public void testRetryTopicProperties() throws Exception {
        Consumer subscribe = this.pulsarClient.newConsumer(Schema.BYTES).topic(new String[]{"persistent://my-property/my-ns/retry-topic"}).subscriptionName("my-subscription").subscriptionType(SubscriptionType.Shared).enableRetry(true).deadLetterPolicy(DeadLetterPolicy.builder().maxRedeliverCount(3).build()).receiverQueueSize(100).subscriptionInitialPosition(SubscriptionInitialPosition.Earliest).subscribe();
        PulsarClient newPulsarClient = newPulsarClient(this.lookupUrl.toString(), 0);
        try {
            Consumer subscribe2 = newPulsarClient.newConsumer(Schema.BYTES).topic(new String[]{"persistent://my-property/my-ns/retry-topic-my-subscription-DLQ"}).subscriptionName("my-subscription").subscriptionInitialPosition(SubscriptionInitialPosition.Earliest).subscribe();
            Producer create = this.pulsarClient.newProducer(Schema.BYTES).topic("persistent://my-property/my-ns/retry-topic").create();
            HashSet newHashSet = Sets.newHashSet();
            for (int i = 0; i < 10; i++) {
                newHashSet.add(create.send(String.format("Hello Pulsar [%d]", Integer.valueOf(i)).getBytes()).toString());
            }
            create.close();
            int i2 = 0;
            HashSet newHashSet2 = Sets.newHashSet();
            do {
                Message receive = subscribe.receive();
                log.info("consumer received message : {} {}", receive.getMessageId(), new String(receive.getData()));
                if (receive.hasProperty("RECONSUMETIMES")) {
                    Assert.assertEquals(receive.getProperty("REAL_TOPIC"), "persistent://my-property/my-ns/retry-topic");
                    newHashSet2.add(receive.getProperty("ORIGIN_MESSAGE_IDY_TIME"));
                }
                subscribe.reconsumeLater(receive, 1L, TimeUnit.SECONDS);
                i2++;
            } while (i2 < 40);
            Assert.assertEquals(newHashSet2, newHashSet);
            int i3 = 0;
            HashSet newHashSet3 = Sets.newHashSet();
            do {
                Message receive2 = subscribe2.receive();
                log.info("dead letter consumer received message : {} {}", receive2.getMessageId(), new String(receive2.getData()));
                if (receive2.hasProperty("RECONSUMETIMES")) {
                    Assert.assertEquals(receive2.getProperty("REAL_TOPIC"), "persistent://my-property/my-ns/retry-topic");
                    newHashSet3.add(receive2.getProperty("ORIGIN_MESSAGE_IDY_TIME"));
                }
                subscribe2.acknowledge(receive2);
                i3++;
            } while (i3 < 10);
            Assert.assertEquals(newHashSet3, newHashSet);
            subscribe2.close();
            subscribe.close();
            Consumer subscribe3 = this.pulsarClient.newConsumer(Schema.BYTES).topic(new String[]{"persistent://my-property/my-ns/retry-topic"}).subscriptionName("my-subscription").subscriptionType(SubscriptionType.Shared).subscriptionInitialPosition(SubscriptionInitialPosition.Earliest).subscribe();
            Message receive3 = subscribe3.receive(3, TimeUnit.SECONDS);
            if (receive3 != null) {
                log.info("check consumer received message : {} {}", receive3.getMessageId(), new String(receive3.getData()));
            }
            Assert.assertNull(receive3);
            subscribe3.close();
            if (Collections.singletonList(newPulsarClient).get(0) != null) {
                newPulsarClient.close();
            }
        } catch (Throwable th) {
            if (Collections.singletonList(newPulsarClient).get(0) != null) {
                newPulsarClient.close();
            }
            throw th;
        }
    }

    @Test
    public void testRetryTopicNameForCompatibility() throws Exception {
        this.admin.topics().createPartitionedTopic("persistent://my-property/my-ns/my-subscription-RETRY", 2);
        this.admin.topics().createPartitionedTopic("persistent://my-property/my-ns/my-subscription-DLQ", 2);
        Consumer subscribe = this.pulsarClient.newConsumer(Schema.BYTES).topic(new String[]{"persistent://my-property/my-ns/retry-topic"}).subscriptionName("my-subscription").subscriptionType(SubscriptionType.Shared).enableRetry(true).deadLetterPolicy(DeadLetterPolicy.builder().maxRedeliverCount(2).build()).receiverQueueSize(100).subscriptionInitialPosition(SubscriptionInitialPosition.Earliest).subscribe();
        PulsarClient newPulsarClient = newPulsarClient(this.lookupUrl.toString(), 0);
        Consumer subscribe2 = newPulsarClient.newConsumer(Schema.BYTES).topic(new String[]{"persistent://my-property/my-ns/my-subscription-DLQ"}).subscriptionName("my-subscription").subscriptionInitialPosition(SubscriptionInitialPosition.Earliest).subscribe();
        Producer create = this.pulsarClient.newProducer(Schema.BYTES).topic("persistent://my-property/my-ns/retry-topic").create();
        for (int i = 0; i < 100; i++) {
            create.send(String.format("Hello Pulsar [%d]", Integer.valueOf(i)).getBytes());
        }
        create.close();
        int i2 = 0;
        do {
            Message receive = subscribe.receive();
            log.info("consumer received message : {} {}", receive.getMessageId(), new String(receive.getData()));
            subscribe.reconsumeLater(receive, 1L, TimeUnit.SECONDS);
            i2++;
        } while (i2 < 300);
        int i3 = 0;
        do {
            Message receive2 = subscribe2.receive();
            log.info("dead letter consumer received message : {} {}", receive2.getMessageId(), new String(receive2.getData()));
            subscribe2.acknowledge(receive2);
            i3++;
        } while (i3 < 100);
        subscribe2.close();
        subscribe.close();
        Consumer subscribe3 = this.pulsarClient.newConsumer(Schema.BYTES).topic(new String[]{"persistent://my-property/my-ns/retry-topic"}).subscriptionName("my-subscription").subscriptionType(SubscriptionType.Shared).subscriptionInitialPosition(SubscriptionInitialPosition.Earliest).subscribe();
        Message receive3 = subscribe3.receive(3, TimeUnit.SECONDS);
        if (receive3 != null) {
            log.info("check consumer received message : {} {}", receive3.getMessageId(), new String(receive3.getData()));
        }
        Assert.assertNull(receive3);
        subscribe3.close();
        newPulsarClient.close();
    }

    @Test
    public void testRetryTopicWithMultiTopic() throws Exception {
        Consumer subscribe = this.pulsarClient.newConsumer(Schema.BYTES).topic(new String[]{"persistent://my-property/my-ns/retry-topic-1", "persistent://my-property/my-ns/retry-topic-2"}).subscriptionName("my-subscription").subscriptionType(SubscriptionType.Shared).enableRetry(true).ackTimeout(1L, TimeUnit.SECONDS).deadLetterPolicy(DeadLetterPolicy.builder().maxRedeliverCount(2).build()).receiverQueueSize(100).subscriptionInitialPosition(SubscriptionInitialPosition.Earliest).subscribe();
        Consumer subscribe2 = this.pulsarClient.newConsumer(Schema.BYTES).topic(new String[]{"persistent://my-property/my-ns/retry-topic-1-my-subscription-DLQ"}).subscriptionName("my-subscription").subscriptionInitialPosition(SubscriptionInitialPosition.Earliest).subscribe();
        Producer create = this.pulsarClient.newProducer(Schema.BYTES).topic("persistent://my-property/my-ns/retry-topic-1").create();
        Producer create2 = this.pulsarClient.newProducer(Schema.BYTES).topic("persistent://my-property/my-ns/retry-topic-2").create();
        for (int i = 0; i < 100; i++) {
            create.send(String.format("Hello Pulsar [%d]", Integer.valueOf(i)).getBytes());
            create2.send(String.format("Hello Pulsar [%d]", Integer.valueOf(i)).getBytes());
        }
        int i2 = 100 * 2;
        create.close();
        create2.close();
        int i3 = 0;
        do {
            Message receive = subscribe.receive();
            i3++;
            log.info("consumer received message : {} {} - total = {}", new Object[]{receive.getMessageId(), new String(receive.getData()), Integer.valueOf(i3)});
        } while (i3 < i2 * 3);
        int i4 = 0;
        do {
            Message receive2 = subscribe2.receive();
            log.info("dead letter consumer received message : {} {}", receive2.getMessageId(), new String(receive2.getData()));
            subscribe2.acknowledge(receive2);
            i4++;
        } while (i4 < i2);
        subscribe2.close();
        subscribe.close();
        Consumer subscribe3 = this.pulsarClient.newConsumer(Schema.BYTES).topic(new String[]{"persistent://my-property/my-ns/retry-topic-1", "persistent://my-property/my-ns/retry-topic-2"}).subscriptionName("my-subscription").subscriptionType(SubscriptionType.Shared).subscriptionInitialPosition(SubscriptionInitialPosition.Earliest).subscribe();
        Message receive3 = subscribe3.receive(3, TimeUnit.SECONDS);
        if (receive3 != null) {
            log.info("check consumer received message : {} {}", receive3.getMessageId(), new String(receive3.getData()));
        }
        Assert.assertNull(receive3);
        subscribe3.close();
    }

    @Test
    public void testRetryTopicByCustomTopicName() throws Exception {
        Consumer subscribe = this.pulsarClient.newConsumer(Schema.BYTES).topic(new String[]{"persistent://my-property/my-ns/retry-topic"}).subscriptionName("my-subscription").subscriptionType(SubscriptionType.Shared).enableRetry(true).receiverQueueSize(100).deadLetterPolicy(DeadLetterPolicy.builder().maxRedeliverCount(2).retryLetterTopic("persistent://my-property/my-ns/my-subscription-custom-Retry").build()).subscriptionInitialPosition(SubscriptionInitialPosition.Earliest).subscribe();
        PulsarClient newPulsarClient = newPulsarClient(this.lookupUrl.toString(), 0);
        try {
            Consumer subscribe2 = newPulsarClient.newConsumer(Schema.BYTES).topic(new String[]{"persistent://my-property/my-ns/retry-topic-my-subscription-DLQ"}).subscriptionName("my-subscription").subscribe();
            Producer create = this.pulsarClient.newProducer(Schema.BYTES).topic("persistent://my-property/my-ns/retry-topic").create();
            for (int i = 0; i < 100; i++) {
                create.send(String.format("Hello Pulsar [%d]", Integer.valueOf(i)).getBytes());
            }
            create.close();
            int i2 = 0;
            do {
                Message receive = subscribe.receive();
                log.info("consumer received message : {} {}", receive.getMessageId(), new String(receive.getData()));
                subscribe.reconsumeLater(receive, 1L, TimeUnit.SECONDS);
                i2++;
            } while (i2 < 300);
            int i3 = 0;
            do {
                Message receive2 = subscribe2.receive();
                log.info("dead letter consumer received message : {} {}", receive2.getMessageId(), new String(receive2.getData()));
                subscribe2.acknowledge(receive2);
                i3++;
            } while (i3 < 100);
            subscribe2.close();
            subscribe.close();
            newPulsarClient = newPulsarClient(this.lookupUrl.toString(), 0);
            try {
                Consumer subscribe3 = newPulsarClient.newConsumer(Schema.BYTES).topic(new String[]{"persistent://my-property/my-ns/retry-topic"}).subscriptionName("my-subscription").subscriptionType(SubscriptionType.Shared).subscriptionInitialPosition(SubscriptionInitialPosition.Earliest).subscribe();
                Message receive3 = subscribe3.receive(3, TimeUnit.SECONDS);
                if (receive3 != null) {
                    log.info("check consumer received message : {} {}", receive3.getMessageId(), new String(receive3.getData()));
                }
                Assert.assertNull(receive3);
                subscribe3.close();
                if (Collections.singletonList(newPulsarClient).get(0) != null) {
                    newPulsarClient.close();
                }
            } finally {
                if (Collections.singletonList(newPulsarClient).get(0) != null) {
                    newPulsarClient.close();
                }
            }
        } catch (Throwable th) {
            if (Collections.singletonList(newPulsarClient).get(0) != null) {
                newPulsarClient.close();
            }
            throw th;
        }
    }

    @Test(timeOut = 30000)
    public void testRetryTopicException() throws Exception {
        MultiTopicsConsumerImpl subscribe = this.pulsarClient.newConsumer(Schema.BYTES).topic(new String[]{"persistent://my-property/my-ns/retry-topic"}).subscriptionName("my-subscription").subscriptionType(SubscriptionType.Shared).enableRetry(true).receiverQueueSize(100).deadLetterPolicy(DeadLetterPolicy.builder().maxRedeliverCount(2).retryLetterTopic("persistent://my-property/my-ns/my-subscription-custom-Retry").build()).subscriptionInitialPosition(SubscriptionInitialPosition.Earliest).subscribe();
        Producer create = this.pulsarClient.newProducer(Schema.BYTES).topic("persistent://my-property/my-ns/retry-topic").create();
        for (int i = 0; i < 1; i++) {
            create.send(String.format("Hello Pulsar [%d]", Integer.valueOf(i)).getBytes());
        }
        create.close();
        for (ConsumerImpl consumerImpl : subscribe.getConsumers()) {
            Set allFields = ReflectionUtils.getAllFields(consumerImpl.getClass(), new Predicate[]{ReflectionUtils.withName("deadLetterPolicy")});
            if (allFields.size() != 0) {
                Field field = (Field) allFields.iterator().next();
                field.setAccessible(true);
                ((DeadLetterPolicy) field.get(consumerImpl)).setRetryLetterTopic("#persistent://invlaid-topic#");
            }
        }
        Message receive = subscribe.receive();
        log.info("consumer received message : {} {}", receive.getMessageId(), new String(receive.getData()));
        try {
            subscribe.reconsumeLater(receive, 1L, TimeUnit.SECONDS);
        } catch (PulsarClientException.InvalidTopicNameException e) {
            Assert.assertEquals(e.getClass(), PulsarClientException.InvalidTopicNameException.class);
        } catch (Exception e2) {
            Assert.fail("exception should be PulsarClientException.InvalidTopicNameException");
        }
        subscribe.close();
    }
}
