package io.debezium.connector.mysql;

import io.debezium.DebeziumException;
import io.debezium.config.Configuration;
import io.debezium.config.Field;
import io.debezium.connector.base.ChangeEventQueue;
import io.debezium.connector.common.BaseSourceTask;
import io.debezium.connector.mysql.MySqlConnectorConfig;
import io.debezium.connector.mysql.MySqlOffsetContext;
import io.debezium.connector.mysql.MySqlPartition;
import io.debezium.connector.mysql.strategy.AbstractConnectorConnection;
import io.debezium.connector.mysql.strategy.ConnectorAdapter;
import io.debezium.connector.mysql.strategy.mysql.MySqlConnection;
import io.debezium.connector.mysql.strategy.mysql.MySqlConnectionConfiguration;
import io.debezium.document.DocumentReader;
import io.debezium.jdbc.DefaultMainConnectionProvidingConnectionFactory;
import io.debezium.jdbc.TemporalPrecisionMode;
import io.debezium.pipeline.ChangeEventSourceCoordinator;
import io.debezium.pipeline.DataChangeEvent;
import io.debezium.pipeline.ErrorHandler;
import io.debezium.pipeline.EventDispatcher;
import io.debezium.pipeline.notification.NotificationService;
import io.debezium.pipeline.signal.SignalProcessor;
import io.debezium.pipeline.signal.channels.KafkaSignalChannel;
import io.debezium.pipeline.spi.Offsets;
import io.debezium.schema.SchemaFactory;
import io.debezium.schema.SchemaNameAdjuster;
import io.debezium.spi.topic.TopicNamingStrategy;
import io.debezium.util.Clock;
import java.sql.SQLException;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.stream.Collectors;
import org.apache.kafka.connect.source.SourceRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

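/**
 * Source task of the MySQL connector. {@code start} wires the configuration, JDBC connection, schema,
 * event queue and dispatcher together and starts the change event source coordinator; {@code doPoll}
 * hands the queued change events over as source records; {@code doStop} closes the connection and schema.
 *
 * <p>Decompiled from: input_file:io/debezium/connector/mysql/MySqlConnectorTask.class
 */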
public class MySqlConnectorTask extends BaseSourceTask<MySqlPartition, MySqlOffsetContext> {
    private static final Logger LOGGER = LoggerFactory.getLogger(MySqlConnectorTask.class);
    private static final String CONTEXT_NAME = "mysql-connector-task";
    private volatile MySqlTaskContext taskContext;
    private volatile ChangeEventQueue<DataChangeEvent> queue;
    private volatile AbstractConnectorConnection connection;
    private volatile ErrorHandler errorHandler;
    private volatile MySqlDatabaseSchema schema;

    /**
     * Reports the version of this connector.
     *
     * @return the value supplied by {@code Module.version()}
     */
    public String version() {
        final String moduleVersion = Module.version();
        return moduleVersion;
    }

    /**
     * Builds every collaborator this task needs and starts the change event source coordinator.
     *
     * <p>The sequence is:
     * <ol>
     *   <li>derive the JDBC configuration (adding driver defaults) and open the main connection;</li>
     *   <li>validate the binlog settings and load the previously stored offsets;</li>
     *   <li>close the connection, recover (or initialize) the schema history, then use the connection again;</li>
     *   <li>reset the stored offset when {@code validateSnapshotFeasibility} reports that a new snapshot is needed;</li>
     *   <li>create the queue, error handler, signal processor, dispatcher and metrics, and start the coordinator.</li>
     * </ol>
     *
     * @param configuration the configuration handed to this task
     * @return the coordinator, already started
     * @throws DebeziumException if a {@link SQLException} is raised during start-up, or if one of the
     *         validation helpers rejects the server state or configuration
     */
    public ChangeEventSourceCoordinator<MySqlPartition, MySqlOffsetContext> start(Configuration configuration) {
        final Clock clock = Clock.system();
        final MySqlConnectorConfig connectorConfig = new MySqlConnectorConfig(configuration);
        // NOTE(review): kept raw because the strategy's type argument is not imported in this file.
        final TopicNamingStrategy topicNamingStrategy = connectorConfig.getTopicNamingStrategy(MySqlConnectorConfig.TOPIC_NAMING_STRATEGY);
        final SchemaNameAdjuster schemaNameAdjuster = connectorConfig.schemaNameAdjuster();
        final MySqlValueConverters valueConverters = getValueConverters(connectorConfig);

        // Driver-level defaults; values explicitly present in the task configuration take precedence.
        final Configuration jdbcConfig = configuration.edit()
                .withDefault("database.responseBuffering", "adaptive")
                .withDefault("database.fetchSize", 10000)
                .withDefault("database.useCursorFetch", connectorConfig.useCursorFetch())
                .build();

        final ConnectorAdapter connectorAdapter = connectorConfig.getConnectorAdapter();
        final DefaultMainConnectionProvidingConnectionFactory<AbstractConnectorConnection> connectionFactory =
                new DefaultMainConnectionProvidingConnectionFactory<>(() -> connectorAdapter.createConnection(jdbcConfig));
        this.connection = connectionFactory.mainConnection();

        validateBinlogConfiguration(connectorConfig);

        final Offsets<MySqlPartition, MySqlOffsetContext> previousOffsets = getPreviousOffsets(
                new MySqlPartition.Provider(connectorConfig, jdbcConfig),
                new MySqlOffsetContext.Loader(connectorConfig));

        this.schema = new MySqlDatabaseSchema(connectorConfig, valueConverters, topicNamingStrategy, schemaNameAdjuster,
                this.connection.isTableIdCaseSensitive());

        LOGGER.info("Closing connection before starting schema recovery");
        try {
            this.connection.close();

            final MySqlPartition partition = previousOffsets.getTheOnlyPartition();
            final MySqlOffsetContext previousOffset = previousOffsets.getTheOnlyOffset();
            validateAndLoadSchemaHistory(connectorConfig, partition, previousOffset, this.schema);

            LOGGER.info("Reconnecting after finishing schema recovery");
            this.connection.setAutoCommit(false);

            // A true result means the stored position can no longer be used and a snapshot has to run again.
            if (validateSnapshotFeasibility(connectorConfig, previousOffset)) {
                previousOffsets.resetOffset(partition);
            }

            this.taskContext = new MySqlTaskContext(connectorConfig, this.schema);
            this.queue = new ChangeEventQueue.Builder<DataChangeEvent>()
                    .pollInterval(connectorConfig.getPollInterval())
                    .maxBatchSize(connectorConfig.getMaxBatchSize())
                    .maxQueueSize(connectorConfig.getMaxQueueSize())
                    .maxQueueSizeInBytes(connectorConfig.getMaxQueueSizeInBytes())
                    .loggingContextSupplier(() -> this.taskContext.configureLoggingContext(CONTEXT_NAME))
                    .buffering()
                    .build();

            // The handler that was in place before this start (null on the first start) is handed to the new one.
            this.errorHandler = new MySqlErrorHandler(connectorConfig, this.queue, this.errorHandler);

            final MySqlEventMetadataProvider metadataProvider = new MySqlEventMetadataProvider();
            final SignalProcessor<MySqlPartition, MySqlOffsetContext> signalProcessor = new SignalProcessor<>(
                    MySqlConnector.class, connectorConfig, Map.of(), getAvailableSignalChannels(),
                    DocumentReader.defaultReader(), previousOffsets);

            // NOTE(review): this passes the offset read before a possible previousOffsets.resetOffset(partition)
            // above - confirm that using the pre-reset offset here is intended.
            resetOffset(connectorConfig, previousOffset, signalProcessor);

            // NOTE(review): kept raw because the dispatcher's type arguments are not imported in this file.
            final EventDispatcher eventDispatcher = new EventDispatcher(
                    connectorConfig,
                    topicNamingStrategy,
                    this.schema,
                    this.queue,
                    connectorConfig.getTableFilters().dataCollectionFilter(),
                    DataChangeEvent::new,
                    (EventDispatcher.InconsistentSchemaHandler) null,
                    metadataProvider,
                    connectorConfig.createHeartbeat(
                            topicNamingStrategy,
                            schemaNameAdjuster,
                            () -> new MySqlConnection(new MySqlConnectionConfiguration(jdbcConfig), getFieldReader(connectorConfig)),
                            sqlException -> {
                                // Only these two SQLSTATE values are escalated; any other state, including a
                                // missing (null) one, is ignored so the heartbeat keeps running.
                                final String sqlState = sqlException.getSQLState();
                                if ("42000".equals(sqlState) || "3D000".equals(sqlState)) {
                                    throw new DebeziumException("Could not execute heartbeat action query (Error: " + sqlState + ")", sqlException);
                                }
                            }),
                    schemaNameAdjuster,
                    signalProcessor);

            final MySqlStreamingChangeEventSourceMetrics streamingMetrics =
                    new MySqlStreamingChangeEventSourceMetrics(this.taskContext, this.queue, metadataProvider);

            final ChangeEventSourceCoordinator<MySqlPartition, MySqlOffsetContext> coordinator = new ChangeEventSourceCoordinator<>(
                    previousOffsets,
                    this.errorHandler,
                    MySqlConnector.class,
                    connectorConfig,
                    new MySqlChangeEventSourceFactory(connectorConfig, connectionFactory, this.errorHandler, eventDispatcher, clock,
                            this.schema, this.taskContext, streamingMetrics, this.queue),
                    new MySqlChangeEventSourceMetricsFactory(streamingMetrics),
                    eventDispatcher,
                    this.schema,
                    signalProcessor,
                    new NotificationService(getNotificationChannels(), connectorConfig, SchemaFactory.get(),
                            eventDispatcher::enqueueNotification));

            coordinator.start(this.taskContext, this.queue, metadataProvider);
            return coordinator;
        } catch (SQLException e) {
            throw new DebeziumException(e);
        }
    }

    /**
     * Creates the value converters from the connector configuration.
     *
     * <p>When {@code ENABLE_TIME_ADJUSTER} is true the converters use
     * {@code MySqlValueConverters::adjustTemporal}; otherwise temporal values are passed through unchanged.
     *
     * @param mySqlConnectorConfig the connector configuration to read the conversion settings from
     * @return a new {@code MySqlValueConverters}
     */
    private MySqlValueConverters getValueConverters(MySqlConnectorConfig mySqlConnectorConfig) {
        final TemporalPrecisionMode precisionMode = mySqlConnectorConfig.getTemporalPrecisionMode();
        final String bigIntSetting = mySqlConnectorConfig.getConfig().getString(MySqlConnectorConfig.BIGINT_UNSIGNED_HANDLING_MODE);
        final MySqlConnectorConfig.BigIntUnsignedHandlingMode bigIntHandling =
                MySqlConnectorConfig.BigIntUnsignedHandlingMode.parse(bigIntSetting);
        final boolean timeAdjusterEnabled = mySqlConnectorConfig.getConfig().getBoolean(MySqlConnectorConfig.ENABLE_TIME_ADJUSTER);

        return new MySqlValueConverters(
                mySqlConnectorConfig.getDecimalMode(),
                precisionMode,
                bigIntHandling.asBigIntUnsignedMode(),
                mySqlConnectorConfig.binaryHandlingMode(),
                timeAdjusterEnabled ? MySqlValueConverters::adjustTemporal : temporal -> temporal,
                MySqlValueConverters::defaultParsingErrorHandler,
                mySqlConnectorConfig.getConnectorAdapter());
    }

    /**
     * Picks the field reader implementation matching the configuration.
     *
     * <p>The MariaDB protocol setting is checked first; otherwise the choice depends on whether
     * cursor fetch is enabled (binary protocol reader) or not (text protocol reader).
     *
     * @param mySqlConnectorConfig the connector configuration
     * @return a new field reader
     */
    private MySqlFieldReader getFieldReader(MySqlConnectorConfig mySqlConnectorConfig) {
        if (mySqlConnectorConfig.usesMariaDbProtocol()) {
            return new MariaDbProtocolFieldReader(mySqlConnectorConfig);
        }
        if (mySqlConnectorConfig.useCursorFetch()) {
            return new MySqlBinaryProtocolFieldReader(mySqlConnectorConfig);
        }
        return new MySqlTextProtocolFieldReader(mySqlConnectorConfig);
    }

    /**
     * Polls the change event queue and unwraps each event into its source record.
     *
     * @return the records of the polled events, in the order the queue returned them
     * @throws InterruptedException if raised by the queue's {@code poll()}
     */
    public List<SourceRecord> doPoll() throws InterruptedException {
        final List<DataChangeEvent> polledEvents = this.queue.poll();
        return polledEvents.stream()
                .map(DataChangeEvent::getRecord)
                .collect(Collectors.toList());
    }

    /**
     * Releases the resources held by this task: the JDBC connection first, then the schema.
     *
     * <p>A {@link SQLException} raised while closing the connection is logged and does not
     * prevent the schema from being closed. Either resource is skipped when it was never created.
     */
    protected void doStop() {
        final AbstractConnectorConnection jdbcConnection = this.connection;
        if (jdbcConnection != null) {
            try {
                jdbcConnection.close();
            } catch (SQLException closeFailure) {
                LOGGER.error("Exception while closing JDBC connection", closeFailure);
            }
        }

        final MySqlDatabaseSchema databaseSchema = this.schema;
        if (databaseSchema != null) {
            databaseSchema.close();
        }
    }

    /**
     * Lists the configuration fields of this connector.
     *
     * @return {@code MySqlConnectorConfig.ALL_FIELDS}
     */
    protected Iterable<Field> getAllConfigurationFields() {
        final Iterable<Field> allFields = MySqlConnectorConfig.ALL_FIELDS;
        return allFields;
    }

    /**
     * Verifies the server's binlog settings when the configured snapshot mode includes streaming.
     *
     * <p>Nothing is checked when the snapshot mode does not stream. Otherwise the binlog format
     * is checked first, then the binlog row image.
     *
     * @param mySqlConnectorConfig the connector configuration supplying the snapshot mode
     * @throws DebeziumException if the connection reports a binlog format other than ROW or a row image other than FULL
     */
    private void validateBinlogConfiguration(MySqlConnectorConfig mySqlConnectorConfig) {
        final boolean streamingRequested = mySqlConnectorConfig.getSnapshotMode().shouldStream();
        if (!streamingRequested) {
            return;
        }

        final boolean rowFormat = this.connection.isBinlogFormatRow();
        if (!rowFormat) {
            throw new DebeziumException("The MySQL server is not configured to use a ROW binlog_format, which is required for this connector to work properly. Change the MySQL configuration to use a binlog_format=ROW and restart the connector.");
        }

        final boolean fullRowImage = this.connection.isBinlogRowImageFull();
        if (!fullRowImage) {
            throw new DebeziumException("The MySQL server is not configured to use a FULL binlog_row_image, which is required for this connector to work properly. Change the MySQL configuration to use a binlog_row_image=FULL and restart the connector.");
        }
    }

    /**
     * Brings the schema history into a usable state for the given offset.
     *
     * <ul>
     *   <li>No offset: the history storage is initialized, unless the snapshot mode asks for a
     *       snapshot on schema error, which is rejected because there is no binlog information.</li>
     *   <li>Offset and existing history: the schema is recovered from the history.</li>
     *   <li>Offset but no history: allowed only when the snapshot mode snapshots on schema error and
     *       the stored binlog position is still available; the storage is then initialized.</li>
     * </ul>
     *
     * @param mySqlConnectorConfig the connector configuration
     * @param mySqlPartition the partition whose history is recovered
     * @param mySqlOffsetContext the previously stored offset, or {@code null} when there is none
     * @param mySqlDatabaseSchema the schema to initialize or recover
     * @return {@code true} only when the history was missing and the storage was re-initialized for a
     *         schema recovery; {@code false} otherwise
     * @throws DebeziumException if the history or the binlog position required to continue is unavailable
     */
    private boolean validateAndLoadSchemaHistory(MySqlConnectorConfig mySqlConnectorConfig, MySqlPartition mySqlPartition, MySqlOffsetContext mySqlOffsetContext, MySqlDatabaseSchema mySqlDatabaseSchema) {
        if (mySqlOffsetContext == null) {
            final boolean recoveryRequested = mySqlConnectorConfig.getSnapshotMode().shouldSnapshotOnSchemaError();
            if (recoveryRequested) {
                throw new DebeziumException("Could not find existing binlog information while attempting schema only recovery snapshot");
            }
            LOGGER.info("Connector started for the first time, database schema history recovery will not be executed");
            mySqlDatabaseSchema.initializeStorage();
            return false;
        }

        if (!mySqlDatabaseSchema.historyExists()) {
            LOGGER.warn("Database schema history was not found but was expected");
            if (mySqlConnectorConfig.getSnapshotMode().shouldSnapshotOnSchemaError()) {
                final boolean positionAvailable = this.connection.isBinlogPositionAvailable(
                        mySqlConnectorConfig, mySqlOffsetContext.gtidSet(), mySqlOffsetContext.getSource().binlogFilename());
                if (!positionAvailable) {
                    throw new DebeziumException("The connector is trying to read binlog starting at " + mySqlOffsetContext.getSource() + ", but this is no longer available on the server. Reconfigure the connector to use a snapshot when needed.");
                }
                LOGGER.info("The db-history topic is missing but we are in {} snapshot mode. Attempting to snapshot the current schema and then begin reading the binlog from the last recorded offset.", MySqlConnectorConfig.SnapshotMode.SCHEMA_ONLY_RECOVERY);
                mySqlDatabaseSchema.initializeStorage();
                return true;
            }
            throw new DebeziumException("The db history topic is missing. You may attempt to recover it by reconfiguring the connector to " + MySqlConnectorConfig.SnapshotMode.SCHEMA_ONLY_RECOVERY);
        }

        mySqlDatabaseSchema.recover(mySqlPartition, mySqlOffsetContext);
        return false;
    }

    /**
     * Decides whether the stored offset must be discarded so that a snapshot runs again.
     *
     * <p>Without a stored offset the answer is always {@code false}; if the snapshot mode does not
     * snapshot, the earliest binlog file name is inspected and a warning is logged when no binlog is
     * available or when the name does not end with {@code 00001}.
     *
     * <p>With a stored offset, an interrupted snapshot is accepted only if the snapshot mode still
     * allows snapshots. Otherwise the stored binlog position is checked against the server; if it is
     * gone, the result depends on whether the snapshot mode allows a snapshot on data error.
     *
     * @param mySqlConnectorConfig the connector configuration
     * @param mySqlOffsetContext the previously stored offset, or {@code null} when there is none
     * @return {@code true} only when the stored binlog position is unavailable and the configuration
     *         permits forcing a snapshot
     * @throws DebeziumException if a required snapshot is not permitted by the configuration
     */
    private boolean validateSnapshotFeasibility(MySqlConnectorConfig mySqlConnectorConfig, MySqlOffsetContext mySqlOffsetContext) {
        if (mySqlOffsetContext == null) {
            if (!mySqlConnectorConfig.getSnapshotMode().shouldSnapshot()) {
                final String earliestBinlog = this.connection.earliestBinlogFilename();
                if (earliestBinlog == null) {
                    LOGGER.warn("No binlog appears to be available. Ensure that the MySQL row-level binlog is enabled.");
                } else if (!earliestBinlog.endsWith("00001")) {
                    LOGGER.warn("It is possible the server has purged some binlogs. If this is the case, then using snapshot mode may be required.");
                }
            }
            return false;
        }

        if (mySqlOffsetContext.isSnapshotRunning()) {
            if (!mySqlConnectorConfig.getSnapshotMode().shouldSnapshot()) {
                throw new DebeziumException("The connector previously stopped while taking a snapshot, but now the connector is configured to never allow snapshots. Reconfigure the connector to use snapshots initially or when needed.");
            }
            return false;
        }

        final boolean positionAvailable = this.connection.isBinlogPositionAvailable(
                mySqlConnectorConfig, mySqlOffsetContext.gtidSet(), mySqlOffsetContext.getSource().binlogFilename());
        if (positionAvailable) {
            return false;
        }

        if (mySqlConnectorConfig.getSnapshotMode().shouldSnapshotOnDataError()) {
            LOGGER.warn("The connector is trying to read binlog starting at '{}', but this is no longer available on the server. Forcing the snapshot execution as it is allowed by the configuration.", mySqlOffsetContext.getSource());
            return true;
        }
        throw new DebeziumException("The connector is trying to read binlog starting at " + mySqlOffsetContext.getSource() + ", but this is no longer available on the server. Reconfigure the connector to use a snapshot when needed.");
    }

    /**
     * Resets the Kafka signal channel to the signal offset derived from the stored offset.
     *
     * <p>This only happens when a stored offset exists, the {@code "kafka"} channel is enabled and the
     * connection is configured as read-only. The reset is skipped when the connector adapter
     * yields no signal offset for the stored offset.
     *
     * @param mySqlConnectorConfig the connector configuration
     * @param mySqlOffsetContext the previously stored offset, or {@code null} when there is none
     * @param signalProcessor the processor owning the Kafka signal channel
     */
    private void resetOffset(MySqlConnectorConfig mySqlConnectorConfig, MySqlOffsetContext mySqlOffsetContext, SignalProcessor<MySqlPartition, MySqlOffsetContext> signalProcessor) {
        final boolean kafkaChannelEnabled = mySqlConnectorConfig.getEnabledChannels().contains("kafka");
        if (mySqlOffsetContext == null || !kafkaChannelEnabled || !mySqlConnectorConfig.isReadOnlyConnection()) {
            return;
        }

        final KafkaSignalChannel kafkaChannel = signalProcessor.getSignalChannel(KafkaSignalChannel.class);
        final Long signalOffset = mySqlConnectorConfig.getConnectorAdapter().getReadOnlyIncrementalSnapshotSignalOffset(mySqlOffsetContext);
        if (signalOffset == null) {
            return;
        }

        LOGGER.info("Resetting Kafka Signal offset to {}", signalOffset);
        kafkaChannel.reset(signalOffset);
    }
}
