package org.apache.pulsar.client.impl;

import java.util.HashSet;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
import org.apache.pulsar.broker.service.BrokerTestBase;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.MessageRoutingMode;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.api.SubscriptionType;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testng.Assert;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.DataProvider;
import org.testng.annotations.Test;

@Test(groups = {"broker-impl"})
/* Tests for consumer ack-timeout handling and the client-side unacked-message tracker. Recovered from: org/apache/pulsar/client/impl/UnAcknowledgedMessagesTimeoutTest.class */
public class UnAcknowledgedMessagesTimeoutTest extends BrokerTestBase {
    private static final Logger log = LoggerFactory.getLogger(UnAcknowledgedMessagesTimeoutTest.class);
    private final long ackTimeOutMillis = TimeUnit.SECONDS.toMillis(2);

    /**
     * Runs before every test method and delegates to the inherited {@code baseSetup()}.
     *
     * @throws Exception if the inherited setup fails
     */
    @Override
    @BeforeMethod
    public void setup() throws Exception {
        baseSetup();
    }

    /**
     * Runs after every test method (even a failed one, because of {@code alwaysRun = true})
     * and delegates to the inherited {@code internalCleanup()}.
     *
     * @throws Exception if the inherited cleanup fails
     */
    @Override
    @AfterMethod(alwaysRun = true)
    public void cleanup() throws Exception {
        internalCleanup();
    }

    /**
     * Supplies the single boolean argument of the tests bound to
     * {@code "variationsRedeliveryTracker"}: one run with {@code false} and one with
     * {@code true}. The tests use the flag to decide whether the consumer is configured with an
     * ack-timeout redelivery backoff.
     *
     * @return two rows, each holding one {@code Boolean}
     */
    @DataProvider(name = "variationsRedeliveryTracker")
    public static Object[][] variationsRedeliveryTracker() {
        // Must be a two-dimensional array: one inner array per test invocation.
        return new Object[][] {
            {false},
            {true}
        };
    }

    /**
     * Per-message acknowledgement on a default-type subscription to a non-partitioned topic.
     *
     * <p>Five messages are published and drained without acknowledging, after which the
     * unacked-message tracker must hold five entries. The test then blocks for the next
     * delivery, publishes five more messages, acknowledges everything it receives and expects
     * an empty tracker plus ten distinct payloads.
     *
     * @param z when {@code true} the consumer is additionally configured with a
     *          {@link MultiplierRedeliveryBackoff} for ack-timeout redelivery
     * @throws Exception if any client operation fails
     */
    @Test(dataProvider = "variationsRedeliveryTracker")
    public void testExclusiveSingleAckedNormalTopic(boolean z) throws Exception {
        final String key = "testExclusiveSingleAckedNormalTopic";
        final String topicName = "persistent://prop/ns-abc/topic-" + key;
        final String subscriptionName = "my-ex-subscription-" + key;
        final String messagePrefix = "my-message-" + key + "-";
        final int firstBatchSize = 5;
        final int totalMessages = 10;

        Producer<byte[]> producer = pulsarClient.newProducer()
                .topic(topicName)
                .enableBatching(false)
                .messageRoutingMode(MessageRoutingMode.SinglePartition)
                .create();

        Consumer<byte[]> consumer;
        if (z) {
            consumer = pulsarClient.newConsumer()
                    .topic(topicName)
                    .subscriptionName(subscriptionName)
                    .receiverQueueSize(7)
                    .ackTimeout(ackTimeOutMillis, TimeUnit.MILLISECONDS)
                    .ackTimeoutRedeliveryBackoff(MultiplierRedeliveryBackoff.builder()
                            .minDelayMs(1000L)
                            .maxDelayMs(20000L)
                            .build())
                    .subscribe();
        } else {
            consumer = pulsarClient.newConsumer()
                    .topic(topicName)
                    .subscriptionName(subscriptionName)
                    .receiverQueueSize(7)
                    .ackTimeout(ackTimeOutMillis, TimeUnit.MILLISECONDS)
                    .subscribe();
        }

        // First half of the messages: published, then consumed without any acknowledgement.
        for (int index = 0; index < firstBatchSize; index++) {
            String payload = messagePrefix + index;
            log.info("Producer produced: " + payload);
            producer.send(payload.getBytes());
        }

        Message<byte[]> unacked = consumer.receive();
        while (unacked != null) {
            log.info("Consumer received : " + new String(unacked.getData()));
            unacked = consumer.receive(500, TimeUnit.MILLISECONDS);
        }

        long trackedBeforeAck = ((ConsumerImpl<byte[]>) consumer).getUnAckedMessageTracker().size();
        log.info(key + " Unacked Message Tracker size is " + trackedBeforeAck);
        Assert.assertEquals(trackedBeforeAck, 5L);

        // Blocks until the consumer is handed another message.
        Message<byte[]> next = consumer.receive();
        log.info("Consumer received : " + new String(next.getData()));

        HashSet<String> distinctPayloads = new HashSet<>();
        for (int index = firstBatchSize; index < totalMessages; index++) {
            producer.send((messagePrefix + index).getBytes());
        }

        // Acknowledge every message until the consumer stays idle for 500 ms.
        for (Message<byte[]> current = next;
                current != null;
                current = consumer.receive(500, TimeUnit.MILLISECONDS)) {
            distinctPayloads.add(new String(current.getData()));
            consumer.acknowledge(current);
            log.info("Consumer acknowledged : " + new String(current.getData()));
        }

        long trackedAfterAck = ((ConsumerImpl<byte[]>) consumer).getUnAckedMessageTracker().size();
        log.info(key + " Unacked Message Tracker size is " + trackedAfterAck);
        Assert.assertEquals(trackedAfterAck, 0L);
        Assert.assertEquals(distinctPayloads.size(), 10);
    }

