package org.springframework.cloud.fn.aggregator;

import java.util.function.Function;
import org.springframework.beans.factory.BeanFactory;
import org.springframework.beans.factory.BeanFactoryAware;
import org.springframework.beans.factory.ObjectProvider;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.cloud.fn.aggregator.MessageStoreConfiguration;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Import;
import org.springframework.integration.aggregator.CorrelationStrategy;
import org.springframework.integration.aggregator.DefaultAggregatingMessageGroupProcessor;
import org.springframework.integration.aggregator.ExpressionEvaluatingCorrelationStrategy;
import org.springframework.integration.aggregator.ExpressionEvaluatingMessageGroupProcessor;
import org.springframework.integration.aggregator.ExpressionEvaluatingReleaseStrategy;
import org.springframework.integration.aggregator.MessageGroupProcessor;
import org.springframework.integration.aggregator.ReleaseStrategy;
import org.springframework.integration.annotation.ServiceActivator;
import org.springframework.integration.channel.FluxMessageChannel;
import org.springframework.integration.config.AggregatorFactoryBean;
import org.springframework.integration.store.MessageGroupStore;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageChannel;
import reactor.core.publisher.Flux;

/**
 * Configuration that exposes a reactive aggregator {@link Function} built on top of a
 * Spring Integration {@link AggregatorFactoryBean}.
 *
 * <p>Incoming messages are fed into the {@code inputChannel}, handled by the aggregator
 * registered as a {@link ServiceActivator} on that channel, and the aggregation results are
 * published from the {@code outputChannel}.
 */
@EnableConfigurationProperties({AggregatorFunctionProperties.class})
@Configuration(proxyBeanMethods = false)
public class AggregatorFunctionConfiguration {

    @Autowired
    private AggregatorFunctionProperties properties;

    @Autowired
    private BeanFactory beanFactory;

    /**
     * Imports the Mongo, Redis, Gemfire and JDBC message store configurations, unless a
     * {@link MessageGroupStore} bean has already been defined.
     */
    @ConditionalOnMissingBean({MessageGroupStore.class})
    @Configuration
    @Import({MessageStoreConfiguration.Mongo.class, MessageStoreConfiguration.Redis.class, MessageStoreConfiguration.Gemfire.class, MessageStoreConfiguration.Jdbc.class})
    protected static class MessageStoreAutoConfiguration {
        protected MessageStoreAutoConfiguration() {
        }
    }

    /**
     * Creates the function that bridges a {@link Flux} of messages to the aggregator.
     *
     * <p>Both channel beans share the {@link FluxMessageChannel} type, so each parameter is
     * qualified by bean name; by-type resolution alone would be ambiguous.
     *
     * @param inputChannel the channel the aggregator consumes from
     * @param outputChannel the channel the aggregator publishes results to
     * @return a function returning the output channel's publisher; when that publisher is
     *     subscribed to, the input channel is subscribed to the supplied flux
     */
    @Bean
    public Function<Flux<Message<?>>, Flux<Message<?>>> aggregatorFunction(
            @Qualifier("inputChannel") FluxMessageChannel inputChannel,
            @Qualifier("outputChannel") FluxMessageChannel outputChannel) {
        // The input is wired in only once the output is subscribed to.
        return input -> Flux.from(outputChannel)
                .doOnSubscribe(subscription -> inputChannel.subscribeTo(input));
    }

    /**
     * @return the channel that receives the messages to aggregate
     */
    @Bean
    public FluxMessageChannel inputChannel() {
        return new FluxMessageChannel();
    }

    /**
     * @return the channel on which the aggregator emits its results
     */
    @Bean
    public FluxMessageChannel outputChannel() {
        return new FluxMessageChannel();
    }

    /**
     * Builds the aggregator endpoint that listens on {@code inputChannel}.
     *
     * <p>Groups are expired upon completion and partial results are sent on expiry. The
     * group timeout expression comes from the configuration properties. Strategy, processor
     * and store beans are optional: whatever {@link ObjectProvider#getIfAvailable()} returns
     * (possibly {@code null}) is handed to the factory bean, except for the processor, which
     * falls back to a {@link DefaultAggregatingMessageGroupProcessor}.
     *
     * @param correlationStrategy provider of an optional {@link CorrelationStrategy}
     * @param releaseStrategy provider of an optional {@link ReleaseStrategy}
     * @param messageGroupProcessor provider of an optional {@link MessageGroupProcessor}
     * @param messageStore provider of an optional {@link MessageGroupStore}
     * @param outputChannel the channel aggregation results are sent to
     * @return the configured factory bean
     */
    @Bean
    @ServiceActivator(inputChannel = "inputChannel")
    public AggregatorFactoryBean aggregator(ObjectProvider<CorrelationStrategy> correlationStrategy,
            ObjectProvider<ReleaseStrategy> releaseStrategy,
            ObjectProvider<MessageGroupProcessor> messageGroupProcessor,
            ObjectProvider<MessageGroupStore> messageStore,
            @Qualifier("outputChannel") MessageChannel outputChannel) {
        AggregatorFactoryBean aggregator = new AggregatorFactoryBean();
        aggregator.setExpireGroupsUponCompletion(true);
        aggregator.setSendPartialResultOnExpiry(true);
        aggregator.setGroupTimeoutExpression(this.properties.getGroupTimeout());
        aggregator.setCorrelationStrategy(correlationStrategy.getIfAvailable());
        aggregator.setReleaseStrategy(releaseStrategy.getIfAvailable());

        MessageGroupProcessor processor = messageGroupProcessor.getIfAvailable();
        if (processor == null) {
            // The default processor is created outside the container, so the bean factory
            // is supplied to it by hand.
            DefaultAggregatingMessageGroupProcessor defaultProcessor = new DefaultAggregatingMessageGroupProcessor();
            defaultProcessor.setBeanFactory(this.beanFactory);
            processor = defaultProcessor;
        }
        aggregator.setProcessorBean(processor);

        aggregator.setMessageStore(messageStore.getIfAvailable());
        aggregator.setOutputChannel(outputChannel);
        return aggregator;
    }

    /**
     * Registered only when the {@code aggregator.correlation} property is set and no other
     * {@link CorrelationStrategy} bean exists.
     *
     * @return a correlation strategy built from the configured correlation value
     */
    @ConditionalOnMissingBean
    @ConditionalOnProperty(prefix = "aggregator", name = {"correlation"})
    @Bean
    public CorrelationStrategy correlationStrategy() {
        return new ExpressionEvaluatingCorrelationStrategy(this.properties.getCorrelation());
    }

    /**
     * Registered only when the {@code aggregator.release} property is set and no other
     * {@link ReleaseStrategy} bean exists.
     *
     * @return a release strategy built from the configured release value
     */
    @ConditionalOnMissingBean
    @ConditionalOnProperty(prefix = "aggregator", name = {"release"})
    @Bean
    public ReleaseStrategy releaseStrategy() {
        return new ExpressionEvaluatingReleaseStrategy(this.properties.getRelease());
    }

    /**
     * Registered only when the {@code aggregator.aggregation} property is set and no other
     * {@link MessageGroupProcessor} bean exists.
     *
     * @return a group processor built from the configured aggregation expression string
     */
    @ConditionalOnMissingBean
    @ConditionalOnProperty(prefix = "aggregator", name = {"aggregation"})
    @Bean
    public MessageGroupProcessor messageGroupProcessor() {
        return new ExpressionEvaluatingMessageGroupProcessor(this.properties.getAggregation().getExpressionString());
    }
}
