package org.reactivecommons.async.impl.listeners;

import com.rabbitmq.client.AMQP;
import java.util.HashMap;
import java.util.Objects;
import java.util.Optional;
import java.util.function.Function;
import java.util.logging.Logger;
import lombok.Generated;
import org.reactivecommons.async.api.handlers.registered.RegisteredQueryHandler;
import org.reactivecommons.async.impl.DiscardNotifier;
import org.reactivecommons.async.impl.HandlerResolver;
import org.reactivecommons.async.impl.Headers;
import org.reactivecommons.async.impl.QueryExecutor;
import org.reactivecommons.async.impl.communications.Message;
import org.reactivecommons.async.impl.communications.ReactiveMessageListener;
import org.reactivecommons.async.impl.communications.ReactiveMessageSender;
import org.reactivecommons.async.impl.communications.TopologyCreator;
import org.reactivecommons.async.impl.converters.MessageConverter;
import reactor.core.publisher.Mono;
import reactor.rabbitmq.AcknowledgableDelivery;
import reactor.rabbitmq.BindingSpecification;
import reactor.rabbitmq.ExchangeSpecification;
import reactor.rabbitmq.QueueSpecification;

/* loaded from: input_file:org/reactivecommons/async/impl/listeners/ApplicationQueryListener.class */
public class ApplicationQueryListener extends GenericMessageListener {

    @Generated
    private static final Logger log = Logger.getLogger(ApplicationQueryListener.class.getName());
    private final MessageConverter converter;
    private final HandlerResolver handlerResolver;
    private final ReactiveMessageSender sender;
    private final String replyExchange;
    private final String directExchange;
    private final boolean withDLQRetry;
    private final int retryDelay;

    public ApplicationQueryListener(ReactiveMessageListener reactiveMessageListener, String str, HandlerResolver handlerResolver, ReactiveMessageSender reactiveMessageSender, String str2, MessageConverter messageConverter, String str3, boolean z, long j, int i, DiscardNotifier discardNotifier) {
        super(str, reactiveMessageListener, z, j, discardNotifier, "query");
        this.retryDelay = i;
        this.withDLQRetry = z;
        this.converter = messageConverter;
        this.handlerResolver = handlerResolver;
        this.sender = reactiveMessageSender;
        this.replyExchange = str3;
        this.directExchange = str2;
    }

    @Override // org.reactivecommons.async.impl.listeners.GenericMessageListener
    protected Function<Message, Mono<Object>> rawMessageHandler(String str) {
        RegisteredQueryHandler queryHandler = this.handlerResolver.getQueryHandler(str);
        if (queryHandler == null) {
            return message -> {
                return Mono.error(new RuntimeException("Handler Not registered for Query: " + str));
            };
        }
        Class queryClass = queryHandler.getQueryClass();
        QueryExecutor queryExecutor = new QueryExecutor(queryHandler.getHandler(), message2 -> {
            return this.converter.readAsyncQuery(message2, queryClass).getQueryData();
        });
        Objects.requireNonNull(queryExecutor);
        return queryExecutor::execute;
    }

    @Override // org.reactivecommons.async.impl.listeners.GenericMessageListener
    protected Mono<Void> setUpBindings(TopologyCreator topologyCreator) {
        if (!this.withDLQRetry) {
            return topologyCreator.declare(ExchangeSpecification.exchange(this.directExchange).durable(true).type("direct")).then(topologyCreator.declare(QueueSpecification.queue(this.queueName).durable(true))).then(topologyCreator.bind(BindingSpecification.binding(this.directExchange, this.queueName, this.queueName))).then();
        }
        Mono<AMQP.Exchange.DeclareOk> declare = topologyCreator.declare(ExchangeSpecification.exchange(this.directExchange).durable(true).type("direct"));
        Mono<AMQP.Exchange.DeclareOk> declare2 = topologyCreator.declare(ExchangeSpecification.exchange(this.directExchange + ".DLQ").durable(true).type("direct"));
        Mono<AMQP.Queue.DeclareOk> declareQueue = topologyCreator.declareQueue(this.queueName, this.directExchange + ".DLQ");
        Mono<AMQP.Queue.DeclareOk> declareDLQ = topologyCreator.declareDLQ(this.queueName, this.directExchange, this.retryDelay);
        return declare.then(declare2).then(declareQueue).then(declareDLQ).then(topologyCreator.bind(BindingSpecification.binding(this.directExchange, this.queueName, this.queueName))).then(topologyCreator.bind(BindingSpecification.binding(this.directExchange + ".DLQ", this.queueName, this.queueName + ".DLQ"))).then();
    }

    @Override // org.reactivecommons.async.impl.listeners.GenericMessageListener
    protected String getExecutorPath(AcknowledgableDelivery acknowledgableDelivery) {
        return acknowledgableDelivery.getProperties().getHeaders().get(Headers.SERVED_QUERY_ID).toString();
    }

    @Override // org.reactivecommons.async.impl.listeners.GenericMessageListener
    protected Function<Mono<Object>, Mono<Object>> enrichPostProcess(Message message) {
        return mono -> {
            return mono.materialize().flatMap(signal -> {
                if (signal.isOnError()) {
                    return Mono.error((Throwable) Optional.ofNullable(signal.getThrowable()).orElseGet(RuntimeException::new));
                }
                String obj = message.getProperties().getHeaders().get(Headers.REPLY_ID).toString();
                String obj2 = message.getProperties().getHeaders().get(Headers.CORRELATION_ID).toString();
                HashMap hashMap = new HashMap();
                hashMap.put(Headers.CORRELATION_ID, obj2);
                if (!signal.hasValue()) {
                    hashMap.put(Headers.COMPLETION_ONLY_SIGNAL, Boolean.TRUE.toString());
                }
                return this.sender.sendWithConfirm(signal.get(), this.replyExchange, obj, hashMap);
            });
        };
    }
}
