package org.reactivecommons.async.rabbit.listeners;

import com.rabbitmq.client.AMQP;
import java.util.Optional;
import java.util.function.Function;
import java.util.logging.Logger;
import lombok.Generated;
import org.reactivecommons.async.api.handlers.registered.RegisteredEventListener;
import org.reactivecommons.async.commons.DiscardNotifier;
import org.reactivecommons.async.commons.EventExecutor;
import org.reactivecommons.async.commons.communications.Message;
import org.reactivecommons.async.commons.converters.MessageConverter;
import org.reactivecommons.async.commons.ext.CustomReporter;
import org.reactivecommons.async.rabbit.HandlerResolver;
import org.reactivecommons.async.rabbit.communications.ReactiveMessageListener;
import org.reactivecommons.async.rabbit.communications.TopologyCreator;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.rabbitmq.AcknowledgableDelivery;
import reactor.rabbitmq.BindingSpecification;
import reactor.rabbitmq.ExchangeSpecification;

/* loaded from: input_file:org/reactivecommons/async/rabbit/listeners/ApplicationEventListener.class */
/**
 * Message listener for application events.
 *
 * <p>It declares the events topic exchange, binds the listener queue to it once per
 * registered event listener path and, when DLQ retry is enabled, additionally declares
 * the retry / DLQ exchanges and queues. Incoming deliveries are matched to a handler
 * through their routing key.
 *
 * <p>NOTE(review): how and when the superclass invokes these hooks is not visible from
 * this class; see {@code GenericMessageListener}.
 */
public class ApplicationEventListener extends GenericMessageListener {

    @Generated
    private static final Logger log = Logger.getLogger(ApplicationEventListener.class.getName());

    /** Converts broker messages into domain events / event structures. */
    private final MessageConverter messageConverter;
    /** Source of the registered event listeners (paths, handlers, input classes). */
    private final HandlerResolver resolver;
    /** Name of the topic exchange events are published to. */
    private final String eventsExchange;
    /** Whether the retry/DLQ topology must be declared in addition to the main queue. */
    private final boolean withDLQRetry;
    /** Delay handed to the DLQ declaration (unit not visible here — TODO confirm). */
    private final int retryDelay;
    /** Optional size limit forwarded to every queue declaration. */
    private final Optional<Integer> maxLengthBytes;
    /** Application name, used as prefix of the retry and DLQ exchange names. */
    private final String appName;

    /**
     * Creates the listener.
     *
     * @param reactiveMessageListener forwarded to the superclass
     * @param str                     forwarded to the superclass as its first argument
     *                                (presumably the queue name — TODO confirm)
     * @param str2                    name of the events exchange
     * @param handlerResolver         resolver holding the registered event listeners
     * @param messageConverter        converter used to read domain events from messages
     * @param z                       enables the DLQ/retry topology; also forwarded to the superclass
     * @param z2                      forwarded to the superclass (meaning not visible here)
     * @param j                       forwarded to the superclass (meaning not visible here)
     * @param i                       retry delay used when declaring the DLQ
     * @param optional                optional max length in bytes for declared queues
     * @param discardNotifier         forwarded to the superclass
     * @param customReporter          forwarded to the superclass
     * @param str3                    application name used to build exchange names
     */
    public ApplicationEventListener(ReactiveMessageListener reactiveMessageListener, String str, String str2, HandlerResolver handlerResolver, MessageConverter messageConverter, boolean z, boolean z2, long j, int i, Optional<Integer> optional, DiscardNotifier discardNotifier, CustomReporter customReporter, String str3) {
        super(str, reactiveMessageListener, z, z2, j, discardNotifier, "event", customReporter);
        this.appName = str3;
        this.eventsExchange = str2;
        this.resolver = handlerResolver;
        this.messageConverter = messageConverter;
        this.withDLQRetry = z;
        this.retryDelay = i;
        this.maxLengthBytes = optional;
    }

