package org.jetlinks.registry.redis.lettuce;

import com.alibaba.fastjson.JSON;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.TimeUnit;
import java.util.function.BiConsumer;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import org.jetlinks.core.device.DeviceMessageSender;
import org.jetlinks.core.device.DeviceOperation;
import org.jetlinks.core.device.registry.DeviceMessageHandler;
import org.jetlinks.core.enums.ErrorCode;
import org.jetlinks.core.message.DeviceMessage;
import org.jetlinks.core.message.DeviceMessageReply;
import org.jetlinks.core.message.FunctionInvokeMessageSender;
import org.jetlinks.core.message.Headers;
import org.jetlinks.core.message.ReadPropertyMessageSender;
import org.jetlinks.core.message.RepayableDeviceMessage;
import org.jetlinks.core.message.WritePropertyMessageSender;
import org.jetlinks.core.message.exception.FunctionUndefinedException;
import org.jetlinks.core.message.exception.ParameterUndefinedException;
import org.jetlinks.core.message.function.FunctionInvokeMessage;
import org.jetlinks.core.message.function.FunctionInvokeMessageReply;
import org.jetlinks.core.message.function.FunctionParameter;
import org.jetlinks.core.message.interceptor.DeviceMessageSenderInterceptor;
import org.jetlinks.core.message.property.ReadPropertyMessage;
import org.jetlinks.core.message.property.ReadPropertyMessageReply;
import org.jetlinks.core.message.property.WritePropertyMessage;
import org.jetlinks.core.message.property.WritePropertyMessageReply;
import org.jetlinks.core.metadata.FunctionMetadata;
import org.jetlinks.core.metadata.PropertyMetadata;
import org.jetlinks.core.metadata.ValidateResult;
import org.jetlinks.core.utils.IdUtils;
import org.jetlinks.lettuce.LettucePlus;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/jetlinks/registry/redis/lettuce/LettuceDeviceMessageSender.class */
public class LettuceDeviceMessageSender implements DeviceMessageSender {
    private static final Logger log = LoggerFactory.getLogger(LettuceDeviceMessageSender.class);
    private LettucePlus plus;
    protected Supplier<String> connectionServerIdSupplier;
    protected Runnable deviceStateChecker;
    protected String deviceId;
    protected DeviceOperation operation;
    protected DeviceMessageSenderInterceptor interceptor;
    private DeviceMessageHandler messageHandler;
    private boolean asyncFromMetadata = Boolean.getBoolean("device.message.async.from-metadata");
    private int maxSendAwaitSeconds = Integer.getInteger("device.message.await.max-seconds", 30).intValue();

    public LettuceDeviceMessageSender(String str, LettucePlus lettucePlus, DeviceMessageHandler deviceMessageHandler, DeviceOperation deviceOperation) {
        this.plus = lettucePlus;
        this.operation = deviceOperation;
        this.messageHandler = deviceMessageHandler;
        deviceOperation.getClass();
        this.connectionServerIdSupplier = deviceOperation::getServerId;
        deviceOperation.getClass();
        this.deviceStateChecker = deviceOperation::checkState;
        this.deviceId = str;
    }

    protected <R extends DeviceMessageReply> R convertReply(Object obj, RepayableDeviceMessage<R> repayableDeviceMessage, Supplier<R> supplier) {
        R r = supplier.get();
        if (obj == null) {
            r.error(ErrorCode.NO_REPLY);
        } else if (obj instanceof ErrorCode) {
            r.error((ErrorCode) obj);
        } else {
            if (r.getClass().isAssignableFrom(obj.getClass())) {
                return (R) obj;
            }
            if (obj instanceof String) {
                r.fromJson(JSON.parseObject(String.valueOf(obj)));
            } else if (obj instanceof DeviceMessage) {
                r.fromJson(((DeviceMessage) obj).toJson());
            } else {
                r.error(ErrorCode.UNSUPPORTED_MESSAGE);
                log.warn("不支持的消息类型:{}", obj.getClass());
            }
        }
        if (repayableDeviceMessage != null) {
            r.from(repayableDeviceMessage);
        }
        return r;
    }

