package com.github.ltsopensource.jobtracker.support.checker;

import com.github.ltsopensource.biz.logger.domain.JobLogPo;
import com.github.ltsopensource.biz.logger.domain.LogType;
import com.github.ltsopensource.core.cluster.NodeType;
import com.github.ltsopensource.core.commons.utils.CollectionUtils;
import com.github.ltsopensource.core.commons.utils.QuietUtils;
import com.github.ltsopensource.core.constant.Level;
import com.github.ltsopensource.core.exception.RemotingSendException;
import com.github.ltsopensource.core.factory.NamedThreadFactory;
import com.github.ltsopensource.core.json.JSON;
import com.github.ltsopensource.core.logger.Logger;
import com.github.ltsopensource.core.logger.LoggerFactory;
import com.github.ltsopensource.core.protocol.JobProtos;
import com.github.ltsopensource.core.protocol.command.JobAskRequest;
import com.github.ltsopensource.core.remoting.RemotingServerDelegate;
import com.github.ltsopensource.core.support.JobDomainConverter;
import com.github.ltsopensource.core.support.SystemClock;
import com.github.ltsopensource.jobtracker.channel.ChannelWrapper;
import com.github.ltsopensource.jobtracker.domain.JobTrackerAppContext;
import com.github.ltsopensource.jobtracker.monitor.JobTrackerMStatReporter;
import com.github.ltsopensource.queue.domain.JobPo;
import com.github.ltsopensource.remoting.AsyncCallback;
import com.github.ltsopensource.remoting.Channel;
import com.github.ltsopensource.remoting.ResponseFuture;
import com.github.ltsopensource.remoting.protocol.RemotingCommand;
import com.github.ltsopensource.remoting.protocol.RemotingProtos;
import com.github.ltsopensource.store.jdbc.exception.DupEntryException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

/* loaded from: input_file:com/github/ltsopensource/jobtracker/support/checker/ExecutingDeadJobChecker.class */
public class ExecutingDeadJobChecker {
    private static final Logger LOGGER = LoggerFactory.getLogger(ExecutingDeadJobChecker.class);
    private JobTrackerAppContext appContext;
    private JobTrackerMStatReporter stat;
    private ScheduledFuture<?> scheduledFuture;
    private final ScheduledExecutorService FIXED_EXECUTOR_SERVICE = Executors.newScheduledThreadPool(1, new NamedThreadFactory("LTS-ExecutingJobQueue-Fix-Executor", true));
    private AtomicBoolean start = new AtomicBoolean(false);

    public ExecutingDeadJobChecker(JobTrackerAppContext jobTrackerAppContext) {
        this.appContext = jobTrackerAppContext;
        this.stat = jobTrackerAppContext.getMStatReporter();
    }

