package org.apache.pulsar.io;

import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import java.net.URI;
import java.nio.ByteBuffer;
import java.util.Collections;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.Map;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
import org.apache.pulsar.client.admin.PulsarAdmin;
import org.apache.pulsar.client.admin.PulsarAdminException;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.MessageRoutingMode;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.common.functions.ConsumerConfig;
import org.apache.pulsar.common.functions.FunctionConfig;
import org.apache.pulsar.common.policies.data.FunctionInstanceStatsDataImpl;
import org.apache.pulsar.common.policies.data.FunctionInstanceStatsImpl;
import org.apache.pulsar.common.policies.data.FunctionStatsImpl;
import org.apache.pulsar.common.policies.data.FunctionStatus;
import org.apache.pulsar.common.policies.data.PublisherStats;
import org.apache.pulsar.common.policies.data.SubscriptionStats;
import org.apache.pulsar.common.policies.data.TenantInfo;
import org.apache.pulsar.common.policies.data.TopicStats;
import org.apache.pulsar.compaction.TwoPhaseCompactor;
import org.apache.pulsar.functions.api.Context;
import org.apache.pulsar.functions.api.Function;
import org.apache.pulsar.functions.instance.InstanceUtils;
import org.apache.pulsar.functions.utils.FunctionCommon;
import org.apache.pulsar.functions.worker.FunctionRuntimeManager;
import org.apache.pulsar.functions.worker.PulsarFunctionLocalRunTest;
import org.apache.pulsar.functions.worker.PulsarFunctionTestUtils;
import org.awaitility.Awaitility;
import org.mockito.Mockito;
import org.testng.Assert;
import org.testng.annotations.Test;

@Test(groups = {"broker-io"})
/* loaded from: input_file:org/apache/pulsar/io/PulsarFunctionE2ETest.class */
public class PulsarFunctionE2ETest extends AbstractPulsarE2ETest {

    /* loaded from: input_file:org/apache/pulsar/io/PulsarFunctionE2ETest$ByteBufferFunction.class */
    /**
     * Pass-through function over {@link ByteBuffer} payloads.
     *
     * <p>It asserts that the buffer it is handed is a direct buffer and then
     * returns that very same buffer instance as its result.
     */
    public static class ByteBufferFunction implements Function<ByteBuffer, ByteBuffer> {
        /**
         * Verifies the input is a direct buffer and echoes it back.
         *
         * @param byteBuffer the input payload; must be a direct buffer or the assertion fails
         * @param context the function context (not used here)
         * @return the same {@code byteBuffer} instance that was passed in
         */
        public ByteBuffer process(ByteBuffer byteBuffer, Context context) throws Exception {
            final boolean direct = byteBuffer.isDirect();
            Assert.assertTrue(direct);
            return byteBuffer;
        }
    }

    /**
     * Builds the {@link FunctionConfig} used by the tests in this class.
     *
     * <p>The config always uses parallelism 1, the {@code EFFECTIVELY_ONCE} processing
     * guarantee, auto-ack, the Java runtime, the {@code ExclamationFunction} example class
     * and subscription cleanup enabled.
     *
     * @param str tenant of the function
     * @param str2 namespace of the function
     * @param str3 name of the function
     * @param str4 last segment of the input topics pattern; when {@code null} no topics
     *     pattern is set at all
     * @param str5 output topic
     * @param str6 subscription name
     * @return the populated function configuration
     */
    protected static FunctionConfig createFunctionConfig(String str, String str2, String str3, String str4, String str5, String str6) {
        final FunctionConfig fnConfig = new FunctionConfig();

        // Identity and delivery semantics.
        fnConfig.setTenant(str);
        fnConfig.setNamespace(str2);
        fnConfig.setName(str3);
        fnConfig.setParallelism(1);
        fnConfig.setProcessingGuarantees(FunctionConfig.ProcessingGuarantees.EFFECTIVELY_ONCE);
        fnConfig.setSubName(str6);

        // The input pattern is optional: callers that configure inputs another way pass null.
        if (str4 != null) {
            final String inputPattern = String.format("persistent://%s/%s/%s", str, str2, str4);
            fnConfig.setTopicsPattern(inputPattern);
        }

        // Runtime, implementation class and output.
        fnConfig.setAutoAck(true);
        fnConfig.setClassName("org.apache.pulsar.functions.api.examples.ExclamationFunction");
        fnConfig.setRuntime(FunctionConfig.Runtime.JAVA);
        fnConfig.setOutput(str5);
        fnConfig.setCleanupSubscription(true);
        return fnConfig;
    }

    /**
     * Shared end-to-end scenario for a function deployed from a package URL.
     *
     * <p>Steps: create the namespace, create the function, update it to parallelism 2 with a
     * different output topic, wait until the output topic shows two publishers tagged with the
     * function id, publish messages to the input topic, wait until the function subscription
     * has no unacked messages, check that a message with the original property reaches the
     * output topic, then delete the function and verify its subscription and downloaded temp
     * files are gone.
     *
     * @param str URL of the function package handed to {@code createFunctionWithUrl} and
     *     {@code updateFunctionWithUrl}
     * @throws Exception if any admin/client call fails or an assertion does not hold
     */
    private void testE2EPulsarFunction(String str) throws Exception {
        final String tenant = "external-repl-prop";
        final String namespacePortion = "io";
        final String replNamespace = "external-repl-prop/io";
        final String sourceTopic = "persistent://external-repl-prop/io/my-topic1";
        final String sinkTopic = "persistent://external-repl-prop/io/output";
        final String sinkTopic2 = "persistent://external-repl-prop/io/output2";
        final String functionName = "PulsarFunction-test";
        final String subscriptionName = "test-sub";
        final String propertyKey = "key";
        final String propertyValue = "value";
        final int totalMsgs = 5;

        this.admin.namespaces().createNamespace(replNamespace);
        this.admin.namespaces().setNamespaceReplicationClusters(replNamespace, Sets.newHashSet(Lists.newArrayList("use")));

        // Producer and consumer are closed even when an assertion below fails.
        try (Producer<String> producer = this.pulsarClient.newProducer(Schema.STRING).topic(sourceTopic).create();
             Consumer<String> consumer = this.pulsarClient.newConsumer(Schema.STRING).topic(sinkTopic2).subscriptionName("sub").subscribe()) {

            FunctionConfig functionConfig = createFunctionConfig(tenant, namespacePortion, functionName, "my.*", sinkTopic, subscriptionName);
            functionConfig.setProcessingGuarantees(FunctionConfig.ProcessingGuarantees.ATLEAST_ONCE);
            this.admin.functions().createFunctionWithUrl(functionConfig, str);

            // Update the function: two instances, publishing to the second output topic.
            functionConfig.setParallelism(2);
            functionConfig.setOutput(sinkTopic2);
            this.admin.functions().updateFunctionWithUrl(functionConfig, str);

            Awaitility.await().ignoreExceptions().untilAsserted(() -> {
                TopicStats stats = this.admin.topics().getStats(sinkTopic2);
                Assert.assertEquals(stats.getPublishers().size(), 2);
                PublisherStats firstPublisher = stats.getPublishers().get(0);
                Assert.assertNotNull(firstPublisher.getMetadata());
                Assert.assertTrue(firstPublisher.getMetadata().containsKey("id"));
                Assert.assertEquals(firstPublisher.getMetadata().get("id"), String.format("%s/%s/%s", tenant, namespacePortion, functionName));
            });
            Awaitility.await().ignoreExceptions().untilAsserted(() -> {
                Assert.assertEquals(this.admin.topics().getStats(sourceTopic).getSubscriptions().size(), 1);
            });

            for (int i = 0; i < totalMsgs; i++) {
                producer.newMessage().property(propertyKey, propertyValue).value("my-message-" + i).send();
            }

            Awaitility.await().ignoreExceptions().untilAsserted(() -> {
                Assert.assertEquals(this.admin.topics().getStats(sourceTopic).getSubscriptions().get(subscriptionName).getUnackedMessages(), 0L);
            });

            // receive() returns null on timeout; fail with a clear message instead of an NPE.
            Message<String> received = consumer.receive(5, TimeUnit.SECONDS);
            Assert.assertNotNull(received, "no message arrived on " + sinkTopic2 + " within 5 seconds");
            Assert.assertEquals(received.getProperty(propertyKey), propertyValue);

            // Compare Long with Long: a Long/Integer comparison is never equal, which would
            // make this assertion pass unconditionally.
            long unackedMessages = this.admin.topics().getStats(sourceTopic).getSubscriptions().values().iterator().next().getUnackedMessages();
            Assert.assertNotEquals(Long.valueOf(unackedMessages), Long.valueOf((long) totalMsgs));

            // Deleting the function must also remove its subscription on the input topic.
            this.admin.functions().deleteFunction(tenant, namespacePortion, functionName);
            Awaitility.await().ignoreExceptions().untilAsserted(() -> {
                Assert.assertEquals(this.admin.topics().getStats(sourceTopic).getSubscriptions().size(), 0);
            });
            this.tempDirectory.assertThatFunctionDownloadTempFilesHaveBeenDeleted();
        }
    }

