package org.apache.seatunnel.connectors.seatunnel.cdc.postgres.source.reader;

import io.debezium.DebeziumException;
import io.debezium.connector.base.ChangeEventQueue;
import io.debezium.connector.postgresql.PostgresConnectorConfig;
import io.debezium.connector.postgresql.PostgresErrorHandler;
import io.debezium.connector.postgresql.PostgresEventMetadataProvider;
import io.debezium.connector.postgresql.PostgresOffsetContext;
import io.debezium.connector.postgresql.PostgresSchema;
import io.debezium.connector.postgresql.PostgresTaskContext;
import io.debezium.connector.postgresql.PostgresTopicSelector;
import io.debezium.connector.postgresql.TypeRegistry;
import io.debezium.connector.postgresql.connection.PostgresConnection;
import io.debezium.connector.postgresql.connection.ReplicationConnection;
import io.debezium.connector.postgresql.spi.SlotState;
import io.debezium.connector.postgresql.spi.Snapshotter;
import io.debezium.pipeline.DataChangeEvent;
import io.debezium.pipeline.ErrorHandler;
import io.debezium.pipeline.metrics.DefaultChangeEventSourceMetricsFactory;
import io.debezium.pipeline.metrics.SnapshotChangeEventSourceMetrics;
import io.debezium.relational.Table;
import io.debezium.relational.TableId;
import io.debezium.relational.Tables;
import io.debezium.relational.history.TableChanges;
import io.debezium.schema.TopicSelector;
import io.debezium.util.Clock;
import io.debezium.util.LoggingContext;
import io.debezium.util.Metronome;
import java.sql.SQLException;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Iterator;
import java.util.Map;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.errors.ConnectException;
import org.apache.kafka.connect.source.SourceRecord;
import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
import org.apache.seatunnel.connectors.cdc.base.config.JdbcSourceConfig;
import org.apache.seatunnel.connectors.cdc.base.dialect.JdbcDataSourceDialect;
import org.apache.seatunnel.connectors.cdc.base.relational.JdbcSourceEventDispatcher;
import org.apache.seatunnel.connectors.cdc.base.source.offset.Offset;
import org.apache.seatunnel.connectors.cdc.base.source.reader.external.JdbcSourceFetchTaskContext;
import org.apache.seatunnel.connectors.cdc.base.source.split.IncrementalSplit;
import org.apache.seatunnel.connectors.cdc.base.source.split.SnapshotSplit;
import org.apache.seatunnel.connectors.cdc.base.source.split.SourceSplitBase;
import org.apache.seatunnel.connectors.cdc.debezium.EmbeddedDatabaseHistory;
import org.apache.seatunnel.connectors.seatunnel.cdc.postgres.config.PostgresSourceConfig;
import org.apache.seatunnel.connectors.seatunnel.cdc.postgres.source.offset.LsnOffset;
import org.apache.seatunnel.connectors.seatunnel.cdc.postgres.utils.PostgresConnectionUtils;
import org.apache.seatunnel.connectors.seatunnel.cdc.postgres.utils.PostgresUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/seatunnel/connectors/seatunnel/cdc/postgres/source/reader/PostgresSourceFetchTaskContext.class */
public class PostgresSourceFetchTaskContext extends JdbcSourceFetchTaskContext {
    // Class-wide SLF4J logger.
    private static final Logger log = LoggerFactory.getLogger(PostgresSourceFetchTaskContext.class);
    // Name passed to PostgresTaskContext.configureLoggingContext(...) while configuring this context.
    private static final String CONTEXT_NAME = "postgres-cdc-connector-task";
    // Connection supplied through the constructor; used for schema/slot queries and closed in close().
    private final PostgresConnection dataConnection;
    // Created on demand by createReplicationConnection(...); stays null until that method has run.
    private ReplicationConnection replicationConnection;
    // Instantiated in the constructor and handed to the dispatcher and snapshot metrics in configure().
    private final PostgresEventMetadataProvider metadataProvider;
    // The fields below down to snapshotChangeEventSourceMetrics are assigned in configure()
    // and are null until it has been called.
    private Snapshotter snapshotter;
    private PostgresSchema databaseSchema;
    private PostgresOffsetContext offsetContext;
    private TopicSelector<TableId> topicSelector;
    private JdbcSourceEventDispatcher dispatcher;
    private ChangeEventQueue<DataChangeEvent> queue;
    private PostgresErrorHandler errorHandler;
    private PostgresTaskContext taskContext;
    private SnapshotChangeEventSourceMetrics snapshotChangeEventSourceMetrics;
    // Built in the constructor from the connector config and the server time zone;
    // configure() uses it to build the value converter for the schema.
    private PostgresConnection.PostgresValueConverterBuilder postgresValueConverterBuilder;
    // Table-change history handed in through the constructor; not read anywhere else in this class.
    private Collection<TableChanges.TableChange> engineHistory;

