/*
 * Decompiled with CFR 0.152.
 */
package org.reactivecommons.async.rabbit.config;

import com.fasterxml.jackson.databind.ObjectMapper;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import java.beans.ConstructorProperties;
import java.security.KeyManagementException;
import java.security.NoSuchAlgorithmException;
import java.time.Duration;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.logging.Level;
import java.util.logging.Logger;
import java.util.stream.Stream;
import lombok.Generated;
import org.reactivecommons.api.domain.Command;
import org.reactivecommons.api.domain.DomainEvent;
import org.reactivecommons.api.domain.DomainEventBus;
import org.reactivecommons.async.api.AsyncQuery;
import org.reactivecommons.async.api.DefaultCommandHandler;
import org.reactivecommons.async.api.DefaultQueryHandler;
import org.reactivecommons.async.api.DynamicRegistry;
import org.reactivecommons.async.api.HandlerRegistry;
import org.reactivecommons.async.api.handlers.CommandHandler;
import org.reactivecommons.async.api.handlers.registered.RegisteredCommandHandler;
import org.reactivecommons.async.commons.DiscardNotifier;
import org.reactivecommons.async.commons.communications.Message;
import org.reactivecommons.async.commons.config.BrokerConfig;
import org.reactivecommons.async.commons.config.IBrokerConfigProps;
import org.reactivecommons.async.commons.converters.MessageConverter;
import org.reactivecommons.async.commons.converters.json.DefaultObjectMapperSupplier;
import org.reactivecommons.async.commons.converters.json.ObjectMapperSupplier;
import org.reactivecommons.async.commons.ext.CustomReporter;
import org.reactivecommons.async.rabbit.DynamicRegistryImp;
import org.reactivecommons.async.rabbit.HandlerResolver;
import org.reactivecommons.async.rabbit.RabbitDiscardNotifier;
import org.reactivecommons.async.rabbit.RabbitDomainEventBus;
import org.reactivecommons.async.rabbit.communications.ReactiveMessageListener;
import org.reactivecommons.async.rabbit.communications.ReactiveMessageSender;
import org.reactivecommons.async.rabbit.communications.TopologyCreator;
import org.reactivecommons.async.rabbit.config.ConnectionFactoryProvider;
import org.reactivecommons.async.rabbit.config.RabbitHealthConfig;
import org.reactivecommons.async.rabbit.config.RabbitProperties;
import org.reactivecommons.async.rabbit.config.props.AsyncProps;
import org.reactivecommons.async.rabbit.config.props.BrokerConfigProps;
import org.reactivecommons.async.rabbit.converters.json.JacksonMessageConverter;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean;
import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.boot.context.properties.PropertyMapper;
import org.springframework.context.ApplicationContext;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Import;
import reactor.core.publisher.Mono;
import reactor.rabbitmq.ChannelPool;
import reactor.rabbitmq.ChannelPoolFactory;
import reactor.rabbitmq.ChannelPoolOptions;
import reactor.rabbitmq.ExchangeSpecification;
import reactor.rabbitmq.RabbitFlux;
import reactor.rabbitmq.Receiver;
import reactor.rabbitmq.ReceiverOptions;
import reactor.rabbitmq.Sender;
import reactor.rabbitmq.SenderOptions;
import reactor.rabbitmq.Utils;
import reactor.util.retry.Retry;

@Configuration
@EnableConfigurationProperties(value={RabbitProperties.class, AsyncProps.class})
@Import(value={BrokerConfigProps.class, RabbitHealthConfig.class})
public class RabbitMqConfig {
    @Generated
    private static final Logger log = Logger.getLogger(RabbitMqConfig.class.getName());
    private static final String LISTENER_TYPE = "listener";
    private static final String SENDER_TYPE = "sender";
    private final AsyncProps asyncProps;
    @Value(value="${spring.application.name}")
    private String appName;

    @Bean
    public ReactiveMessageSender messageSender(MessageConverter converter, BrokerConfigProps brokerConfigProps, SenderOptions senderOptions) {
        Sender sender = RabbitFlux.createSender((SenderOptions)senderOptions);
        return new ReactiveMessageSender(sender, brokerConfigProps.getAppName(), converter, new TopologyCreator(sender));
    }

