package org.apache.pulsar.compaction;

import io.opentelemetry.api.common.Attributes;
import io.opentelemetry.sdk.metrics.data.MetricData;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import org.apache.bookkeeper.client.BookKeeper;
import org.apache.pulsar.broker.BrokerTestUtil;
import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.pulsar.broker.stats.BrokerOpenTelemetryTestUtil;
import org.apache.pulsar.client.admin.LongRunningProcessStatus;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.MessageRoutingMode;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.Reader;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.impl.ConnectionPool;
import org.apache.pulsar.client.impl.PulsarClientImpl;
import org.apache.pulsar.opentelemetry.OpenTelemetryAttributes;
import org.assertj.core.api.Assertions;
import org.awaitility.Awaitility;
import org.mockito.Mockito;
import org.testng.Assert;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;

@Test(groups = {"broker-compaction"})
/* loaded from: input_file:org/apache/pulsar/compaction/EventTimeOrderCompactorTest.class */
public class EventTimeOrderCompactorTest extends CompactorTest {
    private EventTimeOrderCompactor compactor;

    @Override // org.apache.pulsar.compaction.CompactorTest, org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest
    @BeforeMethod
    public void setup() throws Exception {
        super.setup();
        this.compactor = new EventTimeOrderCompactor(this.conf, this.pulsarClient, this.bk, this.compactionScheduler);
    }

    @Override // org.apache.pulsar.compaction.CompactorTest
    protected long compact(String str) throws ExecutionException, InterruptedException {
        return ((Long) this.compactor.compact(str).get()).longValue();
    }

    @Override // org.apache.pulsar.compaction.CompactorTest
    protected Compactor getCompactor() {
        return this.compactor;
    }

    /* JADX WARN: Finally extract failed */
    @Test
    public void testCompactedOutByEventTime() throws Exception {
        String newUniqueName = BrokerTestUtil.newUniqueName("persistent://my-property/use/my-ns/testCompactedOutByEventTime");
        restartBroker();
        Producer create = this.pulsarClient.newProducer(Schema.STRING).enableBatching(true).topic(newUniqueName).batchingMaxMessages(3).create();
        try {
            create.newMessage().key("K1").value("V1").eventTime(1L).sendAsync();
            create.newMessage().key("K2").value("V2").eventTime(1L).sendAsync();
            create.newMessage().key("K2").value((Object) null).eventTime(2L).sendAsync();
            create.flush();
            this.admin.topics().triggerCompaction(newUniqueName);
            Awaitility.await().untilAsserted(() -> {
                Assert.assertEquals(this.admin.topics().compactionStatus(newUniqueName).status, LongRunningProcessStatus.Status.SUCCESS);
            });
            Attributes build = Attributes.builder().put(OpenTelemetryAttributes.PULSAR_DOMAIN, "persistent").put(OpenTelemetryAttributes.PULSAR_TENANT, "my-property").put(OpenTelemetryAttributes.PULSAR_NAMESPACE, "my-property/use/my-ns").put(OpenTelemetryAttributes.PULSAR_TOPIC, newUniqueName).build();
            Collection collectAllMetrics = this.pulsarTestContext.getOpenTelemetryMetricReader().collectAllMetrics();
            BrokerOpenTelemetryTestUtil.assertMetricLongSumValue((Collection<MetricData>) collectAllMetrics, "pulsar.broker.topic.compaction.removed.message.count", build, 1L);
            BrokerOpenTelemetryTestUtil.assertMetricLongSumValue((Collection<MetricData>) collectAllMetrics, "pulsar.broker.topic.compaction.operation.count", Attributes.builder().putAll(build).put(OpenTelemetryAttributes.PULSAR_COMPACTION_STATUS, "success").build(), 1L);
            BrokerOpenTelemetryTestUtil.assertMetricLongSumValue((Collection<MetricData>) collectAllMetrics, "pulsar.broker.topic.compaction.operation.count", Attributes.builder().putAll(build).put(OpenTelemetryAttributes.PULSAR_COMPACTION_STATUS, "failure").build(), 0L);
            BrokerOpenTelemetryTestUtil.assertMetricDoubleSumValue(collectAllMetrics, "pulsar.broker.topic.compaction.duration", build, d -> {
                Assertions.assertThat(d).isPositive();
            });
            BrokerOpenTelemetryTestUtil.assertMetricLongSumValue((Collection<MetricData>) collectAllMetrics, "pulsar.broker.topic.compaction.incoming.size", build, (Consumer<Long>) l -> {
                Assertions.assertThat(l).isPositive();
            });
            BrokerOpenTelemetryTestUtil.assertMetricLongSumValue((Collection<MetricData>) collectAllMetrics, "pulsar.broker.topic.compaction.outgoing.size", build, (Consumer<Long>) l2 -> {
                Assertions.assertThat(l2).isPositive();
            });
            BrokerOpenTelemetryTestUtil.assertMetricLongSumValue((Collection<MetricData>) collectAllMetrics, "pulsar.broker.topic.compaction.compacted.entry.count", build, 1L);
            BrokerOpenTelemetryTestUtil.assertMetricLongSumValue((Collection<MetricData>) collectAllMetrics, "pulsar.broker.topic.compaction.compacted.entry.size", build, (Consumer<Long>) l3 -> {
                Assertions.assertThat(l3).isPositive();
            });
            create.newMessage().key("K1").eventTime(2L).value("V1-2").sendAsync();
            create.flush();
            this.admin.topics().triggerCompaction(newUniqueName);
            Awaitility.await().untilAsserted(() -> {
                Assert.assertEquals(this.admin.topics().compactionStatus(newUniqueName).status, LongRunningProcessStatus.Status.SUCCESS);
            });
            Reader create2 = this.pulsarClient.newReader(Schema.STRING).subscriptionName("reader-test").topic(newUniqueName).readCompacted(true).startMessageId(MessageId.earliest).create();
            while (create2.hasMessageAvailable()) {
                try {
                    Assert.assertEquals(create2.readNext(3, TimeUnit.SECONDS).getEventTime(), 2L);
                } catch (Throwable th) {
                    if (Collections.singletonList(create2).get(0) != null) {
                        create2.close();
                    }
                    throw th;
                }
            }
            if (Collections.singletonList(create2).get(0) != null) {
                create2.close();
            }
        } finally {
            if (Collections.singletonList(create).get(0) != null) {
                create.close();
            }
        }
    }

