package org.flinkextended.flink.ml.cluster.rpc;

import io.grpc.Server;
import io.grpc.ServerBuilder;
import java.time.Duration;
import java.util.Iterator;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import org.flinkextended.flink.ml.cluster.BaseEventReporter;
import org.flinkextended.flink.ml.cluster.master.AMEvent;
import org.flinkextended.flink.ml.cluster.master.AMEventType;
import org.flinkextended.flink.ml.cluster.master.AMService;
import org.flinkextended.flink.ml.cluster.master.AMStateMachineFactory;
import org.flinkextended.flink.ml.cluster.master.AbstractAMStateMachine;
import org.flinkextended.flink.ml.cluster.master.meta.AMMeta;
import org.flinkextended.flink.ml.cluster.master.meta.AMMetaImpl;
import org.flinkextended.flink.ml.cluster.node.MLContext;
import org.flinkextended.flink.ml.proto.NodeSpec;
import org.flinkextended.flink.ml.util.IpHostUtil;
import org.flinkextended.flink.ml.util.LogBaseEventReporter;
import org.flinkextended.flink.ml.util.MLConstants;
import org.flinkextended.flink.ml.util.MLException;
import org.flinkextended.flink.ml.util.ReflectUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/flinkextended/flink/ml/cluster/rpc/AppMasterServer.class */
/**
 * gRPC server hosting the application master (AM) of a machine-learning cluster.
 *
 * <p>The constructor wires up the AM meta store, the event reporter, the AM service and the AM
 * state machine from the properties of the supplied {@link MLContext}. {@link #run()} then starts
 * the gRPC server on an ephemeral port, publishes the address through {@link AMMeta}, and blocks
 * until the server is asked to stop ({@link #setEnd(boolean)} / {@link #onError(Throwable)}), the
 * thread is interrupted, or no RPC contact has been recorded for longer than the configured
 * timeout.
 */
public class AppMasterServer implements Runnable {
    private static final Logger LOG = LoggerFactory.getLogger(AppMasterServer.class);

    /** Written by the thread executing {@link #run()}, read by the shutdown hook and getPort(). */
    private volatile Server server;
    private final AtomicBoolean end;
    private final AMMeta amMeta;
    private final AbstractAMStateMachine amStateMachine;
    private final AMService appMasterService;
    /** Wall-clock time (ms) of the most recent RPC contact, see {@link #updateRpcLastContact()}. */
    private volatile long rpcLastContact;
    /** Maximum tolerated silence (ms) between RPC contacts before {@link #run()} fails. */
    private final long rpcContactTimeout;
    /** First error reported through {@link #onError(Throwable)}, or {@code null} if none. */
    private volatile Throwable error = null;

    /**
     * Builds the application master from the given context.
     *
     * @param mLContext context providing the configuration properties and the role parallelism map
     * @throws RuntimeException if a configured AM meta / event reporter class cannot be
     *     instantiated or the AM state machine cannot be created
     * @throws NumberFormatException if a configured timeout is not a valid {@code long}
     */
    public AppMasterServer(MLContext mLContext) {
        this.rpcContactTimeout = Long.parseLong(
                mLContext.getProperties().getOrDefault(MLConstants.SERVER_RPC_CONTACT_TIMEOUT, "600000"));
        Map<String, String> properties = mLContext.getProperties();

        this.amMeta = createAmMeta(mLContext, properties);
        // With the "all" failover strategy (the default) any previously stored AM meta is dropped.
        if ("all".equalsIgnoreCase(
                mLContext.getProperties().getOrDefault(MLConstants.FAILOVER_STRATEGY, "all"))) {
            this.amMeta.clear();
        }
        this.end = new AtomicBoolean(false);

        BaseEventReporter eventReporter = createEventReporter(properties);
        if (eventReporter != null) {
            // NOTE(review): when JOB_VERSION is absent the literal "null" is appended to the job
            // name; kept as-is because the reporter may depend on the existing naming.
            String jobName = properties.getOrDefault(MLConstants.CONFIG_JOB_NAME, "dl-on-flink")
                    + properties.get(MLConstants.JOB_VERSION);
            eventReporter.configure(jobName, properties);
        }

        // Total number of nodes is the sum of the parallelism of every role.
        int totalNodeCount = 0;
        for (int parallelism : mLContext.getRoleParallelismMap().values()) {
            totalNodeCount += parallelism;
        }
        long heartbeatTimeoutMs = Long.parseLong(mLContext.getProperties()
                .getOrDefault(MLConstants.HEARTBEAT_TIMEOUT, MLConstants.HEARTBEAT_TIMEOUT_DEFAULT));
        this.appMasterService =
                new AppMasterServiceImpl(this, totalNodeCount, Duration.ofMillis(heartbeatTimeoutMs));

        try {
            this.amStateMachine = AMStateMachineFactory.getAMStateMachine(
                    this.appMasterService, this.amMeta, mLContext, eventReporter);
        } catch (MLException e) {
            LOG.error("Fail to create AM state machine.", e);
            throw new RuntimeException(e);
        }
    }

