package io.cloudex.framework.components;

import com.google.common.base.Stopwatch;
import com.google.common.collect.Lists;
import io.cloudex.framework.CommonExecutable;
import io.cloudex.framework.cloud.api.ApiUtils;
import io.cloudex.framework.cloud.api.CloudService;
import io.cloudex.framework.cloud.entities.VmInstance;
import io.cloudex.framework.cloud.entities.VmMetaData;
import io.cloudex.framework.config.Job;
import io.cloudex.framework.config.PartitionConfig;
import io.cloudex.framework.config.TaskConfig;
import io.cloudex.framework.config.VmConfig;
import io.cloudex.framework.exceptions.ClassInstantiationException;
import io.cloudex.framework.exceptions.InstancePopulationException;
import io.cloudex.framework.partition.PartitionFunction;
import io.cloudex.framework.partition.entities.Partition;
import io.cloudex.framework.partition.factory.PartitionFunctionFactory;
import io.cloudex.framework.partition.factory.PartitionFunctionFactoryImpl;
import io.cloudex.framework.task.Task;
import io.cloudex.framework.task.factory.TaskFactory;
import io.cloudex.framework.task.factory.TaskFactoryImpl;
import io.cloudex.framework.types.ErrorAction;
import io.cloudex.framework.types.PartitionType;
import io.cloudex.framework.types.ProcessorStatus;
import io.cloudex.framework.types.TargetType;
import io.cloudex.framework.utils.Constants;
import io.cloudex.framework.utils.FileUtils;
import io.cloudex.framework.utils.ObjectUtils;
import java.io.IOException;
import java.net.SocketTimeoutException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Date;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import org.apache.commons.lang3.Validate;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

/* loaded from: input_file:io/cloudex/framework/components/Coordinator.class */
public class Coordinator extends CommonExecutable {
    private static final Log log = LogFactory.getLog(Coordinator.class);
    private static final String NO_TASK = "No Task";
    private Context context;
    private Set<String> processors;
    private Map<String, VmInstance> processorInstances;
    private Job job;
    private PartitionFunctionFactory partitionFunctionFactory;
    private TaskFactory taskFactory;
    private boolean shutdownProcessors;

    /* loaded from: input_file:io/cloudex/framework/components/Coordinator$Builder.class */
    public static final class Builder {
        private Job job;
        private VmMetaData metaData;
        private CloudService cloudService;
        private PartitionFunctionFactory partitionFunctionFactory;
        private TaskFactory taskFactory;
        private boolean shutdownProcessors;

        public Builder(Job job, CloudService cloudService) {
            this.job = job;
            this.cloudService = cloudService;
        }

        public Coordinator build() throws IOException {
            return new Coordinator(this);
        }

        public Builder setJob(Job job) {
            this.job = job;
            return this;
        }

        public Builder setMetaData(VmMetaData vmMetaData) {
            this.metaData = vmMetaData;
            return this;
        }

        public Builder setCloudService(CloudService cloudService) {
            this.cloudService = cloudService;
            return this;
        }

        public Builder setPartitionFunctionFactory(PartitionFunctionFactory partitionFunctionFactory) {
            this.partitionFunctionFactory = partitionFunctionFactory;
            return this;
        }

        public Builder setTaskFactory(TaskFactory taskFactory) {
            this.taskFactory = taskFactory;
            return this;
        }

        public final Job getJob() {
            return this.job;
        }

        public final VmMetaData getMetaData() {
            return this.metaData;
        }

        public final CloudService getCloudService() {
            return this.cloudService;
        }

        public final PartitionFunctionFactory getPartitionFunctionFactory() {
            return this.partitionFunctionFactory;
        }

        public final TaskFactory getTaskFactory() {
            return this.taskFactory;
        }

        public final boolean isShutdownProcessors() {
            return this.shutdownProcessors;
        }

        public final Builder setShutdownProcessors(boolean z) {
            this.shutdownProcessors = z;
            return this;
        }
    }

    public Coordinator(Job job, CloudService cloudService) throws IOException {
        this(new Builder(job, cloudService));
    }

