package org.reactivecommons.async.rabbit.config;

import com.fasterxml.jackson.databind.ObjectMapper;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import java.time.Duration;
import java.util.Map;
import java.util.Objects;
import java.util.logging.Level;
import java.util.logging.Logger;
import lombok.Generated;
import org.reactivecommons.api.domain.Command;
import org.reactivecommons.api.domain.DomainEvent;
import org.reactivecommons.async.api.AsyncQuery;
import org.reactivecommons.async.api.DefaultCommandHandler;
import org.reactivecommons.async.api.DefaultQueryHandler;
import org.reactivecommons.async.api.DynamicRegistry;
import org.reactivecommons.async.api.HandlerRegistry;
import org.reactivecommons.async.commons.communications.Message;
import org.reactivecommons.async.commons.config.BrokerConfig;
import org.reactivecommons.async.commons.converters.MessageConverter;
import org.reactivecommons.async.commons.converters.json.DefaultObjectMapperSupplier;
import org.reactivecommons.async.commons.converters.json.ObjectMapperSupplier;
import org.reactivecommons.async.commons.ext.CustomReporter;
import org.reactivecommons.async.rabbit.DynamicRegistryImp;
import org.reactivecommons.async.rabbit.communications.ReactiveMessageListener;
import org.reactivecommons.async.rabbit.communications.ReactiveMessageSender;
import org.reactivecommons.async.rabbit.communications.TopologyCreator;
import org.reactivecommons.async.rabbit.config.RabbitProperties;
import org.reactivecommons.async.rabbit.config.props.AsyncProps;
import org.reactivecommons.async.rabbit.config.props.AsyncPropsDomain;
import org.reactivecommons.async.rabbit.config.props.AsyncPropsDomainProperties;
import org.reactivecommons.async.rabbit.config.props.BrokerConfigProps;
import org.reactivecommons.async.rabbit.converters.json.JacksonCloudEventMessageConverter;
import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean;
import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.boot.context.properties.PropertyMapper;
import org.springframework.context.ApplicationContext;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Import;
import reactor.core.publisher.Mono;
import reactor.rabbitmq.ChannelPool;
import reactor.rabbitmq.ChannelPoolFactory;
import reactor.rabbitmq.ChannelPoolOptions;
import reactor.rabbitmq.RabbitFlux;
import reactor.rabbitmq.ReceiverOptions;
import reactor.rabbitmq.Sender;
import reactor.rabbitmq.SenderOptions;
import reactor.rabbitmq.Utils;
import reactor.util.retry.Retry;

@EnableConfigurationProperties({RabbitProperties.class, AsyncPropsDomainProperties.class})
@Configuration
@Import({RabbitHealthConfig.class, AsyncPropsDomain.class})
/* loaded from: input_file:org/reactivecommons/async/rabbit/config/RabbitMqConfig.class */
public class RabbitMqConfig {

    @Generated
    private static final Logger log = Logger.getLogger(RabbitMqConfig.class.getName());
    private static final String LISTENER_TYPE = "listener";
    private static final String SENDER_TYPE = "sender";

    @Bean
    public ConnectionManager buildConnectionManager(AsyncPropsDomain asyncPropsDomain, MessageConverter messageConverter) {
        ConnectionManager connectionManager = new ConnectionManager();
        asyncPropsDomain.forEach((str, asyncProps) -> {
            ConnectionFactoryProvider createConnectionFactoryProvider = createConnectionFactoryProvider(asyncProps.getConnectionProperties());
            connectionManager.addDomain(str, createMessageListener(createConnectionFactoryProvider, asyncProps), createMessageSender(createConnectionFactoryProvider, asyncProps, messageConverter), createConnectionFactoryProvider);
        });
        return connectionManager;
    }

