package com.rivigo.expense.billing.event.listenercontainer;

import com.rivigo.expense.billing.event.consumer.ReplayTopicConsumer;
import com.rivigo.expense.billing.service.ReplayContainerLogService;
import java.util.HashMap;
import java.util.Map;
import javax.annotation.PostConstruct;
import org.apache.commons.lang.exception.ExceptionUtils;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.joda.time.DateTime;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.ApplicationEventPublisher;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.listener.AbstractMessageListenerContainer;
import org.springframework.kafka.listener.KafkaMessageListenerContainer;
import org.springframework.kafka.listener.MessageListener;
import org.springframework.kafka.listener.config.ContainerProperties;
import org.springframework.stereotype.Component;

@Component(ReplayTopicListenerContainer.replayListenerContainerBeanName)
/* loaded from: input_file:BOOT-INF/classes/com/rivigo/expense/billing/event/listenercontainer/ReplayTopicListenerContainer.class */
public class ReplayTopicListenerContainer implements ListenerContainer {
    private static final Logger log = LoggerFactory.getLogger((Class<?>) ReplayTopicListenerContainer.class);
    public static final String replayListenerContainerBeanName = "replayListener";
    private static KafkaMessageListenerContainer<Integer, String> container;

    @Autowired
    @Qualifier("replayConsumer")
    private MessageListener replayConsumer;

    @Autowired
    private ReplayContainerLogService replayContainerLogService;

    @Value("${consumer.concurrency.level}")
    private int consumerConcurrencyLevel;

    @Value("${consumer.poll.timeout.millis}")
    private int consumerPollTimeoutMillis;

    @Value("${bootstrap.servers}")
    private String bootstrapServersConfig;

    @Value("${enable.auto.commit.config}")
    private Boolean enableAutoCommitConfig;

    @Value("${auto.commit.interval.ms.config}")
    private int autoCommitIntervalMillis;

    @Value("${session.timeout.ms.config}")
    private int sessionTimeoutMillis;

    @Value("${group.id.config}")
    private String consumerGroupId;

    @Value("${auto.offset.reset.config}")
    private String autoOffsetReset;

    @Value("${max.poll.interval.ms.config}")
    private int maxPollInterval;

    @Value("${replay.max.poll.records.config}")
    private int maxPollRecords;

    @Value("${replay.idle.event.interval}")
    private long idleEventInterval;

    @Value("${replay.topic}")
    private String replayTopic;

    @PostConstruct
    private synchronized void init() {
        ContainerProperties containerProperties = new ContainerProperties(this.replayTopic);
        containerProperties.setIdleEventInterval(Long.valueOf(this.idleEventInterval));
        containerProperties.setAckMode(AbstractMessageListenerContainer.AckMode.RECORD);
        containerProperties.setMessageListener(this.replayConsumer);
        containerProperties.setGroupId(this.consumerGroupId);
        if (container == null) {
            container = createContainer(containerProperties);
            container.setBeanName(replayListenerContainerBeanName);
            container.setAutoStartup(false);
        }
    }

    private KafkaMessageListenerContainer<Integer, String> createContainer(ContainerProperties containerProperties) {
        return new KafkaMessageListenerContainer<>(new DefaultKafkaConsumerFactory(consumerProps()), containerProperties);
    }

    private Map<String, Object> consumerProps() {
        HashMap hashMap = new HashMap();
        hashMap.put("bootstrap.servers", this.bootstrapServersConfig);
        hashMap.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, this.autoOffsetReset);
        hashMap.put(ConsumerConfig.GROUP_ID_CONFIG, this.consumerGroupId);
        hashMap.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, this.enableAutoCommitConfig);
        hashMap.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, Integer.valueOf(this.autoCommitIntervalMillis));
        hashMap.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, Integer.valueOf(this.sessionTimeoutMillis));
        hashMap.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        hashMap.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        hashMap.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, Integer.valueOf(this.maxPollInterval));
        hashMap.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, Integer.valueOf(this.maxPollRecords));
        return hashMap;
    }

    @Override // com.rivigo.expense.billing.event.listenercontainer.ListenerContainer
    public synchronized void stop() {
        if (!isRunning()) {
            log.info("[EXPENSE_BILLING_REPLAY] - Replay container already stopped! Skipping....");
            return;
        }
        log.info("[EXPENSE_BILLING_REPLAY] - Replay container stopping....");
        ((ReplayTopicConsumer) this.replayConsumer).setRunningTimeWindowEndTimestamp(Long.MIN_VALUE);
        container.stop();
        log.info("[EXPENSE_BILLING_REPLAY] - Replay container successfully stopped.");
        this.replayContainerLogService.markSuccessfulCompletion();
    }

    @Override // com.rivigo.expense.billing.event.listenercontainer.ListenerContainer
    public void restart() {
        stop();
        start();
    }

    @Override // com.rivigo.expense.billing.event.listenercontainer.ListenerContainer
    public synchronized void start() {
        try {
            if (isRunning()) {
                log.info("[EXPENSE_BILLING_REPLAY] - Replay container is already running! Skipping....");
            } else {
                log.info("[EXPENSE_BILLING_REPLAY] - Replay container starting...");
                ((ReplayTopicConsumer) this.replayConsumer).setRunningTimeWindowEndTimestamp(Long.valueOf(DateTime.now().getMillis()));
                container.start();
                log.info("[EXPENSE_BILLING_REPLAY] - Replay container successfully started.");
                this.replayContainerLogService.markRunning();
            }
        } catch (Exception e) {
            log.error("[EXPENSE_BILLING_REPLAY] - Error occurred while starting replay container. Error - {}", ExceptionUtils.getFullStackTrace(e));
        }
    }

    @Override // com.rivigo.expense.billing.event.listenercontainer.ListenerContainer
    public void registerEventPublisher(ApplicationEventPublisher applicationEventPublisher) {
        container.setApplicationEventPublisher(applicationEventPublisher);
    }

    @Override // com.rivigo.expense.billing.event.listenercontainer.ListenerContainer
    public boolean isRunning() {
        return container.isRunning();
    }
}
