package org.apache.seatunnel.connectors.seatunnel.cdc.postgres.source;

import com.google.auto.service.AutoService;
import io.debezium.jdbc.JdbcConnection;
import io.debezium.relational.TableId;
import io.debezium.relational.history.ConnectTableChangeSerializer;
import io.debezium.relational.history.TableChanges;
import java.time.ZoneId;
import java.util.List;
import java.util.Map;
import java.util.function.Function;
import java.util.stream.Collectors;
import org.apache.kafka.connect.data.Struct;
import org.apache.seatunnel.api.configuration.Option;
import org.apache.seatunnel.api.configuration.ReadonlyConfig;
import org.apache.seatunnel.api.source.SeaTunnelSource;
import org.apache.seatunnel.api.source.SupportParallelism;
import org.apache.seatunnel.api.table.catalog.CatalogTable;
import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.seatunnel.common.utils.JdbcUrlUtil;
import org.apache.seatunnel.common.utils.SeaTunnelException;
import org.apache.seatunnel.connectors.cdc.base.config.JdbcSourceConfig;
import org.apache.seatunnel.connectors.cdc.base.config.SourceConfig;
import org.apache.seatunnel.connectors.cdc.base.dialect.DataSourceDialect;
import org.apache.seatunnel.connectors.cdc.base.option.JdbcSourceOptions;
import org.apache.seatunnel.connectors.cdc.base.option.StartupMode;
import org.apache.seatunnel.connectors.cdc.base.option.StopMode;
import org.apache.seatunnel.connectors.cdc.base.source.IncrementalSource;
import org.apache.seatunnel.connectors.cdc.base.source.offset.OffsetFactory;
import org.apache.seatunnel.connectors.cdc.debezium.DebeziumDeserializationSchema;
import org.apache.seatunnel.connectors.cdc.debezium.row.SeaTunnelRowDebeziumDeserializeSchema;
import org.apache.seatunnel.connectors.seatunnel.cdc.postgres.config.PostgresSourceConfigFactory;
import org.apache.seatunnel.connectors.seatunnel.cdc.postgres.source.offset.LsnOffsetFactory;
import org.apache.seatunnel.connectors.seatunnel.jdbc.catalog.JdbcCatalogOptions;

/**
 * Incremental (CDC) source for PostgreSQL, registered as a {@link SeaTunnelSource} service via
 * {@link AutoService}.
 *
 * <p>The class supplies the Postgres-specific pieces that {@link IncrementalSource} asks for:
 * the startup/stop mode options, the source config factory, the Debezium deserialization schema,
 * the data source dialect and the offset factory.
 *
 * @param <T> element type produced by the source
 */
@AutoService({SeaTunnelSource.class})
/* loaded from: input_file:org/apache/seatunnel/connectors/seatunnel/cdc/postgres/source/PostgresIncrementalSource.class */
public class PostgresIncrementalSource<T> extends IncrementalSource<T, JdbcSourceConfig> implements SupportParallelism {
    /** Plugin name returned by {@link #getPluginName()}. */
    static final String IDENTIFIER = "Postgres-CDC";

    /**
     * Creates the source by forwarding all arguments to the {@link IncrementalSource} constructor.
     *
     * @param readonlyConfig connector options
     * @param seaTunnelDataType row type handed to the parent class
     * @param list catalog tables handed to the parent class
     */
    public PostgresIncrementalSource(ReadonlyConfig readonlyConfig, SeaTunnelDataType<SeaTunnelRow> seaTunnelDataType, List<CatalogTable> list) {
        super(readonlyConfig, seaTunnelDataType, list);
    }

    /**
     * Returns the plugin name of this connector.
     *
     * @return {@link #IDENTIFIER}
     */
    public String getPluginName() {
        return IDENTIFIER;
    }

    /**
     * Returns the option that selects the startup mode for this connector.
     *
     * @return {@code PostgresSourceOptions.STARTUP_MODE}
     */
    @Override // org.apache.seatunnel.connectors.cdc.base.source.IncrementalSource
    public Option<StartupMode> getStartupModeOption() {
        return PostgresSourceOptions.STARTUP_MODE;
    }

    /**
     * Returns the option that selects the stop mode for this connector.
     *
     * @return {@code PostgresSourceOptions.STOP_MODE}
     */
    @Override // org.apache.seatunnel.connectors.cdc.base.source.IncrementalSource
    public Option<StopMode> getStopModeOption() {
        return PostgresSourceOptions.STOP_MODE;
    }

