package org.apache.inlong.manager.service.core.impl;

import com.google.common.base.Joiner;
import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import com.google.gson.Gson;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import javax.annotation.PostConstruct;
import org.apache.commons.codec.digest.DigestUtils;
import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.lang3.RandomStringUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.inlong.common.db.CommandEntity;
import org.apache.inlong.common.enums.PullJobTypeEnum;
import org.apache.inlong.common.enums.TaskStateEnum;
import org.apache.inlong.common.enums.TaskTypeEnum;
import org.apache.inlong.common.pojo.agent.AgentConfigInfo;
import org.apache.inlong.common.pojo.agent.AgentConfigRequest;
import org.apache.inlong.common.pojo.agent.AgentResponseCode;
import org.apache.inlong.common.pojo.agent.CmdConfig;
import org.apache.inlong.common.pojo.agent.DataConfig;
import org.apache.inlong.common.pojo.agent.TaskRequest;
import org.apache.inlong.common.pojo.agent.TaskResult;
import org.apache.inlong.common.pojo.agent.TaskSnapshotRequest;
import org.apache.inlong.common.pojo.agent.installer.ConfigRequest;
import org.apache.inlong.common.pojo.agent.installer.ConfigResult;
import org.apache.inlong.common.pojo.agent.installer.ModuleConfig;
import org.apache.inlong.common.pojo.agent.installer.PackageConfig;
import org.apache.inlong.common.pojo.dataproxy.DataProxyTopicInfo;
import org.apache.inlong.common.pojo.dataproxy.MQClusterInfo;
import org.apache.inlong.manager.common.consts.InlongConstants;
import org.apache.inlong.manager.common.consts.SourceType;
import org.apache.inlong.manager.common.enums.GroupStatus;
import org.apache.inlong.manager.common.enums.ModuleType;
import org.apache.inlong.manager.common.enums.SourceStatus;
import org.apache.inlong.manager.common.exceptions.BusinessException;
import org.apache.inlong.manager.common.util.CommonBeanUtils;
import org.apache.inlong.manager.common.util.JsonUtils;
import org.apache.inlong.manager.common.util.Preconditions;
import org.apache.inlong.manager.dao.entity.InlongClusterEntity;
import org.apache.inlong.manager.dao.entity.InlongClusterNodeEntity;
import org.apache.inlong.manager.dao.entity.InlongGroupEntity;
import org.apache.inlong.manager.dao.entity.InlongStreamEntity;
import org.apache.inlong.manager.dao.entity.ModuleConfigEntity;
import org.apache.inlong.manager.dao.entity.PackageConfigEntity;
import org.apache.inlong.manager.dao.entity.StreamSourceEntity;
import org.apache.inlong.manager.dao.mapper.DataSourceCmdConfigEntityMapper;
import org.apache.inlong.manager.dao.mapper.InlongClusterEntityMapper;
import org.apache.inlong.manager.dao.mapper.InlongClusterNodeEntityMapper;
import org.apache.inlong.manager.dao.mapper.InlongGroupEntityMapper;
import org.apache.inlong.manager.dao.mapper.InlongStreamEntityMapper;
import org.apache.inlong.manager.dao.mapper.StreamSourceEntityMapper;
import org.apache.inlong.manager.pojo.cluster.ClusterPageRequest;
import org.apache.inlong.manager.pojo.cluster.agent.AgentClusterNodeBindGroupRequest;
import org.apache.inlong.manager.pojo.cluster.agent.AgentClusterNodeDTO;
import org.apache.inlong.manager.pojo.cluster.pulsar.PulsarClusterDTO;
import org.apache.inlong.manager.pojo.group.pulsar.InlongPulsarDTO;
import org.apache.inlong.manager.pojo.module.ModuleDTO;
import org.apache.inlong.manager.pojo.source.file.FileSourceDTO;
import org.apache.inlong.manager.pojo.stream.InlongStreamExtParam;
import org.apache.inlong.manager.pojo.stream.InlongStreamInfo;
import org.apache.inlong.manager.service.cluster.node.AgentClusterNodeOperator;
import org.apache.inlong.manager.service.core.AgentService;
import org.apache.inlong.manager.service.core.ConfigLoader;
import org.apache.inlong.manager.service.source.SourceOperatorFactory;
import org.apache.inlong.manager.service.source.SourceSnapshotOperator;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Isolation;
import org.springframework.transaction.annotation.Propagation;
import org.springframework.transaction.annotation.Transactional;

@Service
/* loaded from: input_file:org/apache/inlong/manager/service/core/impl/AgentServiceImpl.class */
public class AgentServiceImpl implements AgentService {
    // Status-code arithmetic: updateTaskStatus compares (status / MODULUS_100) against ISSUED_STATUS.
    private static final int UNISSUED_STATUS = 2;
    private static final int ISSUED_STATUS = 3;
    private static final int MODULUS_100 = 100;
    // NOTE(review): presumably the per-request fetch limit for non-file tasks - confirm against usages.
    private static final int TASK_FETCH_SIZE = 2;

    /** Installer config requests queued by getConfig and drained by updateModuleConfig. */
    private final LinkedBlockingQueue<ConfigRequest> updateModuleConfigQueue = new LinkedBlockingQueue<>();

    // The maps below are replaced wholesale by the reload methods and read by request-handling
    // methods. They are volatile so that a freshly published map is visible to reader threads.

    /** Task config keyed by "agentIp_clusterName". */
    private volatile Map<String, TaskResult> taskConfigMap = new ConcurrentHashMap<>();
    /** Agent config keyed by "agentIp_clusterName". */
    private volatile Map<String, AgentConfigInfo> agentConfigMap = new ConcurrentHashMap<>();
    /** Module config keyed by the module config entity id. */
    private volatile Map<Integer, ModuleConfig> moduleConfigMap = new ConcurrentHashMap<>();
    /** Installer config keyed by "agentIp_clusterName". */
    private volatile Map<String, ConfigResult> installerConfigMap = new ConcurrentHashMap<>();

    // Switch, look-back window and period (seconds) of the "mark sources as timed out" job.
    @Value("${source.update.enabled:false}")
    private Boolean updateTaskTimeoutEnabled;

    @Value("${source.update.before.seconds:60}")
    private Integer beforeSeconds;

    @Value("${source.update.interval:60}")
    private Integer updateTaskInterval;

    // Switch and period (seconds) of the "clean deleted sources" job.
    @Value("${source.clean.enabled:false}")
    private Boolean sourceCleanEnabled;

    @Value("${source.clean.interval.seconds:600}")
    private Integer cleanInterval;

    // Switch, period (seconds) and retention (days) of the "clean sub tasks" job.
    @Value("${add.task.clean.enabled:false}")
    private Boolean dataAddTaskCleanEnabled;

    @Value("${add.task.clean.interval.seconds:10}")
    private Integer dataAddTaskCleanInterval;

