package org.apache.inlong.manager.service.core.impl;

import com.google.gson.Gson;
import com.google.gson.GsonBuilder;
import java.math.BigDecimal;
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.Collectors;
import javax.annotation.PostConstruct;
import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.ibatis.jdbc.SQL;
import org.apache.inlong.common.enums.IndicatorType;
import org.apache.inlong.manager.common.consts.InlongConstants;
import org.apache.inlong.manager.common.enums.AuditQuerySource;
import org.apache.inlong.manager.common.enums.ErrorCodeEnum;
import org.apache.inlong.manager.common.enums.TimeStaticsDim;
import org.apache.inlong.manager.common.exceptions.BusinessException;
import org.apache.inlong.manager.common.util.CommonBeanUtils;
import org.apache.inlong.manager.common.util.Preconditions;
import org.apache.inlong.manager.dao.entity.AuditBaseEntity;
import org.apache.inlong.manager.dao.entity.AuditSourceEntity;
import org.apache.inlong.manager.dao.entity.InlongGroupEntity;
import org.apache.inlong.manager.dao.entity.StreamSinkEntity;
import org.apache.inlong.manager.dao.entity.StreamSourceEntity;
import org.apache.inlong.manager.dao.mapper.AuditBaseEntityMapper;
import org.apache.inlong.manager.dao.mapper.AuditEntityMapper;
import org.apache.inlong.manager.dao.mapper.AuditSourceEntityMapper;
import org.apache.inlong.manager.dao.mapper.InlongGroupEntityMapper;
import org.apache.inlong.manager.dao.mapper.StreamSinkEntityMapper;
import org.apache.inlong.manager.dao.mapper.StreamSourceEntityMapper;
import org.apache.inlong.manager.pojo.audit.AuditBaseResponse;
import org.apache.inlong.manager.pojo.audit.AuditInfo;
import org.apache.inlong.manager.pojo.audit.AuditRequest;
import org.apache.inlong.manager.pojo.audit.AuditSourceRequest;
import org.apache.inlong.manager.pojo.audit.AuditSourceResponse;
import org.apache.inlong.manager.pojo.audit.AuditVO;
import org.apache.inlong.manager.pojo.user.LoginUserUtils;
import org.apache.inlong.manager.service.audit.AuditRunnable;
import org.apache.inlong.manager.service.audit.InlongAuditSourceOperatorFactory;
import org.apache.inlong.manager.service.core.AuditService;
import org.apache.inlong.manager.service.resource.sink.ck.ClickHouseConfig;
import org.apache.inlong.manager.service.resource.sink.es.ElasticsearchApi;
import org.joda.time.format.DateTimeFormat;
import org.joda.time.format.DateTimeFormatter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Lazy;
import org.springframework.stereotype.Service;
import org.springframework.web.client.RestTemplate;

@Lazy
@Service
/* loaded from: input_file:org/apache/inlong/manager/service/core/impl/AuditServiceImpl.class */
public class AuditServiceImpl implements AuditService {
    // NOTE(review): DEFAULT_BOOST, QUERY_FROM, QUERY_SIZE, SORT_ORDER, AGGREGATIONS*, BUCKETS, KEY, VALUE,
    // COUNT, DELAY and TERMS are not referenced by any method of this class; they look like leftovers
    // from an Elasticsearch-style query builder — confirm before removing.
    private static final double DEFAULT_BOOST = 1.0d;
    // NOTE(review): this is a boolean. Any use of it where an int is required (the decompiler appears to
    // have substituted it for the literal 1) will not compile and should be the literal 1 instead.
    private static final boolean ADJUST_PURE_NEGATIVE = true;
    private static final int QUERY_FROM = 0;
    private static final int QUERY_SIZE = 0;
    private static final String SORT_ORDER = "ASC";
    // Name of the log timestamp column used by the ClickHouse SQL builders ("FILED" is a historic typo of "FIELD").
    private static final String TERM_FILED = "log_ts";
    private static final String AGGREGATIONS_COUNT = "count";
    private static final String AGGREGATIONS_DELAY = "delay";
    private static final String AGGREGATIONS = "aggregations";
    private static final String BUCKETS = "buckets";
    private static final String KEY = "key";
    private static final String VALUE = "value";
    // Column names used when selecting/grouping in the ClickHouse SQL builders.
    private static final String INLONG_GROUP_ID = "inlong_group_id";
    private static final String INLONG_STREAM_ID = "inlong_stream_id";
    private static final String COUNT = "count";
    private static final String DELAY = "delay";
    private static final String TERMS = "terms";
    // Pool of 10 threads on which the per-audit-id AuditRunnable tasks are executed when the query source
    // is CLICKHOUSE. Nothing in this class shuts it down.
    private ScheduledExecutorService executor = Executors.newScheduledThreadPool(10);
    // Cache: audit type -> (indicator type code -> audit base entity). The inner maps are plain HashMaps.
    private final Map<String, Map<Integer, AuditBaseEntity>> auditIndicatorMap = new ConcurrentHashMap();
    // Cache: audit id -> audit base entity. ConcurrentHashMap rejects null keys on lookup.
    private final Map<String, AuditBaseEntity> auditItemMap = new ConcurrentHashMap();