    /**
     * Cumulative acknowledgement on a default-type subscription to a non-partitioned topic.
     *
     * <p>Ten messages are published and drained without acknowledging, so the unacked-message
     * tracker must report ten entries. A cumulative ack on the last received message must
     * empty the tracker, and nothing may arrive within twice the ack timeout afterwards.
     *
     * @param z when {@code true} the consumer is additionally configured with a
     *          {@link MultiplierRedeliveryBackoff} for ack-timeout redelivery
     * @throws Exception if any client operation fails
     */
    @Test(dataProvider = "variationsRedeliveryTracker")
    public void testExclusiveCumulativeAckedNormalTopic(boolean z) throws Exception {
        final String key = "testExclusiveCumulativeAckedNormalTopic";
        final String topicName = "persistent://prop/use/ns-abc/topic-" + key;
        final String subscriptionName = "my-ex-subscription-" + key;
        final String messagePrefix = "my-message-" + key + "-";
        final int totalMessages = 10;

        Producer<byte[]> producer = pulsarClient.newProducer()
                .topic(topicName)
                .enableBatching(false)
                .messageRoutingMode(MessageRoutingMode.SinglePartition)
                .create();

        Consumer<byte[]> consumer;
        if (z) {
            consumer = pulsarClient.newConsumer()
                    .topic(topicName)
                    .subscriptionName(subscriptionName)
                    .receiverQueueSize(7)
                    .ackTimeout(ackTimeOutMillis, TimeUnit.MILLISECONDS)
                    .ackTimeoutRedeliveryBackoff(MultiplierRedeliveryBackoff.builder()
                            .minDelayMs(1000L)
                            .maxDelayMs(20000L)
                            .build())
                    .subscribe();
        } else {
            consumer = pulsarClient.newConsumer()
                    .topic(topicName)
                    .subscriptionName(subscriptionName)
                    .receiverQueueSize(7)
                    .ackTimeout(ackTimeOutMillis, TimeUnit.MILLISECONDS)
                    .subscribe();
        }

        for (int index = 0; index < totalMessages; index++) {
            producer.send((messagePrefix + index).getBytes());
        }

        // Drain without acking, remembering the most recent message for the cumulative ack.
        HashSet<String> seenPayloads = new HashSet<>();
        Message<byte[]> lastMessage = consumer.receive();
        for (Message<byte[]> current = lastMessage;
                current != null;
                current = consumer.receive(500, TimeUnit.MILLISECONDS)) {
            lastMessage = current;
            seenPayloads.add(new String(current.getData()));
            log.info("Consumer received " + new String(current.getData()));
            log.info("Message ID details " + current.getMessageId().toString());
        }

        ConsumerImpl<byte[]> consumerImpl = (ConsumerImpl<byte[]>) consumer;
        Assert.assertEquals(consumerImpl.getUnAckedMessageTracker().size(), 10L);

        log.info("Comulative Ack sent for " + new String(lastMessage.getData()));
        log.info("Message ID details " + lastMessage.getMessageId().toString());
        consumer.acknowledgeCumulative(lastMessage);
        Assert.assertEquals(consumerImpl.getUnAckedMessageTracker().size(), 0L);

        // Nothing may arrive once everything has been cumulatively acknowledged.
        int quietPeriodMillis = (int) (2 * ackTimeOutMillis);
        Assert.assertNull(consumer.receive(quietPeriodMillis, TimeUnit.MILLISECONDS));
    }