    Coordinator(Builder builder) throws IOException {
        this.processors = new HashSet();
        this.processorInstances = new HashMap();
        Validate.notNull(builder.getCloudService(), "cloudService is required", new Object[0]);
        this.job = builder.getJob();
        this.partitionFunctionFactory = builder.getPartitionFunctionFactory();
        this.taskFactory = builder.getTaskFactory();
        setCloudService(builder.getCloudService());
        this.shutdownProcessors = builder.isShutdownProcessors();
        Validate.notNull(this.job, "job must be provided", new Object[0]);
        if (!this.job.valid()) {
            List<String> validationErrors = this.job.getValidationErrors();
            log.error("Job is not valid: " + validationErrors);
            throw new IllegalArgumentException("Job is not valid: " + validationErrors);
        }
        if (builder.getMetaData() != null) {
            setMetaData(builder.getMetaData());
        }
        if (this.partitionFunctionFactory == null) {
            this.partitionFunctionFactory = new PartitionFunctionFactoryImpl();
        }
        if (this.taskFactory == null) {
            this.taskFactory = new TaskFactoryImpl();
        }
        this.context = new Context(this.job.getData());
        this.context.putReadOnly(Constants.PRCESSORS_KEY, this.processors);
    }

    @Override // io.cloudex.framework.Executable
    public void run() throws IOException {
        String taskName;
        Stopwatch createStarted = Stopwatch.createStarted();
        try {
            try {
                for (TaskConfig taskConfig : this.job.getTasks()) {
                    Stopwatch createStarted2 = Stopwatch.createStarted();
                    Set<String> output = taskConfig.getOutput();
                    log.info("TIMER# Job elapsed time: " + createStarted);
                    if (TargetType.PROCESSOR.equals(taskConfig.getTarget())) {
                        taskName = getTaskName(taskConfig);
                        log.info("Starting processor task: " + taskName);
                        runProcessorTask(taskConfig);
                    } else {
                        Task task = this.taskFactory.getTask(taskConfig, this.context, getCloudService());
                        taskName = getTaskName(task);
                        log.info("Starting coordinator task: " + taskName);
                        if (ErrorAction.CONTINUE.equals(taskConfig.getErrorAction())) {
                            runTaskContinue(task);
                        } else {
                            task.run();
                        }
                        addTaskOutputToContext(task, output);
                    }
                    createStarted2.stop();
                    log.info("TIMER# Task " + taskName + " completed in " + createStarted2);
                    log.info("Total processors usage cost: " + calculateProcessorsCost());
                }
                log.debug("Coordinator's context: " + this.context);
                createStarted.stop();
                log.info("TIMER# Job completed in " + createStarted);
            } catch (Exception e) {
                log.error("An error has occurred", e);
                if (!(e instanceof IOException)) {
                    throw new IOException(e);
                }
                throw ((IOException) e);
            }
        } finally {
            if (this.shutdownProcessors) {
                log.info("Shutting down all processors");
                shutdownProcessors();
            }
            log.info("Total processors usage cost: " + calculateProcessorsCost());
        }
    }

    public double calculateProcessorsCost() {
        double d = 0.0d;
        Iterator<VmInstance> it = this.processorInstances.values().iterator();
        while (it.hasNext()) {
            d += it.next().getCost();
        }
        return d;
    }

    protected void shutdownProcessors() {
        ArrayList arrayList = new ArrayList();
        for (String str : this.processors) {
            VmConfig vmConfig = new VmConfig();
            vmConfig.setInstanceId(str);
            arrayList.add(vmConfig);
        }
        try {
            try {
                getCloudService().shutdownInstance(arrayList);
                updateVmInstances(arrayList, true);
            } catch (IOException e) {
                log.error("Failed to shutdown processors", e);
                updateVmInstances(arrayList, true);
            }
        } catch (Throwable th) {
            updateVmInstances(arrayList, true);
            throw th;
        }
    }

