/*
 * Decompiled with CFR 0.152.
 */
package com.networknt.eventuate.cdc.mysql.binlog;

import com.github.shyiko.mysql.binlog.BinaryLogClient;
import com.github.shyiko.mysql.binlog.event.EventHeaderV4;
import com.github.shyiko.mysql.binlog.event.EventType;
import com.github.shyiko.mysql.binlog.event.RotateEventData;
import com.github.shyiko.mysql.binlog.event.TableMapEventData;
import com.github.shyiko.mysql.binlog.event.WriteRowsEventData;
import com.github.shyiko.mysql.binlog.event.deserialization.EventDeserializer;
import com.github.shyiko.mysql.binlog.event.deserialization.NullEventDataDeserializer;
import com.github.shyiko.mysql.binlog.event.deserialization.WriteRowsEventDataDeserializer;
import com.networknt.eventuate.cdc.mysql.binlog.IWriteRowsEventDataParser;
import com.networknt.eventuate.server.common.BinLogEvent;
import com.networknt.eventuate.server.common.BinlogFileOffset;
import java.io.IOException;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.TimeoutException;
import java.util.function.Consumer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Thin wrapper around {@link BinaryLogClient} that listens to a MySQL binary log,
 * tracks the current binlog file name and position, and forwards parsed row-insert
 * events for a single source table to a consumer.
 *
 * <p>Only {@code TABLE_MAP}, {@code EXT_WRITE_ROWS} and {@code ROTATE} events keep a real
 * data deserializer; every other event type is given a {@link NullEventDataDeserializer}.
 *
 * @param <M> type of the event produced by the {@link IWriteRowsEventDataParser}
 */
public class MySqlBinaryLogClient<M extends BinLogEvent> {
    /** Keep-alive interval handed to {@link BinaryLogClient#setKeepAliveInterval(long)}. */
    private static final long KEEP_ALIVE_INTERVAL = 5000L;
    /** Timeout handed to {@link BinaryLogClient#connect(long)}. */
    private static final long CONNECT_TIMEOUT = 5000L;
    /** Binlog position used when the caller supplies no starting offset. */
    private static final long DEFAULT_BINLOG_POSITION = 4L;

    private final String name;
    /** Created by {@link #start}; {@code null} until then. */
    private BinaryLogClient client;
    private final long binlogClientUniqueId;
    private final String dbUserName;
    private final String dbPassword;
    private final String host;
    private final int port;
    private final IWriteRowsEventDataParser<M> writeRowsEventDataParser;
    private final String sourceTableName;
    /**
     * Table-map events of the source table, keyed by table id. Shared with the
     * {@link WriteRowsEventDataDeserializer} built in {@link #getEventDeserializer()}.
     */
    private final Map<Long, TableMapEventData> tableMapEventByTableId = new HashMap<Long, TableMapEventData>();
    // Written by the event listener and read through the public getters. The listener is
    // expected to run on the BinaryLogClient's own thread (connect with a timeout), hence volatile.
    private volatile String binlogFilename;
    private volatile long offset;
    private final Logger logger = LoggerFactory.getLogger(this.getClass());

    /**
     * Stores the connection settings; no connection is opened until {@link #start} is called.
     *
     * @param writeRowsEventDataParser parser turning row-insert event data into {@code M}
     * @param dbUserName               database user name
     * @param dbPassword               database password
     * @param host                     database host
     * @param port                     database port
     * @param binlogClientUniqueId     value passed to {@link BinaryLogClient#setServerId(long)}
     * @param sourceTableName          table whose inserts are forwarded (compared case-insensitively)
     * @param clientName               name returned by {@link #getName()}
     */
    public MySqlBinaryLogClient(IWriteRowsEventDataParser<M> writeRowsEventDataParser, String dbUserName, String dbPassword, String host, int port, long binlogClientUniqueId, String sourceTableName, String clientName) {
        this.writeRowsEventDataParser = writeRowsEventDataParser;
        this.binlogClientUniqueId = binlogClientUniqueId;
        this.dbUserName = dbUserName;
        this.dbPassword = dbPassword;
        this.host = host;
        this.port = port;
        this.sourceTableName = sourceTableName;
        this.name = clientName;
    }