    /**
     * Two Shared consumers on a three-partition topic, with acknowledgements alternating
     * between them across two ack-timeout periods.
     *
     * <p>Round one: consumer 1 receives without acking, consumer 2 acks. After a pause of 1.1x
     * the ack timeout the roles swap, and after a further pause of 2x the ack timeout both
     * consumers ack. After every round the relevant counts must add up to the 20 published
     * messages.
     *
     * @param z when {@code true} both consumers are additionally configured with a
     *          {@link MultiplierRedeliveryBackoff} for ack-timeout redelivery
     * @throws Exception if any client or admin operation fails
     */
    @Test(dataProvider = "variationsRedeliveryTracker")
    public void testSharedSingleAckedPartitionedTopic(boolean z) throws Exception {
        final String key = "testSharedSingleAckedPartitionedTopic";
        final String topicName = "persistent://prop/ns-abc/topic-" + key;
        final String subscriptionName = "my-shared-subscription-" + key;
        final String messagePrefix = "my-message-" + key + "-";
        final int totalMessages = 20;

        admin.topics().createPartitionedTopic(topicName, 3);

        Producer<byte[]> producer = pulsarClient.newProducer()
                .topic(topicName)
                .enableBatching(false)
                .messageRoutingMode(MessageRoutingMode.RoundRobinPartition)
                .create();

        Consumer<byte[]> consumer1 = subscribeSharedQueue100(topicName, subscriptionName, "Consumer-1", z);
        Consumer<byte[]> consumer2 = subscribeSharedQueue100(topicName, subscriptionName, "Consumer-2", z);

        for (int index = 0; index < totalMessages; index++) {
            String payload = messagePrefix + index;
            log.info("Message produced: {} -- msgId: {}", payload, producer.send(payload.getBytes()));
        }

        // Round 1: consumer 1 leaves everything unacked, consumer 2 acks what it gets.
        int messageCount1 = receiveAllMessage(consumer1, false);
        int messageCount2 = receiveAllMessage(consumer2, true);
        int ackCount1 = 0;
        int ackCount2 = messageCount2;
        log.info(key + " messageCount1 = " + messageCount1);
        log.info(key + " messageCount2 = " + messageCount2);
        log.info(key + " ackCount1 = " + ackCount1);
        log.info(key + " ackCount2 = " + ackCount2);
        Assert.assertEquals(messageCount1 + messageCount2, 20);

        Thread.sleep((int) (ackTimeOutMillis * 1.1d));
        log.info(key + " Timeout should be triggered now");

        // Round 2: roles swap; consumer 1 acks, consumer 2 does not.
        messageCount1 = receiveAllMessage(consumer1, true);
        messageCount2 = ackCount2 + receiveAllMessage(consumer2, false);
        ackCount1 = messageCount1;
        log.info(key + " messageCount1 = " + messageCount1);
        log.info(key + " messageCount2 = " + messageCount2);
        log.info(key + " ackCount1 = " + ackCount1);
        log.info(key + " ackCount2 = " + ackCount2);
        Assert.assertEquals(messageCount1 + messageCount2, 20);
        Assert.assertEquals(ackCount1 + messageCount2, 20);

        Thread.sleep((int) (ackTimeOutMillis * 2));
        log.info(key + " Timeout should be triggered again");

        // Round 3: both consumers ack whatever is still delivered.
        ackCount1 = ackCount1 + receiveAllMessage(consumer1, true);
        ackCount2 = ackCount2 + receiveAllMessage(consumer2, true);
        log.info(key + " ackCount1 = " + ackCount1);
        log.info(key + " ackCount2 = " + ackCount2);
        Assert.assertEquals(ackCount1 + ackCount2, 20);
    }

