/*
 * Decompiled with CFR 0.152.
 */
package org.apache.pulsar.functions.worker;

import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
import com.google.gson.Gson;
import java.lang.reflect.Method;
import java.net.URI;
import java.util.Collections;
import java.util.HashSet;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.pulsar.broker.ServiceConfigurationUtils;
import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
import org.apache.pulsar.broker.authentication.AuthenticationService;
import org.apache.pulsar.broker.loadbalance.impl.SimpleLoadManagerImpl;
import org.apache.pulsar.client.admin.BrokerStats;
import org.apache.pulsar.client.admin.PulsarAdmin;
import org.apache.pulsar.client.admin.PulsarAdminException;
import org.apache.pulsar.client.api.ClientBuilder;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.common.configuration.PulsarConfiguration;
import org.apache.pulsar.common.configuration.PulsarConfigurationLoader;
import org.apache.pulsar.common.functions.FunctionConfig;
import org.apache.pulsar.common.policies.data.ClusterData;
import org.apache.pulsar.common.policies.data.SubscriptionStats;
import org.apache.pulsar.common.policies.data.TenantInfo;
import org.apache.pulsar.common.util.ObjectMapperFactory;
import org.apache.pulsar.functions.runtime.thread.ThreadRuntimeFactory;
import org.apache.pulsar.functions.runtime.thread.ThreadRuntimeFactoryConfig;
import org.apache.pulsar.functions.worker.ErrorNotifier;
import org.apache.pulsar.functions.worker.FunctionRuntimeManager;
import org.apache.pulsar.functions.worker.PulsarFunctionLocalRunTest;
import org.apache.pulsar.functions.worker.PulsarFunctionTestTemporaryDirectory;
import org.apache.pulsar.functions.worker.PulsarWorkerService;
import org.apache.pulsar.functions.worker.WorkerConfig;
import org.apache.pulsar.functions.worker.scheduler.RoundRobinScheduler;
import org.apache.pulsar.zookeeper.LocalBookkeeperEnsemble;
import org.mockito.Mockito;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testng.Assert;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;

@Test(groups={"functions-worker"})
public class PulsarWorkerAssignmentTest {
    private static final Logger log = LoggerFactory.getLogger(PulsarWorkerAssignmentTest.class);
    LocalBookkeeperEnsemble bkEnsemble;
    ServiceConfiguration config;
    WorkerConfig workerConfig;
    PulsarService pulsar;
    PulsarAdmin admin;
    PulsarClient pulsarClient;
    BrokerStats brokerStatsClient;
    PulsarWorkerService functionsWorkerService;
    final String tenant = "external-repl-prop";
    final String pulsarFunctionsNamespace = "external-repl-prop/pulsar-function-admin";
    String primaryHost;
    String workerId;
    private PulsarFunctionTestTemporaryDirectory tempDirectory;

    @BeforeMethod(timeOut=60000L)
    void setup(Method method) throws Exception {
        log.info("--- Setting up method {} ---", (Object)method.getName());
        this.bkEnsemble = new LocalBookkeeperEnsemble(3, 0, () -> 0);
        this.bkEnsemble.start();
        this.config = (ServiceConfiguration)Mockito.spy((Object)new ServiceConfiguration());
        this.config.setClusterName("use");
        HashSet superUsers = Sets.newHashSet((Object[])new String[]{"superUser", "admin"});
        this.config.setSuperUserRoles((Set)superUsers);
        this.config.setWebServicePort(Optional.of(0));
        this.config.setZookeeperServers("127.0.0.1:" + this.bkEnsemble.getZookeeperPort());
        this.config.setBrokerShutdownTimeoutMs(0L);
        this.config.setBrokerServicePort(Optional.of(0));
        this.config.setLoadManagerClassName(SimpleLoadManagerImpl.class.getName());
        this.config.setAdvertisedAddress("localhost");
        this.functionsWorkerService = this.createPulsarFunctionWorker(this.config);
        Optional<PulsarWorkerService> functionWorkerService = Optional.of(this.functionsWorkerService);
        this.pulsar = new PulsarService(this.config, this.workerConfig, functionWorkerService, exitCode -> {});
        this.pulsar.start();
        this.admin = (PulsarAdmin)Mockito.spy((Object)PulsarAdmin.builder().serviceHttpUrl(this.pulsar.getWebServiceAddress()).build());
        this.brokerStatsClient = this.admin.brokerStats();
        this.primaryHost = this.pulsar.getWebServiceAddress();
        ClusterData clusterData = ClusterData.builder().serviceUrl(this.pulsar.getWebServiceAddress()).build();
        this.admin.clusters().updateCluster(this.config.getClusterName(), clusterData);
        ClientBuilder clientBuilder = PulsarClient.builder().serviceUrl(this.workerConfig.getPulsarServiceUrl());
        if (this.pulsarClient != null) {
            this.pulsarClient.close();
        }
        this.pulsarClient = clientBuilder.build();
        TenantInfo propAdmin = TenantInfo.builder().adminRoles(Collections.singleton("superUser")).allowedClusters(Collections.singleton("use")).build();
        this.admin.tenants().updateTenant("external-repl-prop", propAdmin);
        Thread.sleep(100L);
    }

