package com.networknt.eventuate.kafka.consumer;

import com.networknt.config.Config;
import com.networknt.eventuate.kafka.KafkaConfig;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.BiConsumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/networknt/eventuate/kafka/consumer/EventuateKafkaConsumer.class */
/**
 * Subscribes to a list of Kafka topics and feeds every record that has a
 * non-null key and value to a {@link KafkaMessageProcessor}, committing the
 * offsets the processor reports after each poll.
 *
 * <p>Polling runs on a dedicated thread started by {@link #start()} and ends
 * once {@link #stop()} has been called and the current poll iteration has
 * finished. The underlying {@link KafkaConsumer} is closed when the polling
 * thread exits (normally or because of an error) and when setup in
 * {@link #start()} fails.
 */
public class EventuateKafkaConsumer {
    private static final Logger logger = LoggerFactory.getLogger(EventuateKafkaConsumer.class);
    static String CONFIG_NAME = "kafka";
    static KafkaConfig config = (KafkaConfig) Config.getInstance().getJsonObjectConfig(CONFIG_NAME, KafkaConfig.class);
    static String bootstrapServers = config.getBootstrapServers();
    private final String subscriberId;
    private final BiConsumer<ConsumerRecord<String, String>, BiConsumer<Void, Throwable>> handler;
    private final List<String> topics;
    private final AtomicBoolean stopFlag = new AtomicBoolean(false);
    private final Properties consumerProperties;

    /**
     * Creates a consumer that is not yet connected; call {@link #start()} to begin polling.
     *
     * @param str        subscriber id; passed to {@code ConsumerPropertiesFactory} together with the
     *                   configured bootstrap servers, and used in log messages and the thread name
     * @param biConsumer handler handed to the {@link KafkaMessageProcessor} created in {@link #start()}
     * @param list       topics to verify and subscribe to
     */
    public EventuateKafkaConsumer(String str, BiConsumer<ConsumerRecord<String, String>, BiConsumer<Void, Throwable>> biConsumer, List<String> list) {
        this.subscriberId = str;
        this.handler = biConsumer;
        this.topics = list;
        this.consumerProperties = ConsumerPropertiesFactory.makeConsumerProperties(bootstrapServers, str);
    }

    /**
     * Looks up the partitions of a topic through the given consumer.
     *
     * @param kafkaConsumer consumer used for the metadata lookup
     * @param str           topic name
     * @return whatever {@code partitionsFor} returned for the topic
     * @throws RuntimeException wrapping any throwable raised by the lookup
     */
    public static List<PartitionInfo> verifyTopicExistsBeforeSubscribing(KafkaConsumer<String, String> kafkaConsumer, String str) {
        try {
            logger.debug("Verifying Topic {}", str);
            List<PartitionInfo> partitionsFor = kafkaConsumer.partitionsFor(str);
            logger.debug("Got these partitions {} for Topic {}", partitionsFor, str);
            return partitionsFor;
        } catch (Throwable th) {
            logger.error("Got exception: ", th);
            throw new RuntimeException(th);
        }
    }

    /**
     * Synchronously commits the offsets the processor reports as committable, if any,
     * and then tells the processor they were committed.
     */
    private void maybeCommitOffsets(KafkaConsumer<String, String> kafkaConsumer, KafkaMessageProcessor kafkaMessageProcessor) {
        Map<TopicPartition, OffsetAndMetadata> offsetsToCommit = kafkaMessageProcessor.offsetsToCommit();
        if (offsetsToCommit.isEmpty()) {
            return;
        }
        logger.debug("Committing offsets {} {}", this.subscriberId, offsetsToCommit);
        kafkaConsumer.commitSync(offsetsToCommit);
        logger.debug("Committed offsets {}", this.subscriberId);
        kafkaMessageProcessor.noteOffsetsCommitted(offsetsToCommit);
    }

