package org.apache.pulsar.broker.service;

import io.opentelemetry.sdk.metrics.data.MetricData;
import io.opentelemetry.sdk.testing.assertj.OpenTelemetryAssertions;
import java.util.Collection;
import java.util.Collections;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.Consumer;
import org.apache.pulsar.broker.BrokerTestUtil;
import org.apache.pulsar.broker.stats.BrokerOpenTelemetryTestUtil;
import org.apache.pulsar.broker.testcontext.PulsarTestContext;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.opentelemetry.OpenTelemetryAttributes;
import org.awaitility.Awaitility;
import org.testng.Assert;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.Test;

@Test(groups = {"broker"})
/* loaded from: input_file:org/apache/pulsar/broker/service/MessagePublishBufferThrottleTest.class */
public class MessagePublishBufferThrottleTest extends BrokerTestBase {
    @Override // org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest
    protected void setup() throws Exception {
    }

    @Override // org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest
    @AfterMethod(alwaysRun = true)
    protected void cleanup() throws Exception {
        super.internalCleanup();
    }

    /* JADX INFO: Access modifiers changed from: protected */
    @Override // org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest
    public void customizeMainPulsarTestContextBuilder(PulsarTestContext.Builder builder) {
        super.customizeMainPulsarTestContextBuilder(builder);
        builder.enableOpenTelemetry(true);
    }

    @Test
    public void testMessagePublishBufferThrottleDisabled() throws Exception {
        this.conf.setMaxMessagePublishBufferSizeInMB(-1);
        super.baseSetup();
        Producer create = this.pulsarClient.newProducer().topic("persistent://prop/ns-abc/testMessagePublishBufferThrottleDisabled").producerName("producer-name").create();
        assertRateLimitCounter(OpenTelemetryAttributes.ConnectionRateLimitOperationName.PAUSED, 0);
        assertRateLimitCounter(OpenTelemetryAttributes.ConnectionRateLimitOperationName.RESUMED, 0);
        this.pulsarTestContext.getMockBookKeeper().addEntryDelay(1L, TimeUnit.SECONDS);
        byte[] bArr = new byte[1048576];
        for (int i = 0; i < 10; i++) {
            create.sendAsync(bArr);
        }
        create.flush();
        assertRateLimitCounter(OpenTelemetryAttributes.ConnectionRateLimitOperationName.PAUSED, 0);
        assertRateLimitCounter(OpenTelemetryAttributes.ConnectionRateLimitOperationName.RESUMED, 0);
    }

    @Test
    public void testMessagePublishBufferThrottleEnable() throws Exception {
        this.conf.setMaxMessagePublishBufferSizeInMB(1);
        super.baseSetup();
        Producer create = this.pulsarClient.newProducer().topic("persistent://prop/ns-abc/testMessagePublishBufferThrottleEnable").producerName("producer-name").create();
        assertRateLimitCounter(OpenTelemetryAttributes.ConnectionRateLimitOperationName.PAUSED, 0);
        assertRateLimitCounter(OpenTelemetryAttributes.ConnectionRateLimitOperationName.RESUMED, 0);
        this.pulsarTestContext.getMockBookKeeper().addEntryDelay(1L, TimeUnit.SECONDS);
        byte[] bArr = new byte[1048576];
        for (int i = 0; i < 10; i++) {
            create.sendAsync(bArr);
        }
        Awaitility.await().untilAsserted(() -> {
            assertRateLimitCounter(OpenTelemetryAttributes.ConnectionRateLimitOperationName.PAUSED, 0);
            assertRateLimitCounter(OpenTelemetryAttributes.ConnectionRateLimitOperationName.RESUMED, 0);
        });
        create.flush();
        Awaitility.await().untilAsserted(() -> {
            assertRateLimitCounter(OpenTelemetryAttributes.ConnectionRateLimitOperationName.PAUSED, 0);
            assertRateLimitCounter(OpenTelemetryAttributes.ConnectionRateLimitOperationName.RESUMED, 0);
        });
    }

