package org.apache.inlong.manager.service.core.impl;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;
import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.inlong.audit.AuditOperator;
import org.apache.inlong.audit.entity.AuditInformation;
import org.apache.inlong.audit.entity.AuditProxy;
import org.apache.inlong.audit.entity.FlowType;
import org.apache.inlong.common.enums.IndicatorType;
import org.apache.inlong.manager.common.consts.InlongConstants;
import org.apache.inlong.manager.common.enums.ErrorCodeEnum;
import org.apache.inlong.manager.common.exceptions.BusinessException;
import org.apache.inlong.manager.common.util.HttpUtils;
import org.apache.inlong.manager.common.util.Preconditions;
import org.apache.inlong.manager.dao.entity.InlongGroupEntity;
import org.apache.inlong.manager.dao.entity.StreamSinkEntity;
import org.apache.inlong.manager.dao.entity.StreamSourceEntity;
import org.apache.inlong.manager.dao.mapper.InlongGroupEntityMapper;
import org.apache.inlong.manager.dao.mapper.StreamSinkEntityMapper;
import org.apache.inlong.manager.dao.mapper.StreamSourceEntityMapper;
import org.apache.inlong.manager.pojo.audit.AuditProxyResponse;
import org.apache.inlong.manager.pojo.audit.AuditRequest;
import org.apache.inlong.manager.pojo.audit.AuditVO;
import org.apache.inlong.manager.pojo.user.LoginUserUtils;
import org.apache.inlong.manager.service.audit.AuditRunnable;
import org.apache.inlong.manager.service.core.AuditService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Lazy;
import org.springframework.http.HttpHeaders;
import org.springframework.http.HttpMethod;
import org.springframework.stereotype.Service;
import org.springframework.web.client.RestTemplate;

/**
 * {@link AuditService} implementation.
 *
 * <p>Keeps two in-memory caches (audit id -&gt; display name, and component type -&gt; indicator code -&gt;
 * {@link AuditInformation}) and answers audit queries by submitting one {@link AuditRunnable} per
 * audit id to a shared thread pool, waiting a bounded time for them to finish.
 *
 * <p>Both caches are concurrent maps because this bean is shared between request threads.
 */
@Lazy
@Service
public class AuditServiceImpl implements AuditService {

    private static final Logger LOGGER = LoggerFactory.getLogger(AuditServiceImpl.class);

    /** Upper bound, in seconds, that a query waits for all per-audit-id workers. */
    private static final long QUERY_TIMEOUT_SECONDS = 30L;

    /** Component type -> (indicator code -> audit information) built lazily by {@link #getAuditId}. */
    private final Map<String, Map<Integer, AuditInformation>> auditIndicatorMap = new ConcurrentHashMap<>();

    /** Audit id (as string) -> value of {@code AuditInformation.getNameInChinese()}. */
    private final Map<String, String> auditItemMap = new ConcurrentHashMap<>();

    /** Pool that runs the per-audit-id query workers; shut down in {@link #destroy()}. */
    private final ScheduledExecutorService executor = Executors.newScheduledThreadPool(10);

    @Value("#{'${audit.admin.ids:3,4,5,6}'.split(',')}")
    private List<String> auditIdListForAdmin;

    @Value("#{'${audit.user.ids:3,4,5,6}'.split(',')}")
    private List<String> auditIdListForUser;

    @Value("${audit.query.url:http://127.0.0.1:10080}")
    private String auditQueryUrl;

    @Autowired
    private StreamSinkEntityMapper sinkEntityMapper;

    @Autowired
    private StreamSourceEntityMapper sourceEntityMapper;

    @Autowired
    private InlongGroupEntityMapper inlongGroupMapper;

    @Autowired
    private RestTemplate restTemplate;

    /**
     * Loads the audit item cache once the bean has been constructed.
     * Failures are logged and never propagated, so bean creation does not fail on them.
     */
    @PostConstruct
    public void initialize() {
        LOGGER.info("init audit base item cache map for {}", AuditServiceImpl.class.getSimpleName());
        try {
            refreshBaseItemCache();
        } catch (Throwable t) {
            LOGGER.error("initialize audit base item cache error", t);
        }
    }

    /**
     * Stops accepting new query tasks when the bean is destroyed so the pool threads do not
     * outlive the application context. Already submitted tasks are allowed to finish.
     */
    @PreDestroy
    public void destroy() {
        executor.shutdown();
    }

