package com.rivigo.expense.billing.event.consumer;

import com.rivigo.expense.billing.annotation.Lock;
import com.rivigo.expense.billing.annotation.Locks;
import com.rivigo.expense.billing.entity.mongo.ReplayLogKeyMetadata;
import com.rivigo.expense.billing.enums.LockNamespace;
import com.rivigo.expense.billing.event.listenercontainer.ListenerContainer;
import com.rivigo.expense.billing.event.listenercontainer.ReplayTopicListenerContainer;
import com.rivigo.expense.billing.repository.mongo.ReplayLogKeyMetadataRepository;
import com.rivigo.expense.billing.utils.Constants;
import java.nio.charset.StandardCharsets;
import java.util.Map;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.commons.lang.StringUtils;
import org.apache.kafka.clients.Metadata;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.header.internals.RecordHeader;
import org.apache.kafka.common.header.internals.RecordHeaders;
import org.joda.time.DateTime;
import org.joda.time.Minutes;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.listener.ConsumerSeekAware;
import org.springframework.kafka.listener.MessageListener;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;

/**
 * Kafka listener for the replay topic.
 *
 * <p>For every consumed record it decides whether the payload has to be republished to the topic
 * named in the {@code Constants.ORIGINAL_TOPIC_NAME} header. A record is republished only when
 * its timestamp lies before the currently configured window end, replay metadata exists for its
 * key / group key, and its timestamp is newer than the timestamp stored in that metadata.
 *
 * <p>A scheduled task stops the listener container once no record has been consumed for a while.
 */
@Component("replayConsumer")
public class ReplayTopicConsumer implements MessageListener<String, String>, ConsumerSeekAware {
    private static final Logger log = LoggerFactory.getLogger(ReplayTopicConsumer.class);

    /**
     * Period of the idle check, in milliseconds (five minutes). Kept as a local constant instead
     * of borrowing the identically valued constant from Kafka's internal {@code Metadata} class,
     * which is not a public API.
     */
    private static final long IDLE_CHECK_INTERVAL_MS = 5 * 60 * 1000L;

    /** Name of the optional header that overrides the group key used for the metadata lookup. */
    private static final String PAYLOAD_TYPE_HEADER = "payload_type";

    /**
     * When {@code true}, the next partition assignment rewinds partition 0 of the replay topic to
     * its beginning. Left public and non-final so that existing external usages keep compiling.
     */
    public static AtomicBoolean seekToBeginning = new AtomicBoolean(false);

    /**
     * Time at which the last replay record was received. Written by the listener callback and
     * read by the scheduled idle check, hence {@code volatile}.
     */
    private static volatile DateTime replayEventConsumedLatestAt = new DateTime(0);

    /**
     * Exclusive upper bound (record timestamp) of the window that is currently replayed;
     * {@code null} until {@link #setRunningTimeWindowEndTimestamp(Long)} has been called.
     */
    private volatile Long runningTimeWindowEndTimestamp;

    @Autowired
    private ReplayLogKeyMetadataRepository replayLogKeyMetadataRepository;

    // Kept as a raw type on purpose: narrowing the generic signature could change which bean
    // Spring considers a match for this injection point.
    @Autowired
    private KafkaTemplate kafkaTemplate;

    @Value("${replay.topic}")
    private String replayTopic;

    @Autowired
    @Qualifier(ReplayTopicListenerContainer.replayListenerContainerBeanName)
    private ListenerContainer listenerContainer;

    /**
     * Periodically stops the replay listener container when it has been idle.
     *
     * <p>The container is stopped when the number of whole minutes elapsed since the last
     * consumed record is greater than 1, i.e. after at least two full minutes without a record.
     */
    @Scheduled(fixedRate = IDLE_CHECK_INTERVAL_MS)
    @Locks({@Lock(ns = LockNamespace.SCHEDULE_TASK, timeoutInSeconds = 0, key = Constants.LOCK_KEY_SOURCE)})
    public void stopReplayContainer() {
        synchronized (this) {
            final DateTime now = DateTime.now();
            final DateTime lastConsumedAt = replayEventConsumedLatestAt;
            log.info("[EXPENSE_BILLING_REPLAY] comparing now {}, to last consumed time {}", now.getMillis(), lastConsumedAt.getMillis());
            if (Minutes.minutesBetween(lastConsumedAt, now).getMinutes() > 1) {
                this.listenerContainer.stop();
            }
        }
    }

