package io.delta.flink.source.internal.enumerator;

import io.delta.flink.source.internal.state.DeltaEnumeratorStateCheckpoint;
import io.delta.flink.source.internal.state.DeltaSourceSplit;
import java.io.IOException;
import java.util.Collection;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import javax.annotation.Nullable;
import org.apache.flink.api.connector.source.SplitEnumerator;
import org.apache.flink.api.connector.source.SplitEnumeratorContext;
import org.apache.flink.connector.file.src.FileSourceSplit;
import org.apache.flink.connector.file.src.assigners.FileSplitAssigner;
import org.apache.flink.core.fs.Path;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/delta/flink/source/internal/enumerator/DeltaSourceSplitEnumerator.class */
public abstract class DeltaSourceSplitEnumerator implements SplitEnumerator<DeltaSourceSplit, DeltaEnumeratorStateCheckpoint<DeltaSourceSplit>> {
    private static final Logger LOG = LoggerFactory.getLogger(DeltaSourceSplitEnumerator.class);
    protected final Path deltaTablePath;
    protected final FileSplitAssigner splitAssigner;
    protected final SplitEnumeratorContext<DeltaSourceSplit> enumContext;
    protected final LinkedHashMap<Integer, String> readersAwaitingSplit = new LinkedHashMap<>();

    /* loaded from: input_file:io/delta/flink/source/internal/enumerator/DeltaSourceSplitEnumerator$AssignSplitStatus.class */
    public enum AssignSplitStatus {
        NO_MORE_SPLITS,
        NO_MORE_READERS
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public DeltaSourceSplitEnumerator(Path path, FileSplitAssigner fileSplitAssigner, SplitEnumeratorContext<DeltaSourceSplit> splitEnumeratorContext) {
        this.deltaTablePath = path;
        this.splitAssigner = fileSplitAssigner;
        this.enumContext = splitEnumeratorContext;
    }

    public void handleSplitRequest(int i, @Nullable String str) {
        if (this.enumContext.registeredReaders().containsKey(Integer.valueOf(i))) {
            if (LOG.isInfoEnabled()) {
                LOG.info("Subtask {} {} is requesting a file source split", Integer.valueOf(i), str == null ? "(no host locality info)" : "(on host '" + str + "')");
            }
            this.readersAwaitingSplit.put(Integer.valueOf(i), str);
            assignSplits(i);
        }
    }

    public void addSplitsBack(List<DeltaSourceSplit> list, int i) {
        LOG.debug("Bounded Delta Source Enumerator adds splits back: {}", list);
        addSplits(list);
    }

    public void addReader(int i) {
    }

    public void close() throws IOException {
    }

    protected abstract void handleNoMoreSplits(int i);

    /* JADX INFO: Access modifiers changed from: protected */
    public Collection<DeltaSourceSplit> getRemainingSplits() {
        return this.splitAssigner.remainingSplits();
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public void addSplits(List<DeltaSourceSplit> list) {
        this.splitAssigner.addSplits(list);
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public AssignSplitStatus assignSplits() {
        Iterator<Map.Entry<Integer, String>> it = this.readersAwaitingSplit.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<Integer, String> next = it.next();
            if (this.enumContext.registeredReaders().containsKey(next.getKey())) {
                String value = next.getValue();
                int intValue = next.getKey().intValue();
                Optional next2 = this.splitAssigner.getNext(value);
                if (!next2.isPresent()) {
                    return AssignSplitStatus.NO_MORE_SPLITS;
                }
                FileSourceSplit fileSourceSplit = (FileSourceSplit) next2.get();
                this.enumContext.assignSplit((DeltaSourceSplit) fileSourceSplit, intValue);
                LOG.info("Assigned split to subtask {} : {}", Integer.valueOf(intValue), fileSourceSplit);
                it.remove();
            } else {
                it.remove();
            }
        }
        return AssignSplitStatus.NO_MORE_READERS;
    }

    private void assignSplits(int i) {
        if (AssignSplitStatus.NO_MORE_SPLITS.equals(assignSplits())) {
            LOG.info("No more splits available for subtasks");
            handleNoMoreSplits(i);
        }
    }
}