    /**
     * Runs the shared end-to-end scenario with the examples jar addressed by the URI string
     * derived from {@code PulsarFunctionLocalRunTest.getPulsarApiExamplesJar()}.
     */
    @Test(timeOut = 20000)
    public void testE2EPulsarFunctionWithFile() throws Exception {
        final String jarUri = PulsarFunctionLocalRunTest.getPulsarApiExamplesJar().toURI().toString();
        testE2EPulsarFunction(jarUri);
    }

    /**
     * Runs the shared end-to-end scenario with the examples jar addressed by the URL the
     * test file server returns for {@code /pulsar-functions-api-examples.jar}.
     */
    @Test(timeOut = 40000)
    public void testE2EPulsarFunctionWithUrl() throws Exception {
        final String jarUrl = this.fileServer.getUrl("/pulsar-functions-api-examples.jar");
        testE2EPulsarFunction(jarUrl);
    }

    /**
     * Verifies that a function whose input consumer is configured with
     * {@code readCompacted=true} only sees the compacted view of its input topic.
     *
     * <p>The test publishes 20 keyed messages spread over 10 keys, compacts the topic, deploys
     * the function, and then expects exactly one output per key whose value is the last value
     * published for that key followed by {@code "!"}.
     *
     * @throws Exception if any admin/client call fails or an assertion does not hold
     */
    @Test(timeOut = 30000)
    public void testReadCompactedFunction() throws Exception {
        final String replNamespace = "external-repl-prop/io";
        final String sourceTopic = "persistent://external-repl-prop/io/my-topic1";
        final String sinkTopic = "persistent://external-repl-prop/io/output";
        final String subscriptionName = "test-sub";
        final int messageNum = 20;
        final int maxKeys = 10;

        this.admin.namespaces().createNamespace(replNamespace);
        this.admin.namespaces().setNamespaceReplicationClusters(replNamespace, Sets.newHashSet(Lists.newArrayList("use")));

        // The producer is closed on every exit path, including assertion failures.
        try (Producer<String> producer = this.pulsarClient.newProducer(Schema.STRING).topic(sourceTopic).enableBatching(false).messageRoutingMode(MessageRoutingMode.SinglePartition).create()) {
            // Create (and immediately close) the subscription before any message is published.
            this.pulsarClient.newConsumer().topic(sourceTopic).subscriptionName(subscriptionName).readCompacted(true).subscribe().close();

            // Publish the messages; for duplicate keys only the latest value is remembered.
            Map<String, String> expected = new HashMap<>();
            for (int i = 0; i < messageNum; i++) {
                String key = "key" + (i % maxKeys);
                String value = "my-message-" + key + i;
                producer.newMessage().key(key).value(value).send();
                expected.put(key, value);
            }

            ScheduledExecutorService compactionScheduler = Executors.newSingleThreadScheduledExecutor(new ThreadFactoryBuilder().setNameFormat("compactor").setDaemon(true).build());
            try {
                // Compact the input topic and wait for compaction to finish.
                new TwoPhaseCompactor(this.config, this.pulsarClient, this.pulsar.getBookKeeperClient(), compactionScheduler).compact(sourceTopic).get();

                // Deploy the function with an explicit input spec that enables readCompacted.
                FunctionConfig functionConfig = createFunctionConfig("external-repl-prop", "io", "PulsarFunction-test", null, sinkTopic, subscriptionName);
                Map<String, String> consumerProperties = new HashMap<>();
                consumerProperties.put("readCompacted", "true");
                ConsumerConfig consumerConfig = new ConsumerConfig();
                consumerConfig.setConsumerProperties(consumerProperties);
                Map<String, ConsumerConfig> inputSpecs = new HashMap<>();
                inputSpecs.put(sourceTopic, consumerConfig);
                functionConfig.setInputSpecs(inputSpecs);
                this.admin.functions().createFunctionWithUrl(functionConfig, PulsarFunctionLocalRunTest.getPulsarApiExamplesJar().toURI().toString());

                try (Consumer<String> consumer = this.pulsarClient.newConsumer(Schema.STRING).topic(sinkTopic).subscriptionName("sink-sub").subscribe()) {
                    int received = 0;
                    // Drain the output topic until no message arrives within the receive timeout.
                    while (true) {
                        Message<String> message = consumer.receive(10, TimeUnit.SECONDS);
                        if (message == null) {
                            break;
                        }
                        consumer.acknowledge(message);
                        received++;
                        Assert.assertEquals(message.getValue(), expected.remove(message.getKey()) + "!");
                    }
                    Assert.assertEquals(received, maxKeys);
                    Assert.assertTrue(expected.isEmpty());
                }
            } finally {
                // Single cleanup point for the compaction executor, on success and on failure.
                compactionScheduler.shutdownNow();
            }
        }
    }

