package org.apache.zeppelin.flink;

import com.mashape.unirest.http.JsonNode;
import com.mashape.unirest.http.Unirest;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.commons.lang3.StringUtils;
import org.apache.flink.api.common.JobID;
import org.apache.flink.core.execution.JobClient;
import org.apache.zeppelin.interpreter.InterpreterContext;
import org.apache.zeppelin.interpreter.InterpreterException;
import org.json.JSONArray;
import org.json.JSONObject;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/zeppelin/flink/JobManager.class */
public class JobManager {
    private static final Logger LOGGER = LoggerFactory.getLogger(JobManager.class);
    public static final String LATEST_CHECKPOINT_PATH = "latest_checkpoint_path";
    public static final String SAVEPOINT_PATH = "savepoint_path";
    public static final String RESUME_FROM_SAVEPOINT = "resumeFromSavepoint";
    public static final String RESUME_FROM_CHECKPOINT = "resumeFromLatestCheckpoint";
    public static final String SAVEPOINT_DIR = "savepointDir";
    private Map<String, JobClient> jobs = new HashMap();
    private ConcurrentHashMap<JobID, FlinkJobProgressPoller> jobProgressPollerMap = new ConcurrentHashMap<>();
    private String flinkWebUrl;
    private String displayedFlinkWebUrl;
    private Properties properties;

    /* loaded from: input_file:org/apache/zeppelin/flink/JobManager$FlinkJobProgressPoller.class */
    class FlinkJobProgressPoller extends Thread {
        private String flinkWebUrl;
        private JobID jobId;
        private InterpreterContext context;
        private boolean isStreamingInsertInto;
        private int progress;
        private AtomicBoolean running = new AtomicBoolean(true);
        private boolean isFirstPoll = true;
        private long checkInterval;
        private String latestCheckpointPath;

        FlinkJobProgressPoller(String str, JobID jobID, InterpreterContext interpreterContext, long j) {
            this.flinkWebUrl = str;
            this.jobId = jobID;
            this.context = interpreterContext;
            this.isStreamingInsertInto = interpreterContext.getLocalProperties().containsKey("flink.streaming.insert_into");
            this.checkInterval = j;
        }

        @Override // java.lang.Thread, java.lang.Runnable
        public void run() {
            JsonNode jsonNode;
            while (!Thread.currentThread().isInterrupted() && this.running.get()) {
                try {
                    synchronized (this.running) {
                        this.running.wait(this.checkInterval);
                    }
                    jsonNode = (JsonNode) Unirest.get(this.flinkWebUrl + "/jobs/" + this.jobId.toString()).asJson().getBody();
                    JSONArray jSONArray = jsonNode.getObject().getJSONArray("vertices");
                    int i = 0;
                    int i2 = 0;
                    for (int i3 = 0; i3 < jSONArray.length(); i3++) {
                        JSONObject jSONObject = jSONArray.getJSONObject(i3);
                        i += jSONObject.getInt("parallelism");
                        i2 += jSONObject.getJSONObject("tasks").getInt("FINISHED");
                    }
                    JobManager.LOGGER.debug("Total tasks:{}", Integer.valueOf(i));
                    JobManager.LOGGER.debug("Finished tasks:{}", Integer.valueOf(i2));
                    if (i2 != 0) {
                        this.progress = (i2 * 100) / i;
                        JobManager.LOGGER.debug("Progress: {}", Integer.valueOf(this.progress));
                    }
                } catch (Exception e) {
                    JobManager.LOGGER.error("Fail to poll flink job progress via rest api", e);
                }
                if (jsonNode.getObject().getString("state").equalsIgnoreCase("finished")) {
                    return;
                }
                long j = jsonNode.getObject().getLong("duration") / 1000;
                if (this.isStreamingInsertInto) {
                    if (this.isFirstPoll) {
                        this.context.out.clear(false);
                        this.context.out.write("%angular <h1>Duration: {{duration}} </h1>\n%text ");
                        this.context.out.flush();
                        this.isFirstPoll = false;
                    }
                    this.context.getAngularObjectRegistry().add("duration", JobManager.toRichTimeDuration(j), this.context.getNoteId(), this.context.getParagraphId());
                }
                JsonNode jsonNode2 = (JsonNode) Unirest.get(this.flinkWebUrl + "/jobs/" + this.jobId.toString() + "/checkpoints").asJson().getBody();
                if (jsonNode2.getObject().has("latest")) {
                    JSONObject jSONObject2 = jsonNode2.getObject().getJSONObject("latest");
                    if (jSONObject2.has("completed") && (jSONObject2.get("completed") instanceof JSONObject)) {
                        JSONObject jSONObject3 = jSONObject2.getJSONObject("completed");
                        if (jSONObject3.has("external_path")) {
                            String string = jSONObject3.getString("external_path");
                            JobManager.LOGGER.debug("Latest checkpoint path: {}", string);
                            if (!StringUtils.isBlank(string) && !string.equals(this.latestCheckpointPath)) {
                                HashMap hashMap = new HashMap();
                                hashMap.put(JobManager.LATEST_CHECKPOINT_PATH, string);
                                JobManager.LOGGER.info("Update latest checkpoint path: {}", string);
                                this.context.getIntpEventClient().updateParagraphConfig(this.context.getNoteId(), this.context.getParagraphId(), hashMap);
                                this.latestCheckpointPath = string;
                            }
                        }
                    }
                }
            }
        }

