/*
 * Decompiled with CFR 0.152.
 */
package org.eclipse.edc.connector.callback.dispatcher;

import java.net.URI;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.eclipse.edc.connector.spi.callback.CallbackEventRemoteMessage;
import org.eclipse.edc.connector.spi.callback.CallbackProtocolResolverRegistry;
import org.eclipse.edc.connector.spi.callback.CallbackRegistry;
import org.eclipse.edc.spi.EdcException;
import org.eclipse.edc.spi.event.Event;
import org.eclipse.edc.spi.event.EventEnvelope;
import org.eclipse.edc.spi.event.EventSubscriber;
import org.eclipse.edc.spi.message.RemoteMessageDispatcherRegistry;
import org.eclipse.edc.spi.monitor.Monitor;
import org.eclipse.edc.spi.types.domain.callback.CallbackAddress;
import org.eclipse.edc.spi.types.domain.message.RemoteMessage;

public class CallbackEventDispatcher
implements EventSubscriber {
    private final RemoteMessageDispatcherRegistry dispatcher;
    private final boolean transactional;
    private final Monitor monitor;
    private final CallbackRegistry callbackRegistry;
    private final CallbackProtocolResolverRegistry resolverRegistry;

    public CallbackEventDispatcher(RemoteMessageDispatcherRegistry dispatcher, CallbackRegistry callbackRegistry, CallbackProtocolResolverRegistry resolveRegistry, boolean transactional, Monitor monitor) {
        this.dispatcher = dispatcher;
        this.callbackRegistry = callbackRegistry;
        this.transactional = transactional;
        this.resolverRegistry = resolveRegistry;
        this.monitor = monitor;
    }

    public <E extends Event> void on(EventEnvelope<E> eventEnvelope) {
        List<CallbackAddress> callbacks = this.getCallbacks(eventEnvelope);
        String eventName = eventEnvelope.getPayload().name();
        for (CallbackAddress callback : callbacks) {
            if (!this.matches(eventName, callback)) continue;
            try {
                String protocol = this.resolverRegistry.resolve(URI.create(callback.getUri()).getScheme());
                if (protocol != null) {
                    this.dispatcher.dispatch(Object.class, (RemoteMessage)new CallbackEventRemoteMessage(callback, eventEnvelope, protocol)).get();
                    continue;
                }
                this.monitor.warning(String.format("Failed to resolve protocol for URI %s", callback.getUri()), new Throwable[0]);
            }
            catch (Exception e) {
                this.monitor.severe(String.format("Failed to invoke callback at URI: %s", callback.getUri()), new Throwable[]{e});
                throw new EdcException((Throwable)e);
            }
        }
    }

    public boolean isTransactional() {
        return this.transactional;
    }

    private <E extends Event> List<CallbackAddress> getCallbacks(EventEnvelope<E> eventEnvelope) {
        Stream staticCallbacks = this.callbackRegistry.resolve(eventEnvelope.getPayload().name()).stream();
        Stream dynamicCallbacks = eventEnvelope.getPayload().getCallbackAddresses().stream();
        return Stream.concat(staticCallbacks, dynamicCallbacks).filter(cb -> cb.isTransactional() == this.transactional).collect(Collectors.toList());
    }

    private boolean matches(String eventName, CallbackAddress callbackAddress) {
        return callbackAddress.getEvents().stream().anyMatch(eventName::startsWith);
    }
}

