/*
 * Decompiled with CFR 0.152.
 */
package io.debezium.connector.oracle;

import io.debezium.DebeziumException;
import io.debezium.bean.StandardBeanNames;
import io.debezium.config.CommonConnectorConfig;
import io.debezium.config.Configuration;
import io.debezium.config.Field;
import io.debezium.connector.base.ChangeEventQueue;
import io.debezium.connector.common.BaseSourceTask;
import io.debezium.connector.oracle.AbstractOracleStreamingChangeEventSourceMetrics;
import io.debezium.connector.oracle.Module;
import io.debezium.connector.oracle.OracleChangeEventSourceFactory;
import io.debezium.connector.oracle.OracleChangeEventSourceMetricsFactory;
import io.debezium.connector.oracle.OracleConnection;
import io.debezium.connector.oracle.OracleConnector;
import io.debezium.connector.oracle.OracleConnectorConfig;
import io.debezium.connector.oracle.OracleDatabaseSchema;
import io.debezium.connector.oracle.OracleDefaultValueConverter;
import io.debezium.connector.oracle.OracleErrorHandler;
import io.debezium.connector.oracle.OracleEventMetadataProvider;
import io.debezium.connector.oracle.OracleOffsetContext;
import io.debezium.connector.oracle.OraclePartition;
import io.debezium.connector.oracle.OracleTaskContext;
import io.debezium.connector.oracle.OracleValueConverters;
import io.debezium.connector.oracle.StreamingAdapter;
import io.debezium.document.DocumentReader;
import io.debezium.jdbc.DefaultMainConnectionProvidingConnectionFactory;
import io.debezium.jdbc.JdbcConfiguration;
import io.debezium.jdbc.MainConnectionProvidingConnectionFactory;
import io.debezium.pipeline.ChangeEventSourceCoordinator;
import io.debezium.pipeline.DataChangeEvent;
import io.debezium.pipeline.ErrorHandler;
import io.debezium.pipeline.EventDispatcher;
import io.debezium.pipeline.notification.NotificationService;
import io.debezium.pipeline.signal.SignalProcessor;
import io.debezium.pipeline.spi.Offsets;
import io.debezium.relational.TableId;
import io.debezium.schema.SchemaFactory;
import io.debezium.schema.SchemaNameAdjuster;
import io.debezium.snapshot.SnapshotterService;
import io.debezium.spi.topic.TopicNamingStrategy;
import io.debezium.util.Clock;
import io.debezium.util.Strings;
import java.sql.SQLException;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import org.apache.kafka.connect.source.SourceRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class OracleConnectorTask
extends BaseSourceTask<OraclePartition, OracleOffsetContext> {
    private static final Logger LOGGER = LoggerFactory.getLogger(OracleConnectorTask.class);
    private static final String CONTEXT_NAME = "oracle-connector-task";
    private volatile OracleTaskContext taskContext;
    private volatile ChangeEventQueue<DataChangeEvent> queue;
    private volatile OracleConnection jdbcConnection;
    private volatile OracleConnection beanRegistryJdbcConnection;
    private volatile ErrorHandler errorHandler;
    private volatile OracleDatabaseSchema schema;

    @Override
    public String version() {
        return Module.version();
    }

    @Override
    public ChangeEventSourceCoordinator<OraclePartition, OracleOffsetContext> start(Configuration config) {
        OracleConnectorConfig connectorConfig = new OracleConnectorConfig(config);
        TopicNamingStrategy topicNamingStrategy = connectorConfig.getTopicNamingStrategy(CommonConnectorConfig.TOPIC_NAMING_STRATEGY);
        SchemaNameAdjuster schemaNameAdjuster = connectorConfig.schemaNameAdjuster();
        JdbcConfiguration jdbcConfig = connectorConfig.getJdbcConfig();
        DefaultMainConnectionProvidingConnectionFactory<OracleConnection> connectionFactory = new DefaultMainConnectionProvidingConnectionFactory<OracleConnection>(() -> new OracleConnection(jdbcConfig));
        this.jdbcConnection = (OracleConnection)connectionFactory.mainConnection();
        OracleValueConverters valueConverters = connectorConfig.getAdapter().getValueConverter(connectorConfig, this.jdbcConnection);
        OracleDefaultValueConverter defaultValueConverter = new OracleDefaultValueConverter(valueConverters, this.jdbcConnection);
        StreamingAdapter.TableNameCaseSensitivity tableNameCaseSensitivity = connectorConfig.getAdapter().getTableNameCaseSensitivity(this.jdbcConnection);
        this.schema = new OracleDatabaseSchema(connectorConfig, valueConverters, defaultValueConverter, schemaNameAdjuster, topicNamingStrategy, tableNameCaseSensitivity);
        Offsets<OraclePartition, OracleOffsetContext> previousOffsets = this.getPreviousOffsets(new OraclePartition.Provider(connectorConfig), connectorConfig.getAdapter().getOffsetContextLoader());
        this.beanRegistryJdbcConnection = (OracleConnection)connectionFactory.newConnection();
        connectorConfig.getBeanRegistry().add(StandardBeanNames.CONFIGURATION, config);
        connectorConfig.getBeanRegistry().add("ConnectorConfig", connectorConfig);
        connectorConfig.getBeanRegistry().add("Schema", this.schema);
        connectorConfig.getBeanRegistry().add(StandardBeanNames.JDBC_CONNECTION, this.beanRegistryJdbcConnection);
        connectorConfig.getBeanRegistry().add(StandardBeanNames.VALUE_CONVERTER, valueConverters);
        connectorConfig.getBeanRegistry().add(StandardBeanNames.OFFSETS, previousOffsets);
        this.registerServiceProviders(connectorConfig.getServiceRegistry());
        SnapshotterService snapshotterService = connectorConfig.getServiceRegistry().tryGetService(SnapshotterService.class);
        this.validateRedoLogConfiguration(connectorConfig, snapshotterService);
        this.checkArchiveLogDestination(this.jdbcConnection, connectorConfig.getArchiveLogDestinationName());
        OracleOffsetContext previousOffset = previousOffsets.getTheOnlyOffset();
        this.validateAndLoadSchemaHistory(connectorConfig, this.jdbcConnection::validateLogPosition, previousOffsets, this.schema, snapshotterService.getSnapshotter());
        this.taskContext = new OracleTaskContext(connectorConfig, this.schema);
        if (previousOffset == null) {
            LOGGER.info("No previous offset found");
        } else {
            LOGGER.info("Found previous offset {}", (Object)previousOffset);
        }
        Clock clock = Clock.system();
        this.queue = new ChangeEventQueue.Builder().pollInterval(connectorConfig.getPollInterval()).maxBatchSize(connectorConfig.getMaxBatchSize()).maxQueueSize(connectorConfig.getMaxQueueSize()).maxQueueSizeInBytes(connectorConfig.getMaxQueueSizeInBytes()).loggingContextSupplier(() -> this.taskContext.configureLoggingContext(CONTEXT_NAME)).build();
        this.errorHandler = new OracleErrorHandler(connectorConfig, this.queue, this.errorHandler);
        OracleEventMetadataProvider metadataProvider = new OracleEventMetadataProvider();
        SignalProcessor<OraclePartition, OracleOffsetContext> signalProcessor = new SignalProcessor<OraclePartition, OracleOffsetContext>(OracleConnector.class, connectorConfig, Map.of(), this.getAvailableSignalChannels(), DocumentReader.defaultReader(), previousOffsets);
        EventDispatcher<OraclePartition, TableId> dispatcher = new EventDispatcher<OraclePartition, TableId>(connectorConfig, topicNamingStrategy, this.schema, this.queue, connectorConfig.getTableFilters().dataCollectionFilter(), DataChangeEvent::new, metadataProvider, connectorConfig.createHeartbeat(topicNamingStrategy, schemaNameAdjuster, () -> this.getHeartbeatConnection(connectorConfig, jdbcConfig), exception -> {
            String sqlErrorId = exception.getMessage();
            throw new DebeziumException("Could not execute heartbeat action query (Error: " + sqlErrorId + ")", exception);
        }), schemaNameAdjuster, signalProcessor);
        Object streamingMetrics = connectorConfig.getAdapter().getStreamingMetrics(this.taskContext, this.queue, metadataProvider, connectorConfig);
        NotificationService notificationService = new NotificationService(this.getNotificationChannels(), connectorConfig, SchemaFactory.get(), dispatcher::enqueueNotification);
        ChangeEventSourceCoordinator<OraclePartition, OracleOffsetContext> coordinator = new ChangeEventSourceCoordinator<OraclePartition, OracleOffsetContext>(previousOffsets, this.errorHandler, OracleConnector.class, connectorConfig, new OracleChangeEventSourceFactory(connectorConfig, (MainConnectionProvidingConnectionFactory<OracleConnection>)connectionFactory, this.errorHandler, dispatcher, clock, this.schema, jdbcConfig, this.taskContext, (AbstractOracleStreamingChangeEventSourceMetrics)streamingMetrics, snapshotterService), new OracleChangeEventSourceMetricsFactory((AbstractOracleStreamingChangeEventSourceMetrics)streamingMetrics), dispatcher, this.schema, signalProcessor, notificationService, snapshotterService);
        coordinator.start(this.taskContext, this.queue, metadataProvider);
        return coordinator;
    }

    private void checkArchiveLogDestination(OracleConnection connection, String destinationName) {
        try {
            if (!Strings.isNullOrBlank(destinationName)) {
                if (!connection.isArchiveLogDestinationValid(destinationName)) {
                    LOGGER.warn("Archive log destination '{}' may not be valid, please check the database.", (Object)destinationName);
                }
            } else if (!connection.isOnlyOneArchiveLogDestinationValid()) {
                LOGGER.warn("There are multiple valid archive log destinations. Please add '{}' to the connector configuration to avoid log availability problems.", (Object)OracleConnectorConfig.ARCHIVE_DESTINATION_NAME.name());
            }
        }
        catch (SQLException e) {
            throw new DebeziumException("Error while checking validity of archive log configuration", e);
        }
    }

    private OracleConnection getHeartbeatConnection(OracleConnectorConfig connectorConfig, JdbcConfiguration jdbcConfig) {
        OracleConnection connection = new OracleConnection(jdbcConfig);
        if (!Strings.isNullOrBlank(connectorConfig.getPdbName())) {
            connection.setSessionToPdb(connectorConfig.getPdbName());
        }
        return connection;
    }

    @Override
    public List<SourceRecord> doPoll() throws InterruptedException {
        List<DataChangeEvent> records = this.queue.poll();
        List<SourceRecord> sourceRecords = records.stream().map(DataChangeEvent::getRecord).collect(Collectors.toList());
        return sourceRecords;
    }

    @Override
    public void doStop() {
        try {
            if (this.jdbcConnection != null) {
                this.jdbcConnection.close();
            }
        }
        catch (SQLException e) {
            LOGGER.error("Exception while closing JDBC connection", e);
        }
        try {
            if (this.beanRegistryJdbcConnection != null) {
                this.beanRegistryJdbcConnection.close();
            }
        }
        catch (SQLException e) {
            LOGGER.error("Exception while closing JDBC bean registry connection", e);
        }
        if (this.schema != null) {
            this.schema.close();
        }
    }

    @Override
    protected Iterable<Field> getAllConfigurationFields() {
        return OracleConnectorConfig.ALL_FIELDS;
    }

    private void validateRedoLogConfiguration(OracleConnectorConfig config, SnapshotterService snapshotterService) {
        boolean archivelogMode = this.jdbcConnection.isArchiveLogMode();
        if (!archivelogMode) {
            if (OracleConnectorTask.redoLogRequired(config, snapshotterService)) {
                throw new DebeziumException("The Oracle server is not configured to use a archive log LOG_MODE, which is required for this connector to work properly. Change the Oracle configuration to use a LOG_MODE=ARCHIVELOG and restart the connector.");
            }
            LOGGER.warn("Failed the archive log check but continuing as redo log isn't strictly required");
        }
    }

    private static boolean redoLogRequired(OracleConnectorConfig config, SnapshotterService snapshotterService) {
        return snapshotterService.getSnapshotter().shouldStream() || config.getLogMiningTransactionSnapshotBoundaryMode() == OracleConnectorConfig.TransactionSnapshotBoundaryMode.ALL;
    }
}