    // Default audit ids for users holding the TENANT_ADMIN role; property "audit.admin.ids", comma separated.
    @Value("#{'${audit.admin.ids:3,4,5,6}'.split(',')}")
    private List<String> auditIdListForAdmin;

    // Default audit ids for all other users; property "audit.user.ids", comma separated.
    @Value("#{'${audit.user.ids:3,4,5,6}'.split(',')}")
    private List<String> auditIdListForUser;

    // Name of an AuditQuerySource enum constant (parsed with AuditQuerySource.valueOf); no default is declared.
    @Value("${audit.query.source}")
    private String auditQuerySource;

    // Base URL handed to AuditRunnable for CLICKHOUSE queries.
    @Value("${audit.query.url:http://127.0.0.1:10080}")
    private String auditQueryUrl;

    @Autowired
    private AuditBaseEntityMapper auditBaseMapper;

    @Autowired
    private AuditEntityMapper auditEntityMapper;

    // Not referenced by any method of this class.
    @Autowired
    private ElasticsearchApi elasticsearchApi;

    @Autowired
    private StreamSinkEntityMapper sinkEntityMapper;

    @Autowired
    private StreamSourceEntityMapper sourceEntityMapper;

    // Its runtime config is refreshed after a new audit source has been inserted.
    @Autowired
    private ClickHouseConfig config;

    @Autowired
    private AuditSourceEntityMapper auditSourceMapper;

    @Autowired
    private InlongGroupEntityMapper inlongGroupMapper;

    @Autowired
    private InlongAuditSourceOperatorFactory auditSourceOperatorFactory;

    @Autowired
    private RestTemplate restTemplate;
    private static final Logger LOGGER = LoggerFactory.getLogger(AuditServiceImpl.class);
    // Not referenced by any method of this class.
    private static final Gson GSON = new GsonBuilder().create();
    // Date patterns (and matching Joda formatters) for second, hour and day granularity.
    private static final String SECOND_FORMAT = "yyyy-MM-dd HH:mm:ss";
    private static final DateTimeFormatter SECOND_DATE_FORMATTER = DateTimeFormat.forPattern(SECOND_FORMAT);
    private static final String HOUR_FORMAT = "yyyy-MM-dd HH";
    private static final DateTimeFormatter HOUR_DATE_FORMATTER = DateTimeFormat.forPattern(HOUR_FORMAT);
    private static final String DAY_FORMAT = "yyyy-MM-dd";
    private static final DateTimeFormatter DAY_DATE_FORMATTER = DateTimeFormat.forPattern(DAY_FORMAT);

    /* JADX INFO: Access modifiers changed from: package-private */
    /* renamed from: org.apache.inlong.manager.service.core.impl.AuditServiceImpl$1, reason: invalid class name */
    /* loaded from: input_file:org/apache/inlong/manager/service/core/impl/AuditServiceImpl$1.class */
    /**
     * Compiler-generated lookup table for a {@code switch} over {@link TimeStaticsDim}.
     *
     * <p>The table is indexed by enum ordinal and holds the case label: {@code 1} for
     * {@code HOUR}, {@code 2} for {@code DAY}, and {@code 0} (the array default) for every
     * other constant.
     */
    public static /* synthetic */ class AnonymousClass1 {
        static final /* synthetic */ int[] $SwitchMap$org$apache$inlong$manager$common$enums$TimeStaticsDim = new int[TimeStaticsDim.values().length];

        static {
            try {
                // Case label 1; the table is an int[], so a boolean constant cannot be stored here.
                $SwitchMap$org$apache$inlong$manager$common$enums$TimeStaticsDim[TimeStaticsDim.HOUR.ordinal()] = 1;
            } catch (NoSuchFieldError ignored) {
                // Deliberately ignored: the constant is missing from the enum at run time,
                // so its slot simply keeps the default label 0.
            }
            try {
                $SwitchMap$org$apache$inlong$manager$common$enums$TimeStaticsDim[TimeStaticsDim.DAY.ordinal()] = 2;
            } catch (NoSuchFieldError ignored) {
                // Deliberately ignored, same reason as above.
            }
        }
    }

    /**
     * Warms the audit base item cache once the bean has been constructed.
     *
     * <p>Any failure is logged and swallowed so that it does not propagate out of this method.
     */
    @PostConstruct
    public void initialize() {
        final String serviceName = AuditServiceImpl.class.getSimpleName();
        LOGGER.info("init audit base item cache map for {}", serviceName);
        try {
            this.refreshBaseItemCache();
        } catch (Throwable cause) {
            LOGGER.error("initialize audit base item cache error", cause);
        }
    }

    /**
     * Reloads every audit base item from the database into the two in-memory caches:
     * audit id to entity, and audit type to (indicator type to entity).
     *
     * <p>Existing cache entries are overwritten but never removed.
     *
     * @return {@code true} when the reload finished, {@code false} when anything was thrown
     *         (the error is logged, not rethrown)
     */
    @Override // org.apache.inlong.manager.service.core.AuditService
    public Boolean refreshBaseItemCache() {
        LOGGER.debug("start to reload audit base item info");
        try {
            for (AuditBaseEntity item : this.auditBaseMapper.selectAll()) {
                this.auditItemMap.put(item.getAuditId(), item);
                final Map<Integer, AuditBaseEntity> byIndicator =
                        this.auditIndicatorMap.computeIfAbsent(item.getType(), type -> new HashMap<>());
                byIndicator.put(item.getIndicatorType(), item);
            }
            LOGGER.debug("success to reload audit base item info");
            return Boolean.TRUE;
        } catch (Throwable cause) {
            LOGGER.error("failed to reload audit base item info", cause);
            return Boolean.FALSE;
        }
    }