    public PostgresSourceFetchTaskContext(JdbcSourceConfig jdbcSourceConfig, JdbcDataSourceDialect jdbcDataSourceDialect, PostgresConnection postgresConnection, Collection<TableChanges.TableChange> collection) {
        super(jdbcSourceConfig, jdbcDataSourceDialect);
        this.dataConnection = postgresConnection;
        this.metadataProvider = new PostgresEventMetadataProvider();
        this.engineHistory = collection;
        this.postgresValueConverterBuilder = PostgresConnectionUtils.newPostgresValueConverterBuilder(getDbzConnectorConfig(), jdbcSourceConfig.getServerTimeZone());
    }

    /**
     * Prepares this context for running the given split.
     *
     * <p>Steps, in order: registers the table schemas of the split as database history, builds the
     * schema / task context and refreshes the schema, loads the starting offset, initialises the
     * snapshotter with the current replication slot state, creates the replication connection
     * (and the slot, when none was found) if the snapshotter wants to stream, commits the data
     * connection and finally builds the event queue, dispatcher, snapshot metrics and error
     * handler.
     *
     * @param sourceSplitBase the split this context is about to process
     * @throws DebeziumException if the schema cannot be refreshed, the replication slot cannot be
     *     created, or the data connection cannot be committed
     */
    @Override
    public void configure(SourceSplitBase sourceSplitBase) {
        registerDatabaseHistory(sourceSplitBase);

        PostgresConnectorConfig dbzConnectorConfig = getDbzConnectorConfig();
        this.snapshotter = dbzConnectorConfig.getSnapshotter();
        this.topicSelector = PostgresTopicSelector.create(dbzConnectorConfig);
        TypeRegistry typeRegistry = this.dataConnection.getTypeRegistry();
        this.databaseSchema = new PostgresSchema(dbzConnectorConfig, typeRegistry, this.topicSelector, this.postgresValueConverterBuilder.build(typeRegistry));
        this.taskContext = new PostgresTaskContext(dbzConnectorConfig, this.databaseSchema, this.topicSelector);

        try {
            this.taskContext.refreshSchema(this.dataConnection, false);
        } catch (SQLException e) {
            throw new DebeziumException("load schema failed", e);
        }

        this.offsetContext = loadStartingOffsetState(new PostgresOffsetContext.Loader(dbzConnectorConfig), sourceSplitBase);

        // Snapshot splits read with exactly-once get an unbounded queue; everything else uses the
        // configured queue size.
        int maxQueueSize = (sourceSplitBase.isSnapshotSplit() && isExactlyOnce()) ? Integer.MAX_VALUE : getSourceConfig().getDbzConnectorConfig().getMaxQueueSize();

        LoggingContext.PreviousContext previousLoggingContext = this.taskContext.configureLoggingContext(CONTEXT_NAME);
        try {
            SlotState slotState = null;
            try {
                if (log.isInfoEnabled()) {
                    log.info(this.dataConnection.serverInfo().toString());
                }
                slotState = this.dataConnection.getReplicationSlotState(dbzConnectorConfig.slotName(), dbzConnectorConfig.plugin().getPostgresPluginName());
            } catch (SQLException e) {
                // Not fatal: slotState stays null and the slot is created below when streaming.
                // The exception is logged so the reason for the failed lookup is not lost.
                log.warn("unable to load info of replication slot, Debezium will try to create the slot", e);
            }

            if (this.offsetContext == null) {
                log.info("No previous offset found");
                this.snapshotter.init(dbzConnectorConfig, null, slotState);
            } else {
                log.info("Found previous offset {}", this.offsetContext);
                this.snapshotter.init(dbzConnectorConfig, this.offsetContext.asOffsetState(), slotState);
            }

            if (this.snapshotter.shouldStream()) {
                createReplicationConnection(this.snapshotter.shouldSnapshot(), dbzConnectorConfig.maxRetries(), dbzConnectorConfig.retryDelay());
                if (slotState == null) {
                    // No slot state was found, so the slot is created before anything is read.
                    try {
                        this.replicationConnection.createReplicationSlot();
                    } catch (SQLException e) {
                        String message = "Creation of replication slot failed";
                        // getMessage() may be null; guard so the original failure is not hidden
                        // behind a NullPointerException.
                        String detail = e.getMessage();
                        if (detail != null && detail.contains("already exists")) {
                            message += "; when setting up multiple connectors for the same database host, please make sure to use a distinct replication slot name for each.";
                        }
                        throw new DebeziumException(message, e);
                    }
                }
            }

            try {
                this.dataConnection.commit();
            } catch (SQLException e) {
                throw new DebeziumException(e);
            }

            this.queue = new ChangeEventQueue.Builder<DataChangeEvent>()
                    .pollInterval(dbzConnectorConfig.getPollInterval())
                    .maxBatchSize(dbzConnectorConfig.getMaxBatchSize())
                    .maxQueueSize(maxQueueSize)
                    .maxQueueSizeInBytes(dbzConnectorConfig.getMaxQueueSizeInBytes())
                    .loggingContextSupplier(() -> this.taskContext.configureLoggingContext(CONTEXT_NAME))
                    .build();
            this.dispatcher = new JdbcSourceEventDispatcher(dbzConnectorConfig, this.topicSelector, this.databaseSchema, this.queue, dbzConnectorConfig.getTableFilters().dataCollectionFilter(), DataChangeEvent::new, this.metadataProvider, this.schemaNameAdjuster);
            this.snapshotChangeEventSourceMetrics = new DefaultChangeEventSourceMetricsFactory().getSnapshotMetrics(this.taskContext, this.queue, this.metadataProvider);
            this.errorHandler = new PostgresErrorHandler(dbzConnectorConfig.getLogicalName(), this.queue);
        } finally {
            // Restore the caller's logging context on both the success and the failure path.
            previousLoggingContext.restore();
        }
    }