    private void runProcessorTask(TaskConfig taskConfig) throws ClassInstantiationException, InstancePopulationException, IOException {
        ArrayList newArrayList;
        CloudService cloudService = getCloudService();
        VmConfig vmConfig = this.job.getVmConfig();
        VmConfig vmConfig2 = taskConfig.getVmConfig();
        String vmConfigReference = taskConfig.getVmConfigReference();
        if (vmConfig2 == null && vmConfigReference != null) {
            vmConfig2 = (VmConfig) this.context.resolveValue(vmConfigReference);
            Validate.notNull(vmConfig2, "Unable to find VmConfig in job context with reference: " + vmConfigReference, new Object[0]);
            taskConfig.setVmConfig(vmConfig2);
        }
        boolean z = vmConfig2 != null;
        PartitionConfig partitioning = taskConfig.getPartitioning();
        Map<String, String> input = partitioning.getInput();
        String str = input != null ? input.get(PartitionFunction.ITEMS_KEY) : null;
        if (PartitionType.FUNCTION.equals(partitioning.getType())) {
            str = Context.getKeyReference(partitioning.getOutput());
        }
        if (!PartitionType.COUNT.equals(partitioning.getType())) {
            Validate.notNull(str, "partition items key is required", new Object[0]);
        }
        String str2 = (String) getJobDataValue(Constants.CLOUD_STORAGE_BUCKET_KEY);
        String zoneId = this.job.getVmConfig().getZoneId();
        if (z) {
            newArrayList = Lists.newArrayList();
            log.debug("Task: " + getTaskName(taskConfig) + " uses custom vm config: " + taskConfig.getVmConfig());
            vmConfig2 = vmConfig.merge(taskConfig.getVmConfig());
            for (String str3 : this.processorInstances.keySet()) {
                VmInstance vmInstance = this.processorInstances.get(str3);
                if (vmConfig2.equals(vmInstance.getVmConfig())) {
                    log.debug("Found existing processor that matches task requirements: " + vmInstance.getVmConfig());
                    newArrayList.add(str3);
                }
            }
        } else {
            newArrayList = Lists.newArrayList(this.processors);
        }
        List<String> arrayList = new ArrayList<>();
        List<VmConfig> arrayList2 = new ArrayList<>();
        long time = new Date().getTime();
        int i = 0;
        int i2 = 0;
        int maximumMetaDataSize = cloudService.getMaximumMetaDataSize();
        HashMap hashMap = new HashMap();
        Map.Entry entry = null;
        if (taskConfig.getInput() != null) {
            hashMap.putAll(taskConfig.getInput());
            if (str != null) {
                Iterator it = hashMap.entrySet().iterator();
                while (true) {
                    if (!it.hasNext()) {
                        break;
                    }
                    Map.Entry entry2 = (Map.Entry) it.next();
                    if (((String) entry2.getValue()).equals(str)) {
                        entry = entry2;
                        break;
                    }
                }
            }
        }
        String str4 = null;
        if (entry != null) {
            str4 = (String) entry.getKey();
            hashMap.remove(str4);
        }
        HashMap hashMap2 = new HashMap();
        for (Map.Entry entry3 : hashMap.entrySet()) {
            hashMap2.put(entry3.getKey(), (String) this.context.resolveValue((String) entry3.getValue()));
        }
        Collection<String> partitionItems = getPartitionItems(taskConfig, partitioning, str);
        List<String> arrayList3 = new ArrayList<>();
        for (String str5 : partitionItems) {
            VmMetaData vmMetaData = new VmMetaData();
            vmMetaData.setTaskClass(taskConfig.getClassName());
            vmMetaData.addUserValues(hashMap2);
            if (str4 != null) {
                if (str5.length() > maximumMetaDataSize) {
                    saveMetaDataItemToFile(vmMetaData, str5, str4, str2, i2, time);
                    i2++;
                } else {
                    vmMetaData.addUserValue(str4, str5);
                }
            }
            if (newArrayList.size() > 0) {
                String str6 = (String) newArrayList.iterator().next();
                newArrayList.remove(newArrayList.indexOf(str6));
                cloudService.getMetaData(str6, zoneId).getFollowUp(vmMetaData);
                arrayList3.add(cloudService.updateMetadata(vmMetaData, zoneId, str6, false));
                arrayList.add(str6);
            } else {
                String str7 = VmMetaData.CLOUDEX_VM_PREFIX + (time + i);
                arrayList.add(str7);
                VmConfig copy = z ? vmConfig2 : vmConfig.copy();
                copy.setInstanceId(str7);
                copy.setMetaData(vmMetaData);
                arrayList2.add(copy);
                log.info("Create VM Config for " + str7);
                i++;
            }
        }
        if (!arrayList3.isEmpty()) {
            waitForOperations(arrayList3, zoneId);
        }
        if (!z || (z && !Boolean.FALSE.equals(taskConfig.getVmConfig().getReuse()))) {
            this.processors.addAll(arrayList);
        }
        if (!arrayList2.isEmpty()) {
            if (!cloudService.startInstance(arrayList2, true)) {
                throw new IOException("Some processors have failed to start");
            }
            updateVmInstances(arrayList2, false);
        }
        IOException waitForProcessors = waitForProcessors(arrayList, zoneId);
        if (waitForProcessors != null && ErrorAction.EXIT.equals(taskConfig.getErrorAction())) {
            throw waitForProcessors;
        }
        if (z && Boolean.FALSE.equals(taskConfig.getVmConfig().getReuse())) {
            log.debug("Shutting down custom vms: " + arrayList2 + " for task: " + getTaskName(taskConfig));
            getCloudService().shutdownInstance(arrayList2);
            updateVmInstances(arrayList2, true);
        }
        log.info("Successfully completed processor task " + getTaskName(taskConfig));
        log.info("Number of idle processors: " + this.processors.size() + ", instance Ids: " + this.processors);
    }

