package org.flinkextended.flink.ml.cluster.master;

import java.io.IOException;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import org.flinkextended.flink.ml.cluster.rpc.AppMasterServer;
import org.flinkextended.flink.ml.cluster.rpc.NodeClient;
import org.flinkextended.flink.ml.cluster.statemachine.InvalidStateTransitionException;
import org.flinkextended.flink.ml.cluster.statemachine.transition.MultipleArcTransition;
import org.flinkextended.flink.ml.cluster.statemachine.transition.SingleArcTransition;
import org.flinkextended.flink.ml.proto.AMStatus;
import org.flinkextended.flink.ml.proto.FinishNodeRequest;
import org.flinkextended.flink.ml.proto.MLClusterDef;
import org.flinkextended.flink.ml.proto.MLJobDef;
import org.flinkextended.flink.ml.proto.NodeSpec;
import org.flinkextended.flink.ml.proto.RegisterFailedNodeRequest;
import org.flinkextended.flink.ml.proto.RegisterNodeRequest;
import org.flinkextended.flink.ml.util.ProtoUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/flinkextended/flink/ml/cluster/master/AMTransitions.class */
public class AMTransitions {
    private static final Logger LOG = LoggerFactory.getLogger(AMTransitions.class);

    /* loaded from: input_file:org/flinkextended/flink/ml/cluster/master/AMTransitions$CompleteCluster.class */
    public static class CompleteCluster extends AMTransition implements SingleArcTransition<AbstractAMStateMachine, AMEvent> {
        public CompleteCluster(AbstractAMStateMachine abstractAMStateMachine) {
            super(abstractAMStateMachine);
        }

        @Override // org.flinkextended.flink.ml.cluster.statemachine.transition.SingleArcTransition
        public void transition(AbstractAMStateMachine abstractAMStateMachine, AMEvent aMEvent) throws InvalidStateTransitionException {
            try {
                this.amMeta.saveAMStatus(AMStatus.AM_RUNNING, AMStatus.AM_INIT);
            } catch (IOException e) {
                e.printStackTrace();
                throw new InvalidStateTransitionException(abstractAMStateMachine.getInternalState(), aMEvent);
            }
        }
    }

    /* loaded from: input_file:org/flinkextended/flink/ml/cluster/master/AMTransitions$FailNode.class */
    public static class FailNode extends AMTransition implements SingleArcTransition<AbstractAMStateMachine, AMEvent> {
        public FailNode(AbstractAMStateMachine abstractAMStateMachine) {
            super(abstractAMStateMachine);
        }

        @Override // org.flinkextended.flink.ml.cluster.statemachine.transition.SingleArcTransition
        public void transition(AbstractAMStateMachine abstractAMStateMachine, AMEvent aMEvent) throws InvalidStateTransitionException {
            NodeSpec nodeSpec;
            long version;
            if (aMEvent.getType().equals(AMEventType.FAIL_NODE)) {
                RegisterFailedNodeRequest registerFailedNodeRequest = (RegisterFailedNodeRequest) aMEvent.getMessage();
                nodeSpec = registerFailedNodeRequest.getNodeSpec();
                version = registerFailedNodeRequest.getVersion();
            } else {
                if (!aMEvent.getType().equals(AMEventType.REGISTER_NODE)) {
                    throw new InvalidStateTransitionException(abstractAMStateMachine.getInternalState(), aMEvent);
                }
                RegisterNodeRequest registerNodeRequest = (RegisterNodeRequest) aMEvent.getMessage();
                nodeSpec = registerNodeRequest.getNodeSpec();
                version = registerNodeRequest.getVersion();
            }
            AMTransitions.LOG.info("Fail Node:" + ProtoUtil.protoToJson(nodeSpec));
            try {
                this.amMeta.saveFailedNode(nodeSpec);
                this.amMeta.saveAMStatus(AMStatus.AM_FAILOVER, AMStatus.AM_RUNNING);
                if (this.eventReporter != null) {
                    this.eventReporter.nodeFail(nodeSpec2Str(nodeSpec));
                }
                AMEvent aMEvent2 = new AMEvent(AMEventType.RESTART_CLUSTER, null, version);
                AMTransitions.LOG.info("put restart event to state machine:" + version);
                if (!this.stateMachine.sendEvent(aMEvent2)) {
                    throw new InvalidStateTransitionException(abstractAMStateMachine.getInternalState(), aMEvent);
                }
            } catch (IOException e) {
                e.printStackTrace();
                throw new InvalidStateTransitionException(abstractAMStateMachine.getInternalState(), aMEvent);
            }
        }
    }

