package com.rivigo.expense.billing.event.consumer;

import com.fasterxml.jackson.databind.ObjectMapper;
import com.rivigo.expense.billing.entity.mongo.ReplayLogKeyMetadata;
import com.rivigo.expense.billing.event.consumer.handler.ConsumerHandler;
import com.rivigo.expense.billing.exceptions.ConsumerRecordMalformedException;
import com.rivigo.expense.billing.exceptions.ExpenseBillingException;
import com.rivigo.expense.billing.repository.mongo.ReplayLogKeyMetadataRepository;
import com.rivigo.expense.billing.utils.Constants;
import com.rivigo.expense.billing.zoom.model.ZoomBaseMessage;
import com.rivigo.vms.enums.ExpenseType;
import com.rivigo.zoom.billing.dto.GlobalBaseMessage;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import org.apache.commons.lang.exception.ExceptionUtils;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.header.Header;
import org.apache.kafka.common.header.internals.RecordHeader;
import org.apache.kafka.common.header.internals.RecordHeaders;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Component;

@Component("baseConsumer")
/* loaded from: input_file:BOOT-INF/classes/com/rivigo/expense/billing/event/consumer/BaseConsumer.class */
public class BaseConsumer implements HandlerRegistry {
    private static final String failedBaseDeserializationKey = "BASE_FAILURE";

    @Autowired
    private ReplayLogKeyMetadataRepository replayLogKeyMetadataRepository;

    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;

    @Autowired
    private ObjectMapper objectMapper;

    @Value("${replay.topic}")
    private String replayTopic;
    private static final Logger log = LoggerFactory.getLogger((Class<?>) BaseConsumer.class);
    private static Map<String, ConsumerHandler> payloadToHandler = new HashMap(0);

    @Override // com.rivigo.expense.billing.event.consumer.HandlerRegistry
    public void register(ConsumerHandler consumerHandler, String str) {
        payloadToHandler.put(str, consumerHandler);
    }

    @KafkaListener(topics = {"${vcms.events.topic}", "${expense.book.topic}", "${expense.billing.internal.event.topic}", "${prime.to.compass.fuel.event.topic}"}, groupId = "${group.id.config}", containerFactory = "concurrentKafkaListenerContainerFactory")
    public void onContractEvent(ConsumerRecord<String, String> consumerRecord) {
        log.info("Received message - {} in partition {} from topic {}. Thread id {}", consumerRecord.value(), Integer.valueOf(consumerRecord.partition()), consumerRecord.topic(), Long.valueOf(Thread.currentThread().getId()));
        onMessage(consumerRecord);
    }

    @KafkaListener(topics = {"${expense.billing.consignment.topic}"}, groupId = "${group.id.config}", containerFactory = "concurrentKafkaListenerContainerFactory")
    public void onCommunicationEvent(ConsumerRecord<String, String> consumerRecord) {
        log.info("Received message - {} in partition {} from topic {}. Thread id {}", consumerRecord.value(), Integer.valueOf(consumerRecord.partition()), consumerRecord.topic(), Long.valueOf(Thread.currentThread().getId()));
        if (consumerRecord.headers().headers("payload_type").iterator().hasNext()) {
            onMessage(consumerRecord);
            return;
        }
        GlobalBaseMessage compassBaseMessage = getCompassBaseMessage(consumerRecord);
        if (compassBaseMessage.getEntityId() == null || compassBaseMessage.getEventName() == null) {
            registerBaseMessageDeserializationFailure(consumerRecord, "No entityId or event name in payload");
            throw new ExpenseBillingException("No entityId or event name in payload");
        }
        consumerRecord.headers().add(new RecordHeader("payload_type", compassBaseMessage.getEventName().getBytes()));
        onMessage(new ConsumerRecord<>(consumerRecord.topic(), consumerRecord.partition(), consumerRecord.offset(), consumerRecord.timestamp(), consumerRecord.timestampType(), Long.valueOf(consumerRecord.checksum()), consumerRecord.serializedKeySize(), consumerRecord.serializedValueSize(), compassBaseMessage.getEntityId().toString(), consumerRecord.value(), consumerRecord.headers()));
    }

