package org.apache.inlong.manager.service.cluster;

import com.github.pagehelper.PageHelper;
import com.google.common.base.Joiner;
import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
import com.google.gson.Gson;
import java.nio.file.Files;
import java.nio.file.LinkOption;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.UUID;
import java.util.stream.Collectors;
import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.inlong.common.pojo.audit.AuditConfig;
import org.apache.inlong.common.pojo.audit.MQInfo;
import org.apache.inlong.common.pojo.dataproxy.DataProxyCluster;
import org.apache.inlong.common.pojo.dataproxy.DataProxyConfig;
import org.apache.inlong.common.pojo.dataproxy.DataProxyConfigResponse;
import org.apache.inlong.common.pojo.dataproxy.DataProxyNodeInfo;
import org.apache.inlong.common.pojo.dataproxy.DataProxyNodeResponse;
import org.apache.inlong.common.pojo.dataproxy.DataProxyTopicInfo;
import org.apache.inlong.common.pojo.dataproxy.MQClusterInfo;
import org.apache.inlong.manager.common.consts.InlongConstants;
import org.apache.inlong.manager.common.enums.ErrorCodeEnum;
import org.apache.inlong.manager.common.enums.GroupStatus;
import org.apache.inlong.manager.common.enums.NodeStatus;
import org.apache.inlong.manager.common.enums.TenantUserTypeEnum;
import org.apache.inlong.manager.common.exceptions.BusinessException;
import org.apache.inlong.manager.common.util.CommonBeanUtils;
import org.apache.inlong.manager.common.util.Preconditions;
import org.apache.inlong.manager.dao.entity.InlongClusterEntity;
import org.apache.inlong.manager.dao.entity.InlongClusterNodeEntity;
import org.apache.inlong.manager.dao.entity.InlongClusterTagEntity;
import org.apache.inlong.manager.dao.entity.InlongGroupEntity;
import org.apache.inlong.manager.dao.entity.TenantClusterTagEntity;
import org.apache.inlong.manager.dao.mapper.InlongClusterEntityMapper;
import org.apache.inlong.manager.dao.mapper.InlongClusterNodeEntityMapper;
import org.apache.inlong.manager.dao.mapper.InlongClusterTagEntityMapper;
import org.apache.inlong.manager.dao.mapper.InlongGroupEntityMapper;
import org.apache.inlong.manager.dao.mapper.InlongStreamEntityMapper;
import org.apache.inlong.manager.dao.mapper.TenantClusterTagEntityMapper;
import org.apache.inlong.manager.pojo.cluster.BindTagRequest;
import org.apache.inlong.manager.pojo.cluster.ClusterInfo;
import org.apache.inlong.manager.pojo.cluster.ClusterNodeRequest;
import org.apache.inlong.manager.pojo.cluster.ClusterNodeResponse;
import org.apache.inlong.manager.pojo.cluster.ClusterPageRequest;
import org.apache.inlong.manager.pojo.cluster.ClusterRequest;
import org.apache.inlong.manager.pojo.cluster.ClusterTagPageRequest;
import org.apache.inlong.manager.pojo.cluster.ClusterTagRequest;
import org.apache.inlong.manager.pojo.cluster.ClusterTagResponse;
import org.apache.inlong.manager.pojo.cluster.InlongClusterTagExtParam;
import org.apache.inlong.manager.pojo.cluster.TenantClusterTagInfo;
import org.apache.inlong.manager.pojo.cluster.TenantClusterTagPageRequest;
import org.apache.inlong.manager.pojo.cluster.TenantClusterTagRequest;
import org.apache.inlong.manager.pojo.cluster.agent.AgentClusterNodeRequest;
import org.apache.inlong.manager.pojo.cluster.dataproxy.DataProxyClusterNodeDTO;
import org.apache.inlong.manager.pojo.cluster.pulsar.PulsarClusterDTO;
import org.apache.inlong.manager.pojo.common.PageResult;
import org.apache.inlong.manager.pojo.common.UpdateResult;
import org.apache.inlong.manager.pojo.group.InlongGroupBriefInfo;
import org.apache.inlong.manager.pojo.group.InlongGroupPageRequest;
import org.apache.inlong.manager.pojo.group.pulsar.InlongPulsarDTO;
import org.apache.inlong.manager.pojo.stream.InlongStreamBriefInfo;
import org.apache.inlong.manager.pojo.user.LoginUserUtils;
import org.apache.inlong.manager.pojo.user.UserInfo;
import org.apache.inlong.manager.service.cluster.node.InlongClusterNodeInstallOperatorFactory;
import org.apache.inlong.manager.service.cluster.node.InlongClusterNodeOperatorFactory;
import org.apache.inlong.manager.service.cmd.CommandExecutor;
import org.apache.inlong.manager.service.repository.DataProxyConfigRepository;
import org.apache.inlong.manager.service.tenant.InlongTenantService;
import org.apache.inlong.manager.service.user.InlongRoleService;
import org.apache.inlong.manager.service.user.TenantRoleService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Lazy;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Isolation;
import org.springframework.transaction.annotation.Transactional;

@Service
/* loaded from: input_file:org/apache/inlong/manager/service/cluster/InlongClusterServiceImpl.class */
public class InlongClusterServiceImpl implements InlongClusterService {
    // Class-wide logger; every method below logs through it.
    private static final Logger LOGGER = LoggerFactory.getLogger(InlongClusterServiceImpl.class);
    // Shared Gson instance (not used by the methods visible in this part of the file).
    private static final Gson GSON = new Gson();

    // ---- Collaborators, all supplied through Spring field injection ----

    @Autowired
    private InlongGroupEntityMapper groupMapper;

    @Autowired
    private InlongStreamEntityMapper streamMapper;

    // Resolves the operator used to save/update/convert a cluster, keyed by cluster type.
    @Autowired
    private InlongClusterOperatorFactory clusterOperatorFactory;

    // Resolves the operator used to save/convert a cluster node, keyed by node type.
    @Autowired
    private InlongClusterNodeOperatorFactory clusterNodeOperatorFactory;

    @Autowired
    private InlongClusterTagEntityMapper clusterTagMapper;

    @Autowired
    private InlongClusterEntityMapper clusterMapper;

    @Autowired
    private InlongClusterNodeEntityMapper clusterNodeMapper;

    // Persists the tenant <-> cluster-tag bindings.
    @Autowired
    private TenantClusterTagEntityMapper tenantClusterTagMapper;

    @Autowired
    private InlongTenantService tenantService;

    @Autowired
    private InlongRoleService inlongRoleService;

    @Autowired
    private TenantRoleService tenantRoleService;

    // Resolves the installer invoked by saveNode when the request asks for installation.
    @Autowired
    private InlongClusterNodeInstallOperatorFactory clusterNodeInstallOperatorFactory;

    @Autowired
    private CommandExecutor commandExecutor;

    // Injected lazily; NOTE(review): presumably to break a circular bean dependency - confirm.
    @Autowired
    @Lazy
    private DataProxyConfigRepository proxyRepository;

    /**
     * Creates a new cluster tag and, when the request lists tenants, binds each tenant to it.
     *
     * @param clusterTagRequest tag definition; must be non-null and carry a non-blank cluster tag
     * @param str operator name, stored as both creator and modifier
     * @return id of the inserted tag entity
     * @throws BusinessException if a tag with the same name already exists
     */
    @Override
    public Integer saveTag(ClusterTagRequest clusterTagRequest, String str) {
        LOGGER.debug("begin to save cluster tag {}", clusterTagRequest);
        Preconditions.expectNotNull(clusterTagRequest, "inlong cluster request cannot be empty");
        Preconditions.expectNotBlank(clusterTagRequest.getClusterTag(), ErrorCodeEnum.INVALID_PARAMETER,
                "cluster tag cannot be empty");

        // Tag names are unique: refuse to create a duplicate.
        final String tagName = clusterTagRequest.getClusterTag();
        if (this.clusterTagMapper.selectByTag(tagName) != null) {
            String errMsg = String.format("inlong cluster tag [%s] already exist", tagName);
            LOGGER.error(errMsg);
            throw new BusinessException(errMsg);
        }

        InlongClusterTagEntity tagEntity =
                CommonBeanUtils.copyProperties(clusterTagRequest, InlongClusterTagEntity::new);
        // Feed the copied ext params back into the request before packing them for storage.
        clusterTagRequest.setExtParams(tagEntity.getExtParams());
        tagEntity.setExtParams(InlongClusterTagExtParam.packExtParams(clusterTagRequest));
        tagEntity.setCreator(str);
        tagEntity.setModifier(str);
        this.clusterTagMapper.insert(tagEntity);

        if (CollectionUtils.isNotEmpty(clusterTagRequest.getTenantList())) {
            // A single request object is reused; only the tenant changes per iteration.
            TenantClusterTagRequest bindRequest = new TenantClusterTagRequest();
            bindRequest.setClusterTag(tagName);
            for (String tenant : clusterTagRequest.getTenantList()) {
                bindRequest.setTenant(tenant);
                saveTenantTag(bindRequest, str);
            }
        }

        LOGGER.info("success to save cluster tag={} by user={}", clusterTagRequest, str);
        return tagEntity.getId();
    }

    /**
     * Loads a cluster tag by id together with the tenants bound to it.
     *
     * @param num cluster tag id; must not be null
     * @param str operator name (not used by this method)
     * @return the tag with unpacked ext params and its tenant list
     * @throws BusinessException with {@code CLUSTER_NOT_FOUND} if no tag has this id
     */
    @Override
    public ClusterTagResponse getTag(Integer num, String str) {
        Preconditions.expectNotNull(num, "inlong cluster tag id cannot be empty");

        InlongClusterTagEntity tagEntity = this.clusterTagMapper.selectById(num);
        if (tagEntity == null) {
            LOGGER.error("inlong cluster tag not found by id={}", num);
            throw new BusinessException(ErrorCodeEnum.CLUSTER_NOT_FOUND);
        }

        ClusterTagResponse response = CommonBeanUtils.copyProperties(tagEntity, ClusterTagResponse::new);
        InlongClusterTagExtParam.unpackExtParams(response);

        // Collect the tenant name of every binding that references this tag.
        List<String> tenants = new ArrayList<>();
        for (TenantClusterTagEntity binding : this.tenantClusterTagMapper.selectByTag(tagEntity.getClusterTag())) {
            tenants.add(binding.getTenant());
        }
        response.setTenantList(tenants);

        LOGGER.debug("success to get cluster tag info by id={}", num);
        return response;
    }

