package org.apache.inlong.manager.service.core.impl;

import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import javax.annotation.PostConstruct;
import org.apache.commons.codec.digest.DigestUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.inlong.common.pojo.sdk.SortSourceConfigResponse;
import org.apache.inlong.common.pojo.sort.ClusterTagConfig;
import org.apache.inlong.common.pojo.sort.SortConfig;
import org.apache.inlong.common.pojo.sort.SortConfigResponse;
import org.apache.inlong.common.pojo.sort.TaskConfig;
import org.apache.inlong.common.pojo.sort.dataflow.DataFlowConfig;
import org.apache.inlong.common.pojo.sort.mq.MqClusterConfig;
import org.apache.inlong.common.pojo.sort.mq.PulsarClusterConfig;
import org.apache.inlong.common.pojo.sort.node.NodeConfig;
import org.apache.inlong.common.pojo.sortstandalone.SortClusterResponse;
import org.apache.inlong.common.util.Utils;
import org.apache.inlong.manager.common.exceptions.BusinessException;
import org.apache.inlong.manager.common.plugin.Plugin;
import org.apache.inlong.manager.common.plugin.PluginBinder;
import org.apache.inlong.manager.common.util.JsonUtils;
import org.apache.inlong.manager.common.util.Preconditions;
import org.apache.inlong.manager.dao.entity.SortConfigEntity;
import org.apache.inlong.manager.pojo.sort.SortStatusInfo;
import org.apache.inlong.manager.pojo.sort.SortStatusRequest;
import org.apache.inlong.manager.service.core.ConfigLoader;
import org.apache.inlong.manager.service.core.SortClusterService;
import org.apache.inlong.manager.service.core.SortService;
import org.apache.inlong.manager.service.core.SortSourceService;
import org.apache.inlong.manager.service.group.InlongGroupService;
import org.apache.inlong.manager.service.node.DataNodeOperatorFactory;
import org.apache.inlong.manager.service.stream.InlongStreamService;
import org.apache.inlong.manager.workflow.plugin.sort.PollerPlugin;
import org.apache.inlong.manager.workflow.plugin.sort.SortPoller;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Lazy;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;

@Lazy
@Service
/* loaded from: input_file:org/apache/inlong/manager/service/core/impl/SortServiceImpl.class */
public class SortServiceImpl implements SortService, PluginBinder {
    private static final Logger log = LoggerFactory.getLogger(SortServiceImpl.class);

    // Response codes placed in the SortConfigResponse built by getSortConfig.
    private static final int RESPONSE_CODE_SUCCESS = 0;
    private static final int RESPONSE_CODE_NO_UPDATE = 1;
    private static final int RESPONSE_CODE_FAIL = -1;
    private static final int RESPONSE_CODE_REQUEST_PARAMS_ERROR = -101;

    // Delegate for getSourceConfig.
    @Autowired
    @Lazy
    private SortSourceService sortSourceService;

    // Delegate for getClusterConfig.
    @Autowired
    @Lazy
    private SortClusterService sortClusterService;

    @Autowired
    private InlongGroupService groupService;

    @Autowired
    private InlongStreamService streamService;

    // Source of the cluster, data node and sort config entities used by the reload methods.
    @Autowired
    private ConfigLoader configLoader;

    @Autowired
    private DataNodeOperatorFactory dataNodeOperatorFactory;

    // Sort cluster name -> GZip-compressed serialized SortConfig.
    // Reassigned by reloadDataFlowConfig on the reload thread and read by getSortConfig
    // callers, so the reference is volatile to make each new map visible to readers.
    private volatile Map<String, byte[]> sortConfigMap = new ConcurrentHashMap<>();

    // Sort cluster name -> MD5 hex of the serialized SortConfig; published the same way as sortConfigMap.
    private volatile Map<String, String> sortConfigMd5Map = new ConcurrentHashMap<>();

    // Cluster tag -> MQ cluster configs; rebuilt by reloadMqCluster and consumed by reloadDataFlowConfig.
    private Map<String, List<MqClusterConfig>> mqClusterConfigMap = new ConcurrentHashMap<>();

    // Node name -> node config; rebuilt by reloadNodeConfig and looked up by sort task name in reloadDataFlowConfig.
    private Map<String, NodeConfig> nodeInfoMap = new ConcurrentHashMap<>();

