package com.networknt.eventuate.cdc.mysql.binlog;

import com.networknt.config.Config;
import com.networknt.eventuate.kafka.KafkaConfig;
import com.networknt.eventuate.server.common.BinlogFileOffset;
import io.undertow.server.handlers.builder.PredicatedHandlersParser;
import java.util.Arrays;
import java.util.Iterator;
import java.util.Optional;
import java.util.Properties;
import java.util.UUID;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/networknt/eventuate/cdc/mysql/binlog/OffsetKafkaStore.class */
/**
 * Base class for stores that recover the most recent {@link BinlogFileOffset}
 * by replaying a Kafka topic from the beginning.
 *
 * <p>Subclasses decide, through {@link #handleRecord(ConsumerRecord)}, which
 * records carry an offset and how to decode it.
 */
public abstract class OffsetKafkaStore {
    /** Name of the Kafka topic that is replayed to find the last offset. */
    protected final String dbHistoryTopicName;
    private static final String CONFIG_NAME = "kafka";
    private static final KafkaConfig config = (KafkaConfig) Config.getInstance().getJsonObjectConfig(CONFIG_NAME, KafkaConfig.class);
    private final Logger logger = LoggerFactory.getLogger(getClass());
    /** Number of consecutive empty polls after which the topic is considered fully read. */
    private static final int N = 20;
    /** How many times the whole read is attempted before giving up. */
    private static final int MAX_ATTEMPTS = 5;
    /** Pause between two failed attempts, in milliseconds. */
    private static final long RETRY_DELAY_MS = 2000L;
    /** Timeout of a single poll, in milliseconds. */
    private static final long POLL_TIMEOUT_MS = 100L;

    /**
     * @param str name of the topic to read offsets from
     */
    public OffsetKafkaStore(String str) {
        this.dbHistoryTopicName = str;
    }

    /**
     * Replays the topic and returns the last offset reported by
     * {@link #handleRecord(ConsumerRecord)}.
     *
     * <p>Each attempt uses a fresh consumer that is always closed, including
     * when the attempt fails. A failed attempt is logged and retried after a
     * short pause, up to {@value #MAX_ATTEMPTS} attempts in total. If the
     * thread is interrupted while waiting, the interrupt flag is restored and
     * no further attempt is made.
     *
     * @return the last non-null offset produced for the topic, or an empty
     *         {@code Optional} if none was found or every attempt failed
     */
    public Optional<BinlogFileOffset> getLastBinlogFileOffset() {
        for (int attempt = 1; attempt <= MAX_ATTEMPTS; attempt++) {
            // try-with-resources guarantees the consumer is closed on both the
            // success path and the failure path.
            try (KafkaConsumer<String, String> consumer = createConsumer()) {
                consumer.partitionsFor(this.dbHistoryTopicName);
                consumer.subscribe(Arrays.asList(this.dbHistoryTopicName));
                return Optional.ofNullable(readLastOffset(consumer));
            } catch (Exception e) {
                // Pass the exception as the throwable so the stack trace is kept.
                this.logger.error("kafak consumer error:" + e, e);
                if (attempt == MAX_ATTEMPTS) {
                    // Nothing left to retry, so waiting would only delay the caller.
                    break;
                }
                try {
                    Thread.sleep(RETRY_DELAY_MS);
                } catch (InterruptedException interrupted) {
                    this.logger.error(interrupted.getMessage(), interrupted);
                    // Preserve the interrupt for the caller and stop retrying.
                    Thread.currentThread().interrupt();
                    break;
                }
            }
        }
        return Optional.empty();
    }

    /**
     * Polls the consumer until {@value #N} polls in a row come back empty and
     * returns the last non-null offset seen on the way.
     *
     * @param consumer a consumer already subscribed to the topic
     * @return the last non-null result of {@link #handleRecord(ConsumerRecord)},
     *         or {@code null} if there was none
     */
    private BinlogFileOffset readLastOffset(KafkaConsumer<String, String> consumer) {
        BinlogFileOffset lastOffset = null;
        int emptyPollsLeft = N;
        while (emptyPollsLeft > 0) {
            ConsumerRecords<String, String> records = consumer.poll(POLL_TIMEOUT_MS);
            if (records.isEmpty()) {
                emptyPollsLeft--;
                continue;
            }
            // Data arrived, so start counting empty polls from scratch.
            emptyPollsLeft = N;
            for (ConsumerRecord<String, String> record : records) {
                BinlogFileOffset candidate = handleRecord(record);
                if (candidate != null) {
                    lastOffset = candidate;
                }
            }
        }
        return lastOffset;
    }

    /**
     * Builds a consumer that reads the topic from the earliest available
     * record. A random group id is used for every consumer and auto-commit is
     * disabled.
     */
    private KafkaConsumer<String, String> createConsumer() {
        Properties properties = new Properties();
        properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, config.getBootstrapServers());
        properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        properties.put(ConsumerConfig.GROUP_ID_CONFIG, UUID.randomUUID().toString());
        properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
        properties.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000");
        properties.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "30000");
        properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
        properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
        return new KafkaConsumer<>(properties);
    }

    /**
     * Extracts an offset from a single record of the topic.
     *
     * @param consumerRecord the record to inspect
     * @return the offset carried by the record, or {@code null} if the record
     *         should be ignored
     */
    protected abstract BinlogFileOffset handleRecord(ConsumerRecord<String, String> consumerRecord);
}
