package org.apache.pulsar.broker.admin;

import com.google.common.collect.Sets;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.TimeUnit;
import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
import org.apache.pulsar.broker.service.persistent.PersistentTopic;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.SubscriptionInitialPosition;
import org.apache.pulsar.client.api.SubscriptionType;
import org.apache.pulsar.common.policies.data.ClusterData;
import org.apache.pulsar.common.policies.data.TenantInfoImpl;
import org.awaitility.Awaitility;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testng.Assert;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;

@Test(groups = {"broker"})
/* loaded from: input_file:org/apache/pulsar/broker/admin/AdminApiMaxUnackedMessages.class */
public class AdminApiMaxUnackedMessages extends MockedPulsarServiceBaseTest {
    private static final Logger log = LoggerFactory.getLogger(AdminApiMaxUnackedMessages.class);

    @Override // org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest
    @BeforeMethod
    public void setup() throws Exception {
        super.internalSetup();
        this.admin.clusters().createCluster("test", ClusterData.builder().serviceUrl(this.pulsar.getWebServiceAddress()).build());
        this.admin.tenants().createTenant("max-unacked-messages", new TenantInfoImpl(Sets.newHashSet(new String[]{"role1", "role2"}), Sets.newHashSet(new String[]{"test"})));
    }

    @Override // org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest
    @AfterMethod(alwaysRun = true)
    public void cleanup() throws Exception {
        super.internalCleanup();
        resetConfig();
    }

    @Test(timeOut = 30000)
    public void testNamespacePolicy() throws Exception {
        this.pulsar.getConfiguration().setMaxUnackedMessagesPerConsumer(3);
        this.admin.namespaces().createNamespace("max-unacked-messages/policy-on-consumers");
        Producer create = this.pulsarClient.newProducer().topic("persistent://max-unacked-messages/policy-on-consumers/testNamespacePolicy").create();
        try {
            Consumer subscribe = this.pulsarClient.newConsumer().receiverQueueSize(1).subscriptionType(SubscriptionType.Shared).subscriptionName("sub").topic(new String[]{"persistent://max-unacked-messages/policy-on-consumers/testNamespacePolicy"}).subscribe();
            try {
                this.admin.namespaces().setMaxUnackedMessagesPerConsumer("max-unacked-messages/policy-on-consumers", 1);
                PersistentTopic persistentTopic = (PersistentTopic) ((Optional) this.pulsar.getBrokerService().getTopicIfExists("persistent://max-unacked-messages/policy-on-consumers/testNamespacePolicy").get()).get();
                Awaitility.await().untilAsserted(() -> {
                    Assert.assertEquals(((org.apache.pulsar.broker.service.Consumer) persistentTopic.getSubscription("sub").getConsumers().get(0)).getMaxUnackedMessages(), 1);
                });
                for (int i = 0; i < 20; i++) {
                    create.send("msg".getBytes());
                }
                Message receive = subscribe.receive(500, TimeUnit.MILLISECONDS);
                Assert.assertNotNull(receive);
                Assert.assertNull(subscribe.receive(500, TimeUnit.MILLISECONDS));
                this.admin.namespaces().setMaxUnackedMessagesPerConsumer("max-unacked-messages/policy-on-consumers", 0);
                Awaitility.await().untilAsserted(() -> {
                    Assert.assertEquals(((org.apache.pulsar.broker.service.Consumer) persistentTopic.getSubscription("sub").getConsumers().get(0)).getMaxUnackedMessages(), 0);
                });
                subscribe.acknowledge(receive);
                Assert.assertNotNull(subscribe.receive(500, TimeUnit.MILLISECONDS));
                if (Collections.singletonList(subscribe).get(0) != null) {
                    subscribe.close();
                }
            } catch (Throwable th) {
                if (Collections.singletonList(subscribe).get(0) != null) {
                    subscribe.close();
                }
                throw th;
            }
        } finally {
            if (Collections.singletonList(create).get(0) != null) {
                create.close();
            }
        }
    }

