/*
 * Decompiled with CFR 0.152.
 */
package org.apache.pulsar.functions.runtime.thread;

import java.io.IOException;
import java.util.concurrent.CompletableFuture;
import org.apache.pulsar.broker.PulsarServerException;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.functions.instance.InstanceConfig;
import org.apache.pulsar.functions.instance.JavaInstanceRunnable;
import org.apache.pulsar.functions.proto.Function;
import org.apache.pulsar.functions.proto.InstanceCommunication;
import org.apache.pulsar.functions.runtime.Runtime;
import org.apache.pulsar.functions.secretsprovider.SecretsProvider;
import org.apache.pulsar.functions.utils.FunctionCommon;
import org.apache.pulsar.functions.utils.functioncache.FunctionCacheManager;
import org.apache.pulsar.shade.io.prometheus.client.CollectorRegistry;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * {@link Runtime} implementation that executes a {@link JavaInstanceRunnable} on a dedicated
 * thread created inside the supplied {@link ThreadGroup}.
 *
 * <p>Only functions whose declared runtime is {@code JAVA} are accepted; the constructor rejects
 * anything else.
 */
public class ThreadRuntime implements Runtime {
    private static final Logger log = LoggerFactory.getLogger(ThreadRuntime.class);

    /** Maximum time {@link #stop()} waits for the function thread to exit after interrupting it. */
    private static final int THREAD_SHUTDOWN_TIMEOUT_MILLIS = 10000;

    private final InstanceConfig instanceConfig;
    private final ThreadGroup threadGroup;
    private final FunctionCacheManager fnCache;
    private final String jarFile;
    private final PulsarClient pulsarClient;
    private final String stateStorageServiceUrl;
    private final SecretsProvider secretsProvider;
    private final CollectorRegistry collectorRegistry;
    private final String narExtractionDirectory;

    /** Thread running the function instance; {@code null} until {@link #start()} is called. */
    private Thread fnThread;

    /** Current instance runnable; created in the constructor and replaced on every {@link #start()}. */
    private JavaInstanceRunnable javaInstanceRunnable;

    /**
     * Creates a runtime for the given instance configuration. No thread is started here; an initial
     * {@link JavaInstanceRunnable} is built so that the metrics accessors have an object to
     * delegate to before {@link #start()} is invoked.
     *
     * @throws RuntimeException if the function's declared runtime is not {@code JAVA}
     */
    ThreadRuntime(InstanceConfig instanceConfig, FunctionCacheManager fnCache, ThreadGroup threadGroup, String jarFile, PulsarClient pulsarClient, String stateStorageServiceUrl, SecretsProvider secretsProvider, CollectorRegistry collectorRegistry, String narExtractionDirectory) {
        this.instanceConfig = instanceConfig;
        if (instanceConfig.getFunctionDetails().getRuntime() != Function.FunctionDetails.Runtime.JAVA) {
            throw new RuntimeException("Thread Container only supports Java Runtime");
        }
        this.threadGroup = threadGroup;
        this.fnCache = fnCache;
        this.jarFile = jarFile;
        this.pulsarClient = pulsarClient;
        this.stateStorageServiceUrl = stateStorageServiceUrl;
        this.secretsProvider = secretsProvider;
        this.collectorRegistry = collectorRegistry;
        this.narExtractionDirectory = narExtractionDirectory;
        this.javaInstanceRunnable = new JavaInstanceRunnable(instanceConfig, fnCache, jarFile, pulsarClient, stateStorageServiceUrl, secretsProvider, collectorRegistry, narExtractionDirectory);
    }

    /**
     * Builds a fresh {@link JavaInstanceRunnable} and starts it on a new thread named
     * {@code <fully-qualified-function-name>-<instanceId>} in the configured thread group.
     */
    @Override
    public void start() {
        this.javaInstanceRunnable = new JavaInstanceRunnable(this.instanceConfig, this.fnCache, this.jarFile, this.pulsarClient, this.stateStorageServiceUrl, this.secretsProvider, this.collectorRegistry, this.narExtractionDirectory);
        log.info("ThreadContainer starting function with instance config {}", this.instanceConfig);
        String threadName = String.format("%s-%s", FunctionCommon.getFullyQualifiedName(this.instanceConfig.getFunctionDetails()), this.instanceConfig.getInstanceId());
        this.fnThread = new Thread(this.threadGroup, this.javaInstanceRunnable, threadName);
        this.fnThread.start();
    }

    /**
     * Blocks until the function thread terminates. Returns immediately if the runtime was never
     * started.
     *
     * @throws Exception if the calling thread is interrupted while waiting
     */
    @Override
    public void join() throws Exception {
        if (this.fnThread != null) {
            this.fnThread.join();
        }
    }

