package org.jetlinks.registry.redis.lettuce;

import io.lettuce.core.ScriptOutputType;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import org.apache.commons.lang.StringUtils;
import org.jetlinks.core.device.registry.DeviceMessageHandler;
import org.jetlinks.core.message.DeviceMessage;
import org.jetlinks.core.message.DeviceMessageReply;
import org.jetlinks.lettuce.LettucePlus;
import org.jetlinks.lettuce.codec.StringCodec;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/jetlinks/registry/redis/lettuce/LettuceDeviceMessageHandler.class */
/**
 * {@link DeviceMessageHandler} implementation that exchanges device messages and replies
 * through Redis, using a {@link LettucePlus} instance for topics, connections and scripts.
 *
 * <p>Redis names used by this class:
 * <ul>
 *   <li>topic {@code device:message:reply} - carries the id of a message whose reply was stored;</li>
 *   <li>key {@code device:message:reply:<messageId>} - holds the stored reply;</li>
 *   <li>topic {@code device:message:accept:<suffix>} - carries messages passed to {@link #send};</li>
 *   <li>topic {@code device:state:check:<suffix>} - carries payloads for {@link #handleDeviceCheck};</li>
 *   <li>key {@code async-msg:<messageId>} - flag written by {@link #markMessageAsync}.</li>
 * </ul>
 *
 * <p>Futures registered with {@link #handleReply} are kept in an in-memory map and are swept
 * every 5 seconds (first run after 1 second) by a task scheduled on the executor of the
 * supplied {@link LettucePlus}.
 *
 * <p>NOTE(review): this source was produced by a decompiler, which dropped generic type
 * arguments on the {@code LettucePlus} calls - confirm the call sites against the real API.
 */
public class LettuceDeviceMessageHandler implements DeviceMessageHandler {
    private static final Logger log = LoggerFactory.getLogger(LettuceDeviceMessageHandler.class);

    /**
     * Expiry in seconds applied to a stored reply; read once from the system property
     * {@code device.message.reply.expire-time-seconds}, defaulting to 3 minutes.
     */
    private static final int REPLY_EXPIRE_TIME_SECONDS =
            Integer.getInteger("device.message.reply.expire-time-seconds", (int) TimeUnit.MINUTES.toSeconds(3));

    /**
     * Expiry in seconds applied to the async flag; read once from the system property
     * {@code device.message.async-flag.expire-time-seconds}, defaulting to 30 minutes.
     */
    private static final int ASYNC_FLAG_EXPIRE_TIME_SECONDS =
            Integer.getInteger("device.message.async-flag.expire-time-seconds", (int) TimeUnit.MINUTES.toSeconds(30));

    private final LettucePlus plus;

    /** Pending reply futures keyed by message id. */
    private final Map<String, MessageFuture> futureMap = new ConcurrentHashMap<>();

    /** Consumers registered in this process through {@link #handleMessage}, keyed by its first argument. */
    private final Map<String, Consumer<DeviceMessage>> localConsumer = new ConcurrentHashMap<>();

    /**
     * A pending reply: the message id, the future to complete and the deadline
     * (epoch milliseconds) after which the timeout sweep picks it up.
     */
    public class MessageFuture {
        private final String messageId;
        private final CompletableFuture<Object> future;
        private final long expireTime;

        /**
         * @param str               id of the message being waited for
         * @param completableFuture future to complete once a reply (or timeout) is handled
         * @param j                 deadline in epoch milliseconds
         */
        public MessageFuture(String str, CompletableFuture<Object> completableFuture, long j) {
            this.messageId = str;
            this.future = completableFuture;
            this.expireTime = j;
        }

        public String getMessageId() {
            return this.messageId;
        }

        public CompletableFuture<Object> getFuture() {
            return this.future;
        }

        public long getExpireTime() {
            return this.expireTime;
        }
    }