    /**
     * Returns one page of cluster tags matching the request, each with its tenant list.
     *
     * @param clusterTagPageRequest filter plus page number and page size
     * @return the page of tag responses
     */
    @Override
    public PageResult<ClusterTagResponse> listTag(ClusterTagPageRequest clusterTagPageRequest) {
        // startPage must run immediately before the query it is meant to paginate.
        PageHelper.startPage(clusterTagPageRequest.getPageNum(), clusterTagPageRequest.getPageSize());
        PageResult<ClusterTagResponse> pageResult = PageResult
                .fromPage(this.clusterTagMapper.selectByCondition(clusterTagPageRequest))
                .map(tagEntity -> {
                    ClusterTagResponse response =
                            CommonBeanUtils.copyProperties(tagEntity, ClusterTagResponse::new);
                    InlongClusterTagExtParam.unpackExtParams(response);
                    // One extra lookup per tag to resolve the tenants bound to it.
                    List<String> tenants = this.tenantClusterTagMapper.selectByTag(tagEntity.getClusterTag())
                            .stream()
                            .map(TenantClusterTagEntity::getTenant)
                            .collect(Collectors.toList());
                    response.setTenantList(tenants);
                    return response;
                });
        LOGGER.debug("success to list cluster tag by {}", clusterTagPageRequest);
        return pageResult;
    }

    /**
     * Lists the cluster tags matching the request that the given user may see: all of them for a
     * tenant admin, otherwise only those whose comma-separated in-charges contain the user's name.
     *
     * @param clusterTagPageRequest filter conditions
     * @param userInfo the requesting user
     * @return the visible tags (empty when nothing matches)
     */
    @Override
    public List<ClusterTagResponse> listTag(ClusterTagPageRequest clusterTagPageRequest, UserInfo userInfo) {
        List<InlongClusterTagEntity> visible = new ArrayList<>();
        List<InlongClusterTagEntity> candidates = this.clusterTagMapper.selectByCondition(clusterTagPageRequest);
        if (candidates != null) {
            for (InlongClusterTagEntity candidate : candidates) {
                boolean permitted = userInfo.getAccountType().equals(TenantUserTypeEnum.TENANT_ADMIN.getCode());
                if (!permitted) {
                    // Non-admins only see tags they are in charge of.
                    permitted = Arrays.asList(candidate.getInCharges().split(",")).contains(userInfo.getName());
                }
                if (permitted) {
                    visible.add(candidate);
                }
            }
        }
        return CommonBeanUtils.copyListProperties(visible, ClusterTagResponse::new);
    }

    /**
     * Updates a cluster tag, optionally renaming it, and synchronises its tenant bindings.
     *
     * <p>Runs in a REPEATABLE_READ transaction that rolls back on any Throwable.
     *
     * @param clusterTagRequest new tag values; must be non-null and carry a non-null id
     * @param str operator name, stored as modifier
     * @return always {@code true}; also {@code true} (without changing anything) when no tag has the given id
     * @throws BusinessException with {@code CONFIG_EXPIRED} when the request version differs from the stored
     *         one or an update does not affect exactly one row; with a plain message when the new tag name is
     *         already taken
     */
    @Override // org.apache.inlong.manager.service.cluster.InlongClusterService
    @Transactional(rollbackFor = {Throwable.class}, isolation = Isolation.REPEATABLE_READ)
    public Boolean updateTag(ClusterTagRequest clusterTagRequest, String str) {
        LOGGER.debug("begin to update cluster tag={}", clusterTagRequest);
        Preconditions.expectNotNull(clusterTagRequest, "inlong cluster request cannot be empty");
        Integer id = clusterTagRequest.getId();
        Preconditions.expectNotNull(id, "cluster tag id cannot be empty");
        InlongClusterTagEntity selectById = this.clusterTagMapper.selectById(id);
        if (selectById == null) {
            // A missing tag is treated as a no-op success rather than an error.
            LOGGER.warn("inlong cluster tag was not exist for id={}", id);
            return true;
        }
        // When the request carries no tag name, keep the stored one (i.e. no rename).
        if (StringUtils.isEmpty(clusterTagRequest.getClusterTag())) {
            clusterTagRequest.setClusterTag(selectById.getClusterTag());
        }
        String clusterTag = clusterTagRequest.getClusterTag();
        // Built up front because it is logged both on version mismatch and on a failed update below.
        String format = String.format("cluster tag has already updated with name=%s, curVersion=%s", selectById.getClusterTag(), clusterTagRequest.getVersion());
        if (!Objects.equals(selectById.getVersion(), clusterTagRequest.getVersion())) {
            LOGGER.error(format);
            throw new BusinessException(ErrorCodeEnum.CONFIG_EXPIRED);
        }
        // clusterTag2 is the stored (pre-update) name, clusterTag the requested one.
        String clusterTag2 = selectById.getClusterTag();
        if (!clusterTag.equals(clusterTag2)) {
            // Rename: the new name must be free.
            if (this.clusterTagMapper.selectByTag(clusterTag) != null) {
                String format2 = String.format("inlong cluster tag [%s] already exist", clusterTag);
                LOGGER.error(format2);
                throw new BusinessException(format2);
            }
            // Helper is defined outside this view; presumably rejects the rename while groups use the old tag.
            assertNoInlongGroupExists(clusterTag2);
            // Swap the old tag for the new one on every cluster that carries the old tag.
            List selectByKey = this.clusterMapper.selectByKey(clusterTag2, (String) null, (String) null);
            if (CollectionUtils.isNotEmpty(selectByKey)) {
                selectByKey.forEach(inlongClusterEntity -> {
                    HashSet newHashSet = Sets.newHashSet(inlongClusterEntity.getClusterTags().split(","));
                    newHashSet.remove(clusterTag2);
                    newHashSet.add(clusterTag);
                    inlongClusterEntity.setClusterTags(Joiner.on(",").join(newHashSet));
                    inlongClusterEntity.setModifier(str);
                    if (InlongConstants.AFFECTED_ONE_ROW.intValue() != this.clusterMapper.updateById(inlongClusterEntity)) {
                        LOGGER.error("cluster has already updated with name={}, type={}, curVersion={}", new Object[]{inlongClusterEntity.getName(), inlongClusterEntity.getType(), inlongClusterEntity.getVersion()});
                        throw new BusinessException(ErrorCodeEnum.CONFIG_EXPIRED);
                    }
                });
            }
        }
        // NOTE(review): the meaning of the trailing 'true' flag is not visible here - confirm in CommonBeanUtils.
        CommonBeanUtils.copyProperties(clusterTagRequest, selectById, true);
        // Feed the entity's ext params back into the request before packing them for storage.
        clusterTagRequest.setExtParams(selectById.getExtParams());
        selectById.setExtParams(InlongClusterTagExtParam.packExtParams(clusterTagRequest));
        selectById.setModifier(str);
        if (InlongConstants.AFFECTED_ONE_ROW.intValue() != this.clusterTagMapper.updateByIdSelective(selectById)) {
            LOGGER.error(format);
            throw new BusinessException(ErrorCodeEnum.CONFIG_EXPIRED);
        }
        // Tenant bindings are only synchronised when the request supplies a non-empty tenant list.
        if (CollectionUtils.isNotEmpty(clusterTagRequest.getTenantList())) {
            HashSet hashSet = new HashSet(clusterTagRequest.getTenantList());
            // NOTE(review): bindings are looked up and created under clusterTag2, the name read before the
            // update; after a rename that is the old name - confirm this is intended.
            List selectByTag = this.tenantClusterTagMapper.selectByTag(clusterTag2);
            // Drop bindings whose tenant is no longer requested; failures are logged and not rethrown.
            selectByTag.stream().filter(tenantClusterTagEntity -> {
                return !hashSet.contains(tenantClusterTagEntity.getTenant());
            }).forEach(tenantClusterTagEntity2 -> {
                try {
                    deleteTenantTag(tenantClusterTagEntity2.getId(), str);
                } catch (Exception e) {
                    LOGGER.error(e.getMessage());
                }
            });
            Set set = (Set) selectByTag.stream().map((v0) -> {
                return v0.getTenant();
            }).collect(Collectors.toSet());
            // Create bindings for requested tenants that were not bound before; failures are logged and not rethrown.
            hashSet.stream().filter(str2 -> {
                return !set.contains(str2);
            }).forEach(str3 -> {
                try {
                    TenantClusterTagRequest tenantClusterTagRequest = new TenantClusterTagRequest();
                    tenantClusterTagRequest.setTenant(str3);
                    tenantClusterTagRequest.setClusterTag(clusterTag2);
                    saveTenantTag(tenantClusterTagRequest, str);
                } catch (Exception e) {
                    LOGGER.error(e.getMessage());
                }
            });
        }
        LOGGER.info("success to update cluster tag={}", clusterTagRequest);
        return true;
    }

    /**
     * Logically deletes a cluster tag, strips it from every cluster that carries it and removes
     * its tenant bindings.
     *
     * @param num cluster tag id; must not be null
     * @param str operator name, stored as modifier
     * @return {@code false} if the tag does not exist or is already deleted, otherwise {@code true}
     * @throws BusinessException with {@code CONFIG_EXPIRED} if the tag update does not affect exactly one row
     */
    @Override
    public Boolean deleteTag(Integer num, String str) {
        Preconditions.expectNotNull(num, "cluster tag id cannot be empty");

        InlongClusterTagEntity tagEntity = this.clusterTagMapper.selectById(num);
        if (tagEntity == null || tagEntity.getIsDeleted() > InlongConstants.UN_DELETED) {
            LOGGER.error("inlong cluster tag not found by id={}", num);
            return false;
        }

        final String tagName = tagEntity.getClusterTag();
        assertNoInlongGroupExists(tagName);

        // Detach the tag from every cluster that currently carries it.
        List<InlongClusterEntity> taggedClusters =
                this.clusterMapper.selectByKey(tagName, (String) null, (String) null);
        if (CollectionUtils.isNotEmpty(taggedClusters)) {
            for (InlongClusterEntity cluster : taggedClusters) {
                removeClusterTag(cluster, tagName, str);
            }
        }

        // Logical delete: the row is marked with its own id instead of being removed.
        tagEntity.setIsDeleted(tagEntity.getId());
        tagEntity.setModifier(str);
        int affectedRows = this.clusterTagMapper.updateByIdSelective(tagEntity);
        if (affectedRows != InlongConstants.AFFECTED_ONE_ROW) {
            LOGGER.error("cluster tag has already updated with name={}, curVersion={}",
                    tagEntity.getClusterTag(), tagEntity.getVersion());
            throw new BusinessException(ErrorCodeEnum.CONFIG_EXPIRED);
        }

        // Removing tenant bindings is best-effort: failures are logged and do not abort the delete.
        for (TenantClusterTagEntity binding : this.tenantClusterTagMapper.selectByTag(tagName)) {
            try {
                deleteTenantTag(binding.getId(), str);
            } catch (Exception e) {
                LOGGER.error(e.getMessage());
            }
        }

        LOGGER.info("success to delete cluster tag by id={}", num);
        return true;
    }

