package org.jetlinks.registry.redis;

import java.util.Map;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.BiConsumer;
import java.util.function.Consumer;
import org.apache.commons.lang.StringUtils;
import org.jetlinks.core.device.registry.DeviceMessageHandler;
import org.jetlinks.core.enums.ErrorCode;
import org.jetlinks.core.message.DeviceMessage;
import org.jetlinks.core.message.DeviceMessageReply;
import org.redisson.api.RBucket;
import org.redisson.api.RFuture;
import org.redisson.api.RSemaphore;
import org.redisson.api.RTopic;
import org.redisson.api.RedissonClient;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/jetlinks/registry/redis/RedissonDeviceMessageHandler.class */
/**
 * {@link DeviceMessageHandler} implementation backed by Redisson.
 *
 * <p>Messages addressed to a device are either handed to a consumer registered in this
 * instance (see {@link #handleMessage(String, Consumer)}) or published on a Redis topic.
 * Replies are delivered to a future registered in this instance when one exists; otherwise
 * the reply is stored in a Redis bucket and its message id is announced on the
 * {@code device:message:reply} topic so that the instance holding the pending future can
 * fetch it.
 *
 * <p>A periodic task completes pending futures whose deadline has passed.
 */
public class RedissonDeviceMessageHandler implements DeviceMessageHandler {
    private static final Logger log = LoggerFactory.getLogger(RedissonDeviceMessageHandler.class);

    /** Topic on which the ids of replies stored in Redis are announced. */
    private static final String REPLY_TOPIC = "device:message:reply";
    /** Key prefix of the bucket that stores a reply, followed by the message id. */
    private static final String REPLY_BUCKET_PREFIX = "device:message:reply:";
    /** Topic prefix for messages sent to a device, followed by the first argument of send/handleMessage. */
    private static final String ACCEPT_TOPIC_PREFIX = "device:message:accept:";
    /** Topic prefix for state-check requests, followed by the first argument of handleDeviceCheck. */
    private static final String STATE_CHECK_TOPIC_PREFIX = "device:state:check:";
    /** Key prefix of the semaphore released after a state check, followed by the received payload. */
    private static final String STATE_CHECK_SEMAPHORE_PREFIX = "device:state:check:semaphore:";
    /** Key prefix of the bucket flagging a message as asynchronous, followed by the message id. */
    private static final String ASYNC_FLAG_PREFIX = "async-msg:";

    /** Pending reply futures registered through {@link #handleReply}, keyed by message id. */
    private final Map<String, MessageFuture> futureMap = new ConcurrentHashMap<>();
    /** Time-to-live of a stored reply; defaults to 3 minutes, overridable by system property. */
    private long replyExpireTimeSeconds =
            Long.getLong("device.message.reply.expire-time-seconds", TimeUnit.MINUTES.toSeconds(3L));
    /** Time-to-live of the async flag; defaults to 30 minutes, overridable by system property. */
    private long asyncFlagExpireTimeSeconds =
            Long.getLong("device.message.async-flag.expire-time-seconds", TimeUnit.MINUTES.toSeconds(30L));
    /** Consumers registered through {@link #handleMessage}, keyed by that method's first argument. */
    private final Map<String, Consumer<DeviceMessage>> localConsumer = new ConcurrentHashMap<>();
    private final RedissonClient redissonClient;
    private final RTopic replyTopic;

    /**
     * A pending reply: the future to complete, the message id it waits for and the
     * wall-clock deadline (epoch milliseconds) after which it is treated as timed out.
     */
    public class MessageFuture {
        private final String messageId;
        private final CompletableFuture<Object> future;
        private final long expireTime;

        /**
         * @param str               id of the message whose reply is awaited
         * @param completableFuture future completed with the reply (or a timeout marker)
         * @param j                 deadline in epoch milliseconds
         */
        public MessageFuture(String str, CompletableFuture<Object> completableFuture, long j) {
            this.messageId = str;
            this.future = completableFuture;
            this.expireTime = j;
        }

        /** @return id of the message whose reply is awaited */
        public String getMessageId() {
            return this.messageId;
        }

        /** @return future completed once the reply arrives or the deadline passes */
        public CompletableFuture<Object> getFuture() {
            return this.future;
        }

        /** @return deadline in epoch milliseconds */
        public long getExpireTime() {
            return this.expireTime;
        }
    }

    /**
     * Creates a handler that runs its timeout check on a newly created single-thread scheduler.
     *
     * <p>NOTE(review): the scheduler created here is never shut down by this class; callers
     * that need controlled shutdown should pass their own executor to the other constructor.
     *
     * @param redissonClient client used for all Redis access
     */
    public RedissonDeviceMessageHandler(RedissonClient redissonClient) {
        this(redissonClient, Executors.newSingleThreadScheduledExecutor());
    }

