package org.flinkextended.flink.ml.cluster.master.meta;

import com.google.common.net.HostAndPort;
import java.io.IOException;
import org.flinkextended.flink.ml.cluster.node.MLContext;
import org.flinkextended.flink.ml.cluster.storage.Storage;
import org.flinkextended.flink.ml.cluster.storage.StorageFactory;
import org.flinkextended.flink.ml.proto.AMStatus;
import org.flinkextended.flink.ml.proto.AMStatusMessage;
import org.flinkextended.flink.ml.proto.MLClusterDef;
import org.flinkextended.flink.ml.proto.MLJobDef;
import org.flinkextended.flink.ml.proto.NodeSpec;
import org.flinkextended.flink.ml.proto.NodeSpecList;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/flinkextended/flink/ml/cluster/master/meta/AMMetaImpl.class */
public class AMMetaImpl implements AMMeta {
    private static final Logger LOG = LoggerFactory.getLogger(AMMetaImpl.class);
    Storage storage;

    public AMMetaImpl(MLContext mLContext) {
        this.storage = StorageFactory.getStorageInstance(mLContext.getProperties());
    }

    @Override // org.flinkextended.flink.ml.cluster.master.meta.AMMeta
    public void saveClusterVersion(long j) throws IOException {
        this.storage.setValue(AMMeta.VERSION_NODE, String.valueOf(j).getBytes());
    }

    @Override // org.flinkextended.flink.ml.cluster.master.meta.AMMeta
    public long restoreClusterVersion() throws IOException {
        byte[] value = this.storage.getValue(AMMeta.VERSION_NODE);
        if (null == value) {
            return 0L;
        }
        return Long.valueOf(new String(value)).longValue();
    }

    @Override // org.flinkextended.flink.ml.cluster.master.meta.AMMeta
    public void saveAMIpPort(String str, int i) throws IOException {
        String format = String.format("%s:%d", str, Integer.valueOf(i));
        LOG.info("Saving {} as {}", AMMeta.AM_ADDRESS, format);
        this.storage.setValue(AMMeta.AM_ADDRESS, format.getBytes());
    }

    @Override // org.flinkextended.flink.ml.cluster.master.meta.AMMeta
    public void removeAMIpPort() throws IOException {
        this.storage.removeValue(AMMeta.AM_ADDRESS);
    }

    @Override // org.flinkextended.flink.ml.cluster.master.meta.AMMeta
    public HostAndPort restoreAMIpPort() throws IOException {
        byte[] value = this.storage.getValue(AMMeta.AM_ADDRESS);
        if (null == value) {
            return null;
        }
        return HostAndPort.fromString(new String(value));
    }

    @Override // org.flinkextended.flink.ml.cluster.master.meta.AMMeta
    public NodeSpec restoreNodeSpec(String str, int i) throws IOException {
        byte[] value = this.storage.getValue(AMMeta.CLUSTER_INFO);
        if (null == value) {
            return null;
        }
        for (MLJobDef mLJobDef : MLClusterDef.parseFrom(value).getJobList()) {
            if (mLJobDef.getName().equals(str)) {
                if (mLJobDef.containsTasks(i)) {
                    return mLJobDef.getTasksOrDefault(i, null);
                }
                return null;
            }
        }
        return null;
    }

    @Override // org.flinkextended.flink.ml.cluster.master.meta.AMMeta
    public synchronized MLClusterDef saveNodeSpec(NodeSpec nodeSpec) throws IOException {
        byte[] value = this.storage.getValue(AMMeta.CLUSTER_INFO);
        MLClusterDef.Builder newBuilder = MLClusterDef.newBuilder();
        if (null != value) {
            newBuilder.mergeFrom(MLClusterDef.parseFrom(value));
        }
        boolean z = false;
        for (MLJobDef.Builder builder : newBuilder.getJobBuilderList()) {
            if (builder.getName().equals(nodeSpec.getRoleName())) {
                builder.putTasks(nodeSpec.getIndex(), nodeSpec);
                z = true;
            }
        }
        if (!z) {
            newBuilder.addJob(MLJobDef.newBuilder().setName(nodeSpec.getRoleName()).putTasks(nodeSpec.getIndex(), nodeSpec));
        }
        this.storage.setValue(AMMeta.CLUSTER_INFO, newBuilder.m1164build().toByteArray());
        return newBuilder.m1164build();
    }

    @Override // org.flinkextended.flink.ml.cluster.master.meta.AMMeta
    public void cleanCluster() throws IOException {
        this.storage.removeValue(AMMeta.CLUSTER_INFO);
    }

    @Override // org.flinkextended.flink.ml.cluster.master.meta.AMMeta
    public MLClusterDef restoreClusterDef() throws IOException {
        byte[] value = this.storage.getValue(AMMeta.CLUSTER_INFO);
        if (null == value) {
            return null;
        }
        return MLClusterDef.parseFrom(value);
    }

