package azkaban.executor;

import azkaban.alert.Alerter;
import azkaban.constants.ServerInternals;
import azkaban.constants.ServerProperties;
import azkaban.event.Event;
import azkaban.event.EventData;
import azkaban.event.EventHandler;
import azkaban.executor.ExecutorManagerException;
import azkaban.executor.selector.ExecutorComparator;
import azkaban.executor.selector.ExecutorFilter;
import azkaban.executor.selector.ExecutorSelector;
import azkaban.flow.SpecialJobTypes;
import azkaban.jobcallback.JobCallbackConstants;
import azkaban.metrics.CommonMetrics;
import azkaban.project.Project;
import azkaban.project.ProjectWhitelist;
import azkaban.scheduler.ScheduleStatisticManager;
import azkaban.user.XmlUserManager;
import azkaban.utils.FileIOUtils;
import azkaban.utils.FlowUtils;
import azkaban.utils.JSONUtils;
import azkaban.utils.Pair;
import azkaban.utils.Props;
import com.google.common.collect.Lists;
import java.io.File;
import java.io.IOException;
import java.lang.Thread;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import org.apache.commons.lang.StringUtils;
import org.apache.log4j.Logger;
import org.joda.time.DateTime;

/*  JADX ERROR: NullPointerException in pass: ClassModifier
    java.lang.NullPointerException: Cannot invoke "java.util.List.forEach(java.util.function.Consumer)" because "blocks" is null
    	at jadx.core.utils.BlockUtils.collectAllInsns(BlockUtils.java:1017)
    	at jadx.core.dex.visitors.ClassModifier.removeBridgeMethod(ClassModifier.java:239)
    	at jadx.core.dex.visitors.ClassModifier.removeSyntheticMethods(ClassModifier.java:154)
    	at java.base/java.util.ArrayList.forEach(ArrayList.java:1596)
    	at jadx.core.dex.visitors.ClassModifier.visit(ClassModifier.java:64)
    */
/* loaded from: input_file:azkaban/executor/ExecutorManager.class */
public class ExecutorManager extends EventHandler implements ExecutorManagerAdapter {
    // ---- Property keys read from the Azkaban Props handed to the constructor ----

    /** Comma-separated list of executor filter names; read in setupMultiExecutorMode(). */
    static final String AZKABAN_EXECUTOR_SELECTOR_FILTERS = "azkaban.executorselector.filters";
    /** Prefix of the properties whose values are parsed as integer comparator weights. */
    static final String AZKABAN_EXECUTOR_SELECTOR_COMPARATOR_PREFIX = "azkaban.executorselector.comparator.";
    /** Initial "active" state of the queue processor thread (defaults to true where it is read). */
    static final String AZKABAN_QUEUEPROCESSING_ENABLED = "azkaban.queueprocessing.enabled";
    /** Switches multi-executor mode on; see isMultiExecutorMode() (defaults to false). */
    static final String AZKABAN_USE_MULTIPLE_EXECUTORS = "azkaban.use.multiple.executors";
    /** Size passed to QueuedExecutions in the constructor (defaults to 100000). */
    private static final String AZKABAN_WEBSERVER_QUEUE_SIZE = "azkaban.webserver.queue.size";
    /** Executor refresh window in milliseconds for the queue processor (defaults to 50000). */
    private static final String AZKABAN_ACTIVE_EXECUTOR_REFRESH_IN_MS = "azkaban.activeexecutor.refresh.milisecinterval";
    /** Executor refresh window counted in flows for the queue processor (defaults to 5). */
    private static final String AZKABAN_ACTIVE_EXECUTOR_REFRESH_IN_NUM_FLOW = "azkaban.activeexecutor.refresh.flowinterval";
    /** Size of the fixed thread pool used by refreshExecutors() (defaults to 5). */
    private static final String AZKABAN_EXECUTORINFO_REFRESH_MAX_THREADS = "azkaban.executorinfo.refresh.maxThreads";
    /** Dispatch error limit per flow; defaults to the number of active executors at setup time. */
    private static final String AZKABAN_MAX_DISPATCHING_ERRORS_PERMITTED = "azkaban.maxDispatchingErrors";
    private static Logger logger = Logger.getLogger(ExecutorManager.class);
    private ExecutorLoader executorLoader;
    private CleanerThread cleanerThread;
    /** Flows waiting to be dispatched; created in the constructor. */
    QueuedExecutions queuedFlows;
    /** Only assigned by setupMultiExecutorMode(). */
    private QueueProcessorThread queueProcessor;
    private ExecutingManagerUpdaterThread executingManager;
    /** Fallback for "execution.logs.retention.ms": 7257600000 ms = 84 days. */
    private static final long DEFAULT_EXECUTION_LOGS_RETENTION_MS = 7257600000L;
    private Map<String, Alerter> alerters;
    /** Directory taken from the "cache.directory" property (default "cache"). */
    File cacheDir;
    private final Props azkProps;
    private List<String> filterList;
    private Map<String, Integer> comparatorWeightsMap;
    /** Wall-clock time of the last refreshExecutors() run in which every executor answered. */
    private long lastSuccessfulExecutorInfoRefresh;
    /** Thread pool for the per-executor statistics calls; only assigned by setupMultiExecutorMode(). */
    private ExecutorService executorInforRefresherService;
    /** Running flows with their execution reference, keyed by execution id. */
    private ConcurrentHashMap<Integer, Pair<ExecutionReference, ExecutableFlow>> runningFlows = new ConcurrentHashMap<>();
    /** Flows the updater thread saw finish, keyed by execution id. */
    private ConcurrentHashMap<Integer, ExecutableFlow> recentlyFinished = new ConcurrentHashMap<>();
    private final Set<Executor> activeExecutors = new HashSet();
    /** Flow taken off the queue that the queue processor is currently handling; null otherwise. */
    private volatile Pair<ExecutionReference, ExecutableFlow> runningCandidate = null;
    /** Exposed through getLastCleanerThreadCheckTime(); -1 until set. */
    private long lastCleanerThreadCheckTime = -1;
    /** Exposed through getLastExecutorManagerThreadCheckTime(); -1 until set. */
    private long lastThreadCheckTime = -1;
    /** Human-readable description of what the updater thread is doing; see getExecutorThreadStage(). */
    private String updaterStage = "not started";

    /* renamed from: azkaban.executor.ExecutorManager$1 */
    /* loaded from: input_file:azkaban/executor/ExecutorManager$1.class */
    /**
     * Task that asks one executor for its server statistics and returns the raw JSON response.
     *
     * <p>This is the decompiler's named rendering of the anonymous {@code Callable} created in
     * {@code refreshExecutors()}. The decompiled form referenced an undeclared register variable
     * ({@code r5}) and never assigned the captured executor; both are repaired here by storing the
     * executor in {@link #val$executor} and reading it from there.
     */
    public class AnonymousClass1 implements Callable<String> {
        /** Executor captured at construction time. */
        final /* synthetic */ Executor val$executor;

        /**
         * @param executor the executor whose statistics endpoint will be called
         */
        AnonymousClass1(Executor executor) {
            this.val$executor = executor;
        }

        /**
         * Calls the captured executor's {@code /serverStatistics} endpoint.
         *
         * @return the response body as returned by {@code callExecutorForJsonString}
         * @throws Exception whatever the underlying call throws
         */
        @Override // java.util.concurrent.Callable
        public String call() throws Exception {
            return ExecutorManager.this.callExecutorForJsonString(
                    this.val$executor.getHost(), this.val$executor.getPort(), "/serverStatistics", null);
        }
    }

    /* renamed from: azkaban.executor.ExecutorManager$2 */
    /* loaded from: input_file:azkaban/executor/ExecutorManager$2.class */
    /**
     * Compiler-generated lookup table backing a {@code switch} over {@link Status}.
     *
     * <p>The table is indexed by {@code Status.ordinal()} and holds the case number (1-6) for the
     * constants handled by the switch; every other slot stays 0. Each constant is resolved in its
     * own {@code try} so that a constant missing at runtime ({@link NoSuchFieldError}) only leaves
     * its own slot unset.
     */
    public static /* synthetic */ class AnonymousClass2 {
        static final /* synthetic */ int[] $SwitchMap$azkaban$executor$Status;

        static {
            int[] caseByOrdinal = new int[Status.values().length];
            try {
                caseByOrdinal[Status.SUCCEEDED.ordinal()] = 1;
            } catch (NoSuchFieldError ignored) {
                // constant absent at runtime: leave slot at 0
            }
            try {
                caseByOrdinal[Status.FAILED.ordinal()] = 2;
            } catch (NoSuchFieldError ignored) {
                // constant absent at runtime: leave slot at 0
            }
            try {
                caseByOrdinal[Status.KILLED.ordinal()] = 3;
            } catch (NoSuchFieldError ignored) {
                // constant absent at runtime: leave slot at 0
            }
            try {
                caseByOrdinal[Status.SKIPPED.ordinal()] = 4;
            } catch (NoSuchFieldError ignored) {
                // constant absent at runtime: leave slot at 0
            }
            try {
                caseByOrdinal[Status.DISABLED.ordinal()] = 5;
            } catch (NoSuchFieldError ignored) {
                // constant absent at runtime: leave slot at 0
            }
            try {
                caseByOrdinal[Status.READY.ordinal()] = 6;
            } catch (NoSuchFieldError ignored) {
                // constant absent at runtime: leave slot at 0
            }
            $SwitchMap$azkaban$executor$Status = caseByOrdinal;
        }
    }

    /* loaded from: input_file:azkaban/executor/ExecutorManager$CleanerThread.class */
    /**
     * Background thread that periodically removes old execution logs.
     *
     * <p>On every pass it records the current time in {@code lastCleanerThreadCheckTime}, runs a
     * cleanup if more than {@link #CLEANER_THREAD_WAIT_INTERVAL_MS} has elapsed since the previous
     * one, and then waits for the same interval. {@link #shutdown()} ends the loop.
     */
    private class CleanerThread extends Thread {
        /** Both the minimum gap between cleanups and the wait between passes: 24 hours. */
        private static final long CLEANER_THREAD_WAIT_INTERVAL_MS = 86400000;
        /** Logs older than this many milliseconds are removed. */
        private final long executionLogsRetentionMs;
        // Written by the thread calling shutdown() and read by this thread, hence volatile.
        private volatile boolean shutdown = false;
        private long lastLogCleanTime = -1;

        /**
         * @param j retention period for execution logs, in milliseconds
         */
        public CleanerThread(long j) {
            this.executionLogsRetentionMs = j;
            setName("AzkabanWebServer-Cleaner-Thread");
        }

        /** Requests termination and interrupts the thread so a pending wait returns at once. */
        public void shutdown() {
            this.shutdown = true;
            interrupt();
        }

        @Override
        public void run() {
            while (!this.shutdown) {
                synchronized (this) {
                    try {
                        // Heartbeat exposed through getLastCleanerThreadCheckTime().
                        ExecutorManager.this.lastCleanerThreadCheckTime = System.currentTimeMillis();
                        long now = System.currentTimeMillis();
                        if (now - CLEANER_THREAD_WAIT_INTERVAL_MS > this.lastLogCleanTime) {
                            cleanExecutionLogs();
                            this.lastLogCleanTime = now;
                        }
                        wait(CLEANER_THREAD_WAIT_INTERVAL_MS);
                    } catch (InterruptedException e) {
                        // Not re-interrupting on purpose: the loop condition decides whether to
                        // stop, and a set interrupt flag would make the next wait() fail at once.
                        ExecutorManager.logger.info("Interrupted. Probably to shut down.");
                    }
                }
            }
        }