    /**
     * Creates a handler, subscribes to the reply topic and schedules the timeout check
     * (first run after 1 second, then every 5 seconds).
     *
     * @param redissonClient           client used for all Redis access
     * @param scheduledExecutorService executor that runs the periodic timeout check
     */
    public RedissonDeviceMessageHandler(RedissonClient redissonClient, ScheduledExecutorService scheduledExecutorService) {
        this.redissonClient = redissonClient;
        this.replyTopic = this.redissonClient.getTopic(REPLY_TOPIC);
        this.replyTopic.addListener(String.class, (channel, messageId) -> onReplyAnnounced(messageId));
        scheduledExecutorService.scheduleAtFixedRate(this::expireTimedOutFutures, 1L, 5L, TimeUnit.SECONDS);
    }

    /**
     * Handles a reply announcement: if this instance holds the pending future for the
     * message, fetches (and deletes) the stored reply and completes the future with it.
     */
    private void onReplyAnnounced(String messageId) {
        MessageFuture pending = this.futureMap.remove(messageId);
        if (pending == null) {
            return;
        }
        CompletableFuture<Object> future = pending.getFuture();
        if (future.isCancelled()) {
            return;
        }
        // The blocking Redis read is moved off the listener thread.
        CompletableFuture
                .supplyAsync(() -> this.redissonClient.getBucket(replyBucketKey(messageId)).getAndDelete())
                .whenComplete((reply, error) -> {
                    if (error != null) {
                        future.completeExceptionally(error);
                    } else {
                        future.complete(reply);
                    }
                });
    }

    /** Periodic task: times out every pending future whose deadline has passed. */
    private void expireTimedOutFutures() {
        for (Map.Entry<String, MessageFuture> entry : this.futureMap.entrySet()) {
            if (System.currentTimeMillis() > entry.getValue().getExpireTime()) {
                expire(entry.getKey(), entry.getValue());
            }
        }
    }

    /**
     * Times out one pending future. A reply that reached Redis without being picked up is
     * still delivered; otherwise the future completes with {@link ErrorCode#TIME_OUT}.
     *
     * <p>Failures are contained here on purpose: an exception escaping a task scheduled with
     * {@code scheduleAtFixedRate} would cancel all subsequent runs of the timeout check.
     */
    private void expire(String messageId, MessageFuture pending) {
        CompletableFuture<Object> future = pending.getFuture();
        try {
            if (!future.isCancelled()) {
                this.redissonClient.getBucket(replyBucketKey(messageId))
                        .getAndDeleteAsync()
                        .whenComplete((reply, error) -> {
                            if (error != null) {
                                log.warn("Failed to fetch a late reply for device message [{}]", messageId, error);
                            }
                            if (reply != null) {
                                future.complete(reply);
                            } else {
                                future.complete(ErrorCode.TIME_OUT);
                            }
                        });
            }
        } catch (RuntimeException e) {
            log.warn("Failed to expire device message [{}]", messageId, e);
            // Never leave the caller waiting on a future that is no longer tracked.
            future.complete(ErrorCode.TIME_OUT);
        } finally {
            log.info("设备消息[{}]超时未返回", messageId);
            // Remove only this exact entry so a newer future registered under the same id survives.
            this.futureMap.remove(messageId, pending);
        }
    }

    /** Builds the key of the bucket that stores the reply to the given message. */
    private static String replyBucketKey(String messageId) {
        return REPLY_BUCKET_PREFIX.concat(messageId);
    }

    /**
     * Subscribes to state-check requests on topic {@code device:state:check:<str>}. For each
     * non-empty payload the consumer is invoked first; then the semaphore
     * {@code device:state:check:semaphore:<payload>} is given a 5 second expiry and released.
     *
     * @param str      suffix of the state-check topic to subscribe to
     * @param consumer callback receiving the payload of each state-check request
     */
    public void handleDeviceCheck(String str, Consumer<String> consumer) {
        this.redissonClient.getTopic(STATE_CHECK_TOPIC_PREFIX.concat(str)).addListener(String.class, (channel, payload) -> {
            if (StringUtils.isEmpty(payload)) {
                return;
            }
            consumer.accept(payload);
            RSemaphore semaphore = this.redissonClient.getSemaphore(STATE_CHECK_SEMAPHORE_PREFIX.concat(payload));
            semaphore.expireAsync(5L, TimeUnit.SECONDS).thenRun(semaphore::releaseAsync);
        });
    }

    /**
     * Registers a consumer for messages addressed to {@code str}, both for direct in-process
     * delivery by {@link #send} and for messages published on {@code device:message:accept:<str>}.
     *
     * @param str      key under which the consumer is registered and suffix of the accept topic
     * @param consumer callback receiving each message
     */
    public void handleMessage(String str, Consumer<DeviceMessage> consumer) {
        this.localConsumer.put(str, consumer);
        this.redissonClient.getTopic(ACCEPT_TOPIC_PREFIX.concat(str)).addListener(DeviceMessage.class, (channel, deviceMessage) -> {
            if (log.isDebugEnabled()) {
                log.debug("接收到发往设备的消息:{}", deviceMessage.toJson());
            }
            consumer.accept(deviceMessage);
        });
    }

