package fun.tan90.easy.log.compute.listener;

import cn.hutool.core.bean.BeanUtil;
import cn.hutool.json.JSONUtil;
import fun.tan90.easy.log.common.model.LogTransferred;
import fun.tan90.easy.log.common.threadpool.EasyLogThreadPool;
import fun.tan90.easy.log.compute.LogAlarmRulesManager;
import fun.tan90.easy.log.compute.LogRealTimeFilterRulesManager;
import fun.tan90.easy.log.core.model.LogAlarmContent;
import fun.tan90.easy.log.core.model.LogAlarmRule;
import fun.tan90.easy.log.core.model.SlidingWindow;
import fun.tan90.easy.log.core.service.CacheService;
import java.nio.charset.StandardCharsets;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;
import javax.annotation.Resource;
import net.dreamlu.iot.mqtt.codec.MqttPublishMessage;
import net.dreamlu.iot.mqtt.codec.MqttQoS;
import net.dreamlu.iot.mqtt.core.client.IMqttClientMessageListener;
import net.dreamlu.iot.mqtt.spring.client.MqttClientSubscribe;
import net.dreamlu.iot.mqtt.spring.client.MqttClientTemplate;
import org.jetlinks.reactor.ql.ReactorQL;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Component;
import org.springframework.util.CollectionUtils;
import org.tio.core.ChannelContext;
import reactor.core.publisher.Flux;

@MqttClientSubscribe(value = {"$share/compute/el/log/#"}, qos = MqttQoS.AT_LEAST_ONCE)
@Component
/* loaded from: input_file:fun/tan90/easy/log/compute/listener/ComputeLogMessageListener.class */
public class ComputeLogMessageListener implements IMqttClientMessageListener {
    private static final Logger log = LoggerFactory.getLogger(ComputeLogMessageListener.class);

    @Resource
    MqttClientTemplate mqttClientTemplate;

    @Resource
    CacheService cacheService;

    public void onMessage(ChannelContext channelContext, String str, MqttPublishMessage mqttPublishMessage, byte[] bArr) {
        String str2 = new String(bArr, StandardCharsets.UTF_8);
        log.debug("订阅到日志信息 topic:{} payload:{}", str, str2);
        for (LogTransferred logTransferred : JSONUtil.toList(str2, LogTransferred.class)) {
            CompletableFuture.allOf(logInputSpeed(logTransferred, "recordId"), logAlarm(logTransferred, "recordId"), logRealTimeFilter(logTransferred)).join();
        }
    }

    private CompletableFuture<Void> logInputSpeed(LogTransferred logTransferred, String str) {
        String level = logTransferred.getLevel();
        long timestamp = logTransferred.getTimestamp();
        return CompletableFuture.runAsync(() -> {
            this.cacheService.slidingWindow("S_W:LOG_INPUT_SPEED:" + level, str, timestamp, 5);
        }, EasyLogThreadPool.newEasyLogFixedPoolInstance());
    }

    private CompletableFuture<Void> logAlarm(LogTransferred logTransferred, String str) {
        return CompletableFuture.runAsync(() -> {
            if ("error".equalsIgnoreCase(logTransferred.getLevel())) {
                String appName = logTransferred.getAppName();
                String namespace = logTransferred.getNamespace();
                String loggerName = logTransferred.getLoggerName();
                long timestamp = logTransferred.getTimestamp();
                Map<String, LogAlarmRule> logAlarmRule = LogAlarmRulesManager.getLogAlarmRule(appName, namespace, "all", loggerName);
                if (CollectionUtils.isEmpty(logAlarmRule)) {
                    return;
                }
                for (LogAlarmRule logAlarmRule2 : logAlarmRule.values()) {
                    Integer period = logAlarmRule2.getPeriod();
                    Integer threshold = logAlarmRule2.getThreshold();
                    String loggerName2 = logAlarmRule2.getLoggerName();
                    SlidingWindow slidingWindow = this.cacheService.slidingWindow("S_W:LOG_ALARM:" + appName + ":" + namespace + ":" + loggerName2, str, timestamp, period.intValue());
                    Integer windowCount = slidingWindow.getWindowCount();
                    log.info("阈值大小:{},滑动窗口内计数大小:{}", threshold, windowCount);
                    if (windowCount.intValue() == threshold.intValue() + 1) {
                        this.mqttClientTemplate.publish("el/log_alarm", JSONUtil.toJsonStr(LogAlarmContent.builder().alarmPlatformType(logAlarmRule2.getAlarmPlatformType()).alarmPlatformId(logAlarmRule2.getAlarmPlatformId()).windowStart(slidingWindow.getWindowStart()).windowEnd(slidingWindow.getWindowEnd()).ruleId(logAlarmRule2.getRuleId()).appName(logAlarmRule2.getAppName()).namespace(logAlarmRule2.getNamespace()).loggerName(loggerName2).receiverList(logAlarmRule2.getReceiverList()).threshold(logAlarmRule2.getThreshold()).period(logAlarmRule2.getPeriod()).build()).getBytes(StandardCharsets.UTF_8), MqttQoS.EXACTLY_ONCE);
                    }
                }
            }
        }, EasyLogThreadPool.newEasyLogFixedPoolInstance());
    }

    private CompletableFuture<Void> logRealTimeFilter(LogTransferred logTransferred) {
        return CompletableFuture.runAsync(() -> {
            List list = (List) LogRealTimeFilterRulesManager.stream().filter(str -> {
                ReactorQL logRealTimeFilterRule = LogRealTimeFilterRulesManager.getLogRealTimeFilterRule(str);
                Map beanToMap = BeanUtil.beanToMap(logTransferred, new String[0]);
                AtomicReference atomicReference = new AtomicReference(false);
                logRealTimeFilterRule.start(Flux.just(beanToMap)).subscribe(map -> {
                    atomicReference.set(true);
                }, th -> {
                    log.error("{}: {}", logRealTimeFilterRule.metadata().getSql().toString(), th.getMessage());
                });
                return ((Boolean) atomicReference.get()).booleanValue();
            }).collect(Collectors.toList());
            if (CollectionUtils.isEmpty(list)) {
                return;
            }
            Iterator it = list.iterator();
            while (it.hasNext()) {
                this.mqttClientTemplate.publish("el/after-filtered/" + ((String) it.next()), JSONUtil.toJsonStr(logTransferred).getBytes(StandardCharsets.UTF_8), MqttQoS.AT_LEAST_ONCE);
            }
        }, EasyLogThreadPool.newEasyLogFixedPoolInstance());
    }
}
