/*
 * Decompiled with CFR 0.152.
 */
package org.apache.pulsar.compaction;

import com.github.benmanes.caffeine.cache.AsyncLoadingCache;
import com.google.common.collect.Sets;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import java.nio.charset.Charset;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Optional;
import java.util.Random;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.IntStream;
import org.apache.bookkeeper.client.BKException;
import org.apache.bookkeeper.client.BookKeeper;
import org.apache.bookkeeper.client.LedgerHandle;
import org.apache.bookkeeper.mledger.Position;
import org.apache.bookkeeper.mledger.impl.PositionImpl;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.commons.lang3.tuple.Triple;
import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
import org.apache.pulsar.client.admin.LongRunningProcessStatus;
import org.apache.pulsar.client.admin.PulsarAdminException;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.ProducerBuilder;
import org.apache.pulsar.client.api.Reader;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.impl.RawMessageImpl;
import org.apache.pulsar.common.api.proto.MessageIdData;
import org.apache.pulsar.common.policies.data.ClusterData;
import org.apache.pulsar.common.policies.data.TenantInfo;
import org.apache.pulsar.common.policies.data.TenantInfoImpl;
import org.apache.pulsar.common.util.FutureUtil;
import org.apache.pulsar.compaction.CompactedTopicImpl;
import org.apache.pulsar.compaction.Compactor;
import org.testng.Assert;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.DataProvider;
import org.testng.annotations.Test;

