package cn.gitlab.virtualcry.reactor.bus.processor;

import cn.gitlab.virtualcry.reactor.bus.Event;
import cn.gitlab.virtualcry.reactor.bus.env.Environment;
import cn.gitlab.virtualcry.reactor.bus.support.EventProcessorStore;
import java.util.concurrent.Executors;
import reactor.core.publisher.Flux;
import reactor.util.Logger;
import reactor.util.Loggers;

/* loaded from: input_file:cn/gitlab/virtualcry/reactor/bus/processor/EventRouter.class */
public final class EventRouter {
    private final Environment env;
    private final Logger logger = Loggers.getLogger(getClass());
    private final EventRecorder<Event> recorder = new EventRecorder<>(Executors.newFixedThreadPool(1));
    private final EventProcessorStore processorStore = new EventProcessorStore();

    public EventRouter(Environment environment) {
        this.env = environment;
        initialize();
    }

    private void initialize() {
        Flux.create(this.recorder).doOnNext(event -> {
            if (this.logger.isDebugEnabled()) {
                this.logger.debug("Event { " + event.getClass().getName() + ": " + event.getEventId() + " } has been published.");
            }
        }).subscribe(event2 -> {
            route(event2.getClass()).onNext(event2);
        });
    }

    public <T extends Event> EventProcessor<T> route(Class<T> cls) {
        return this.processorStore.computeIfAbsent(cls, cls2 -> {
            return new EventProcessor(this.env);
        });
    }

    public final void publish(Event event) {
        this.recorder.record(event);
    }
}