    /**
     * Registers a new audit source and, when requested, takes a previous one offline.
     *
     * <p>The request URL is first normalised by the operator matching the request type.
     * If the request names an offline URL, the source with that URL is marked offline before
     * the new record is inserted. The ClickHouse runtime config is refreshed afterwards.
     *
     * @param auditSourceRequest the audit source to store; its URL is rewritten in place
     * @param str name of the operator, stored as both creator and modifier
     * @return the id of the inserted audit source record
     */
    @Override // org.apache.inlong.manager.service.core.AuditService
    public Integer updateAuditSource(AuditSourceRequest auditSourceRequest, String str) {
        String convertedUrl = this.auditSourceOperatorFactory.getInstance(auditSourceRequest.getType())
                .convertTo(auditSourceRequest.getUrl());
        auditSourceRequest.setUrl(convertedUrl);

        String offlineUrl = auditSourceRequest.getOfflineUrl();
        if (StringUtils.isNotBlank(offlineUrl)) {
            this.auditSourceMapper.offlineSourceByUrl(offlineUrl);
            LOGGER.info("success offline the audit source with url: {}", offlineUrl);
        }

        AuditSourceEntity auditSourceEntity = (AuditSourceEntity) CommonBeanUtils.copyProperties(auditSourceRequest, AuditSourceEntity::new);
        // The status is the integer 1 (Integer.valueOf has no boolean overload).
        // NOTE(review): 1 presumably means "online", since the old source is taken offline above — confirm.
        auditSourceEntity.setStatus(Integer.valueOf(1));
        auditSourceEntity.setCreator(str);
        auditSourceEntity.setModifier(str);
        this.auditSourceMapper.insert(auditSourceEntity);
        Integer id = auditSourceEntity.getId();
        LOGGER.info("success to insert audit source with id={}", id);

        this.config.updateRuntimeConfig();
        LOGGER.info("success to update audit source with id={}", id);
        return id;
    }

    /**
     * Returns the audit source that is currently online.
     *
     * @return a response copy of the online audit source
     * @throws BusinessException with {@code RECORD_NOT_FOUND} when no source is online
     */
    @Override // org.apache.inlong.manager.service.core.AuditService
    public AuditSourceResponse getAuditSource() {
        final AuditSourceEntity onlineSource = this.auditSourceMapper.selectOnlineSource();
        if (onlineSource != null) {
            LOGGER.debug("success to get audit source, id={}", onlineSource.getId());
            return (AuditSourceResponse) CommonBeanUtils.copyProperties(onlineSource, AuditSourceResponse::new);
        }
        throw new BusinessException(ErrorCodeEnum.RECORD_NOT_FOUND);
    }

    /**
     * Resolves the audit id for an audit type and indicator, consulting the cache first
     * and falling back to the database on a miss (the loaded item is then cached).
     *
     * @param str the audit type; a blank value yields {@code null}
     * @param indicatorType the indicator whose code selects the audit item
     * @return the audit id, or {@code null} when {@code str} is blank
     */
    @Override // org.apache.inlong.manager.service.core.AuditService
    public String getAuditId(String str, IndicatorType indicatorType) {
        if (StringUtils.isBlank(str)) {
            return null;
        }
        final Map<Integer, AuditBaseEntity> byIndicator =
                this.auditIndicatorMap.computeIfAbsent(str, type -> new HashMap<>());
        final AuditBaseEntity cached = byIndicator.get(Integer.valueOf(indicatorType.getCode()));
        if (cached == null) {
            // Cache miss: load from the database and fail if the combination is unknown.
            final AuditBaseEntity loaded =
                    this.auditBaseMapper.selectByTypeAndIndicatorType(str, Integer.valueOf(indicatorType.getCode()));
            final String notSupported = String.format(ErrorCodeEnum.AUDIT_ID_TYPE_NOT_SUPPORTED.getMessage(), str);
            Preconditions.expectNotNull(loaded, ErrorCodeEnum.AUDIT_ID_TYPE_NOT_SUPPORTED, notSupported);
            byIndicator.put(loaded.getIndicatorType(), loaded);
            return loaded.getAuditId();
        }
        return cached.getAuditId();
    }