    /**
     * Creates the Kafka consumer, verifies and subscribes to the configured topics,
     * and starts the polling thread named {@code Eventuate-subscriber-<subscriberId>}.
     *
     * @throws RuntimeException wrapping any exception raised during setup; the consumer
     *                          is closed before the exception is rethrown
     */
    public void start() {
        KafkaConsumer<String, String> kafkaConsumer = null;
        try {
            kafkaConsumer = new KafkaConsumer<>(this.consumerProperties);
            KafkaMessageProcessor kafkaMessageProcessor = new KafkaMessageProcessor(this.subscriberId, this.handler);
            for (String topic : this.topics) {
                verifyTopicExistsBeforeSubscribing(kafkaConsumer, topic);
            }
            logger.debug("Subscribing to {} {}", this.subscriberId, this.topics);
            kafkaConsumer.subscribe(new ArrayList<>(this.topics));
            logger.debug("Subscribed to {} {}", this.subscriberId, this.topics);
            // From here on the polling thread owns the consumer and is responsible for closing it.
            final KafkaConsumer<String, String> pollingConsumer = kafkaConsumer;
            new Thread(() -> runPollLoop(pollingConsumer, kafkaMessageProcessor),
                    "Eventuate-subscriber-" + this.subscriberId).start();
        } catch (Exception e) {
            logger.error("Error subscribing", e);
            // Setup failed before the polling thread took ownership: release the consumer here.
            closeQuietly(kafkaConsumer);
            throw new RuntimeException(e);
        }
    }

    /** Requests the polling thread to finish; the flag is checked between poll iterations. */
    public void stop() {
        this.stopFlag.set(true);
    }

    /**
     * Body of the polling thread: polls until the stop flag is set, performs a final
     * offset commit, and always closes the consumer on the way out.
     */
    private void runPollLoop(KafkaConsumer<String, String> kafkaConsumer, KafkaMessageProcessor kafkaMessageProcessor) {
        try {
            while (!this.stopFlag.get()) {
                try {
                    pollOnce(kafkaConsumer, kafkaMessageProcessor);
                } catch (Throwable th) {
                    logger.error("Got exception: ", th);
                    throw new RuntimeException(th);
                }
            }
            maybeCommitOffsets(kafkaConsumer, kafkaMessageProcessor);
        } finally {
            closeQuietly(kafkaConsumer);
        }
    }

    /** Performs one poll, hands eligible records to the processor and commits offsets. */
    private void pollOnce(KafkaConsumer<String, String> kafkaConsumer, KafkaMessageProcessor kafkaMessageProcessor) {
        ConsumerRecords<String, String> records = kafkaConsumer.poll(100L);
        boolean gotRecords = !records.isEmpty();
        if (gotRecords) {
            logger.debug("Got {} {} records", this.subscriberId, records.count());
        }
        for (ConsumerRecord<String, String> consumerRecord : records) {
            // Guard before touching the record so the null check is actually effective.
            if (consumerRecord == null) {
                continue;
            }
            logger.debug("processing record {} {} {}", this.subscriberId, consumerRecord.offset(), consumerRecord.value());
            if (logger.isDebugEnabled()) {
                logger.debug(String.format("EventuateKafkaAggregateSubscriptions subscriber = %s, offset = %d, key = %s, value = %s", this.subscriberId, consumerRecord.offset(), consumerRecord.key(), consumerRecord.value()));
            }
            // Records without a key or value are skipped and never reach the processor.
            if (consumerRecord.key() != null && consumerRecord.value() != null) {
                kafkaMessageProcessor.process(consumerRecord);
            }
        }
        if (gotRecords) {
            logger.debug("Processed {} {} records", this.subscriberId, records.count());
        }
        maybeCommitOffsets(kafkaConsumer, kafkaMessageProcessor);
        if (gotRecords) {
            logger.debug("To commit {} {}", this.subscriberId, kafkaMessageProcessor.getPending());
        }
    }

    /** Closes the consumer if present, logging (not propagating) any failure so it cannot mask an earlier error. */
    private void closeQuietly(KafkaConsumer<String, String> kafkaConsumer) {
        if (kafkaConsumer == null) {
            return;
        }
        try {
            kafkaConsumer.close();
        } catch (Exception e) {
            logger.warn("Error closing Kafka consumer for subscriber " + this.subscriberId, e);
        }
    }
}
