/*
 * Decompiled with CFR 0.152.
 */
package org.apache.pulsar.broker.admin;

import com.google.common.collect.Sets;
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
import java.util.Properties;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import org.apache.bookkeeper.client.api.ReadHandle;
import org.apache.bookkeeper.mledger.LedgerOffloader;
import org.apache.bookkeeper.mledger.ManagedLedgerInfo;
import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
import org.apache.pulsar.broker.service.persistent.PersistentTopic;
import org.apache.pulsar.client.admin.LongRunningProcessStatus;
import org.apache.pulsar.client.admin.PulsarAdminException;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.impl.MessageIdImpl;
import org.apache.pulsar.common.naming.NamespaceName;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.policies.data.ClusterData;
import org.apache.pulsar.common.policies.data.OffloadPolicies;
import org.apache.pulsar.common.policies.data.TenantInfo;
import org.awaitility.Awaitility;
import org.mockito.ArgumentMatchers;
import org.mockito.Mockito;
import org.mockito.verification.VerificationMode;
import org.testng.Assert;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;

@Test(groups={"broker"})
public class AdminApiOffloadTest
extends MockedPulsarServiceBaseTest {
    /** Tenant name; same value as the tenant that {@link #setup()} creates. */
    private final String testTenant = "prop-xyz";
    /** Local namespace name, i.e. the part of {@link #myNamespace} after the tenant. */
    private final String testNamespace = "ns1";
    /** Fully qualified {@code tenant/namespace} name; same value as the namespace that {@link #setup()} creates. */
    private final String myNamespace = "prop-xyz/ns1";
    /** Common prefix of the persistent topic names in {@code prop-xyz/ns1} whose suffix is a random UUID. */
    private final String testTopic = "persistent://prop-xyz/ns1/test-";

    /**
     * Configures and starts the mocked broker, then registers the {@code test} cluster,
     * the {@code prop-xyz} tenant and the {@code prop-xyz/ns1} namespace used by the tests.
     *
     * @throws Exception if the broker cannot be started or one of the admin calls fails
     */
    @Override
    @BeforeMethod
    public void setup() throws Exception {
        // Cap ledgers at 10 entries and set the minimum rollover time to 0 minutes.
        conf.setManagedLedgerMaxEntriesPerLedger(10);
        conf.setManagedLedgerMinLedgerRolloverTimeMinutes(0);
        // Turn on system topics and topic-level policies.
        conf.setSystemTopicEnabled(true);
        conf.setTopicLevelPoliciesEnabled(true);
        super.internalSetup();

        ClusterData clusterData = new ClusterData(pulsar.getWebServiceAddress());
        admin.clusters().createCluster("test", clusterData);

        Set<String> roles = Sets.newHashSet("role1", "role2");
        Set<String> tenantClusters = Sets.newHashSet("test");
        admin.tenants().createTenant(testTenant, new TenantInfo(roles, tenantClusters));

        Set<String> namespaceClusters = Sets.newHashSet("test");
        admin.namespaces().createNamespace(myNamespace, namespaceClusters);
    }

    /**
     * Runs after every test method ({@code alwaysRun=true}) and delegates to the
     * superclass's {@code internalCleanup()}.
     *
     * @throws Exception if the superclass cleanup fails
     */
    @Override
    @AfterMethod(alwaysRun=true)
    public void cleanup() throws Exception {
        super.internalCleanup();
    }

    /**
     * Drives a manually triggered offload of {@code topicName} through the admin API against
     * a mocked {@link LedgerOffloader} and checks the reported status at each step:
     * {@code NOT_RUN} before any trigger, {@code RUNNING} while the mocked offload is pending,
     * {@code ERROR} once it is failed, and {@code SUCCESS} after a second, succeeding trigger.
     *
     * @param topicName fully qualified name of the topic to publish to and offload
     * @param mlName    managed-ledger name used to read the topic's ledger list
     * @throws Exception if publishing or any admin call fails unexpectedly
     */
    private void testOffload(String topicName, String mlName) throws Exception {
        LedgerOffloader offloader = Mockito.mock(LedgerOffloader.class);
        Mockito.when(offloader.getOffloadDriverName()).thenReturn("mock");
        Mockito.doReturn(offloader).when(this.pulsar)
                .getManagedLedgerOffloader(ArgumentMatchers.any(), ArgumentMatchers.any());

        // The first offload returns this future, which stays pending until the test fails it below.
        CompletableFuture<Void> promise = new CompletableFuture<>();
        Mockito.doReturn(promise).when(offloader)
                .offload(ArgumentMatchers.any(), ArgumentMatchers.any(), ArgumentMatchers.any());

        // Publish 15 unbatched messages and remember the id of the last one.
        MessageId currentId = MessageId.latest;
        try (Producer<byte[]> producer =
                     this.pulsarClient.newProducer().topic(topicName).enableBatching(false).create()) {
            byte[] payload = "Foobar".getBytes();
            for (int i = 0; i < 15; i++) {
                currentId = producer.send(payload);
            }
        }

        ManagedLedgerInfo info = this.pulsar.getManagedLedgerFactory().getManagedLedgerInfo(mlName);
        Assert.assertEquals(info.ledgers.size(), 2);

        Assert.assertEquals(this.admin.topics().offloadStatus(topicName).getStatus(),
                LongRunningProcessStatus.Status.NOT_RUN);

        this.admin.topics().triggerOffload(topicName, currentId);
        Assert.assertEquals(this.admin.topics().offloadStatus(topicName).getStatus(),
                LongRunningProcessStatus.Status.RUNNING);

        try {
            this.admin.topics().triggerOffload(topicName, currentId);
            Assert.fail("Should have failed");
        } catch (PulsarAdminException.ConflictException expected) {
            // Expected: a second trigger is rejected while the first offload is still pending.
        }

        // Fail the pending offload. The status is updated by the offload's completion callbacks,
        // which may not have run yet when completeExceptionally() returns, so poll for ERROR
        // instead of asserting it once.
        promise.completeExceptionally(new Exception("Some random failure"));
        Awaitility.await().untilAsserted(() ->
                Assert.assertEquals(this.admin.topics().offloadStatus(topicName).getStatus(),
                        LongRunningProcessStatus.Status.ERROR));
        Assert.assertTrue(
                this.admin.topics().offloadStatus(topicName).getLastError().contains("Some random failure"));

        // Retry with an offloader call that completes immediately.
        Mockito.doReturn(CompletableFuture.completedFuture(null)).when(offloader)
                .offload(ArgumentMatchers.any(), ArgumentMatchers.any(), ArgumentMatchers.any());
        this.admin.topics().triggerOffload(topicName, currentId);
        Awaitility.await().untilAsserted(() ->
                Assert.assertEquals(this.admin.topics().offloadStatus(topicName).getStatus(),
                        LongRunningProcessStatus.Status.SUCCESS));

        // The first message that was not offloaded must be entry 0 of the second ledger.
        MessageId firstUnoffloaded = this.admin.topics().offloadStatus(topicName).getFirstUnoffloadedMessage();
        Assert.assertTrue(firstUnoffloaded instanceof MessageIdImpl);
        MessageIdImpl firstUnoffloadedMessage = (MessageIdImpl) firstUnoffloaded;
        long secondLedgerId = info.ledgers.get(1).ledgerId;
        Assert.assertEquals(firstUnoffloadedMessage.getLedgerId(), secondLedgerId);
        Assert.assertEquals(firstUnoffloadedMessage.getEntryId(), 0L);

        // One failed attempt plus one successful attempt.
        Mockito.verify(offloader, Mockito.times(2))
                .offload(ArgumentMatchers.any(), ArgumentMatchers.any(), ArgumentMatchers.any());
    }

    /**
     * Runs the manual-offload scenario on a topic whose namespace path is
     * {@code tenant/namespace} ({@code prop-xyz/ns1}).
     *
     * @throws Exception if the shared scenario fails
     */
    @Test
    public void testOffloadV2() throws Exception {
        this.testOffload("persistent://prop-xyz/ns1/topic1", "prop-xyz/ns1/persistent/topic1");
    }

    /**
     * Runs the manual-offload scenario on a topic whose namespace path has three parts
     * ({@code prop-xyz/test/ns1}).
     *
     * @throws Exception if the shared scenario fails
     */
    @Test
    public void testOffloadV1() throws Exception {
        this.testOffload("persistent://prop-xyz/test/ns1/topic2", "prop-xyz/test/ns1/persistent/topic2");
    }

    /**
     * Sets offload policies on the {@code prop-xyz/ns1} namespace, reads them back unchanged,
     * then removes them and expects the namespace to report no offload policies.
     *
     * @throws Exception if an admin call fails
     */
    @Test
    public void testOffloadPolicies() throws Exception {
        final long thresholdInBytes = 0L;
        final long deletionLagInMillis = 100L;
        OffloadPolicies submitted = OffloadPolicies.create(
                "aws-s3", "test-region", "test-bucket", "test-endpoint",
                null, null, null, null,
                100, 100,
                thresholdInBytes, deletionLagInMillis,
                OffloadPolicies.OffloadedReadPriority.TIERED_STORAGE_FIRST);

        // Round trip: what was stored must be what is returned.
        this.admin.namespaces().setOffloadPolicies(this.myNamespace, submitted);
        OffloadPolicies stored = this.admin.namespaces().getOffloadPolicies(this.myNamespace);
        Assert.assertEquals(submitted, stored);

        // After removal nothing is left at namespace level.
        this.admin.namespaces().removeOffloadPolicies(this.myNamespace);
        Assert.assertNull(this.admin.namespaces().getOffloadPolicies(this.myNamespace));
    }

    /**
     * Exercises the topic-level offload-policies admin API on a 3-partition topic:
     * initially nothing is set, a policy with a file-system profile path can be stored and
     * read back, and removing it leaves the topic without offload policies again.
     *
     * @throws Exception if an admin or client call fails
     */
    @Test
    public void testOffloadPoliciesApi() throws Exception {
        final String topic = this.testTopic + UUID.randomUUID();
        this.admin.topics().createPartitionedTopic(topic, 3);
        // Open and immediately close a producer on the topic.
        this.pulsarClient.newProducer().topic(topic).create().close();

        // Wait for the topic-policies cache before using the topic-level API.
        final TopicName parsedTopic = TopicName.get(topic);
        Awaitility.await().until(
                () -> this.pulsar.getTopicPoliciesService().cacheIsInitialized(parsedTopic));

        Assert.assertNull(this.admin.topics().getOffloadPolicies(topic));

        final String profilePath = "fileSystemPath";
        OffloadPolicies expected = new OffloadPolicies();
        expected.setFileSystemProfilePath(profilePath);
        this.admin.topics().setOffloadPolicies(topic, expected);

        Awaitility.await().untilAsserted(
                () -> Assert.assertNotNull(this.admin.topics().getOffloadPolicies(topic)));
        Assert.assertEquals(this.admin.topics().getOffloadPolicies(topic), expected);
        Assert.assertEquals(this.admin.topics().getOffloadPolicies(topic).getFileSystemProfilePath(), profilePath);

        this.admin.topics().removeOffloadPolicies(topic);
        Awaitility.await().untilAsserted(
                () -> Assert.assertNull(this.admin.topics().getOffloadPolicies(topic)));
        Assert.assertNull(this.admin.topics().getOffloadPolicies(topic));
    }

    /**
     * Checks what {@code getOffloadPolicies(topic, true)} reports for a 3-partition topic as
     * policies are added and removed: first the policies merged from the broker configuration,
     * then the namespace policies, then the topic policies, and back again in reverse order as
     * the topic and namespace policies are removed.
     *
     * @throws Exception if an admin or client call fails
     */
    @Test
    public void testOffloadPoliciesAppliedApi() throws Exception {
        final String topic = this.testTopic + UUID.randomUUID();
        this.admin.topics().createPartitionedTopic(topic, 3);
        this.pulsarClient.newProducer().topic(topic).create().close();
        Awaitility.await().until(
                () -> this.pulsar.getTopicPoliciesService().cacheIsInitialized(TopicName.get(topic)));

        // Nothing set on topic or namespace: expect the policies merged from the broker properties.
        OffloadPolicies initiallyApplied = this.admin.topics().getOffloadPolicies(topic, true);
        Properties brokerProperties = this.pulsar.getConfiguration().getProperties();
        final OffloadPolicies brokerLevel = OffloadPolicies.mergeConfiguration(null, null, brokerProperties);
        Assert.assertEquals(initiallyApplied, brokerLevel);

        // Any offloader the broker is asked to create from here on is this mock.
        LedgerOffloader mockedOffloader = Mockito.mock(LedgerOffloader.class);
        Mockito.when(mockedOffloader.getOffloadDriverName()).thenReturn("mock");
        Mockito.doReturn(mockedOffloader).when(this.pulsar).createManagedLedgerOffloader(ArgumentMatchers.any());

        // Namespace level.
        final OffloadPolicies namespaceLevel = s3OffloadPolicies(100L, 200L, "buck");
        this.admin.namespaces().setOffloadPolicies(this.myNamespace, namespaceLevel);
        Awaitility.await().untilAsserted(() ->
                Assert.assertEquals(this.admin.namespaces().getOffloadPolicies(this.myNamespace), namespaceLevel));
        Assert.assertEquals(this.admin.topics().getOffloadPolicies(topic, true), namespaceLevel);

        // Topic level takes over once set.
        final OffloadPolicies topicLevel = s3OffloadPolicies(200L, 400L, "buck2");
        this.admin.topics().setOffloadPolicies(topic, topicLevel);
        Awaitility.await().untilAsserted(() ->
                Assert.assertEquals(this.admin.topics().getOffloadPolicies(topic, true), topicLevel));

        // Removing the topic policies falls back to the namespace policies ...
        this.admin.topics().removeOffloadPolicies(topic);
        Awaitility.await().untilAsserted(() ->
                Assert.assertEquals(this.admin.topics().getOffloadPolicies(topic, true), namespaceLevel));

        // ... and removing those falls back to the broker-level policies.
        this.admin.namespaces().removeOffloadPolicies(this.myNamespace);
        Awaitility.await().untilAsserted(() ->
                Assert.assertEquals(this.admin.topics().getOffloadPolicies(topic, true), brokerLevel));
    }

    /** Builds offload policies with driver {@code "s3"} and the given threshold, deletion lag and bucket. */
    private static OffloadPolicies s3OffloadPolicies(long thresholdInBytes, long deletionLagInMillis, String bucket) {
        OffloadPolicies policies = new OffloadPolicies();
        policies.setManagedLedgerOffloadThresholdInBytes(thresholdInBytes);
        policies.setManagedLedgerOffloadDeletionLagInMillis(deletionLagInMillis);
        policies.setManagedLedgerOffloadDriver("s3");
        policies.setManagedLedgerOffloadBucket(bucket);
        return policies;
    }

    /**
     * Runs the topic-level offloader scenario of {@code testOffload(boolean)} with
     * {@code isPartitioned = true}.
     *
     * @throws Exception if the shared scenario fails
     */
    @Test
    public void testTopicLevelOffloadPartitioned() throws Exception {
        this.testOffload(true);
    }

    /**
     * Runs the topic-level offloader scenario of {@code testOffload(boolean)} with
     * {@code isPartitioned = false}.
     *
     * @throws Exception if the shared scenario fails
     */
    @Test
    public void testTopicLevelOffloadNonPartitioned() throws Exception {
        this.testOffload(false);
    }

    /**
     * Checks which {@link LedgerOffloader} is configured on the managed ledger(s) of a topic as
     * topic-level offload policies are set and removed:
     * <ol>
     *   <li>before any policy is set, the driver name is {@code "NullLedgerOffloader"};</li>
     *   <li>after setting topic-level policies, the mocked topic offloader ({@code "mock"}) is used;</li>
     *   <li>after removing them, the topic offloader is closed once per topic/partition and the
     *       offloader registered for the namespace ({@code "s3"}) is used.</li>
     * </ol>
     *
     * @param isPartitioned {@code true} to run against a 3-partition topic, {@code false} for a
     *                      non-partitioned topic
     * @throws Exception if an admin, client or topic lookup call fails
     */
    private void testOffload(boolean isPartitioned) throws Exception {
        final String topicName = this.testTopic + UUID.randomUUID();
        final int partitionNum = 3;
        if (isPartitioned) {
            this.admin.topics().createPartitionedTopic(topicName, partitionNum);
        } else {
            this.admin.topics().createNonPartitionedTopic(topicName);
        }
        // Open and immediately close a producer on the topic.
        this.pulsarClient.newProducer().topic(topicName).enableBatching(false).create().close();
        Awaitility.await().until(
                () -> this.pulsar.getTopicPoliciesService().cacheIsInitialized(TopicName.get(topicName)));

        // 1. No offload policies yet.
        assertOffloaderDriver(topicName, isPartitioned, partitionNum, "NullLedgerOffloader");

        // 2. Set topic-level policies; the broker is stubbed to hand out the mocked offloader.
        OffloadPolicies offloadPolicies = new OffloadPolicies();
        offloadPolicies.setOffloadersDirectory(".");
        offloadPolicies.setManagedLedgerOffloadDriver("mock");
        offloadPolicies.setManagedLedgerOffloadPrefetchRounds(10);
        offloadPolicies.setManagedLedgerOffloadThresholdInBytes(1024L);
        LedgerOffloader topicOffloader = Mockito.mock(LedgerOffloader.class);
        Mockito.when(topicOffloader.getOffloadDriverName()).thenReturn("mock");
        Mockito.doReturn(topicOffloader).when(this.pulsar).createManagedLedgerOffloader(ArgumentMatchers.any());

        // Same readiness check as above, repeated right before the policy write.
        Awaitility.await().until(
                () -> this.pulsar.getTopicPoliciesService().cacheIsInitialized(TopicName.get(topicName)));
        this.admin.topics().setOffloadPolicies(topicName, offloadPolicies);
        Awaitility.await().untilAsserted(
                () -> Assert.assertNotNull(this.admin.topics().getOffloadPolicies(topicName)));
        assertOffloaderDriver(topicName, isPartitioned, partitionNum, "mock");

        // 3. Register an "s3" offloader for the namespace, then drop the topic-level policies.
        LedgerOffloader namespaceOffloader = Mockito.mock(LedgerOffloader.class);
        Mockito.when(namespaceOffloader.getOffloadDriverName()).thenReturn("s3");
        Map<NamespaceName, LedgerOffloader> offloadersByNamespace = new HashMap<>();
        offloadersByNamespace.put(TopicName.get(topicName).getNamespaceObject(), namespaceOffloader);
        Mockito.doReturn(offloadersByNamespace).when(this.pulsar).getLedgerOffloaderMap();

        this.admin.topics().removeOffloadPolicies(topicName);
        Awaitility.await().untilAsserted(
                () -> Assert.assertNull(this.admin.topics().getOffloadPolicies(topicName)));

        // The topic-level offloader must have been closed once per partition (or once overall).
        int expectedCloses = isPartitioned ? partitionNum : 1;
        Mockito.verify(topicOffloader, Mockito.times(expectedCloses)).close();
        assertOffloaderDriver(topicName, isPartitioned, partitionNum, "s3");
    }

    /**
     * Asserts the offloader driver name for {@code topicName} itself, or for each of its
     * {@code partitionNum} partitions when {@code isPartitioned} is set.
     */
    private void assertOffloaderDriver(String topicName, boolean isPartitioned, int partitionNum,
                                       String expectedDriver) throws Exception {
        if (!isPartitioned) {
            assertOffloaderDriver(topicName, expectedDriver);
            return;
        }
        TopicName partitionedTopic = TopicName.get(topicName);
        for (int i = 0; i < partitionNum; i++) {
            assertOffloaderDriver(partitionedTopic.getPartition(i).toString(), expectedDriver);
        }
    }

    /**
     * Fetches {@code topic} from the broker service and asserts that its managed ledger is
     * configured with a non-null offloader whose driver name is {@code expectedDriver}.
     */
    private void assertOffloaderDriver(String topic, String expectedDriver) throws Exception {
        // NOTE(review): every lookup uses getTopic(name, false); the previous code used
        // getTopicIfExists(name) for some partition lookups - assumed equivalent, confirm
        // against BrokerService.
        PersistentTopic persistentTopic =
                (PersistentTopic) this.pulsar.getBrokerService().getTopic(topic, false).get().get();
        LedgerOffloader configured = persistentTopic.getManagedLedger().getConfig().getLedgerOffloader();
        Assert.assertNotNull(configured);
        Assert.assertEquals(configured.getOffloadDriverName(), expectedDriver);
    }
}