    /**
     * Queries audit statistics for the group/stream, or for the explicit audit ids, in the request.
     *
     * <p>When the request has a group id but no audit ids, the ids are derived from the sink
     * type, the source type, the group mode and the caller's role, and written back into the
     * request. With the MYSQL source the rows are summed per audit id and then aggregated by
     * the requested time dimension. With the CLICKHOUSE source one {@link AuditRunnable} per
     * audit id is submitted, and this method waits up to 30 seconds for them.
     *
     * @param auditRequest the query; must not be null
     * @return one {@link AuditVO} per audit id that produced a result
     * @throws BusinessException when neither a group id nor any audit id is given
     * @throws Exception on any query failure, or when the wait for the tasks is interrupted
     */
    @Override // org.apache.inlong.manager.service.core.AuditService
    public List<AuditVO> listByCondition(AuditRequest auditRequest) throws Exception {
        LOGGER.info("begin query audit list request={}", auditRequest);
        Preconditions.expectNotNull(auditRequest, "request is null");
        String inlongGroupId = auditRequest.getInlongGroupId();
        String inlongStreamId = auditRequest.getInlongStreamId();

        // An explicit sink id wins; otherwise fall back to the first sink of the stream.
        // The sink list is only queried when it is actually needed.
        Integer sinkId = auditRequest.getSinkId();
        StreamSinkEntity sinkEntity = null;
        if (sinkId != null) {
            sinkEntity = this.sinkEntityMapper.selectByPrimaryKey(sinkId);
        } else {
            List<StreamSinkEntity> sinks = this.sinkEntityMapper.selectByRelatedId(inlongGroupId, inlongStreamId);
            if (CollectionUtils.isNotEmpty(sinks)) {
                sinkEntity = sinks.get(0);
            }
        }
        String sinkType = sinkEntity != null ? sinkEntity.getSinkType() : null;

        // audit id -> node type (sink or source type) reported back in the result.
        HashMap<String, String> nodeTypeByAuditId = new HashMap<>();
        if (StringUtils.isNotBlank(inlongGroupId)) {
            nodeTypeByAuditId.put(getAuditId(sinkType, IndicatorType.SEND_SUCCESS), sinkType);
            if (CollectionUtils.isEmpty(auditRequest.getAuditIds())) {
                InlongGroupEntity groupEntity = this.inlongGroupMapper.selectByGroupId(inlongGroupId);
                Preconditions.expectNotNull(groupEntity, "inlong group not found: " + inlongGroupId);
                if (InlongConstants.DATASYNC_MODE.equals(groupEntity.getInlongGroupMode())) {
                    List<StreamSourceEntity> sources =
                            this.sourceEntityMapper.selectByRelatedId(inlongGroupId, inlongStreamId, (String) null);
                    String sourceType = CollectionUtils.isNotEmpty(sources) ? sources.get(0).getSourceType() : null;
                    nodeTypeByAuditId.put(getAuditId(sourceType, IndicatorType.RECEIVED_SUCCESS), sourceType);
                    auditRequest.setAuditIds(getAuditIds(inlongGroupId, inlongStreamId, sourceType, sinkType));
                } else {
                    nodeTypeByAuditId.put(getAuditId(sinkType, IndicatorType.RECEIVED_SUCCESS), sinkType);
                    auditRequest.setAuditIds(getAuditIds(inlongGroupId, inlongStreamId, null, sinkType));
                }
            }
        } else if (CollectionUtils.isEmpty(auditRequest.getAuditIds())) {
            throw new BusinessException("audits id is empty");
        }

        // NOTE(review): with the CLICKHOUSE source this plain ArrayList is handed to tasks running on
        // the executor; whether AuditRunnable synchronises its writes is not visible here — confirm.
        List<AuditVO> result = new ArrayList<>();
        AuditQuerySource querySource = AuditQuerySource.valueOf(this.auditQuerySource);
        CountDownLatch latch = new CountDownLatch(auditRequest.getAuditIds().size());

        // Exclusive upper bound: the day after the requested end date. It is computed once because
        // it does not vary per audit id; the audit id list is never empty at this point.
        String endDateExclusive = null;
        if (AuditQuerySource.MYSQL == querySource) {
            endDateExclusive = DAY_DATE_FORMATTER.parseDateTime(auditRequest.getEndDate())
                    .plusDays(1)
                    .toString(DAY_DATE_FORMATTER);
        }

        for (String auditId : auditRequest.getAuditIds()) {
            AuditBaseEntity baseEntity = this.auditItemMap.get(auditId);
            String auditName = baseEntity != null ? baseEntity.getName() : "";
            if (AuditQuerySource.MYSQL == querySource) {
                List<Map<String, Object>> rows = StringUtils.isNotBlank(auditRequest.getIp())
                        ? this.auditEntityMapper.sumByLogTsAndIp(auditRequest.getIp(), auditId,
                                auditRequest.getStartDate(), endDateExclusive, "%Y-%m-%d %H:%i:00")
                        : this.auditEntityMapper.sumByLogTs(inlongGroupId, inlongStreamId, auditId,
                                auditRequest.getStartDate(), endDateExclusive, "%Y-%m-%d %H:%i:00");
                List<AuditInfo> auditSet = rows.stream().map(row -> {
                    AuditInfo auditInfo = new AuditInfo();
                    auditInfo.setInlongGroupId((String) row.get("inlongGroupId"));
                    auditInfo.setInlongStreamId((String) row.get("inlongStreamId"));
                    auditInfo.setLogTs((String) row.get("logTs"));
                    auditInfo.setCount(((BigDecimal) row.get("total")).longValue());
                    auditInfo.setDelay(((BigDecimal) row.get("totalDelay")).longValue());
                    auditInfo.setSize(((BigDecimal) row.get("totalSize")).longValue());
                    return auditInfo;
                }).collect(Collectors.toList());
                result.add(new AuditVO(auditId, auditName, auditSet, nodeTypeByAuditId.get(auditId)));
            } else if (AuditQuerySource.CLICKHOUSE == querySource) {
                this.executor.execute(new AuditRunnable(auditRequest, auditId, auditName, result, latch,
                        this.restTemplate, this.auditQueryUrl, nodeTypeByAuditId, false));
            }
        }

        if (AuditQuerySource.CLICKHOUSE == querySource) {
            // await returns false on timeout; surface that instead of silently returning partial data.
            if (!latch.await(30L, TimeUnit.SECONDS)) {
                LOGGER.warn("timed out waiting for audit query tasks, result may be incomplete, request={}",
                        auditRequest);
            }
        } else {
            result = aggregateByTimeDim(result, auditRequest.getTimeStaticsDim());
        }
        LOGGER.info("success to query audit list for request={}", auditRequest);
        return result;
    }