    @Bean
    public SenderOptions reactiveCommonsSenderOptions(ConnectionFactoryProvider provider, RabbitProperties rabbitProperties) {
        Mono<Connection> senderConnection = this.createConnectionMono(provider.getConnectionFactory(), this.appName, SENDER_TYPE);
        ChannelPoolOptions channelPoolOptions = new ChannelPoolOptions();
        PropertyMapper map = PropertyMapper.get();
        map.from(rabbitProperties.getCache().getChannel()::getSize).whenNonNull().to(arg_0 -> ((ChannelPoolOptions)channelPoolOptions).maxCacheSize(arg_0));
        ChannelPool channelPool = ChannelPoolFactory.createChannelPool(senderConnection, (ChannelPoolOptions)channelPoolOptions);
        return new SenderOptions().channelPool(channelPool).resourceManagementChannelMono(channelPool.getChannelMono().transform(Utils::cache));
    }

    @Bean
    public ReactiveMessageListener messageListener(ConnectionFactoryProvider provider) {
        Mono<Connection> connection = this.createConnectionMono(provider.getConnectionFactory(), this.appName, LISTENER_TYPE);
        Receiver receiver = RabbitFlux.createReceiver((ReceiverOptions)new ReceiverOptions().connectionMono(connection));
        Sender sender = RabbitFlux.createSender((SenderOptions)new SenderOptions().connectionMono(connection));
        return new ReactiveMessageListener(receiver, new TopologyCreator(sender), this.asyncProps.getFlux().getMaxConcurrency(), this.asyncProps.getPrefetchCount());
    }

    @Bean
    @ConditionalOnMissingBean
    public BrokerConfig brokerConfig() {
        return new BrokerConfig();
    }

    @Bean
    @ConditionalOnMissingBean
    public ConnectionFactoryProvider rabbitRConnectionFactory(RabbitProperties properties) throws NoSuchAlgorithmException, KeyManagementException {
        ConnectionFactory factory = new ConnectionFactory();
        PropertyMapper map = PropertyMapper.get();
        map.from(properties::determineHost).whenNonNull().to(arg_0 -> ((ConnectionFactory)factory).setHost(arg_0));
        map.from(properties::determinePort).to(arg_0 -> ((ConnectionFactory)factory).setPort(arg_0));
        map.from(properties::determineUsername).whenNonNull().to(arg_0 -> ((ConnectionFactory)factory).setUsername(arg_0));
        map.from(properties::determinePassword).whenNonNull().to(arg_0 -> ((ConnectionFactory)factory).setPassword(arg_0));
        map.from(properties::determineVirtualHost).whenNonNull().to(arg_0 -> ((ConnectionFactory)factory).setVirtualHost(arg_0));
        factory.useNio();
        if (properties.getSsl() != null && properties.getSsl().isEnabled()) {
            factory.useSslProtocol();
        }
        return () -> factory;
    }

    @Bean
    @ConditionalOnMissingBean
    public ObjectMapperSupplier objectMapperSupplier() {
        return new DefaultObjectMapperSupplier();
    }

    @Bean
    @ConditionalOnMissingBean
    public MessageConverter messageConverter(ObjectMapperSupplier objectMapperSupplier) {
        return new JacksonMessageConverter((ObjectMapper)objectMapperSupplier.get());
    }

    @Bean
    @ConditionalOnMissingBean
    public DiscardNotifier rabbitDiscardNotifier(ObjectMapperSupplier objectMapperSupplier, AsyncProps asyncProps, ReactiveMessageSender sender, BrokerConfigProps props) {
        return new RabbitDiscardNotifier(this.domainEventBus(sender, props, asyncProps.getCreateTopology()), (ObjectMapper)objectMapperSupplier.get());
    }

    @Bean
    @ConditionalOnMissingBean
    public CustomReporter reactiveCommonsCustomErrorReporter() {
        return new CustomReporter(){

            public Mono<Void> reportError(Throwable ex, Message rawMessage, Command<?> message, boolean redelivered) {
                return Mono.empty();
            }

            public Mono<Void> reportError(Throwable ex, Message rawMessage, DomainEvent<?> message, boolean redelivered) {
                return Mono.empty();
            }

            public Mono<Void> reportError(Throwable ex, Message rawMessage, AsyncQuery<?> message, boolean redelivered) {
                return Mono.empty();
            }
        };
    }