    /**
     * Subscribes to the {@code device:message:reply} topic and schedules the timeout sweep.
     *
     * @param lettucePlus Redis facade used for all topic, connection and script access
     */
    public LettuceDeviceMessageHandler(LettucePlus lettucePlus) {
        this.plus = lettucePlus;
        // A notification carries the message id; the reply itself is then read from Redis.
        this.plus.getTopic(StringCodec.getInstance(), "device:message:reply").addListener((topic, messageId) -> {
            MessageFuture pending = this.futureMap.remove(messageId);
            if (pending != null) {
                tryComplete(messageId, pending.getFuture());
            }
        });
        lettucePlus.getExecutor().scheduleAtFixedRate(this::expireTimedOutFutures, 1L, 5L, TimeUnit.SECONDS);
    }

    /**
     * Removes every pending future whose deadline has passed and completes it with whatever
     * is stored under its reply key.
     *
     * <p>Nothing may escape this method: an exception thrown from a fixed-rate task
     * suppresses all of its subsequent executions, which would stop timeouts for good.
     */
    private void expireTimedOutFutures() {
        long now = System.currentTimeMillis();
        for (Map.Entry<String, MessageFuture> entry : this.futureMap.entrySet()) {
            MessageFuture pending = entry.getValue();
            if (now <= pending.getExpireTime()) {
                continue;
            }
            String messageId = entry.getKey();
            try {
                // Conditional remove: only the caller that actually claims the entry completes it,
                // and a future re-registered under the same id in the meantime is left alone.
                if (this.futureMap.remove(messageId, pending)) {
                    log.info("设备消息[{}]超时未返回", messageId);
                    tryComplete(messageId, pending.getFuture());
                }
            } catch (RuntimeException e) {
                log.error("Failed to expire pending device message [{}]", messageId, e);
            }
        }
    }

    /**
     * Completes the future with the value stored under {@code device:message:reply:<str>},
     * or exceptionally when reading that value fails.
     *
     * @param str               message id whose reply key is read
     * @param completableFuture future to complete; left untouched when it is already done
     */
    private void tryComplete(String str, CompletableFuture<Object> completableFuture) {
        if (completableFuture.isDone()) {
            // Covers cancellation as well; completing again would be a no-op, so skip the lookup.
            return;
        }
        try {
            this.plus.getConnection()
                    .thenApply(connection -> connection.async())
                    .thenCompose(commands -> commands.get("device:message:reply:".concat(str)))
                    .whenComplete((reply, error) -> {
                        if (error != null) {
                            completableFuture.completeExceptionally(error);
                        } else {
                            completableFuture.complete(reply);
                        }
                    });
        } catch (RuntimeException e) {
            // A synchronous failure while building the chain must not leave the caller waiting forever.
            completableFuture.completeExceptionally(e);
        }
    }

    /**
     * Listens on topic {@code device:state:check:<str>} and forwards every non-empty payload.
     *
     * @param str      suffix of the topic name (presumably a server id - confirm against callers)
     * @param consumer receives each non-empty payload published on the topic
     */
    public void handleDeviceCheck(String str, Consumer<String> consumer) {
        this.plus.getTopic("device:state:check:".concat(str)).addListener((topic, payload) -> {
            if (!StringUtils.isEmpty(payload)) {
                consumer.accept(payload);
            }
        });
    }

    /**
     * Registers a consumer for messages addressed to {@code str}: it is remembered locally so
     * {@link #send} can call it directly, and it is also subscribed to topic
     * {@code device:message:accept:<str>}.
     *
     * @param str      suffix of the topic name and key of the local registration
     * @param consumer receives every message delivered locally or through the topic
     */
    public void handleMessage(String str, Consumer<DeviceMessage> consumer) {
        this.localConsumer.put(str, consumer);
        this.plus.getTopic("device:message:accept:".concat(str)).addListener((topic, deviceMessage) -> {
            if (log.isDebugEnabled()) {
                log.debug("接收到发往设备的消息:{}", deviceMessage.toJson());
            }
            consumer.accept(deviceMessage);
        });
    }

