package org.apache.inlong.manager.service.sink.es;

import com.fasterxml.jackson.databind.ObjectMapper;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.stream.Collectors;
import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.inlong.common.pojo.sort.dataflow.field.FieldConfig;
import org.apache.inlong.common.pojo.sort.dataflow.field.format.FormatInfo;
import org.apache.inlong.common.pojo.sort.dataflow.sink.EsSinkConfig;
import org.apache.inlong.common.pojo.sort.dataflow.sink.SinkConfig;
import org.apache.inlong.manager.common.consts.InlongConstants;
import org.apache.inlong.manager.common.enums.ErrorCodeEnum;
import org.apache.inlong.manager.common.exceptions.BusinessException;
import org.apache.inlong.manager.common.util.CommonBeanUtils;
import org.apache.inlong.manager.common.util.JsonUtils;
import org.apache.inlong.manager.dao.entity.InlongStreamEntity;
import org.apache.inlong.manager.dao.entity.StreamSinkEntity;
import org.apache.inlong.manager.dao.entity.StreamSinkFieldEntity;
import org.apache.inlong.manager.pojo.group.InlongGroupInfo;
import org.apache.inlong.manager.pojo.node.DataNodeInfo;
import org.apache.inlong.manager.pojo.sink.SinkField;
import org.apache.inlong.manager.pojo.sink.SinkRequest;
import org.apache.inlong.manager.pojo.sink.StreamSink;
import org.apache.inlong.manager.pojo.sink.es.ElasticsearchFieldInfo;
import org.apache.inlong.manager.pojo.sink.es.ElasticsearchSink;
import org.apache.inlong.manager.pojo.sink.es.ElasticsearchSinkDTO;
import org.apache.inlong.manager.pojo.sink.es.ElasticsearchSinkRequest;
import org.apache.inlong.manager.pojo.sort.util.FieldInfoUtils;
import org.apache.inlong.manager.pojo.stream.InlongStreamExtParam;
import org.apache.inlong.manager.pojo.stream.InlongStreamInfo;
import org.apache.inlong.manager.service.sink.AbstractSinkOperator;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

/**
 * Sink operator for Elasticsearch ("ES") stream sinks.
 *
 * <p>Converts between sink requests, persisted sink entities (whose
 * Elasticsearch-specific settings are stored as JSON in {@code extParams}),
 * and the sort-side {@link EsSinkConfig}.
 */
@Service
public class ElasticsearchSinkOperator extends AbstractSinkOperator {
    private static final Logger LOGGER = LoggerFactory.getLogger(ElasticsearchSinkOperator.class);
    /** Key under which the space-separated field names are published in the id params. */
    private static final String KEY_FIELDS = "fieldNames";
    /** Sink type identifier handled by this operator. */
    private static final String SINK_TYPE_ES = "ES";

    @Autowired
    private ObjectMapper objectMapper;

    /**
     * Tells whether this operator handles the given sink type.
     *
     * @param str the sink type to test; may be {@code null}
     * @return {@code true} only when {@code str} equals {@code "ES"}
     */
    @Override
    public Boolean accept(String str) {
        return SINK_TYPE_ES.equals(str);
    }

    /**
     * @return the sink type identifier of this operator, {@code "ES"}
     */
    @Override
    protected String getSinkType() {
        return SINK_TYPE_ES;
    }

    /**
     * Builds the Elasticsearch DTO from the request, enriches it with the
     * separator and field offset of the owning stream, and stores it as JSON
     * in the entity's {@code extParams}.
     *
     * @param sinkRequest the incoming sink request; must be of type "ES"
     * @param streamSinkEntity the entity whose {@code extParams} are read and then overwritten
     * @throws BusinessException with {@code SINK_TYPE_NOT_SUPPORT} when the request's sink type
     *         is not "ES", or with {@code SINK_SAVE_FAILED} when the DTO cannot be built or
     *         serialized (including when the stream cannot be found)
     */
    @Override
    protected void setTargetEntity(SinkRequest sinkRequest, StreamSinkEntity streamSinkEntity) {
        if (!getSinkType().equals(sinkRequest.getSinkType())) {
            throw new BusinessException(ErrorCodeEnum.SINK_TYPE_NOT_SUPPORT,
                    ErrorCodeEnum.SINK_TYPE_NOT_SUPPORT.getMessage() + ": " + getSinkType());
        }
        try {
            ElasticsearchSinkDTO sinkDto = ElasticsearchSinkDTO.getFromRequest(
                    (ElasticsearchSinkRequest) sinkRequest, streamSinkEntity.getExtParams());
            InlongStreamEntity stream = this.inlongStreamEntityMapper.selectByIdentifier(
                    sinkRequest.getInlongGroupId(), sinkRequest.getInlongStreamId());
            if (stream == null) {
                // Fail with a readable reason instead of a bare NullPointerException.
                throw new IllegalStateException(String.format(
                        "inlong stream not found by groupId=%s, streamId=%s",
                        sinkRequest.getInlongGroupId(), sinkRequest.getInlongStreamId()));
            }
            sinkDto.setSeparator(decodeSeparator(stream.getDataSeparator()));

            InlongStreamExtParam streamExt =
                    JsonUtils.parseObject(stream.getExtParams(), InlongStreamExtParam.class);
            if (streamExt == null) {
                // Guard against a null parse result so the failure reason is explicit.
                throw new IllegalStateException(String.format(
                        "extParams of inlong stream are missing or invalid for groupId=%s, streamId=%s",
                        sinkRequest.getInlongGroupId(), sinkRequest.getInlongStreamId()));
            }
            sinkDto.setFieldOffset(streamExt.getExtendedFieldSize());

            streamSinkEntity.setExtParams(this.objectMapper.writeValueAsString(sinkDto));
        } catch (Exception e) {
            // Keep the full stack trace in the log; the rethrown exception only carries the message.
            LOGGER.error("failed to build extParams of Elasticsearch sink for groupId={}, streamId={}",
                    sinkRequest.getInlongGroupId(), sinkRequest.getInlongStreamId(), e);
            throw new BusinessException(ErrorCodeEnum.SINK_SAVE_FAILED,
                    String.format("serialize extParams of Elasticsearch SinkDTO failure: %s", e.getMessage()));
        }
    }