    // Set by acceptPlugin when a PollerPlugin is offered; null until then.
    private SortPoller sortPoller;

    /**
     * Post-construct hook: performs one synchronous reload and then schedules the
     * periodic reload. Any failure is logged and swallowed so that bean creation
     * itself does not fail.
     */
    @PostConstruct
    public void initialize() {
        String sourceServiceName = SortSourceServiceImpl.class.getSimpleName();
        log.info("create SortServiceImpl for " + sourceServiceName);
        try {
            this.reload();
            this.setReloadTimer();
        } catch (Throwable startupFailure) {
            log.error("initialize SortServiceImpl error", startupFailure);
        }
    }

    /**
     * Rebuilds every cached sort configuration.
     *
     * <p>The MQ cluster map and the node map are refreshed first because the data
     * flow reload reads both of them. A failure in any step is logged and stops the
     * remaining steps; it is never propagated to the caller.
     */
    @Transactional(rollbackFor = Exception.class)
    public void reload() {
        log.debug("start to reload sort config.");
        try {
            this.reloadMqCluster();
            this.reloadNodeConfig();
            this.reloadDataFlowConfig();
        } catch (Throwable reloadFailure) {
            log.error("fail to reload all sort config", reloadFailure);
        }
        log.debug("end to reload config");
    }

    /**
     * Schedules {@link #reload()} to run every 60 seconds, starting 60 seconds from now.
     *
     * <p>The executor is never shut down explicitly, so its worker is created as a
     * named daemon thread: it can then never keep the JVM alive on its own once the
     * application is stopping.
     */
    private void setReloadTimer() {
        Executors.newSingleThreadScheduledExecutor(task -> {
            Thread worker = new Thread(task, "sort-config-reload");
            worker.setDaemon(true);
            return worker;
        }).scheduleWithFixedDelay(this::reload, 60000L, 60000L, TimeUnit.MILLISECONDS);
    }

    /**
     * Delegates to {@link SortClusterService#getClusterConfig}, forwarding both
     * arguments unchanged and returning its response as is.
     */
    @Override
    public SortClusterResponse getClusterConfig(String str, String str2) {
        SortClusterResponse clusterResponse = this.sortClusterService.getClusterConfig(str, str2);
        return clusterResponse;
    }

    /**
     * Delegates to {@link SortSourceService#getSourceConfig}, forwarding all three
     * arguments unchanged and returning its response as is.
     */
    @Override
    public SortSourceConfigResponse getSourceConfig(String str, String str2, String str3) {
        SortSourceConfigResponse sourceResponse = this.sortSourceService.getSourceConfig(str, str2, str3);
        return sourceResponse;
    }

    /**
     * Polls the sort status for every stream of the requested groups.
     *
     * <p>Group ids that cannot be resolved are logged and skipped. All groups are
     * resolved before any stream list is fetched.
     *
     * @param sortStatusRequest request carrying the group ids and the credentials passed to the poller
     * @return the statuses reported by the sort poller
     * @throws BusinessException if listing the streams or polling fails; the message
     *         contains the message of the underlying exception
     */
    @Override
    public List<SortStatusInfo> listSortStatus(SortStatusRequest sortStatusRequest) {
        Preconditions.expectNotNull(this.sortPoller, "sort status poller not initialized, please try later");
        try {
            // Element type is not nameable from this file's imports, hence the raw list.
            List streams = new ArrayList();
            sortStatusRequest.getInlongGroupIds().stream()
                    .map(groupId -> {
                        try {
                            return this.groupService.get(groupId);
                        } catch (Exception lookupFailure) {
                            log.error("can not get groupId: {}, skip it", groupId, lookupFailure);
                            return null;
                        }
                    })
                    .filter(Objects::nonNull)
                    // Materialize the resolved groups first so every lookup happens before any stream listing.
                    .collect(Collectors.toList())
                    .forEach(group -> streams.addAll(this.streamService.list(group.getInlongGroupId())));
            List<SortStatusInfo> statuses = this.sortPoller.pollSortStatus(streams, sortStatusRequest.getCredentials());
            log.debug("success to list sort status for request={}, result={}", sortStatusRequest, statuses);
            return statuses;
        } catch (Exception pollFailure) {
            log.error("poll sort status error: ", pollFailure);
            throw new BusinessException("poll sort status error: " + pollFailure.getMessage());
        }
    }

