package com.rivigo.expense.billing.event.producer;

import com.fasterxml.jackson.databind.ObjectMapper;
import com.rivigo.compass.vendorcontractapi.dto.bp.BPContractDTO;
import com.rivigo.compass.vendorcontractapi.dto.rp.RPContractDTO;
import com.rivigo.expense.billing.constants.EventConstants;
import com.rivigo.expense.billing.dto.ChangeLogLiteDTO;
import com.rivigo.expense.billing.dto.internal.BaseEventPayload;
import com.rivigo.expense.billing.entity.mysql.KafkaFailedEvent;
import com.rivigo.expense.billing.pojo.partner.ContractFanOutEvent;
import com.rivigo.expense.billing.service.KafkaFailedEventService;
import com.rivigo.expense.billing.utils.Constants;
import com.rivigo.expense.billing.utils.DurationUtils;
import com.rivigo.vms.enums.ExpenseType;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.IntStream;
import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.lang.exception.ExceptionUtils;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.header.Header;
import org.apache.kafka.common.header.Headers;
import org.apache.kafka.common.header.internals.RecordHeader;
import org.apache.kafka.common.header.internals.RecordHeaders;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.SendResult;
import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Component;
import org.springframework.util.concurrent.ListenableFutureCallback;

@Component
/* loaded from: input_file:BOOT-INF/classes/com/rivigo/expense/billing/event/producer/KafkaEventProducer.class */
public class KafkaEventProducer {
    private static final Logger log = LoggerFactory.getLogger((Class<?>) KafkaEventProducer.class);

    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;

    @Value("${expense.invoicing.event.topic}")
    private String invoicingEventTopic;

    @Value("${vcms.events.topic}")
    private String contractEventTopic;

    @Value("${expense.billing.internal.event.topic}")
    private String expenseBillingInternalEventTopic;

    @Autowired
    private ObjectMapper objectMapper;

    @Autowired
    private KafkaFailedEventService kafkaFailedEventService;

    /* loaded from: input_file:BOOT-INF/classes/com/rivigo/expense/billing/event/producer/KafkaEventProducer$KafkaResponseListener.class */
    public static class KafkaResponseListener implements ListenableFutureCallback<SendResult<String, String>> {
        private static final Logger log = LoggerFactory.getLogger((Class<?>) KafkaResponseListener.class);
        private ProducerRecord<String, String> record;
        private KafkaEventProducer eventProducer;

        KafkaResponseListener(ProducerRecord<String, String> producerRecord, KafkaEventProducer kafkaEventProducer) {
            this.record = producerRecord;
            this.eventProducer = kafkaEventProducer;
        }

        @Override // org.springframework.util.concurrent.FailureCallback
        public void onFailure(Throwable th) {
            log.info(" fail to send record {}", this.record);
            try {
                this.eventProducer.persistFailedEvent(this.record);
            } catch (Exception e) {
                log.error("couldn't persist failed kafka event. Event will be lost. key: {}, topic: {}, error: {}.", this.record.key(), this.record.topic(), ExceptionUtils.getFullStackTrace(e));
            }
        }

        @Override // org.springframework.util.concurrent.SuccessCallback
        public void onSuccess(SendResult<String, String> sendResult) {
            log.info(" successfully send record {}", this.record);
        }
    }

    @Async
    public void sendChangeLogEvent(ChangeLogLiteDTO changeLogLiteDTO) {
        try {
            preHandle();
            ProducerRecord<String, String> prepareRecord = prepareRecord(changeLogLiteDTO);
            log.info("publishing contract event to kafka, payload: {}", changeLogLiteDTO);
            sendEvent(prepareRecord);
        } catch (Exception e) {
            log.info("error while publishing contract event to kafka, payload: {}, e: {}", changeLogLiteDTO, ExceptionUtils.getFullStackTrace(e));
        }
    }

    private void preHandle() {
        sendFailedEvents(this.kafkaFailedEventService.getAllActiveEvents());
    }

    @Async
    public void fanOutContractActivation(BPContractDTO bPContractDTO) {
        preHandle();
        IntStream.rangeClosed(DurationUtils.getMonthLocalEpoch(bPContractDTO.getEffectiveDate()).intValue(), DurationUtils.getMonthLocalEpoch(bPContractDTO.getExpiryDate()).intValue()).forEach(i -> {
            try {
                ProducerRecord<String, String> prepareRecord = prepareRecord(ContractFanOutEvent.builder().contractCode(bPContractDTO.getCode()).expenseType(ExpenseType.BP).vendorCode(bPContractDTO.getContractProfileDTO().getVendorCode()).ouCode(bPContractDTO.getContractProfileDTO().getOuDetails().getOuCode()).monthId(i).build());
                log.info("publishing contract event to kafka, payload: {}", prepareRecord);
                sendEvent(prepareRecord);
            } catch (Exception e) {
                log.info("error while publishing contract event to kafka, payload: {},e: {}", bPContractDTO, ExceptionUtils.getFullStackTrace(e));
            }
        });
    }

