package org.apache.pulsar.compaction;

import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.SerializationFeature;
import com.google.common.collect.Sets;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import io.netty.channel.EventLoopGroup;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import org.apache.bookkeeper.client.BookKeeper;
import org.apache.bookkeeper.mledger.ManagedLedgerConfig;
import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.Reader;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.common.naming.SystemTopicNames;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.policies.data.ClusterData;
import org.apache.pulsar.common.policies.data.RetentionPolicies;
import org.apache.pulsar.common.policies.data.TenantInfoImpl;
import org.apache.pulsar.metadata.api.extended.MetadataStoreExtended;
import org.awaitility.Awaitility;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testng.Assert;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;

/**
 * Broker tests that exercise topic compaction together with retention settings.
 *
 * <p>Each test method runs against a freshly started mocked Pulsar service (see
 * {@link #setup()}) and drives compaction explicitly through a {@link TwoPhaseCompactor}.
 */
@Test(groups = {"broker"})
public class CompactionRetentionTest extends MockedPulsarServiceBaseTest {
    private static final Logger log = LoggerFactory.getLogger(CompactionRetentionTest.class);

    /** Namespace created in {@link #setup()} and used by every test. */
    private static final String NAMESPACE = "my-tenant/my-ns";

    /** Prefix of the per-test topic names; a nano-time suffix is appended to make them unique. */
    private static final String TOPIC_PREFIX = "persistent://" + NAMESPACE + "/my-topic-";

    /** Shared pretty-printing mapper used only to dump topic internal stats into the log. */
    private static final ObjectMapper STATS_MAPPER =
            new ObjectMapper().configure(SerializationFeature.INDENT_OUTPUT, true);

    protected ScheduledExecutorService compactionScheduler;
    protected BookKeeper bk;
    private TwoPhaseCompactor compactor;

    /**
     * Starts the mocked service, creates the test cluster/tenant/namespace and builds the
     * compactor together with the scheduler and BookKeeper client it needs.
     *
     * @throws Exception if the service cannot be started or any admin call fails
     */
    @Override // org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest
    @BeforeMethod
    public void setup() throws Exception {
        // Must be applied before internalSetup() so the broker starts with these ledger settings.
        this.conf.setManagedLedgerMinLedgerRolloverTimeMinutes(0);
        this.conf.setManagedLedgerMaxEntriesPerLedger(2);
        super.internalSetup();

        this.admin.clusters().createCluster("test",
                ClusterData.builder().serviceUrl(this.pulsar.getWebServiceAddress()).build());
        this.admin.tenants().createTenant("my-tenant",
                new TenantInfoImpl(Sets.newHashSet("appid1", "appid2"), Sets.newHashSet("test")));
        this.admin.namespaces().createNamespace(NAMESPACE, Collections.singleton("test"));

        this.compactionScheduler = Executors.newSingleThreadScheduledExecutor(
                new ThreadFactoryBuilder().setNameFormat("compaction-%d").setDaemon(true).build());
        this.bk = (BookKeeper) this.pulsar.getBookKeeperClientFactory()
                .create(this.conf, (MetadataStoreExtended) null, (EventLoopGroup) null, Optional.empty(), (Map) null)
                .get();
        this.compactor = new TwoPhaseCompactor(this.conf, this.pulsarClient, this.bk, this.compactionScheduler);
    }

    /**
     * Tears down the mocked service and releases the BookKeeper client and the scheduler.
     *
     * <p>This method runs even when {@link #setup()} failed ({@code alwaysRun = true}), so every
     * resource is null-checked, and each release step is executed even if an earlier one throws.
     *
     * @throws Exception if stopping the service or closing the BookKeeper client fails
     */
    @Override // org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest
    @AfterMethod(alwaysRun = true)
    public void cleanup() throws Exception {
        try {
            super.internalCleanup();
        } finally {
            try {
                if (this.bk != null) {
                    this.bk.close();
                    this.bk = null;
                }
            } finally {
                if (this.compactionScheduler != null) {
                    this.compactionScheduler.shutdownNow();
                    this.compactionScheduler = null;
                }
            }
        }
    }

    /**
     * Runs a compaction of the given topic and blocks until it finishes.
     *
     * @param str name of the topic to compact
     * @return the value the compactor's future completes with
     * @throws ExecutionException if the compaction fails
     * @throws InterruptedException if the calling thread is interrupted while waiting
     */
    protected long compact(String str) throws ExecutionException, InterruptedException {
        return this.compactor.compact(str).get();
    }

