package org.apache.pulsar.io;

import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
import org.apache.pulsar.client.admin.PulsarAdminException;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.MessageRoutingMode;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.impl.auth.AuthenticationTls;
import org.apache.pulsar.common.functions.ConsumerConfig;
import org.apache.pulsar.common.functions.FunctionConfig;
import org.apache.pulsar.common.io.SinkConfig;
import org.apache.pulsar.common.policies.data.ConsumerStats;
import org.apache.pulsar.common.policies.data.SubscriptionStats;
import org.apache.pulsar.common.policies.data.TopicStats;
import org.apache.pulsar.compaction.TwoPhaseCompactor;
import org.apache.pulsar.functions.LocalRunner;
import org.apache.pulsar.functions.utils.FunctionCommon;
import org.apache.pulsar.functions.worker.PulsarFunctionLocalRunTest;
import org.apache.pulsar.functions.worker.PulsarFunctionTestUtils;
import org.testng.Assert;
import org.testng.annotations.Test;

@Test(groups = {"broker-io"})
/* loaded from: input_file:org/apache/pulsar/io/PulsarSinkE2ETest.class */
public class PulsarSinkE2ETest extends AbstractPulsarE2ETest {
    @Test
    public void testReadCompactedSink() throws Exception {
        this.admin.namespaces().createNamespace("external-repl-prop/io");
        this.admin.namespaces().setNamespaceReplicationClusters("external-repl-prop/io", Sets.newHashSet(Lists.newArrayList(new String[]{"use"})));
        Producer create = this.pulsarClient.newProducer(Schema.STRING).topic("persistent://external-repl-prop/io/my-topic2").enableBatching(false).messageRoutingMode(MessageRoutingMode.SinglePartition).create();
        this.pulsarClient.newConsumer().topic(new String[]{"persistent://external-repl-prop/io/my-topic2"}).subscriptionName("test-sub").readCompacted(true).subscribe().close();
        HashMap hashMap = new HashMap();
        for (int i = 0; i < 20; i++) {
            String str = "key" + (i % 10);
            String str2 = "my-message-" + str + i;
            create.newMessage().key(str).value(str2).send();
            hashMap.put(str, str2);
        }
        ScheduledExecutorService newSingleThreadScheduledExecutor = Executors.newSingleThreadScheduledExecutor(new ThreadFactoryBuilder().setNameFormat("compactor").setDaemon(true).build());
        try {
            new TwoPhaseCompactor(this.config, this.pulsarClient, this.pulsar.getBookKeeperClient(), newSingleThreadScheduledExecutor).compact("persistent://external-repl-prop/io/my-topic2").get();
            SinkConfig createSinkConfig = createSinkConfig("external-repl-prop", "io", "PulsarFunction-test", "persistent://external-repl-prop/io/my-topic2", "test-sub");
            createSinkConfig.setProcessingGuarantees(FunctionConfig.ProcessingGuarantees.EFFECTIVELY_ONCE);
            HashMap hashMap2 = new HashMap();
            hashMap2.put("readCompacted", "true");
            createSinkConfig.setInputSpecs(Collections.singletonMap("persistent://external-repl-prop/io/my-topic2", ConsumerConfig.builder().consumerProperties(hashMap2).build()));
            this.admin.sink().createSinkWithUrl(createSinkConfig, PulsarFunctionLocalRunTest.getPulsarIODataGeneratorNar().toURI().toString());
            MockedPulsarServiceBaseTest.retryStrategically(r6 -> {
                try {
                    return PulsarFunctionTestUtils.parseMetrics(PulsarFunctionTestUtils.getPrometheusMetrics(((Integer) this.pulsar.getListenPortHTTP().get()).intValue())).get("pulsar_sink_received_total").value == 10.0d;
                } catch (Exception e) {
                    return false;
                }
            }, 50, 1000L);
            create.close();
            if (Collections.singletonList(newSingleThreadScheduledExecutor).get(0) != null) {
                newSingleThreadScheduledExecutor.shutdownNow();
            }
        } catch (Throwable th) {
            if (Collections.singletonList(newSingleThreadScheduledExecutor).get(0) != null) {
                newSingleThreadScheduledExecutor.shutdownNow();
            }
            throw th;
        }
    }

