package org.coderclan.whistle;

import com.fasterxml.jackson.databind.ObjectMapper;
import jakarta.annotation.PostConstruct;
import java.util.Collection;
import java.util.HashSet;
import java.util.List;
import java.util.Objects;
import java.util.Set;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import javax.sql.DataSource;
import org.coderclan.whistle.api.EventConsumer;
import org.coderclan.whistle.api.EventContent;
import org.coderclan.whistle.api.EventService;
import org.coderclan.whistle.api.EventType;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.BeansException;
import org.springframework.beans.MutablePropertyValues;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.beans.factory.support.BeanDefinitionRegistry;
import org.springframework.beans.factory.support.GenericBeanDefinition;
import org.springframework.boot.autoconfigure.AutoConfigureAfter;
import org.springframework.boot.autoconfigure.condition.ConditionalOnBean;
import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean;
import org.springframework.boot.autoconfigure.jdbc.DataSourceAutoConfiguration;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationContextAware;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.PropertySource;
import org.springframework.integration.support.MessageBuilder;
import org.springframework.messaging.Message;
import reactor.core.publisher.Flux;
import reactor.core.scheduler.Schedulers;

/**
 * Spring configuration that wires the Whistle event infrastructure.
 *
 * <p>On initialisation it validates the application name, registers one wrapper bean per
 * {@link EventConsumer} found in the context, publishes the matching Spring Cloud Stream
 * settings as JVM system properties, and rejects setups in which the same event type is both
 * published and consumed. It also contributes the default Whistle beans (event queue,
 * serializer, persistenter, retrier, ...), most of which can be overridden by user-defined beans.
 */
@Configuration
@PropertySource(value = {"classpath:org/coderclan/whistle/spring-cloud-stream.properties"}, encoding = "UTF-8")
@AutoConfigureAfter({DataSourceAutoConfiguration.class, WhistleMongodbConfiguration.class})
public class WhistleConfiguration implements ApplicationContextAware {
    private static final Logger log = LoggerFactory.getLogger(WhistleConfiguration.class);

    /** Bean name of the supplier that feeds queued events to Spring Cloud Stream. */
    private static final String CLOUD_STREAM_SUPPLIER = "cloudStreamSupplier";

    /** All event consumers found in the context; {@code null} when there are none. */
    @Autowired(required = false)
    private List<EventConsumer<?>> consumers;

    /** Collections of event types this application publishes; {@code null} when there are none. */
    @Autowired(required = false)
    List<Collection<? extends EventType<?>>> publishingEventType;

    /** Application name; falls back to {@code spring.application.name} when not set explicitly. */
    @Value("${org.coderclan.whistle.applicationName:${spring.application.name}}")
    private String applicationName;

    private ApplicationContext applicationContext;

