package io.virtualan.cucumblan.core.msg.kafka;

import io.virtualan.cucumblan.message.exception.MessageNotDefinedException;
import io.virtualan.cucumblan.message.type.MessageType;
import io.virtualan.cucumblan.props.ApplicationConfiguration;
import io.virtualan.cucumblan.props.TopicConfiguration;
import java.io.IOException;
import java.time.Duration;
import java.time.temporal.ChronoUnit;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.Properties;
import java.util.logging.Logger;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

/* loaded from: input_file:io/virtualan/cucumblan/core/msg/kafka/KafkaConsumerClient.class */
public class KafkaConsumerClient {
    private static final Logger LOGGER = Logger.getLogger(KafkaConsumerClient.class.getName());
    private final KafkaConsumer consumer;
    private List<String> topic;

    public KafkaConsumerClient(String str) {
        Properties properties = new Properties();
        try {
            properties.load(Thread.currentThread().getContextClassLoader().getResourceAsStream("consumer-" + str + ".properties"));
        } catch (IOException e) {
            LOGGER.warning("consumer-" + str + ".properties is not loaded");
            System.exit(1);
        }
        this.consumer = new KafkaConsumer(properties);
    }

    public static MessageType getEvent(String str, String str2, String str3, String str4, int i) throws InterruptedException, MessageNotDefinedException {
        MessageType messageType = (MessageType) MessageContext.getEventContextMap(str, str3);
        int i2 = i + 1;
        if (i2 == 5 || messageType != null) {
            return messageType;
        }
        Thread.sleep(1000L);
        new KafkaConsumerClient(str4).run(str, str2, str3);
        return getEvent(str, str2, str3, str4, i2);
    }

    private List<String> loadTopic(String str) {
        String property = TopicConfiguration.getProperty(str);
        if (property == null) {
            LOGGER.warning(str + " - Topic is not configured.");
            System.exit(1);
        }
        return Arrays.asList(property.split(";"));
    }

    public void run(String str, String str2, String str3) throws MessageNotDefinedException {
        this.topic = loadTopic(str);
        this.consumer.subscribe(this.topic);
        LOGGER.info(" Read Received message: " + this.topic);
        int i = 0;
        while (true) {
            try {
                boolean isEventContextMap = MessageContext.isEventContextMap(str, str3);
                if (isEventContextMap) {
                    break;
                }
                ConsumerRecords poll = this.consumer.poll(Duration.of(1000L, ChronoUnit.MILLIS));
                if (poll.count() == 0) {
                    i++;
                    if (i > ApplicationConfiguration.getMessageCount() || isEventContextMap) {
                        break;
                    }
                } else {
                    Iterator it = poll.iterator();
                    while (it.hasNext()) {
                        getMessageType(str, str2, (ConsumerRecord) it.next());
                        this.consumer.commitAsync();
                    }
                }
            } finally {
                if (this.consumer != null) {
                    this.consumer.close();
                }
            }
        }
        LOGGER.info("DONE");
    }

    private boolean getMessageType(String str, String str2, ConsumerRecord<Object, Object> consumerRecord) throws MessageNotDefinedException {
        MessageType messageType = MessageContext.getMessageTypes().get(str2);
        if (messageType == null) {
            throw new MessageNotDefinedException(str2 + " message type is not defined ");
        }
        try {
            MessageType buildConsumerMessage = messageType.buildConsumerMessage(consumerRecord, consumerRecord.key(), consumerRecord.value());
            if (buildConsumerMessage == null) {
                return false;
            }
            MessageContext.setEventContextMap(str, String.valueOf(buildConsumerMessage.getId()), buildConsumerMessage);
            return true;
        } catch (MessageNotDefinedException e) {
            LOGGER.warning(consumerRecord.key() + " is not defined " + e.getMessage());
            throw e;
        }
    }
}