    /**
     * Picks up the sort poller from the given plugin when it is a {@link PollerPlugin};
     * any other plugin is ignored.
     */
    public void acceptPlugin(Plugin plugin) {
        if (!(plugin instanceof PollerPlugin)) {
            return;
        }
        PollerPlugin pollerPlugin = (PollerPlugin) plugin;
        this.sortPoller = pollerPlugin.getSortPoller();
    }

    /**
     * Returns the cached sort config of a sort cluster.
     *
     * @param str  sort cluster name; a blank value yields a request-params-error response
     * @param str2 MD5 the caller currently holds; when it equals the cached MD5 a
     *             "No update" response without data is returned
     * @return a success response with the cached data and MD5, a no-update response,
     *         or a fail response when nothing is cached for the cluster
     */
    @Override
    public SortConfigResponse getSortConfig(String str, String str2) {
        if (StringUtils.isBlank(str)) {
            String blankNameMsg = "cluster name is blank, return nothing";
            log.debug(blankNameMsg);
            return SortConfigResponse.builder().code(RESPONSE_CODE_REQUEST_PARAMS_ERROR).msg(blankNameMsg).build();
        }

        // Read each map reference exactly once: a concurrent reload replaces the two maps
        // separately, so re-reading the fields between the checks below could pair a key
        // found in one map with a missing entry in the other. The MD5 map is read first
        // because the reload assigns it last.
        Map<String, String> md5Snapshot = this.sortConfigMd5Map;
        Map<String, byte[]> configSnapshot = this.sortConfigMap;

        String currentMd5 = md5Snapshot.get(str);
        if (currentMd5 == null || !configSnapshot.containsKey(str)) {
            String missingMsg = String.format("there is no valid sort config of cluster %s", str);
            log.debug(missingMsg);
            return SortConfigResponse.builder().code(RESPONSE_CODE_FAIL).msg(missingMsg).build();
        }

        if (currentMd5.equals(str2)) {
            return SortConfigResponse.builder().code(RESPONSE_CODE_NO_UPDATE).msg("No update").md5(str2).build();
        }
        return SortConfigResponse.builder()
                .code(RESPONSE_CODE_SUCCESS)
                .data(configSnapshot.get(str))
                .md5(currentMd5)
                .build();
    }

    /**
     * Rebuilds the cluster-tag to MQ-cluster-config map from all cluster config entities.
     *
     * <p>Only entities of type "PULSAR" contribute. When several entities share a
     * cluster tag, the first one wins; the config params of later ones are still
     * parsed but then discarded.
     */
    private void reloadMqCluster() {
        Map<String, List<MqClusterConfig>> reloaded = new HashMap<>();
        this.configLoader.loadAllClusterConfigEntity().forEach(entity -> {
            String tag = entity.getClusterTag();
            if (!"PULSAR".equals(entity.getClusterType())) {
                return;
            }
            List<PulsarClusterConfig> parsed = JsonUtils.parseArray(entity.getConfigParams(), PulsarClusterConfig.class);
            reloaded.putIfAbsent(tag, new ArrayList<>(parsed));
        });
        this.mqClusterConfigMap = reloaded;
    }

    /**
     * Rebuilds the node-name to node-config map from all data node entities.
     *
     * <p>Entities with a blank name are ignored, and entities whose config cannot be
     * built are logged and skipped. When two node configs report the same node name
     * the first one is kept and a warning is logged; without the merge function
     * {@code Collectors.toMap} would throw and abort the whole reload.
     */
    private void reloadNodeConfig() {
        this.nodeInfoMap = this.configLoader.loadAllDataNodeEntity().stream()
                .filter(entity -> StringUtils.isNotBlank(entity.getName()))
                .map(entity -> {
                    try {
                        return this.dataNodeOperatorFactory.getInstance(entity.getType()).getNodeConfig(entity);
                    } catch (Exception e) {
                        log.error("parse node config error for data node name={}", entity.getName(), e);
                        return null;
                    }
                })
                .filter(Objects::nonNull)
                .collect(Collectors.toMap(
                        NodeConfig::getNodeName,
                        nodeConfig -> nodeConfig,
                        (first, duplicate) -> {
                            log.warn("duplicate node config for node name={}, keep the first one",
                                    first.getNodeName());
                            return first;
                        }));
    }