    @KafkaListener(topics = {"${zoom.payout.topic}"}, groupId = "${group.id.config}", containerFactory = "concurrentKafkaListenerContainerFactory")
    public void onZoomPayoutEvent(ConsumerRecord<String, String> consumerRecord) {
        log.info("Received message - {} in partition {} from topic {}. Thread id {}", consumerRecord.value(), Integer.valueOf(consumerRecord.partition()), consumerRecord.topic(), Long.valueOf(Thread.currentThread().getId()));
        if (consumerRecord.headers().headers("payload_type").iterator().hasNext()) {
            onMessage(consumerRecord);
            return;
        }
        ZoomBaseMessage zoomBaseMessage = getZoomBaseMessage(consumerRecord);
        if (zoomBaseMessage.getEntityId() == null || zoomBaseMessage.getEventName() == null || zoomBaseMessage.getMetadata() == null || zoomBaseMessage.getMetadata().get(ZoomBaseMessage.RLH_FEEDER_DATA_REQUEST_DTO_KEY) == null) {
            registerBaseMessageDeserializationFailure(consumerRecord, "No entityId or event name in payload");
            throw new ExpenseBillingException("No entityId or event name in payload");
        }
        String str = zoomBaseMessage.getMetadata().get(ZoomBaseMessage.RLH_FEEDER_DATA_REQUEST_DTO_KEY);
        consumerRecord.headers().add(new RecordHeader("payload_type", zoomBaseMessage.getEventName().name().getBytes()));
        onMessage(new ConsumerRecord<>(consumerRecord.topic(), consumerRecord.partition(), consumerRecord.offset(), consumerRecord.timestamp(), consumerRecord.timestampType(), Long.valueOf(consumerRecord.checksum()), consumerRecord.serializedKeySize(), consumerRecord.serializedValueSize(), zoomBaseMessage.getEntityId().toString(), str, consumerRecord.headers()));
    }

    private void registerBaseMessageDeserializationFailure(ConsumerRecord<String, String> consumerRecord, String str) {
        log.warn("[FATAL] Failed to validate zoom payload to ZoomBaseMessage {}.Error - {}", consumerRecord.value(), str);
        this.replayLogKeyMetadataRepository.save(new ReplayLogKeyMetadata("BASE_FAILURE-" + System.currentTimeMillis(), consumerRecord.topic(), str));
        log.info("Saved log replay for payload {}", consumerRecord.value());
    }

    private GlobalBaseMessage getCompassBaseMessage(ConsumerRecord<String, String> consumerRecord) {
        try {
            return (GlobalBaseMessage) this.objectMapper.readValue(consumerRecord.value(), GlobalBaseMessage.class);
        } catch (Exception e) {
            log.warn("Exception occurred in parsing payload as GlobalBaseMessage. Error - {}", ExceptionUtils.getFullStackTrace(e));
            throw new ExpenseBillingException("Could not parse payload as GlobalBaseMessage");
        }
    }

    private ZoomBaseMessage getZoomBaseMessage(ConsumerRecord<String, String> consumerRecord) {
        try {
            return (ZoomBaseMessage) this.objectMapper.readValue(consumerRecord.value(), ZoomBaseMessage.class);
        } catch (Exception e) {
            log.warn("Exception occurred in parsing payload as ZoomBaseMessage. Error - {}", ExceptionUtils.getFullStackTrace(e));
            throw new ExpenseBillingException("Could not parse payload as ZoomBaseMessage");
        }
    }

    public void onMessage(ConsumerRecord<String, String> consumerRecord) {
        String str = new String(((RecordHeaders) consumerRecord.headers()).headers("payload_type").iterator().next().value());
        ReplayLogKeyMetadata findFirstByKeyAndGroupKey = this.replayLogKeyMetadataRepository.findFirstByKeyAndGroupKey(consumerRecord.key(), str);
        if (findFirstByKeyAndGroupKey == null) {
            processMessage(consumerRecord, str);
            return;
        }
        Iterator<Header> it = ((RecordHeaders) consumerRecord.headers()).headers(Constants.REPLAY_TIMESTAMP_HEADER_KEY).iterator();
        if (it.hasNext()) {
            processReplayedMessage(consumerRecord, str, findFirstByKeyAndGroupKey, (RecordHeader) it.next());
        } else {
            publishToReplay(consumerRecord);
        }
    }

