/*
 * Decompiled with CFR 0.152.
 */
package xin.bluesky.leiothrix.worker.executor;

import com.alibaba.fastjson.JSON;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.exception.ExceptionUtils;
import org.apache.commons.lang3.time.StopWatch;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import xin.bluesky.leiothrix.common.jdbc.JdbcTemplate;
import xin.bluesky.leiothrix.common.util.StringUtils2;
import xin.bluesky.leiothrix.model.msg.WorkerMessage;
import xin.bluesky.leiothrix.model.task.partition.ExecutionStatistics;
import xin.bluesky.leiothrix.model.task.partition.PartitionTask;
import xin.bluesky.leiothrix.model.task.partition.PartitionTaskProgress;
import xin.bluesky.leiothrix.model.task.partition.PartitionTaskWrapper;
import xin.bluesky.leiothrix.worker.WorkerProcessor;
import xin.bluesky.leiothrix.worker.api.DatabasePageDataHandler;
import xin.bluesky.leiothrix.worker.client.ServerChannel;
import xin.bluesky.leiothrix.worker.conf.Settings;
import xin.bluesky.leiothrix.worker.executor.Status;
import xin.bluesky.leiothrix.worker.executor.TaskContainer;
import xin.bluesky.leiothrix.worker.report.WorkerProgressReporter;

