package org.apache.inlong.manager.service.message;

import java.nio.charset.Charset;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.inlong.common.enums.DataTypeEnum;
import org.apache.inlong.common.enums.MessageWrapType;
import org.apache.inlong.common.pojo.sort.dataflow.deserialization.DeserializationConfig;
import org.apache.inlong.common.pojo.sort.dataflow.deserialization.InlongMsgPbDeserialiationConfig;
import org.apache.inlong.common.util.Utils;
import org.apache.inlong.manager.common.exceptions.BusinessException;
import org.apache.inlong.manager.pojo.consume.BriefMQMessage;
import org.apache.inlong.manager.pojo.stream.InlongStreamInfo;
import org.apache.inlong.manager.pojo.stream.QueryMessageRequest;
import org.apache.inlong.manager.service.datatype.DataTypeOperatorFactory;
import org.apache.inlong.sdk.commons.protocol.ProxySdk;
import org.apache.inlong.sort.configuration.Constants;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

/**
 * {@link DeserializeOperator} that accepts messages wrapped as {@link MessageWrapType#INLONG_MSG_V1}.
 *
 * <p>The raw payload is optionally decompressed, parsed as a {@code ProxySdk.MessageObjs} protobuf
 * and every contained message is converted into a {@link BriefMQMessage}.
 */
@Service
public class PbMsgDeserializeOperator implements DeserializeOperator {
    private static final Logger log = LoggerFactory.getLogger(PbMsgDeserializeOperator.class);

    /** Compress-type header value for which the payload is used as-is. */
    private static final int COMPRESS_TYPE_NONE = 0;

    /** Compress-type header value for which the payload is decompressed with gzip. */
    private static final int COMPRESS_TYPE_GZIP = 1;

    /** Compress-type header value for which the payload is decompressed with snappy. */
    private static final int COMPRESS_TYPE_SNAPPY = 2;

    /** Header value assumed when no compress type is supplied. */
    private static final String DEFAULT_COMPRESS_TYPE = "0";

    @Autowired
    public DataTypeOperatorFactory dataTypeOperatorFactory;

    /**
     * Tells whether this operator handles the given wrap type.
     *
     * @param messageWrapType wrap type to test, may be {@code null}
     * @return {@code true} only for {@link MessageWrapType#INLONG_MSG_V1}
     */
    @Override
    public boolean accept(MessageWrapType messageWrapType) {
        return MessageWrapType.INLONG_MSG_V1.equals(messageWrapType);
    }

    /**
     * Decompresses and parses one MQ payload and appends the decoded messages to {@code list}.
     *
     * @param inlongStreamInfo stream whose data encoding and data type drive body decoding
     * @param list accumulator that receives the decoded messages; it is also the return value
     * @param bArr raw payload bytes
     * @param map message headers; the compress type is read from
     *        {@link DeserializeOperator#COMPRESS_TYPE_KEY} and defaults to {@code "0"} when the
     *        entry (or the whole map) is absent
     * @param i id assigned to every message decoded from this payload
     * @param queryMessageRequest request handed to {@code checkIfFilter} to drop unwanted messages
     * @return the same {@code list} instance, after the decoded messages were appended
     * @throws IllegalArgumentException if the compress type is not 0, 1 or 2 (or is not a number)
     * @throws Exception if decompression or protobuf parsing fails
     */
    @Override
    public List<BriefMQMessage> decodeMsg(InlongStreamInfo inlongStreamInfo, List<BriefMQMessage> list, byte[] bArr,
            Map<String, String> map, int i, QueryMessageRequest queryMessageRequest) throws Exception {
        // A missing header map is treated like a missing compress-type entry instead of failing with an NPE.
        String rawCompressType = map == null
                ? DEFAULT_COMPRESS_TYPE
                : map.getOrDefault(DeserializeOperator.COMPRESS_TYPE_KEY, DEFAULT_COMPRESS_TYPE);
        int compressType = Integer.parseInt(rawCompressType);

        byte[] payload;
        switch (compressType) {
            case COMPRESS_TYPE_NONE:
                payload = bArr;
                break;
            case COMPRESS_TYPE_GZIP:
                payload = Utils.gzipDecompress(bArr, 0, bArr.length);
                break;
            case COMPRESS_TYPE_SNAPPY:
                payload = Utils.snappyDecompress(bArr, 0, bArr.length);
                break;
            default:
                throw new IllegalArgumentException("Unknown compress type:" + compressType);
        }

        ProxySdk.MessageObjs messageObjs = ProxySdk.MessageObjs.parseFrom(payload);
        list.addAll(transformMessageObjs(messageObjs, inlongStreamInfo, i, queryMessageRequest));
        return list;
    }

