package io.micrc.core.message.springboot;

import io.micrc.core.message.MessageConsumeRouterExecution;
import io.micrc.core.message.MessageRouteConfiguration;
import io.micrc.core.message.error.ErrorMessage;
import io.micrc.core.message.store.MessagePublisherSchedule;
import io.micrc.lib.JsonUtil;
import java.nio.charset.StandardCharsets;
import java.time.Duration;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;
import java.util.NoSuchElementException;
import java.util.Objects;
import java.util.Optional;
import org.apache.camel.EndpointInject;
import org.apache.camel.ProducerTemplate;
import org.apache.camel.component.direct.DirectComponent;
import org.apache.camel.spring.boot.CamelAutoConfiguration;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.header.Header;
import org.jetbrains.annotations.NotNull;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.BeansException;
import org.springframework.beans.factory.config.BeanFactoryPostProcessor;
import org.springframework.beans.factory.config.ConfigurableListableBeanFactory;
import org.springframework.boot.autoconfigure.AutoConfigureAfter;
import org.springframework.boot.autoconfigure.domain.EntityScan;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationContextAware;
import org.springframework.context.EnvironmentAware;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Import;
import org.springframework.context.annotation.Primary;
import org.springframework.core.env.Environment;
import org.springframework.data.jpa.repository.config.EnableJpaRepositories;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.listener.DeadLetterPublishingRecoverer;
import org.springframework.kafka.listener.DefaultErrorHandler;
import org.springframework.kafka.support.Acknowledgment;
import org.springframework.util.backoff.FixedBackOff;

/**
 * Auto-configuration for the messaging module.
 *
 * <p>For every broker provider listed in the {@code BROKER_PROVIDERS} system environment variable
 * (comma separated) this class registers a {@code kafkaListenerContainerFactory-<provider>} and a
 * {@code kafkaTemplate-<provider>} singleton. It also declares the Camel direct components used by
 * the message routes, the Kafka error handler that forwards failed records to the
 * {@code deadLetter} topic, and the listener that turns dead-letter records into
 * {@link ErrorMessage} instances.
 */
@EnableKafka
@Configuration
@AutoConfigureAfter({CamelAutoConfiguration.class})
@EnableJpaRepositories(basePackages = {"io.micrc.core.message.store", "io.micrc.core.message.error"})
@EntityScan(basePackages = {"io.micrc.core.message.store", "io.micrc.core.message.error"})
@Import({MessageRouteConfiguration.class, MessagePublisherSchedule.class, MessageConsumeRouterExecution.class})
public class MessageAutoConfiguration implements BeanFactoryPostProcessor, EnvironmentAware, ApplicationContextAware {
    private static final Logger log = LoggerFactory.getLogger(MessageAutoConfiguration.class);
    private ApplicationContext applicationContext;
    private Environment environment;

    @EndpointInject
    private ProducerTemplate producerTemplate;

    /** Stores the application context so Kafka templates can be looked up by bean name later. */
    @Override
    public void setApplicationContext(@NotNull ApplicationContext applicationContext) throws BeansException {
        this.applicationContext = applicationContext;
    }

    /** Stores the environment used to resolve broker definitions and application properties. */
    @Override
    public void setEnvironment(@NotNull Environment environment) {
        this.environment = environment;
    }

    /**
     * Registers one listener container factory and one Kafka template per configured broker provider.
     *
     * <p>Provider names are trimmed and blank entries are skipped, so a value such as
     * {@code "a, b,"} yields the providers {@code a} and {@code b}.
     *
     * @param configurableListableBeanFactory the bean factory receiving the singletons
     * @throws NoSuchElementException if the host or port of a provider cannot be resolved
     */
    @Override
    public void postProcessBeanFactory(@NotNull ConfigurableListableBeanFactory configurableListableBeanFactory) throws BeansException {
        Arrays.stream(obtainProvider())
                .map(String::trim)
                .filter(provider -> !provider.isEmpty())
                .forEach(provider -> registerBrokerBeans(configurableListableBeanFactory, provider));
    }