    @Test(timeOut = 20000)
    public void testPulsarFunctionStats() throws Exception {
        this.admin.namespaces().createNamespace("external-repl-prop/io");
        this.admin.namespaces().setNamespaceReplicationClusters("external-repl-prop/io", Sets.newHashSet(Lists.newArrayList(new String[]{"use"})));
        Producer create = this.pulsarClient.newProducer(Schema.STRING).topic("persistent://external-repl-prop/io/my-topic1").create();
        String uri = PulsarFunctionLocalRunTest.getPulsarApiExamplesJar().toURI().toString();
        FunctionConfig createFunctionConfig = createFunctionConfig("external-repl-prop", "io", "PulsarSink-test", "my.*", "persistent://external-repl-prop/io/output", "test-sub");
        this.admin.functions().createFunctionWithUrl(createFunctionConfig, uri);
        this.admin.functions().updateFunctionWithUrl(createFunctionConfig, uri);
        MockedPulsarServiceBaseTest.retryStrategically(r4 -> {
            try {
                return this.admin.topics().getStats("persistent://external-repl-prop/io/my-topic1").getSubscriptions().size() == 1;
            } catch (PulsarAdminException e) {
                return false;
            }
        }, 50, 150L);
        Assert.assertEquals(this.admin.topics().getStats("persistent://external-repl-prop/io/my-topic1").getSubscriptions().size(), 1);
        FunctionRuntimeManager functionRuntimeManager = this.functionsWorkerService.getFunctionRuntimeManager();
        FunctionStatsImpl functionStats = functionRuntimeManager.getFunctionStats("external-repl-prop", "io", "PulsarSink-test", (URI) null);
        Assert.assertEquals(functionStats, this.admin.functions().getFunctionStats("external-repl-prop", "io", "PulsarSink-test"));
        Assert.assertEquals(functionStats.getReceivedTotal(), 0L);
        Assert.assertEquals(functionStats.getProcessedSuccessfullyTotal(), 0L);
        Assert.assertEquals(functionStats.getSystemExceptionsTotal(), 0L);
        Assert.assertEquals(functionStats.getUserExceptionsTotal(), 0L);
        Assert.assertNull(functionStats.avgProcessLatency);
        Assert.assertEquals(functionStats.oneMin.getReceivedTotal(), 0L);
        Assert.assertEquals(functionStats.oneMin.getProcessedSuccessfullyTotal(), 0L);
        Assert.assertEquals(functionStats.oneMin.getSystemExceptionsTotal(), 0L);
        Assert.assertEquals(functionStats.oneMin.getUserExceptionsTotal(), 0L);
        Assert.assertNull(functionStats.oneMin.getAvgProcessLatency());
        Assert.assertEquals(functionStats.getAvgProcessLatency(), functionStats.oneMin.getAvgProcessLatency());
        Assert.assertNull(functionStats.getLastInvocation());
        Assert.assertEquals(functionStats.instances.size(), 1);
        Assert.assertEquals(((FunctionInstanceStatsImpl) functionStats.instances.get(0)).getInstanceId(), 0);
        Assert.assertEquals(((FunctionInstanceStatsImpl) functionStats.instances.get(0)).getMetrics().getReceivedTotal(), 0L);
        Assert.assertEquals(((FunctionInstanceStatsImpl) functionStats.instances.get(0)).getMetrics().getProcessedSuccessfullyTotal(), 0L);
        Assert.assertEquals(((FunctionInstanceStatsImpl) functionStats.instances.get(0)).getMetrics().getSystemExceptionsTotal(), 0L);
        Assert.assertEquals(((FunctionInstanceStatsImpl) functionStats.instances.get(0)).getMetrics().getUserExceptionsTotal(), 0L);
        Assert.assertNull(((FunctionInstanceStatsImpl) functionStats.instances.get(0)).getMetrics().getAvgProcessLatency());
        Assert.assertEquals(((FunctionInstanceStatsImpl) functionStats.instances.get(0)).getMetrics().getOneMin().getReceivedTotal(), 0L);
        Assert.assertEquals(((FunctionInstanceStatsImpl) functionStats.instances.get(0)).getMetrics().getOneMin().getProcessedSuccessfullyTotal(), 0L);
        Assert.assertEquals(((FunctionInstanceStatsImpl) functionStats.instances.get(0)).getMetrics().getOneMin().getSystemExceptionsTotal(), 0L);
        Assert.assertEquals(((FunctionInstanceStatsImpl) functionStats.instances.get(0)).getMetrics().getOneMin().getUserExceptionsTotal(), 0L);
        Assert.assertNull(((FunctionInstanceStatsImpl) functionStats.instances.get(0)).getMetrics().getOneMin().getAvgProcessLatency());
        Assert.assertEquals(((FunctionInstanceStatsImpl) functionStats.instances.get(0)).getMetrics().getAvgProcessLatency(), ((FunctionInstanceStatsImpl) functionStats.instances.get(0)).getMetrics().getOneMin().getAvgProcessLatency());
        Assert.assertEquals(((FunctionInstanceStatsImpl) functionStats.instances.get(0)).getMetrics().getAvgProcessLatency(), functionStats.getAvgProcessLatency());
        String prometheusMetrics = PulsarFunctionTestUtils.getPrometheusMetrics(((Integer) this.pulsar.getListenPortHTTP().get()).intValue());
        log.info("prometheus metrics: {}", prometheusMetrics);
        Map<String, PulsarFunctionTestUtils.Metric> parseMetrics = PulsarFunctionTestUtils.parseMetrics(prometheusMetrics);
        PulsarFunctionTestUtils.Metric metric = parseMetrics.get("pulsar_function_received_total");
        Assert.assertEquals(metric.tags.get("cluster"), this.config.getClusterName());
        Assert.assertEquals(metric.tags.get("instance_id"), "0");
        Assert.assertEquals(metric.tags.get("name"), "PulsarSink-test");
        Assert.assertEquals(metric.tags.get("namespace"), String.format("%s/%s", "external-repl-prop", "io"));
        Assert.assertEquals(metric.tags.get("fqfn"), FunctionCommon.getFullyQualifiedName("external-repl-prop", "io", "PulsarSink-test"));
        Assert.assertEquals(metric.value, 0.0d);
        PulsarFunctionTestUtils.Metric metric2 = parseMetrics.get("pulsar_function_received_total_1min");
        Assert.assertEquals(metric2.tags.get("cluster"), this.config.getClusterName());
        Assert.assertEquals(metric2.tags.get("instance_id"), "0");
        Assert.assertEquals(metric2.tags.get("name"), "PulsarSink-test");
        Assert.assertEquals(metric2.tags.get("namespace"), String.format("%s/%s", "external-repl-prop", "io"));
        Assert.assertEquals(metric2.tags.get("fqfn"), FunctionCommon.getFullyQualifiedName("external-repl-prop", "io", "PulsarSink-test"));
        Assert.assertEquals(metric2.value, 0.0d);
        PulsarFunctionTestUtils.Metric metric3 = parseMetrics.get("pulsar_function_user_exceptions_total");
        Assert.assertEquals(metric3.tags.get("cluster"), this.config.getClusterName());
        Assert.assertEquals(metric3.tags.get("instance_id"), "0");
        Assert.assertEquals(metric3.tags.get("name"), "PulsarSink-test");
        Assert.assertEquals(metric3.tags.get("namespace"), String.format("%s/%s", "external-repl-prop", "io"));
        Assert.assertEquals(metric3.tags.get("fqfn"), FunctionCommon.getFullyQualifiedName("external-repl-prop", "io", "PulsarSink-test"));
        Assert.assertEquals(metric3.value, 0.0d);
        PulsarFunctionTestUtils.Metric metric4 = parseMetrics.get("pulsar_function_user_exceptions_total_1min");
        Assert.assertEquals(metric4.tags.get("cluster"), this.config.getClusterName());
        Assert.assertEquals(metric4.tags.get("instance_id"), "0");
        Assert.assertEquals(metric4.tags.get("name"), "PulsarSink-test");
        Assert.assertEquals(metric4.tags.get("namespace"), String.format("%s/%s", "external-repl-prop", "io"));
        Assert.assertEquals(metric4.tags.get("fqfn"), FunctionCommon.getFullyQualifiedName("external-repl-prop", "io", "PulsarSink-test"));
        Assert.assertEquals(metric4.value, 0.0d);
        PulsarFunctionTestUtils.Metric metric5 = parseMetrics.get("pulsar_function_process_latency_ms");
        Assert.assertEquals(metric5.tags.get("cluster"), this.config.getClusterName());
        Assert.assertEquals(metric5.tags.get("instance_id"), "0");
        Assert.assertEquals(metric5.tags.get("name"), "PulsarSink-test");
        Assert.assertEquals(metric5.tags.get("namespace"), String.format("%s/%s", "external-repl-prop", "io"));
        Assert.assertEquals(metric5.tags.get("fqfn"), FunctionCommon.getFullyQualifiedName("external-repl-prop", "io", "PulsarSink-test"));
        Assert.assertEquals(metric5.value, Double.NaN);
        PulsarFunctionTestUtils.Metric metric6 = parseMetrics.get("pulsar_function_process_latency_ms_1min");
        Assert.assertEquals(metric6.tags.get("cluster"), this.config.getClusterName());
        Assert.assertEquals(metric6.tags.get("instance_id"), "0");
        Assert.assertEquals(metric6.tags.get("name"), "PulsarSink-test");
        Assert.assertEquals(metric6.tags.get("namespace"), String.format("%s/%s", "external-repl-prop", "io"));
        Assert.assertEquals(metric6.tags.get("fqfn"), FunctionCommon.getFullyQualifiedName("external-repl-prop", "io", "PulsarSink-test"));
        Assert.assertEquals(metric6.value, Double.NaN);
        PulsarFunctionTestUtils.Metric metric7 = parseMetrics.get("pulsar_function_system_exceptions_total");
        Assert.assertEquals(metric7.tags.get("cluster"), this.config.getClusterName());
        Assert.assertEquals(metric7.tags.get("instance_id"), "0");
        Assert.assertEquals(metric7.tags.get("name"), "PulsarSink-test");
        Assert.assertEquals(metric7.tags.get("namespace"), String.format("%s/%s", "external-repl-prop", "io"));
        Assert.assertEquals(metric7.tags.get("fqfn"), FunctionCommon.getFullyQualifiedName("external-repl-prop", "io", "PulsarSink-test"));
        Assert.assertEquals(metric7.value, 0.0d);
        PulsarFunctionTestUtils.Metric metric8 = parseMetrics.get("pulsar_function_system_exceptions_total_1min");
        Assert.assertEquals(metric8.tags.get("cluster"), this.config.getClusterName());
        Assert.assertEquals(metric8.tags.get("instance_id"), "0");
        Assert.assertEquals(metric8.tags.get("name"), "PulsarSink-test");
        Assert.assertEquals(metric8.tags.get("namespace"), String.format("%s/%s", "external-repl-prop", "io"));
        Assert.assertEquals(metric8.tags.get("fqfn"), FunctionCommon.getFullyQualifiedName("external-repl-prop", "io", "PulsarSink-test"));
        Assert.assertEquals(metric8.value, 0.0d);
        PulsarFunctionTestUtils.Metric metric9 = parseMetrics.get("pulsar_function_last_invocation");
        Assert.assertEquals(metric9.tags.get("cluster"), this.config.getClusterName());
        Assert.assertEquals(metric9.tags.get("instance_id"), "0");
        Assert.assertEquals(metric9.tags.get("name"), "PulsarSink-test");
        Assert.assertEquals(metric9.tags.get("namespace"), String.format("%s/%s", "external-repl-prop", "io"));
        Assert.assertEquals(metric9.tags.get("fqfn"), FunctionCommon.getFullyQualifiedName("external-repl-prop", "io", "PulsarSink-test"));
        Assert.assertEquals(metric9.value, 0.0d);
        PulsarFunctionTestUtils.Metric metric10 = parseMetrics.get("pulsar_function_processed_successfully_total");
        Assert.assertEquals(metric10.tags.get("cluster"), this.config.getClusterName());
        Assert.assertEquals(metric10.tags.get("instance_id"), "0");
        Assert.assertEquals(metric10.tags.get("name"), "PulsarSink-test");
        Assert.assertEquals(metric10.tags.get("namespace"), String.format("%s/%s", "external-repl-prop", "io"));
        Assert.assertEquals(metric10.tags.get("fqfn"), FunctionCommon.getFullyQualifiedName("external-repl-prop", "io", "PulsarSink-test"));
        Assert.assertEquals(metric10.value, 0.0d);
        PulsarFunctionTestUtils.Metric metric11 = parseMetrics.get("pulsar_function_processed_successfully_total_1min");
        Assert.assertEquals(metric11.tags.get("cluster"), this.config.getClusterName());
        Assert.assertEquals(metric11.tags.get("instance_id"), "0");
        Assert.assertEquals(metric11.tags.get("name"), "PulsarSink-test");
        Assert.assertEquals(metric11.tags.get("namespace"), String.format("%s/%s", "external-repl-prop", "io"));
        Assert.assertEquals(metric11.tags.get("fqfn"), FunctionCommon.getFullyQualifiedName("external-repl-prop", "io", "PulsarSink-test"));
        Assert.assertEquals(metric11.value, 0.0d);
        FunctionInstanceStatsDataImpl functionInstanceStats = functionRuntimeManager.getFunctionInstanceStats("external-repl-prop", "io", "PulsarSink-test", 0, (URI) null);
        Assert.assertEquals(functionInstanceStats, this.admin.functions().getFunctionStats("external-repl-prop", "io", "PulsarSink-test", 0));
        Assert.assertEquals(functionInstanceStats, ((FunctionInstanceStatsImpl) functionStats.instances.get(0)).getMetrics());
        int i = 10;
        for (int i2 = 0; i2 < 10; i2++) {
            create.newMessage().property("key", "value").value("my-message-" + i2).send();
        }
        MockedPulsarServiceBaseTest.retryStrategically(r7 -> {
            try {
                SubscriptionStats subscriptionStats = (SubscriptionStats) this.admin.topics().getStats("persistent://external-repl-prop/io/my-topic1").getSubscriptions().get("test-sub");
                if (subscriptionStats.getUnackedMessages() == 0) {
                    if (subscriptionStats.getMsgThroughputOut() == i) {
                        return true;
                    }
                }
                return false;
            } catch (PulsarAdminException e) {
                return false;
            }
        }, 5, 200L);
        FunctionStatsImpl functionStats2 = functionRuntimeManager.getFunctionStats("external-repl-prop", "io", "PulsarSink-test", (URI) null);
        Assert.assertEquals(functionStats2, this.admin.functions().getFunctionStats("external-repl-prop", "io", "PulsarSink-test"));
        Assert.assertEquals(functionStats2.getReceivedTotal(), 10);
        Assert.assertEquals(functionStats2.getProcessedSuccessfullyTotal(), 10);
        Assert.assertEquals(functionStats2.getSystemExceptionsTotal(), 0L);
        Assert.assertEquals(functionStats2.getUserExceptionsTotal(), 0L);
        Assert.assertTrue(functionStats2.avgProcessLatency.doubleValue() > 0.0d);
        Assert.assertEquals(functionStats2.oneMin.getReceivedTotal(), 10);
        Assert.assertEquals(functionStats2.oneMin.getProcessedSuccessfullyTotal(), 10);
        Assert.assertEquals(functionStats2.oneMin.getSystemExceptionsTotal(), 0L);
        Assert.assertEquals(functionStats2.oneMin.getUserExceptionsTotal(), 0L);
        Assert.assertTrue(functionStats2.oneMin.getAvgProcessLatency().doubleValue() > 0.0d);
        Assert.assertEquals(functionStats2.getAvgProcessLatency(), functionStats2.oneMin.getAvgProcessLatency());
        Assert.assertTrue(functionStats2.getLastInvocation().longValue() > 0);
        Assert.assertEquals(functionStats2.instances.size(), 1);
        Assert.assertEquals(((FunctionInstanceStatsImpl) functionStats2.instances.get(0)).getInstanceId(), 0);
        Assert.assertEquals(((FunctionInstanceStatsImpl) functionStats2.instances.get(0)).getMetrics().getReceivedTotal(), 10);
        Assert.assertEquals(((FunctionInstanceStatsImpl) functionStats2.instances.get(0)).getMetrics().getProcessedSuccessfullyTotal(), 10);
        Assert.assertEquals(((FunctionInstanceStatsImpl) functionStats2.instances.get(0)).getMetrics().getSystemExceptionsTotal(), 0L);
        Assert.assertEquals(((FunctionInstanceStatsImpl) functionStats2.instances.get(0)).getMetrics().getUserExceptionsTotal(), 0L);
        Assert.assertTrue(((FunctionInstanceStatsImpl) functionStats2.instances.get(0)).getMetrics().getAvgProcessLatency().doubleValue() > 0.0d);
        Assert.assertEquals(((FunctionInstanceStatsImpl) functionStats2.instances.get(0)).getMetrics().getOneMin().getReceivedTotal(), 10);
        Assert.assertEquals(((FunctionInstanceStatsImpl) functionStats2.instances.get(0)).getMetrics().getOneMin().getProcessedSuccessfullyTotal(), 10);
        Assert.assertEquals(((FunctionInstanceStatsImpl) functionStats2.instances.get(0)).getMetrics().getOneMin().getSystemExceptionsTotal(), 0L);
        Assert.assertEquals(((FunctionInstanceStatsImpl) functionStats2.instances.get(0)).getMetrics().getOneMin().getUserExceptionsTotal(), 0L);
        Assert.assertTrue(((FunctionInstanceStatsImpl) functionStats2.instances.get(0)).getMetrics().getOneMin().getAvgProcessLatency().doubleValue() > 0.0d);
        Assert.assertEquals(((FunctionInstanceStatsImpl) functionStats2.instances.get(0)).getMetrics().getAvgProcessLatency(), ((FunctionInstanceStatsImpl) functionStats2.instances.get(0)).getMetrics().getOneMin().getAvgProcessLatency());
        Assert.assertEquals(((FunctionInstanceStatsImpl) functionStats2.instances.get(0)).getMetrics().getAvgProcessLatency(), functionStats2.getAvgProcessLatency());
        FunctionInstanceStatsDataImpl functionInstanceStats2 = functionRuntimeManager.getFunctionInstanceStats("external-repl-prop", "io", "PulsarSink-test", 0, (URI) null);
        Assert.assertEquals(functionInstanceStats2, this.admin.functions().getFunctionStats("external-repl-prop", "io", "PulsarSink-test", 0));
        Assert.assertEquals(functionInstanceStats2, ((FunctionInstanceStatsImpl) functionStats2.instances.get(0)).getMetrics());
        String prometheusMetrics2 = PulsarFunctionTestUtils.getPrometheusMetrics(((Integer) this.pulsar.getListenPortHTTP().get()).intValue());
        log.info("prometheus metrics: {}", prometheusMetrics2);
        Map<String, PulsarFunctionTestUtils.Metric> parseMetrics2 = PulsarFunctionTestUtils.parseMetrics(prometheusMetrics2);
        PulsarFunctionTestUtils.Metric metric12 = parseMetrics2.get("pulsar_function_received_total");
        Assert.assertEquals(metric12.tags.get("cluster"), this.config.getClusterName());
        Assert.assertEquals(metric12.tags.get("instance_id"), "0");
        Assert.assertEquals(metric12.tags.get("name"), "PulsarSink-test");
        Assert.assertEquals(metric12.tags.get("namespace"), String.format("%s/%s", "external-repl-prop", "io"));
        Assert.assertEquals(metric12.tags.get("fqfn"), FunctionCommon.getFullyQualifiedName("external-repl-prop", "io", "PulsarSink-test"));
        Assert.assertEquals(metric12.value, 10);
        PulsarFunctionTestUtils.Metric metric13 = parseMetrics2.get("pulsar_function_received_total_1min");
        Assert.assertEquals(metric13.tags.get("cluster"), this.config.getClusterName());
        Assert.assertEquals(metric13.tags.get("instance_id"), "0");
        Assert.assertEquals(metric13.tags.get("name"), "PulsarSink-test");
        Assert.assertEquals(metric13.tags.get("namespace"), String.format("%s/%s", "external-repl-prop", "io"));
        Assert.assertEquals(metric13.tags.get("fqfn"), FunctionCommon.getFullyQualifiedName("external-repl-prop", "io", "PulsarSink-test"));
        Assert.assertEquals(metric13.value, 10);
        PulsarFunctionTestUtils.Metric metric14 = parseMetrics2.get("pulsar_function_user_exceptions_total");
        Assert.assertEquals(metric14.tags.get("cluster"), this.config.getClusterName());
        Assert.assertEquals(metric14.tags.get("instance_id"), "0");
        Assert.assertEquals(metric14.tags.get("name"), "PulsarSink-test");
        Assert.assertEquals(metric14.tags.get("namespace"), String.format("%s/%s", "external-repl-prop", "io"));
        Assert.assertEquals(metric14.tags.get("fqfn"), FunctionCommon.getFullyQualifiedName("external-repl-prop", "io", "PulsarSink-test"));
        Assert.assertEquals(metric14.value, 0.0d);
        PulsarFunctionTestUtils.Metric metric15 = parseMetrics2.get("pulsar_function_user_exceptions_total_1min");
        Assert.assertEquals(metric15.tags.get("cluster"), this.config.getClusterName());
        Assert.assertEquals(metric15.tags.get("instance_id"), "0");
        Assert.assertEquals(metric15.tags.get("name"), "PulsarSink-test");
        Assert.assertEquals(metric15.tags.get("namespace"), String.format("%s/%s", "external-repl-prop", "io"));
        Assert.assertEquals(metric15.tags.get("fqfn"), FunctionCommon.getFullyQualifiedName("external-repl-prop", "io", "PulsarSink-test"));
        Assert.assertEquals(metric15.value, 0.0d);
        PulsarFunctionTestUtils.Metric metric16 = parseMetrics2.get("pulsar_function_process_latency_ms");
        Assert.assertEquals(metric16.tags.get("cluster"), this.config.getClusterName());
        Assert.assertEquals(metric16.tags.get("instance_id"), "0");
        Assert.assertEquals(metric16.tags.get("name"), "PulsarSink-test");
        Assert.assertEquals(metric16.tags.get("namespace"), String.format("%s/%s", "external-repl-prop", "io"));
        Assert.assertEquals(metric16.tags.get("fqfn"), FunctionCommon.getFullyQualifiedName("external-repl-prop", "io", "PulsarSink-test"));
        Assert.assertTrue(metric16.value > 0.0d);
        PulsarFunctionTestUtils.Metric metric17 = parseMetrics2.get("pulsar_function_process_latency_ms_1min");
        Assert.assertEquals(metric17.tags.get("cluster"), this.config.getClusterName());
        Assert.assertEquals(metric17.tags.get("instance_id"), "0");
        Assert.assertEquals(metric17.tags.get("name"), "PulsarSink-test");
        Assert.assertEquals(metric17.tags.get("namespace"), String.format("%s/%s", "external-repl-prop", "io"));
        Assert.assertEquals(metric17.tags.get("fqfn"), FunctionCommon.getFullyQualifiedName("external-repl-prop", "io", "PulsarSink-test"));
        Assert.assertTrue(metric17.value > 0.0d);
        PulsarFunctionTestUtils.Metric metric18 = parseMetrics2.get("pulsar_function_system_exceptions_total");
        Assert.assertEquals(metric18.tags.get("cluster"), this.config.getClusterName());
        Assert.assertEquals(metric18.tags.get("instance_id"), "0");
        Assert.assertEquals(metric18.tags.get("name"), "PulsarSink-test");
        Assert.assertEquals(metric18.tags.get("namespace"), String.format("%s/%s", "external-repl-prop", "io"));
        Assert.assertEquals(metric18.tags.get("fqfn"), FunctionCommon.getFullyQualifiedName("external-repl-prop", "io", "PulsarSink-test"));
        Assert.assertEquals(metric18.value, 0.0d);
        PulsarFunctionTestUtils.Metric metric19 = parseMetrics2.get("pulsar_function_system_exceptions_total_1min");
        Assert.assertEquals(metric19.tags.get("cluster"), this.config.getClusterName());
        Assert.assertEquals(metric19.tags.get("instance_id"), "0");
        Assert.assertEquals(metric19.tags.get("name"), "PulsarSink-test");
        Assert.assertEquals(metric19.tags.get("namespace"), String.format("%s/%s", "external-repl-prop", "io"));
        Assert.assertEquals(metric19.tags.get("fqfn"), FunctionCommon.getFullyQualifiedName("external-repl-prop", "io", "PulsarSink-test"));
        Assert.assertEquals(metric19.value, 0.0d);
        PulsarFunctionTestUtils.Metric metric20 = parseMetrics2.get("pulsar_function_last_invocation");
        Assert.assertEquals(metric20.tags.get("cluster"), this.config.getClusterName());
        Assert.assertEquals(metric20.tags.get("instance_id"), "0");
        Assert.assertEquals(metric20.tags.get("name"), "PulsarSink-test");
        Assert.assertEquals(metric20.tags.get("namespace"), String.format("%s/%s", "external-repl-prop", "io"));
        Assert.assertEquals(metric20.tags.get("fqfn"), FunctionCommon.getFullyQualifiedName("external-repl-prop", "io", "PulsarSink-test"));
        Assert.assertTrue(metric20.value > 0.0d);
        PulsarFunctionTestUtils.Metric metric21 = parseMetrics2.get("pulsar_function_processed_successfully_total");
        Assert.assertEquals(metric21.tags.get("cluster"), this.config.getClusterName());
        Assert.assertEquals(metric21.tags.get("instance_id"), "0");
        Assert.assertEquals(metric21.tags.get("name"), "PulsarSink-test");
        Assert.assertEquals(metric21.tags.get("namespace"), String.format("%s/%s", "external-repl-prop", "io"));
        Assert.assertEquals(metric21.tags.get("fqfn"), FunctionCommon.getFullyQualifiedName("external-repl-prop", "io", "PulsarSink-test"));
        Assert.assertEquals(metric21.value, 10);
        PulsarFunctionTestUtils.Metric metric22 = parseMetrics2.get("pulsar_function_processed_successfully_total_1min");
        Assert.assertEquals(metric22.tags.get("cluster"), this.config.getClusterName());
        Assert.assertEquals(metric22.tags.get("instance_id"), "0");
        Assert.assertEquals(metric22.tags.get("name"), "PulsarSink-test");
        Assert.assertEquals(metric22.tags.get("namespace"), String.format("%s/%s", "external-repl-prop", "io"));
        Assert.assertEquals(metric22.tags.get("fqfn"), FunctionCommon.getFullyQualifiedName("external-repl-prop", "io", "PulsarSink-test"));
        Assert.assertEquals(metric22.value, 10);
        this.admin.functions().deleteFunction("external-repl-prop", "io", "PulsarSink-test");
        MockedPulsarServiceBaseTest.retryStrategically(r42 -> {
            try {
                return this.admin.topics().getStats("persistent://external-repl-prop/io/my-topic1").getSubscriptions().size() == 0;
            } catch (PulsarAdminException e) {
                return false;
            }
        }, 50, 150L);
        Assert.assertEquals(this.admin.topics().getStats("persistent://external-repl-prop/io/my-topic1").getSubscriptions().size(), 0);
        this.tempDirectory.assertThatFunctionDownloadTempFilesHaveBeenDeleted();
    }

