package org.jetlinks.rule.engine.standalone;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.function.Function;
import java.util.function.Predicate;
import org.jetlinks.rule.engine.api.RuleData;
import org.jetlinks.rule.engine.api.RuleDataHelper;
import org.jetlinks.rule.engine.api.events.GlobalNodeEventListener;
import org.jetlinks.rule.engine.api.events.NodeExecuteEvent;
import org.jetlinks.rule.engine.api.executor.ExecutableRuleNode;
import org.jetlinks.rule.engine.api.executor.ExecutionContext;
import org.jetlinks.rule.engine.api.executor.Input;
import org.jetlinks.rule.engine.api.executor.Output;
import org.jetlinks.rule.engine.api.model.NodeType;
import org.reactivestreams.Publisher;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.publisher.EmitterProcessor;
import reactor.core.publisher.Flux;
import reactor.core.publisher.FluxProcessor;
import reactor.core.publisher.Mono;

/* loaded from: input_file:org/jetlinks/rule/engine/standalone/DefaultRuleExecutor.class */
/**
 * Stand-alone {@link RuleExecutor} that drives a single {@link ExecutableRuleNode}.
 *
 * <p>Data pushed in through {@link #execute(Publisher)} is forwarded to an internal
 * {@link FluxProcessor}; the rule node consumes it through the {@link Input} exposed by the
 * {@link ExecutionContext} and publishes its results through the context's {@link Output},
 * which fans them out to every executor registered via {@link #addNext(Predicate, RuleExecutor)}
 * whose condition accepts the data.
 *
 * <p>{@link #start()} and {@link #stop()} synchronize on this instance. The routing tables
 * ({@code outputs}, {@code eventHandler}) and the stop-listener list are plain, unsynchronized
 * collections.
 */
public class DefaultRuleExecutor implements RuleExecutor {
    private static final Logger log = LoggerFactory.getLogger(DefaultRuleExecutor.class);

    /** Logger returned to the rule node by the execution context's {@code logger()}. */
    private org.jetlinks.rule.engine.api.Logger logger;

    /** The node implementation started by {@link #start()}. */
    private ExecutableRuleNode ruleNode;

    private NodeType nodeType;

    private String instanceId;

    private String nodeId;

    /** {@code true} between a successful {@link #start()} and the next run of the stop listeners. */
    private volatile boolean running;

    /** Carries data from {@link #execute(Publisher)} to the rule node's input subscription. */
    private FluxProcessor<RuleData, RuleData> processor = EmitterProcessor.create(false);

    private volatile ExecutionContext context = new SimpleContext();

    /** Callbacks run, in registration order, every time the context is stopped. */
    private final List<Runnable> stopListener = new ArrayList<>();

    /** Listeners notified of every event fired by this node, regardless of event name. */
    private List<GlobalNodeEventListener> listeners = new CopyOnWriteArrayList<>();

    /** Conditional downstream executors that receive this node's output. */
    private final Set<OutRuleExecutor> outputs = new HashSet<>();

    /** Executors to invoke per event name. */
    private final Map<String, List<RuleExecutor>> eventHandler = new HashMap<>();

    /* loaded from: input_file:org/jetlinks/rule/engine/standalone/DefaultRuleExecutor$SimpleContext.class */
    /**
     * {@link ExecutionContext} view of the enclosing executor that is handed to the rule node.
     */
    private class SimpleContext implements ExecutionContext {
        private SimpleContext() {
        }

        /**
         * @return an input whose {@code subscribe()} exposes the executor's processor and whose
         *         {@code close()} completes it
         */
        public Input getInput() {
            return new Input() { // from class: org.jetlinks.rule.engine.standalone.DefaultRuleExecutor.SimpleContext.1
                public Flux<RuleData> subscribe() {
                    return DefaultRuleExecutor.this.processor;
                }

                public void close() {
                    DefaultRuleExecutor.this.processor.onComplete();
                }
            };
        }

        /**
         * @return an output that routes published data through
         *         {@link DefaultRuleExecutor#doNext(Publisher)}
         */
        public Output getOutput() {
            return DefaultRuleExecutor.this::doNext;
        }

