package org.jetlinks.rule.engine.executor.node.mqtt;

import java.util.function.Function;
import org.hswebframework.web.dict.EnumDict;
import org.jetlinks.core.message.codec.MqttMessage;
import org.jetlinks.rule.engine.api.RuleData;
import org.jetlinks.rule.engine.api.RuleDataCodec;
import org.jetlinks.rule.engine.api.RuleDataCodecs;
import org.jetlinks.rule.engine.api.RuleDataHelper;
import org.jetlinks.rule.engine.api.executor.ExecutionContext;
import org.jetlinks.rule.engine.executor.CommonExecutableRuleNodeFactoryStrategy;
import org.reactivestreams.Publisher;
import reactor.core.Disposable;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

/* loaded from: input_file:org/jetlinks/rule/engine/executor/node/mqtt/MqttClientNode.class */
public class MqttClientNode extends CommonExecutableRuleNodeFactoryStrategy<MqttClientConfiguration> {
    private MqttClientManager clientManager;

    @Override // org.jetlinks.rule.engine.executor.CommonExecutableRuleNodeFactoryStrategy
    public Function<RuleData, Publisher<Object>> createExecutor(ExecutionContext executionContext, MqttClientConfiguration mqttClientConfiguration) {
        return !EnumDict.in(ClientType.producer, mqttClientConfiguration.getClientType()) ? (v0) -> {
            return Mono.just(v0);
        } : ruleData -> {
            return this.clientManager.getMqttClient(mqttClientConfiguration.getClientId()).flatMap(mqttClient -> {
                Flux<MqttMessage> convertMessage = convertMessage(ruleData, mqttClientConfiguration);
                mqttClient.getClass();
                return convertMessage.flatMap(mqttClient::publish).all(bool -> {
                    return bool.booleanValue();
                });
            });
        };
    }

    protected Flux<MqttMessage> convertMessage(RuleData ruleData, MqttClientConfiguration mqttClientConfiguration) {
        return (Flux) RuleDataCodecs.getCodec(MqttMessage.class).map(ruleDataCodec -> {
            return ruleDataCodec.decode(ruleData, new RuleDataCodec.Feature[]{mqttClientConfiguration.getPayloadType(), new MqttTopics(mqttClientConfiguration.getTopics(RuleDataHelper.toContextMap(ruleData)))}).cast(MqttMessage.class);
        }).orElseThrow(() -> {
            return new UnsupportedOperationException("unsupported decode message:{}" + ruleData);
        });
    }

    protected Mono<RuleData> convertMessage(MqttMessage mqttMessage, MqttClientConfiguration mqttClientConfiguration) {
        return Mono.just(RuleDataCodecs.getCodec(MqttMessage.class).map(ruleDataCodec -> {
            return ruleDataCodec.encode(mqttMessage, new RuleDataCodec.Feature[]{mqttClientConfiguration.getPayloadType(), new TopicVariables(mqttClientConfiguration.getTopicVariables())});
        }).map(RuleData::create).orElseGet(() -> {
            return RuleData.create(mqttMessage);
        }));
    }

    /* JADX INFO: Access modifiers changed from: protected */
    @Override // org.jetlinks.rule.engine.executor.CommonExecutableRuleNodeFactoryStrategy
    public void onStarted(ExecutionContext executionContext, MqttClientConfiguration mqttClientConfiguration) {
        if (EnumDict.in(ClientType.consumer, mqttClientConfiguration.getClientType())) {
            Disposable subscribe = this.clientManager.getMqttClient(mqttClientConfiguration.getClientId()).flatMapMany(mqttClient -> {
                return mqttClient.subscribe(mqttClientConfiguration.getTopics());
            }).doOnNext(mqttMessage -> {
                executionContext.logger().info("consume mqtt message:{}", new Object[]{mqttMessage});
            }).flatMap(mqttMessage2 -> {
                return convertMessage(mqttMessage2, mqttClientConfiguration);
            }).flatMap(ruleData -> {
                return executionContext.getOutput().write(Mono.just(ruleData)).thenReturn(ruleData);
            }).flatMap(ruleData2 -> {
                return executionContext.fireEvent("NODE_EXECUTE_RESULT", ruleData2).thenReturn(ruleData2);
            }).onErrorContinue((th, obj) -> {
                executionContext.onError(RuleData.create("consume mqtt message error"), th).subscribe();
            }).subscribe();
            subscribe.getClass();
            executionContext.onStop(subscribe::dispose);
        }
    }

    @Override // org.jetlinks.rule.engine.executor.ExecutableRuleNodeFactoryStrategy
    public String getSupportType() {
        return "mqtt-client";
    }

    public MqttClientNode(MqttClientManager mqttClientManager) {
        this.clientManager = mqttClientManager;
    }

    static {
        MqttRuleDataCodec.load();
    }
}