    /**
     * Deploys the {@code PulsarSink-test} function, publishes ten messages to its input topic and
     * checks that the function status API reports one instance whose received and successfully
     * processed counts are both ten and whose worker id equals {@code this.workerId}. Finally the
     * function is deleted and the test waits for the input topic to have no subscriptions left.
     *
     * @throws Exception if any admin or client call fails
     */
    @Test(timeOut = 20000)
    public void testPulsarFunctionStatus() throws Exception {
        final String sourceTopic = "persistent://external-repl-prop/io/my-topic1";
        final int totalMsgs = 10;

        this.admin.namespaces().createNamespace("external-repl-prop/io");
        this.admin.namespaces().setNamespaceReplicationClusters("external-repl-prop/io", Sets.newHashSet(Lists.newArrayList("use")));

        // try-with-resources: the producer is released even when an assertion below fails.
        try (Producer<String> producer = this.pulsarClient.newProducer(Schema.STRING).topic(sourceTopic).create()) {
            String jarFilePathUrl = PulsarFunctionLocalRunTest.getPulsarApiExamplesJar().toURI().toString();
            FunctionConfig functionConfig = createFunctionConfig("external-repl-prop", "io", "PulsarSink-test", "my.*", "persistent://external-repl-prop/io/output", "test-sub");
            this.admin.functions().createFunctionWithUrl(functionConfig, jarFilePathUrl);
            this.admin.functions().updateFunctionWithUrl(functionConfig, jarFilePathUrl);

            // Wait until the function has subscribed to the input topic.
            MockedPulsarServiceBaseTest.retryStrategically(ignored -> {
                try {
                    return this.admin.topics().getStats(sourceTopic).getSubscriptions().size() == 1;
                } catch (PulsarAdminException e) {
                    return false;
                }
            }, 50, 150L);
            Assert.assertEquals(this.admin.topics().getStats(sourceTopic).getSubscriptions().size(), 1);

            for (int i = 0; i < totalMsgs; i++) {
                producer.newMessage().property("key", "value").value("my-message-" + i).send();
            }

            // Give the function a short window to drain the subscription.
            MockedPulsarServiceBaseTest.retryStrategically(ignored -> {
                try {
                    SubscriptionStats subStats = this.admin.topics().getStats(sourceTopic).getSubscriptions().get("test-sub");
                    // A missing entry simply means "not ready yet" instead of a NullPointerException.
                    return subStats != null
                            && subStats.getUnackedMessages() == 0
                            && subStats.getMsgThroughputOut() == totalMsgs;
                } catch (PulsarAdminException e) {
                    return false;
                }
            }, 5, 200L);

            FunctionStatus functionStatus = this.admin.functions().getFunctionStatus("external-repl-prop", "io", "PulsarSink-test");
            Assert.assertEquals(functionStatus.getNumInstances(), 1);
            FunctionStatus.FunctionInstanceStatus.FunctionInstanceStatusData instanceStatus = functionStatus.getInstances().get(0).getStatus();
            double received = instanceStatus.getNumReceived();
            double processed = instanceStatus.getNumSuccessfullyProcessed();
            String reportedWorkerId = instanceStatus.getWorkerId();
            Assert.assertEquals((int) received, totalMsgs);
            Assert.assertEquals((int) processed, totalMsgs);
            Assert.assertEquals(reportedWorkerId, this.workerId);

            this.admin.functions().deleteFunction("external-repl-prop", "io", "PulsarSink-test");

            // Deleting the function is expected to remove its subscription from the input topic.
            MockedPulsarServiceBaseTest.retryStrategically(ignored -> {
                try {
                    return this.admin.topics().getStats(sourceTopic).getSubscriptions().size() == 0;
                } catch (PulsarAdminException e) {
                    return false;
                }
            }, 50, 150L);
            Assert.assertEquals(this.admin.topics().getStats(sourceTopic).getSubscriptions().size(), 0);
        }
    }