    @Test
    public void testMaxUnackedMessagesOnConsumers() throws Exception {
        this.admin.namespaces().createNamespace("max-unacked-messages/default-on-consumers");
        Assert.assertNull(this.admin.namespaces().getMaxUnackedMessagesPerConsumer("max-unacked-messages/default-on-consumers"));
        this.admin.namespaces().setMaxUnackedMessagesPerConsumer("max-unacked-messages/default-on-consumers", 100000);
        Assert.assertEquals(100000, this.admin.namespaces().getMaxUnackedMessagesPerConsumer("max-unacked-messages/default-on-consumers").intValue());
    }

    @Test
    public void testMaxUnackedMessagesOnSubscription() throws Exception {
        this.admin.namespaces().createNamespace("max-unacked-messages/default-on-subscription");
        String str = "max-unacked-messages/default-on-subscription";
        Assert.assertNull(this.admin.namespaces().getMaxUnackedMessagesPerSubscription("max-unacked-messages/default-on-subscription"));
        this.admin.namespaces().setMaxUnackedMessagesPerSubscription("max-unacked-messages/default-on-subscription", 400000);
        Awaitility.await().untilAsserted(() -> {
            Assert.assertEquals(400000, this.admin.namespaces().getMaxUnackedMessagesPerSubscription(str).intValue());
        });
        this.admin.namespaces().removeMaxUnackedMessagesPerSubscription("max-unacked-messages/default-on-subscription");
        Awaitility.await().untilAsserted(() -> {
            Assert.assertNull(this.admin.namespaces().getMaxUnackedMessagesPerSubscription(str));
        });
    }

