package org.apache.shardingsphere.scaling.postgresql.component;

import java.nio.ByteBuffer;
import java.sql.Connection;
import java.sql.SQLException;
import lombok.Generated;
import org.apache.shardingsphere.scaling.core.common.channel.Channel;
import org.apache.shardingsphere.scaling.core.common.exception.ScalingTaskExecuteException;
import org.apache.shardingsphere.scaling.core.common.record.Record;
import org.apache.shardingsphere.scaling.core.config.DumperConfiguration;
import org.apache.shardingsphere.scaling.core.config.datasource.StandardJDBCDataSourceConfiguration;
import org.apache.shardingsphere.scaling.core.executor.AbstractScalingExecutor;
import org.apache.shardingsphere.scaling.core.executor.dumper.IncrementalDumper;
import org.apache.shardingsphere.scaling.core.job.position.ScalingPosition;
import org.apache.shardingsphere.scaling.core.util.ThreadUtil;
import org.apache.shardingsphere.scaling.postgresql.wal.LogicalReplication;
import org.apache.shardingsphere.scaling.postgresql.wal.WalEventConverter;
import org.apache.shardingsphere.scaling.postgresql.wal.WalPosition;
import org.apache.shardingsphere.scaling.postgresql.wal.decode.PostgreSQLLogSequenceNumber;
import org.apache.shardingsphere.scaling.postgresql.wal.decode.PostgreSQLTimestampUtils;
import org.apache.shardingsphere.scaling.postgresql.wal.decode.TestDecodingPlugin;
import org.apache.shardingsphere.scaling.postgresql.wal.event.AbstractWalEvent;
import org.apache.shardingsphere.scaling.postgresql.wal.event.PlaceholderEvent;
import org.postgresql.jdbc.PgConnection;
import org.postgresql.replication.PGReplicationStream;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/shardingsphere/scaling/postgresql/component/PostgreSQLWalDumper.class */
public final class PostgreSQLWalDumper extends AbstractScalingExecutor implements IncrementalDumper {

    @Generated
    private static final Logger log = LoggerFactory.getLogger(PostgreSQLWalDumper.class);
    private final WalPosition walPosition;
    private final DumperConfiguration dumperConfig;
    private final LogicalReplication logicalReplication = new LogicalReplication();
    private final WalEventConverter walEventConverter;
    private Channel channel;

    public PostgreSQLWalDumper(DumperConfiguration dumperConfiguration, ScalingPosition<WalPosition> scalingPosition) {
        this.walPosition = (WalPosition) scalingPosition;
        if (!StandardJDBCDataSourceConfiguration.class.equals(dumperConfiguration.getDataSourceConfig().getClass())) {
            throw new UnsupportedOperationException("PostgreSQLWalDumper only support JDBCDataSourceConfiguration");
        }
        this.dumperConfig = dumperConfiguration;
        this.walEventConverter = new WalEventConverter(dumperConfiguration);
    }

    public void start() {
        super.start();
        dump();
    }

    private void dump() {
        try {
            Connection createPgConnection = this.logicalReplication.createPgConnection((StandardJDBCDataSourceConfiguration) this.dumperConfig.getDataSourceConfig());
            Throwable th = null;
            try {
                PGReplicationStream createReplicationStream = this.logicalReplication.createReplicationStream(createPgConnection, PostgreSQLPositionInitializer.SLOT_NAME, this.walPosition.getLogSequenceNumber());
                Throwable th2 = null;
                try {
                    try {
                        TestDecodingPlugin testDecodingPlugin = new TestDecodingPlugin(new PostgreSQLTimestampUtils(((PgConnection) createPgConnection.unwrap(PgConnection.class)).getTimestampUtils()));
                        while (isRunning()) {
                            ByteBuffer readPending = createReplicationStream.readPending();
                            if (null == readPending) {
                                ThreadUtil.sleep(10L);
                            } else {
                                AbstractWalEvent decode = testDecodingPlugin.decode(readPending, new PostgreSQLLogSequenceNumber(createReplicationStream.getLastReceiveLSN()));
                                Record convert = this.walEventConverter.convert(decode);
                                if (!(decode instanceof PlaceholderEvent) && log.isDebugEnabled()) {
                                    log.debug("dump, event={}, record={}", decode, convert);
                                }
                                pushRecord(convert);
                            }
                        }
                        if (createReplicationStream != null) {
                            if (0 != 0) {
                                try {
                                    createReplicationStream.close();
                                } catch (Throwable th3) {
                                    th2.addSuppressed(th3);
                                }
                            } else {
                                createReplicationStream.close();
                            }
                        }
                        if (createPgConnection != null) {
                            if (0 != 0) {
                                try {
                                    createPgConnection.close();
                                } catch (Throwable th4) {
                                    th.addSuppressed(th4);
                                }
                            } else {
                                createPgConnection.close();
                            }
                        }
                    } finally {
                    }
                } catch (Throwable th5) {
                    if (createReplicationStream != null) {
                        if (th2 != null) {
                            try {
                                createReplicationStream.close();
                            } catch (Throwable th6) {
                                th2.addSuppressed(th6);
                            }
                        } else {
                            createReplicationStream.close();
                        }
                    }
                    throw th5;
                }
            } finally {
            }
        } catch (SQLException e) {
            throw new ScalingTaskExecuteException(e);
        }
    }

    private void pushRecord(Record record) {
        try {
            this.channel.pushRecord(record);
        } catch (InterruptedException e) {
        }
    }

    @Generated
    public void setChannel(Channel channel) {
        this.channel = channel;
    }
}
