package org.sdase.commons.spring.boot.kafka;

import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.TopicPartition;
import org.sdase.commons.spring.boot.kafka.config.KafkaConsumerConfig;
import org.sdase.commons.spring.boot.kafka.config.SdaDltPatternValidator;
import org.sdase.commons.spring.boot.kafka.config.SdaKafkaListenerContainerFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.boot.autoconfigure.AutoConfiguration;
import org.springframework.boot.autoconfigure.kafka.KafkaProperties;
import org.springframework.boot.context.properties.ConfigurationPropertiesScan;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.PropertySource;
import org.springframework.kafka.annotation.KafkaListenerConfigurer;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.config.KafkaListenerEndpointRegistrar;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.listener.CommonContainerStoppingErrorHandler;
import org.springframework.kafka.listener.CommonErrorHandler;
import org.springframework.kafka.listener.CommonLoggingErrorHandler;
import org.springframework.kafka.listener.ContainerProperties;
import org.springframework.kafka.listener.DeadLetterPublishingRecoverer;
import org.springframework.kafka.listener.DefaultErrorHandler;
import org.springframework.kafka.support.ExponentialBackOffWithMaxRetries;
import org.springframework.kafka.support.converter.ByteArrayJsonMessageConverter;
import org.springframework.validation.beanvalidation.LocalValidatorFactoryBean;

/**
 * Auto-configuration for Kafka consumers.
 *
 * <p>Registers one {@link ConcurrentKafkaListenerContainerFactory} bean per error handling
 * strategy named in {@link SdaKafkaListenerContainerFactory} (retry and log, retry and dead
 * letter topic, log on failure) together with the {@link CommonErrorHandler} beans they use. All
 * factories share the same consumer properties, JSON message conversion, record-level ack mode
 * and record interceptor; they differ only in the error handler.
 *
 * <p>It also installs the injected {@link LocalValidatorFactoryBean} as validator for Kafka
 * listener endpoints.
 */
@AutoConfiguration
@ConfigurationPropertiesScan
@PropertySource("classpath:/org/sdase/commons/spring/boot/kafka/consumer.properties")
public class SdaKafkaConsumerConfiguration implements KafkaListenerConfigurer {
    private static final Logger LOG = LoggerFactory.getLogger(SdaKafkaConsumerConfiguration.class);

    /** Suffix appended to the source topic name when no custom DLT pattern is configured. */
    public static final String DLT_SUFFIX = ".DLT";

    /**
     * Placeholder inside a custom DLT pattern that is substituted with the source topic name.
     * Despite the constant's name it is matched literally (via {@link String#replace}), not as a
     * regular expression.
     */
    public static final String DLT_REGEX = "<topic>";

    private final KafkaProperties kafkaProperties;
    private final KafkaTemplate<Object, Object> recoverTemplate;
    private final LocalValidatorFactoryBean validator;
    private final ObjectMapper objectMapper;
    private final KafkaConsumerConfig consumerConfig;

    /**
     * Creates the configuration with its collaborators.
     *
     * @param kafkaProperties source of the consumer properties used for every consumer factory
     * @param kafkaTemplate template used to publish failed records to the dead letter topic
     * @param localValidatorFactoryBean validator registered for Kafka listener endpoints
     * @param objectMapper mapper used by the JSON message converter
     * @param kafkaConsumerConfig retry and dead letter topic settings
     */
    public SdaKafkaConsumerConfiguration(KafkaProperties kafkaProperties, KafkaTemplate<Object, Object> kafkaTemplate, LocalValidatorFactoryBean localValidatorFactoryBean, ObjectMapper objectMapper, KafkaConsumerConfig kafkaConsumerConfig) {
        this.kafkaProperties = kafkaProperties;
        this.recoverTemplate = kafkaTemplate;
        this.validator = localValidatorFactoryBean;
        this.objectMapper = objectMapper;
        this.consumerConfig = kafkaConsumerConfig;
    }