    private void updateVmInstances(List<VmConfig> list, boolean z) {
        Date date = new Date();
        for (VmConfig vmConfig : list) {
            String instanceId = vmConfig.getInstanceId();
            VmInstance vmInstance = this.processorInstances.get(instanceId);
            if (vmInstance == null) {
                vmInstance = new VmInstance(vmConfig, date);
                this.processorInstances.put(instanceId, vmInstance);
            }
            if (z) {
                vmInstance.setEnd(date);
                this.processors.remove(instanceId);
            }
        }
    }

    private void saveMetaDataItemToFile(VmMetaData vmMetaData, String str, String str2, String str3, int i, long j) throws IOException {
        Validate.notBlank(str3, "bucket is needed as metadata is larger than maximum allowed.", new Object[0]);
        log.info("Metadata too large, using file. Size = " + str.length());
        String str4 = str2 + '_' + j + '_' + i + Constants.DOT_TEXT;
        String str5 = FileUtils.TEMP_FOLDER + str4;
        FileUtils.objectToJsonFile(str5, ObjectUtils.csvToSet(str));
        getCloudService().uploadFileToCloudStorage(str5, str3);
        vmMetaData.addUserValue(str2 + VmMetaData.LONG_METADATA_FILE_Suffix, str4);
    }

    private void waitForOperations(List<String> list, String str) throws IOException {
        boolean z = false;
        int i = 0;
        do {
            try {
                getCloudService().blockOnComputeOperations(list, str);
                z = true;
                i = 0;
            } catch (IOException e) {
                log.error("Exception whilst waiting for operations completion", e);
                if (i == 3) {
                    throw e;
                }
                i++;
            }
        } while (!z);
    }

    private IOException waitForProcessors(List<String> list, String str) throws IOException {
        VmMetaData metaData;
        IOException iOException = null;
        int i = 0;
        for (String str2 : list) {
            boolean z = false;
            while (true) {
                ApiUtils.block(getCloudService().getApiRecheckDelay());
                try {
                    metaData = getCloudService().getMetaData(str2, str);
                    z = ProcessorStatus.READY.equals(metaData.getProcessorStatus());
                } catch (SocketTimeoutException e) {
                    log.warn("Timeout exception whilst waiting for processor metadata update", e);
                } catch (IOException e2) {
                    log.error("An exception occurred whilst waiting for processors", e2);
                    if (i == 3) {
                        throw e2;
                    }
                    i++;
                }
                if (ProcessorStatus.ERROR.equals(metaData.getProcessorStatus())) {
                    iOException = ApiUtils.exceptionFromCloudExError(metaData, str2);
                    log.error(str2 + " processor has failed", iOException);
                    break;
                }
                i = 0;
                if (z) {
                    break;
                }
            }
        }
        return iOException;
    }