    @Test(timeOut = 30000)
    private void testPulsarSinkDLQ() throws Exception {
        this.admin.namespaces().createNamespace("external-repl-prop/io");
        this.admin.namespaces().setNamespaceReplicationClusters("external-repl-prop/io", Sets.newHashSet(Lists.newArrayList(new String[]{"use"})));
        Producer create = this.pulsarClient.newProducer(Schema.STRING).topic("persistent://external-repl-prop/io/input").create();
        Consumer subscribe = this.pulsarClient.newConsumer(Schema.STRING).topic(new String[]{"persistent://external-repl-prop/io/input-DLQ"}).subscriptionName("test-sub").subscribe();
        SinkConfig createSinkConfig = createSinkConfig("external-repl-prop", "io", "PulsarSink-test", "persistent://external-repl-prop/io/input", "test-sub");
        createSinkConfig.setNegativeAckRedeliveryDelayMs(1001L);
        createSinkConfig.setProcessingGuarantees(FunctionConfig.ProcessingGuarantees.ATLEAST_ONCE);
        createSinkConfig.setMaxMessageRetries(2);
        createSinkConfig.setDeadLetterTopic("persistent://external-repl-prop/io/input-DLQ");
        createSinkConfig.setInputSpecs(Collections.singletonMap("persistent://external-repl-prop/io/input", ConsumerConfig.builder().receiverQueueSize(1000).build()));
        createSinkConfig.setClassName(SinkForTest.class.getName());
        LocalRunner build = LocalRunner.builder().sinkConfig(createSinkConfig).clientAuthPlugin(AuthenticationTls.class.getName()).clientAuthParams(String.format("tlsCertFile:%s,tlsKeyFile:%s", "./src/test/resources/authentication/tls/client-cert.pem", "./src/test/resources/authentication/tls/client-key.pem")).useTls(true).tlsTrustCertFilePath("./src/test/resources/authentication/tls/cacert.pem").tlsAllowInsecureConnection(true).tlsHostNameVerificationEnabled(false).brokerServiceUrl(this.pulsar.getBrokerServiceUrlTls()).build();
        try {
            build.start(false);
            MockedPulsarServiceBaseTest.retryStrategically(r4 -> {
                try {
                    TopicStats stats = this.admin.topics().getStats("persistent://external-repl-prop/io/input");
                    if (stats.subscriptions.containsKey("test-sub") && ((SubscriptionStats) stats.subscriptions.get("test-sub")).consumers.size() == 1) {
                        if (((ConsumerStats) ((SubscriptionStats) stats.subscriptions.get("test-sub")).consumers.get(0)).availablePermits == 1000) {
                            return true;
                        }
                    }
                    return false;
                } catch (PulsarAdminException e) {
                    return false;
                }
            }, 50, 150L);
            HashSet hashSet = new HashSet();
            for (int i = 0; i < 10; i++) {
                String str = "fail" + i;
                create.newMessage().property("key", "value").value(str).send();
                hashSet.add(str);
            }
            for (int i2 = 0; i2 < 10; i2++) {
                Message receive = subscribe.receive(10, TimeUnit.SECONDS);
                Assert.assertNotNull(receive);
                hashSet.remove(receive.getValue());
            }
            Assert.assertEquals(hashSet, Collections.emptySet());
            create.close();
            subscribe.close();
            if (Collections.singletonList(build).get(0) != null) {
                build.close();
            }
        } catch (Throwable th) {
            if (Collections.singletonList(build).get(0) != null) {
                build.close();
            }
            throw th;
        }
    }