    /**
     * Queries audit statistics grouped by IP for every audit id in the request.
     *
     * <p>With the MYSQL source the rows are read synchronously. With the CLICKHOUSE source one
     * {@link AuditRunnable} per audit id is submitted, and this method waits up to 30 seconds
     * for them. No time-dimension aggregation is applied.
     *
     * @param auditRequest the query; must not be null and must carry the audit ids to query
     * @return one {@link AuditVO} per audit id that produced a result
     * @throws Exception on any query failure, or when the wait for the tasks is interrupted
     */
    @Override // org.apache.inlong.manager.service.core.AuditService
    public List<AuditVO> listAll(AuditRequest auditRequest) throws Exception {
        // Same null guard as listByCondition, for a clear failure instead of a bare NPE.
        Preconditions.expectNotNull(auditRequest, "request is null");
        // NOTE(review): with the CLICKHOUSE source this plain ArrayList is handed to tasks running on
        // the executor; whether AuditRunnable synchronises its writes is not visible here — confirm.
        List<AuditVO> result = new ArrayList<>();
        AuditQuerySource querySource = AuditQuerySource.valueOf(this.auditQuerySource);
        CountDownLatch latch = new CountDownLatch(auditRequest.getAuditIds().size());
        for (String auditId : auditRequest.getAuditIds()) {
            AuditBaseEntity baseEntity = this.auditItemMap.get(auditId);
            String auditName = baseEntity != null ? baseEntity.getName() : "";
            if (AuditQuerySource.MYSQL == querySource) {
                // Exclusive upper bound: one day after the requested end timestamp.
                String endDateExclusive = SECOND_DATE_FORMATTER.parseDateTime(auditRequest.getEndDate())
                        .plusDays(1)
                        .toString(SECOND_DATE_FORMATTER);
                List<Map<String, Object>> rows = this.auditEntityMapper.sumGroupByIp(
                        auditRequest.getInlongGroupId(), auditRequest.getInlongStreamId(), auditRequest.getIp(),
                        auditId, auditRequest.getStartDate(), endDateExclusive);
                List<AuditInfo> auditSet = rows.stream().map(row -> {
                    AuditInfo auditInfo = new AuditInfo();
                    auditInfo.setInlongGroupId((String) row.get("inlongGroupId"));
                    auditInfo.setInlongStreamId((String) row.get("inlongStreamId"));
                    auditInfo.setLogTs((String) row.get("logTs"));
                    auditInfo.setIp((String) row.get("ip"));
                    auditInfo.setCount(((BigDecimal) row.get("total")).longValue());
                    auditInfo.setDelay(((BigDecimal) row.get("totalDelay")).longValue());
                    auditInfo.setSize(((BigDecimal) row.get("totalSize")).longValue());
                    return auditInfo;
                }).collect(Collectors.toList());
                result.add(new AuditVO(auditId, auditName, auditSet, (String) null));
            } else if (AuditQuerySource.CLICKHOUSE == querySource) {
                this.executor.execute(new AuditRunnable(auditRequest, auditId, auditName, result, latch,
                        this.restTemplate, this.auditQueryUrl, null, true));
            }
        }
        // await returns false on timeout; surface that instead of silently returning partial data.
        if (AuditQuerySource.CLICKHOUSE == querySource && !latch.await(30L, TimeUnit.SECONDS)) {
            LOGGER.warn("timed out waiting for audit query tasks, result may be incomplete, request={}",
                    auditRequest);
        }
        return result;
    }

    /**
     * Lists every audit base item, read straight from the database rather than from the cache.
     *
     * @return the audit base items converted to response objects
     */
    @Override // org.apache.inlong.manager.service.core.AuditService
    public List<AuditBaseResponse> getAuditBases() {
        final List<AuditBaseEntity> allItems = this.auditBaseMapper.selectAll();
        return CommonBeanUtils.copyListProperties(allItems, AuditBaseResponse::new);
    }