    @Value("${add.task.retention.days:7}")
    private Integer retentionDays;

    /** Module id used by updateModuleConfig when an agent has no installer config yet. */
    @Value("${default.module.id:1}")
    private Integer defaultModuleId;

    @Autowired
    private StreamSourceEntityMapper sourceMapper;

    @Autowired
    private SourceSnapshotOperator snapshotOperator;

    @Autowired
    private DataSourceCmdConfigEntityMapper sourceCmdConfigMapper;

    @Autowired
    private InlongGroupEntityMapper groupMapper;

    @Autowired
    private InlongStreamEntityMapper streamMapper;

    @Autowired
    private InlongClusterEntityMapper clusterMapper;

    @Autowired
    private InlongClusterNodeEntityMapper clusterNodeMapper;

    @Autowired
    private SourceOperatorFactory operatorFactory;

    @Autowired
    private AgentClusterNodeOperator agentClusterNodeOperator;

    @Autowired
    private ConfigLoader configLoader;
    private static final Logger LOGGER = LoggerFactory.getLogger(AgentServiceImpl.class);
    private static final Gson GSON = new Gson();

    /**
     * Runs once after dependency injection: performs the initial config reload, arms the periodic
     * reload timer, and starts each optional maintenance job whose switch is enabled.
     */
    @PostConstruct
    private void startHeartbeatTask() {
        try {
            reload();
            setReloadTimer();
        } catch (Exception e) {
            LOGGER.error("load agent task config failed", e);
        }
        LOGGER.debug("end to reload config for installer");

        if (updateTaskTimeoutEnabled) {
            scheduleDaemonTask("scheduled-source-timeout-%d", () -> {
                try {
                    sourceMapper.updateStatusToTimeout(beforeSeconds);
                    LOGGER.info("update task status successfully");
                } catch (Throwable t) {
                    LOGGER.error("update task status error", t);
                }
            }, updateTaskInterval);
            LOGGER.info("update task status started successfully");
        }

        if (sourceCleanEnabled) {
            scheduleDaemonTask("scheduled-source-deleted-%d", () -> {
                try {
                    sourceMapper.updateStatusByDeleted();
                    LOGGER.info("clean task successfully");
                } catch (Throwable t) {
                    LOGGER.error("clean task error", t);
                }
            }, cleanInterval);
            LOGGER.info("clean task started successfully");
        }

        if (dataAddTaskCleanEnabled) {
            scheduleDaemonTask("scheduled-subSource-deleted-%d", () -> {
                try {
                    sourceMapper.logicalDeleteByTimeout(retentionDays);
                    LOGGER.info("clean sub task successfully");
                } catch (Throwable t) {
                    LOGGER.error("clean sub task error", t);
                }
            }, dataAddTaskCleanInterval);
            LOGGER.info("clean sub task started successfully");
        }
    }

    /**
     * Starts a single daemon thread that runs {@code task} immediately and then again
     * {@code intervalSeconds} seconds after each run finishes.
     */
    private void scheduleDaemonTask(String threadNameFormat, Runnable task, Integer intervalSeconds) {
        Executors.newSingleThreadScheduledExecutor(
                new ThreadFactoryBuilder().setNameFormat(threadNameFormat).setDaemon(true).build())
                .scheduleWithFixedDelay(task, 0L, intervalSeconds.intValue(), TimeUnit.SECONDS);
    }

    /**
     * Schedules {@link #reload()} to run every 60 seconds, starting 60 seconds from now.
     *
     * <p>The call is wrapped in a catch-all because a fixed-delay task that throws is never run
     * again by the executor; without it a single failed reload would silently stop all future
     * reloads. The worker is a named daemon thread, matching the other schedulers in this class,
     * so it does not block JVM shutdown.
     */
    private void setReloadTimer() {
        Executors.newSingleThreadScheduledExecutor(
                new ThreadFactoryBuilder().setNameFormat("scheduled-agent-config-reload-%d").setDaemon(true).build())
                .scheduleWithFixedDelay(() -> {
                    try {
                        reload();
                    } catch (Throwable t) {
                        LOGGER.error("reload agent config failed", t);
                    }
                }, 60000L, 60000L, TimeUnit.MILLISECONDS);
    }

    /**
     * Hands the agent's task snapshot to the snapshot operator.
     *
     * @param taskSnapshotRequest snapshot reported by the agent
     * @return whatever the snapshot operator returns for this request
     */
    @Override
    public Boolean reportSnapshot(TaskSnapshotRequest taskSnapshotRequest) {
        final Boolean outcome = snapshotOperator.snapshot(taskSnapshotRequest);
        return outcome;
    }

    /**
     * Processes the command results reported by an agent, updating the status of each
     * referenced source.
     *
     * @param taskRequest agent report; must be non-null and carry a non-blank agent ip
     * @throws BusinessException if the request is null or its agent ip is blank
     */
    @Override
    @Transactional(rollbackFor = Throwable.class, isolation = Isolation.READ_COMMITTED,
            propagation = Propagation.REQUIRES_NEW)
    public void report(TaskRequest taskRequest) {
        if (LOGGER.isDebugEnabled()) {
            LOGGER.debug("begin to get agent task: {}", taskRequest);
        }
        final boolean invalidRequest = taskRequest == null || StringUtils.isBlank(taskRequest.getAgentIp());
        if (invalidRequest) {
            throw new BusinessException("agent request or agent ip was empty, just return");
        }

        preTimeoutTasks(taskRequest);

        final List<CommandEntity> commandResults = taskRequest.getCommandInfo();
        if (CollectionUtils.isEmpty(commandResults)) {
            LOGGER.debug("task result was empty in request: {}, just return", taskRequest);
            return;
        }
        for (CommandEntity commandResult : commandResults) {
            updateTaskStatus(commandResult);
        }
    }

    /**
     * Runs one full refresh cycle, in order: agent task configs, module configs, then the
     * pending installer module-config updates.
     *
     * <p>NOTE(review): updateModuleConfig is invoked on {@code this}; presumably its
     * {@code @Transactional} annotation is not applied on this path - confirm.
     */
    public void reload() {
        this.reloadAgentTask();
        this.reloadModule();
        this.updateModuleConfig();
    }

