package org.reactivecommons.async.impl.config;

import com.fasterxml.jackson.databind.ObjectMapper;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import java.time.Duration;
import java.util.logging.Level;
import java.util.logging.Logger;
import lombok.Generated;
import org.reactivecommons.async.impl.communications.ReactiveMessageListener;
import org.reactivecommons.async.impl.communications.ReactiveMessageSender;
import org.reactivecommons.async.impl.communications.TopologyCreator;
import org.reactivecommons.async.impl.config.RabbitProperties;
import org.reactivecommons.async.impl.config.props.BrokerConfigProps;
import org.reactivecommons.async.impl.converters.MessageConverter;
import org.reactivecommons.async.impl.converters.json.DefaultObjectMapperSupplier;
import org.reactivecommons.async.impl.converters.json.JacksonMessageConverter;
import org.reactivecommons.async.impl.converters.json.ObjectMapperSupplier;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean;
import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.boot.context.properties.PropertyMapper;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Import;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Schedulers;
import reactor.rabbitmq.ChannelPoolFactory;
import reactor.rabbitmq.ChannelPoolOptions;
import reactor.rabbitmq.RabbitFlux;
import reactor.rabbitmq.ReceiverOptions;
import reactor.rabbitmq.SenderOptions;

@EnableConfigurationProperties({RabbitProperties.class})
@Configuration
@Import({BrokerConfigProps.class})
/* loaded from: input_file:org/reactivecommons/async/impl/config/RabbitMqConfig.class */
public class RabbitMqConfig {

    @Generated
    private static final Logger log = Logger.getLogger(RabbitMqConfig.class.getName());

    @Value("${app.async.flux.maxConcurrency:250}")
    private Integer maxConcurrency;

    @Bean
    public ReactiveMessageSender messageSender(ConnectionFactoryProvider connectionFactoryProvider, MessageConverter messageConverter, BrokerConfigProps brokerConfigProps, RabbitProperties rabbitProperties) {
        Mono<Connection> createSenderConnectionMono = createSenderConnectionMono(connectionFactoryProvider.getConnectionFactory(), "sender");
        ChannelPoolOptions channelPoolOptions = new ChannelPoolOptions();
        PropertyMapper propertyMapper = PropertyMapper.get();
        RabbitProperties.Cache.Channel channel = rabbitProperties.getCache().getChannel();
        channel.getClass();
        PropertyMapper.Source whenNonNull = propertyMapper.from(channel::getSize).whenNonNull();
        channelPoolOptions.getClass();
        whenNonNull.to((v1) -> {
            r1.maxCacheSize(v1);
        });
        return new ReactiveMessageSender(RabbitFlux.createSender(new SenderOptions().channelPool(ChannelPoolFactory.createChannelPool(createSenderConnectionMono, channelPoolOptions))), brokerConfigProps.getAppName(), messageConverter, new TopologyCreator(createSenderConnectionMono));
    }

    @Bean
    public ReactiveMessageListener messageListener(ConnectionFactoryProvider connectionFactoryProvider) {
        Mono<Connection> createSenderConnectionMono = createSenderConnectionMono(connectionFactoryProvider.getConnectionFactory(), "listener");
        return new ReactiveMessageListener(RabbitFlux.createReceiver(new ReceiverOptions().connectionMono(createSenderConnectionMono)), new TopologyCreator(createSenderConnectionMono), this.maxConcurrency);
    }

    @ConditionalOnMissingBean
    @Bean
    public ConnectionFactoryProvider connectionFactory(RabbitProperties rabbitProperties) {
        ConnectionFactory connectionFactory = new ConnectionFactory();
        PropertyMapper propertyMapper = PropertyMapper.get();
        rabbitProperties.getClass();
        PropertyMapper.Source whenNonNull = propertyMapper.from(rabbitProperties::determineHost).whenNonNull();
        connectionFactory.getClass();
        whenNonNull.to(connectionFactory::setHost);
        rabbitProperties.getClass();
        PropertyMapper.Source from = propertyMapper.from(rabbitProperties::determinePort);
        connectionFactory.getClass();
        from.to((v1) -> {
            r1.setPort(v1);
        });
        rabbitProperties.getClass();
        PropertyMapper.Source whenNonNull2 = propertyMapper.from(rabbitProperties::determineUsername).whenNonNull();
        connectionFactory.getClass();
        whenNonNull2.to(connectionFactory::setUsername);
        rabbitProperties.getClass();
        PropertyMapper.Source whenNonNull3 = propertyMapper.from(rabbitProperties::determinePassword).whenNonNull();
        connectionFactory.getClass();
        whenNonNull3.to(connectionFactory::setPassword);
        rabbitProperties.getClass();
        PropertyMapper.Source whenNonNull4 = propertyMapper.from(rabbitProperties::determineVirtualHost).whenNonNull();
        connectionFactory.getClass();
        whenNonNull4.to(connectionFactory::setVirtualHost);
        rabbitProperties.getClass();
        PropertyMapper.Source asInt = propertyMapper.from(rabbitProperties::getRequestedHeartbeat).whenNonNull().asInt((v0) -> {
            return v0.getSeconds();
        });
        connectionFactory.getClass();
        asInt.to((v1) -> {
            r1.setRequestedHeartbeat(v1);
        });
        connectionFactory.setAutomaticRecoveryEnabled(true);
        connectionFactory.setTopologyRecoveryEnabled(true);
        connectionFactory.useNio();
        return () -> {
            return connectionFactory;
        };
    }

    @ConditionalOnMissingBean
    @Bean
    public ObjectMapperSupplier objectMapperSupplier() {
        return new DefaultObjectMapperSupplier();
    }

    @ConditionalOnMissingBean
    @Bean
    public MessageConverter messageConverter(ObjectMapperSupplier objectMapperSupplier) {
        return new JacksonMessageConverter((ObjectMapper) objectMapperSupplier.get());
    }

    Mono<Connection> createSenderConnectionMono(ConnectionFactory connectionFactory, String str) {
        return Mono.fromCallable(() -> {
            return connectionFactory.newConnection(str);
        }).doOnError(th -> {
            log.log(Level.SEVERE, "Error creating connection to RabbitMq Broker. Starting retry process...", th);
        }).retryBackoff(Long.MAX_VALUE, Duration.ofMillis(300L), Duration.ofMillis(3000L)).subscribeOn(Schedulers.elastic()).cache();
    }
}