    /**
     * Builds the default set of audit ids to query for a stream.
     *
     * <p>The set starts from the configured ids for the caller's role (TENANT_ADMIN or other).
     * It then adds the send/receive ids of the sink, or of DataProxy when no sink type is
     * known. If the stream has no sources, or only AUTO_PUSH sources, and the set contains the
     * Agent receive id, the DataProxy receive id is added as well.
     *
     * @param str inlong group id
     * @param str2 inlong stream id
     * @param str3 source type, used for the receive id in data-sync mode; may be null
     * @param str4 sink type; may be null
     * @return the distinct audit ids, never containing {@code null}
     */
    private List<String> getAuditIds(String str, String str2, String str3, String str4) {
        boolean isTenantAdmin = LoginUserUtils.getLoginUser().getRoles().contains("TENANT_ADMIN");
        HashSet<String> auditIds = new HashSet<>(isTenantAdmin ? this.auditIdListForAdmin : this.auditIdListForUser);

        if (str4 == null) {
            auditIds.add(getAuditId("DATAPROXY", IndicatorType.SEND_SUCCESS));
        } else {
            auditIds.add(getAuditId(str4, IndicatorType.SEND_SUCCESS));
            if (InlongConstants.DATASYNC_MODE.equals(this.inlongGroupMapper.selectByGroupId(str).getInlongGroupMode())) {
                auditIds.add(getAuditId(str3, IndicatorType.RECEIVED_SUCCESS));
            } else {
                auditIds.add(getAuditId(str4, IndicatorType.RECEIVED_SUCCESS));
            }
        }

        List<StreamSourceEntity> sources = this.sourceEntityMapper.selectByRelatedId(str, str2, (String) null);
        boolean onlyAutoPush = CollectionUtils.isEmpty(sources)
                || sources.stream().allMatch(source -> "AUTO_PUSH".equals(source.getSourceType()));
        if (onlyAutoPush && auditIds.contains(getAuditId("AGENT", IndicatorType.RECEIVED_SUCCESS))) {
            auditIds.add(getAuditId("DATAPROXY", IndicatorType.RECEIVED_SUCCESS));
        }

        // getAuditId returns null for a blank type (e.g. a data-sync stream without sources).
        // A null id would make the callers' ConcurrentHashMap lookups throw, so drop it here.
        auditIds.remove(null);
        return new ArrayList<>(auditIds);
    }

    /**
     * Prepares the ClickHouse statement that sums count, delay and size per log timestamp
     * for one group/stream/audit id, after de-duplicating the raw audit rows.
     *
     * <p>When an IP is given the query is delegated to {@code getAuditCkStatementByIp},
     * which filters by IP instead of by group and stream.
     *
     * @param connection open connection used to prepare the statement; the caller closes the result
     * @param str inlong group id
     * @param str2 inlong stream id
     * @param str3 IP filter; blank means "no IP filter"
     * @param str4 audit id
     * @param str5 start day, {@code yyyy-MM-dd}, inclusive
     * @param str6 end day, {@code yyyy-MM-dd}; the whole day is included
     * @return the prepared statement with all parameters bound
     * @throws SQLException if preparing or binding fails
     */
    private PreparedStatement getAuditCkStatementGroupByLogTs(Connection connection, String str, String str2, String str3, String str4, String str5, String str6) throws SQLException {
        if (StringUtils.isNotBlank(str3)) {
            // The delegate parses the two dates itself, so nothing is computed here first.
            return getAuditCkStatementByIp(connection, str4, str3, str5, str6);
        }
        String startTime = DAY_DATE_FORMATTER.parseDateTime(str5).toString(SECOND_FORMAT);
        // Exclusive upper bound: midnight of the day after the end day.
        String endTime = DAY_DATE_FORMATTER.parseDateTime(str6).plusDays(1).toString(SECOND_FORMAT);

        String distinctRows = new SQL()
                .SELECT_DISTINCT("ip", "docker_id", "thread_id", "sdk_ts", "packet_id", TERM_FILED,
                        INLONG_GROUP_ID, INLONG_STREAM_ID, "audit_id", "count", "size", "delay")
                .FROM("audit_data")
                .WHERE("inlong_group_id = ?")
                .WHERE("inlong_stream_id = ?")
                .WHERE("audit_id = ?")
                .WHERE("log_ts >= ?")
                .WHERE("log_ts < ?")
                .toString();
        String sql = new SQL()
                .SELECT(INLONG_GROUP_ID, INLONG_STREAM_ID, TERM_FILED,
                        "sum(count) as total", "sum(delay) as total_delay", "sum(size) as total_size")
                .FROM("(" + distinctRows + ") as sub")
                .GROUP_BY(TERM_FILED, INLONG_GROUP_ID, INLONG_STREAM_ID)
                .ORDER_BY(TERM_FILED)
                .toString();

        PreparedStatement statement = connection.prepareStatement(sql);
        // JDBC parameter indexes are 1-based and follow the WHERE order above.
        statement.setString(1, str);
        statement.setString(2, str2);
        statement.setString(3, str4);
        statement.setString(4, startTime);
        statement.setString(5, endTime);
        return statement;
    }