    /* loaded from: input_file:org/flinkextended/flink/ml/cluster/master/AMTransitions$FinishCluster.class */
    public static class FinishCluster extends AMTransition implements SingleArcTransition<AbstractAMStateMachine, AMEvent> {
        public FinishCluster(AbstractAMStateMachine abstractAMStateMachine) {
            super(abstractAMStateMachine);
        }

        @Override // org.flinkextended.flink.ml.cluster.statemachine.transition.SingleArcTransition
        public void transition(AbstractAMStateMachine abstractAMStateMachine, AMEvent aMEvent) throws InvalidStateTransitionException {
            this.amService.stopAllNodes();
            doJobFinish(aMEvent);
        }

        public void doJobFinish(AMEvent aMEvent) throws InvalidStateTransitionException {
            if (this.eventReporter != null) {
                this.eventReporter.jobFinish();
            }
            try {
                this.amMeta.saveAMStatus(AMStatus.AM_FINISH, AMStatus.AM_RUNNING);
                AMTransitions.LOG.info("Job finished, shutting down AM");
                this.amService.stopService();
            } catch (IOException e) {
                e.printStackTrace();
                throw new InvalidStateTransitionException(getInternalState(), aMEvent);
            }
        }
    }

    /* loaded from: input_file:org/flinkextended/flink/ml/cluster/master/AMTransitions$FinishNode.class */
    public static class FinishNode extends AMTransition implements SingleArcTransition<AbstractAMStateMachine, AMEvent> {
        public FinishNode(AbstractAMStateMachine abstractAMStateMachine) {
            super(abstractAMStateMachine);
        }

        @Override // org.flinkextended.flink.ml.cluster.statemachine.transition.SingleArcTransition
        public void transition(AbstractAMStateMachine abstractAMStateMachine, AMEvent aMEvent) throws InvalidStateTransitionException {
            FinishNodeRequest finishNodeRequest = (FinishNodeRequest) aMEvent.getMessage();
            AMTransitions.LOG.info("Finish Node:" + ProtoUtil.protoToJson(finishNodeRequest.getNodeSpec()));
            try {
                if (this.eventReporter != null) {
                    this.eventReporter.nodeFinish(nodeSpec2Str(finishNodeRequest.getNodeSpec()));
                }
                this.amMeta.saveFinishNodeSpec(finishNodeRequest.getNodeSpec());
                boolean z = true;
                Iterator<Map.Entry<String, Integer>> it = updateRemainJobNum(aMEvent).entrySet().iterator();
                while (true) {
                    if (!it.hasNext()) {
                        break;
                    } else if (0 != it.next().getValue().intValue()) {
                        z = false;
                        break;
                    }
                }
                if (z) {
                    AMTransitions.LOG.info("send finish cluster event!");
                    abstractAMStateMachine.sendEvent(new AMEvent(AMEventType.FINISH_CLUSTER, "", finishNodeRequest.getVersion()));
                }
            } catch (IOException e) {
                e.printStackTrace();
                throw new InvalidStateTransitionException(getInternalState(), aMEvent);
            }
        }
    }

    /* loaded from: input_file:org/flinkextended/flink/ml/cluster/master/AMTransitions$IgnoreMessage.class */
    public static class IgnoreMessage extends AMTransition implements SingleArcTransition<AbstractAMStateMachine, AMEvent> {
        public IgnoreMessage(AbstractAMStateMachine abstractAMStateMachine) {
            super(abstractAMStateMachine);
        }

