package hex.tree.xgboost.remote;

import hex.steam.SteamMessageSender;
import hex.steam.SteamMessenger;
import hex.tree.xgboost.XGBoostModel;
import hex.tree.xgboost.exec.RemoteXGBoostExecutor;
import java.io.IOException;
import java.util.Arrays;
import java.util.Deque;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.Map;
import org.apache.commons.lang3.StringUtils;
import org.apache.log4j.Logger;
import water.H2O;
import water.Job;
import water.fvec.Frame;

/* loaded from: input_file:hex/tree/xgboost/remote/SteamExecutorStarter.class */
public class SteamExecutorStarter implements SteamMessenger {
    private static final Logger LOG = Logger.getLogger((Class<?>) SteamExecutorStarter.class);
    private static SteamExecutorStarter instance;
    private SteamMessageSender sender;
    private ClusterInfo cluster;
    private final Object sendingLock = new Object[0];
    private final Object clusterLock = new Object[0];
    private final Deque<Map<String, String>> receivedMessages = new LinkedList();

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:hex/tree/xgboost/remote/SteamExecutorStarter$ClusterInfo.class */
    public static class ClusterInfo {
        final String uri;
        final String userName;
        final String password;

        private ClusterInfo(String str, String str2, String str3) {
            this.uri = str;
            this.userName = str2;
            this.password = str3;
        }
    }

    public static SteamExecutorStarter getInstance() {
        return instance;
    }

    public SteamExecutorStarter() {
        instance = this;
    }

    public RemoteXGBoostExecutor getRemoteExecutor(XGBoostModel xGBoostModel, Frame frame, Job<XGBoostModel> job) throws IOException {
        RemoteXGBoostExecutor makeExecutor;
        synchronized (this.clusterLock) {
            if (this.cluster == null) {
                LOG.info("Starting external cluster for model " + xGBoostModel._key + ".");
                startCluster(job);
            } else {
                LOG.info("External cluster available, starting model " + xGBoostModel._key + " now.");
            }
            makeExecutor = makeExecutor(xGBoostModel, frame);
        }
        return makeExecutor;
    }

    private void startCluster(Job<XGBoostModel> job) throws IOException {
        clearMessages();
        sendMessage(makeStartRequest());
        while (!job.stop_requested()) {
            Map<String, String> waitForMessage = waitForMessage();
            if (waitForMessage == null) {
                throw new IllegalStateException("No response received from Steam.");
            }
            if ("started".equals(waitForMessage.get("status"))) {
                String str = waitForMessage.get("uri");
                this.cluster = new ClusterInfo(str, waitForMessage.get("user"), waitForMessage.get("password"));
                LOG.info("External cluster started at " + str + ".");
                return;
            } else {
                if (!"starting".equals(waitForMessage.get("status"))) {
                    if (!"failed".equals(waitForMessage.get("status"))) {
                        throw new IllegalStateException("Unknown status received from steam: " + waitForMessage.get("status") + ", reason:" + waitForMessage.get("reason"));
                    }
                    throw new IllegalStateException("Failed to start external cluster: " + waitForMessage.get("reason"));
                }
                LOG.info("Continuing to wait for external cluster to start.");
            }
        }
    }

    private RemoteXGBoostExecutor makeExecutor(XGBoostModel xGBoostModel, Frame frame) {
        return new RemoteXGBoostExecutor(xGBoostModel, frame, this.cluster.uri, this.cluster.userName, this.cluster.password);
    }

    private void clearMessages() {
        synchronized (this.receivedMessages) {
            this.receivedMessages.clear();
        }
    }

    private Map<String, String> waitForMessage() {
        int parseInt = Integer.parseInt(H2O.getSysProperty("steam.notification.timeout", "20000"));
        synchronized (this.receivedMessages) {
            if (!this.receivedMessages.isEmpty()) {
                return this.receivedMessages.pop();
            }
            try {
                this.receivedMessages.wait(parseInt);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
            if (this.receivedMessages.isEmpty()) {
                return null;
            }
            return this.receivedMessages.pop();
        }
    }

    @Override // hex.steam.SteamMessenger
    public void onConnectionStateChange(SteamMessageSender steamMessageSender) {
        synchronized (this.sendingLock) {
            this.sender = steamMessageSender;
        }
    }

    private void sendMessage(Map<String, String> map) throws IOException {
        synchronized (this.sendingLock) {
            if (this.sender == null) {
                throw new IOException("Steam communication chanel is not open.");
            }
            this.sender.sendMessage(map);
        }
    }

    @Override // hex.steam.SteamMessenger
    public void onMessage(Map<String, String> map) {
        if ("stopXGBoostClusterNotification".equals(map.get(SteamMessenger.TYPE))) {
            handleStopRequest(map);
        } else if ("xgboostClusterStartNotification".equals(map.get(SteamMessenger.TYPE))) {
            queueResponse(map);
        } else {
            LOG.debug("Ignoring message " + map.get(SteamMessenger.ID) + StringUtils.SPACE + map.get(SteamMessenger.TYPE));
        }
    }

    private void queueResponse(Map<String, String> map) {
        synchronized (this.receivedMessages) {
            LOG.info("Received message response " + map.get(SteamMessenger.ID));
            this.receivedMessages.add(map);
            this.receivedMessages.notifyAll();
        }
    }

    private void handleStopRequest(Map<String, String> map) {
        LOG.info("Received stop request " + map.get(SteamMessenger.ID));
        if (isXGBoostInProgress()) {
            LOG.info("Responding to stop request with allowed=false");
            sendStopResponse(map, false);
            return;
        }
        synchronized (this.clusterLock) {
            LOG.info("Responding to stop request with allowed=true");
            sendStopResponse(map, true);
            this.cluster = null;
        }
    }

    private void sendStopResponse(Map<String, String> map, boolean z) {
        try {
            sendMessage(makeStopConfirmation(map, z));
        } catch (IOException e) {
            LOG.error("Failed to send stop cluster response.", e);
        }
    }

    private boolean isXGBoostInProgress() {
        return Arrays.stream(Job.jobs()).anyMatch(job -> {
            return job.isRunning() && (job._result.get() instanceof XGBoostModel);
        });
    }

    private Map<String, String> makeStartRequest() {
        HashMap hashMap = new HashMap();
        hashMap.put(SteamMessenger.TYPE, "startXGBoostCluster");
        hashMap.put(SteamMessenger.ID, H2O.SELF.getIpPortString() + "_startXGBoost");
        return hashMap;
    }

    private Map<String, String> makeStopConfirmation(Map<String, String> map, boolean z) {
        HashMap hashMap = new HashMap();
        hashMap.put(SteamMessenger.TYPE, "stopXGBoostClusterConfirmation");
        hashMap.put(SteamMessenger.ID, map.get(SteamMessenger.ID) + "_response");
        hashMap.put("allowed", Boolean.toString(z));
        return hashMap;
    }
}
