package org.apache.zeppelin.flink;

import java.io.IOException;
import java.util.Properties;
import org.apache.zeppelin.flink.sql.AppendStreamSqlJob;
import org.apache.zeppelin.flink.sql.SingleRowStreamSqlJob;
import org.apache.zeppelin.flink.sql.UpdateStreamSqlJob;
import org.apache.zeppelin.interpreter.InterpreterContext;
import org.apache.zeppelin.interpreter.InterpreterException;
import org.apache.zeppelin.interpreter.InterpreterResult;
import org.apache.zeppelin.scheduler.Scheduler;
import org.apache.zeppelin.scheduler.SchedulerFactory;

/* loaded from: input_file:org/apache/zeppelin/flink/FlinkStreamSqlInterpreter.class */
public class FlinkStreamSqlInterpreter extends FlinkSqlInterpreter {
    public FlinkStreamSqlInterpreter(Properties properties) {
        super(properties);
    }

    @Override // org.apache.zeppelin.flink.FlinkSqlInterpreter
    public void open() throws InterpreterException {
        super.open();
        this.flinkInterpreter.getFlinkShims().initInnerStreamSqlInterpreter(new FlinkSqlContext(this.flinkInterpreter.getExecutionEnvironment().getJavaEnv(), this.flinkInterpreter.getStreamExecutionEnvironment().getJavaEnv(), this.flinkInterpreter.getJavaBatchTableEnvironment("blink"), this.flinkInterpreter.getJavaStreamTableEnvironment(), this.flinkInterpreter.getZeppelinContext(), str -> {
            callInnerSelect(str);
        }));
    }

    public void callInnerSelect(String str) {
        InterpreterContext interpreterContext = InterpreterContext.get();
        String str2 = (String) interpreterContext.getLocalProperties().getOrDefault("type", "update");
        if (str2.equalsIgnoreCase("single")) {
            try {
                new SingleRowStreamSqlJob(this.flinkInterpreter.getStreamExecutionEnvironment(), this.flinkInterpreter.getJavaStreamTableEnvironment(), this.flinkInterpreter.getJobManager(), interpreterContext, this.flinkInterpreter.getDefaultParallelism(), this.flinkInterpreter.getFlinkShims()).run(str);
            } catch (IOException e) {
                throw new RuntimeException("Fail to run single type stream job", e);
            }
        } else if (str2.equalsIgnoreCase("append")) {
            try {
                new AppendStreamSqlJob(this.flinkInterpreter.getStreamExecutionEnvironment(), this.flinkInterpreter.getStreamTableEnvironment(), this.flinkInterpreter.getJobManager(), interpreterContext, this.flinkInterpreter.getDefaultParallelism(), this.flinkInterpreter.getFlinkShims()).run(str);
            } catch (IOException e2) {
                throw new RuntimeException("Fail to run append type stream job", e2);
            }
        } else {
            if (!str2.equalsIgnoreCase("update")) {
                throw new RuntimeException("Unrecognized stream type: " + str2);
            }
            try {
                new UpdateStreamSqlJob(this.flinkInterpreter.getStreamExecutionEnvironment(), this.flinkInterpreter.getStreamTableEnvironment(), this.flinkInterpreter.getJobManager(), interpreterContext, this.flinkInterpreter.getDefaultParallelism(), this.flinkInterpreter.getFlinkShims()).run(str);
            } catch (IOException e3) {
                throw new RuntimeException("Fail to run update type stream job", e3);
            }
        }
    }

    @Override // org.apache.zeppelin.flink.FlinkSqlInterpreter
    public InterpreterResult runSqlList(String str, InterpreterContext interpreterContext) {
        return this.flinkShims.runSqlList(str, interpreterContext, false);
    }

    public int getProgress(InterpreterContext interpreterContext) throws InterpreterException {
        return 0;
    }

    public Scheduler getScheduler() {
        return SchedulerFactory.singleton().createOrGetParallelScheduler(FlinkStreamSqlInterpreter.class.getName() + hashCode(), Integer.parseInt(getProperty("zeppelin.flink.concurrentStreamSql.max", "10")));
    }
}
