package org.apache.inlong.manager.service.resource.sort;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.stream.Collectors;
import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.inlong.common.enums.DataTypeEnum;
import org.apache.inlong.common.enums.MessageWrapType;
import org.apache.inlong.common.pojo.sort.dataflow.DataFlowConfig;
import org.apache.inlong.common.pojo.sort.dataflow.SourceConfig;
import org.apache.inlong.common.pojo.sort.dataflow.dataType.DataTypeConfig;
import org.apache.inlong.common.pojo.sort.dataflow.deserialization.DeserializationConfig;
import org.apache.inlong.common.pojo.sort.dataflow.field.FieldConfig;
import org.apache.inlong.common.pojo.sort.dataflow.field.format.FormatInfo;
import org.apache.inlong.common.pojo.sort.dataflow.sink.SinkConfig;
import org.apache.inlong.manager.common.consts.SinkType;
import org.apache.inlong.manager.common.exceptions.BusinessException;
import org.apache.inlong.manager.common.exceptions.WorkflowListenerException;
import org.apache.inlong.manager.common.util.CommonBeanUtils;
import org.apache.inlong.manager.common.util.Preconditions;
import org.apache.inlong.manager.dao.entity.InlongClusterEntity;
import org.apache.inlong.manager.dao.entity.SortConfigEntity;
import org.apache.inlong.manager.dao.mapper.InlongClusterEntityMapper;
import org.apache.inlong.manager.dao.mapper.InlongGroupEntityMapper;
import org.apache.inlong.manager.dao.mapper.SortConfigEntityMapper;
import org.apache.inlong.manager.dao.mapper.StreamSinkFieldEntityMapper;
import org.apache.inlong.manager.pojo.cluster.pulsar.PulsarClusterDTO;
import org.apache.inlong.manager.pojo.group.InlongGroupInfo;
import org.apache.inlong.manager.pojo.group.pulsar.InlongPulsarInfo;
import org.apache.inlong.manager.pojo.sink.StreamSink;
import org.apache.inlong.manager.pojo.sort.util.FieldInfoUtils;
import org.apache.inlong.manager.pojo.stream.InlongStreamInfo;
import org.apache.inlong.manager.service.datatype.DataTypeOperatorFactory;
import org.apache.inlong.manager.service.message.DeserializeOperatorFactory;
import org.apache.inlong.manager.service.sink.SinkOperatorFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

@Service
/* loaded from: input_file:org/apache/inlong/manager/service/resource/sort/DefaultSortConfigOperator.class */
public class DefaultSortConfigOperator implements SortConfigOperator {
    private static final Logger LOGGER = LoggerFactory.getLogger(DefaultSortConfigOperator.class);

    @Autowired
    public DeserializeOperatorFactory deserializeOperatorFactory;

    @Autowired
    public DataTypeOperatorFactory dataTypeOperatorFactory;

    @Autowired
    private StreamSinkFieldEntityMapper sinkFieldMapper;

    @Autowired
    private InlongClusterEntityMapper clusterMapper;

    @Autowired
    private SortConfigEntityMapper sortConfigEntityMapper;

    @Autowired
    private InlongGroupEntityMapper groupEntityMapper;

    @Autowired
    private SinkOperatorFactory operatorFactory;

    @Override // org.apache.inlong.manager.service.resource.sort.SortConfigOperator
    public Boolean accept(List<String> list) {
        Iterator<String> it = list.iterator();
        while (it.hasNext()) {
            if (SinkType.SORT_STANDALONE_SINK.contains(it.next())) {
                return true;
            }
        }
        return false;
    }

    @Override // org.apache.inlong.manager.service.resource.sort.SortConfigOperator
    public void buildConfig(InlongGroupInfo inlongGroupInfo, InlongStreamInfo inlongStreamInfo, boolean z) throws Exception {
        if (inlongGroupInfo == null || inlongStreamInfo == null) {
            LOGGER.warn("group info is null or stream infos is empty, no need to build sort config");
            return;
        }
        if (z) {
            LOGGER.info("no need to build all sort config since the workflow is not stream level, groupId={}", inlongGroupInfo.getInlongGroupId());
            return;
        }
        ArrayList arrayList = new ArrayList();
        for (StreamSink streamSink : inlongStreamInfo.getSinkList()) {
            if (SinkType.SORT_STANDALONE_SINK.contains(streamSink.getSinkType())) {
                arrayList.add(streamSink);
            }
        }
        if (CollectionUtils.isEmpty(arrayList)) {
            return;
        }
        Preconditions.expectTrue("PULSAR".equals(this.groupEntityMapper.selectByGroupId(inlongGroupInfo.getInlongGroupId()).getMqType()), "standalone only support pulsar");
        Iterator it = arrayList.iterator();
        while (it.hasNext()) {
            saveDataFlow(inlongGroupInfo, inlongStreamInfo, (StreamSink) it.next());
        }
    }