    /**
     * Saves a new cluster through the operator registered for its type.
     *
     * <p>A blank cluster name is replaced by a random UUID, which is also written back to the request.
     *
     * @param clusterRequest cluster definition; must not be null
     * @param str operator name
     * @return id returned by the type-specific save operation
     * @throws BusinessException if a cluster with the same tags, name and type already exists
     */
    @Override
    public Integer save(ClusterRequest clusterRequest, String str) {
        LOGGER.debug("begin to save inlong cluster={}", clusterRequest);
        Preconditions.expectNotNull(clusterRequest, "inlong cluster request cannot be empty");

        final String tags = clusterRequest.getClusterTags();
        String clusterName = clusterRequest.getName();
        if (StringUtils.isBlank(clusterName)) {
            clusterName = UUID.randomUUID().toString();
            clusterRequest.setName(clusterName);
        }
        final String clusterType = clusterRequest.getType();

        // Reject duplicates on the (tags, name, type) key.
        boolean duplicate = CollectionUtils.isNotEmpty(this.clusterMapper.selectByKey(tags, clusterName, clusterType));
        if (duplicate) {
            String errMsg = String.format("inlong cluster already exist for cluster tag=%s name=%s type=%s",
                    tags, clusterName, clusterType);
            LOGGER.error(errMsg);
            throw new BusinessException(errMsg);
        }

        Integer clusterId = this.clusterOperatorFactory.getInstance(clusterRequest.getType())
                .saveOpt(clusterRequest, str);
        LOGGER.info("success to save inlong cluster={} by user={}", clusterRequest, str);
        return clusterId;
    }

    /**
     * Loads a cluster by id and converts it with the operator registered for its type.
     *
     * @param num cluster id; must not be null
     * @param str operator name (not used by this method)
     * @return the cluster info
     * @throws BusinessException with {@code CLUSTER_NOT_FOUND} if no cluster has this id
     */
    @Override
    public ClusterInfo get(Integer num, String str) {
        Preconditions.expectNotNull(num, "inlong cluster id cannot be empty");

        InlongClusterEntity clusterEntity = this.clusterMapper.selectById(num);
        if (clusterEntity == null) {
            LOGGER.error("inlong cluster not found by id={}", num);
            throw new BusinessException(ErrorCodeEnum.CLUSTER_NOT_FOUND);
        }

        ClusterInfo clusterInfo = this.clusterOperatorFactory
                .getInstance(clusterEntity.getType())
                .getFromEntity(clusterEntity);
        LOGGER.debug("success to get inlong cluster info by id={}", num);
        return clusterInfo;
    }

    /**
     * Returns one page of clusters matching the request.
     *
     * @param clusterPageRequest filter plus page number and page size
     * @return the page of cluster infos, each converted by the operator for its type
     */
    @Override
    public PageResult<ClusterInfo> list(ClusterPageRequest clusterPageRequest) {
        // startPage must run immediately before the query it is meant to paginate.
        PageHelper.startPage(clusterPageRequest.getPageNum(), clusterPageRequest.getPageSize());
        PageResult<ClusterInfo> pageResult = PageResult
                .fromPage(this.clusterMapper.selectByCondition(clusterPageRequest))
                .map(entity -> this.clusterOperatorFactory.getInstance(entity.getType()).getFromEntity(entity));
        LOGGER.debug("success to list inlong cluster by {}", clusterPageRequest);
        return pageResult;
    }

    /**
     * Lists the clusters matching the request that the given user may see: all of them for a
     * tenant admin, otherwise only those whose comma-separated in-charges contain the user's name.
     *
     * @param clusterPageRequest filter conditions
     * @param userInfo the requesting user
     * @return the visible clusters, each converted by the operator for its type
     */
    @Override
    public List<ClusterInfo> list(ClusterPageRequest clusterPageRequest, UserInfo userInfo) {
        List<InlongClusterEntity> candidates = this.clusterMapper.selectByCondition(clusterPageRequest);

        // First pass: permission filter over every candidate.
        List<InlongClusterEntity> visible = candidates.stream()
                .filter(entity -> userInfo.getAccountType().equals(TenantUserTypeEnum.TENANT_ADMIN.getCode())
                        || Arrays.asList(entity.getInCharges().split(",")).contains(userInfo.getName()))
                .collect(Collectors.toList());

        // Second pass: convert only what survived the filter.
        List<ClusterInfo> infos = new ArrayList<>();
        for (InlongClusterEntity entity : visible) {
            infos.add(this.clusterOperatorFactory.getInstance(entity.getType()).getFromEntity(entity));
        }
        return infos;
    }

    /**
     * Lists all clusters carrying the given tag and having the given type.
     *
     * @param str cluster tag
     * @param str2 cluster type
     * @return the matching clusters, never empty
     * @throws BusinessException if no cluster matches
     */
    @Override
    public List<ClusterInfo> listByTagAndType(String str, String str2) {
        List<InlongClusterEntity> matches = this.clusterMapper.selectByKey(str, (String) null, str2);
        if (CollectionUtils.isEmpty(matches)) {
            throw new BusinessException(String.format("cannot find any cluster by tag %s and type %s", str, str2));
        }

        List<ClusterInfo> infos = new ArrayList<>();
        for (InlongClusterEntity entity : matches) {
            infos.add(this.clusterOperatorFactory.getInstance(entity.getType()).getFromEntity(entity));
        }
        LOGGER.debug("success to list inlong cluster by tag={}", str);
        return infos;
    }

    /**
     * Returns the first cluster matching the given tag, name and type.
     *
     * @param str cluster tag
     * @param str2 cluster name
     * @param str3 cluster type
     * @return the first matching cluster
     * @throws BusinessException if no cluster matches
     */
    @Override
    public ClusterInfo getOne(String str, String str2, String str3) {
        List<InlongClusterEntity> matches = this.clusterMapper.selectByKey(str, str2, str3);
        if (CollectionUtils.isEmpty(matches)) {
            throw new BusinessException(
                    String.format("cluster not found by tag=%s, name=%s, type=%s", str, str2, str3));
        }

        // Only the first hit is used even if several clusters match.
        InlongClusterEntity first = matches.get(0);
        ClusterInfo clusterInfo = this.clusterOperatorFactory.getInstance(first.getType()).getFromEntity(first);
        LOGGER.debug("success to get inlong cluster by tag={}, name={}, type={}", str, str2, str3);
        return clusterInfo;
    }

    /**
     * Updates an existing cluster, identified by the id in the request.
     *
     * @param clusterRequest new cluster values; must not be null
     * @param str operator name
     * @return always {@code true}
     * @throws BusinessException with {@code CLUSTER_NOT_FOUND} if the id is unknown, or with a plain message
     *         if the requested (tags, name, type) key already belongs to a different cluster
     */
    @Override
    public Boolean update(ClusterRequest clusterRequest, String str) {
        LOGGER.debug("begin to update inlong cluster: {}", clusterRequest);
        Preconditions.expectNotNull(clusterRequest, "inlong cluster info cannot be empty");

        final Integer clusterId = clusterRequest.getId();
        InlongClusterEntity stored = this.clusterMapper.selectById(clusterId);
        if (stored == null) {
            LOGGER.error("inlong cluster not found by id={}", clusterId);
            throw new BusinessException(ErrorCodeEnum.CLUSTER_NOT_FOUND);
        }
        chkUnmodifiableParams(stored, clusterRequest);

        // The target key may only be occupied by the cluster being updated itself.
        final String tags = clusterRequest.getClusterTags();
        List<InlongClusterEntity> sameKey =
                this.clusterMapper.selectByKey(tags, clusterRequest.getName(), clusterRequest.getType());
        boolean takenByOther = CollectionUtils.isNotEmpty(sameKey)
                && !Objects.equals(clusterId, sameKey.get(0).getId());
        if (takenByOther) {
            String errMsg = String.format("inlong cluster already exist for cluster tag=%s name=%s type=%s",
                    tags, clusterRequest.getName(), clusterRequest.getType());
            LOGGER.error(errMsg);
            throw new BusinessException(errMsg);
        }

        this.clusterOperatorFactory.getInstance(clusterRequest.getType()).updateOpt(clusterRequest, str);
        LOGGER.info("success to update inlong cluster: {} by {}", clusterRequest, str);
        return true;
    }

    /**
     * Updates an existing cluster located by its name and type instead of its id.
     *
     * <p>The id of the located cluster is written into the request before the update runs.
     *
     * @param clusterRequest new cluster values; must not be null
     * @param str operator name
     * @return the cluster id, a success flag and the request version plus one
     * @throws BusinessException with {@code CLUSTER_NOT_FOUND} if no cluster has this name and type
     */
    @Override
    public UpdateResult updateByKey(ClusterRequest clusterRequest, String str) {
        LOGGER.debug("begin to update inlong cluster: {}", clusterRequest);
        Preconditions.expectNotNull(clusterRequest, "inlong cluster info cannot be null");

        final String clusterName = clusterRequest.getName();
        final String clusterType = clusterRequest.getType();
        InlongClusterEntity stored = this.clusterMapper.selectByNameAndType(clusterName, clusterType);
        if (stored == null) {
            String errMsg = String.format("inlong cluster not found by name=%s, type=%s", clusterName, clusterType);
            throw new BusinessException(ErrorCodeEnum.CLUSTER_NOT_FOUND, errMsg);
        }

        clusterRequest.setId(stored.getId());
        chkUnmodifiableParams(stored, clusterRequest);
        this.clusterOperatorFactory.getInstance(clusterRequest.getType()).updateOpt(clusterRequest, str);
        LOGGER.info("success to update inlong cluster: {} by {}", clusterRequest, str);

        // The reported version is derived from the request, not re-read from storage.
        Integer nextVersion = clusterRequest.getVersion() + 1;
        return new UpdateResult(stored.getId(), true, nextVersion);
    }