    /**
     * Converts a persisted sink entity into an {@link ElasticsearchSink}.
     *
     * @param streamSinkEntity the persisted entity; may be {@code null}
     * @return a sink populated from the entity, its {@code extParams} JSON and its stored
     *         fields, or an empty {@link ElasticsearchSink} when the entity is {@code null}
     */
    @Override
    public StreamSink getFromEntity(StreamSinkEntity streamSinkEntity) {
        ElasticsearchSink sink = new ElasticsearchSink();
        if (streamSinkEntity == null) {
            return sink;
        }
        ElasticsearchSinkDTO sinkDto = ElasticsearchSinkDTO.getFromJson(streamSinkEntity.getExtParams());
        // Entity properties are copied first, then the DTO properties on top of them.
        CommonBeanUtils.copyProperties(streamSinkEntity, sink, true);
        CommonBeanUtils.copyProperties(sinkDto, sink, true);
        sink.setSinkFieldList(getSinkFields(streamSinkEntity.getId()));
        return sink;
    }

    /**
     * Extends the id params produced by the parent operator with the sink field names.
     *
     * <p>The names are concatenated with a single space after each one (so the value ends
     * with a trailing space) and stored under {@code "fieldNames"} only if that key is
     * not already present.
     *
     * @param streamSinkEntity the sink entity, passed through to the parent implementation
     * @param list the field names to publish
     * @param dataNodeInfo the data node info, passed through to the parent implementation
     * @return the id params map returned by the parent, possibly with {@code "fieldNames"} added
     */
    @Override
    public Map<String, String> parse2IdParams(StreamSinkEntity streamSinkEntity, List<String> list,
            DataNodeInfo dataNodeInfo) {
        Map<String, String> idParams = super.parse2IdParams(streamSinkEntity, list, dataNodeInfo);
        StringBuilder fieldNames = new StringBuilder();
        for (String fieldName : list) {
            // The trailing space is intentional: it keeps the emitted value unchanged.
            fieldNames.append(fieldName).append(" ");
        }
        idParams.computeIfAbsent(KEY_FIELDS, key -> fieldNames.toString());
        return idParams;
    }

    /**
     * Validates and persists the sink fields carried by the request.
     *
     * <p>Does nothing when the request has no fields. Each field's {@code extParams} is
     * reset and then rebuilt as the JSON form of its {@link ElasticsearchFieldInfo}; an
     * empty field comment defaults to the field name.
     *
     * @param sinkRequest the request whose field list is saved
     * @throws BusinessException with {@code SINK_SAVE_FAILED} when a field's
     *         {@code extParams} cannot be built or serialized
     */
    @Override
    public void saveFieldOpt(SinkRequest sinkRequest) {
        List<SinkField> sinkFieldList = sinkRequest.getSinkFieldList();
        LOGGER.debug("begin to save es sink fields={}", sinkFieldList);
        if (CollectionUtils.isEmpty(sinkFieldList)) {
            return;
        }
        String inlongGroupId = sinkRequest.getInlongGroupId();
        String inlongStreamId = sinkRequest.getInlongStreamId();
        String sinkType = sinkRequest.getSinkType();
        Integer sinkId = sinkRequest.getId();

        List<StreamSinkFieldEntity> fieldEntities = new ArrayList<>(sinkFieldList.size());
        for (SinkField sinkField : sinkFieldList) {
            checkFieldInfo(sinkField);
            // Cleared before copying, so the entity's extParams come only from the DTO below.
            sinkField.setExtParams((String) null);
            StreamSinkFieldEntity fieldEntity =
                    CommonBeanUtils.copyProperties(sinkField, StreamSinkFieldEntity::new);
            if (StringUtils.isEmpty(fieldEntity.getFieldComment())) {
                fieldEntity.setFieldComment(fieldEntity.getFieldName());
            }
            try {
                fieldEntity.setExtParams(this.objectMapper.writeValueAsString(
                        ElasticsearchFieldInfo.getFromRequest(sinkField)));
            } catch (Exception e) {
                // Keep the full stack trace in the log; the rethrown exception only carries the message.
                LOGGER.error("failed to serialize extParams of Elasticsearch field={}",
                        fieldEntity.getFieldName(), e);
                throw new BusinessException(ErrorCodeEnum.SINK_SAVE_FAILED,
                        String.format("serialize extParams of Elasticsearch FieldInfo failure: %s", e.getMessage()));
            }
            fieldEntity.setInlongGroupId(inlongGroupId);
            fieldEntity.setInlongStreamId(inlongStreamId);
            fieldEntity.setSinkType(sinkType);
            fieldEntity.setSinkId(sinkId);
            fieldEntity.setIsDeleted(InlongConstants.UN_DELETED);
            fieldEntities.add(fieldEntity);
        }
        this.sinkFieldMapper.insertAll(fieldEntities);
        LOGGER.debug("success to save es sink fields");
    }