        @Override // org.flinkextended.flink.ml.cluster.statemachine.transition.SingleArcTransition
        public void transition(AbstractAMStateMachine abstractAMStateMachine, AMEvent aMEvent) {
            AMTransitions.LOG.info("ignore event :" + aMEvent.toString() + " current status:" + getInternalState().toString());
        }
    }

    /* loaded from: input_file:org/flinkextended/flink/ml/cluster/master/AMTransitions$InitAmState.class */
    public static class InitAmState extends AMTransition implements MultipleArcTransition<AbstractAMStateMachine, AMEvent, AMStatus> {
        public InitAmState(AbstractAMStateMachine abstractAMStateMachine) {
            super(abstractAMStateMachine);
        }

        @Override // org.flinkextended.flink.ml.cluster.statemachine.transition.MultipleArcTransition
        public AMStatus transition(AbstractAMStateMachine abstractAMStateMachine, AMEvent aMEvent) {
            try {
                long restoreClusterVersion = this.amMeta.restoreClusterVersion();
                AMTransitions.LOG.info("restore am version:" + restoreClusterVersion);
                if (0 == restoreClusterVersion) {
                    restoreClusterVersion = System.currentTimeMillis();
                }
                this.amService.setVersion(restoreClusterVersion);
                this.amMeta.saveClusterVersion(restoreClusterVersion);
                AMTransitions.LOG.info("current version:" + restoreClusterVersion);
                AMStatus restoreAMStatus = this.amMeta.restoreAMStatus();
                if (null == restoreAMStatus || AMStatus.AM_UNKNOW == restoreAMStatus || AMStatus.UNRECOGNIZED == restoreAMStatus) {
                    this.amMeta.saveAMStatus(AMStatus.AM_INIT, AMStatus.AM_UNKNOW);
                    return AMStatus.AM_INIT;
                }
                if (AMStatus.AM_FAILOVER == restoreAMStatus) {
                    abstractAMStateMachine.sendEvent(new AMEvent(AMEventType.RESTART_CLUSTER, "", restoreClusterVersion));
                } else if (AMStatus.AM_FINISH == restoreAMStatus) {
                    if (this.eventReporter != null) {
                        this.eventReporter.jobKill();
                    }
                    this.amService.stopService();
                } else {
                    MLClusterDef restoreClusterDef = this.amMeta.restoreClusterDef();
                    if (null != restoreClusterDef) {
                        MLClusterDef restoreFinishClusterDef = this.amMeta.restoreFinishClusterDef();
                        Iterator<MLJobDef> it = restoreClusterDef.getJobList().iterator();
                        while (it.hasNext()) {
                            for (NodeSpec nodeSpec : it.next().getTasksMap().values()) {
                                if (!AMTransitions.inFlinkCluster(nodeSpec, restoreFinishClusterDef)) {
                                    this.amService.updateNodeClient(AppMasterServer.getNodeClientKey(nodeSpec), new NodeClient(nodeSpec.getIp(), nodeSpec.getClientPort()));
                                    this.amService.startHeartBeatMonitor(nodeSpec, restoreClusterVersion);
                                }
                            }
                        }
                        AMTransitions.LOG.info("recover client cache and monitor!");
                    }
                }
                return restoreAMStatus;
            } catch (IOException e) {
                e.printStackTrace();
                throw new RuntimeException(e.getMessage());
            }
        }
    }

    /* loaded from: input_file:org/flinkextended/flink/ml/cluster/master/AMTransitions$RegisterNode.class */
    public static class RegisterNode extends AMTransition implements SingleArcTransition<AbstractAMStateMachine, AMEvent> {
        public RegisterNode(AbstractAMStateMachine abstractAMStateMachine) {
            super(abstractAMStateMachine);
        }

