package org.apache.pulsar.compaction;

import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import io.netty.buffer.ByteBuf;
import io.netty.channel.EventLoopGroup;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Enumeration;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Random;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import org.apache.bookkeeper.client.BookKeeper;
import org.apache.bookkeeper.client.LedgerEntry;
import org.apache.bookkeeper.client.LedgerHandle;
import org.apache.bookkeeper.mledger.Entry;
import org.apache.bookkeeper.mledger.Position;
import org.apache.bookkeeper.mledger.impl.PositionImpl;
import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
import org.apache.pulsar.broker.service.persistent.PersistentTopic;
import org.apache.pulsar.client.admin.LongRunningProcessStatus;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.MessageRoutingMode;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.RawMessage;
import org.apache.pulsar.client.api.Reader;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.impl.ConnectionPool;
import org.apache.pulsar.client.impl.PulsarClientImpl;
import org.apache.pulsar.client.impl.RawMessageImpl;
import org.apache.pulsar.client.impl.RawReaderTest;
import org.apache.pulsar.common.policies.data.ClusterData;
import org.apache.pulsar.common.policies.data.TenantInfoImpl;
import org.apache.pulsar.common.protocol.Commands;
import org.apache.pulsar.metadata.api.extended.MetadataStoreExtended;
import org.awaitility.Awaitility;
import org.mockito.Mockito;
import org.testng.Assert;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;

@Test(groups = {"broker-compaction"})
/* loaded from: input_file:org/apache/pulsar/compaction/CompactorTest.class */
public class CompactorTest extends MockedPulsarServiceBaseTest {
    private ScheduledExecutorService compactionScheduler;

    @Override // org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest
    @BeforeMethod
    public void setup() throws Exception {
        super.internalSetup();
        this.admin.clusters().createCluster("use", ClusterData.builder().serviceUrl(this.pulsar.getWebServiceAddress()).build());
        this.admin.tenants().createTenant("my-property", new TenantInfoImpl(Sets.newHashSet(new String[]{"appid1", "appid2"}), Sets.newHashSet(new String[]{"use"})));
        this.admin.namespaces().createNamespace("my-property/use/my-ns");
        this.compactionScheduler = Executors.newSingleThreadScheduledExecutor(new ThreadFactoryBuilder().setNameFormat("compactor").setDaemon(true).build());
    }

    @Override // org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest
    @AfterMethod(alwaysRun = true)
    public void cleanup() throws Exception {
        super.internalCleanup();
        this.compactionScheduler.shutdownNow();
    }

    private List<String> compactAndVerify(String str, Map<String, byte[]> map, boolean z) throws Exception {
        BookKeeper create = this.pulsar.getBookKeeperClientFactory().create(this.conf, (MetadataStoreExtended) null, (EventLoopGroup) null, Optional.empty(), (Map) null);
        TwoPhaseCompactor twoPhaseCompactor = new TwoPhaseCompactor(this.conf, this.pulsarClient, create, this.compactionScheduler);
        LedgerHandle openLedger = create.openLedger(((Long) twoPhaseCompactor.compact(str).get()).longValue(), Compactor.COMPACTED_TOPIC_LEDGER_DIGEST_TYPE, Compactor.COMPACTED_TOPIC_LEDGER_PASSWORD);
        Assert.assertEquals(openLedger.getLastAddConfirmed() + 1, map.size(), "Should have as many entries as there is keys");
        ArrayList arrayList = new ArrayList();
        Enumeration readEntries = openLedger.readEntries(0L, openLedger.getLastAddConfirmed());
        while (readEntries.hasMoreElements()) {
            RawMessage deserializeFrom = RawMessageImpl.deserializeFrom(((LedgerEntry) readEntries.nextElement()).getEntryBuffer());
            String extractKey = RawReaderTest.extractKey(deserializeFrom);
            arrayList.add(extractKey);
            ByteBuf extractPayload = extractPayload(deserializeFrom);
            byte[] bArr = new byte[extractPayload.readableBytes()];
            extractPayload.readBytes(bArr);
            Assert.assertEquals(bArr, map.remove(extractKey), "Compacted version should match expected version");
            deserializeFrom.close();
        }
        if (z) {
            CompactionRecord compactionRecord = (CompactionRecord) twoPhaseCompactor.getStats().getCompactionRecordForTopic(str).get();
            long lastCompactionRemovedEventCount = compactionRecord.getLastCompactionRemovedEventCount();
            long lastCompactionSucceedTimestamp = compactionRecord.getLastCompactionSucceedTimestamp();
            long lastCompactionFailedTimestamp = compactionRecord.getLastCompactionFailedTimestamp();
            long lastCompactionDurationTimeInMills = compactionRecord.getLastCompactionDurationTimeInMills();
            Assert.assertTrue(lastCompactionRemovedEventCount >= 1);
            Assert.assertTrue(lastCompactionSucceedTimestamp >= 1);
            Assert.assertTrue(lastCompactionDurationTimeInMills >= 0);
            Assert.assertEquals(lastCompactionFailedTimestamp, 0L);
        }
        Assert.assertTrue(map.isEmpty(), "All expected keys should have been found");
        return arrayList;
    }