    /**
     * Provides the {@link SdaDltPatternValidator} bean. The method is {@code static}, so the bean
     * can be created without instantiating this configuration class.
     *
     * @return a new validator instance
     */
    @Bean
    public static SdaDltPatternValidator configurationPropertiesValidator() {
        return new SdaDltPatternValidator();
    }

    /**
     * Container factory that uses the {@code retryErrorHandler}.
     *
     * @param commonErrorHandler the error handler bean qualified as {@code retryErrorHandler}
     * @return the configured container factory
     */
    @Bean(SdaKafkaListenerContainerFactory.RETRY_AND_LOG)
    public ConcurrentKafkaListenerContainerFactory<String, ?> retryAndLogKafkaListenerContainerFactory(@Qualifier("retryErrorHandler") CommonErrorHandler commonErrorHandler) {
        return createDefaultListenerContainerFactory(commonErrorHandler);
    }

    /**
     * Container factory that uses the {@code retryDeadLetterErrorHandler}.
     *
     * @param commonErrorHandler the error handler bean qualified as {@code
     *     retryDeadLetterErrorHandler}
     * @return the configured container factory
     */
    @Bean(SdaKafkaListenerContainerFactory.RETRY_AND_DLT)
    public ConcurrentKafkaListenerContainerFactory<String, ?> retryAndDltKafkaListenerContainerFactory(@Qualifier("retryDeadLetterErrorHandler") CommonErrorHandler commonErrorHandler) {
        return createDefaultListenerContainerFactory(commonErrorHandler);
    }

    /**
     * Container factory that uses the {@code loggingErrorHandler}.
     *
     * @param commonErrorHandler the error handler bean qualified as {@code loggingErrorHandler}
     * @return the configured container factory
     */
    @Bean(SdaKafkaListenerContainerFactory.LOG_ON_FAILURE)
    public ConcurrentKafkaListenerContainerFactory<String, ?> logOnFailureKafkaListenerContainerFactory(@Qualifier("loggingErrorHandler") CommonErrorHandler commonErrorHandler) {
        return createDefaultListenerContainerFactory(commonErrorHandler);
    }

    /**
     * Registers the injected validator on the listener endpoint registrar.
     *
     * @param kafkaListenerEndpointRegistrar the registrar to configure
     */
    @Override
    public void configureKafkaListeners(KafkaListenerEndpointRegistrar kafkaListenerEndpointRegistrar) {
        kafkaListenerEndpointRegistrar.setValidator(this.validator);
    }

    /**
     * Error handler that retries with the configured exponential back-off and has no custom
     * recoverer. {@link NotRetryableKafkaException} is registered as not retryable.
     *
     * @return the retrying error handler
     */
    @Bean("retryErrorHandler")
    public DefaultErrorHandler retryErrorHandler() {
        DefaultErrorHandler defaultErrorHandler = new DefaultErrorHandler(createDefaultRetryBackOff());
        defaultErrorHandler.addNotRetryableExceptions(NotRetryableKafkaException.class);
        return defaultErrorHandler;
    }

    /**
     * Provides a {@link CommonLoggingErrorHandler}.
     *
     * @return the logging error handler
     */
    @Bean("loggingErrorHandler")
    public CommonErrorHandler loggingErrorHandler() {
        return new CommonLoggingErrorHandler();
    }

    /**
     * Provides a {@link CommonContainerStoppingErrorHandler}. None of the container factories
     * declared in this class uses it; it is available for injection by name.
     *
     * @return the container stopping error handler
     */
    @Bean("containerStoppingErrorHandler")
    public CommonErrorHandler containerStoppingErrorHandler() {
        return new CommonContainerStoppingErrorHandler();
    }

