package org.apache.shardingsphere.data.pipeline.mysql.ingest;

import com.google.common.base.Preconditions;
import java.io.Serializable;
import java.security.SecureRandom;
import java.util.Objects;
import java.util.Optional;
import java.util.Random;
import lombok.Generated;
import org.apache.shardingsphere.data.pipeline.api.config.ingest.DumperConfiguration;
import org.apache.shardingsphere.data.pipeline.api.datasource.config.impl.StandardPipelineDataSourceConfiguration;
import org.apache.shardingsphere.data.pipeline.api.datasource.config.yaml.YamlJdbcConfiguration;
import org.apache.shardingsphere.data.pipeline.api.ingest.channel.PipelineChannel;
import org.apache.shardingsphere.data.pipeline.api.ingest.position.IngestPosition;
import org.apache.shardingsphere.data.pipeline.api.ingest.position.PlaceholderPosition;
import org.apache.shardingsphere.data.pipeline.api.ingest.record.Column;
import org.apache.shardingsphere.data.pipeline.api.ingest.record.DataRecord;
import org.apache.shardingsphere.data.pipeline.api.ingest.record.FinishedRecord;
import org.apache.shardingsphere.data.pipeline.api.ingest.record.PlaceholderRecord;
import org.apache.shardingsphere.data.pipeline.api.ingest.record.Record;
import org.apache.shardingsphere.data.pipeline.api.metadata.ActualTableName;
import org.apache.shardingsphere.data.pipeline.api.metadata.PipelineColumnMetaData;
import org.apache.shardingsphere.data.pipeline.core.ingest.dumper.AbstractIncrementalDumper;
import org.apache.shardingsphere.data.pipeline.core.metadata.loader.PipelineTableMetaDataLoader;
import org.apache.shardingsphere.data.pipeline.core.metadata.model.PipelineTableMetaData;
import org.apache.shardingsphere.data.pipeline.mysql.ingest.binlog.BinlogPosition;
import org.apache.shardingsphere.data.pipeline.mysql.ingest.binlog.event.AbstractBinlogEvent;
import org.apache.shardingsphere.data.pipeline.mysql.ingest.binlog.event.AbstractRowsEvent;
import org.apache.shardingsphere.data.pipeline.mysql.ingest.binlog.event.DeleteRowsEvent;
import org.apache.shardingsphere.data.pipeline.mysql.ingest.binlog.event.PlaceholderEvent;
import org.apache.shardingsphere.data.pipeline.mysql.ingest.binlog.event.UpdateRowsEvent;
import org.apache.shardingsphere.data.pipeline.mysql.ingest.binlog.event.WriteRowsEvent;
import org.apache.shardingsphere.data.pipeline.mysql.ingest.client.ConnectInfo;
import org.apache.shardingsphere.data.pipeline.mysql.ingest.client.MySQLClient;
import org.apache.shardingsphere.data.pipeline.mysql.ingest.column.value.MySQLDataTypeHandler;
import org.apache.shardingsphere.data.pipeline.mysql.ingest.column.value.MySQLDataTypeHandlerFactory;
import org.apache.shardingsphere.infra.database.metadata.DataSourceMetaData;
import org.apache.shardingsphere.infra.database.type.DatabaseTypeFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/shardingsphere/data/pipeline/mysql/ingest/MySQLIncrementalDumper.class */
public final class MySQLIncrementalDumper extends AbstractIncrementalDumper<BinlogPosition> {

    @Generated
    private static final Logger log = LoggerFactory.getLogger(MySQLIncrementalDumper.class);
    private final BinlogPosition binlogPosition;
    private final DumperConfiguration dumperConfig;
    private final PipelineTableMetaDataLoader metaDataLoader;
    private final Random random;
    private final PipelineChannel channel;
    private final MySQLClient client;
    private final String catalog;

