package org.apache.pulsar.compaction;

import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.SerializationFeature;
import com.google.common.collect.Sets;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import org.apache.bookkeeper.client.BookKeeper;
import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.Reader;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.common.policies.data.ClusterData;
import org.apache.pulsar.common.policies.data.TenantInfo;
import org.apache.zookeeper.ZooKeeper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testng.Assert;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;

/**
 * Exercises {@link TwoPhaseCompactor} against a topic whose ledgers roll over after two entries,
 * checking what a reader observes before and after each compaction run.
 */
@Test(groups = {"broker"})
public class CompactionRetentionTest extends MockedPulsarServiceBaseTest {
    private static final Logger log = LoggerFactory.getLogger(CompactionRetentionTest.class);

    /** Executor handed to the compactor; created in {@link #setup()} and stopped in {@link #cleanup()}. */
    private ScheduledExecutorService compactionScheduler;

    /** BookKeeper client obtained from the broker's client factory and handed to the compactor. */
    private BookKeeper bk;

    /**
     * Starts the mocked broker with aggressive ledger rollover (no minimum rollover time, two entries
     * per ledger), creates the cluster/tenant/namespace used by the test, and prepares the compaction
     * scheduler and BookKeeper client.
     *
     * @throws Exception if the broker or any admin operation fails
     */
    @Override // org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest
    @BeforeMethod
    public void setup() throws Exception {
        conf.setManagedLedgerMinLedgerRolloverTimeMinutes(0);
        conf.setManagedLedgerMaxEntriesPerLedger(2);
        super.internalSetup();

        admin.clusters().createCluster("use", new ClusterData(pulsar.getWebServiceAddress()));
        admin.tenants().createTenant("my-tenant",
                new TenantInfo(Sets.newHashSet("appid1", "appid2"), Sets.newHashSet("use")));
        admin.namespaces().createNamespace("my-tenant/use/my-ns");

        compactionScheduler = Executors.newSingleThreadScheduledExecutor(
                new ThreadFactoryBuilder().setNameFormat("compaction-%d").setDaemon(true).build());
        // The explicit casts on the null arguments are kept as-is so overload resolution is unchanged.
        bk = pulsar.getBookKeeperClientFactory().create(conf, (ZooKeeper) null, Optional.empty(), (Map) null);
    }

    /**
     * Tears down the mocked broker and stops the compaction scheduler.
     *
     * @throws Exception if the broker cleanup fails
     */
    @Override // org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest
    @AfterMethod(alwaysRun = true)
    public void cleanup() throws Exception {
        try {
            super.internalCleanup();
        } finally {
            // Stop the scheduler even when the broker cleanup throws, so its thread is not left behind.
            if (compactionScheduler != null) {
                compactionScheduler.shutdownNow();
            }
        }
    }

    /**
     * Publishes keyed values, compacts repeatedly, and validates the values visible to readers after
     * each step, including after value-less messages are published for a subset of the keys.
     *
     * @throws Exception on any client, admin or compaction failure
     */
    @Test
    public void testCompaction() throws Exception {
        String topic = "persistent://my-tenant/use/my-ns/my-topic-" + System.nanoTime();

        Set<String> keys = Sets.newHashSet("a", "b", "c");
        Set<String> keysToExpire = Sets.newHashSet("x1", "x2");
        Set<String> allKeys = new HashSet<>();
        allKeys.addAll(keys);
        allKeys.addAll(keysToExpire);

        ObjectMapper statsMapper = new ObjectMapper();
        statsMapper.configure(SerializationFeature.INDENT_OUTPUT, true);

        try (Producer<Integer> producer = pulsarClient.newProducer(Schema.INT32).topic(topic).create()) {
            TwoPhaseCompactor compactor = new TwoPhaseCompactor(conf, pulsarClient, bk, compactionScheduler);

            // Compact once before anything has been published.
            compactor.compact(topic).join();
            log.info(" ---- X 1: {}", statsMapper.writeValueAsString(admin.topics().getInternalStats(topic, false)));

            for (String key : allKeys) {
                producer.newMessage().key(key).value(1).send();
            }
            log.info(" ---- X 2: {}", statsMapper.writeValueAsString(admin.topics().getInternalStats(topic, false)));
            validateMessages(pulsarClient, true, topic, 1, allKeys);

            compactor.compact(topic).join();
            log.info(" ---- X 3: {}", statsMapper.writeValueAsString(admin.topics().getInternalStats(topic, false)));
            validateMessages(pulsarClient, true, topic, 1, allKeys);

            // Overwrite every key with a newer value.
            for (String key : allKeys) {
                producer.newMessage().key(key).value(2).send();
            }
            compactor.compact(topic).join();
            validateMessages(pulsarClient, true, topic, 2, allKeys);

            // Publish messages with a key but no value for the keys that should disappear.
            for (String key : keysToExpire) {
                producer.newMessage().key(key).send();
            }
            compactor.compact(topic).join();
            log.info(" ---- X 4: {}", statsMapper.writeValueAsString(admin.topics().getInternalStats(topic, false)));

            validateMessages(pulsarClient, true, topic, 2, keys);
            // The non-compacted read is expected to yield no valued messages at all.
            validateMessages(pulsarClient, false, topic, 2, Collections.emptySet());
        }
    }

    /**
     * Reads the topic from the earliest position until no message arrives within one second, keeps
     * the last non-empty value seen per key, and asserts that the result is exactly
     * {@code expectedKeys}, each mapped to {@code expectedValue}.
     *
     * @param pulsarClient  client used to create the reader
     * @param readCompacted whether the reader is created with {@code readCompacted} enabled
     * @param topic         topic to read
     * @param expectedValue value every expected key must map to
     * @param expectedKeys  exact set of keys that must have a value
     * @throws Exception if the reader cannot be created, read from, or closed
     */
    private void validateMessages(PulsarClient pulsarClient, boolean readCompacted, String topic,
                                  int expectedValue, Set<String> expectedKeys) throws Exception {
        try (Reader<Integer> reader = pulsarClient.newReader(Schema.INT32)
                .topic(topic)
                .startMessageId(MessageId.earliest)
                .readCompacted(readCompacted)
                .create()) {
            Map<String, Integer> receivedValues = new HashMap<>();
            Message<Integer> msg;
            while ((msg = reader.readNext(1, TimeUnit.SECONDS)) != null) {
                // An empty payload carries no value to decode; such messages are logged but not recorded.
                Integer value = msg.getData().length > 0 ? msg.getValue() : null;
                log.info("Received: {} -- value: {}", msg.getKey(), value);
                if (value != null) {
                    receivedValues.put(msg.getKey(), value);
                }
            }

            Map<String, Integer> expectedValues = new HashMap<>();
            for (String key : expectedKeys) {
                expectedValues.put(key, expectedValue);
            }

            log.info("Received values: {}", receivedValues);
            log.info("Expected values: {}", expectedValues);
            Assert.assertEquals(receivedValues, expectedValues);
        }
    }
}