    /**
     * Queries the schema of every table referenced by the split through the data connection and
     * registers the result with {@link EmbeddedDatabaseHistory} under the history instance name
     * found in the Debezium configuration.
     *
     * <p>A {@link SnapshotSplit} contributes its single table; any other split is treated as an
     * {@link IncrementalSplit} and contributes all of its tables.
     *
     * @param sourceSplitBase the split whose table schemas are registered
     */
    private void registerDatabaseHistory(SourceSplitBase sourceSplitBase) {
        Collection<TableChanges.TableChange> tableSchemas = new ArrayList<>();
        if (sourceSplitBase instanceof SnapshotSplit) {
            TableId tableId = ((SnapshotSplit) sourceSplitBase).getTableId();
            tableSchemas.add(this.dataSourceDialect.queryTableSchema(this.dataConnection, tableId));
        } else {
            for (TableId tableId : ((IncrementalSplit) sourceSplitBase).getTableIds()) {
                tableSchemas.add(this.dataSourceDialect.queryTableSchema(this.dataConnection, tableId));
            }
        }
        String historyInstanceName = this.sourceConfig.getDbzConfiguration().getString(EmbeddedDatabaseHistory.DATABASE_HISTORY_INSTANCE_NAME);
        EmbeddedDatabaseHistory.registerHistory(historyInstanceName, tableSchemas);
    }

    /**
     * Creates the replication connection if this context does not have one yet; otherwise does
     * nothing.
     *
     * <p>The check and the assignment both happen while holding this object's monitor. The field
     * is not {@code volatile}, so it is deliberately not inspected outside the lock.
     *
     * @param z forwarded to the connection factory (configure() passes the snapshotter's
     *     {@code shouldSnapshot()} value)
     * @param i maximum number of retries for establishing the connection
     * @param duration delay between two connection attempts
     */
    public void createReplicationConnection(boolean z, int i, Duration duration) {
        synchronized (this) {
            if (this.replicationConnection == null) {
                this.replicationConnection = createReplicationConnection(this.taskContext, z, i, duration);
            }
        }
    }

    /**
     * Returns the source configuration of this context, cast to the Postgres-specific type.
     *
     * @return the inherited {@code sourceConfig} as a {@link PostgresSourceConfig}
     */
    @Override
    public PostgresSourceConfig getSourceConfig() {
        final PostgresSourceConfig postgresConfig = (PostgresSourceConfig) sourceConfig;
        return postgresConfig;
    }

