package org.springframework.cloud.fn.aggregator;

import java.util.function.Function;
import org.springframework.beans.factory.BeanFactory;
import org.springframework.beans.factory.BeanFactoryAware;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.autoconfigure.AutoConfiguration;
import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.cloud.fn.aggregator.MessageStoreConfiguration;
import org.springframework.cloud.fn.common.config.ComponentCustomizer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Import;
import org.springframework.integration.aggregator.CorrelationStrategy;
import org.springframework.integration.aggregator.DefaultAggregatingMessageGroupProcessor;
import org.springframework.integration.aggregator.ExpressionEvaluatingCorrelationStrategy;
import org.springframework.integration.aggregator.ExpressionEvaluatingMessageGroupProcessor;
import org.springframework.integration.aggregator.ExpressionEvaluatingReleaseStrategy;
import org.springframework.integration.aggregator.MessageGroupProcessor;
import org.springframework.integration.aggregator.ReleaseStrategy;
import org.springframework.integration.annotation.ServiceActivator;
import org.springframework.integration.channel.FluxMessageChannel;
import org.springframework.integration.config.AggregatorFactoryBean;
import org.springframework.integration.store.MessageGroupStore;
import org.springframework.integration.support.MessageBuilder;
import org.springframework.lang.Nullable;
import org.springframework.messaging.Message;
import reactor.core.publisher.Flux;

@EnableConfigurationProperties({AggregatorFunctionProperties.class})
@AutoConfiguration
/* loaded from: input_file:org/springframework/cloud/fn/aggregator/AggregatorFunctionConfiguration.class */
public class AggregatorFunctionConfiguration {
    private final FluxMessageChannel outputChannel = new FluxMessageChannel();

    @Autowired
    private AggregatorFunctionProperties properties;

    @Autowired
    private BeanFactory beanFactory;

    @ConditionalOnMissingBean({MessageGroupStore.class})
    @Configuration
    @Import({MessageStoreConfiguration.Mongo.class, MessageStoreConfiguration.Redis.class, MessageStoreConfiguration.Jdbc.class})
    /* loaded from: input_file:org/springframework/cloud/fn/aggregator/AggregatorFunctionConfiguration$MessageStoreAutoConfiguration.class */
    protected static class MessageStoreAutoConfiguration {
        protected MessageStoreAutoConfiguration() {
        }
    }

    @Bean
    public Function<Flux<Message<?>>, Flux<Message<?>>> aggregatorFunction(FluxMessageChannel fluxMessageChannel) {
        return flux -> {
            return Flux.from(this.outputChannel).doOnRequest(j -> {
                fluxMessageChannel.subscribeTo(flux.map(message -> {
                    return MessageBuilder.fromMessage(message).removeHeader("kafka_consumer").build();
                }));
            });
        };
    }

    @Bean
    public FluxMessageChannel aggregatorInputChannel() {
        return new FluxMessageChannel();
    }

    @Bean
    @ServiceActivator(inputChannel = "aggregatorInputChannel")
    public AggregatorFactoryBean aggregator(@Nullable CorrelationStrategy correlationStrategy, @Nullable ReleaseStrategy releaseStrategy, @Nullable MessageGroupProcessor messageGroupProcessor, @Nullable MessageGroupStore messageGroupStore, @Nullable ComponentCustomizer<AggregatorFactoryBean> componentCustomizer) {
        AggregatorFactoryBean aggregatorFactoryBean = new AggregatorFactoryBean();
        aggregatorFactoryBean.setExpireGroupsUponCompletion(true);
        aggregatorFactoryBean.setSendPartialResultOnExpiry(true);
        aggregatorFactoryBean.setGroupTimeoutExpression(this.properties.getGroupTimeout());
        if (correlationStrategy != null) {
            aggregatorFactoryBean.setCorrelationStrategy(correlationStrategy);
        }
        if (releaseStrategy != null) {
            aggregatorFactoryBean.setReleaseStrategy(releaseStrategy);
        }
        MessageGroupProcessor messageGroupProcessor2 = messageGroupProcessor;
        if (messageGroupProcessor2 == null) {
            messageGroupProcessor2 = new DefaultAggregatingMessageGroupProcessor();
            ((BeanFactoryAware) messageGroupProcessor2).setBeanFactory(this.beanFactory);
        }
        aggregatorFactoryBean.setProcessorBean(messageGroupProcessor2);
        if (messageGroupStore != null) {
            aggregatorFactoryBean.setMessageStore(messageGroupStore);
        }
        aggregatorFactoryBean.setOutputChannel(this.outputChannel);
        if (componentCustomizer != null) {
            componentCustomizer.customize(aggregatorFactoryBean);
        }
        return aggregatorFactoryBean;
    }

    @ConditionalOnMissingBean
    @ConditionalOnProperty(prefix = "aggregator", name = {"correlation"})
    @Bean
    public CorrelationStrategy correlationStrategy() {
        return new ExpressionEvaluatingCorrelationStrategy(this.properties.getCorrelation());
    }

    @ConditionalOnMissingBean
    @ConditionalOnProperty(prefix = "aggregator", name = {"release"})
    @Bean
    public ReleaseStrategy releaseStrategy() {
        return new ExpressionEvaluatingReleaseStrategy(this.properties.getRelease());
    }

    @ConditionalOnMissingBean
    @ConditionalOnProperty(prefix = "aggregator", name = {"aggregation"})
    @Bean
    public MessageGroupProcessor messageGroupProcessor() {
        return new ExpressionEvaluatingMessageGroupProcessor(this.properties.getAggregation().getExpressionString());
    }
}