    /**
     * Registers a future that is completed when a reply for the given message id arrives or
     * when the timeout sweep finds it past its deadline. A registration under an id that is
     * already pending replaces the earlier one.
     *
     * @param str      message id to wait for
     * @param j        timeout amount
     * @param timeUnit unit of {@code j}
     * @return stage completed with the reply, or with the value stored under the reply key
     *         (possibly {@code null}) once the deadline has passed
     */
    public CompletionStage<Object> handleReply(String str, long j, TimeUnit timeUnit) {
        CompletableFuture<Object> future = new CompletableFuture<>();
        long now = System.currentTimeMillis();
        long timeoutMillis = timeUnit.toMillis(j);
        // Saturate instead of overflowing: a huge timeout must not wrap to an already-expired deadline.
        long deadline = timeoutMillis > Long.MAX_VALUE - now ? Long.MAX_VALUE : now + timeoutMillis;
        this.futureMap.put(str, new MessageFuture(str, future, deadline));
        return future;
    }

    /**
     * Delivers a message to the consumer registered locally for {@code str}, or publishes it
     * on topic {@code device:message:accept:<str>} when there is none.
     *
     * @param str           key of the local consumer / suffix of the topic name
     * @param deviceMessage message to deliver
     * @return {@code 1} for local delivery, otherwise the result of the topic publish
     */
    public CompletionStage<Long> send(String str, DeviceMessage deviceMessage) {
        Consumer<DeviceMessage> local = this.localConsumer.get(str);
        if (local != null) {
            local.accept(deviceMessage);
            return CompletableFuture.completedFuture(1L);
        }
        return this.plus.getTopic("device:message:accept:".concat(str)).publish(deviceMessage);
    }

    /**
     * Hands a reply to whoever is waiting for it. When the waiting future lives in this
     * instance it is completed directly; otherwise the reply is stored under
     * {@code device:message:reply:<messageId>} and the id is published on
     * {@code device:message:reply}.
     *
     * @param deviceMessageReply reply to hand over
     * @return {@code true} for local completion, otherwise whether the publish count was positive
     */
    public CompletionStage<Boolean> reply(DeviceMessageReply deviceMessageReply) {
        String messageId = deviceMessageReply.getMessageId();
        // Single atomic remove, so a concurrent timeout sweep cannot claim the same entry.
        MessageFuture pending = this.futureMap.remove(messageId);
        if (pending != null) {
            pending.getFuture().complete(deviceMessageReply);
            return CompletableFuture.completedFuture(true);
        }
        return this.plus.eval("redis.call('setex',KEYS[1]," + REPLY_EXPIRE_TIME_SECONDS + ",ARGV[1]);return redis.call('publish',KEYS[2],KEYS[3]);", ScriptOutputType.INTEGER, new Object[]{"device:message:reply:".concat(messageId), "device:message:reply", messageId}, new Object[]{deviceMessageReply}).thenApply(l -> {
            if (l.longValue() <= 0) {
                log.warn("消息回复[{}]没有任何服务消费", deviceMessageReply.getMessageId());
            }
            return Boolean.valueOf(l.longValue() > 0);
        }).whenComplete((bool, th) -> {
            if (th != null) {
                log.error("回复消息失败", th);
            }
        });
    }

    /**
     * Writes the flag {@code async-msg:<str>} with the configured async-flag expiry.
     *
     * @param str message id to flag
     * @return stage that completes with {@code null} once the flag has been written
     */
    public CompletionStage<Void> markMessageAsync(String str) {
        return this.plus.getConnection()
                .thenApply(connection -> connection.async())
                .thenCompose(commands -> commands.setex("async-msg:".concat(str), ASYNC_FLAG_EXPIRE_TIME_SECONDS, true))
                .thenApply(ignored -> null);
    }

    /**
     * Reads the flag {@code async-msg:<str>}.
     *
     * @param str message id to look up
     * @param z   not used by this implementation
     *            (NOTE(review): presumably a "reset the flag" switch of the interface - confirm)
     * @return stage holding the stored flag value
     *         (presumably {@code null} when the key is absent - verify how callers handle that)
     */
    public CompletionStage<Boolean> messageIsAsync(String str, boolean z) {
        return this.plus.getConnection()
                .thenApply(connection -> connection.async())
                .thenCompose(commands -> commands.get("async-msg:".concat(str)));
    }
}