    /**
     * Creates the underlying {@link BinaryLogClient}, registers the event listener and connects.
     *
     * @param binlogFileOffset position to start from; when empty, an empty file name and
     *                         position {@code 4} are used
     * @param eventConsumer    receives one parsed event per row-insert event on the source table
     * @throws IOException      propagated from {@link BinaryLogClient#connect(long)}
     * @throws TimeoutException propagated from {@link BinaryLogClient#connect(long)}
     */
    public void start(Optional<BinlogFileOffset> binlogFileOffset, Consumer<M> eventConsumer) throws IOException, TimeoutException {
        this.client = new BinaryLogClient(this.host, this.port, this.dbUserName, this.dbPassword);
        this.client.setServerId(this.binlogClientUniqueId);
        this.client.setKeepAliveInterval(KEEP_ALIVE_INTERVAL);
        // orElseGet: only build the default offset when the caller did not supply one.
        BinlogFileOffset bfo = binlogFileOffset.orElseGet(() -> new BinlogFileOffset("", DEFAULT_BINLOG_POSITION));
        this.logger.debug("Starting with {}", bfo);
        this.client.setBinlogFilename(bfo.getBinlogFilename());
        this.client.setBinlogPosition(bfo.getOffset());
        this.client.setEventDeserializer(this.getEventDeserializer());
        this.client.registerEventListener(event -> {
            switch (event.getHeader().getEventType()) {
                case TABLE_MAP: {
                    // Remember the table map only for the table we are interested in.
                    TableMapEventData tableMapEvent = (TableMapEventData) event.getData();
                    if (tableMapEvent.getTable().equalsIgnoreCase(this.sourceTableName)) {
                        this.tableMapEventByTableId.put(tableMapEvent.getTableId(), tableMapEvent);
                    }
                    break;
                }
                case EXT_WRITE_ROWS: {
                    this.logger.debug("Got binlog event {}", event);
                    // The position is recorded for every row-insert event, even for other tables.
                    long position = ((EventHeaderV4) event.getHeader()).getPosition();
                    this.offset = position;
                    WriteRowsEventData eventData = (WriteRowsEventData) event.getData();
                    if (!this.tableMapEventByTableId.containsKey(eventData.getTableId())) {
                        break;
                    }
                    try {
                        eventConsumer.accept(this.writeRowsEventDataParser.parseEventData(eventData, this.getCurrentBinlogFilename(), position));
                    } catch (IOException e) {
                        throw new RuntimeException("Event row parsing exception", e);
                    }
                    break;
                }
                case ROTATE: {
                    RotateEventData eventData = (RotateEventData) event.getData();
                    if (eventData != null) {
                        this.binlogFilename = eventData.getBinlogFilename();
                    }
                    break;
                }
                default:
                    // All other event types are ignored.
                    break;
            }
        });
        this.client.connect(CONNECT_TIMEOUT);
    }

    /**
     * Builds a deserializer that discards the payload of every event type except
     * {@code EXT_WRITE_ROWS}, {@code TABLE_MAP} and {@code ROTATE}, and that decodes
     * {@code EXT_WRITE_ROWS} using this client's own table-map registry.
     */
    private EventDeserializer getEventDeserializer() {
        EventDeserializer eventDeserializer = new EventDeserializer();
        for (EventType eventType : EventType.values()) {
            boolean handled = eventType == EventType.EXT_WRITE_ROWS
                    || eventType == EventType.TABLE_MAP
                    || eventType == EventType.ROTATE;
            if (!handled) {
                eventDeserializer.setEventDataDeserializer(eventType, new NullEventDataDeserializer());
            }
        }
        eventDeserializer.setEventDataDeserializer(EventType.EXT_WRITE_ROWS, new WriteRowsEventDataDeserializer(this.tableMapEventByTableId).setMayContainExtraInformation(true));
        return eventDeserializer;
    }

    /**
     * Disconnects the underlying client. An {@link IOException} raised while disconnecting
     * is logged rather than propagated. Calling this before {@link #start} is a no-op.
     */
    public void stop() {
        BinaryLogClient current = this.client;
        if (current == null) {
            // start() was never called, so there is nothing to disconnect.
            this.logger.warn("stop() called before start(); nothing to disconnect");
            return;
        }
        try {
            current.disconnect();
        } catch (IOException e) {
            this.logger.error("Cannot stop the MySqlBinaryLogClient", e);
        }
    }

    /**
     * @return binlog file name taken from the most recent {@code ROTATE} event,
     *         or {@code null} if none has been received yet
     */
    public String getCurrentBinlogFilename() {
        return this.binlogFilename;
    }

    /**
     * @return position of the most recent {@code EXT_WRITE_ROWS} event,
     *         or {@code 0} if none has been received yet
     */
    public long getCurrentOffset() {
        return this.offset;
    }

    /** @return the client name supplied at construction time */
    public String getName() {
        return this.name;
    }
}