    public void start() {
        try {
            if (this.start.compareAndSet(false, true)) {
                int parameter = this.appContext.getConfig().getParameter("jobtracker.executing.job.fix.check.interval.seconds", 30);
                if (parameter < 5) {
                    parameter = 5;
                } else if (parameter > 300) {
                    parameter = 300;
                }
                this.scheduledFuture = this.FIXED_EXECUTOR_SERVICE.scheduleWithFixedDelay(new Runnable() { // from class: com.github.ltsopensource.jobtracker.support.checker.ExecutingDeadJobChecker.1
                    @Override // java.lang.Runnable
                    public void run() {
                        try {
                            if (ExecutingDeadJobChecker.this.appContext.getRegistryStatMonitor().isAvailable()) {
                                ExecutingDeadJobChecker.this.checkAndFix();
                            }
                        } catch (Throwable th) {
                            ExecutingDeadJobChecker.LOGGER.error("Check executing dead job error ", th);
                        }
                    }
                }, parameter, parameter, TimeUnit.SECONDS);
            }
            LOGGER.info("Executing dead job checker started!");
        } catch (Throwable th) {
            LOGGER.error("Executing dead job checker start failed!", th);
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    public void checkAndFix() throws RemotingSendException {
        int parameter = this.appContext.getConfig().getParameter("jobtracker.executing.job.fix.deadline.seconds", 20);
        if (parameter < 10) {
            parameter = 10;
        } else if (parameter > 300) {
            parameter = 300;
        }
        List<JobPo> deadJobs = this.appContext.getExecutingJobQueue().getDeadJobs(SystemClock.now() - (parameter * 1000));
        if (CollectionUtils.isNotEmpty(deadJobs)) {
            HashMap hashMap = new HashMap();
            for (JobPo jobPo : deadJobs) {
                List list = (List) hashMap.get(jobPo.getTaskTrackerIdentity());
                if (list == null) {
                    list = new ArrayList();
                    hashMap.put(jobPo.getTaskTrackerIdentity(), list);
                }
                list.add(jobPo);
            }
            for (Map.Entry entry : hashMap.entrySet()) {
                String taskTrackerNodeGroup = ((JobPo) ((List) entry.getValue()).get(0)).getTaskTrackerNodeGroup();
                String str = (String) entry.getKey();
                ChannelWrapper channel = this.appContext.getChannelManager().getChannel(taskTrackerNodeGroup, NodeType.TASK_TRACKER, str);
                if (channel == null && str != null) {
                    Long offlineTimestamp = this.appContext.getChannelManager().getOfflineTimestamp(str);
                    if (offlineTimestamp == null || SystemClock.now() - offlineTimestamp.longValue() > 10000) {
                        fixDeadJob((List<JobPo>) entry.getValue());
                    }
                } else if (channel != null && channel.getChannel() != null && channel.isOpen()) {
                    askTimeoutJob(channel.getChannel(), (List) entry.getValue());
                }
            }
        }
    }

    private void askTimeoutJob(Channel channel, final List<JobPo> list) {
        try {
            RemotingServerDelegate remotingServer = this.appContext.getRemotingServer();
            ArrayList arrayList = new ArrayList(list.size());
            Iterator<JobPo> it = list.iterator();
            while (it.hasNext()) {
                arrayList.add(it.next().getJobId());
            }
            JobAskRequest wrapper = this.appContext.getCommandBodyWrapper().wrapper(new JobAskRequest());
            wrapper.setJobIds(arrayList);
            remotingServer.invokeAsync(channel, RemotingCommand.createRequestCommand(JobProtos.RequestCode.JOB_ASK.code(), wrapper), new AsyncCallback() { // from class: com.github.ltsopensource.jobtracker.support.checker.ExecutingDeadJobChecker.2
                public void operationComplete(ResponseFuture responseFuture) {
                    RemotingCommand responseCommand = responseFuture.getResponseCommand();
                    if (responseCommand == null || RemotingProtos.ResponseCode.SUCCESS.code() != responseCommand.getCode()) {
                        return;
                    }
                    List jobIds = responseCommand.getBody().getJobIds();
                    if (CollectionUtils.isNotEmpty(jobIds)) {
                        QuietUtils.sleep(ExecutingDeadJobChecker.this.appContext.getConfig().getParameter("jobtracker.fix.executing.job.waiting.mills", 1000L));
                        for (JobPo jobPo : list) {
                            if (jobIds.contains(jobPo.getJobId())) {
                                ExecutingDeadJobChecker.this.fixDeadJob(jobPo);
                            }
                        }
                    }
                }
            });
        } catch (RemotingSendException e) {
            LOGGER.error("Ask timeout Job error, ", e);
        }
    }

    private void fixDeadJob(List<JobPo> list) {
        Iterator<JobPo> it = list.iterator();
        while (it.hasNext()) {
            fixDeadJob(it.next());
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    public void fixDeadJob(JobPo jobPo) {
        try {
        } catch (Throwable th) {
            LOGGER.error(th.getMessage(), th);
        }
        if (this.appContext.getExecutingJobQueue().getJob(jobPo.getJobId()) == null) {
            return;
        }
        jobPo.setGmtModified(Long.valueOf(SystemClock.now()));
        jobPo.setTaskTrackerIdentity((String) null);
        jobPo.setIsRunning(false);
        try {
            this.appContext.getExecutableJobQueue().add(jobPo);
        } catch (DupEntryException e) {
            LOGGER.warn("ExecutableJobQueue already exist:" + JSON.toJSONString(jobPo));
        }
        this.appContext.getExecutingJobQueue().remove(jobPo.getJobId());
        JobLogPo convertJobLog = JobDomainConverter.convertJobLog(jobPo);
        convertJobLog.setLogTime(Long.valueOf(SystemClock.now()));
        convertJobLog.setSuccess(true);
        convertJobLog.setLevel(Level.WARN);
        convertJobLog.setLogType(LogType.FIXED_DEAD);
        this.appContext.getJobLogger().log(convertJobLog);
        this.stat.incFixExecutingJobNum();
        LOGGER.info("checkAndFix dead job ! {}", new Object[]{JSON.toJSONString(jobPo)});
    }

    public void stop() {
        try {
            if (this.start.compareAndSet(true, false)) {
                this.scheduledFuture.cancel(true);
                this.FIXED_EXECUTOR_SERVICE.shutdown();
            }
            LOGGER.info("Executing dead job checker stopped!");
        } catch (Throwable th) {
            LOGGER.error("Executing dead job checker stop failed!", th);
        }
    }
}
