package org.apache.seatunnel.connectors.seatunnel.cdc.postgres.source.reader.snapshot;

import io.debezium.DebeziumException;
import io.debezium.config.CommonConnectorConfig;
import io.debezium.connector.postgresql.PostgresConnectorConfig;
import io.debezium.connector.postgresql.PostgresOffsetContext;
import io.debezium.connector.postgresql.PostgresSchema;
import io.debezium.connector.postgresql.connection.PostgresConnection;
import io.debezium.pipeline.EventDispatcher;
import io.debezium.pipeline.source.AbstractSnapshotChangeEventSource;
import io.debezium.pipeline.source.spi.ChangeEventSource;
import io.debezium.pipeline.source.spi.SnapshotProgressListener;
import io.debezium.pipeline.spi.ChangeRecordEmitter;
import io.debezium.pipeline.spi.OffsetContext;
import io.debezium.pipeline.spi.SnapshotResult;
import io.debezium.relational.RelationalSnapshotChangeEventSource;
import io.debezium.relational.SnapshotChangeRecordEmitter;
import io.debezium.relational.Table;
import io.debezium.relational.TableId;
import io.debezium.util.Clock;
import io.debezium.util.ColumnUtils;
import io.debezium.util.Strings;
import io.debezium.util.Threads;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.time.Duration;
import org.apache.kafka.connect.errors.ConnectException;
import org.apache.seatunnel.connectors.cdc.base.relational.JdbcSourceEventDispatcher;
import org.apache.seatunnel.connectors.cdc.base.source.split.SnapshotSplit;
import org.apache.seatunnel.connectors.cdc.base.source.split.wartermark.WatermarkKind;
import org.apache.seatunnel.connectors.seatunnel.cdc.postgres.source.offset.LsnOffset;
import org.apache.seatunnel.connectors.seatunnel.cdc.postgres.utils.PostgresUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Snapshot task that reads a single {@link SnapshotSplit} of a PostgreSQL table.
 *
 * <p>The task records the current LSN as the split's low watermark, streams every row selected by
 * the split scan query through the dispatcher as snapshot events, and then records the current LSN
 * again as the split's high watermark.
 */
public class PostgresSnapshotSplitReadTask extends AbstractSnapshotChangeEventSource {
    private static final Logger log = LoggerFactory.getLogger(PostgresSnapshotSplitReadTask.class);

    /**
     * Minimum time between two progress log lines while scanning a split.
     * NOTE(review): this reuses a "retriable restart wait" constant purely for its value; confirm
     * that value is the intended logging interval.
     */
    private static final Duration LOG_INTERVAL = Duration.ofMillis(CommonConnectorConfig.DEFAULT_RETRIABLE_RESTART_WAIT);

    private final PostgresConnectorConfig connectorConfig;
    private final PostgresSchema databaseSchema;
    private final PostgresConnection jdbcConnection;
    private final JdbcSourceEventDispatcher dispatcher;
    private final Clock clock;
    private final SnapshotSplit snapshotSplit;
    private final PostgresOffsetContext offsetContext;
    private final SnapshotProgressListener snapshotProgressListener;

    /**
     * Snapshot context handed from {@link #prepare} to {@link #doExecute}.
     *
     * <p>The "SqlSever" name is a misnomer in this PostgreSQL task; it is kept unchanged because the
     * class is public and renaming it would break existing callers.
     */
    public static class SqlSeverSnapshotContext extends RelationalSnapshotChangeEventSource.RelationalSnapshotContext {
        /**
         * Creates a context with an empty string passed to the parent constructor
         * (presumably the catalog name - confirm against the Debezium version in use).
         *
         * @throws SQLException declared for compatibility with the parent constructor
         */
        public SqlSeverSnapshotContext() throws SQLException {
            super("");
        }
    }