    /**
     * Converts every message of a parsed {@code MessageObjs} into a {@link BriefMQMessage}.
     *
     * @param messageObjs parsed protobuf container, may be {@code null}
     * @param streamInfo stream providing the body charset and the data type used to parse fields
     * @param index id assigned to each produced message
     * @param request request handed to {@code checkIfFilter}; messages for which it returns
     *        {@code true} are skipped
     * @return the converted messages; never {@code null}, empty when {@code messageObjs} is {@code null}
     * @throws BusinessException if decoding of any single message fails
     */
    private List<BriefMQMessage> transformMessageObjs(ProxySdk.MessageObjs messageObjs, InlongStreamInfo streamInfo,
            int index, QueryMessageRequest request) {
        List<BriefMQMessage> messages = new ArrayList<>();
        if (messageObjs == null) {
            // Return an empty list rather than null: the caller feeds the result to List#addAll,
            // which throws NullPointerException on a null argument.
            return messages;
        }

        for (ProxySdk.MessageObj messageObj : messageObjs.getMsgsList()) {
            Map<String, String> headers = new HashMap<>();
            for (ProxySdk.MapFieldEntry entry : messageObj.getParamsList()) {
                headers.put(entry.getKey(), entry.getValue());
            }

            try {
                Charset charset = Charset.forName(streamInfo.getDataEncoding());
                String body = new String(messageObj.getBody().toByteArray(), charset);
                List<BriefMQMessage.FieldInfo> fields = this.dataTypeOperatorFactory
                        .getInstance(DataTypeEnum.forType(streamInfo.getDataType()))
                        .parseFields(body, streamInfo);
                if (checkIfFilter(request, fields).booleanValue()) {
                    continue;
                }

                messages.add(BriefMQMessage.builder()
                        .id(Integer.valueOf(index))
                        .inlongGroupId(headers.get("groupId"))
                        .inlongStreamId(headers.get("streamId"))
                        .dt(Long.valueOf(messageObj.getMsgTime()))
                        .clientIp(headers.get(DeserializeOperator.CLIENT_IP))
                        .body(body)
                        .headers(headers)
                        .fieldList(fields)
                        .build());
            } catch (Exception e) {
                String errMsg = String.format("decode msg failed for groupId=%s, streamId=%s",
                        streamInfo.getInlongGroupId(), streamInfo.getInlongStreamId());
                // The underlying cause is logged here because the exception thrown below carries
                // only the message.
                log.error(errMsg, e);
                throw new BusinessException(errMsg);
            }
        }
        return messages;
    }

    /**
     * Builds the deserialization config for this wrap type.
     *
     * <p>The compression type is always set to {@code GZIP}; {@code inlongStreamInfo} is not consulted.
     *
     * @param inlongStreamInfo stream the config is requested for (unused)
     * @return a new {@link InlongMsgPbDeserialiationConfig}
     */
    @Override
    public DeserializationConfig getDeserializationConfig(InlongStreamInfo inlongStreamInfo) {
        InlongMsgPbDeserialiationConfig config = new InlongMsgPbDeserialiationConfig();
        config.setCompressionType(Constants.CompressionType.GZIP.name());
        return config;
    }
}
