package io.debezium.connector.mysql;

import io.debezium.DebeziumException;
import io.debezium.connector.mysql.signal.ExecuteSnapshotKafkaSignal;
import io.debezium.connector.mysql.signal.KafkaSignal;
import io.debezium.connector.mysql.signal.KafkaSignalThread;
import io.debezium.connector.mysql.signal.PauseSnapshotKafkaSignal;
import io.debezium.connector.mysql.signal.ResumeSnapshotKafkaSignal;
import io.debezium.connector.mysql.signal.StopSnapshotKafkaSignal;
import io.debezium.jdbc.JdbcConnection;
import io.debezium.pipeline.EventDispatcher;
import io.debezium.pipeline.source.snapshot.incremental.AbstractIncrementalSnapshotChangeEventSource;
import io.debezium.pipeline.source.spi.DataChangeEventListener;
import io.debezium.pipeline.source.spi.SnapshotProgressListener;
import io.debezium.pipeline.spi.OffsetContext;
import io.debezium.relational.RelationalDatabaseConnectorConfig;
import io.debezium.schema.DatabaseSchema;
import io.debezium.spi.schema.DataCollectionId;
import io.debezium.util.Clock;
import java.sql.SQLException;
import java.util.List;
import java.util.Objects;
import java.util.Optional;
import java.util.function.Consumer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/debezium/connector/mysql/MySqlReadOnlyIncrementalSnapshotChangeEventSource.class */
public class MySqlReadOnlyIncrementalSnapshotChangeEventSource<T extends DataCollectionId> extends AbstractIncrementalSnapshotChangeEventSource<MySqlPartition, T> {
    private static final Logger LOGGER = LoggerFactory.getLogger(MySqlReadOnlyIncrementalSnapshotChangeEventSource.class);
    private final String showMasterStmt = "SHOW MASTER STATUS";
    private final KafkaSignalThread<T> kafkaSignal;
    private MySqlPartition partition;
    private OffsetContext offsetContext;

    public MySqlReadOnlyIncrementalSnapshotChangeEventSource(RelationalDatabaseConnectorConfig relationalDatabaseConnectorConfig, JdbcConnection jdbcConnection, EventDispatcher<MySqlPartition, T> eventDispatcher, DatabaseSchema<?> databaseSchema, Clock clock, SnapshotProgressListener<MySqlPartition> snapshotProgressListener, DataChangeEventListener<MySqlPartition> dataChangeEventListener) {
        super(relationalDatabaseConnectorConfig, jdbcConnection, eventDispatcher, databaseSchema, clock, snapshotProgressListener, dataChangeEventListener);
        this.showMasterStmt = "SHOW MASTER STATUS";
        this.kafkaSignal = new KafkaSignalThread<>(MySqlConnector.class, relationalDatabaseConnectorConfig, this);
    }

    public void init(MySqlPartition mySqlPartition, OffsetContext offsetContext) {
        super.init(mySqlPartition, offsetContext);
        this.partition = mySqlPartition;
        this.offsetContext = offsetContext;
        Long signalOffset = getContext().getSignalOffset();
        if (signalOffset != null) {
            this.kafkaSignal.seek(signalOffset.longValue());
        }
        this.kafkaSignal.start();
    }

    public void processMessage(MySqlPartition mySqlPartition, DataCollectionId dataCollectionId, Object obj, OffsetContext offsetContext) throws InterruptedException {
        if (getContext() == null) {
            LOGGER.warn("Context is null, skipping message processing");
            return;
        }
        checkEnqueuedSnapshotSignals(mySqlPartition, offsetContext);
        LOGGER.trace("Checking window for table '{}', key '{}', window contains '{}'", new Object[]{dataCollectionId, obj, this.window});
        if (getContext().updateWindowState(offsetContext)) {
            sendWindowEvents(mySqlPartition, offsetContext);
            readChunk(mySqlPartition);
        } else {
            if (this.window.isEmpty() || !getContext().deduplicationNeeded()) {
                return;
            }
            deduplicateWindow(dataCollectionId, obj);
        }
    }

    public void processHeartbeat(MySqlPartition mySqlPartition, OffsetContext offsetContext) throws InterruptedException {
        if (getContext() == null) {
            LOGGER.warn("Context is null, skipping message processing");
        } else {
            checkEnqueuedSnapshotSignals(mySqlPartition, offsetContext);
            readUntilGtidChange(mySqlPartition, offsetContext);
        }
    }

    private void readUntilGtidChange(MySqlPartition mySqlPartition, OffsetContext offsetContext) throws InterruptedException {
        String currentGtid = getContext().getCurrentGtid(offsetContext);
        while (getContext().snapshotRunning() && getContext().reachedHighWatermark(currentGtid)) {
            getContext().closeWindow();
            sendWindowEvents(mySqlPartition, offsetContext);
            readChunk(mySqlPartition);
            if (currentGtid == null && getContext().watermarksChanged()) {
                return;
            }
        }
    }

    public void processFilteredEvent(MySqlPartition mySqlPartition, OffsetContext offsetContext) throws InterruptedException {
        if (getContext() == null) {
            LOGGER.warn("Context is null, skipping message processing");
            return;
        }
        checkEnqueuedSnapshotSignals(mySqlPartition, offsetContext);
        if (getContext().updateWindowState(offsetContext)) {
            sendWindowEvents(mySqlPartition, offsetContext);
            readChunk(mySqlPartition);
        }
    }

