package org.apache.pulsar.compaction;

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.Collections;
import java.util.List;
import java.util.Objects;
import java.util.Set;
import org.apache.bookkeeper.mledger.Entry;
import org.apache.bookkeeper.mledger.Position;
import org.apache.bookkeeper.mledger.impl.PositionImpl;
import org.apache.pulsar.client.admin.PulsarAdminException;
import org.apache.pulsar.client.api.MessageRoutingMode;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.impl.MessageImpl;
import org.apache.pulsar.common.policies.data.ClusterData;
import org.apache.pulsar.common.policies.data.ManagedLedgerInternalStats;
import org.apache.pulsar.common.policies.data.TenantInfoImpl;
import org.testng.Assert;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;

/**
 * Tests {@link PulsarTopicCompactionService}: publishes keyed messages to a topic, compacts it,
 * and verifies the last compacted position and the entries read back from the compacted ledger.
 *
 * <p>Relies on the {@code admin}, {@code pulsar}, {@code pulsarClient}, {@code bk} and
 * {@code compactor} fields provided by the parent test classes.
 */
public class TopicCompactionServiceTest extends CompactorTest {
    private static final String CLUSTER = "test";
    private static final String TENANT = "prop-xyz";
    private static final String NAMESPACE = TENANT + "/ns1";
    private static final String TOPIC = "persistent://" + NAMESPACE + "/my-topic";
    /** Name of the cursor whose stats carry the compaction state read by this test. */
    private static final String COMPACTION_CURSOR = "__compaction";
    /** Cursor property holding the id of the compacted ledger. */
    private static final String COMPACTED_LEDGER_PROPERTY = "CompactedTopicLedger";

    /**
     * Runs the parent setup, then creates the cluster, tenant and namespace used by the test topic.
     *
     * @throws Exception if the parent setup or any admin call fails
     */
    @Override
    @BeforeMethod
    public void setup() throws Exception {
        super.setup();
        this.admin.clusters().createCluster(CLUSTER,
                ClusterData.builder().serviceUrl(this.pulsar.getWebServiceAddress()).build());
        this.admin.tenants().createTenant(TENANT,
                new TenantInfoImpl(Set.of("role1", "role2"), Set.of(CLUSTER)));
        this.admin.namespaces().createNamespace(NAMESPACE, Set.of(CLUSTER));
    }

    /**
     * Sends five messages over two keys, compacts the topic, and checks that:
     * <ul>
     *   <li>the last compacted position equals the topic's last confirmed entry;</li>
     *   <li>reading up to 4 compacted entries yields exactly 2, holding the latest value per key;</li>
     *   <li>reading with a limit of 1 yields exactly 1 entry.</li>
     * </ul>
     *
     * @throws PulsarClientException if producing or closing the producer fails
     * @throws PulsarAdminException if reading the topic's internal stats fails
     */
    @Test
    public void test() throws PulsarClientException, PulsarAdminException {
        PulsarTopicCompactionService service =
                new PulsarTopicCompactionService(TOPIC, this.bk, () -> this.compactor);

        // try-with-resources closes the producer on every path and, unlike a manual
        // catch/close/rethrow, never lets a close() failure mask an assertion failure.
        try (Producer<byte[]> producer = this.pulsarClient.newProducer()
                .topic(TOPIC)
                .enableBatching(false)
                .messageRoutingMode(MessageRoutingMode.SinglePartition)
                .create()) {
            send(producer, "a", "A_1");
            send(producer, "b", "B_1");
            send(producer, "a", "A_2");
            send(producer, "b", "B_2");
            send(producer, "b", "B_3");
            producer.flush();

            service.compact().join();
            CompactedTopicImpl compactedTopic = service.getCompactedTopic();

            // Read the compaction cursor once so the ledger id and the mark-delete position
            // come from the same stats snapshot.
            ManagedLedgerInternalStats.CursorStats compactionCursor =
                    this.admin.topics().getInternalStats(TOPIC).cursors.get(COMPACTION_CURSOR);
            Assert.assertNotNull(compactionCursor, "compaction cursor is missing from the topic stats");
            Long compactedLedgerId = compactionCursor.properties.get(COMPACTED_LEDGER_PROPERTY);
            Assert.assertNotNull(compactedLedgerId, "compacted ledger property is missing");

            // markDeletePosition is rendered as "<ledgerId>:<entryId>".
            String[] markDelete = compactionCursor.markDeletePosition.split(":");
            Assert.assertEquals(markDelete.length, 2);
            compactedTopic.newCompactedLedger(
                    PositionImpl.get(Long.parseLong(markDelete[0]), Long.parseLong(markDelete[1])),
                    compactedLedgerId).join();

            Position lastCompacted = service.getLastCompactedPosition().join();
            Assert.assertEquals(this.admin.topics().getInternalStats(TOPIC).lastConfirmedEntry,
                    lastCompacted.toString());

            List<Entry> entries = service.readCompactedEntries(PositionImpl.EARLIEST, 4).join();
            Assert.assertEquals(entries.size(), 2);
            for (Entry entry : entries) {
                assertLatestValue(entry);
            }

            List<Entry> limited = service.readCompactedEntries(PositionImpl.EARLIEST, 1).join();
            Assert.assertEquals(limited.size(), 1);
        }
    }

    /** Publishes one message with the given key and UTF-8 encoded value. */
    private static void send(Producer<byte[]> producer, String key, String value)
            throws PulsarClientException {
        producer.newMessage().key(key).value(value.getBytes(StandardCharsets.UTF_8)).send();
    }

    /** Decodes a compacted entry and asserts it carries the last value published for its key. */
    private static void assertLatestValue(Entry entry) {
        MessageImpl<byte[]> message;
        try {
            message = MessageImpl.deserialize(entry.getDataBuffer());
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
        String value = new String(message.getData(), StandardCharsets.UTF_8);
        if (Objects.equals(message.getKey(), "a")) {
            Assert.assertEquals(value, "A_2");
        } else {
            // Only keys "a" and "b" are published; anything else is a failure, not a "b".
            Assert.assertEquals(message.getKey(), "b");
            Assert.assertEquals(value, "B_3");
        }
    }
}