    /**
     * Creates a task for one snapshot split.
     *
     * @param postgresConnectorConfig connector configuration; also supplies the query fetch size
     * @param postgresOffsetContext offset context attached to every emitted snapshot record
     * @param snapshotProgressListener listener notified about scan progress
     * @param postgresSchema schema used to resolve the split's table definition
     * @param postgresConnection connection used for LSN lookups and the split scan query
     * @param jdbcSourceEventDispatcher dispatcher receiving watermark and snapshot events
     * @param snapshotSplit the split to read
     */
    public PostgresSnapshotSplitReadTask(PostgresConnectorConfig postgresConnectorConfig, PostgresOffsetContext postgresOffsetContext, SnapshotProgressListener snapshotProgressListener, PostgresSchema postgresSchema, PostgresConnection postgresConnection, JdbcSourceEventDispatcher jdbcSourceEventDispatcher, SnapshotSplit snapshotSplit) {
        super(postgresConnectorConfig, snapshotProgressListener);
        this.offsetContext = postgresOffsetContext;
        this.connectorConfig = postgresConnectorConfig;
        this.databaseSchema = postgresSchema;
        this.jdbcConnection = postgresConnection;
        this.dispatcher = jdbcSourceEventDispatcher;
        this.clock = Clock.SYSTEM;
        this.snapshotSplit = snapshotSplit;
        this.snapshotProgressListener = snapshotProgressListener;
    }

    /**
     * Prepares the snapshot context and runs the split snapshot.
     *
     * @param changeEventSourceContext context of the running change event source
     * @param offsetContext offset passed through to {@link #getSnapshottingTask} and {@link #doExecute}
     * @return the result produced by {@link #doExecute}
     * @throws InterruptedException if the snapshot is interrupted before completion
     * @throws RuntimeException if the snapshot context cannot be prepared
     * @throws DebeziumException if the snapshot itself fails for any other reason
     */
    @Override // io.debezium.pipeline.source.AbstractSnapshotChangeEventSource, io.debezium.pipeline.source.spi.SnapshotChangeEventSource
    public SnapshotResult execute(ChangeEventSource.ChangeEventSourceContext changeEventSourceContext, OffsetContext offsetContext) throws InterruptedException {
        // Context preparation is handled separately so that only genuine initialization
        // failures are reported with the "Failed to initialize" message.
        final AbstractSnapshotChangeEventSource.SnapshotContext snapshotContext;
        try {
            snapshotContext = prepare(changeEventSourceContext);
        } catch (Exception e) {
            log.error("Failed to initialize snapshot context.", e);
            throw new RuntimeException(e);
        }

        try {
            return doExecute(changeEventSourceContext, offsetContext, snapshotContext, getSnapshottingTask(offsetContext));
        } catch (InterruptedException e) {
            // Propagate the interruption as declared instead of hiding it inside a RuntimeException.
            log.warn("Snapshot was interrupted before completion");
            throw e;
        } catch (Exception e) {
            throw new DebeziumException(e);
        }
    }

    /**
     * Runs the three snapshot steps: emit the low watermark, export the split's rows, emit the
     * high watermark.
     *
     * @param changeEventSourceContext must be a {@code SnapshotSplitChangeEventSourceContext}; it
     *     receives both watermarks
     * @param offsetContext unused here; the offset context given to the constructor is used instead
     * @param snapshotContext must be a {@link SqlSeverSnapshotContext}
     * @param snapshottingTask unused here
     * @return a completed result carrying the offset stored in the snapshot context
     * @throws Exception if reading an LSN, dispatching an event or scanning the split fails
     */
    @Override // io.debezium.pipeline.source.AbstractSnapshotChangeEventSource
    protected SnapshotResult doExecute(ChangeEventSource.ChangeEventSourceContext changeEventSourceContext, OffsetContext offsetContext, AbstractSnapshotChangeEventSource.SnapshotContext snapshotContext, AbstractSnapshotChangeEventSource.SnapshottingTask snapshottingTask) throws Exception {
        final SqlSeverSnapshotContext ctx = (SqlSeverSnapshotContext) snapshotContext;
        final SnapshotSplitChangeEventSourceContext splitContext = (SnapshotSplitChangeEventSourceContext) changeEventSourceContext;
        ctx.offset = this.offsetContext;

        final LsnOffset lowWatermark = PostgresUtils.currentLsn(this.jdbcConnection);
        log.info("Snapshot step 1 - Determining low watermark {} for split {}", lowWatermark, this.snapshotSplit);
        splitContext.setLowWatermark(lowWatermark);
        this.dispatcher.dispatchWatermarkEvent(this.offsetContext.getPartition(), this.snapshotSplit, lowWatermark, WatermarkKind.LOW);

        log.info("Snapshot step 2 - Snapshotting data");
        createDataEvents(ctx, this.snapshotSplit.getTableId());

        final LsnOffset highWatermark = PostgresUtils.currentLsn(this.jdbcConnection);
        log.info("Snapshot step 3 - Determining high watermark {} for split {}", highWatermark, this.snapshotSplit);
        splitContext.setHighWatermark(highWatermark);
        this.dispatcher.dispatchWatermarkEvent(this.offsetContext.getPartition(), this.snapshotSplit, highWatermark, WatermarkKind.HIGH);

        return SnapshotResult.completed(ctx.offset);
    }