    public <R extends DeviceMessageReply> CompletionStage<R> retrieveReply(String str, Supplier<R> supplier) {
        return this.plus.getConnection().thenApply((v0) -> {
            return v0.async();
        }).thenCompose(redisAsyncCommands -> {
            return redisAsyncCommands.get("device:message:reply:".concat(str));
        }).thenApply(obj -> {
            DeviceMessageReply convertReply = convertReply(obj, null, supplier);
            convertReply.messageId(str);
            return convertReply;
        });
    }

    public <R extends DeviceMessageReply> CompletionStage<R> send(DeviceMessage deviceMessage, Function<Object, R> function) {
        String str = this.connectionServerIdSupplier.get();
        if (str == null) {
            R apply = function.apply(null);
            if (apply != null) {
                apply.error(ErrorCode.CLIENT_OFFLINE);
            }
            if (null != apply) {
                apply.from(deviceMessage);
            }
            return CompletableFuture.completedFuture(apply);
        }
        CompletableFuture completableFuture = new CompletableFuture();
        if (this.interceptor != null) {
            deviceMessage = this.interceptor.preSend(this.operation, deviceMessage);
        }
        DeviceMessage deviceMessage2 = deviceMessage;
        if (((Boolean) Headers.async.get(deviceMessage2).asBoolean().orElse(false)).booleanValue()) {
            this.messageHandler.markMessageAsync(deviceMessage2.getMessageId()).whenComplete((r6, th) -> {
                if (th != null) {
                    log.error("设置异步消息标识[{}]失败", deviceMessage2, th);
                }
            });
        }
        Optional header = deviceMessage2.getHeader("timeout");
        Class<Number> cls = Number.class;
        Number.class.getClass();
        int intValue = ((Integer) header.map(cls::cast).map((v0) -> {
            return v0.intValue();
        }).orElse(Integer.valueOf(this.maxSendAwaitSeconds))).intValue();
        BiConsumer biConsumer = (obj, th2) -> {
            CompletableFuture thenApply = CompletableFuture.completedFuture(function.apply(th2 != null ? ErrorCode.SYSTEM_ERROR : obj)).thenApply(deviceMessageReply -> {
                if (th2 != null) {
                    deviceMessageReply.addHeader("error", th2.getMessage());
                }
                return deviceMessageReply;
            });
            if (this.interceptor != null) {
                thenApply = thenApply.thenCompose(deviceMessageReply2 -> {
                    return this.interceptor.afterReply(this.operation, deviceMessage2, deviceMessageReply2);
                });
            }
            thenApply.whenComplete((deviceMessageReply3, th2) -> {
                if (th2 != null) {
                    completableFuture.completeExceptionally(th2);
                } else {
                    completableFuture.complete(deviceMessageReply3);
                }
            });
        };
        CompletionStage handleReply = this.messageHandler.handleReply(deviceMessage2.getMessageId(), intValue, TimeUnit.SECONDS);
        handleReply.whenComplete(biConsumer);
        completableFuture.whenComplete((deviceMessageReply, th3) -> {
            if (completableFuture.isCancelled()) {
                log.info("取消等待设备[{}]消息[{}]返回", this.deviceId, deviceMessage2.getMessageId());
                handleReply.toCompletableFuture().cancel(true);
            }
        });
        this.messageHandler.send(str, deviceMessage2).whenComplete((l, th4) -> {
            if (th4 != null) {
                log.error("发送消息到设备网关服务失败:{}", deviceMessage2, th4);
                biConsumer.accept(ErrorCode.SYSTEM_ERROR, th4);
                return;
            }
            if (l.longValue() <= 0) {
                if (this.deviceStateChecker != null) {
                    this.deviceStateChecker.run();
                }
                biConsumer.accept(ErrorCode.CLIENT_OFFLINE, null);
            }
            if (l.longValue() > 1) {
                log.warn("存在多个相同的网关服务:{}", str);
            }
        });
        return completableFuture;
    }