    public MySQLIncrementalDumper(DumperConfiguration dumperConfiguration, IngestPosition<BinlogPosition> ingestPosition, PipelineChannel pipelineChannel, PipelineTableMetaDataLoader pipelineTableMetaDataLoader) {
        super(dumperConfiguration, ingestPosition, pipelineChannel, pipelineTableMetaDataLoader);
        this.random = new SecureRandom();
        this.binlogPosition = (BinlogPosition) ingestPosition;
        this.dumperConfig = dumperConfiguration;
        Preconditions.checkArgument(dumperConfiguration.getDataSourceConfig() instanceof StandardPipelineDataSourceConfiguration, "MySQLBinlogDumper only support StandardPipelineDataSourceConfiguration");
        this.channel = pipelineChannel;
        this.metaDataLoader = pipelineTableMetaDataLoader;
        YamlJdbcConfiguration jdbcConfig = dumperConfiguration.getDataSourceConfig().getJdbcConfig();
        log.info("incremental dump, jdbcUrl={}", jdbcConfig.getJdbcUrl());
        DataSourceMetaData dataSourceMetaData = DatabaseTypeFactory.getInstance("MySQL").getDataSourceMetaData(jdbcConfig.getJdbcUrl(), (String) null);
        this.client = new MySQLClient(new ConnectInfo(this.random.nextInt(), dataSourceMetaData.getHostname(), dataSourceMetaData.getPort(), jdbcConfig.getUsername(), jdbcConfig.getPassword()));
        this.catalog = dataSourceMetaData.getCatalog();
    }

    protected void doStart() {
        dump();
    }

    private void dump() {
        this.client.connect();
        this.client.subscribe(this.binlogPosition.getFilename(), this.binlogPosition.getPosition());
        int i = 0;
        while (isRunning()) {
            AbstractBinlogEvent poll = this.client.poll();
            if (null != poll) {
                handleEvent(this.catalog, poll);
                i++;
            }
        }
        log.info("incremental dump, eventCount={}", Integer.valueOf(i));
        pushRecord(new FinishedRecord(new PlaceholderPosition()));
    }

    private void handleEvent(String str, AbstractBinlogEvent abstractBinlogEvent) {
        if ((abstractBinlogEvent instanceof PlaceholderEvent) || filter(str, (AbstractRowsEvent) abstractBinlogEvent)) {
            createPlaceholderRecord(abstractBinlogEvent);
            return;
        }
        if (abstractBinlogEvent instanceof WriteRowsEvent) {
            handleWriteRowsEvent((WriteRowsEvent) abstractBinlogEvent, getPipelineTableMetaData(((WriteRowsEvent) abstractBinlogEvent).getTableName()));
        } else if (abstractBinlogEvent instanceof UpdateRowsEvent) {
            handleUpdateRowsEvent((UpdateRowsEvent) abstractBinlogEvent, getPipelineTableMetaData(((UpdateRowsEvent) abstractBinlogEvent).getTableName()));
        } else if (abstractBinlogEvent instanceof DeleteRowsEvent) {
            handleDeleteRowsEvent((DeleteRowsEvent) abstractBinlogEvent, getPipelineTableMetaData(((DeleteRowsEvent) abstractBinlogEvent).getTableName()));
        }
    }

    private boolean filter(String str, AbstractRowsEvent abstractRowsEvent) {
        return (abstractRowsEvent.getDatabaseName().equals(str) && this.dumperConfig.containsTable(abstractRowsEvent.getTableName())) ? false : true;
    }

    private void handleWriteRowsEvent(WriteRowsEvent writeRowsEvent, PipelineTableMetaData pipelineTableMetaData) {
        for (Serializable[] serializableArr : writeRowsEvent.getAfterRows()) {
            DataRecord createDataRecord = createDataRecord(writeRowsEvent, serializableArr.length);
            createDataRecord.setType("INSERT");
            for (int i = 0; i < serializableArr.length; i++) {
                PipelineColumnMetaData columnMetaData = pipelineTableMetaData.getColumnMetaData(i);
                createDataRecord.addColumn(new Column(columnMetaData.getName(), handleValue(columnMetaData, serializableArr[i]), true, pipelineTableMetaData.isUniqueKey(i)));
            }
            pushRecord(createDataRecord);
        }
    }