    /**
     * Rebuilds the per-cluster sort config payloads and their MD5 digests.
     *
     * <p>Steps:
     * <ol>
     *   <li>Load all sort config entities; a blank sort task name becomes "DEFAULT_TASK".</li>
     *   <li>Group them by sort cluster name, then sort task name, then cluster tag.</li>
     *   <li>For each cluster build a {@link SortConfig}, serialize it to JSON, and store
     *       the GZip-compressed UTF-8 bytes plus the MD5 hex of the JSON.</li>
     * </ol>
     * Data flow entries that cannot be parsed are logged and skipped. A cluster whose
     * config cannot be serialized is logged and left out of the new maps.
     */
    private void reloadDataFlowConfig() {
        ObjectMapper objectMapper = new ObjectMapper();
        Map<String, byte[]> reloadedConfigs = new HashMap<>();
        Map<String, String> reloadedMd5s = new HashMap<>();

        List<SortConfigEntity> entities = this.configLoader.loadAllSortConfigEntity();
        for (SortConfigEntity entity : entities) {
            if (StringUtils.isBlank(entity.getSortTaskName())) {
                entity.setSortTaskName("DEFAULT_TASK");
            }
        }

        // cluster name -> sort task name -> cluster tag -> entities
        Map<String, Map<String, Map<String, List<SortConfigEntity>>>> grouped = entities.stream()
                .collect(Collectors.groupingBy(SortConfigEntity::getInlongClusterName,
                        Collectors.groupingBy(SortConfigEntity::getSortTaskName,
                                Collectors.groupingBy(SortConfigEntity::getInlongClusterTag))));

        for (Map.Entry<String, Map<String, Map<String, List<SortConfigEntity>>>> clusterEntry : grouped.entrySet()) {
            String clusterName = clusterEntry.getKey();
            List<TaskConfig> tasks = new ArrayList<>();

            for (Map.Entry<String, Map<String, List<SortConfigEntity>>> taskEntry : clusterEntry.getValue().entrySet()) {
                String taskName = taskEntry.getKey();
                // The node config is looked up by the sort task name.
                TaskConfig taskConfig = TaskConfig.builder()
                        .sortTaskName(taskName)
                        .clusterTagConfigs(new ArrayList<>())
                        .nodeConfig(this.nodeInfoMap.get(taskName))
                        .build();

                for (Map.Entry<String, List<SortConfigEntity>> tagEntry : taskEntry.getValue().entrySet()) {
                    String clusterTag = tagEntry.getKey();
                    List<DataFlowConfig> dataFlows = tagEntry.getValue().stream()
                            .map(entity -> {
                                try {
                                    return objectMapper.readValue(entity.getConfigParams(), DataFlowConfig.class);
                                } catch (Exception e) {
                                    log.error("parse data flow config error for sinkId={}", entity.getSinkId(), e);
                                    return null;
                                }
                            })
                            .filter(Objects::nonNull)
                            // Order by numeric dataflow id so the serialized form (and its MD5) is stable.
                            .sorted(Comparator.comparingInt(flow -> Integer.parseInt(flow.getDataflowId())))
                            .collect(Collectors.toList());
                    taskConfig.getClusterTagConfigs().add(ClusterTagConfig.builder()
                            .mqClusterConfigs(this.mqClusterConfigMap.getOrDefault(clusterTag, new ArrayList<>()))
                            .clusterTag(clusterTag)
                            .dataFlowConfigs(dataFlows)
                            .build());
                }
                tasks.add(taskConfig);
            }

            SortConfig sortConfig = new SortConfig();
            sortConfig.setSortClusterName(clusterName);
            sortConfig.setTasks(tasks);
            try {
                String json = objectMapper.writeValueAsString(sortConfig);
                // Encode explicitly as UTF-8: md5Hex(String) hashes the UTF-8 bytes, so the
                // payload must use the same encoding regardless of the platform default charset.
                reloadedConfigs.put(clusterName, Utils.compressGZip(json.getBytes(StandardCharsets.UTF_8)));
                reloadedMd5s.put(clusterName, DigestUtils.md5Hex(json));
            } catch (Exception e) {
                log.error("parse sort config error for cluster name={}", clusterName, e);
            }
        }

        // Publish the data map before the MD5 map.
        this.sortConfigMap = reloadedConfigs;
        this.sortConfigMd5Map = reloadedMd5s;
    }
}