    /**
     * Rebuilds the task, agent and installer config maps from all agent task config rows and
     * publishes them in place of the current maps.
     *
     * <p>Each row is keyed by {@code agentIp + "_" + clusterName}; the first row seen for a key
     * wins. A row that fails to parse is logged (with its cause) and skipped. If loading as a
     * whole fails, the current maps are left untouched.
     */
    public void reloadAgentTask() {
        LOGGER.debug("start to reload agent task config.");
        try {
            Map<String, TaskResult> newTaskConfigMap = new ConcurrentHashMap<>();
            Map<String, AgentConfigInfo> newAgentConfigMap = new ConcurrentHashMap<>();
            Map<String, ConfigResult> newInstallerConfigMap = new ConcurrentHashMap<>();
            this.configLoader.loadAllAgentTaskConfigEntity().forEach(configEntity -> {
                try {
                    String key = configEntity.getAgentIp() + "_" + configEntity.getClusterName();

                    TaskResult taskResult = JsonUtils.parseObject(configEntity.getTaskParams(), TaskResult.class);
                    if (taskResult != null) {
                        taskResult.setVersion(configEntity.getVersion());
                        newTaskConfigMap.putIfAbsent(key, taskResult);
                    }

                    AgentConfigInfo agentConfigInfo =
                            JsonUtils.parseObject(configEntity.getConfigParams(), AgentConfigInfo.class);
                    if (agentConfigInfo != null) {
                        agentConfigInfo.setVersion(configEntity.getVersion());
                        newAgentConfigMap.putIfAbsent(key, agentConfigInfo);
                    }

                    ConfigResult configResult =
                            JsonUtils.parseObject(configEntity.getModuleParams(), ConfigResult.class);
                    if (configResult != null) {
                        configResult.setVersion(configEntity.getVersion());
                        newInstallerConfigMap.putIfAbsent(key, configResult);
                    }
                } catch (Exception e) {
                    // Pass the exception as the last argument so the cause and stack trace are logged.
                    LOGGER.error("failed to get agent task config for agent ip={}, cluster name={}",
                            configEntity.getAgentIp(), configEntity.getClusterName(), e);
                }
            });
            this.taskConfigMap = newTaskConfigMap;
            this.agentConfigMap = newAgentConfigMap;
            this.installerConfigMap = newInstallerConfigMap;
        } catch (Throwable th) {
            LOGGER.error("failed to reload all agent task config", th);
        }
        LOGGER.debug("end to reload agent task config");
    }

    /**
     * Rebuilds the module config map from all module and package config rows and publishes it in
     * place of the current map.
     *
     * <p>Each module config is keyed by its entity id and carries a copy of the package config
     * referenced by its package id, the fields parsed from its ext params, and a process count
     * of 1. If anything fails, the error is logged and the current map is left untouched.
     */
    public void reloadModule() {
        // This message used to read "agent task config" (copied from reloadAgentTask) at INFO level;
        // it now names the right operation and matches the DEBUG level of the closing message.
        LOGGER.debug("start to reload module config");
        try {
            List<ModuleConfigEntity> moduleEntities = this.configLoader.loadAllModuleConfigEntity();
            List<PackageConfigEntity> packageEntities = this.configLoader.loadAllPackageConfigEntity();

            Map<Integer, PackageConfigEntity> packagesById = new ConcurrentHashMap<>();
            packageEntities.forEach(packageEntity -> packagesById.putIfAbsent(packageEntity.getId(), packageEntity));

            Map<Integer, ModuleConfig> newModuleConfigMap = new ConcurrentHashMap<>();
            for (ModuleConfigEntity moduleEntity : moduleEntities) {
                ModuleConfig base = CommonBeanUtils.copyProperties(moduleEntity, ModuleConfig::new);
                base.setId(Integer.valueOf(ModuleType.forType(moduleEntity.getType()).getModuleId()));
                base.setEntityId(moduleEntity.getId());
                base.setPackageConfig(
                        CommonBeanUtils.copyProperties(packagesById.get(moduleEntity.getPackageId()), PackageConfig::new));

                // Overlay the fields stored in the ext params JSON on top of the base config.
                ModuleDTO extParams = JsonUtils.parseObject(moduleEntity.getExtParams(), ModuleDTO.class);
                ModuleConfig moduleConfig = CommonBeanUtils.copyProperties(extParams, base, true);
                moduleConfig.setProcessesNum(1);
                newModuleConfigMap.putIfAbsent(moduleEntity.getId(), moduleConfig);
            }
            this.moduleConfigMap = newModuleConfigMap;
        } catch (Throwable th) {
            LOGGER.error("fail to reload module config", th);
        }
        LOGGER.debug("end to reload module config");
    }

    /**
     * Drains the queued installer config requests and, for each one, recomputes the module list
     * the agent should have; when its md5 differs from the cached installer config (or none is
     * cached), asks the cluster node operator to update the node's module config.
     *
     * <p>The whole body runs inside the try/catch: previously only the queue allocation was
     * covered, which left the queue local not definitely assigned and let any failure escape to
     * the caller. A cached config whose module list is null is now treated as empty instead of
     * causing a NullPointerException.
     */
    @Transactional(rollbackFor = {Exception.class})
    public void updateModuleConfig() {
        LOGGER.info("start to update module config.");
        try {
            if (this.updateModuleConfigQueue.isEmpty()) {
                return;
            }
            LinkedBlockingQueue<ConfigRequest> pending = new LinkedBlockingQueue<>();
            // Sizes are sampled before draining, as before; "move num" is the number drained.
            int sourceSize = this.updateModuleConfigQueue.size();
            int targetSize = pending.size();
            int moved = this.updateModuleConfigQueue.drainTo(pending);
            LOGGER.info("begin to update module config source size={}, target size={}, move num={}",
                    sourceSize, targetSize, moved);

            ConfigRequest configRequest;
            while ((configRequest = pending.poll()) != null) {
                String localIp = configRequest.getLocalIp();
                String clusterName = configRequest.getClusterName();
                ConfigResult cached = this.installerConfigMap.get(localIp + "_" + clusterName);

                // NOTE(review): this returns when the default module IS present, which also drops
                // the remaining drained requests - confirm the condition is not inverted.
                if (this.moduleConfigMap.isEmpty() || this.moduleConfigMap.get(this.defaultModuleId) != null) {
                    return;
                }

                Integer restartTime = 0;
                List<Integer> moduleEntityIds = new ArrayList<>();
                if (cached == null) {
                    moduleEntityIds.add(this.defaultModuleId);
                } else {
                    List<ModuleConfig> cachedModules = cached.getModuleList();
                    if (CollectionUtils.isNotEmpty(cachedModules)) {
                        restartTime = cachedModules.get(0).getRestartTime();
                        for (ModuleConfig cachedModule : cachedModules) {
                            moduleEntityIds.add(cachedModule.getEntityId());
                        }
                    }
                }

                List<ModuleConfig> modules = new ArrayList<>();
                for (Integer moduleEntityId : moduleEntityIds) {
                    ModuleConfig moduleConfig = this.moduleConfigMap.get(moduleEntityId);
                    if (moduleConfig != null) {
                        // The md5 is computed after the restart time is applied, so it covers it.
                        moduleConfig.setRestartTime(restartTime);
                        moduleConfig.setMd5(DigestUtils.md5Hex(GSON.toJson(moduleConfig)));
                        modules.add(moduleConfig);
                    }
                }

                ConfigResult expected = ConfigResult.builder()
                        .moduleList(modules)
                        .md5(DigestUtils.md5Hex(GSON.toJson(modules)))
                        .code(AgentResponseCode.SUCCESS)
                        .build();
                if (cached == null || !Objects.equals(cached.getMd5(), expected.getMd5())) {
                    this.agentClusterNodeOperator.updateModuleConfig(localIp, clusterName);
                }
            }
            LOGGER.info("end to update module config");
        } catch (Throwable th) {
            LOGGER.error("fail to update module config", th);
        }
    }