    /** Builds and registers the String-keyed/String-valued consumer factory and template for one provider. */
    private void registerBrokerBeans(ConfigurableListableBeanFactory beanFactory, String provider) {
        String bootstrapServers = findBrokerDefine(provider, "host") + ":" + findBrokerDefine(provider, "port");

        Map<String, Object> consumerProps = new HashMap<>();
        consumerProps.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        consumerProps.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        consumerProps.put("bootstrap.servers", bootstrapServers);
        DefaultKafkaConsumerFactory<String, String> consumerFactory = new DefaultKafkaConsumerFactory<>(consumerProps);
        ConcurrentKafkaListenerContainerFactory<String, String> containerFactory =
                new ConcurrentKafkaListenerContainerFactory<>();
        containerFactory.setConsumerFactory(consumerFactory);
        beanFactory.registerSingleton("kafkaListenerContainerFactory-" + provider, containerFactory);

        Map<String, Object> producerProps = new HashMap<>();
        producerProps.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        producerProps.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        producerProps.put("bootstrap.servers", bootstrapServers);
        DefaultKafkaProducerFactory<String, String> producerFactory = new DefaultKafkaProducerFactory<>(producerProps);
        KafkaTemplate<String, String> kafkaTemplate = new KafkaTemplate<>(producerFactory);
        beanFactory.registerSingleton("kafkaTemplate-" + provider, kafkaTemplate);
    }

    /**
     * Resolves one attribute of a provider's broker definition.
     *
     * <p>Scans the property sources for the first one whose name contains
     * {@code <provider>_broker_<attribute>}, serializes its source to JSON and reads the value at
     * the path {@code /<provider>_broker_<attribute>}.
     *
     * @param provider  broker provider name
     * @param attribute attribute to resolve; this class asks for {@code host} and {@code port}
     * @return the first non-null value found
     * @throws NoSuchElementException if no property source yields a value for the key
     */
    @NotNull
    private String findBrokerDefine(String provider, String attribute) {
        String key = provider + "_broker_" + attribute;
        // NOTE(review): in Spring's API getPropertySources() is declared on ConfigurableEnvironment,
        // not on Environment — confirm the declared type of the environment field.
        return this.environment.getPropertySources().stream()
                .filter(propertySource -> propertySource.getName().contains(key))
                .map(propertySource ->
                        (String) JsonUtil.readPath(JsonUtil.writeValueAsString(propertySource.getSource()), "/" + key))
                .filter(Objects::nonNull)
                .findFirst()
                .orElseThrow(() -> new NoSuchElementException("No broker definition found for '" + key + "'"));
    }

    /**
     * Reads the raw provider list from the {@code BROKER_PROVIDERS} system environment variable.
     *
     * @return the comma-split entries; a single empty string when the variable is not set
     */
    private String[] obtainProvider() {
        // NOTE(review): getSystemEnvironment() is likewise declared on ConfigurableEnvironment — confirm.
        return ((String) this.environment.getSystemEnvironment().getOrDefault("BROKER_PROVIDERS", "")).split(",");
    }

    /** Camel direct component registered under the {@code clean} scheme. */
    @Bean({"clean"})
    public DirectComponent clean() {
        return new DirectComponent();
    }

    /** Camel direct component registered under the {@code publish} scheme. */
    @Bean({"publish"})
    public DirectComponent publish() {
        return new DirectComponent();
    }

    /** Camel direct component registered under the {@code eventstore} scheme. */
    @Bean({"eventstore"})
    public DirectComponent eventStore() {
        return new DirectComponent();
    }

    /** Camel direct component registered under the {@code subscribe} scheme. */
    @Bean({"subscribe"})
    public DirectComponent subscribe() {
        return new DirectComponent();
    }