    /**
     * Adds a cluster tag to the clusters listed for binding and removes it from the clusters listed
     * for unbinding.
     *
     * @param bindTagRequest tag name plus the ids of the clusters to bind and unbind; must not be null
     * @param str operator name, stored as modifier on every changed cluster
     * @return always {@code true}
     * @throws BusinessException with {@code CLUSTER_NOT_FOUND} if a cluster to bind does not exist, or with
     *         {@code CONFIG_EXPIRED} if a cluster update does not affect exactly one row
     */
    @Override
    public Boolean bindTag(BindTagRequest bindTagRequest, String str) {
        LOGGER.info("begin to bind or unbind cluster tag: {}", bindTagRequest);
        Preconditions.expectNotNull(bindTagRequest, "inlong cluster info cannot be empty");
        String clusterTag = bindTagRequest.getClusterTag();
        Preconditions.expectNotBlank(clusterTag, ErrorCodeEnum.INVALID_PARAMETER, "cluster tag cannot be empty");
        // NOTE(review): the lookup result is discarded, so a non-existent tag is not rejected here.
        // The call is kept as-is to avoid changing behavior; confirm whether a check was intended.
        this.clusterTagMapper.selectByTag(clusterTag);

        if (CollectionUtils.isNotEmpty(bindTagRequest.getBindClusters())) {
            for (Integer clusterId : bindTagRequest.getBindClusters()) {
                // Load the cluster once and fail with a meaningful error instead of a NullPointerException.
                InlongClusterEntity cluster = this.clusterMapper.selectById(clusterId);
                if (cluster == null) {
                    LOGGER.error("inlong cluster not found by id={}", clusterId);
                    throw new BusinessException(ErrorCodeEnum.CLUSTER_NOT_FOUND);
                }

                // A cluster without any tag yet must not contribute an empty element to the joined list.
                Set<String> tags = new HashSet<>();
                String currentTags = cluster.getClusterTags();
                if (StringUtils.isNotBlank(currentTags)) {
                    tags.addAll(Arrays.asList(currentTags.split(",")));
                }
                tags.add(clusterTag);

                cluster.setClusterTags(Joiner.on(",").join(tags));
                cluster.setModifier(str);
                if (InlongConstants.AFFECTED_ONE_ROW.intValue() != this.clusterMapper.updateById(cluster)) {
                    LOGGER.error("cluster has already updated with name={}, type={}, curVersion={}",
                            cluster.getName(), cluster.getType(), cluster.getVersion());
                    throw new BusinessException(ErrorCodeEnum.CONFIG_EXPIRED);
                }
            }
        }

        if (CollectionUtils.isNotEmpty(bindTagRequest.getUnbindClusters())) {
            for (Integer clusterId : bindTagRequest.getUnbindClusters()) {
                removeClusterTag(this.clusterMapper.selectById(clusterId), clusterTag, str);
            }
        }

        LOGGER.info("success to bind or unbind cluster tag {} by {}", bindTagRequest, str);
        return true;
    }

    /**
     * Logically deletes the cluster with the given name and type.
     *
     * @param str cluster name; must not be blank
     * @param str2 cluster type; must not be blank
     * @param str3 operator name, stored as modifier
     * @return {@code false} if the cluster does not exist or is already deleted, otherwise {@code true}
     * @throws BusinessException if the cluster still has nodes, or with {@code CONFIG_EXPIRED} if the update
     *         does not affect exactly one row
     */
    @Override
    public Boolean deleteByKey(String str, String str2, String str3) {
        Preconditions.expectNotBlank(str, ErrorCodeEnum.INVALID_PARAMETER, "cluster name should not be empty or null");
        Preconditions.expectNotBlank(str2, ErrorCodeEnum.INVALID_PARAMETER, "cluster type should not be empty or null");

        InlongClusterEntity cluster = this.clusterMapper.selectByNameAndType(str, str2);
        if (cluster == null || cluster.getIsDeleted() > InlongConstants.UN_DELETED) {
            LOGGER.error("inlong cluster not found by clusterName={}, type={} or was already deleted", str, str2);
            return false;
        }

        // A cluster may only be deleted once all of its nodes are gone.
        boolean hasNodes = CollectionUtils.isNotEmpty(
                this.clusterNodeMapper.selectByParentId(cluster.getId(), (String) null));
        if (hasNodes) {
            throw new BusinessException(String.format(
                    "there are undeleted nodes under the cluster [%s], please delete the node first",
                    cluster.getName()));
        }

        // Logical delete: the row is marked with its own id instead of being removed.
        cluster.setIsDeleted(cluster.getId());
        cluster.setModifier(str3);
        int affectedRows = this.clusterMapper.updateById(cluster);
        if (affectedRows != InlongConstants.AFFECTED_ONE_ROW) {
            LOGGER.error("cluster has already updated with name={}, type={}, curVersion={}",
                    cluster.getName(), cluster.getType(), cluster.getVersion());
            throw new BusinessException(ErrorCodeEnum.CONFIG_EXPIRED);
        }

        LOGGER.info("success to delete inlong cluster for clusterName={}, type={} by user={}", str, str2, str3);
        return true;
    }

    /**
     * Logically deletes the cluster with the given id.
     *
     * @param num cluster id; must not be null
     * @param str operator name, stored as modifier
     * @return always {@code true}
     * @throws BusinessException if the cluster does not exist or still has nodes, or with
     *         {@code CONFIG_EXPIRED} if the update does not affect exactly one row
     */
    @Override
    public Boolean delete(Integer num, String str) {
        Preconditions.expectNotNull(num, "cluster id cannot be empty");
        InlongClusterEntity cluster = this.clusterMapper.selectById(num);
        Preconditions.expectNotNull(cluster, ErrorCodeEnum.CLUSTER_NOT_FOUND,
                ErrorCodeEnum.CLUSTER_NOT_FOUND.getMessage());

        // A cluster may only be deleted once all of its nodes are gone.
        boolean hasNodes = CollectionUtils.isNotEmpty(this.clusterNodeMapper.selectByParentId(num, (String) null));
        if (hasNodes) {
            throw new BusinessException(String.format(
                    "there are undeleted nodes under the cluster [%s], please delete the node first",
                    cluster.getName()));
        }

        // Logical delete: the row is marked with its own id instead of being removed.
        cluster.setIsDeleted(cluster.getId());
        cluster.setModifier(str);
        int affectedRows = this.clusterMapper.updateById(cluster);
        if (affectedRows != InlongConstants.AFFECTED_ONE_ROW) {
            LOGGER.error("cluster has already updated with name={}, type={}, curVersion={}",
                    cluster.getName(), cluster.getType(), cluster.getVersion());
            throw new BusinessException(ErrorCodeEnum.CONFIG_EXPIRED);
        }

        LOGGER.info("success to delete inlong cluster for id={} by user={}", num, str);
        return true;
    }

    /**
     * Logically deletes the cluster with the given id on behalf of the given user.
     *
     * @param num cluster id
     * @param userInfo the requesting user; the user's name is stored as modifier
     * @return always {@code true}
     * @throws BusinessException with {@code CLUSTER_NOT_FOUND} if the cluster does not exist, with
     *         {@code RECORD_IN_USED} if it still has nodes, or with {@code CONFIG_EXPIRED} if the update
     *         does not affect exactly one row
     */
    @Override
    public Boolean delete(Integer num, UserInfo userInfo) {
        InlongClusterEntity selectById = this.clusterMapper.selectById(num);
        // Report the cluster message with the cluster error code (previously the unrelated
        // CONSUME_NOT_FOUND message was used), matching the String-operator overload of delete.
        Preconditions.expectNotNull(selectById, ErrorCodeEnum.CLUSTER_NOT_FOUND,
                ErrorCodeEnum.CLUSTER_NOT_FOUND.getMessage());
        // A cluster may only be deleted once all of its nodes are gone.
        if (CollectionUtils.isNotEmpty(this.clusterNodeMapper.selectByParentId(num, (String) null))) {
            throw new BusinessException(ErrorCodeEnum.RECORD_IN_USED, String.format(
                    "there are undeleted nodes under the cluster [%s], please delete the node first",
                    selectById.getName()));
        }
        // Logical delete: the row is marked with its own id instead of being removed.
        selectById.setIsDeleted(selectById.getId());
        selectById.setModifier(userInfo.getName());
        if (InlongConstants.AFFECTED_ONE_ROW.intValue() != this.clusterMapper.updateById(selectById)) {
            throw new BusinessException(ErrorCodeEnum.CONFIG_EXPIRED, String.format(
                    "cluster has already updated with name=%s, type=%s, curVersion=%s",
                    selectById.getName(), selectById.getType(), selectById.getVersion()));
        }
        return true;
    }

    /**
     * Saves a new cluster node through the operator registered for its type and, when the request
     * asks for it, triggers the installer for that type.
     *
     * @param clusterNodeRequest node definition; must not be null
     * @param str operator name
     * @return id returned by the type-specific save operation
     * @throws BusinessException if a node with the same unique key already exists
     */
    @Override
    public Integer saveNode(ClusterNodeRequest clusterNodeRequest, String str) {
        LOGGER.debug("begin to insert inlong cluster node={}", clusterNodeRequest);
        Preconditions.expectNotNull(clusterNodeRequest, "cluster node info cannot be empty");
        if (this.clusterNodeMapper.selectByUniqueKey(clusterNodeRequest) != null) {
            String format = String.format("inlong cluster node already exist for type=%s ip=%s port=%s",
                    clusterNodeRequest.getType(), clusterNodeRequest.getIp(), clusterNodeRequest.getPort());
            LOGGER.error(format);
            throw new BusinessException(format);
        }
        Integer saveOpt = this.clusterNodeOperatorFactory.getInstance(clusterNodeRequest.getType())
                .saveOpt(clusterNodeRequest, str);
        // The install flag is a nullable Boolean: treat an absent flag as "do not install" instead of
        // failing with a NullPointerException after the node has already been saved.
        if (Boolean.TRUE.equals(clusterNodeRequest.getIsInstall())) {
            this.clusterNodeInstallOperatorFactory.getInstance(clusterNodeRequest.getType())
                    .install(clusterNodeRequest, str);
        }
        return saveOpt;
    }

    /**
     * Looks up a cluster node by id and converts it with the operator registered for its type.
     *
     * @param num id of the cluster node
     * @param str name of the current user (not used by this method)
     * @return the response built from the stored node entity
     * @throws BusinessException with {@code CLUSTER_NOT_FOUND} if no node has the given id
     */
    @Override
    public ClusterNodeResponse getNode(Integer num, String str) {
        Preconditions.expectNotNull(num, "cluster node id cannot be empty");
        InlongClusterNodeEntity nodeEntity = this.clusterNodeMapper.selectById(num);
        if (nodeEntity == null) {
            LOGGER.error("inlong cluster node not found by id={}", num);
            throw new BusinessException(ErrorCodeEnum.CLUSTER_NOT_FOUND);
        }
        return this.clusterNodeOperatorFactory.getInstance(nodeEntity.getType()).getFromEntity(nodeEntity);
    }