    private PipelineTableMetaData getPipelineTableMetaData(String str) {
        return this.metaDataLoader.getTableMetaData(this.dumperConfig.getSchemaName(new ActualTableName(str)), str);
    }

    private void handleUpdateRowsEvent(UpdateRowsEvent updateRowsEvent, PipelineTableMetaData pipelineTableMetaData) {
        for (int i = 0; i < updateRowsEvent.getBeforeRows().size(); i++) {
            Serializable[] serializableArr = updateRowsEvent.getBeforeRows().get(i);
            Serializable[] serializableArr2 = updateRowsEvent.getAfterRows().get(i);
            DataRecord createDataRecord = createDataRecord(updateRowsEvent, serializableArr.length);
            createDataRecord.setType("UPDATE");
            for (int i2 = 0; i2 < serializableArr.length; i2++) {
                Serializable serializable = serializableArr[i2];
                Serializable serializable2 = serializableArr2[i2];
                boolean z = !Objects.equals(serializable2, serializable);
                PipelineColumnMetaData columnMetaData = pipelineTableMetaData.getColumnMetaData(i2);
                createDataRecord.addColumn(new Column(columnMetaData.getName(), (columnMetaData.isPrimaryKey() && z) ? handleValue(columnMetaData, serializable) : null, handleValue(columnMetaData, serializable2), z, columnMetaData.isPrimaryKey()));
            }
            pushRecord(createDataRecord);
        }
    }

    private void handleDeleteRowsEvent(DeleteRowsEvent deleteRowsEvent, PipelineTableMetaData pipelineTableMetaData) {
        for (Serializable[] serializableArr : deleteRowsEvent.getBeforeRows()) {
            DataRecord createDataRecord = createDataRecord(deleteRowsEvent, serializableArr.length);
            createDataRecord.setType("DELETE");
            int length = serializableArr.length;
            for (int i = 0; i < length; i++) {
                PipelineColumnMetaData columnMetaData = pipelineTableMetaData.getColumnMetaData(i);
                createDataRecord.addColumn(new Column(columnMetaData.getName(), handleValue(columnMetaData, serializableArr[i]), true, pipelineTableMetaData.isUniqueKey(i)));
            }
            pushRecord(createDataRecord);
        }
    }

    private Serializable handleValue(PipelineColumnMetaData pipelineColumnMetaData, Serializable serializable) {
        Optional<MySQLDataTypeHandler> findInstance = MySQLDataTypeHandlerFactory.findInstance(pipelineColumnMetaData.getDataTypeName());
        return findInstance.isPresent() ? findInstance.get().handle(serializable) : serializable;
    }

    private DataRecord createDataRecord(AbstractRowsEvent abstractRowsEvent, int i) {
        DataRecord dataRecord = new DataRecord(new BinlogPosition(abstractRowsEvent.getFileName(), abstractRowsEvent.getPosition(), abstractRowsEvent.getServerId()), i);
        dataRecord.setTableName(this.dumperConfig.getLogicTableName(abstractRowsEvent.getTableName()).getLowercase());
        dataRecord.setCommitTime(abstractRowsEvent.getTimestamp() * 1000);
        return dataRecord;
    }

    private void createPlaceholderRecord(AbstractBinlogEvent abstractBinlogEvent) {
        PlaceholderRecord placeholderRecord = new PlaceholderRecord(new BinlogPosition(abstractBinlogEvent.getFileName(), abstractBinlogEvent.getPosition(), abstractBinlogEvent.getServerId()));
        placeholderRecord.setCommitTime(abstractBinlogEvent.getTimestamp() * 1000);
        pushRecord(placeholderRecord);
    }

    private void pushRecord(Record record) {
        this.channel.pushRecord(record);
    }

    protected void doStop() {
        if (null != this.client) {
            this.client.closeChannel();
        }
    }
}