    /**
     * Error handler that retries with the configured exponential back-off and uses a {@link
     * DeadLetterPublishingRecoverer} whose destination is resolved by {@link
     * #getDeadLetterTopicName(ConsumerRecord, Exception)}. {@link NotRetryableKafkaException} is
     * registered as not retryable.
     *
     * @return the retrying, dead letter publishing error handler
     */
    @Bean("retryDeadLetterErrorHandler")
    public DefaultErrorHandler retryDeadLetterErrorHandler() {
        DeadLetterPublishingRecoverer recoverer = new DeadLetterPublishingRecoverer(this.recoverTemplate, this::getDeadLetterTopicName);
        DefaultErrorHandler defaultErrorHandler = new DefaultErrorHandler(recoverer, createDefaultRetryBackOff());
        defaultErrorHandler.addNotRetryableExceptions(NotRetryableKafkaException.class);
        return defaultErrorHandler;
    }

    /**
     * Resolves the dead letter destination for a failed record.
     *
     * <p>If a custom DLT pattern is configured, every occurrence of {@link #DLT_REGEX} in it is
     * replaced with the record's topic. If no pattern is configured, or applying it fails, the
     * record's topic followed by {@link #DLT_SUFFIX} is used. The partition is always the
     * partition of the failed record.
     *
     * @param consumerRecord the record that could not be processed
     * @param exc the exception that caused the failure; not used for resolving the destination
     * @return the topic and partition to publish the failed record to
     */
    protected TopicPartition getDeadLetterTopicName(ConsumerRecord<?, ?> consumerRecord, Exception exc) {
        String pattern = this.consumerConfig.dlt() == null ? null : this.consumerConfig.dlt().pattern();
        if (pattern != null) {
            try {
                return new TopicPartition(pattern.replace(DLT_REGEX, consumerRecord.topic()), consumerRecord.partition());
            } catch (Exception e) {
                // Best effort: a broken custom pattern must not prevent dead lettering, so log
                // the problem and fall through to the default topic name.
                LOG.error("Custom DLT pattern {}, could not be used", pattern, e);
            }
        }
        return new TopicPartition(consumerRecord.topic() + DLT_SUFFIX, consumerRecord.partition());
    }

    /**
     * Builds the exponential back-off shared by the retrying error handlers from the configured
     * retry settings (max retries, initial interval, multiplier, max interval).
     */
    private ExponentialBackOffWithMaxRetries createDefaultRetryBackOff() {
        ExponentialBackOffWithMaxRetries backOff = new ExponentialBackOffWithMaxRetries(this.consumerConfig.retry().maxRetries().intValue());
        backOff.setInitialInterval(this.consumerConfig.retry().initialBackoffInterval().longValue());
        // setMultiplier takes a double: convert with doubleValue() so that a fractional
        // multiplier is never truncated; for integral values the result is unchanged.
        backOff.setMultiplier(this.consumerConfig.retry().backoffMultiplier().doubleValue());
        backOff.setMaxInterval(this.consumerConfig.retry().maxBackoffInterval().longValue());
        return backOff;
    }

    /**
     * Creates a container factory with the settings common to all strategies: consumer
     * properties from {@link KafkaProperties}, byte-array JSON conversion with the injected
     * {@link ObjectMapper}, ack mode {@code RECORD}, the given error handler and a {@link
     * MetadataContextRecordInterceptor}.
     *
     * @param commonErrorHandler the error handler that distinguishes the strategy
     * @return the configured container factory
     */
    private ConcurrentKafkaListenerContainerFactory<String, ?> createDefaultListenerContainerFactory(CommonErrorHandler commonErrorHandler) {
        ConcurrentKafkaListenerContainerFactory<String, ?> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(new DefaultKafkaConsumerFactory<>(this.kafkaProperties.buildConsumerProperties()));
        factory.setMessageConverter(new ByteArrayJsonMessageConverter(this.objectMapper));
        factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.RECORD);
        factory.setCommonErrorHandler(commonErrorHandler);
        factory.setRecordInterceptor(new MetadataContextRecordInterceptor());
        return factory;
    }
}
