package org.jetlinks.rule.engine.executor.node.device;

import java.util.function.Consumer;
import java.util.function.Function;
import org.jetlinks.core.cluster.ClusterManager;
import org.jetlinks.core.device.DeviceOperator;
import org.jetlinks.core.device.DeviceRegistry;
import org.jetlinks.core.message.DeviceMessageReply;
import org.jetlinks.core.message.Message;
import org.jetlinks.core.message.codec.EncodedMessage;
import org.jetlinks.core.server.MessageHandler;
import org.jetlinks.rule.engine.api.RuleData;
import org.jetlinks.rule.engine.api.RuleDataCodec;
import org.jetlinks.rule.engine.api.RuleDataCodecs;
import org.jetlinks.rule.engine.api.executor.ExecutionContext;
import org.jetlinks.rule.engine.executor.CommonExecutableRuleNodeFactoryStrategy;
import org.reactivestreams.Publisher;
import reactor.core.Disposable;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

/* loaded from: input_file:org/jetlinks/rule/engine/executor/node/device/DeviceOperationNode.class */
public class DeviceOperationNode extends CommonExecutableRuleNodeFactoryStrategy<DeviceOperationConfiguration> {
    private MessageHandler messageHandler;
    private ClusterManager clusterManager;
    private DeviceRegistry registry;
    private Consumer<DeviceOperator> onOnline;
    private Consumer<DeviceOperator> onOffline;

    public DeviceOperationNode(MessageHandler messageHandler, ClusterManager clusterManager, DeviceRegistry deviceRegistry) {
        this.messageHandler = messageHandler;
        this.clusterManager = clusterManager;
        this.registry = deviceRegistry;
    }

    public DeviceOperationNode onOnline(Consumer<DeviceOperator> consumer) {
        this.onOnline = consumer;
        return this;
    }

    public DeviceOperationNode onOffline(Consumer<DeviceOperator> consumer) {
        this.onOffline = consumer;
        return this;
    }

    @Override // org.jetlinks.rule.engine.executor.CommonExecutableRuleNodeFactoryStrategy
    public Function<RuleData, ? extends Publisher<?>> createExecutor(ExecutionContext executionContext, DeviceOperationConfiguration deviceOperationConfiguration) {
        return ruleData -> {
            String deviceId = deviceOperationConfiguration.getDeviceId(ruleData);
            if (deviceId != null) {
                return this.registry.getDevice(deviceId).switchIfEmpty(Mono.fromRunnable(() -> {
                    executionContext.logger().warn("设备[{}]未注册到注册中心", new Object[]{deviceId});
                })).flatMapMany(deviceOperator -> {
                    return handleMessage(deviceOperationConfiguration, deviceOperator, executionContext, ruleData);
                });
            }
            executionContext.logger().warn("无法从上游数据中获取deviceId:{} :{}", new Object[]{deviceOperationConfiguration.getDeviceId(), ruleData});
            return Mono.empty();
        };
    }

    protected Publisher<?> handleMessage(DeviceOperationConfiguration deviceOperationConfiguration, DeviceOperator deviceOperator, ExecutionContext executionContext, RuleData ruleData) {
        switch (deviceOperationConfiguration.getOperation()) {
            case ONLINE:
                String concat = "rule:".concat(executionContext.getInstanceId()).concat(this.clusterManager.getCurrentServerId());
                return deviceOperator.online(concat, concat).thenReturn(ruleData).doFinally(signalType -> {
                    if (this.onOnline != null) {
                        this.onOnline.accept(deviceOperator);
                    }
                });
            case OFFLINE:
                return deviceOperator.offline().thenReturn(ruleData).doFinally(signalType2 -> {
                    if (this.onOffline != null) {
                        this.onOffline.accept(deviceOperator);
                    }
                });
            case DECODE:
                return deviceOperationConfiguration.decode(deviceOperator, ruleData).switchIfEmpty(Flux.error(() -> {
                    return new UnsupportedOperationException("unsupported data:" + ruleData);
                })).map(this::convertRuleSafe);
            case REPLY_MESSAGE:
                Flux switchIfEmpty = deviceOperationConfiguration.decode(deviceOperator, ruleData).switchIfEmpty(Flux.error(() -> {
                    return new UnsupportedOperationException("unsupported data:" + ruleData);
                }));
                Class<DeviceMessageReply> cls = DeviceMessageReply.class;
                DeviceMessageReply.class.getClass();
                Flux cast = switchIfEmpty.filter((v1) -> {
                    return r1.isInstance(v1);
                }).cast(DeviceMessageReply.class);
                MessageHandler messageHandler = this.messageHandler;
                messageHandler.getClass();
                return cast.flatMap(messageHandler::reply);
            case ENCODE:
                return deviceOperationConfiguration.encode(deviceOperator, ruleData).switchIfEmpty(Flux.error(() -> {
                    return new UnsupportedOperationException("unsupported data:" + ruleData);
                })).map(this::convertRuleSafe);
            case SEND_MESSAGE:
                return deviceOperationConfiguration.doSendMessage(deviceOperator, ruleData);
            default:
                return Mono.just(ruleData);
        }
    }

    protected Object convertRuleSafe(EncodedMessage encodedMessage) {
        return RuleDataCodecs.getCodec(encodedMessage.getClass()).map(ruleDataCodec -> {
            return ruleDataCodec.encode(encodedMessage, new RuleDataCodec.Feature[0]);
        }).orElse(encodedMessage);
    }

    protected Object convertRuleSafe(Message message) {
        return RuleDataCodecs.getCodec(message.getClass()).map(ruleDataCodec -> {
            return ruleDataCodec.encode(message, new RuleDataCodec.Feature[0]);
        }).orElse(message);
    }

    protected RuleData convertDeviceMessage(Message message) {
        return RuleData.create(convertRuleSafe(message));
    }

    /* JADX INFO: Access modifiers changed from: protected */
    @Override // org.jetlinks.rule.engine.executor.CommonExecutableRuleNodeFactoryStrategy
    public void onStarted(ExecutionContext executionContext, DeviceOperationConfiguration deviceOperationConfiguration) {
        if (deviceOperationConfiguration.getOperation() == DeviceOperation.HANDLE_MESSAGE) {
            Disposable subscribe = this.messageHandler.handleSendToDeviceMessage("rule:".concat(executionContext.getInstanceId()).concat(this.clusterManager.getCurrentServerId())).map(this::convertDeviceMessage).doOnNext(ruleData -> {
                executionContext.logger().info("桥接发往设备的消息:{}", new Object[]{ruleData});
            }).flatMap(ruleData2 -> {
                return executionContext.getOutput().write(Mono.just(ruleData2).doOnError(th -> {
                    executionContext.onError(ruleData2, th).subscribe();
                }));
            }).onErrorContinue((th, obj) -> {
                executionContext.onError(RuleData.create(obj), th).subscribe();
            }).subscribe(bool -> {
            });
            subscribe.getClass();
            executionContext.onStop(subscribe::dispose);
        }
    }

    @Override // org.jetlinks.rule.engine.executor.ExecutableRuleNodeFactoryStrategy
    public String getSupportType() {
        return "device-operation";
    }

    public void setOnOnline(Consumer<DeviceOperator> consumer) {
        this.onOnline = consumer;
    }

    public void setOnOffline(Consumer<DeviceOperator> consumer) {
        this.onOffline = consumer;
    }

    static {
        RuleDeviceMessageCodec.load();
    }
}
