package org.eclipse.edc.connector.core.event;

import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.function.Supplier;
import java.util.stream.Stream;
import org.eclipse.edc.spi.event.Event;
import org.eclipse.edc.spi.event.EventEnvelope;
import org.eclipse.edc.spi.event.EventRouter;
import org.eclipse.edc.spi.event.EventSubscriber;
import org.eclipse.edc.spi.monitor.Monitor;

/* loaded from: input_file:org/eclipse/edc/connector/core/event/EventRouterImpl.class */
public class EventRouterImpl implements EventRouter {
    private final Map<Class<?>, List<EventSubscriber>> subscribers = new ConcurrentHashMap();
    private final Map<Class<?>, List<EventSubscriber>> syncSubscribers = new ConcurrentHashMap();
    private final Monitor monitor;
    private final ExecutorService executor;

    public EventRouterImpl(Monitor monitor, ExecutorService executorService) {
        this.monitor = monitor;
        this.executor = executorService;
    }

    @Override // org.eclipse.edc.spi.event.EventRouter
    public <E extends Event> void registerSync(Class<E> cls, EventSubscriber eventSubscriber) {
        this.syncSubscribers.computeIfAbsent(cls, cls2 -> {
            return new ArrayList();
        }).add(eventSubscriber);
    }

    @Override // org.eclipse.edc.spi.event.EventRouter
    public <E extends Event> void register(Class<E> cls, EventSubscriber eventSubscriber) {
        this.subscribers.computeIfAbsent(cls, cls2 -> {
            return new ArrayList();
        }).add(eventSubscriber);
    }

    @Override // org.eclipse.edc.spi.event.EventRouter
    public <E extends Event> void publish(EventEnvelope<E> eventEnvelope) {
        subscriberFor(eventEnvelope, this::getSyncSubscribers).forEach(eventSubscriber -> {
            eventSubscriber.on(eventEnvelope);
        });
        subscriberFor(eventEnvelope, this::getSubscribers).map(eventSubscriber2 -> {
            return CompletableFuture.runAsync(() -> {
                eventSubscriber2.on(eventEnvelope);
            }, this.executor).thenApply(r3 -> {
                return eventSubscriber2;
            });
        }).forEach(completableFuture -> {
            completableFuture.whenComplete((eventSubscriber3, th) -> {
                if (th != null) {
                    this.monitor.severe(String.format("Subscriber %s failed to handle event %s", eventSubscriber3.getClass().getSimpleName(), eventEnvelope.getClass().getSimpleName()), th);
                }
            });
        });
    }

    private Map<Class<?>, List<EventSubscriber>> getSubscribers() {
        return this.subscribers;
    }

    private Map<Class<?>, List<EventSubscriber>> getSyncSubscribers() {
        return this.syncSubscribers;
    }

    private <E extends Event> Stream<EventSubscriber> subscriberFor(EventEnvelope<E> eventEnvelope, Supplier<Map<Class<?>, List<EventSubscriber>>> supplier) {
        return supplier.get().entrySet().stream().filter(entry -> {
            return ((Class) entry.getKey()).isAssignableFrom(eventEnvelope.getPayload().getClass());
        }).flatMap(entry2 -> {
            return ((List) entry2.getValue()).stream();
        });
    }
}
