package org.apache.inlong.manager.service.core.impl;

import com.google.gson.Gson;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import javax.annotation.PostConstruct;
import org.apache.commons.codec.digest.DigestUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.inlong.common.pojo.sortstandalone.SortClusterConfig;
import org.apache.inlong.common.pojo.sortstandalone.SortClusterResponse;
import org.apache.inlong.common.pojo.sortstandalone.SortTaskConfig;
import org.apache.inlong.manager.dao.entity.StreamSinkEntity;
import org.apache.inlong.manager.pojo.node.DataNodeInfo;
import org.apache.inlong.manager.pojo.sort.standalone.SortFieldInfo;
import org.apache.inlong.manager.pojo.sort.standalone.SortSourceStreamInfo;
import org.apache.inlong.manager.pojo.sort.standalone.SortTaskInfo;
import org.apache.inlong.manager.service.core.ConfigLoader;
import org.apache.inlong.manager.service.core.SortClusterService;
import org.apache.inlong.manager.service.node.DataNodeOperatorFactory;
import org.apache.inlong.manager.service.sink.SinkOperatorFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Lazy;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;

@Lazy
@Service
/* loaded from: input_file:org/apache/inlong/manager/service/core/impl/SortClusterServiceImpl.class */
public class SortClusterServiceImpl implements SortClusterService {
    private static final Logger LOGGER = LoggerFactory.getLogger(SortClusterServiceImpl.class);
    private static final Gson GSON = new Gson();
    private static final long DEFAULT_HEARTBEAT_INTERVAL_MS = 60000;
    private static final int RESPONSE_CODE_SUCCESS = 0;
    private static final int RESPONSE_CODE_NO_UPDATE = 1;
    private static final int RESPONSE_CODE_FAIL = -1;
    private static final int RESPONSE_CODE_REQ_PARAMS_ERROR = -101;
    private static final String KEY_GROUP_ID = "inlongGroupId";
    private static final String KEY_STREAM_ID = "inlongStreamId";
    private static final String FILED_OFFSET = "fieldOffset";
    private Map<Integer, List<String>> fieldMap;
    private Map<String, String> sortClusterMd5Map = new ConcurrentHashMap();
    private Map<String, SortClusterConfig> sortClusterConfigMap = new ConcurrentHashMap();
    private Map<String, String> sortClusterErrorLogMap = new ConcurrentHashMap();
    private Map<String, Map<String, SortSourceStreamInfo>> allStreams;
    private long reloadInterval;

    @Autowired
    private ConfigLoader configLoader;

    @Autowired
    private SinkOperatorFactory sinkOperatorFactory;

    @Autowired
    private DataNodeOperatorFactory dataNodeOperatorFactory;

    @PostConstruct
    public void initialize() {
        LOGGER.info("create repository for " + SortClusterServiceImpl.class.getSimpleName());
        try {
            this.reloadInterval = DEFAULT_HEARTBEAT_INTERVAL_MS;
            reload();
            setReloadTimer();
        } catch (Throwable th) {
            LOGGER.error("Initialize SortClusterConfigRepository error", th);
        }
    }

    @Transactional(rollbackFor = {Exception.class})
    public void reload() {
        LOGGER.debug("start to reload sort config");
        try {
            reloadAllClusterConfig();
        } catch (Throwable th) {
            LOGGER.error("fail to reload cluster config", th);
        }
        LOGGER.debug("end to reload config");
    }

    @Override // org.apache.inlong.manager.service.core.SortClusterService
    public SortClusterResponse getClusterConfig(String str, String str2) {
        if (StringUtils.isBlank(str)) {
            LOGGER.debug("Blank cluster name, return nothing");
            return SortClusterResponse.builder().msg("Blank cluster name, return nothing").code(RESPONSE_CODE_REQ_PARAMS_ERROR).build();
        }
        if (this.sortClusterErrorLogMap.get(str) != null) {
            return SortClusterResponse.builder().msg(this.sortClusterErrorLogMap.get(str)).code(RESPONSE_CODE_FAIL).build();
        }
        if (this.sortClusterConfigMap.get(str) != null) {
            return this.sortClusterMd5Map.get(str).equals(str2) ? SortClusterResponse.builder().msg("No update").code(RESPONSE_CODE_NO_UPDATE).md5(str2).build() : SortClusterResponse.builder().msg("Success").code(RESPONSE_CODE_SUCCESS).data(this.sortClusterConfigMap.get(str)).md5(this.sortClusterMd5Map.get(str)).build();
        }
        String str3 = "There is not config for cluster " + str;
        LOGGER.debug(str3);
        return SortClusterResponse.builder().msg(str3).code(RESPONSE_CODE_SUCCESS).build();
    }

