package com.cs.software.engine.kafka;

import com.cs.software.api.MessageIntf;
import com.cs.software.api.ServicesIntf;
import com.cs.software.engine.KafkaMessage;
import java.time.Duration;
import java.util.Collections;
import java.util.Iterator;
import org.apache.kafka.clients.consumer.CommitFailedException;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.KafkaConsumer;

/* loaded from: input_file:com/cs/software/engine/kafka/KafkaConsumerBase.class */
/**
 * Thin wrapper around a {@link KafkaConsumer} with String keys and values.
 *
 * <p>Two usage modes are offered:
 * <ul>
 *   <li>{@link #runConsumerGetMsg(long)} - poll once and hand a message back to the caller;</li>
 *   <li>{@link #runConsumer()} - poll in a loop and dispatch every record to
 *       {@link #processMessage(KafkaMessage)}.</li>
 * </ul>
 *
 * <p>{@link #init(String, String)} must be called first so that the group id and topic are set.
 * The class performs no synchronization of its own; NOTE(review): KafkaConsumer is documented as
 * not safe for multi-threaded access, so an instance should be confined to one thread.
 */
public class KafkaConsumerBase {
    private static final int DEF_ERROR_CODE = -9119;

    /** Upper bound on commit attempts before the failure is propagated to the caller. */
    private static final int MAX_COMMIT_ATTEMPTS = 3;

    /** Prefix used by the console output of {@link #processMessage(KafkaMessage)}. */
    private static final String LOG_PREFIX = "Kafka Consumer";

    /** Message parameter name under which the record key is stored. */
    private static final String PARAM_RECORD_KEY = "__Record Key";

    /** Message parameter name under which the record value is stored. */
    private static final String PARAM_PAYLOAD = "__Payload";

    // The public static fields below are not read anywhere in this class; they are kept
    // (non-final, boxed) exactly as declared because code elsewhere may read or assign them.
    public static String KAFKA_BROKERS = "localhost:9092";
    public static Integer MESSAGE_COUNT = 1000;
    public static String CLIENT_ID = "client1";
    public static Integer MAX_NO_MESSAGE_FOUND_COUNT = 100;
    public static String OFFSET_RESET_LATEST = "latest";
    public static String OFFSET_RESET_EARLIER = "earliest";
    public static Integer MAX_POLL_RECORDS = 1;

    private KafkaConsumer<String, String> kafkaConsumer;
    private ServicesIntf service;
    private String topicName;
    private String groupId;

    /**
     * Stores the consumer group and topic used when the underlying consumer is created.
     *
     * @param str  consumer group id
     * @param str2 name of the topic to subscribe to
     */
    public void init(String str, String str2) {
        this.groupId = str;
        this.topicName = str2;
    }

    /** @return the consumer group id passed to {@link #init(String, String)} */
    public String getGroupId() {
        return this.groupId;
    }

    /** @return the topic name passed to {@link #init(String, String)} */
    public String getTopicName() {
        return this.topicName;
    }

    /**
     * Sets the service that {@link #processMessage(KafkaMessage)} runs for each record.
     * When no service is set, records are printed to standard output instead.
     *
     * @param servicesIntf service to invoke, or {@code null} to fall back to console output
     */
    public void setServiceIntf(ServicesIntf servicesIntf) {
        this.service = servicesIntf;
    }

    /**
     * Polls the topic once and returns a message built from the polled records.
     *
     * <p>The consumer is created and subscribed lazily on the first call and reused afterwards;
     * call {@link #close()} to release it. Offsets are committed after every poll.
     *
     * <p>NOTE(review): if a poll returns more than one record, only the last one is returned
     * while the offsets of all of them are committed. This is lossless only when the consumer
     * properties limit a poll to a single record - confirm against KafkaUtil.
     *
     * @param j maximum time to block in the poll, in seconds
     * @return the message for the last polled record, or {@code null} when nothing was polled
     * @throws Exception if creating, subscribing, polling or committing fails
     */
    public MessageIntf runConsumerGetMsg(long j) throws Exception {
        if (this.kafkaConsumer == null) {
            KafkaConsumer<String, String> consumer =
                    new KafkaConsumer<>(KafkaUtil.getConsumerProperties(this.groupId));
            try {
                consumer.subscribe(Collections.singletonList(this.topicName));
            } catch (RuntimeException e) {
                // Do not keep (or leak) a consumer that never got subscribed.
                consumer.close();
                throw e;
            }
            this.kafkaConsumer = consumer;
        }
        KafkaMessage kafkaMessage = null;
        for (ConsumerRecord<String, String> consumerRecord : this.kafkaConsumer.poll(Duration.ofSeconds(j))) {
            kafkaMessage = toKafkaMessage(consumerRecord);
        }
        doCommitSync();
        return kafkaMessage;
    }

    /**
     * Creates a fresh consumer, subscribes to the topic and processes records until an
     * exception occurs.
     *
     * <p>An {@link Exception} raised while polling, processing or committing is reported on
     * standard output and ends the loop normally; any other {@link Throwable} propagates.
     * A failure to subscribe also propagates. In every case the consumer is closed and the
     * field is cleared so that a later {@link #runConsumerGetMsg(long)} creates a new one.
     *
     * @throws Exception if the consumer cannot be created or subscribed
     */
    public void runConsumer() throws Exception {
        // Release a consumer left over from runConsumerGetMsg instead of overwriting it.
        closeConsumer();
        this.kafkaConsumer = new KafkaConsumer<>(KafkaUtil.getConsumerProperties(this.groupId));
        try {
            this.kafkaConsumer.subscribe(Collections.singletonList(this.topicName));
            try {
                while (true) {
                    for (ConsumerRecord<String, String> consumerRecord
                            : this.kafkaConsumer.poll(Duration.ofSeconds(100L))) {
                        processMessage(toKafkaMessage(consumerRecord));
                    }
                    doCommitSync();
                }
            } catch (Exception e) {
                System.out.println("Exception caught " + e.getMessage());
            }
        } finally {
            closeConsumer();
            System.out.println("After closing KafkaConsumer");
        }
    }

