package ml.shifu.guagua.worker;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.Properties;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import ml.shifu.guagua.BasicCoordinator;
import ml.shifu.guagua.ComputableMonitor;
import ml.shifu.guagua.GuaguaConstants;
import ml.shifu.guagua.GuaguaRuntimeException;
import ml.shifu.guagua.GuaguaService;
import ml.shifu.guagua.InMemoryCoordinator;
import ml.shifu.guagua.io.Bytable;
import ml.shifu.guagua.io.GuaguaFileSplit;
import ml.shifu.guagua.io.HaltBytable;
import ml.shifu.guagua.io.Serializer;
import ml.shifu.guagua.util.NumberFormatUtils;
import ml.shifu.guagua.util.Progressable;
import ml.shifu.guagua.util.ReflectionUtils;
import ml.shifu.guagua.util.StringUtils;
import ml.shifu.guagua.worker.WorkerContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:ml/shifu/guagua/worker/GuaguaWorkerService.class */
public class GuaguaWorkerService<MASTER_RESULT extends Bytable, WORKER_RESULT extends Bytable> implements GuaguaService {
    private static final Logger LOG = LoggerFactory.getLogger(GuaguaWorkerService.class);
    private Properties props;
    private WorkerComputable<MASTER_RESULT, WORKER_RESULT> workerComputable;
    private int totalIteration;
    private List<WorkerInterceptor<MASTER_RESULT, WORKER_RESULT>> workerInterceptors;
    private List<GuaguaFileSplit> splits;
    private String appId;
    private String containerId;
    private String masterResultClassName;
    private String workerResultClassName;
    private WorkerContext<MASTER_RESULT, WORKER_RESULT> context;
    private InMemoryCoordinator<MASTER_RESULT, WORKER_RESULT> coordinator;
    private static final int COUNT_THRESHOLD = 3;
    private ExecutorService executor;
    private boolean isMonitored;
    private int overThresholdCount = 0;
    private int countThresholdDefined = 3;
    private boolean isSoftForComputableTimeout = true;

    @Override // ml.shifu.guagua.GuaguaService
    public void start() {
        WorkerContext<MASTER_RESULT, WORKER_RESULT> buildContext = buildContext();
        buildContext.setCurrentIteration(0);
        Iterator<WorkerInterceptor<MASTER_RESULT, WORKER_RESULT>> it = getWorkerInterceptors().iterator();
        while (it.hasNext()) {
            it.next().preApplication(buildContext);
        }
    }

    @Override // ml.shifu.guagua.GuaguaService
    public void stop() {
        if (this.isMonitored && this.executor != null) {
            this.executor.shutdown();
        }
        WorkerContext<MASTER_RESULT, WORKER_RESULT> buildContext = buildContext();
        buildContext.setCurrentIteration(buildContext.getCurrentIteration() + 1);
        int size = getWorkerInterceptors().size();
        Throwable th = null;
        for (int i = 0; i < size; i++) {
            try {
                getWorkerInterceptors().get((size - 1) - i).postApplication(buildContext);
            } catch (Throwable th2) {
                LOG.error("Error in worker interceptors cleaning.", th2);
                if (th == null) {
                    th = th2;
                }
            }
        }
        if (th != null) {
            throw new GuaguaRuntimeException(th);
        }
    }

