package org.apache.zeppelin.flink;

import java.io.File;
import java.io.IOException;
import java.net.MalformedURLException;
import java.net.URL;
import java.net.URLClassLoader;
import java.util.LinkedList;
import java.util.Map;
import java.util.Properties;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.zeppelin.interpreter.InterpreterContext;
import org.apache.zeppelin.interpreter.InterpreterException;
import org.apache.zeppelin.interpreter.InterpreterResult;
import org.apache.zeppelin.interpreter.ZeppelinContext;
import org.apache.zeppelin.python.IPythonInterpreter;
import org.apache.zeppelin.python.PythonInterpreter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/zeppelin/flink/PyFlinkInterpreter.class */
public class PyFlinkInterpreter extends PythonInterpreter {
    private static final Logger LOGGER = LoggerFactory.getLogger(PyFlinkInterpreter.class);
    private FlinkInterpreter flinkInterpreter;
    private InterpreterContext curInterpreterContext;
    private boolean isOpened;
    private ClassLoader originalClassLoader;

    public PyFlinkInterpreter(Properties properties) {
        super(properties);
        this.isOpened = false;
    }

    public void open() throws InterpreterException {
        File[] listFiles;
        this.flinkInterpreter = (FlinkInterpreter) getInterpreterInTheSameSessionByClassName(FlinkInterpreter.class);
        setProperty("zeppelin.python", getProperty("zeppelin.pyflink.python", "python"));
        setProperty("zeppelin.python.useIPython", getProperty("zeppelin.pyflink.useIPython", "true"));
        URL[] urlArr = new URL[0];
        LinkedList linkedList = new LinkedList();
        String property = getProperty("zeppelin.interpreter.localRepo");
        if (property != null) {
            File file = new File(property);
            if (file.exists() && (listFiles = file.listFiles()) != null) {
                for (File file2 : listFiles) {
                    try {
                        linkedList.add(file2.toURI().toURL());
                    } catch (MalformedURLException e) {
                        LOGGER.error("Error", e);
                    }
                }
            }
        }
        URL[] urlArr2 = (URL[]) linkedList.toArray(urlArr);
        ClassLoader contextClassLoader = Thread.currentThread().getContextClassLoader();
        try {
            Thread.currentThread().setContextClassLoader(new URLClassLoader(urlArr2, contextClassLoader));
            this.flinkInterpreter = (FlinkInterpreter) getInterpreterInTheSameSessionByClassName(FlinkInterpreter.class);
            super.open();
            Thread.currentThread().setContextClassLoader(contextClassLoader);
            if (!useIPython()) {
                try {
                    bootstrapInterpreter("python/zeppelin_pyflink.py");
                } catch (IOException e2) {
                    throw new InterpreterException("Fail to bootstrap pyflink", e2);
                }
            }
            this.isOpened = true;
        } catch (Throwable th) {
            Thread.currentThread().setContextClassLoader(contextClassLoader);
            throw th;
        }
    }

    public InterpreterResult interpret(String str, InterpreterContext interpreterContext) throws InterpreterException {
        try {
            if (!useIPython()) {
                if (this.isOpened) {
                    this.curInterpreterContext = interpreterContext;
                    InterpreterResult interpret = super.interpret("intp.initJavaThread()", interpreterContext);
                    if (interpret.code() != InterpreterResult.Code.SUCCESS) {
                        throw new InterpreterException("Fail to initJavaThread: " + interpret.toString());
                    }
                }
                this.flinkInterpreter.setSavepointPathIfNecessary(interpreterContext);
                this.flinkInterpreter.setParallelismIfNecessary(interpreterContext);
            }
            InterpreterResult interpret2 = super.interpret(str, interpreterContext);
            if (useIPython() || (!useIPython() && getPythonProcessLauncher().isRunning())) {
                InterpreterResult interpret3 = super.interpret("intp.resetClassLoaderInPythonThread()", interpreterContext);
                if (interpret3.code() != InterpreterResult.Code.SUCCESS) {
                    LOGGER.warn("Fail to resetClassLoaderInPythonThread: " + interpret3.toString());
                }
            }
            return interpret2;
        } catch (Throwable th) {
            if (useIPython() || (!useIPython() && getPythonProcessLauncher().isRunning())) {
                InterpreterResult interpret4 = super.interpret("intp.resetClassLoaderInPythonThread()", interpreterContext);
                if (interpret4.code() != InterpreterResult.Code.SUCCESS) {
                    LOGGER.warn("Fail to resetClassLoaderInPythonThread: " + interpret4.toString());
                }
            }
            throw th;
        }
    }

    public void initJavaThread() {
        InterpreterContext.set(this.curInterpreterContext);
        this.originalClassLoader = Thread.currentThread().getContextClassLoader();
        Thread.currentThread().setContextClassLoader(this.flinkInterpreter.getFlinkScalaShellLoader());
        this.flinkInterpreter.createPlannerAgain();
    }

    public void resetClassLoaderInPythonThread() {
        if (this.originalClassLoader != null) {
            Thread.currentThread().setContextClassLoader(this.originalClassLoader);
        }
    }

    public void cancel(InterpreterContext interpreterContext) throws InterpreterException {
        this.flinkInterpreter.cancel(interpreterContext);
        if (useIPython()) {
            super.cancel(interpreterContext);
        }
    }

    protected Map<String, String> setupPythonEnv() throws IOException {
        Map<String, String> map = super.setupPythonEnv();
        map.put("PYTHONPATH", map.getOrDefault("PYTHONPATH", "") + ":" + this.flinkInterpreter.getFlinkShims().getPyFlinkPythonPath(this.properties));
        return map;
    }

    protected IPythonInterpreter getIPythonInterpreter() throws InterpreterException {
        return (IPythonInterpreter) getInterpreterInTheSameSessionByClassName(IPyFlinkInterpreter.class, false);
    }

    public ZeppelinContext getZeppelinContext() {
        return this.flinkInterpreter.getZeppelinContext();
    }

    public int getProgress(InterpreterContext interpreterContext) throws InterpreterException {
        return this.flinkInterpreter.getProgress(interpreterContext);
    }

    public boolean isFlink110() {
        return this.flinkInterpreter.getFlinkVersion().isFlink110();
    }

    public boolean isAfterFlink114() {
        return this.flinkInterpreter.getFlinkVersion().isAfterFlink114();
    }

    public ExecutionEnvironment getJavaExecutionEnvironment() {
        return this.flinkInterpreter.getExecutionEnvironment().getJavaEnv();
    }

    public StreamExecutionEnvironment getJavaStreamExecutionEnvironment() {
        return this.flinkInterpreter.getStreamExecutionEnvironment().getJavaEnv();
    }

    public TableEnvironment getJavaBatchTableEnvironment(String str) {
        return this.flinkInterpreter.getJavaBatchTableEnvironment(str);
    }

    public TableEnvironment getJavaStreamTableEnvironment(String str) {
        return this.flinkInterpreter.getJavaStreamTableEnvironment(str);
    }
}