    /**
     * Subscribes a named Shared consumer with a receiver queue of 100 and the class-wide ack
     * timeout, optionally adding a multiplier redelivery backoff (1 s min, 20 s max).
     */
    private Consumer<byte[]> subscribeSharedQueue100(String topicName, String subscriptionName,
            String consumerName, boolean withBackoff) throws PulsarClientException {
        if (withBackoff) {
            return pulsarClient.newConsumer()
                    .topic(topicName)
                    .subscriptionName(subscriptionName)
                    .receiverQueueSize(100)
                    .subscriptionType(SubscriptionType.Shared)
                    .ackTimeout(ackTimeOutMillis, TimeUnit.MILLISECONDS)
                    .ackTimeoutRedeliveryBackoff(MultiplierRedeliveryBackoff.builder()
                            .minDelayMs(1000L)
                            .maxDelayMs(20000L)
                            .build())
                    .consumerName(consumerName)
                    .subscribe();
        }
        return pulsarClient.newConsumer()
                .topic(topicName)
                .subscriptionName(subscriptionName)
                .receiverQueueSize(100)
                .subscriptionType(SubscriptionType.Shared)
                .ackTimeout(ackTimeOutMillis, TimeUnit.MILLISECONDS)
                .consumerName(consumerName)
                .subscribe();
    }

    /**
     * Drains a consumer until a one-second receive returns nothing.
     *
     * @param consumer the consumer to read from
     * @param z        when {@code true} every received message is acknowledged individually
     * @return the number of messages received
     * @throws Exception if receiving or acknowledging fails
     */
    private static int receiveAllMessage(Consumer<?> consumer, boolean z) throws Exception {
        int received = 0;
        for (Message<?> current = consumer.receive(1, TimeUnit.SECONDS);
                current != null;
                current = consumer.receive(1, TimeUnit.SECONDS)) {
            received++;
            log.info("Consumer received {}", new String(current.getData()));
            if (z) {
                consumer.acknowledge(current);
            }
        }
        return received;
    }

    /**
     * Two consumers on a three-partition topic where only the second one acknowledges.
     *
     * <p>Ten messages are published. Consumer 1 drains without acking and consumer 2 acks
     * everything it gets; together they must see all ten. After sleeping for twice the ack
     * timeout, the messages delivered again plus those consumer 2 already acked must again
     * total ten.
     *
     * <p>Note: despite the method name, both consumers subscribe with
     * {@link SubscriptionType#Shared}.
     *
     * @param z when {@code true} both consumers are additionally configured with a
     *          {@link MultiplierRedeliveryBackoff} for ack-timeout redelivery
     * @throws Exception if any client or admin operation fails
     */
    @Test(dataProvider = "variationsRedeliveryTracker")
    public void testFailoverSingleAckedPartitionedTopic(boolean z) throws Exception {
        final String key = "testFailoverSingleAckedPartitionedTopic";
        final String topicName = "persistent://prop/ns-abc/topic-" + key + UUID.randomUUID().toString();
        final String subscriptionName = "my-failover-subscription-" + key;
        final String messagePrefix = "my-message-" + key + "-";
        final int totalMessages = 10;

        admin.topics().createPartitionedTopic(topicName, 3);

        Producer<byte[]> producer = pulsarClient.newProducer()
                .topic(topicName)
                .enableBatching(false)
                .messageRoutingMode(MessageRoutingMode.RoundRobinPartition)
                .create();

        Consumer<byte[]> consumer1 = subscribeSharedImmediateAck(topicName, subscriptionName, "Consumer-1", z);
        Consumer<byte[]> consumer2 = subscribeSharedImmediateAck(topicName, subscriptionName, "Consumer-2", z);

        for (int index = 0; index < totalMessages; index++) {
            String payload = messagePrefix + index;
            log.info("Message produced: " + payload);
            producer.send(payload.getBytes());
        }

        // Consumer 1 never acknowledges; consumer 2 acknowledges everything it receives.
        int totalReceived = 0;
        while (consumer1.receive(500, TimeUnit.MILLISECONDS) != null) {
            totalReceived++;
        }
        int ackedByConsumer2 = 0;
        for (Message<byte[]> current = consumer2.receive(500, TimeUnit.MILLISECONDS);
                current != null;
                current = consumer2.receive(500, TimeUnit.MILLISECONDS)) {
            consumer2.acknowledge(current);
            totalReceived++;
            ackedByConsumer2++;
        }
        Assert.assertEquals(totalReceived, 10);

        Thread.sleep(ackTimeOutMillis * 2);
        log.info(key + " Timeout should be triggered now");

        // Count whatever is delivered again to either consumer after the pause.
        int redelivered = 0;
        while (consumer1.receive(500, TimeUnit.MILLISECONDS) != null) {
            redelivered++;
        }
        while (consumer2.receive(500, TimeUnit.MILLISECONDS) != null) {
            redelivered++;
        }
        Assert.assertEquals(redelivered + ackedByConsumer2, 10);
    }

