package org.eclipse.edc.connector.controlplane.callback.dispatcher;

import java.net.URI;
import java.util.List;
import java.util.Objects;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.eclipse.edc.connector.controlplane.services.spi.callback.CallbackEventRemoteMessage;
import org.eclipse.edc.connector.controlplane.services.spi.callback.CallbackProtocolResolverRegistry;
import org.eclipse.edc.connector.controlplane.services.spi.callback.CallbackRegistry;
import org.eclipse.edc.spi.EdcException;
import org.eclipse.edc.spi.event.Event;
import org.eclipse.edc.spi.event.EventEnvelope;
import org.eclipse.edc.spi.event.EventSubscriber;
import org.eclipse.edc.spi.message.RemoteMessageDispatcherRegistry;
import org.eclipse.edc.spi.monitor.Monitor;
import org.eclipse.edc.spi.types.domain.callback.CallbackAddress;

/* loaded from: input_file:org/eclipse/edc/connector/controlplane/callback/dispatcher/CallbackEventDispatcher.class */
public class CallbackEventDispatcher implements EventSubscriber {
    private final RemoteMessageDispatcherRegistry dispatcher;
    private final boolean transactional;
    private final Monitor monitor;
    private final CallbackRegistry callbackRegistry;
    private final CallbackProtocolResolverRegistry resolverRegistry;

    public CallbackEventDispatcher(RemoteMessageDispatcherRegistry remoteMessageDispatcherRegistry, CallbackRegistry callbackRegistry, CallbackProtocolResolverRegistry callbackProtocolResolverRegistry, boolean z, Monitor monitor) {
        this.dispatcher = remoteMessageDispatcherRegistry;
        this.callbackRegistry = callbackRegistry;
        this.transactional = z;
        this.resolverRegistry = callbackProtocolResolverRegistry;
        this.monitor = monitor;
    }

    public <E extends Event> void on(EventEnvelope<E> eventEnvelope) {
        List<CallbackAddress> callbacks = getCallbacks(eventEnvelope);
        String name = eventEnvelope.getPayload().name();
        for (CallbackAddress callbackAddress : callbacks) {
            if (matches(name, callbackAddress)) {
                try {
                    String resolve = this.resolverRegistry.resolve(URI.create(callbackAddress.getUri()).getScheme());
                    if (resolve != null) {
                        this.dispatcher.dispatch(Object.class, new CallbackEventRemoteMessage(callbackAddress, eventEnvelope, resolve)).get();
                    } else {
                        this.monitor.warning(String.format("Failed to resolve protocol for URI %s", callbackAddress.getUri()), new Throwable[0]);
                    }
                } catch (Exception e) {
                    this.monitor.severe(String.format("Failed to invoke callback at URI: %s", callbackAddress.getUri()), new Throwable[]{e});
                    throw new EdcException(e);
                }
            }
        }
    }

    public boolean isTransactional() {
        return this.transactional;
    }

    private <E extends Event> List<CallbackAddress> getCallbacks(EventEnvelope<E> eventEnvelope) {
        return (List) Stream.concat(this.callbackRegistry.resolve(eventEnvelope.getPayload().name()).stream(), eventEnvelope.getPayload().getCallbackAddresses().stream()).filter(callbackAddress -> {
            return callbackAddress.isTransactional() == this.transactional;
        }).collect(Collectors.toList());
    }

    private boolean matches(String str, CallbackAddress callbackAddress) {
        Stream stream = callbackAddress.getEvents().stream();
        Objects.requireNonNull(str);
        return stream.anyMatch(str::startsWith);
    }
}