    /**
     * Clears the indicator cache and reloads the audit-id to name cache from {@link AuditOperator}.
     *
     * @return {@code true} when the reload completed, {@code false} when any error occurred
     *         (the error is logged, not thrown)
     */
    @Override
    public Boolean refreshBaseItemCache() {
        LOGGER.debug("start to reload audit base item info");
        try {
            auditIndicatorMap.clear();
            // Fetch both lists before touching the item cache, as the original flow did.
            List<AuditInformation> auditInfos = AuditOperator.getInstance().getAllAuditInformation();
            List<AuditInformation> metricInfos = AuditOperator.getInstance().getAllMetricInformation();
            cacheItemNames(auditInfos);
            cacheItemNames(metricInfos);
            LOGGER.debug("success to reload audit base item info");
            return true;
        } catch (Throwable t) {
            LOGGER.error("failed to reload audit base item info", t);
            return false;
        }
    }

    /** Stores the name of every given item in {@link #auditItemMap}, keyed by its audit id. */
    private void cacheItemNames(List<AuditInformation> infos) {
        for (AuditInformation info : infos) {
            auditItemMap.put(String.valueOf(info.getAuditId()), info.getNameInChinese());
        }
    }

    /**
     * Resolves the audit id for a component type and indicator, caching the built
     * {@link AuditInformation} per type and indicator code.
     *
     * @param str component type (for example a sink or source type); blank yields {@code null}
     * @param indicatorType indicator to resolve; must not be {@code null}
     * @return the audit id as a string, or {@code null} when {@code str} is blank
     */
    @Override
    public String getAuditId(String str, IndicatorType indicatorType) {
        if (StringUtils.isBlank(str)) {
            return null;
        }
        // The inner map is reached from several request threads, so it must be concurrent too.
        Map<Integer, AuditInformation> byIndicatorCode =
                auditIndicatorMap.computeIfAbsent(str, key -> new ConcurrentHashMap<>());
        int code = indicatorType.getCode();
        AuditInformation cached = byIndicatorCode.get(code);
        if (cached != null) {
            return String.valueOf(cached.getAuditId());
        }
        // Even indicator codes are mapped to INPUT, odd ones to OUTPUT.
        FlowType flowType = code % 2 == 0 ? FlowType.INPUT : FlowType.OUTPUT;
        AuditInformation built = AuditOperator.getInstance().buildAuditInformation(
                str,
                flowType,
                IndicatorType.isSuccessType(indicatorType),
                true,
                IndicatorType.isDiscardType(indicatorType),
                IndicatorType.isRetryType(indicatorType));
        Preconditions.expectNotNull(built, ErrorCodeEnum.AUDIT_ID_TYPE_NOT_SUPPORTED,
                String.format(ErrorCodeEnum.AUDIT_ID_TYPE_NOT_SUPPORTED.getMessage(), str));
        byIndicatorCode.put(code, built);
        return String.valueOf(built.getAuditId());
    }

    /**
     * Queries audit data for the request. When the request names a group but carries no audit ids,
     * the ids are derived from the group mode, the source/sink types and the caller's role and are
     * written back into the request.
     *
     * @param auditRequest query condition; must not be {@code null}
     * @return the results collected from the per-audit-id workers (possibly partial on timeout)
     * @throws BusinessException when neither a group id nor audit ids are supplied
     * @throws Exception when waiting for the workers is interrupted or a lookup fails
     */
    @Override
    public List<AuditVO> listByCondition(AuditRequest auditRequest) throws Exception {
        LOGGER.info("begin query audit list request={}", auditRequest);
        Preconditions.expectNotNull(auditRequest, "request is null");

        String groupId = auditRequest.getInlongGroupId();
        String streamId = auditRequest.getInlongStreamId();
        String sinkType = resolveSinkType(groupId, streamId, auditRequest.getSinkId());

        // Audit id -> component type, handed to the workers. Keys/values may be null when a type is unknown.
        Map<String, String> auditIdToType = new HashMap<>();
        if (StringUtils.isNotBlank(groupId)) {
            auditIdToType.put(getAuditId(sinkType, IndicatorType.SEND_SUCCESS), sinkType);
            if (CollectionUtils.isEmpty(auditRequest.getAuditIds())) {
                // Group and source lookups are only needed when the ids have to be derived.
                if (isDataSyncMode(requireGroup(groupId))) {
                    String sourceType = resolveSourceType(groupId, streamId);
                    auditIdToType.put(getAuditId(sourceType, IndicatorType.RECEIVED_SUCCESS), sourceType);
                    auditRequest.setAuditIds(getAuditIds(groupId, streamId, sourceType, sinkType));
                } else {
                    auditIdToType.put(getAuditId(sinkType, IndicatorType.RECEIVED_SUCCESS), sinkType);
                    auditRequest.setAuditIds(getAuditIds(groupId, streamId, null, sinkType));
                }
            }
        } else if (CollectionUtils.isEmpty(auditRequest.getAuditIds())) {
            throw new BusinessException("audits id is empty");
        }

        List<AuditVO> result = runAuditQueries(auditRequest, auditIdToType, false);
        LOGGER.info("success to query audit list for request={}", auditRequest);
        return result;
    }