    /**
     * Prepares the ClickHouse statement that sums count, delay and size per IP
     * for one group/stream/audit id, after de-duplicating the raw audit rows.
     *
     * <p>When an IP is given the query is delegated to {@code getAuditCkStatementByIpGroupByIp}.
     * Unlike the per-timestamp variant, the time bounds are bound exactly as passed in.
     *
     * @param connection open connection used to prepare the statement; the caller closes the result
     * @param str inlong group id
     * @param str2 inlong stream id
     * @param str3 IP filter; blank means "no IP filter"
     * @param str4 audit id
     * @param str5 lower bound for {@code log_ts}, inclusive
     * @param str6 upper bound for {@code log_ts}, exclusive
     * @return the prepared statement with all parameters bound
     * @throws SQLException if preparing or binding fails
     */
    private PreparedStatement getAuditCkStatementGroupByIp(Connection connection, String str, String str2, String str3, String str4, String str5, String str6) throws SQLException {
        if (StringUtils.isNotBlank(str3)) {
            return getAuditCkStatementByIpGroupByIp(connection, str4, str3, str5, str6);
        }

        String distinctRows = new SQL()
                .SELECT_DISTINCT("ip", "docker_id", "thread_id", "sdk_ts", "packet_id", TERM_FILED,
                        INLONG_GROUP_ID, INLONG_STREAM_ID, "audit_id", "count", "size", "delay")
                .FROM("audit_data")
                .WHERE("inlong_group_id = ?")
                .WHERE("inlong_stream_id = ?")
                .WHERE("audit_id = ?")
                .WHERE("log_ts >= ?")
                .WHERE("log_ts < ?")
                .toString();
        String sql = new SQL()
                .SELECT(INLONG_GROUP_ID, INLONG_STREAM_ID, "sum(count) as total", "ip",
                        "sum(delay) as total_delay", "sum(size) as total_size")
                .FROM("(" + distinctRows + ") as sub")
                .GROUP_BY(INLONG_GROUP_ID, INLONG_STREAM_ID, "ip")
                .toString();

        PreparedStatement statement = connection.prepareStatement(sql);
        // JDBC parameter indexes are 1-based and follow the WHERE order above.
        statement.setString(1, str);
        statement.setString(2, str2);
        statement.setString(3, str4);
        statement.setString(4, str5);
        statement.setString(5, str6);
        return statement;
    }

    /**
     * Re-buckets the audit results according to the requested time dimension.
     *
     * <p>{@code HOUR} and {@code DAY} truncate the log timestamps to the hour or the day.
     * Every other value, including {@code null}, keeps the full second-level timestamp.
     *
     * @param list the per-audit-id results to aggregate
     * @param timeStaticsDim the requested granularity; may be null
     * @return a new list with the aggregated results
     */
    private List<AuditVO> aggregateByTimeDim(List<AuditVO> list, TimeStaticsDim timeStaticsDim) {
        if (timeStaticsDim == null) {
            // No dimension requested: behave like the default branch instead of failing with an NPE.
            return doAggregate(list, SECOND_FORMAT);
        }
        // Switch on the enum itself rather than on a hand-maintained ordinal lookup table.
        switch (timeStaticsDim) {
            case HOUR:
                return doAggregate(list, HOUR_FORMAT);
            case DAY:
                return doAggregate(list, DAY_FORMAT);
            default:
                return doAggregate(list, SECOND_FORMAT);
        }
    }

    /**
     * Collapses the audit rows of each result into one row per time bucket.
     *
     * <p>The bucket of a row is its log timestamp re-formatted with the given pattern; rows
     * whose timestamp cannot be formatted are skipped. For each bucket the count is the sum of
     * the row counts, while delay and size are the summed values divided by that count
     * (0 when the count is 0). Buckets are emitted in the order they first appear in the
     * input, so an already time-ordered input stays time-ordered.
     *
     * @param list the per-audit-id results to aggregate
     * @param str date pattern that defines the bucket granularity
     * @return a new list with one aggregated {@link AuditVO} per input element
     */
    private List<AuditVO> doAggregate(List<AuditVO> list, String str) {
        List<AuditVO> aggregated = new ArrayList<>(list.size());
        for (AuditVO source : list) {
            // bucket -> {count, delay, size}; insertion-ordered so the output order is deterministic.
            Map<String, long[]> totalsByBucket = new LinkedHashMap<>();
            List<AuditInfo> rows = source.getAuditSet();
            if (rows != null) {
                for (AuditInfo row : rows) {
                    String bucket = formatLogTime(row.getLogTs(), str);
                    if (bucket == null) {
                        continue;
                    }
                    long[] totals = totalsByBucket.computeIfAbsent(bucket, key -> new long[3]);
                    totals[0] += row.getCount();
                    totals[1] += row.getDelay();
                    totals[2] += row.getSize();
                }
            }

            List<AuditInfo> auditSet = new ArrayList<>(totalsByBucket.size());
            for (Map.Entry<String, long[]> entry : totalsByBucket.entrySet()) {
                long[] totals = entry.getValue();
                long count = totals[0];
                AuditInfo bucketInfo = new AuditInfo();
                bucketInfo.setLogTs(entry.getKey());
                bucketInfo.setCount(count);
                // Guard the division: a bucket may consist solely of zero-count rows.
                bucketInfo.setDelay(count == 0 ? 0L : totals[1] / count);
                bucketInfo.setSize(count == 0 ? 0L : totals[2] / count);
                auditSet.add(bucketInfo);
            }

            AuditVO target = new AuditVO();
            target.setAuditId(source.getAuditId());
            target.setAuditName(source.getAuditName());
            target.setNodeType(source.getNodeType());
            target.setAuditSet(auditSet);
            aggregated.add(target);
        }
        return aggregated;
    }

