package cn.tenmg.cdc.log.connectors.mysql.source.assigners;

import cn.tenmg.cdc.log.connectors.mysql.debezium.DebeziumUtils;
import cn.tenmg.cdc.log.connectors.mysql.schema.MySqlSchema;
import cn.tenmg.cdc.log.connectors.mysql.source.assigners.state.SnapshotPendingSplitsState;
import cn.tenmg.cdc.log.connectors.mysql.source.config.MySqlSourceConfig;
import cn.tenmg.cdc.log.connectors.mysql.source.offset.BinlogOffset;
import cn.tenmg.cdc.log.connectors.mysql.source.split.FinishedSnapshotSplitInfo;
import cn.tenmg.cdc.log.connectors.mysql.source.split.MySqlSnapshotSplit;
import cn.tenmg.cdc.log.connectors.mysql.source.split.MySqlSplit;
import io.debezium.jdbc.JdbcConnection;
import io.debezium.relational.TableId;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.stream.Collectors;
import javax.annotation.Nullable;
import org.apache.flink.shaded.guava18.com.google.common.util.concurrent.ThreadFactoryBuilder;
import org.apache.flink.util.FlinkRuntimeException;
import org.apache.flink.util.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:cn/tenmg/cdc/log/connectors/mysql/source/assigners/MySqlSnapshotSplitAssigner.class */
public class MySqlSnapshotSplitAssigner implements MySqlSplitAssigner {
    private static final Logger LOG = LoggerFactory.getLogger(MySqlSnapshotSplitAssigner.class);
    private final List<TableId> alreadyProcessedTables;
    private final List<MySqlSnapshotSplit> remainingSplits;
    private final Map<String, MySqlSnapshotSplit> assignedSplits;
    private final Map<String, BinlogOffset> splitFinishedOffsets;
    private final MySqlSourceConfig sourceConfig;
    private final int currentParallelism;
    private final List<TableId> remainingTables;
    private final boolean isRemainingTablesCheckpointed;
    private final Object lock;
    private volatile Throwable uncaughtSplitterException;
    private AssignerStatus assignerStatus;
    private ChunkSplitter chunkSplitter;
    private boolean isTableIdCaseSensitive;
    private ExecutorService executor;

    @Nullable
    private Long checkpointIdToFinish;

    public MySqlSnapshotSplitAssigner(MySqlSourceConfig mySqlSourceConfig, int i, List<TableId> list, boolean z) {
        this(mySqlSourceConfig, i, new ArrayList(), new ArrayList(), new HashMap(), new HashMap(), AssignerStatus.INITIAL_ASSIGNING, list, z, true);
    }

    public MySqlSnapshotSplitAssigner(MySqlSourceConfig mySqlSourceConfig, int i, SnapshotPendingSplitsState snapshotPendingSplitsState) {
        this(mySqlSourceConfig, i, snapshotPendingSplitsState.getAlreadyProcessedTables(), snapshotPendingSplitsState.getRemainingSplits(), snapshotPendingSplitsState.getAssignedSplits(), snapshotPendingSplitsState.getSplitFinishedOffsets(), snapshotPendingSplitsState.getSnapshotAssignerStatus(), snapshotPendingSplitsState.getRemainingTables(), snapshotPendingSplitsState.isTableIdCaseSensitive(), snapshotPendingSplitsState.isRemainingTablesCheckpointed());
    }

    private MySqlSnapshotSplitAssigner(MySqlSourceConfig mySqlSourceConfig, int i, List<TableId> list, List<MySqlSnapshotSplit> list2, Map<String, MySqlSnapshotSplit> map, Map<String, BinlogOffset> map2, AssignerStatus assignerStatus, List<TableId> list3, boolean z, boolean z2) {
        this.lock = new Object();
        this.sourceConfig = mySqlSourceConfig;
        this.currentParallelism = i;
        this.alreadyProcessedTables = list;
        this.remainingSplits = new CopyOnWriteArrayList(list2);
        this.assignedSplits = map;
        this.splitFinishedOffsets = map2;
        this.assignerStatus = assignerStatus;
        this.remainingTables = new CopyOnWriteArrayList(list3);
        this.isRemainingTablesCheckpointed = z2;
        this.isTableIdCaseSensitive = z;
    }

    @Override // cn.tenmg.cdc.log.connectors.mysql.source.assigners.MySqlSplitAssigner
    public void open() {
        this.chunkSplitter = createChunkSplitter(this.sourceConfig, this.isTableIdCaseSensitive);
        if (!this.isRemainingTablesCheckpointed && !AssignerStatus.isAssigningFinished(this.assignerStatus)) {
            try {
                JdbcConnection openJdbcConnection = DebeziumUtils.openJdbcConnection(this.sourceConfig);
                Throwable th = null;
                try {
                    List<TableId> discoverCapturedTables = DebeziumUtils.discoverCapturedTables(openJdbcConnection, this.sourceConfig);
                    discoverCapturedTables.removeAll(this.alreadyProcessedTables);
                    this.remainingTables.addAll(discoverCapturedTables);
                    this.isTableIdCaseSensitive = DebeziumUtils.isTableIdCaseSensitive(openJdbcConnection);
                    if (openJdbcConnection != null) {
                        if (0 != 0) {
                            try {
                                openJdbcConnection.close();
                            } catch (Throwable th2) {
                                th.addSuppressed(th2);
                            }
                        } else {
                            openJdbcConnection.close();
                        }
                    }
                } finally {
                }
            } catch (Exception e) {
                throw new FlinkRuntimeException("Failed to discover remaining tables to capture", e);
            }
        }
        captureNewlyAddedTables();
        startAsynchronouslySplit();
    }