    /** Creates the AM meta store: the class named by AM_STATE_CLASS if set, else the default. */
    private static AMMeta createAmMeta(MLContext mLContext, Map<String, String> properties) {
        if (!properties.containsKey(MLConstants.AM_STATE_CLASS)) {
            return new AMMetaImpl(mLContext);
        }
        try {
            return (AMMeta) ReflectUtil.createInstance(
                    properties.get(MLConstants.AM_STATE_CLASS),
                    new Class[]{MLContext.class},
                    new Object[]{mLContext});
        } catch (Exception e) {
            LOG.error("Fail to create AM meta of class {}.",
                    properties.get(MLConstants.AM_STATE_CLASS), e);
            throw new RuntimeException(e);
        }
    }

    /** Creates the event reporter: the class named by CONFIG_EVENT_REPORTER if set, else a log reporter. */
    private static BaseEventReporter createEventReporter(Map<String, String> properties) {
        if (!properties.containsKey(MLConstants.CONFIG_EVENT_REPORTER)) {
            return new LogBaseEventReporter();
        }
        try {
            return (BaseEventReporter) ReflectUtil.createInstance(
                    properties.get(MLConstants.CONFIG_EVENT_REPORTER), new Class[0], new Object[0]);
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

    /**
     * Starts the gRPC server and blocks until the AM is stopped.
     *
     * <p>Resources are released exactly once through {@code cleanup()} regardless of how the loop
     * terminates. If the thread is interrupted, the interrupt flag is restored after cleanup.
     *
     * @throws RuntimeException wrapping the failure if start-up fails, the RPC contact timeout
     *     elapses, or an error was reported via {@link #onError(Throwable)}
     */
    @Override // java.lang.Runnable
    public void run() {
        Runtime.getRuntime().addShutdownHook(new Thread(() -> {
            if (this.server != null) {
                LOG.error("*** shutting down gRPC server since JVM is shutting down");
                cleanup();
                LOG.error("*** AM server shut down");
            }
        }));

        boolean interrupted = false;
        try {
            updateRpcLastContact();
            Server grpcServer = ServerBuilder.forPort(0).addService(this.appMasterService).build();
            this.server = grpcServer;
            grpcServer.start();
            this.end.set(false);
            LOG.info("App Master Server started, listening on {}", grpcServer.getPort());
            this.amMeta.saveAMIpPort(IpHostUtil.getIpAddress(), grpcServer.getPort());
            this.amStateMachine.handle(new AMEvent(AMEventType.INTI_AM_STATE, null, 0L));

            // Poll once per second until asked to stop; fail if the nodes have gone silent.
            while (!getEnd()) {
                long sinceLastContactMs = System.currentTimeMillis() - this.rpcLastContact;
                if (sinceLastContactMs > this.rpcContactTimeout) {
                    throw new MLException(String.format(
                            "%d seconds elapsed since last grpc contact", sinceLastContactMs / 1000));
                }
                Thread.sleep(1000L);
            }
            if (this.error != null) {
                throw new MLException("Error encountered in AM", this.error);
            }
        } catch (InterruptedException e) {
            LOG.warn("AM server interrupted");
            interrupted = true;
        } catch (Exception e) {
            LOG.error("Fail to execute AM.", e);
            throw new RuntimeException(e);
        } finally {
            try {
                cleanup();
            } finally {
                // Restore the flag only after cleanup so its waits are not cut short.
                if (interrupted) {
                    Thread.currentThread().interrupt();
                }
            }
        }
    }

    /** Records the current wall-clock time as the moment of the latest RPC contact. */
    public void updateRpcLastContact() {
        this.rpcLastContact = System.currentTimeMillis();
    }

    /**
     * Stops the gRPC server, closes the AM meta and state machine, marks the server as ended and
     * stops heartbeat monitoring. Waits 10 seconds first to give nodes time to stop.
     *
     * <p>If the calling thread is interrupted while waiting, the remaining steps still run and
     * the interrupt flag is restored before returning.
     */
    private void cleanup() {
        boolean interrupted = false;
        try {
            LOG.info("Clean up AM node.");
            try {
                LOG.info("before clean up am node, sleep 10 seconds to wait for node complete stop.");
                Thread.sleep(10000L);
            } catch (InterruptedException e) {
                interrupted = true;
                LOG.warn("Interrupted while waiting for nodes to stop before AM clean up.", e);
            }

            // Snapshot the field: the shutdown hook and run() may both reach this method.
            Server grpcServer = this.server;
            if (grpcServer != null) {
                grpcServer.shutdownNow();
                try {
                    grpcServer.awaitTermination(2L, TimeUnit.MINUTES);
                } catch (InterruptedException e) {
                    interrupted = true;
                    LOG.error("Interrupted shutting down GRPC server.", e);
                }
                this.server = null;
            }
            LOG.info("stop am service!");

            if (this.amMeta != null) {
                this.amMeta.close();
            }
            LOG.info("stop am meta!");

            if (this.amStateMachine != null) {
                this.amStateMachine.close();
            }
            LOG.info("stop am state machine!");

            this.end.set(true);
            this.appMasterService.stopHeartBeatMonitorAllNode();
            LOG.info("stop heartbeat all nodes!");
            LOG.info("app master server stopped.");
        } finally {
            if (interrupted) {
                Thread.currentThread().interrupt();
            }
        }
    }

    /**
     * Returns the port the gRPC server is bound to.
     *
     * @return the server port, or {@code -1} if the server has not been created or was shut down
     */
    public int getPort() {
        // Read the field once so a concurrent cleanup cannot null it between check and use.
        Server grpcServer = this.server;
        return grpcServer == null ? -1 : grpcServer.getPort();
    }

    private boolean getEnd() {
        return this.end.get();
    }

    /**
     * Sets the end flag polled by {@link #run()}.
     *
     * @param z {@code true} to make the main loop exit on its next iteration
     */
    public void setEnd(boolean z) {
        this.end.set(z);
    }

    /**
     * Reports a failure and asks the main loop to stop. Only the first reported error is kept;
     * {@link #run()} rethrows it wrapped once the loop has exited.
     *
     * @param th the failure that occurred
     */
    public synchronized void onError(Throwable th) {
        if (this.error == null) {
            this.error = th;
        }
        setEnd(true);
    }

    /** Returns the AM state machine created for this server. */
    public AbstractAMStateMachine getAmStateMachine() {
        return this.amStateMachine;
    }

    /** Returns the AM service registered with the gRPC server. */
    public AMService getAppMasterService() {
        return this.appMasterService;
    }

    /** Returns the AM meta store used by this server. */
    public AMMeta getAmMeta() {
        return this.amMeta;
    }

    /**
     * Builds the key identifying a node's client.
     *
     * @param nodeSpec node description
     * @return the role name and index of the node joined by an underscore
     */
    public static String getNodeClientKey(NodeSpec nodeSpec) {
        return nodeSpec.getRoleName() + "_" + nodeSpec.getIndex();
    }
}