    /**
     * Applies one reported command result to its stream source.
     *
     * <p>Nothing is updated when the source cannot be found or when the reported version differs
     * from the stored one. Otherwise the target status is: failed when the command result is 1;
     * disabled or stopped when the source is in the matching "been issued" delete/stop status;
     * normal in every other case. The row is written only if the target differs from the
     * current status.
     */
    private void updateTaskStatus(CommandEntity commandEntity) {
        final Integer sourceId = commandEntity.getTaskId();
        final StreamSourceEntity source = sourceMapper.selectForAgentTask(sourceId);
        if (source == null) {
            LOGGER.warn("stream source not found by id={}, just return", sourceId);
            return;
        }

        final boolean versionMatches = Objects.equals(commandEntity.getVersion(), source.getVersion());
        if (!versionMatches) {
            LOGGER.warn("task result version [{}] not equals to current [{}] for id [{}], skip update",
                    commandEntity.getVersion(), source.getVersion(), sourceId);
            return;
        }

        final int result = commandEntity.getCommandResult();
        final int currentStatus = source.getStatus();

        final int targetStatus;
        if (result == 1) {
            LOGGER.warn("task failed for id =[{}]", sourceId);
            targetStatus = SourceStatus.SOURCE_FAILED.getCode();
        } else if (currentStatus / MODULUS_100 != ISSUED_STATUS) {
            targetStatus = SourceStatus.SOURCE_NORMAL.getCode();
        } else if (currentStatus == SourceStatus.BEEN_ISSUED_DELETE.getCode()) {
            targetStatus = SourceStatus.SOURCE_DISABLE.getCode();
        } else if (currentStatus == SourceStatus.BEEN_ISSUED_STOP.getCode()) {
            targetStatus = SourceStatus.SOURCE_STOP.getCode();
        } else {
            targetStatus = SourceStatus.SOURCE_NORMAL.getCode();
        }

        if (targetStatus == currentStatus) {
            return;
        }
        sourceMapper.updateStatus(sourceId, targetStatus, false);
        LOGGER.info("task result=[{}], update source status to [{}] for id [{}]", result, targetStatus, sourceId);
    }

    /**
     * Looks up the cached agent config for the requesting ip and cluster.
     *
     * @param agentConfigRequest request carrying the agent ip, cluster name and last known md5
     * @return {@code null} when nothing is cached; the cached config when the request has no md5
     *         or its md5 differs; otherwise a stub holding only the md5 and the NO_UPDATE code
     */
    @Override
    public AgentConfigInfo getAgentConfig(AgentConfigRequest agentConfigRequest) {
        LOGGER.debug("begin to get agent config info for {}", agentConfigRequest);
        final String cacheKey = agentConfigRequest.getIp() + "_" + agentConfigRequest.getClusterName();
        final AgentConfigInfo cached = agentConfigMap.get(cacheKey);
        if (cached == null) {
            return null;
        }

        final boolean upToDate = agentConfigRequest.getMd5() != null
                && Objects.equals(agentConfigRequest.getMd5(), cached.getMd5());
        if (!upToDate) {
            return cached;
        }

        LOGGER.debug("success to get agent config info for: {}, result: {}", agentConfigRequest, cached);
        return AgentConfigInfo.builder()
                .md5(cached.getMd5())
                .code(AgentResponseCode.NO_UPDATE)
                .build();
    }

    /**
     * Prepares and returns the tasks and commands to hand to the requesting agent.
     *
     * <p>File and non-file tasks are pre-processed first, then the queued tasks are issued, and
     * finally the agent's command configs are fetched.
     *
     * @param taskRequest agent request; cluster name and agent ip must both be non-blank
     * @return the issued data configs together with the agent's command configs
     * @throws BusinessException if the cluster name or the agent ip is blank
     */
    @Override
    @Transactional(rollbackFor = Throwable.class, isolation = Isolation.READ_COMMITTED,
            propagation = Propagation.REQUIRES_NEW)
    public TaskResult getTaskResult(TaskRequest taskRequest) {
        final boolean missingCluster = StringUtils.isBlank(taskRequest.getClusterName());
        if (missingCluster || StringUtils.isBlank(taskRequest.getAgentIp())) {
            throw new BusinessException("agent request or agent ip was empty, just return");
        }

        preProcessFileTask(taskRequest);
        preProcessNonFileTasks(taskRequest);

        final List<DataConfig> issuedTasks = processQueuedTasks(taskRequest);
        return TaskResult.builder()
                .dataConfigs(issuedTasks)
                .cmdConfigs(getAgentCmdConfigs(taskRequest))
                .build();
    }

    /**
     * Returns the cached task config for the requesting agent, falling back to the cluster-wide
     * entry stored under {@code "All_" + clusterName}.
     *
     * @param taskRequest request carrying the agent ip, cluster name and last known md5
     * @return {@code null} when nothing is cached; the cached result when the request has no md5
     *         or its md5 differs; otherwise an empty result with the md5 and the NO_UPDATE code
     */
    @Override
    public TaskResult getExistTaskConfig(TaskRequest taskRequest) {
        LOGGER.debug("begin to get all exist task by request={}", taskRequest);

        TaskResult cached = taskConfigMap.get(taskRequest.getAgentIp() + "_" + taskRequest.getClusterName());
        if (cached == null) {
            cached = taskConfigMap.get("All_" + taskRequest.getClusterName());
        }
        if (cached == null) {
            return null;
        }

        final boolean upToDate = taskRequest.getMd5() != null
                && Objects.equals(taskRequest.getMd5(), cached.getMd5());
        if (!upToDate) {
            return cached;
        }
        return TaskResult.builder()
                .dataConfigs(new ArrayList<>())
                .cmdConfigs(new ArrayList<>())
                .md5(cached.getMd5())
                .code(AgentResponseCode.NO_UPDATE)
                .build();
    }

