/*
 * Decompiled with CFR 0.152.
 */
package org.apache.pulsar.io;

import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
import org.apache.pulsar.client.admin.PulsarAdminException;
import org.apache.pulsar.common.functions.FunctionConfig;
import org.apache.pulsar.common.io.BatchSourceConfig;
import org.apache.pulsar.common.io.SourceConfig;
import org.apache.pulsar.common.policies.data.PublisherStats;
import org.apache.pulsar.common.policies.data.TopicStats;
import org.apache.pulsar.functions.utils.FunctionCommon;
import org.apache.pulsar.functions.worker.PulsarFunctionLocalRunTest;
import org.apache.pulsar.functions.worker.PulsarFunctionTestUtils;
import org.apache.pulsar.io.AbstractPulsarE2ETest;
import org.apache.pulsar.io.batchdiscovery.ImmediateTriggerer;
import org.testng.Assert;
import org.testng.annotations.Test;

@Test(groups={"broker-io"})
public class PulsarBatchSourceE2ETest
extends AbstractPulsarE2ETest {
    private void testPulsarBatchSourceStats(String jarFilePathUrl) throws Exception {
        String namespacePortion = "io";
        String replNamespace = "external-repl-prop/io";
        String sinkTopic = "persistent://external-repl-prop/io/output";
        String sourceName = "PulsarBatchSource";
        this.admin.namespaces().createNamespace("external-repl-prop/io");
        HashSet clusters = Sets.newHashSet((Iterable)Lists.newArrayList((Object[])new String[]{"use"}));
        this.admin.namespaces().setNamespaceReplicationClusters("external-repl-prop/io", (Set)clusters);
        SourceConfig sourceConfig = PulsarBatchSourceE2ETest.createSourceConfig("external-repl-prop", "io", "PulsarBatchSource", "persistent://external-repl-prop/io/output");
        sourceConfig.setBatchSourceConfig(PulsarBatchSourceE2ETest.createBatchSourceConfig());
        MockedPulsarServiceBaseTest.retryStrategically(test -> {
            try {
                return this.admin.topics().getStats((String)"persistent://external-repl-prop/io/output").publishers.size() == 1;
            }
            catch (PulsarAdminException e) {
                return false;
            }
        }, 10, 150L);
        String sinkTopic2 = "persistent://external-repl-prop/io/output-PulsarBatchSource";
        sourceConfig.setTopicName("persistent://external-repl-prop/io/output-PulsarBatchSource");
        if (jarFilePathUrl.startsWith("builtin")) {
            sourceConfig.setArchive(jarFilePathUrl);
            this.admin.sources().createSource(sourceConfig, jarFilePathUrl);
        } else {
            this.admin.sources().createSourceWithUrl(sourceConfig, jarFilePathUrl);
        }
        MockedPulsarServiceBaseTest.retryStrategically(test -> {
            try {
                TopicStats sourceStats = this.admin.topics().getStats("persistent://external-repl-prop/io/output-PulsarBatchSource");
                return sourceStats.publishers.size() == 1 && ((PublisherStats)sourceStats.publishers.get((int)0)).metadata != null && ((PublisherStats)sourceStats.publishers.get((int)0)).metadata.containsKey("id") && ((String)((PublisherStats)sourceStats.publishers.get((int)0)).metadata.get("id")).equals(String.format("%s/%s/%s", "external-repl-prop", "io", "PulsarBatchSource"));
            }
            catch (PulsarAdminException e) {
                return false;
            }
        }, 50, 150L);
        TopicStats sourceStats = this.admin.topics().getStats("persistent://external-repl-prop/io/output-PulsarBatchSource");
        Assert.assertEquals((int)sourceStats.publishers.size(), (int)1);
        Assert.assertNotNull((Object)((PublisherStats)sourceStats.publishers.get((int)0)).metadata);
        Assert.assertTrue((boolean)((PublisherStats)sourceStats.publishers.get((int)0)).metadata.containsKey("id"));
        Assert.assertEquals((String)((String)((PublisherStats)sourceStats.publishers.get((int)0)).metadata.get("id")), (String)String.format("%s/%s/%s", "external-repl-prop", "io", "PulsarBatchSource"));
        MockedPulsarServiceBaseTest.retryStrategically(test -> {
            try {
                return this.admin.topics().getStats((String)"persistent://external-repl-prop/io/output-PulsarBatchSource").publishers.size() == 1 && this.admin.topics().getInternalStats((String)"persistent://external-repl-prop/io/output-PulsarBatchSource", (boolean)false).numberOfEntries > 4L;
            }
            catch (PulsarAdminException e) {
                return false;
            }
        }, 50, 150L);
        Assert.assertEquals((int)this.admin.topics().getStats((String)"persistent://external-repl-prop/io/output-PulsarBatchSource").publishers.size(), (int)1);
        String prometheusMetrics = PulsarFunctionTestUtils.getPrometheusMetrics((Integer)this.pulsar.getListenPortHTTP().get());
        log.info("prometheusMetrics: {}", (Object)prometheusMetrics);
        Map<String, PulsarFunctionTestUtils.Metric> metrics = PulsarFunctionTestUtils.parseMetrics(prometheusMetrics);
        PulsarFunctionTestUtils.Metric m = metrics.get("pulsar_source_received_total");
        Assert.assertEquals((String)m.tags.get("cluster"), (String)this.config.getClusterName());
        Assert.assertEquals((String)m.tags.get("instance_id"), (String)"0");
        Assert.assertEquals((String)m.tags.get("name"), (String)"PulsarBatchSource");
        Assert.assertEquals((String)m.tags.get("namespace"), (String)String.format("%s/%s", "external-repl-prop", "io"));
        Assert.assertEquals((String)m.tags.get("fqfn"), (String)FunctionCommon.getFullyQualifiedName((String)"external-repl-prop", (String)"io", (String)"PulsarBatchSource"));
        Assert.assertTrue((m.value > 0.0 ? 1 : 0) != 0);
        m = metrics.get("pulsar_source_received_total_1min");
        Assert.assertEquals((String)m.tags.get("cluster"), (String)this.config.getClusterName());
        Assert.assertEquals((String)m.tags.get("instance_id"), (String)"0");
        Assert.assertEquals((String)m.tags.get("name"), (String)"PulsarBatchSource");
        Assert.assertEquals((String)m.tags.get("namespace"), (String)String.format("%s/%s", "external-repl-prop", "io"));
        Assert.assertEquals((String)m.tags.get("fqfn"), (String)FunctionCommon.getFullyQualifiedName((String)"external-repl-prop", (String)"io", (String)"PulsarBatchSource"));
        Assert.assertTrue((m.value > 0.0 ? 1 : 0) != 0);
        m = metrics.get("pulsar_source_written_total");
        Assert.assertEquals((String)m.tags.get("cluster"), (String)this.config.getClusterName());
        Assert.assertEquals((String)m.tags.get("instance_id"), (String)"0");
        Assert.assertEquals((String)m.tags.get("name"), (String)"PulsarBatchSource");
        Assert.assertEquals((String)m.tags.get("namespace"), (String)String.format("%s/%s", "external-repl-prop", "io"));
        Assert.assertEquals((String)m.tags.get("fqfn"), (String)FunctionCommon.getFullyQualifiedName((String)"external-repl-prop", (String)"io", (String)"PulsarBatchSource"));
        Assert.assertTrue((m.value > 0.0 ? 1 : 0) != 0);
        m = metrics.get("pulsar_source_written_total_1min");
        Assert.assertEquals((String)m.tags.get("cluster"), (String)this.config.getClusterName());
        Assert.assertEquals((String)m.tags.get("instance_id"), (String)"0");
        Assert.assertEquals((String)m.tags.get("name"), (String)"PulsarBatchSource");
        Assert.assertEquals((String)m.tags.get("namespace"), (String)String.format("%s/%s", "external-repl-prop", "io"));
        Assert.assertEquals((String)m.tags.get("fqfn"), (String)FunctionCommon.getFullyQualifiedName((String)"external-repl-prop", (String)"io", (String)"PulsarBatchSource"));
        Assert.assertTrue((m.value > 0.0 ? 1 : 0) != 0);
        m = metrics.get("pulsar_source_source_exceptions_total");
        Assert.assertEquals((String)m.tags.get("cluster"), (String)this.config.getClusterName());
        Assert.assertEquals((String)m.tags.get("instance_id"), (String)"0");
        Assert.assertEquals((String)m.tags.get("name"), (String)"PulsarBatchSource");
        Assert.assertEquals((String)m.tags.get("namespace"), (String)String.format("%s/%s", "external-repl-prop", "io"));
        Assert.assertEquals((String)m.tags.get("fqfn"), (String)FunctionCommon.getFullyQualifiedName((String)"external-repl-prop", (String)"io", (String)"PulsarBatchSource"));
        Assert.assertEquals((double)m.value, (double)0.0);
        m = metrics.get("pulsar_source_source_exceptions_total_1min");
        Assert.assertEquals((String)m.tags.get("cluster"), (String)this.config.getClusterName());
        Assert.assertEquals((String)m.tags.get("instance_id"), (String)"0");
        Assert.assertEquals((String)m.tags.get("name"), (String)"PulsarBatchSource");
        Assert.assertEquals((String)m.tags.get("namespace"), (String)String.format("%s/%s", "external-repl-prop", "io"));
        Assert.assertEquals((String)m.tags.get("fqfn"), (String)FunctionCommon.getFullyQualifiedName((String)"external-repl-prop", (String)"io", (String)"PulsarBatchSource"));
        Assert.assertEquals((double)m.value, (double)0.0);
        m = metrics.get("pulsar_source_system_exceptions_total");
        Assert.assertEquals((String)m.tags.get("cluster"), (String)this.config.getClusterName());
        Assert.assertEquals((String)m.tags.get("instance_id"), (String)"0");
        Assert.assertEquals((String)m.tags.get("name"), (String)"PulsarBatchSource");
        Assert.assertEquals((String)m.tags.get("namespace"), (String)String.format("%s/%s", "external-repl-prop", "io"));
        Assert.assertEquals((String)m.tags.get("fqfn"), (String)FunctionCommon.getFullyQualifiedName((String)"external-repl-prop", (String)"io", (String)"PulsarBatchSource"));
        Assert.assertEquals((double)m.value, (double)0.0);
        m = metrics.get("pulsar_source_system_exceptions_total_1min");
        Assert.assertEquals((String)m.tags.get("cluster"), (String)this.config.getClusterName());
        Assert.assertEquals((String)m.tags.get("instance_id"), (String)"0");
        Assert.assertEquals((String)m.tags.get("name"), (String)"PulsarBatchSource");
        Assert.assertEquals((String)m.tags.get("namespace"), (String)String.format("%s/%s", "external-repl-prop", "io"));
        Assert.assertEquals((String)m.tags.get("fqfn"), (String)FunctionCommon.getFullyQualifiedName((String)"external-repl-prop", (String)"io", (String)"PulsarBatchSource"));
        Assert.assertEquals((double)m.value, (double)0.0);
        m = metrics.get("pulsar_source_last_invocation");
        Assert.assertEquals((String)m.tags.get("cluster"), (String)this.config.getClusterName());
        Assert.assertEquals((String)m.tags.get("instance_id"), (String)"0");
        Assert.assertEquals((String)m.tags.get("name"), (String)"PulsarBatchSource");
        Assert.assertEquals((String)m.tags.get("namespace"), (String)String.format("%s/%s", "external-repl-prop", "io"));
        Assert.assertEquals((String)m.tags.get("fqfn"), (String)FunctionCommon.getFullyQualifiedName((String)"external-repl-prop", (String)"io", (String)"PulsarBatchSource"));
        Assert.assertTrue((m.value > 0.0 ? 1 : 0) != 0);
        this.tempDirectory.assertThatFunctionDownloadTempFilesHaveBeenDeleted();
        this.admin.sources().deleteSource("external-repl-prop", "io", "PulsarBatchSource");
    }

    @Test(timeOut=20000L, groups={"builtin"})
    public void testPulsarBatchSourceStatsBuiltin() throws Exception {
        String jarFilePathUrl = String.format("%s://batch-data-generator", "builtin");
        this.testPulsarBatchSourceStats(jarFilePathUrl);
    }

    @Test(timeOut=20000L)
    private void testPulsarBatchSourceStatsWithFile() throws Exception {
        String jarFilePathUrl = PulsarFunctionLocalRunTest.getPulsarIOBatchDataGeneratorNar().toURI().toString();
        this.testPulsarBatchSourceStats(jarFilePathUrl);
    }

    @Test(timeOut=40000L)
    private void testPulsarBatchSourceStatsWithUrl() throws Exception {
        this.testPulsarBatchSourceStats(this.fileServer.getUrl("/pulsar-io-batch-data-generator.nar"));
    }

    private static SourceConfig createSourceConfig(String tenant, String namespace, String functionName, String sinkTopic) {
        SourceConfig sourceConfig = new SourceConfig();
        sourceConfig.setTenant(tenant);
        sourceConfig.setNamespace(namespace);
        sourceConfig.setName(functionName);
        sourceConfig.setParallelism(Integer.valueOf(1));
        sourceConfig.setProcessingGuarantees(FunctionConfig.ProcessingGuarantees.ATLEAST_ONCE);
        sourceConfig.setTopicName(sinkTopic);
        return sourceConfig;
    }

    private static BatchSourceConfig createBatchSourceConfig() {
        return BatchSourceConfig.builder().discoveryTriggererClassName(ImmediateTriggerer.class.getName()).build();
    }
}

