package org.jetlinks.rule.engine.executor;

import com.alibaba.fastjson.JSON;
import java.util.function.Function;
import org.hswebframework.web.logger.ReactiveLogger;
import org.jetlinks.rule.engine.api.RuleData;
import org.jetlinks.rule.engine.api.RuleDataHelper;
import org.jetlinks.rule.engine.api.executor.ExecutableRuleNode;
import org.jetlinks.rule.engine.api.executor.ExecutionContext;
import org.jetlinks.rule.engine.api.executor.Output;
import org.jetlinks.rule.engine.executor.node.RuleNodeConfig;
import org.reactivestreams.Publisher;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.Disposable;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

/* loaded from: input_file:org/jetlinks/rule/engine/executor/CommonExecutableRuleNodeFactoryStrategy.class */
public abstract class CommonExecutableRuleNodeFactoryStrategy<C extends RuleNodeConfig> extends AbstractExecutableRuleNodeFactoryStrategy<C> {
    private static final Logger log = LoggerFactory.getLogger(CommonExecutableRuleNodeFactoryStrategy.class);

    public abstract Function<RuleData, ? extends Publisher<?>> createExecutor(ExecutionContext executionContext, C c);

    protected boolean returnNewValue(C c) {
        return c.getNodeType() != null && c.getNodeType().isReturnNewValue();
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public void onStarted(ExecutionContext executionContext, C c) {
    }

    @Override // org.jetlinks.rule.engine.executor.AbstractExecutableRuleNodeFactoryStrategy
    protected ExecutableRuleNode doCreate(C c) {
        c.validate();
        return executionContext -> {
            Function<RuleData, ? extends Publisher<?>> createExecutor = createExecutor(executionContext, c);
            boolean returnNewValue = returnNewValue(c);
            Disposable subscribe = executionContext.getInput().subscribe().doOnSubscribe(subscription -> {
                executionContext.fireEvent("NODE_STARTED", RuleData.create(c)).subscribe();
            }).flatMap(ruleData -> {
                RuleDataHelper.setExecuteTimeNow(ruleData);
                return executionContext.fireEvent("NODE_EXECUTE_BEFORE", ruleData.newData(ruleData)).doOnEach(ReactiveLogger.onError(th -> {
                    log.error(th.getMessage(), th);
                })).onErrorResume(th2 -> {
                    return Mono.empty();
                }).thenReturn(ruleData);
            }).subscribe(ruleData2 -> {
                Flux flatMap = Flux.from((Publisher) createExecutor.apply(ruleData2)).map(this::convertObject).map(obj -> {
                    return returnNewValue ? ruleData2.newData(obj) : ruleData2.newData(ruleData2);
                }).cast(RuleData.class).flatMap(ruleData2 -> {
                    return executionContext.fireEvent("NODE_EXECUTE_RESULT", ruleData2.copy()).thenReturn(ruleData2);
                });
                Output output = executionContext.getOutput();
                output.getClass();
                ((Mono) flatMap.as((v1) -> {
                    return r1.write(v1);
                })).doOnError(th -> {
                    executionContext.onError(ruleData2, th).subscribe();
                }).doFinally(signalType -> {
                    executionContext.fireEvent("NODE_EXECUTE_DONE", ruleData2.copy()).subscribe();
                }).subscribe();
            });
            onStarted(executionContext, c);
            subscribe.getClass();
            executionContext.onStop(subscribe::dispose);
        };
    }

    protected Object convertObject(Object obj) {
        if (obj instanceof String) {
            String str = (String) obj;
            if (str.startsWith("[") || str.startsWith("{")) {
                return JSON.parse(str);
            }
        }
        return obj;
    }
}