    @AfterMethod(alwaysRun=true)
    void shutdown() {
        log.info("--- Shutting down ---");
        try {
            this.pulsarClient.close();
            this.admin.close();
            this.functionsWorkerService.stop();
            this.pulsar.close();
            this.bkEnsemble.stop();
        }
        catch (Exception e) {
            log.warn("Encountered errors at shutting down PulsarWorkerAssignmentTest", (Throwable)e);
        }
        finally {
            if (this.tempDirectory != null) {
                this.tempDirectory.delete();
            }
        }
    }

    private PulsarWorkerService createPulsarFunctionWorker(ServiceConfiguration config) {
        this.workerConfig = new WorkerConfig();
        this.tempDirectory = PulsarFunctionTestTemporaryDirectory.create(this.getClass().getSimpleName());
        this.tempDirectory.useTemporaryDirectoriesForWorkerConfig(this.workerConfig);
        this.workerConfig.setPulsarFunctionsNamespace("external-repl-prop/pulsar-function-admin");
        this.workerConfig.setSchedulerClassName(RoundRobinScheduler.class.getName());
        this.workerConfig.setFunctionRuntimeFactoryClassName(ThreadRuntimeFactory.class.getName());
        this.workerConfig.setFunctionRuntimeFactoryConfigs((Map)ObjectMapperFactory.getThreadLocal().convertValue((Object)new ThreadRuntimeFactoryConfig().setThreadGroupName("use"), Map.class));
        this.workerConfig.setPulsarServiceUrl("pulsar://127.0.0.1:" + config.getBrokerServicePort().get());
        this.workerConfig.setPulsarWebServiceUrl("http://127.0.0.1:" + config.getWebServicePort().get());
        this.workerConfig.setFailureCheckFreqMs(100L);
        this.workerConfig.setNumFunctionPackageReplicas(1);
        this.workerConfig.setClusterCoordinationTopicName("coordinate");
        this.workerConfig.setFunctionAssignmentTopicName("assignment");
        this.workerConfig.setFunctionMetadataTopicName("metadata");
        this.workerConfig.setInstanceLivenessCheckFreqMs(100L);
        this.workerConfig.setWorkerPort(Integer.valueOf(0));
        this.workerConfig.setPulsarFunctionsCluster(config.getClusterName());
        String hostname = ServiceConfigurationUtils.getDefaultOrConfiguredAddress((String)config.getAdvertisedAddress());
        this.workerId = "c-" + config.getClusterName() + "-fw-" + hostname + "-" + this.workerConfig.getWorkerPort();
        this.workerConfig.setWorkerHostname(hostname);
        this.workerConfig.setWorkerId(this.workerId);
        this.workerConfig.setTopicCompactionFrequencySec(1L);
        PulsarWorkerService workerService = new PulsarWorkerService();
        workerService.init(this.workerConfig, null, false);
        return workerService;
    }

