/*
 * Decompiled with CFR 0.152.
 */
package com.networknt.eventuate.cdc.mysql.binlog;

import com.networknt.eventuate.cdc.mysql.binlog.DatabaseBinlogOffsetKafkaStore;
import com.networknt.eventuate.cdc.mysql.binlog.DebeziumBinlogOffsetKafkaStore;
import com.networknt.eventuate.cdc.mysql.binlog.MySqlBinaryLogClient;
import com.networknt.eventuate.server.common.BinLogEvent;
import com.networknt.eventuate.server.common.BinlogFileOffset;
import com.networknt.eventuate.server.common.CdcProcessor;
import java.util.Optional;
import java.util.function.Consumer;

public class MySQLCdcProcessor<EVENT extends BinLogEvent>
implements CdcProcessor<EVENT> {
    private MySqlBinaryLogClient<EVENT> mySqlBinaryLogClient;
    private DatabaseBinlogOffsetKafkaStore binlogOffsetKafkaStore;
    private DebeziumBinlogOffsetKafkaStore debeziumBinlogOffsetKafkaStore;

    public MySQLCdcProcessor(MySqlBinaryLogClient<EVENT> mySqlBinaryLogClient, DatabaseBinlogOffsetKafkaStore binlogOffsetKafkaStore, DebeziumBinlogOffsetKafkaStore debeziumBinlogOffsetKafkaStore) {
        this.mySqlBinaryLogClient = mySqlBinaryLogClient;
        this.binlogOffsetKafkaStore = binlogOffsetKafkaStore;
        this.debeziumBinlogOffsetKafkaStore = debeziumBinlogOffsetKafkaStore;
    }

    @Override
    public void start(final Consumer<EVENT> eventConsumer) {
        Optional<BinlogFileOffset> binlogFileOffset = this.binlogOffsetKafkaStore.getLastBinlogFileOffset();
        if (!binlogFileOffset.isPresent()) {
            binlogFileOffset = this.debeziumBinlogOffsetKafkaStore.getLastBinlogFileOffset();
        }
        final Optional<BinlogFileOffset> startingBinlogFileOffset = binlogFileOffset;
        try {
            this.mySqlBinaryLogClient.start(startingBinlogFileOffset, new Consumer<EVENT>(){
                private boolean couldReadDuplicateEntries = true;

                @Override
                public void accept(EVENT publishedEvent) {
                    if (this.couldReadDuplicateEntries) {
                        if (startingBinlogFileOffset.map(s -> s.isSameOrAfter(publishedEvent.getBinlogFileOffset())).orElse(false).booleanValue()) {
                            return;
                        }
                        this.couldReadDuplicateEntries = false;
                    }
                    eventConsumer.accept(publishedEvent);
                }
            });
        }
        catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

    @Override
    public void stop() {
        this.mySqlBinaryLogClient.stop();
        this.binlogOffsetKafkaStore.stop();
    }
}