    /**
     * Subscribes a named Shared consumer with a receiver queue of 7, the class-wide ack timeout
     * and an acknowledgment group time of zero, optionally adding a multiplier redelivery
     * backoff (1 s min, 20 s max).
     */
    private Consumer<byte[]> subscribeSharedImmediateAck(String topicName, String subscriptionName,
            String consumerName, boolean withBackoff) throws PulsarClientException {
        if (withBackoff) {
            return pulsarClient.newConsumer()
                    .topic(topicName)
                    .subscriptionName(subscriptionName)
                    .receiverQueueSize(7)
                    .subscriptionType(SubscriptionType.Shared)
                    .ackTimeout(ackTimeOutMillis, TimeUnit.MILLISECONDS)
                    .ackTimeoutRedeliveryBackoff(MultiplierRedeliveryBackoff.builder()
                            .minDelayMs(1000L)
                            .maxDelayMs(20000L)
                            .build())
                    .acknowledgmentGroupTime(0L, TimeUnit.SECONDS)
                    .consumerName(consumerName)
                    .subscribe();
        }
        return pulsarClient.newConsumer()
                .topic(topicName)
                .subscriptionName(subscriptionName)
                .receiverQueueSize(7)
                .subscriptionType(SubscriptionType.Shared)
                .ackTimeout(ackTimeOutMillis, TimeUnit.MILLISECONDS)
                .acknowledgmentGroupTime(0L, TimeUnit.SECONDS)
                .consumerName(consumerName)
                .subscribe();
    }

    /**
     * Verifies that invalid ack-timeout settings are rejected while the consumer is configured.
     *
     * <p>Two independent checks are made: an ack timeout of 999 ms, and a redelivery backoff
     * built with a multiplier of 0. Each check has its own try block. When both shared one
     * block, the exception expected from the first check jumped straight to the handler and
     * the second check never ran.
     */
    @Test
    public void testAckTimeoutMinValue() {
        try {
            this.pulsarClient.newConsumer().ackTimeout(999L, TimeUnit.MILLISECONDS);
            // Assert.fail throws AssertionError, which the catch below does not intercept.
            Assert.fail("Exception should have been thrown since the set timeout is less than min timeout.");
        } catch (Exception expected) {
            // Expected: the timeout is rejected.
        }

        try {
            this.pulsarClient.newConsumer()
                    .ackTimeout(2000L, TimeUnit.MILLISECONDS)
                    .ackTimeoutRedeliveryBackoff(MultiplierRedeliveryBackoff.builder()
                            .minDelayMs(1000L)
                            .maxDelayMs(60000L)
                            .multiplier(0.0d)
                            .build());
            Assert.fail("multiplier must be > 0.");
        } catch (Exception expected) {
            // Expected: a zero multiplier is rejected.
        }
    }