    public <R extends DeviceMessageReply> CompletionStage<R> send(RepayableDeviceMessage<R> repayableDeviceMessage) {
        return send(repayableDeviceMessage, obj -> {
            repayableDeviceMessage.getClass();
            return convertReply(obj, repayableDeviceMessage, repayableDeviceMessage::newReply);
        });
    }

    public FunctionInvokeMessageSender invokeFunction(final String str) {
        Objects.requireNonNull(str, "function");
        final FunctionInvokeMessage functionInvokeMessage = new FunctionInvokeMessage();
        functionInvokeMessage.setTimestamp(System.currentTimeMillis());
        functionInvokeMessage.setDeviceId(this.deviceId);
        functionInvokeMessage.setFunctionId(str);
        functionInvokeMessage.setMessageId(IdUtils.newUUID());
        return new FunctionInvokeMessageSender() { // from class: org.jetlinks.registry.redis.lettuce.LettuceDeviceMessageSender.1
            boolean markAsync = false;

            public FunctionInvokeMessageSender addParameter(String str2, Object obj) {
                functionInvokeMessage.addInput(str2, obj);
                return this;
            }

            public FunctionInvokeMessageSender custom(Consumer<FunctionInvokeMessage> consumer) {
                consumer.accept(functionInvokeMessage);
                return this;
            }

            public FunctionInvokeMessageSender messageId(String str2) {
                functionInvokeMessage.setMessageId(str2);
                return this;
            }

            public FunctionInvokeMessageSender setParameter(List<FunctionParameter> list) {
                functionInvokeMessage.setInputs(list);
                return this;
            }

            public FunctionInvokeMessageSender validate(BiConsumer<FunctionParameter, ValidateResult> biConsumer) {
                Optional function = LettuceDeviceMessageSender.this.operation.getMetadata().getFunction(str);
                String str2 = str;
                FunctionMetadata functionMetadata = (FunctionMetadata) function.orElseThrow(() -> {
                    return new FunctionUndefinedException(str2, "功能[" + str2 + "]未定义");
                });
                List inputs = functionMetadata.getInputs();
                List inputs2 = functionInvokeMessage.getInputs();
                if (inputs2.size() != inputs.size()) {
                    LettuceDeviceMessageSender.log.warn("调用设备功能[{}]参数数量[需要{},传入{}]错误,功能:{}", new Object[]{str, Integer.valueOf(inputs.size()), Integer.valueOf(inputs2.size()), functionMetadata.toString()});
                    throw new IllegalArgumentException("参数数量错误");
                }
                Map map = (Map) inputs.stream().collect(Collectors.toMap((v0) -> {
                    return v0.getId();
                }, Function.identity(), (propertyMetadata, propertyMetadata2) -> {
                    return propertyMetadata;
                }));
                for (FunctionParameter functionParameter : functionInvokeMessage.getInputs()) {
                    biConsumer.accept(functionParameter, ((PropertyMetadata) Optional.ofNullable(map.get(functionParameter.getName())).orElseThrow(() -> {
                        return new ParameterUndefinedException(functionParameter.getName(), "参数[" + functionParameter.getName() + "]未定义");
                    })).getValueType().validate(functionParameter.getValue()));
                }
                return this;
            }

            public FunctionInvokeMessageSender header(String str2, Object obj) {
                functionInvokeMessage.addHeader(str2, obj);
                return this;
            }

            public FunctionInvokeMessageSender async(Boolean bool) {
                if (bool != null) {
                    custom(bool.booleanValue() ? Headers.async.setter() : Headers.async.clear());
                }
                this.markAsync = true;
                return this;
            }

            public CompletionStage<FunctionInvokeMessageReply> retrieveReply() {
                return LettuceDeviceMessageSender.this.retrieveReply((String) Objects.requireNonNull(functionInvokeMessage.getMessageId(), "messageId can not be null"), FunctionInvokeMessageReply::new);
            }

            public CompletionStage<FunctionInvokeMessageReply> send() {
                if (!this.markAsync && LettuceDeviceMessageSender.this.asyncFromMetadata && functionInvokeMessage.getHeader(Headers.async.getHeader()).isPresent()) {
                    functionInvokeMessage.addHeader(Headers.async.getHeader(), LettuceDeviceMessageSender.this.operation.getMetadata().getFunction(functionInvokeMessage.getFunctionId()).map((v0) -> {
                        return v0.isAsync();
                    }).orElse(false));
                }
                return LettuceDeviceMessageSender.this.send(functionInvokeMessage);
            }
        };
    }

