package org.apache.inlong.manager.service.message;

import java.nio.charset.Charset;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import org.apache.inlong.common.enums.DataTypeEnum;
import org.apache.inlong.common.enums.MessageWrapType;
import org.apache.inlong.common.msg.InLongMsg;
import org.apache.inlong.common.pojo.sort.dataflow.deserialization.DeserializationConfig;
import org.apache.inlong.common.pojo.sort.dataflow.deserialization.InlongMsgDeserializationConfig;
import org.apache.inlong.common.util.StringUtil;
import org.apache.inlong.manager.common.exceptions.BusinessException;
import org.apache.inlong.manager.pojo.consume.BriefMQMessage;
import org.apache.inlong.manager.pojo.stream.InlongStreamInfo;
import org.apache.inlong.manager.pojo.stream.QueryMessageRequest;
import org.apache.inlong.manager.service.datatype.DataTypeOperatorFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

/**
 * {@link DeserializeOperator} that handles payloads wrapped as
 * {@link MessageWrapType#INLONG_MSG_V0}.
 *
 * <p>A payload is parsed with {@link InLongMsg#parseFrom(byte[])}; every attribute string it
 * exposes is split into key/value pairs, a message time is taken from those pairs, and each
 * non-null body stored under that attribute string is decoded into a {@link BriefMQMessage}.
 */
@Service
public class InlongMsgDeserializeOperator implements DeserializeOperator {

    private static final Logger log = LoggerFactory.getLogger(InlongMsgDeserializeOperator.class);

    /** Factory used to look up the field parser matching the stream's data type. */
    @Autowired
    public DataTypeOperatorFactory dataTypeOperatorFactory;

    /**
     * Tells whether this operator handles the given wrap type.
     *
     * @param messageWrapType wrap type to test; {@code null} yields {@code false}
     * @return {@code true} only for {@link MessageWrapType#INLONG_MSG_V0}
     */
    @Override
    public boolean accept(MessageWrapType messageWrapType) {
        return MessageWrapType.INLONG_MSG_V0.equals(messageWrapType);
    }

    /**
     * Decodes one wrapped payload and appends the resulting messages to {@code list}.
     *
     * @param inlongStreamInfo stream definition; supplies the body charset name and the data type
     * @param list accumulator the decoded messages are appended to
     * @param bArr raw payload handed to {@link InLongMsg#parseFrom(byte[])}
     * @param map message headers; the {@code "groupId"} and {@code "streamId"} entries are copied
     *        onto every decoded message and the whole map is attached as its headers
     * @param i identifier assigned to every message decoded from this payload
     * @param queryMessageRequest query passed to {@code checkIfFilter} to decide whether a
     *        decoded message is dropped
     * @return the same {@code list} instance that was passed in
     * @throws IllegalArgumentException if an attribute string carries neither time attribute
     * @throws NumberFormatException if the {@code INLONGMSG_ATTR_TIME_DT} value is not a long
     * @throws BusinessException if decoding, field parsing or filtering of a body fails
     */
    @Override
    public List<BriefMQMessage> decodeMsg(InlongStreamInfo inlongStreamInfo, List<BriefMQMessage> list, byte[] bArr, Map<String, String> map, int i, QueryMessageRequest queryMessageRequest) {
        String groupId = map.get("groupId");
        String streamId = map.get("streamId");
        InLongMsg inLongMsg = InLongMsg.parseFrom(bArr);

        for (String attr : inLongMsg.getAttrs()) {
            Map<String, String> attrMap = StringUtil.splitKv(attr, '&', '=', (Character) null, (Character) null);
            // The time is resolved before the bodies are touched, so a missing or malformed
            // time attribute fails before any body of this attribute string is decoded.
            long msgTime = extractMsgTime(attrMap);

            Iterator<byte[]> bodies = inLongMsg.getIterator(attr);
            while (bodies.hasNext()) {
                byte[] bodyBytes = bodies.next();
                if (Objects.isNull(bodyBytes)) {
                    continue;
                }
                try {
                    String body = new String(bodyBytes, Charset.forName(inlongStreamInfo.getDataEncoding()));
                    List<BriefMQMessage.FieldInfo> fieldList = this.dataTypeOperatorFactory
                            .getInstance(DataTypeEnum.forType(inlongStreamInfo.getDataType()))
                            .parseFields(body, inlongStreamInfo);
                    if (checkIfFilter(queryMessageRequest, fieldList).booleanValue()) {
                        continue;
                    }
                    BriefMQMessage message = BriefMQMessage.builder()
                            .id(Integer.valueOf(i))
                            .inlongGroupId(groupId)
                            .inlongStreamId(streamId)
                            .dt(Long.valueOf(msgTime))
                            .clientIp(attrMap.get(DeserializeOperator.CLIENT_IP))
                            .headers(map)
                            .attribute(attr)
                            .body(body)
                            .fieldList(fieldList)
                            .build();
                    list.add(message);
                } catch (Exception e) {
                    String errMsg = String.format("decode msg failed for groupId=%s, streamId=%s", groupId, streamId);
                    log.error(errMsg, e);
                    // NOTE(review): the cause is only logged, not chained onto the thrown
                    // exception; chain it if BusinessException offers a (message, cause) form.
                    throw new BusinessException(errMsg);
                }
            }
        }
        return list;
    }

    /**
     * Resolves the message time from the key/value pairs of one attribute string.
     *
     * <p>{@code INLONGMSG_ATTR_TIME_T} takes precedence and is parsed with
     * {@link StringUtil#parseDateTime}; otherwise {@code INLONGMSG_ATTR_TIME_DT} is parsed
     * with {@link Long#parseLong(String)}. Values are trimmed before parsing.
     *
     * @param attrMap key/value pairs split out of the attribute string
     * @return the parsed time value
     * @throws IllegalArgumentException if neither time attribute is present
     */
    private static long extractMsgTime(Map<String, String> attrMap) {
        if (attrMap.containsKey(DeserializeOperator.INLONGMSG_ATTR_TIME_T)) {
            return StringUtil.parseDateTime(attrMap.get(DeserializeOperator.INLONGMSG_ATTR_TIME_T).trim());
        }
        if (attrMap.containsKey(DeserializeOperator.INLONGMSG_ATTR_TIME_DT)) {
            return Long.parseLong(attrMap.get(DeserializeOperator.INLONGMSG_ATTR_TIME_DT).trim());
        }
        throw new IllegalArgumentException(String.format("PARSE_ATTR_ERROR_STRING%s", "t or dt"));
    }

    /**
     * Builds the deserialization config for a stream.
     *
     * @param inlongStreamInfo stream whose id is copied into the config
     * @return a new {@link InlongMsgDeserializationConfig} carrying the stream id
     */
    @Override
    public DeserializationConfig getDeserializationConfig(InlongStreamInfo inlongStreamInfo) {
        InlongMsgDeserializationConfig config = new InlongMsgDeserializationConfig();
        config.setStreamId(inlongStreamInfo.getInlongStreamId());
        return config;
    }
}