    @Test
    public void testCompaction() throws Exception {
        Producer create = this.pulsarClient.newProducer().topic("persistent://my-property/use/my-ns/my-topic1").enableBatching(false).messageRoutingMode(MessageRoutingMode.SinglePartition).create();
        try {
            HashMap hashMap = new HashMap();
            Random random = new Random(0L);
            for (int i = 0; i < 1000; i++) {
                String str = "key" + random.nextInt(10);
                byte[] bytes = ("my-message-" + str + "-" + i).getBytes();
                create.newMessage().key(str).value(bytes).send();
                hashMap.put(str, bytes);
            }
            compactAndVerify("persistent://my-property/use/my-ns/my-topic1", hashMap, true);
            if (Collections.singletonList(create).get(0) != null) {
                create.close();
            }
        } catch (Throwable th) {
            if (Collections.singletonList(create).get(0) != null) {
                create.close();
            }
            throw th;
        }
    }

    @Test
    public void testAllCompactedOut() throws Exception {
        String str = "persistent://my-property/use/my-ns/testAllCompactedOut";
        boolean isTopicCompactionRetainNullKey = this.pulsar.getConfig().isTopicCompactionRetainNullKey();
        this.pulsar.getConfig().setTopicCompactionRetainNullKey(true);
        restartBroker();
        Producer create = this.pulsarClient.newProducer(Schema.STRING).enableBatching(true).topic("persistent://my-property/use/my-ns/testAllCompactedOut").batchingMaxMessages(3).create();
        try {
            create.newMessage().key("K1").value("V1").sendAsync();
            create.newMessage().key("K2").value("V2").sendAsync();
            create.newMessage().key("K2").value((Object) null).sendAsync();
            create.flush();
            this.admin.topics().triggerCompaction("persistent://my-property/use/my-ns/testAllCompactedOut");
            Awaitility.await().untilAsserted(() -> {
                Assert.assertEquals(this.admin.topics().compactionStatus(str).status, LongRunningProcessStatus.Status.SUCCESS);
            });
            create.newMessage().key("K1").value((Object) null).sendAsync();
            create.flush();
            this.admin.topics().triggerCompaction("persistent://my-property/use/my-ns/testAllCompactedOut");
            Awaitility.await().untilAsserted(() -> {
                Assert.assertEquals(this.admin.topics().compactionStatus(str).status, LongRunningProcessStatus.Status.SUCCESS);
            });
            Reader create2 = this.pulsarClient.newReader(Schema.STRING).subscriptionName("reader-test").topic("persistent://my-property/use/my-ns/testAllCompactedOut").readCompacted(true).startMessageId(MessageId.earliest).create();
            while (create2.hasMessageAvailable()) {
                try {
                    Assert.assertNotNull(create2.readNext(3, TimeUnit.SECONDS));
                } catch (Throwable th) {
                    if (Collections.singletonList(create2).get(0) != null) {
                        create2.close();
                    }
                    throw th;
                }
            }
            this.pulsar.getConfig().setTopicCompactionRetainNullKey(isTopicCompactionRetainNullKey);
            if (Collections.singletonList(create2).get(0) != null) {
                create2.close();
            }
        } finally {
            if (Collections.singletonList(create).get(0) != null) {
                create.close();
            }
        }
    }

    @Test
    public void testCompactAddCompact() throws Exception {
        Producer create = this.pulsarClient.newProducer().topic("persistent://my-property/use/my-ns/my-topic1").enableBatching(false).messageRoutingMode(MessageRoutingMode.SinglePartition).create();
        try {
            HashMap hashMap = new HashMap();
            create.newMessage().key("a").value("A_1".getBytes()).send();
            create.newMessage().key("b").value("B_1".getBytes()).send();
            create.newMessage().key("a").value("A_2".getBytes()).send();
            hashMap.put("a", "A_2".getBytes());
            hashMap.put("b", "B_1".getBytes());
            compactAndVerify("persistent://my-property/use/my-ns/my-topic1", new HashMap(hashMap), false);
            create.newMessage().key("b").value("B_2".getBytes()).send();
            hashMap.put("b", "B_2".getBytes());
            compactAndVerify("persistent://my-property/use/my-ns/my-topic1", hashMap, false);
            if (Collections.singletonList(create).get(0) != null) {
                create.close();
            }
        } catch (Throwable th) {
            if (Collections.singletonList(create).get(0) != null) {
                create.close();
            }
            throw th;
        }
    }