    /**
     * Exercises function creation against a tenant whose admin role is either {@code "superUser"}
     * or {@code "invalid"}, depending on the data-provider flag.
     *
     * <p>With the flag set, creation through the existing admin client must not raise
     * {@link PulsarAdminException.NotAuthorizedException}. With the flag cleared, {@code this.admin}
     * is replaced by a spied TLS client built without client credentials and creation is attempted
     * through it.
     *
     * @param z {@code true} to register {@code "superUser"} as the tenant admin role,
     *          {@code false} to register {@code "invalid"}
     * @throws Exception if any admin call fails with something other than the handled
     *                   not-authorized error
     */
    @Test(dataProvider = "validRoleName")
    public void testAuthorization(boolean z) throws Exception {
        final String adminRole = z ? "superUser" : "invalid";

        this.admin.namespaces().createNamespace("external-repl-prop/io");
        this.admin.namespaces().setNamespaceReplicationClusters("external-repl-prop/io", Sets.newHashSet(Lists.newArrayList(new String[]{"use"})));
        TenantInfo tenantInfo = TenantInfo.builder()
                .adminRoles(Collections.singleton(adminRole))
                .allowedClusters(Collections.singleton("use"))
                .build();
        this.admin.tenants().updateTenant("external-repl-prop", tenantInfo);

        String jarFilePathUrl = PulsarFunctionLocalRunTest.getPulsarApiExamplesJar().toURI().toString();
        FunctionConfig functionConfig = createFunctionConfig("external-repl-prop", "io", "PulsarSink-test", "my.*", "persistent://external-repl-prop/io/output", "test-sub");

        if (!z) {
            // Switch to a freshly built admin client for the unauthorized attempt.
            this.admin = Mockito.spy(PulsarAdmin.builder()
                    .serviceHttpUrl(this.pulsar.getWebServiceAddressTls())
                    .tlsTrustCertsFilePath("./src/test/resources/authentication/tls/cacert.pem")
                    .allowTlsInsecureConnection(true)
                    .build());
            try {
                this.admin.functions().createFunctionWithUrl(functionConfig, jarFilePathUrl);
                // NOTE(review): reaching this point without an exception does not fail the test.
            } catch (PulsarAdminException.NotAuthorizedException expected) {
                Assert.assertFalse(z);
            }
            return;
        }

        try {
            this.admin.functions().createFunctionWithUrl(functionConfig, jarFilePathUrl);
            Assert.assertTrue(z);
        } catch (PulsarAdminException.NotAuthorizedException unexpected) {
            Assert.fail();
        }
    }