    @Test
    public void testBlockByPublishRateLimiting() throws Exception {
        this.conf.setMaxMessagePublishBufferSizeInMB(1);
        super.baseSetup();
        assertRateLimitCounter(OpenTelemetryAttributes.ConnectionRateLimitOperationName.PAUSED, 0);
        assertRateLimitCounter(OpenTelemetryAttributes.ConnectionRateLimitOperationName.RESUMED, 0);
        Producer create = this.pulsarClient.newProducer().topic("persistent://prop/ns-abc/testBlockByPublishRateLimiting").producerName("producer-name").create();
        Assert.assertNotNull((Topic) this.pulsar.getBrokerService().getTopicReference("persistent://prop/ns-abc/testBlockByPublishRateLimiting").get());
        assertRateLimitCounter(OpenTelemetryAttributes.ConnectionRateLimitOperationName.PAUSED, 0);
        assertRateLimitCounter(OpenTelemetryAttributes.ConnectionRateLimitOperationName.RESUMED, 0);
        this.pulsarTestContext.getMockBookKeeper().addEntryDelay(5L, TimeUnit.SECONDS);
        byte[] bArr = new byte[1048576];
        for (int i = 0; i < 10; i++) {
            create.sendAsync(bArr);
        }
        Awaitility.await().untilAsserted(() -> {
            assertRateLimitCounter(OpenTelemetryAttributes.ConnectionRateLimitOperationName.PAUSED, 1);
        });
        CompletableFuture flushAsync = create.flushAsync();
        assertRateLimitCounter(OpenTelemetryAttributes.ConnectionRateLimitOperationName.PAUSED, 1);
        assertRateLimitCounter(OpenTelemetryAttributes.ConnectionRateLimitOperationName.RESUMED, 0);
        try {
            flushAsync.get(2L, TimeUnit.SECONDS);
            Assert.fail("Should have timed out");
        } catch (TimeoutException e) {
        }
        flushAsync.join();
        Awaitility.await().untilAsserted(() -> {
            assertRateLimitCounter(OpenTelemetryAttributes.ConnectionRateLimitOperationName.PAUSED, 10);
            assertRateLimitCounter(OpenTelemetryAttributes.ConnectionRateLimitOperationName.RESUMED, 10);
        });
    }

    @Test
    public void testConnectionThrottled() throws Exception {
        super.baseSetup();
        String newUniqueName = BrokerTestUtil.newUniqueName("persistent://prop/ns-abc/testSendThrottled");
        assertRateLimitCounter(OpenTelemetryAttributes.ConnectionRateLimitOperationName.THROTTLED, 0);
        assertRateLimitCounter(OpenTelemetryAttributes.ConnectionRateLimitOperationName.UNTHROTTLED, 0);
        Producer create = this.pulsarClient.newProducer(Schema.STRING).enableBatching(false).topic(newUniqueName).create();
        for (int i = 0; i < 2000; i++) {
            try {
                create.sendAsync("Message - " + i);
            } catch (Throwable th) {
                if (Collections.singletonList(create).get(0) != null) {
                    create.close();
                }
                throw th;
            }
        }
        create.flush();
        Awaitility.await().untilAsserted(() -> {
            Collection collectAllMetrics = this.pulsarTestContext.getOpenTelemetryMetricReader().collectAllMetrics();
            BrokerOpenTelemetryTestUtil.assertMetricLongSumValue((Collection<MetricData>) collectAllMetrics, "pulsar.broker.connection.rate_limit.count", OpenTelemetryAttributes.ConnectionRateLimitOperationName.THROTTLED.attributes, (Consumer<Long>) l -> {
                OpenTelemetryAssertions.assertThat(l).isPositive();
            });
            BrokerOpenTelemetryTestUtil.assertMetricLongSumValue((Collection<MetricData>) collectAllMetrics, "pulsar.broker.connection.rate_limit.count", OpenTelemetryAttributes.ConnectionRateLimitOperationName.UNTHROTTLED.attributes, (Consumer<Long>) l2 -> {
                OpenTelemetryAssertions.assertThat(l2).isPositive();
            });
        });
        if (Collections.singletonList(create).get(0) != null) {
            create.close();
        }
    }

    private void assertRateLimitCounter(OpenTelemetryAttributes.ConnectionRateLimitOperationName connectionRateLimitOperationName, int i) {
        Collection collectAllMetrics = this.pulsarTestContext.getOpenTelemetryMetricReader().collectAllMetrics();
        if (i == 0) {
            OpenTelemetryAssertions.assertThat(collectAllMetrics).noneSatisfy(metricData -> {
                OpenTelemetryAssertions.assertThat(metricData).hasName("pulsar.broker.connection.rate_limit.count").hasLongSumSatisfying(longSumAssert -> {
                    longSumAssert.hasPointsSatisfying(new Consumer[]{longPointAssert -> {
                        longPointAssert.hasAttributes(connectionRateLimitOperationName.attributes);
                    }});
                });
            });
        } else {
            BrokerOpenTelemetryTestUtil.assertMetricLongSumValue((Collection<MetricData>) collectAllMetrics, "pulsar.broker.connection.rate_limit.count", connectionRateLimitOperationName.attributes, i);
        }
    }
}