    /**
     * Handles one message: runs the configured service (init, pre, run, post), or prints the
     * message parameters to standard output when no service is configured.
     *
     * <p>Any exception is printed and swallowed, so the caller goes on to commit the offset
     * even when processing failed.
     *
     * @param kafkaMessage message to process
     */
    public void processMessage(KafkaMessage kafkaMessage) {
        try {
            if (this.service == null) {
                System.out.println(LOG_PREFIX + " - Record Key: " + kafkaMessage.getParam(PARAM_RECORD_KEY));
                System.out.println(LOG_PREFIX + " - Record value: " + kafkaMessage.getParam(PARAM_PAYLOAD));
                System.out.println(LOG_PREFIX + " - Record partition: " + kafkaMessage.getParam(KafkaMessage.KAFKA_RECORD_PARTITION));
                System.out.println(LOG_PREFIX + " - Record offset: " + kafkaMessage.getParam(KafkaMessage.KAFKA_RECORD_OFFSET));
                System.out.println(LOG_PREFIX + " - Record Topic: " + kafkaMessage.getParam(KafkaMessage.KAFKA_RECORD_TOPIC));
                System.out.println(LOG_PREFIX + " - Record Headers: " + kafkaMessage.getParam(KafkaMessage.KAFKA_RECORD_HEADERS));
                System.out.println(LOG_PREFIX + " - Record Timestamp: " + kafkaMessage.getParam(KafkaMessage.KAFKA_RECORD_TIMESTAMP));
            } else {
                this.service.init(null, this.service, kafkaMessage);
                this.service.runServicePre();
                this.service.runService();
                this.service.runServicePost();
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    /**
     * Closes the underlying consumer, if one is open. Safe to call repeatedly; the next
     * {@link #runConsumerGetMsg(long)} call creates a new consumer.
     */
    public void close() {
        closeConsumer();
    }

    /** Copies key, value and metadata of a record into a new {@link KafkaMessage}. */
    private static KafkaMessage toKafkaMessage(ConsumerRecord<?, ?> consumerRecord) {
        KafkaMessage kafkaMessage = new KafkaMessage();
        kafkaMessage.setParam(PARAM_RECORD_KEY, consumerRecord.key());
        kafkaMessage.setParam(PARAM_PAYLOAD, consumerRecord.value());
        kafkaMessage.setParam(KafkaMessage.KAFKA_RECORD_PARTITION, Integer.valueOf(consumerRecord.partition()));
        kafkaMessage.setParam(KafkaMessage.KAFKA_RECORD_OFFSET, Long.valueOf(consumerRecord.offset()));
        kafkaMessage.setParam(KafkaMessage.KAFKA_RECORD_TOPIC, consumerRecord.topic());
        kafkaMessage.setParam(KafkaMessage.KAFKA_RECORD_HEADERS, consumerRecord.headers());
        kafkaMessage.setParam(KafkaMessage.KAFKA_RECORD_TIMESTAMP, Long.valueOf(consumerRecord.timestamp()));
        return kafkaMessage;
    }

    /** Closes the current consumer (if any) and clears the field so it is never reused. */
    private void closeConsumer() {
        KafkaConsumer<String, String> consumer = this.kafkaConsumer;
        this.kafkaConsumer = null;
        if (consumer != null) {
            consumer.close();
        }
    }

    /**
     * Synchronously commits the offsets of the last poll.
     *
     * <p>A {@link CommitFailedException} is reported and not retried. Any other failure is
     * retried up to {@link #MAX_COMMIT_ATTEMPTS} times in a loop (unbounded recursion would
     * end in a StackOverflowError on a persistent failure); the last error is then rethrown.
     */
    private void doCommitSync() {
        for (int attempt = 1; ; attempt++) {
            try {
                this.kafkaConsumer.commitSync();
                return;
            } catch (CommitFailedException e) {
                // Retrying cannot succeed here; report instead of failing silently.
                System.out.println(LOG_PREFIX + " - offset commit failed: " + e.getMessage());
                return;
            } catch (RuntimeException e) {
                System.out.println(LOG_PREFIX + " - offset commit attempt " + attempt + " of "
                        + MAX_COMMIT_ATTEMPTS + " failed: " + e.getMessage());
                if (attempt >= MAX_COMMIT_ATTEMPTS) {
                    throw e;
                }
            }
        }
    }

    /**
     * Command-line entry point: consumes the given topic until an error occurs.
     *
     * @param strArr {@code <groupId> <topicName>}
     */
    public static void main(String[] strArr) {
        if (strArr == null || strArr.length < 2) {
            // Without a topic the subscription can never succeed, so fail fast with a hint.
            System.err.println("Usage: KafkaConsumerBase <groupId> <topicName>");
            System.exit(1);
            return;
        }
        try {
            KafkaConsumerBase consumer = new KafkaConsumerBase();
            consumer.init(strArr[0], strArr[1]);
            consumer.runConsumer();
        } catch (Exception e) {
            e.printStackTrace();
            System.exit(1);
        }
        System.exit(0);
    }

    /** @return the default error code of this class ({@code -9119}) */
    public int getDefError() {
        return DEF_ERROR_CODE;
    }
}
