/*
 * Decompiled with CFR 0.152.
 */
package com.networknt.eventuate.cdccore.kafka.consumer;

import com.networknt.config.Config;
import com.networknt.eventuate.cdccore.kafka.KafkaConfig;
import com.networknt.eventuate.cdccore.kafka.consumer.KafkaMessageProcessor;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.BiConsumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class CdcKafkaConsumer {
    private final String subscriberId;
    private final BiConsumer<ConsumerRecord<String, String>, BiConsumer<Void, Throwable>> handler;
    private final List<String> topics;
    private Logger logger = LoggerFactory.getLogger(this.getClass());
    private AtomicBoolean stopFlag = new AtomicBoolean(false);
    private Properties consumerProperties;
    static String CONFIG_NAME = "kafkaconfig";
    static KafkaConfig config;
    private String bootstrapServers;

    public CdcKafkaConsumer(String subscriberId, BiConsumer<ConsumerRecord<String, String>, BiConsumer<Void, Throwable>> handler, List<String> topics, String bootstrapServers) {
        this.subscriberId = subscriberId;
        this.handler = handler;
        this.topics = topics;
        config = (KafkaConfig)Config.getInstance().getJsonObjectConfig(CONFIG_NAME, KafkaConfig.class);
        this.bootstrapServers = bootstrapServers != null ? bootstrapServers : config.getBootstrapServers();
        System.out.println("kafka bootstrap services:" + this.bootstrapServers);
        this.consumerProperties = new Properties();
        this.consumerProperties.put("bootstrap.servers", this.bootstrapServers);
        this.consumerProperties.put("group.id", subscriberId);
        this.consumerProperties.put("enable.auto.commit", (Object)config.isEnableaAutocommit());
        this.consumerProperties.put("session.timeout.ms", (Object)config.getSessionTimeout());
        this.consumerProperties.put("key.deserializer", config.getKeyDeSerializer());
        this.consumerProperties.put("value.deserializer", config.getValueDeSerializer());
        this.consumerProperties.put("auto.offset.reset", config.getAutoOffsetreset());
    }

    private void verifyTopicExistsBeforeSubscribing(KafkaConsumer<String, String> consumer, String topic) {
        try {
            this.logger.debug("Verifying Topic {}", (Object)topic);
            List<PartitionInfo> partitions = consumer.partitionsFor(topic);
            this.logger.debug("Got these partitions {} for Topic {}", (Object)partitions, (Object)topic);
        }
        catch (Throwable e) {
            this.logger.error("Got exception: ", e);
            throw new RuntimeException(e);
        }
    }

    private void maybeCommitOffsets(KafkaConsumer<String, String> consumer, KafkaMessageProcessor processor) {
        Map<TopicPartition, OffsetAndMetadata> offsetsToCommit = processor.offsetsToCommit();
        if (!offsetsToCommit.isEmpty()) {
            this.logger.debug("Committing offsets {} {}", (Object)this.subscriberId, (Object)offsetsToCommit);
            consumer.commitSync(offsetsToCommit);
            this.logger.debug("Committed offsets {}", (Object)this.subscriberId);
            processor.noteOffsetsCommitted(offsetsToCommit);
        }
    }

    public void start() {
        try {
            KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(this.consumerProperties);
            KafkaMessageProcessor processor = new KafkaMessageProcessor(this.subscriberId, this.handler);
            for (String topic : this.topics) {
                this.verifyTopicExistsBeforeSubscribing(consumer, topic);
            }
            this.logger.debug("Subscribing to {} {}", (Object)this.subscriberId, (Object)this.topics);
            consumer.subscribe(new ArrayList<String>(this.topics));
            this.logger.debug("Subscribed to {} {}", (Object)this.subscriberId, (Object)this.topics);
            new Thread(() -> {
                try {
                    while (!this.stopFlag.get()) {
                        ConsumerRecords records = consumer.poll(100L);
                        if (!records.isEmpty()) {
                            this.logger.debug("Got {} {} records", (Object)this.subscriberId, (Object)records.count());
                        }
                        for (ConsumerRecord<String, String> consumerRecord : records) {
                            this.logger.debug("processing record {} {} {}", this.subscriberId, consumerRecord.offset(), consumerRecord.value());
                            if (this.logger.isDebugEnabled()) {
                                this.logger.debug(String.format("EventuateKafkaAggregateSubscriptions subscriber = %s, offset = %d, key = %s, value = %s", this.subscriberId, consumerRecord.offset(), consumerRecord.key(), consumerRecord.value()));
                            }
                            if (consumerRecord == null || consumerRecord.key() == null || consumerRecord.value() == null) continue;
                            processor.process(consumerRecord);
                        }
                        if (!records.isEmpty()) {
                            this.logger.debug("Processed {} {} records", (Object)this.subscriberId, (Object)records.count());
                        }
                        this.maybeCommitOffsets(consumer, processor);
                        if (records.isEmpty()) continue;
                        this.logger.debug("To commit {} {}", (Object)this.subscriberId, (Object)processor.getPending());
                    }
                    this.maybeCommitOffsets(consumer, processor);
                }
                catch (Throwable e) {
                    this.logger.error("Got exception: ", e);
                    throw new RuntimeException(e);
                }
            }, "Eventuate-subscriber-" + this.subscriberId).start();
        }
        catch (Exception e) {
            this.logger.error("Error subscribing", e);
            throw new RuntimeException(e);
        }
    }

    public void stop() {
        this.stopFlag.set(true);
    }
}