    /**
     * Builds a {@link PostgresSourceConfigFactory} populated from the connector options.
     *
     * <p>The general options are loaded from the instance field {@code this.readonlyConfig}, while
     * the JDBC URL ({@code JdbcCatalogOptions.BASE_URL}) is read from the {@code readonlyConfig}
     * parameter. The URL is parsed to fill in the origin URL, hostname and port; the startup and
     * stop configuration come from the fields of this instance.
     *
     * @param readonlyConfig options from which the base JDBC URL is read
     * @return the configured factory
     */
    @Override // org.apache.seatunnel.connectors.cdc.base.source.IncrementalSource
    public SourceConfig.Factory<JdbcSourceConfig> createSourceConfigFactory(ReadonlyConfig readonlyConfig) {
        PostgresSourceConfigFactory postgresSourceConfigFactory = new PostgresSourceConfigFactory();
        postgresSourceConfigFactory.fromReadonlyConfig(this.readonlyConfig);
        JdbcUrlUtil.UrlInfo urlInfo = JdbcUrlUtil.getUrlInfo((String) readonlyConfig.get(JdbcCatalogOptions.BASE_URL));
        postgresSourceConfigFactory.originUrl(urlInfo.getOrigin());
        postgresSourceConfigFactory.hostname(urlInfo.getHost());
        postgresSourceConfigFactory.port(urlInfo.getPort().intValue());
        postgresSourceConfigFactory.startupOptions(this.startupConfig);
        postgresSourceConfigFactory.stopOptions(this.stopConfig);
        return postgresSourceConfigFactory;
    }

    /**
     * Builds the deserialization schema that converts Debezium records into rows.
     *
     * <p>The same data type ({@code this.dataType}) is used both as the physical row type and as
     * the result type. The server time zone is parsed from
     * {@code JdbcSourceOptions.SERVER_TIME_ZONE}.
     *
     * @param readonlyConfig options from which the server time zone is read
     * @return the deserialization schema
     */
    @Override // org.apache.seatunnel.connectors.cdc.base.source.IncrementalSource
    public DebeziumDeserializationSchema<T> createDebeziumDeserializationSchema(ReadonlyConfig readonlyConfig) {
        SeaTunnelDataType<SeaTunnelRow> seaTunnelDataType = this.dataType;
        return SeaTunnelRowDebeziumDeserializeSchema.builder().setPhysicalRowType(seaTunnelDataType).setResultTypeInfo(seaTunnelDataType).setServerTimeZone(ZoneId.of((String) readonlyConfig.get(JdbcSourceOptions.SERVER_TIME_ZONE))).build();
    }

    /**
     * Creates the Postgres dialect from the already-built config factory and catalog tables.
     *
     * @param readonlyConfig not read by this implementation
     * @return a new {@link PostgresDialect}
     */
    @Override // org.apache.seatunnel.connectors.cdc.base.source.IncrementalSource
    public DataSourceDialect<JdbcSourceConfig> createDataSourceDialect(ReadonlyConfig readonlyConfig) {
        return new PostgresDialect((PostgresSourceConfigFactory) this.configFactory, this.catalogTables);
    }

    /**
     * Creates the LSN-based offset factory from the config factory and dialect of this instance.
     *
     * @param readonlyConfig not read by this implementation
     * @return a new {@link LsnOffsetFactory}
     */
    @Override // org.apache.seatunnel.connectors.cdc.base.source.IncrementalSource
    public OffsetFactory createOffsetFactory(ReadonlyConfig readonlyConfig) {
        return new LsnOffsetFactory((PostgresSourceConfigFactory) this.configFactory, (PostgresDialect) this.dataSourceDialect);
    }

    /**
     * Queries the schema of every discovered table and serializes it as a table-change record.
     *
     * <p>A fresh dialect and a JDBC connection are created for the lookup. The connection is
     * managed by try-with-resources so it is closed on both the success and the failure path.
     *
     * @return a map from each discovered table id to its serialized table change
     * @throws SeaTunnelException wrapping any exception raised while opening the connection,
     *     querying the schemas or closing the connection
     */
    private Map<TableId, Struct> tableChanges() {
        JdbcSourceConfig jdbcSourceConfig = (JdbcSourceConfig) this.configFactory.create2(0);
        PostgresDialect postgresDialect = new PostgresDialect((PostgresSourceConfigFactory) this.configFactory, this.catalogTables);
        List<TableId> discoveredTables = postgresDialect.discoverDataCollections(jdbcSourceConfig);
        ConnectTableChangeSerializer serializer = new ConnectTableChangeSerializer();
        try (JdbcConnection jdbcConnection = postgresDialect.openJdbcConnection(jdbcSourceConfig)) {
            return discoveredTables.stream().collect(Collectors.toMap(Function.identity(), tableId -> {
                TableChanges tableChanges = new TableChanges();
                tableChanges.create(postgresDialect.queryTableSchema(jdbcConnection, tableId).getTable());
                // Exactly one change was recorded above, so the first element is the one we want.
                return serializer.serialize(tableChanges).get(0);
            }));
        } catch (Exception e) {
            throw new SeaTunnelException(e);
        }
    }

    /** Creates an instance without passing any configuration to the parent class. */
    public PostgresIncrementalSource() {
    }
}
