package cn.gitlab.virtualcry.reactor.bus.processor;

import cn.gitlab.virtualcry.reactor.bus.Event;
import cn.gitlab.virtualcry.reactor.bus.env.Environment;
import cn.gitlab.virtualcry.reactor.bus.subscriber.EventSubscriber;
import cn.gitlab.virtualcry.reactor.bus.subscriber.SubscriberID;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.Collectors;
import reactor.core.Disposable;
import reactor.core.Disposables;
import reactor.core.publisher.FluxProcessor;
import reactor.util.Logger;
import reactor.util.Loggers;

/* loaded from: input_file:cn/gitlab/virtualcry/reactor/bus/processor/EventProcessor.class */
public final class EventProcessor<T extends Event> {
    private final Environment env;
    private final FluxProcessor<T, T> eventReceiver;
    private final Logger logger = Loggers.getLogger(getClass());
    private final Map<SubscriberID, Disposable> subscriberDisposables = new ConcurrentHashMap();

    /* JADX INFO: Access modifiers changed from: package-private */
    public EventProcessor(Environment environment) {
        this.env = environment;
        this.eventReceiver = this.env.eventReceiverConfig().create();
    }

    public final List<Disposable> subscribe(List<EventSubscriber<T>> list) {
        return (List) list.stream().map(this::subscribe).collect(Collectors.toList());
    }

    public final Disposable subscribe(EventSubscriber<T> eventSubscriber) {
        return this.subscriberDisposables.compute(eventSubscriber.getId(), (subscriberID, disposable) -> {
            if (disposable != null) {
                throw new RuntimeException("Subscriber { " + subscriberID.getId() + " } has been subscribed.");
            }
            FluxProcessor<T, T> create = this.env.eventSubscriberConfig().create();
            FluxProcessor<T, T> fluxProcessor = this.eventReceiver;
            create.getClass();
            fluxProcessor.subscribe((v1) -> {
                r1.onNext(v1);
            });
            return create.doOnNext(event -> {
                if (this.logger.isDebugEnabled()) {
                    this.logger.debug("Subscriber { " + subscriberID.getId() + " } consumed event.");
                }
                eventSubscriber.getEventConsumer().accept(this.env.forceImmutableEvent().booleanValue() ? (Event) event.clone() : event);
            }).onErrorContinue(Throwable.class, (th, obj) -> {
                this.logger.error("", th);
                eventSubscriber.getErrorConsumer().accept(th, (Event) obj);
            }).doOnSubscribe(subscription -> {
                if (this.logger.isDebugEnabled()) {
                    this.logger.debug("Subscriber { " + subscriberID.getId() + " } subscribed.");
                }
            }).subscribe(event2 -> {
            }, th2 -> {
            }, eventSubscriber.getCompleteConsumer(), eventSubscriber.getSubscriptionConsumer());
        });
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public final void onNext(T t) {
        this.eventReceiver.onNext(t);
    }

    public final void onComplete() {
        this.eventReceiver.onComplete();
    }

    public final void onCancel(SubscriberID subscriberID) {
        Disposable disposable = this.subscriberDisposables.get(subscriberID);
        if (disposable == null) {
            this.logger.warn("Fail to dispose cause could not find Subscriber: { " + subscriberID.getId() + " }");
        } else {
            disposable.dispose();
            this.subscriberDisposables.remove(subscriberID);
        }
    }
}
