package org.apache.seatunnel.connectors.cdc.base.source.enumerator;

import io.debezium.relational.TableId;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Deque;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Queue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedDeque;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.stream.Collectors;
import org.apache.seatunnel.connectors.cdc.base.config.SourceConfig;
import org.apache.seatunnel.connectors.cdc.base.dialect.DataSourceDialect;
import org.apache.seatunnel.connectors.cdc.base.source.enumerator.SplitAssigner;
import org.apache.seatunnel.connectors.cdc.base.source.enumerator.splitter.ChunkSplitter;
import org.apache.seatunnel.connectors.cdc.base.source.enumerator.state.SnapshotPhaseState;
import org.apache.seatunnel.connectors.cdc.base.source.event.SnapshotSplitWatermark;
import org.apache.seatunnel.connectors.cdc.base.source.split.SnapshotSplit;
import org.apache.seatunnel.connectors.cdc.base.source.split.SourceSplitBase;
import org.apache.seatunnel.shade.com.google.common.annotations.VisibleForTesting;
import org.apache.seatunnel.shade.com.google.common.base.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * {@link SplitAssigner} that hands out {@link SnapshotSplit}s.
 *
 * <p>Splits already queued in {@code remainingSplits} are served first. When the queue is empty
 * the next table is taken from {@code remainingTables}, split by the dialect's
 * {@link ChunkSplitter}, and recorded in {@code alreadyProcessedTables}.
 *
 * <p>The assigner is marked completed once every assigned split has a reported watermark and
 * either the parallelism is 1 or a checkpoint with an id at or after the recorded
 * {@code checkpointIdToFinish} has completed.
 *
 * @param <C> the source configuration type
 */
public class SnapshotSplitAssigner<C extends SourceConfig> implements SplitAssigner {
    private static final Logger LOG = LoggerFactory.getLogger(SnapshotSplitAssigner.class);
    private final SplitAssigner.Context<C> context;
    private final C sourceConfig;
    /** Tables whose splits have already been generated. */
    private final List<TableId> alreadyProcessedTables;
    /** Generated splits that have not been handed out yet. */
    private final Queue<SnapshotSplit> remainingSplits;
    /** Splits handed out so far, keyed by split id. */
    private final Map<String, SnapshotSplit> assignedSplits;
    /** Watermarks reported for completed splits, keyed by split id. */
    private final Map<String, SnapshotSplitWatermark> splitCompletedOffsets;
    private boolean assignerCompleted;
    private final int currentParallelism;
    /** Tables that still have to be split. */
    private final Deque<TableId> remainingTables;
    private final boolean isRemainingTablesCheckpointed;
    /** Created in {@link #open()}; {@link #getNext()} returns empty while this is null. */
    private ChunkSplitter chunkSplitter;
    private boolean isTableIdCaseSensitive;
    /** Id of the first checkpoint snapshotted after all splits completed; null until then. */
    private Long checkpointIdToFinish;
    private final DataSourceDialect<C> dialect;

    /**
     * Creates an assigner with no prior progress.
     *
     * <p>The remaining-tables list is flagged as already checkpointed, so {@link #open()} does
     * not run table discovery for assigners built this way.
     *
     * @param context shared assigner context
     * @param i current parallelism
     * @param list tables that remain to be split
     * @param z whether table ids are case sensitive
     * @param dataSourceDialect dialect used to create the chunk splitter
     */
    public SnapshotSplitAssigner(SplitAssigner.Context<C> context, int i, List<TableId> list, boolean z, DataSourceDialect<C> dataSourceDialect) {
        this(context, i, new ArrayList<>(), new ArrayList<>(), new HashMap<>(), new HashMap<>(), false, list, z, true, dataSourceDialect);
    }

    /**
     * Creates an assigner from a previously captured {@link SnapshotPhaseState}.
     *
     * @param context shared assigner context
     * @param i current parallelism
     * @param snapshotPhaseState state to resume from
     * @param dataSourceDialect dialect used to create the chunk splitter
     */
    public SnapshotSplitAssigner(SplitAssigner.Context<C> context, int i, SnapshotPhaseState snapshotPhaseState, DataSourceDialect<C> dataSourceDialect) {
        this(context, i, snapshotPhaseState.getAlreadyProcessedTables(), snapshotPhaseState.getRemainingSplits(), snapshotPhaseState.getAssignedSplits(), snapshotPhaseState.getSplitCompletedOffsets(), snapshotPhaseState.isAssignerCompleted(), snapshotPhaseState.getRemainingTables(), snapshotPhaseState.isTableIdCaseSensitive(), snapshotPhaseState.isRemainingTablesCheckpointed(), dataSourceDialect);
    }