    /**
     * Queries audit data for exactly the audit ids carried by the request.
     *
     * @param auditRequest query condition; must not be {@code null}
     * @return the collected results; empty when the request carries no audit ids
     * @throws Exception when waiting for the workers is interrupted
     */
    @Override
    public List<AuditVO> listAll(AuditRequest auditRequest) throws Exception {
        Preconditions.expectNotNull(auditRequest, "request is null");
        if (CollectionUtils.isEmpty(auditRequest.getAuditIds())) {
            // Nothing to query; avoids a NullPointerException on a missing id list.
            return new ArrayList<>();
        }
        return runAuditQueries(auditRequest, null, true);
    }

    /**
     * Returns the base audit definitions known to {@link AuditOperator}.
     *
     * @param bool {@code true} selects metric information; {@code false} or {@code null}
     *        selects audit information
     * @return the selected list as returned by {@link AuditOperator}
     */
    @Override
    public List<AuditInformation> getAuditBases(Boolean bool) {
        return Boolean.TRUE.equals(bool)
                ? AuditOperator.getInstance().getAllMetricInformation()
                : AuditOperator.getInstance().getAllAuditInformation();
    }

    /**
     * Fetches the audit proxy list for a component from the audit query service.
     *
     * @param str component name appended as the {@code component} query parameter
     * @return the {@code data} part of the service response
     * @throws BusinessException when the request fails for any reason
     */
    @Override
    public List<AuditProxy> getAuditProxy(String str) throws Exception {
        try {
            String url = auditQueryUrl + "/audit/query/getAuditProxy?" + "component=" + str;
            LOGGER.info("query audit url ={}", url);
            AuditProxyResponse response = (AuditProxyResponse) HttpUtils.request(
                    restTemplate, url, HttpMethod.GET, (String) null, (HttpHeaders) null, AuditProxyResponse.class);
            LOGGER.info("success to query audit proxy url list for request url ={}", url);
            return response.getData();
        } catch (Exception e) {
            String message = String.format("get audit proxy url failed for %s", str);
            // The rethrown exception carries only the message, so the stack trace is logged here at error level.
            LOGGER.error(message, e);
            throw new BusinessException(message);
        }
    }

    /**
     * Submits one {@link AuditRunnable} per audit id of the request and waits up to
     * {@link #QUERY_TIMEOUT_SECONDS} seconds for them.
     *
     * @param request request whose audit ids are queried; the id list must not be {@code null}
     * @param auditIdToType audit id to component type map forwarded to the workers, may be {@code null}
     * @param listAllFlag boolean forwarded unchanged to {@link AuditRunnable}; {@code listAll} passes
     *        {@code true}, {@code listByCondition} passes {@code false}
     * @return a snapshot of the results gathered when the wait ended
     */
    private List<AuditVO> runAuditQueries(AuditRequest request, Map<String, String> auditIdToType,
            boolean listAllFlag) throws InterruptedException {
        List<String> auditIds = request.getAuditIds();
        // Workers are handed this list while running on pool threads (presumably appending to it),
        // and may still be running after a timeout, so it has to tolerate concurrent access.
        List<AuditVO> collected = new CopyOnWriteArrayList<>();
        CountDownLatch latch = new CountDownLatch(auditIds.size());
        for (String auditId : auditIds) {
            executor.execute(new AuditRunnable(request, auditId, auditItemMap.get(auditId), collected, latch,
                    restTemplate, auditQueryUrl, auditIdToType, listAllFlag));
        }
        if (!latch.await(QUERY_TIMEOUT_SECONDS, TimeUnit.SECONDS)) {
            LOGGER.warn("audit query did not finish within {}s, returning partial result for request={}",
                    QUERY_TIMEOUT_SECONDS, request);
        }
        // Hand back a private copy so late workers cannot change what the caller sees.
        return new ArrayList<>(collected);
    }