        public void cancel() {
            this.running.set(false);
            synchronized (this.running) {
                this.running.notify();
            }
        }

        public int getProgress() {
            return this.progress;
        }
    }

    public JobManager(String str, String str2, Properties properties) {
        this.flinkWebUrl = str;
        this.displayedFlinkWebUrl = str2;
        this.properties = properties;
        LOGGER.info("Creating JobManager at flinkWebUrl: {}, displayedFlinkWebUrl: {}", str, str2);
    }

    public void addJob(InterpreterContext interpreterContext, JobClient jobClient) {
        String paragraphId = interpreterContext.getParagraphId();
        if (this.jobs.put(paragraphId, jobClient) != null) {
            LOGGER.warn("There's another Job {} that is associated with paragraph {}", jobClient.getJobID(), paragraphId);
            return;
        }
        long parseLong = Long.parseLong(this.properties.getProperty("zeppelin.flink.job.check_interval", "1000"));
        if (parseLong < 0) {
            LOGGER.warn("The value of checkInterval must be positive {}", Long.valueOf(parseLong));
            return;
        }
        FlinkJobProgressPoller flinkJobProgressPoller = new FlinkJobProgressPoller(this.flinkWebUrl, jobClient.getJobID(), interpreterContext, parseLong);
        flinkJobProgressPoller.setName("JobProgressPoller-Thread-" + paragraphId);
        flinkJobProgressPoller.start();
        this.jobProgressPollerMap.put(jobClient.getJobID(), flinkJobProgressPoller);
    }

    public void removeJob(String str) {
        LOGGER.info("Remove job in paragraph: {}", str);
        JobClient remove = this.jobs.remove(str);
        if (remove == null) {
            LOGGER.warn("Unable to remove job, because no job is associated with paragraph: {}", str);
            return;
        }
        FlinkJobProgressPoller remove2 = this.jobProgressPollerMap.remove(remove.getJobID());
        if (remove2 == null) {
            LOGGER.warn("Unable to remove poller, because no poller is associated with paragraph: {}", str);
        } else {
            remove2.cancel();
            remove2.interrupt();
        }
    }

    public void sendFlinkJobUrl(InterpreterContext interpreterContext) {
        JobClient jobClient = this.jobs.get(interpreterContext.getParagraphId());
        if (jobClient == null) {
            LOGGER.warn("No job is associated with paragraph: {}", interpreterContext.getParagraphId());
            return;
        }
        String str = this.displayedFlinkWebUrl + "#/job/" + jobClient.getJobID();
        HashMap hashMap = new HashMap();
        hashMap.put("jobUrl", str);
        hashMap.put("label", "FLINK JOB");
        hashMap.put("tooltip", "View in Flink web UI");
        hashMap.put("noteId", interpreterContext.getNoteId());
        hashMap.put("paraId", interpreterContext.getParagraphId());
        interpreterContext.getIntpEventClient().onParaInfosReceived(hashMap);
    }

    public int getJobProgress(String str) {
        JobClient jobClient = this.jobs.get(str);
        if (jobClient == null) {
            LOGGER.warn("Unable to get job progress for paragraph: {}, because no job is associated with this paragraph", str);
            return 0;
        }
        FlinkJobProgressPoller flinkJobProgressPoller = this.jobProgressPollerMap.get(jobClient.getJobID());
        if (flinkJobProgressPoller != null) {
            return flinkJobProgressPoller.getProgress();
        }
        LOGGER.warn("Unable to get job progress for paragraph: {}, because no job progress is associated with this jobId: {}", str, jobClient.getJobID());
        return 0;
    }