    @Override // ml.shifu.guagua.GuaguaService
    public void run(Progressable progressable) {
        WorkerContext<MASTER_RESULT, WORKER_RESULT> buildContext = buildContext();
        int currentIteration = buildContext.getCurrentIteration() + 1;
        int currentIteration2 = buildContext.getCurrentIteration();
        while (currentIteration2 < getTotalIteration()) {
            try {
                buildContext.setCurrentIteration(currentIteration2 + 1);
                iterate(buildContext, currentIteration, progressable);
                currentIteration2 = buildContext.getCurrentIteration();
                MASTER_RESULT lastMasterResult = buildContext.getLastMasterResult();
                if ((lastMasterResult instanceof HaltBytable) && ((HaltBytable) lastMasterResult).isHalt()) {
                    break;
                }
            } catch (Throwable th) {
                ArrayList arrayList = new ArrayList();
                Iterator<WorkerContext.WorkerCompletionCallBack<MASTER_RESULT, WORKER_RESULT>> it = buildContext.getCallBackList().iterator();
                while (it.hasNext()) {
                    try {
                        it.next().callback(buildContext);
                    } catch (Exception e) {
                        LOG.error("Error in worker callbacks starting.", e);
                        arrayList.add(e);
                    }
                }
                if (arrayList.size() <= 0) {
                    throw th;
                }
                throw new GuaguaRuntimeException((Throwable) arrayList.get(0));
            }
        }
        ArrayList arrayList2 = new ArrayList();
        Iterator<WorkerContext.WorkerCompletionCallBack<MASTER_RESULT, WORKER_RESULT>> it2 = buildContext.getCallBackList().iterator();
        while (it2.hasNext()) {
            try {
                it2.next().callback(buildContext);
            } catch (Exception e2) {
                LOG.error("Error in worker callbacks starting.", e2);
                arrayList2.add(e2);
            }
        }
        if (arrayList2.size() > 0) {
            throw new GuaguaRuntimeException((Throwable) arrayList2.get(0));
        }
    }

