/*
 * Decompiled with CFR 0.152.
 */
package org.apache.pulsar.compaction;

import com.google.common.collect.Sets;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Optional;
import java.util.Random;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import org.apache.bookkeeper.client.BookKeeper;
import org.apache.bookkeeper.client.api.OpenBuilder;
import org.apache.bookkeeper.mledger.ManagedLedgerInfo;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
import org.apache.pulsar.broker.service.Topic;
import org.apache.pulsar.broker.service.persistent.PersistentTopic;
import org.apache.pulsar.client.api.CompressionType;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.CryptoKeyReader;
import org.apache.pulsar.client.api.EncryptionKeyInfo;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.MessageRoutingMode;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.ProducerBuilder;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.Reader;
import org.apache.pulsar.client.api.SubscriptionInitialPosition;
import org.apache.pulsar.client.impl.BatchMessageIdImpl;
import org.apache.pulsar.common.policies.data.ClusterData;
import org.apache.pulsar.common.policies.data.PersistentTopicInternalStats;
import org.apache.pulsar.common.policies.data.RetentionPolicies;
import org.apache.pulsar.common.policies.data.TenantInfo;
import org.apache.pulsar.common.policies.data.TenantInfoImpl;
import org.apache.pulsar.common.util.FutureUtil;
import org.apache.pulsar.compaction.TwoPhaseCompactor;
import org.mockito.Mockito;
import org.testng.Assert;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.DataProvider;
import org.testng.annotations.Test;