    private SnapshotSplitAssigner(
            SplitAssigner.Context<C> context,
            int currentParallelism,
            List<TableId> alreadyProcessedTables,
            List<SnapshotSplit> remainingSplits,
            Map<String, SnapshotSplit> assignedSplits,
            Map<String, SnapshotSplitWatermark> splitCompletedOffsets,
            boolean assignerCompleted,
            List<TableId> remainingTables,
            boolean isTableIdCaseSensitive,
            boolean isRemainingTablesCheckpointed,
            DataSourceDialect<C> dialect) {
        this.context = context;
        this.sourceConfig = context.getSourceConfig();
        this.currentParallelism = currentParallelism;
        // The processed-table list is wrapped, not copied; the other collections are copied.
        this.alreadyProcessedTables = Collections.synchronizedList(alreadyProcessedTables);
        this.remainingSplits = new ConcurrentLinkedQueue<>(remainingSplits);
        this.assignedSplits = new ConcurrentHashMap<>(assignedSplits);
        this.splitCompletedOffsets = new ConcurrentHashMap<>(splitCompletedOffsets);
        this.assignerCompleted = assignerCompleted;
        this.remainingTables = new ConcurrentLinkedDeque<>(remainingTables);
        this.isRemainingTablesCheckpointed = isRemainingTablesCheckpointed;
        this.isTableIdCaseSensitive = isTableIdCaseSensitive;
        this.dialect = dialect;
        LOG.info("SnapshotSplitAssigner created with remaining tables: {}", this.remainingTables);
        LOG.info("SnapshotSplitAssigner created with remaining splits: [{}]", this.remainingSplits.stream().map(SnapshotSplit::splitId).collect(Collectors.joining(",")));
        LOG.info("SnapshotSplitAssigner created with assigned splits: {}", this.assignedSplits.keySet());
    }

    /**
     * Creates the chunk splitter and, unless the remaining tables were already checkpointed or
     * the assigner is completed, discovers the tables to capture.
     *
     * @throws RuntimeException if table discovery fails
     */
    @Override
    public void open() {
        this.chunkSplitter = this.dialect.createChunkSplitter(this.sourceConfig);
        if (this.isRemainingTablesCheckpointed || this.assignerCompleted) {
            return;
        }
        try {
            List<TableId> discoveredTables = this.dialect.discoverDataCollections(this.sourceConfig);
            this.context.getCapturedTables().addAll(discoveredTables);
            // Filter on a private copy: the list returned by the dialect is not ours to mutate
            // and may be unmodifiable.
            List<TableId> unprocessedTables = new ArrayList<>(discoveredTables);
            unprocessedTables.removeAll(this.alreadyProcessedTables);
            this.remainingTables.addAll(unprocessedTables);
            this.isTableIdCaseSensitive = this.dialect.isDataCollectionIdCaseSensitive(this.sourceConfig);
        } catch (Exception e) {
            throw new RuntimeException("Failed to discover remaining tables to capture", e);
        }
    }

    /**
     * Returns the next snapshot split to assign.
     *
     * <p>The returned split is recorded both in this assigner's assigned-split map and in the
     * context's assigned-snapshot-split map.
     *
     * @return the next split, or empty if {@link #open()} has not created the chunk splitter yet
     *     or no splits and no tables remain
     */
    @Override
    public Optional<SourceSplitBase> getNext() {
        if (this.chunkSplitter == null) {
            return Optional.empty();
        }
        // Loop instead of recursing so that a long run of tables yielding no splits cannot
        // grow the call stack.
        while (true) {
            // poll() is a single atomic take; no isEmpty()/next() window on the concurrent queue.
            SnapshotSplit split = this.remainingSplits.poll();
            if (split != null) {
                this.assignedSplits.put(split.splitId(), split);
                this.context.getAssignedSnapshotSplit().put(split.splitId(), split);
                return Optional.of(split);
            }
            TableId nextTable = this.remainingTables.pollFirst();
            if (nextTable == null) {
                return Optional.empty();
            }
            this.remainingSplits.addAll(this.chunkSplitter.generateSplits(nextTable));
            this.alreadyProcessedTables.add(nextTable);
        }
    }

    /**
     * @return {@code true} while tables or splits remain, or while some assigned split has no
     *     reported watermark
     */
    @Override
    public boolean waitingForCompletedSplits() {
        return !allSplitsCompleted();
    }