    private void reloadAllClusterConfig() {
        List<SortFieldInfo> loadAllFields = this.configLoader.loadAllFields();
        this.fieldMap = new HashMap();
        loadAllFields.forEach(sortFieldInfo -> {
            this.fieldMap.computeIfAbsent(sortFieldInfo.getSinkId(), num -> {
                return new ArrayList();
            }).add(sortFieldInfo.getFieldName());
        });
        List<StreamSinkEntity> loadAllStreamSinkEntity = this.configLoader.loadAllStreamSinkEntity();
        Map map = (Map) this.configLoader.loadAllTask().stream().filter(sortTaskInfo -> {
            return StringUtils.isNotBlank(sortTaskInfo.getSortClusterName()) && StringUtils.isNotBlank(sortTaskInfo.getSortTaskName()) && StringUtils.isNotBlank(sortTaskInfo.getDataNodeName()) && StringUtils.isNotBlank(sortTaskInfo.getSinkType());
        }).collect(Collectors.groupingBy((v0) -> {
            return v0.getSortClusterName();
        }));
        this.allStreams = (Map) this.configLoader.loadAllStreams().stream().collect(Collectors.groupingBy((v0) -> {
            return v0.getInlongGroupId();
        }, Collectors.toMap((v0) -> {
            return v0.getInlongStreamId();
        }, sortSourceStreamInfo -> {
            return sortSourceStreamInfo;
        })));
        Map map2 = (Map) loadAllStreamSinkEntity.stream().filter(streamSinkEntity -> {
            return StringUtils.isNotBlank(streamSinkEntity.getInlongClusterName());
        }).filter(streamSinkEntity2 -> {
            return StringUtils.isNotBlank(streamSinkEntity2.getSortTaskName());
        }).filter(streamSinkEntity3 -> {
            return StringUtils.isNotBlank(streamSinkEntity3.getDataNodeName());
        }).collect(Collectors.groupingBy((v0) -> {
            return v0.getSortTaskName();
        }));
        Map map3 = (Map) this.configLoader.loadAllDataNodeEntity().stream().filter(dataNodeEntity -> {
            return StringUtils.isNotBlank(dataNodeEntity.getName());
        }).map(dataNodeEntity2 -> {
            return this.dataNodeOperatorFactory.getInstance(dataNodeEntity2.getType()).getFromEntity(dataNodeEntity2);
        }).collect(Collectors.toMap((v0) -> {
            return v0.getName();
        }, dataNodeInfo -> {
            return dataNodeInfo;
        }));
        ConcurrentHashMap concurrentHashMap = new ConcurrentHashMap();
        ConcurrentHashMap concurrentHashMap2 = new ConcurrentHashMap();
        ConcurrentHashMap concurrentHashMap3 = new ConcurrentHashMap();
        map.forEach((str, list) -> {
            try {
                SortClusterConfig configByClusterName = getConfigByClusterName(str, list, map2, map3);
                String md5Hex = DigestUtils.md5Hex(GSON.toJson(configByClusterName));
                concurrentHashMap.put(str, configByClusterName);
                concurrentHashMap2.put(str, md5Hex);
            } catch (Throwable th) {
                concurrentHashMap3.put(str, (String) Optional.ofNullable(th.getMessage()).orElse("Unknown error, please check logs"));
                LOGGER.error("Failed to update cluster config={}", str, th);
            }
        });
        this.sortClusterErrorLogMap = concurrentHashMap3;
        this.sortClusterConfigMap = concurrentHashMap;
        this.sortClusterMd5Map = concurrentHashMap2;
    }

    private SortClusterConfig getConfigByClusterName(String str, List<SortTaskInfo> list, Map<String, List<StreamSinkEntity>> map, Map<String, DataNodeInfo> map2) {
        return SortClusterConfig.builder().clusterName(str).sortTasks((List) list.stream().map(sortTaskInfo -> {
            try {
                String sortTaskName = sortTaskInfo.getSortTaskName();
                String sinkType = sortTaskInfo.getSinkType();
                DataNodeInfo dataNodeInfo = (DataNodeInfo) map2.get(sortTaskInfo.getDataNodeName());
                return SortTaskConfig.builder().name(sortTaskName).type(sinkType).idParams(parseIdParams((List) map.get(sortTaskName), dataNodeInfo)).sinkParams(parseSinkParams(dataNodeInfo)).build();
            } catch (Exception e) {
                LOGGER.error("fail to parse sort task config of cluster={}", str, e);
                return null;
            }
        }).filter((v0) -> {
            return Objects.nonNull(v0);
        }).collect(Collectors.toList())).build();
    }

    private List<Map<String, String>> parseIdParams(List<StreamSinkEntity> list, DataNodeInfo dataNodeInfo) {
        return (List) list.stream().map(streamSinkEntity -> {
            try {
                return this.sinkOperatorFactory.getInstance(streamSinkEntity.getSinkType()).parse2IdParams(streamSinkEntity, this.fieldMap.get(streamSinkEntity.getId()), dataNodeInfo);
            } catch (Exception e) {
                LOGGER.error("fail to parse id params of groupId={}, streamId={} name={}, type={}}", new Object[]{streamSinkEntity.getInlongGroupId(), streamSinkEntity.getInlongStreamId(), streamSinkEntity.getSinkName(), streamSinkEntity.getSinkType(), e});
                return null;
            }
        }).filter((v0) -> {
            return Objects.nonNull(v0);
        }).collect(Collectors.toList());
    }

    private Map<String, String> parseSinkParams(DataNodeInfo dataNodeInfo) {
        return this.dataNodeOperatorFactory.getInstance(dataNodeInfo.getType()).parse2SinkParams(dataNodeInfo);
    }

    private void setReloadTimer() {
        Executors.newSingleThreadScheduledExecutor().scheduleWithFixedDelay(this::reload, this.reloadInterval, this.reloadInterval, TimeUnit.MILLISECONDS);
    }
}