    private void saveDataFlow(InlongGroupInfo inlongGroupInfo, InlongStreamInfo inlongStreamInfo, StreamSink streamSink) {
        try {
            DataFlowConfig dataFlowConfig = getDataFlowConfig(inlongGroupInfo, inlongStreamInfo, streamSink);
            SortConfigEntity selectBySinkId = this.sortConfigEntityMapper.selectBySinkId(streamSink.getId());
            String inlongClusterTag = inlongGroupInfo.getInlongClusterTag();
            ObjectMapper objectMapper = new ObjectMapper();
            if (selectBySinkId == null) {
                dataFlowConfig.setVersion(0);
                SortConfigEntity sortConfigEntity = (SortConfigEntity) CommonBeanUtils.copyProperties(streamSink, SortConfigEntity::new);
                sortConfigEntity.setId((Integer) null);
                if (StringUtils.isBlank(sortConfigEntity.getSortTaskName())) {
                    sortConfigEntity.setSortTaskName("DEFAULT_TASK");
                }
                sortConfigEntity.setSinkId(streamSink.getId());
                sortConfigEntity.setConfigParams(objectMapper.writeValueAsString(dataFlowConfig));
                sortConfigEntity.setInlongClusterTag(inlongClusterTag);
                this.sortConfigEntityMapper.insert(sortConfigEntity);
            } else {
                dataFlowConfig.setVersion(Integer.valueOf(selectBySinkId.getVersion().intValue() + 1));
                selectBySinkId.setConfigParams(objectMapper.writeValueAsString(dataFlowConfig));
                selectBySinkId.setInlongClusterTag(inlongClusterTag);
                if (StringUtils.isBlank(selectBySinkId.getSortTaskName())) {
                    selectBySinkId.setSortTaskName("DEFAULT_TASK");
                }
                this.sortConfigEntityMapper.updateByIdSelective(selectBySinkId);
            }
        } catch (Exception e) {
            LOGGER.error("failed to parse id params of groupId={}, streamId={} name={}, type={}", new Object[]{streamSink.getInlongGroupId(), streamSink.getInlongStreamId(), streamSink.getSinkName(), streamSink.getSinkType(), e});
        }
    }

    private DataFlowConfig getDataFlowConfig(InlongGroupInfo inlongGroupInfo, InlongStreamInfo inlongStreamInfo, StreamSink streamSink) {
        return DataFlowConfig.builder().dataflowId(String.valueOf(streamSink.getId())).sourceConfig(getSourceConfig(inlongGroupInfo, inlongStreamInfo, streamSink)).auditTag(String.valueOf(streamSink.getId())).sinkConfig(getSinkConfig(inlongGroupInfo, inlongStreamInfo, streamSink)).inlongGroupId(inlongGroupInfo.getInlongGroupId()).inlongStreamId(inlongStreamInfo.getInlongStreamId()).properties(new HashMap()).build();
    }

    private SinkConfig getSinkConfig(InlongGroupInfo inlongGroupInfo, InlongStreamInfo inlongStreamInfo, StreamSink streamSink) {
        return this.operatorFactory.getInstance(streamSink.getSinkType()).getSinkConfig(inlongGroupInfo, inlongStreamInfo, streamSink);
    }

    private SourceConfig getSourceConfig(InlongGroupInfo inlongGroupInfo, InlongStreamInfo inlongStreamInfo, StreamSink streamSink) {
        List selectByKey = this.clusterMapper.selectByKey(inlongGroupInfo.getInlongClusterTag(), (String) null, "PULSAR");
        if (CollectionUtils.isEmpty(selectByKey)) {
            throw new WorkflowListenerException("pulsar cluster not found for groupId=" + inlongGroupInfo.getInlongGroupId());
        }
        PulsarClusterDTO fromJson = PulsarClusterDTO.getFromJson(((InlongClusterEntity) selectByKey.get(0)).getExtParams());
        if (!(inlongGroupInfo instanceof InlongPulsarInfo)) {
            throw new BusinessException("the mqType must be PULSAR for inlongGroupId=" + inlongGroupInfo.getInlongGroupId());
        }
        String pulsarTenant = ((InlongPulsarInfo) inlongGroupInfo).getPulsarTenant();
        if (StringUtils.isBlank(pulsarTenant) && StringUtils.isNotBlank(fromJson.getPulsarTenant())) {
            pulsarTenant = fromJson.getPulsarTenant();
        }
        if (StringUtils.isBlank(pulsarTenant)) {
            pulsarTenant = "public";
        }
        String mqResource = inlongGroupInfo.getMqResource();
        String mqResource2 = inlongStreamInfo.getMqResource();
        String str = "persistent://" + pulsarTenant + "/" + mqResource + "/" + mqResource2;
        String format = String.format("%s_%s_%s_consumer_group", inlongGroupInfo.getInlongClusterTag(), mqResource2, streamSink.getId());
        DeserializationConfig deserializationConfig = this.deserializeOperatorFactory.getInstance(MessageWrapType.forType(inlongStreamInfo.getWrapType())).getDeserializationConfig(inlongStreamInfo);
        DataTypeConfig dataTypeConfig = this.dataTypeOperatorFactory.getInstance(DataTypeEnum.forType(inlongStreamInfo.getDataType())).getDataTypeConfig(inlongStreamInfo);
        SourceConfig sourceConfig = new SourceConfig();
        sourceConfig.setFieldConfigs((List) this.sinkFieldMapper.selectBySinkId(streamSink.getId()).stream().map(streamSinkFieldEntity -> {
            FieldConfig fieldConfig = new FieldConfig();
            FormatInfo convertFieldFormat = FieldInfoUtils.convertFieldFormat(streamSinkFieldEntity.getSourceFieldType().toLowerCase());
            fieldConfig.setName(streamSinkFieldEntity.getSourceFieldName());
            fieldConfig.setFormatInfo(convertFieldFormat);
            return fieldConfig;
        }).collect(Collectors.toList()));
        sourceConfig.setDeserializationConfig(deserializationConfig);
        sourceConfig.setDataTypeConfig(dataTypeConfig);
        sourceConfig.setEncodingType(inlongStreamInfo.getDataEncoding());
        sourceConfig.setTopic(str);
        sourceConfig.setSubscription(format);
        return sourceConfig;
    }
}
