package io.debezium.connector.mariadb;

import io.debezium.DebeziumException;
import io.debezium.bean.StandardBeanNames;
import io.debezium.config.Configuration;
import io.debezium.config.Field;
import io.debezium.connector.base.ChangeEventQueue;
import io.debezium.connector.binlog.BinlogConnectorConfig;
import io.debezium.connector.binlog.BinlogEventMetadataProvider;
import io.debezium.connector.binlog.BinlogSourceTask;
import io.debezium.connector.binlog.jdbc.BinlogConnectorConnection;
import io.debezium.connector.binlog.jdbc.BinlogFieldReader;
import io.debezium.connector.binlog.jdbc.BinlogValueConverters;
import io.debezium.connector.mariadb.MariaDbOffsetContext;
import io.debezium.connector.mariadb.MariaDbPartition;
import io.debezium.connector.mariadb.jdbc.MariaDbConnection;
import io.debezium.connector.mariadb.jdbc.MariaDbConnectionConfiguration;
import io.debezium.connector.mariadb.jdbc.MariaDbFieldReader;
import io.debezium.connector.mariadb.jdbc.MariaDbValueConverters;
import io.debezium.connector.mariadb.metrics.MariaDbChangeEventSourceMetricsFactory;
import io.debezium.connector.mariadb.metrics.MariaDbStreamingChangeEventSourceMetrics;
import io.debezium.document.DocumentReader;
import io.debezium.jdbc.DefaultMainConnectionProvidingConnectionFactory;
import io.debezium.pipeline.ChangeEventSourceCoordinator;
import io.debezium.pipeline.DataChangeEvent;
import io.debezium.pipeline.ErrorHandler;
import io.debezium.pipeline.EventDispatcher;
import io.debezium.pipeline.notification.NotificationService;
import io.debezium.pipeline.signal.SignalProcessor;
import io.debezium.pipeline.spi.Offsets;
import io.debezium.schema.SchemaFactory;
import io.debezium.schema.SchemaNameAdjuster;
import io.debezium.snapshot.SnapshotterService;
import io.debezium.spi.snapshot.Snapshotter;
import io.debezium.spi.topic.TopicNamingStrategy;
import io.debezium.util.Clock;
import java.sql.SQLException;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.stream.Collectors;
import org.apache.kafka.connect.source.SourceRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/debezium/connector/mariadb/MariaDbConnectorTask.class */
/**
 * Source task of the MariaDB connector, built on top of {@link BinlogSourceTask}.
 *
 * <p>{@link #start(Configuration)} wires together the JDBC connections, the database schema, the
 * bean registry entries, the event queue, the dispatcher and the metrics, and returns the started
 * {@link ChangeEventSourceCoordinator}. {@link #doPoll()} hands out the records buffered in the
 * queue, and {@link #doStop()} closes the JDBC connections and the schema.
 */
public class MariaDbConnectorTask extends BinlogSourceTask<MariaDbPartition, MariaDbOffsetContext> {
    private static final Logger LOGGER = LoggerFactory.getLogger(MariaDbConnectorTask.class);

    /** Name handed to the task context when configuring the logging context of the queue. */
    private static final String CONTEXT_NAME = "mariadb-connector-task";

    // Everything below is created in start() and released in doStop().
    // NOTE(review): the fields are volatile, presumably because start/poll/stop can be invoked
    // from different threads -- confirm against the base task's threading model.
    private volatile MariaDbTaskContext taskContext;
    private volatile ChangeEventQueue<DataChangeEvent> queue;
    /** Main connection obtained from the connection factory. */
    private volatile BinlogConnectorConnection connection;
    /** Separate connection that is published through the bean registry. */
    private volatile BinlogConnectorConnection beanRegistryJdbcConnection;
    private volatile ErrorHandler errorHandler;
    private volatile MariaDbDatabaseSchema schema;

    /**
     * Returns the connector version.
     *
     * @return the value of {@code Module.version()}
     */
    public String version() {
        return Module.version();
    }

    /**
     * Returns the configuration fields known to this connector.
     *
     * @return {@code MariaDbConnectorConfig.ALL_FIELDS}
     */
    protected Iterable<Field> getAllConfigurationFields() {
        return MariaDbConnectorConfig.ALL_FIELDS;
    }