        /**
         * Fires {@code str} with a copy of {@code ruleData}; the copy gets an {@code "event"}
         * attribute holding the event name. Nothing happens until the returned Mono is subscribed.
         */
        public Mono<Void> fireEvent(String str, RuleData ruleData) {
            return Mono.defer(() -> {
                RuleData copy = ruleData.copy();
                // Logged before the "event" attribute is attached to the copy.
                DefaultRuleExecutor.log.debug("fire event {}.{}:{}", DefaultRuleExecutor.this.nodeId, str, copy);
                copy.setAttribute("event", str);
                return DefaultRuleExecutor.this.fireEvent(str, copy);
            });
        }

        /**
         * Logs {@code th} to the node logger, records it on {@code ruleData} via
         * {@link RuleDataHelper#putError} and fires the {@code NODE_EXECUTE_FAIL} event.
         * Nothing happens until the returned Mono is subscribed.
         */
        public Mono<Void> onError(RuleData ruleData, Throwable th) {
            return Mono.defer(() -> {
                logger().error(th.getMessage(), th);
                RuleDataHelper.putError(ruleData, th);
                // Resolves to this context's fireEvent, so the data is copied and tagged.
                return fireEvent("NODE_EXECUTE_FAIL", ruleData);
            });
        }

        /** Runs every registered stop callback in registration order. */
        public void stop() {
            // NOTE(review): callbacks are never removed, so each stop() re-runs all of them,
            // including those registered by earlier start() calls — confirm this is intended.
            DefaultRuleExecutor.this.stopListener.forEach(Runnable::run);
        }

        /** Registers a callback to run on every subsequent {@link #stop()}. */
        public void onStop(Runnable runnable) {
            DefaultRuleExecutor.this.stopListener.add(runnable);
        }

        public String getInstanceId() {
            return DefaultRuleExecutor.this.instanceId;
        }

        public String getNodeId() {
            return DefaultRuleExecutor.this.nodeId;
        }