    /**
     * Creates the primary Kafka error handler, which publishes failed records to the
     * {@code deadLetter} topic on the same partition number as the failed record.
     *
     * <p>The template used for publishing is the {@code kafkaTemplate} bean when the comma
     * separated {@code application.profiles} property contains {@code default}, otherwise the
     * {@code kafkaTemplate-public} bean.
     *
     * @return the error handler backed by a {@link DeadLetterPublishingRecoverer}
     */
    @Bean
    @Primary
    public DefaultErrorHandler deadLetterPublishingRecoverer() {
        String profiles = Optional.ofNullable(this.environment.getProperty("application.profiles")).orElse("");
        boolean defaultProfile = Arrays.asList(profiles.split(",")).contains("default");
        String templateBeanName = defaultProfile ? "kafkaTemplate" : "kafkaTemplate-public";
        KafkaTemplate<?, ?> kafkaTemplate = this.applicationContext.getBean(templateBeanName, KafkaTemplate.class);

        DeadLetterPublishingRecoverer recoverer = new DeadLetterPublishingRecoverer(
                kafkaTemplate,
                (consumerRecord, exc) -> new TopicPartition("deadLetter", consumerRecord.partition()));
        return new DefaultErrorHandler(recoverer, new FixedBackOff(0L, 1L));
    }

    /**
     * Consumes the {@code deadLetter} topic and forwards records sent by this host to the
     * {@code subscribe://dead-message} endpoint as {@link ErrorMessage} instances.
     *
     * <p>Records whose {@code senderHost} header differs from the {@code micrc.x-host} property are
     * acknowledged without further processing. A record with no {@code senderHost} header is
     * logged and acknowledged as well, because it could never match and would otherwise be
     * redelivered indefinitely. Any failure while handling a record is logged and the record is
     * negatively acknowledged for immediate redelivery.
     *
     * @param consumerRecord the dead-letter record; its headers carry the original message metadata
     * @param acknowledgment handle used to acknowledge or negatively acknowledge the record
     */
    @KafkaListener(topics = {"deadLetter"}, autoStartup = "true", concurrency = "3")
    public void deadLetter(ConsumerRecord<?, ?> consumerRecord, Acknowledgment acknowledgment) {
        try {
            Map<String, String> headers = new HashMap<>();
            for (Header header : consumerRecord.headers()) {
                byte[] raw = header.value();
                // Header values may be null; decode explicitly as UTF-8 instead of the platform charset.
                headers.put(header.key(), raw == null ? null : new String(raw, StandardCharsets.UTF_8));
            }
            String senderHost = headers.get("senderHost");
            if (senderHost == null) {
                log.warn("Skipping dead letter record without senderHost header (topic={}, partition={}, offset={})",
                        consumerRecord.topic(), consumerRecord.partition(), consumerRecord.offset());
            } else if (senderHost.equals(this.environment.getProperty("micrc.x-host"))) {
                ErrorMessage errorMessage = new ErrorMessage();
                errorMessage.setMessageId(Long.valueOf(headers.get("messageId")));
                errorMessage.setSender(headers.get("sender"));
                errorMessage.setTopic(headers.get("kafka_dlt-original-topic"));
                errorMessage.setEvent(headers.get("event"));
                errorMessage.setMappingMap(headers.get("mappingMap"));
                errorMessage.setContent(consumerRecord.value().toString());
                errorMessage.setGroupId(headers.get("kafka_dlt-original-consumer-group"));
                errorMessage.setErrorCount(1);
                errorMessage.setErrorStatus("STOP");
                errorMessage.setErrorMessage(headers.get("kafka_dlt-exception-message"));
                this.producerTemplate.requestBody("subscribe://dead-message", errorMessage);
                log.info("死信保存: {}", errorMessage.getMessageId());
            }
            acknowledgment.acknowledge();
        } catch (Exception e) {
            // Boundary catch: keep the listener alive, but never hide why the record failed.
            log.error("Failed to handle dead letter record (topic={}, partition={}, offset={}); requesting redelivery",
                    consumerRecord.topic(), consumerRecord.partition(), consumerRecord.offset(), e);
            acknowledgment.nack(Duration.ZERO);
        }
    }
}