    /**
     * Lists cluster nodes, either for every cluster matching a cluster tag or paged by parent id.
     *
     * <p>When the request carries a cluster tag, all nodes found by
     * {@link #listNodeByClusterTag(ClusterPageRequest)} are returned and the total is the size of
     * that list. Otherwise the parent cluster id is required and the query is paged with
     * {@code PageHelper}.
     *
     * @param clusterPageRequest query conditions and paging parameters
     * @param str name of the current user (not used by this method)
     * @return the matching nodes
     */
    @Override
    public PageResult<ClusterNodeResponse> listNode(ClusterPageRequest clusterPageRequest, String str) {
        if (StringUtils.isNotBlank(clusterPageRequest.getClusterTag())) {
            // Keep the list in a local so its size can be reported as the total; the previous code
            // referenced an undefined variable here.
            List<ClusterNodeResponse> taggedNodes = listNodeByClusterTag(clusterPageRequest);
            return new PageResult<>(taggedNodes, Long.valueOf(taggedNodes.size()));
        }

        Preconditions.expectNotNull(clusterPageRequest.getParentId(), "Cluster id cannot be empty");
        PageHelper.startPage(clusterPageRequest.getPageNum(), clusterPageRequest.getPageSize());
        PageResult<ClusterNodeResponse> pageResult =
                PageResult.fromPage(this.clusterNodeMapper.selectByCondition(clusterPageRequest))
                        .map(nodeEntity -> this.clusterNodeOperatorFactory.getInstance(nodeEntity.getType())
                                .getFromEntity(nodeEntity));
        LOGGER.debug("success to list inlong cluster node by {}", clusterPageRequest);
        return pageResult;
    }

    /**
     * Lists cluster nodes visible to the given user.
     *
     * <p>Without a cluster tag the parent cluster id is mandatory and nodes are selected by the
     * request conditions. With a cluster tag, nodes are collected from every matching cluster for
     * which the user is a tenant admin or is listed among the cluster's in-charges.
     *
     * @param clusterPageRequest query conditions
     * @param userInfo current user; account type and name drive the visibility check
     * @return the matching nodes copied into {@link ClusterNodeResponse} objects
     * @throws BusinessException with {@code ID_IS_EMPTY} if neither cluster tag nor parent id is set
     */
    @Override
    public List<ClusterNodeResponse> listNode(ClusterPageRequest clusterPageRequest, UserInfo userInfo) {
        boolean queryByTag = !StringUtils.isBlank(clusterPageRequest.getClusterTag());
        if (!queryByTag) {
            if (clusterPageRequest.getParentId() == null) {
                throw new BusinessException(ErrorCodeEnum.ID_IS_EMPTY, "Cluster id cannot be empty");
            }
            return CommonBeanUtils.copyListProperties(
                    this.clusterNodeMapper.selectByCondition(clusterPageRequest), ClusterNodeResponse::new);
        }

        List<InlongClusterNodeEntity> visibleNodes = new ArrayList<>();
        List<InlongClusterEntity> taggedClusters = this.clusterMapper.selectByKey(
                clusterPageRequest.getClusterTag(), clusterPageRequest.getName(), clusterPageRequest.getType());
        for (InlongClusterEntity cluster : taggedClusters) {
            boolean tenantAdmin = userInfo.getAccountType().equals(TenantUserTypeEnum.TENANT_ADMIN.getCode());
            // Non-admin users only see clusters they are in charge of.
            if (!tenantAdmin
                    && !Arrays.asList(cluster.getInCharges().split(",")).contains(userInfo.getName())) {
                continue;
            }
            visibleNodes.addAll(this.clusterNodeMapper.selectByParentId(cluster.getId(), (String) null));
        }
        return CommonBeanUtils.copyListProperties(visibleNodes, ClusterNodeResponse::new);
    }

    /**
     * Lists the cluster nodes that serve the given inlong group.
     *
     * @param str inlong group id
     * @param str2 cluster type to look up
     * @param str3 protocol type passed on to the node query
     * @return the nodes found via {@code getClusterNodes}, or an empty list when there are none
     */
    @Override
    public List<ClusterNodeResponse> listNodeByGroupId(String str, String str2, String str3) {
        LOGGER.debug("begin to get cluster nodes for groupId={}, clusterType={}, protocol={}", str, str2, str3);
        List<InlongClusterNodeEntity> nodeEntities = getClusterNodes(str, str2, str3);
        if (CollectionUtils.isEmpty(nodeEntities)) {
            LOGGER.debug("not any cluster node for groupId={}, clusterType={}, protocol={}", str, str2, str3);
            return Collections.emptyList();
        }

        List<ClusterNodeResponse> responses =
                CommonBeanUtils.copyListProperties(nodeEntities, ClusterNodeResponse::new);
        if (LOGGER.isDebugEnabled()) {
            // The placeholder is labelled "result size", so log the size rather than the whole list.
            LOGGER.debug("success to get nodes for groupId={}, clusterType={}, protocol={}, result size={}",
                    str, str2, str3, responses.size());
        }
        return responses;
    }

    /**
     * Collects the nodes of every cluster matching the request's cluster tag, name and type.
     *
     * @param clusterPageRequest supplies the cluster tag, name and type used to select clusters
     * @return all nodes of the selected clusters copied into {@link ClusterNodeResponse} objects
     */
    public List<ClusterNodeResponse> listNodeByClusterTag(ClusterPageRequest clusterPageRequest) {
        List<InlongClusterEntity> taggedClusters = this.clusterMapper.selectByKey(
                clusterPageRequest.getClusterTag(), clusterPageRequest.getName(), clusterPageRequest.getType());
        List<InlongClusterNodeEntity> collectedNodes = new ArrayList<>();
        for (InlongClusterEntity cluster : taggedClusters) {
            collectedNodes.addAll(this.clusterNodeMapper.selectByParentId(cluster.getId(), (String) null));
        }
        return CommonBeanUtils.copyListProperties(collectedNodes, ClusterNodeResponse::new);
    }

    /**
     * Lists the {@code ip:port} addresses of all nodes of the given cluster type.
     *
     * @param str cluster type; must not be blank
     * @return one {@code ip:port} string per node, or an empty list when no node matches
     */
    @Override
    public List<String> listNodeIpByType(String str) {
        Preconditions.expectNotBlank(str, ErrorCodeEnum.INVALID_PARAMETER, "cluster type cannot be empty");

        ClusterPageRequest typeQuery = new ClusterPageRequest();
        typeQuery.setType(str);
        List<InlongClusterNodeEntity> nodesOfType = this.clusterNodeMapper.selectByCondition(typeQuery);
        if (CollectionUtils.isEmpty(nodesOfType)) {
            LOGGER.debug("not found any node for type={}", str);
            return Collections.emptyList();
        }

        List<String> addresses = new ArrayList<>();
        for (InlongClusterNodeEntity node : nodesOfType) {
            addresses.add(String.format("%s:%d", node.getIp(), node.getPort()));
        }
        LOGGER.debug("success to list node by type={}, result={}", str, addresses);
        return addresses;
    }

    /**
     * Updates an existing cluster node and, when requested, triggers its installation.
     *
     * <p>The node type cannot be changed and the request version must equal the stored version.
     * A blank protocol type in the request is replaced with the stored one before the
     * unique-key check.
     *
     * @param clusterNodeRequest new node values; the id selects the node to update
     * @param str name of the operator performing the update
     * @return {@code true} when the update has been delegated successfully
     * @throws BusinessException with {@code CLUSTER_NOT_FOUND} if the node does not exist, or with a
     *         message if another node already uses the same unique key
     */
    @Override
    @Transactional(rollbackFor = {Throwable.class}, isolation = Isolation.REPEATABLE_READ)
    public Boolean updateNode(ClusterNodeRequest clusterNodeRequest, String str) {
        LOGGER.debug("begin to update inlong cluster node={}", clusterNodeRequest);
        Preconditions.expectNotNull(clusterNodeRequest, "inlong cluster node cannot be empty");

        Integer id = clusterNodeRequest.getId();
        InlongClusterNodeEntity stored = this.clusterNodeMapper.selectById(id);
        if (stored == null) {
            LOGGER.error("cluster node not found by id={}", id);
            throw new BusinessException(ErrorCodeEnum.CLUSTER_NOT_FOUND);
        }

        Preconditions.expectEquals(stored.getType(), clusterNodeRequest.getType(),
                ErrorCodeEnum.INVALID_PARAMETER, "type not allowed modify");
        Preconditions.expectEquals(stored.getVersion(), clusterNodeRequest.getVersion(),
                ErrorCodeEnum.CONFIG_EXPIRED,
                String.format("record has expired with record version=%d, request version=%d",
                        stored.getVersion(), clusterNodeRequest.getVersion()));

        if (StringUtils.isBlank(clusterNodeRequest.getProtocolType())) {
            clusterNodeRequest.setProtocolType(stored.getProtocolType());
        }

        // Another node (different id) with the same unique key means the update would create a duplicate.
        InlongClusterNodeEntity sameKey = this.clusterNodeMapper.selectByUniqueKey(clusterNodeRequest);
        if (sameKey != null && !Objects.equals(id, sameKey.getId())) {
            String duplicateMsg = "inlong cluster node already exist for " + clusterNodeRequest;
            LOGGER.error(duplicateMsg);
            throw new BusinessException(duplicateMsg);
        }

        this.clusterNodeOperatorFactory.getInstance(clusterNodeRequest.getType()).updateOpt(clusterNodeRequest, str);
        // A null install flag is treated as "do not install" instead of failing on unboxing.
        if (Boolean.TRUE.equals(clusterNodeRequest.getIsInstall())) {
            this.clusterNodeInstallOperatorFactory.getInstance(clusterNodeRequest.getType())
                    .install(clusterNodeRequest, str);
        }
        return true;
    }

    /**
     * Marks the cluster node with the given id as deleted.
     *
     * <p>The record is flagged by writing its own id into {@code isDeleted}.
     *
     * @param num id of the node to delete
     * @param str name of the operator, stored as the record's modifier
     * @return {@code true} once the update has affected exactly one row
     * @throws BusinessException with {@code CONFIG_EXPIRED} if the update did not affect exactly one row
     */
    @Override
    public Boolean deleteNode(Integer num, String str) {
        Preconditions.expectNotNull(num, "cluster node id cannot be empty");
        InlongClusterNodeEntity node = this.clusterNodeMapper.selectById(num);
        Preconditions.expectNotNull(node, ErrorCodeEnum.CLUSTER_NOT_FOUND);

        node.setIsDeleted(node.getId());
        node.setModifier(str);
        int affectedRows = this.clusterNodeMapper.updateById(node);
        if (affectedRows == InlongConstants.AFFECTED_ONE_ROW.intValue()) {
            LOGGER.info("success to delete inlong cluster node by id={}", num);
            return true;
        }

        LOGGER.error("cluster node has already updated with parentId={}, type={}, ip={}, port={}, protocolType={}",
                node.getParentId(), node.getType(), node.getIp(), node.getPort(), node.getProtocolType());
        throw new BusinessException(ErrorCodeEnum.CONFIG_EXPIRED);
    }