    /**
     * Publishes several rounds of keyed messages, compacting after each round, and checks what a
     * reader sees after every step, including after value-less messages were sent for some keys.
     */
    @Test
    public void testCompaction() throws Exception {
        String topic = TOPIC_PREFIX + System.nanoTime();
        Set<String> keys = Sets.newHashSet("a", "b", "c");
        Set<String> keysToExpire = Sets.newHashSet("x1", "x2");
        Set<String> allKeys = new HashSet<>(keys);
        allKeys.addAll(keysToExpire);

        try (Producer<Integer> producer = this.pulsarClient.newProducer(Schema.INT32).topic(topic).create()) {
            // Compact the still-empty topic first.
            compact(topic);
            log.info(" ---- X 1: {}", internalStatsJson(topic));

            for (String key : allKeys) {
                producer.newMessage().key(key).value(1).send();
            }
            log.info(" ---- X 2: {}", internalStatsJson(topic));
            validateMessages(this.pulsarClient, true, topic, 1, allKeys);

            this.compactor.compact(topic).join();
            log.info(" ---- X 3: {}", internalStatsJson(topic));
            validateMessages(this.pulsarClient, true, topic, 1, allKeys);

            for (String key : allKeys) {
                producer.newMessage().key(key).value(2).send();
            }
            compact(topic);
            validateMessages(this.pulsarClient, true, topic, 2, allKeys);

            // Messages with a key but no value (presumably acting as deletion markers for compaction).
            for (String key : keysToExpire) {
                producer.newMessage().key(key).send();
            }
            compact(topic);
            log.info(" ---- X 4: {}", internalStatsJson(topic));

            validateMessages(this.pulsarClient, true, topic, 2, keys);
            validateMessages(this.pulsarClient, false, topic, 2, Collections.emptySet());
        }
    }

    /** Sets the namespace compaction threshold before the topic exists, then runs the scenario. */
    @Test
    public void testCompactionRetentionOnTopicCreationWithNamespacePolicies() throws Exception {
        String topic = TOPIC_PREFIX + System.nanoTime();
        this.admin.namespaces().setCompactionThreshold(NAMESPACE, 10L);
        testCompactionCursorRetention(topic);
    }

    /**
     * Creates the topic first, then sets the namespace compaction threshold and retries the
     * scenario until it passes.
     */
    @Test
    public void testCompactionRetentionAfterTopicCreationWithNamespacePolicies() throws Exception {
        String topic = TOPIC_PREFIX + System.nanoTime();
        // Opening and closing a producer is enough to create the topic.
        this.pulsarClient.newProducer(Schema.INT32).topic(topic).create().close();
        this.admin.namespaces().setCompactionThreshold(NAMESPACE, 10L);
        Awaitility.await().untilAsserted(() -> testCompactionCursorRetention(topic));
    }

    /**
     * Creates the topic, sets the compaction threshold through the topic-level API and retries
     * the scenario until it passes.
     */
    @Test
    public void testCompactionRetentionOnTopicCreationWithTopicPolicies() throws Exception {
        String topic = TOPIC_PREFIX + System.nanoTime();
        this.pulsarClient.newProducer(Schema.INT32).topic(topic).create().close();
        this.admin.topics().setCompactionThreshold(topic, 10L);
        Awaitility.await().untilAsserted(() -> testCompactionCursorRetention(topic));
    }

    /**
     * With namespace retention set to (-1, -1), asserts that the listed system topics resolve to
     * a (0, 0) retention while an ordinary topic resolves to (-1, -1), and that an explicit
     * topic-level retention on a system topic is eventually reflected in its ledger config.
     */
    @Test
    public void testRetentionPolicesForSystemTopic() throws Exception {
        String topicPrefix = "persistent://" + NAMESPACE + "/";
        this.admin.namespaces().setRetention(NAMESPACE, new RetentionPolicies(-1, -1L));

        for (String eventsTopicName : SystemTopicNames.EVENTS_TOPIC_NAMES) {
            checkSystemTopicRetentionPolicy(topicPrefix + eventsTopicName);
        }
        checkSystemTopicRetentionPolicy(topicPrefix + SystemTopicNames.TRANSACTION_COORDINATOR_ASSIGN);
        checkSystemTopicRetentionPolicy(topicPrefix + SystemTopicNames.TRANSACTION_COORDINATOR_LOG);
        checkSystemTopicRetentionPolicy(topicPrefix + "__transaction_pending_ack");
        checkCommonTopicRetentionPolicy(topicPrefix + "my-topic" + System.nanoTime());

        this.pulsar.getConfiguration().setTopicLevelPoliciesEnabled(true);
        this.pulsar.getConfiguration().setSystemTopicEnabled(true);

        String snapshotTopic = topicPrefix + "__transaction_buffer_snapshot";
        this.admin.topics().createNonPartitionedTopic(snapshotTopic);
        this.admin.topicPolicies().setRetention(snapshotTopic, new RetentionPolicies(10, 10L));
        Awaitility.await().untilAsserted(
                () -> checkTopicRetentionPolicy(snapshotTopic, new RetentionPolicies(10, 10L)));
    }