    private Collection<String> getPartitionItems(TaskConfig taskConfig, PartitionConfig partitionConfig, String str) throws ClassInstantiationException, InstancePopulationException {
        Collection<String> collection;
        int intValue;
        PartitionType type = partitionConfig.getType();
        switch (type) {
            case COUNT:
                if (partitionConfig.getCount() != null) {
                    intValue = partitionConfig.getCount().intValue();
                } else {
                    Object resolveValue = this.context.resolveValue(partitionConfig.getCountRef());
                    Validate.notNull(resolveValue, "count reference is null or empty for task " + getTaskName(taskConfig), new Object[0]);
                    if (!(resolveValue instanceof Double)) {
                        throw new IllegalArgumentException("Expecting count reference of numeric type, found: " + resolveValue);
                    }
                    intValue = ((Double) resolveValue).intValue();
                }
                collection = new HashSet();
                for (int i = 0; i < intValue; i++) {
                    collection.add("Item" + i);
                }
                break;
            case FUNCTION:
                List<Partition> partition = this.partitionFunctionFactory.getPartitionFunction(partitionConfig, this.context).partition();
                if (partition != null && !partition.isEmpty()) {
                    collection = Partition.joinPartitionItems(partition);
                    this.context.put(partitionConfig.getOutput(), collection);
                    break;
                } else {
                    throw new IllegalArgumentException("empty partitions for task " + getTaskName(taskConfig));
                }
                break;
            case ITEMS:
                Object resolveValue2 = this.context.resolveValue(str);
                Validate.notNull(resolveValue2, "partition items are null or empty for task " + getTaskName(taskConfig), new Object[0]);
                if (!(resolveValue2 instanceof Collection)) {
                    throw new IllegalArgumentException("Expecting partition items of type Collection, found: " + resolveValue2);
                }
                collection = (Collection) resolveValue2;
                if (collection.isEmpty()) {
                    throw new IllegalArgumentException("empty partition items for task " + getTaskName(taskConfig));
                }
                if (!(collection.iterator().next() instanceof String)) {
                    throw new IllegalArgumentException("partition items must be a collection of strings");
                }
                break;
            default:
                throw new IllegalArgumentException("Invalid PartitionType: " + type);
        }
        return collection;
    }

    private void runTaskContinue(Task task) {
        try {
            task.run();
        } catch (IOException e) {
            log.warn("Task: " + getTaskName(task) + " has thrown an exception", e);
        }
    }

    private String getTaskName(Task task) {
        return task != null ? task.getClass().getName() : NO_TASK;
    }

    private String getTaskName(TaskConfig taskConfig) {
        String str = NO_TASK;
        if (taskConfig != null) {
            str = taskConfig.getClassName();
        }
        return str;
    }

    private void addTaskOutputToContext(Task task, Set<String> set) {
        Map<String, Object> output = task.getOutput();
        if (set != null) {
            for (String str : set) {
                if (output.containsKey(str)) {
                    this.context.put(str, output.get(str));
                }
            }
        }
    }

    private Object getJobDataValue(String str) {
        return this.job.getData().get(str);
    }

    protected final Context getContext() {
        return this.context;
    }

    protected final Set<String> getProcessors() {
        return this.processors;
    }

    protected Map<String, VmInstance> getProcessorInstances() {
        return this.processorInstances;
    }

    protected final Job getJob() {
        return this.job;
    }

    protected final PartitionFunctionFactory getPartitionFunctionFactory() {
        return this.partitionFunctionFactory;
    }

    protected final TaskFactory getTaskFactory() {
        return this.taskFactory;
    }

    protected final boolean isShutdownProcessors() {
        return this.shutdownProcessors;
    }
}
