package org.flinkextended.flink.ml.cluster.rpc;

import io.grpc.stub.StreamObserver;
import java.io.IOException;
import java.util.Iterator;
import org.flinkextended.flink.ml.cluster.node.MLContext;
import org.flinkextended.flink.ml.cluster.rpc.NodeServer;
import org.flinkextended.flink.ml.proto.ContextRequest;
import org.flinkextended.flink.ml.proto.ContextResponse;
import org.flinkextended.flink.ml.proto.FinishWorkerResponse;
import org.flinkextended.flink.ml.proto.GetFinishNodeResponse;
import org.flinkextended.flink.ml.proto.NodeRestartRequest;
import org.flinkextended.flink.ml.proto.NodeRestartResponse;
import org.flinkextended.flink.ml.proto.NodeServiceGrpc;
import org.flinkextended.flink.ml.proto.NodeSimpleRequest;
import org.flinkextended.flink.ml.proto.NodeSimpleResponse;
import org.flinkextended.flink.ml.proto.NodeSpecRequest;
import org.flinkextended.flink.ml.proto.NodeSpecResponse;
import org.flinkextended.flink.ml.proto.NodeStopRequest;
import org.flinkextended.flink.ml.proto.NodeStopResponse;
import org.flinkextended.flink.ml.util.IpHostUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/flinkextended/flink/ml/cluster/rpc/NodeServiceImpl.class */
/**
 * gRPC service implementation exposed by a {@link NodeServer}.
 *
 * <p>Handlers either act on the local node (restart, stop, context lookup) or forward the
 * request through an {@link AMClient} obtained from the configured {@link AMClientFactory}.
 * Every handler that produces a response sends exactly one {@code onNext} followed by
 * {@code onCompleted}.
 */
public class NodeServiceImpl extends NodeServiceGrpc.NodeServiceImplBase {
    private static final Logger LOG = LoggerFactory.getLogger(NodeServiceImpl.class);
    private final NodeServer server;
    private final MLContext mlContext;
    private final AMClientFactory amClientFactory;

    /** Supplies the {@link AMClient} used by the handlers that talk to the AM. */
    @FunctionalInterface
    public interface AMClientFactory {
        /**
         * Creates or looks up an AM client.
         *
         * @return the client to use for one request; it is closed by the caller after use
         * @throws IOException if the client cannot be obtained
         */
        AMClient getAMClient() throws IOException;
    }

    /**
     * Creates a service whose AM client is resolved through
     * {@code AMRegistry.getAMClient(mlContext)} on each request.
     *
     * @param nodeServer the server that receives restart/stop commands
     * @param mlContext the context of the node this service runs on
     */
    public NodeServiceImpl(NodeServer nodeServer, MLContext mlContext) {
        this(nodeServer, mlContext, () -> AMRegistry.getAMClient(mlContext));
    }

    /**
     * Creates a service with an explicit AM client factory.
     *
     * @param nodeServer the server that receives restart/stop commands
     * @param mlContext the context of the node this service runs on
     * @param amClientFactory source of the AM client used by {@link #getFinishWorker} and
     *     {@link #finishJob}
     */
    public NodeServiceImpl(NodeServer nodeServer, MLContext mlContext, AMClientFactory amClientFactory) {
        this.server = nodeServer;
        this.mlContext = mlContext;
        this.amClientFactory = amClientFactory;
    }

    /**
     * Delegates unchanged to the generated base implementation.
     *
     * @param nodeSpecRequest the incoming request
     * @param streamObserver the observer handed to the base implementation
     */
    @Override // org.flinkextended.flink.ml.proto.NodeServiceGrpc.NodeServiceImplBase
    public void getNodeSpec(NodeSpecRequest nodeSpecRequest, StreamObserver<NodeSpecResponse> streamObserver) {
        super.getNodeSpec(nodeSpecRequest, streamObserver);
    }

    /**
     * Acknowledges a restart request and then flags the server with
     * {@link NodeServer.AMCommand#RESTART}.
     *
     * @param nodeRestartRequest the incoming request (not inspected)
     * @param streamObserver receives an OK response carrying this node's identity
     */
    @Override // org.flinkextended.flink.ml.proto.NodeServiceGrpc.NodeServiceImplBase
    public void nodeRestart(NodeRestartRequest nodeRestartRequest, StreamObserver<NodeRestartResponse> streamObserver) {
        LOG.info("{} receive restart", this.mlContext.getIdentity());
        NodeRestartResponse response = NodeRestartResponse.newBuilder()
                .setCode(RpcCode.OK.ordinal())
                .setMessage(this.mlContext.getIdentity())
                .m1355build();
        streamObserver.onNext(response);
        streamObserver.onCompleted();
        // The command is set only after the acknowledgement has been sent.
        this.server.setAmCommand(NodeServer.AMCommand.RESTART);
    }