        public org.jetlinks.rule.engine.api.Logger logger() {
            return DefaultRuleExecutor.this.logger;
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Sends every element of {@code publisher} to each registered output whose condition
     * accepts it.
     *
     * <p>A failure while dispatching one element is logged and abandons the remaining outputs
     * for that element only; it never fails the returned Mono.
     *
     * @param publisher the data produced by the rule node
     * @return a Mono emitting {@code true} once {@code publisher} has been fully dispatched
     */
    public Mono<Boolean> doNext(Publisher<RuleData> publisher) {
        return Flux.from(publisher)
                .flatMap(ruleData -> Flux.fromIterable(this.outputs)
                        .filter(out -> out.getCondition().test(ruleData))
                        .flatMap(out -> out.getExecutor().execute(Mono.just(ruleData)))
                        .onErrorResume(th -> {
                            // Still best-effort, but no longer silent.
                            log.warn("rule node [{}] failed to dispatch output data: {}", this.nodeId, ruleData, th);
                            return Mono.empty();
                        }))
                .then(Mono.just(true));
    }

    /**
     * Starts the rule node with this executor's context. Does nothing if already running.
     *
     * <p>If the node fails to start, the running flag is reset so a later call can retry, and
     * the failure is propagated to the caller.
     */
    @Override // org.jetlinks.rule.engine.standalone.RuleExecutor
    public void start() {
        synchronized (this) {
            if (this.running) {
                return;
            }
            // Set before starting the node so a re-entrant start() from the node is a no-op.
            this.running = true;
            boolean started = false;
            try {
                this.ruleNode.start(this.context);
                started = true;
            } finally {
                if (!started) {
                    this.running = false;
                }
            }
            this.context.onStop(() -> {
                this.running = false;
            });
        }
    }

    /** Stops the execution context, which runs all registered stop callbacks. */
    @Override // org.jetlinks.rule.engine.standalone.RuleExecutor
    public void stop() {
        synchronized (this) {
            if (this.context != null) {
                this.context.stop();
            }
        }
    }

    /**
     * Notifies every global listener synchronously, then returns a Mono that, when subscribed,
     * runs the executors registered for {@code str} one after another.
     *
     * @param str      the event name
     * @param ruleData the data attached to the event
     * @return a Mono completing when all handlers for the event have finished
     */
    protected Mono<Void> fireEvent(String str, RuleData ruleData) {
        for (GlobalNodeEventListener listener : this.listeners) {
            listener.onEvent(NodeExecuteEvent.builder()
                    .event(str)
                    .ruleData(ruleData)
                    .instanceId(this.instanceId)
                    .nodeId(this.nodeId)
                    .build());
        }
        return Mono.justOrEmpty(this.eventHandler.get(str))
                .flatMapIterable(Function.identity())
                .concatMap(handler -> handler.execute(Mono.just(ruleData))
                        .onErrorContinue((th, obj) -> {
                            log.error("handle event [{}] error : {}", str, ruleData, th);
                        }))
                .then();
    }

    /**
     * Feeds {@code publisher} into this node.
     *
     * @param publisher the incoming data
     * @return a Mono emitting {@code false} without consuming {@code publisher} when nothing is
     *         subscribed to the processor; otherwise a Mono that forwards each element to the
     *         processor and emits {@code true} when {@code publisher} completes
     */
    @Override // org.jetlinks.rule.engine.standalone.RuleExecutor
    public Mono<Boolean> execute(Publisher<RuleData> publisher) {
        // Read the field once so the downstream check and the forwarding use the same processor.
        FluxProcessor<RuleData, RuleData> sink = this.processor;
        if (!sink.hasDownstreams()) {
            return Mono.just(false);
        }
        return Flux.from(publisher)
                .doOnNext(sink::onNext)
                .then(Mono.just(true));
    }

    /** Registers {@code ruleExecutor} to receive output data accepted by {@code predicate}. */
    @Override // org.jetlinks.rule.engine.standalone.RuleExecutor
    public void addNext(Predicate<RuleData> predicate, RuleExecutor ruleExecutor) {
        this.outputs.add(new OutRuleExecutor(predicate, ruleExecutor));
    }

    /** Registers {@code ruleExecutor} to run whenever the event named {@code str} is fired. */
    @Override // org.jetlinks.rule.engine.standalone.RuleExecutor
    public void addEventListener(String str, RuleExecutor ruleExecutor) {
        this.eventHandler.computeIfAbsent(str, key -> new ArrayList<>()).add(ruleExecutor);
    }

    /** Registers a listener that is notified of every event fired by this node. */
    @Override // org.jetlinks.rule.engine.standalone.RuleExecutor
    public void addEventListener(GlobalNodeEventListener globalNodeEventListener) {
        this.listeners.add(globalNodeEventListener);
    }

    public org.jetlinks.rule.engine.api.Logger getLogger() {
        return this.logger;
    }

    public void setLogger(org.jetlinks.rule.engine.api.Logger logger) {
        this.logger = logger;
    }

    public ExecutableRuleNode getRuleNode() {
        return this.ruleNode;
    }

    public void setRuleNode(ExecutableRuleNode executableRuleNode) {
        this.ruleNode = executableRuleNode;
    }

    @Override // org.jetlinks.rule.engine.standalone.RuleExecutor
    public NodeType getNodeType() {
        return this.nodeType;
    }

    public void setNodeType(NodeType nodeType) {
        this.nodeType = nodeType;
    }

    public String getInstanceId() {
        return this.instanceId;
    }

    public void setInstanceId(String str) {
        this.instanceId = str;
    }

    public String getNodeId() {
        return this.nodeId;
    }

    public void setNodeId(String str) {
        this.nodeId = str;
    }

    /** Replaces the processor used to carry input data to the rule node. */
    public void setProcessor(FluxProcessor<RuleData, RuleData> fluxProcessor) {
        this.processor = fluxProcessor;
    }

    /** @return the live list of global event listeners (not a copy) */
    public List<GlobalNodeEventListener> getListeners() {
        return this.listeners;
    }

    /** Replaces the global event listener list with {@code list} (stored as-is, not copied). */
    public void setListeners(List<GlobalNodeEventListener> list) {
        this.listeners = list;
    }
}