    /**
     * Unloads the cluster node with the given id using the install operator registered for its type.
     *
     * @param num id of the node to unload
     * @param str name of the operator performing the unload
     * @return the result reported by the install operator's {@code unload}
     */
    @Override
    public Boolean unloadNode(Integer num, String str) {
        LOGGER.info("begin to unload inlong cluster node={}, operator={}", num, str);
        // Validate the id and the lookup result the same way deleteNode does, so a missing node is
        // reported through Preconditions instead of a NullPointerException on getType().
        Preconditions.expectNotNull(num, "cluster node id cannot be empty");
        InlongClusterNodeEntity node = this.clusterNodeMapper.selectById(num);
        Preconditions.expectNotNull(node, ErrorCodeEnum.CLUSTER_NOT_FOUND);

        boolean unloaded = this.clusterNodeInstallOperatorFactory.getInstance(node.getType()).unload(node, str);
        LOGGER.info("success to unload inlong cluster node={}, operator={}", num, str);
        return Boolean.valueOf(unloaded);
    }

    /**
     * Returns the manager's SSH public key read from {@code ~/.ssh/inlong_rsa.pub}.
     *
     * <p>If the key file does not exist, key generation is requested from the command executor
     * before the file is read. Leading and trailing newlines are stripped from the content.
     *
     * @return the public key text
     * @throws RuntimeException wrapping any failure while generating or reading the key
     */
    @Override
    public String getManagerSSHPublicKey() {
        try {
            String keyLocation = System.getProperty("user.home") + "/.ssh/inlong_rsa.pub";
            Path publicKeyFile = Paths.get(keyLocation);
            boolean keyPresent = Files.exists(publicKeyFile);
            if (!keyPresent) {
                this.commandExecutor.execSSHKeyGeneration();
            }
            String keyText = new String(Files.readAllBytes(publicKeyFile));
            return StringUtils.strip(keyText, "\n");
        } catch (Exception e) {
            LOGGER.error("get manager ssh public key error", e);
            throw new RuntimeException("get manager ssh public key error", e);
        }
    }