    /**
     * Builds the pipeline that declares exchanges, queues and bindings for this listener.
     *
     * <p>Without DLQ retry: events exchange, then the queue, then one binding per
     * registered listener path. With DLQ retry: events, retry and DLQ exchanges, the
     * main queue (declared with the DLQ exchange), the DLQ (declared with the retry
     * exchange and the retry delay), the listener bindings, and finally catch-all
     * ({@code "#"}) bindings for the DLQ and the main queue.
     *
     * @param topologyCreator component used to declare exchanges/queues and to bind them
     * @return a {@code Mono} completing once the whole chain has completed
     */
    @Override // org.reactivecommons.async.rabbit.listeners.GenericMessageListener
    protected Mono<Void> setUpBindings(TopologyCreator topologyCreator) {
        Mono<AMQP.Exchange.DeclareOk> eventsExchangeDeclared =
                topologyCreator.declare(durableTopicExchange(this.eventsExchange));
        // One binding per registered listener, keyed by the listener path.
        Flux listenerBindings = Flux.fromIterable(this.resolver.getEventListeners())
                .flatMap(listener -> topologyCreator.bind(
                        BindingSpecification.binding(this.eventsExchange, listener.getPath(), this.queueName)));

        if (this.withDLQRetry) {
            String dlqExchangeName = String.format("%s.%s.DLQ", this.appName, this.eventsExchange);
            String retryExchangeName = String.format("%s.%s", this.appName, this.eventsExchange);
            Mono<AMQP.Exchange.DeclareOk> retryExchangeDeclared =
                    topologyCreator.declare(durableTopicExchange(retryExchangeName));
            Mono<AMQP.Exchange.DeclareOk> dlqExchangeDeclared =
                    topologyCreator.declare(durableTopicExchange(dlqExchangeName));
            Mono<AMQP.Queue.DeclareOk> dlqDeclared =
                    topologyCreator.declareDLQ(this.queueName, retryExchangeName, this.retryDelay, this.maxLengthBytes);
            Mono<AMQP.Queue.DeclareOk> queueDeclared =
                    topologyCreator.declareQueue(this.queueName, dlqExchangeName, this.maxLengthBytes);
            return eventsExchangeDeclared
                    .then(retryExchangeDeclared)
                    .then(dlqExchangeDeclared)
                    .then(queueDeclared)
                    .then(dlqDeclared)
                    .thenMany(listenerBindings)
                    .then(topologyCreator.bind(BindingSpecification.binding(dlqExchangeName, "#", this.queueName + ".DLQ")))
                    .then(topologyCreator.bind(BindingSpecification.binding(retryExchangeName, "#", this.queueName)))
                    .then();
        }

        return eventsExchangeDeclared
                .then(topologyCreator.declareQueue(this.queueName, this.maxLengthBytes))
                .thenMany(listenerBindings)
                .then();
    }

    /** Builds the specification of a durable exchange of type {@code "topic"} with the given name. */
    private static ExchangeSpecification durableTopicExchange(String exchangeName) {
        return ExchangeSpecification.exchange(exchangeName).durable(true).type("topic");
    }

    /**
     * Creates the function that processes a message for the given executor path.
     *
     * <p>The listener registered for {@code str} is looked up once; the returned function
     * reads each message as a domain event of the listener's input class, runs the
     * listener's handler through an {@link EventExecutor} and casts the result to
     * {@code Object}.
     *
     * @param str path used to look up the registered event listener
     * @return function mapping a message to the handler's result
     */
    @Override // org.reactivecommons.async.rabbit.listeners.GenericMessageListener
    protected Function<Message, Mono<Object>> rawMessageHandler(String str) {
        RegisteredEventListener registeredListener = this.resolver.getEventListener(str);
        Class eventType = registeredListener.getInputClass();
        EventExecutor executor = new EventExecutor(registeredListener.getHandler(),
                rawMessage -> this.messageConverter.readDomainEvent(rawMessage, eventType));
        return incoming -> executor.execute(incoming).cast(Object.class);
    }

    /**
     * Returns the executor path of a delivery, which is its routing key.
     *
     * @param acknowledgableDelivery delivery received from the broker
     * @return the routing key found in the delivery envelope
     */
    @Override // org.reactivecommons.async.rabbit.listeners.GenericMessageListener
    protected String getExecutorPath(AcknowledgableDelivery acknowledgableDelivery) {
        return acknowledgableDelivery.getEnvelope().getRoutingKey();
    }

    /**
     * Parses a message into the domain event structure handed to the reporter.
     *
     * @param message message to parse
     * @return result of {@code MessageConverter.readDomainEventStructure} for the message
     */
    @Override // org.reactivecommons.async.rabbit.listeners.GenericMessageListener
    protected Object parseMessageForReporter(Message message) {
        return this.messageConverter.readDomainEventStructure(message);
    }
}