    /**
     * Returns the connection that was passed to the constructor.
     *
     * @return the data connection of this context
     */
    public PostgresConnection getDataConnection() {
        return dataConnection;
    }

    /**
     * Returns the snapshot metrics created by {@code configure}.
     *
     * @return the snapshot metrics, or {@code null} if {@code configure} has not assigned them
     */
    public SnapshotChangeEventSourceMetrics getSnapshotChangeEventSourceMetrics() {
        return snapshotChangeEventSourceMetrics;
    }

    /**
     * Returns the connector configuration provided by the parent context, cast to the
     * Postgres-specific type.
     *
     * @return the connector configuration as a {@link PostgresConnectorConfig}
     */
    @Override
    public PostgresConnectorConfig getDbzConnectorConfig() {
        final PostgresConnectorConfig connectorConfig =
                (PostgresConnectorConfig) super.getDbzConnectorConfig();
        return connectorConfig;
    }

    /**
     * Returns the offset context loaded by {@code configure}.
     *
     * @return the offset context, or {@code null} if none has been assigned
     */
    @Override
    public PostgresOffsetContext getOffsetContext() {
        return offsetContext;
    }

    /**
     * Returns the error handler created by {@code configure}.
     *
     * @return the error handler, or {@code null} if {@code configure} has not assigned it
     */
    @Override
    public ErrorHandler getErrorHandler() {
        return errorHandler;
    }

    /**
     * Returns the schema created by {@code configure}.
     *
     * @return the database schema, or {@code null} if {@code configure} has not assigned it
     */
    @Override
    public PostgresSchema getDatabaseSchema() {
        return databaseSchema;
    }

    /**
     * Builds a table id from the {@code source} struct of a record value.
     *
     * <p>The catalog part is always {@code null}; schema and table are read from the
     * {@code "schema"} and {@code "table"} fields of the {@code "source"} struct.
     *
     * @param sourceRecord record whose value is a {@link Struct} containing a {@code "source"} struct
     * @return the table id described by the record
     */
    @Override
    public TableId getTableId(SourceRecord sourceRecord) {
        final Struct recordValue = (Struct) sourceRecord.value();
        final Struct sourceInfo = recordValue.getStruct("source");
        final String schemaName = sourceInfo.getString("schema");
        final String tableName = sourceInfo.getString("table");
        return new TableId(null, schemaName, tableName);
    }

    /**
     * Delegates to {@link PostgresUtils#getSplitType(Table)} to obtain the row type for a table.
     *
     * @param table the table to inspect
     * @return whatever {@code PostgresUtils.getSplitType} returns for the table
     */
    @Override
    public SeaTunnelRowType getSplitType(Table table) {
        final SeaTunnelRowType splitType = PostgresUtils.getSplitType(table);
        return splitType;
    }

    /**
     * Returns the event dispatcher created by {@code configure}.
     *
     * @return the dispatcher, or {@code null} if {@code configure} has not assigned it
     */
    @Override
    public JdbcSourceEventDispatcher getDispatcher() {
        return dispatcher;
    }

    /**
     * Returns the change event queue created by {@code configure}.
     *
     * @return the queue, or {@code null} if {@code configure} has not assigned it
     */
    @Override
    public ChangeEventQueue<DataChangeEvent> getQueue() {
        return queue;
    }

    /**
     * Returns the data collection filter taken from the connector configuration's table filters.
     *
     * @return the table filter of the connector configuration
     */
    @Override
    public Tables.TableFilter getTableFilter() {
        final PostgresConnectorConfig connectorConfig = getDbzConnectorConfig();
        return connectorConfig.getTableFilters().dataCollectionFilter();
    }

    /**
     * Delegates to {@link PostgresUtils#getLsnPosition(SourceRecord)} to obtain the offset of a
     * record.
     *
     * @param sourceRecord the record to inspect
     * @return whatever {@code PostgresUtils.getLsnPosition} returns for the record
     */
    @Override
    public Offset getStreamOffset(SourceRecord sourceRecord) {
        final Offset recordOffset = PostgresUtils.getLsnPosition(sourceRecord);
        return recordOffset;
    }

