/*
 * Decompiled with CFR 0.152.
 */
package com.networknt.eventuate.kafka.consumer;

import com.networknt.config.Config;
import com.networknt.eventuate.kafka.KafkaConfig;
import com.networknt.eventuate.kafka.consumer.ConsumerPropertiesFactory;
import com.networknt.eventuate.kafka.consumer.KafkaMessageProcessor;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.BiConsumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class EventuateKafkaConsumer {
    private static Logger logger = LoggerFactory.getLogger(EventuateKafkaConsumer.class);
    static KafkaConfig config = (KafkaConfig)Config.getInstance().getJsonObjectConfig("kafka", KafkaConfig.class);
    private final String subscriberId;
    private final BiConsumer<ConsumerRecord<String, String>, BiConsumer<Void, Throwable>> handler;
    private final List<String> topics;
    private AtomicBoolean stopFlag = new AtomicBoolean(false);
    private Properties consumerProperties;

    public EventuateKafkaConsumer(String subscriberId, BiConsumer<ConsumerRecord<String, String>, BiConsumer<Void, Throwable>> handler, List<String> topics) {
        this.subscriberId = subscriberId;
        this.handler = handler;
        this.topics = topics;
        this.consumerProperties = ConsumerPropertiesFactory.makeConsumerProperties(config, subscriberId);
    }

    public static List<PartitionInfo> verifyTopicExistsBeforeSubscribing(KafkaConsumer<String, String> consumer, String topic) {
        try {
            logger.debug("Verifying Topic {}", (Object)topic);
            List<PartitionInfo> partitions = consumer.partitionsFor(topic);
            logger.debug("Got these partitions {} for Topic {}", (Object)partitions, (Object)topic);
            return partitions;
        }
        catch (Throwable e2) {
            logger.error("Got exception: ", e2);
            throw new RuntimeException(e2);
        }
    }

    private void maybeCommitOffsets(KafkaConsumer<String, String> consumer, KafkaMessageProcessor processor) {
        Map<TopicPartition, OffsetAndMetadata> offsetsToCommit = processor.offsetsToCommit();
        if (!offsetsToCommit.isEmpty()) {
            logger.debug("Committing offsets {} {}", (Object)this.subscriberId, (Object)offsetsToCommit);
            consumer.commitSync(offsetsToCommit);
            logger.debug("Committed offsets {}", (Object)this.subscriberId);
            processor.noteOffsetsCommitted(offsetsToCommit);
        }
    }

    public void start() {
        try {
            KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(this.consumerProperties);
            KafkaMessageProcessor processor = new KafkaMessageProcessor(this.subscriberId, this.handler);
            for (String topic : this.topics) {
                EventuateKafkaConsumer.verifyTopicExistsBeforeSubscribing(consumer, topic);
            }
            logger.debug("Subscribing to {} {}", (Object)this.subscriberId, (Object)this.topics);
            consumer.subscribe(new ArrayList<String>(this.topics));
            logger.debug("Subscribed to {} {}", (Object)this.subscriberId, (Object)this.topics);
            new Thread(() -> {
                try {
                    while (!this.stopFlag.get()) {
                        ConsumerRecords records = consumer.poll(100L);
                        if (!records.isEmpty()) {
                            logger.debug("Got {} {} records", (Object)this.subscriberId, (Object)records.count());
                        }
                        for (ConsumerRecord<String, String> consumerRecord : records) {
                            logger.debug("processing record {} {} {}", this.subscriberId, consumerRecord.offset(), consumerRecord.value());
                            if (logger.isDebugEnabled()) {
                                logger.debug(String.format("EventuateKafkaAggregateSubscriptions subscriber = %s, offset = %d, key = %s, value = %s", this.subscriberId, consumerRecord.offset(), consumerRecord.key(), consumerRecord.value()));
                            }
                            if (consumerRecord == null || consumerRecord.key() == null || consumerRecord.value() == null) continue;
                            processor.process(consumerRecord);
                        }
                        if (!records.isEmpty()) {
                            logger.debug("Processed {} {} records", (Object)this.subscriberId, (Object)records.count());
                        }
                        this.maybeCommitOffsets(consumer, processor);
                        if (records.isEmpty()) continue;
                        logger.debug("To commit {} {}", (Object)this.subscriberId, (Object)processor.getPending());
                    }
                    this.maybeCommitOffsets(consumer, processor);
                }
                catch (Throwable e2) {
                    logger.error("Got exception: ", e2);
                    throw new RuntimeException(e2);
                }
            }, "Eventuate-subscriber-" + this.subscriberId).start();
        }
        catch (Exception e2) {
            logger.error("Error subscribing", e2);
            throw new RuntimeException(e2);
        }
    }

    public void stop() {
        this.stopFlag.set(true);
    }
}