    /**
     * Adds the request's agent group to every "bind" cluster node and removes it from every
     * "unbind" cluster node of the named AGENT cluster.
     *
     * @param agentClusterNodeBindGroupRequest cluster name, agent group and the node keywords to
     *        bind and unbind; either list may be null
     * @return always {@code true} when no exception is thrown
     */
    @Override
    @Transactional(rollbackFor = Throwable.class, isolation = Isolation.READ_COMMITTED,
            propagation = Propagation.REQUIRES_NEW)
    public Boolean bindGroup(AgentClusterNodeBindGroupRequest agentClusterNodeBindGroupRequest) {
        Set<String> bindNodes = Sets.newHashSet();
        Set<String> unbindNodes = Sets.newHashSet();
        if (agentClusterNodeBindGroupRequest.getBindClusterNodes() != null) {
            bindNodes.addAll(agentClusterNodeBindGroupRequest.getBindClusterNodes());
        }
        if (agentClusterNodeBindGroupRequest.getUnbindClusterNodes() != null) {
            unbindNodes.addAll(agentClusterNodeBindGroupRequest.getUnbindClusterNodes());
        }
        // A node listed on both sides would make the union smaller than the sum of the two sets.
        Preconditions.expectTrue(Sets.union(bindNodes, unbindNodes).size() == bindNodes.size() + unbindNodes.size(),
                "can not add and del node tag in the sameTime");
        if (bindNodes.isEmpty() && unbindNodes.isEmpty()) {
            return true;
        }

        String clusterName = agentClusterNodeBindGroupRequest.getClusterName();
        InlongClusterEntity cluster = this.clusterMapper.selectByNameAndType(clusterName, "AGENT");
        // Fail with a descriptive error instead of a NullPointerException deep inside the update loop.
        Preconditions.expectTrue(cluster != null, "agent cluster not found by name=" + clusterName);

        String agentGroup = agentClusterNodeBindGroupRequest.getAgentGroup();
        if (CollectionUtils.isNotEmpty(bindNodes)) {
            changeAgentGroup(cluster, bindNodes, agentGroup, true);
        }
        if (CollectionUtils.isNotEmpty(unbindNodes)) {
            changeAgentGroup(cluster, unbindNodes, agentGroup, false);
        }
        return true;
    }

    /**
     * For every cluster node matching one of the keywords, adds ({@code bind == true}) or removes
     * the agent group from the comma-separated group list in the node's ext params and saves it.
     */
    private void changeAgentGroup(InlongClusterEntity cluster, Set<String> nodeKeywords, String agentGroup,
            boolean bind) {
        for (String keyword : nodeKeywords) {
            ClusterPageRequest pageRequest = new ClusterPageRequest();
            pageRequest.setParentId(cluster.getId());
            pageRequest.setType("AGENT");
            pageRequest.setKeyword(keyword);

            for (InlongClusterNodeEntity node : this.clusterNodeMapper.selectByCondition(pageRequest)) {
                if (node == null) {
                    continue;
                }
                Set<String> groups = new HashSet<>();
                AgentClusterNodeDTO nodeDto = new AgentClusterNodeDTO();
                if (StringUtils.isNotBlank(node.getExtParams())) {
                    nodeDto = AgentClusterNodeDTO.getFromJson(node.getExtParams());
                    String existingGroups = nodeDto.getAgentGroup();
                    if (StringUtils.isNotBlank(existingGroups)) {
                        groups = Sets.newHashSet(existingGroups.split(","));
                    }
                }
                if (bind) {
                    groups.add(agentGroup);
                } else {
                    groups.remove(agentGroup);
                }
                nodeDto.setAgentGroup(Joiner.on(",").join(groups));
                node.setExtParams(GSON.toJson(nodeDto));
                this.clusterNodeMapper.insertOnDuplicateKeyUpdate(node);
            }
        }
    }

    /**
     * Returns the cached installer config for the requesting ip and cluster, and queues the
     * request so the next updateModuleConfig run re-evaluates the node's module config.
     *
     * @param configRequest installer request carrying the local ip, cluster name and last md5
     * @return {@code null} when nothing is cached; a stub with the md5 and the NO_UPDATE code
     *         when the md5 matches; otherwise the cached config
     */
    @Override
    public ConfigResult getConfig(ConfigRequest configRequest) {
        if (!this.updateModuleConfigQueue.contains(configRequest)) {
            this.updateModuleConfigQueue.add(configRequest);
        }

        ConfigResult cached = this.installerConfigMap.get(
                configRequest.getLocalIp() + "_" + configRequest.getClusterName());
        if (cached == null) {
            // Parameterized logging: the message is formatted only when DEBUG is enabled.
            LOGGER.debug("can not get config result for cluster name={}, ip={}",
                    configRequest.getClusterName(), configRequest.getLocalIp());
            return null;
        }
        if (Objects.equals(configRequest.getMd5(), cached.getMd5())) {
            return ConfigResult.builder().md5(cached.getMd5()).code(AgentResponseCode.NO_UPDATE).build();
        }
        return cached;
    }

    /**
     * Selects the to-be-issued sources for the requesting agent, moves each to its next status
     * and returns a data config for every source whose update affected exactly one row.
     *
     * <p>When the agent's pull job type is NEVER, sources in the to-be-issued ADD status are
     * left out of the selection.
     */
    private List<DataConfig> processQueuedTasks(TaskRequest taskRequest) {
        final Set<SourceStatus> issuableStatuses = Sets.newHashSet(SourceStatus.TOBE_ISSUED_SET);
        final boolean neverPull =
                PullJobTypeEnum.NEVER == PullJobTypeEnum.getPullJobType(taskRequest.getPullJobType());
        if (neverPull) {
            LOGGER.debug("agent pull job type is [NEVER], just pull to be active tasks");
            issuableStatuses.remove(SourceStatus.TO_BE_ISSUED_ADD);
        }

        final List<Integer> statusCodes = issuableStatuses.stream()
                .map(SourceStatus::getCode)
                .collect(Collectors.toList());
        final List<StreamSourceEntity> queued = sourceMapper.selectByStatusAndCluster(
                statusCodes, taskRequest.getClusterName(), taskRequest.getAgentIp(), taskRequest.getUuid());

        final List<DataConfig> issued = Lists.newArrayList();
        for (StreamSourceEntity source : queued) {
            final int operation = getOp(source.getStatus());
            final int followingStatus = getNextStatus(source.getStatus());
            source.setPreviousStatus(source.getStatus());
            source.setStatus(followingStatus);

            if (sourceMapper.updateByPrimaryKeySelective(source) != 1) {
                continue;
            }
            // Bump the in-memory version after the successful update, before building the config.
            source.setVersion(source.getVersion() + 1);
            final DataConfig taskConfig = getDataConfig(source, operation);
            issued.add(taskConfig);
            LOGGER.info("Offer source task({}) for agent({}) in cluster({})",
                    taskConfig, taskRequest.getAgentIp(), taskRequest.getClusterName());
        }
        return issued;
    }