    @Bean
    public DomainHandlers buildHandlers(AsyncPropsDomain asyncPropsDomain, ApplicationContext applicationContext, HandlerRegistry handlerRegistry, DefaultCommandHandler<?> defaultCommandHandler) {
        DomainHandlers domainHandlers = new DomainHandlers();
        Map beansOfType = applicationContext.getBeansOfType(HandlerRegistry.class);
        if (!beansOfType.containsValue(handlerRegistry)) {
            beansOfType.put("primaryHandlerRegistry", handlerRegistry);
        }
        asyncPropsDomain.forEach((str, asyncProps) -> {
            domainHandlers.add(str, HandlerResolverBuilder.buildResolver(str, beansOfType, defaultCommandHandler));
        });
        return domainHandlers;
    }

    private ReactiveMessageSender createMessageSender(ConnectionFactoryProvider connectionFactoryProvider, AsyncProps asyncProps, MessageConverter messageConverter) {
        Sender createSender = RabbitFlux.createSender(reactiveCommonsSenderOptions(asyncProps.getAppName(), connectionFactoryProvider, asyncProps.getConnectionProperties()));
        return new ReactiveMessageSender(createSender, asyncProps.getAppName(), messageConverter, new TopologyCreator(createSender));
    }

    private SenderOptions reactiveCommonsSenderOptions(String str, ConnectionFactoryProvider connectionFactoryProvider, RabbitProperties rabbitProperties) {
        Mono<Connection> createConnectionMono = createConnectionMono(connectionFactoryProvider.getConnectionFactory(), str, SENDER_TYPE);
        ChannelPoolOptions channelPoolOptions = new ChannelPoolOptions();
        PropertyMapper propertyMapper = PropertyMapper.get();
        RabbitProperties.Cache.Channel channel = rabbitProperties.getCache().getChannel();
        Objects.requireNonNull(channel);
        PropertyMapper.Source whenNonNull = propertyMapper.from(channel::getSize).whenNonNull();
        Objects.requireNonNull(channelPoolOptions);
        whenNonNull.to((v1) -> {
            r1.maxCacheSize(v1);
        });
        ChannelPool createChannelPool = ChannelPoolFactory.createChannelPool(createConnectionMono, channelPoolOptions);
        return new SenderOptions().channelPool(createChannelPool).resourceManagementChannelMono(createChannelPool.getChannelMono().transform(Utils::cache));
    }

    public ReactiveMessageListener createMessageListener(ConnectionFactoryProvider connectionFactoryProvider, AsyncProps asyncProps) {
        Mono<Connection> createConnectionMono = createConnectionMono(connectionFactoryProvider.getConnectionFactory(), asyncProps.getAppName(), LISTENER_TYPE);
        return new ReactiveMessageListener(RabbitFlux.createReceiver(new ReceiverOptions().connectionMono(createConnectionMono)), new TopologyCreator(RabbitFlux.createSender(new SenderOptions().connectionMono(createConnectionMono))), asyncProps.getFlux().getMaxConcurrency(), asyncProps.getPrefetchCount());
    }

    public ConnectionFactoryProvider createConnectionFactoryProvider(RabbitProperties rabbitProperties) {
        ConnectionFactory connectionFactory = new ConnectionFactory();
        PropertyMapper propertyMapper = PropertyMapper.get();
        Objects.requireNonNull(rabbitProperties);
        PropertyMapper.Source whenNonNull = propertyMapper.from(rabbitProperties::determineHost).whenNonNull();
        Objects.requireNonNull(connectionFactory);
        whenNonNull.to(connectionFactory::setHost);
        Objects.requireNonNull(rabbitProperties);
        PropertyMapper.Source from = propertyMapper.from(rabbitProperties::determinePort);
        Objects.requireNonNull(connectionFactory);
        from.to((v1) -> {
            r1.setPort(v1);
        });
        Objects.requireNonNull(rabbitProperties);
        PropertyMapper.Source whenNonNull2 = propertyMapper.from(rabbitProperties::determineUsername).whenNonNull();
        Objects.requireNonNull(connectionFactory);
        whenNonNull2.to(connectionFactory::setUsername);
        Objects.requireNonNull(rabbitProperties);
        PropertyMapper.Source whenNonNull3 = propertyMapper.from(rabbitProperties::determinePassword).whenNonNull();
        Objects.requireNonNull(connectionFactory);
        whenNonNull3.to(connectionFactory::setPassword);
        Objects.requireNonNull(rabbitProperties);
        PropertyMapper.Source whenNonNull4 = propertyMapper.from(rabbitProperties::determineVirtualHost).whenNonNull();
        Objects.requireNonNull(connectionFactory);
        whenNonNull4.to(connectionFactory::setVirtualHost);
        connectionFactory.useNio();
        if (rabbitProperties.getSsl() != null && rabbitProperties.getSsl().isEnabled()) {
            connectionFactory.useSslProtocol();
        }
        return () -> {
            return connectionFactory;
        };
    }