    @Test
    public void testCompactedInOrder() throws Exception {
        Producer create = this.pulsarClient.newProducer().topic("persistent://my-property/use/my-ns/my-topic1").enableBatching(false).messageRoutingMode(MessageRoutingMode.SinglePartition).create();
        try {
            create.newMessage().key("c").value("C_1".getBytes()).send();
            create.newMessage().key("a").value("A_1".getBytes()).send();
            create.newMessage().key("b").value("B_1".getBytes()).send();
            create.newMessage().key("a").value("A_2".getBytes()).send();
            HashMap hashMap = new HashMap();
            hashMap.put("a", "A_2".getBytes());
            hashMap.put("b", "B_1".getBytes());
            hashMap.put("c", "C_1".getBytes());
            Assert.assertEquals(compactAndVerify("persistent://my-property/use/my-ns/my-topic1", hashMap, false), Lists.newArrayList(new String[]{"c", "b", "a"}));
            if (Collections.singletonList(create).get(0) != null) {
                create.close();
            }
        } catch (Throwable th) {
            if (Collections.singletonList(create).get(0) != null) {
                create.close();
            }
            throw th;
        }
    }

    @Test
    public void testCompactEmptyTopic() throws Exception {
        this.pulsarClient.newConsumer().topic(new String[]{"persistent://my-property/use/my-ns/my-topic1"}).subscriptionName("sub1").subscribe().close();
        new TwoPhaseCompactor(this.conf, this.pulsarClient, this.pulsar.getBookKeeperClientFactory().create(this.conf, (MetadataStoreExtended) null, (EventLoopGroup) null, Optional.empty(), (Map) null), this.compactionScheduler).compact("persistent://my-property/use/my-ns/my-topic1").get();
    }

    @Test
    public void testPhaseOneLoopTimeConfiguration() {
        ServiceConfiguration serviceConfiguration = new ServiceConfiguration();
        serviceConfiguration.setBrokerServiceCompactionPhaseOneLoopTimeInSeconds(60L);
        PulsarClientImpl pulsarClientImpl = (PulsarClientImpl) Mockito.mock(PulsarClientImpl.class);
        Mockito.when(pulsarClientImpl.getCnxPool()).thenReturn((ConnectionPool) Mockito.mock(ConnectionPool.class));
        Assert.assertEquals(new TwoPhaseCompactor(serviceConfiguration, pulsarClientImpl, (BookKeeper) Mockito.mock(BookKeeper.class), this.compactionScheduler).getPhaseOneLoopReadTimeoutInSeconds(), 60L);
    }

    @Test
    public void testCompactedWithConcurrentSend() throws Exception {
        String str = "persistent://my-property/use/my-ns/testCompactedWithConcurrentSend";
        Producer create = this.pulsarClient.newProducer().topic("persistent://my-property/use/my-ns/testCompactedWithConcurrentSend").enableBatching(false).messageRoutingMode(MessageRoutingMode.SinglePartition).create();
        try {
            TwoPhaseCompactor twoPhaseCompactor = new TwoPhaseCompactor(this.conf, this.pulsarClient, this.pulsar.getBookKeeperClientFactory().create(this.conf, (MetadataStoreExtended) null, (EventLoopGroup) null, Optional.empty(), (Map) null), this.compactionScheduler);
            CompletableFuture<Void> runAsync = CompletableFuture.runAsync(() -> {
                for (int i = 0; i < 100; i++) {
                    try {
                        create.newMessage().key(String.valueOf(i)).value(String.valueOf(i).getBytes()).send();
                    } catch (PulsarClientException e) {
                        throw new RuntimeException((Throwable) e);
                    }
                }
            });
            PersistentTopic persistentTopic = (PersistentTopic) this.pulsar.getBrokerService().getTopicReference("persistent://my-property/use/my-ns/testCompactedWithConcurrentSend").get();
            CompactedTopic compactedTopic = persistentTopic.getCompactedTopic();
            Awaitility.await().untilAsserted(() -> {
                long longValue = ((Long) twoPhaseCompactor.compact(str).get()).longValue();
                Thread.sleep(300L);
                Optional compactedTopicContext = persistentTopic.getCompactedTopicContext();
                Assert.assertTrue(compactedTopicContext.isPresent());
                Assert.assertEquals(((CompactedTopicContext) compactedTopicContext.get()).ledger.getId(), longValue);
            });
            Position position = (Position) compactedTopic.getCompactionHorizon().get();
            Entry entry = (Entry) compactedTopic.readLastEntryOfCompactedLedger().get();
            Assert.assertTrue(PositionImpl.get(position.getLedgerId(), position.getEntryId()).compareTo(PositionImpl.get(entry.getLedgerId(), entry.getEntryId())) >= 0);
            runAsync.join();
            if (Collections.singletonList(create).get(0) != null) {
                create.close();
            }
        } catch (Throwable th) {
            if (Collections.singletonList(create).get(0) != null) {
                create.close();
            }
            throw th;
        }
    }

    public ByteBuf extractPayload(RawMessage rawMessage) throws Exception {
        ByteBuf headersAndPayload = rawMessage.getHeadersAndPayload();
        Commands.skipChecksumIfPresent(headersAndPayload);
        headersAndPayload.readBytes(new byte[headersAndPayload.readInt()]);
        return headersAndPayload.slice();
    }
}