    /**
     * Builds the complete change event pipeline for the given configuration and starts it.
     *
     * @param configuration the connector configuration
     * @return the coordinator, already started
     * @throws DebeziumException if a {@link SQLException} is raised from the point where the main
     *         connection is closed for schema recovery onwards
     */
    protected ChangeEventSourceCoordinator<MariaDbPartition, MariaDbOffsetContext> start(Configuration configuration) {
        final Clock clock = Clock.system();
        final MariaDbConnectorConfig connectorConfig = new MariaDbConnectorConfig(configuration);
        final TopicNamingStrategy topicNamingStrategy = connectorConfig.getTopicNamingStrategy(BinlogConnectorConfig.TOPIC_NAMING_STRATEGY);
        final SchemaNameAdjuster schemaNameAdjuster = connectorConfig.schemaNameAdjuster();
        final MariaDbValueConverters valueConverters = getValueConverters(connectorConfig);

        // Configuration used for every JDBC connection, with defaults for response buffering,
        // fetch size and cursor fetch.
        final Configuration jdbcConfig = configuration.edit()
                .withDefault("database.responseBuffering", "adaptive")
                .withDefault("database.fetchSize", 10000)
                .withDefault("database.useCursorFetch", connectorConfig.useCursorFetch())
                .build();

        final DefaultMainConnectionProvidingConnectionFactory<BinlogConnectorConnection> connectionFactory =
                new DefaultMainConnectionProvidingConnectionFactory<>(
                        () -> new MariaDbConnection(new MariaDbConnectionConfiguration(jdbcConfig), getFieldReader(connectorConfig)));
        this.connection = connectionFactory.mainConnection();

        final Offsets<MariaDbPartition, MariaDbOffsetContext> previousOffsets = getPreviousOffsets(
                new MariaDbPartition.Provider(connectorConfig, jdbcConfig),
                new MariaDbOffsetContext.Loader(connectorConfig));

        this.schema = new MariaDbDatabaseSchema(connectorConfig, valueConverters, topicNamingStrategy, schemaNameAdjuster,
                this.connection.isTableIdCaseSensitive());

        // Publish the shared objects through the bean registry; the registry gets its own JDBC
        // connection rather than the main one.
        this.beanRegistryJdbcConnection = connectionFactory.newConnection();
        connectorConfig.getBeanRegistry().add(StandardBeanNames.CONFIGURATION, jdbcConfig);
        connectorConfig.getBeanRegistry().add("ConnectorConfig", connectorConfig);
        connectorConfig.getBeanRegistry().add("Schema", this.schema);
        connectorConfig.getBeanRegistry().add(StandardBeanNames.JDBC_CONNECTION, this.beanRegistryJdbcConnection);
        connectorConfig.getBeanRegistry().add(StandardBeanNames.VALUE_CONVERTER, valueConverters);
        connectorConfig.getBeanRegistry().add(StandardBeanNames.OFFSETS, previousOffsets);

        registerServiceProviders(connectorConfig.getServiceRegistry());

        // NOTE(review): the name tryGetService suggests it may return null when no
        // SnapshotterService is registered; that would surface as a NullPointerException on the
        // next line -- confirm.
        final SnapshotterService snapshotterService = connectorConfig.getServiceRegistry().tryGetService(SnapshotterService.class);
        final Snapshotter snapshotter = snapshotterService.getSnapshotter();

        validateBinlogConfiguration(snapshotter, this.connection);

        // Drop the stored offset of the only partition when the feasibility check asks for it.
        if (validateSnapshotFeasibility(snapshotter, previousOffsets.getTheOnlyOffset(), this.connection)) {
            previousOffsets.resetOffset(previousOffsets.getTheOnlyPartition());
        }

        LOGGER.info("Closing JDBC connection before starting schema recovery.");
        try {
            this.connection.close();

            final MariaDbOffsetContext previousOffset = previousOffsets.getTheOnlyOffset();
            // Evaluating the bound method reference already fails fast on a null connection.
            validateAndLoadSchemaHistory(connectorConfig, this.connection::validateLogPosition, previousOffsets, this.schema, snapshotter);

            LOGGER.info("Reconnecting after finishing schema recovery");
            this.connection.setAutoCommit(false);

            if (previousOffset == null) {
                LOGGER.info("No previous offset found");
            } else {
                LOGGER.info("Found previous offset {}", previousOffset);
            }

            this.taskContext = new MariaDbTaskContext(connectorConfig, this.schema);

            this.queue = new ChangeEventQueue.Builder<DataChangeEvent>()
                    .pollInterval(connectorConfig.getPollInterval())
                    .maxBatchSize(connectorConfig.getMaxBatchSize())
                    .maxQueueSize(connectorConfig.getMaxQueueSize())
                    .maxQueueSizeInBytes(connectorConfig.getMaxQueueSizeInBytes())
                    .loggingContextSupplier(() -> this.taskContext.configureLoggingContext(CONTEXT_NAME))
                    .buffering()
                    .build();

            // The handler currently held (null on the first start) is passed to its replacement.
            // NOTE(review): presumably so handler state survives a restart -- confirm.
            this.errorHandler = new MariaDbErrorHandler(connectorConfig, this.queue, this.errorHandler);

            final BinlogEventMetadataProvider metadataProvider = new BinlogEventMetadataProvider();

            final SignalProcessor signalProcessor = new SignalProcessor(MariaDbConnector.class, connectorConfig, Map.of(),
                    getAvailableSignalChannels(), DocumentReader.defaultReader(), previousOffsets);

            final EventDispatcher dispatcher = new EventDispatcher(
                    connectorConfig,
                    topicNamingStrategy,
                    this.schema,
                    this.queue,
                    connectorConfig.getTableFilters().dataCollectionFilter(),
                    DataChangeEvent::new,
                    (EventDispatcher.InconsistentSchemaHandler) null,
                    metadataProvider,
                    connectorConfig.createHeartbeat(
                            topicNamingStrategy,
                            schemaNameAdjuster,
                            () -> new MariaDbConnection(new MariaDbConnectionConfiguration(jdbcConfig), getFieldReader(connectorConfig)),
                            new BinlogSourceTask.BinlogHeartbeatErrorHandler()),
                    schemaNameAdjuster,
                    signalProcessor);

            final MariaDbStreamingChangeEventSourceMetrics streamingMetrics =
                    new MariaDbStreamingChangeEventSourceMetrics(this.taskContext, this.queue, metadataProvider);

            final List notificationChannels = getNotificationChannels();
            final SchemaFactory schemaFactory = SchemaFactory.get();
            final NotificationService notificationService =
                    new NotificationService(notificationChannels, connectorConfig, schemaFactory, dispatcher::enqueueNotification);

            final ChangeEventSourceCoordinator<MariaDbPartition, MariaDbOffsetContext> coordinator = new ChangeEventSourceCoordinator<>(
                    previousOffsets,
                    this.errorHandler,
                    MariaDbConnector.class,
                    connectorConfig,
                    new MariaDbChangeEventSourceFactory(connectorConfig, connectionFactory, this.errorHandler, dispatcher, clock,
                            this.schema, this.taskContext, streamingMetrics, this.queue, snapshotterService),
                    new MariaDbChangeEventSourceMetricsFactory(streamingMetrics),
                    dispatcher,
                    this.schema,
                    signalProcessor,
                    notificationService,
                    snapshotterService);

            coordinator.start(this.taskContext, this.queue, metadataProvider);
            return coordinator;
        } catch (SQLException e) {
            // Any SQL failure during the remaining start-up sequence aborts the start.
            throw new DebeziumException(e);
        }
    }

