package org.jetlinks.rule.engine.executor.node.spring;

import java.util.function.Function;
import org.jetlinks.rule.engine.api.RuleData;
import org.jetlinks.rule.engine.api.RuleDataCodec;
import org.jetlinks.rule.engine.api.RuleDataCodecs;
import org.jetlinks.rule.engine.api.executor.ExecutionContext;
import org.jetlinks.rule.engine.executor.CommonExecutableRuleNodeFactoryStrategy;
import org.reactivestreams.Publisher;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.ApplicationContext;
import org.springframework.context.event.EventListener;
import org.springframework.util.StringUtils;
import reactor.core.Disposable;
import reactor.core.publisher.EmitterProcessor;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

/* loaded from: input_file:org/jetlinks/rule/engine/executor/node/spring/SpringEventNode.class */
public class SpringEventNode extends CommonExecutableRuleNodeFactoryStrategy<SpringEventConfiguration> {

    @Autowired
    private ApplicationContext eventPublisher;
    private EmitterProcessor<Object> processor = EmitterProcessor.create(false);

    @EventListener
    public void handleEvent(Object obj) {
        if (this.processor.hasDownstreams()) {
            this.processor.onNext(obj);
        }
    }

    @Override // org.jetlinks.rule.engine.executor.CommonExecutableRuleNodeFactoryStrategy
    public Function<RuleData, ? extends Publisher<?>> createExecutor(ExecutionContext executionContext, SpringEventConfiguration springEventConfiguration) {
        return !StringUtils.hasText(springEventConfiguration.getPublishClass()) ? (v0) -> {
            return Mono.just(v0);
        } : (Function) RuleDataCodecs.getCodec(Class.forName(springEventConfiguration.getPublishClass())).map(ruleDataCodec -> {
            return ruleData -> {
                Flux decode = ruleDataCodec.decode(ruleData, new RuleDataCodec.Feature[0]);
                ApplicationContext applicationContext = this.eventPublisher;
                applicationContext.getClass();
                return decode.doOnNext(applicationContext::publishEvent).then();
            };
        }).orElseGet(() -> {
            return ruleData -> {
                return Flux.just(ruleData).filter(ruleData -> {
                    return ruleData.getData() != null;
                }).doOnNext(ruleData2 -> {
                    this.eventPublisher.publishEvent(ruleData2.getData());
                });
            };
        });
    }

    /* JADX INFO: Access modifiers changed from: protected */
    @Override // org.jetlinks.rule.engine.executor.CommonExecutableRuleNodeFactoryStrategy
    public void onStarted(ExecutionContext executionContext, SpringEventConfiguration springEventConfiguration) {
        if (StringUtils.hasText(springEventConfiguration.getSubscribeClass())) {
            try {
                Class<?> cls = Class.forName(springEventConfiguration.getSubscribeClass());
                Function function = (Function) RuleDataCodecs.getCodec(cls).map(ruleDataCodec -> {
                    return obj -> {
                        return ruleDataCodec.encode(obj, new RuleDataCodec.Feature[0]);
                    };
                }).orElseGet(Function::identity);
                EmitterProcessor<Object> emitterProcessor = this.processor;
                cls.getClass();
                Disposable subscribe = emitterProcessor.filter(cls::isInstance).doOnNext(obj -> {
                    executionContext.logger().info("accept spring event: {}", new Object[]{obj});
                }).map(function).map(RuleData::create).flatMap(ruleData -> {
                    return executionContext.getOutput().write(Mono.just(ruleData)).thenReturn(ruleData);
                }).doOnNext(ruleData2 -> {
                    executionContext.fireEvent("NODE_EXECUTE_RESULT", ruleData2);
                }).onErrorContinue((th, obj2) -> {
                    executionContext.onError(RuleData.create(obj2), th).subscribe();
                }).subscribe();
                subscribe.getClass();
                executionContext.onStop(subscribe::dispose);
            } catch (Exception e) {
                executionContext.onError(RuleData.create(springEventConfiguration.getSubscribeClass()), e).subscribe();
            }
        }
    }

    @Override // org.jetlinks.rule.engine.executor.ExecutableRuleNodeFactoryStrategy
    public String getSupportType() {
        return "spring-event";
    }
}