    /**
     * Assigns up to {@code TASK_FETCH_SIZE} pending MYSQL_SQL / KAFKA / MYSQL_BINLOG / POSTGRESQL
     * sources to the requesting agent by stamping its ip and uuid on them.
     *
     * <p>When the agent's pull job type is NEVER only to-be-issued ACTIVE sources are considered;
     * otherwise to-be-issued ADD sources are considered as well.
     */
    private void preProcessNonFileTasks(TaskRequest taskRequest) {
        final List<Integer> pendingStatuses;
        if (PullJobTypeEnum.NEVER == PullJobTypeEnum.getPullJobType(taskRequest.getPullJobType())) {
            LOGGER.debug("agent pull job type is [NEVER], just pull to be active tasks");
            pendingStatuses = Collections.singletonList(SourceStatus.TO_BE_ISSUED_ACTIVE.getCode());
        } else {
            pendingStatuses = Arrays.asList(
                    SourceStatus.TO_BE_ISSUED_ADD.getCode(),
                    SourceStatus.TO_BE_ISSUED_ACTIVE.getCode());
        }

        final List<String> sourceTypes = Lists.newArrayList("MYSQL_SQL", "KAFKA", "MYSQL_BINLOG", "POSTGRESQL");
        final List<StreamSourceEntity> candidates =
                sourceMapper.selectByStatusAndType(pendingStatuses, sourceTypes, TASK_FETCH_SIZE);
        for (StreamSourceEntity candidate : candidates) {
            candidate.setAgentIp(taskRequest.getAgentIp());
            candidate.setUuid(taskRequest.getUuid());
            sourceMapper.updateByPrimaryKeySelective(candidate);
        }
    }

    /**
     * Pre-processes file tasks for the requesting agent: template tasks first, then label tasks.
     */
    private void preProcessFileTask(TaskRequest taskRequest) {
        this.preProcessTemplateFileTask(taskRequest);
        this.preProcessLabelFileTasks(taskRequest);
    }

    /**
     * Creates a per-agent sub task for each FILE template source of the cluster that the
     * requesting agent does not have yet.
     *
     * <p>A template is skipped when its group is offline, going offline, deleting or deleted,
     * when a sub task already exists for this template and agent ip, or when the template does
     * not match the agent's cluster node group. The sub task is a copy of the template with a
     * random suffix on its name, the template id as its task map id, the agent's ip, and the
     * to-be-issued ADD status.
     *
     * <p>The copy previously reused the lambda parameter's name, which does not compile and made
     * the template indistinguishable from its copy; the two are now separate variables.
     */
    private void preProcessTemplateFileTask(TaskRequest taskRequest) {
        List<Integer> pendingStatuses = Arrays.asList(
                SourceStatus.TO_BE_ISSUED_ADD.getCode(),
                SourceStatus.TO_BE_ISSUED_ACTIVE.getCode());
        String agentIp = taskRequest.getAgentIp();
        String clusterName = taskRequest.getClusterName();
        Preconditions.expectTrue(StringUtils.isNotBlank(agentIp) || StringUtils.isNotBlank(clusterName),
                "both agent ip and cluster name are blank when fetching file task");

        List<StreamSourceEntity> templates = this.sourceMapper.selectTemplateSourceByCluster(
                pendingStatuses, Lists.newArrayList("FILE"), clusterName);
        Set<GroupStatus> noNewTaskGroupStatuses = Sets.newHashSet(
                GroupStatus.CONFIG_OFFLINE_SUCCESSFUL, GroupStatus.CONFIG_OFFLINE_ING,
                GroupStatus.CONFIG_DELETING, GroupStatus.CONFIG_DELETED);

        for (StreamSourceEntity template : templates) {
            InlongGroupEntity group = this.groupMapper.selectByGroupId(template.getInlongGroupId());
            if (group != null && noNewTaskGroupStatuses.contains(GroupStatus.forCode(group.getStatus()))) {
                continue;
            }
            if (this.sourceMapper.selectOneByTaskMapIdAndAgentIp(template.getId(), agentIp) != null) {
                continue;
            }
            if (!matchGroup(template, selectByIpAndCluster(clusterName, agentIp))) {
                continue;
            }

            StreamSourceEntity subTask = CommonBeanUtils.copyProperties(template, StreamSourceEntity::new);
            subTask.setSourceName(template.getSourceName() + "-"
                    + RandomStringUtils.randomAlphanumeric(10).toLowerCase(Locale.ROOT));
            // Link the sub task to its template; the existence check above looks it up by this id.
            subTask.setTaskMapId(template.getId());
            subTask.setAgentIp(agentIp);
            subTask.setStatus(SourceStatus.TO_BE_ISSUED_ADD.getCode());
            this.sourceMapper.insert(subTask);
            // NOTE(review): logs the template id, per the message wording - confirm that is intended.
            LOGGER.info("Transform new template task({}) for agent({}) in cluster({}).",
                    template.getId(), taskRequest.getAgentIp(), taskRequest.getClusterName());
        }
    }

    /**
     * Re-evaluates the FILE sources already bound to the requesting agent against the agent's
     * node groups (see {@link #matchGroup}) and flips their status accordingly:
     * <ul>
     *   <li>a source that no longer matches and is not already stopped / pending stop is moved to
     *       {@code TO_BE_ISSUED_STOP};</li>
     *   <li>a source that matches again, is not in a normal / pending add / pending active status,
     *       and whose group is {@code CONFIG_SUCCESSFUL} is moved to {@code TO_BE_ISSUED_ACTIVE}.</li>
     * </ul>
     *
     * @param taskRequest the agent request; supplies the agent ip and the cluster name
     */
    private void preProcessLabelFileTasks(TaskRequest taskRequest) {
        String agentIp = taskRequest.getAgentIp();
        String clusterName = taskRequest.getClusterName();
        Preconditions.expectTrue(StringUtils.isNotBlank(agentIp) || StringUtils.isNotBlank(clusterName),
                "both agent ip and cluster name are blank when fetching file task");

        List<Integer> candidateStatusCodes = Arrays.asList(
                SourceStatus.SOURCE_NORMAL.getCode(),
                SourceStatus.SOURCE_FAILED.getCode(),
                SourceStatus.SOURCE_STOP.getCode(),
                SourceStatus.TO_BE_ISSUED_ADD.getCode(),
                SourceStatus.TO_BE_ISSUED_STOP.getCode(),
                SourceStatus.TO_BE_ISSUED_ACTIVE.getCode());
        // These lookup sets do not depend on the source being inspected, so build them once.
        Set<SourceStatus> alreadyStopping = Sets.newHashSet(SourceStatus.SOURCE_STOP, SourceStatus.TO_BE_ISSUED_STOP);
        Set<SourceStatus> alreadyActive = Sets.newHashSet(SourceStatus.SOURCE_NORMAL, SourceStatus.TO_BE_ISSUED_ADD,
                SourceStatus.TO_BE_ISSUED_ACTIVE);

        InlongClusterNodeEntity clusterNode = selectByIpAndCluster(clusterName, agentIp);
        List<StreamSourceEntity> sources = this.sourceMapper.selectByAgentIpAndCluster(candidateStatusCodes,
                Lists.newArrayList("FILE"), agentIp, clusterName);
        for (StreamSourceEntity source : sources) {
            SourceStatus currentStatus = SourceStatus.forCode(source.getStatus());

            if (!matchGroup(source, clusterNode)) {
                // Group mismatch: stop the task unless it is already stopped or about to be.
                if (!alreadyStopping.contains(currentStatus)) {
                    // Log the task id (not the agent ip) for the task({}) placeholder.
                    LOGGER.info("Transform task({}) from {} to {} because tag mismatch for agent({}) in cluster({})",
                            source.getId(), source.getStatus(), SourceStatus.TO_BE_ISSUED_STOP.getCode(),
                            agentIp, clusterName);
                    this.sourceMapper.updateStatus(source.getId(), SourceStatus.TO_BE_ISSUED_STOP.getCode(), false);
                }
                continue;
            }

            if (alreadyActive.contains(currentStatus)) {
                continue;
            }
            // Only query the group once the cheaper checks say a re-activation is possible.
            InlongGroupEntity group = this.groupMapper.selectByGroupId(source.getInlongGroupId());
            if (group == null || GroupStatus.forCode(group.getStatus()) != GroupStatus.CONFIG_SUCCESSFUL) {
                continue;
            }
            LOGGER.info("Transform task({}) from {} to {} because tag rematch for agent({}) in cluster({})",
                    source.getId(), source.getStatus(), SourceStatus.TO_BE_ISSUED_ACTIVE.getCode(),
                    agentIp, clusterName);
            this.sourceMapper.updateStatus(source.getId(), SourceStatus.TO_BE_ISSUED_ACTIVE.getCode(), false);
        }
    }

