package org.jetlinks.rule.engine.executor.node.timer;

import java.time.Duration;
import java.util.Date;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Function;
import org.jetlinks.core.cluster.ClusterCache;
import org.jetlinks.core.cluster.ClusterManager;
import org.jetlinks.rule.engine.api.RuleData;
import org.jetlinks.rule.engine.api.executor.ExecutionContext;
import org.jetlinks.rule.engine.api.model.NodeType;
import org.jetlinks.rule.engine.executor.CommonExecutableRuleNodeFactoryStrategy;
import org.jetlinks.rule.engine.executor.node.RuleNodeConfig;
import org.reactivestreams.Publisher;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.scheduling.support.CronSequenceGenerator;
import reactor.core.publisher.Mono;

/* loaded from: input_file:org/jetlinks/rule/engine/executor/node/timer/TimerNode.class */
public class TimerNode extends CommonExecutableRuleNodeFactoryStrategy<Configuration> {
    private static final Logger log = LoggerFactory.getLogger(TimerNode.class);

    /** Cluster facade; supplies this server's id, the HA manager and the shared cache. */
    private ClusterManager clusterManager;

    /** Cluster cache mapping a timer id to the {@link TimerInfo} describing its workers. */
    private ClusterCache<String, TimerInfo> workerInfo;

    /** Timer jobs created on this server, keyed by {@code instanceId:nodeId}. */
    private final Map<String, TimerJob> jobs = new ConcurrentHashMap<>();

    /* loaded from: input_file:org/jetlinks/rule/engine/executor/node/timer/TimerNode$Configuration.class */
    /**
     * Configuration of a timer node: a cron expression plus its parsed generator.
     */
    public static class Configuration implements RuleNodeConfig {
        /** Cron expression that drives the timer. */
        private String cron;

        /** Parsed form of {@link #cron}; {@code null} until built by {@link #init()} or on first use. */
        private volatile CronSequenceGenerator generator;

        /**
         * @return always {@link NodeType#PEEK}
         */
        @Override
        public NodeType getNodeType() {
            return NodeType.PEEK;
        }

        /**
         * Ignores the supplied value; {@link #getNodeType()} always returns {@link NodeType#PEEK}.
         *
         * @param nodeType ignored
         */
        @Override
        public void setNodeType(NodeType nodeType) {
            // Intentionally empty: the node type is fixed.
        }

        /**
         * Parses the current cron expression into a generator.
         */
        public void init() {
            this.generator = new CronSequenceGenerator(this.cron);
        }

        /**
         * Validates the configuration by parsing the cron expression; any exception raised by
         * the parser propagates to the caller.
         */
        @Override
        public void validate() {
            init();
        }

        /**
         * Computes the delay until the next cron firing.
         *
         * <p>The generator is built on demand when {@link #init()} / {@link #validate()} has not
         * been called yet (or the expression was changed since), instead of failing with a
         * {@code NullPointerException}.
         *
         * @return milliseconds until the next firing, never less than 100
         */
        public long nextMillis() {
            CronSequenceGenerator current = this.generator;
            if (current == null) {
                current = new CronSequenceGenerator(this.cron);
                this.generator = current;
            }
            // Sample "now" once so the reference date and the subtraction use the same instant.
            long now = System.currentTimeMillis();
            return Math.max(100L, current.next(new Date(now)).getTime() - now);
        }

        /**
         * @return the configured cron expression
         */
        public String getCron() {
            return this.cron;
        }

