/*
 * Decompiled with CFR 0.152.
 */
package com.networknt.eventuate.cdc.mysql.binlog;

import com.networknt.eventuate.cdc.mysql.binlog.OffsetKafkaStore;
import com.networknt.eventuate.common.impl.JSonMapper;
import com.networknt.eventuate.kafka.producer.EventuateKafkaProducer;
import com.networknt.eventuate.server.common.BinlogFileOffset;
import java.util.Optional;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import org.apache.kafka.clients.consumer.ConsumerRecord;

/**
 * Offset store that publishes the most recently saved {@link BinlogFileOffset}
 * to the topic named by {@code dbHistoryTopicName}, keyed by the MySQL binary
 * log client name.
 *
 * <p>Calls to {@link #save(BinlogFileOffset)} only remember the latest offset
 * in memory. A single-threaded scheduler flushes the pending offset every five
 * seconds, and {@link #stop()} performs a final flush before shutting the
 * scheduler down. All access to the pending offset is guarded by this
 * instance's monitor.
 */
public class DatabaseBinlogOffsetKafkaStore
extends OffsetKafkaStore {
    private final String mySqlBinaryLogClientName;
    private final ScheduledExecutorService scheduledExecutorService = new ScheduledThreadPoolExecutor(1);
    private final EventuateKafkaProducer eventuateKafkaProducer;
    /** Latest offset handed to {@link #save(BinlogFileOffset)} that has not been published yet. */
    private Optional<BinlogFileOffset> recordToSave = Optional.empty();

    /**
     * Creates the store and starts the periodic flush (first run after 5 seconds,
     * then every 5 seconds).
     *
     * @param dbHistoryTopicName       topic name passed to the superclass and used as the publish target
     * @param mySqlBinaryLogClientName record key used when publishing and when filtering consumed records
     * @param eventuateKafkaProducer   producer used to publish offsets
     */
    public DatabaseBinlogOffsetKafkaStore(String dbHistoryTopicName, String mySqlBinaryLogClientName, EventuateKafkaProducer eventuateKafkaProducer) {
        super(dbHistoryTopicName);
        this.mySqlBinaryLogClientName = mySqlBinaryLogClientName;
        this.eventuateKafkaProducer = eventuateKafkaProducer;
        // Schedule the exception-safe wrapper: scheduleAtFixedRate suppresses all
        // subsequent executions as soon as one run throws.
        this.scheduledExecutorService.scheduleAtFixedRate(this::runScheduledUpdateSafely, 5L, 5L, TimeUnit.SECONDS);
    }

    /**
     * Publishes the pending offset, if any, and clears it on success.
     *
     * <p>If publishing fails the exception propagates and the pending offset is
     * kept, so a later call can retry it.
     *
     * @throws RuntimeException if publishing the offset fails
     */
    public synchronized void scheduledBinlogFilenameAndOffsetUpdate() {
        this.recordToSave.ifPresent(this::store);
        this.recordToSave = Optional.empty();
    }

    /**
     * Remembers {@code binlogFileOffset} as the offset to publish on the next
     * flush, replacing any offset that is still pending.
     *
     * @param binlogFileOffset offset to publish; must not be {@code null}
     * @throws NullPointerException if {@code binlogFileOffset} is {@code null}
     */
    public synchronized void save(BinlogFileOffset binlogFileOffset) {
        this.recordToSave = Optional.of(binlogFileOffset);
    }

    /**
     * Decodes a consumed record into an offset when its key matches this
     * store's binlog client name.
     *
     * @param record consumed record
     * @return the decoded offset, or {@code null} when the record has no key or
     *         a key other than this store's client name
     */
    @Override
    protected BinlogFileOffset handleRecord(ConsumerRecord<String, String> record) {
        String key = record.key();
        // Records may carry a null key; treat them as "not ours" instead of failing with an NPE.
        if (key != null && key.equals(this.mySqlBinaryLogClientName)) {
            return JSonMapper.fromJson(record.value(), BinlogFileOffset.class);
        }
        return null;
    }

    /**
     * Publishes any pending offset and shuts the scheduler down.
     *
     * <p>The scheduler is shut down even if the final publish fails, so its
     * worker thread is never left running.
     *
     * @throws RuntimeException if publishing the pending offset fails
     */
    public synchronized void stop() {
        try {
            this.recordToSave.ifPresent(this::store);
            // Cleared only after a successful publish so a repeated stop() does not re-send it.
            this.recordToSave = Optional.empty();
        }
        finally {
            this.scheduledExecutorService.shutdown();
        }
    }

    /**
     * Body of the periodic task: runs the flush and keeps the schedule alive
     * when a publish attempt fails.
     */
    private void runScheduledUpdateSafely() {
        try {
            this.scheduledBinlogFilenameAndOffsetUpdate();
        }
        catch (RuntimeException alreadyReported) {
            // store() has already printed the failure. The pending offset was not
            // cleared, so the next scheduled run retries it.
        }
    }

    /**
     * Publishes {@code binlogFileOffset} as JSON and waits for the send to complete.
     *
     * @param binlogFileOffset offset to publish
     * @throws RuntimeException if serialization or the send fails; non-runtime
     *                          failures are wrapped with the original as cause
     */
    private synchronized void store(BinlogFileOffset binlogFileOffset) {
        try {
            this.eventuateKafkaProducer.send(this.dbHistoryTopicName, this.mySqlBinaryLogClientName, JSonMapper.toJson(binlogFileOffset)).get();
        }
        catch (RuntimeException e) {
            e.printStackTrace();
            throw e;
        }
        catch (Throwable e) {
            if (e instanceof InterruptedException) {
                // Restore the interrupt flag so callers further up can observe the interruption.
                Thread.currentThread().interrupt();
            }
            e.printStackTrace();
            throw new RuntimeException(e);
        }
    }
}