    /**
     * Flags the server with {@link NodeServer.AMCommand#STOP} and acknowledges the request.
     *
     * @param nodeStopRequest the incoming request (not inspected)
     * @param streamObserver receives an OK response with an empty message
     */
    @Override // org.flinkextended.flink.ml.proto.NodeServiceGrpc.NodeServiceImplBase
    public void nodeStop(NodeStopRequest nodeStopRequest, StreamObserver<NodeStopResponse> streamObserver) {
        NodeStopResponse response = NodeStopResponse.newBuilder()
                .setCode(RpcCode.OK.ordinal())
                .setMessage("")
                .m1785build();
        // The address is only used for the log line below; failing to resolve it is not fatal.
        String localAddress = null;
        try {
            localAddress = IpHostUtil.getIpAddress();
        } catch (Exception e) {
            LOG.warn("Failed to resolve local IP address", e);
        }
        LOG.info("Received node stop request for {}. This node is {}:{}",
                this.mlContext.getIdentity(), localAddress, String.valueOf(this.server.getPort()));
        this.server.setAmCommand(NodeServer.AMCommand.STOP);
        streamObserver.onNext(response);
        streamObserver.onCompleted();
    }

    /**
     * Returns the node's context, preferring the cached proto and falling back to
     * {@code mlContext.toPB()} when none is set.
     *
     * @param contextRequest the incoming request (not inspected)
     * @param streamObserver receives a response with code 0 and the context proto
     */
    @Override // org.flinkextended.flink.ml.proto.NodeServiceGrpc.NodeServiceImplBase
    public void getContext(ContextRequest contextRequest, StreamObserver<ContextResponse> streamObserver) {
        ContextResponse response = ContextResponse.newBuilder()
                .setCode(0)
                .setContext(this.mlContext.getContextProto() == null
                        ? this.mlContext.toPB()
                        : this.mlContext.getContextProto())
                .setMessage("")
                .m318build();
        streamObserver.onNext(response);
        streamObserver.onCompleted();
    }

    /**
     * Queries the AM client for the finished workers and relays their indices.
     *
     * <p>The response is sent only after the client has been closed, so an {@link IOException}
     * from any step (obtaining the client, the query, or closing) yields a single error
     * response with code 1 instead of a second message on the same call.
     *
     * @param nodeSimpleRequest the incoming request (not inspected)
     * @param streamObserver receives code 0 with the worker list, or code 1 with the error text
     */
    @Override // org.flinkextended.flink.ml.proto.NodeServiceGrpc.NodeServiceImplBase
    public void getFinishWorker(NodeSimpleRequest nodeSimpleRequest, StreamObserver<FinishWorkerResponse> streamObserver) {
        FinishWorkerResponse response;
        try (AMClient amClient = this.amClientFactory.getAMClient()) {
            GetFinishNodeResponse finished = amClient.getFinishedWorker(0L);
            FinishWorkerResponse.Builder builder = FinishWorkerResponse.newBuilder().setCode(0).setMessage("");
            for (Integer worker : finished.getWorkersList()) {
                builder.addWorkers(worker.intValue());
            }
            response = builder.m412build();
        } catch (IOException e) {
            LOG.error("Failed to query finished workers via AM client", e);
            response = FinishWorkerResponse.newBuilder().setCode(1).setMessage(describe(e)).m412build();
        }
        streamObserver.onNext(response);
        streamObserver.onCompleted();
    }

    /**
     * Asks the AM client to stop the job on behalf of this node's role name and index.
     *
     * <p>Exactly one response is sent and the call is always completed, on both the success
     * and the {@link IOException} path.
     *
     * @param nodeSimpleRequest the incoming request (not inspected)
     * @param streamObserver receives code 0 on success, or code 1 with the error text
     */
    @Override // org.flinkextended.flink.ml.proto.NodeServiceGrpc.NodeServiceImplBase
    public void finishJob(NodeSimpleRequest nodeSimpleRequest, StreamObserver<NodeSimpleResponse> streamObserver) {
        NodeSimpleResponse.Builder builder = NodeSimpleResponse.newBuilder();
        try (AMClient amClient = this.amClientFactory.getAMClient()) {
            amClient.stopJob(0L, this.mlContext.getRoleName(), this.mlContext.getIndex());
            builder.setCode(0);
            builder.setMessage("");
        } catch (IOException e) {
            LOG.error("Failed to stop job via AM client", e);
            builder.setCode(1);
            builder.setMessage(describe(e));
        }
        streamObserver.onNext(builder.m1453build());
        streamObserver.onCompleted();
    }

    /**
     * Returns a non-null description of {@code error} suitable for a protobuf string field.
     * Falls back to {@code toString()} because {@code getMessage()} may be null and protobuf
     * builders reject null strings.
     */
    private static String describe(Throwable error) {
        String message = error.getMessage();
        return message != null ? message : error.toString();
    }
}