        /**
         * Sets the cron expression and discards any previously parsed generator so that the
         * next {@link #nextMillis()} call cannot keep using a stale schedule.
         *
         * @param str the new cron expression
         */
        public void setCron(String str) {
            this.cron = str;
            this.generator = null;
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:org/jetlinks/rule/engine/executor/node/timer/TimerNode$TimerJob.class */
    /**
     * A single cron-driven timer bound to one rule node.
     *
     * <p>A job runs as a chain of "delay, fire, reschedule" cycles. At most one chain is in
     * flight per job: calling {@link #start()} repeatedly never creates a second chain, and a
     * {@link #cancel()} issued while a cycle is running is honoured when that cycle finishes.
     */
    public class TimerJob {
        private final String id;
        private final Configuration configuration;
        private final ExecutionContext context;
        /** Desired state: {@code true} while the job should keep firing. */
        private volatile boolean running;
        private final TimerInfo timerInfo;
        /** {@code true} while a delay/fire cycle is in flight; guarded by {@code this}. */
        private boolean scheduled;

        /**
         * Creates a stopped job whose id is {@code instanceId:nodeId} of the given context.
         */
        TimerJob(Configuration configuration, ExecutionContext executionContext, TimerInfo timerInfo) {
            this.configuration = configuration;
            this.context = executionContext;
            this.id = executionContext.getInstanceId() + ":" + executionContext.getNodeId();
            this.timerInfo = timerInfo;
        }

        /**
         * Marks the job as running and schedules the next firing unless a cycle is already in
         * flight.
         */
        public void start() {
            this.running = true;
            doStart();
        }

        /**
         * Schedules the next firing if the job is running and no cycle is in flight.
         */
        void doStart() {
            synchronized (this) {
                if (!this.running || this.scheduled) {
                    return;
                }
                this.scheduled = true;
            }
            scheduleNext();
        }

        /**
         * Fires the timer once if the job is running, then invokes {@code runnable} when the
         * firing has terminated (successfully, empty, or with an error). Does nothing, and does
         * not invoke {@code runnable}, when the job is not running.
         *
         * @param runnable callback invoked after the firing terminates
         */
        void execute(Runnable runnable) {
            if (this.running) {
                fire(runnable);
            }
        }

        /**
         * Requests the job to stop. A pending delay is not aborted; it ends the cycle chain
         * without firing once it elapses.
         */
        void cancel() {
            this.running = false;
        }

        /** Waits for the next cron instant, then fires or winds the chain down. */
        private void scheduleNext() {
            long delayMillis;
            try {
                delayMillis = this.configuration.nextMillis();
            } catch (RuntimeException e) {
                // Release the chain so that a later start() is able to schedule again.
                synchronized (this) {
                    this.scheduled = false;
                }
                throw e;
            }
            Mono.delay(Duration.ofMillis(delayMillis)).subscribe(tick -> {
                if (this.running) {
                    fire(this::onCycleDone);
                } else {
                    onCycleDone();
                }
            });
        }

        /**
         * Ends one cycle: reschedules while the job is still running, otherwise releases the
         * chain. The check and the release share one lock with {@link #doStart()} so a
         * concurrent {@link #start()} is never lost.
         */
        private void onCycleDone() {
            synchronized (this) {
                if (!this.running) {
                    this.scheduled = false;
                    return;
                }
            }
            // Deliberately not start(): that would set running = true again and undo a cancel().
            scheduleNext();
        }

        /** Writes the current timestamp to the node output if this server owns the timer. */
        private void fire(Runnable onDone) {
            // defer: a synchronous failure of the cache lookup becomes an error signal, so
            // doFinally still runs and the chain is not left dangling.
            Mono.defer(() -> TimerNode.this.workerInfo.get(this.id))
                .filter(this::ownedByThisServer)
                .doOnNext(current -> this.context.logger().debug("execute timer"))
                .flatMap(current -> this.context.getOutput()
                    .write(Mono.just(RuleData.create(System.currentTimeMillis()))))
                .doFinally(signalType -> onDone.run())
                .subscribe(
                    ignored -> {
                    },
                    // An explicit error consumer keeps the error from being dropped/bubbled,
                    // which could otherwise prevent doFinally from running.
                    error -> this.context.logger().error("fire timer error", error));
        }

        /**
         * @return {@code true} if the cached info names this server as current worker and the
         *         job is still running; cancels the job when another server is the worker
         */
        private boolean ownedByThisServer(TimerInfo current) {
            if (TimerNode.this.clusterManager.getCurrentServerId().equals(current.getCurrentWorker())) {
                return this.running;
            }
            this.context.logger().debug("timer running another server:[{}]", current);
            cancel();
            return false;
        }

        /**
         * Creates a job with every field supplied explicitly.
         */
        public TimerJob(String str, Configuration configuration, ExecutionContext executionContext, boolean z, TimerInfo timerInfo) {
            this.id = str;
            this.configuration = configuration;
            this.context = executionContext;
            this.running = z;
            this.timerInfo = timerInfo;
        }
    }

    /**
     * Returns the type identifier this strategy reports as supported.
     *
     * @return the constant {@code "timer"}
     */
    @Override // org.jetlinks.rule.engine.executor.ExecutableRuleNodeFactoryStrategy
    public String getSupportType() {
        return "timer";
    }

    /**
     * Creates the timer node strategy, binds it to the cluster cache
     * {@code _rule-engine-timer-worker} and subscribes to server-offline events so that jobs
     * known to this server are taken over when a server leaves the cluster.
     *
     * @param clusterManager cluster facade providing the cache, the HA manager and this
     *                       server's id
     */
    public TimerNode(ClusterManager clusterManager) {
        this.clusterManager = clusterManager;
        this.workerInfo = clusterManager.getCache("_rule-engine-timer-worker");
        this.clusterManager.getHaManager()
            .subscribeServerOffline()
            .subscribe(
                serverNode -> takeOverLocalJobs(),
                error -> log.error("server offline subscription failed", error));
    }

    /**
     * Claims every cached timer that has a job on this server and starts that job.
     *
     * <p>The identity of the offline server is not consulted: each event makes this server
     * write itself as current worker for all of its local jobs. A failure for one job is
     * logged and does not stop the remaining jobs from being processed.
     */
    private void takeOverLocalJobs() {
        String currentServerId = this.clusterManager.getCurrentServerId();
        this.workerInfo.entries()
            .map(entry -> entry.getKey())
            .flatMap(jobId -> Mono.justOrEmpty(this.jobs.get(jobId)))
            .flatMap(job -> this.workerInfo
                .put(job.id, job.timerInfo.current(currentServerId))
                .thenReturn(job)
                .onErrorResume(error -> {
                    log.error("take over timer [{}] failed", job.id, error);
                    return Mono.empty();
                }))
            .subscribe(
                TimerJob::start,
                error -> log.error("take over timers failed", error));
    }

    /**
     * Builds the per-message executor of this node: a pass-through that re-emits each incoming
     * {@link RuleData} unchanged. Neither argument is used.
     *
     * @param executionContext unused
     * @param configuration    unused
     * @return a function wrapping its input in a single-element {@link Mono}
     */
    @Override
    public Function<RuleData, Publisher<Object>> createExecutor(ExecutionContext executionContext, Configuration configuration) {
        return Mono::just;
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /**
     * Registers a timer job for the started node and starts it when this server is the
     * designated worker.
     *
     * <p>The cached {@link TimerInfo} for {@code instanceId:nodeId} is loaded, or created with
     * this server as both first and current worker when absent. A stop hook removes and
     * cancels the job again.
     *
     * @param executionContext context of the started node
     * @param configuration    timer configuration of the node
     */
    @Override
    public void onStarted(ExecutionContext executionContext, Configuration configuration) {
        // Pass the configuration as-is; casting it to ExecutionContext (a decompiler artifact)
        // does not match the parameter type and would fail at runtime.
        super.onStarted(executionContext, configuration);

        final String jobId = executionContext.getInstanceId() + ":" + executionContext.getNodeId();
        final String currentServerId = this.clusterManager.getCurrentServerId();

        executionContext.onStop(() -> {
            TimerJob stopped = this.jobs.remove(jobId);
            if (stopped != null) {
                executionContext.logger().info("cancel timer");
                stopped.cancel();
            }
        });

        TimerInfo initialInfo = TimerInfo.builder()
            .id(jobId)
            .currentWorker(currentServerId)
            .firstWorker(currentServerId)
            .build();

        this.workerInfo.get(jobId)
            .switchIfEmpty(Mono.just(initialInfo))
            // Re-read after putIfAbsent so the job is built from the value the cache holds.
            .flatMap(info -> this.workerInfo.putIfAbsent(jobId, info).then(this.workerInfo.get(jobId)))
            .map(info -> new TimerJob(configuration, executionContext, info))
            .subscribe(
                job -> registerAndMaybeStart(job, currentServerId, executionContext),
                error -> executionContext.logger().error("start timer error", error));
    }

    /**
     * Stores the job (cancelling any job previously stored under the same id) and starts it
     * if this server is its first worker or its current worker.
     */
    private void registerAndMaybeStart(TimerJob job, String currentServerId, ExecutionContext executionContext) {
        TimerJob previous = this.jobs.put(job.id, job);
        if (previous != null) {
            previous.cancel();
        }
        // Server id is the receiver so a null worker field in cached data cannot throw.
        if (currentServerId.equals(job.timerInfo.getFirstWorker())) {
            // The first worker reclaims the timer and publishes that before starting.
            job.timerInfo.setCurrentWorker(currentServerId);
            this.workerInfo.put(job.id, job.timerInfo).subscribe(
                ignored -> job.start(),
                error -> executionContext.logger().error("start timer error", error));
        } else if (currentServerId.equals(job.timerInfo.getCurrentWorker())) {
            job.start();
        }
    }
}