    /**
     * Verifies the stop and restart admin endpoints by watching the number of consumers attached
     * to the function's {@code test-sub} subscription: one after creation, zero after
     * {@code stopFunction}, and one again after {@code restartFunction}.
     *
     * @throws Exception if any admin or client call fails
     */
    @Test(timeOut = 20000)
    public void testFunctionStopAndRestartApi() throws Exception {
        final String sourceTopic = "persistent://external-repl-prop/io/restartFunction";
        final String subscriptionName = "test-sub";

        this.admin.namespaces().createNamespace("external-repl-prop/io");
        this.admin.namespaces().setNamespaceReplicationClusters("external-repl-prop/io", Sets.newHashSet(Lists.newArrayList("use")));

        // The producer never sends; presumably it only materializes the input topic (TODO confirm).
        // try-with-resources closes it on assertion failures too, not only on the success path.
        try (Producer<String> producer = this.pulsarClient.newProducer(Schema.STRING).topic(sourceTopic).create()) {
            FunctionConfig functionConfig = createFunctionConfig("external-repl-prop", "io", "PulsarSink-test", "restartFunction", "persistent://external-repl-prop/io/output", subscriptionName);
            String jarFilePathUrl = PulsarFunctionLocalRunTest.getPulsarApiExamplesJar().toURI().toString();
            this.admin.functions().createFunctionWithUrl(functionConfig, jarFilePathUrl);
            assertConsumerCountEventually(sourceTopic, subscriptionName, 1);

            this.admin.functions().stopFunction("external-repl-prop", "io", "PulsarSink-test");
            assertConsumerCountEventually(sourceTopic, subscriptionName, 0);

            this.admin.functions().restartFunction("external-repl-prop", "io", "PulsarSink-test");
            assertConsumerCountEventually(sourceTopic, subscriptionName, 1);
        }
    }

    /**
     * Polls the topic stats until the given subscription exists with the expected number of
     * consumers, then asserts that count.
     *
     * @param topic         fully qualified topic name
     * @param subscription  subscription to inspect
     * @param expectedCount number of consumers the subscription must end up with
     * @throws Exception if polling is interrupted or the final stats lookup fails
     */
    private void assertConsumerCountEventually(String topic, String subscription, int expectedCount) throws Exception {
        MockedPulsarServiceBaseTest.retryStrategically(ignored -> {
            try {
                SubscriptionStats polled = this.admin.topics().getStats(topic).getSubscriptions().get(subscription);
                return polled != null && polled.getConsumers().size() == expectedCount;
            } catch (PulsarAdminException e) {
                return false;
            }
        }, 50, 150L);
        SubscriptionStats current = this.admin.topics().getStats(topic).getSubscriptions().get(subscription);
        Assert.assertEquals(current.getConsumers().size(), expectedCount);
    }