    /**
     * Looks up the ids returned by {@code selectHeartbeatTimeoutIds} for the requesting agent ip and
     * cluster name and, when any exist, passes them to {@code rollbackTimeoutStatusByIds}.
     *
     * @param taskRequest the agent request; supplies the agent ip and the cluster name
     */
    private void preTimeoutTasks(TaskRequest taskRequest) {
        String agentIp = taskRequest.getAgentIp();
        String clusterName = taskRequest.getClusterName();
        // No source-type filter is supplied (first argument is null).
        List<Integer> timeoutIds = this.sourceMapper.selectHeartbeatTimeoutIds((List) null, agentIp, clusterName);
        if (CollectionUtils.isEmpty(timeoutIds)) {
            return;
        }
        this.sourceMapper.rollbackTimeoutStatusByIds(timeoutIds, (String) null);
    }

    /**
     * Finds the node with the given ip inside the {@code AGENT} cluster with the given name.
     *
     * @param str  the cluster name
     * @param str2 the node ip
     * @return the first node returned by the mapper, or {@code null} when the cluster does not
     *         exist or has no node with that ip
     */
    private InlongClusterNodeEntity selectByIpAndCluster(String str, String str2) {
        InlongClusterEntity agentCluster = this.clusterMapper.selectByNameAndType(str, "AGENT");
        return agentCluster == null
                ? null
                : (InlongClusterNodeEntity) this.clusterNodeMapper
                        .selectByParentIdAndIp(agentCluster.getId(), str2)
                        .stream()
                        .findFirst()
                        .orElse(null);
    }

    /**
     * Returns the remainder of the given status code divided by {@code MODULUS_100}.
     *
     * @param i the status code
     * @return {@code i % MODULUS_100}
     */
    private int getOp(int i) {
        final int remainder = i % MODULUS_100;
        return remainder;
    }

    /**
     * Derives the follow-up status code: {@code 300} plus the remainder of the given status code
     * divided by {@code MODULUS_100}.
     *
     * @param i the current status code
     * @return {@code 300 + (i % MODULUS_100)}
     */
    private int getNextStatus(int i) {
        final int remainder = i % MODULUS_100;
        return remainder + 300;
    }