    @Test
    public void testMaxUnackedMessagesPerConsumerPriority() throws Exception {
        int i = 2;
        cleanup();
        this.conf.setSystemTopicEnabled(true);
        this.conf.setTopicLevelPoliciesEnabled(true);
        this.conf.setMaxUnackedMessagesPerConsumer(3);
        setup();
        this.admin.namespaces().createNamespace("max-unacked-messages/priority-on-consumers");
        Producer create = this.pulsarClient.newProducer().topic("persistent://max-unacked-messages/priority-on-consumers/testMaxUnackedMessagesPerConsumerPriority").create();
        for (int i2 = 0; i2 < 50; i2++) {
            try {
                create.send("msg".getBytes());
            } finally {
                if (Collections.singletonList(create).get(0) != null) {
                    create.close();
                }
            }
        }
        Assert.assertNull(this.admin.namespaces().getMaxUnackedMessagesPerConsumer("max-unacked-messages/priority-on-consumers"));
        Assert.assertNull(this.admin.topics().getMaxUnackedMessagesOnConsumer("persistent://max-unacked-messages/priority-on-consumers/testMaxUnackedMessagesPerConsumerPriority"));
        this.admin.namespaces().setMaxUnackedMessagesPerConsumer("max-unacked-messages/priority-on-consumers", 2);
        this.admin.topics().setMaxUnackedMessagesOnConsumer("persistent://max-unacked-messages/priority-on-consumers/testMaxUnackedMessagesPerConsumerPriority", 1);
        Awaitility.await().untilAsserted(() -> {
            Assert.assertNotNull(this.admin.namespaces().getMaxUnackedMessagesPerConsumer("max-unacked-messages/priority-on-consumers"));
        });
        Awaitility.await().untilAsserted(() -> {
            Assert.assertNotNull(this.admin.topics().getMaxUnackedMessagesOnConsumer("persistent://max-unacked-messages/priority-on-consumers/testMaxUnackedMessagesPerConsumerPriority"));
        });
        Assert.assertEquals(this.admin.namespaces().getMaxUnackedMessagesPerConsumer("max-unacked-messages/priority-on-consumers").intValue(), 2);
        Assert.assertEquals(this.admin.topics().getMaxUnackedMessagesOnConsumer("persistent://max-unacked-messages/priority-on-consumers/testMaxUnackedMessagesPerConsumerPriority").intValue(), 1);
        Consumer<?> subscribe = this.pulsarClient.newConsumer().subscriptionType(SubscriptionType.Shared).subscriptionInitialPosition(SubscriptionInitialPosition.Earliest).subscriptionName("sub").topic(new String[]{"persistent://max-unacked-messages/priority-on-consumers/testMaxUnackedMessagesPerConsumerPriority"}).receiverQueueSize(1).subscribe();
        try {
            org.apache.pulsar.broker.service.Consumer consumer = (org.apache.pulsar.broker.service.Consumer) ((PersistentTopic) ((Optional) this.pulsar.getBrokerService().getTopicIfExists("persistent://max-unacked-messages/priority-on-consumers/testMaxUnackedMessagesPerConsumerPriority").get()).get()).getSubscription("sub").getConsumers().get(0);
            Assert.assertEquals(consumer.getMaxUnackedMessages(), 1);
            List<Message> consumeMsg = consumeMsg(subscribe, 3);
            Assert.assertEquals(consumeMsg.size(), 1);
            this.admin.topics().setMaxUnackedMessagesOnConsumer("persistent://max-unacked-messages/priority-on-consumers/testMaxUnackedMessagesPerConsumerPriority", 0);
            Awaitility.await().untilAsserted(() -> {
                Assert.assertEquals(this.admin.topics().getMaxUnackedMessagesOnConsumer("persistent://max-unacked-messages/priority-on-consumers/testMaxUnackedMessagesPerConsumerPriority").intValue(), 0);
            });
            ackMsgs(subscribe, consumeMsg);
            this.admin.topics().removeMaxUnackedMessagesOnConsumer("persistent://max-unacked-messages/priority-on-consumers/testMaxUnackedMessagesPerConsumerPriority");
            Awaitility.await().untilAsserted(() -> {
                Assert.assertNull(this.admin.topics().getMaxUnackedMessagesOnConsumer("persistent://max-unacked-messages/priority-on-consumers/testMaxUnackedMessagesPerConsumerPriority"));
            });
            Assert.assertEquals(this.admin.namespaces().getMaxUnackedMessagesPerConsumer("max-unacked-messages/priority-on-consumers").intValue(), 2);
            Awaitility.await().untilAsserted(() -> {
                Assert.assertEquals(consumer.getMaxUnackedMessages(), i);
            });
            List<Message> consumeMsg2 = consumeMsg(subscribe, 5);
            Assert.assertEquals(consumeMsg2.size(), 2);
            ackMsgs(subscribe, consumeMsg2);
            this.admin.namespaces().setMaxUnackedMessagesPerConsumer("max-unacked-messages/priority-on-consumers", 0);
            Awaitility.await().untilAsserted(() -> {
                Assert.assertEquals(this.admin.namespaces().getMaxUnackedMessagesPerConsumer("max-unacked-messages/priority-on-consumers").intValue(), 0);
            });
            List<Message> consumeMsg3 = consumeMsg(subscribe, 5);
            Assert.assertEquals(consumeMsg3.size(), 5);
            ackMsgs(subscribe, consumeMsg3);
            this.admin.namespaces().removeMaxUnackedMessagesPerConsumer("max-unacked-messages/priority-on-consumers");
            Awaitility.await().untilAsserted(() -> {
                Assert.assertNull(this.admin.namespaces().getMaxUnackedMessagesPerConsumer("max-unacked-messages/priority-on-consumers"));
            });
            List<Message> consumeMsg4 = consumeMsg(subscribe, 5);
            Assert.assertEquals(consumeMsg4.size(), 3);
            ackMsgs(subscribe, consumeMsg4);
            if (Collections.singletonList(subscribe).get(0) != null) {
                subscribe.close();
            }
        } catch (Throwable th) {
            if (Collections.singletonList(subscribe).get(0) != null) {
                subscribe.close();
            }
            throw th;
        }
    }

    private List<Message> consumeMsg(Consumer<?> consumer, int i) throws Exception {
        Message receive;
        ArrayList arrayList = new ArrayList();
        for (int i2 = 0; i2 < i && (receive = consumer.receive(500, TimeUnit.MILLISECONDS)) != null; i2++) {
            arrayList.add(receive);
        }
        return arrayList;
    }

    private void ackMsgs(Consumer<?> consumer, List<Message> list) throws Exception {
        Iterator<Message> it = list.iterator();
        while (it.hasNext()) {
            consumer.acknowledge(it.next());
        }
    }
}