    /** Asserts that the topic's ledger config carries a (0, 0) retention. */
    private void checkSystemTopicRetentionPolicy(String str) throws Exception {
        checkTopicRetentionPolicy(str, new RetentionPolicies(0, 0L));
    }

    /** Asserts that the topic's ledger config carries a (-1, -1) retention. */
    private void checkCommonTopicRetentionPolicy(String str) throws Exception {
        checkTopicRetentionPolicy(str, new RetentionPolicies(-1, -1L));
    }

    /**
     * Asserts that the managed-ledger config the broker resolves for a topic matches the
     * expected retention size and time.
     *
     * @param str name of the topic to look up
     * @param retentionPolicies expected retention values
     * @throws Exception if the ledger config cannot be obtained
     */
    private void checkTopicRetentionPolicy(String str, RetentionPolicies retentionPolicies) throws Exception {
        ManagedLedgerConfig managedLedgerConfig = (ManagedLedgerConfig)
                this.pulsar.getBrokerService().getManagedLedgerConfig(TopicName.get(str)).get();
        Assert.assertEquals(managedLedgerConfig.getRetentionSizeInMB(), retentionPolicies.getRetentionSizeInMB());
        // Convert in long arithmetic; a plain int multiplication could overflow for large values.
        Assert.assertEquals(managedLedgerConfig.getRetentionTimeMillis(),
                TimeUnit.MINUTES.toMillis(retentionPolicies.getRetentionTimeInMinutes()));
    }

    /**
     * Shared scenario: publishes value 1 for a fixed set of keys, then checks that a compacted
     * reader sees exactly those keys both before and after an explicit compaction.
     *
     * @param str name of the topic to run the scenario on
     * @throws Exception if producing, compacting or validating fails
     */
    private void testCompactionCursorRetention(String str) throws Exception {
        Set<String> allKeys = Sets.newHashSet("a", "b", "c");
        allKeys.addAll(Sets.newHashSet("x1", "x2"));

        try (Producer<Integer> producer = this.pulsarClient.newProducer(Schema.INT32).topic(str).create()) {
            log.info(" ---- X 1: {}", internalStatsJson(str));

            for (String key : allKeys) {
                producer.newMessage().key(key).value(1).send();
            }
            log.info(" ---- X 2: {}", internalStatsJson(str));
            validateMessages(this.pulsarClient, true, str, 1, allKeys);

            compact(str);
            log.info(" ---- X 3: {}", internalStatsJson(str));
            validateMessages(this.pulsarClient, true, str, 1, allKeys);
        }
    }

    /**
     * Reads the topic from the earliest message until no message arrives within one second and
     * asserts that the last non-empty value seen for each key equals {@code expectedValue} for
     * exactly the keys in {@code expectedKeys}.
     *
     * @param pulsarClient client used to create the reader
     * @param readCompacted whether the reader is created with {@code readCompacted} enabled
     * @param topic name of the topic to read
     * @param expectedValue value every expected key must map to
     * @param expectedKeys exact set of keys that must have been received with a value
     * @throws Exception if reading fails or the assertion does not hold
     */
    private void validateMessages(PulsarClient pulsarClient, boolean readCompacted, String topic,
                                  int expectedValue, Set<String> expectedKeys) throws Exception {
        try (Reader<Integer> reader = pulsarClient.newReader(Schema.INT32)
                .topic(topic)
                .startMessageId(MessageId.earliest)
                .readCompacted(readCompacted)
                .create()) {
            Map<String, Integer> received = new HashMap<>();
            Message<Integer> msg;
            // A null result means nothing arrived within the timeout; treat that as end of topic.
            while ((msg = reader.readNext(1, TimeUnit.SECONDS)) != null) {
                // Zero-size messages carry no value and are not recorded.
                Integer value = msg.size() > 0 ? msg.getValue() : null;
                log.info("Received: {} -- value: {}", msg.getKey(), value);
                if (value != null) {
                    received.put(msg.getKey(), value);
                }
            }

            Map<String, Integer> expected = new HashMap<>();
            for (String key : expectedKeys) {
                expected.put(key, expectedValue);
            }

            log.info("Received values: {}", received);
            log.info("Expected values: {}", expected);
            Assert.assertEquals(received, expected);
        }
    }

    /** Returns the topic's internal stats (fetched without metadata) as indented JSON for logging. */
    private String internalStatsJson(String topic) throws Exception {
        return STATS_MAPPER.writeValueAsString(this.admin.topics().getInternalStats(topic, false));
    }
}
