package org.apache.pulsar.broker.admin;

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import org.apache.pulsar.client.admin.PulsarAdminException;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.ProducerConsumerBase;
import org.apache.pulsar.client.api.SubscriptionType;
import org.apache.pulsar.common.stats.AnalyzeSubscriptionBacklogResult;
import org.apache.pulsar.common.util.FutureUtil;
import org.testng.Assert;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;

@Test(groups = {"broker-admin"})
/* loaded from: input_file:org/apache/pulsar/broker/admin/AnalyzeBacklogSubscriptionTest.class */
public class AnalyzeBacklogSubscriptionTest extends ProducerConsumerBase {
    @Override // org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest
    @BeforeMethod
    public void setup() throws Exception {
        super.internalSetup();
        producerBaseSetup();
    }

    @Override // org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest
    @AfterMethod(alwaysRun = true)
    public void cleanup() throws Exception {
        super.internalCleanup();
    }

    @Test
    public void simpleAnalyzeBacklogTest() throws Exception {
        simpleAnalyzeBacklogTest(false);
    }

    @Test
    public void simpleAnalyzeBacklogTestWithBatching() throws Exception {
        simpleAnalyzeBacklogTest(true);
    }

    private void simpleAnalyzeBacklogTest(boolean z) throws Exception {
        int i = z ? 5 : 1;
        int i2 = 20 / i;
        String str = "persistent://my-property/my-ns/my-topic-" + z;
        this.admin.topics().createSubscription(str, "sub-1", MessageId.latest);
        Assert.assertEquals(this.admin.topics().getSubscriptions(str), List.of("sub-1"));
        verifyBacklog(str, "sub-1", 0, 0);
        Producer create = this.pulsarClient.newProducer().topic(str).enableBatching(z).batchingMaxMessages(i).batchingMaxPublishDelay(2147483647L, TimeUnit.SECONDS).create();
        try {
            ArrayList arrayList = new ArrayList();
            for (int i3 = 0; i3 < 20; i3++) {
                arrayList.add(create.sendAsync(("test-" + i3).getBytes()));
            }
            FutureUtil.waitForAll(arrayList).get();
            MessageId messageId = (MessageId) ((CompletableFuture) arrayList.get(20 / 2)).get();
            verifyBacklog(str, "sub-1", i2, 20);
            this.admin.topics().createSubscription(str, "from-middle", messageId);
            verifyBacklog(str, "from-middle", i2 / 2, 20 / 2);
            Consumer subscribe = this.pulsarClient.newConsumer().topic(new String[]{str}).isAckReceiptEnabled(true).subscriptionName("sub-1").subscriptionType(SubscriptionType.Shared).subscribe();
            try {
                Message receive = subscribe.receive();
                Message receive2 = subscribe.receive();
                Message receive3 = subscribe.receive();
                Message receive4 = subscribe.receive();
                Message receive5 = subscribe.receive();
                verifyBacklog(str, "sub-1", i2, 20);
                subscribe.acknowledge(receive2);
                if (z) {
                    verifyBacklog(str, "sub-1", i2, 20);
                } else {
                    verifyBacklog(str, "sub-1", i2 - 1, 20 - 1);
                }
                subscribe.acknowledge(receive);
                subscribe.acknowledge(receive3);
                subscribe.acknowledge(receive4);
                subscribe.acknowledge(receive5);
                verifyBacklog(str, "sub-1", i2 - (5 / i), 20 - 5);
                int i4 = 20 - 5;
                while (true) {
                    int i5 = i4;
                    i4--;
                    if (i5 <= 0) {
                        break;
                    } else {
                        subscribe.acknowledge(subscribe.receive());
                    }
                }
                verifyBacklog(str, "sub-1", 0, 0);
                if (subscribe != null) {
                    subscribe.close();
                }
            } finally {
            }
        } finally {
            if (Collections.singletonList(create).get(0) != null) {
                create.close();
            }
        }
    }

    private void verifyBacklog(String str, String str2, int i, int i2) throws Exception {
        AnalyzeSubscriptionBacklogResult analyzeSubscriptionBacklog = this.admin.topics().analyzeSubscriptionBacklog(str, str2, Optional.empty());
        Assert.assertEquals(i, analyzeSubscriptionBacklog.getEntries());
        Assert.assertEquals(i, analyzeSubscriptionBacklog.getFilterAcceptedEntries());
        Assert.assertEquals(0L, analyzeSubscriptionBacklog.getFilterRejectedEntries());
        Assert.assertEquals(0L, analyzeSubscriptionBacklog.getFilterRescheduledEntries());
        Assert.assertEquals(0L, analyzeSubscriptionBacklog.getFilterRescheduledEntries());
        Assert.assertEquals(i2, analyzeSubscriptionBacklog.getMessages());
        Assert.assertEquals(i2, analyzeSubscriptionBacklog.getFilterAcceptedMessages());
        Assert.assertEquals(0L, analyzeSubscriptionBacklog.getFilterRejectedMessages());
        Assert.assertEquals(0L, analyzeSubscriptionBacklog.getFilterRescheduledMessages());
        Assert.assertFalse(analyzeSubscriptionBacklog.isAborted());
    }

    @Test
    public void partitionedTopicNotAllowed() throws Exception {
        String str = "persistent://my-property/my-ns/my-partitioned-topic";
        this.admin.topics().createPartitionedTopic("persistent://my-property/my-ns/my-partitioned-topic", 2);
        this.admin.topics().createSubscription("persistent://my-property/my-ns/my-partitioned-topic", "sub-1", MessageId.latest);
        Assert.assertEquals(this.admin.topics().getSubscriptions("persistent://my-property/my-ns/my-partitioned-topic"), List.of("sub-1"));
        Assert.assertThrows(PulsarAdminException.NotAllowedException.class, () -> {
            this.admin.topics().analyzeSubscriptionBacklog(str, "sub-1", Optional.empty());
        });
        Assert.assertEquals(0L, this.admin.topics().analyzeSubscriptionBacklog("persistent://my-property/my-ns/my-partitioned-topic" + "-partition-0", "sub-1", Optional.empty()).getEntries());
    }
}
