package org.jetlinks.supports.server;

import java.beans.ConstructorProperties;
import org.jetlinks.core.device.DeviceConfigKey;
import org.jetlinks.core.device.DeviceOperator;
import org.jetlinks.core.device.DeviceRegistry;
import org.jetlinks.core.device.DeviceStateInfo;
import org.jetlinks.core.enums.ErrorCode;
import org.jetlinks.core.message.BroadcastMessage;
import org.jetlinks.core.message.ChildDeviceMessage;
import org.jetlinks.core.message.CommonDeviceMessageReply;
import org.jetlinks.core.message.DeviceMessage;
import org.jetlinks.core.message.DeviceMessageReply;
import org.jetlinks.core.message.DisconnectDeviceMessage;
import org.jetlinks.core.message.Headers;
import org.jetlinks.core.message.Message;
import org.jetlinks.core.message.RepayableDeviceMessage;
import org.jetlinks.core.message.codec.EncodedMessage;
import org.jetlinks.core.message.codec.ToDeviceMessageContext;
import org.jetlinks.core.server.MessageHandler;
import org.jetlinks.core.server.session.ChildrenDeviceSession;
import org.jetlinks.core.server.session.DeviceSession;
import org.jetlinks.core.server.session.DeviceSessionManager;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

/* loaded from: input_file:org/jetlinks/supports/server/DefaultSendToDeviceMessageHandler.class */
public class DefaultSendToDeviceMessageHandler {
    private static final Logger log = LoggerFactory.getLogger(DefaultSendToDeviceMessageHandler.class);
    private String serverId;
    private DeviceSessionManager sessionManager;
    private MessageHandler handler;
    private DeviceRegistry registry;

    public void startup() {
        this.handler.handleSendToDeviceMessage(this.serverId).subscribe(message -> {
            if (message instanceof DeviceMessage) {
                handleDeviceMessage((DeviceMessage) message);
            }
            if (message instanceof BroadcastMessage) {
            }
        });
        this.handler.handleGetDeviceState(this.serverId, publisher -> {
            return Flux.from(publisher).map(str -> {
                return new DeviceStateInfo(str, this.sessionManager.sessionIsAlive(str) ? (byte) 1 : (byte) -1);
            });
        });
    }

    protected void handleDeviceMessage(DeviceMessage deviceMessage) {
        String deviceId = deviceMessage.getDeviceId();
        DeviceSession session = this.sessionManager.getSession(deviceId);
        if (session != null) {
            doSend(deviceMessage, session);
        } else {
            this.registry.getDevice(deviceId).flatMap(deviceOperator -> {
                Mono selfConfig = deviceOperator.getSelfConfig(DeviceConfigKey.parentMeshDeviceId);
                DeviceRegistry deviceRegistry = this.registry;
                deviceRegistry.getClass();
                return selfConfig.flatMap(deviceRegistry::getDevice);
            }).flatMap(deviceOperator2 -> {
                ChildDeviceMessage childDeviceMessage = new ChildDeviceMessage();
                childDeviceMessage.setDeviceId(deviceOperator2.getDeviceId());
                childDeviceMessage.setMessageId(deviceMessage.getMessageId());
                childDeviceMessage.setTimestamp(deviceMessage.getTimestamp());
                childDeviceMessage.setChildDeviceId(deviceId);
                childDeviceMessage.setChildDeviceMessage(deviceMessage);
                ChildrenDeviceSession session2 = this.sessionManager.getSession(deviceId, deviceOperator2.getDeviceId());
                if (null != session2) {
                    doSend(childDeviceMessage, session2);
                    return Mono.just(true);
                }
                DeviceSession session3 = this.sessionManager.getSession(deviceOperator2.getDeviceId());
                if (null == session3) {
                    return doReply(createReply(deviceId, deviceMessage).error(ErrorCode.CLIENT_OFFLINE));
                }
                doSend(childDeviceMessage, session3);
                return Mono.just(true);
            }).switchIfEmpty(Mono.defer(() -> {
                log.warn("device[{}] not connected,send message fail", deviceMessage.getDeviceId());
                return doReply(createReply(deviceId, deviceMessage).error(ErrorCode.CLIENT_OFFLINE));
            })).subscribe();
        }
    }

    protected DeviceMessageReply createReply(String str, DeviceMessage deviceMessage) {
        DeviceMessageReply newReply = deviceMessage instanceof RepayableDeviceMessage ? ((RepayableDeviceMessage) deviceMessage).newReply() : new CommonDeviceMessageReply();
        newReply.messageId(deviceMessage.getMessageId()).deviceId(str);
        return newReply;
    }

    protected void doSend(DeviceMessage deviceMessage, DeviceSession deviceSession) {
        DeviceMessageReply createReply = createReply(deviceMessage.getDeviceId(), deviceMessage);
        if (deviceMessage instanceof DisconnectDeviceMessage) {
            this.sessionManager.unregister(deviceSession.getDeviceId());
            doReply(createReply.success()).subscribe();
            return;
        }
        Flux flatMapMany = deviceSession.getOperator().getProtocol().flatMap(protocolSupport -> {
            return protocolSupport.getMessageCodec(deviceSession.getTransport());
        }).flatMapMany(deviceMessageCodec -> {
            return deviceMessageCodec.encode(new ToDeviceMessageContext() { // from class: org.jetlinks.supports.server.DefaultSendToDeviceMessageHandler.1
                public Mono<Boolean> sendToDevice(EncodedMessage encodedMessage) {
                    return deviceSession.send(encodedMessage);
                }

                public Mono<Void> disconnect() {
                    deviceSession.close();
                    return Mono.empty();
                }

                public Message getMessage() {
                    return deviceMessage;
                }

                public DeviceOperator getDevice() {
                    return deviceSession.getOperator();
                }
            });
        });
        deviceSession.getClass();
        Flux flatMap = flatMapMany.flatMap(deviceSession::send);
        Boolean bool = Boolean.TRUE;
        bool.getClass();
        flatMap.all((v1) -> {
            return r1.equals(v1);
        }).flatMap(bool2 -> {
            return ((Boolean) deviceMessage.getHeader(Headers.async).orElse(false)).booleanValue() ? doReply(createReply.message(ErrorCode.REQUEST_HANDLING.getText()).code(ErrorCode.REQUEST_HANDLING.name()).success()) : Mono.just(true);
        }).doOnError(th -> {
            log.error(th.getMessage(), th);
            doReply(createReply.error(th)).subscribe();
        }).subscribe();
    }

    private Mono<Boolean> doReply(DeviceMessageReply deviceMessageReply) {
        return ((Mono) this.handler.reply(deviceMessageReply).as(mono -> {
            return log.isDebugEnabled() ? mono.doFinally(signalType -> {
                log.debug("reply message {} ,[{}]", signalType, deviceMessageReply);
            }) : mono;
        })).doOnError(th -> {
            log.error("reply message error", th);
        });
    }

    @ConstructorProperties({"serverId", "sessionManager", "handler", "registry"})
    public DefaultSendToDeviceMessageHandler(String str, DeviceSessionManager deviceSessionManager, MessageHandler messageHandler, DeviceRegistry deviceRegistry) {
        this.serverId = str;
        this.sessionManager = deviceSessionManager;
        this.handler = messageHandler;
        this.registry = deviceRegistry;
    }
}