    public ReadPropertyMessageSender readProperty(String... strArr) {
        final ReadPropertyMessage readPropertyMessage = new ReadPropertyMessage();
        readPropertyMessage.setDeviceId(this.deviceId);
        readPropertyMessage.addProperties(Arrays.asList(strArr));
        readPropertyMessage.setMessageId(IdUtils.newUUID());
        return new ReadPropertyMessageSender() { // from class: org.jetlinks.registry.redis.lettuce.LettuceDeviceMessageSender.2
            public ReadPropertyMessageSender messageId(String str) {
                readPropertyMessage.setMessageId(str);
                return this;
            }

            public ReadPropertyMessageSender custom(Consumer<ReadPropertyMessage> consumer) {
                consumer.accept(readPropertyMessage);
                return this;
            }

            public ReadPropertyMessageSender read(List<String> list) {
                readPropertyMessage.addProperties(list);
                return this;
            }

            public ReadPropertyMessageSender header(String str, Object obj) {
                readPropertyMessage.addHeader(str, obj);
                return this;
            }

            public CompletionStage<ReadPropertyMessageReply> retrieveReply() {
                return LettuceDeviceMessageSender.this.retrieveReply((String) Objects.requireNonNull(readPropertyMessage.getMessageId(), "messageId can not be null"), ReadPropertyMessageReply::new);
            }

            public CompletionStage<ReadPropertyMessageReply> send() {
                return LettuceDeviceMessageSender.this.send(readPropertyMessage);
            }
        };
    }

    public WritePropertyMessageSender writeProperty() {
        final WritePropertyMessage writePropertyMessage = new WritePropertyMessage();
        writePropertyMessage.setDeviceId(this.deviceId);
        writePropertyMessage.setMessageId(IdUtils.newUUID());
        return new WritePropertyMessageSender() { // from class: org.jetlinks.registry.redis.lettuce.LettuceDeviceMessageSender.3
            public WritePropertyMessageSender write(String str, Object obj) {
                writePropertyMessage.addProperty(str, obj);
                return this;
            }

            public WritePropertyMessageSender custom(Consumer<WritePropertyMessage> consumer) {
                consumer.accept(writePropertyMessage);
                return this;
            }

            public WritePropertyMessageSender header(String str, Object obj) {
                writePropertyMessage.addHeader(str, obj);
                return this;
            }

            public CompletionStage<WritePropertyMessageReply> retrieveReply() {
                return LettuceDeviceMessageSender.this.retrieveReply((String) Objects.requireNonNull(writePropertyMessage.getMessageId(), "messageId can not be null"), WritePropertyMessageReply::new);
            }

            public CompletionStage<WritePropertyMessageReply> send() {
                return LettuceDeviceMessageSender.this.send(writePropertyMessage);
            }
        };
    }

    public boolean isAsyncFromMetadata() {
        return this.asyncFromMetadata;
    }

    public void setAsyncFromMetadata(boolean z) {
        this.asyncFromMetadata = z;
    }

    public DeviceMessageSenderInterceptor getInterceptor() {
        return this.interceptor;
    }

    public void setInterceptor(DeviceMessageSenderInterceptor deviceMessageSenderInterceptor) {
        this.interceptor = deviceMessageSenderInterceptor;
    }

    public int getMaxSendAwaitSeconds() {
        return this.maxSendAwaitSeconds;
    }

    public void setMaxSendAwaitSeconds(int i) {
        this.maxSendAwaitSeconds = i;
    }
}
