package io.debezium.connector.mysql;

import com.github.shyiko.mysql.binlog.BinaryLogClient;
import com.github.shyiko.mysql.binlog.event.Event;
import com.github.shyiko.mysql.binlog.event.EventData;
import com.github.shyiko.mysql.binlog.event.EventHeader;
import com.github.shyiko.mysql.binlog.event.EventHeaderV4;
import com.github.shyiko.mysql.binlog.event.EventType;
import com.github.shyiko.mysql.binlog.event.RotateEventData;
import com.github.shyiko.mysql.binlog.event.deserialization.EventDeserializer;
import com.github.shyiko.mysql.binlog.network.AuthenticationException;
import io.debezium.annotation.NotThreadSafe;
import io.debezium.config.Configuration;
import io.debezium.config.Field;
import io.debezium.relational.TableId;
import io.debezium.relational.Tables;
import io.debezium.relational.history.DatabaseHistory;
import io.debezium.util.Clock;
import io.debezium.util.Collect;
import io.debezium.util.Metronome;
import java.io.IOException;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Collection;
import java.util.EnumMap;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Queue;
import java.util.Set;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Consumer;
import java.util.function.Predicate;
import org.apache.kafka.connect.errors.ConnectException;
import org.apache.kafka.connect.source.SourceRecord;
import org.apache.kafka.connect.source.SourceTask;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@NotThreadSafe
/* loaded from: input_file:io/debezium/connector/mysql/MySqlConnectorTask.class */
public final class MySqlConnectorTask extends SourceTask {
    private Tables tables;
    private TableConverters tableConverters;
    private BinaryLogClient client;
    private BlockingQueue<Event> events;
    private Queue<Event> batchEvents;
    private int maxBatchSize;
    private String serverName;
    private Metronome metronome;
    static final /* synthetic */ boolean $assertionsDisabled;
    private final Set<String> BUILT_IN_TABLE_NAMES = Collect.unmodifiableSet(new String[]{"db", "user", "func", "plugin", "tables_priv", "columns_priv", "help_topic", "help_category", "help_relation", "help_keyword", "time_zone_name", "time_zone", "time_zone_transition", "time_zone_transition_type", "time_zone_leap_second", "proc", "procs_priv", "general_log", "event", "ndb_binlog_index", "innodb_table_stats", "innodb_index_stats", "slave_relay_log_info", "slave_master_info", "slave_worker_info", "gtid_executed", "server_cost", "engine_cost"});
    private final Set<String> BUILT_IN_DB_NAMES = Collect.unmodifiableSet(new String[]{"mysql", "performance_schema"});
    private final Logger logger = LoggerFactory.getLogger(getClass());
    private EnumMap<EventType, EventHandler> eventHandlers = new EnumMap<>(EventType.class);
    private final AtomicBoolean running = new AtomicBoolean(false);
    private final SourceInfo source = new SourceInfo();
    private final TopicSelector topicSelector = TopicSelector.defaultSelector();
    private DatabaseHistory dbHistory = null;

    @FunctionalInterface
    /* loaded from: input_file:io/debezium/connector/mysql/MySqlConnectorTask$EventHandler.class */
    protected interface EventHandler {
        void handle(Event event, SourceInfo sourceInfo, Consumer<SourceRecord> consumer);
    }

    public String version() {
        return Module.version();
    }