        @Override // org.flinkextended.flink.ml.cluster.statemachine.transition.SingleArcTransition
        public synchronized void transition(AbstractAMStateMachine abstractAMStateMachine, AMEvent aMEvent) throws InvalidStateTransitionException {
            RegisterNodeRequest registerNodeRequest = (RegisterNodeRequest) aMEvent.getMessage();
            AMTransitions.LOG.info("Register Node:" + ProtoUtil.protoToJson(registerNodeRequest.getNodeSpec()));
            try {
                if (this.eventReporter != null) {
                    this.eventReporter.nodeRegister(nodeSpec2Str(registerNodeRequest.getNodeSpec()));
                }
                MLClusterDef saveNodeSpec = this.amMeta.saveNodeSpec(registerNodeRequest.getNodeSpec());
                HashMap hashMap = new HashMap();
                for (MLJobDef mLJobDef : saveNodeSpec.getJobList()) {
                    hashMap.put(mLJobDef.getName(), Integer.valueOf(mLJobDef.getTasksCount()));
                }
                boolean z = true;
                Iterator<Map.Entry<String, Integer>> it = updateRemainJobNum(aMEvent).entrySet().iterator();
                while (true) {
                    if (!it.hasNext()) {
                        break;
                    }
                    Map.Entry<String, Integer> next = it.next();
                    if (((Integer) hashMap.getOrDefault(next.getKey(), 0)).intValue() != next.getValue().intValue()) {
                        z = false;
                        break;
                    }
                }
                if (z) {
                    long version = registerNodeRequest.getVersion();
                    AMEvent aMEvent2 = new AMEvent(AMEventType.COMPLETE_CLUSTER, "", version);
                    AMTransitions.LOG.info("put complete event to state machine:" + version);
                    this.stateMachine.sendEvent(aMEvent2);
                }
            } catch (IOException e) {
                e.printStackTrace();
                throw new InvalidStateTransitionException(getInternalState(), aMEvent);
            }
        }
    }

    /* loaded from: input_file:org/flinkextended/flink/ml/cluster/master/AMTransitions$RestartCluster.class */
    public static class RestartCluster extends AMTransition implements SingleArcTransition<AbstractAMStateMachine, AMEvent> {
        public RestartCluster(AbstractAMStateMachine abstractAMStateMachine) {
            super(abstractAMStateMachine);
        }

        @Override // org.flinkextended.flink.ml.cluster.statemachine.transition.SingleArcTransition
        public void transition(AbstractAMStateMachine abstractAMStateMachine, AMEvent aMEvent) {
            try {
                this.amService.restartAllNodes();
                this.amMeta.cleanCluster();
                this.amMeta.saveAMStatus(AMStatus.AM_INIT, AMStatus.AM_FAILOVER);
                if (this.eventReporter != null) {
                    this.eventReporter.jobFailover();
                }
            } catch (Exception e) {
                throw new RuntimeException("Restart cluster failed", e);
            }
        }
    }

    /* loaded from: input_file:org/flinkextended/flink/ml/cluster/master/AMTransitions$StopJob.class */
    public static class StopJob extends AMTransition implements SingleArcTransition<AbstractAMStateMachine, AMEvent> {
        public StopJob(AbstractAMStateMachine abstractAMStateMachine) {
            super(abstractAMStateMachine);
        }

        @Override // org.flinkextended.flink.ml.cluster.statemachine.transition.SingleArcTransition
        public void transition(AbstractAMStateMachine abstractAMStateMachine, AMEvent aMEvent) throws InvalidStateTransitionException {
            this.amService.stopAllNodes();
            try {
                this.amMeta.saveAMStatus(AMStatus.AM_FINISH, getInternalState());
                this.amService.stopService();
                if (this.eventReporter != null) {
                    this.eventReporter.jobKill();
                }
            } catch (IOException e) {
                e.printStackTrace();
                throw new InvalidStateTransitionException(getInternalState(), aMEvent);
            }
        }
    }

    public static boolean inFlinkCluster(NodeSpec nodeSpec, MLClusterDef mLClusterDef) {
        if (null == mLClusterDef) {
            return false;
        }
        boolean z = false;
        Iterator<MLJobDef> it = mLClusterDef.getJobList().iterator();
        while (true) {
            if (!it.hasNext()) {
                break;
            }
            MLJobDef next = it.next();
            if (next.getName().equals(nodeSpec.getRoleName()) && next.containsTasks(nodeSpec.getIndex())) {
                z = true;
                break;
            }
        }
        return z;
    }
}
