package com.networknt.eventuate.cdc.mysql.binlog;

import com.networknt.eventuate.server.common.BinLogEvent;
import com.networknt.eventuate.server.common.BinlogFileOffset;
import com.networknt.eventuate.server.common.CdcProcessor;
import java.util.Optional;
import java.util.function.Consumer;

/* loaded from: input_file:com/networknt/eventuate/cdc/mysql/binlog/MySQLCdcProcessor.class */
/**
 * {@link CdcProcessor} implementation that drives a {@link MySqlBinaryLogClient}.
 *
 * <p>On {@link #start(Consumer)} the processor looks up the last recorded
 * {@link BinlogFileOffset} (first in the {@code DatabaseBinlogOffsetKafkaStore},
 * then in the {@code DebeziumBinlogOffsetKafkaStore} if the first has none),
 * starts the binlog client from that offset and forwards events to the
 * caller's consumer, dropping the leading events that the recorded offset
 * already covers.
 *
 * @param <EVENT> the binlog event type delivered to the consumer
 */
public class MySQLCdcProcessor<EVENT extends BinLogEvent> implements CdcProcessor<EVENT> {
    private MySqlBinaryLogClient<EVENT> mySqlBinaryLogClient;
    private DatabaseBinlogOffsetKafkaStore binlogOffsetKafkaStore;
    private DebeziumBinlogOffsetKafkaStore debeziumBinlogOffsetKafkaStore;

    /**
     * Creates a processor from its three collaborators.
     *
     * @param mySqlBinaryLogClient           client that is started and stopped by this processor
     * @param databaseBinlogOffsetKafkaStore store consulted first for the last binlog offset
     * @param debeziumBinlogOffsetKafkaStore store consulted when the first one has no offset
     */
    public MySQLCdcProcessor(MySqlBinaryLogClient<EVENT> mySqlBinaryLogClient, DatabaseBinlogOffsetKafkaStore databaseBinlogOffsetKafkaStore, DebeziumBinlogOffsetKafkaStore debeziumBinlogOffsetKafkaStore) {
        this.mySqlBinaryLogClient = mySqlBinaryLogClient;
        this.binlogOffsetKafkaStore = databaseBinlogOffsetKafkaStore;
        this.debeziumBinlogOffsetKafkaStore = debeziumBinlogOffsetKafkaStore;
    }

    /**
     * Starts the binlog client from the last recorded offset, if any.
     *
     * <p>Events are handed to {@code consumer} through a wrapper that skips
     * events until the first one the starting offset is not "same or after";
     * from that event onwards everything is forwarded unfiltered.
     *
     * @param consumer receiver of the events that pass the duplicate filter
     * @throws RuntimeException wrapping any exception thrown while starting the client
     */
    @Override
    public void start(final Consumer<EVENT> consumer) {
        final Optional<BinlogFileOffset> primaryOffset = this.binlogOffsetKafkaStore.getLastBinlogFileOffset();
        // The Debezium store is only queried when the primary store has nothing recorded.
        final Optional<BinlogFileOffset> startingOffset = primaryOffset.isPresent()
                ? primaryOffset
                : this.debeziumBinlogOffsetKafkaStore.getLastBinlogFileOffset();

        final Consumer<EVENT> filteringConsumer = new ReplaySkippingConsumer<EVENT>(startingOffset, consumer);
        try {
            this.mySqlBinaryLogClient.start(startingOffset, filteringConsumer);
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

    /**
     * Stops the binlog client and then the database binlog offset store.
     * The Debezium offset store is not touched here.
     */
    @Override
    public void stop() {
        this.mySqlBinaryLogClient.stop();
        this.binlogOffsetKafkaStore.stop();
    }

    /**
     * Consumer wrapper that drops the initial run of events already covered by
     * the starting offset and forwards everything after it to a delegate.
     *
     * @param <E> the binlog event type
     */
    private static final class ReplaySkippingConsumer<E extends BinLogEvent> implements Consumer<E> {
        private final Optional<BinlogFileOffset> startingOffset;
        private final Consumer<E> delegate;
        // Stays true until the first event that is not covered by the starting offset.
        private boolean couldReadDuplicateEntries = true;

        ReplaySkippingConsumer(Optional<BinlogFileOffset> startingOffset, Consumer<E> delegate) {
            this.startingOffset = startingOffset;
            this.delegate = delegate;
        }

        @Override
        public void accept(E event) {
            if (this.couldReadDuplicateEntries) {
                // With no starting offset nothing can be a duplicate, so the filter yields empty.
                boolean alreadyCovered = this.startingOffset
                        .filter(offset -> offset.isSameOrAfter(event.getBinlogFileOffset()))
                        .isPresent();
                if (alreadyCovered) {
                    return;
                }
                this.couldReadDuplicateEntries = false;
            }
            this.delegate.accept(event);
        }
    }
}
