package risesoft.data.transfer.stream.rdbms.in;

import java.sql.Connection;
import java.sql.ResultSet;
import java.sql.ResultSetMetaData;
import java.sql.SQLException;
import java.util.List;
import risesoft.data.transfer.core.channel.InChannel;
import risesoft.data.transfer.core.data.Data;
import risesoft.data.transfer.core.data.StringData;
import risesoft.data.transfer.core.exception.CommonErrorCode;
import risesoft.data.transfer.core.exception.TransferException;
import risesoft.data.transfer.core.log.Logger;
import risesoft.data.transfer.core.record.DefaultRecord;
import risesoft.data.transfer.core.stream.in.DataInputStream;
import risesoft.data.transfer.stream.rdbms.in.columns.CreateColumnHandle;
import risesoft.data.transfer.stream.rdbms.utils.DBUtil;

/* loaded from: input_file:risesoft/data/transfer/stream/rdbms/in/RdbmsDataInputStream.class */
/**
 * {@link DataInputStream} implementation that reads rows from a relational
 * database over JDBC and forwards them as records to an {@link InChannel}.
 *
 * <p>Each call to {@link #read(Data, InChannel)} appends the value carried by
 * the supplied {@link StringData} to the configured base SELECT statement,
 * executes the resulting SQL, and converts every row into a
 * {@link DefaultRecord} using the configured {@link CreateColumnHandle}s
 * (handle {@code i} reads JDBC column {@code i + 1}).
 */
public class RdbmsDataInputStream implements DataInputStream {
    private final Connection connection;
    private final String selectSql;
    private final int fetchSize;
    private final List<CreateColumnHandle> createColumnHandles;
    private final String encoding;
    private final Logger logger;

    /**
     * @param connection          JDBC connection used for every query; closed by {@link #close()}
     * @param selectSql           base SELECT statement; the per-task suffix is appended to it
     * @param fetchSize           fetch size passed through to {@code DBUtil.query}
     * @param createColumnHandles one handle per output column, in column order
     * @param encoding            encoding passed through to each column handle
     * @param logger              logger used for debug output
     */
    public RdbmsDataInputStream(Connection connection, String selectSql, int fetchSize,
            List<CreateColumnHandle> createColumnHandles, String encoding, Logger logger) {
        this.connection = connection;
        this.selectSql = selectSql;
        this.fetchSize = fetchSize;
        this.createColumnHandles = createColumnHandles;
        this.encoding = encoding;
        this.logger = logger;
    }

    /**
     * Closes the underlying connection on a best-effort basis: a failure to
     * close is logged and never propagated to the caller.
     */
    public void close() throws Exception {
        try {
            this.connection.close();
        } catch (Exception e) {
            // Deliberately not rethrown: closing is best-effort, but the failure
            // should at least be visible in the debug log.
            this.logger.debug(this, "failed to close connection: " + e.getMessage());
        }
        this.logger.debug(this, "close input stream");
    }

    /**
     * Executes the base SELECT statement with the suffix carried by {@code data}
     * and writes every successfully converted row to {@code inChannel}.
     *
     * <p>A row for which any column conversion fails is reported once through
     * {@link InChannel#collectDirtyRecord} and is not written to the channel.
     *
     * @param data      must be a {@link StringData}; its value is appended to the base SQL
     * @param inChannel channel receiving the records and the dirty-record reports
     * @throws TransferException if the query cannot be executed or reading the result fails
     */
    public void read(Data data, InChannel inChannel) {
        // NOTE(review): the suffix is concatenated into the SQL text as-is; this
        // assumes it is generated internally (e.g. a split condition) and is not
        // user-controlled input - confirm against the producer of `data`.
        String sql = this.selectSql + ((StringData) data).getValue();
        if (this.logger.isDebug()) {
            this.logger.debug(this, "work sql: " + sql);
        }

        ResultSet resultSet;
        try {
            resultSet = DBUtil.query(this.connection, sql, this.fetchSize);
        } catch (SQLException e) {
            throw TransferException.as(CommonErrorCode.RUNTIME_ERROR, "sql 执行报错" + sql, e);
        }

        try {
            this.logger.debug(this, "readData:");
            // The metadata does not change between rows, so fetch it only once.
            ResultSetMetaData metaData = resultSet.getMetaData();
            while (resultSet.next()) {
                DefaultRecord record = buildRecord(resultSet, metaData, inChannel);
                if (record != null) {
                    inChannel.writer(record);
                }
            }
            inChannel.flush();
            this.logger.debug(this, "readData end");
        } catch (Exception e) {
            throw TransferException.as(CommonErrorCode.RUNTIME_ERROR, "读取数据报错", e);
        } finally {
            // Release the cursor after every query instead of holding it until
            // the connection itself is closed.
            closeQuietly(resultSet);
        }
    }

    /**
     * Converts the current row of {@code resultSet} into a record.
     *
     * @return the complete record, or {@code null} when a column could not be
     *         converted (the partial record has then been reported as dirty)
     */
    private DefaultRecord buildRecord(ResultSet resultSet, ResultSetMetaData metaData, InChannel inChannel) {
        DefaultRecord record = new DefaultRecord();
        for (int i = 0; i < this.createColumnHandles.size(); i++) {
            try {
                record.addColumn(this.createColumnHandles.get(i).getColumn(resultSet, metaData, i + 1, this.encoding));
            } catch (Exception e) {
                // Stop at the first bad column: continuing would shift the later
                // columns to the wrong positions and report the row repeatedly.
                inChannel.collectDirtyRecord(record, e, "脏数据" + e.getMessage());
                return null;
            }
        }
        return record;
    }

    /**
     * Closes the result set and the statement that produced it, logging (but
     * not propagating) any failure.
     */
    private void closeQuietly(ResultSet resultSet) {
        if (resultSet == null) {
            return;
        }
        // NOTE(review): assumes the statement behind the result set is created
        // per query by DBUtil.query and is not shared - confirm.
        AutoCloseable statement = null;
        try {
            statement = resultSet.getStatement();
        } catch (Exception e) {
            this.logger.debug(this, "failed to obtain statement for closing: " + e.getMessage());
        }
        try {
            resultSet.close();
        } catch (Exception e) {
            this.logger.debug(this, "failed to close result set: " + e.getMessage());
        }
        if (statement != null) {
            try {
                statement.close();
            } catch (Exception e) {
                this.logger.debug(this, "failed to close statement: " + e.getMessage());
            }
        }
    }
}