    /**
     * Stops the function thread and closes the instance runnable. Does nothing if the runtime was
     * never started.
     *
     * <p>The thread is interrupted first and given {@link #THREAD_SHUTDOWN_TIMEOUT_MILLIS} ms to
     * exit; if it is still alive afterwards {@link Thread#stop()} is attempted. The runnable is
     * closed in every case, and if the caller was interrupted while waiting its interrupt status
     * is restored once the close has finished.
     */
    @Override
    public void stop() {
        if (this.fnThread == null) {
            return;
        }
        this.fnThread.interrupt();
        boolean interrupted = false;
        try {
            this.fnThread.join(THREAD_SHUTDOWN_TIMEOUT_MILLIS, 0);
            if (this.fnThread.isAlive()) {
                // Last resort. Thread.stop() is deprecated and throws UnsupportedOperationException
                // on newer JDKs; the finally block below still closes the runnable in that case.
                this.fnThread.stop();
            }
        }
        catch (InterruptedException e) {
            // Remember the interrupt instead of dropping it; it is re-asserted after close().
            interrupted = true;
        }
        finally {
            try {
                this.javaInstanceRunnable.close();
            }
            finally {
                // Restored only after close() so the cleanup itself does not run with the
                // interrupt flag set.
                if (interrupted) {
                    Thread.currentThread().interrupt();
                }
            }
        }
    }

    /**
     * Reports the status of the function instance as an already-completed future.
     *
     * <p>When the thread is not alive the status is marked not running and, if a death exception
     * with a non-null message is available, that message is set as the failure exception.
     * Otherwise the status is taken from the instance runnable and marked running.
     *
     * @param instanceId not used by this implementation
     */
    @Override
    public CompletableFuture<InstanceCommunication.FunctionStatus> getFunctionStatus(int instanceId) {
        InstanceCommunication.FunctionStatus.Builder functionStatusBuilder;
        if (!this.isAlive()) {
            functionStatusBuilder = InstanceCommunication.FunctionStatus.newBuilder();
            functionStatusBuilder.setRunning(false);
            Throwable ex = this.getDeathException();
            if (ex != null && ex.getMessage() != null) {
                functionStatusBuilder.setFailureException(ex.getMessage());
            }
        } else {
            functionStatusBuilder = this.javaInstanceRunnable.getFunctionStatus();
            functionStatusBuilder.setRunning(true);
        }
        return CompletableFuture.completedFuture(functionStatusBuilder.build());
    }

    /** Returns the result of the runnable's {@code getAndResetMetrics()} as a completed future. */
    @Override
    public CompletableFuture<InstanceCommunication.MetricsData> getAndResetMetrics() {
        return CompletableFuture.completedFuture(this.javaInstanceRunnable.getAndResetMetrics());
    }

    /**
     * Returns the result of the runnable's {@code getMetrics()} as a completed future.
     *
     * @param instanceId not used by this implementation
     */
    @Override
    public CompletableFuture<InstanceCommunication.MetricsData> getMetrics(int instanceId) {
        return CompletableFuture.completedFuture(this.javaInstanceRunnable.getMetrics());
    }

    /**
     * Returns the instance statistics as a string, as produced by the runnable's
     * {@code getStatsAsString()}.
     *
     * @throws IOException a {@link PulsarServerException} is thrown when no instance runnable is
     *         available
     */
    @Override
    public String getPrometheusMetrics() throws IOException {
        if (this.javaInstanceRunnable == null) {
            throw new PulsarServerException("javaInstanceRunnable is not initialized");
        }
        return this.javaInstanceRunnable.getStatsAsString();
    }

    /** Resets the runnable's metrics and returns an already-completed future. */
    @Override
    public CompletableFuture<Void> resetMetrics() {
        this.javaInstanceRunnable.resetMetrics();
        return CompletableFuture.completedFuture(null);
    }

    /** Returns {@code true} only if the function thread has been created and is still alive. */
    @Override
    public boolean isAlive() {
        return this.fnThread != null && this.fnThread.isAlive();
    }

    /**
     * Returns the death exception reported by the instance runnable, or {@code null} while the
     * function thread is alive or when no runnable is available.
     */
    @Override
    public Throwable getDeathException() {
        if (this.isAlive() || this.javaInstanceRunnable == null) {
            return null;
        }
        return this.javaInstanceRunnable.getDeathException();
    }

    /** Returns the instance configuration this runtime was created with. */
    public InstanceConfig getInstanceConfig() {
        return this.instanceConfig;
    }
}

