package com.networknt.eventuate.cdc.mysql;

import com.networknt.config.Config;
import com.networknt.eventuate.cdc.common.BinlogFileOffset;
import com.networknt.eventuate.common.impl.JSonMapper;
import com.networknt.eventuate.kafka.KafkaConfig;
import com.networknt.eventuate.kafka.producer.EventuateKafkaProducer;
import java.util.Arrays;
import java.util.Iterator;
import java.util.Optional;
import java.util.Properties;
import java.util.UUID;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

/* loaded from: input_file:com/networknt/eventuate/cdc/mysql/DatabaseBinlogOffsetKafkaStore.class */
/**
 * Stores and retrieves the most recent {@link BinlogFileOffset} of a MySQL binlog client
 * in a Kafka topic.
 *
 * <p>Offsets handed to {@link #save(BinlogFileOffset)} are only buffered in memory; a
 * single-threaded scheduler writes the latest buffered offset to the topic every
 * {@value #FLUSH_PERIOD_SECONDS} seconds, using the client name as the record key.
 * {@link #getLastBinlogFileOffset()} replays the topic and returns the last offset
 * written under that key.
 *
 * <p>Access to the buffered offset is guarded by the instance monitor.
 */
public class DatabaseBinlogOffsetKafkaStore {
    private static final String CONFIG_NAME = "kafka";
    private static final KafkaConfig config = (KafkaConfig) Config.getInstance().getJsonObjectConfig(CONFIG_NAME, KafkaConfig.class);

    /** Number of consecutive empty polls after which the topic is considered fully read. */
    private static final int N = 20;
    /** Timeout of a single consumer poll, in milliseconds. */
    private static final long POLL_TIMEOUT_MS = 100L;
    /** Initial delay and period of the background flush, in seconds. */
    private static final long FLUSH_PERIOD_SECONDS = 5L;

    private final String dbHistoryTopicName;
    private final String mySqlBinaryLogClientName;
    private final EventuateKafkaProducer eventuateKafkaProducer;
    private final ScheduledExecutorService scheduledExecutorService = new ScheduledThreadPoolExecutor(1);
    /** Latest offset passed to {@link #save} that has not been written to Kafka yet. */
    private Optional<BinlogFileOffset> recordToSave = Optional.empty();

    /**
     * Creates the store and starts the periodic background flush.
     *
     * @param dbHistoryTopicName       Kafka topic the offsets are written to and read from
     * @param mySqlBinaryLogClientName record key identifying this client's offsets in the topic
     * @param eventuateKafkaProducer   producer used to publish offsets
     */
    public DatabaseBinlogOffsetKafkaStore(String dbHistoryTopicName, String mySqlBinaryLogClientName, EventuateKafkaProducer eventuateKafkaProducer) {
        this.dbHistoryTopicName = dbHistoryTopicName;
        this.mySqlBinaryLogClientName = mySqlBinaryLogClientName;
        this.eventuateKafkaProducer = eventuateKafkaProducer;
        // Schedule the exception-safe wrapper: a periodic task that throws is never run again.
        this.scheduledExecutorService.scheduleAtFixedRate(this::flushFromScheduler, FLUSH_PERIOD_SECONDS, FLUSH_PERIOD_SECONDS, TimeUnit.SECONDS);
    }

    /**
     * Writes the buffered offset, if any, to Kafka and clears the buffer.
     *
     * <p>If the write fails the exception propagates and the buffer is left untouched, so the
     * same offset is attempted again on the next call.
     *
     * @throws RuntimeException if publishing the offset fails
     */
    public synchronized void scheduledBinlogFilenameAndOffsetUpdate() {
        this.recordToSave.ifPresent(this::store);
        this.recordToSave = Optional.empty();
    }

    /**
     * Buffers the given offset for the next flush, replacing any offset buffered earlier.
     *
     * @param binlogFileOffset offset to persist; must not be {@code null}
     */
    public synchronized void save(BinlogFileOffset binlogFileOffset) {
        this.recordToSave = Optional.of(binlogFileOffset);
    }

    /**
     * Reads the topic from the earliest offset and returns the last offset stored under this
     * client's key.
     *
     * <p>Reading stops after {@code N} consecutive empty polls. Records with a {@code null}
     * key or a key belonging to another client are skipped.
     *
     * @return the most recently stored offset, or an empty {@code Optional} if none was found
     */
    public Optional<BinlogFileOffset> getLastBinlogFileOffset() {
        try (KafkaConsumer<String, String> consumer = createConsumer()) {
            consumer.partitionsFor(this.dbHistoryTopicName);
            consumer.subscribe(Arrays.asList(this.dbHistoryTopicName));

            BinlogFileOffset latest = null;
            int emptyPollsLeft = N;
            while (emptyPollsLeft > 0) {
                ConsumerRecords<String, String> batch = consumer.poll(POLL_TIMEOUT_MS);
                if (batch.isEmpty()) {
                    emptyPollsLeft--;
                    continue;
                }
                // Data arrived, so the end of the topic has not been reached: restart the countdown.
                emptyPollsLeft = N;
                for (ConsumerRecord<String, String> entry : batch) {
                    String key = entry.key();
                    // Kafka record keys may be null; such records cannot belong to this client.
                    if (key != null && key.equals(this.mySqlBinaryLogClientName)) {
                        latest = (BinlogFileOffset) JSonMapper.fromJson(entry.value(), BinlogFileOffset.class);
                    }
                }
            }
            return Optional.ofNullable(latest);
        }
    }

    /**
     * Flushes any buffered offset and shuts the scheduler down.
     *
     * <p>The scheduler is shut down even when the final flush fails.
     *
     * @throws RuntimeException if publishing the buffered offset fails
     */
    public synchronized void stop() {
        try {
            scheduledBinlogFilenameAndOffsetUpdate();
        } finally {
            this.scheduledExecutorService.shutdown();
        }
    }

    /** Scheduler entry point: flushes the buffer without letting a failure cancel future runs. */
    private void flushFromScheduler() {
        try {
            scheduledBinlogFilenameAndOffsetUpdate();
        } catch (RuntimeException ignored) {
            // Already reported by store(); the buffered offset is kept and retried on the next tick.
        }
    }

    /**
     * Publishes the offset as JSON to the topic under this client's key and waits for the
     * send to complete.
     *
     * @throws RuntimeException if the send fails; non-runtime failures are wrapped as the cause
     */
    private synchronized void store(BinlogFileOffset binlogFileOffset) {
        try {
            this.eventuateKafkaProducer.send(this.dbHistoryTopicName, this.mySqlBinaryLogClientName, JSonMapper.toJson(binlogFileOffset)).get();
        } catch (RuntimeException e) {
            e.printStackTrace();
            throw e;
        } catch (Throwable th) {
            if (th instanceof InterruptedException) {
                // Keep the interrupt visible to the caller after it is wrapped below.
                Thread.currentThread().interrupt();
            }
            th.printStackTrace();
            throw new RuntimeException(th);
        }
    }

    /**
     * Builds a consumer that reads from the earliest offset under a fresh random group id,
     * with auto-commit disabled.
     */
    private KafkaConsumer<String, String> createConsumer() {
        Properties properties = new Properties();
        properties.put("bootstrap.servers", config.getBootstrapServers());
        properties.put("auto.offset.reset", "earliest");
        properties.put("group.id", UUID.randomUUID().toString());
        properties.put("enable.auto.commit", "false");
        properties.put("auto.commit.interval.ms", "1000");
        properties.put("session.timeout.ms", "30000");
        properties.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        properties.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        return new KafkaConsumer<>(properties);
    }
}