    /**
     * Returns the snapshotting task descriptor for this split.
     *
     * @param offsetContext ignored; the same descriptor is returned for every offset
     * @return a task built with the flags {@code (false, true)} - presumably "do not snapshot the
     *     schema, do snapshot the data"; confirm against Debezium's {@code SnapshottingTask}
     */
    @Override // io.debezium.pipeline.source.AbstractSnapshotChangeEventSource
    protected AbstractSnapshotChangeEventSource.SnapshottingTask getSnapshottingTask(OffsetContext offsetContext) {
        return new AbstractSnapshotChangeEventSource.SnapshottingTask(false, true);
    }

    /**
     * Creates a fresh snapshot context for one execution.
     *
     * @param changeEventSourceContext ignored
     * @return a new {@link SqlSeverSnapshotContext}
     * @throws Exception if the context cannot be constructed
     */
    @Override // io.debezium.pipeline.source.AbstractSnapshotChangeEventSource
    protected AbstractSnapshotChangeEventSource.SnapshotContext prepare(ChangeEventSource.ChangeEventSourceContext changeEventSourceContext) throws Exception {
        return new SqlSeverSnapshotContext();
    }

    /**
     * Exports the rows of the split's table and then marks the snapshot receiver as complete.
     *
     * @param sqlSeverSnapshotContext context whose offset is attached to each record
     * @param tableId id of the table the split belongs to
     * @throws Exception if the table scan or event dispatching fails
     */
    private void createDataEvents(SqlSeverSnapshotContext sqlSeverSnapshotContext, TableId tableId) throws Exception {
        final EventDispatcher.SnapshotReceiver receiver = this.dispatcher.getSnapshotChangeEventReceiver();
        log.debug("Snapshotting table {}", tableId);
        // The schema lookup uses an id rebuilt with a null first component (presumably the
        // catalog), keeping only schema and table name.
        final TableId lookupId = new TableId(null, tableId.schema(), tableId.table());
        createDataEventsForTable(sqlSeverSnapshotContext, receiver, this.databaseSchema.tableFor(lookupId));
        receiver.completeSnapshot();
    }