@Test(groups={"flaky"})
public class CompactionTest
extends MockedPulsarServiceBaseTest {
    /** Scheduler passed to every {@code TwoPhaseCompactor} in this class; created in {@link #setup()} and shut down in {@link #cleanup()}. */
    private ScheduledExecutorService compactionScheduler;
    /** BookKeeper client obtained from the broker's client factory in {@link #setup()} and passed to the compactor. */
    private BookKeeper bk;

    /**
     * Prepares the fixture for each test method.
     *
     * <p>Runs the base-class setup, registers cluster {@code use}, tenant {@code my-property} and namespace
     * {@code my-property/use/my-ns}, then creates the compaction scheduler and the BookKeeper client that the
     * tests pass to {@code TwoPhaseCompactor}.
     *
     * @throws Exception if the base setup or any admin call fails
     */
    @Override
    @BeforeMethod
    public void setup() throws Exception {
        super.internalSetup();

        this.admin.clusters().createCluster(
                "use",
                ClusterData.builder().serviceUrl(this.pulsar.getWebServiceAddress()).build());

        Set<String> adminRoles = Sets.newHashSet("appid1", "appid2");
        Set<String> allowedClusters = Sets.newHashSet("use");
        this.admin.tenants().createTenant("my-property", new TenantInfoImpl(adminRoles, allowedClusters));

        this.admin.namespaces().createNamespace("my-property/use/my-ns");

        // Daemon thread so a forgotten shutdown cannot keep the JVM alive.
        this.compactionScheduler = Executors.newSingleThreadScheduledExecutor(
                new ThreadFactoryBuilder()
                        .setNameFormat("compaction-%d")
                        .setDaemon(true)
                        .build());

        this.bk = this.pulsar.getBookKeeperClientFactory()
                .create(this.conf, null, null, Optional.empty(), null);
    }

    /**
     * Tears down the fixture after each test method.
     *
     * <p>The compaction scheduler is shut down in a {@code finally} block so that its thread is released
     * even when the base-class cleanup throws.
     *
     * @throws Exception if the base-class cleanup fails
     */
    @Override
    @AfterMethod(alwaysRun=true)
    public void cleanup() throws Exception {
        try {
            super.internalCleanup();
        } finally {
            // May still be null when setup() failed before creating the scheduler.
            if (this.compactionScheduler != null) {
                this.compactionScheduler.shutdownNow();
            }
        }
    }

    /**
     * Publishes {@code numMessages} messages over at most {@code maxKeys} keys, compacts the topic, and checks
     * both the compacted and the non-compacted view of it.
     *
     * <p>After compaction the compacted ledger must contain one entry per distinct key, have a valid ledger id
     * and not be offloaded. A consumer with {@code readCompacted(true)} must receive exactly the last value
     * published for each key; a consumer on the same subscription with {@code readCompacted(false)} must
     * receive every message in publish order.
     *
     * @throws Exception if any client, admin or compaction call fails
     */
    @Test
    public void testCompaction() throws Exception {
        String topic = "persistent://my-property/use/my-ns/my-topic1";
        int numMessages = 20;
        int maxKeys = 10;

        Producer<byte[]> producer = this.pulsarClient.newProducer()
                .topic(topic)
                .enableBatching(false)
                .messageRoutingMode(MessageRoutingMode.SinglePartition)
                .create();

        // expected: last value per key (the compacted view); all: every message in publish order.
        Map<String, byte[]> expected = new HashMap<>();
        ArrayList<Pair<String, byte[]>> all = new ArrayList<>();
        // Fixed seed keeps the key distribution identical on every run.
        Random r = new Random(0L);

        // Create the subscription before publishing (presumably so its cursor starts ahead of the data).
        this.pulsarClient.newConsumer().topic(topic).subscriptionName("sub1")
                .readCompacted(true).subscribe().close();

        for (int j = 0; j < numMessages; ++j) {
            int keyIndex = r.nextInt(maxKeys);
            String key = "key" + keyIndex;
            byte[] data = ("my-message-" + key + "-" + j).getBytes();
            producer.newMessage().key(key).value(data).send();
            expected.put(key, data);
            all.add(Pair.of(key, data));
        }

        TwoPhaseCompactor compactor =
                new TwoPhaseCompactor(this.conf, this.pulsarClient, this.bk, this.compactionScheduler);
        compactor.compact(topic).get();

        PersistentTopicInternalStats internalStats = this.admin.topics().getInternalStats(topic, false);
        Assert.assertEquals((long) expected.size(), (long) internalStats.compactedLedger.entries);
        Assert.assertTrue(internalStats.compactedLedger.ledgerId > -1L);
        Assert.assertFalse(internalStats.compactedLedger.offloaded);

        // Compacted view: only the latest value of each key.
        try (Consumer<byte[]> consumer = this.pulsarClient.newConsumer().topic(topic).subscriptionName("sub1")
                .readCompacted(true).subscribe()) {
            do {
                Message<byte[]> m = consumer.receive(2, TimeUnit.SECONDS);
                // A timed receive returns null on timeout; fail with a clear message instead of an NPE.
                Assert.assertNotNull(m, "Timed out waiting for a compacted message");
                Assert.assertEquals(expected.remove(m.getKey()), m.getData());
            } while (!expected.isEmpty());
            Assert.assertTrue(expected.isEmpty());
        }

        // Non-compacted view: every message, in the order it was published.
        try (Consumer<byte[]> consumer = this.pulsarClient.newConsumer().topic(topic).subscriptionName("sub1")
                .readCompacted(false).subscribe()) {
            do {
                Message<byte[]> m = consumer.receive(2, TimeUnit.SECONDS);
                Assert.assertNotNull(m, "Timed out waiting for a non-compacted message");
                Pair<String, byte[]> expectedMessage = all.remove(0);
                Assert.assertEquals(expectedMessage.getLeft(), m.getKey());
                Assert.assertEquals(expectedMessage.getRight(), m.getData());
            } while (!all.isEmpty());
            Assert.assertTrue(all.isEmpty());
        }
    }

    /**
     * Same scenario as {@code testCompaction}, but verified through {@code Reader}s instead of consumers.
     *
     * <p>Namespace retention is set to {@code RetentionPolicies(-1, -1)} before publishing. After compaction a
     * reader with {@code readCompacted(true)} starting at {@code MessageId.earliest} must see only the last
     * value per key, and a reader with {@code readCompacted(false)} must see every message in publish order.
     *
     * @throws Exception if any client, admin or compaction call fails
     */
    @Test
    public void testCompactionWithReader() throws Exception {
        String topic = "persistent://my-property/use/my-ns/my-topic1";
        int numMessages = 20;
        int maxKeys = 10;

        // No subscription exists in this test; presumably retention keeps the data readable - TODO confirm.
        this.admin.namespaces().setRetention("my-property/use/my-ns", new RetentionPolicies(-1, -1));

        Producer<byte[]> producer = this.pulsarClient.newProducer()
                .topic(topic)
                .enableBatching(false)
                .messageRoutingMode(MessageRoutingMode.SinglePartition)
                .create();

        // expected: last value per key (the compacted view); all: every message in publish order.
        Map<String, String> expected = new HashMap<>();
        ArrayList<Pair<String, String>> all = new ArrayList<>();
        // Fixed seed keeps the key distribution identical on every run.
        Random r = new Random(0L);

        for (int j = 0; j < numMessages; ++j) {
            int keyIndex = r.nextInt(maxKeys);
            String key = "key" + keyIndex;
            String value = "my-message-" + key + "-" + j;
            producer.newMessage().key(key).value(value.getBytes()).send();
            expected.put(key, value);
            all.add(Pair.of(key, value));
        }

        TwoPhaseCompactor compactor =
                new TwoPhaseCompactor(this.conf, this.pulsarClient, this.bk, this.compactionScheduler);
        compactor.compact(topic).get();

        // Compacted view: only the latest value of each key.
        try (Reader<byte[]> reader = this.pulsarClient.newReader().topic(topic)
                .readCompacted(true).startMessageId(MessageId.earliest).create()) {
            do {
                Message<byte[]> m = reader.readNext(2, TimeUnit.SECONDS);
                // A timed read returns null on timeout; fail with a clear message instead of an NPE.
                Assert.assertNotNull(m, "Timed out waiting for a compacted message");
                Assert.assertEquals(expected.remove(m.getKey()), new String(m.getData()));
            } while (!expected.isEmpty());
            Assert.assertTrue(expected.isEmpty());
        }

        // Non-compacted view: every message, in the order it was published.
        try (Reader<byte[]> reader = this.pulsarClient.newReader().topic(topic)
                .readCompacted(false).startMessageId(MessageId.earliest).create()) {
            do {
                Message<byte[]> m = reader.readNext(2, TimeUnit.SECONDS);
                Assert.assertNotNull(m, "Timed out waiting for a non-compacted message");
                Pair<String, String> expectedMessage = all.remove(0);
                Assert.assertEquals(expectedMessage.getLeft(), m.getKey());
                Assert.assertEquals(expectedMessage.getRight(), new String(m.getData()));
            } while (!all.isEmpty());
            Assert.assertTrue(all.isEmpty());
        }
    }

    /**
     * Checks that a {@code readCompacted(true)} consumer sees every message while no compaction has run,
     * and only the last value for the key once compaction has completed.
     *
     * @throws Exception if any client or compaction call fails
     */
    @Test
    public void testReadCompactedBeforeCompaction() throws Exception {
        final String topic = "persistent://my-property/use/my-ns/my-topic1";
        final String[] contents = {"content0", "content1", "content2"};

        Producer<byte[]> producer = this.pulsarClient.newProducer()
                .topic(topic)
                .enableBatching(false)
                .create();

        // Create the subscription before anything is published.
        this.pulsarClient.newConsumer().topic(topic).subscriptionName("sub1")
                .readCompacted(true).subscribe().close();

        for (String content : contents) {
            producer.newMessage().key("key0").value(content.getBytes()).send();
        }

        // Before compaction: all three values arrive, in publish order.
        try (Consumer<byte[]> beforeCompaction = this.pulsarClient.newConsumer().topic(topic)
                .subscriptionName("sub1").readCompacted(true).subscribe()) {
            for (String content : contents) {
                Message<byte[]> received = beforeCompaction.receive();
                Assert.assertEquals(received.getKey(), "key0");
                Assert.assertEquals(received.getData(), content.getBytes());
            }
        }

        new TwoPhaseCompactor(this.conf, this.pulsarClient, this.bk, this.compactionScheduler)
                .compact(topic)
                .get();

        // After compaction: the first message received is the last value published for the key.
        try (Consumer<byte[]> afterCompaction = this.pulsarClient.newConsumer().topic(topic)
                .subscriptionName("sub1").readCompacted(true).subscribe()) {
            Message<byte[]> received = afterCompaction.receive();
            Assert.assertEquals(received.getKey(), "key0");
            Assert.assertEquals(received.getData(), "content2".getBytes());
        }
    }

    /**
     * Checks that a message published after compaction is delivered to a {@code readCompacted(true)}
     * consumer right after the compacted value of the key.
     *
     * @throws Exception if any client or compaction call fails
     */
    @Test
    public void testReadEntriesAfterCompaction() throws Exception {
        final String topic = "persistent://my-property/use/my-ns/my-topic1";

        Producer<byte[]> producer = this.pulsarClient.newProducer()
                .topic(topic)
                .enableBatching(false)
                .create();

        // Create the subscription before anything is published.
        this.pulsarClient.newConsumer().topic(topic).subscriptionName("sub1")
                .readCompacted(true).subscribe().close();

        for (String content : new String[]{"content0", "content1", "content2"}) {
            producer.newMessage().key("key0").value(content.getBytes()).send();
        }

        new TwoPhaseCompactor(this.conf, this.pulsarClient, this.bk, this.compactionScheduler)
                .compact(topic)
                .get();

        // Published after the compaction run, so it is not part of the compacted ledger.
        producer.newMessage().key("key0").value("content3".getBytes()).send();

        try (Consumer<byte[]> consumer = this.pulsarClient.newConsumer().topic(topic)
                .subscriptionName("sub1").readCompacted(true).subscribe()) {
            // Compacted value first, then the newer uncompacted one.
            for (String content : new String[]{"content2", "content3"}) {
                Message<byte[]> received = consumer.receive();
                Assert.assertEquals(received.getKey(), "key0");
                Assert.assertEquals(received.getData(), content.getBytes());
            }
        }
    }

    /**
     * Checks what {@code seek(MessageId.earliest)} returns after compaction.
     *
     * <p>A {@code readCompacted(true)} consumer must receive the compacted value ({@code content2}) first,
     * whereas a {@code readCompacted(false)} consumer must receive all three original messages in order.
     *
     * @throws Exception if any client or compaction call fails
     */
    @Test
    public void testSeekEarliestAfterCompaction() throws Exception {
        String topic = "persistent://my-property/use/my-ns/my-topic1";

        Producer<byte[]> producer = this.pulsarClient.newProducer()
                .topic(topic)
                .enableBatching(false)
                .create();

        producer.newMessage().key("key0").value("content0".getBytes()).send();
        producer.newMessage().key("key0").value("content1".getBytes()).send();
        producer.newMessage().key("key0").value("content2".getBytes()).send();

        TwoPhaseCompactor compactor =
                new TwoPhaseCompactor(this.conf, this.pulsarClient, this.bk, this.compactionScheduler);
        compactor.compact(topic).get();

        // Compacted view: seeking to earliest lands on the compacted value.
        try (Consumer<byte[]> consumer = this.pulsarClient.newConsumer().topic(topic).subscriptionName("sub1")
                .readCompacted(true).subscribe()) {
            consumer.seek(MessageId.earliest);
            Message<byte[]> m = consumer.receive();
            Assert.assertEquals(m.getKey(), "key0");
            Assert.assertEquals(m.getData(), "content2".getBytes());
        }

        // Non-compacted view: seeking to earliest replays every original message.
        try (Consumer<byte[]> consumer = this.pulsarClient.newConsumer().topic(topic).subscriptionName("sub1")
                .readCompacted(false).subscribe()) {
            consumer.seek(MessageId.earliest);

            Message<byte[]> m = consumer.receive();
            Assert.assertEquals(m.getKey(), "key0");
            Assert.assertEquals(m.getData(), "content0".getBytes());

            m = consumer.receive();
            Assert.assertEquals(m.getKey(), "key0");
            Assert.assertEquals(m.getData(), "content1".getBytes());

            m = consumer.receive();
            Assert.assertEquals(m.getKey(), "key0");
            Assert.assertEquals(m.getData(), "content2".getBytes());
        }
    }

    /**
     * Checks that the compacted view is still served after the broker has been stopped and started again.
     *
     * <p>Steps: publish three values for one key, compact, read the compacted value, stop the broker and
     * confirm that consuming fails with a {@code PulsarClientException}, then start the broker again and
     * confirm that the compacted value ({@code content2}) is delivered once more.
     *
     * @throws Exception if any client, broker-lifecycle or compaction call fails unexpectedly
     */
    @Test
    public void testBrokerRestartAfterCompaction() throws Exception {
        String topic = "persistent://my-property/use/my-ns/my-topic1";

        Producer<byte[]> producer = this.pulsarClient.newProducer()
                .topic(topic)
                .enableBatching(false)
                .create();

        // Create the subscription before anything is published.
        this.pulsarClient.newConsumer().topic(topic).subscriptionName("sub1")
                .readCompacted(true).subscribe().close();

        producer.newMessage().key("key0").value("content0".getBytes()).send();
        producer.newMessage().key("key0").value("content1".getBytes()).send();
        producer.newMessage().key("key0").value("content2".getBytes()).send();

        TwoPhaseCompactor compactor =
                new TwoPhaseCompactor(this.conf, this.pulsarClient, this.bk, this.compactionScheduler);
        compactor.compact(topic).get();

        try (Consumer<byte[]> consumer = this.pulsarClient.newConsumer().topic(topic).subscriptionName("sub1")
                .readCompacted(true).subscribe()) {
            Message<byte[]> m = consumer.receive();
            Assert.assertEquals(m.getKey(), "key0");
            Assert.assertEquals(m.getData(), "content2".getBytes());
        }

        this.stopBroker();
        try (Consumer<byte[]> consumer = this.pulsarClient.newConsumer().topic(topic).subscriptionName("sub1")
                .readCompacted(true).subscribe()) {
            consumer.receive();
            Assert.fail("Shouldn't have been able to receive anything");
        } catch (PulsarClientException expectedWhileBrokerDown) {
            // Expected outcome: with the broker stopped, subscribing or receiving must fail.
        }
        this.startBroker();

        // After the restart the compacted value must be delivered again.
        try (Consumer<byte[]> consumer = this.pulsarClient.newConsumer().topic(topic).subscriptionName("sub1")
                .readCompacted(true).subscribe()) {
            Message<byte[]> m = consumer.receive();
            Assert.assertEquals(m.getKey(), "key0");
            Assert.assertEquals(m.getData(), "content2".getBytes());
        }
    }

    /**
     * Checks that compacting a topic that has no messages succeeds and does not disturb later delivery.
     *
     * <p>The compaction is run while the topic is still empty; a message published afterwards must be
     * delivered unchanged to a {@code readCompacted(true)} consumer.
     *
     * @throws Exception if any client or compaction call fails
     */
    @Test
    public void testCompactEmptyTopic() throws Exception {
        String topic = "persistent://my-property/use/my-ns/my-topic1";

        Producer<byte[]> producer = this.pulsarClient.newProducer()
                .topic(topic)
                .enableBatching(false)
                .create();

        // Create the subscription before anything is published.
        this.pulsarClient.newConsumer().topic(topic).subscriptionName("sub1")
                .readCompacted(true).subscribe().close();

        // Run the compaction on the still-empty topic; previously the compactor was built but never invoked.
        TwoPhaseCompactor compactor =
                new TwoPhaseCompactor(this.conf, this.pulsarClient, this.bk, this.compactionScheduler);
        compactor.compact(topic).get();

        producer.newMessage().key("key0").value("content0".getBytes()).send();

        try (Consumer<byte[]> consumer = this.pulsarClient.newConsumer().topic(topic).subscriptionName("sub1")
                .readCompacted(true).subscribe()) {
            Message<byte[]> m = consumer.receive();
            Assert.assertEquals(m.getKey(), "key0");
            Assert.assertEquals(m.getData(), "content0".getBytes());
        }
    }

    /**
     * Checks that messages surviving compaction keep the message ids they had before compaction
     * (non-batched producer).
     *
     * <p>Three messages are published ({@code key1}, {@code key2}, {@code key2}). After compaction the
     * consumer must receive the first and the third message, each with its pre-compaction message id.
     *
     * @throws Exception if any client or compaction call fails
     */
    @Test
    public void testFirstMessageRetained() throws Exception {
        final String topic = "persistent://my-property/use/my-ns/my-topic1";

        // Create the subscription before anything is published.
        this.pulsarClient.newConsumer().topic(topic).subscriptionName("sub1")
                .readCompacted(true).subscribe().close();

        try (Producer<byte[]> producer = this.pulsarClient.newProducer()
                .topic(topic)
                .enableBatching(false)
                .create()) {
            producer.newMessage().key("key1").value("my-message-1".getBytes()).sendAsync();
            producer.newMessage().key("key2").value("my-message-2".getBytes()).sendAsync();
            // The final synchronous send blocks until the broker has acknowledged it.
            producer.newMessage().key("key2").value("my-message-3".getBytes()).send();
        }

        // Capture the three messages as delivered before compaction, to compare ids later.
        ArrayList<Message<byte[]>> beforeCompaction = new ArrayList<>();
        try (Consumer<byte[]> consumer = this.pulsarClient.newConsumer().topic(topic)
                .subscriptionName("sub1").readCompacted(true).subscribe()) {
            for (int i = 0; i < 3; i++) {
                beforeCompaction.add(consumer.receive());
            }
        }

        new TwoPhaseCompactor(this.conf, this.pulsarClient, this.bk, this.compactionScheduler)
                .compact(topic)
                .get();

        try (Consumer<byte[]> consumer = this.pulsarClient.newConsumer().topic(topic)
                .subscriptionName("sub1").readCompacted(true).subscribe()) {
            Message<byte[]> first = consumer.receive();
            Assert.assertEquals(first.getKey(), "key1");
            Assert.assertEquals(new String(first.getData()), "my-message-1");
            Assert.assertEquals(first.getMessageId(), beforeCompaction.get(0).getMessageId());

            Message<byte[]> second = consumer.receive();
            Assert.assertEquals(second.getKey(), "key2");
            Assert.assertEquals(new String(second.getData()), "my-message-3");
            Assert.assertEquals(second.getMessageId(), beforeCompaction.get(2).getMessageId());
        }
    }

    /**
     * Checks that messages published in a single batch keep their batch message ids across compaction.
     *
     * <p>Three messages are sent through a batching producer; the test first asserts that all three share
     * one ledger id and one entry id, then compacts and asserts that the two surviving messages carry the
     * same message ids as before.
     *
     * @throws Exception if any client or compaction call fails
     */
    @Test
    public void testBatchMessageIdsDontChange() throws Exception {
        final String topic = "persistent://my-property/use/my-ns/my-topic1";

        // Create the subscription before anything is published.
        this.pulsarClient.newConsumer().topic(topic).subscriptionName("sub1")
                .readCompacted(true).subscribe().close();

        try (Producer<byte[]> producer = this.pulsarClient.newProducer()
                .topic(topic)
                .maxPendingMessages(3)
                .enableBatching(true)
                .batchingMaxMessages(3)
                .batchingMaxPublishDelay(1L, TimeUnit.HOURS)
                .messageRoutingMode(MessageRoutingMode.SinglePartition)
                .create()) {
            producer.newMessage().key("key1").value("my-message-1".getBytes()).sendAsync();
            producer.newMessage().key("key2").value("my-message-2".getBytes()).sendAsync();
            producer.newMessage().key("key2").value("my-message-3".getBytes()).send();
        }

        // Capture the three messages as delivered before compaction, to compare ids later.
        ArrayList<Message<byte[]>> beforeCompaction = new ArrayList<>();
        try (Consumer<byte[]> consumer = this.pulsarClient.newConsumer().topic(topic)
                .subscriptionName("sub1").readCompacted(true).subscribe()) {
            for (int i = 0; i < 3; i++) {
                beforeCompaction.add(consumer.receive());
            }
        }

        // All three must live in the same ledger entry; casts are done lazily to keep the check order.
        BatchMessageIdImpl id0 = (BatchMessageIdImpl) beforeCompaction.get(0).getMessageId();
        BatchMessageIdImpl id1 = (BatchMessageIdImpl) beforeCompaction.get(1).getMessageId();
        Assert.assertEquals(id0.getLedgerId(), id1.getLedgerId());
        BatchMessageIdImpl id2 = (BatchMessageIdImpl) beforeCompaction.get(2).getMessageId();
        Assert.assertEquals(id0.getLedgerId(), id2.getLedgerId());
        Assert.assertEquals(id0.getEntryId(), id1.getEntryId());
        Assert.assertEquals(id0.getEntryId(), id2.getEntryId());

        new TwoPhaseCompactor(this.conf, this.pulsarClient, this.bk, this.compactionScheduler)
                .compact(topic)
                .get();

        try (Consumer<byte[]> consumer = this.pulsarClient.newConsumer().topic(topic)
                .subscriptionName("sub1").readCompacted(true).subscribe()) {
            Message<byte[]> first = consumer.receive();
            Assert.assertEquals(first.getKey(), "key1");
            Assert.assertEquals(new String(first.getData()), "my-message-1");
            Assert.assertEquals(first.getMessageId(), beforeCompaction.get(0).getMessageId());

            Message<byte[]> second = consumer.receive();
            Assert.assertEquals(second.getKey(), "key2");
            Assert.assertEquals(new String(second.getData()), "my-message-3");
            Assert.assertEquals(second.getMessageId(), beforeCompaction.get(2).getMessageId());
        }
    }

    /**
     * Checks the case where every message of a batch is superseded by a later message.
     *
     * <p>Three values for {@code key1} are sent through a batching producer and a fourth through a
     * non-batching producer. After compaction the consumer must receive {@code my-message-4} first.
     *
     * @throws Exception if any client or compaction call fails
     */
    @Test
    public void testWholeBatchCompactedOut() throws Exception {
        final String topic = "persistent://my-property/use/my-ns/my-topic1";

        // Create the subscription before anything is published.
        this.pulsarClient.newConsumer().topic(topic).subscriptionName("sub1")
                .readCompacted(true).subscribe().close();

        try (Producer<byte[]> producerNormal = this.pulsarClient.newProducer()
                     .topic(topic)
                     .enableBatching(false)
                     .messageRoutingMode(MessageRoutingMode.SinglePartition)
                     .create();
             Producer<byte[]> producerBatch = this.pulsarClient.newProducer()
                     .topic(topic)
                     .maxPendingMessages(3)
                     .enableBatching(true)
                     .batchingMaxMessages(3)
                     .batchingMaxPublishDelay(1L, TimeUnit.HOURS)
                     .messageRoutingMode(MessageRoutingMode.SinglePartition)
                     .create()) {
            for (String payload : new String[]{"my-message-1", "my-message-2", "my-message-3"}) {
                producerBatch.newMessage().key("key1").value(payload.getBytes()).sendAsync();
            }
            producerNormal.newMessage().key("key1").value("my-message-4".getBytes()).send();
        }

        new TwoPhaseCompactor(this.conf, this.pulsarClient, this.bk, this.compactionScheduler)
                .compact(topic)
                .get();

        try (Consumer<byte[]> consumer = this.pulsarClient.newConsumer().topic(topic)
                .subscriptionName("sub1").readCompacted(true).subscribe()) {
            Message<byte[]> survivor = consumer.receive();
            Assert.assertEquals(survivor.getKey(), "key1");
            Assert.assertEquals(new String(survivor.getData()), "my-message-4");
        }
    }

    /**
     * Checks that messages without a key are delivered after compaction, while keyed messages are reduced
     * to their latest value.
     *
     * <p>Eight messages are published through a default and a batching producer. After compaction the
     * consumer must receive, in order: keyless 1, keyless 2, {@code key1}=4, {@code key2}=6, keyless 7,
     * keyless 8.
     *
     * @throws Exception if any client or compaction call fails
     */
    @Test
    public void testKeyLessMessagesPassThrough() throws Exception {
        final String topic = "persistent://my-property/use/my-ns/my-topic1";

        // Create the subscription before anything is published.
        this.pulsarClient.newConsumer().topic(topic).subscriptionName("sub1")
                .readCompacted(true).subscribe().close();

        try (Producer<byte[]> producerNormal = this.pulsarClient.newProducer()
                     .topic(topic)
                     .create();
             Producer<byte[]> producerBatch = this.pulsarClient.newProducer()
                     .topic(topic)
                     .maxPendingMessages(3)
                     .enableBatching(true)
                     .batchingMaxMessages(3)
                     .batchingMaxPublishDelay(1L, TimeUnit.HOURS)
                     .create()) {
            producerNormal.newMessage().value("my-message-1".getBytes()).send();

            producerBatch.newMessage().value("my-message-2".getBytes()).sendAsync();
            producerBatch.newMessage().key("key1").value("my-message-3".getBytes()).sendAsync();
            producerBatch.newMessage().key("key1").value("my-message-4".getBytes()).send();

            producerBatch.newMessage().key("key2").value("my-message-5".getBytes()).sendAsync();
            producerBatch.newMessage().key("key2").value("my-message-6".getBytes()).sendAsync();
            producerBatch.newMessage().value("my-message-7".getBytes()).send();

            producerNormal.newMessage().value("my-message-8".getBytes()).send();
        }

        new TwoPhaseCompactor(this.conf, this.pulsarClient, this.bk, this.compactionScheduler)
                .compact(topic)
                .get();

        // Expected delivery after compaction; a null key means the message must have no key.
        final String[] expectedKeys = {null, null, "key1", "key2", null, null};
        final String[] expectedPayloads = {
            "my-message-1", "my-message-2", "my-message-4", "my-message-6", "my-message-7", "my-message-8"
        };

        try (Consumer<byte[]> consumer = this.pulsarClient.newConsumer().topic(topic)
                .subscriptionName("sub1").readCompacted(true).subscribe()) {
            for (int i = 0; i < expectedPayloads.length; i++) {
                Message<byte[]> received = consumer.receive();
                if (expectedKeys[i] == null) {
                    Assert.assertFalse(received.hasKey());
                } else {
                    Assert.assertEquals(received.getKey(), expectedKeys[i]);
                }
                Assert.assertEquals(new String(received.getData()), expectedPayloads[i]);
            }
        }
    }

    /**
     * Checks that a message sent with a key but no value removes that key from the compacted view.
     *
     * <p>Keys {@code key1}, {@code key2} and {@code key3} each end with a value-less message; {@code key4}
     * gets a value-less message followed by a new value. After compaction only {@code key0} and
     * {@code key4} (with {@code my-message-4}) must be delivered, in that order.
     *
     * @throws Exception if any client or compaction call fails
     */
    @Test
    public void testEmptyPayloadDeletes() throws Exception {
        final String topic = "persistent://my-property/use/my-ns/my-topic1";

        // Create the subscription before anything is published.
        this.pulsarClient.newConsumer().topic(topic).subscriptionName("sub1")
                .readCompacted(true).subscribe().close();

        try (Producer<byte[]> producerNormal = this.pulsarClient.newProducer()
                     .topic(topic)
                     .enableBatching(false)
                     .create();
             Producer<byte[]> producerBatch = this.pulsarClient.newProducer()
                     .topic(topic)
                     .maxPendingMessages(3)
                     .enableBatching(true)
                     .batchingMaxMessages(3)
                     .batchingMaxPublishDelay(1L, TimeUnit.HOURS)
                     .create()) {
            // key0 only ever receives a value.
            producerNormal.newMessage().key("key0").value("my-message-0".getBytes()).send();

            // key1 receives a value, then a value-less message.
            producerNormal.newMessage().key("key1").value("my-message-1".getBytes()).send();
            producerNormal.newMessage().key("key1").send();

            // key2 receives a value and a value-less message through the batching producer.
            producerBatch.newMessage().key("key2").value("my-message-2".getBytes()).sendAsync();
            producerBatch.newMessage().key("key3").value("my-message-3".getBytes()).sendAsync();
            producerBatch.newMessage().key("key2").send();

            // key3 gets its value-less message here; key4 receives a value and then a value-less message.
            producerBatch.newMessage().key("key3").sendAsync();
            producerBatch.newMessage().key("key4").value("my-message-3".getBytes()).sendAsync();
            producerBatch.newMessage().key("key4").send();

            // key4 receives a fresh value after its value-less message.
            producerNormal.newMessage().key("key4").value("my-message-4".getBytes()).send();
        }

        new TwoPhaseCompactor(this.conf, this.pulsarClient, this.bk, this.compactionScheduler)
                .compact(topic)
                .get();

        final String[][] survivors = {{"key0", "my-message-0"}, {"key4", "my-message-4"}};
        try (Consumer<byte[]> consumer = this.pulsarClient.newConsumer().topic(topic)
                .subscriptionName("sub1").readCompacted(true).subscribe()) {
            for (String[] survivor : survivors) {
                Message<byte[]> received = consumer.receive();
                Assert.assertEquals(received.getKey(), survivor[0]);
                Assert.assertEquals(new String(received.getData()), survivor[1]);
            }
        }
    }

    /**
     * Same scenario as {@code testEmptyPayloadDeletes}, with both producers using
     * {@code CompressionType.LZ4}.
     *
     * <p>After compaction only {@code key0} and {@code key4} (with {@code my-message-4}) must be
     * delivered, in that order.
     *
     * @throws Exception if any client or compaction call fails
     */
    @Test
    public void testEmptyPayloadDeletesWhenCompressed() throws Exception {
        final String topic = "persistent://my-property/use/my-ns/my-topic1";

        // Create the subscription before anything is published.
        this.pulsarClient.newConsumer().topic(topic).subscriptionName("sub1")
                .readCompacted(true).subscribe().close();

        try (Producer<byte[]> producerNormal = this.pulsarClient.newProducer()
                     .topic(topic)
                     .enableBatching(false)
                     .compressionType(CompressionType.LZ4)
                     .create();
             Producer<byte[]> producerBatch = this.pulsarClient.newProducer()
                     .topic(topic)
                     .maxPendingMessages(3)
                     .enableBatching(true)
                     .compressionType(CompressionType.LZ4)
                     .batchingMaxMessages(3)
                     .batchingMaxPublishDelay(1L, TimeUnit.HOURS)
                     .create()) {
            // key0 only ever receives a value.
            producerNormal.newMessage().key("key0").value("my-message-0".getBytes()).send();

            // key1 receives a value, then a value-less message.
            producerNormal.newMessage().key("key1").value("my-message-1".getBytes()).send();
            producerNormal.newMessage().key("key1").send();

            // key2 receives a value and a value-less message through the batching producer.
            producerBatch.newMessage().key("key2").value("my-message-2".getBytes()).sendAsync();
            producerBatch.newMessage().key("key3").value("my-message-3".getBytes()).sendAsync();
            producerBatch.newMessage().key("key2").send();

            // key3 gets its value-less message here; key4 receives a value and then a value-less message.
            producerBatch.newMessage().key("key3").sendAsync();
            producerBatch.newMessage().key("key4").value("my-message-3".getBytes()).sendAsync();
            producerBatch.newMessage().key("key4").send();

            // key4 receives a fresh value after its value-less message.
            producerNormal.newMessage().key("key4").value("my-message-4".getBytes()).send();
        }

        new TwoPhaseCompactor(this.conf, this.pulsarClient, this.bk, this.compactionScheduler)
                .compact(topic)
                .get();

        final String[][] survivors = {{"key0", "my-message-0"}, {"key4", "my-message-4"}};
        try (Consumer<byte[]> consumer = this.pulsarClient.newConsumer().topic(topic)
                .subscriptionName("sub1").readCompacted(true).subscribe()) {
            for (String[] survivor : survivors) {
                Message<byte[]> received = consumer.receive();
                Assert.assertEquals(received.getKey(), survivor[0]);
                Assert.assertEquals(new String(received.getData()), survivor[1]);
            }
        }
    }

    /**
     * Checks which managed-ledger ledgers are opened through the mocked BookKeeper during two
     * successive compaction runs.
     *
     * <p>The test spies on {@code newOpenLedgerOp()} to record every ledger id passed to
     * {@code withLedgerId}, publishes one message per ledger (closing the topic between writes),
     * and asserts that the first compaction opens only the first ledger while the second
     * compaction opens only the second one. Finally a compacted consumer must receive
     * key0, key1 and key2 in that order.
     *
     * @throws Exception if any broker, client or compaction call fails
     */
    @Test
    public void testCompactorReadsCompacted() throws Exception {
        String topic = "persistent://my-property/use/my-ns/my-topic1";

        // Record every ledger id requested through an open-ledger builder of the mocked BookKeeper.
        Set<Long> ledgersOpened = Sets.newConcurrentHashSet();
        Mockito.when(this.mockBookKeeper.newOpenLedgerOp()).thenAnswer(invocation -> {
            OpenBuilder builder = (OpenBuilder) Mockito.spy(invocation.callRealMethod());
            Mockito.when(builder.withLedgerId(Mockito.anyLong())).thenAnswer(invocation2 -> {
                ledgersOpened.add((Long) invocation2.getArguments()[0]);
                return invocation2.callRealMethod();
            });
            return builder;
        });

        // Create the "sub1" subscription before anything is published.
        this.pulsarClient.newConsumer().topic(topic).subscriptionName("sub1").subscribe().close();

        try (Producer<byte[]> producerNormal = this.pulsarClient.newProducer().topic(topic).create()) {
            producerNormal.newMessage().key("key0").value("my-message-0".getBytes()).send();
        }

        // Close the topic on the broker; the size assertion below expects the next write to land in a second ledger.
        ((Topic) this.pulsar.getBrokerService().getTopicReference(topic).get()).close(false).get();

        try (Producer<byte[]> producerNormal = this.pulsarClient.newProducer().topic(topic).create()) {
            producerNormal.newMessage().key("key1").value("my-message-1".getBytes()).send();
        }

        String managedLedgerName = ((PersistentTopic) this.pulsar.getBrokerService().getTopicReference(topic).get())
                .getManagedLedger().getName();
        ManagedLedgerInfo info = this.pulsar.getManagedLedgerFactory().getManagedLedgerInfo(managedLedgerName);
        Assert.assertEquals(info.ledgers.size(), 2);
        // Nothing has been opened for reading so far.
        Assert.assertTrue(ledgersOpened.isEmpty());

        TwoPhaseCompactor compactor = new TwoPhaseCompactor(this.conf, this.pulsarClient, this.bk, this.compactionScheduler);
        compactor.compact(topic).get();

        // First compaction: the first ledger is opened, the last one is not.
        Assert.assertTrue(ledgersOpened.contains(((ManagedLedgerInfo.LedgerInfo) info.ledgers.get(0)).ledgerId));
        Assert.assertFalse(ledgersOpened.contains(((ManagedLedgerInfo.LedgerInfo) info.ledgers.get(1)).ledgerId));
        ledgersOpened.clear();

        // Close the topic again and write a third message; a third ledger is expected.
        ((Topic) this.pulsar.getBrokerService().getTopicReference(topic).get()).close(false).get();
        try (Producer<byte[]> producerNormal = this.pulsarClient.newProducer().topic(topic).create()) {
            producerNormal.newMessage().key("key2").value("my-message-2".getBytes()).send();
        }

        info = this.pulsar.getManagedLedgerFactory().getManagedLedgerInfo(managedLedgerName);
        Assert.assertEquals(info.ledgers.size(), 3);
        // Re-opening the topic and publishing must not have opened any of the three ledgers.
        Assert.assertFalse(ledgersOpened.contains(((ManagedLedgerInfo.LedgerInfo) info.ledgers.get(0)).ledgerId));
        Assert.assertFalse(ledgersOpened.contains(((ManagedLedgerInfo.LedgerInfo) info.ledgers.get(1)).ledgerId));
        Assert.assertFalse(ledgersOpened.contains(((ManagedLedgerInfo.LedgerInfo) info.ledgers.get(2)).ledgerId));
        ledgersOpened.clear();

        compactor.compact(topic).get();

        // Second compaction: only the middle ledger is opened.
        Assert.assertFalse(ledgersOpened.contains(((ManagedLedgerInfo.LedgerInfo) info.ledgers.get(0)).ledgerId));
        Assert.assertTrue(ledgersOpened.contains(((ManagedLedgerInfo.LedgerInfo) info.ledgers.get(1)).ledgerId));
        Assert.assertFalse(ledgersOpened.contains(((ManagedLedgerInfo.LedgerInfo) info.ledgers.get(2)).ledgerId));

        try (Consumer<byte[]> consumer = this.pulsarClient.newConsumer().topic(topic).subscriptionName("sub1")
                .readCompacted(true).subscribe()) {
            Message<byte[]> message1 = consumer.receive();
            Assert.assertEquals(message1.getKey(), "key0");
            Assert.assertEquals(new String(message1.getData()), "my-message-0");

            Message<byte[]> message2 = consumer.receive();
            Assert.assertEquals(message2.getKey(), "key1");
            Assert.assertEquals(new String(message2.getData()), "my-message-1");

            Message<byte[]> message3 = consumer.receive();
            Assert.assertEquals(message3.getKey(), "key2");
            Assert.assertEquals(new String(message3.getData()), "my-message-2");
        }
    }

    /**
     * Compacts a topic written by an LZ4-compressing producer with batching disabled and checks
     * that the compacted view is key1 -> my-message-1 followed by key2 -> my-message-3.
     */
    @Test
    public void testCompactCompressedNoBatch() throws Exception {
        String topic = "persistent://my-property/use/my-ns/my-topic1";

        // Register "sub1" first; the consumer itself is closed immediately.
        this.pulsarClient.newConsumer()
                .topic(topic)
                .subscriptionName("sub1")
                .readCompacted(true)
                .subscribe()
                .close();

        try (Producer writer = this.pulsarClient.newProducer()
                .topic(topic)
                .compressionType(CompressionType.LZ4)
                .enableBatching(false)
                .create()) {
            writer.newMessage().key("key1").value("my-message-1".getBytes()).sendAsync();
            writer.newMessage().key("key2").value("my-message-2".getBytes()).sendAsync();
            // The blocking send is the last publish before the producer is closed.
            writer.newMessage().key("key2").value("my-message-3".getBytes()).send();
        }

        new TwoPhaseCompactor(this.conf, this.pulsarClient, this.bk, this.compactionScheduler)
                .compact(topic)
                .get();

        try (Consumer compactedReader = this.pulsarClient.newConsumer()
                .topic(topic)
                .subscriptionName("sub1")
                .readCompacted(true)
                .subscribe()) {
            Message first = compactedReader.receive();
            Assert.assertEquals(first.getKey(), "key1");
            Assert.assertEquals(new String(first.getData()), "my-message-1");

            Message second = compactedReader.receive();
            Assert.assertEquals(second.getKey(), "key2");
            Assert.assertEquals(new String(second.getData()), "my-message-3");
        }
    }

    /**
     * Same scenario as the non-batched compressed test, but the producer batches up to three
     * messages. After compaction the consumer must see key1 -> my-message-1 and
     * key2 -> my-message-3.
     */
    @Test
    public void testCompactCompressedBatching() throws Exception {
        String topic = "persistent://my-property/use/my-ns/my-topic1";

        // Subscription "sub1" is created ahead of the publishes.
        this.pulsarClient.newConsumer()
                .topic(topic)
                .subscriptionName("sub1")
                .readCompacted(true)
                .subscribe()
                .close();

        try (Producer batchingWriter = this.pulsarClient.newProducer()
                .topic(topic)
                .compressionType(CompressionType.LZ4)
                .maxPendingMessages(3)
                .enableBatching(true)
                .batchingMaxMessages(3)
                .batchingMaxPublishDelay(1L, TimeUnit.HOURS)
                .create()) {
            batchingWriter.newMessage().key("key1").value("my-message-1".getBytes()).sendAsync();
            batchingWriter.newMessage().key("key2").value("my-message-2".getBytes()).sendAsync();
            batchingWriter.newMessage().key("key2").value("my-message-3".getBytes()).send();
        }

        TwoPhaseCompactor twoPhase =
                new TwoPhaseCompactor(this.conf, this.pulsarClient, this.bk, this.compactionScheduler);
        twoPhase.compact(topic).get();

        String[][] expectedInOrder = {{"key1", "my-message-1"}, {"key2", "my-message-3"}};
        try (Consumer compactedReader = this.pulsarClient.newConsumer()
                .topic(topic)
                .subscriptionName("sub1")
                .readCompacted(true)
                .subscribe()) {
            for (String[] keyAndValue : expectedInOrder) {
                Message received = compactedReader.receive();
                Assert.assertEquals(received.getKey(), keyAndValue[0]);
                Assert.assertEquals(new String(received.getData()), keyAndValue[1]);
            }
        }
    }

    /**
     * Compacts a topic written by an encrypting producer with batching disabled. The consumer,
     * configured with the same kind of key reader, must see key1 -> my-message-1 and
     * key2 -> my-message-3.
     */
    @Test
    public void testCompactEncryptedNoBatch() throws Exception {
        String topic = "persistent://my-property/use/my-ns/my-topic1";

        // Register "sub1" before publishing.
        this.pulsarClient.newConsumer()
                .topic(topic)
                .subscriptionName("sub1")
                .readCompacted(true)
                .subscribe()
                .close();

        try (Producer encryptingWriter = this.pulsarClient.newProducer()
                .topic(topic)
                .addEncryptionKey("client-ecdsa.pem")
                .cryptoKeyReader((CryptoKeyReader) new EncKeyReader())
                .enableBatching(false)
                .create()) {
            encryptingWriter.newMessage().key("key1").value("my-message-1".getBytes()).sendAsync();
            encryptingWriter.newMessage().key("key2").value("my-message-2".getBytes()).sendAsync();
            encryptingWriter.newMessage().key("key2").value("my-message-3".getBytes()).send();
        }

        new TwoPhaseCompactor(this.conf, this.pulsarClient, this.bk, this.compactionScheduler)
                .compact(topic)
                .get();

        try (Consumer decryptingReader = this.pulsarClient.newConsumer()
                .topic(topic)
                .subscriptionName("sub1")
                .cryptoKeyReader((CryptoKeyReader) new EncKeyReader())
                .readCompacted(true)
                .subscribe()) {
            Message first = decryptingReader.receive();
            Assert.assertEquals(first.getKey(), "key1");
            Assert.assertEquals(new String(first.getData()), "my-message-1");

            Message second = decryptingReader.receive();
            Assert.assertEquals(second.getKey(), "key2");
            Assert.assertEquals(new String(second.getData()), "my-message-3");
        }
    }

    /**
     * Compacts a topic written by an encrypting producer that batches up to three messages.
     * The assertions expect all three published messages to still be delivered after compaction
     * (key1 -> my-message-1, key2 -> my-message-2, key2 -> my-message-3), i.e. the older key2
     * value is not removed in this configuration.
     */
    @Test
    public void testCompactEncryptedBatching() throws Exception {
        String topic = "persistent://my-property/use/my-ns/my-topic1";

        // Register "sub1" before publishing.
        this.pulsarClient.newConsumer()
                .topic(topic)
                .subscriptionName("sub1")
                .readCompacted(true)
                .subscribe()
                .close();

        try (Producer encryptingBatcher = this.pulsarClient.newProducer()
                .topic(topic)
                .addEncryptionKey("client-ecdsa.pem")
                .cryptoKeyReader((CryptoKeyReader) new EncKeyReader())
                .maxPendingMessages(3)
                .enableBatching(true)
                .batchingMaxMessages(3)
                .batchingMaxPublishDelay(1L, TimeUnit.HOURS)
                .create()) {
            encryptingBatcher.newMessage().key("key1").value("my-message-1".getBytes()).sendAsync();
            encryptingBatcher.newMessage().key("key2").value("my-message-2".getBytes()).sendAsync();
            encryptingBatcher.newMessage().key("key2").value("my-message-3".getBytes()).send();
        }

        TwoPhaseCompactor twoPhase =
                new TwoPhaseCompactor(this.conf, this.pulsarClient, this.bk, this.compactionScheduler);
        twoPhase.compact(topic).get();

        String[][] expectedInOrder = {
            {"key1", "my-message-1"},
            {"key2", "my-message-2"},
            {"key2", "my-message-3"},
        };
        try (Consumer decryptingReader = this.pulsarClient.newConsumer()
                .topic(topic)
                .subscriptionName("sub1")
                .cryptoKeyReader((CryptoKeyReader) new EncKeyReader())
                .readCompacted(true)
                .subscribe()) {
            for (String[] keyAndValue : expectedInOrder) {
                Message received = decryptingReader.receive();
                Assert.assertEquals(received.getKey(), keyAndValue[0]);
                Assert.assertEquals(new String(received.getData()), keyAndValue[1]);
            }
        }
    }

    /**
     * Compacts a topic written by a producer that both encrypts and LZ4-compresses, with batching
     * disabled. The compacted view must be key1 -> my-message-1 followed by key2 -> my-message-3.
     */
    @Test
    public void testCompactEncryptedAndCompressedNoBatch() throws Exception {
        String topic = "persistent://my-property/use/my-ns/my-topic1";

        // Register "sub1" before publishing.
        this.pulsarClient.newConsumer()
                .topic(topic)
                .subscriptionName("sub1")
                .readCompacted(true)
                .subscribe()
                .close();

        try (Producer writer = this.pulsarClient.newProducer()
                .topic(topic)
                .addEncryptionKey("client-ecdsa.pem")
                .cryptoKeyReader((CryptoKeyReader) new EncKeyReader())
                .compressionType(CompressionType.LZ4)
                .enableBatching(false)
                .create()) {
            writer.newMessage().key("key1").value("my-message-1".getBytes()).sendAsync();
            writer.newMessage().key("key2").value("my-message-2".getBytes()).sendAsync();
            writer.newMessage().key("key2").value("my-message-3".getBytes()).send();
        }

        new TwoPhaseCompactor(this.conf, this.pulsarClient, this.bk, this.compactionScheduler)
                .compact(topic)
                .get();

        try (Consumer decryptingReader = this.pulsarClient.newConsumer()
                .topic(topic)
                .subscriptionName("sub1")
                .cryptoKeyReader((CryptoKeyReader) new EncKeyReader())
                .readCompacted(true)
                .subscribe()) {
            Message first = decryptingReader.receive();
            Assert.assertEquals(first.getKey(), "key1");
            Assert.assertEquals(new String(first.getData()), "my-message-1");

            Message second = decryptingReader.receive();
            Assert.assertEquals(second.getKey(), "key2");
            Assert.assertEquals(new String(second.getData()), "my-message-3");
        }
    }

    /**
     * Compacts a topic written by a producer that encrypts, LZ4-compresses and batches up to three
     * messages. The assertions expect all three published messages to be delivered after
     * compaction: key1 -> my-message-1, key2 -> my-message-2, key2 -> my-message-3.
     */
    @Test
    public void testCompactEncryptedAndCompressedBatching() throws Exception {
        String topic = "persistent://my-property/use/my-ns/my-topic1";

        // Register "sub1" before publishing.
        this.pulsarClient.newConsumer()
                .topic(topic)
                .subscriptionName("sub1")
                .readCompacted(true)
                .subscribe()
                .close();

        try (Producer batcher = this.pulsarClient.newProducer()
                .topic(topic)
                .addEncryptionKey("client-ecdsa.pem")
                .cryptoKeyReader((CryptoKeyReader) new EncKeyReader())
                .compressionType(CompressionType.LZ4)
                .maxPendingMessages(3)
                .enableBatching(true)
                .batchingMaxMessages(3)
                .batchingMaxPublishDelay(1L, TimeUnit.HOURS)
                .create()) {
            batcher.newMessage().key("key1").value("my-message-1".getBytes()).sendAsync();
            batcher.newMessage().key("key2").value("my-message-2".getBytes()).sendAsync();
            batcher.newMessage().key("key2").value("my-message-3".getBytes()).send();
        }

        TwoPhaseCompactor twoPhase =
                new TwoPhaseCompactor(this.conf, this.pulsarClient, this.bk, this.compactionScheduler);
        twoPhase.compact(topic).get();

        String[][] expectedInOrder = {
            {"key1", "my-message-1"},
            {"key2", "my-message-2"},
            {"key2", "my-message-3"},
        };
        try (Consumer decryptingReader = this.pulsarClient.newConsumer()
                .topic(topic)
                .subscriptionName("sub1")
                .cryptoKeyReader((CryptoKeyReader) new EncKeyReader())
                .readCompacted(true)
                .subscribe()) {
            for (String[] keyAndValue : expectedInOrder) {
                Message received = decryptingReader.receive();
                Assert.assertEquals(received.getKey(), keyAndValue[0]);
                Assert.assertEquals(new String(received.getData()), keyAndValue[1]);
            }
        }
    }

    /**
     * Publishes encrypted messages through a non-batching and a batching producer, including
     * value-less messages for key1 and key2, compacts the topic, and checks the compacted view.
     *
     * <p>The assertions expect key1 to be gone, while the three messages sent through the
     * batching producer (key2, key3 and the value-less key2) are all still delivered, followed
     * by key4.
     */
    @Test
    public void testEmptyPayloadDeletesWhenEncrypted() throws Exception {
        String topic = "persistent://my-property/use/my-ns/my-topic1";

        // Register "sub1" before publishing.
        this.pulsarClient.newConsumer()
                .topic(topic)
                .subscriptionName("sub1")
                .readCompacted(true)
                .subscribe()
                .close();

        try (Producer unbatched = this.pulsarClient.newProducer()
                     .topic(topic)
                     .enableBatching(false)
                     .addEncryptionKey("client-ecdsa.pem")
                     .cryptoKeyReader((CryptoKeyReader) new EncKeyReader())
                     .create();
             Producer batched = this.pulsarClient.newProducer()
                     .topic(topic)
                     .maxPendingMessages(3)
                     .enableBatching(true)
                     .addEncryptionKey("client-ecdsa.pem")
                     .cryptoKeyReader((CryptoKeyReader) new EncKeyReader())
                     .batchingMaxMessages(3)
                     .batchingMaxPublishDelay(1L, TimeUnit.HOURS)
                     .create()) {
            unbatched.newMessage().key("key0").value("my-message-0".getBytes()).send();
            unbatched.newMessage().key("key1").value("my-message-1".getBytes()).send();
            // key1 re-published with no value.
            unbatched.newMessage().key("key1").send();

            batched.newMessage().key("key2").value("my-message-2".getBytes()).sendAsync();
            batched.newMessage().key("key3").value("my-message-3".getBytes()).sendAsync();
            // key2 re-published with no value through the batching producer.
            batched.newMessage().key("key2").send();

            unbatched.newMessage().key("key4").value("my-message-4".getBytes()).send();
        }

        new TwoPhaseCompactor(this.conf, this.pulsarClient, this.bk, this.compactionScheduler)
                .compact(topic)
                .get();

        String[][] expectedInOrder = {
            {"key0", "my-message-0"},
            {"key2", "my-message-2"},
            {"key3", "my-message-3"},
            {"key2", ""},
            {"key4", "my-message-4"},
        };
        try (Consumer decryptingReader = this.pulsarClient.newConsumer()
                .topic(topic)
                .cryptoKeyReader((CryptoKeyReader) new EncKeyReader())
                .subscriptionName("sub1")
                .readCompacted(true)
                .subscribe()) {
            for (String[] keyAndValue : expectedInOrder) {
                Message received = decryptingReader.receive();
                Assert.assertEquals(received.getKey(), keyAndValue[0]);
                Assert.assertEquals(new String(received.getData()), keyAndValue[1]);
            }
        }
    }

    /**
     * Data provider named "lastDeletedBatching".
     *
     * @return two single-argument rows: {@code true} first, then {@code false}
     */
    @DataProvider(name="lastDeletedBatching")
    public static Object[][] lastDeletedBatching() {
        Object[] withFlagSet = {Boolean.TRUE};
        Object[] withFlagCleared = {Boolean.FALSE};
        return new Object[][]{withFlagSet, withFlagCleared};
    }

    /**
     * Publishes values for keys 1, 2 and 3, then empty values for keys 1 and 2, compacts the
     * topic and checks that the first message a compacted consumer receives carries key "3".
     *
     * @param batching whether the producer has batching enabled
     * @throws Exception if any client or compaction call fails
     */
    @Test(timeOut=20000L, dataProvider="lastDeletedBatching")
    public void testCompactionWithLastDeletedKey(boolean batching) throws Exception {
        String topic = "persistent://my-property/use/my-ns/my-topic1";

        // try-with-resources so the producer is released even if a send throws.
        try (Producer<byte[]> producer = this.pulsarClient.newProducer().topic(topic).enableBatching(batching)
                .messageRoutingMode(MessageRoutingMode.SinglePartition).create()) {
            // Create the "sub1" subscription before anything is published.
            this.pulsarClient.newConsumer().topic(topic).subscriptionName("sub1").readCompacted(true).subscribe().close();

            producer.newMessage().key("1").value("1".getBytes()).send();
            producer.newMessage().key("2").value("2".getBytes()).send();
            producer.newMessage().key("3").value("3".getBytes()).send();
            // Empty payloads for keys 1 and 2.
            producer.newMessage().key("1").value("".getBytes()).send();
            producer.newMessage().key("2").value("".getBytes()).send();
        }

        TwoPhaseCompactor compactor = new TwoPhaseCompactor(this.conf, this.pulsarClient, this.bk, this.compactionScheduler);
        compactor.compact(topic).get();

        Set<String> expected = Sets.newHashSet("3");
        try (Consumer<byte[]> consumer = this.pulsarClient.newConsumer().topic(topic).subscriptionName("sub1")
                .readCompacted(true).subscribe()) {
            Message<byte[]> m = consumer.receive(2, TimeUnit.SECONDS);
            // Fail with an assertion, not a NullPointerException, if nothing arrives in time.
            Assert.assertNotNull(m);
            Assert.assertTrue(expected.remove(m.getKey()));
        }
    }

    /**
     * Publishes values for keys 1 and 2 followed by empty values for both keys, compacts the
     * topic, and checks that a compacted consumer receives nothing within two seconds.
     *
     * @param batching whether the producer has batching enabled
     * @throws Exception if any client or compaction call fails
     */
    @Test(timeOut=20000L, dataProvider="lastDeletedBatching")
    public void testEmptyCompactionLedger(boolean batching) throws Exception {
        String topic = "persistent://my-property/use/my-ns/my-topic1";

        // try-with-resources so the producer is released even if a send throws.
        try (Producer<byte[]> producer = this.pulsarClient.newProducer().topic(topic).enableBatching(batching)
                .messageRoutingMode(MessageRoutingMode.SinglePartition).create()) {
            // Create the "sub1" subscription before anything is published.
            this.pulsarClient.newConsumer().topic(topic).subscriptionName("sub1").readCompacted(true).subscribe().close();

            producer.newMessage().key("1").value("1".getBytes()).send();
            producer.newMessage().key("2").value("2".getBytes()).send();
            // Empty payloads for every key that was written.
            producer.newMessage().key("1").value("".getBytes()).send();
            producer.newMessage().key("2").value("".getBytes()).send();
        }

        TwoPhaseCompactor compactor = new TwoPhaseCompactor(this.conf, this.pulsarClient, this.bk, this.compactionScheduler);
        compactor.compact(topic).get();

        try (Consumer<byte[]> consumer = this.pulsarClient.newConsumer().topic(topic).subscriptionName("sub1")
                .readCompacted(true).subscribe()) {
            Message<byte[]> m = consumer.receive(2, TimeUnit.SECONDS);
            Assert.assertNull(m);
        }
    }

    /**
     * Publishes only empty-payload messages for a single key, compacts the topic, and checks that
     * a compacted consumer starting from the earliest position receives nothing within two seconds.
     *
     * @param batchEnabled when {@code true} the producer batches with at most two messages per
     *                     batch; when {@code false} batching is disabled
     * @throws Exception if any client or compaction call fails
     */
    @Test(timeOut=20000L, dataProvider="lastDeletedBatching")
    public void testAllEmptyCompactionLedger(boolean batchEnabled) throws Exception {
        final String topic = "persistent://my-property/use/my-ns/testAllEmptyCompactionLedger" + UUID.randomUUID().toString();
        final int messages = 10;

        ProducerBuilder<byte[]> builder = this.pulsarClient.newProducer().topic(topic);
        if (!batchEnabled) {
            builder.enableBatching(false);
        } else {
            builder.batchingMaxMessages(2);
        }

        // try-with-resources so the producer is released even if publishing fails.
        try (Producer<byte[]> producer = builder.create()) {
            ArrayList<CompletableFuture<MessageId>> futures = new ArrayList<>(messages);
            for (int i = 0; i < messages; ++i) {
                futures.add(producer.newMessage().keyBytes("1".getBytes()).value("".getBytes()).sendAsync());
            }
            // Wait until every publish has completed before compacting.
            FutureUtil.waitForAll(futures).get();
        }

        TwoPhaseCompactor compactor = new TwoPhaseCompactor(this.conf, this.pulsarClient, this.bk, this.compactionScheduler);
        compactor.compact(topic).get();

        try (Consumer<byte[]> consumer = this.pulsarClient.newConsumer().topic(topic).subscriptionName("sub1")
                .readCompacted(true).subscriptionInitialPosition(SubscriptionInitialPosition.Earliest).subscribe()) {
            Message<byte[]> m = consumer.receive(2, TimeUnit.SECONDS);
            Assert.assertNull(m);
        }
    }

    /**
     * Interleaves blocking sends and flushed groups of async sends for two keys, none with an
     * empty payload, compacts the topic, and checks that exactly two messages remain:
     * k1 -> "5" and k2 -> "3".
     *
     * @throws PulsarClientException if a client operation fails
     * @throws ExecutionException if a publish or the compaction completes exceptionally
     * @throws InterruptedException if the thread is interrupted while waiting
     */
    @Test(timeOut=20000L)
    public void testBatchAndNonBatchWithoutEmptyPayload() throws PulsarClientException, ExecutionException, InterruptedException {
        final String topic = "persistent://my-property/use/my-ns/testBatchAndNonBatchWithoutEmptyPayload" + UUID.randomUUID().toString();
        final String k1 = "k1";
        final String k2 = "k2";

        // try-with-resources so the producer is released even if publishing fails.
        try (Producer<byte[]> producer = this.pulsarClient.newProducer().topic(topic).enableBatching(true)
                .batchingMaxPublishDelay(1L, TimeUnit.DAYS).create()) {
            producer.newMessage().key(k1).value("0".getBytes()).send();

            ArrayList<CompletableFuture<MessageId>> futures = new ArrayList<>(7);
            // k1 values "1" and "2", flushed together.
            for (int i = 0; i < 2; ++i) {
                futures.add(producer.newMessage().key(k1).value((i + 1 + "").getBytes()).sendAsync());
            }
            producer.flush();

            producer.newMessage().key(k1).value("3".getBytes()).send();

            // k1 values "4" and "5", flushed together.
            for (int i = 0; i < 2; ++i) {
                futures.add(producer.newMessage().key(k1).value((i + 4 + "").getBytes()).sendAsync());
            }
            producer.flush();

            // k2 values "0".."2" asynchronously, then "3" with a blocking send.
            for (int i = 0; i < 3; ++i) {
                futures.add(producer.newMessage().key(k2).value((i + "").getBytes()).sendAsync());
            }
            producer.newMessage().key(k2).value("3".getBytes()).send();
            producer.flush();

            FutureUtil.waitForAll(futures).get();
        }

        TwoPhaseCompactor compactor = new TwoPhaseCompactor(this.conf, this.pulsarClient, this.bk, this.compactionScheduler);
        compactor.compact(topic).get();

        try (Consumer<byte[]> consumer = this.pulsarClient.newConsumer().topic(topic).subscriptionName("sub1")
                .readCompacted(true).subscriptionInitialPosition(SubscriptionInitialPosition.Earliest).subscribe()) {
            Message<byte[]> m1 = consumer.receive(2, TimeUnit.SECONDS);
            Message<byte[]> m2 = consumer.receive(2, TimeUnit.SECONDS);
            Assert.assertNotNull(m1);
            Assert.assertNotNull(m2);
            Assert.assertEquals(m1.getKey(), k1);
            Assert.assertEquals(new String(m1.getValue()), "5");
            Assert.assertEquals(m2.getKey(), k2);
            Assert.assertEquals(new String(m2.getValue()), "3");
            // Nothing else may be delivered.
            Message<byte[]> none = consumer.receive(2, TimeUnit.SECONDS);
            Assert.assertNull(none);
        }
    }

    /**
     * Interleaves blocking sends and flushed groups of async sends for k1 and k2, then sends an
     * empty payload for k2 and a value for k3. After compaction exactly two messages must
     * remain: k1 -> "5" and k3 -> "0".
     *
     * @throws PulsarClientException if a client operation fails
     * @throws ExecutionException if a publish or the compaction completes exceptionally
     * @throws InterruptedException if the thread is interrupted while waiting
     */
    @Test(timeOut=20000L)
    public void testBatchAndNonBatchWithEmptyPayload() throws PulsarClientException, ExecutionException, InterruptedException {
        final String topic = "persistent://my-property/use/my-ns/testBatchAndNonBatchWithEmptyPayload" + UUID.randomUUID().toString();
        final String k1 = "k1";
        final String k2 = "k2";
        final String k3 = "k3";

        // try-with-resources so the producer is released even if publishing fails.
        try (Producer<byte[]> producer = this.pulsarClient.newProducer().topic(topic).enableBatching(true)
                .batchingMaxPublishDelay(1L, TimeUnit.DAYS).create()) {
            producer.newMessage().key(k1).value("0".getBytes()).send();

            ArrayList<CompletableFuture<MessageId>> futures = new ArrayList<>(7);
            // k1 values "1" and "2", flushed together.
            for (int i = 0; i < 2; ++i) {
                futures.add(producer.newMessage().key(k1).value((i + 1 + "").getBytes()).sendAsync());
            }
            producer.flush();

            producer.newMessage().key(k1).value("3".getBytes()).send();

            // k1 values "4" and "5", flushed together.
            for (int i = 0; i < 2; ++i) {
                futures.add(producer.newMessage().key(k1).value((i + 4 + "").getBytes()).sendAsync());
            }
            producer.flush();

            // k2 values "10".."12", flushed together.
            for (int i = 0; i < 3; ++i) {
                futures.add(producer.newMessage().key(k2).value((i + 10 + "").getBytes()).sendAsync());
            }
            producer.flush();

            // Empty payload for k2, then a single value for k3.
            producer.newMessage().key(k2).value("".getBytes()).send();
            producer.newMessage().key(k3).value("0".getBytes()).send();

            FutureUtil.waitForAll(futures).get();
        }

        TwoPhaseCompactor compactor = new TwoPhaseCompactor(this.conf, this.pulsarClient, this.bk, this.compactionScheduler);
        compactor.compact(topic).get();

        try (Consumer<byte[]> consumer = this.pulsarClient.newConsumer().topic(topic).subscriptionName("sub1")
                .readCompacted(true).subscriptionInitialPosition(SubscriptionInitialPosition.Earliest).subscribe()) {
            Message<byte[]> m1 = consumer.receive();
            Message<byte[]> m2 = consumer.receive();
            Assert.assertNotNull(m1);
            Assert.assertNotNull(m2);
            Assert.assertEquals(m1.getKey(), k1);
            Assert.assertEquals(m2.getKey(), k3);
            Assert.assertEquals(new String(m1.getValue()), "5");
            Assert.assertEquals(new String(m2.getValue()), "0");
            // Nothing else may be delivered.
            Message<byte[]> none = consumer.receive(2, TimeUnit.SECONDS);
            Assert.assertNull(none);
        }
    }

    /**
     * Interleaves blocking sends and flushed groups of async sends for k1 and k2 and ends the
     * topic with an empty payload for k2. After compaction exactly one message must remain:
     * k1 -> "5".
     *
     * @throws PulsarClientException if a client operation fails
     * @throws ExecutionException if a publish or the compaction completes exceptionally
     * @throws InterruptedException if the thread is interrupted while waiting
     */
    @Test(timeOut=20000L)
    public void testBatchAndNonBatchEndOfEmptyPayload() throws PulsarClientException, ExecutionException, InterruptedException {
        // NOTE(review): the topic prefix is the one used by testBatchAndNonBatchWithEmptyPayload;
        // the random UUID suffix still keeps the topic names distinct.
        final String topic = "persistent://my-property/use/my-ns/testBatchAndNonBatchWithEmptyPayload" + UUID.randomUUID().toString();
        final String k1 = "k1";
        final String k2 = "k2";

        // try-with-resources so the producer is released even if publishing fails.
        try (Producer<byte[]> producer = this.pulsarClient.newProducer().topic(topic).enableBatching(true)
                .batchingMaxPublishDelay(1L, TimeUnit.DAYS).create()) {
            producer.newMessage().key(k1).value("0".getBytes()).send();

            ArrayList<CompletableFuture<MessageId>> futures = new ArrayList<>(7);
            // k1 values "1" and "2", flushed together.
            for (int i = 0; i < 2; ++i) {
                futures.add(producer.newMessage().key(k1).value((i + 1 + "").getBytes()).sendAsync());
            }
            producer.flush();

            producer.newMessage().key(k1).value("3".getBytes()).send();

            // k1 values "4" and "5", flushed together.
            for (int i = 0; i < 2; ++i) {
                futures.add(producer.newMessage().key(k1).value((i + 4 + "").getBytes()).sendAsync());
            }
            producer.flush();

            // k2 values "10".."12", flushed together.
            for (int i = 0; i < 3; ++i) {
                futures.add(producer.newMessage().key(k2).value((i + 10 + "").getBytes()).sendAsync());
            }
            producer.flush();

            // The last message on the topic is an empty payload for k2.
            producer.newMessage().key(k2).value("".getBytes()).send();

            FutureUtil.waitForAll(futures).get();
        }

        TwoPhaseCompactor compactor = new TwoPhaseCompactor(this.conf, this.pulsarClient, this.bk, this.compactionScheduler);
        compactor.compact(topic).get();

        try (Consumer<byte[]> consumer = this.pulsarClient.newConsumer().topic(topic).subscriptionName("sub1")
                .readCompacted(true).subscriptionInitialPosition(SubscriptionInitialPosition.Earliest).subscribe()) {
            Message<byte[]> m1 = consumer.receive();
            Assert.assertNotNull(m1);
            Assert.assertEquals(m1.getKey(), k1);
            Assert.assertEquals(new String(m1.getValue()), "5");
            // Nothing else may be delivered.
            Message<byte[]> none = consumer.receive(2, TimeUnit.SECONDS);
            Assert.assertNull(none);
        }
    }

    /**
     * Publishes ten values for one key, compacts, publishes ten more values for the same key,
     * compacts again, and checks that the only message left is the last one written ("19").
     *
     * @param batchEnabled when {@code true} the producer batches with at most two messages per
     *                     batch; when {@code false} batching is disabled
     * @throws PulsarClientException if a client operation fails
     * @throws ExecutionException if a publish or a compaction completes exceptionally
     * @throws InterruptedException if the thread is interrupted while waiting
     */
    @Test(timeOut=20000L, dataProvider="lastDeletedBatching")
    public void testCompactMultipleTimesWithoutEmptyMessage(boolean batchEnabled) throws PulsarClientException, ExecutionException, InterruptedException {
        final String topic = "persistent://my-property/use/my-ns/testCompactMultipleTimesWithoutEmptyMessage" + UUID.randomUUID().toString();
        final int messages = 10;
        final String key = "1";

        ProducerBuilder<byte[]> builder = this.pulsarClient.newProducer().topic(topic);
        if (!batchEnabled) {
            builder.enableBatching(false);
        } else {
            builder.batchingMaxMessages(2);
        }

        TwoPhaseCompactor compactor = new TwoPhaseCompactor(this.conf, this.pulsarClient, this.bk, this.compactionScheduler);

        // try-with-resources so the producer is released even if publishing or compaction fails.
        try (Producer<byte[]> producer = builder.create()) {
            ArrayList<CompletableFuture<MessageId>> futures = new ArrayList<>(messages);

            // First round: values "0".."9", then compact.
            for (int i = 0; i < messages; ++i) {
                futures.add(producer.newMessage().key(key).value((i + "").getBytes()).sendAsync());
            }
            FutureUtil.waitForAll(futures).get();
            compactor.compact(topic).get();

            // Second round: values "10".."19" on the same producer.
            futures.clear();
            for (int i = 0; i < messages; ++i) {
                futures.add(producer.newMessage().key(key).value((i + 10 + "").getBytes()).sendAsync());
            }
            FutureUtil.waitForAll(futures).get();
        }

        compactor.compact(topic).get();

        try (Consumer<byte[]> consumer = this.pulsarClient.newConsumer().topic(topic).subscriptionName("sub1")
                .readCompacted(true).subscriptionInitialPosition(SubscriptionInitialPosition.Earliest).subscribe()) {
            Message<byte[]> m1 = consumer.receive();
            Assert.assertNotNull(m1);
            Assert.assertEquals(m1.getKey(), key);
            Assert.assertEquals(new String(m1.getValue()), "19");
            // Nothing else may be delivered.
            Message<byte[]> none = consumer.receive(2, TimeUnit.SECONDS);
            Assert.assertNull(none);
        }
    }

    /**
     * Verifies what a {@code readCompacted(true)} consumer receives when the topic contains
     * messages published after the most recent compaction run.
     *
     * <p>The test checks three situations, each with a fresh subscription starting at
     * {@code Earliest}:
     * <ol>
     *   <li>"0".."9" published, topic compacted, "10".."19" published: the consumer is expected
     *       to receive "9" followed by "10".."19" (eleven messages), then nothing.</li>
     *   <li>An empty-payload message is published for the key and the topic is compacted again:
     *       the consumer is expected to receive nothing.</li>
     *   <li>"20".."29" published with no further compaction: the consumer is expected to receive
     *       exactly those ten messages in order, then nothing.</li>
     * </ol>
     *
     * <p>NOTE(review): the timeout is 2,000,000 ms, far larger than the 20,000 ms used by the
     * neighbouring test — confirm this is intentional.
     *
     * @param batchEnabled {@code true} to batch two messages per batch, {@code false} to disable
     *                     producer batching entirely
     */
    @Test(timeOut=2000000L, dataProvider="lastDeletedBatching")
    public void testReadUnCompacted(boolean batchEnabled) throws PulsarClientException, ExecutionException, InterruptedException {
        String topic = "persistent://my-property/use/my-ns/testReadUnCompacted" + UUID.randomUUID().toString();
        final int messages = 10;
        final String key = "1";

        ProducerBuilder<byte[]> builder = this.pulsarClient.newProducer().topic(topic);
        if (batchEnabled) {
            // messages / 5 == 2 messages per batch
            builder.batchingMaxMessages(messages / 5);
        } else {
            builder.enableBatching(false);
        }

        // The producer is closed when the test body finishes, pass or fail.
        try (Producer<byte[]> producer = builder.create()) {
            // 1. Publish "0".."9" and wait until every send has completed.
            ArrayList<CompletableFuture<MessageId>> futures = new ArrayList<>(messages);
            for (int i = 0; i < messages; ++i) {
                futures.add(producer.newMessage().key(key).value(String.valueOf(i).getBytes()).sendAsync());
            }
            FutureUtil.waitForAll(futures).get();

            // 2. Compact the topic.
            TwoPhaseCompactor compactor = new TwoPhaseCompactor(this.conf, this.pulsarClient, this.bk, this.compactionScheduler);
            compactor.compact(topic).get();

            // 3. Publish "10".."19" and do NOT compact.
            futures.clear();
            for (int i = 0; i < messages; ++i) {
                futures.add(producer.newMessage().key(key).value(String.valueOf(i + messages).getBytes()).sendAsync());
            }
            FutureUtil.waitForAll(futures).get();

            // Expect the last pre-compaction value ("9") followed by the ten later messages.
            try (Consumer<byte[]> consumer = this.pulsarClient.newConsumer()
                    .topic(topic)
                    .subscriptionName("sub1")
                    .readCompacted(true)
                    .subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
                    .subscribe()) {
                for (int i = 0; i < messages + 1; ++i) {
                    Message<byte[]> received = consumer.receive();
                    Assert.assertNotNull(received);
                    Assert.assertEquals(received.getKey(), key);
                    Assert.assertEquals(new String(received.getValue()), String.valueOf(messages - 1 + i));
                    consumer.acknowledge(received);
                }
                Message<byte[]> none = consumer.receive(2, TimeUnit.SECONDS);
                Assert.assertNull(none);
            }

            // 4. Publish an empty payload for the key, then compact again.
            producer.newMessage().key(key).value("".getBytes()).send();
            compactor.compact(topic).get();

            // Expect nothing to be delivered for the key any more.
            try (Consumer<byte[]> consumer = this.pulsarClient.newConsumer()
                    .topic(topic)
                    .subscriptionName("sub2")
                    .readCompacted(true)
                    .subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
                    .subscribe()) {
                Message<byte[]> none = consumer.receive(2, TimeUnit.SECONDS);
                Assert.assertNull(none);
            }

            // 5. Publish "20".."29" with no further compaction.
            futures.clear();
            for (int i = 0; i < messages; ++i) {
                futures.add(producer.newMessage().key(key).value(String.valueOf(i + 2 * messages).getBytes()).sendAsync());
            }
            FutureUtil.waitForAll(futures).get();

            // Expect exactly the ten new messages, in publish order.
            try (Consumer<byte[]> consumer = this.pulsarClient.newConsumer()
                    .topic(topic)
                    .subscriptionName("sub3")
                    .readCompacted(true)
                    .subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
                    .subscribe()) {
                for (int i = 0; i < messages; ++i) {
                    Message<byte[]> received = consumer.receive();
                    Assert.assertNotNull(received);
                    Assert.assertEquals(received.getKey(), key);
                    Assert.assertEquals(new String(received.getValue()), String.valueOf(i + 2 * messages));
                    consumer.acknowledge(received);
                }
                Message<byte[]> none = consumer.receive(2, TimeUnit.SECONDS);
                Assert.assertNull(none);
            }
        }
    }

    class EncKeyReader
    implements CryptoKeyReader {
        EncryptionKeyInfo keyInfo = new EncryptionKeyInfo();

        EncKeyReader() {
        }

        public EncryptionKeyInfo getPublicKey(String keyName, Map<String, String> keyMeta) {
            String CERT_FILE_PATH = "./src/test/resources/certificate/public-key." + keyName;
            if (Files.isReadable(Paths.get(CERT_FILE_PATH, new String[0]))) {
                try {
                    this.keyInfo.setKey(Files.readAllBytes(Paths.get(CERT_FILE_PATH, new String[0])));
                    return this.keyInfo;
                }
                catch (IOException e) {
                    Assert.fail((String)("Failed to read certificate from " + CERT_FILE_PATH));
                }
            } else {
                Assert.fail((String)("Certificate file " + CERT_FILE_PATH + " is not present or not readable."));
            }
            return null;
        }

        public EncryptionKeyInfo getPrivateKey(String keyName, Map<String, String> keyMeta) {
            String CERT_FILE_PATH = "./src/test/resources/certificate/private-key." + keyName;
            if (Files.isReadable(Paths.get(CERT_FILE_PATH, new String[0]))) {
                try {
                    this.keyInfo.setKey(Files.readAllBytes(Paths.get(CERT_FILE_PATH, new String[0])));
                    return this.keyInfo;
                }
                catch (IOException e) {
                    Assert.fail((String)("Failed to read certificate from " + CERT_FILE_PATH));
                }
            } else {
                Assert.fail((String)("Certificate file " + CERT_FILE_PATH + " is not present or not readable."));
            }
            return null;
        }
    }
}

