/*
 * Decompiled with CFR 0.152.
 */
package org.apache.samza.autoscaling.deployer;

import java.io.IOException;
import java.util.Arrays;
import java.util.Collections;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import joptsimple.OptionSet;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.samza.autoscaling.utils.YarnUtil;
import org.apache.samza.config.Config;
import org.apache.samza.config.JobConfig;
import org.apache.samza.config.MapConfig;
import org.apache.samza.container.SamzaContainer;
import org.apache.samza.coordinator.stream.CoordinatorStreamSystemConsumer;
import org.apache.samza.coordinator.stream.messages.CoordinatorStreamMessage;
import org.apache.samza.coordinator.stream.messages.SetConfig;
import org.apache.samza.job.JobRunner;
import org.apache.samza.job.model.ContainerModel;
import org.apache.samza.metrics.MetricsRegistry;
import org.apache.samza.metrics.MetricsRegistryMap;
import org.apache.samza.system.SystemStreamPartitionIterator;
import org.apache.samza.util.CommandLine;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Polls a job's coordinator stream for {@code set-config} messages and reacts to the ones it
 * understands:
 * <ul>
 *   <li>{@code samza.autoscaling.server.url} - remembers the coordinator server URL that is later
 *       used to read the job model;</li>
 *   <li>{@code yarn.container.count} - validates the requested container count against the
 *       current job model, kills the running YARN application and starts the job again.</li>
 * </ul>
 *
 * <p>The supplied config must contain {@code yarn.rm.address}, {@code yarn.rm.port} and the job
 * name. {@code configManager.polling.interval} (milliseconds, strictly positive) is optional and
 * defaults to {@value #DEFAULT_POLLING_INTERVAL_MS}.
 */
public class ConfigManager {
    private static final Logger log = LoggerFactory.getLogger(ConfigManager.class);

    /** Sleep between two polls of the coordinator stream when no interval is configured. */
    private static final long DEFAULT_POLLING_INTERVAL_MS = 100L;
    /** Second argument handed to {@code SamzaContainer.readJobModel}. */
    private static final int DEFAULT_READ_JOB_MODEL_DELAY_MS = 100;

    private static final String RM_ADDRESS_OPT = "yarn.rm.address";
    private static final String RM_PORT_OPT = "yarn.rm.port";
    private static final String POLLING_INTERVAL_OPT = "configManager.polling.interval";
    private static final String SERVER_URL_OPT = "samza.autoscaling.server.url";
    private static final String YARN_CONTAINER_COUNT_OPT = "yarn.container.count";

    /** Message type requested from the coordinator stream consumer. */
    private static final String SET_CONFIG_MESSAGE_TYPE = "set-config";
    /** Application state string that marks the kill as complete. */
    private static final String KILLED_STATE = "KILLED";
    /** Number of sleeps allowed while waiting for the application to reach the killed state. */
    private static final int MAX_KILL_WAIT_ATTEMPTS = 10;
    /** Pause between two application state checks while waiting for the kill. */
    private static final long KILL_WAIT_SLEEP_MS = 1000L;

    private final CoordinatorStreamSystemConsumer coordinatorStreamConsumer;
    private SystemStreamPartitionIterator coordinatorStreamIterator;
    private final long interval;
    private String coordinatorServerURL = null;
    private final String jobName;
    private final int jobID;
    private final Config config;
    private final YarnUtil yarnUtil;

    /**
     * Creates a config manager for the job described by {@code config}.
     *
     * @param config job configuration; must contain the resource manager address and port and
     *               the job name
     * @throws IllegalArgumentException if a required key is missing or the configured polling
     *                                  interval is not strictly positive
     */
    public ConfigManager(Config config) {
        if (!config.containsKey(RM_ADDRESS_OPT) || !config.containsKey(RM_PORT_OPT)) {
            throw new IllegalArgumentException("Missing config: the config file does not contain the rm host or port.");
        }
        String rmAddress = config.get(RM_ADDRESS_OPT);
        int rmPort = config.getInt(RM_PORT_OPT);

        if (!config.containsKey(JobConfig.JOB_NAME())) {
            throw new IllegalArgumentException("Missing config: the config does not contain the job name");
        }
        this.jobName = config.get(JobConfig.JOB_NAME());
        this.jobID = config.getInt(JobConfig.JOB_ID(), 1);
        this.interval = resolvePollingInterval(config);

        this.config = config;
        MetricsRegistry metricsRegistry = new MetricsRegistryMap();
        this.coordinatorStreamConsumer = new CoordinatorStreamSystemConsumer(config, metricsRegistry);
        this.yarnUtil = new YarnUtil(rmAddress, rmPort);
    }

    /** Returns the configured polling interval, or the default when the key is absent. */
    private static long resolvePollingInterval(Config config) {
        if (!config.containsKey(POLLING_INTERVAL_OPT)) {
            return DEFAULT_POLLING_INTERVAL_MS;
        }
        long pollingInterval = config.getLong(POLLING_INTERVAL_OPT);
        if (pollingInterval <= 0L) {
            throw new IllegalArgumentException("polling interval should be greater than 0");
        }
        return pollingInterval;
    }

    /**
     * Starts the manager and polls the coordinator stream until the thread is interrupted or a
     * failure escapes. The manager is stopped on every exit path.
     */
    private void run() {
        this.start();
        try {
            while (true) {
                Thread.sleep(this.interval);
                this.processConfigMessages();
            }
        } catch (InterruptedException e) {
            log.warn("Got interrupt in config manager thread, so shutting down", e);
            // Restore the flag so that callers further up can observe the interruption.
            Thread.currentThread().interrupt();
        } finally {
            log.info("Stopping the config manager");
            this.stop();
        }
    }

    /** Registers and starts the coordinator stream consumer, then bootstraps the server URL. */
    private void start() {
        this.register();
        this.coordinatorStreamConsumer.start();
        this.coordinatorStreamIterator = this.coordinatorStreamConsumer.getStartIterator();
        this.bootstrap();
    }

    /** Stops the coordinator stream consumer and the YARN helper and forgets the server URL. */
    private void stop() {
        this.coordinatorStreamConsumer.stop();
        this.coordinatorServerURL = null;
        this.yarnUtil.stop();
    }

    /** Registers the coordinator stream consumer. */
    private void register() {
        this.coordinatorStreamConsumer.register();
    }

    /**
     * Processes the pending messages looking only at the server URL key.
     *
     * @throws IllegalStateException if no server URL was found in the pending messages
     */
    private void bootstrap() {
        this.processConfigMessages(Collections.singletonList(SERVER_URL_OPT));
        if (this.coordinatorServerURL == null) {
            throw new IllegalStateException("coordinator server url is null, while the bootstrap has finished ");
        }
        log.info("Config manager bootstrapped");
    }

    /** Consumes all pending messages without acting on any of them. */
    private void skipUnreadMessages() {
        this.processConfigMessages(Collections.<String>emptyList());
        log.info("Config manager skipped messages");
    }

    /** Processes pending messages for every key this manager supports. */
    private void processConfigMessages() {
        this.processConfigMessages(Arrays.asList(YARN_CONTAINER_COUNT_OPT, SERVER_URL_OPT));
    }

    /**
     * Reads the unread {@code set-config} messages and dispatches those whose key is listed in
     * {@code keysToProcess}. A failure while handling one message is logged, with its cause, and
     * the remaining messages are still processed.
     *
     * @param keysToProcess config keys to act on; messages with any other key are skipped
     * @throws IllegalArgumentException if {@code keysToProcess} is {@code null}
     */
    private void processConfigMessages(List<String> keysToProcess) {
        // Validate first so that a bad argument is reported even when nothing is pending.
        if (keysToProcess == null) {
            throw new IllegalArgumentException("The keys to process list is null");
        }
        if (!this.coordinatorStreamConsumer.hasNewMessages(this.coordinatorStreamIterator)) {
            return;
        }
        for (CoordinatorStreamMessage message : this.coordinatorStreamConsumer.getUnreadMessages(this.coordinatorStreamIterator, SET_CONFIG_MESSAGE_TYPE)) {
            String key = null;
            try {
                SetConfig setConfigMessage = new SetConfig(message);
                key = setConfigMessage.getKey();
                Map<?, ?> valuesMap = (Map<?, ?>) setConfigMessage.getMessageMap().get("values");
                String value = valuesMap == null ? null : (String) valuesMap.get("value");
                log.debug("Received set-config message with key: " + key + " and value: " + value);

                if (!keysToProcess.contains(key)) {
                    continue;
                }
                if (YARN_CONTAINER_COUNT_OPT.equals(key)) {
                    this.handleYarnContainerChange(value);
                } else if (SERVER_URL_OPT.equals(key)) {
                    this.handleServerURLChange(value);
                } else {
                    log.info("Setting the " + key + " configuration is currently not supported, skipping the message");
                }
            } catch (Exception e) {
                // Deliberately best-effort: one bad message must not stop the polling loop,
                // but the cause is logged so the failure can be diagnosed.
                log.error("Error in reading a message, skipping message with key " + key, e);
            }
        }
    }

    /** Remembers the new coordinator server URL. */
    private void handleServerURLChange(String newServerURL) {
        this.coordinatorServerURL = newServerURL;
        log.info("Server URL being set to " + newServerURL);
    }

    /**
     * Applies a container count change by killing the running application and starting the job
     * again. The request is skipped (and logged) when the count equals the current number of
     * containers, is not positive, or exceeds the current number of tasks.
     *
     * @param containerCountAsString requested container count as a decimal string
     * @throws NumberFormatException if {@code containerCountAsString} is not a valid integer
     * @throws IllegalStateException if the application is still not killed after
     *                               {@value #MAX_KILL_WAIT_ATTEMPTS} waits
     */
    private void handleYarnContainerChange(String containerCountAsString) throws IOException, YarnException {
        String applicationId = this.yarnUtil.getRunningAppId(this.jobName, this.jobID);
        int containerCount = Integer.parseInt(containerCountAsString);

        // Read the job model once so both counts come from the same snapshot.
        Map<?, ContainerModel> containers =
            SamzaContainer.readJobModel(this.coordinatorServerURL, DEFAULT_READ_JOB_MODEL_DELAY_MS).getContainers();
        int currentNumContainers = containers.size();
        int currentNumTask = countTasks(containers);

        if (containerCount == currentNumContainers) {
            log.error("The new number of containers is equal to the current number of containers, skipping this message");
            return;
        }
        if (containerCount <= 0) {
            log.error("The number of containers cannot be zero or less, skipping this message");
            return;
        }
        if (containerCount > currentNumTask) {
            log.error("The number of containers cannot be more than the number of task, skipping this message");
            return;
        }

        log.info("Killing the current job");
        this.yarnUtil.killApplication(applicationId);
        // The URL belongs to the job that was just killed.
        this.coordinatorServerURL = null;

        if (this.waitForApplicationKilled(applicationId)) {
            log.info("Killed the current job successfully");
        } else {
            // The restart below is still attempted, exactly as before; only the log is accurate now.
            log.warn("Interrupted before the kill of " + applicationId + " could be confirmed");
        }

        log.info("Staring the job again");
        this.skipUnreadMessages();
        JobRunner jobRunner = new JobRunner(this.config);
        jobRunner.run(false);
    }

    /**
     * Waits until the application reports the killed state, checking the state before every
     * sleep so that a kill observed on the last check is not reported as a timeout.
     *
     * @return {@code true} once the killed state was observed, {@code false} if the wait was
     *         interrupted (the interrupt flag is restored)
     * @throws IllegalStateException if the state is still not killed after
     *                               {@value #MAX_KILL_WAIT_ATTEMPTS} sleeps
     */
    private boolean waitForApplicationKilled(String applicationId) throws IOException, YarnException {
        try {
            int sleeps = 0;
            while (!KILLED_STATE.equals(this.yarnUtil.getApplicationState(applicationId))) {
                if (sleeps >= MAX_KILL_WAIT_ATTEMPTS) {
                    throw new IllegalStateException("Job has not been killed after 10 attempts.");
                }
                log.info("Job kill signal sent, but job not killed yet for " + applicationId + ". Sleeping for another 1000ms");
                Thread.sleep(KILL_WAIT_SLEEP_MS);
                sleeps++;
            }
            return true;
        } catch (InterruptedException e) {
            log.warn("Interrupted while waiting for " + applicationId + " to be killed", e);
            Thread.currentThread().interrupt();
            return false;
        }
    }

    /** Sums the number of tasks over all given container models. */
    private static int countTasks(Map<?, ContainerModel> containers) {
        int numTasks = 0;
        for (ContainerModel containerModel : containers.values()) {
            numTasks += containerModel.getTasks().size();
        }
        return numTasks;
    }

    /**
     * Returns the coordinator server URL currently known to this manager.
     *
     * @return the URL, or {@code null} when none is known (before bootstrap, after a job kill,
     *         or after the manager was stopped)
     */
    public String getCoordinatorServerURL() {
        return this.coordinatorServerURL;
    }

    /**
     * Command line entry point: loads the config from the given arguments and runs the manager.
     *
     * @param args command line arguments understood by {@link CommandLine}
     */
    public static void main(String[] args) {
        CommandLine cmdline = new CommandLine();
        OptionSet options = cmdline.parser().parse(args);
        Config config = cmdline.loadConfig(options);
        ConfigManager configManager = new ConfigManager(config);
        configManager.run();
    }
}