    @Test
    public void testCompactWithEventTimeAddCompact() throws Exception {
        Producer create = this.pulsarClient.newProducer().topic("persistent://my-property/use/my-ns/my-topic1").enableBatching(false).messageRoutingMode(MessageRoutingMode.SinglePartition).create();
        try {
            HashMap hashMap = new HashMap();
            create.newMessage().key("a").eventTime(1L).value("A_1".getBytes()).send();
            create.newMessage().key("b").eventTime(1L).value("B_1".getBytes()).send();
            create.newMessage().key("a").eventTime(2L).value("A_2".getBytes()).send();
            hashMap.put("a", "A_2".getBytes());
            hashMap.put("b", "B_1".getBytes());
            compactAndVerify("persistent://my-property/use/my-ns/my-topic1", new HashMap(hashMap), false);
            create.newMessage().key("b").eventTime(2L).value("B_2".getBytes()).send();
            hashMap.put("b", "B_2".getBytes());
            compactAndVerify("persistent://my-property/use/my-ns/my-topic1", hashMap, false);
            if (Collections.singletonList(create).get(0) != null) {
                create.close();
            }
        } catch (Throwable th) {
            if (Collections.singletonList(create).get(0) != null) {
                create.close();
            }
            throw th;
        }
    }

    @Override // org.apache.pulsar.compaction.CompactorTest
    @Test
    public void testPhaseOneLoopTimeConfiguration() {
        ServiceConfiguration serviceConfiguration = new ServiceConfiguration();
        serviceConfiguration.setBrokerServiceCompactionPhaseOneLoopTimeInSeconds(60L);
        PulsarClientImpl pulsarClientImpl = (PulsarClientImpl) Mockito.mock(PulsarClientImpl.class);
        Mockito.when(pulsarClientImpl.getCnxPool()).thenReturn((ConnectionPool) Mockito.mock(ConnectionPool.class));
        Assert.assertEquals(new EventTimeOrderCompactor(serviceConfiguration, pulsarClientImpl, (BookKeeper) Mockito.mock(BookKeeper.class), this.compactionScheduler).getPhaseOneLoopReadTimeoutInSeconds(), 60L);
    }
}