    @ConditionalOnMissingBean
    @Bean
    public BrokerConfig brokerConfig() {
        return new BrokerConfig();
    }

    @ConditionalOnMissingBean
    @Bean
    public ObjectMapperSupplier objectMapperSupplier() {
        return new DefaultObjectMapperSupplier();
    }

    @ConditionalOnMissingBean
    @Bean
    public MessageConverter messageConverter(ObjectMapperSupplier objectMapperSupplier) {
        return new JacksonCloudEventMessageConverter((ObjectMapper) objectMapperSupplier.get());
    }

    @ConditionalOnMissingBean
    @Bean
    public CustomReporter reactiveCommonsCustomErrorReporter() {
        return new CustomReporter() { // from class: org.reactivecommons.async.rabbit.config.RabbitMqConfig.1
            public Mono<Void> reportError(Throwable th, Message message, Command<?> command, boolean z) {
                return Mono.empty();
            }

            public Mono<Void> reportError(Throwable th, Message message, DomainEvent<?> domainEvent, boolean z) {
                return Mono.empty();
            }

            public Mono<Void> reportError(Throwable th, Message message, AsyncQuery<?> asyncQuery, boolean z) {
                return Mono.empty();
            }
        };
    }

    Mono<Connection> createConnectionMono(ConnectionFactory connectionFactory, String str, String str2) {
        return Mono.fromCallable(() -> {
            return connectionFactory.newConnection(str + " " + str2);
        }).doOnError(th -> {
            log.log(Level.SEVERE, "Error creating connection to RabbitMq Broker. Starting retry process...", th);
        }).retryWhen(Retry.backoff(Long.MAX_VALUE, Duration.ofMillis(300L)).maxBackoff(Duration.ofMillis(3000L))).cache();
    }

    @Bean
    public DynamicRegistry dynamicRegistry(ConnectionManager connectionManager, AsyncPropsDomain asyncPropsDomain, DomainHandlers domainHandlers) {
        return new DynamicRegistryImp(domainHandlers.get("app"), connectionManager.getListener("app").getTopologyCreator(), new BrokerConfigProps(asyncPropsDomain.getProps("app")));
    }

    @ConditionalOnMissingBean
    @Bean
    public DefaultQueryHandler defaultHandler() {
        return obj -> {
            return Mono.error(new RuntimeException("No Handler Registered"));
        };
    }

    @ConditionalOnMissingBean
    @Bean
    public DefaultCommandHandler defaultCommandHandler() {
        return obj -> {
            return Mono.error(new RuntimeException("No Handler Registered"));
        };
    }

    @ConditionalOnMissingBean({HandlerRegistry.class})
    @Bean
    public HandlerRegistry defaultHandlerRegistry() {
        return HandlerRegistry.register();
    }

    @ConditionalOnMissingBean({AsyncPropsDomain.SecretFiller.class})
    @Bean
    public AsyncPropsDomain.SecretFiller defaultSecretFiller() {
        return asyncProps -> {
        };
    }

    @Generated
    public RabbitMqConfig() {
    }
}