    /**
     * Checks the unacked-message tracker size as messages are acknowledged.
     *
     * <p>Three messages are published and received, with only the first two acknowledged, so
     * the tracker must hold exactly one entry. The next message received is acknowledged and
     * the tracker must be empty, and still empty after a further 1.1x the ack timeout.
     *
     * @throws PulsarClientException if a client operation fails
     * @throws InterruptedException  if the sleep is interrupted
     */
    @Test
    public void testCheckUnAcknowledgedMessageTimer() throws PulsarClientException, InterruptedException {
        final String key = "testCheckUnAcknowledgedMessageTimer";
        final String topicName = "persistent://prop/ns-abc/topic-" + key;
        final String subscriptionName = "my-ex-subscription-" + key;
        final String messagePrefix = "my-message-" + key + "-";
        final int totalMessages = 3;

        Producer<byte[]> producer = this.pulsarClient.newProducer()
                .topic(topicName)
                .enableBatching(false)
                .messageRoutingMode(MessageRoutingMode.SinglePartition)
                .create();

        // subscribe() is declared to return Consumer; the explicit cast is required to reach
        // getUnAckedMessageTracker() on the implementation class.
        ConsumerImpl<byte[]> consumer = (ConsumerImpl<byte[]>) this.pulsarClient.newConsumer()
                .topic(topicName)
                .subscriptionName(subscriptionName)
                .receiverQueueSize(7)
                .subscriptionType(SubscriptionType.Shared)
                .ackTimeout(this.ackTimeOutMillis, TimeUnit.MILLISECONDS)
                .subscribe();

        for (int index = 0; index < totalMessages; index++) {
            String payload = messagePrefix + index;
            log.info("Producer produced: " + payload);
            producer.send(payload.getBytes());
        }

        // Acknowledge every message except the last one received.
        for (int index = 0; index < totalMessages; index++) {
            Message<byte[]> message = consumer.receive();
            if (index != totalMessages - 1) {
                consumer.acknowledge(message);
            }
        }
        Assert.assertEquals(consumer.getUnAckedMessageTracker().size(), 1L);

        // Blocks until another message is delivered, then acknowledges it.
        consumer.acknowledge(consumer.receive());
        Assert.assertEquals(consumer.getUnAckedMessageTracker().size(), 0L);

        Thread.sleep((long) (this.ackTimeOutMillis * 1.1d));
        Assert.assertEquals(consumer.getUnAckedMessageTracker().size(), 0L);
    }

    /**
     * Supplies the parameter sets for the test bound to {@code "variationsBackoff"}.
     *
     * @return one row per run; the columns are, in order, the ack timeout in ms, the backoff
     *         minimum delay in ms, the backoff maximum delay in ms, and the backoff multiplier
     */
    @DataProvider(name = "variationsBackoff")
    public static Object[][] variationsBackoff() {
        // Must be a two-dimensional array: one inner array per test invocation.
        return new Object[][] {
            {2000, 1000, 2000, 2},
            {3000, 1000, 3000, 3}
        };
    }

    /**
     * Checks that a message left unacknowledged keeps being redelivered, and that each
     * redelivery takes at least the ack timeout plus the expected backoff delay.
     *
     * <p>Three messages are published and received, with the first two acknowledged. For five
     * rounds the test then asserts that exactly one message is tracked, blocks for the next
     * delivery, and verifies the elapsed time and the message's redelivery count.
     *
     * @param j  ack timeout in milliseconds
     * @param j2 minimum backoff delay in milliseconds
     * @param j3 maximum backoff delay in milliseconds
     * @param i  backoff multiplier
     * @throws PulsarClientException if a client operation fails
     * @throws InterruptedException  declared by the signature; no call in the body is
     *                               documented here as throwing it
     */
    @Test(dataProvider = "variationsBackoff")
    public void testCheckUnAcknowledgedMessageRedeliveryTimer(long j, long j2, long j3, int i) throws PulsarClientException, InterruptedException {
        final String key = "testCheckUnAcknowledgedMessageRedeliveryTimer";
        final String topicName = "persistent://prop/ns-abc/topic-" + key;
        final String subscriptionName = "my-ex-subscription-" + key;
        final String messagePrefix = "my-message-" + key + "-";
        final int totalMessages = 3;

        Producer<byte[]> producer = this.pulsarClient.newProducer()
                .topic(topicName)
                .enableBatching(false)
                .messageRoutingMode(MessageRoutingMode.SinglePartition)
                .create();

        // subscribe() is declared to return Consumer; the explicit cast is required to reach
        // getUnAckedMessageTracker() on the implementation class.
        ConsumerImpl<byte[]> consumer = (ConsumerImpl<byte[]>) this.pulsarClient.newConsumer()
                .topic(topicName)
                .subscriptionName(subscriptionName)
                .receiverQueueSize(7)
                .subscriptionType(SubscriptionType.Shared)
                .ackTimeout(j, TimeUnit.MILLISECONDS)
                .ackTimeoutRedeliveryBackoff(MultiplierRedeliveryBackoff.builder()
                        .minDelayMs(j2)
                        .maxDelayMs(j3)
                        .multiplier(i)
                        .build())
                .subscribe();

        for (int index = 0; index < totalMessages; index++) {
            String payload = messagePrefix + index;
            log.info("Producer produced: " + payload);
            producer.send(payload.getBytes());
        }

        // Acknowledge every message except the last one received.
        for (int index = 0; index < totalMessages; index++) {
            Message<byte[]> message = consumer.receive();
            if (index != totalMessages - 1) {
                consumer.acknowledge(message);
            }
        }

        for (int round = 0; round < 5; round++) {
            int expectedRedeliveryCount = round + 1;
            // Expected delay: minDelay * multiplier^round, capped at maxDelay.
            int expectedDelayMs = (int) Math.min(j2 * ((int) Math.pow(i, round)), j3);
            Assert.assertEquals(consumer.getUnAckedMessageTracker().size(), 1L);

            long startMillis = System.currentTimeMillis();
            Message<byte[]> redelivered = consumer.receive();
            long elapsedMillis = System.currentTimeMillis() - startMillis;

            Assert.assertTrue(elapsedMillis >= j + ((long) expectedDelayMs));
            Assert.assertEquals(redelivered.getRedeliveryCount(), expectedRedeliveryCount);
            Assert.assertEquals(consumer.getUnAckedMessageTracker().size(), 1L);
        }
    }