    private void captureNewlyAddedTables() {
        if (this.sourceConfig.isScanNewlyAddedTableEnabled()) {
            try {
                JdbcConnection openJdbcConnection = DebeziumUtils.openJdbcConnection(this.sourceConfig);
                Throwable th = null;
                try {
                    List<TableId> discoverCapturedTables = DebeziumUtils.discoverCapturedTables(openJdbcConnection, this.sourceConfig);
                    discoverCapturedTables.removeAll(this.alreadyProcessedTables);
                    discoverCapturedTables.removeAll(this.remainingTables);
                    if (!discoverCapturedTables.isEmpty()) {
                        LOG.info("Found newly added tables, start capture newly added tables process");
                        this.remainingTables.addAll(discoverCapturedTables);
                        if (AssignerStatus.isAssigningFinished(this.assignerStatus)) {
                            LOG.info("Found newly added tables, start capture newly added tables process under binlog reading phase");
                            suspend();
                        }
                    }
                    if (openJdbcConnection != null) {
                        if (0 != 0) {
                            try {
                                openJdbcConnection.close();
                            } catch (Throwable th2) {
                                th.addSuppressed(th2);
                            }
                        } else {
                            openJdbcConnection.close();
                        }
                    }
                } finally {
                }
            } catch (Exception e) {
                throw new FlinkRuntimeException("Failed to discover remaining tables to capture", e);
            }
        }
    }

    private void startAsynchronouslySplit() {
        if (this.remainingTables.isEmpty()) {
            return;
        }
        if (this.executor == null) {
            this.executor = Executors.newSingleThreadExecutor(new ThreadFactoryBuilder().setNameFormat("snapshot-splitting").build());
        }
        this.executor.submit(this::splitChunksForRemainingTables);
    }

    @Override // cn.tenmg.cdc.log.connectors.mysql.source.assigners.MySqlSplitAssigner
    public Optional<MySqlSplit> getNext() {
        checkSplitterErrors();
        synchronized (this.lock) {
            if (!this.remainingSplits.isEmpty()) {
                MySqlSnapshotSplit next = this.remainingSplits.iterator().next();
                this.remainingSplits.remove(next);
                this.assignedSplits.put(next.splitId(), next);
                addAlreadyProcessedTablesIfNotExists(next.getTableId());
                return Optional.of(next);
            }
            if (this.remainingTables.isEmpty()) {
                closeExecutorService();
                return Optional.empty();
            }
            try {
                this.lock.wait();
                return getNext();
            } catch (InterruptedException e) {
                throw new FlinkRuntimeException("InterruptedException while waiting for asynchronously snapshot split");
            }
        }
    }

    @Override // cn.tenmg.cdc.log.connectors.mysql.source.assigners.MySqlSplitAssigner
    public boolean waitingForFinishedSplits() {
        return !allSplitsFinished();
    }

    @Override // cn.tenmg.cdc.log.connectors.mysql.source.assigners.MySqlSplitAssigner
    public List<FinishedSnapshotSplitInfo> getFinishedSplitInfos() {
        if (waitingForFinishedSplits()) {
            LOG.error("The assigner is not ready to offer finished split information, this should not be called");
            throw new FlinkRuntimeException("The assigner is not ready to offer finished split information, this should not be called");
        }
        List<MySqlSnapshotSplit> list = (List) this.assignedSplits.values().stream().sorted(Comparator.comparing((v0) -> {
            return v0.splitId();
        })).collect(Collectors.toList());
        ArrayList arrayList = new ArrayList();
        for (MySqlSnapshotSplit mySqlSnapshotSplit : list) {
            arrayList.add(new FinishedSnapshotSplitInfo(mySqlSnapshotSplit.getTableId(), mySqlSnapshotSplit.splitId(), mySqlSnapshotSplit.getSplitStart(), mySqlSnapshotSplit.getSplitEnd(), this.splitFinishedOffsets.get(mySqlSnapshotSplit.splitId())));
        }
        return arrayList;
    }

    @Override // cn.tenmg.cdc.log.connectors.mysql.source.assigners.MySqlSplitAssigner
    public void onFinishedSplits(Map<String, BinlogOffset> map) {
        this.splitFinishedOffsets.putAll(map);
        if (allSplitsFinished() && AssignerStatus.isAssigning(this.assignerStatus)) {
            if (this.currentParallelism != 1) {
                LOG.info("Snapshot split assigner received all splits finished, waiting for a complete checkpoint to mark the assigner finished.");
            } else {
                this.assignerStatus = this.assignerStatus.onFinish();
                LOG.info("Snapshot split assigner received all splits finished and the job parallelism is 1, snapshot split assigner is turn into finished status.");
            }
        }
    }

