/*
 * Decompiled with CFR 0.152.
 */
package org.apache.shardingsphere.scaling.mysql.component;

import com.google.common.base.Preconditions;
import com.zaxxer.hikari.HikariConfig;
import java.io.Serializable;
import java.security.SecureRandom;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Random;
import java.util.stream.Collectors;
import javax.sql.DataSource;
import lombok.Generated;
import org.apache.shardingsphere.scaling.core.common.channel.Channel;
import org.apache.shardingsphere.scaling.core.common.datasource.DataSourceFactory;
import org.apache.shardingsphere.scaling.core.common.datasource.JdbcUri;
import org.apache.shardingsphere.scaling.core.common.record.Column;
import org.apache.shardingsphere.scaling.core.common.record.DataRecord;
import org.apache.shardingsphere.scaling.core.common.record.FinishedRecord;
import org.apache.shardingsphere.scaling.core.common.record.PlaceholderRecord;
import org.apache.shardingsphere.scaling.core.common.record.Record;
import org.apache.shardingsphere.scaling.core.config.DumperConfiguration;
import org.apache.shardingsphere.scaling.core.config.datasource.StandardJDBCDataSourceConfiguration;
import org.apache.shardingsphere.scaling.core.executor.AbstractScalingExecutor;
import org.apache.shardingsphere.scaling.core.executor.dumper.IncrementalDumper;
import org.apache.shardingsphere.scaling.core.job.position.PlaceholderPosition;
import org.apache.shardingsphere.scaling.core.job.position.ScalingPosition;
import org.apache.shardingsphere.scaling.mysql.binlog.BinlogPosition;
import org.apache.shardingsphere.scaling.mysql.binlog.event.AbstractBinlogEvent;
import org.apache.shardingsphere.scaling.mysql.binlog.event.AbstractRowsEvent;
import org.apache.shardingsphere.scaling.mysql.binlog.event.DeleteRowsEvent;
import org.apache.shardingsphere.scaling.mysql.binlog.event.PlaceholderEvent;
import org.apache.shardingsphere.scaling.mysql.binlog.event.UpdateRowsEvent;
import org.apache.shardingsphere.scaling.mysql.binlog.event.WriteRowsEvent;
import org.apache.shardingsphere.scaling.mysql.client.ConnectInfo;
import org.apache.shardingsphere.scaling.mysql.client.MySQLClient;
import org.apache.shardingsphere.scaling.mysql.component.column.metadata.MySQLColumnMetaData;
import org.apache.shardingsphere.scaling.mysql.component.column.metadata.MySQLColumnMetaDataLoader;
import org.apache.shardingsphere.scaling.mysql.component.column.value.ValueHandler;
import org.apache.shardingsphere.spi.ShardingSphereServiceLoader;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public final class MySQLIncrementalDumper
extends AbstractScalingExecutor
implements IncrementalDumper {
    @Generated
    private static final Logger log = LoggerFactory.getLogger(MySQLIncrementalDumper.class);
    private static final Map<String, ValueHandler> VALUE_HANDLER_MAP;
    private final BinlogPosition binlogPosition;
    private final DumperConfiguration dumperConfig;
    private final MySQLColumnMetaDataLoader columnMetaDataLoader;
    private final Random random = new SecureRandom();
    private Channel channel;

    public MySQLIncrementalDumper(DumperConfiguration dumperConfig, ScalingPosition<BinlogPosition> binlogPosition) {
        this.binlogPosition = (BinlogPosition)binlogPosition;
        this.dumperConfig = dumperConfig;
        Preconditions.checkArgument((boolean)(dumperConfig.getDataSourceConfig() instanceof StandardJDBCDataSourceConfiguration), (Object)"MySQLBinlogDumper only support StandardJDBCDataSourceConfiguration");
        this.columnMetaDataLoader = new MySQLColumnMetaDataLoader((DataSource)new DataSourceFactory().newInstance(dumperConfig.getDataSourceConfig()));
    }

    public void start() {
        super.start();
        this.dump();
    }

    private void dump() {
        HikariConfig hikariConfig = ((StandardJDBCDataSourceConfiguration)this.dumperConfig.getDataSourceConfig()).getHikariConfig();
        log.info("incremental dump, jdbcUrl={}", (Object)hikariConfig.getJdbcUrl());
        JdbcUri uri = new JdbcUri(hikariConfig.getJdbcUrl());
        MySQLClient client = new MySQLClient(new ConnectInfo(this.random.nextInt(), uri.getHostname(), uri.getPort(), hikariConfig.getUsername(), hikariConfig.getPassword()));
        client.connect();
        client.subscribe(this.binlogPosition.getFilename(), this.binlogPosition.getPosition());
        int eventCount = 0;
        while (this.isRunning()) {
            AbstractBinlogEvent event = client.poll();
            if (null == event) continue;
            this.handleEvent(uri, event);
            ++eventCount;
        }
        log.info("incremental dump, eventCount={}", (Object)eventCount);
        this.pushRecord((Record)new FinishedRecord((ScalingPosition)new PlaceholderPosition()));
    }

    private void handleEvent(JdbcUri uri, AbstractBinlogEvent event) {
        if (event instanceof PlaceholderEvent || this.filter(uri.getDatabase(), (AbstractRowsEvent)event)) {
            this.createPlaceholderRecord(event);
            return;
        }
        if (event instanceof WriteRowsEvent) {
            this.handleWriteRowsEvent((WriteRowsEvent)event);
        } else if (event instanceof UpdateRowsEvent) {
            this.handleUpdateRowsEvent((UpdateRowsEvent)event);
        } else if (event instanceof DeleteRowsEvent) {
            this.handleDeleteRowsEvent((DeleteRowsEvent)event);
        }
    }

    private boolean filter(String database, AbstractRowsEvent event) {
        return !event.getSchemaName().equals(database) || !this.dumperConfig.getTableNameMap().containsKey(event.getTableName());
    }

    private void handleWriteRowsEvent(WriteRowsEvent event) {
        List<MySQLColumnMetaData> tableMetaData = this.columnMetaDataLoader.load(event.getTableName());
        for (Serializable[] each : event.getAfterRows()) {
            DataRecord record = this.createDataRecord(event, each.length);
            record.setType("INSERT");
            for (int i = 0; i < each.length; ++i) {
                record.addColumn(new Column(tableMetaData.get(i).getName(), (Object)this.handleValue(tableMetaData.get(i), each[i]), true, tableMetaData.get(i).isPrimaryKey()));
            }
            this.pushRecord((Record)record);
        }
    }

    private void handleUpdateRowsEvent(UpdateRowsEvent event) {
        List<MySQLColumnMetaData> tableMetaData = this.columnMetaDataLoader.load(event.getTableName());
        for (int i = 0; i < event.getBeforeRows().size(); ++i) {
            Serializable[] beforeValues = event.getBeforeRows().get(i);
            Serializable[] afterValues = event.getAfterRows().get(i);
            DataRecord record = this.createDataRecord(event, beforeValues.length);
            record.setType("UPDATE");
            for (int j = 0; j < beforeValues.length; ++j) {
                Serializable newValue = afterValues[j];
                Serializable oldValue = beforeValues[j];
                boolean updated = !Objects.equals(newValue, oldValue);
                record.addColumn(new Column(tableMetaData.get(j).getName(), (Object)(tableMetaData.get(j).isPrimaryKey() && updated ? this.handleValue(tableMetaData.get(j), oldValue) : null), (Object)this.handleValue(tableMetaData.get(j), newValue), updated, tableMetaData.get(j).isPrimaryKey()));
            }
            this.pushRecord((Record)record);
        }
    }

    private void handleDeleteRowsEvent(DeleteRowsEvent event) {
        List<MySQLColumnMetaData> tableMetaData = this.columnMetaDataLoader.load(event.getTableName());
        for (Serializable[] each : event.getBeforeRows()) {
            DataRecord record = this.createDataRecord(event, each.length);
            record.setType("DELETE");
            for (int i = 0; i < each.length; ++i) {
                record.addColumn(new Column(tableMetaData.get(i).getName(), (Object)this.handleValue(tableMetaData.get(i), each[i]), true, tableMetaData.get(i).isPrimaryKey()));
            }
            this.pushRecord((Record)record);
        }
    }

    private Serializable handleValue(MySQLColumnMetaData columnMetaData, Serializable value) {
        ValueHandler valueHandler = VALUE_HANDLER_MAP.get(columnMetaData.getDataTypeName());
        if (null != valueHandler) {
            return valueHandler.handle(value);
        }
        return value;
    }

    private DataRecord createDataRecord(AbstractRowsEvent rowsEvent, int columnCount) {
        DataRecord result = new DataRecord((ScalingPosition)new BinlogPosition(rowsEvent.getFileName(), rowsEvent.getPosition(), rowsEvent.getServerId()), columnCount);
        result.setTableName((String)this.dumperConfig.getTableNameMap().get(rowsEvent.getTableName()));
        result.setCommitTime(rowsEvent.getTimestamp() * 1000L);
        return result;
    }

    private void createPlaceholderRecord(AbstractBinlogEvent event) {
        PlaceholderRecord record = new PlaceholderRecord((ScalingPosition)new BinlogPosition(event.getFileName(), event.getPosition(), event.getServerId()));
        record.setCommitTime(event.getTimestamp() * 1000L);
        this.pushRecord((Record)record);
    }

    private void pushRecord(Record record) {
        try {
            this.channel.pushRecord(record);
        }
        catch (InterruptedException interruptedException) {
            // empty catch block
        }
    }

    @Generated
    public void setChannel(Channel channel) {
        this.channel = channel;
    }

    static {
        ShardingSphereServiceLoader.register(ValueHandler.class);
        VALUE_HANDLER_MAP = ShardingSphereServiceLoader.getSingletonServiceInstances(ValueHandler.class).stream().collect(Collectors.toMap(ValueHandler::getTypeName, v -> v));
    }
}

