package top.yqingyu.qymsg;

import com.alibaba.fastjson2.JSON;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.time.LocalDateTime;
import java.time.temporal.ChronoUnit;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicReference;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import top.yqingyu.common.utils.ArrayUtil;
import top.yqingyu.common.utils.IoUtil;
import top.yqingyu.common.utils.LocalDateTimeUtil;

/* loaded from: input_file:top/yqingyu/qymsg/MsgConnector.class */
public class MsgConnector implements Runnable {
    private final ConcurrentHashMap<String, ArrayList<QyMsg>> MSG_CONTAINER = new ConcurrentHashMap<>();
    public static final Logger log = LoggerFactory.getLogger(MsgConnector.class);
    private long clearTime;

    public MsgConnector(long j) {
        this.clearTime = j;
    }

    public QyMsg merger(QyMsg qyMsg) throws IOException, ClassNotFoundException {
        String partition_id = qyMsg.getPartition_id();
        Integer denominator = qyMsg.getDenominator();
        ArrayList<QyMsg> arrayList = this.MSG_CONTAINER.get(partition_id);
        if (arrayList == null || arrayList.size() + 1 != denominator.intValue()) {
            if (arrayList != null && this.MSG_CONTAINER.get(partition_id).size() + 1 != denominator.intValue()) {
                qyMsg.putMsgData("now", LocalDateTime.now());
                this.MSG_CONTAINER.get(partition_id).add(qyMsg);
                return null;
            }
            ArrayList<QyMsg> arrayList2 = new ArrayList<>();
            arrayList2.add(qyMsg);
            this.MSG_CONTAINER.put(partition_id, arrayList2);
            return null;
        }
        arrayList.add(qyMsg);
        AtomicReference atomicReference = new AtomicReference();
        atomicReference.set(new byte[0]);
        arrayList.stream().sorted(Comparator.comparingInt((v0) -> {
            return v0.getNumerator();
        })).forEach(qyMsg2 -> {
            atomicReference.set(ArrayUtil.addAll((byte[]) atomicReference.get(), (byte[]) MsgHelper.gainObjMsg(qyMsg2)));
        });
        MsgType msgType = qyMsg.getMsgType();
        DataType dataType = qyMsg.getDataType();
        QyMsg qyMsg3 = new QyMsg(msgType, dataType);
        qyMsg3.setSegmentation(false);
        qyMsg3.setFrom(qyMsg.getFrom());
        if (DataType.JSON.equals(dataType)) {
            qyMsg3 = (QyMsg) JSON.parseObject((byte[]) atomicReference.get(), QyMsg.class);
        } else if (DataType.OBJECT.equals(dataType)) {
            qyMsg3 = (QyMsg) IoUtil.deserializationObj((byte[]) atomicReference.get(), QyMsg.class);
        } else if (DataType.STRING.equals(dataType)) {
            qyMsg3.putMsg(new String((byte[]) atomicReference.get(), StandardCharsets.UTF_8));
        } else if (DataType.STREAM.equals(dataType)) {
            qyMsg3.putMsg(atomicReference.get());
        } else {
            qyMsg3.putMsg(atomicReference.get());
        }
        this.MSG_CONTAINER.remove(partition_id);
        log.debug("消息 {} piece {} 拼接完成", partition_id, denominator);
        return qyMsg3;
    }

    @Override // java.lang.Runnable
    public void run() {
        while (true) {
            try {
                Thread.sleep(this.clearTime);
                LocalDateTime now = LocalDateTime.now();
                this.MSG_CONTAINER.forEach((str, arrayList) -> {
                    Optional max = arrayList.stream().max((qyMsg, qyMsg2) -> {
                        return (int) LocalDateTimeUtil.between((LocalDateTime) MsgHelper.gainMsgOBJ(qyMsg, "now"), (LocalDateTime) MsgHelper.gainMsgOBJ(qyMsg2, "now"), ChronoUnit.SECONDS);
                    });
                    if (max.isPresent()) {
                        QyMsg qyMsg3 = (QyMsg) max.get();
                        if (LocalDateTimeUtil.between(now, (LocalDateTime) MsgHelper.gainMsgOBJ(qyMsg3, "now"), ChronoUnit.MINUTES) > this.clearTime) {
                            this.MSG_CONTAINER.remove(str);
                            log.debug("消息过期，已清除 {} ", qyMsg3);
                        }
                    }
                });
            } catch (Exception e) {
                log.error("容器清除器异常", e);
            }
        }
    }
}