public class TaskExecutor
implements Runnable {
    private static final Logger logger = LoggerFactory.getLogger(TaskExecutor.class);
    private static final int ACQUIRE_TASK_TIMEOUT = 15;
    private WorkerProgressReporter progressReporter;
    private CountDownLatch countDownLatch;
    private volatile Status status = Status.NOT_START;

    public TaskExecutor(WorkerProgressReporter progressReporter, CountDownLatch countDownLatch) {
        this.progressReporter = progressReporter;
        this.countDownLatch = countDownLatch;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public void run() {
        this.status = Status.RUNNING;
        try {
            while (this.ableRunning()) {
                PartitionTaskWrapper wrapper = TaskContainer.takePartitionTaskWrapper(15L, TimeUnit.SECONDS);
                if (wrapper == null) {
                    logger.info("\u5de5\u4f5c\u7ebf\u7a0b{}\u5728{}\u79d2\u5185\u6ca1\u6709\u83b7\u5f97\u65b0\u4efb\u52a1,\u7ed3\u675f", (Object)Thread.currentThread().getName(), (Object)15);
                    return;
                }
                switch (wrapper.getStatus()) {
                    case "waitAndTryLater": {
                        this.tryLater();
                        break;
                    }
                    case "success": {
                        this.execute(wrapper);
                        break;
                    }
                }
            }
        }
        catch (Exception e) {
            logger.error("\u6267\u884c\u4efb\u52a1\u7247\u65f6\u51fa\u9519,\u5f02\u5e38:{}", (Object)ExceptionUtils.getStackTrace((Throwable)e));
        }
        finally {
            this.countDownLatch.countDown();
        }
    }

    private boolean ableRunning() {
        return WorkerProcessor.getProcessor().isRunning() && this.status == Status.RUNNING;
    }

    private void tryLater() throws InterruptedException {
        Thread.sleep(10000L);
    }

    private void execute(PartitionTaskWrapper wrapper) {
        StopWatch watch = new StopWatch();
        watch.start();
        PartitionTask partitionTask = wrapper.getPartitionTask();
        logger.info("\u5f97\u5230\u65b0\u7684\u4efb\u52a1\u7247:{}", (Object)partitionTask);
        JdbcTemplate jdbcTemplate = new JdbcTemplate(partitionTask.getDatabaseInfo());
        this.execute(partitionTask, jdbcTemplate);
        watch.stop();
        if (this.isReschedule()) {
            this.giveBackPartitionTask(partitionTask);
            logger.info("\u672c\u6b21\u4efb\u52a1\u7247[table={},rangeName={}]\u7531\u4e8e\u964d\u538b,\u91cd\u65b0\u8c03\u5ea6\u5230\u5176\u4ed6worker\u6267\u884c", (Object)partitionTask.getTableName(), (Object)partitionTask.getRangeName());
        } else {
            this.notifyServerFinished(partitionTask);
            this.status = Status.STOPPED;
            logger.info("\u672c\u6b21\u4efb\u52a1\u7247[table={},startIndex={},endIndex={}]\u6267\u884c\u7ed3\u675f,\u603b\u5171\u8017\u65f6{}\u6beb\u79d2", new Object[]{partitionTask.getTableName(), partitionTask.getRowStartIndex(), partitionTask.getRowEndIndex(), watch.getTime()});
        }
    }

    private void execute(PartitionTask partitionTask, JdbcTemplate jdbcTemplate) {
        long startIndex = partitionTask.getRowStartIndex();
        while (this.ableRunning()) {
            ExecutionStatistics statistics;
            long endIndex = startIndex + (long)Settings.getRangePageSize() - 1L;
            if (endIndex > partitionTask.getRowEndIndex()) {
                statistics = this.executePage(partitionTask, jdbcTemplate, startIndex, partitionTask.getRowEndIndex());
                this.progressReporter.reportProgress(new PartitionTaskProgress(partitionTask, partitionTask.getRowEndIndex(), statistics));
                break;
            }
            statistics = this.executePage(partitionTask, jdbcTemplate, startIndex, endIndex);
            this.progressReporter.reportProgress(new PartitionTaskProgress(partitionTask, endIndex, statistics));
            startIndex = endIndex + 1L;
        }
    }

    private ExecutionStatistics executePage(PartitionTask partitionTask, JdbcTemplate jdbcTemplate, long startIndex, long endIndex) {
        ExecutionStatistics statistics = new ExecutionStatistics();
        StopWatch stopWatch = new StopWatch();
        stopWatch.start();
        String columns = StringUtils.isBlank((CharSequence)partitionTask.getColumnNames()) ? "*" : partitionTask.getColumnNames();
        String sql = StringUtils2.append((Object[])new Object[]{"select ", columns, " from ", partitionTask.getTableName(), " where ", partitionTask.getPrimaryKey(), " >= ?", " and ", partitionTask.getPrimaryKey(), " <= ?"});
        if (StringUtils.isNotBlank((CharSequence)partitionTask.getWhere())) {
            sql = StringUtils2.append((Object[])new Object[]{sql, " and (" + partitionTask.getWhere(), ")"});
        }
        List result = jdbcTemplate.query(sql, new Object[]{startIndex, endIndex});
        statistics.setHandledRecordNum(result.size());
        stopWatch.stop();
        long queryUsingTime = stopWatch.getTime();
        stopWatch.reset();
        stopWatch.start();
        DatabasePageDataHandler databasePageDataHandler = Settings.getConfiguration().getDatabasePageDataHandler();
        try {
            databasePageDataHandler.handle(partitionTask.getTableName(), partitionTask.getPrimaryKey(), result);
            statistics.setSuccessRecordNum(result.size());
        }
        catch (Throwable e) {
            databasePageDataHandler.exceptionCaught(partitionTask.getTableName(), result, new Exception(e));
            statistics.setFailRecordNum(result.size());
            statistics.setFailPageName(startIndex + "-" + endIndex);
            statistics.setExceptionMsg(e.getMessage());
        }
        stopWatch.stop();
        long handleUsingTime = stopWatch.getTime();
        long totalTime = queryUsingTime + handleUsingTime;
        statistics.setQueryUsingTime(queryUsingTime);
        statistics.setHandleUsingTime(handleUsingTime);
        statistics.setTotalTime(totalTime);
        logger.info("\u672c\u6b21\u4efb\u52a1\u7247\u5206\u9875\u67e5\u8be2[table={},startIndex={},endIndex={}]\u67e5\u8be2\u7ed3\u675f,\u6709{}\u884c\u6570\u636e,\u67e5\u8be2\u8017\u65f6{}\u6beb\u79d2,\u5904\u7406\u8017\u65f6{}\u6beb\u79d2,\u603b\u5171\u8017\u65f6{}\u6beb\u79d2", new Object[]{partitionTask.getTableName(), partitionTask.getRowStartIndex(), partitionTask.getRowEndIndex(), result.size(), queryUsingTime, handleUsingTime, totalTime});
        return statistics;
    }

    private int calQueryPage(PartitionTask partitionTask) {
        int page = (int)((partitionTask.getRowEndIndex() - partitionTask.getRowStartIndex()) / (long)Settings.getRangePageSize());
        long balance = (partitionTask.getRowEndIndex() - partitionTask.getRowStartIndex()) % (long)Settings.getRangePageSize();
        if (balance != 0L) {
            ++page;
        }
        return page;
    }

    private void notifyServerFinished(PartitionTask partitionTask) {
        WorkerMessage message = new WorkerMessage("finishedTask", JSON.toJSONString((Object)partitionTask), Settings.getWorkerIp());
        ServerChannel.send(message);
    }

    private void giveBackPartitionTask(PartitionTask partitionTask) {
        WorkerMessage message = new WorkerMessage("giveBackPartitionTask", JSON.toJSONString((Object)partitionTask), Settings.getWorkerIp());
        ServerChannel.send(message);
    }

    public void reschedule() {
        this.status = Status.RESCHEDULE;
    }

    public boolean isReschedule() {
        return this.status == Status.RESCHEDULE;
    }

    public boolean isFree() {
        return this.status == Status.STOPPED || this.status == Status.RESCHEDULE || this.status == Status.CANCELD;
    }
}

