/*
 * Decompiled with CFR 0.152.
 */
package com.networknt.eventuate.cdc.mysql.binlog;

import com.networknt.config.Config;
import com.networknt.eventuate.kafka.KafkaConfig;
import com.networknt.eventuate.server.common.BinlogFileOffset;
import java.util.Arrays;
import java.util.Optional;
import java.util.Properties;
import java.util.UUID;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public abstract class OffsetKafkaStore {
    protected final String dbHistoryTopicName;
    private static final String CONFIG_NAME = "kafka";
    private static final KafkaConfig config = (KafkaConfig)Config.getInstance().getJsonObjectConfig("kafka", KafkaConfig.class);
    private static final int N = 20;

    public OffsetKafkaStore(String dbHistoryTopicName) {
        this.dbHistoryTopicName = dbHistoryTopicName;
    }

    public Optional<BinlogFileOffset> getLastBinlogFileOffset() {
        try (KafkaConsumer<String, String> consumer = this.createConsumer();){
            consumer.partitionsFor(this.dbHistoryTopicName);
            consumer.subscribe(Arrays.asList(this.dbHistoryTopicName));
            int count = 20;
            BinlogFileOffset result = null;
            boolean lastRecordFound = false;
            while (!lastRecordFound) {
                ConsumerRecords<String, String> records = consumer.poll(100L);
                if (records.isEmpty()) {
                    if (--count != 0) continue;
                    lastRecordFound = true;
                    continue;
                }
                count = 20;
                for (ConsumerRecord<String, String> consumerRecord : records) {
                    BinlogFileOffset current = this.handleRecord(consumerRecord);
                    if (current == null) continue;
                    result = current;
                }
            }
            Optional<Object> optional = Optional.ofNullable(result);
            return optional;
        }
    }

    private KafkaConsumer<String, String> createConsumer() {
        Properties props = new Properties();
        props.put("bootstrap.servers", config.getBootstrapServers());
        props.put("auto.offset.reset", "earliest");
        props.put("group.id", UUID.randomUUID().toString());
        props.put("enable.auto.commit", "false");
        props.put("auto.commit.interval.ms", "1000");
        props.put("session.timeout.ms", "30000");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        return new KafkaConsumer<String, String>(props);
    }

    protected abstract BinlogFileOffset handleRecord(ConsumerRecord<String, String> var1);
}