    private DomainEventBus domainEventBus(ReactiveMessageSender sender, BrokerConfigProps props, boolean createExchange) {
        String exchangeName = props.getDomainEventsExchangeName();
        if (createExchange) {
            sender.getTopologyCreator().declare(ExchangeSpecification.exchange((String)exchangeName).durable(true).type("topic")).subscribe();
        }
        return new RabbitDomainEventBus(sender, exchangeName);
    }

    Mono<Connection> createConnectionMono(ConnectionFactory factory, String connectionPrefix, String connectionType) {
        return Mono.fromCallable(() -> factory.newConnection(connectionPrefix + " " + connectionType)).doOnError(err -> log.log(Level.SEVERE, "Error creating connection to RabbitMq Broker. Starting retry process...", (Throwable)err)).retryWhen((Retry)Retry.backoff((long)Long.MAX_VALUE, (Duration)Duration.ofMillis(300L)).maxBackoff(Duration.ofMillis(3000L))).cache();
    }

    @Bean
    public HandlerResolver resolver(ApplicationContext context, final DefaultCommandHandler defaultCommandHandler) {
        Map registries = context.getBeansOfType(HandlerRegistry.class);
        ConcurrentMap queryHandlers = registries.values().stream().flatMap(r -> r.getHandlers().stream()).collect(ConcurrentHashMap::new, (map, handler) -> map.put(handler.getPath(), handler), ConcurrentHashMap::putAll);
        ConcurrentMap eventsToBind = registries.values().stream().flatMap(r -> ((List)r.getDomainEventListeners().get("app")).stream()).collect(ConcurrentHashMap::new, (map, handler) -> map.put(handler.getPath(), handler), ConcurrentHashMap::putAll);
        ConcurrentMap eventHandlers = registries.values().stream().flatMap(r -> Stream.concat(((List)r.getDomainEventListeners().get("app")).stream(), r.getDynamicEventHandlers().stream())).collect(ConcurrentHashMap::new, (map, handler) -> map.put(handler.getPath(), handler), ConcurrentHashMap::putAll);
        ConcurrentMap commandHandlers = registries.values().stream().flatMap(r -> r.getCommandHandlers().stream()).collect(ConcurrentHashMap::new, (map, handler) -> map.put(handler.getPath(), handler), ConcurrentHashMap::putAll);
        ConcurrentMap eventNotificationListener = registries.values().stream().flatMap(r -> r.getEventNotificationListener().stream()).collect(ConcurrentHashMap::new, (map, handler) -> map.put(handler.getPath(), handler), ConcurrentHashMap::putAll);
        return new HandlerResolver(queryHandlers, eventHandlers, eventsToBind, eventNotificationListener, commandHandlers){

            public <T> RegisteredCommandHandler<T> getCommandHandler(String path) {
                RegisteredCommandHandler handler = super.getCommandHandler(path);
                return handler != null ? handler : new RegisteredCommandHandler("", (CommandHandler)defaultCommandHandler, Object.class);
            }
        };
    }

    @Bean
    public DynamicRegistry dynamicRegistry(HandlerResolver resolver, ReactiveMessageListener listener, IBrokerConfigProps props) {
        return new DynamicRegistryImp(resolver, listener.getTopologyCreator(), props);
    }

    @Bean
    @ConditionalOnMissingBean
    public DefaultQueryHandler defaultHandler() {
        return command -> Mono.error((Throwable)new RuntimeException("No Handler Registered"));
    }

    @Bean
    @ConditionalOnMissingBean
    public DefaultCommandHandler defaultCommandHandler() {
        return message -> Mono.error((Throwable)new RuntimeException("No Handler Registered"));
    }

    @ConstructorProperties(value={"asyncProps"})
    @Generated
    public RabbitMqConfig(AsyncProps asyncProps) {
        this.asyncProps = asyncProps;
    }
}