    public void stopSnapshot(List<String> list, long j) {
        removeDataCollectionsFromSnapshot(new StopSnapshotKafkaSignal(list, j), this.partition, this.offsetContext);
    }

    public void enqueueDataCollectionNamesToSnapshot(List<String> list, long j, Optional<String> optional) {
        getContext().enqueueKafkaSignal(new ExecuteSnapshotKafkaSignal(list, j, optional));
    }

    public void enqueuePauseSnapshot() {
        getContext().enqueueKafkaSignal(new PauseSnapshotKafkaSignal());
    }

    public void enqueueResumeSnapshot() {
        getContext().enqueueKafkaSignal(new ResumeSnapshotKafkaSignal());
    }

    public void processTransactionStartedEvent(MySqlPartition mySqlPartition, OffsetContext offsetContext) throws InterruptedException {
        if (getContext() == null) {
            LOGGER.warn("Context is null, skipping message processing");
        } else if (getContext().updateWindowState(offsetContext)) {
            sendWindowEvents(mySqlPartition, offsetContext);
            readChunk(mySqlPartition);
        }
    }

    public void processTransactionCommittedEvent(MySqlPartition mySqlPartition, OffsetContext offsetContext) throws InterruptedException {
        if (getContext() == null) {
            LOGGER.warn("Context is null, skipping message processing");
        } else {
            readUntilGtidChange(mySqlPartition, offsetContext);
        }
    }

    protected void updateLowWatermark() {
        MySqlReadOnlyIncrementalSnapshotContext<T> context = getContext();
        Objects.requireNonNull(context);
        getExecutedGtidSet(context::setLowWatermark);
    }

    protected void updateHighWatermark() {
        MySqlReadOnlyIncrementalSnapshotContext<T> context = getContext();
        Objects.requireNonNull(context);
        getExecutedGtidSet(context::setHighWatermark);
    }

    private void getExecutedGtidSet(Consumer<GtidSet> consumer) {
        try {
            this.jdbcConnection.query("SHOW MASTER STATUS", resultSet -> {
                if (resultSet.next()) {
                    if (resultSet.getMetaData().getColumnCount() <= 4) {
                        throw new UnsupportedOperationException("Need to add support for executed GTIDs for versions prior to 5.6.5");
                    }
                    consumer.accept(new GtidSet(resultSet.getString(5)));
                }
            });
            this.jdbcConnection.commit();
        } catch (SQLException e) {
            throw new DebeziumException(e);
        }
    }

    protected void emitWindowOpen() {
        updateLowWatermark();
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public void emitWindowClose(MySqlPartition mySqlPartition) throws InterruptedException {
        updateHighWatermark();
        if (getContext().serverUuidChanged()) {
            rereadChunk(mySqlPartition);
        }
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public void sendEvent(MySqlPartition mySqlPartition, EventDispatcher<MySqlPartition, T> eventDispatcher, OffsetContext offsetContext, Object[] objArr) throws InterruptedException {
        SourceInfo source = ((MySqlOffsetContext) offsetContext).getSource();
        String query = source.getQuery();
        source.setQuery(null);
        super.sendEvent(mySqlPartition, eventDispatcher, offsetContext, objArr);
        source.setQuery(query);
    }

    private void checkEnqueuedSnapshotSignals(MySqlPartition mySqlPartition, OffsetContext offsetContext) throws InterruptedException {
        while (getContext().hasKafkaSignals()) {
            KafkaSignal kafkaSignals = getContext().getKafkaSignals();
            if (kafkaSignals instanceof ExecuteSnapshotKafkaSignal) {
                addDataCollectionNamesToSnapshot((ExecuteSnapshotKafkaSignal) kafkaSignals, mySqlPartition, offsetContext);
            } else if (kafkaSignals instanceof StopSnapshotKafkaSignal) {
                LOGGER.warn("Stop signal skipped, this should never be processed via an enqueued signal");
            } else if (kafkaSignals instanceof PauseSnapshotKafkaSignal) {
                pauseSnapshot(mySqlPartition, offsetContext);
            } else {
                if (!(kafkaSignals instanceof ResumeSnapshotKafkaSignal)) {
                    throw new IllegalArgumentException("Unknown Kafka signal " + kafkaSignals);
                }
                resumeSnapshot(mySqlPartition, offsetContext);
            }
        }
    }

    private void addDataCollectionNamesToSnapshot(ExecuteSnapshotKafkaSignal executeSnapshotKafkaSignal, MySqlPartition mySqlPartition, OffsetContext offsetContext) throws InterruptedException {
        super.addDataCollectionNamesToSnapshot(mySqlPartition, executeSnapshotKafkaSignal.getDataCollections(), executeSnapshotKafkaSignal.getAdditionalCondition(), offsetContext);
        getContext().setSignalOffset(Long.valueOf(executeSnapshotKafkaSignal.getSignalOffset()));
    }

    private void removeDataCollectionsFromSnapshot(StopSnapshotKafkaSignal stopSnapshotKafkaSignal, MySqlPartition mySqlPartition, OffsetContext offsetContext) {
        super.stopSnapshot(mySqlPartition, stopSnapshotKafkaSignal.getDataCollections(), offsetContext);
        getContext().setSignalOffset(Long.valueOf(stopSnapshotKafkaSignal.getSignalOffset()));
    }

    private MySqlReadOnlyIncrementalSnapshotContext<T> getContext() {
        return this.context;
    }
}
