/*
 * Decompiled with CFR 0.152.
 */
package org.apache.pulsar.io;

import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
import org.apache.pulsar.client.admin.PulsarAdminException;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.MessageRoutingMode;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.impl.auth.AuthenticationTls;
import org.apache.pulsar.common.functions.ConsumerConfig;
import org.apache.pulsar.common.functions.FunctionConfig;
import org.apache.pulsar.common.io.SinkConfig;
import org.apache.pulsar.common.policies.data.ConsumerStats;
import org.apache.pulsar.common.policies.data.SubscriptionStats;
import org.apache.pulsar.common.policies.data.TopicStats;
import org.apache.pulsar.compaction.TwoPhaseCompactor;
import org.apache.pulsar.functions.LocalRunner;
import org.apache.pulsar.functions.utils.FunctionCommon;
import org.apache.pulsar.functions.worker.PulsarFunctionLocalRunTest;
import org.apache.pulsar.functions.worker.PulsarFunctionTestUtils;
import org.apache.pulsar.io.AbstractPulsarE2ETest;
import org.apache.pulsar.io.SinkForTest;
import org.testng.Assert;
import org.testng.annotations.Test;

@Test(groups={"broker-io"})
public class PulsarSinkE2ETest
extends AbstractPulsarE2ETest {
    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test
    public void testReadCompactedSink() throws Exception {
        String namespacePortion = "io";
        String replNamespace = "external-repl-prop/io";
        String sourceTopic = "persistent://external-repl-prop/io/my-topic2";
        String sinkName = "PulsarFunction-test";
        String subscriptionName = "test-sub";
        this.admin.namespaces().createNamespace("external-repl-prop/io");
        HashSet clusters = Sets.newHashSet((Iterable)Lists.newArrayList((Object[])new String[]{"use"}));
        this.admin.namespaces().setNamespaceReplicationClusters("external-repl-prop/io", (Set)clusters);
        int messageNum = 20;
        int maxKeys = 10;
        Producer producer = this.pulsarClient.newProducer(Schema.STRING).topic("persistent://external-repl-prop/io/my-topic2").enableBatching(false).messageRoutingMode(MessageRoutingMode.SinglePartition).create();
        this.pulsarClient.newConsumer().topic(new String[]{"persistent://external-repl-prop/io/my-topic2"}).subscriptionName("test-sub").readCompacted(true).subscribe().close();
        HashMap<String, String> expected = new HashMap<String, String>();
        for (int j = 0; j < 20; ++j) {
            String key = "key" + j % 10;
            String value = "my-message-" + key + j;
            producer.newMessage().key(key).value((Object)value).send();
            expected.put(key, value);
        }
        ScheduledExecutorService compactionScheduler = Executors.newSingleThreadScheduledExecutor(new ThreadFactoryBuilder().setNameFormat("compactor").setDaemon(true).build());
        try {
            TwoPhaseCompactor twoPhaseCompactor = new TwoPhaseCompactor(this.config, this.pulsarClient, this.pulsar.getBookKeeperClient(), compactionScheduler);
            twoPhaseCompactor.compact("persistent://external-repl-prop/io/my-topic2").get();
            SinkConfig sinkConfig = PulsarSinkE2ETest.createSinkConfig("external-repl-prop", "io", "PulsarFunction-test", "persistent://external-repl-prop/io/my-topic2", "test-sub");
            sinkConfig.setProcessingGuarantees(FunctionConfig.ProcessingGuarantees.EFFECTIVELY_ONCE);
            HashMap<String, String> consumerProperties = new HashMap<String, String>();
            consumerProperties.put("readCompacted", "true");
            sinkConfig.setInputSpecs(Collections.singletonMap("persistent://external-repl-prop/io/my-topic2", ConsumerConfig.builder().consumerProperties(consumerProperties).build()));
            String jarFilePathUrl = PulsarFunctionLocalRunTest.getPulsarIODataGeneratorNar().toURI().toString();
            this.admin.sink().createSinkWithUrl(sinkConfig, jarFilePathUrl);
            MockedPulsarServiceBaseTest.retryStrategically(test -> {
                try {
                    String prometheusMetrics = PulsarFunctionTestUtils.getPrometheusMetrics((Integer)this.pulsar.getListenPortHTTP().get());
                    Map<String, PulsarFunctionTestUtils.Metric> metrics = PulsarFunctionTestUtils.parseMetrics(prometheusMetrics);
                    PulsarFunctionTestUtils.Metric m = metrics.get("pulsar_sink_received_total");
                    return m.value == 10.0;
                }
                catch (Exception e) {
                    return false;
                }
            }, 50, 1000L);
            producer.close();
        }
        finally {
            if (Collections.singletonList(compactionScheduler).get(0) != null) {
                compactionScheduler.shutdownNow();
            }
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test(timeOut=30000L)
    private void testPulsarSinkDLQ() throws Exception {
        String namespacePortion = "io";
        String replNamespace = "external-repl-prop/io";
        String sourceTopic = "persistent://external-repl-prop/io/input";
        String dlqTopic = "persistent://external-repl-prop/io/input-DLQ";
        String sinkName = "PulsarSink-test";
        String propertyKey = "key";
        String propertyValue = "value";
        String subscriptionName = "test-sub";
        this.admin.namespaces().createNamespace("external-repl-prop/io");
        HashSet clusters = Sets.newHashSet((Iterable)Lists.newArrayList((Object[])new String[]{"use"}));
        this.admin.namespaces().setNamespaceReplicationClusters("external-repl-prop/io", (Set)clusters);
        Producer producer = this.pulsarClient.newProducer(Schema.STRING).topic("persistent://external-repl-prop/io/input").create();
        Consumer consumer = this.pulsarClient.newConsumer(Schema.STRING).topic(new String[]{"persistent://external-repl-prop/io/input-DLQ"}).subscriptionName("test-sub").subscribe();
        SinkConfig sinkConfig = PulsarSinkE2ETest.createSinkConfig("external-repl-prop", "io", "PulsarSink-test", "persistent://external-repl-prop/io/input", "test-sub");
        sinkConfig.setNegativeAckRedeliveryDelayMs(Long.valueOf(1001L));
        sinkConfig.setProcessingGuarantees(FunctionConfig.ProcessingGuarantees.ATLEAST_ONCE);
        sinkConfig.setMaxMessageRetries(Integer.valueOf(2));
        sinkConfig.setDeadLetterTopic("persistent://external-repl-prop/io/input-DLQ");
        sinkConfig.setInputSpecs(Collections.singletonMap("persistent://external-repl-prop/io/input", ConsumerConfig.builder().receiverQueueSize(Integer.valueOf(1000)).build()));
        sinkConfig.setClassName(SinkForTest.class.getName());
        LocalRunner localRunner = LocalRunner.builder().sinkConfig(sinkConfig).clientAuthPlugin(AuthenticationTls.class.getName()).clientAuthParams(String.format("tlsCertFile:%s,tlsKeyFile:%s", "./src/test/resources/authentication/tls/client-cert.pem", "./src/test/resources/authentication/tls/client-key.pem")).useTls(true).tlsTrustCertFilePath("./src/test/resources/authentication/tls/cacert.pem").tlsAllowInsecureConnection(true).tlsHostNameVerificationEnabled(false).brokerServiceUrl(this.pulsar.getBrokerServiceUrlTls()).build();
        try {
            int i;
            localRunner.start(false);
            MockedPulsarServiceBaseTest.retryStrategically(test -> {
                try {
                    TopicStats topicStats = this.admin.topics().getStats("persistent://external-repl-prop/io/input");
                    return topicStats.subscriptions.containsKey("test-sub") && ((SubscriptionStats)topicStats.subscriptions.get((Object)"test-sub")).consumers.size() == 1 && ((ConsumerStats)((SubscriptionStats)topicStats.subscriptions.get((Object)"test-sub")).consumers.get((int)0)).availablePermits == 1000;
                }
                catch (PulsarAdminException e) {
                    return false;
                }
            }, 50, 150L);
            int totalMsgs = 10;
            HashSet<String> remainingMessagesToReceive = new HashSet<String>();
            for (i = 0; i < totalMsgs; ++i) {
                String messageBody = "fail" + i;
                producer.newMessage().property("key", "value").value((Object)messageBody).send();
                remainingMessagesToReceive.add(messageBody);
            }
            for (i = 0; i < totalMsgs; ++i) {
                Message message = consumer.receive(10, TimeUnit.SECONDS);
                Assert.assertNotNull((Object)message);
                remainingMessagesToReceive.remove(message.getValue());
            }
            Assert.assertEquals(remainingMessagesToReceive, Collections.emptySet());
            producer.close();
            consumer.close();
        }
        finally {
            if (Collections.singletonList(localRunner).get(0) != null) {
                localRunner.close();
            }
        }
    }

    private void testPulsarSinkStats(String jarFilePathUrl) throws Exception {
        String namespacePortion = "io";
        String replNamespace = "external-repl-prop/io";
        String sourceTopic = "persistent://external-repl-prop/io/input";
        String sinkName = "PulsarSink-test";
        String propertyKey = "key";
        String propertyValue = "value";
        String subscriptionName = "test-sub";
        this.admin.namespaces().createNamespace("external-repl-prop/io");
        HashSet clusters = Sets.newHashSet((Iterable)Lists.newArrayList((Object[])new String[]{"use"}));
        this.admin.namespaces().setNamespaceReplicationClusters("external-repl-prop/io", (Set)clusters);
        Producer producer = this.pulsarClient.newProducer(Schema.STRING).topic("persistent://external-repl-prop/io/input").create();
        SinkConfig sinkConfig = PulsarSinkE2ETest.createSinkConfig("external-repl-prop", "io", "PulsarSink-test", "persistent://external-repl-prop/io/input", "test-sub");
        sinkConfig.setInputSpecs(Collections.singletonMap("persistent://external-repl-prop/io/input", ConsumerConfig.builder().receiverQueueSize(Integer.valueOf(1000)).build()));
        if (jarFilePathUrl.startsWith("builtin")) {
            sinkConfig.setArchive(jarFilePathUrl);
            this.admin.sinks().createSink(sinkConfig, null);
        } else {
            this.admin.sinks().createSinkWithUrl(sinkConfig, jarFilePathUrl);
        }
        sinkConfig.setInputSpecs(Collections.singletonMap("persistent://external-repl-prop/io/input", ConsumerConfig.builder().receiverQueueSize(Integer.valueOf(523)).build()));
        if (jarFilePathUrl.startsWith("builtin")) {
            sinkConfig.setArchive(jarFilePathUrl);
            this.admin.sinks().updateSink(sinkConfig, null);
        } else {
            this.admin.sinks().updateSinkWithUrl(sinkConfig, jarFilePathUrl);
        }
        MockedPulsarServiceBaseTest.retryStrategically(test -> {
            try {
                TopicStats topicStats = this.admin.topics().getStats("persistent://external-repl-prop/io/input");
                return topicStats.subscriptions.containsKey("test-sub") && ((SubscriptionStats)topicStats.subscriptions.get((Object)"test-sub")).consumers.size() == 1 && ((ConsumerStats)((SubscriptionStats)topicStats.subscriptions.get((Object)"test-sub")).consumers.get((int)0)).availablePermits == 523;
            }
            catch (PulsarAdminException e) {
                return false;
            }
        }, 50, 150L);
        TopicStats topicStats = this.admin.topics().getStats("persistent://external-repl-prop/io/input");
        Assert.assertEquals((int)topicStats.subscriptions.size(), (int)1);
        Assert.assertTrue((boolean)topicStats.subscriptions.containsKey("test-sub"));
        Assert.assertEquals((int)((SubscriptionStats)topicStats.subscriptions.get((Object)"test-sub")).consumers.size(), (int)1);
        Assert.assertEquals((int)((ConsumerStats)((SubscriptionStats)topicStats.subscriptions.get((Object)"test-sub")).consumers.get((int)0)).availablePermits, (int)523);
        String prometheusMetrics = PulsarFunctionTestUtils.getPrometheusMetrics((Integer)this.pulsar.getListenPortHTTP().get());
        log.info("prometheus metrics: {}", (Object)prometheusMetrics);
        Map<String, PulsarFunctionTestUtils.Metric> metrics = PulsarFunctionTestUtils.parseMetrics(prometheusMetrics);
        PulsarFunctionTestUtils.Metric m = metrics.get("pulsar_sink_received_total");
        Assert.assertEquals((String)m.tags.get("cluster"), (String)this.config.getClusterName());
        Assert.assertEquals((String)m.tags.get("instance_id"), (String)"0");
        Assert.assertEquals((String)m.tags.get("name"), (String)"PulsarSink-test");
        Assert.assertEquals((String)m.tags.get("namespace"), (String)String.format("%s/%s", "external-repl-prop", "io"));
        Assert.assertEquals((String)m.tags.get("fqfn"), (String)FunctionCommon.getFullyQualifiedName((String)"external-repl-prop", (String)"io", (String)"PulsarSink-test"));
        Assert.assertEquals((double)m.value, (double)0.0);
        m = metrics.get("pulsar_sink_received_total_1min");
        Assert.assertEquals((String)m.tags.get("cluster"), (String)this.config.getClusterName());
        Assert.assertEquals((String)m.tags.get("instance_id"), (String)"0");
        Assert.assertEquals((String)m.tags.get("name"), (String)"PulsarSink-test");
        Assert.assertEquals((String)m.tags.get("namespace"), (String)String.format("%s/%s", "external-repl-prop", "io"));
        Assert.assertEquals((String)m.tags.get("fqfn"), (String)FunctionCommon.getFullyQualifiedName((String)"external-repl-prop", (String)"io", (String)"PulsarSink-test"));
        Assert.assertEquals((double)m.value, (double)0.0);
        m = metrics.get("pulsar_sink_written_total");
        Assert.assertEquals((String)m.tags.get("cluster"), (String)this.config.getClusterName());
        Assert.assertEquals((String)m.tags.get("instance_id"), (String)"0");
        Assert.assertEquals((String)m.tags.get("name"), (String)"PulsarSink-test");
        Assert.assertEquals((String)m.tags.get("namespace"), (String)String.format("%s/%s", "external-repl-prop", "io"));
        Assert.assertEquals((String)m.tags.get("fqfn"), (String)FunctionCommon.getFullyQualifiedName((String)"external-repl-prop", (String)"io", (String)"PulsarSink-test"));
        Assert.assertEquals((double)m.value, (double)0.0);
        m = metrics.get("pulsar_sink_written_total_1min");
        Assert.assertEquals((String)m.tags.get("cluster"), (String)this.config.getClusterName());
        Assert.assertEquals((String)m.tags.get("instance_id"), (String)"0");
        Assert.assertEquals((String)m.tags.get("name"), (String)"PulsarSink-test");
        Assert.assertEquals((String)m.tags.get("namespace"), (String)String.format("%s/%s", "external-repl-prop", "io"));
        Assert.assertEquals((String)m.tags.get("fqfn"), (String)FunctionCommon.getFullyQualifiedName((String)"external-repl-prop", (String)"io", (String)"PulsarSink-test"));
        Assert.assertEquals((double)m.value, (double)0.0);
        m = metrics.get("pulsar_sink_sink_exceptions_total");
        Assert.assertEquals((String)m.tags.get("cluster"), (String)this.config.getClusterName());
        Assert.assertEquals((String)m.tags.get("instance_id"), (String)"0");
        Assert.assertEquals((String)m.tags.get("name"), (String)"PulsarSink-test");
        Assert.assertEquals((String)m.tags.get("namespace"), (String)String.format("%s/%s", "external-repl-prop", "io"));
        Assert.assertEquals((String)m.tags.get("fqfn"), (String)FunctionCommon.getFullyQualifiedName((String)"external-repl-prop", (String)"io", (String)"PulsarSink-test"));
        Assert.assertEquals((double)m.value, (double)0.0);
        m = metrics.get("pulsar_sink_sink_exceptions_total_1min");
        Assert.assertEquals((String)m.tags.get("cluster"), (String)this.config.getClusterName());
        Assert.assertEquals((String)m.tags.get("instance_id"), (String)"0");
        Assert.assertEquals((String)m.tags.get("name"), (String)"PulsarSink-test");
        Assert.assertEquals((String)m.tags.get("namespace"), (String)String.format("%s/%s", "external-repl-prop", "io"));
        Assert.assertEquals((String)m.tags.get("fqfn"), (String)FunctionCommon.getFullyQualifiedName((String)"external-repl-prop", (String)"io", (String)"PulsarSink-test"));
        Assert.assertEquals((double)m.value, (double)0.0);
        m = metrics.get("pulsar_sink_system_exceptions_total");
        Assert.assertEquals((String)m.tags.get("cluster"), (String)this.config.getClusterName());
        Assert.assertEquals((String)m.tags.get("instance_id"), (String)"0");
        Assert.assertEquals((String)m.tags.get("name"), (String)"PulsarSink-test");
        Assert.assertEquals((String)m.tags.get("namespace"), (String)String.format("%s/%s", "external-repl-prop", "io"));
        Assert.assertEquals((String)m.tags.get("fqfn"), (String)FunctionCommon.getFullyQualifiedName((String)"external-repl-prop", (String)"io", (String)"PulsarSink-test"));
        Assert.assertEquals((double)m.value, (double)0.0);
        m = metrics.get("pulsar_sink_system_exceptions_total_1min");
        Assert.assertEquals((String)m.tags.get("cluster"), (String)this.config.getClusterName());
        Assert.assertEquals((String)m.tags.get("instance_id"), (String)"0");
        Assert.assertEquals((String)m.tags.get("name"), (String)"PulsarSink-test");
        Assert.assertEquals((String)m.tags.get("namespace"), (String)String.format("%s/%s", "external-repl-prop", "io"));
        Assert.assertEquals((String)m.tags.get("fqfn"), (String)FunctionCommon.getFullyQualifiedName((String)"external-repl-prop", (String)"io", (String)"PulsarSink-test"));
        Assert.assertEquals((double)m.value, (double)0.0);
        m = metrics.get("pulsar_sink_last_invocation");
        Assert.assertEquals((String)m.tags.get("cluster"), (String)this.config.getClusterName());
        Assert.assertEquals((String)m.tags.get("instance_id"), (String)"0");
        Assert.assertEquals((String)m.tags.get("name"), (String)"PulsarSink-test");
        Assert.assertEquals((String)m.tags.get("namespace"), (String)String.format("%s/%s", "external-repl-prop", "io"));
        Assert.assertEquals((String)m.tags.get("fqfn"), (String)FunctionCommon.getFullyQualifiedName((String)"external-repl-prop", (String)"io", (String)"PulsarSink-test"));
        Assert.assertEquals((double)m.value, (double)0.0);
        int totalMsgs = 10;
        for (int i = 0; i < totalMsgs; ++i) {
            String data = "my-message-" + i;
            producer.newMessage().property("key", "value").value((Object)data).send();
        }
        MockedPulsarServiceBaseTest.retryStrategically(test -> {
            try {
                SubscriptionStats subStats = (SubscriptionStats)this.admin.topics().getStats((String)"persistent://external-repl-prop/io/input").subscriptions.get("test-sub");
                return subStats.unackedMessages == 0L && subStats.msgThroughputOut == (double)totalMsgs;
            }
            catch (PulsarAdminException e) {
                return false;
            }
        }, 5, 200L);
        prometheusMetrics = PulsarFunctionTestUtils.getPrometheusMetrics((Integer)this.pulsar.getListenPortHTTP().get());
        log.info("prometheusMetrics: {}", (Object)prometheusMetrics);
        metrics = PulsarFunctionTestUtils.parseMetrics(prometheusMetrics);
        m = metrics.get("pulsar_sink_received_total");
        Assert.assertEquals((String)m.tags.get("cluster"), (String)this.config.getClusterName());
        Assert.assertEquals((String)m.tags.get("instance_id"), (String)"0");
        Assert.assertEquals((String)m.tags.get("name"), (String)"PulsarSink-test");
        Assert.assertEquals((String)m.tags.get("namespace"), (String)String.format("%s/%s", "external-repl-prop", "io"));
        Assert.assertEquals((String)m.tags.get("fqfn"), (String)FunctionCommon.getFullyQualifiedName((String)"external-repl-prop", (String)"io", (String)"PulsarSink-test"));
        Assert.assertEquals((double)m.value, (double)totalMsgs);
        m = metrics.get("pulsar_sink_received_total_1min");
        Assert.assertEquals((String)m.tags.get("cluster"), (String)this.config.getClusterName());
        Assert.assertEquals((String)m.tags.get("instance_id"), (String)"0");
        Assert.assertEquals((String)m.tags.get("name"), (String)"PulsarSink-test");
        Assert.assertEquals((String)m.tags.get("namespace"), (String)String.format("%s/%s", "external-repl-prop", "io"));
        Assert.assertEquals((String)m.tags.get("fqfn"), (String)FunctionCommon.getFullyQualifiedName((String)"external-repl-prop", (String)"io", (String)"PulsarSink-test"));
        Assert.assertEquals((double)m.value, (double)totalMsgs);
        m = metrics.get("pulsar_sink_written_total");
        Assert.assertEquals((String)m.tags.get("cluster"), (String)this.config.getClusterName());
        Assert.assertEquals((String)m.tags.get("instance_id"), (String)"0");
        Assert.assertEquals((String)m.tags.get("name"), (String)"PulsarSink-test");
        Assert.assertEquals((String)m.tags.get("namespace"), (String)String.format("%s/%s", "external-repl-prop", "io"));
        Assert.assertEquals((String)m.tags.get("fqfn"), (String)FunctionCommon.getFullyQualifiedName((String)"external-repl-prop", (String)"io", (String)"PulsarSink-test"));
        Assert.assertEquals((double)m.value, (double)totalMsgs);
        m = metrics.get("pulsar_sink_written_total_1min");
        Assert.assertEquals((String)m.tags.get("cluster"), (String)this.config.getClusterName());
        Assert.assertEquals((String)m.tags.get("instance_id"), (String)"0");
        Assert.assertEquals((String)m.tags.get("name"), (String)"PulsarSink-test");
        Assert.assertEquals((String)m.tags.get("namespace"), (String)String.format("%s/%s", "external-repl-prop", "io"));
        Assert.assertEquals((String)m.tags.get("fqfn"), (String)FunctionCommon.getFullyQualifiedName((String)"external-repl-prop", (String)"io", (String)"PulsarSink-test"));
        Assert.assertEquals((double)m.value, (double)totalMsgs);
        m = metrics.get("pulsar_sink_sink_exceptions_total");
        Assert.assertEquals((String)m.tags.get("cluster"), (String)this.config.getClusterName());
        Assert.assertEquals((String)m.tags.get("instance_id"), (String)"0");
        Assert.assertEquals((String)m.tags.get("name"), (String)"PulsarSink-test");
        Assert.assertEquals((String)m.tags.get("namespace"), (String)String.format("%s/%s", "external-repl-prop", "io"));
        Assert.assertEquals((String)m.tags.get("fqfn"), (String)FunctionCommon.getFullyQualifiedName((String)"external-repl-prop", (String)"io", (String)"PulsarSink-test"));
        Assert.assertEquals((double)m.value, (double)0.0);
        m = metrics.get("pulsar_sink_sink_exceptions_total_1min");
        Assert.assertEquals((String)m.tags.get("cluster"), (String)this.config.getClusterName());
        Assert.assertEquals((String)m.tags.get("instance_id"), (String)"0");
        Assert.assertEquals((String)m.tags.get("name"), (String)"PulsarSink-test");
        Assert.assertEquals((String)m.tags.get("namespace"), (String)String.format("%s/%s", "external-repl-prop", "io"));
        Assert.assertEquals((String)m.tags.get("fqfn"), (String)FunctionCommon.getFullyQualifiedName((String)"external-repl-prop", (String)"io", (String)"PulsarSink-test"));
        Assert.assertEquals((double)m.value, (double)0.0);
        m = metrics.get("pulsar_sink_system_exceptions_total");
        Assert.assertEquals((String)m.tags.get("cluster"), (String)this.config.getClusterName());
        Assert.assertEquals((String)m.tags.get("instance_id"), (String)"0");
        Assert.assertEquals((String)m.tags.get("name"), (String)"PulsarSink-test");
        Assert.assertEquals((String)m.tags.get("namespace"), (String)String.format("%s/%s", "external-repl-prop", "io"));
        Assert.assertEquals((String)m.tags.get("fqfn"), (String)FunctionCommon.getFullyQualifiedName((String)"external-repl-prop", (String)"io", (String)"PulsarSink-test"));
        Assert.assertEquals((double)m.value, (double)0.0);
        m = metrics.get("pulsar_sink_system_exceptions_total_1min");
        Assert.assertEquals((String)m.tags.get("cluster"), (String)this.config.getClusterName());
        Assert.assertEquals((String)m.tags.get("instance_id"), (String)"0");
        Assert.assertEquals((String)m.tags.get("name"), (String)"PulsarSink-test");
        Assert.assertEquals((String)m.tags.get("namespace"), (String)String.format("%s/%s", "external-repl-prop", "io"));
        Assert.assertEquals((String)m.tags.get("fqfn"), (String)FunctionCommon.getFullyQualifiedName((String)"external-repl-prop", (String)"io", (String)"PulsarSink-test"));
        Assert.assertEquals((double)m.value, (double)0.0);
        m = metrics.get("pulsar_sink_last_invocation");
        Assert.assertEquals((String)m.tags.get("cluster"), (String)this.config.getClusterName());
        Assert.assertEquals((String)m.tags.get("instance_id"), (String)"0");
        Assert.assertEquals((String)m.tags.get("name"), (String)"PulsarSink-test");
        Assert.assertEquals((String)m.tags.get("namespace"), (String)String.format("%s/%s", "external-repl-prop", "io"));
        Assert.assertEquals((String)m.tags.get("fqfn"), (String)FunctionCommon.getFullyQualifiedName((String)"external-repl-prop", (String)"io", (String)"PulsarSink-test"));
        Assert.assertTrue((m.value > 0.0 ? 1 : 0) != 0);
        this.admin.sinks().deleteSink("external-repl-prop", "io", "PulsarSink-test");
        MockedPulsarServiceBaseTest.retryStrategically(test -> {
            try {
                return this.admin.topics().getStats((String)"persistent://external-repl-prop/io/input").subscriptions.size() == 0;
            }
            catch (PulsarAdminException e) {
                return false;
            }
        }, 50, 150L);
        Assert.assertEquals((int)this.admin.topics().getStats((String)"persistent://external-repl-prop/io/input").subscriptions.size(), (int)0);
        this.tempDirectory.assertThatFunctionDownloadTempFilesHaveBeenDeleted();
    }

    @Test(timeOut=20000L, groups={"builtin"})
    public void testPulsarSinkStatsBuiltin() throws Exception {
        String jarFilePathUrl = String.format("%s://data-generator", "builtin");
        this.testPulsarSinkStats(jarFilePathUrl);
    }

    @Test(timeOut=20000L)
    public void testPulsarSinkStatsWithFile() throws Exception {
        String jarFilePathUrl = PulsarFunctionLocalRunTest.getPulsarIODataGeneratorNar().toURI().toString();
        this.testPulsarSinkStats(jarFilePathUrl);
    }

    @Test(timeOut=40000L)
    public void testPulsarSinkStatsWithUrl() throws Exception {
        this.testPulsarSinkStats(this.fileServer.getUrl("/pulsar-io-data-generator.nar"));
    }

    private static SinkConfig createSinkConfig(String tenant, String namespace, String functionName, String sourceTopic, String subName) {
        SinkConfig sinkConfig = new SinkConfig();
        sinkConfig.setTenant(tenant);
        sinkConfig.setNamespace(namespace);
        sinkConfig.setName(functionName);
        sinkConfig.setParallelism(Integer.valueOf(1));
        sinkConfig.setProcessingGuarantees(FunctionConfig.ProcessingGuarantees.ATLEAST_ONCE);
        sinkConfig.setInputSpecs(Collections.singletonMap(sourceTopic, ConsumerConfig.builder().build()));
        sinkConfig.setSourceSubscriptionName(subName);
        sinkConfig.setCleanupSubscription(Boolean.valueOf(true));
        return sinkConfig;
    }
}