    @Override // cn.tenmg.cdc.log.connectors.mysql.source.assigners.MySqlSplitAssigner
    public void addSplits(Collection<MySqlSplit> collection) {
        for (MySqlSplit mySqlSplit : collection) {
            this.remainingSplits.add(mySqlSplit.asSnapshotSplit());
            this.assignedSplits.remove(mySqlSplit.splitId());
            this.splitFinishedOffsets.remove(mySqlSplit.splitId());
        }
    }

    @Override // cn.tenmg.cdc.log.connectors.mysql.source.assigners.MySqlSplitAssigner
    public SnapshotPendingSplitsState snapshotState(long j) {
        SnapshotPendingSplitsState snapshotPendingSplitsState = new SnapshotPendingSplitsState(this.alreadyProcessedTables, this.remainingSplits, this.assignedSplits, this.splitFinishedOffsets, this.assignerStatus, this.remainingTables, this.isTableIdCaseSensitive, true);
        if (this.checkpointIdToFinish == null && !AssignerStatus.isAssigningFinished(this.assignerStatus) && allSplitsFinished()) {
            this.checkpointIdToFinish = Long.valueOf(j);
        }
        return snapshotPendingSplitsState;
    }

    @Override // cn.tenmg.cdc.log.connectors.mysql.source.assigners.MySqlSplitAssigner
    public void notifyCheckpointComplete(long j) {
        if (this.checkpointIdToFinish == null || AssignerStatus.isAssigningFinished(this.assignerStatus) || !allSplitsFinished()) {
            return;
        }
        if (j >= this.checkpointIdToFinish.longValue()) {
            this.assignerStatus = this.assignerStatus.onFinish();
        }
        LOG.info("Snapshot split assigner is turn into finished status.");
    }

    @Override // cn.tenmg.cdc.log.connectors.mysql.source.assigners.MySqlSplitAssigner
    public AssignerStatus getAssignerStatus() {
        return this.assignerStatus;
    }

    @Override // cn.tenmg.cdc.log.connectors.mysql.source.assigners.MySqlSplitAssigner
    public void suspend() {
        Preconditions.checkState(AssignerStatus.isAssigningFinished(this.assignerStatus), "Invalid assigner status {}", new Object[]{this.assignerStatus});
        this.assignerStatus = this.assignerStatus.suspend();
    }

    @Override // cn.tenmg.cdc.log.connectors.mysql.source.assigners.MySqlSplitAssigner
    public void wakeup() {
        Preconditions.checkState(AssignerStatus.isSuspended(this.assignerStatus), "Invalid assigner status {}", new Object[]{this.assignerStatus});
        this.assignerStatus = this.assignerStatus.wakeup();
    }

    @Override // cn.tenmg.cdc.log.connectors.mysql.source.assigners.MySqlSplitAssigner
    public void close() {
        closeExecutorService();
    }

    private void closeExecutorService() {
        if (this.executor != null) {
            this.executor.shutdown();
        }
    }

    private void addAlreadyProcessedTablesIfNotExists(TableId tableId) {
        if (this.alreadyProcessedTables.contains(tableId)) {
            return;
        }
        this.alreadyProcessedTables.add(tableId);
    }

    public boolean noMoreSplits() {
        return this.remainingTables.isEmpty() && this.remainingSplits.isEmpty();
    }

    public Map<String, MySqlSnapshotSplit> getAssignedSplits() {
        return this.assignedSplits;
    }

    public Map<String, BinlogOffset> getSplitFinishedOffsets() {
        return this.splitFinishedOffsets;
    }

    private boolean allSplitsFinished() {
        return noMoreSplits() && this.assignedSplits.size() == this.splitFinishedOffsets.size();
    }

    private void splitChunksForRemainingTables() {
        try {
            for (TableId tableId : this.remainingTables) {
                Collection<MySqlSnapshotSplit> generateSplits = this.chunkSplitter.generateSplits(tableId);
                synchronized (this.lock) {
                    this.remainingSplits.addAll(generateSplits);
                    this.remainingTables.remove(tableId);
                    this.lock.notify();
                }
            }
        } catch (Exception e) {
            if (this.uncaughtSplitterException == null) {
                this.uncaughtSplitterException = e;
            } else {
                this.uncaughtSplitterException.addSuppressed(e);
            }
            synchronized (this.lock) {
                this.lock.notify();
            }
        }
    }

    private void checkSplitterErrors() {
        if (this.uncaughtSplitterException != null) {
            throw new FlinkRuntimeException("Chunk splitting has encountered exception", this.uncaughtSplitterException);
        }
    }

    private static ChunkSplitter createChunkSplitter(MySqlSourceConfig mySqlSourceConfig, boolean z) {
        return new ChunkSplitter(new MySqlSchema(mySqlSourceConfig, z), mySqlSourceConfig);
    }
}