    /**
     * Closes the data connection and, if one was created, the replication connection.
     *
     * <p>Each connection is closed independently, so a failure while closing the data connection
     * does not leave the replication connection open. Failures are logged and not rethrown. The
     * replication connection only exists after {@code createReplicationConnection} has run, so it
     * is skipped when still {@code null}.
     */
    @Override
    public void close() {
        try {
            this.dataConnection.close();
        } catch (Exception e) {
            log.warn("Failed to close data connection", e);
        }
        if (this.replicationConnection != null) {
            try {
                this.replicationConnection.close();
            } catch (Exception e) {
                log.warn("Failed to close replication connection", e);
            }
        }
    }

    /**
     * Loads the offset context a split starts from.
     *
     * <p>Snapshot splits use {@link LsnOffset#INITIAL_OFFSET}; other splits use the startup offset
     * of their incremental-split view.
     *
     * @param loader loader that turns the offset map into a {@link PostgresOffsetContext}
     * @param sourceSplitBase the split being configured
     * @return the result of {@code loader.load(...)} for the chosen offset
     */
    private PostgresOffsetContext loadStartingOffsetState(PostgresOffsetContext.Loader loader, SourceSplitBase sourceSplitBase) {
        final Offset startingOffset;
        if (sourceSplitBase.isSnapshotSplit()) {
            startingOffset = LsnOffset.INITIAL_OFFSET;
        } else {
            startingOffset = sourceSplitBase.asIncrementalSplit().getStartupOffset();
        }
        final Map<String, ?> offsetValues = (Map<String, ?>) startingOffset.getOffset();
        return loader.load(offsetValues);
    }

    /**
     * Opens a replication connection through the task context, retrying on {@link SQLException}.
     *
     * <p>Up to {@code i} retries are made after the first failed attempt, pausing for
     * {@code duration} between attempts. The attempt counter is an {@code int} so that retry
     * limits above {@code Short.MAX_VALUE} cannot overflow it. If the thread is interrupted while
     * waiting, the interrupt flag is restored and the method gives up instead of running through
     * the remaining retries without any delay.
     *
     * @param postgresTaskContext task context used to create the connection
     * @param z forwarded unchanged to {@code postgresTaskContext.createReplicationConnection}
     * @param i maximum number of retries
     * @param duration pause between two attempts
     * @return the replication connection; {@code null} only if {@code i} is negative, in which
     *     case no attempt is made
     * @throws ConnectException if all retries fail or the thread is interrupted while waiting
     */
    public ReplicationConnection createReplicationConnection(PostgresTaskContext postgresTaskContext, boolean z, int i, Duration duration) throws ConnectException {
        Metronome parker = Metronome.parker(duration, Clock.SYSTEM);
        int failedAttempts = 0;
        while (failedAttempts <= i) {
            try {
                return postgresTaskContext.createReplicationConnection(z);
            } catch (SQLException e) {
                failedAttempts++;
                if (failedAttempts > i) {
                    log.error("Too many errors connecting to server. All {} retries failed.", i);
                    throw new ConnectException(e);
                }
                log.warn("Error connecting to server; will attempt retry {} of {} after {} seconds. Exception message: {}", failedAttempts, i, duration.getSeconds(), e.getMessage());
                try {
                    parker.pause();
                } catch (InterruptedException interrupted) {
                    log.warn("Connection retry sleep interrupted by exception: " + interrupted);
                    Thread.currentThread().interrupt();
                    // Retrying with the interrupt flag set would spin through the remaining
                    // attempts without pausing, so stop here and surface the last failure.
                    ConnectException aborted = new ConnectException("Interrupted while waiting to retry the replication connection", e);
                    aborted.addSuppressed(interrupted);
                    throw aborted;
                }
            }
        }
        // Reached only when the retry limit is negative and the loop body never runs.
        return null;
    }

    /**
     * Returns the replication connection of this context.
     *
     * @return the replication connection, or {@code null} if none has been created yet
     */
    public ReplicationConnection getReplicationConnection() {
        return replicationConnection;
    }

    /**
     * Returns the snapshotter taken from the connector configuration by {@code configure}.
     *
     * @return the snapshotter, or {@code null} if {@code configure} has not assigned it
     */
    public Snapshotter getSnapshotter() {
        return snapshotter;
    }

    /**
     * Returns the task context created by {@code configure}.
     *
     * @return the task context, or {@code null} if {@code configure} has not assigned it
     */
    public PostgresTaskContext getTaskContext() {
        return taskContext;
    }
}