    private void testPulsarSinkStats(String str) throws Exception {
        this.admin.namespaces().createNamespace("external-repl-prop/io");
        this.admin.namespaces().setNamespaceReplicationClusters("external-repl-prop/io", Sets.newHashSet(Lists.newArrayList(new String[]{"use"})));
        Producer create = this.pulsarClient.newProducer(Schema.STRING).topic("persistent://external-repl-prop/io/input").create();
        SinkConfig createSinkConfig = createSinkConfig("external-repl-prop", "io", "PulsarSink-test", "persistent://external-repl-prop/io/input", "test-sub");
        createSinkConfig.setInputSpecs(Collections.singletonMap("persistent://external-repl-prop/io/input", ConsumerConfig.builder().receiverQueueSize(1000).build()));
        if (str.startsWith("builtin")) {
            createSinkConfig.setArchive(str);
            this.admin.sinks().createSink(createSinkConfig, (String) null);
        } else {
            this.admin.sinks().createSinkWithUrl(createSinkConfig, str);
        }
        createSinkConfig.setInputSpecs(Collections.singletonMap("persistent://external-repl-prop/io/input", ConsumerConfig.builder().receiverQueueSize(523).build()));
        if (str.startsWith("builtin")) {
            createSinkConfig.setArchive(str);
            this.admin.sinks().updateSink(createSinkConfig, (String) null);
        } else {
            this.admin.sinks().updateSinkWithUrl(createSinkConfig, str);
        }
        MockedPulsarServiceBaseTest.retryStrategically(r4 -> {
            try {
                TopicStats stats = this.admin.topics().getStats("persistent://external-repl-prop/io/input");
                if (stats.subscriptions.containsKey("test-sub") && ((SubscriptionStats) stats.subscriptions.get("test-sub")).consumers.size() == 1) {
                    if (((ConsumerStats) ((SubscriptionStats) stats.subscriptions.get("test-sub")).consumers.get(0)).availablePermits == 523) {
                        return true;
                    }
                }
                return false;
            } catch (PulsarAdminException e) {
                return false;
            }
        }, 50, 150L);
        TopicStats stats = this.admin.topics().getStats("persistent://external-repl-prop/io/input");
        Assert.assertEquals(stats.subscriptions.size(), 1);
        Assert.assertTrue(stats.subscriptions.containsKey("test-sub"));
        Assert.assertEquals(((SubscriptionStats) stats.subscriptions.get("test-sub")).consumers.size(), 1);
        Assert.assertEquals(((ConsumerStats) ((SubscriptionStats) stats.subscriptions.get("test-sub")).consumers.get(0)).availablePermits, 523);
        String prometheusMetrics = PulsarFunctionTestUtils.getPrometheusMetrics(((Integer) this.pulsar.getListenPortHTTP().get()).intValue());
        log.info("prometheus metrics: {}", prometheusMetrics);
        Map<String, PulsarFunctionTestUtils.Metric> parseMetrics = PulsarFunctionTestUtils.parseMetrics(prometheusMetrics);
        PulsarFunctionTestUtils.Metric metric = parseMetrics.get("pulsar_sink_received_total");
        Assert.assertEquals(metric.tags.get("cluster"), this.config.getClusterName());
        Assert.assertEquals(metric.tags.get("instance_id"), "0");
        Assert.assertEquals(metric.tags.get("name"), "PulsarSink-test");
        Assert.assertEquals(metric.tags.get("namespace"), String.format("%s/%s", "external-repl-prop", "io"));
        Assert.assertEquals(metric.tags.get("fqfn"), FunctionCommon.getFullyQualifiedName("external-repl-prop", "io", "PulsarSink-test"));
        Assert.assertEquals(metric.value, 0.0d);
        PulsarFunctionTestUtils.Metric metric2 = parseMetrics.get("pulsar_sink_received_total_1min");
        Assert.assertEquals(metric2.tags.get("cluster"), this.config.getClusterName());
        Assert.assertEquals(metric2.tags.get("instance_id"), "0");
        Assert.assertEquals(metric2.tags.get("name"), "PulsarSink-test");
        Assert.assertEquals(metric2.tags.get("namespace"), String.format("%s/%s", "external-repl-prop", "io"));
        Assert.assertEquals(metric2.tags.get("fqfn"), FunctionCommon.getFullyQualifiedName("external-repl-prop", "io", "PulsarSink-test"));
        Assert.assertEquals(metric2.value, 0.0d);
        PulsarFunctionTestUtils.Metric metric3 = parseMetrics.get("pulsar_sink_written_total");
        Assert.assertEquals(metric3.tags.get("cluster"), this.config.getClusterName());
        Assert.assertEquals(metric3.tags.get("instance_id"), "0");
        Assert.assertEquals(metric3.tags.get("name"), "PulsarSink-test");
        Assert.assertEquals(metric3.tags.get("namespace"), String.format("%s/%s", "external-repl-prop", "io"));
        Assert.assertEquals(metric3.tags.get("fqfn"), FunctionCommon.getFullyQualifiedName("external-repl-prop", "io", "PulsarSink-test"));
        Assert.assertEquals(metric3.value, 0.0d);
        PulsarFunctionTestUtils.Metric metric4 = parseMetrics.get("pulsar_sink_written_total_1min");
        Assert.assertEquals(metric4.tags.get("cluster"), this.config.getClusterName());
        Assert.assertEquals(metric4.tags.get("instance_id"), "0");
        Assert.assertEquals(metric4.tags.get("name"), "PulsarSink-test");
        Assert.assertEquals(metric4.tags.get("namespace"), String.format("%s/%s", "external-repl-prop", "io"));
        Assert.assertEquals(metric4.tags.get("fqfn"), FunctionCommon.getFullyQualifiedName("external-repl-prop", "io", "PulsarSink-test"));
        Assert.assertEquals(metric4.value, 0.0d);
        PulsarFunctionTestUtils.Metric metric5 = parseMetrics.get("pulsar_sink_sink_exceptions_total");
        Assert.assertEquals(metric5.tags.get("cluster"), this.config.getClusterName());
        Assert.assertEquals(metric5.tags.get("instance_id"), "0");
        Assert.assertEquals(metric5.tags.get("name"), "PulsarSink-test");
        Assert.assertEquals(metric5.tags.get("namespace"), String.format("%s/%s", "external-repl-prop", "io"));
        Assert.assertEquals(metric5.tags.get("fqfn"), FunctionCommon.getFullyQualifiedName("external-repl-prop", "io", "PulsarSink-test"));
        Assert.assertEquals(metric5.value, 0.0d);
        PulsarFunctionTestUtils.Metric metric6 = parseMetrics.get("pulsar_sink_sink_exceptions_total_1min");
        Assert.assertEquals(metric6.tags.get("cluster"), this.config.getClusterName());
        Assert.assertEquals(metric6.tags.get("instance_id"), "0");
        Assert.assertEquals(metric6.tags.get("name"), "PulsarSink-test");
        Assert.assertEquals(metric6.tags.get("namespace"), String.format("%s/%s", "external-repl-prop", "io"));
        Assert.assertEquals(metric6.tags.get("fqfn"), FunctionCommon.getFullyQualifiedName("external-repl-prop", "io", "PulsarSink-test"));
        Assert.assertEquals(metric6.value, 0.0d);
        PulsarFunctionTestUtils.Metric metric7 = parseMetrics.get("pulsar_sink_system_exceptions_total");
        Assert.assertEquals(metric7.tags.get("cluster"), this.config.getClusterName());
        Assert.assertEquals(metric7.tags.get("instance_id"), "0");
        Assert.assertEquals(metric7.tags.get("name"), "PulsarSink-test");
        Assert.assertEquals(metric7.tags.get("namespace"), String.format("%s/%s", "external-repl-prop", "io"));
        Assert.assertEquals(metric7.tags.get("fqfn"), FunctionCommon.getFullyQualifiedName("external-repl-prop", "io", "PulsarSink-test"));
        Assert.assertEquals(metric7.value, 0.0d);
        PulsarFunctionTestUtils.Metric metric8 = parseMetrics.get("pulsar_sink_system_exceptions_total_1min");
        Assert.assertEquals(metric8.tags.get("cluster"), this.config.getClusterName());
        Assert.assertEquals(metric8.tags.get("instance_id"), "0");
        Assert.assertEquals(metric8.tags.get("name"), "PulsarSink-test");
        Assert.assertEquals(metric8.tags.get("namespace"), String.format("%s/%s", "external-repl-prop", "io"));
        Assert.assertEquals(metric8.tags.get("fqfn"), FunctionCommon.getFullyQualifiedName("external-repl-prop", "io", "PulsarSink-test"));
        Assert.assertEquals(metric8.value, 0.0d);
        PulsarFunctionTestUtils.Metric metric9 = parseMetrics.get("pulsar_sink_last_invocation");
        Assert.assertEquals(metric9.tags.get("cluster"), this.config.getClusterName());
        Assert.assertEquals(metric9.tags.get("instance_id"), "0");
        Assert.assertEquals(metric9.tags.get("name"), "PulsarSink-test");
        Assert.assertEquals(metric9.tags.get("namespace"), String.format("%s/%s", "external-repl-prop", "io"));
        Assert.assertEquals(metric9.tags.get("fqfn"), FunctionCommon.getFullyQualifiedName("external-repl-prop", "io", "PulsarSink-test"));
        Assert.assertEquals(metric9.value, 0.0d);
        int i = 10;
        for (int i2 = 0; i2 < 10; i2++) {
            create.newMessage().property("key", "value").value("my-message-" + i2).send();
        }
        MockedPulsarServiceBaseTest.retryStrategically(r7 -> {
            try {
                SubscriptionStats subscriptionStats = (SubscriptionStats) this.admin.topics().getStats("persistent://external-repl-prop/io/input").subscriptions.get("test-sub");
                if (subscriptionStats.unackedMessages == 0) {
                    if (subscriptionStats.msgThroughputOut == i) {
                        return true;
                    }
                }
                return false;
            } catch (PulsarAdminException e) {
                return false;
            }
        }, 5, 200L);
        String prometheusMetrics2 = PulsarFunctionTestUtils.getPrometheusMetrics(((Integer) this.pulsar.getListenPortHTTP().get()).intValue());
        log.info("prometheusMetrics: {}", prometheusMetrics2);
        Map<String, PulsarFunctionTestUtils.Metric> parseMetrics2 = PulsarFunctionTestUtils.parseMetrics(prometheusMetrics2);
        PulsarFunctionTestUtils.Metric metric10 = parseMetrics2.get("pulsar_sink_received_total");
        Assert.assertEquals(metric10.tags.get("cluster"), this.config.getClusterName());
        Assert.assertEquals(metric10.tags.get("instance_id"), "0");
        Assert.assertEquals(metric10.tags.get("name"), "PulsarSink-test");
        Assert.assertEquals(metric10.tags.get("namespace"), String.format("%s/%s", "external-repl-prop", "io"));
        Assert.assertEquals(metric10.tags.get("fqfn"), FunctionCommon.getFullyQualifiedName("external-repl-prop", "io", "PulsarSink-test"));
        Assert.assertEquals(metric10.value, 10);
        PulsarFunctionTestUtils.Metric metric11 = parseMetrics2.get("pulsar_sink_received_total_1min");
        Assert.assertEquals(metric11.tags.get("cluster"), this.config.getClusterName());
        Assert.assertEquals(metric11.tags.get("instance_id"), "0");
        Assert.assertEquals(metric11.tags.get("name"), "PulsarSink-test");
        Assert.assertEquals(metric11.tags.get("namespace"), String.format("%s/%s", "external-repl-prop", "io"));
        Assert.assertEquals(metric11.tags.get("fqfn"), FunctionCommon.getFullyQualifiedName("external-repl-prop", "io", "PulsarSink-test"));
        Assert.assertEquals(metric11.value, 10);
        PulsarFunctionTestUtils.Metric metric12 = parseMetrics2.get("pulsar_sink_written_total");
        Assert.assertEquals(metric12.tags.get("cluster"), this.config.getClusterName());
        Assert.assertEquals(metric12.tags.get("instance_id"), "0");
        Assert.assertEquals(metric12.tags.get("name"), "PulsarSink-test");
        Assert.assertEquals(metric12.tags.get("namespace"), String.format("%s/%s", "external-repl-prop", "io"));
        Assert.assertEquals(metric12.tags.get("fqfn"), FunctionCommon.getFullyQualifiedName("external-repl-prop", "io", "PulsarSink-test"));
        Assert.assertEquals(metric12.value, 10);
        PulsarFunctionTestUtils.Metric metric13 = parseMetrics2.get("pulsar_sink_written_total_1min");
        Assert.assertEquals(metric13.tags.get("cluster"), this.config.getClusterName());
        Assert.assertEquals(metric13.tags.get("instance_id"), "0");
        Assert.assertEquals(metric13.tags.get("name"), "PulsarSink-test");
        Assert.assertEquals(metric13.tags.get("namespace"), String.format("%s/%s", "external-repl-prop", "io"));
        Assert.assertEquals(metric13.tags.get("fqfn"), FunctionCommon.getFullyQualifiedName("external-repl-prop", "io", "PulsarSink-test"));
        Assert.assertEquals(metric13.value, 10);
        PulsarFunctionTestUtils.Metric metric14 = parseMetrics2.get("pulsar_sink_sink_exceptions_total");
        Assert.assertEquals(metric14.tags.get("cluster"), this.config.getClusterName());
        Assert.assertEquals(metric14.tags.get("instance_id"), "0");
        Assert.assertEquals(metric14.tags.get("name"), "PulsarSink-test");
        Assert.assertEquals(metric14.tags.get("namespace"), String.format("%s/%s", "external-repl-prop", "io"));
        Assert.assertEquals(metric14.tags.get("fqfn"), FunctionCommon.getFullyQualifiedName("external-repl-prop", "io", "PulsarSink-test"));
        Assert.assertEquals(metric14.value, 0.0d);
        PulsarFunctionTestUtils.Metric metric15 = parseMetrics2.get("pulsar_sink_sink_exceptions_total_1min");
        Assert.assertEquals(metric15.tags.get("cluster"), this.config.getClusterName());
        Assert.assertEquals(metric15.tags.get("instance_id"), "0");
        Assert.assertEquals(metric15.tags.get("name"), "PulsarSink-test");
        Assert.assertEquals(metric15.tags.get("namespace"), String.format("%s/%s", "external-repl-prop", "io"));
        Assert.assertEquals(metric15.tags.get("fqfn"), FunctionCommon.getFullyQualifiedName("external-repl-prop", "io", "PulsarSink-test"));
        Assert.assertEquals(metric15.value, 0.0d);
        PulsarFunctionTestUtils.Metric metric16 = parseMetrics2.get("pulsar_sink_system_exceptions_total");
        Assert.assertEquals(metric16.tags.get("cluster"), this.config.getClusterName());
        Assert.assertEquals(metric16.tags.get("instance_id"), "0");
        Assert.assertEquals(metric16.tags.get("name"), "PulsarSink-test");
        Assert.assertEquals(metric16.tags.get("namespace"), String.format("%s/%s", "external-repl-prop", "io"));
        Assert.assertEquals(metric16.tags.get("fqfn"), FunctionCommon.getFullyQualifiedName("external-repl-prop", "io", "PulsarSink-test"));
        Assert.assertEquals(metric16.value, 0.0d);
        PulsarFunctionTestUtils.Metric metric17 = parseMetrics2.get("pulsar_sink_system_exceptions_total_1min");
        Assert.assertEquals(metric17.tags.get("cluster"), this.config.getClusterName());
        Assert.assertEquals(metric17.tags.get("instance_id"), "0");
        Assert.assertEquals(metric17.tags.get("name"), "PulsarSink-test");
        Assert.assertEquals(metric17.tags.get("namespace"), String.format("%s/%s", "external-repl-prop", "io"));
        Assert.assertEquals(metric17.tags.get("fqfn"), FunctionCommon.getFullyQualifiedName("external-repl-prop", "io", "PulsarSink-test"));
        Assert.assertEquals(metric17.value, 0.0d);
        PulsarFunctionTestUtils.Metric metric18 = parseMetrics2.get("pulsar_sink_last_invocation");
        Assert.assertEquals(metric18.tags.get("cluster"), this.config.getClusterName());
        Assert.assertEquals(metric18.tags.get("instance_id"), "0");
        Assert.assertEquals(metric18.tags.get("name"), "PulsarSink-test");
        Assert.assertEquals(metric18.tags.get("namespace"), String.format("%s/%s", "external-repl-prop", "io"));
        Assert.assertEquals(metric18.tags.get("fqfn"), FunctionCommon.getFullyQualifiedName("external-repl-prop", "io", "PulsarSink-test"));
        Assert.assertTrue(metric18.value > 0.0d);
        this.admin.sinks().deleteSink("external-repl-prop", "io", "PulsarSink-test");
        MockedPulsarServiceBaseTest.retryStrategically(r42 -> {
            try {
                return this.admin.topics().getStats("persistent://external-repl-prop/io/input").subscriptions.size() == 0;
            } catch (PulsarAdminException e) {
                return false;
            }
        }, 50, 150L);
        Assert.assertEquals(this.admin.topics().getStats("persistent://external-repl-prop/io/input").subscriptions.size(), 0);
        this.tempDirectory.assertThatFunctionDownloadTempFilesHaveBeenDeleted();
    }