    public void start(Map<String, String> map) {
        if (this.context == null) {
            throw new ConnectException("Unexpected null context");
        }
        Configuration from = Configuration.from(map);
        Collection<Field> collection = MySqlConnectorConfig.ALL_FIELDS;
        Logger logger = this.logger;
        logger.getClass();
        if (!from.validate(collection, logger::error)) {
            throw new ConnectException("Error configuring an instance of " + getClass().getSimpleName() + "; check the logs for details");
        }
        this.dbHistory = (DatabaseHistory) from.getInstance(MySqlConnectorConfig.DATABASE_HISTORY, DatabaseHistory.class);
        if (this.dbHistory == null) {
            throw new ConnectException("Unable to instantiate the database history class " + from.getString(MySqlConnectorConfig.DATABASE_HISTORY));
        }
        this.dbHistory.configure(from.subset("database.history.", false));
        this.dbHistory.start();
        this.running.set(true);
        String string = from.getString(MySqlConnectorConfig.USER);
        String string2 = from.getString(MySqlConnectorConfig.PASSWORD);
        String string3 = from.getString(MySqlConnectorConfig.HOSTNAME);
        int integer = from.getInteger(MySqlConnectorConfig.PORT);
        String string4 = from.getString(MySqlConnectorConfig.INITIAL_BINLOG_FILENAME);
        long j = from.getLong(MySqlConnectorConfig.SERVER_ID);
        this.serverName = from.getString(MySqlConnectorConfig.SERVER_NAME.name(), string3 + ":" + integer);
        boolean z = from.getBoolean(MySqlConnectorConfig.KEEP_ALIVE);
        int integer2 = from.getInteger(MySqlConnectorConfig.MAX_QUEUE_SIZE);
        long j2 = from.getLong(MySqlConnectorConfig.CONNECTION_TIMEOUT_MS);
        boolean z2 = from.getBoolean(MySqlConnectorConfig.INCLUDE_SCHEMA_CHANGES);
        long j3 = from.getLong(MySqlConnectorConfig.POLL_INTERVAL_MS);
        this.maxBatchSize = from.getInteger(MySqlConnectorConfig.MAX_BATCH_SIZE);
        this.metronome = Metronome.parker(j3, TimeUnit.MILLISECONDS, Clock.SYSTEM);
        Predicate filter = TableId.filter(from.getString(MySqlConnectorConfig.DATABASE_WHITELIST), from.getString(MySqlConnectorConfig.DATABASE_BLACKLIST), from.getString(MySqlConnectorConfig.TABLE_WHITELIST), from.getString(MySqlConnectorConfig.TABLE_BLACKLIST));
        if (from.getBoolean(MySqlConnectorConfig.TABLES_IGNORE_BUILTIN)) {
            Predicate predicate = tableId -> {
                return this.BUILT_IN_DB_NAMES.contains(tableId.catalog().toLowerCase()) || this.BUILT_IN_TABLE_NAMES.contains(tableId.table().toLowerCase());
            };
            filter = filter.and(predicate.negate());
        }
        this.events = new LinkedBlockingDeque(integer2);
        this.batchEvents = new ArrayDeque(this.maxBatchSize);
        this.tables = new Tables();
        this.tableConverters = new TableConverters(this.topicSelector, this.dbHistory, z2, this.tables, filter);
        EnumMap<EventType, EventHandler> enumMap = this.eventHandlers;
        EventType eventType = EventType.ROTATE;
        TableConverters tableConverters = this.tableConverters;
        tableConverters.getClass();
        enumMap.put((EnumMap<EventType, EventHandler>) eventType, (EventType) tableConverters::rotateLogs);
        EnumMap<EventType, EventHandler> enumMap2 = this.eventHandlers;
        EventType eventType2 = EventType.TABLE_MAP;
        TableConverters tableConverters2 = this.tableConverters;
        tableConverters2.getClass();
        enumMap2.put((EnumMap<EventType, EventHandler>) eventType2, (EventType) tableConverters2::updateTableMetadata);
        EnumMap<EventType, EventHandler> enumMap3 = this.eventHandlers;
        EventType eventType3 = EventType.QUERY;
        TableConverters tableConverters3 = this.tableConverters;
        tableConverters3.getClass();
        enumMap3.put((EnumMap<EventType, EventHandler>) eventType3, (EventType) tableConverters3::updateTableCommand);
        EnumMap<EventType, EventHandler> enumMap4 = this.eventHandlers;
        EventType eventType4 = EventType.EXT_WRITE_ROWS;
        TableConverters tableConverters4 = this.tableConverters;
        tableConverters4.getClass();
        enumMap4.put((EnumMap<EventType, EventHandler>) eventType4, (EventType) tableConverters4::handleInsert);
        EnumMap<EventType, EventHandler> enumMap5 = this.eventHandlers;
        EventType eventType5 = EventType.EXT_UPDATE_ROWS;
        TableConverters tableConverters5 = this.tableConverters;
        tableConverters5.getClass();
        enumMap5.put((EnumMap<EventType, EventHandler>) eventType5, (EventType) tableConverters5::handleUpdate);
        EnumMap<EventType, EventHandler> enumMap6 = this.eventHandlers;
        EventType eventType6 = EventType.EXT_DELETE_ROWS;
        TableConverters tableConverters6 = this.tableConverters;
        tableConverters6.getClass();
        enumMap6.put((EnumMap<EventType, EventHandler>) eventType6, (EventType) tableConverters6::handleDelete);
        this.client = new BinaryLogClient(string3, integer, string, string2);
        this.client.setServerId(j);
        this.client.setKeepAlive(z);
        if (this.logger.isDebugEnabled()) {
            this.client.registerEventListener(this::logEvent);
        }
        this.client.registerEventListener(this::enqueue);
        this.client.registerLifecycleListener(traceLifecycleListener());
        EventDeserializer eventDeserializer = new EventDeserializer();
        eventDeserializer.setEventDataDeserializer(EventType.STOP, new StopEventDataDeserializer());
        this.client.setEventDeserializer(eventDeserializer);
        this.source.setServerName(this.serverName);
        Map<String, ?> offset = this.context.offsetStorageReader().offset(this.source.partition());
        if (offset != null) {
            this.source.setOffset(offset);
            this.client.setBinlogFilename(this.source.binlogFilename());
            this.client.setBinlogPosition(this.source.binlogPosition());
            this.logger.info("Restarting MySQL connector '{}' from binlog file {}, position {}, and event row {}", new Object[]{this.serverName, this.source.binlogFilename(), Long.valueOf(this.source.binlogPosition()), Integer.valueOf(this.source.eventRowNumber())});
            try {
                this.logger.info("Recovering MySQL connector '{}' database schemas from history stored in {}", this.serverName, this.dbHistory);
                this.dbHistory.recover(this.source.partition(), this.source.offset(), this.tables, new MySqlDdlParser());
                this.tableConverters.loadTables();
                this.logger.debug("Recovered MySQL connector '{}' database schemas: {}", this.serverName, this.tables.subset(filter));
            } catch (Throwable th) {
                throw new ConnectException("Failure while recovering database schemas", th);
            }
        } else {
            this.client.setBinlogFilename(string4);
            this.logger.info("Starting MySQL connector from beginning of binlog file {}, position {}", this.source.binlogFilename(), Long.valueOf(this.source.binlogPosition()));
        }
        try {
            this.logger.debug("Connecting to MySQL server");
            this.client.connect(j2);
            this.logger.debug("Successfully connected to MySQL server and beginning to read binlog");
        } catch (AuthenticationException e) {
            throw new ConnectException("Failed to authenticate to the MySQL database at " + string3 + ":" + integer + " with user '" + string + "'", e);
        } catch (TimeoutException e2) {
            throw new ConnectException("Timed out after " + TimeUnit.MILLISECONDS.toSeconds(j2) + " seconds while waiting to connect to the MySQL database at " + string3 + ":" + integer + " with user '" + string + "'", e2);
        } catch (Throwable th2) {
            throw new ConnectException("Unable to connect to the MySQL database at " + string3 + ":" + integer + " with user '" + string + "': " + th2.getMessage(), th2);
        }
    }