    @Override // org.flinkextended.flink.ml.cluster.master.meta.AMMeta
    public MLClusterDef restoreFinishClusterDef() throws IOException {
        byte[] value = this.storage.getValue(AMMeta.FINISH_CLUSTER_INFO);
        if (null == value) {
            return null;
        }
        return MLClusterDef.parseFrom(value);
    }

    @Override // org.flinkextended.flink.ml.cluster.master.meta.AMMeta
    public synchronized MLClusterDef saveFinishNodeSpec(NodeSpec nodeSpec) throws IOException {
        byte[] value = this.storage.getValue(AMMeta.FINISH_CLUSTER_INFO);
        if (null == value) {
            MLClusterDef m1164build = MLClusterDef.newBuilder().addJob(MLJobDef.newBuilder().setName(nodeSpec.getRoleName()).putTasks(nodeSpec.getIndex(), nodeSpec)).m1164build();
            this.storage.setValue(AMMeta.FINISH_CLUSTER_INFO, m1164build.toByteArray());
            return m1164build;
        }
        MLClusterDef parseFrom = MLClusterDef.parseFrom(value);
        MLClusterDef.Builder newBuilder = MLClusterDef.newBuilder();
        newBuilder.mergeFrom(parseFrom);
        boolean z = false;
        for (MLJobDef.Builder builder : newBuilder.getJobBuilderList()) {
            if (builder.getName().equals(nodeSpec.getRoleName())) {
                builder.putTasks(nodeSpec.getIndex(), nodeSpec);
                z = true;
            }
        }
        if (!z) {
            newBuilder.addJob(MLJobDef.newBuilder().setName(nodeSpec.getRoleName()).putTasks(nodeSpec.getIndex(), nodeSpec));
        }
        this.storage.setValue(AMMeta.FINISH_CLUSTER_INFO, newBuilder.m1164build().toByteArray());
        return newBuilder.m1164build();
    }

    @Override // org.flinkextended.flink.ml.cluster.master.meta.AMMeta
    public synchronized AMStatus saveAMStatus(AMStatus aMStatus, AMStatus aMStatus2) throws IOException {
        if (aMStatus2 != AMStatus.AM_UNKNOW) {
            AMStatus restoreAMStatus = restoreAMStatus();
            LOG.debug("master status is " + restoreAMStatus.toString());
            if (AMStatus.AM_UNKNOW == restoreAMStatus) {
                this.storage.setValue(AMMeta.AM_STATUS, AMStatusMessage.newBuilder().setStatus(aMStatus).m73build().toByteArray());
                return aMStatus;
            }
            if (restoreAMStatus != aMStatus2) {
                return restoreAMStatus;
            }
        }
        this.storage.setValue(AMMeta.AM_STATUS, AMStatusMessage.newBuilder().setStatus(aMStatus).m73build().toByteArray());
        return aMStatus;
    }

    @Override // org.flinkextended.flink.ml.cluster.master.meta.AMMeta
    public AMStatus restoreAMStatus() throws IOException {
        byte[] value = this.storage.getValue(AMMeta.AM_STATUS);
        return null == value ? AMStatus.AM_UNKNOW : AMStatusMessage.parseFrom(value).getStatus();
    }

    @Override // org.flinkextended.flink.ml.cluster.master.meta.AMMeta
    public NodeSpecList restoreFailedNodes() throws IOException {
        byte[] value = this.storage.getValue(AMMeta.FAILED_NODES);
        if (null == value) {
            return null;
        }
        return NodeSpecList.parseFrom(value);
    }

    @Override // org.flinkextended.flink.ml.cluster.master.meta.AMMeta
    public synchronized void saveFailedNode(NodeSpec nodeSpec) throws IOException {
        byte[] value = this.storage.getValue(AMMeta.FAILED_NODES);
        if (null == value) {
            this.storage.setValue(AMMeta.FAILED_NODES, NodeSpecList.newBuilder().addNodes(nodeSpec).m1548build().toByteArray());
        } else {
            NodeSpecList.Builder mergeFrom = NodeSpecList.newBuilder().mergeFrom(NodeSpecList.parseFrom(value));
            mergeFrom.addNodes(nodeSpec);
            this.storage.setValue(AMMeta.FAILED_NODES, mergeFrom.m1548build().toByteArray());
        }
    }

    @Override // org.flinkextended.flink.ml.cluster.master.meta.AMMeta
    public void close() {
        this.storage.close();
    }

    @Override // org.flinkextended.flink.ml.cluster.master.meta.AMMeta
    public void clear() {
        this.storage.clear();
    }
}