    /**
     * Runs the split scan query and dispatches one snapshot event per returned row.
     *
     * <p>Whenever the log timer expires, the number of rows exported so far is logged and reported
     * to the progress listener. The statement and result set are closed on every exit path.
     *
     * @param sqlSeverSnapshotContext context whose offset is attached to each record
     * @param snapshotReceiver receiver the snapshot events are dispatched to
     * @param table definition of the table being scanned
     * @throws InterruptedException if dispatching a snapshot event is interrupted
     * @throws ConnectException if the scan fails with an {@link SQLException}
     */
    private void createDataEventsForTable(SqlSeverSnapshotContext sqlSeverSnapshotContext, EventDispatcher.SnapshotReceiver snapshotReceiver, Table table) throws InterruptedException {
        final long exportStart = this.clock.currentTimeInMillis();
        log.info("Exporting data from split '{}' of table {}", this.snapshotSplit.splitId(), table.id());

        final String selectSql = PostgresUtils.buildSplitScanQuery(this.snapshotSplit.getTableId(), this.snapshotSplit.getSplitKeyType(), this.snapshotSplit.getSplitStart() == null, this.snapshotSplit.getSplitEnd() == null);
        log.info("For split '{}' of table {} using select statement: '{}'", this.snapshotSplit.splitId(), table.id(), selectSql);

        try (PreparedStatement statement = PostgresUtils.readTableSplitDataStatement(this.jdbcConnection, selectSql, this.snapshotSplit.getSplitStart() == null, this.snapshotSplit.getSplitEnd() == null, this.snapshotSplit.getSplitStart(), this.snapshotSplit.getSplitEnd(), this.snapshotSplit.getSplitKeyType(), this.connectorConfig.getQueryFetchSize());
                ResultSet resultSet = statement.executeQuery()) {
            final ColumnUtils.ColumnArray columnArray = ColumnUtils.toArray(resultSet, table);
            final int columnCount = columnArray.getColumns().length;
            long rowCount = 0;
            Threads.Timer logTimer = getTableScanLogTimer();

            while (resultSet.next()) {
                rowCount++;
                final Object[] row = new Object[columnArray.getGreatestColumnPosition()];
                for (int i = 0; i < columnCount; i++) {
                    // Result-set columns are 1-based; each value is stored at its column's
                    // (1-based) position within the table.
                    row[columnArray.getColumns()[i].position() - 1] = readField(resultSet, i + 1);
                }
                if (logTimer.expired()) {
                    log.info("Exported {} records for split '{}' after {}", rowCount, this.snapshotSplit.splitId(), Strings.duration(this.clock.currentTimeInMillis() - exportStart));
                    this.snapshotProgressListener.rowsScanned(table.id(), rowCount);
                    logTimer = getTableScanLogTimer();
                }
                this.dispatcher.dispatchSnapshotEvent(table.id(), getChangeRecordEmitter(sqlSeverSnapshotContext, table.id(), row), snapshotReceiver);
            }
            log.info("Finished exporting {} records for split '{}', total duration '{}'", rowCount, this.snapshotSplit.splitId(), Strings.duration(this.clock.currentTimeInMillis() - exportStart));
        } catch (SQLException e) {
            throw new ConnectException("Snapshotting of table " + table.id() + " failed", e);
        }
    }

    /**
     * Records the event on the context's offset and builds the emitter for one snapshot row.
     *
     * @param sqlSeverSnapshotContext context holding the offset to update and attach
     * @param tableId id of the table the row belongs to
     * @param objArr column values of the row, indexed by column position minus one
     * @return an emitter for the row
     */
    protected ChangeRecordEmitter getChangeRecordEmitter(SqlSeverSnapshotContext sqlSeverSnapshotContext, TableId tableId, Object[] objArr) {
        sqlSeverSnapshotContext.offset.event(tableId, this.clock.currentTime());
        return new SnapshotChangeRecordEmitter(sqlSeverSnapshotContext.offset, objArr, this.clock);
    }

    /** Returns a new timer that expires after {@link #LOG_INTERVAL}. */
    private Threads.Timer getTableScanLogTimer() {
        return Threads.timer(this.clock, LOG_INTERVAL);
    }

    /**
     * Reads one column of the current row.
     *
     * @param resultSet result set positioned on the row to read
     * @param i 1-based column index
     * @return the value via {@code getTimestamp} for {@code TIME} columns, otherwise via
     *     {@code getObject}
     * @throws SQLException if the metadata or the value cannot be read
     */
    private Object readField(ResultSet resultSet, int i) throws SQLException {
        if (resultSet.getMetaData().getColumnType(i) == java.sql.Types.TIME) {
            return resultSet.getTimestamp(i);
        }
        return resultSet.getObject(i);
    }
}