    /**
     * Registers a pending reply for a message. A previously registered future with the same
     * id is replaced.
     *
     * @param str      id of the message whose reply is awaited
     * @param j        timeout amount
     * @param timeUnit unit of {@code j}
     * @return stage completed with the reply, or with {@link ErrorCode#TIME_OUT} once the
     *         periodic timeout check finds the deadline passed and no stored reply
     */
    public CompletionStage<Object> handleReply(String str, long j, TimeUnit timeUnit) {
        CompletableFuture<Object> future = new CompletableFuture<>();
        long deadline = System.currentTimeMillis() + timeUnit.toMillis(j);
        this.futureMap.put(str, new MessageFuture(str, future, deadline));
        return future;
    }

    /**
     * Sends a message. If a consumer is registered in this instance for {@code str} it is
     * invoked directly and the result is {@code 1}; otherwise the message is published on
     * {@code device:message:accept:<str>}.
     *
     * @param str           key identifying the consumer / accept topic
     * @param deviceMessage message to deliver
     * @return stage yielding {@code 1} for local delivery, or the result of the topic publish
     */
    public CompletionStage<Long> send(String str, DeviceMessage deviceMessage) {
        Consumer<DeviceMessage> consumer = this.localConsumer.get(str);
        if (consumer == null) {
            return this.redissonClient.getTopic(ACCEPT_TOPIC_PREFIX.concat(str)).publishAsync(deviceMessage);
        }
        consumer.accept(deviceMessage);
        return CompletableFuture.completedFuture(1L);
    }

    /**
     * Delivers a reply. When the pending future lives in this instance it is completed
     * directly; otherwise the reply is stored in Redis and, once stored, its id is announced
     * on the reply topic.
     *
     * @param deviceMessageReply reply to deliver
     * @return stage yielding {@code true}; it completes exceptionally if storing the reply fails
     */
    public CompletionStage<Boolean> reply(DeviceMessageReply deviceMessageReply) {
        String messageId = deviceMessageReply.getMessageId();
        // A single atomic remove prevents two concurrent replies from both taking the local path.
        MessageFuture pending = this.futureMap.remove(messageId);
        if (pending != null) {
            pending.getFuture().complete(deviceMessageReply);
            return CompletableFuture.completedFuture(true);
        }
        RBucket<DeviceMessageReply> bucket = this.redissonClient.getBucket(replyBucketKey(messageId));
        return CompletableFuture
                .runAsync(() -> bucket.set(deviceMessageReply, this.replyExpireTimeSeconds, TimeUnit.SECONDS))
                .thenApply(ignored -> true)
                .whenComplete((stored, error) -> {
                    if (error != null) {
                        // Announcing a reply that was not stored would make the waiting side
                        // drop its pending future and read an empty bucket.
                        log.error("Failed to store the reply for device message [{}]", messageId, error);
                        return;
                    }
                    if (this.replyTopic.publish(messageId) <= 0) {
                        log.warn("消息回复[{}]没有任何服务消费", messageId);
                    }
                });
    }

    /**
     * Flags a message as asynchronous by setting bucket {@code async-msg:<str>} to
     * {@code true} with a time-to-live of {@link #getAsyncFlagExpireTimeSeconds()} seconds.
     *
     * @param str message id
     * @return stage completed when the flag has been written
     */
    public CompletionStage<Void> markMessageAsync(String str) {
        return this.redissonClient.getBucket(ASYNC_FLAG_PREFIX.concat(str))
                .setAsync(true, this.asyncFlagExpireTimeSeconds, TimeUnit.SECONDS);
    }

    /**
     * Reads the async flag of a message.
     *
     * @param str message id
     * @param z   when {@code true} the flag is read and deleted; when {@code false} it is
     *            read and, if present, its expiry is refreshed
     * @return stage yielding the stored flag value, or {@code null} when no flag is stored
     */
    public CompletionStage<Boolean> messageIsAsync(String str, boolean z) {
        RBucket<Boolean> bucket = this.redissonClient.getBucket(ASYNC_FLAG_PREFIX.concat(str));
        if (z) {
            return bucket.getAndDeleteAsync();
        }
        return bucket.getAsync().whenComplete((flag, error) -> {
            if (flag != null) {
                // NOTE(review): the refresh uses the reply TTL rather than the async-flag TTL;
                // kept as-is, confirm whether this is intentional.
                bucket.expireAsync(this.replyExpireTimeSeconds, TimeUnit.SECONDS);
            }
        });
    }

    /** @return time-to-live, in seconds, of replies stored in Redis */
    public long getReplyExpireTimeSeconds() {
        return this.replyExpireTimeSeconds;
    }

    /** @param j time-to-live, in seconds, of replies stored in Redis */
    public void setReplyExpireTimeSeconds(long j) {
        this.replyExpireTimeSeconds = j;
    }

    /** @return time-to-live, in seconds, of the async flag */
    public long getAsyncFlagExpireTimeSeconds() {
        return this.asyncFlagExpireTimeSeconds;
    }

    /** @param j time-to-live, in seconds, of the async flag */
    public void setAsyncFlagExpireTimeSeconds(long j) {
        this.asyncFlagExpireTimeSeconds = j;
    }
}