    /**
     * Checks that a message sent through a batching producer is tracked as unacked on receipt
     * and removed from the tracker once acknowledged.
     *
     * <p>The scenario runs twice on the same topic and subscription: first with a plain
     * ack-timeout consumer, then, after closing it, with a consumer that also has a
     * {@link MultiplierRedeliveryBackoff} configured.
     *
     * @throws Exception if any client operation fails
     */
    @Test
    public void testSingleMessageBatch() throws Exception {
        final String topicName = "prop/ns-abc/topic-estSingleMessageBatch";
        final String subscriptionName = "subscription";

        Producer<String> producer = this.pulsarClient.newProducer(Schema.STRING)
                .topic(topicName)
                .enableBatching(true)
                .batchingMaxPublishDelay(10L, TimeUnit.SECONDS)
                .create();

        // subscribe() is declared to return Consumer; the explicit cast is required to reach
        // getUnAckedMessageTracker() on the implementation class.
        ConsumerImpl<String> plainConsumer = (ConsumerImpl<String>) this.pulsarClient.newConsumer(Schema.STRING)
                .topic(topicName)
                .subscriptionName(subscriptionName)
                .ackTimeout(1L, TimeUnit.HOURS)
                .subscribe();

        // flush() is called explicitly; the configured max publish delay is ten seconds.
        producer.sendAsync("hello");
        producer.flush();

        Message<String> firstMessage = plainConsumer.receive();
        Assert.assertFalse(plainConsumer.getUnAckedMessageTracker().isEmpty());
        plainConsumer.acknowledge(firstMessage);
        Assert.assertTrue(plainConsumer.getUnAckedMessageTracker().isEmpty());
        plainConsumer.close();

        ConsumerImpl<String> backoffConsumer = (ConsumerImpl<String>) this.pulsarClient.newConsumer(Schema.STRING)
                .topic(topicName)
                .subscriptionName(subscriptionName)
                .ackTimeout(1L, TimeUnit.HOURS)
                .ackTimeoutRedeliveryBackoff(MultiplierRedeliveryBackoff.builder()
                        .minDelayMs(1000L)
                        .maxDelayMs(60000L)
                        .build())
                .subscribe();

        producer.sendAsync("hello");
        producer.flush();

        Message<String> secondMessage = backoffConsumer.receive();
        Assert.assertFalse(backoffConsumer.getUnAckedMessageTracker().isEmpty());
        backoffConsumer.acknowledge(secondMessage);
        Assert.assertTrue(backoffConsumer.getUnAckedMessageTracker().isEmpty());
    }
}