    /**
     * Truncates a log timestamp to the granularity of the given pattern by parsing it with
     * that pattern and formatting the result back.
     *
     * <p>This relies on {@link SimpleDateFormat#parse(String)} not having to consume the whole
     * input: parsing a second-level timestamp with an hour or day pattern simply ignores the
     * trailing fields. A strict parser would reject such input, so do not swap the formatter.
     *
     * @param str the log timestamp to truncate
     * @param str2 the date pattern to parse and format with
     * @return the truncated timestamp, or {@code null} when it cannot be parsed (the error is logged)
     */
    private String formatLogTime(String str, String str2) {
        try {
            // SimpleDateFormat is not thread-safe, hence a fresh instance per call.
            SimpleDateFormat formatter = new SimpleDateFormat(str2);
            return formatter.format(formatter.parse(str));
        } catch (Exception e) {
            LOGGER.error("format log time exception, logTs={}, pattern={}", str, str2, e);
            return null;
        }
    }

    /**
     * Prepares the ClickHouse statement that sums count, delay and size per log timestamp
     * for one IP and audit id, after de-duplicating the raw audit rows.
     *
     * @param connection open connection used to prepare the statement; the caller closes the result
     * @param str audit id
     * @param str2 IP to filter on
     * @param str3 start day, {@code yyyy-MM-dd}, inclusive
     * @param str4 end day, {@code yyyy-MM-dd}; the whole day is included
     * @return the prepared statement with all parameters bound
     * @throws SQLException if preparing or binding fails
     */
    private PreparedStatement getAuditCkStatementByIp(Connection connection, String str, String str2, String str3, String str4) throws SQLException {
        String startTime = DAY_DATE_FORMATTER.parseDateTime(str3).toString(SECOND_FORMAT);
        // Exclusive upper bound: midnight of the day after the end day.
        String endTime = DAY_DATE_FORMATTER.parseDateTime(str4).plusDays(1).toString(SECOND_FORMAT);

        String distinctRows = new SQL()
                .SELECT_DISTINCT("ip", "docker_id", "thread_id", "sdk_ts", "packet_id", TERM_FILED,
                        INLONG_GROUP_ID, INLONG_STREAM_ID, "audit_id", "count", "size", "delay")
                .FROM("audit_data")
                .WHERE("ip = ?")
                .WHERE("audit_id = ?")
                .WHERE("log_ts >= ?")
                .WHERE("log_ts < ?")
                .toString();
        String sql = new SQL()
                .SELECT(INLONG_GROUP_ID, INLONG_STREAM_ID, TERM_FILED,
                        "sum(count) as total", "sum(delay) as total_delay", "sum(size) as total_size")
                .FROM("(" + distinctRows + ") as sub")
                .GROUP_BY(TERM_FILED, INLONG_GROUP_ID, INLONG_STREAM_ID)
                .ORDER_BY(TERM_FILED)
                .toString();

        PreparedStatement statement = connection.prepareStatement(sql);
        // JDBC parameter indexes are 1-based and follow the WHERE order above (ip first, then audit id).
        statement.setString(1, str2);
        statement.setString(2, str);
        statement.setString(3, startTime);
        statement.setString(4, endTime);
        return statement;
    }

    /**
     * Prepares the ClickHouse statement that sums count, delay and size per group, stream
     * and IP for one IP and audit id, after de-duplicating the raw audit rows.
     *
     * <p>The time bounds are bound exactly as passed in; no date conversion is applied.
     *
     * @param connection open connection used to prepare the statement; the caller closes the result
     * @param str audit id
     * @param str2 IP to filter on
     * @param str3 lower bound for {@code log_ts}, inclusive
     * @param str4 upper bound for {@code log_ts}, exclusive
     * @return the prepared statement with all parameters bound
     * @throws SQLException if preparing or binding fails
     */
    private PreparedStatement getAuditCkStatementByIpGroupByIp(Connection connection, String str, String str2, String str3, String str4) throws SQLException {
        String distinctRows = new SQL()
                .SELECT_DISTINCT("ip", "docker_id", "thread_id", "sdk_ts", "packet_id", TERM_FILED,
                        INLONG_GROUP_ID, INLONG_STREAM_ID, "audit_id", "count", "size", "delay")
                .FROM("audit_data")
                .WHERE("ip = ?")
                .WHERE("audit_id = ?")
                .WHERE("log_ts >= ?")
                .WHERE("log_ts < ?")
                .toString();
        String sql = new SQL()
                .SELECT(INLONG_GROUP_ID, INLONG_STREAM_ID, "ip",
                        "sum(count) as total", "sum(delay) as total_delay", "sum(size) as total_size")
                .FROM("(" + distinctRows + ") as sub")
                .GROUP_BY(INLONG_GROUP_ID, INLONG_STREAM_ID, "ip")
                .toString();

        PreparedStatement statement = connection.prepareStatement(sql);
        // JDBC parameter indexes are 1-based and follow the WHERE order above (ip first, then audit id).
        statement.setString(1, str2);
        statement.setString(2, str);
        statement.setString(3, str3);
        statement.setString(4, str4);
        return statement;
    }
}