    @Test(timeOut=60000L, enabled=false)
    public void testFunctionAssignments() throws Exception {
        String namespacePortion = "assignment-test";
        String replNamespace = "external-repl-prop/assignment-test";
        String sinkTopic = "persistent://external-repl-prop/assignment-test/my-topic1";
        String functionName = "assign";
        String subscriptionName = "test-sub";
        this.admin.namespaces().createNamespace("external-repl-prop/assignment-test");
        HashSet clusters = Sets.newHashSet((Iterable)Lists.newArrayList((Object[])new String[]{"use"}));
        this.admin.namespaces().setNamespaceReplicationClusters("external-repl-prop/assignment-test", (Set)clusters);
        String jarFilePathUrl = PulsarFunctionLocalRunTest.getPulsarApiExamplesJar().toURI().toString();
        FunctionConfig functionConfig = PulsarWorkerAssignmentTest.createFunctionConfig("external-repl-prop", "assignment-test", "assign", "my.*", "persistent://external-repl-prop/assignment-test/my-topic1", "test-sub");
        functionConfig.setParallelism(Integer.valueOf(2));
        this.admin.functions().createFunctionWithUrl(functionConfig, jarFilePathUrl);
        MockedPulsarServiceBaseTest.retryStrategically(test -> {
            try {
                return this.admin.topics().getStats("persistent://external-repl-prop/assignment-test/my-topic1").getSubscriptions().size() == 1 && ((SubscriptionStats)this.admin.topics().getStats("persistent://external-repl-prop/assignment-test/my-topic1").getSubscriptions().values().iterator().next()).getConsumers().size() == 2;
            }
            catch (PulsarAdminException e) {
                return false;
            }
        }, 50, 150L);
        Assert.assertEquals((int)this.admin.topics().getStats("persistent://external-repl-prop/assignment-test/my-topic1").getSubscriptions().size(), (int)1);
        Assert.assertEquals((int)((SubscriptionStats)this.admin.topics().getStats("persistent://external-repl-prop/assignment-test/my-topic1").getSubscriptions().values().iterator().next()).getConsumers().size(), (int)2);
        functionConfig.setParallelism(Integer.valueOf(1));
        this.admin.functions().updateFunctionWithUrl(functionConfig, jarFilePathUrl);
        MockedPulsarServiceBaseTest.retryStrategically(test -> {
            try {
                return this.admin.topics().getStats("persistent://external-repl-prop/assignment-test/my-topic1").getSubscriptions().size() == 1 && ((SubscriptionStats)this.admin.topics().getStats("persistent://external-repl-prop/assignment-test/my-topic1").getSubscriptions().values().iterator().next()).getConsumers().size() == 1;
            }
            catch (PulsarAdminException e) {
                return false;
            }
        }, 50, 150L);
        log.info("admin.topics().getStats(sinkTopic): {}", (Object)new Gson().toJson((Object)this.admin.topics().getStats("persistent://external-repl-prop/assignment-test/my-topic1")));
        Assert.assertEquals((int)((SubscriptionStats)this.admin.topics().getStats("persistent://external-repl-prop/assignment-test/my-topic1").getSubscriptions().values().iterator().next()).getConsumers().size(), (int)1);
    }