    /* JADX WARN: Multi-variable type inference failed */
    /* JADX WARN: Type inference failed for: r0v80, types: [ml.shifu.guagua.io.Bytable] */
    protected WORKER_RESULT iterate(final WorkerContext<MASTER_RESULT, WORKER_RESULT> workerContext, int i, Progressable progressable) {
        int currentIteration = workerContext.getCurrentIteration();
        if (progressable != null) {
            progressable.progress(currentIteration - 1, getTotalIteration(), String.format("Start worker iteration ( %s/%s ), progress %s%%", Integer.valueOf(currentIteration), Integer.valueOf(getTotalIteration()), Integer.valueOf(((currentIteration - 1) * 100) / getTotalIteration())), false, false);
        }
        Iterator<WorkerInterceptor<MASTER_RESULT, WORKER_RESULT>> it = getWorkerInterceptors().iterator();
        while (it.hasNext()) {
            it.next().preIteration(workerContext);
        }
        if (progressable != null) {
            progressable.progress(currentIteration - 1, getTotalIteration(), String.format("Start worker computing ( %s/%s ), progress %s%%", Integer.valueOf(currentIteration), Integer.valueOf(getTotalIteration()), Integer.valueOf(((currentIteration - 1) * 100) / getTotalIteration())), false, false);
        }
        long nanoTime = System.nanoTime();
        WORKER_RESULT worker_result = null;
        boolean z = false;
        boolean z2 = false;
        try {
            if (this.isMonitored) {
                if (this.executor.isTerminated() || this.executor.isShutdown()) {
                    this.executor = Executors.newSingleThreadExecutor();
                }
                ComputableMonitor computableMonitor = (ComputableMonitor) this.workerComputable.getClass().getAnnotation(ComputableMonitor.class);
                try {
                    worker_result = (Bytable) this.executor.submit((Callable) new Callable<WORKER_RESULT>() { // from class: ml.shifu.guagua.worker.GuaguaWorkerService.1
                        @Override // java.util.concurrent.Callable
                        public WORKER_RESULT call() throws Exception {
                            return (WORKER_RESULT) GuaguaWorkerService.this.workerComputable.compute(workerContext);
                        }
                    }).get(computableMonitor.duration(), computableMonitor.timeUnit());
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                } catch (ExecutionException e2) {
                    LOG.error("Error in master computation:", e2);
                    throw new GuaguaRuntimeException(e2);
                } catch (TimeoutException e3) {
                    z2 = true;
                    LOG.warn("Time out for master computation, null will be returned or mapper will be killed.");
                    this.executor.shutdownNow();
                    worker_result = null;
                }
            } else {
                worker_result = this.workerComputable.compute(workerContext);
            }
            workerContext.setWorkerResult(worker_result);
            long millis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - nanoTime);
            if (millis >= NumberFormatUtils.getLong(workerContext.getProps().getProperty(GuaguaConstants.GUAGUA_COMPUTATION_TIME_THRESHOLD), 60000L) && currentIteration > i) {
                this.overThresholdCount++;
                if (this.overThresholdCount >= this.countThresholdDefined) {
                    this.overThresholdCount = 0;
                    LOG.warn("Computation time is too long:{}, kill itself to make fail-over work.", Long.valueOf(millis));
                    z = true;
                }
            }
            if (progressable != null) {
                progressable.progress(currentIteration - 1, getTotalIteration(), String.format("Complete worker computing ( %s/%s ), progress %s%%", Integer.valueOf(currentIteration), Integer.valueOf(getTotalIteration()), Integer.valueOf(((currentIteration - 1) * 100) / getTotalIteration())), false, false);
            }
            int size = getWorkerInterceptors().size();
            for (int i2 = 0; i2 < size; i2++) {
                getWorkerInterceptors().get((size - 1) - i2).postIteration(workerContext);
            }
            if (progressable != null) {
                progressable.progress(currentIteration, getTotalIteration(), String.format("Complete worker iteration ( %s/%s ), progress %s%%", Integer.valueOf(currentIteration), Integer.valueOf(getTotalIteration()), Integer.valueOf((currentIteration * 100) / getTotalIteration())), true, z || !(!z2 || this.isSoftForComputableTimeout || workerContext.isFirstIteration()));
            }
            return worker_result;
        } catch (IOException e4) {
            throw new GuaguaRuntimeException("Error in worker computation", e4);
        } catch (Exception e5) {
            throw new GuaguaRuntimeException("Error in worker computation", e5);
        }
    }

    private WorkerContext<MASTER_RESULT, WORKER_RESULT> buildContext() {
        if (getContext() != null) {
            return getContext();
        }
        this.context = new WorkerContext<>(getTotalIteration(), getAppId(), getProps(), getContainerId(), getSplits(), getMasterResultClassName(), getWorkerResultClassName());
        return getContext();
    }

    @Override // ml.shifu.guagua.GuaguaService
    public void init(Properties properties) {
        setProps(properties);
        checkAndSetWorkerInterceptors(properties);
        setWorkerComputable(newWorkerComputable());
        this.isMonitored = getWorkerComputable().getClass().isAnnotationPresent(ComputableMonitor.class);
        if (this.isMonitored) {
            this.isSoftForComputableTimeout = ((ComputableMonitor) this.workerComputable.getClass().getAnnotation(ComputableMonitor.class)).isSoft();
            this.executor = Executors.newSingleThreadExecutor();
        }
        this.countThresholdDefined = Integer.valueOf(getProps().getProperty(GuaguaConstants.GUAGUA_STRAGGLER_ITERATORS, "3")).intValue();
        setTotalIteration(Integer.valueOf(getProps().getProperty(GuaguaConstants.GUAGUA_ITERATION_COUNT, "2147483647")).intValue());
        setMasterResultClassName(getProps().getProperty(GuaguaConstants.GUAGUA_MASTER_RESULT_CLASS));
        setWorkerResultClassName(getProps().getProperty(GuaguaConstants.GUAGUA_WORKER_RESULT_CLASS));
    }

    private void checkAndSetWorkerInterceptors(Properties properties) {
        ArrayList arrayList = new ArrayList();
        String str = StringUtils.get(properties.getProperty(GuaguaConstants.GUAGUA_WORKER_SYSTEM_INTERCEPTERS), GuaguaConstants.GUAGUA_WORKER_DEFAULT_SYSTEM_INTERCEPTERS);
        if (str != null && str.length() != 0) {
            String[] split = str.split(GuaguaConstants.GUAGUA_INTERCEPTOR_SEPARATOR);
            if (LOG.isInfoEnabled()) {
                LOG.info("System worker interceptors: {}.", Arrays.toString(split));
            }
            for (String str2 : split) {
                Object obj = (WorkerInterceptor) ReflectionUtils.newInstance(str2.trim());
                if (obj instanceof BasicCoordinator) {
                    ((BasicCoordinator) obj).setMasterSerializer((Serializer) ReflectionUtils.newInstance(StringUtils.get(properties.getProperty(GuaguaConstants.GUAGUA_MASTER_IO_SERIALIZER), GuaguaConstants.GUAGUA_IO_DEFAULT_SERIALIZER)));
                    ((BasicCoordinator) obj).setWorkerSerializer((Serializer) ReflectionUtils.newInstance(StringUtils.get(properties.getProperty(GuaguaConstants.GUAGUA_WORKER_IO_SERIALIZER), GuaguaConstants.GUAGUA_IO_DEFAULT_SERIALIZER)));
                } else if (obj instanceof LocalWorkerCoordinator) {
                    ((LocalWorkerCoordinator) obj).setCoordinator(getCoordinator());
                }
                arrayList.add(obj);
            }
        }
        String property = properties.getProperty(GuaguaConstants.GUAGUA_WORKER_INTERCEPTERS);
        if (property != null && property.length() != 0) {
            String[] split2 = property.split(GuaguaConstants.GUAGUA_INTERCEPTOR_SEPARATOR);
            if (LOG.isInfoEnabled()) {
                LOG.info("Customized worker interceptors: {}.", Arrays.toString(split2));
            }
            for (String str3 : split2) {
                arrayList.add((WorkerInterceptor) ReflectionUtils.newInstance(str3.trim()));
            }
        }
        setWorkerInterceptors(arrayList);
    }

    private WorkerComputable<MASTER_RESULT, WORKER_RESULT> newWorkerComputable() {
        try {
            return (WorkerComputable) ReflectionUtils.newInstance(Class.forName(getProps().get(GuaguaConstants.WORKER_COMPUTABLE_CLASS).toString()));
        } catch (ClassNotFoundException e) {
            throw new GuaguaRuntimeException(e);
        }
    }

    private WorkerContext<MASTER_RESULT, WORKER_RESULT> getContext() {
        return this.context;
    }

    public Properties getProps() {
        return this.props;
    }

    public void setProps(Properties properties) {
        this.props = properties;
    }

    public List<GuaguaFileSplit> getSplits() {
        return this.splits;
    }

    @Override // ml.shifu.guagua.GuaguaService
    public void setSplits(List<GuaguaFileSplit> list) {
        this.splits = list;
    }

    public String getAppId() {
        return this.appId;
    }

    @Override // ml.shifu.guagua.GuaguaService
    public void setAppId(String str) {
        this.appId = str;
    }

    public String getContainerId() {
        return this.containerId;
    }

    @Override // ml.shifu.guagua.GuaguaService
    public void setContainerId(String str) {
        this.containerId = str;
    }

    public WorkerComputable<MASTER_RESULT, WORKER_RESULT> getWorkerComputable() {
        return this.workerComputable;
    }

    public void setWorkerComputable(WorkerComputable<MASTER_RESULT, WORKER_RESULT> workerComputable) {
        this.workerComputable = workerComputable;
    }

    public int getTotalIteration() {
        return this.totalIteration;
    }

    public void setTotalIteration(int i) {
        this.totalIteration = i;
    }

    public List<WorkerInterceptor<MASTER_RESULT, WORKER_RESULT>> getWorkerInterceptors() {
        return this.workerInterceptors;
    }

    public void setWorkerInterceptors(List<WorkerInterceptor<MASTER_RESULT, WORKER_RESULT>> list) {
        this.workerInterceptors = list;
    }

    public String getMasterResultClassName() {
        return this.masterResultClassName;
    }

    public void setMasterResultClassName(String str) {
        this.masterResultClassName = str;
    }

    public String getWorkerResultClassName() {
        return this.workerResultClassName;
    }

    public void setWorkerResultClassName(String str) {
        this.workerResultClassName = str;
    }

    public InMemoryCoordinator<MASTER_RESULT, WORKER_RESULT> getCoordinator() {
        return this.coordinator;
    }

    public void setCoordinator(InMemoryCoordinator<MASTER_RESULT, WORKER_RESULT> inMemoryCoordinator) {
        this.coordinator = inMemoryCoordinator;
    }
}