        /** Deletes execution logs older than the retention period, using a single cutoff. */
        private void cleanExecutionLogs() {
            ExecutorManager.logger.info("Cleaning old logs from execution_logs");
            // Compute the cutoff once so the logged value is exactly the one used for deletion.
            long cutoffMillis = DateTime.now().getMillis() - this.executionLogsRetentionMs;
            ExecutorManager.logger.info("Cleaning old log files before " + new DateTime(cutoffMillis).toString());
            ExecutorManager.this.cleanOldExecutionLogs(cutoffMillis);
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:azkaban/executor/ExecutorManager$ExecutingManagerUpdaterThread.class */
    public class ExecutingManagerUpdaterThread extends Thread {
        private boolean shutdown = false;
        private long recentlyFinishedLifetimeMs = 600000;
        private int waitTimeIdleMs = 2000;
        private int waitTimeMs = 500;
        private int numErrors = 6;
        private long errorThreshold = 10000;

        public ExecutingManagerUpdaterThread() {
            setName("ExecutorManagerUpdaterThread");
        }

        public void shutdown() {
            this.shutdown = true;
        }

        @Override // java.lang.Thread, java.lang.Runnable
        public void run() {
            while (!this.shutdown) {
                try {
                    ExecutorManager.access$202(ExecutorManager.this, System.currentTimeMillis());
                    ExecutorManager.this.updaterStage = "Starting update all flows.";
                    Map flowToExecutorMap = ExecutorManager.this.getFlowToExecutorMap();
                    ArrayList arrayList = new ArrayList();
                    ArrayList arrayList2 = new ArrayList();
                    if (flowToExecutorMap.size() > 0) {
                        for (Map.Entry entry : flowToExecutorMap.entrySet()) {
                            ArrayList arrayList3 = new ArrayList();
                            ArrayList arrayList4 = new ArrayList();
                            Executor executor = (Executor) entry.getKey();
                            ExecutorManager.this.updaterStage = "Starting update flows on " + executor.getHost() + JobCallbackConstants.HEADER_NAME_VALUE_DELIMITER + executor.getPort();
                            ExecutorManager.this.fillUpdateTimeAndExecId((List) entry.getValue(), arrayList4, arrayList3);
                            Map map = null;
                            try {
                                map = ExecutorManager.this.callExecutorServer(executor.getHost(), executor.getPort(), "update", null, null, new Pair("executionId", JSONUtils.toJSON(arrayList4)), new Pair(ConnectorParams.UPDATE_TIME_LIST_PARAM, JSONUtils.toJSON(arrayList3)));
                            } catch (IOException e) {
                                ExecutorManager.logger.error(e);
                                for (ExecutableFlow executableFlow : (List) entry.getValue()) {
                                    Pair pair = (Pair) ExecutorManager.this.runningFlows.get(Integer.valueOf(executableFlow.getExecutionId()));
                                    ExecutorManager.this.updaterStage = "Failed to get update. Doing some clean up for flow " + ((ExecutableFlow) pair.getSecond()).getExecutionId();
                                    if (pair != null) {
                                        ExecutionReference executionReference = (ExecutionReference) pair.getFirst();
                                        int numErrors = executionReference.getNumErrors();
                                        if (executionReference.getNumErrors() < this.numErrors) {
                                            executionReference.setNextCheckTime(System.currentTimeMillis() + this.errorThreshold);
                                            executionReference.setNumErrors(numErrors + 1);
                                        } else {
                                            ExecutorManager.logger.error("Evicting flow " + executableFlow.getExecutionId() + ". The executor is unresponsive.");
                                            arrayList2.add(pair.getSecond());
                                        }
                                    }
                                }
                            }
                            if (map != null) {
                                Iterator it = ((List) map.get(ConnectorParams.RESPONSE_UPDATED_FLOWS)).iterator();
                                while (it.hasNext()) {
                                    try {
                                        ExecutableFlow updateExecution = ExecutorManager.this.updateExecution((Map) it.next());
                                        ExecutorManager.this.updaterStage = "Updated flow " + updateExecution.getExecutionId();
                                        if (ExecutorManager.this.isFinished(updateExecution)) {
                                            arrayList.add(updateExecution);
                                            arrayList2.add(updateExecution);
                                        }
                                    } catch (ExecutorManagerException e2) {
                                        ExecutableFlow executableFlow2 = e2.getExecutableFlow();
                                        ExecutorManager.logger.error(e2);
                                        if (executableFlow2 != null) {
                                            ExecutorManager.logger.error("Finalizing flow " + executableFlow2.getExecutionId());
                                            arrayList2.add(executableFlow2);
                                        }
                                    }
                                }
                            }
                        }
                        ExecutorManager.this.updaterStage = "Evicting old recently finished flows.";
                        ExecutorManager.this.evictOldRecentlyFinished(this.recentlyFinishedLifetimeMs);
                        Iterator it2 = arrayList.iterator();
                        while (it2.hasNext()) {
                            ExecutableFlow executableFlow3 = (ExecutableFlow) it2.next();
                            if (executableFlow3.getScheduleId() >= 0 && executableFlow3.getStatus() == Status.SUCCEEDED) {
                                ScheduleStatisticManager.invalidateCache(executableFlow3.getScheduleId(), ExecutorManager.this.cacheDir);
                            }
                            ExecutorManager.this.fireEventListeners(Event.create(executableFlow3, Event.Type.FLOW_FINISHED, new EventData(executableFlow3.getStatus())));
                            ExecutorManager.this.recentlyFinished.put(Integer.valueOf(executableFlow3.getExecutionId()), executableFlow3);
                        }
                        ExecutorManager.this.updaterStage = "Finalizing " + arrayList2.size() + " error flows.";
                        Iterator it3 = arrayList2.iterator();
                        while (it3.hasNext()) {
                            ExecutorManager.this.finalizeFlows((ExecutableFlow) it3.next());
                        }
                    }
                    ExecutorManager.this.updaterStage = "Updated all active flows. Waiting for next round.";
                    synchronized (this) {
                        try {
                            if (ExecutorManager.this.runningFlows.size() > 0) {
                                wait(this.waitTimeMs);
                            } else {
                                wait(this.waitTimeIdleMs);
                            }
                        } catch (InterruptedException e3) {
                        }
                    }
                } catch (Exception e4) {
                    ExecutorManager.logger.error(e4);
                }
            }
        }
    }

    /* loaded from: input_file:azkaban/executor/ExecutorManager$QueueProcessorThread.class */
    /**
     * Background thread that takes queued flows and dispatches each one to an executor.
     *
     * <p>While active it repeatedly fetches the head of {@code queuedFlows}, refreshes executor
     * statistics when the refresh window (time or number of flows) has elapsed, selects an
     * executor and dispatches. Flows that cannot be placed are put back on the queue.
     */
    public class QueueProcessorThread extends Thread {
        /** Pause between two passes over the queue. */
        private static final long QUEUE_PROCESSOR_WAIT_IN_MS = 1000;
        /** A flow is finalized once its dispatch error count exceeds this value. */
        private final int maxDispatchingErrors;
        private final long activeExecutorRefreshWindowInMilisec;
        private final int activeExecutorRefreshWindowInFlows;
        private volatile boolean shutdown = false;
        private volatile boolean isActive = true;

        /**
         * @param z  whether queue processing starts out active
         * @param j  executor refresh window in milliseconds
         * @param i  executor refresh window in number of processed flows
         * @param i2 maximum number of dispatch errors tolerated per flow
         */
        public QueueProcessorThread(boolean z, long j, int i, int i2) {
            setActive(z);
            this.maxDispatchingErrors = i2;
            this.activeExecutorRefreshWindowInFlows = i;
            this.activeExecutorRefreshWindowInMilisec = j;
            setName("AzkabanWebServer-QueueProcessor-Thread");
        }

        /** Turns queue processing on or off without stopping the thread. */
        public void setActive(boolean z) {
            this.isActive = z;
            ExecutorManager.logger.info("QueueProcessorThread active turned " + this.isActive);
        }

        public boolean isActive() {
            return this.isActive;
        }

        /** Requests termination and interrupts the thread so a pending wait or sleep ends. */
        public void shutdown() {
            this.shutdown = true;
            interrupt();
        }

        @Override
        public void run() {
            while (!this.shutdown) {
                synchronized (this) {
                    try {
                        if (this.isActive) {
                            processQueuedFlows(this.activeExecutorRefreshWindowInMilisec, this.activeExecutorRefreshWindowInFlows);
                        }
                        wait(QUEUE_PROCESSOR_WAIT_IN_MS);
                    } catch (Exception e) {
                        ExecutorManager.logger.error("QueueProcessorThread Interrupted. Probably to shut down.", e);
                    }
                }
            }
        }

        /**
         * Drains the queue while the processor is active.
         *
         * @param refreshWindowMs    maximum age of executor statistics, in milliseconds
         * @param refreshWindowFlows maximum number of flows processed between two refreshes
         */
        private void processQueuedFlows(long refreshWindowMs, int refreshWindowFlows)
                throws InterruptedException, ExecutorManagerException {
            long lastExecutorRefreshTime = 0;
            int flowsSinceRefresh = 0;
            while (isActive()) {
                // Take the head of the queue and publish it as the running candidate. The
                // decompiled condition "a = b == null" had the wrong precedence; the assignment
                // has to happen before the null comparison.
                Pair<ExecutionReference, ExecutableFlow> candidate = ExecutorManager.this.queuedFlows.fetchHead();
                ExecutorManager.this.runningCandidate = candidate;
                if (candidate == null) {
                    return;
                }
                ExecutionReference reference = candidate.getFirst();
                ExecutableFlow flow = candidate.getSecond();
                long now = System.currentTimeMillis();
                if (now - lastExecutorRefreshTime > refreshWindowMs || flowsSinceRefresh >= refreshWindowFlows) {
                    ExecutorManager.this.refreshExecutors();
                    lastExecutorRefreshTime = now;
                    flowsSinceRefresh = 0;
                }
                if (flow.getUpdateTime() > lastExecutorRefreshTime) {
                    // Already tried with the current statistics: put it back and wait for the
                    // next refresh window before trying again.
                    ExecutorManager.this.queuedFlows.enqueue(flow, reference);
                    ExecutorManager.this.runningCandidate = null;
                    sleep(refreshWindowMs - (now - lastExecutorRefreshTime));
                } else {
                    flow.setUpdateTime(now);
                    // Work on a copy so failed executors can be removed without touching the
                    // shared set.
                    selectExecutorAndDispatchFlow(reference, flow, new HashSet<Executor>(ExecutorManager.this.activeExecutors));
                    ExecutorManager.this.runningCandidate = null;
                }
                // Only flows that actually left the queue count towards the refresh window.
                if (ExecutorManager.this.queuedFlows.getFlow(flow.getExecutionId()) == null) {
                    flowsSinceRefresh++;
                }
            }
        }

        /**
         * Picks an executor for the flow and dispatches it; re-queues the flow when no executor
         * is available and retries on another executor when dispatching fails.
         */
        private void selectExecutorAndDispatchFlow(ExecutionReference executionReference, ExecutableFlow executableFlow,
                Set<Executor> set) throws ExecutorManagerException {
            synchronized (executableFlow) {
                Executor chosen = selectExecutor(executableFlow, set);
                if (chosen == null) {
                    handleNoExecutorSelectedCase(executionReference, executableFlow);
                    return;
                }
                try {
                    ExecutorManager.this.dispatch(executionReference, executableFlow, chosen);
                } catch (ExecutorManagerException e) {
                    ExecutorManager.logger.warn(String.format("Executor %s responded with exception for exec: %d",
                            chosen, Integer.valueOf(executableFlow.getExecutionId())), e);
                    handleDispatchExceptionCase(executionReference, executableFlow, chosen, set);
                }
            }
        }

        /**
         * Resolves the executor requested through the {@code ExecutionOptions.USE_EXECUTOR} flow
         * parameter.
         *
         * @return the requested executor, or {@code null} when none was requested, the id is not
         *         a number, or the executor cannot be found
         */
        private Executor getUserSpecifiedExecutor(ExecutionOptions executionOptions, int i) {
            Executor executor = null;
            if (executionOptions != null && executionOptions.getFlowParameters() != null
                    && executionOptions.getFlowParameters().containsKey(ExecutionOptions.USE_EXECUTOR)) {
                String requestedId = executionOptions.getFlowParameters().get(ExecutionOptions.USE_EXECUTOR);
                try {
                    int executorId = Integer.parseInt(requestedId);
                    executor = ExecutorManager.this.fetchExecutor(executorId);
                    if (executor == null) {
                        ExecutorManager.logger.warn(String.format(
                                "User specified executor id: %d for execution id: %d is not active, Looking up db.",
                                Integer.valueOf(executorId), Integer.valueOf(i)));
                        executor = ExecutorManager.this.executorLoader.fetchExecutor(executorId);
                        if (executor == null) {
                            ExecutorManager.logger.warn(String.format(
                                    "User specified executor id: %d for execution id: %d is missing from db. Defaulting to availableExecutors",
                                    Integer.valueOf(executorId), Integer.valueOf(i)));
                        }
                    }
                } catch (NumberFormatException e) {
                    // The id comes from user-supplied flow parameters; an unparsable value must
                    // not abort queue processing, so fall back to normal selection.
                    ExecutorManager.logger.warn("Ignoring non-numeric user specified executor id '" + requestedId
                            + "' for exec_id = " + i, e);
                } catch (ExecutorManagerException e) {
                    ExecutorManager.logger.error("Failed to fetch user specified executor for exec_id = " + i, e);
                }
            }
            return executor;
        }

        /** Returns the user-requested executor if there is one, otherwise the selector's best pick. */
        private Executor selectExecutor(ExecutableFlow executableFlow, Set<Executor> set) {
            Executor chosen = getUserSpecifiedExecutor(executableFlow.getExecutionOptions(), executableFlow.getExecutionId());
            if (chosen == null) {
                ExecutorManager.logger.info("Using dispatcher for execution id :" + executableFlow.getExecutionId());
                chosen = new ExecutorSelector(ExecutorManager.this.filterList, ExecutorManager.this.comparatorWeightsMap)
                        .getBest(set, executableFlow);
            }
            return chosen;
        }

        /**
         * Handles a failed dispatch: gives up and finalizes the flow when the error budget is
         * exhausted or no other executor remains, otherwise retries without the failing executor.
         */
        private void handleDispatchExceptionCase(ExecutionReference executionReference, ExecutableFlow executableFlow,
                Executor executor, Set<Executor> set) throws ExecutorManagerException {
            ExecutorManager.logger.info(String.format(
                    "Reached handleDispatchExceptionCase stage for exec %d with error count %d",
                    Integer.valueOf(executableFlow.getExecutionId()), Integer.valueOf(executionReference.getNumErrors())));
            executionReference.setNumErrors(executionReference.getNumErrors() + 1);
            if (executionReference.getNumErrors() > this.maxDispatchingErrors || set.size() <= 1) {
                ExecutorManager.logger.error("Failed to process queued flow");
                ExecutorManager.this.finalizeFlows(executableFlow);
            } else {
                set.remove(executor);
                selectExecutorAndDispatchFlow(executionReference, executableFlow, set);
            }
        }

        /** Puts the flow back on the queue when no executor could be selected for it. */
        private void handleNoExecutorSelectedCase(ExecutionReference executionReference, ExecutableFlow executableFlow)
                throws ExecutorManagerException {
            ExecutorManager.logger.info(String.format(
                    "Reached handleNoExecutorSelectedCase stage for exec %d with error count %d",
                    Integer.valueOf(executableFlow.getExecutionId()), Integer.valueOf(executionReference.getNumErrors())));
            ExecutorManager.this.queuedFlows.enqueue(executableFlow, executionReference);
        }
    }

    public ExecutorManager(Props props, ExecutorLoader executorLoader, Map<String, Alerter> map) throws ExecutorManagerException {
        this.alerters = map;
        this.azkProps = props;
        this.executorLoader = executorLoader;
        setupExecutors();
        loadRunningFlows();
        this.queuedFlows = new QueuedExecutions(props.getLong(AZKABAN_WEBSERVER_QUEUE_SIZE, 100000L));
        loadQueuedFlows();
        this.cacheDir = new File(props.getString("cache.directory", "cache"));
        this.executingManager = new ExecutingManagerUpdaterThread();
        this.executingManager.start();
        if (isMultiExecutorMode()) {
            setupMultiExecutorMode();
        }
        this.cleanerThread = new CleanerThread(props.getLong("execution.logs.retention.ms", DEFAULT_EXECUTION_LOGS_RETENTION_MS));
        this.cleanerThread.start();
    }

    /**
     * Reads the executor-selection configuration, creates the statistics refresh pool and starts
     * the queue processor thread.
     */
    private void setupMultiExecutorMode() {
        final Props props = this.azkProps;

        // Filters: comma-separated names.
        final String filterSpec = props.getString(AZKABAN_EXECUTOR_SELECTOR_FILTERS, "");
        if (filterSpec != null) {
            final String[] filterNames = StringUtils.split(filterSpec, ",");
            this.filterList = Arrays.asList(filterNames);
        }

        // Comparators: every property under the prefix maps a comparator name to its weight.
        final Map<String, String> weightSpecs = props.getMapByPrefix(AZKABAN_EXECUTOR_SELECTOR_COMPARATOR_PREFIX);
        if (weightSpecs != null) {
            this.comparatorWeightsMap = new TreeMap<>();
            for (Map.Entry<String, String> weightSpec : weightSpecs.entrySet()) {
                final Integer weight = Integer.valueOf(weightSpec.getValue());
                this.comparatorWeightsMap.put(weightSpec.getKey(), weight);
            }
        }

        final int refreshThreads = props.getInt(AZKABAN_EXECUTORINFO_REFRESH_MAX_THREADS, 5);
        this.executorInforRefresherService = Executors.newFixedThreadPool(refreshThreads);

        final boolean startActive = props.getBoolean(AZKABAN_QUEUEPROCESSING_ENABLED, true);
        final long refreshWindowMs = props.getLong(AZKABAN_ACTIVE_EXECUTOR_REFRESH_IN_MS, 50000L);
        final int refreshWindowFlows = props.getInt(AZKABAN_ACTIVE_EXECUTOR_REFRESH_IN_NUM_FLOW, 5);
        final int maxDispatchErrors = props.getInt(AZKABAN_MAX_DISPATCHING_ERRORS_PERMITTED, this.activeExecutors.size());
        this.queueProcessor = new QueueProcessorThread(startActive, refreshWindowMs, refreshWindowFlows, maxDispatchErrors);
        this.queueProcessor.start();
    }

    @Override // azkaban.executor.ExecutorManagerAdapter
    /**
     * Rebuilds the set of active executors.
     *
     * <p>In multi-executor mode the active executors come from the loader. Otherwise, when the
     * executor port property is present, the single local executor is looked up (and registered
     * or re-activated if necessary).
     *
     * @throws ExecutorManagerException if no executor is found, or more than one is found outside
     *         multi-executor mode
     */
    public void setupExecutors() throws ExecutorManagerException {
        final boolean multiExecutorMode = isMultiExecutorMode();
        final Set<Executor> discovered = new HashSet<>();

        if (multiExecutorMode) {
            logger.info("Initializing multi executors from database");
            discovered.addAll(this.executorLoader.fetchActiveExecutors());
        } else if (this.azkProps.containsKey(ServerInternals.AZKABAN_EXECUTOR_PORT_FILENAME)) {
            final String host = this.azkProps.getString(ServerProperties.EXECUTOR_HOST, "localhost");
            final int port = this.azkProps.getInt(ServerInternals.AZKABAN_EXECUTOR_PORT_FILENAME);
            logger.info(String.format("Initializing local executor %s:%d", host, Integer.valueOf(port)));

            Executor stored = this.executorLoader.fetchExecutor(host, port);
            if (stored == null) {
                stored = this.executorLoader.addExecutor(host, port);
            } else if (!stored.isActive()) {
                stored.setActive(true);
                this.executorLoader.updateExecutor(stored);
            }
            discovered.add(new Executor(stored.getId(), host, port, true));
        }

        if (discovered.isEmpty()) {
            logger.error("No active executor found");
            throw new ExecutorManagerException("No active executor found");
        }
        if (!multiExecutorMode && discovered.size() > 1) {
            logger.error("Multiple local executors specified");
            throw new ExecutorManagerException("Multiple local executors specified");
        }

        // Swap the contents only after validation succeeded.
        this.activeExecutors.clear();
        this.activeExecutors.addAll(discovered);
    }

    /**
     * @return the value of the {@code azkaban.use.multiple.executors} property, {@code false}
     *         when it is not set
     */
    private boolean isMultiExecutorMode() {
        final boolean multiExecutorMode = azkProps.getBoolean(AZKABAN_USE_MULTIPLE_EXECUTORS, false);
        return multiExecutorMode;
    }

    public void refreshExecutors() {
        synchronized (this.activeExecutors) {
            ArrayList<Pair> arrayList = new ArrayList();
            for (Executor executor : this.activeExecutors) {
                arrayList.add(new Pair(executor, this.executorInforRefresherService.submit(new Callable<String>() { // from class: azkaban.executor.ExecutorManager.1
                    final /* synthetic */ Executor val$executor;

                    AnonymousClass1(Executor executor2) {
                        r5 = executor2;
                    }

                    @Override // java.util.concurrent.Callable
                    public String call() throws Exception {
                        return ExecutorManager.this.callExecutorForJsonString(r5.getHost(), r5.getPort(), "/serverStatistics", null);
                    }
                })));
            }
            boolean z = true;
            for (Pair pair : arrayList) {
                Executor executor2 = (Executor) pair.getFirst();
                executor2.setExecutorInfo(null);
                try {
                    String str = (String) ((Future) pair.getSecond()).get(5L, TimeUnit.SECONDS);
                    executor2.setExecutorInfo(ExecutorInfo.fromJSONString(str));
                    logger.info(String.format("Successfully refreshed executor: %s with executor info : %s", executor2, str));
                } catch (TimeoutException e) {
                    z = false;
                    logger.error("Timed out while waiting for ExecutorInfo refresh" + executor2, e);
                } catch (Exception e2) {
                    z = false;
                    logger.error("Failed to update ExecutorInfo for executor : " + executor2, e2);
                }
            }
            if (z) {
                this.lastSuccessfulExecutorInfoRefresh = System.currentTimeMillis();
            }
        }
    }

    @Override // azkaban.executor.ExecutorManagerAdapter
    /**
     * Pauses queue processing.
     *
     * @throws ExecutorManagerException when multi-executor mode is not enabled
     */
    public void disableQueueProcessorThread() throws ExecutorManagerException {
        if (isMultiExecutorMode()) {
            queueProcessor.setActive(false);
            return;
        }
        throw new ExecutorManagerException("Cannot disable QueueProcessor in local mode");
    }

    @Override // azkaban.executor.ExecutorManagerAdapter
    /**
     * Resumes queue processing.
     *
     * @throws ExecutorManagerException when multi-executor mode is not enabled
     */
    public void enableQueueProcessorThread() throws ExecutorManagerException {
        if (isMultiExecutorMode()) {
            queueProcessor.setActive(true);
            return;
        }
        throw new ExecutorManagerException("Cannot enable QueueProcessor in local mode");
    }

    /**
     * @return the queue processor thread's state in multi-executor mode, otherwise
     *         {@code Thread.State.NEW}
     */
    public Thread.State getQueueProcessorThreadState() {
        if (!isMultiExecutorMode()) {
            return Thread.State.NEW;
        }
        return queueProcessor.getState();
    }

    /**
     * @return {@code true} only in multi-executor mode with the queue processor set active
     */
    public boolean isQueueProcessorThreadActive() {
        return isMultiExecutorMode() && queueProcessor.isActive();
    }

    /**
     * @return the time recorded by the last executor refresh in which every executor answered
     */
    public long getLastSuccessfulExecutorInfoRefresh() {
        return lastSuccessfulExecutorInfoRefresh;
    }

    /**
     * @return the comparator names reported by {@code ExecutorComparator}
     */
    public Set<String> getAvailableExecutorComparatorNames() {
        final Set<String> comparatorNames = ExecutorComparator.getAvailableComparatorNames();
        return comparatorNames;
    }

    /**
     * @return the filter names reported by {@code ExecutorFilter}
     */
    public Set<String> getAvailableExecutorFilterNames() {
        final Set<String> filterNames = ExecutorFilter.getAvailableFilterNames();
        return filterNames;
    }

    @Override // azkaban.executor.ExecutorManagerAdapter
    /**
     * @return the current state of the updater thread
     */
    public Thread.State getExecutorManagerThreadState() {
        final Thread.State updaterState = executingManager.getState();
        return updaterState;
    }

    /**
     * @return the updater thread's most recently published stage description
     */
    public String getExecutorThreadStage() {
        return updaterStage;
    }

    @Override // azkaban.executor.ExecutorManagerAdapter
    /**
     * @return whether the updater thread is alive
     */
    public boolean isExecutorManagerThreadActive() {
        final boolean updaterAlive = executingManager.isAlive();
        return updaterAlive;
    }

    @Override // azkaban.executor.ExecutorManagerAdapter
    /**
     * @return the stored updater-thread check time ({@code -1} is the initial value)
     */
    public long getLastExecutorManagerThreadCheckTime() {
        return lastThreadCheckTime;
    }

    /**
     * @return the stored cleaner-thread check time ({@code -1} is the initial value)
     */
    public long getLastCleanerThreadCheckTime() {
        return lastCleanerThreadCheckTime;
    }

    @Override // azkaban.executor.ExecutorManagerAdapter
    /**
     * @return a read-only view of the active executors
     */
    public Collection<Executor> getAllActiveExecutors() {
        final Collection<Executor> readOnlyView = Collections.unmodifiableCollection(activeExecutors);
        return readOnlyView;
    }

    @Override // azkaban.executor.ExecutorManagerAdapter
    /**
     * Looks an executor up by id, preferring the in-memory active set over the loader.
     *
     * @param i executor id
     * @return the matching active executor, otherwise whatever the loader returns for the id
     * @throws ExecutorManagerException if the loader lookup fails
     */
    public Executor fetchExecutor(int i) throws ExecutorManagerException {
        for (Executor candidate : activeExecutors) {
            if (candidate.getId() != i) {
                continue;
            }
            return candidate;
        }
        // Not among the active executors: fall back to the loader.
        return executorLoader.fetchExecutor(i);
    }

    /**
     * Builds the host/port identifiers of all active executors.
     *
     * @return a new set with one entry per active executor, formed as
     *         host + {@code JobCallbackConstants.HEADER_NAME_VALUE_DELIMITER} + port
     */
    @Override
    public Set<String> getPrimaryServerHosts() {
        // Typed set instead of a raw HashSet so the element type is checked at compile time.
        final Set<String> hostPorts = new HashSet<>();
        for (Executor executor : this.activeExecutors) {
            hostPorts.add(executor.getHost() + JobCallbackConstants.HEADER_NAME_VALUE_DELIMITER + executor.getPort());
        }
        return hostPorts;
    }

    /**
     * Builds the host/port identifiers of every executor that is either active or
     * referenced by a running flow.
     *
     * @return a new set of host + {@code JobCallbackConstants.HEADER_NAME_VALUE_DELIMITER} + port
     *         strings, taken from {@code activeExecutors} and from the execution references
     *         of {@code runningFlows}
     */
    @Override
    public Set<String> getAllActiveExecutorServerHosts() {
        // Typed set instead of a raw HashSet so the element type is checked at compile time.
        final Set<String> hostPorts = new HashSet<>();
        for (Executor executor : this.activeExecutors) {
            hostPorts.add(executor.getHost() + JobCallbackConstants.HEADER_NAME_VALUE_DELIMITER + executor.getPort());
        }
        // Running flows may reference executors that are not in the active set.
        for (Pair<ExecutionReference, ExecutableFlow> running : this.runningFlows.values()) {
            final ExecutionReference reference = running.getFirst();
            hostPorts.add(reference.getHost() + JobCallbackConstants.HEADER_NAME_VALUE_DELIMITER + reference.getPort());
        }
        return hostPorts;
    }

    /**
     * Adds every active flow reported by the loader to {@code runningFlows}.
     *
     * @throws ExecutorManagerException if fetching the active flows fails
     */
    private void loadRunningFlows() throws ExecutorManagerException {
        runningFlows.putAll(executorLoader.fetchActiveFlows());
    }

    /**
     * Enqueues every queued flow reported by the loader into {@code queuedFlows}.
     * A {@code null} result from the loader is treated as "nothing to enqueue".
     *
     * @throws ExecutorManagerException if fetching or enqueueing fails
     */
    private void loadQueuedFlows() throws ExecutorManagerException {
        final List<Pair<ExecutionReference, ExecutableFlow>> queued = this.executorLoader.fetchQueuedFlows();
        if (queued == null) {
            return;
        }
        for (Pair<ExecutionReference, ExecutableFlow> entry : queued) {
            this.queuedFlows.enqueue(entry.getSecond(), entry.getFirst());
        }
    }

    /**
     * Collects the execution ids of a given flow that are queued, currently held as the
     * running candidate, or running.
     *
     * @param i   the project id to match
     * @param str the flow id to match
     * @return a new list of matching execution ids in ascending order
     */
    @Override
    public List<Integer> getRunningFlows(int i, String str) {
        final List<Integer> executionIds = new ArrayList<>();
        executionIds.addAll(getRunningFlowsHelper(i, str, this.queuedFlows.getAllEntries()));
        // Read the field once so the null check and the use below see the same value;
        // re-reading it could hand a null element to the helper.
        final Pair<ExecutionReference, ExecutableFlow> candidate = this.runningCandidate;
        if (candidate != null) {
            executionIds.addAll(getRunningFlowsHelper(i, str, Collections.singletonList(candidate)));
        }
        executionIds.addAll(getRunningFlowsHelper(i, str, this.runningFlows.values()));
        Collections.sort(executionIds);
        return executionIds;
    }

    /**
     * Filters a collection of (reference, flow) pairs down to the execution ids whose flow
     * matches the given project id and flow id.
     *
     * @param i          the project id to match
     * @param str        the flow id to match
     * @param collection the pairs to scan
     * @return a new list with the execution id of each matching pair, in iteration order
     */
    private List<Integer> getRunningFlowsHelper(int i, String str, Collection<Pair<ExecutionReference, ExecutableFlow>> collection) {
        // Typed list instead of a raw ArrayList so the element type is checked at compile time.
        final List<Integer> executionIds = new ArrayList<>();
        for (Pair<ExecutionReference, ExecutableFlow> entry : collection) {
            final ExecutableFlow flow = entry.getSecond();
            if (flow.getFlowId().equals(str) && flow.getProjectId() == i) {
                executionIds.add(Integer.valueOf(entry.getFirst().getExecId()));
            }
        }
        return executionIds;
    }

    /**
     * Lists every queued and running flow together with the executor of its reference.
     *
     * @return a new list of (flow, executor) pairs: queued entries first, then running ones
     * @throws IOException declared by the signature; nothing in this method body throws it
     */
    @Override
    public List<Pair<ExecutableFlow, Executor>> getActiveFlowsWithExecutor() throws IOException {
        // Typed list instead of a raw ArrayList so the element type is checked at compile time.
        final List<Pair<ExecutableFlow, Executor>> flowsWithExecutor = new ArrayList<>();
        getActiveFlowsWithExecutorHelper(flowsWithExecutor, this.queuedFlows.getAllEntries());
        getActiveFlowsWithExecutorHelper(flowsWithExecutor, this.runningFlows.values());
        return flowsWithExecutor;
    }

    /**
     * Appends one (flow, executor) pair to {@code list} for every entry of {@code collection}.
     *
     * @param list       the output list that receives the new pairs
     * @param collection the (reference, flow) pairs to convert
     */
    private void getActiveFlowsWithExecutorHelper(List<Pair<ExecutableFlow, Executor>> list, Collection<Pair<ExecutionReference, ExecutableFlow>> collection) {
        for (Pair<ExecutionReference, ExecutableFlow> entry : collection) {
            final ExecutableFlow flow = entry.getSecond();
            final Executor executor = entry.getFirst().getExecutor();
            list.add(new Pair<>(flow, executor));
        }
    }

    /**
     * Tells whether a flow of the given project is queued or running.
     *
     * @param i   the project id to match
     * @param str the flow id to match
     * @return {@code true} if a matching entry exists in {@code queuedFlows} or
     *         {@code runningFlows}
     */
    @Override
    public boolean isFlowRunning(int i, String str) {
        if (isFlowRunningHelper(i, str, this.queuedFlows.getAllEntries())) {
            return true;
        }
        return isFlowRunningHelper(i, str, this.runningFlows.values());
    }

    /**
     * Scans a collection of (reference, flow) pairs for a flow with the given project id
     * and flow id.
     *
     * @param i          the project id to match
     * @param str        the flow id to match
     * @param collection the pairs to scan
     * @return {@code true} as soon as a matching flow is found, {@code false} if none is
     */
    private boolean isFlowRunningHelper(int i, String str, Collection<Pair<ExecutionReference, ExecutableFlow>> collection) {
        for (Pair<ExecutionReference, ExecutableFlow> entry : collection) {
            final ExecutableFlow flow = entry.getSecond();
            final boolean sameProject = flow.getProjectId() == i;
            if (sameProject && flow.getFlowId().equals(str)) {
                return true;
            }
        }
        return false;
    }

    /**
     * Fetches an executable flow through the loader.
     *
     * @param i the execution id
     * @return the result of {@code executorLoader.fetchExecutableFlow(i)}
     * @throws ExecutorManagerException if the loader fails
     */
    @Override
    public ExecutableFlow getExecutableFlow(int i) throws ExecutorManagerException {
        return executorLoader.fetchExecutableFlow(i);
    }

    /**
     * Lists every queued and running flow.
     *
     * @return a new list holding the flows of {@code queuedFlows} followed by those of
     *         {@code runningFlows}
     */
    @Override
    public List<ExecutableFlow> getRunningFlows() {
        final ArrayList<ExecutableFlow> flows = new ArrayList<>();
        getActiveFlowHelper(flows, this.queuedFlows.getAllEntries());
        getActiveFlowHelper(flows, this.runningFlows.values());
        return flows;
    }

    /**
     * Appends the flow of every (reference, flow) pair to {@code arrayList}.
     *
     * @param arrayList  the output list
     * @param collection the pairs whose flows are appended, in iteration order
     */
    private void getActiveFlowHelper(ArrayList<ExecutableFlow> arrayList, Collection<Pair<ExecutionReference, ExecutableFlow>> collection) {
        for (Pair<ExecutionReference, ExecutableFlow> entry : collection) {
            arrayList.add(entry.getSecond());
        }
    }

    /**
     * Renders the execution ids of all queued and running flows as text.
     *
     * @return the ascending list of execution ids in {@link java.util.List#toString()} form,
     *         for example {@code [1, 5, 9]}
     */
    public String getRunningFlowIds() {
        // Typed list instead of a raw ArrayList so the element type is checked at compile time.
        final List<Integer> executionIds = new ArrayList<>();
        getRunningFlowsIdsHelper(executionIds, this.queuedFlows.getAllEntries());
        getRunningFlowsIdsHelper(executionIds, this.runningFlows.values());
        Collections.sort(executionIds);
        return executionIds.toString();
    }

    /**
     * Renders the execution ids of all queued flows as text.
     *
     * @return the ascending list of queued execution ids in
     *         {@link java.util.List#toString()} form
     */
    public String getQueuedFlowIds() {
        // Typed list instead of a raw ArrayList so the element type is checked at compile time.
        final List<Integer> executionIds = new ArrayList<>();
        getRunningFlowsIdsHelper(executionIds, this.queuedFlows.getAllEntries());
        Collections.sort(executionIds);
        return executionIds.toString();
    }

    /**
     * Returns the number of queued flows.
     *
     * @return the value of {@code queuedFlows.size()}
     */
    public long getQueuedFlowSize() {
        return queuedFlows.size();
    }

    /**
     * Appends the execution id of every flow in {@code collection} to {@code list}.
     *
     * @param list       the output list of execution ids
     * @param collection the (reference, flow) pairs to read, in iteration order
     */
    private void getRunningFlowsIdsHelper(List<Integer> list, Collection<Pair<ExecutionReference, ExecutableFlow>> collection) {
        for (Pair<ExecutionReference, ExecutableFlow> entry : collection) {
            list.add(Integer.valueOf(entry.getSecond().getExecutionId()));
        }
    }

    /**
     * Returns a snapshot of the recently finished flows.
     *
     * @return a new list copied from the values of {@code recentlyFinished}; later changes
     *         to that map are not reflected in the returned list
     */
    @Override
    public List<ExecutableFlow> getRecentlyFinishedFlows() {
        // Diamond instead of a raw ArrayList, so the copy is type-checked against the return type.
        return new ArrayList<>(this.recentlyFinished.values());
    }

    /**
     * Fetches the execution history of one flow of a project.
     *
     * @param project the project whose id is used for the lookup
     * @param str     the flow id
     * @param i       forwarded as the third argument of {@code fetchFlowHistory}
     *                (presumably the number of rows to skip; confirm against the loader)
     * @param i2      forwarded as the fourth argument of {@code fetchFlowHistory}
     *                (presumably the page size; confirm against the loader)
     * @return the loader's result
     * @throws ExecutorManagerException if the loader fails
     */
    @Override
    public List<ExecutableFlow> getExecutableFlows(Project project, String str, int i, int i2) throws ExecutorManagerException {
        final int projectId = project.getId();
        return this.executorLoader.fetchFlowHistory(projectId, str, i, i2);
    }

    /**
     * Fetches a page of the overall flow execution history.
     *
     * @param i  forwarded as the first argument of {@code fetchFlowHistory}
     *           (presumably the number of rows to skip; confirm against the loader)
     * @param i2 forwarded as the second argument of {@code fetchFlowHistory}
     *           (presumably the page size; confirm against the loader)
     * @return the loader's result
     * @throws ExecutorManagerException if the loader fails
     */
    @Override
    public List<ExecutableFlow> getExecutableFlows(int i, int i2) throws ExecutorManagerException {
        return executorLoader.fetchFlowHistory(i, i2);
    }

    /**
     * Fetches flow history filtered by a "contains" pattern.
     *
     * <p>The search term is wrapped in {@code %} on both sides and passed as the second
     * filter of the eight-argument {@code fetchFlowHistory}; the other filters are
     * {@code null}, {@code 0}, {@code -1L} and {@code -1L}.
     *
     * @param str the search term
     * @param i   forwarded as the seventh argument of {@code fetchFlowHistory}
     * @param i2  forwarded as the eighth argument of {@code fetchFlowHistory}
     * @return the loader's result
     * @throws ExecutorManagerException if the loader fails
     */
    @Override
    public List<ExecutableFlow> getExecutableFlows(String str, int i, int i2) throws ExecutorManagerException {
        final String containsPattern = "%" + str + "%";
        return this.executorLoader.fetchFlowHistory(null, containsPattern, null, 0, -1L, -1L, i, i2);
    }

    /**
     * Fetches flow history using the loader's full set of filters.
     *
     * <p>All eight arguments are forwarded unchanged, in the same order, to
     * {@code executorLoader.fetchFlowHistory}; their meaning is defined by the loader.
     *
     * @return the loader's result
     * @throws ExecutorManagerException if the loader fails
     */
    @Override
    public List<ExecutableFlow> getExecutableFlows(String str, String str2, String str3, int i, long j, long j2, int i2, int i3) throws ExecutorManagerException {
        return executorLoader.fetchFlowHistory(str, str2, str3, i, j, j2, i2, i3);
    }

    /**
     * Fetches the execution history of one job of a project.
     *
     * @param project the project whose id is used for the lookup
     * @param str     the job id
     * @param i       forwarded as the third argument of {@code fetchJobHistory}
     * @param i2      forwarded as the fourth argument of {@code fetchJobHistory}
     * @return the loader's result
     * @throws ExecutorManagerException if the loader fails
     */
    @Override
    public List<ExecutableJobInfo> getExecutableJobs(Project project, String str, int i, int i2) throws ExecutorManagerException {
        final int projectId = project.getId();
        return this.executorLoader.fetchJobHistory(projectId, str, i, i2);
    }

    /**
     * Counts the recorded executions of one job of a project.
     *
     * @param project the project whose id is used for the lookup
     * @param str     the job id
     * @return the result of {@code executorLoader.fetchNumExecutableNodes}
     * @throws ExecutorManagerException if the loader fails
     */
    @Override
    public int getNumberOfJobExecutions(Project project, String str) throws ExecutorManagerException {
        final int projectId = project.getId();
        return this.executorLoader.fetchNumExecutableNodes(projectId, str);
    }

    /**
     * Counts the recorded executions of one flow of a project.
     *
     * @param project the project whose id is used for the lookup
     * @param str     the flow id
     * @return the result of {@code executorLoader.fetchNumExecutableFlows}
     * @throws ExecutorManagerException if the loader fails
     */
    @Override
    public int getNumberOfExecutions(Project project, String str) throws ExecutorManagerException {
        final int projectId = project.getId();
        return this.executorLoader.fetchNumExecutableFlows(projectId, str);
    }

    /**
     * Reads a slice of a flow-level log.
     *
     * <p>If the execution is present in {@code runningFlows}, the log is requested from its
     * executor server; otherwise it is read through the loader with an empty job name and
     * attempt {@code 0}.
     *
     * @param executableFlow the flow whose log is wanted
     * @param i              sent as the {@code INFO_OFFSET} parameter / loader offset
     * @param i2             sent as the {@code INFO_LENGTH} parameter / loader length
     * @return the requested log data
     * @throws ExecutorManagerException if the executor call or the loader fails
     */
    @Override
    public FileIOUtils.LogData getExecutableFlowLog(ExecutableFlow executableFlow, int i, int i2) throws ExecutorManagerException {
        final Pair<ExecutionReference, ExecutableFlow> running = this.runningFlows.get(Integer.valueOf(executableFlow.getExecutionId()));
        if (running != null) {
            final Map<String, Object> response = callExecutorServer(
                    running.getFirst(),
                    ConnectorParams.LOG_ACTION,
                    new Pair<>("type", SpecialJobTypes.EMBEDDED_FLOW_TYPE),
                    new Pair<>(ExecutorManagerAdapter.INFO_OFFSET, String.valueOf(i)),
                    new Pair<>(ExecutorManagerAdapter.INFO_LENGTH, String.valueOf(i2)));
            return FileIOUtils.LogData.createLogDataFromObject(response);
        }
        // Not running: the log has to come from the loader.
        return this.executorLoader.fetchLogs(executableFlow.getExecutionId(), "", 0, i, i2);
    }

    /**
     * Reads a slice of a job-level log for one attempt.
     *
     * <p>If the execution is present in {@code runningFlows}, the log is requested from its
     * executor server; otherwise it is read through the loader.
     *
     * @param executableFlow the flow the job belongs to
     * @param str            the job id
     * @param i              sent as the {@code INFO_OFFSET} parameter / loader offset
     * @param i2             sent as the {@code INFO_LENGTH} parameter / loader length
     * @param i3             the attempt number
     * @return the requested log data
     * @throws ExecutorManagerException if the executor call or the loader fails
     */
    @Override
    public FileIOUtils.LogData getExecutionJobLog(ExecutableFlow executableFlow, String str, int i, int i2, int i3) throws ExecutorManagerException {
        final Pair<ExecutionReference, ExecutableFlow> running = this.runningFlows.get(Integer.valueOf(executableFlow.getExecutionId()));
        if (running != null) {
            final Map<String, Object> response = callExecutorServer(
                    running.getFirst(),
                    ConnectorParams.LOG_ACTION,
                    new Pair<>("type", "job"),
                    new Pair<>(ConnectorParams.UPDATE_MAP_JOBID, str),
                    new Pair<>(ExecutorManagerAdapter.INFO_OFFSET, String.valueOf(i)),
                    new Pair<>(ExecutorManagerAdapter.INFO_LENGTH, String.valueOf(i2)),
                    new Pair<>("attempt", String.valueOf(i3)));
            return FileIOUtils.LogData.createLogDataFromObject(response);
        }
        // Not running: the log has to come from the loader.
        return this.executorLoader.fetchLogs(executableFlow.getExecutionId(), str, i3, i, i2);
    }

    /**
     * Fetches the attachments recorded for one attempt of a job.
     *
     * <p>If the execution is present in {@code runningFlows}, the attachments are requested
     * from its executor server and read from the response entry keyed by
     * {@code ConnectorParams.ATTACHMENTS_ACTION}; otherwise they are read through the loader.
     *
     * @param executableFlow the flow the job belongs to
     * @param str            the job id
     * @param i              the attempt number
     * @return the attachments list (may be {@code null} if the response has no such entry)
     * @throws ExecutorManagerException if the executor call or the loader fails
     */
    @Override
    public List<Object> getExecutionJobStats(ExecutableFlow executableFlow, String str, int i) throws ExecutorManagerException {
        final Pair<ExecutionReference, ExecutableFlow> running = this.runningFlows.get(Integer.valueOf(executableFlow.getExecutionId()));
        if (running != null) {
            final Map<String, Object> response = callExecutorServer(
                    running.getFirst(),
                    ConnectorParams.ATTACHMENTS_ACTION,
                    new Pair<>(ConnectorParams.UPDATE_MAP_JOBID, str),
                    new Pair<>("attempt", String.valueOf(i)));
            @SuppressWarnings("unchecked")
            final List<Object> attachments = (List<Object>) response.get(ConnectorParams.ATTACHMENTS_ACTION);
            return attachments;
        }
        // Not running: the attachments have to come from the loader.
        return this.executorLoader.fetchAttachments(executableFlow.getExecutionId(), str, i);
    }

    /**
     * Requests job metadata for one attempt from the executor running the flow.
     *
     * @param executableFlow the flow the job belongs to
     * @param str            the job id
     * @param i              sent as the {@code INFO_OFFSET} parameter
     * @param i2             sent as the {@code INFO_LENGTH} parameter
     * @param i3             the attempt number
     * @return the metadata built from the executor's response, or {@code null} when the
     *         execution is not present in {@code runningFlows}
     * @throws ExecutorManagerException if the executor call fails
     */
    @Override
    public FileIOUtils.JobMetaData getExecutionJobMetaData(ExecutableFlow executableFlow, String str, int i, int i2, int i3) throws ExecutorManagerException {
        final Pair<ExecutionReference, ExecutableFlow> running = this.runningFlows.get(Integer.valueOf(executableFlow.getExecutionId()));
        if (running != null) {
            final Map<String, Object> response = callExecutorServer(
                    running.getFirst(),
                    ConnectorParams.METADATA_ACTION,
                    new Pair<>("type", "job"),
                    new Pair<>(ConnectorParams.UPDATE_MAP_JOBID, str),
                    new Pair<>(ExecutorManagerAdapter.INFO_OFFSET, String.valueOf(i)),
                    new Pair<>(ExecutorManagerAdapter.INFO_LENGTH, String.valueOf(i2)),
                    new Pair<>("attempt", String.valueOf(i3)));
            return FileIOUtils.JobMetaData.createJobMetaDataFromObject(response);
        }
        return null;
    }

    /**
     * Cancels an execution that is either running or still queued.
     *
     * <p>A running execution is cancelled by sending {@code ConnectorParams.CANCEL_ACTION}
     * to its executor server. A queued execution is removed from the queue and finalized
     * locally. The whole operation synchronizes on {@code executableFlow}.
     *
     * @param executableFlow the execution to cancel
     * @param str            sent as the "user" request parameter of the cancel call
     * @throws ExecutorManagerException if the execution is neither running nor queued, or
     *                                  if the executor call fails
     */
    @Override
    public void cancelFlow(ExecutableFlow executableFlow, String str) throws ExecutorManagerException {
        synchronized (executableFlow) {
            final int executionId = executableFlow.getExecutionId();
            // Look the entry up once. The previous containsKey()+get() pair could see the
            // entry disappear between the two calls and then dereference null.
            final Pair<ExecutionReference, ExecutableFlow> running = this.runningFlows.get(Integer.valueOf(executionId));
            if (running != null) {
                callExecutorServer(running.getFirst(), ConnectorParams.CANCEL_ACTION, str);
            } else if (this.queuedFlows.hasExecution(executionId)) {
                this.queuedFlows.dequeue(executionId);
                finalizeFlows(executableFlow);
            } else {
                throw new ExecutorManagerException("Execution " + executionId + " of flow " + executableFlow.getFlowId() + " isn't running.");
            }
        }
    }

    /**
     * Resumes a running execution by sending {@code ConnectorParams.RESUME_ACTION} to its
     * executor server. Synchronizes on {@code executableFlow}.
     *
     * @param executableFlow the execution to resume
     * @param str            sent as the "user" request parameter
     * @throws ExecutorManagerException if the execution is not in {@code runningFlows} or
     *                                  the executor call fails
     */
    @Override
    public void resumeFlow(ExecutableFlow executableFlow, String str) throws ExecutorManagerException {
        synchronized (executableFlow) {
            final int executionId = executableFlow.getExecutionId();
            final Pair<ExecutionReference, ExecutableFlow> running = this.runningFlows.get(Integer.valueOf(executionId));
            if (running == null) {
                throw new ExecutorManagerException("Execution " + executionId + " of flow " + executableFlow.getFlowId() + " isn't running.");
            }
            callExecutorServer(running.getFirst(), ConnectorParams.RESUME_ACTION, str);
        }
    }

    /**
     * Pauses a running execution by sending {@code ConnectorParams.PAUSE_ACTION} to its
     * executor server. Synchronizes on {@code executableFlow}.
     *
     * @param executableFlow the execution to pause
     * @param str            sent as the "user" request parameter
     * @throws ExecutorManagerException if the execution is not in {@code runningFlows} or
     *                                  the executor call fails
     */
    @Override
    public void pauseFlow(ExecutableFlow executableFlow, String str) throws ExecutorManagerException {
        synchronized (executableFlow) {
            final int executionId = executableFlow.getExecutionId();
            final Pair<ExecutionReference, ExecutableFlow> running = this.runningFlows.get(Integer.valueOf(executionId));
            if (running == null) {
                throw new ExecutorManagerException("Execution " + executionId + " of flow " + executableFlow.getFlowId() + " isn't running.");
            }
            callExecutorServer(running.getFirst(), ConnectorParams.PAUSE_ACTION, str);
        }
    }

    /**
     * Requests the {@code ConnectorParams.MODIFY_PAUSE_JOBS} modification for the given jobs
     * of a running execution.
     *
     * @param executableFlow the running execution
     * @param str            sent as the "user" request parameter
     * @param strArr         the job ids to act on
     * @throws ExecutorManagerException if the execution is not running, a job id is unknown,
     *                                  or the executor call fails
     */
    @Override
    public void pauseExecutingJobs(ExecutableFlow executableFlow, String str, String... strArr) throws ExecutorManagerException {
        final String action = ConnectorParams.MODIFY_PAUSE_JOBS;
        modifyExecutingJobs(executableFlow, action, str, strArr);
    }

    /**
     * Requests the {@code ConnectorParams.MODIFY_RESUME_JOBS} modification for the given jobs
     * of a running execution.
     *
     * @param executableFlow the running execution
     * @param str            sent as the "user" request parameter
     * @param strArr         the job ids to act on
     * @throws ExecutorManagerException if the execution is not running, a job id is unknown,
     *                                  or the executor call fails
     */
    @Override
    public void resumeExecutingJobs(ExecutableFlow executableFlow, String str, String... strArr) throws ExecutorManagerException {
        final String action = ConnectorParams.MODIFY_RESUME_JOBS;
        modifyExecutingJobs(executableFlow, action, str, strArr);
    }

    /**
     * Requests the {@code ConnectorParams.MODIFY_RETRY_FAILURES} modification for a running
     * execution, without naming any jobs.
     *
     * @param executableFlow the running execution
     * @param str            sent as the "user" request parameter
     * @throws ExecutorManagerException if the execution is not running or the executor
     *                                  call fails
     */
    @Override
    public void retryFailures(ExecutableFlow executableFlow, String str) throws ExecutorManagerException {
        // No job ids: the varargs parameter receives an empty array.
        modifyExecutingJobs(executableFlow, ConnectorParams.MODIFY_RETRY_FAILURES, str);
    }

    /**
     * Requests the {@code ConnectorParams.MODIFY_RETRY_JOBS} modification for the given jobs
     * of a running execution.
     *
     * @param executableFlow the running execution
     * @param str            sent as the "user" request parameter
     * @param strArr         the job ids to act on
     * @throws ExecutorManagerException if the execution is not running, a job id is unknown,
     *                                  or the executor call fails
     */
    @Override
    public void retryExecutingJobs(ExecutableFlow executableFlow, String str, String... strArr) throws ExecutorManagerException {
        final String action = ConnectorParams.MODIFY_RETRY_JOBS;
        modifyExecutingJobs(executableFlow, action, str, strArr);
    }

    /**
     * Requests the {@code ConnectorParams.MODIFY_DISABLE_JOBS} modification for the given
     * jobs of a running execution.
     *
     * @param executableFlow the running execution
     * @param str            sent as the "user" request parameter
     * @param strArr         the job ids to act on
     * @throws ExecutorManagerException if the execution is not running, a job id is unknown,
     *                                  or the executor call fails
     */
    @Override
    public void disableExecutingJobs(ExecutableFlow executableFlow, String str, String... strArr) throws ExecutorManagerException {
        final String action = ConnectorParams.MODIFY_DISABLE_JOBS;
        modifyExecutingJobs(executableFlow, action, str, strArr);
    }

    /**
     * Requests the {@code ConnectorParams.MODIFY_ENABLE_JOBS} modification for the given jobs
     * of a running execution.
     *
     * @param executableFlow the running execution
     * @param str            sent as the "user" request parameter
     * @param strArr         the job ids to act on
     * @throws ExecutorManagerException if the execution is not running, a job id is unknown,
     *                                  or the executor call fails
     */
    @Override
    public void enableExecutingJobs(ExecutableFlow executableFlow, String str, String... strArr) throws ExecutorManagerException {
        final String action = ConnectorParams.MODIFY_ENABLE_JOBS;
        modifyExecutingJobs(executableFlow, action, str, strArr);
    }

    /**
     * Requests the {@code ConnectorParams.MODIFY_CANCEL_JOBS} modification for the given jobs
     * of a running execution.
     *
     * @param executableFlow the running execution
     * @param str            sent as the "user" request parameter
     * @param strArr         the job ids to act on
     * @throws ExecutorManagerException if the execution is not running, a job id is unknown,
     *                                  or the executor call fails
     */
    @Override
    public void cancelExecutingJobs(ExecutableFlow executableFlow, String str, String... strArr) throws ExecutorManagerException {
        final String action = ConnectorParams.MODIFY_CANCEL_JOBS;
        modifyExecutingJobs(executableFlow, action, str, strArr);
    }

    /**
     * Sends a "modifyExecution" request for a running execution to its executor server.
     *
     * <p>When job ids are supplied, every non-empty id must name a node of
     * {@code executableFlow}; the ids are then sent comma-joined under
     * {@code ConnectorParams.MODIFY_JOBS_LIST}. Without job ids only the action type is sent.
     * The whole operation synchronizes on {@code executableFlow}.
     *
     * @param executableFlow the running execution
     * @param str            the modification type, sent under
     *                       {@code ConnectorParams.MODIFY_EXECUTION_ACTION_TYPE}
     * @param str2           sent as the "user" request parameter
     * @param strArr         optional job ids; {@code null} or empty means "no job list"
     * @return the executor server's parsed response
     * @throws ExecutorManagerException if the execution is not in {@code runningFlows}, a job
     *                                  id does not exist in the flow, or the call fails
     */
    private Map<String, Object> modifyExecutingJobs(ExecutableFlow executableFlow, String str, String str2, String... strArr) throws ExecutorManagerException {
        synchronized (executableFlow) {
            final Pair<ExecutionReference, ExecutableFlow> running = this.runningFlows.get(Integer.valueOf(executableFlow.getExecutionId()));
            if (running == null) {
                throw new ExecutorManagerException("Execution " + executableFlow.getExecutionId() + " of flow " + executableFlow.getFlowId() + " isn't running.");
            }
            final boolean hasJobIds = strArr != null && strArr.length > 0;
            if (!hasJobIds) {
                return callExecutorServer(running.getFirst(), "modifyExecution", str2, new Pair<>(ConnectorParams.MODIFY_EXECUTION_ACTION_TYPE, str));
            }
            // Reject the request before contacting the executor if any named job is unknown.
            for (String jobId : strArr) {
                if (!jobId.isEmpty() && executableFlow.getExecutableNode(jobId) == null) {
                    throw new ExecutorManagerException("Job " + jobId + " doesn't exist in execution " + executableFlow.getExecutionId() + ".");
                }
            }
            return callExecutorServer(running.getFirst(), "modifyExecution", str2, new Pair<>(ConnectorParams.MODIFY_EXECUTION_ACTION_TYPE, str), new Pair<>(ConnectorParams.MODIFY_JOBS_LIST, StringUtils.join(strArr, ',')));
        }
    }

    /**
     * Submits a flow for execution and returns a human-readable status message.
     *
     * <p>Steps, in order: record the submitting user and time on the flow; look up other
     * queued/running executions of the same flow; apply disabled jobs; resolve the
     * concurrency option (pipeline, skip, or run concurrently); set the memory-check flag;
     * persist the flow through the loader; then either enqueue it (multi-executor mode) or
     * dispatch it immediately to the first active executor.
     *
     * <p>When the queue is full the flow is NOT submitted: the method logs an error and
     * returns the failure message instead of throwing.
     *
     * @param executableFlow the flow to submit; it is mutated (submit user/time, options)
     * @param str            the submitting user
     * @return a message describing the outcome, including the execution id on success
     * @throws ExecutorManagerException if the flow is already running and the concurrency
     *                                  option is "skip", or if persisting/dispatching fails
     */
    @Override // azkaban.executor.ExecutorManagerAdapter
    public String submitExecutableFlow(ExecutableFlow executableFlow, String str) throws ExecutorManagerException {
        String str2;
        String str3;
        // The lock object is the interned string "<projectName>.<id>.submitFlow", so
        // submissions that produce the same string share one monitor.
        synchronized ((executableFlow.getProjectName() + "." + executableFlow.getId() + ".submitFlow").intern()) {
            String flowId = executableFlow.getFlowId();
            logger.info("Submitting execution flow " + flowId + " by " + str);
            // Optional prefix describing how an already-running flow is being handled.
            String str4 = "";
            if (this.queuedFlows.isFull()) {
                // Queue overrun: report through the return value rather than an exception.
                str2 = String.format("Failed to submit %s for project %s. Azkaban has overrun its webserver queue capacity", flowId, executableFlow.getProjectName());
                logger.error(str2);
            } else {
                int projectId = executableFlow.getProjectId();
                executableFlow.setSubmitUser(str);
                executableFlow.setSubmitTime(System.currentTimeMillis());
                List<Integer> runningFlows = getRunningFlows(projectId, flowId);
                ExecutionOptions executionOptions = executableFlow.getExecutionOptions();
                if (executionOptions == null) {
                    // NOTE(review): this fresh instance is only used locally in this method;
                    // it is not visibly stored back on executableFlow -- confirm intended.
                    executionOptions = new ExecutionOptions();
                }
                if (executionOptions.getDisabledJobs() != null) {
                    FlowUtils.applyDisabledJobs(executionOptions.getDisabledJobs(), executableFlow);
                }
                if (!runningFlows.isEmpty()) {
                    if (executionOptions.getConcurrentOption().equals(ExecutionOptions.CONCURRENT_OPTION_PIPELINE)) {
                        // Pipeline behind the execution with the highest id.
                        Collections.sort(runningFlows);
                        Integer num = runningFlows.get(runningFlows.size() - 1);
                        executionOptions.setPipelineExecutionId(num);
                        str4 = "Flow " + flowId + " is already running with exec id " + num + ". Pipelining level " + executionOptions.getPipelineLevel() + ". \n";
                    } else {
                        if (executionOptions.getConcurrentOption().equals(ExecutionOptions.CONCURRENT_OPTION_SKIP)) {
                            throw new ExecutorManagerException("Flow " + flowId + " is already running. Skipping execution.", ExecutorManagerException.Reason.SkippedExecution);
                        }
                        // Any other option: run alongside the existing executions.
                        str4 = "Flow " + flowId + " is already running with exec id " + StringUtils.join(runningFlows, ",") + ". Will execute concurrently. \n";
                    }
                }
                // Memory check is enabled unless the project is whitelisted for MemoryCheck.
                executionOptions.setMemoryCheck(!ProjectWhitelist.isProjectWhitelisted(executableFlow.getProjectId(), ProjectWhitelist.WhitelistType.MemoryCheck));
                // The reference below is built from the flow's execution id after this call
                // (presumably the upload is what assigns that id -- confirm in the loader).
                this.executorLoader.uploadExecutableFlow(executableFlow);
                ExecutionReference executionReference = new ExecutionReference(executableFlow.getExecutionId());
                if (isMultiExecutorMode()) {
                    // Multi-executor mode: record the reference and queue the flow.
                    this.executorLoader.addActiveExecutableReference(executionReference);
                    this.queuedFlows.enqueue(executableFlow, executionReference);
                } else {
                    // Single-executor mode: dispatch right away to the first active executor.
                    Executor next = this.activeExecutors.iterator().next();
                    this.executorLoader.addActiveExecutableReference(executionReference);
                    try {
                        dispatch(executionReference, executableFlow, next);
                    } catch (ExecutorManagerException e) {
                        // Undo the active reference so a failed dispatch leaves no stale entry.
                        this.executorLoader.removeActiveExecutableReference(executionReference.getExecId());
                        throw e;
                    }
                }
                str2 = str4 + "Execution submitted successfully with exec id " + executableFlow.getExecutionId();
            }
            str3 = str2;
        }
        return str3;
    }

    /**
     * Removes execution logs through the loader and logs how many entries were removed and
     * how long the clean-up took. A loader failure is logged and not rethrown.
     *
     * @param j forwarded to {@code executorLoader.removeExecutionLogsByTime}
     *          (presumably a cut-off timestamp in milliseconds; confirm against the loader)
     */
    public void cleanOldExecutionLogs(long j) {
        final long startedAt = System.currentTimeMillis();
        try {
            logger.info("Cleaned up " + this.executorLoader.removeExecutionLogsByTime(j) + " log entries.");
        } catch (ExecutorManagerException cleanupFailure) {
            logger.error("log clean up failed. ", cleanupFailure);
        }
        final long elapsedSeconds = (System.currentTimeMillis() - startedAt) / 1000;
        logger.info("log clean up time: " + elapsedSeconds + " seconds.");
    }

    /**
     * Calls an explicitly given executor for a flow, with no user and no extra parameters.
     *
     * @param executableFlow the flow whose execution id is sent
     * @param executor       the executor whose host and port are contacted
     * @param str            the action to request
     * @return the executor server's parsed response
     * @throws ExecutorManagerException wrapping any {@link IOException} from the call
     */
    private Map<String, Object> callExecutorServer(ExecutableFlow executableFlow, Executor executor, String str) throws ExecutorManagerException {
        final Integer executionId = Integer.valueOf(executableFlow.getExecutionId());
        try {
            return callExecutorServer(executor.getHost(), executor.getPort(), str, executionId, null, (Pair[]) null);
        } catch (IOException ioFailure) {
            throw new ExecutorManagerException(ioFailure);
        }
    }

    /**
     * Calls the executor named by an execution reference on behalf of a user, with no
     * extra parameters.
     *
     * @param executionReference supplies host, port and execution id
     * @param str                the action to request
     * @param str2               sent as the "user" request parameter
     * @return the executor server's parsed response
     * @throws ExecutorManagerException wrapping any {@link IOException} from the call
     */
    private Map<String, Object> callExecutorServer(ExecutionReference executionReference, String str, String str2) throws ExecutorManagerException {
        try {
            return callExecutorServer(
                    executionReference.getHost(),
                    executionReference.getPort(),
                    str,
                    Integer.valueOf(executionReference.getExecId()),
                    str2,
                    (Pair[]) null);
        } catch (IOException ioFailure) {
            throw new ExecutorManagerException(ioFailure);
        }
    }

    /**
     * Calls the executor named by an execution reference with extra parameters and no user.
     *
     * @param executionReference supplies host, port and execution id
     * @param str                the action to request
     * @param pairArr            additional request parameters
     * @return the executor server's parsed response
     * @throws ExecutorManagerException wrapping any {@link IOException} from the call
     */
    private Map<String, Object> callExecutorServer(ExecutionReference executionReference, String str, Pair<String, String>... pairArr) throws ExecutorManagerException {
        try {
            return callExecutorServer(
                    executionReference.getHost(),
                    executionReference.getPort(),
                    str,
                    Integer.valueOf(executionReference.getExecId()),
                    null,
                    pairArr);
        } catch (IOException ioFailure) {
            throw new ExecutorManagerException(ioFailure);
        }
    }

    /**
     * Calls the executor named by an execution reference on behalf of a user, with extra
     * parameters.
     *
     * @param executionReference supplies host, port and execution id
     * @param str                the action to request
     * @param str2               sent as the "user" request parameter
     * @param pairArr            additional request parameters
     * @return the executor server's parsed response
     * @throws ExecutorManagerException wrapping any {@link IOException} from the call
     */
    private Map<String, Object> callExecutorServer(ExecutionReference executionReference, String str, String str2, Pair<String, String>... pairArr) throws ExecutorManagerException {
        try {
            return callExecutorServer(
                    executionReference.getHost(),
                    executionReference.getPort(),
                    str,
                    Integer.valueOf(executionReference.getExecId()),
                    str2,
                    pairArr);
        } catch (IOException ioFailure) {
            throw new ExecutorManagerException(ioFailure);
        }
    }

    /**
     * Sends a request to the {@code /executor} endpoint of an executor server.
     *
     * <p>The request parameters are the caller's extra pairs (if any) followed by
     * {@code action}, the execution-id parameter and {@code user}.
     *
     * @param str     the executor host
     * @param i       the executor port
     * @param str2    the action to request
     * @param num     the execution id; rendered with {@link String#valueOf(Object)}
     * @param str3    the user; may be {@code null}, in which case a pair with a null value
     *                is still added
     * @param pairArr additional request parameters; may be {@code null}
     * @return the executor server's parsed response
     * @throws IOException if the call fails or the response carries an "error" entry
     */
    public Map<String, Object> callExecutorServer(String str, int i, String str2, Integer num, String str3, Pair<String, String>... pairArr) throws IOException {
        // Typed list instead of a raw ArrayList so the parameter pairs are type-checked.
        final List<Pair<String, String>> params = new ArrayList<>();
        if (pairArr != null) {
            params.addAll(Arrays.asList(pairArr));
        }
        params.add(new Pair<>("action", str2));
        params.add(new Pair<>(ConnectorParams.EXECID_PARAM, String.valueOf(num)));
        params.add(new Pair<>("user", str3));
        return callExecutorForJsonObject(str, i, "/executor", params);
    }

    /**
     * Calls an executor endpoint and parses the JSON response into a map.
     *
     * @param str  the executor host
     * @param i    the executor port
     * @param str2 the endpoint path
     * @param list the request parameters
     * @return the parsed response
     * @throws IOException if the call fails, or with the response's "error" value as the
     *                     message when the response contains a non-null "error" entry
     */
    private Map<String, Object> callExecutorForJsonObject(String str, int i, String str2, List<Pair<String, String>> list) throws IOException {
        final String json = callExecutorForJsonString(str, i, str2, list);
        @SuppressWarnings("unchecked")
        final Map<String, Object> response = (Map<String, Object>) JSONUtils.parseJSONFromString(json);
        final String error = (String) response.get("error");
        if (error == null) {
            return response;
        }
        // The executor reported a failure inside an otherwise valid response.
        throw new IOException(error);
    }

    /**
     * Performs an HTTP GET against an executor endpoint and returns the raw response body.
     *
     * @param str  the executor host
     * @param i    the executor port
     * @param str2 the endpoint path
     * @param list the request parameters; {@code null} is treated as an empty list
     * @return the response returned by {@code ExecutorApiClient.httpGet}
     * @throws IOException if the HTTP call fails
     */
    public String callExecutorForJsonString(String str, int i, String str2, List<Pair<String, String>> list) throws IOException {
        final List<Pair<String, String>> params = (list != null) ? list : new ArrayList<Pair<String, String>>();
        final Pair[] paramArray = params.toArray(new Pair[0]);
        return ExecutorApiClient.getInstance().httpGet(ExecutorApiClient.buildUri(str, i, str2, true, paramArray), null);
    }

    /**
     * Sends a request to the {@code /stats} endpoint of the executor with the given id.
     *
     * @param i       the executor id, resolved through {@link #fetchExecutor(int)}
     * @param str     the action to request
     * @param pairArr additional request parameters; may be {@code null}
     * @return the executor server's parsed response
     * @throws IOException              if the call fails or the response carries an "error"
     * @throws ExecutorManagerException if the executor lookup fails
     */
    @Override
    public Map<String, Object> callExecutorStats(int i, String str, Pair<String, String>... pairArr) throws IOException, ExecutorManagerException {
        final Executor executor = fetchExecutor(i);
        // Typed list instead of a raw ArrayList so the parameter pairs are type-checked.
        final List<Pair<String, String>> params = new ArrayList<>();
        if (pairArr != null) {
            params.addAll(Arrays.asList(pairArr));
        }
        params.add(new Pair<>("action", str));
        return callExecutorForJsonObject(executor.getHost(), executor.getPort(), "/stats", params);
    }

    /**
     * Sends a request to the {@code /jmx} endpoint of an executor.
     *
     * @param str  the executor address; it is split on
     *             {@code JobCallbackConstants.HEADER_NAME_VALUE_DELIMITER} and the first two
     *             parts are used as host and port
     * @param str2 the action; sent as a parameter name with an empty value
     * @param str3 optional MBean name, sent under {@code ConnectorParams.JMX_MBEAN} when
     *             non-null
     * @return the executor server's parsed response
     * @throws IOException if the call fails or the response carries an "error"
     */
    @Override
    public Map<String, Object> callExecutorJMX(String str, String str2, String str3) throws IOException {
        // Typed list instead of a raw ArrayList so the parameter pairs are type-checked.
        final List<Pair<String, String>> params = new ArrayList<>();
        params.add(new Pair<>(str2, ""));
        if (str3 != null) {
            params.add(new Pair<>(ConnectorParams.JMX_MBEAN, str3));
        }
        final String[] hostAndPort = str.split(JobCallbackConstants.HEADER_NAME_VALUE_DELIMITER);
        // parseInt yields the primitive directly instead of boxing and unboxing.
        final int port = Integer.parseInt(hostAndPort[1]);
        return callExecutorForJsonObject(hostAndPort[0], port, "/jmx", params);
    }

    /**
     * Shuts down the manager's background workers: the queue processor (only in
     * multi-executor mode) and then the executing manager.
     */
    @Override
    public void shutdown() {
        final boolean multiExecutor = isMultiExecutorMode();
        if (multiExecutor) {
            queueProcessor.shutdown();
        }
        executingManager.shutdown();
    }

    /**
     * Finalizes an execution: makes sure it is persisted as finished, removes it from the
     * active bookkeeping, fires a {@code FLOW_FINISHED} event and sends alerts.
     *
     * <p>If the in-memory flow is not finished, the stored flow is loaded; if that is not
     * finished either, everything in it is failed and written back. Progress is published
     * through {@code updaterStage}. When persistence fails with an
     * {@link ExecutorManagerException} the error is logged and no alerts are sent.
     *
     * <p>Alerting: the alerter registered under {@code XmlUserManager.EMAIL_ATTR} is used
     * when failure (status FAILED/KILLED) or success e-mails are configured; in addition,
     * when the flow parameters contain {@code alert.type}, the alerter registered under
     * that name is invoked. Alerter exceptions are logged with their stack trace and never
     * propagated.
     *
     * @param executableFlow the execution to finalize
     */
    public void finalizeFlows(ExecutableFlow executableFlow) {
        final int executionId = executableFlow.getExecutionId();
        boolean alertUser = true;
        this.updaterStage = "finalizing flow " + executionId;
        try {
            final ExecutableFlow finishedFlow;
            if (isFinished(executableFlow)) {
                finishedFlow = executableFlow;
            } else {
                this.updaterStage = "finalizing flow " + executionId + " loading from db";
                finishedFlow = this.executorLoader.fetchExecutableFlow(executionId);
                if (!isFinished(finishedFlow)) {
                    // Neither copy is finished: fail everything and persist that.
                    this.updaterStage = "finalizing flow " + executionId + " failing the flow";
                    failEverything(finishedFlow);
                    this.executorLoader.updateExecutableFlow(finishedFlow);
                }
            }
            this.updaterStage = "finalizing flow " + executionId + " deleting active reference";
            if (executableFlow.getEndTime() == -1) {
                // NOTE(review): the end time is set on the in-memory flow but the object that
                // is persisted is finishedFlow, which may be a different instance loaded from
                // the store. Kept as-is; confirm whether both should be updated.
                executableFlow.setEndTime(System.currentTimeMillis());
                this.executorLoader.updateExecutableFlow(finishedFlow);
            }
            this.executorLoader.removeActiveExecutableReference(executionId);
            this.updaterStage = "finalizing flow " + executionId + " cleaning from memory";
            this.runningFlows.remove(Integer.valueOf(executionId));
            fireEventListeners(Event.create(finishedFlow, Event.Type.FLOW_FINISHED, new EventData(finishedFlow.getStatus())));
            this.recentlyFinished.put(Integer.valueOf(executionId), finishedFlow);
        } catch (ExecutorManagerException e) {
            // Internal failure: do not alert the user, but keep the stack trace in the log.
            alertUser = false;
            logger.error("Failed to finalize flow " + executionId, e);
        }
        this.updaterStage = "finalizing flow " + executionId + " alerting and emailing";
        if (!alertUser) {
            return;
        }

        final ExecutionOptions options = executableFlow.getExecutionOptions();
        final Alerter mailAlerter = this.alerters.get(XmlUserManager.EMAIL_ATTR);
        final Status status = executableFlow.getStatus();
        final boolean failed = status == Status.FAILED || status == Status.KILLED;

        // E-mail alert, only when recipients are configured for this outcome.
        if (failed) {
            if (options.getFailureEmails() != null && !options.getFailureEmails().isEmpty()) {
                try {
                    mailAlerter.alertOnError(executableFlow, new String[0]);
                } catch (Exception e) {
                    logger.error("Failed to send failure email for execution " + executionId, e);
                }
            }
        } else if (options.getSuccessEmails() != null && !options.getSuccessEmails().isEmpty()) {
            try {
                mailAlerter.alertOnSuccess(executableFlow);
            } catch (Exception e) {
                logger.error("Failed to send success email for execution " + executionId, e);
            }
        }

        // Optional custom alerter selected through the "alert.type" flow parameter.
        if (!options.getFlowParameters().containsKey("alert.type")) {
            return;
        }
        final String alertType = options.getFlowParameters().get("alert.type");
        final Alerter customAlerter = this.alerters.get(alertType);
        if (customAlerter == null) {
            logger.error("Alerter type " + alertType + " doesn't exist. Failed to alert.");
            return;
        }
        try {
            if (failed) {
                customAlerter.alertOnError(executableFlow, new String[0]);
            } else {
                customAlerter.alertOnSuccess(executableFlow);
            }
        } catch (Exception e) {
            // Log through the logger (with the cause) instead of printStackTrace().
            logger.error("Failed to alert by " + alertType, e);
        }
    }

    /**
     * Forces a flow and its nodes into a terminal state.
     *
     * <p>Every node gets its status adjusted by the switch below and receives the current
     * time as start and/or end time where those are still {@code -1}. The flow itself gets
     * an end time if it has none and is always marked {@code Status.FAILED}.
     *
     * <p>NOTE(review): the switch runs over a compiler-generated ordinal map
     * ({@code AnonymousClass2.$SwitchMap$azkaban$executor$Status}) that is not visible here,
     * so which {@code Status} constants correspond to cases 1-6 cannot be determined from
     * this method. The named constants used as case labels appear to be decompiler
     * substitutions for the plain integers shown in the trailing comments -- confirm
     * against the original source before editing.
     *
     * @param executableFlow the flow to fail; mutated in place
     */
    private void failEverything(ExecutableFlow executableFlow) {
        // One timestamp for the whole pass so all forced times agree.
        long currentTimeMillis = System.currentTimeMillis();
        for (ExecutableNode executableNode : executableFlow.getExecutableNodes()) {
            switch (AnonymousClass2.$SwitchMap$azkaban$executor$Status[executableNode.getStatus().ordinal()]) {
                // Cases 1-5: status is left untouched.
                case ConnectorParams.NODE_STATUS_INDEX /* 1 */:
                case ConnectorParams.NODE_START_INDEX /* 2 */:
                case 3:
                case 4:
                case ExecutionOptions.DEFAULT_FLOW_PRIORITY /* 5 */:
                    break;
                // Case 6: node becomes KILLED.
                case 6:
                    executableNode.setStatus(Status.KILLED);
                    break;
                // Any other status: node becomes FAILED.
                default:
                    executableNode.setStatus(Status.FAILED);
                    break;
            }
            // -1 marks an unset time; fill it in so the node has a complete time range.
            if (executableNode.getStartTime() == -1) {
                executableNode.setStartTime(currentTimeMillis);
            }
            if (executableNode.getEndTime() == -1) {
                executableNode.setEndTime(currentTimeMillis);
            }
        }
        if (executableFlow.getEndTime() == -1) {
            executableFlow.setEndTime(currentTimeMillis);
        }
        executableFlow.setStatus(Status.FAILED);
    }

    /**
     * Removes entries from {@code recentlyFinished} whose flow ended more than {@code j}
     * milliseconds before now.
     *
     * @param j the maximum age in milliseconds; flows with
     *          {@code endTime < now - j} are evicted
     */
    public void evictOldRecentlyFinished(long j) {
        // Iterate over a typed snapshot of the keys so entries can be removed from the map
        // while looping (replaces the raw ArrayList/Iterator and the Integer cast).
        final List<Integer> executionIds = new ArrayList<>(this.recentlyFinished.keySet());
        final long cutoff = System.currentTimeMillis() - j;
        for (Integer executionId : executionIds) {
            if (this.recentlyFinished.get(executionId).getEndTime() < cutoff) {
                this.recentlyFinished.remove(executionId);
            }
        }
    }

    /**
     * Applies an update payload to the running flow it refers to and raises first-failure alerts
     * when the flow's status changes to {@code Status.FAILED_FINISHING}.
     *
     * <p>On a successful update the execution reference's next-check time and error count are
     * reset to zero. A status change to {@code Status.FAILED} marks a flow failure on
     * {@code CommonMetrics}. Alerting failures are logged (with their stack trace) and never
     * propagated, so a broken alerter cannot fail the update itself.
     *
     * @param map update payload; must contain an {@code Integer} under {@code "executionId"} and
     *            may contain an {@code "error"} message
     * @return the flow after the update has been applied
     * @throws ExecutorManagerException if the payload has no execution id, if no running flow is
     *                                  registered under that id, or if the payload carries an
     *                                  {@code "error"} entry
     */
    public ExecutableFlow updateExecution(Map<String, Object> map) throws ExecutorManagerException {
        Integer execId = (Integer) map.get("executionId");
        if (execId == null) {
            throw new ExecutorManagerException("Response is malformed. Need exec id to update.");
        }
        Pair<ExecutionReference, ExecutableFlow> running = this.runningFlows.get(execId);
        if (running == null) {
            throw new ExecutorManagerException("No running flow found with the execution id. Removing " + execId);
        }
        ExecutionReference reference = running.getFirst();
        ExecutableFlow flow = running.getSecond();
        if (map.containsKey("error")) {
            throw new ExecutorManagerException((String) map.get("error"), flow);
        }

        // A well-formed update arrived: clear the back-off state kept on the reference.
        reference.setNextCheckTime(0L);
        reference.setNumErrors(0);

        Status oldStatus = flow.getStatus();
        flow.applyUpdateObject(map);
        Status newStatus = flow.getStatus();
        boolean statusChanged = oldStatus != newStatus;

        if (statusChanged && newStatus == Status.FAILED) {
            CommonMetrics.INSTANCE.markFlowFail();
        }

        ExecutionOptions executionOptions = flow.getExecutionOptions();
        if (statusChanged && newStatus.equals(Status.FAILED_FINISHING)) {
            if (executionOptions.getNotifyOnFirstFailure()) {
                try {
                    this.alerters.get(XmlUserManager.EMAIL_ATTR).alertOnFirstError(flow);
                } catch (Exception e) {
                    // Pass the throwable to the logger so the stack trace lands in the log
                    // instead of on stderr.
                    logger.error("Failed to send first error email." + e.getMessage(), e);
                }
            }
            if (executionOptions.getFlowParameters().containsKey("alert.type")) {
                String alertType = executionOptions.getFlowParameters().get("alert.type");
                Alerter alerter = this.alerters.get(alertType);
                if (alerter != null) {
                    try {
                        alerter.alertOnFirstError(flow);
                    } catch (Exception e) {
                        logger.error("Failed to alert by " + alertType, e);
                    }
                } else {
                    logger.error("Alerter type " + alertType + " doesn't exist. Failed to alert.");
                }
            }
        }
        return flow;
    }

    /**
     * Reports whether the flow's current status counts as finished.
     *
     * <p>The status is dispatched through the decompiler-generated switch map
     * {@code AnonymousClass2.$SwitchMap$azkaban$executor$Status}: indices 1, 2 and 3 yield
     * {@code true}, everything else {@code false}. Which {@code Status} constants those indices
     * denote is defined in the switch map and is not visible here.
     *
     * @param executableFlow the flow whose status is inspected
     * @return {@code true} if the status maps to switch index 1, 2 or 3
     */
    public boolean isFinished(ExecutableFlow executableFlow) {
        switch (AnonymousClass2.$SwitchMap$azkaban$executor$Status[executableFlow.getStatus().ordinal()]) {
            // NOTE(review): presumably the terminal statuses - confirm against the switch map.
            case ConnectorParams.NODE_STATUS_INDEX /* 1 */:
            case ConnectorParams.NODE_START_INDEX /* 2 */:
            case 3:
                return true;
            default:
                return false;
        }
    }

    /**
     * Appends, for each flow in iteration order, its execution id to {@code list2} and its update
     * time to {@code list3}, so that equal positions in the two output lists describe the same flow.
     *
     * @param list  flows to read from
     * @param list2 receives one execution id per flow
     * @param list3 receives one update time per flow
     */
    public void fillUpdateTimeAndExecId(List<ExecutableFlow> list, List<Integer> list2, List<Long> list3) {
        final Iterator<ExecutableFlow> flows = list.iterator();
        while (flows.hasNext()) {
            final ExecutableFlow flow = flows.next();
            final int executionId = flow.getExecutionId();
            list2.add(executionId);
            final long updateTime = flow.getUpdateTime();
            list3.add(updateTime);
        }
    }

    /**
     * Groups running flows by the executor recorded on their execution reference, keeping only
     * those whose next-check time is earlier than the current time.
     *
     * @return a new map from executor to its flows that are due for a check; executors with no
     *         due flows have no entry
     */
    public Map<Executor, List<ExecutableFlow>> getFlowToExecutorMap() {
        final Map<Executor, List<ExecutableFlow>> flowsByExecutor = new HashMap<>();
        for (final Pair<ExecutionReference, ExecutableFlow> running : this.runningFlows.values()) {
            final ExecutionReference reference = running.getFirst();
            final ExecutableFlow flow = running.getSecond();
            final Executor executor = reference.getExecutor();
            if (reference.getNextCheckTime() >= System.currentTimeMillis()) {
                // Not due yet.
                continue;
            }
            List<ExecutableFlow> dueFlows = flowsByExecutor.get(executor);
            if (dueFlows == null) {
                dueFlows = new ArrayList<>();
                flowsByExecutor.put(executor, dueFlows);
            }
            dueFlows.add(flow);
        }
        return flowsByExecutor;
    }

    /**
     * Appends the flow history fetched from the executor loader to {@code list} and returns the
     * count reported by {@code fetchNumExecutableFlows}.
     *
     * @param i    first argument forwarded to both loader calls
     * @param str  second argument forwarded to both loader calls
     * @param i2   third argument forwarded to {@code fetchFlowHistory}
     * @param i3   fourth argument forwarded to {@code fetchFlowHistory}
     * @param list output list that receives the fetched flows
     * @return the value of {@code executorLoader.fetchNumExecutableFlows(i, str)}
     * @throws ExecutorManagerException if either loader call fails
     */
    @Override // azkaban.executor.ExecutorManagerAdapter
    public int getExecutableFlows(int i, String str, int i2, int i3, List<ExecutableFlow> list) throws ExecutorManagerException {
        list.addAll(this.executorLoader.fetchFlowHistory(i, str, i2, i3));
        final int totalFlows = this.executorLoader.fetchNumExecutableFlows(i, str);
        return totalFlows;
    }

    /**
     * Returns the flow history fetched from the executor loader for the given arguments and status.
     *
     * @param i      first argument forwarded to {@code fetchFlowHistory}
     * @param str    second argument forwarded to {@code fetchFlowHistory}
     * @param i2     third argument forwarded to {@code fetchFlowHistory}
     * @param i3     fourth argument forwarded to {@code fetchFlowHistory}
     * @param status status forwarded to {@code fetchFlowHistory}
     * @return the list produced by the loader, unmodified
     * @throws ExecutorManagerException if the loader call fails
     */
    @Override // azkaban.executor.ExecutorManagerAdapter
    public List<ExecutableFlow> getExecutableFlows(int i, String str, int i2, int i3, Status status) throws ExecutorManagerException {
        final List<ExecutableFlow> history = this.executorLoader.fetchFlowHistory(i, str, i2, i3, status);
        return history;
    }

    /**
     * Assigns the flow to the given executor, asks that executor to run it, and registers the flow
     * as running.
     *
     * <p>The executor assignment is persisted through the executor loader before the executor is
     * called. If the call fails with an {@code ExecutorManagerException}, the assignment is undone
     * and the failure is rethrown wrapped in a new {@code ExecutorManagerException}.
     *
     * @param executionReference reference that receives the executor and is stored with the flow
     * @param executableFlow     flow to dispatch; its update time is set to the current time
     * @param executor           executor that should run the flow
     * @throws ExecutorManagerException if assigning, calling the executor, or rolling back fails
     */
    public void dispatch(ExecutionReference executionReference, ExecutableFlow executableFlow, Executor executor) throws ExecutorManagerException {
        executableFlow.setUpdateTime(System.currentTimeMillis());
        final int execId = executableFlow.getExecutionId();
        this.executorLoader.assignExecutor(executor.getId(), execId);
        try {
            callExecutorServer(executableFlow, executor, ConnectorParams.EXECUTE_ACTION);
            executionReference.setExecutor(executor);
            final Pair<ExecutionReference, ExecutableFlow> running = new Pair<>(executionReference, executableFlow);
            this.runningFlows.put(execId, running);
            final String message = String.format("Successfully dispatched exec %d with error count %d", execId, executionReference.getNumErrors());
            logger.info(message);
        } catch (ExecutorManagerException dispatchFailure) {
            logger.error("Rolling back executor assignment for execution id:" + execId, dispatchFailure);
            this.executorLoader.unassignExecutor(execId);
            throw new ExecutorManagerException(dispatchFailure);
        }
    }

    /*  JADX ERROR: Failed to decode insn: 0x0002: MOVE_MULTI, method: azkaban.executor.ExecutorManager.access$202(azkaban.executor.ExecutorManager, long):long
        java.lang.ArrayIndexOutOfBoundsException: arraycopy: source index -1 out of bounds for object array[6]
        	at java.base/java.lang.System.arraycopy(Native Method)
        	at jadx.plugins.input.java.data.code.StackState.insert(StackState.java:49)
        	at jadx.plugins.input.java.data.code.CodeDecodeState.insert(CodeDecodeState.java:118)
        	at jadx.plugins.input.java.data.code.JavaInsnsRegister.dup2x1(JavaInsnsRegister.java:313)
        	at jadx.plugins.input.java.data.code.JavaInsnData.decode(JavaInsnData.java:46)
        	at jadx.core.dex.instructions.InsnDecoder.lambda$process$0(InsnDecoder.java:54)
        	at jadx.plugins.input.java.data.code.JavaCodeReader.visitInstructions(JavaCodeReader.java:81)
        	at jadx.core.dex.instructions.InsnDecoder.process(InsnDecoder.java:50)
        	at jadx.core.dex.nodes.MethodNode.load(MethodNode.java:156)
        	at jadx.core.dex.nodes.ClassNode.load(ClassNode.java:443)
        	at jadx.core.ProcessClass.process(ProcessClass.java:70)
        	at jadx.core.ProcessClass.generateCode(ProcessClass.java:118)
        	at jadx.core.dex.nodes.ClassNode.generateClassCode(ClassNode.java:400)
        	at jadx.core.dex.nodes.ClassNode.decompile(ClassNode.java:388)
        	at jadx.core.dex.nodes.ClassNode.getCode(ClassNode.java:338)
        */
    /**
     * Synthetic accessor that stores {@code r7} into {@code r6.lastThreadCheckTime} and returns
     * the stored value.
     *
     * <p>The decompiler could not decode this method (it failed on the {@code dup2_x1}
     * instruction) and emitted a stub that always threw {@code UnsupportedOperationException}.
     * The body below is reconstructed from the leftover pseudo-code
     * ({@code r0.lastThreadCheckTime = r1}) and from the compiler's usual shape for a
     * field-assignment accessor.
     * NOTE(review): reconstructed, not decompiled - confirm against the original bytecode.
     *
     * @param r6 the manager whose field is written
     * @param r7 the new value for {@code lastThreadCheckTime}
     * @return the value that was stored
     */
    static /* synthetic */ long access$202(azkaban.executor.ExecutorManager r6, long r7) {
        r6.lastThreadCheckTime = r7;
        return r7;
    }

    /*  JADX ERROR: Failed to decode insn: 0x0002: MOVE_MULTI, method: azkaban.executor.ExecutorManager.access$1302(azkaban.executor.ExecutorManager, long):long
        java.lang.ArrayIndexOutOfBoundsException: arraycopy: source index -1 out of bounds for object array[6]
        	at java.base/java.lang.System.arraycopy(Native Method)
        	at jadx.plugins.input.java.data.code.StackState.insert(StackState.java:49)
        	at jadx.plugins.input.java.data.code.CodeDecodeState.insert(CodeDecodeState.java:118)
        	at jadx.plugins.input.java.data.code.JavaInsnsRegister.dup2x1(JavaInsnsRegister.java:313)
        	at jadx.plugins.input.java.data.code.JavaInsnData.decode(JavaInsnData.java:46)
        	at jadx.core.dex.instructions.InsnDecoder.lambda$process$0(InsnDecoder.java:54)
        	at jadx.plugins.input.java.data.code.JavaCodeReader.visitInstructions(JavaCodeReader.java:81)
        	at jadx.core.dex.instructions.InsnDecoder.process(InsnDecoder.java:50)
        	at jadx.core.dex.nodes.MethodNode.load(MethodNode.java:156)
        	at jadx.core.dex.nodes.ClassNode.load(ClassNode.java:443)
        	at jadx.core.ProcessClass.process(ProcessClass.java:70)
        	at jadx.core.ProcessClass.generateCode(ProcessClass.java:118)
        	at jadx.core.dex.nodes.ClassNode.generateClassCode(ClassNode.java:400)
        	at jadx.core.dex.nodes.ClassNode.decompile(ClassNode.java:388)
        	at jadx.core.dex.nodes.ClassNode.getCode(ClassNode.java:338)
        */
    /**
     * Synthetic accessor that stores {@code r7} into {@code r6.lastCleanerThreadCheckTime} and
     * returns the stored value.
     *
     * <p>The decompiler could not decode this method (it failed on the {@code dup2_x1}
     * instruction) and emitted a stub that always threw {@code UnsupportedOperationException}.
     * The body below is reconstructed from the leftover pseudo-code
     * ({@code r0.lastCleanerThreadCheckTime = r1}) and from the compiler's usual shape for a
     * field-assignment accessor.
     * NOTE(review): reconstructed, not decompiled - confirm against the original bytecode.
     *
     * @param r6 the manager whose field is written
     * @param r7 the new value for {@code lastCleanerThreadCheckTime}
     * @return the value that was stored
     */
    static /* synthetic */ long access$1302(azkaban.executor.ExecutorManager r6, long r7) {
        r6.lastCleanerThreadCheckTime = r7;
        return r7;
    }

    // Static initializer with no statements.
    // NOTE(review): looks like a decompilation leftover - confirm nothing was dropped before removing.
    static {
    }
}