    @Test(timeOut=60000L, enabled=false)
    public void testFunctionAssignmentsWithRestart() throws Exception {
        FunctionConfig functionConfig;
        String namespacePortion = "assignment-test";
        String replNamespace = "external-repl-prop/assignment-test";
        String sinkTopic = "persistent://external-repl-prop/assignment-test/my-topic1";
        String logTopic = "persistent://external-repl-prop/assignment-test/log-topic";
        String baseFunctionName = "assign-restart";
        String subscriptionName = "test-sub";
        int totalFunctions = 5;
        int parallelism = 2;
        this.admin.namespaces().createNamespace("external-repl-prop/assignment-test");
        HashSet clusters = Sets.newHashSet((Iterable)Lists.newArrayList((Object[])new String[]{"use"}));
        this.admin.namespaces().setNamespaceReplicationClusters("external-repl-prop/assignment-test", (Set)clusters);
        FunctionRuntimeManager runtimeManager = this.functionsWorkerService.getFunctionRuntimeManager();
        String jarFilePathUrl = PulsarFunctionLocalRunTest.getPulsarApiExamplesJar().toURI().toString();
        for (int i = 0; i < 5; ++i) {
            String functionName = "assign-restart" + i;
            functionConfig = PulsarWorkerAssignmentTest.createFunctionConfig("external-repl-prop", "assignment-test", functionName, "my.*", "persistent://external-repl-prop/assignment-test/my-topic1", "test-sub");
            functionConfig.setParallelism(Integer.valueOf(2));
            this.admin.functions().createFunctionWithUrl(functionConfig, jarFilePathUrl);
        }
        MockedPulsarServiceBaseTest.retryStrategically(test -> {
            try {
                Map assgn = (Map)runtimeManager.getCurrentAssignments().values().iterator().next();
                return assgn.size() == 10;
            }
            catch (Exception e) {
                return false;
            }
        }, 50, 150L);
        Map assignments = (Map)runtimeManager.getCurrentAssignments().values().iterator().next();
        Assert.assertEquals((int)assignments.size(), (int)10);
        for (int i = 0; i < 5; ++i) {
            String functionName = "assign-restart" + i;
            functionConfig = PulsarWorkerAssignmentTest.createFunctionConfig("external-repl-prop", "assignment-test", functionName, "my.*", "persistent://external-repl-prop/assignment-test/my-topic1", "test-sub");
            functionConfig.setParallelism(Integer.valueOf(2));
            functionConfig.setLogTopic("persistent://external-repl-prop/assignment-test/log-topic");
            this.admin.functions().updateFunctionWithUrl(functionConfig, jarFilePathUrl);
        }
        int totalDeletedFunction = 2;
        for (int i = 4; i >= 3; --i) {
            String functionName = "assign-restart" + i;
            this.admin.functions().deleteFunction("external-repl-prop", "assignment-test", functionName);
        }
        MockedPulsarServiceBaseTest.retryStrategically(test -> {
            try {
                Map assgn = (Map)runtimeManager.getCurrentAssignments().values().iterator().next();
                return assgn.size() == 6;
            }
            catch (Exception e) {
                return false;
            }
        }, 50, 150L);
        assignments = (Map)runtimeManager.getCurrentAssignments().values().iterator().next();
        Assert.assertEquals((int)assignments.size(), (int)6);
        URI dlUri = this.functionsWorkerService.getDlogUri();
        this.functionsWorkerService.stop();
        this.functionsWorkerService = new PulsarWorkerService();
        this.functionsWorkerService.init(this.workerConfig, dlUri, false);
        this.functionsWorkerService.start(new AuthenticationService(PulsarConfigurationLoader.convertFrom((PulsarConfiguration)this.workerConfig)), null, ErrorNotifier.getDefaultImpl());
        FunctionRuntimeManager runtimeManager2 = this.functionsWorkerService.getFunctionRuntimeManager();
        MockedPulsarServiceBaseTest.retryStrategically(test -> {
            try {
                Map assgn = (Map)runtimeManager2.getCurrentAssignments().values().iterator().next();
                return assgn.size() == 6;
            }
            catch (Exception e) {
                return false;
            }
        }, 50, 150L);
        assignments = (Map)runtimeManager2.getCurrentAssignments().values().iterator().next();
        Assert.assertEquals((int)assignments.size(), (int)6);
        for (int i = 0; i < 3; ++i) {
            String functionName = "assign-restart" + i;
            Assert.assertEquals((String)this.admin.functions().getFunction("external-repl-prop", "assignment-test", functionName).getLogTopic(), (String)"persistent://external-repl-prop/assignment-test/log-topic");
        }
    }

    protected static FunctionConfig createFunctionConfig(String tenant, String namespace, String functionName, String sourceTopic, String sinkTopic, String subscriptionName) {
        String sourceTopicPattern = String.format("persistent://%s/%s/%s", tenant, namespace, sourceTopic);
        FunctionConfig functionConfig = new FunctionConfig();
        functionConfig.setTenant(tenant);
        functionConfig.setNamespace(namespace);
        functionConfig.setName(functionName);
        functionConfig.setRuntime(FunctionConfig.Runtime.JAVA);
        functionConfig.setParallelism(Integer.valueOf(1));
        functionConfig.setClassName("org.apache.pulsar.functions.api.examples.ExclamationFunction");
        functionConfig.setProcessingGuarantees(FunctionConfig.ProcessingGuarantees.ATLEAST_ONCE);
        functionConfig.setTopicsPattern(sourceTopicPattern);
        functionConfig.setSubName(subscriptionName);
        functionConfig.setAutoAck(Boolean.valueOf(true));
        functionConfig.setOutput(sinkTopic);
        return functionConfig;
    }
}