    /**
     * Determines the sink type: from the sink with the given id when one is supplied, otherwise
     * from the first sink related to the group/stream. Returns {@code null} when none is found.
     */
    private String resolveSinkType(String groupId, String streamId, Integer sinkId) {
        StreamSinkEntity sink = null;
        if (sinkId != null) {
            sink = sinkEntityMapper.selectByPrimaryKey(sinkId);
        } else {
            List<StreamSinkEntity> sinks = sinkEntityMapper.selectByRelatedId(groupId, streamId);
            if (CollectionUtils.isNotEmpty(sinks)) {
                sink = sinks.get(0);
            }
        }
        return sink != null ? sink.getSinkType() : null;
    }

    /** Returns the type of the first source related to the group/stream, or {@code null} when there is none. */
    private String resolveSourceType(String groupId, String streamId) {
        List<StreamSourceEntity> sources = sourceEntityMapper.selectByRelatedId(groupId, streamId, (String) null);
        return CollectionUtils.isNotEmpty(sources) ? sources.get(0).getSourceType() : null;
    }

    /** Loads the group and rejects a missing one explicitly instead of failing later with a NullPointerException. */
    private InlongGroupEntity requireGroup(String groupId) {
        InlongGroupEntity group = inlongGroupMapper.selectByGroupId(groupId);
        Preconditions.expectNotNull(group, String.format("inlong group not found by groupId=%s", groupId));
        return group;
    }

    /** Tells whether the group mode equals one of the two data-sync mode constants (realtime or offline). */
    private boolean isDataSyncMode(InlongGroupEntity group) {
        Object mode = group.getInlongGroupMode();
        return InlongConstants.DATASYNC_REALTIME_MODE.equals(mode)
                || InlongConstants.DATASYNC_OFFLINE_MODE.equals(mode);
    }

    /**
     * Builds the default audit id list for a group/stream: the configured ids for the caller's role
     * plus the ids derived from the sink/source types.
     *
     * @param groupId inlong group id
     * @param streamId inlong stream id
     * @param sourceType source type used for data-sync groups, may be {@code null}
     * @param sinkType sink type, may be {@code null}
     * @return the distinct, non-null audit ids
     */
    private List<String> getAuditIds(String groupId, String streamId, String sourceType, String sinkType) {
        boolean tenantAdmin = LoginUserUtils.getLoginUser().getRoles().contains("TENANT_ADMIN");
        Set<String> ids = new HashSet<>(tenantAdmin ? auditIdListForAdmin : auditIdListForUser);

        if (sinkType == null) {
            ids.add(getAuditId("DATAPROXY", IndicatorType.SEND_SUCCESS));
        } else {
            ids.add(getAuditId(sinkType, IndicatorType.SEND_SUCCESS));
            String receivedFrom = isDataSyncMode(requireGroup(groupId)) ? sourceType : sinkType;
            ids.add(getAuditId(receivedFrom, IndicatorType.RECEIVED_SUCCESS));
        }

        List<StreamSourceEntity> sources = sourceEntityMapper.selectByRelatedId(groupId, streamId, (String) null);
        boolean noSourceOrAutoPushOnly = CollectionUtils.isEmpty(sources)
                || sources.stream().allMatch(source -> "AUTO_PUSH".equals(source.getSourceType()));
        if (noSourceOrAutoPushOnly && ids.contains(getAuditId("AGENT", IndicatorType.RECEIVED_SUCCESS))) {
            ids.add(getAuditId("DATAPROXY", IndicatorType.RECEIVED_SUCCESS));
        }

        // getAuditId yields null for an unknown (blank) type; a null id would later break the
        // lookup in the concurrent item cache, which rejects null keys.
        ids.remove(null);
        return new ArrayList<>(ids);
    }
}