    /**
     * Checks how the {@code cleanupSubscription} flag of a function affects its input-topic
     * subscription when the function is deleted.
     *
     * <ol>
     *   <li>Create the function with the flag off and confirm the stored config and the subscription.</li>
     *   <li>Update the flag to on, process ten messages, delete the function and expect the
     *       subscription to be gone.</li>
     *   <li>Re-create with the flag off, update the parallelism to 2 (the flag must stay off),
     *       delete the function and expect the subscription to remain.</li>
     * </ol>
     *
     * @throws Exception if any admin or client call fails
     */
    @Test(timeOut = 20000)
    public void testFunctionAutomaticSubCleanup() throws Exception {
        final String sourceTopic = "persistent://external-repl-prop/io/my-topic1";
        final int totalMsgs = 10;

        this.admin.namespaces().createNamespace("external-repl-prop/io");
        this.admin.namespaces().setNamespaceReplicationClusters("external-repl-prop/io", Sets.newHashSet(Lists.newArrayList("use")));

        // try-with-resources: the producer is released even when an assertion below fails.
        try (Producer<String> producer = this.pulsarClient.newProducer(Schema.STRING).topic(sourceTopic).create()) {
            String jarFilePathUrl = PulsarFunctionLocalRunTest.getPulsarApiExamplesJar().toURI().toString();
            FunctionConfig functionConfig = new FunctionConfig();
            functionConfig.setTenant("external-repl-prop");
            functionConfig.setNamespace("io");
            functionConfig.setName("PulsarFunction-test");
            functionConfig.setParallelism(1);
            functionConfig.setInputs(Collections.singleton(sourceTopic));
            functionConfig.setClassName("org.apache.pulsar.functions.api.examples.ExclamationFunction");
            functionConfig.setOutput("persistent://external-repl-prop/io/output");
            functionConfig.setCleanupSubscription(false);
            functionConfig.setRuntime(FunctionConfig.Runtime.JAVA);
            this.admin.functions().createFunctionWithUrl(functionConfig, jarFilePathUrl);

            // Phase 1: flag off. Wait until the stored config exposes the flag at all.
            MockedPulsarServiceBaseTest.retryStrategically(ignored -> {
                try {
                    FunctionConfig stored = this.admin.functions().getFunction("external-repl-prop", "io", "PulsarFunction-test");
                    return stored != null && stored.getCleanupSubscription() != null;
                } catch (PulsarAdminException e) {
                    return false;
                }
            }, 50, 150L);
            Assert.assertFalse(this.admin.functions().getFunction("external-repl-prop", "io", "PulsarFunction-test").getCleanupSubscription());
            assertSubscriptionCountEventually(sourceTopic, 1);

            // Phase 2: turn the flag on through an update.
            functionConfig.setCleanupSubscription(true);
            this.admin.functions().updateFunctionWithUrl(functionConfig, jarFilePathUrl);
            MockedPulsarServiceBaseTest.retryStrategically(ignored -> {
                try {
                    return this.admin.functions().getFunction("external-repl-prop", "io", "PulsarFunction-test").getCleanupSubscription();
                } catch (PulsarAdminException e) {
                    return false;
                }
            }, 50, 150L);
            Assert.assertTrue(this.admin.functions().getFunction("external-repl-prop", "io", "PulsarFunction-test").getCleanupSubscription());

            for (int i = 0; i < totalMsgs; i++) {
                producer.newMessage().property("key", "value").value("my-message-" + i).send();
            }

            final String subscriptionName = InstanceUtils.getDefaultSubscriptionName("external-repl-prop", "io", "PulsarFunction-test");
            MockedPulsarServiceBaseTest.retryStrategically(ignored -> {
                try {
                    SubscriptionStats subStats = this.admin.topics().getStats(sourceTopic).getSubscriptions().get(subscriptionName);
                    // A missing entry simply means "not ready yet" instead of a NullPointerException.
                    return subStats != null
                            && subStats.getUnackedMessages() == 0
                            && subStats.getMsgThroughputOut() == totalMsgs;
                } catch (PulsarAdminException e) {
                    return false;
                }
            }, 5, 200L);

            FunctionStatus functionStatus = this.admin.functions().getFunctionStatus("external-repl-prop", "io", "PulsarFunction-test");
            Assert.assertEquals(functionStatus.getNumInstances(), 1);
            FunctionStatus.FunctionInstanceStatus.FunctionInstanceStatusData instanceStatus = functionStatus.getInstances().get(0).getStatus();
            double received = instanceStatus.getNumReceived();
            double processed = instanceStatus.getNumSuccessfullyProcessed();
            String reportedWorkerId = instanceStatus.getWorkerId();
            Assert.assertEquals((int) received, totalMsgs);
            Assert.assertEquals((int) processed, totalMsgs);
            Assert.assertEquals(reportedWorkerId, this.workerId);

            // With the flag on, deleting the function must also remove its subscription.
            this.admin.functions().deleteFunction("external-repl-prop", "io", "PulsarFunction-test");
            assertSubscriptionCountEventually(sourceTopic, 0);

            // Phase 3: re-create with the flag off.
            functionConfig.setCleanupSubscription(false);
            this.admin.functions().createFunctionWithUrl(functionConfig, jarFilePathUrl);
            assertSubscriptionCountEventually(sourceTopic, 1);
            MockedPulsarServiceBaseTest.retryStrategically(ignored -> {
                try {
                    return !this.admin.functions().getFunction("external-repl-prop", "io", "PulsarFunction-test").getCleanupSubscription();
                } catch (PulsarAdminException e) {
                    return false;
                }
            }, 50, 150L);
            Assert.assertFalse(this.admin.functions().getFunction("external-repl-prop", "io", "PulsarFunction-test").getCleanupSubscription());

            // An unrelated update (parallelism) must leave the flag off.
            functionConfig.setParallelism(2);
            this.admin.functions().updateFunctionWithUrl(functionConfig, jarFilePathUrl);
            MockedPulsarServiceBaseTest.retryStrategically(ignored -> {
                try {
                    FunctionConfig stored = this.admin.functions().getFunction("external-repl-prop", "io", "PulsarFunction-test");
                    return stored.getParallelism() == 2 && !stored.getCleanupSubscription();
                } catch (PulsarAdminException e) {
                    return false;
                }
            }, 50, 150L);
            Assert.assertFalse(this.admin.functions().getFunction("external-repl-prop", "io", "PulsarFunction-test").getCleanupSubscription());

            // With the flag off, the subscription must survive the deletion.
            this.admin.functions().deleteFunction("external-repl-prop", "io", "PulsarFunction-test");
            assertSubscriptionCountEventually(sourceTopic, 1);
        }
    }

    /**
     * Polls the topic stats until the topic has the expected number of subscriptions, then asserts
     * that count.
     *
     * @param topic         fully qualified topic name
     * @param expectedCount number of subscriptions the topic must end up with
     * @throws Exception if polling is interrupted or the final stats lookup fails
     */
    private void assertSubscriptionCountEventually(String topic, int expectedCount) throws Exception {
        MockedPulsarServiceBaseTest.retryStrategically(ignored -> {
            try {
                return this.admin.topics().getStats(topic).getSubscriptions().size() == expectedCount;
            } catch (PulsarAdminException e) {
                return false;
            }
        }, 50, 150L);
        Assert.assertEquals(this.admin.topics().getStats(topic).getSubscriptions().size(), expectedCount);
    }