    public List<SourceRecord> poll() throws InterruptedException {
        this.logger.trace("Polling for events from MySQL server '{}'", this.serverName);
        while (this.running.get() && (this.events.drainTo(this.batchEvents, this.maxBatchSize - this.batchEvents.size()) == 0 || this.batchEvents.isEmpty())) {
            this.metronome.pause();
        }
        this.logger.trace("Preparing {} events from MySQL server '{}'", Integer.valueOf(this.events.size()), this.serverName);
        ArrayList arrayList = new ArrayList(this.batchEvents.size());
        while (!this.batchEvents.isEmpty()) {
            Event poll = this.batchEvents.poll();
            if (poll != null) {
                EventHeaderV4 header = poll.getHeader();
                EventType eventType = header.getEventType();
                if (eventType == EventType.ROTATE) {
                    EventDeserializer.EventDataWrapper data = poll.getData();
                    RotateEventData rotateEventData = data instanceof EventDeserializer.EventDataWrapper ? (RotateEventData) data.getInternal() : (RotateEventData) data;
                    this.source.setBinlogFilename(rotateEventData.getBinlogFilename());
                    this.source.setBinlogPosition(rotateEventData.getBinlogPosition());
                    this.source.setRowInEvent(0);
                } else if (header instanceof EventHeaderV4) {
                    long nextPosition = header.getNextPosition();
                    if (nextPosition > 0) {
                        this.source.setBinlogPosition(nextPosition);
                        this.source.setRowInEvent(0);
                    }
                }
                if (!this.running.get()) {
                    break;
                }
                EventHandler eventHandler = this.eventHandlers.get(eventType);
                if (eventHandler != null) {
                    SourceInfo sourceInfo = this.source;
                    arrayList.getClass();
                    eventHandler.handle(poll, sourceInfo, (v1) -> {
                        r3.add(v1);
                    });
                }
            }
        }
        this.logger.trace("Completed processing {} events from MySQL server '{}'", this.serverName);
        if (!this.running.get()) {
            return null;
        }
        if ($assertionsDisabled || this.batchEvents.isEmpty()) {
            return arrayList;
        }
        throw new AssertionError();
    }