@Test(groups={"broker-compaction"})
public class CompactedTopicTest
extends MockedPulsarServiceBaseTest {
    private final Random r = new Random(0L);

    @DataProvider(name="batchEnabledProvider")
    public Object[][] batchEnabledProvider() {
        return new Object[][]{{Boolean.FALSE}, {Boolean.TRUE}};
    }

    @Override
    @BeforeMethod
    public void setup() throws Exception {
        super.internalSetup();
        this.admin.clusters().createCluster("use", ClusterData.builder().serviceUrl(this.pulsar.getWebServiceAddress()).build());
        this.admin.tenants().createTenant("my-property", (TenantInfo)new TenantInfoImpl((Set)Sets.newHashSet((Object[])new String[]{"appid1", "appid2"}), (Set)Sets.newHashSet((Object[])new String[]{"use"})));
        this.admin.namespaces().createNamespace("my-property/use/my-ns");
    }

    @Override
    @AfterMethod(alwaysRun=true)
    public void cleanup() throws Exception {
        super.internalCleanup();
    }

    private Triple<Long, List<Pair<MessageIdData, Long>>, List<Pair<MessageIdData, Long>>> buildCompactedLedger(BookKeeper bk, int count) throws Exception {
        LedgerHandle lh = bk.createLedger(1, 1, Compactor.COMPACTED_TOPIC_LEDGER_DIGEST_TYPE, Compactor.COMPACTED_TOPIC_LEDGER_PASSWORD);
        ArrayList positions = new ArrayList();
        ArrayList idsInGaps = new ArrayList();
        AtomicLong ledgerIds = new AtomicLong(10L);
        AtomicLong entryIds = new AtomicLong(0L);
        CompletableFuture.allOf((CompletableFuture[])IntStream.range(0, count).mapToObj(i -> {
            long delta;
            ArrayList<MessageIdData> idsInGap = new ArrayList<MessageIdData>();
            if (this.r.nextInt(10) == 1) {
                delta = this.r.nextInt(10) + 1;
                idsInGap.add(new MessageIdData().setLedgerId(ledgerIds.get()).setEntryId(entryIds.get() + 1L));
                ledgerIds.addAndGet(delta);
                entryIds.set(0L);
            }
            if ((delta = (long)this.r.nextInt(5)) != 0L) {
                idsInGap.add(new MessageIdData().setLedgerId(ledgerIds.get()).setEntryId(entryIds.get() + 1L));
            }
            MessageIdData id = new MessageIdData().setLedgerId(ledgerIds.get()).setEntryId(entryIds.addAndGet(delta + 1L));
            RawMessageImpl m = new RawMessageImpl(id, Unpooled.EMPTY_BUFFER);
            try {
                CompletableFuture f = new CompletableFuture();
                ByteBuf buffer = m.serialize();
                lh.asyncAddEntry(buffer, (rc, ledger, eid, ctx) -> {
                    if (rc != 0) {
                        f.completeExceptionally(BKException.create((int)rc));
                    } else {
                        positions.add(Pair.of((Object)id, (Object)eid));
                        idsInGap.forEach(gid -> idsInGaps.add(Pair.of((Object)gid, (Object)eid)));
                        f.complete(null);
                    }
                }, null);
                CompletableFuture completableFuture = f;
                return completableFuture;
            }
            finally {
                if (Collections.singletonList(m).get(0) != null) {
                    m.close();
                }
            }
        }).toArray(CompletableFuture[]::new)).get();
        lh.close();
        return Triple.of((Object)lh.getId(), positions, idsInGaps);
    }

    @Test
    public void testEntryLookup() throws Exception {
        PositionImpl pos;
        BookKeeper bk = this.pulsar.getBookKeeperClientFactory().create(this.conf, null, null, Optional.empty(), null);
        Triple<Long, List<Pair<MessageIdData, Long>>, List<Pair<MessageIdData, Long>>> compactedLedgerData = this.buildCompactedLedger(bk, 500);
        List positions = (List)compactedLedgerData.getMiddle();
        List idsInGaps = (List)compactedLedgerData.getRight();
        LedgerHandle lh = bk.openLedger(((Long)compactedLedgerData.getLeft()).longValue(), Compactor.COMPACTED_TOPIC_LEDGER_DIGEST_TYPE, Compactor.COMPACTED_TOPIC_LEDGER_PASSWORD);
        long lastEntryId = lh.getLastAddConfirmed();
        AsyncLoadingCache cache = CompactedTopicImpl.createCache((LedgerHandle)lh, (long)50L);
        MessageIdData firstPositionId = (MessageIdData)((Pair)positions.get(0)).getLeft();
        Pair lastPosition = (Pair)positions.get(positions.size() - 1);
        Assert.assertEquals(CompactedTopicImpl.findStartPoint((PositionImpl)new PositionImpl(0L, 0L), (long)lastEntryId, (AsyncLoadingCache)cache).get(), (Object)0L);
        Assert.assertEquals(CompactedTopicImpl.findStartPoint((PositionImpl)new PositionImpl(Long.MAX_VALUE, 0L), (long)lastEntryId, (AsyncLoadingCache)cache).get(), (Object)-4276948922L);
        Assert.assertEquals(CompactedTopicImpl.findStartPoint((PositionImpl)new PositionImpl(firstPositionId.getLedgerId(), 0L), (long)lastEntryId, (AsyncLoadingCache)cache).get(), (Object)0L);
        Assert.assertEquals(CompactedTopicImpl.findStartPoint((PositionImpl)new PositionImpl(((MessageIdData)lastPosition.getLeft()).getLedgerId(), ((MessageIdData)lastPosition.getLeft()).getEntryId() + 1L), (long)lastEntryId, (AsyncLoadingCache)cache).get(), (Object)-4276948922L);
        Collections.shuffle(positions, this.r);
        Collections.shuffle(idsInGaps, this.r);
        for (Pair p : positions) {
            pos = new PositionImpl(((MessageIdData)p.getLeft()).getLedgerId(), ((MessageIdData)p.getLeft()).getEntryId());
            Long got = (Long)CompactedTopicImpl.findStartPoint((PositionImpl)pos, (long)lastEntryId, (AsyncLoadingCache)cache).get();
            Assert.assertEquals((Object)got, (Object)p.getRight());
        }
        for (Pair gap : idsInGaps) {
            pos = new PositionImpl(((MessageIdData)gap.getLeft()).getLedgerId(), ((MessageIdData)gap.getLeft()).getEntryId());
            Assert.assertEquals(CompactedTopicImpl.findStartPoint((PositionImpl)pos, (long)lastEntryId, (AsyncLoadingCache)cache).get(), (Object)gap.getRight());
        }
    }

    @Test
    public void testCleanupOldCompactedTopicLedger() throws Exception {
        BookKeeper bk = this.pulsar.getBookKeeperClientFactory().create(this.conf, null, null, Optional.empty(), null);
        LedgerHandle oldCompactedLedger = bk.createLedger(1, 1, Compactor.COMPACTED_TOPIC_LEDGER_DIGEST_TYPE, Compactor.COMPACTED_TOPIC_LEDGER_PASSWORD);
        oldCompactedLedger.close();
        LedgerHandle newCompactedLedger = bk.createLedger(1, 1, Compactor.COMPACTED_TOPIC_LEDGER_DIGEST_TYPE, Compactor.COMPACTED_TOPIC_LEDGER_PASSWORD);
        newCompactedLedger.close();
        CompactedTopicImpl compactedTopic = new CompactedTopicImpl(bk);
        compactedTopic.newCompactedLedger((Position)new PositionImpl(1L, 2L), oldCompactedLedger.getId()).get();
        bk.openLedger(oldCompactedLedger.getId(), Compactor.COMPACTED_TOPIC_LEDGER_DIGEST_TYPE, Compactor.COMPACTED_TOPIC_LEDGER_PASSWORD).close();
        bk.openLedger(newCompactedLedger.getId(), Compactor.COMPACTED_TOPIC_LEDGER_DIGEST_TYPE, Compactor.COMPACTED_TOPIC_LEDGER_PASSWORD).close();
        compactedTopic.newCompactedLedger((Position)new PositionImpl(1L, 2L), newCompactedLedger.getId()).get();
        try {
            bk.openLedger(oldCompactedLedger.getId(), Compactor.COMPACTED_TOPIC_LEDGER_DIGEST_TYPE, Compactor.COMPACTED_TOPIC_LEDGER_PASSWORD).close();
            Assert.fail((String)"Should have failed to open old ledger");
        }
        catch (BKException.BKNoSuchLedgerExistsException | BKException.BKNoSuchLedgerExistsOnMetadataServerException throwable) {
            // empty catch block
        }
        bk.openLedger(newCompactedLedger.getId(), Compactor.COMPACTED_TOPIC_LEDGER_DIGEST_TYPE, Compactor.COMPACTED_TOPIC_LEDGER_PASSWORD).close();
    }

    @Test(dataProvider="batchEnabledProvider")
    public void testCompactWithEmptyMessage(boolean batchEnabled) throws Exception {
        String key = "1";
        byte[] msgBytes = "".getBytes();
        String topic = "persistent://my-property/use/my-ns/testCompactWithEmptyMessage-" + UUID.randomUUID();
        this.admin.topics().createPartitionedTopic(topic, 1);
        int messages = 10;
        ProducerBuilder builder = this.pulsarClient.newProducer().topic(topic);
        if (!batchEnabled) {
            builder.enableBatching(false);
        } else {
            builder.batchingMaxMessages(5);
        }
        Producer producer = builder.create();
        ArrayList<CompletableFuture> list = new ArrayList<CompletableFuture>(10);
        for (int i = 0; i < 10; ++i) {
            list.add(producer.newMessage().keyBytes("1".getBytes(Charset.defaultCharset())).value((Object)msgBytes).sendAsync());
        }
        FutureUtil.waitForAll(list).get();
        this.admin.topics().triggerCompaction(topic);
        boolean succeed = CompactedTopicTest.retryStrategically(test -> {
            try {
                return LongRunningProcessStatus.Status.SUCCESS.equals((Object)this.admin.topics().compactionStatus((String)topic).status);
            }
            catch (PulsarAdminException e) {
                return false;
            }
        }, 10, 200L);
        Assert.assertTrue((boolean)succeed);
        list.clear();
        for (int i = 0; i < 10; ++i) {
            list.add(producer.newMessage().key("1").value((Object)msgBytes).sendAsync());
        }
        FutureUtil.waitForAll(list).get();
        this.admin.topics().triggerCompaction(topic);
        succeed = CompactedTopicTest.retryStrategically(test -> {
            try {
                return LongRunningProcessStatus.Status.SUCCESS.equals((Object)this.admin.topics().compactionStatus((String)topic).status);
            }
            catch (PulsarAdminException e) {
                return false;
            }
        }, 10, 200L);
        Assert.assertTrue((boolean)succeed);
        producer.close();
    }

    @Test(timeOut=30000L)
    public void testReadMessageFromCompactedLedger() throws Exception {
        String key = "1";
        String msg = "test compaction msg";
        String topic = "persistent://my-property/use/my-ns/testCompactWithEmptyMessage-" + UUID.randomUUID();
        this.admin.topics().createPartitionedTopic(topic, 1);
        int numMessages = 10;
        Producer producer = this.pulsarClient.newProducer(Schema.STRING).topic(topic).enableBatching(false).create();
        for (int i = 0; i < 10; ++i) {
            producer.newMessage().key("1").value((Object)msg).send();
        }
        this.admin.topics().triggerCompaction(topic);
        boolean succeed = CompactedTopicTest.retryStrategically(test -> {
            try {
                return LongRunningProcessStatus.Status.SUCCESS.equals((Object)this.admin.topics().compactionStatus((String)topic).status);
            }
            catch (PulsarAdminException e) {
                return false;
            }
        }, 10, 200L);
        Assert.assertTrue((boolean)succeed);
        String newKey = "2";
        String newMsg = "test compaction msg v2";
        for (int i = 0; i < 10; ++i) {
            producer.newMessage().key("2").value((Object)newMsg).send();
        }
        Reader reader = this.pulsarClient.newReader(Schema.STRING).topic(topic).subscriptionName("test").readCompacted(true).startMessageId(MessageId.earliest).create();
        int compactedMsgCount = 0;
        int nonCompactedMsgCount = 0;
        while (reader.hasMessageAvailable()) {
            Message message = reader.readNext();
            if ("1".equals(message.getKey()) && msg.equals(message.getValue())) {
                ++compactedMsgCount;
                continue;
            }
            if (!"2".equals(message.getKey()) || !newMsg.equals(message.getValue())) continue;
            ++nonCompactedMsgCount;
        }
        Assert.assertEquals((int)compactedMsgCount, (int)1);
        Assert.assertEquals((int)nonCompactedMsgCount, (int)10);
    }
}

