/*
 * Decompiled with CFR 0.152.
 */
package org.reactivecommons.async.impl.listeners;

import com.rabbitmq.client.AMQP;
import java.util.function.Function;
import java.util.logging.Logger;
import lombok.Generated;
import org.reactivecommons.async.api.handlers.registered.RegisteredEventListener;
import org.reactivecommons.async.impl.DiscardNotifier;
import org.reactivecommons.async.impl.EventExecutor;
import org.reactivecommons.async.impl.HandlerResolver;
import org.reactivecommons.async.impl.communications.Message;
import org.reactivecommons.async.impl.communications.ReactiveMessageListener;
import org.reactivecommons.async.impl.communications.TopologyCreator;
import org.reactivecommons.async.impl.converters.MessageConverter;
import org.reactivecommons.async.impl.listeners.GenericMessageListener;
import org.reactivestreams.Publisher;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.rabbitmq.AcknowledgableDelivery;
import reactor.rabbitmq.BindingSpecification;
import reactor.rabbitmq.ExchangeSpecification;
import reactor.rabbitmq.QueueSpecification;

public class ApplicationNotificationListener
extends GenericMessageListener {
    @Generated
    private static final Logger log = Logger.getLogger(ApplicationNotificationListener.class.getName());
    private final MessageConverter messageConverter;
    private final HandlerResolver resolver;
    private final String exchangeName;

    public ApplicationNotificationListener(ReactiveMessageListener receiver, String exchangeName, String queueName, HandlerResolver handlerResolver, MessageConverter messageConverter, DiscardNotifier discardNotifier) {
        super(queueName, receiver, false, 1L, discardNotifier, "event");
        this.resolver = handlerResolver;
        this.messageConverter = messageConverter;
        this.exchangeName = exchangeName;
    }

    @Override
    protected Mono<Void> setUpBindings(TopologyCreator creator) {
        Mono<AMQP.Exchange.DeclareOk> declareExchange = creator.declare(ExchangeSpecification.exchange((String)this.exchangeName).type("topic").durable(true));
        Mono<AMQP.Queue.DeclareOk> declareQueue = creator.declare(QueueSpecification.queue((String)this.queueName).durable(false).autoDelete(true).exclusive(true));
        Flux bindings = Flux.fromIterable(this.resolver.getNotificationListeners()).flatMap(listener -> creator.bind(BindingSpecification.binding((String)this.exchangeName, (String)listener.getPath(), (String)this.queueName)));
        return declareExchange.then(declareQueue).thenMany((Publisher)bindings).then();
    }

    @Override
    protected Function<Message, Mono<Object>> rawMessageHandler(String executorPath) {
        RegisteredEventListener eventListener = this.resolver.getNotificationListener(executorPath);
        Function converter = message -> this.messageConverter.readDomainEvent((Message)message, eventListener.getInputClass());
        EventExecutor executor = new EventExecutor(eventListener.getHandler(), converter);
        return message -> executor.execute((Message)message).cast(Object.class);
    }

    @Override
    protected String getExecutorPath(AcknowledgableDelivery message) {
        return message.getEnvelope().getRoutingKey();
    }
}