    /**
     * Builds the {@link DataConfig} for one stream source.
     *
     * <p>The basic task fields are always copied from the source entity. When both the owning group
     * and stream can be loaded, the config is enriched with the task state, stream settings, and,
     * if the group reports directly to MQ, the MQ cluster list and the target topic. When either
     * of them is missing, {@code syncSend} falls back to {@code 0} and a warning is logged.
     *
     * @param streamSourceEntity the source to build the config for
     * @param i                  the operation code; stored in the config as a string
     * @return the populated config, never {@code null}
     * @throws BusinessException if the source type has no task type mapping, or if a Pulsar tenant
     *                           must be taken from the MQ cluster but no such cluster exists
     */
    private DataConfig getDataConfig(StreamSourceEntity streamSourceEntity, int i) {
        DataConfig dataConfig = new DataConfig();
        dataConfig.setIp(streamSourceEntity.getAgentIp());
        dataConfig.setUuid(streamSourceEntity.getUuid());
        dataConfig.setOp(String.valueOf(i));
        dataConfig.setTaskId(streamSourceEntity.getId());
        dataConfig.setTaskType(getTaskType(streamSourceEntity));
        dataConfig.setTaskName(streamSourceEntity.getSourceName());
        dataConfig.setSnapshot(streamSourceEntity.getSnapshot());
        dataConfig.setTimeZone(streamSourceEntity.getDataTimeZone());
        dataConfig.setVersion(streamSourceEntity.getVersion());
        String inlongGroupId = streamSourceEntity.getInlongGroupId();
        String inlongStreamId = streamSourceEntity.getInlongStreamId();
        dataConfig.setInlongGroupId(inlongGroupId);
        dataConfig.setInlongStreamId(inlongStreamId);

        InlongGroupEntity groupEntity = this.groupMapper.selectByGroupId(inlongGroupId);
        InlongStreamEntity streamEntity = this.streamMapper.selectByIdentifier(inlongGroupId, inlongStreamId);
        String extParams = this.operatorFactory.getInstance(streamSourceEntity.getSourceType())
                .getExtParams(streamSourceEntity);

        if (groupEntity == null || streamEntity == null) {
            // This is the situation the warning describes; apply the fallback it announces.
            dataConfig.setSyncSend(0);
            LOGGER.warn("set syncSend=[0] as the stream not exists for groupId={}, streamId={}", inlongGroupId, inlongStreamId);
            dataConfig.setExtParams(extParams);
            return dataConfig;
        }

        boolean running = SourceStatus.NORMAL_STATUS_SET.contains(SourceStatus.forCode(streamSourceEntity.getStatus()));
        dataConfig.setState(running ? TaskStateEnum.RUNNING.getType() : TaskStateEnum.FROZEN.getType());
        dataConfig.setSyncSend(streamEntity.getSyncSend());

        if ("FILE".equalsIgnoreCase(streamSourceEntity.getSourceType())) {
            // The stream's separator is parsed as a decimal character code and converted to that character.
            String separator = String.valueOf((char) Integer.parseInt(streamEntity.getDataSeparator()));
            FileSourceDTO fileSourceDTO = (FileSourceDTO) JsonUtils.parseObject(extParams, FileSourceDTO.class);
            if (Objects.nonNull(fileSourceDTO)) {
                fileSourceDTO.setDataSeparator(separator);
                dataConfig.setAuditVersion(fileSourceDTO.getAuditVersion());
                fileSourceDTO.setDataContentStyle(streamEntity.getDataType());
                extParams = JsonUtils.toJsonString(fileSourceDTO);
            }
        }

        InlongStreamInfo streamInfo = (InlongStreamInfo) CommonBeanUtils.copyProperties(streamEntity,
                InlongStreamInfo::new);
        InlongStreamExtParam.unpackExtParams(streamEntity.getExtParams(), streamInfo);
        dataConfig.setPredefinedFields(streamInfo.getPredefinedFields());

        int dataReportType = groupEntity.getDataReportType();
        dataConfig.setDataReportType(dataReportType);
        if (InlongConstants.REPORT_TO_MQ_RECEIVED.intValue() == dataReportType) {
            String mqType = groupEntity.getMqType();
            List<InlongClusterEntity> mqClusterEntities = this.clusterMapper.selectByCondition(
                    ClusterPageRequest.builder()
                            .type(mqType)
                            .clusterTagList(Collections.singletonList(groupEntity.getInlongClusterTag()))
                            .build());
            List<MQClusterInfo> mqClusters = new ArrayList<>();
            for (InlongClusterEntity clusterEntity : mqClusterEntities) {
                MQClusterInfo clusterInfo = new MQClusterInfo();
                clusterInfo.setUrl(clusterEntity.getUrl());
                clusterInfo.setToken(clusterEntity.getToken());
                clusterInfo.setMqType(clusterEntity.getType());
                clusterInfo.setParams((Map) JsonUtils.parseObject(clusterEntity.getExtParams(), HashMap.class));
                mqClusters.add(clusterInfo);
            }
            dataConfig.setMqClusters(mqClusters);

            String groupMqResource = groupEntity.getMqResource();
            if ("PULSAR".equals(mqType) || "TDMQ_PULSAR".equals(mqType)) {
                String pulsarTenant = InlongPulsarDTO.getFromJson(groupEntity.getExtParams()).getPulsarTenant();
                if (StringUtils.isBlank(pulsarTenant)) {
                    // Fall back to the tenant of the first MQ cluster; fail clearly if there is none.
                    if (mqClusterEntities.isEmpty()) {
                        throw new BusinessException(String.format(
                                "no %s cluster found for groupId=%s, cannot resolve the pulsar tenant",
                                mqType, inlongGroupId));
                    }
                    pulsarTenant = PulsarClusterDTO.getFromJson(mqClusterEntities.get(0).getExtParams())
                            .getPulsarTenant();
                }
                DataProxyTopicInfo topicInfo = new DataProxyTopicInfo();
                topicInfo.setInlongGroupId(inlongGroupId + "/" + inlongStreamId);
                topicInfo.setTopic(String.format("persistent://%s/%s/%s", pulsarTenant, groupMqResource,
                        streamEntity.getMqResource()));
                dataConfig.setTopicInfo(topicInfo);
            } else if ("TUBEMQ".equals(mqType)) {
                DataProxyTopicInfo topicInfo = new DataProxyTopicInfo();
                topicInfo.setInlongGroupId(inlongGroupId);
                topicInfo.setTopic(groupMqResource);
                dataConfig.setTopicInfo(topicInfo);
            } else if ("KAFKA".equals(mqType)) {
                DataProxyTopicInfo topicInfo = new DataProxyTopicInfo();
                topicInfo.setInlongGroupId(inlongGroupId);
                topicInfo.setTopic(groupMqResource + "." + streamEntity.getMqResource());
                dataConfig.setTopicInfo(topicInfo);
            }
        }
        dataConfig.setExtParams(extParams);
        return dataConfig;
    }

    /**
     * Resolves the task type for a source by looking its source type up in
     * {@code SourceType.SOURCE_TASK_MAP}.
     *
     * @param streamSourceEntity the source whose type is looked up
     * @return the type value of the mapped {@link TaskTypeEnum}
     * @throws BusinessException if the map has no entry for the source type
     */
    private int getTaskType(StreamSourceEntity streamSourceEntity) {
        String sourceType = streamSourceEntity.getSourceType();
        TaskTypeEnum mappedType = (TaskTypeEnum) SourceType.SOURCE_TASK_MAP.get(sourceType);
        if (mappedType != null) {
            return mappedType.getType();
        }
        throw new BusinessException("Unsupported task type for source type " + sourceType);
    }

    /**
     * Loads the command records stored for the requesting agent ip and converts each one into a
     * {@link CmdConfig}.
     *
     * @param taskRequest the agent request; only its agent ip is used
     * @return one {@link CmdConfig} per record returned by the mapper, in the mapper's order
     */
    private List<CmdConfig> getAgentCmdConfigs(TaskRequest taskRequest) {
        String agentIp = taskRequest.getAgentIp();
        return this.sourceCmdConfigMapper.queryCmdByAgentIp(agentIp)
                .stream()
                .map(cmdEntity -> {
                    CmdConfig converted = new CmdConfig();
                    converted.setId(cmdEntity.getId());
                    converted.setTaskId(cmdEntity.getTaskId());
                    converted.setOp(cmdEntity.getCmdType());
                    converted.setDataTime(cmdEntity.getSpecifiedDataTime());
                    return converted;
                })
                .collect(Collectors.toList());
    }

    /**
     * Checks whether a source may run on the given cluster node by comparing their
     * comma-separated group lists.
     *
     * <p>A source with no node group ({@code null}) matches every node. Otherwise the node must
     * carry ext params whose agent group shares at least one entry with the source's node group.
     * Entries are compared exactly as split on {@code ","}, without trimming.
     *
     * @param streamSourceEntity      the source to check; must not be {@code null}
     * @param inlongClusterNodeEntity the node to check against; may be {@code null}
     * @return {@code true} if the source is unrestricted or shares a group with the node
     */
    private boolean matchGroup(StreamSourceEntity streamSourceEntity, InlongClusterNodeEntity inlongClusterNodeEntity) {
        Preconditions.expectNotNull(streamSourceEntity, "cluster must be valid");
        String sourceGroups = streamSourceEntity.getInlongClusterNodeGroup();
        if (sourceGroups == null) {
            return true;
        }
        // A missing node, or a node without ext params, has no groups and so cannot match.
        if (inlongClusterNodeEntity == null || StringUtils.isBlank(inlongClusterNodeEntity.getExtParams())) {
            return false;
        }
        String agentGroup = AgentClusterNodeDTO.getFromJson(inlongClusterNodeEntity.getExtParams()).getAgentGroup();
        if (StringUtils.isBlank(agentGroup)) {
            return false;
        }
        Set<String> nodeGroups = Sets.newHashSet(agentGroup.split(","));
        return Arrays.stream(sourceGroups.split(",")).anyMatch(nodeGroups::contains);
    }
}
