package io.micrc.core.message.springboot;

import io.micrc.core.message.MessageConsumeRouterExecution;
import io.micrc.core.message.MessageRouteConfiguration;
import io.micrc.core.message.error.ErrorMessage;
import io.micrc.core.message.store.MessagePublisherSchedule;
import java.nio.charset.StandardCharsets;
import java.time.Duration;
import java.util.HashMap;
import java.util.Map;
import org.apache.camel.EndpointInject;
import org.apache.camel.ProducerTemplate;
import org.apache.camel.component.direct.DirectComponent;
import org.apache.camel.spring.boot.CamelAutoConfiguration;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.header.Header;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.autoconfigure.AutoConfigureAfter;
import org.springframework.boot.autoconfigure.domain.EntityScan;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Import;
import org.springframework.context.annotation.Primary;
import org.springframework.core.env.Environment;
import org.springframework.data.jpa.repository.config.EnableJpaRepositories;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.listener.DeadLetterPublishingRecoverer;
import org.springframework.kafka.listener.DefaultErrorHandler;
import org.springframework.kafka.support.Acknowledgment;
import org.springframework.util.backoff.FixedBackOff;

@EnableKafka
@Configuration
@AutoConfigureAfter({CamelAutoConfiguration.class})
@EnableJpaRepositories(basePackages = {"io.micrc.core.message.store", "io.micrc.core.message.error"})
@EntityScan(basePackages = {"io.micrc.core.message.store", "io.micrc.core.message.error"})
@Import({MessageRouteConfiguration.class, MessagePublisherSchedule.class, MessageConsumeRouterExecution.class})
/* loaded from: input_file:io/micrc/core/message/springboot/MessageAutoConfiguration.class */
public class MessageAutoConfiguration {
    private static final Logger log = LoggerFactory.getLogger(MessageAutoConfiguration.class);

    @Autowired
    Environment environment;

    @EndpointInject
    private ProducerTemplate producerTemplate;

    @Bean({"clean"})
    public DirectComponent clean() {
        return new DirectComponent();
    }

    @Bean({"publish"})
    public DirectComponent publish() {
        return new DirectComponent();
    }

    @Bean({"eventstore"})
    public DirectComponent eventStore() {
        return new DirectComponent();
    }

    @Bean({"subscribe"})
    public DirectComponent subscribe() {
        return new DirectComponent();
    }

    @Bean
    @Primary
    public DefaultErrorHandler deadLetterPublishingRecoverer(KafkaTemplate<?, ?> kafkaTemplate) {
        return new DefaultErrorHandler(new DeadLetterPublishingRecoverer(kafkaTemplate, (consumerRecord, exc) -> {
            return new TopicPartition("deadLetter", consumerRecord.partition());
        }), new FixedBackOff(0L, 1L));
    }

    @KafkaListener(topics = {"deadLetter"}, autoStartup = "true", concurrency = "3")
    public void deadLetter(ConsumerRecord<?, ?> consumerRecord, Acknowledgment acknowledgment) {
        try {
            HashMap hashMap = new HashMap();
            for (Header header : consumerRecord.headers()) {
                hashMap.put(header.key(), new String(header.value()));
            }
            if (((String) hashMap.get("context")).equals(this.environment.getProperty("spring.application.name"))) {
                ErrorMessage errorMessage = new ErrorMessage();
                errorMessage.setMessageId(Long.valueOf((String) hashMap.get("messageId")));
                errorMessage.setSender((String) hashMap.get("sender"));
                errorMessage.setTopic((String) hashMap.get("kafka_dlt-original-topic"));
                errorMessage.setEvent((String) hashMap.get("event"));
                errorMessage.setMappingMap((String) hashMap.get("mappingMap"));
                errorMessage.setContent(consumerRecord.value().toString());
                errorMessage.setGroupId((String) hashMap.get("kafka_dlt-original-consumer-group"));
                errorMessage.setErrorCount(1);
                errorMessage.setErrorStatus("STOP");
                errorMessage.setErrorMessage((String) hashMap.get("kafka_dlt-exception-message"));
                this.producerTemplate.requestBody("subscribe://dead-message", errorMessage);
                log.info("死信保存: " + errorMessage.getMessageId());
            }
            acknowledgment.acknowledge();
        } catch (Exception e) {
            acknowledgment.nack(Duration.ofMillis(0L));
        }
    }
}