    @Async
    public void fanOutContractActivation(RPContractDTO rPContractDTO) {
        preHandle();
        IntStream.rangeClosed(DurationUtils.getMonthLocalEpoch(rPContractDTO.getEffectiveDate()).intValue(), DurationUtils.getMonthLocalEpoch(rPContractDTO.getExpiryDate()).intValue()).forEach(i -> {
            try {
                ProducerRecord<String, String> prepareRecord = prepareRecord(ContractFanOutEvent.builder().contractCode(rPContractDTO.getCode()).expenseType(ExpenseType.RP).vendorCode(rPContractDTO.getContractProfileDTO().getVendorCode()).ouCode(rPContractDTO.getContractProfileDTO().getOuDetails().getOuCode()).monthId(i).build());
                log.info("publishing contract event to kafka, payload: {}", prepareRecord);
                sendEvent(prepareRecord);
            } catch (Exception e) {
                log.info("error while publishing contract event to kafka, payload: {},e: {}", rPContractDTO, ExceptionUtils.getFullStackTrace(e));
            }
        });
    }

    private ProducerRecord<String, String> prepareRecord(ChangeLogLiteDTO changeLogLiteDTO) throws Exception {
        RecordHeaders recordHeaders = new RecordHeaders();
        recordHeaders.add(new RecordHeader("payload_type", EventConstants.CHANGE_LOG_CREATED.getBytes()));
        return new ProducerRecord<>(this.invoicingEventTopic, (Integer) 0, changeLogLiteDTO.getBookCode(), this.objectMapper.writeValueAsString(changeLogLiteDTO), (Iterable<Header>) recordHeaders);
    }

    private ProducerRecord<String, String> prepareRecord(BaseEventPayload baseEventPayload) throws IOException {
        RecordHeaders recordHeaders = new RecordHeaders();
        recordHeaders.add(new RecordHeader("payload_type", baseEventPayload.getEventType().name().getBytes()));
        return new ProducerRecord<>(this.expenseBillingInternalEventTopic, (Integer) null, baseEventPayload.getEventKey(), this.objectMapper.writeValueAsString(baseEventPayload), recordHeaders);
    }

    private ProducerRecord<String, String> prepareRecord(ContractFanOutEvent contractFanOutEvent) throws Exception {
        RecordHeaders recordHeaders = new RecordHeaders();
        recordHeaders.add(new RecordHeader("payload_type", EventConstants.PARTNER_CONTRACT_FAN_OUT.getBytes()));
        recordHeaders.add(new RecordHeader(Constants.EXPENSE_TYPE_EVENT_HEADER_KEY, contractFanOutEvent.getExpenseType().name().getBytes()));
        return new ProducerRecord<>(this.contractEventTopic, (Integer) 0, contractFanOutEvent.getKey(), this.objectMapper.writeValueAsString(contractFanOutEvent), (Iterable<Header>) recordHeaders);
    }

    private void sendEvent(ProducerRecord<String, String> producerRecord) {
        this.kafkaTemplate.send(producerRecord).addCallback(new KafkaResponseListener(producerRecord, this));
    }

    private void sendFailedEvents(List<KafkaFailedEvent> list) {
        HashMap hashMap = new HashMap();
        if (CollectionUtils.isEmpty(list)) {
            return;
        }
        for (KafkaFailedEvent kafkaFailedEvent : list) {
            try {
                hashMap.put(kafkaFailedEvent.getId(), new ProducerRecord(kafkaFailedEvent.getTopic(), (Integer) 0, kafkaFailedEvent.getEventKey(), kafkaFailedEvent.getEventPayload(), (Iterable<Header>) deserializeHeader((Map) this.objectMapper.readValue(kafkaFailedEvent.getHeaders(), Map.class))));
            } catch (Exception e) {
                log.error("cannot create producer record for event id: {}", kafkaFailedEvent.getId());
            }
        }
        ArrayList arrayList = new ArrayList();
        hashMap.forEach((l, producerRecord) -> {
            sendEvent(producerRecord);
            arrayList.add(l);
        });
        this.kafkaFailedEventService.deleteAll(arrayList);
    }

    /* JADX INFO: Access modifiers changed from: private */
    public void persistFailedEvent(ProducerRecord producerRecord) {
        try {
            this.kafkaFailedEventService.saveKafkaFailedEvent(producerRecord.topic(), producerRecord.key().toString(), producerRecord.value().toString(), serializeHeader(producerRecord.headers()));
        } catch (Exception e) {
            log.error("couldn't persist failed kafka event. Event will be lost. key: {}, topic: {}, error: {}.", producerRecord.key(), producerRecord.topic(), ExceptionUtils.getFullStackTrace(e));
        }
    }

    private String serializeHeader(Headers headers) {
        try {
            HashMap hashMap = new HashMap();
            for (Header header : headers) {
                hashMap.put(header.key(), new String(header.value()));
            }
            return this.objectMapper.writeValueAsString(hashMap);
        } catch (Exception e) {
            log.error("cannot serialize headers.");
            return null;
        }
    }

    private Headers deserializeHeader(Map<String, String> map) {
        RecordHeaders recordHeaders = new RecordHeaders();
        for (Map.Entry<String, String> entry : map.entrySet()) {
            recordHeaders.add(new RecordHeader(entry.getKey(), entry.getValue().getBytes()));
        }
        return recordHeaders;
    }

    public void sendInternalEvent(BaseEventPayload baseEventPayload) {
        try {
            ProducerRecord<String, String> prepareRecord = prepareRecord(baseEventPayload);
            log.info("publishing internal event to kafka, payload: {}", prepareRecord);
            sendEvent(prepareRecord);
        } catch (Exception e) {
            log.info("error while publishing internal event to kafka, payload: {},e: {}", baseEventPayload, ExceptionUtils.getFullStackTrace(e));
        }
    }
}