    @Test(timeOut = 20000, groups = {"builtin"})
    public void testPulsarSinkStatsBuiltin() throws Exception {
        testPulsarSinkStats(String.format("%s://data-generator", "builtin"));
    }

    @Test(timeOut = 20000)
    public void testPulsarSinkStatsWithFile() throws Exception {
        testPulsarSinkStats(PulsarFunctionLocalRunTest.getPulsarIODataGeneratorNar().toURI().toString());
    }

    @Test(timeOut = 40000)
    public void testPulsarSinkStatsWithUrl() throws Exception {
        testPulsarSinkStats(this.fileServer.getUrl("/pulsar-io-data-generator.nar"));
    }

    private static SinkConfig createSinkConfig(String str, String str2, String str3, String str4, String str5) {
        SinkConfig sinkConfig = new SinkConfig();
        sinkConfig.setTenant(str);
        sinkConfig.setNamespace(str2);
        sinkConfig.setName(str3);
        sinkConfig.setParallelism(1);
        sinkConfig.setProcessingGuarantees(FunctionConfig.ProcessingGuarantees.ATLEAST_ONCE);
        sinkConfig.setInputSpecs(Collections.singletonMap(str4, ConsumerConfig.builder().build()));
        sinkConfig.setSourceSubscriptionName(str5);
        sinkConfig.setCleanupSubscription(true);
        return sinkConfig;
    }
}