    /**
     * Closes the main JDBC connection, the bean registry JDBC connection and the schema.
     *
     * <p>A {@link SQLException} from either connection is logged and does not stop the remaining
     * cleanup. The steps are chained with {@code finally} so that an unchecked exception from an
     * earlier step cannot skip the later ones.
     */
    protected void doStop() {
        try {
            closeConnection(this.connection, "Exception while closing the JDBC connection.");
        } finally {
            try {
                closeConnection(this.beanRegistryJdbcConnection, "Exception while closing the Bean Registry JDBC connection.");
            } finally {
                final MariaDbDatabaseSchema currentSchema = this.schema;
                if (currentSchema != null) {
                    currentSchema.close();
                }
            }
        }
    }

    /**
     * Takes the next batch of events from the queue and unwraps them into source records.
     *
     * @return the records carried by the polled events, in queue order
     * @throws InterruptedException declared because {@code queue.poll()} is called here
     */
    protected List<SourceRecord> doPoll() throws InterruptedException {
        final List<DataChangeEvent> events = this.queue.poll();
        return events.stream().map(DataChangeEvent::getRecord).collect(Collectors.toList());
    }

    /**
     * Closes the given connection if there is one, logging a {@link SQLException} instead of
     * propagating it.
     *
     * @param jdbcConnection the connection to close; may be {@code null} when start never got that far
     * @param failureMessage the message logged when closing fails
     */
    private static void closeConnection(BinlogConnectorConnection jdbcConnection, String failureMessage) {
        if (jdbcConnection == null) {
            return;
        }
        try {
            jdbcConnection.close();
        } catch (SQLException e) {
            LOGGER.error(failureMessage, e);
        }
    }

    /**
     * Creates the value converters from the connector configuration.
     *
     * @param mariaDbConnectorConfig the connector configuration supplying all converter settings
     * @return a new converter instance
     */
    private MariaDbValueConverters getValueConverters(MariaDbConnectorConfig mariaDbConnectorConfig) {
        return new MariaDbValueConverters(
                mariaDbConnectorConfig.getDecimalMode(),
                mariaDbConnectorConfig.getTemporalPrecisionMode(),
                mariaDbConnectorConfig.getBigIntUnsignedHandlingMode().asBigIntUnsignedMode(),
                mariaDbConnectorConfig.binaryHandlingMode(),
                // Temporal values are only adjusted when the configuration enables it; otherwise
                // they pass through unchanged.
                mariaDbConnectorConfig.isTimeAdjustedEnabled() ? BinlogValueConverters::adjustTemporal : temporal -> temporal,
                mariaDbConnectorConfig.getEventConvertingFailureHandlingMode(),
                mariaDbConnectorConfig.getServiceRegistry());
    }

    /**
     * Creates the field reader handed to every {@link MariaDbConnection} built by this task.
     *
     * @param mariaDbConnectorConfig the connector configuration
     * @return a new {@link MariaDbFieldReader}
     */
    private BinlogFieldReader getFieldReader(MariaDbConnectorConfig mariaDbConnectorConfig) {
        return new MariaDbFieldReader(mariaDbConnectorConfig);
    }
}
