package org.apache.pulsar.client.impl;

import java.nio.charset.StandardCharsets;
import java.util.Collections;
import java.util.Iterator;
import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.PulsarClientException;
import org.awaitility.Awaitility;
import org.testng.Assert;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;

/* loaded from: input_file:org/apache/pulsar/client/impl/DynamicReceiverQueueSizeTest.class */
public class DynamicReceiverQueueSizeTest extends MockedPulsarServiceBaseTest {
    @Override // org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest
    @BeforeClass(alwaysRun = true)
    protected void setup() throws Exception {
        super.internalSetup();
        setupDefaultTenantAndNamespace();
    }

    @Override // org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest
    @AfterClass(alwaysRun = true)
    protected void cleanup() throws Exception {
        super.internalCleanup();
    }

    @Test
    public void testConsumerImpl() throws PulsarClientException {
        String str = "persistent://public/default/testConsumerImpl" + System.currentTimeMillis();
        ConsumerImpl subscribe = this.pulsarClient.newConsumer().topic(new String[]{str}).subscriptionName("my-sub").receiverQueueSize(5).subscribe();
        try {
            Producer create = this.pulsarClient.newProducer().topic(str).create();
            try {
                byte[] bytes = "data".getBytes(StandardCharsets.UTF_8);
                for (int i = 0; i < 10; i++) {
                    create.send(bytes);
                }
                Assert.assertEquals(subscribe.getCurrentReceiverQueueSize(), 5);
                Awaitility.await().untilAsserted(() -> {
                    Assert.assertEquals(subscribe.getTotalIncomingMessages(), 5);
                });
                Assert.assertEquals(subscribe.getAvailablePermits(), 0);
                subscribe.setCurrentReceiverQueueSize(8);
                Assert.assertEquals(subscribe.getCurrentReceiverQueueSize(), 8);
                Assert.assertEquals(subscribe.getAvailablePermits(), 3);
                subscribe.setCurrentReceiverQueueSize(10);
                Awaitility.await().untilAsserted(() -> {
                    Assert.assertEquals(subscribe.getTotalIncomingMessages(), 10);
                });
                Assert.assertEquals(subscribe.getAvailablePermits(), 0);
                subscribe.setCurrentReceiverQueueSize(3);
                Assert.assertEquals(subscribe.getAvailablePermits(), -7);
                for (int i2 = 0; i2 < 7; i2++) {
                    subscribe.acknowledge(subscribe.receive());
                    Assert.assertEquals(subscribe.getAvailablePermits(), (-6) + i2);
                }
                subscribe.acknowledge(subscribe.receive());
                subscribe.acknowledge(subscribe.receive());
                subscribe.acknowledge(subscribe.receive());
                Assert.assertEquals(subscribe.getAvailablePermits(), 0);
                for (int i3 = 0; i3 < 10; i3++) {
                    create.send(bytes);
                }
                Awaitility.await().untilAsserted(() -> {
                    Assert.assertEquals(subscribe.getTotalIncomingMessages(), 3);
                });
                if (Collections.singletonList(create).get(0) != null) {
                    create.close();
                }
            } catch (Throwable th) {
                if (Collections.singletonList(create).get(0) != null) {
                    create.close();
                }
                throw th;
            }
        } finally {
            if (Collections.singletonList(subscribe).get(0) != null) {
                subscribe.close();
            }
        }
    }

    @Test
    public void testMultiConsumerImpl() throws Exception {
        String str = "persistent://public/default/testMultiConsumerImpl" + System.currentTimeMillis();
        this.admin.topics().createPartitionedTopic(str, 4);
        MultiTopicsConsumerImpl subscribe = this.pulsarClient.newConsumer().topic(new String[]{str}).subscriptionName("my-sub").receiverQueueSize(5).subscribe();
        try {
            Producer create = this.pulsarClient.newProducer().topic(str).enableBatching(false).create();
            try {
                byte[] bytes = "data".getBytes(StandardCharsets.UTF_8);
                for (int i = 0; i < 30; i++) {
                    create.send(bytes);
                }
                Assert.assertEquals(subscribe.getCurrentReceiverQueueSize(), 5);
                Iterator it = subscribe.getConsumers().iterator();
                while (it.hasNext()) {
                    Assert.assertEquals(((ConsumerImpl) it.next()).getCurrentReceiverQueueSize(), 5);
                }
                Awaitility.await().untilAsserted(() -> {
                    Assert.assertTrue(subscribe.getTotalIncomingMessages() >= 5);
                    Assert.assertTrue(subscribe.getTotalIncomingMessages() < 30);
                });
                subscribe.setCurrentReceiverQueueSize(30);
                Awaitility.await().untilAsserted(() -> {
                    Assert.assertEquals(subscribe.getTotalIncomingMessages(), 30);
                });
                if (Collections.singletonList(create).get(0) != null) {
                    create.close();
                }
            } catch (Throwable th) {
                if (Collections.singletonList(create).get(0) != null) {
                    create.close();
                }
                throw th;
            }
        } finally {
            if (Collections.singletonList(subscribe).get(0) != null) {
                subscribe.close();
            }
        }
    }
}