    public void processMessage(ConsumerRecord<String, String> consumerRecord, String str) {
        try {
            doProcessMessage(consumerRecord, str);
        } catch (ConsumerRecordMalformedException e) {
            log.error("Process Message Exception e -> {}", ExceptionUtils.getFullStackTrace(e));
            log.error("Exception occurred in processing payload {}.", consumerRecord.value());
            handleMalformedEvent(consumerRecord, e.getMessage());
        } catch (Exception e2) {
            log.error("Process Message Exception e -> {}", ExceptionUtils.getFullStackTrace(e2));
            log.error("Exception occurred in processing payload {}.", consumerRecord.value());
            handleError(consumerRecord, str, e2.getMessage());
        }
    }

    private void handleMalformedEvent(ConsumerRecord<String, String> consumerRecord, String str) {
        this.replayLogKeyMetadataRepository.save(new ReplayLogKeyMetadata("BASE_FAILURE-" + System.currentTimeMillis(), consumerRecord.topic(), str));
    }

    public void doProcessMessage(ConsumerRecord<String, String> consumerRecord, String str) throws ConsumerRecordMalformedException {
        log.info("Processing message....");
        ConsumerHandler consumerHandler = payloadToHandler.get(str);
        if (consumerHandler == null) {
            log.info("Skipping unknown payload. No handler found for {}", str);
        } else if (!consumerRecord.headers().headers(Constants.EXPENSE_TYPE_EVENT_HEADER_KEY).iterator().hasNext()) {
            consumerHandler.handle(consumerRecord.value(), null);
        } else {
            consumerHandler.handle(consumerRecord.value(), ExpenseType.valueOf(new String(consumerRecord.headers().headers(Constants.EXPENSE_TYPE_EVENT_HEADER_KEY).iterator().next().value())));
        }
    }

    public void processReplayedMessage(ConsumerRecord<String, String> consumerRecord, String str, ReplayLogKeyMetadata replayLogKeyMetadata, RecordHeader recordHeader) {
        if (replayLogKeyMetadata.getSkipRestAll().booleanValue()) {
            log.info("Skipping message as per skip rest flag....");
            return;
        }
        try {
            log.info("Processing replayed message. Key - {}", consumerRecord.key());
            doProcessMessage(consumerRecord, str);
            replayLogKeyMetadata.setTimestampOfMRS(Long.valueOf(new String(recordHeader.value())));
            replayLogKeyMetadata.setStackTrace(null);
            this.replayLogKeyMetadataRepository.save(replayLogKeyMetadata);
        } catch (Exception e) {
            log.error("Replay Process Message Exception e -> {}", ExceptionUtils.getFullStackTrace(e));
            log.error("Exception occurred in processing replay payload {}.", consumerRecord.value());
            replayLogKeyMetadata.setStackTrace(e.getMessage());
            replayLogKeyMetadata.setSkipRestAll(true);
            this.replayLogKeyMetadataRepository.save(replayLogKeyMetadata);
        }
    }

    public void handleError(ConsumerRecord<String, String> consumerRecord, String str, String str2) {
        ReplayLogKeyMetadata findFirstByKeyAndGroupKey = this.replayLogKeyMetadataRepository.findFirstByKeyAndGroupKey(consumerRecord.key(), str);
        if (findFirstByKeyAndGroupKey == null) {
            findFirstByKeyAndGroupKey = new ReplayLogKeyMetadata(consumerRecord.key(), str, str2);
        } else {
            findFirstByKeyAndGroupKey.setStackTrace(str2);
        }
        this.replayLogKeyMetadataRepository.save(findFirstByKeyAndGroupKey);
        publishToReplay(consumerRecord);
        log.info("Successfully published to replay. Log {}", consumerRecord.key());
    }

    private void publishToReplay(ConsumerRecord<String, String> consumerRecord) {
        log.info("Publishing message to replay topic.... {}", consumerRecord.key());
        RecordHeaders recordHeaders = (RecordHeaders) consumerRecord.headers();
        recordHeaders.add(new RecordHeader(Constants.ORIGINAL_TOPIC_NAME, consumerRecord.topic().getBytes()));
        this.kafkaTemplate.send(new ProducerRecord<>(this.replayTopic, (Integer) null, consumerRecord.key(), consumerRecord.value(), recordHeaders));
        log.info("Successfully sent to replay topic. {}", consumerRecord.key());
    }
}