    public void stop() {
        try {
            try {
                this.running.set(false);
                this.logger.debug("Stopping database history for MySQL server '{}'", this.serverName);
                this.dbHistory.stop();
            } catch (Throwable th) {
                this.logger.error("Unexpected error shutting down the database history", th);
                try {
                    this.logger.debug("Disconnecting from MySQL server '{}'", this.serverName);
                    this.client.disconnect();
                    this.logger.info("Stopped connector to MySQL server '{}'", this.serverName);
                } catch (IOException e) {
                    this.logger.error("Unexpected error when disconnecting from the MySQL binary log reader", e);
                }
            }
        } finally {
            try {
                this.logger.debug("Disconnecting from MySQL server '{}'", this.serverName);
                this.client.disconnect();
                this.logger.info("Stopped connector to MySQL server '{}'", this.serverName);
            } catch (IOException e2) {
                this.logger.error("Unexpected error when disconnecting from the MySQL binary log reader", e2);
            }
        }
    }

    protected void enqueue(Event event) {
        if (event != null) {
            try {
                this.events.put(event);
            } catch (InterruptedException e) {
                Thread.interrupted();
                throw new ConnectException("Interrupted while waiting to add event to queue", e);
            }
        }
    }

    protected void logEvent(Event event) {
        this.logger.debug("Received event: " + event);
    }

    protected void sleep(long j) {
        try {
            Thread.sleep(j);
        } catch (InterruptedException e) {
            Thread.interrupted();
        }
    }

    protected BinaryLogClient.LifecycleListener traceLifecycleListener() {
        return new BinaryLogClient.LifecycleListener() { // from class: io.debezium.connector.mysql.MySqlConnectorTask.1
            public void onDisconnect(BinaryLogClient binaryLogClient) {
                MySqlConnectorTask.this.logger.debug("MySQL Connector disconnected");
            }

            public void onConnect(BinaryLogClient binaryLogClient) {
                MySqlConnectorTask.this.logger.info("MySQL Connector connected");
            }

            public void onCommunicationFailure(BinaryLogClient binaryLogClient, Exception exc) {
                MySqlConnectorTask.this.logger.error("MySQL Connector communication failure", exc);
            }

            public void onEventDeserializationFailure(BinaryLogClient binaryLogClient, Exception exc) {
                MySqlConnectorTask.this.logger.error("MySQL Connector received event deserialization failure", exc);
            }
        };
    }

    static {
        $assertionsDisabled = !MySqlConnectorTask.class.desiredAssertionStatus();
    }
}