    public void cancelJob(InterpreterContext interpreterContext) throws InterpreterException {
        LOGGER.info("Canceling job associated of paragraph: {}", interpreterContext.getParagraphId());
        JobClient jobClient = this.jobs.get(interpreterContext.getParagraphId());
        if (jobClient == null) {
            LOGGER.warn("Unable to remove Job from paragraph {} as no job associated to this paragraph", interpreterContext.getParagraphId());
            return;
        }
        try {
            try {
                String str = (String) interpreterContext.getLocalProperties().get(SAVEPOINT_DIR);
                if (StringUtils.isBlank(str)) {
                    LOGGER.info("Trying to cancel job of paragraph {}", interpreterContext.getParagraphId());
                    jobClient.cancel();
                } else {
                    LOGGER.info("Trying to stop job of paragraph {} with save point dir: {}", interpreterContext.getParagraphId(), str);
                    try {
                        String str2 = (String) jobClient.stopWithSavepoint(true, str).get();
                        HashMap hashMap = new HashMap();
                        hashMap.put(SAVEPOINT_PATH, str2);
                        interpreterContext.getIntpEventClient().updateParagraphConfig(interpreterContext.getNoteId(), interpreterContext.getParagraphId(), hashMap);
                        LOGGER.info("Job {} of paragraph {} is stopped with save point path: {}", new Object[]{jobClient.getJobID(), interpreterContext.getParagraphId(), str2});
                    } catch (Exception e) {
                        LOGGER.warn("Fail to cancel job of paragraph {} with savepoint, try to cancel it without savepoint", interpreterContext.getParagraphId(), e);
                        jobClient.cancel();
                    }
                }
                if (1 != 0) {
                    LOGGER.info("Cancelling is successful, remove the associated FlinkJobProgressPoller of paragraph: {}", interpreterContext.getParagraphId());
                    FlinkJobProgressPoller remove = this.jobProgressPollerMap.remove(jobClient.getJobID());
                    if (remove != null) {
                        remove.cancel();
                        remove.interrupt();
                    }
                    this.jobs.remove(interpreterContext.getParagraphId());
                }
            } catch (Exception e2) {
                String format = String.format("Fail to cancel job %s that is associated with paragraph %s", jobClient.getJobID(), interpreterContext.getParagraphId());
                LOGGER.warn(format, e2);
                throw new InterpreterException(format, e2);
            }
        } catch (Throwable th) {
            if (0 != 0) {
                LOGGER.info("Cancelling is successful, remove the associated FlinkJobProgressPoller of paragraph: {}", interpreterContext.getParagraphId());
                FlinkJobProgressPoller remove2 = this.jobProgressPollerMap.remove(jobClient.getJobID());
                if (remove2 != null) {
                    remove2.cancel();
                    remove2.interrupt();
                }
                this.jobs.remove(interpreterContext.getParagraphId());
            }
            throw th;
        }
    }

    public void shutdown() {
        Iterator<FlinkJobProgressPoller> it = this.jobProgressPollerMap.values().iterator();
        while (it.hasNext()) {
            it.next().cancel();
        }
    }

    static String toRichTimeDuration(long j) {
        long days = TimeUnit.SECONDS.toDays(j);
        long seconds = j - TimeUnit.DAYS.toSeconds(days);
        long hours = TimeUnit.SECONDS.toHours(seconds);
        long seconds2 = seconds - TimeUnit.HOURS.toSeconds(hours);
        long minutes = TimeUnit.SECONDS.toMinutes(seconds2);
        long seconds3 = TimeUnit.SECONDS.toSeconds(seconds2 - TimeUnit.MINUTES.toSeconds(minutes));
        StringBuilder sb = new StringBuilder();
        if (days != 0) {
            sb.append(days + " days ");
        }
        if (days != 0 || hours != 0) {
            sb.append(hours + " hours ");
        }
        if (days != 0 || hours != 0 || minutes != 0) {
            sb.append(minutes + " minutes ");
        }
        sb.append(seconds3 + " seconds");
        return sb.toString();
    }
}