    /**
     * Deploys one function that consumes from two input topics, publishes ten messages to each
     * topic, and checks that the single function instance reports twenty received and twenty
     * successfully processed messages. After the function is deleted, both topics must end up
     * without subscriptions.
     *
     * <p>Each input topic has its own producer. Both are managed by try-with-resources so they are
     * closed exactly once, whether the test passes or fails.
     *
     * @throws Exception if any admin or client call fails
     */
    @Test(timeOut = 20000)
    public void testMultiTopicFunction() throws Exception {
        final String sourceTopic1 = "persistent://external-repl-prop/io/my-topic1";
        final String sourceTopic2 = "persistent://external-repl-prop/io/my-topic2";
        final int totalMsgs = 10;

        this.admin.namespaces().createNamespace("external-repl-prop/io");
        this.admin.namespaces().setNamespaceReplicationClusters("external-repl-prop/io", Sets.newHashSet(Lists.newArrayList("use")));

        try (Producer<String> producer1 = this.pulsarClient.newProducer(Schema.STRING).topic(sourceTopic1).create();
             Producer<String> producer2 = this.pulsarClient.newProducer(Schema.STRING).topic(sourceTopic2).create()) {
            String jarFilePathUrl = PulsarFunctionLocalRunTest.getPulsarApiExamplesJar().toURI().toString();

            LinkedList<String> inputTopics = new LinkedList<>();
            inputTopics.add(sourceTopic1);
            inputTopics.add(sourceTopic2);

            FunctionConfig functionConfig = new FunctionConfig();
            functionConfig.setTenant("external-repl-prop");
            functionConfig.setNamespace("io");
            functionConfig.setName("PulsarFunction-test");
            functionConfig.setParallelism(1);
            functionConfig.setInputs(inputTopics);
            functionConfig.setClassName("org.apache.pulsar.functions.api.examples.ExclamationFunction");
            functionConfig.setOutput("persistent://external-repl-prop/io/output");
            functionConfig.setRuntime(FunctionConfig.Runtime.JAVA);
            this.admin.functions().createFunctionWithUrl(functionConfig, jarFilePathUrl);

            // Wait until the function metadata can be fetched.
            Assert.assertTrue(MockedPulsarServiceBaseTest.retryStrategically(ignored -> {
                try {
                    this.admin.functions().getFunction("external-repl-prop", "io", "PulsarFunction-test");
                    return true;
                } catch (PulsarAdminException e) {
                    return false;
                }
            }, 50, 150L));

            // The function must subscribe to every input topic.
            for (String topic : inputTopics) {
                Assert.assertTrue(MockedPulsarServiceBaseTest.retryStrategically(ignored -> {
                    try {
                        return this.admin.topics().getStats(topic).getSubscriptions().size() == 1;
                    } catch (PulsarAdminException e) {
                        return false;
                    }
                }, 50, 150L));
            }

            // One copy of each payload per input topic.
            for (int i = 0; i < totalMsgs; i++) {
                String payload = "my-message-" + i;
                producer1.newMessage().property("key", "value").value(payload).send();
                producer2.newMessage().property("key", "value").value(payload).send();
            }

            final String subscriptionName = InstanceUtils.getDefaultSubscriptionName("external-repl-prop", "io", "PulsarFunction-test");
            for (String topic : inputTopics) {
                Assert.assertTrue(MockedPulsarServiceBaseTest.retryStrategically(ignored -> {
                    try {
                        SubscriptionStats subStats = this.admin.topics().getStats(topic).getSubscriptions().get(subscriptionName);
                        // A missing entry simply means "not ready yet" instead of a NullPointerException.
                        return subStats != null
                                && subStats.getUnackedMessages() == 0
                                && subStats.getMsgThroughputOut() == totalMsgs;
                    } catch (PulsarAdminException e) {
                        return false;
                    }
                }, 5, 200L));
            }

            FunctionStatus functionStatus = this.admin.functions().getFunctionStatus("external-repl-prop", "io", "PulsarFunction-test");
            Assert.assertEquals(functionStatus.getNumInstances(), 1);
            FunctionStatus.FunctionInstanceStatus.FunctionInstanceStatusData instanceStatus = functionStatus.getInstances().get(0).getStatus();
            double received = instanceStatus.getNumReceived();
            double processed = instanceStatus.getNumSuccessfullyProcessed();
            String reportedWorkerId = instanceStatus.getWorkerId();
            Assert.assertEquals((int) received, totalMsgs * 2);
            Assert.assertEquals((int) processed, totalMsgs * 2);
            Assert.assertEquals(reportedWorkerId, this.workerId);

            this.admin.functions().deleteFunction("external-repl-prop", "io", "PulsarFunction-test");

            // After deletion no subscription may remain on either input topic.
            for (String topic : inputTopics) {
                Assert.assertTrue(MockedPulsarServiceBaseTest.retryStrategically(ignored -> {
                    try {
                        return this.admin.topics().getStats(topic).getSubscriptions().size() == 0;
                    } catch (PulsarAdminException e) {
                        return false;
                    }
                }, 50, 150L));
            }
        }
    }

    /**
     * End-to-end run of a function whose input consumer is configured with
     * {@code poolMessages(true)}: deploys {@code ByteBufferFunction}, publishes five messages,
     * waits for the {@code test-sub} subscription to have no unacked messages, and checks that an
     * output message carrying the {@code key=value} property arrives. The function is then deleted
     * and the test waits for the input topic to have no subscriptions left.
     *
     * @throws Exception if any admin or client call fails
     */
    @Test(timeOut = 20000)
    public void testE2EPulsarFunctionMessagePooled() throws Exception {
        final String sourceTopic = "persistent://external-repl-prop/io/my-topic1";
        final String sinkTopic = "persistent://external-repl-prop/io/output";
        final int totalMsgs = 5;

        this.admin.namespaces().createNamespace("external-repl-prop/io");
        this.admin.namespaces().setNamespaceReplicationClusters("external-repl-prop/io", Sets.newHashSet(Lists.newArrayList("use")));

        // try-with-resources: producer and consumer are released even when an assertion fails.
        try (Producer<String> producer = this.pulsarClient.newProducer(Schema.STRING).topic(sourceTopic).create();
             Consumer<String> consumer = this.pulsarClient.newConsumer(Schema.STRING).topic(sinkTopic).subscriptionName("sub").subscribe()) {
            FunctionConfig functionConfig = new FunctionConfig();
            functionConfig.setTenant("external-repl-prop");
            functionConfig.setNamespace("io");
            functionConfig.setName("PulsarFunction-test");
            functionConfig.setParallelism(1);
            functionConfig.setSubName("test-sub");
            functionConfig.setInputSpecs(Collections.singletonMap(sourceTopic, ConsumerConfig.builder().poolMessages(true).build()));
            functionConfig.setAutoAck(true);
            functionConfig.setClassName(ByteBufferFunction.class.getName());
            functionConfig.setRuntime(FunctionConfig.Runtime.JAVA);
            functionConfig.setOutput(sinkTopic);
            functionConfig.setCleanupSubscription(true);
            functionConfig.setProcessingGuarantees(FunctionConfig.ProcessingGuarantees.ATLEAST_ONCE);

            // The function class lives in this test's own code source, so that location is used as the package URL.
            String packageUrl = PulsarFunctionE2ETest.class.getProtectionDomain().getCodeSource().getLocation().toURI().toString();
            this.admin.functions().createFunctionWithUrl(functionConfig, packageUrl);

            // The function's output producer must show up with the function's id in its metadata.
            Awaitility.await().ignoreExceptions().untilAsserted(() -> {
                TopicStats sinkStats = this.admin.topics().getStats(sinkTopic);
                Assert.assertEquals(sinkStats.getPublishers().size(), 1);
                PublisherStats publisher = sinkStats.getPublishers().get(0);
                Assert.assertNotNull(publisher.getMetadata());
                Assert.assertTrue(publisher.getMetadata().containsKey("id"));
                Assert.assertEquals(publisher.getMetadata().get("id"), String.format("%s/%s/%s", "external-repl-prop", "io", "PulsarFunction-test"));
            });
            Awaitility.await().ignoreExceptions().untilAsserted(() -> {
                Assert.assertEquals(this.admin.topics().getStats(sourceTopic).getSubscriptions().size(), 1);
            });

            for (int i = 0; i < totalMsgs; i++) {
                producer.newMessage().property("key", "value").value("my-message-" + i).send();
            }

            Awaitility.await().ignoreExceptions().untilAsserted(() -> {
                SubscriptionStats subStats = this.admin.topics().getStats(sourceTopic).getSubscriptions().get("test-sub");
                Assert.assertEquals(subStats.getUnackedMessages(), 0L);
            });

            Message<String> received = consumer.receive(5, TimeUnit.SECONDS);
            Assert.assertNotNull(received, "Should have gotten a message");
            Assert.assertEquals(received.getProperty("key"), "value");

            // Compare Long with Long: a Long is never equal to an Integer, so the mixed-type
            // comparison used previously could not fail.
            long unacked = this.admin.topics().getStats(sourceTopic).getSubscriptions().values().iterator().next().getUnackedMessages();
            Assert.assertNotEquals(Long.valueOf(unacked), Long.valueOf(totalMsgs));

            this.admin.functions().deleteFunction("external-repl-prop", "io", "PulsarFunction-test");
            Awaitility.await().ignoreExceptions().untilAsserted(() -> {
                Assert.assertEquals(this.admin.topics().getStats(sourceTopic).getSubscriptions().size(), 0);
            });
            this.tempDirectory.assertThatFunctionDownloadTempFilesHaveBeenDeleted();
        }
    }
}
