package com.networknt.eventuate.cdc.mysql.binlog;

import com.networknt.eventuate.common.impl.JSonMapper;
import com.networknt.eventuate.kafka.producer.EventuateKafkaProducer;
import com.networknt.eventuate.server.common.BinlogFileOffset;
import java.util.Optional;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import org.apache.kafka.clients.consumer.ConsumerRecord;

/* loaded from: input_file:com/networknt/eventuate/cdc/mysql/binlog/DatabaseBinlogOffsetKafkaStore.class */
public class DatabaseBinlogOffsetKafkaStore extends OffsetKafkaStore {
    private final String mySqlBinaryLogClientName;
    private ScheduledExecutorService scheduledExecutorService;
    private EventuateKafkaProducer eventuateKafkaProducer;
    private static final int N = 20;
    private Optional<BinlogFileOffset> recordToSave;

    public DatabaseBinlogOffsetKafkaStore(String str, String str2, EventuateKafkaProducer eventuateKafkaProducer) {
        super(str);
        this.scheduledExecutorService = new ScheduledThreadPoolExecutor(1);
        this.recordToSave = Optional.empty();
        this.mySqlBinaryLogClientName = str2;
        this.eventuateKafkaProducer = eventuateKafkaProducer;
        this.scheduledExecutorService.scheduleAtFixedRate(this::scheduledBinlogFilenameAndOffsetUpdate, 5L, 5L, TimeUnit.SECONDS);
    }

    public synchronized void scheduledBinlogFilenameAndOffsetUpdate() {
        this.recordToSave.ifPresent(this::store);
        this.recordToSave = Optional.empty();
    }

    public synchronized void save(BinlogFileOffset binlogFileOffset) {
        this.recordToSave = Optional.of(binlogFileOffset);
    }

    @Override // com.networknt.eventuate.cdc.mysql.binlog.OffsetKafkaStore
    protected BinlogFileOffset handleRecord(ConsumerRecord<String, String> consumerRecord) {
        if (consumerRecord.key().equals(this.mySqlBinaryLogClientName)) {
            return (BinlogFileOffset) JSonMapper.fromJson(consumerRecord.value(), BinlogFileOffset.class);
        }
        return null;
    }

    public synchronized void stop() {
        if (this.recordToSave.isPresent()) {
            store(this.recordToSave.get());
        }
        this.scheduledExecutorService.shutdown();
    }

    private synchronized void store(BinlogFileOffset binlogFileOffset) {
        try {
            this.eventuateKafkaProducer.send(this.dbHistoryTopicName, this.mySqlBinaryLogClientName, JSonMapper.toJson(binlogFileOffset)).get();
        } catch (RuntimeException e) {
            e.printStackTrace();
            throw e;
        } catch (Throwable th) {
            th.printStackTrace();
            throw new RuntimeException(th);
        }
    }
}