    /**
     * Checks SSH connectivity to a node by running {@code ls} remotely.
     *
     * @param clusterNodeRequest node to connect to; it is cast to {@code AgentClusterNodeRequest}
     * @return {@code true} when the remote command's result code is 0
     * @throws RuntimeException wrapping any exception raised while casting or executing
     */
    @Override
    public Boolean testSSHConnection(ClusterNodeRequest clusterNodeRequest) {
        try {
            AgentClusterNodeRequest agentRequest = (AgentClusterNodeRequest) clusterNodeRequest;
            boolean commandSucceeded = this.commandExecutor.execRemote(agentRequest, "ls").getCode() == 0;
            return Boolean.valueOf(commandSucceeded);
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

    /**
     * Returns the online DataProxy nodes that serve the given inlong group.
     *
     * <p>The group must exist and be in {@code CONFIG_SUCCESSFUL} status. Nodes whose ext params
     * declare {@code enabledOnline == false} are left out of the result.
     *
     * @param str inlong group id
     * @param str2 protocol type passed on to the node query
     * @return a response carrying the cluster id and node list; an empty response when no node is found
     * @throws BusinessException if the group does not exist or its status is not {@code CONFIG_SUCCESSFUL}
     */
    @Override
    public DataProxyNodeResponse getDataProxyNodes(String str, String str2) {
        LOGGER.debug("begin to get data proxy nodes for groupId={}, protocol={}", str, str2);

        // Guard against an unknown group id; dereferencing the lookup result directly would throw
        // a NullPointerException.
        InlongGroupEntity group = this.groupMapper.selectByGroupId(str);
        if (group == null) {
            String notFoundMsg = String.format("inlong group not exists for groupId=%s", str);
            LOGGER.warn(notFoundMsg);
            throw new BusinessException(notFoundMsg);
        }

        GroupStatus status = GroupStatus.forCode(group.getStatus().intValue());
        if (!Objects.equals(status, GroupStatus.CONFIG_SUCCESSFUL)) {
            String statusMsg = String.format("current group status=%s was not allowed to get data proxy nodes", status);
            LOGGER.warn(statusMsg);
            throw new BusinessException(statusMsg);
        }

        List<InlongClusterNodeEntity> nodeEntities = getClusterNodes(str, "DATAPROXY", str2);
        DataProxyNodeResponse response = new DataProxyNodeResponse();
        if (CollectionUtils.isEmpty(nodeEntities)) {
            LOGGER.debug("not any data proxy node for groupId={}, protocol={}", str, str2);
            return response;
        }

        response.setClusterId(nodeEntities.get(0).getParentId());
        List<DataProxyNodeInfo> nodeList = new ArrayList<>();
        for (InlongClusterNodeEntity nodeEntity : nodeEntities) {
            // Skip nodes that are explicitly switched offline in their ext params.
            if (StringUtils.isNotBlank(nodeEntity.getExtParams())
                    && Objects.equals(
                            DataProxyClusterNodeDTO.getFromJson(nodeEntity.getExtParams()).getEnabledOnline(),
                            false)) {
                continue;
            }
            DataProxyNodeInfo nodeInfo = new DataProxyNodeInfo();
            nodeInfo.setId(nodeEntity.getId());
            nodeInfo.setIp(nodeEntity.getIp());
            nodeInfo.setPort(nodeEntity.getPort());
            nodeInfo.setProtocolType(nodeEntity.getProtocolType());
            nodeInfo.setNodeLoad(nodeEntity.getNodeLoad());
            nodeList.add(nodeInfo);
        }
        response.setNodeList(nodeList);

        if (LOGGER.isDebugEnabled()) {
            LOGGER.debug("success to get dp nodes for groupId={}, protocol={}, result={}", str, str2, response);
        }
        return response;
    }

    /**
     * Returns the usable DataProxy nodes of the named DataProxy cluster.
     *
     * <p>A node is left out when its status is {@code HEARTBEAT_TIMEOUT}, or when its ext params
     * say that it is offline ({@code enabledOnline == false}) or that it reports to a different
     * source type than the one requested. A node whose ext params carry no report source type is
     * treated as {@code INLONG}.
     *
     * @param str name of the DataProxy cluster
     * @param str2 protocol type passed on to the node query
     * @param str3 report source type to filter by; a blank value disables this filter
     * @return a response carrying the cluster id and node list; an empty response when the cluster
     *         or its nodes cannot be found
     */
    @Override
    public DataProxyNodeResponse getDataProxyNodesByCluster(String str, String str2, String str3) {
        LOGGER.debug("begin to get data proxy nodes for clusterName={}, protocol={}", str, str2);
        InlongClusterEntity cluster = this.clusterMapper.selectByNameAndType(str, "DATAPROXY");
        DataProxyNodeResponse response = new DataProxyNodeResponse();
        if (cluster == null) {
            LOGGER.debug("not any dataproxy cluster for clusterName={}, protocol={}", str, str2);
            return response;
        }

        List<InlongClusterNodeEntity> nodeEntities =
                this.clusterNodeMapper.selectByParentId(cluster.getId(), str2);
        if (CollectionUtils.isEmpty(nodeEntities)) {
            LOGGER.debug("not any data proxy node for clusterName={}, protocol={}", str, str2);
            return response;
        }

        response.setClusterId(cluster.getId());
        List<DataProxyNodeInfo> nodeList = new ArrayList<>();
        for (InlongClusterNodeEntity nodeEntity : nodeEntities) {
            if (Objects.equals(nodeEntity.getStatus(), Integer.valueOf(NodeStatus.HEARTBEAT_TIMEOUT.getStatus()))) {
                LOGGER.debug("dataproxy node was timeout, parentId={} ip={} port={}",
                        nodeEntity.getParentId(), nodeEntity.getIp(), nodeEntity.getPort());
                continue;
            }

            if (StringUtils.isNotBlank(nodeEntity.getExtParams())) {
                DataProxyClusterNodeDTO nodeDto = DataProxyClusterNodeDTO.getFromJson(nodeEntity.getExtParams());
                String nodeSourceType = nodeDto.getReportSourceType();
                if (StringUtils.isBlank(nodeSourceType)) {
                    nodeSourceType = "INLONG";
                }
                // These two checks previously ended in an empty body, so nothing was ever filtered.
                if (StringUtils.isNotBlank(str3) && !Objects.equals(nodeSourceType, str3)) {
                    continue;
                }
                if (Objects.equals(nodeDto.getEnabledOnline(), false)) {
                    continue;
                }
            }

            DataProxyNodeInfo nodeInfo = new DataProxyNodeInfo();
            nodeInfo.setId(nodeEntity.getId());
            nodeInfo.setIp(nodeEntity.getIp());
            nodeInfo.setPort(nodeEntity.getPort());
            nodeInfo.setProtocolType(nodeEntity.getProtocolType());
            nodeInfo.setNodeLoad(nodeEntity.getNodeLoad());
            nodeList.add(nodeInfo);
        }
        response.setNodeList(nodeList);

        if (LOGGER.isDebugEnabled()) {
            LOGGER.debug("success to get dp nodes for clusterName={}, protocol={}, result={}", str, str2, response);
        }
        return response;
    }

    /**
     * Finds the nodes of the first cluster of the given type that carries the group's cluster tag.
     *
     * @param str inlong group id
     * @param str2 cluster type used to select the cluster
     * @param str3 protocol type passed on to the node query
     * @return the nodes of the first matching cluster, or a new empty list if the group does not exist
     * @throws BusinessException if the group has no cluster tag or no cluster matches the tag and type
     */
    private List<InlongClusterNodeEntity> getClusterNodes(String str, String str2, String str3) {
        InlongGroupEntity group = this.groupMapper.selectByGroupId(str);
        if (group == null) {
            LOGGER.warn("inlong group not exists for groupId={}", str);
            return Lists.newArrayList();
        }

        String clusterTag = group.getInlongClusterTag();
        if (StringUtils.isBlank(clusterTag)) {
            String noTagMsg = "not found any cluster tag for groupId=" + str;
            LOGGER.debug(noTagMsg);
            throw new BusinessException(noTagMsg);
        }

        List<InlongClusterEntity> candidates = this.clusterMapper.selectByKey(clusterTag, (String) null, str2);
        if (CollectionUtils.isEmpty(candidates)) {
            String noClusterMsg = "not found any data proxy cluster for groupId=" + str + " and clusterTag=" + clusterTag;
            LOGGER.debug(noClusterMsg);
            throw new BusinessException(noClusterMsg);
        }
        // Only the first matching cluster is consulted.
        InlongClusterEntity firstCluster = candidates.get(0);
        return this.clusterNodeMapper.selectByParentId(firstCluster.getId(), str3);
    }

    /**
     * Builds the DataProxy configuration (topics and MQ clusters) for the DataProxy clusters
     * selected by cluster tag and name.
     *
     * <p>Topics are derived from all inlong groups in {@code CONFIG_SUCCESSFUL} status whose cluster
     * tag is one of the tags carried by the selected DataProxy clusters:
     * <ul>
     *   <li>PULSAR / TDMQ_PULSAR: one {@code persistent://tenant/namespace/topic} entry per stream;
     *       the tenant comes from the group and falls back to the first Pulsar cluster of the tag</li>
     *   <li>TUBEMQ: one entry per group using the group's MQ resource</li>
     *   <li>KAFKA: one entry per stream; a stream whose MQ resource equals its stream id gets the
     *       group's MQ resource as prefix</li>
     * </ul>
     *
     * @param str cluster tag used to select DataProxy clusters
     * @param str2 cluster name used to select DataProxy clusters
     * @return the configuration; it is left unpopulated when no DataProxy cluster or no successful
     *         group is found
     */
    @Override
    public DataProxyConfig getDataProxyConfig(String str, String str2) {
        LOGGER.debug("GetDPConfig: begin to get config by cluster tag={} name={}", str, str2);
        List<InlongClusterEntity> proxyClusters = this.clusterMapper.selectByCondition(
                ClusterPageRequest.builder().clusterTag(str).name(str2).type("DATAPROXY").build());
        DataProxyConfig config = new DataProxyConfig();
        if (CollectionUtils.isEmpty(proxyClusters)) {
            LOGGER.warn("GetDPConfig: not found data proxy cluster by tag={} name={}", str, str2);
            return config;
        }

        // Union of all tags carried by the selected DataProxy clusters.
        Set<String> tagSet = new HashSet<>(16);
        for (InlongClusterEntity proxyCluster : proxyClusters) {
            tagSet.addAll(Arrays.asList(proxyCluster.getClusterTags().split(",")));
        }
        List<String> clusterTagList = new ArrayList<>(tagSet);

        List<InlongGroupBriefInfo> groupList = this.groupMapper.selectBriefList(
                InlongGroupPageRequest.builder()
                        .statusList(Collections.singletonList(GroupStatus.CONFIG_SUCCESSFUL.getCode()))
                        .clusterTagList(clusterTagList)
                        .build());
        if (CollectionUtils.isEmpty(groupList)) {
            LOGGER.warn("GetDPConfig: not found inlong group with success status by cluster tags={}", clusterTagList);
            return config;
        }
        LOGGER.debug("GetDPConfig: begin to get config for cluster tags={}, associated InlongGroup num={}",
                clusterTagList, groupList.size());

        List<DataProxyTopicInfo> topicList = new ArrayList<>();
        for (InlongGroupBriefInfo groupInfo : groupList) {
            String groupId = groupInfo.getInlongGroupId();
            String groupMqResource = groupInfo.getMqResource();
            String groupClusterTag = groupInfo.getInlongClusterTag();
            String mqType = groupInfo.getMqType();

            if ("PULSAR".equals(mqType) || "TDMQ_PULSAR".equals(mqType)) {
                String tenant = InlongPulsarDTO.getFromJson(groupInfo.getExtParams()).getPulsarTenant();
                if (StringUtils.isBlank(tenant)) {
                    // Fall back to the tenant of the first Pulsar cluster carrying the group's tag.
                    List<InlongClusterEntity> pulsarClusters =
                            this.clusterMapper.selectByKey(groupClusterTag, (String) null, "PULSAR");
                    if (CollectionUtils.isEmpty(pulsarClusters)) {
                        LOGGER.error("GetDPConfig: not found pulsar cluster by cluster tag={}", groupClusterTag);
                        // No tenant can be resolved, so skip the group instead of emitting
                        // topics with a blank tenant segment.
                        continue;
                    }
                    tenant = PulsarClusterDTO.getFromJson(pulsarClusters.get(0).getExtParams()).getPulsarTenant();
                }
                for (InlongStreamBriefInfo streamInfo : this.streamMapper.selectBriefList(groupId)) {
                    DataProxyTopicInfo topicInfo = new DataProxyTopicInfo();
                    topicInfo.setInlongGroupId(groupId + "/" + streamInfo.getInlongStreamId());
                    topicInfo.setTopic(String.format("persistent://%s/%s/%s",
                            tenant, groupMqResource, streamInfo.getMqResource()));
                    topicList.add(topicInfo);
                }
            } else if ("TUBEMQ".equals(mqType)) {
                DataProxyTopicInfo topicInfo = new DataProxyTopicInfo();
                topicInfo.setInlongGroupId(groupId);
                topicInfo.setTopic(groupMqResource);
                topicList.add(topicInfo);
            } else if ("KAFKA".equals(mqType)) {
                for (InlongStreamBriefInfo streamInfo : this.streamMapper.selectBriefList(groupId)) {
                    String streamId = streamInfo.getInlongStreamId();
                    String topicName = streamInfo.getMqResource();
                    if (topicName.equals(streamId)) {
                        // Stream resource equal to the stream id: prefix it with the group's resource.
                        topicName = String.format("%s.%s", groupMqResource, streamInfo.getMqResource());
                    }
                    DataProxyTopicInfo topicInfo = new DataProxyTopicInfo();
                    topicInfo.setInlongGroupId(groupId + "/" + streamId);
                    topicInfo.setTopic(topicName);
                    topicList.add(topicInfo);
                }
            }
        }

        LOGGER.debug("GetDPConfig: begin to get mq clusters by tags={}", clusterTagList);
        List<InlongClusterEntity> mqClusters = this.clusterMapper.selectByCondition(
                ClusterPageRequest.builder()
                        .typeList(Arrays.asList("TUBEMQ", "PULSAR", "KAFKA"))
                        .clusterTagList(clusterTagList)
                        .build());
        List<MQClusterInfo> mqClusterList = new ArrayList<>();
        for (InlongClusterEntity mqCluster : mqClusters) {
            MQClusterInfo mqClusterInfo = new MQClusterInfo();
            mqClusterInfo.setUrl(mqCluster.getUrl());
            mqClusterInfo.setToken(mqCluster.getToken());
            mqClusterInfo.setMqType(mqCluster.getType());
            mqClusterInfo.setParams(GSON.fromJson(mqCluster.getExtParams(), Map.class));
            mqClusterList.add(mqClusterInfo);
        }

        config.setMqClusterList(mqClusterList);
        config.setTopicList(topicList);
        return config;
    }

    /**
     * Returns the cached DataProxy configuration JSON for a cluster, or a status-only response.
     *
     * <p>When the caller's MD5 equals the cached MD5, a success response with error code 1 and an
     * empty {@code DataProxyCluster} is returned. When either the cached MD5 or the cached JSON is
     * missing, a failure response with error code -101 is returned.
     *
     * @param str cluster name used as the key into the proxy repository
     * @param str2 MD5 of the configuration the caller already holds
     * @return the cached configuration JSON, or a serialized {@code DataProxyConfigResponse}
     */
    @Override
    @Deprecated
    public String getAllConfig(String str, String str2) {
        DataProxyConfigResponse response = new DataProxyConfigResponse();
        String cachedMd5 = this.proxyRepository.getProxyMd5(str);
        if (cachedMd5 != null) {
            if (cachedMd5.equals(str2)) {
                // Caller is already up to date: answer with status only.
                response.setResult(true);
                response.setErrCode(1);
                response.setMd5(cachedMd5);
                response.setData(new DataProxyCluster());
                return GSON.toJson(response);
            }
            String cachedJson = this.proxyRepository.getProxyConfigJson(str);
            if (cachedJson != null) {
                return cachedJson;
            }
        }
        // Either the MD5 or the JSON is not cached for this cluster.
        response.setResult(false);
        response.setErrCode(-101);
        return GSON.toJson(response);
    }

    /**
     * Builds the audit configuration from the MQ clusters (TUBEMQ, PULSAR, KAFKA) of a cluster tag.
     *
     * @param str cluster tag used to select the MQ clusters
     * @return an audit config whose MQ info list has one entry per selected cluster
     */
    @Override
    public AuditConfig getAuditConfig(String str) {
        AuditConfig config = new AuditConfig();
        ClusterPageRequest mqQuery = ClusterPageRequest.builder()
                .clusterTag(str)
                .typeList(Arrays.asList("TUBEMQ", "PULSAR", "KAFKA"))
                .build();
        List<InlongClusterEntity> mqClusters = this.clusterMapper.selectByCondition(mqQuery);

        List<MQInfo> mqInfos = new ArrayList<>();
        for (InlongClusterEntity mqCluster : mqClusters) {
            MQInfo info = new MQInfo();
            info.setUrl(mqCluster.getUrl());
            info.setMqType(mqCluster.getType());
            info.setParams(GSON.fromJson(mqCluster.getExtParams(), Map.class));
            mqInfos.add(info);
        }
        config.setMqInfoList(mqInfos);
        return config;
    }

    /**
     * Tests connectivity to a cluster using the operator registered for the request's type.
     *
     * @param clusterRequest cluster whose connection should be tested
     * @return the result reported by the cluster operator
     */
    @Override
    public Boolean testConnection(ClusterRequest clusterRequest) {
        LOGGER.info("begin test connection for: {}", clusterRequest);
        Boolean connected = this.clusterOperatorFactory.getInstance(clusterRequest.getType())
                .testConnection(clusterRequest);
        String outcome;
        if (connected.booleanValue()) {
            outcome = "success";
        } else {
            outcome = "failed";
        }
        LOGGER.info("connection [{}] for: {}", outcome, clusterRequest);
        return connected;
    }

    /**
     * Binds a cluster tag to a tenant.
     *
     * @param tenantClusterTagRequest carries the tenant and the cluster tag; both must be non-blank
     *        and the tenant must be resolvable through the tenant service
     * @param str name of the operator, stored as creator and modifier
     * @return the id of the inserted tenant cluster tag record
     */
    @Override
    public Integer saveTenantTag(TenantClusterTagRequest tenantClusterTagRequest, String str) {
        LOGGER.debug("begin to save tenant cluster tag {}", tenantClusterTagRequest);
        Preconditions.expectNotNull(tenantClusterTagRequest, "tenant cluster request cannot be empty");

        String clusterTag = tenantClusterTagRequest.getClusterTag();
        Preconditions.expectNotBlank(clusterTag, ErrorCodeEnum.INVALID_PARAMETER, "cluster tag cannot be empty");
        Preconditions.expectNotBlank(tenantClusterTagRequest.getTenant(), ErrorCodeEnum.INVALID_PARAMETER,
                "tenant cannot be empty");
        Preconditions.expectNotNull(this.tenantService.getByName(tenantClusterTagRequest.getTenant()),
                ErrorCodeEnum.INVALID_PARAMETER, "target tenant cannot be found");

        TenantClusterTagEntity tagEntity =
                CommonBeanUtils.copyProperties(tenantClusterTagRequest, TenantClusterTagEntity::new);
        tagEntity.setCreator(str);
        tagEntity.setModifier(str);
        this.tenantClusterTagMapper.insert(tagEntity);

        LOGGER.info("success to save tenant tag, tenant={}, tag={}",
                tenantClusterTagRequest.getTenant(), tenantClusterTagRequest.getClusterTag());
        return tagEntity.getId();
    }

    /**
     * Lists cluster tags, restricted to the tenant's tags when required.
     *
     * <p>The restriction applies when the login user has no inlong role or when the request names
     * a tenant. In that case the request's paging is overwritten (page 1, unlimited size) to load
     * all tenant tags, which are then used as a filter.
     *
     * @param tenantClusterTagPageRequest query conditions; its paging fields may be overwritten
     * @return the matching cluster tags; an empty page when a tenant is named but has no tags
     */
    @Override
    public PageResult<ClusterTagResponse> listTagByTenantRole(TenantClusterTagPageRequest tenantClusterTagPageRequest) {
        ClusterTagPageRequest tagRequest =
                CommonBeanUtils.copyProperties(tenantClusterTagPageRequest, ClusterTagPageRequest::new);

        boolean hasInlongRole =
                this.inlongRoleService.getByUsername(LoginUserUtils.getLoginUser().getName()) != null;
        if (hasInlongRole && StringUtils.isBlank(tenantClusterTagPageRequest.getTenant())) {
            // Inlong-role users without a tenant filter see every tag.
            return listTag(tagRequest);
        }

        tenantClusterTagPageRequest.setPageNum(1);
        tenantClusterTagPageRequest.setPageSize(Integer.MAX_VALUE);
        List<String> tenantTags = listTenantTag(tenantClusterTagPageRequest).getList().stream()
                .map(TenantClusterTagInfo::getClusterTag)
                .distinct()
                .collect(Collectors.toList());
        if (CollectionUtils.isEmpty(tenantTags) && StringUtils.isNotBlank(tenantClusterTagPageRequest.getTenant())) {
            return new PageResult<>(new ArrayList<ClusterTagResponse>(), 0L,
                    tenantClusterTagPageRequest.getPageNum(), tenantClusterTagPageRequest.getPageSize());
        }
        tagRequest.setClusterTags(tenantTags);
        return listTag(tagRequest);
    }

    /**
     * Lists clusters, limited to the tenant's cluster tags when the login user has no inlong role.
     *
     * @param clusterPageRequest query conditions; its cluster tag list is overwritten for users
     *        without an inlong role
     * @return the matching clusters
     */
    @Override
    public PageResult<ClusterInfo> listByTenantRole(ClusterPageRequest clusterPageRequest) {
        boolean hasInlongRole =
                this.inlongRoleService.getByUsername(LoginUserUtils.getLoginUser().getName()) != null;
        if (hasInlongRole) {
            return list(clusterPageRequest);
        }

        // Load every tag bound to the user's tenants and use them as the cluster filter.
        TenantClusterTagPageRequest allTagsRequest = new TenantClusterTagPageRequest();
        allTagsRequest.setPageNum(1);
        allTagsRequest.setPageSize(Integer.MAX_VALUE);
        List<String> tenantTags = listTenantTag(allTagsRequest).getList().stream()
                .map(TenantClusterTagInfo::getClusterTag)
                .distinct()
                .collect(Collectors.toList());
        clusterPageRequest.setClusterTagList(tenantTags);
        return list(clusterPageRequest);
    }

    /**
     * Lists tenant-to-cluster-tag bindings, paged.
     *
     * <p>For a login user without an inlong role, the request's tenant list is overwritten with
     * the tenants returned by the tenant role service for that user.
     *
     * @param tenantClusterTagPageRequest query conditions and paging parameters
     * @return the matching bindings
     */
    @Override
    public PageResult<TenantClusterTagInfo> listTenantTag(TenantClusterTagPageRequest tenantClusterTagPageRequest) {
        LOGGER.debug("begin to list tag by tenant {}", tenantClusterTagPageRequest);

        String loginName = LoginUserUtils.getLoginUser().getName();
        boolean hasInlongRole = this.inlongRoleService.getByUsername(loginName) != null;
        if (!hasInlongRole) {
            tenantClusterTagPageRequest.setTenantList(this.tenantRoleService.listTenantByUsername(loginName));
        }

        PageHelper.startPage(tenantClusterTagPageRequest.getPageNum(), tenantClusterTagPageRequest.getPageSize());
        PageResult<TenantClusterTagInfo> pageResult =
                PageResult.fromPage(this.tenantClusterTagMapper.selectByCondition(tenantClusterTagPageRequest))
                        .map(tagEntity -> (TenantClusterTagInfo) CommonBeanUtils.copyProperties(tagEntity,
                                TenantClusterTagInfo::new));
        LOGGER.debug("success to list tenant tag with request={}", tenantClusterTagPageRequest);
        return pageResult;
    }

    /**
     * Marks a tenant cluster tag binding as deleted.
     *
     * <p>The record is flagged by writing the given id into {@code isDeleted}.
     *
     * @param num id of the binding to delete
     * @param str name of the operator, stored as the record's modifier
     * @return {@code true} once the update has affected exactly one row
     * @throws BusinessException with {@code CONFIG_EXPIRED} if the update did not affect exactly one row
     */
    @Override
    public Boolean deleteTenantTag(Integer num, String str) {
        LOGGER.debug("start to delete tenant tag with id={}", num);
        TenantClusterTagEntity tagEntity = this.tenantClusterTagMapper.selectByPrimaryKey(num);
        Preconditions.expectNotNull(tagEntity, ErrorCodeEnum.RECORD_NOT_FOUND.getMessage());

        tagEntity.setModifier(str);
        tagEntity.setIsDeleted(num);
        int affectedRows = this.tenantClusterTagMapper.updateByIdSelective(tagEntity);
        if (affectedRows == InlongConstants.AFFECTED_ONE_ROW.intValue()) {
            LOGGER.info("success to delete tenant tag of tenant={} tag={}, user={}",
                    tagEntity.getTenant(), tagEntity.getClusterTag(), str);
            return true;
        }

        LOGGER.error("tenant cluster tag has already updated for tenant={} tag={}",
                tagEntity.getTenant(), tagEntity.getClusterTag());
        throw new BusinessException(ErrorCodeEnum.CONFIG_EXPIRED);
    }

    /**
     * Removes one tag from a cluster's comma-separated tag list and persists the change.
     *
     * @param inlongClusterEntity cluster whose tags are modified
     * @param str tag to remove
     * @param str2 name of the operator, stored as the record's modifier
     * @throws BusinessException with {@code CONFIG_EXPIRED} if the update did not affect exactly one row
     */
    private void removeClusterTag(InlongClusterEntity inlongClusterEntity, String str, String str2) {
        Set<String> remainingTags = Sets.newHashSet(inlongClusterEntity.getClusterTags().split(","));
        remainingTags.remove(str);
        String joinedTags = String.join(",", remainingTags);

        inlongClusterEntity.setClusterTags(joinedTags);
        inlongClusterEntity.setModifier(str2);
        int affectedRows = this.clusterMapper.updateById(inlongClusterEntity);
        if (affectedRows == InlongConstants.AFFECTED_ONE_ROW.intValue()) {
            return;
        }
        LOGGER.error("cluster has already updated with name={}, type={}, curVersion={}",
                inlongClusterEntity.getName(), inlongClusterEntity.getType(), inlongClusterEntity.getVersion());
        throw new BusinessException(ErrorCodeEnum.CONFIG_EXPIRED);
    }

    /**
     * Ensures that no inlong group is bound to the given cluster tag.
     *
     * @param str cluster tag to check
     * @throws BusinessException listing the ids of the groups that still use the tag
     */
    private void assertNoInlongGroupExists(String str) {
        // NOTE(review): element type assumed to be InlongGroupEntity, as returned by
        // groupMapper.selectByGroupId — confirm against the mapper declaration.
        List<InlongGroupEntity> boundGroups = this.groupMapper.selectByClusterTag(str);
        if (!CollectionUtils.isEmpty(boundGroups)) {
            List<String> boundGroupIds = new ArrayList<>();
            for (InlongGroupEntity group : boundGroups) {
                boundGroupIds.add(group.getInlongGroupId());
            }
            String inUseMsg = String.format("inlong cluster tag [%s] was used by inlong group %s", str, boundGroupIds);
            LOGGER.error(inUseMsg);
            throw new BusinessException(inUseMsg + ", please delete them first");
        }
    }

    /**
     * Verifies that an update request does not change immutable cluster fields and fills in
     * omitted ones.
     *
     * <p>Type and version must equal the stored values. A blank name or blank cluster tags in the
     * request are replaced with the stored values; a non-blank name must equal the stored name.
     *
     * @param inlongClusterEntity the stored cluster
     * @param clusterRequest the update request; its name and cluster tags may be filled in
     */
    private void chkUnmodifiableParams(InlongClusterEntity inlongClusterEntity, ClusterRequest clusterRequest) {
        Preconditions.expectEquals(inlongClusterEntity.getType(), clusterRequest.getType(),
                ErrorCodeEnum.INVALID_PARAMETER, "type not allowed modify");

        String expiredMsg = String.format("record has expired with record version=%d, request version=%d",
                inlongClusterEntity.getVersion(), clusterRequest.getVersion());
        Preconditions.expectEquals(inlongClusterEntity.getVersion(), clusterRequest.getVersion(),
                ErrorCodeEnum.CONFIG_EXPIRED, expiredMsg);

        if (!StringUtils.isBlank(clusterRequest.getName())) {
            Preconditions.expectEquals(inlongClusterEntity.getName(), clusterRequest.getName(),
                    ErrorCodeEnum.INVALID_PARAMETER, "name not allowed modify");
        } else {
            clusterRequest.setName(inlongClusterEntity.getName());
        }

        boolean tagsOmitted = StringUtils.isBlank(clusterRequest.getClusterTags());
        if (tagsOmitted) {
            clusterRequest.setClusterTags(inlongClusterEntity.getClusterTags());
        }
    }
}