    /**
     * Stores the application context so consumer wrapper beans can be registered later.
     *
     * @param applicationContext the context this configuration runs in
     * @throws BeansException declared by {@link ApplicationContextAware}; never thrown here
     */
    @Override
    public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
        this.applicationContext = applicationContext;
    }

    /**
     * Validates the configuration and registers the event consumers.
     *
     * <p>NOTE(review): both the {@code jakarta} and the {@code javax} {@code @PostConstruct}
     * annotations are present, presumably so the callback runs whichever annotation API the
     * hosting Spring version honours - confirm before removing either.
     *
     * @throws IllegalStateException if the application name is missing, the context cannot
     *                               register bean definitions, or an event type is both
     *                               published and consumed
     */
    @PostConstruct
    @javax.annotation.PostConstruct
    public void init() {
        // The application name must be validated first: registerEventConsumers() uses it
        // as the default consumer group.
        checkApplicationName();
        registerEventConsumers();
        checkEventType();
    }

    /**
     * Fails when at least one event type is both published and consumed by this application.
     * Does nothing when either the consumer list or the publishing list was not injected.
     *
     * @throws IllegalStateException if the published and consumed event types overlap
     */
    private void checkEventType() {
        if (Objects.isNull(this.consumers) || Objects.isNull(this.publishingEventType)) {
            return;
        }
        Set<EventType<?>> consumedTypes = this.consumers.stream()
                .<EventType<?>>map(EventConsumer::getSupportEventType)
                .collect(Collectors.toSet());
        // Collected into an explicit HashSet because retainAll() mutates the set and
        // Collectors.toSet() gives no guarantee about the mutability of its result.
        Set<EventType<?>> conflictingTypes = this.publishingEventType.stream()
                .<EventType<?>>flatMap(Collection::stream)
                .collect(Collectors.toCollection(HashSet::new));
        conflictingTypes.retainAll(consumedTypes);
        if (conflictingTypes.isEmpty()) {
            return;
        }
        if (log.isErrorEnabled()) {
            String conflictingNames = conflictingTypes.stream()
                    .map(EventType::getName)
                    .collect(Collectors.joining(","));
            log.error("Whistle does NOT support producing and consuming an event at the same time. Please check the following events: {}", conflictingNames);
        }
        throw new IllegalStateException("Whistle does NOT support producing and consuming an event at the same time.");
    }

    /**
     * Ensures an application name has been configured and logs it.
     *
     * @throws IllegalStateException if the application name is {@code null} or empty
     */
    private void checkApplicationName() {
        if (Objects.isNull(this.applicationName) || this.applicationName.isEmpty()) {
            throw new IllegalStateException("The Application Name must be set.");
        }
        log.info("Whistle Application Name: {}.", this.applicationName);
    }

    /**
     * Registers a wrapper bean for every injected consumer and publishes the resulting
     * Spring Cloud Stream settings as JVM system properties.
     *
     * <p>{@code spring.cloud.function.definition} becomes the supplier bean name followed by
     * the generated consumer bean names ({@code whistleConsumer1}, {@code whistleConsumer2}, ...),
     * separated by {@code ';'}. {@code spring.cloud.stream.default.group} is set to the
     * application name.
     */
    private void registerEventConsumers() {
        StringBuilder functionDefinition = new StringBuilder(CLOUD_STREAM_SUPPLIER);
        if (!Objects.isNull(this.consumers)) {
            int index = 0;
            for (EventConsumer<?> eventConsumer : this.consumers) {
                index++;
                String beanName = "whistleConsumer" + index;
                functionDefinition.append(';').append(beanName);
                registerConsumer(eventConsumer, beanName);
            }
        }
        System.setProperty("spring.cloud.function.definition", functionDefinition.toString());
        System.setProperty("spring.cloud.stream.default.group", this.applicationName);
    }

    /**
     * Binds the input of the function named {@code str} to the consumer's event type and
     * registers a {@link ConsumerWrapper} bean definition under that name.
     *
     * @param eventConsumer the consumer to wrap
     * @param str           the bean (and function) name to register the wrapper under
     * @throws IllegalStateException if the application context cannot register bean definitions
     */
    private void registerConsumer(EventConsumer<?> eventConsumer, String str) {
        // ApplicationContext itself has no registerBeanDefinition(); that method belongs to
        // BeanDefinitionRegistry. Verify this before touching any system property so a
        // failure leaves no partial configuration behind.
        if (!(this.applicationContext instanceof BeanDefinitionRegistry)) {
            throw new IllegalStateException("The ApplicationContext must be a BeanDefinitionRegistry to register Whistle event consumers.");
        }
        BeanDefinitionRegistry registry = (BeanDefinitionRegistry) this.applicationContext;

        System.setProperty("spring.cloud.stream.function.bindings." + str + "-in-0", eventConsumer.getSupportEventType().getName());

        MutablePropertyValues propertyValues = new MutablePropertyValues();
        propertyValues.add("eventConsumer", eventConsumer);
        GenericBeanDefinition wrapperDefinition = new GenericBeanDefinition();
        wrapperDefinition.setBeanClass(ConsumerWrapper.class);
        wrapperDefinition.setPropertyValues(propertyValues);
        registry.registerBeanDefinition(str, wrapperDefinition);
    }

    /**
     * Creates the JDBC-backed event persistenter when a {@link DataSource} bean exists.
     *
     * <p>NOTE(review): the bare {@code @ConditionalOnMissingBean} keys on this method's
     * return type, {@code DatabaseEventPersistenter}; if the intent is to back off for any
     * other persistenter implementation, the condition may need an explicit type - confirm.
     *
     * @param dataSource             the data source handed to the persistenter
     * @param eventContentSerializer serializer handed to the persistenter
     * @param eventTypeRegistrar     registrar handed to the persistenter
     * @param str                    table name from {@code org.coderclan.whistle.table.producedEvent},
     *                               {@code sys_event_out} by default
     * @return the database event persistenter
     */
    @ConditionalOnMissingBean
    @ConditionalOnBean({DataSource.class})
    @Bean({"eventPersistenter"})
    public DatabaseEventPersistenter eventPersistenter(@Autowired DataSource dataSource, @Autowired EventContentSerializer eventContentSerializer, @Autowired EventTypeRegistrar eventTypeRegistrar, @Value("${org.coderclan.whistle.table.producedEvent:sys_event_out}") String str) {
        return new DatabaseEventPersistenter(dataSource, eventContentSerializer, eventTypeRegistrar, str);
    }

    /**
     * Creates the retrier for failed events; only active when an {@link EventPersistenter}
     * bean exists and no other {@link FailedEventRetrier} is defined.
     *
     * @param eventPersistenter the persistenter handed to the retrier
     * @param eventQueue        the queue handed to the retrier
     * @return the failed event retrier
     */
    @ConditionalOnMissingBean
    @ConditionalOnBean({EventPersistenter.class})
    @Bean
    public FailedEventRetrier failedEventRetrier(@Autowired EventPersistenter eventPersistenter, @Autowired EventQueue eventQueue) {
        return new FailedEventRetrier(eventPersistenter, eventQueue);
    }

    /**
     * Provides the default {@link EventService} unless the application defines its own.
     *
     * @return a new {@link EventServiceImpl}
     */
    @ConditionalOnMissingBean
    @Bean
    public EventService eventService() {
        return new EventServiceImpl();
    }

    /**
     * Provides the default {@link TransactionalEventHandler} unless the application defines its own.
     *
     * @param eventQueue the queue handed to the handler
     * @return the transactional event handler
     */
    @ConditionalOnMissingBean
    @Bean
    public TransactionalEventHandler transactionEventHandler(@Autowired EventQueue eventQueue) {
        return new TransactionalEventHandler(eventQueue);
    }

    /**
     * Provides the default {@link EventTypeRegistrar} unless the application defines its own.
     *
     * @param list  the published event type collections; may be {@code null} when none exist
     * @param list2 the event consumers; may be {@code null} when none exist
     * @return the event type registrar
     */
    @ConditionalOnMissingBean
    @Bean
    public EventTypeRegistrar eventTypeRegistrar(@Autowired(required = false) List<Collection<? extends EventType<?>>> list, @Autowired(required = false) List<EventConsumer<?>> list2) {
        return new EventTypeRegistrar(list, list2);
    }

    /**
     * Provides the default {@link EventContentMessageConverter} unless the application defines its own.
     *
     * @return a new message converter
     */
    @ConditionalOnMissingBean
    @Bean
    public EventContentMessageConverter eventContentMessageConverter() {
        return new EventContentMessageConverter();
    }

    /**
     * Registers the {@link ServiceActivators} bean. Unlike most beans in this class it is
     * not conditional, so it is always created.
     *
     * @return a new {@link ServiceActivators}
     */
    @Bean
    ServiceActivators cloudStreamConfig() {
        return new ServiceActivators();
    }

    /**
     * Provides the default {@link EventQueue} unless the application defines its own.
     *
     * @param i queue size from {@code org.coderclan.whistle.eventQueueSize}, 128 by default
     * @return the event queue
     */
    @ConditionalOnMissingBean
    @Bean
    public EventQueue eventQueue(@Value("${org.coderclan.whistle.eventQueueSize:128}") int i) {
        log.info("Size of Event Queue: {}.", i);
        return new EventQueueImpl(i);
    }

    /**
     * Provides the default Jackson-based {@link EventContentSerializer} unless the
     * application defines its own.
     *
     * @param objectMapper the mapper handed to the serializer
     * @return the event content serializer
     */
    @ConditionalOnMissingBean
    @Bean
    public EventContentSerializer eventContentSerializer(@Autowired ObjectMapper objectMapper) {
        return new JacksonEventContentSerializer(objectMapper);
    }

    /**
     * Exposes the event queue as a reactive supplier of messages.
     *
     * <p>Each emitted message carries the event content as payload, the event type name in the
     * {@code spring.cloud.stream.sendto.destination} header and the persistent event id in the
     * {@link Constants#EVENT_PERSISTENT_ID_HEADER} header. The blocking {@code take()} calls
     * are subscribed on the bounded elastic scheduler, and the flux is shared between subscribers.
     *
     * @param eventQueue the queue events are taken from
     * @return a supplier producing the shared flux of outgoing messages
     */
    @Bean({CLOUD_STREAM_SUPPLIER})
    public Supplier<Flux<Message<EventContent>>> cloudStreamSupplier(@Autowired EventQueue eventQueue) {
        return () -> Flux.fromStream(Stream.generate(() -> {
            try {
                Event take = eventQueue.take();
                return MessageBuilder.withPayload(take.getContent())
                        .setHeader("spring.cloud.stream.sendto.destination", take.getType().getName())
                        .setHeader(Constants.EVENT_PERSISTENT_ID_HEADER, take.getPersistentEventId())
                        .build();
            } catch (InterruptedException e) {
                // Restore the interrupt flag for whoever owns this thread.
                Thread.currentThread().interrupt();
                // NOTE(review): a null element is presumably rejected by Flux.fromStream and
                // ends the flux with an error - confirm this is the intended way to stop.
                return null;
            }
        })).subscribeOn(Schedulers.boundedElastic()).share();
    }
}