    /**
     * Loads the stored fields of a sink.
     *
     * <p>A stored field with blank {@code extParams} is returned as a plain
     * {@link SinkField}; otherwise it is returned as the {@link ElasticsearchFieldInfo}
     * parsed from that JSON, with the entity's properties copied onto it.
     *
     * @param num the sink id
     * @return the sink fields; an empty mutable list when none are stored
     */
    @Override
    public List<SinkField> getSinkFields(Integer num) {
        List<StreamSinkFieldEntity> fieldEntities = this.sinkFieldMapper.selectBySinkId(num);
        List<SinkField> sinkFields = new ArrayList<>();
        if (CollectionUtils.isEmpty(fieldEntities)) {
            return sinkFields;
        }
        for (StreamSinkFieldEntity fieldEntity : fieldEntities) {
            if (StringUtils.isBlank(fieldEntity.getExtParams())) {
                SinkField sinkField = new SinkField();
                CommonBeanUtils.copyProperties(fieldEntity, sinkField, true);
                sinkFields.add(sinkField);
            } else {
                ElasticsearchFieldInfo esField = ElasticsearchFieldInfo.getFromJson(fieldEntity.getExtParams());
                CommonBeanUtils.copyProperties(fieldEntity, esField, true);
                sinkFields.add(esField);
            }
        }
        return sinkFields;
    }

    /**
     * Builds the sort-side sink configuration for an Elasticsearch sink.
     *
     * <p>Properties are copied from the given sink and then from the DTO stored in the
     * persisted sink entity's {@code extParams}. Separator and field offset come from the
     * stream info, the content offset is set to 0, and one {@link FieldConfig} is created
     * per stored sink field.
     *
     * @param inlongGroupInfo the group info (not read by this implementation)
     * @param inlongStreamInfo the stream info supplying data separator and extended field size
     * @param streamSink the sink to convert; must be an {@link ElasticsearchSink}
     * @return the populated {@link EsSinkConfig}
     */
    @Override
    public SinkConfig getSinkConfig(InlongGroupInfo inlongGroupInfo, InlongStreamInfo inlongStreamInfo,
            StreamSink streamSink) {
        ElasticsearchSinkDTO sinkDto = ElasticsearchSinkDTO.getFromJson(
                this.sinkMapper.selectByPrimaryKey(streamSink.getId()).getExtParams());
        EsSinkConfig sinkConfig = CommonBeanUtils.copyProperties((ElasticsearchSink) streamSink, EsSinkConfig::new);
        CommonBeanUtils.copyProperties(sinkDto, sinkConfig);
        sinkConfig.setSeparator(decodeSeparator(inlongStreamInfo.getDataSeparator()));
        sinkConfig.setFieldOffset(inlongStreamInfo.getExtendedFieldSize());
        sinkConfig.setContentOffset(0);

        List<FieldConfig> fieldConfigs = this.sinkFieldMapper.selectBySinkId(streamSink.getId()).stream()
                .map(ElasticsearchSinkOperator::toFieldConfig)
                .collect(Collectors.toList());
        sinkConfig.setFieldConfigs(fieldConfigs);
        return sinkConfig;
    }

    /**
     * Decodes a data separator given as a decimal character code into that character.
     *
     * @param separatorCode decimal code of the separator character, e.g. {@code "124"} for '|'
     * @return a one-character string holding the decoded separator
     * @throws NumberFormatException when {@code separatorCode} is not a parsable integer
     */
    private static String decodeSeparator(String separatorCode) {
        return String.valueOf((char) Integer.parseInt(separatorCode));
    }

    /** Maps a stored sink field to its sort-side field config (name plus format info). */
    private static FieldConfig toFieldConfig(StreamSinkFieldEntity fieldEntity) {
        FieldConfig fieldConfig = new FieldConfig();
        // Locale.ROOT keeps the lower-casing of type names independent of the JVM default locale.
        FormatInfo formatInfo = FieldInfoUtils.convertFieldFormat(
                fieldEntity.getFieldType().toLowerCase(Locale.ROOT));
        fieldConfig.setName(fieldEntity.getFieldName());
        fieldConfig.setFormatInfo(formatInfo);
        return fieldConfig;
    }
}