    /**
     * Handles one record from the replay topic and republishes it when it qualifies.
     *
     * <p>The record is skipped (with a log entry) when the replay window is not configured, the
     * record lies outside the window, the original-topic header is missing or empty, no replay
     * metadata exists, or the record is not newer than the timestamp stored in the metadata.
     *
     * @param consumerRecord the record received from the replay topic
     */
    @Override
    public void onMessage(ConsumerRecord<String, String> consumerRecord) {
        log.info("Consuming replay message - {} with timestamp {}, offset {}", consumerRecord.value(), consumerRecord.timestamp(), consumerRecord.offset());
        replayEventConsumedLatestAt = DateTime.now();

        // Read the field once so the null check and the comparison see the same value.
        final Long windowEnd = this.runningTimeWindowEndTimestamp;
        if (windowEnd == null) {
            // Previously this case failed with a NullPointerException while unboxing.
            log.warn("[EXPENSE_BILLING_REPLAY] Replay window end timestamp is not set. Skipping payload with key {}", consumerRecord.key());
            return;
        }
        if (consumerRecord.timestamp() >= windowEnd) {
            log.info("Received out of window event. Skipping for now. This one will be processed in next cycle. dataT {}, windowT {}", consumerRecord.timestamp(), windowEnd);
            return;
        }

        final String originalTopic = firstHeaderValue(consumerRecord, Constants.ORIGINAL_TOPIC_NAME);
        if (StringUtils.isEmpty(originalTopic)) {
            // A missing, null-valued or empty header is treated alike: without a target topic
            // the payload cannot be republished.
            log.warn("[FATAL] No headers found in payload. Corrupted payload!");
            return;
        }

        // The group key defaults to the original topic unless a payload type header is present.
        final String payloadType = firstHeaderValue(consumerRecord, PAYLOAD_TYPE_HEADER);
        final String groupKey = StringUtils.isEmpty(payloadType) ? originalTopic : payloadType;

        final ReplayLogKeyMetadata metadata =
                this.replayLogKeyMetadataRepository.findFirstByKeyAndGroupKey(consumerRecord.key(), groupKey);
        if (metadata == null) {
            log.warn("[WARN] Skipping replay of payload with key {} as no replay metadata present..", consumerRecord.key());
            return;
        }
        // NOTE(review): getTimestampOfMRS() is unboxed here; confirm it can never be null.
        if (consumerRecord.timestamp() <= metadata.getTimestampOfMRS().longValue()) {
            log.info("Skipping replay of payload with key - {} as it is already successfully consumed", consumerRecord.key());
            return;
        }
        replayMessage(consumerRecord, originalTopic);
    }

    /**
     * Returns the UTF-8 decoded value of the first header with the given key.
     *
     * @param consumerRecord record whose headers are inspected
     * @param key header key to look up
     * @return the decoded value, or {@code null} when no such header exists or its value is null
     */
    private static String firstHeaderValue(ConsumerRecord<String, String> consumerRecord, String key) {
        if (!consumerRecord.headers().headers(key).iterator().hasNext()) {
            return null;
        }
        final byte[] raw = consumerRecord.headers().headers(key).iterator().next().value();
        return raw == null ? null : new String(raw, StandardCharsets.UTF_8);
    }

    /**
     * Republishes the record's key and value to the given topic, carrying over its headers and
     * setting {@code Constants.REPLAY_TIMESTAMP_HEADER_KEY} to the record's timestamp.
     *
     * @param consumerRecord the replay record to republish
     * @param str name of the topic the payload is sent to
     */
    @SuppressWarnings("unchecked") // kafkaTemplate is a raw type, see the field declaration
    private void replayMessage(ConsumerRecord<String, String> consumerRecord, String str) {
        log.info("Republishing to original topic {}", str);
        // Work on a copy so the consumed record is left untouched and no downcast is needed.
        final RecordHeaders outgoingHeaders = new RecordHeaders(consumerRecord.headers());
        outgoingHeaders.remove(Constants.REPLAY_TIMESTAMP_HEADER_KEY);
        outgoingHeaders.add(new RecordHeader(
                Constants.REPLAY_TIMESTAMP_HEADER_KEY,
                String.valueOf(consumerRecord.timestamp()).getBytes(StandardCharsets.UTF_8)));
        this.kafkaTemplate.send(new ProducerRecord<String, String>(
                str, (Integer) null, consumerRecord.key(), consumerRecord.value(), outgoingHeaders));
        log.info("Republished payload for key - {}. Payload - {} to topic - {}", consumerRecord.key(), consumerRecord.value(), str);
    }

    /** No-op: the seek callback is not retained. */
    @Override
    public void registerSeekCallback(ConsumerSeekAware.ConsumerSeekCallback consumerSeekCallback) {
    }

    /**
     * Rewinds partition 0 of the replay topic to its beginning when {@link #seekToBeginning} is
     * set, and clears the flag; otherwise consumption continues from the current position.
     *
     * @param map the assigned partitions with their current offsets (unused)
     * @param consumerSeekCallback callback used to request the seek
     */
    @Override
    public void onPartitionsAssigned(Map<TopicPartition, Long> map, ConsumerSeekAware.ConsumerSeekCallback consumerSeekCallback) {
        // compareAndSet tests and clears the flag in one atomic step, so a request raised
        // concurrently cannot be lost between the check and the reset.
        if (!seekToBeginning.compareAndSet(true, false)) {
            log.info("[EXPENSE_BILLING_REPLAY] - Consumer group rebalanced midway or replay consumer was not successfully completed in last run. Will continue consuming from last offset for 0th partition");
            return;
        }
        log.info("[EXPENSE_BILLING_REPLAY] - Replay consumer was successfully completed in last run. Will restart from 0 offset...");
        consumerSeekCallback.seekToBeginning(this.replayTopic, 0);
    }

    /** No-op: nothing is done when the container is idle. */
    @Override
    public void onIdleContainer(Map<TopicPartition, Long> map, ConsumerSeekAware.ConsumerSeekCallback consumerSeekCallback) {
    }

    /**
     * Sets the exclusive upper bound of the replay window.
     *
     * @param l record timestamp at which the window ends; records with a timestamp greater than
     *     or equal to this value are skipped by {@link #onMessage(ConsumerRecord)}
     */
    public void setRunningTimeWindowEndTimestamp(Long l) {
        this.runningTimeWindowEndTimestamp = l;
    }
}