    /**
     * Records the watermarks of completed splits. When all splits are completed and the
     * parallelism is 1 the assigner is marked completed immediately; otherwise completion is
     * left to {@link #notifyCheckpointComplete(long)}.
     *
     * @param list watermarks of the splits that completed
     */
    @Override
    public void onCompletedSplits(List<SnapshotSplitWatermark> list) {
        for (SnapshotSplitWatermark watermark : list) {
            this.splitCompletedOffsets.put(watermark.getSplitId(), watermark);
        }
        if (!allSplitsCompleted()) {
            return;
        }
        if (this.currentParallelism == 1) {
            this.assignerCompleted = true;
            LOG.info("Snapshot split assigner received all splits completed and the job parallelism is 1, snapshot split assigner is turn into completed status.");
        } else {
            LOG.info("Snapshot split assigner received all splits completed, waiting for a complete checkpoint to mark the assigner completed.");
        }
    }

    /**
     * Puts splits back into the remaining queue and drops their assigned/completed records.
     *
     * @param collection splits to re-queue; each is converted with {@code asSnapshotSplit()}
     */
    @Override
    public void addSplits(Collection<SourceSplitBase> collection) {
        for (SourceSplitBase split : collection) {
            this.remainingSplits.add(split.asSnapshotSplit());
            String splitId = split.splitId();
            this.assignedSplits.remove(splitId);
            this.splitCompletedOffsets.remove(splitId);
        }
    }

    /**
     * Captures the assigner state for a checkpoint.
     *
     * <p>The remaining splits and remaining tables are copied; the processed-table list and the
     * assigned/completed maps are passed as the live internal collections. If all splits are
     * completed but the assigner is not, the first such checkpoint id is remembered as the one
     * whose completion finishes the assigner.
     *
     * @param j id of the checkpoint being taken
     * @return the captured state
     */
    @Override
    public SnapshotPhaseState snapshotState(long j) {
        List<SnapshotSplit> pendingSplits = new ArrayList<>(this.remainingSplits);
        List<TableId> pendingTables = new ArrayList<>(this.remainingTables);
        SnapshotPhaseState state = new SnapshotPhaseState(this.alreadyProcessedTables, pendingSplits, this.assignedSplits, this.splitCompletedOffsets, this.assignerCompleted, pendingTables, this.isTableIdCaseSensitive, true);
        if (this.checkpointIdToFinish == null && !this.assignerCompleted && allSplitsCompleted()) {
            this.checkpointIdToFinish = Long.valueOf(j);
        }
        return state;
    }

    /**
     * Marks the assigner completed once a checkpoint with an id at or after
     * {@code checkpointIdToFinish} completes, provided all splits are still completed.
     *
     * @param j id of the checkpoint that completed
     */
    @Override
    public void notifyCheckpointComplete(long j) {
        if (this.checkpointIdToFinish == null || this.assignerCompleted || !allSplitsCompleted()) {
            return;
        }
        // Only flip the flag (and announce it) when this checkpoint actually qualifies.
        if (j >= this.checkpointIdToFinish.longValue()) {
            this.assignerCompleted = true;
            LOG.info("Snapshot split assigner is turn into completed status.");
        }
    }

    /** @return {@code true} if neither tables nor generated splits remain to be assigned */
    public boolean noMoreSplits() {
        return this.remainingTables.isEmpty() && this.remainingSplits.isEmpty();
    }

    /** @return {@code true} if the assigner has been marked completed */
    public boolean isCompleted() {
        return this.assignerCompleted;
    }

    /** All splits are completed when nothing remains and every assigned split has a watermark. */
    private boolean allSplitsCompleted() {
        return noMoreSplits() && this.assignedSplits.size() == this.splitCompletedOffsets.size();
    }

    @VisibleForTesting
    Map<String, SnapshotSplit> getAssignedSplits() {
        return this.assignedSplits;
    }

    @VisibleForTesting
    Map<String, SnapshotSplitWatermark> getSplitCompletedOffsets() {
        return this.splitCompletedOffsets;
    }

    /**
     * Drops the assigned-split and watermark records of every split belonging to one of the
     * given tables.
     *
     * @param list tables whose snapshot bookkeeping should be discarded
     * @return {@code true} if no assigned splits and no watermarks are left afterwards
     * @throws IllegalArgumentException if the assigner is not completed or not all splits are
     *     completed
     */
    public boolean completedSnapshotPhase(List<TableId> list) {
        Preconditions.checkArgument(isCompleted() && allSplitsCompleted());
        // Iterate the entries directly and remove through the iterator, which avoids a second
        // lookup that could observe a missing entry.
        Iterator<Map.Entry<String, SnapshotSplit>> entries = this.assignedSplits.entrySet().iterator();
        while (entries.hasNext()) {
            SnapshotSplit split = entries.next().getValue();
            if (list.contains(split.getTableId())) {
                entries.remove();
                this.splitCompletedOffsets.remove(split.splitId());
            }
        }
        return this.assignedSplits.isEmpty() && this.splitCompletedOffsets.isEmpty();
    }
}
