package com.google.cloud.flink.bigquery.source.enumerator;

import com.google.cloud.flink.bigquery.source.config.BigQueryReadOptions;
import com.google.cloud.flink.bigquery.source.split.BigQuerySourceSplit;
import com.google.cloud.flink.bigquery.source.split.SplitDiscoveryScheduler;
import com.google.cloud.flink.bigquery.source.split.assigner.BigQuerySourceSplitAssigner;
import java.io.IOException;
import java.util.Iterator;
import java.util.List;
import java.util.Optional;
import java.util.Set;
import java.util.TreeSet;
import java.util.concurrent.Callable;
import java.util.function.BiConsumer;
import org.apache.flink.annotation.Internal;
import org.apache.flink.api.connector.source.Boundedness;
import org.apache.flink.api.connector.source.SplitEnumerator;
import org.apache.flink.api.connector.source.SplitEnumeratorContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Internal
/* loaded from: input_file:com/google/cloud/flink/bigquery/source/enumerator/BigQuerySourceEnumerator.class */
public class BigQuerySourceEnumerator implements SplitEnumerator<BigQuerySourceSplit, BigQuerySourceEnumState>, SplitDiscoveryScheduler {
    private static final Logger LOG = LoggerFactory.getLogger(BigQuerySourceEnumerator.class);
    private final Boundedness boundedness;
    private final SplitEnumeratorContext<BigQuerySourceSplit> context;
    private final BigQuerySourceSplitAssigner splitAssigner;
    private final TreeSet<Integer> readersAwaitingSplit = new TreeSet<>();

    /* JADX INFO: Access modifiers changed from: package-private */
    /* renamed from: com.google.cloud.flink.bigquery.source.enumerator.BigQuerySourceEnumerator$1, reason: invalid class name */
    /* loaded from: input_file:com/google/cloud/flink/bigquery/source/enumerator/BigQuerySourceEnumerator$1.class */
    public static /* synthetic */ class AnonymousClass1 {
        static final /* synthetic */ int[] $SwitchMap$org$apache$flink$api$connector$source$Boundedness = new int[Boundedness.values().length];

        static {
            try {
                $SwitchMap$org$apache$flink$api$connector$source$Boundedness[Boundedness.BOUNDED.ordinal()] = 1;
            } catch (NoSuchFieldError e) {
            }
        }
    }

    public BigQuerySourceEnumerator(Boundedness boundedness, SplitEnumeratorContext<BigQuerySourceSplit> splitEnumeratorContext, BigQueryReadOptions bigQueryReadOptions, BigQuerySourceEnumState bigQuerySourceEnumState) {
        this.boundedness = boundedness;
        this.context = splitEnumeratorContext;
        this.splitAssigner = createBigQuerySourceSplitAssigner(bigQueryReadOptions, bigQuerySourceEnumState);
    }

    final BigQuerySourceSplitAssigner createBigQuerySourceSplitAssigner(BigQueryReadOptions bigQueryReadOptions, BigQuerySourceEnumState bigQuerySourceEnumState) {
        switch (AnonymousClass1.$SwitchMap$org$apache$flink$api$connector$source$Boundedness[this.boundedness.ordinal()]) {
            case 1:
                return BigQuerySourceSplitAssigner.createBounded(bigQueryReadOptions, bigQuerySourceEnumState);
            default:
                throw new IllegalArgumentException("Non supported boundedness: " + this.boundedness);
        }
    }

    public void start() {
        this.splitAssigner.openAndDiscoverSplits();
    }

    public void handleSplitRequest(int i, String str) {
        if (this.context.registeredReaders().containsKey(Integer.valueOf(i))) {
            this.readersAwaitingSplit.add(Integer.valueOf(i));
            assignSplits();
        }
    }

    public void addSplitsBack(List<BigQuerySourceSplit> list, int i) {
        LOG.debug("BigQuery Source Enumerator adds splits back: {}", list);
        this.splitAssigner.addSplitsBack(list);
    }

    public void addReader(int i) {
        LOG.debug("Adding reader {} to BigQuerySourceEnumerator.", Integer.valueOf(i));
    }

    /* renamed from: snapshotState, reason: merged with bridge method [inline-methods] */
    public BigQuerySourceEnumState m26snapshotState(long j) throws Exception {
        BigQuerySourceEnumState snapshotState = this.splitAssigner.snapshotState(j);
        LOG.debug("Checkpointing state {}", snapshotState);
        return snapshotState;
    }

    public void close() throws IOException {
        this.splitAssigner.close();
    }

    private void assignSplits() {
        Iterator<Integer> it = this.readersAwaitingSplit.iterator();
        while (it.hasNext()) {
            int intValue = it.next().intValue();
            if (this.context.registeredReaders().containsKey(Integer.valueOf(intValue))) {
                Optional<BigQuerySourceSplit> next = this.splitAssigner.getNext();
                if (next.isPresent()) {
                    BigQuerySourceSplit bigQuerySourceSplit = next.get();
                    this.context.assignSplit(bigQuerySourceSplit, intValue);
                    it.remove();
                    LOG.info("Assign split {} to subtask {}", bigQuerySourceSplit, Integer.valueOf(intValue));
                    return;
                }
                if (!this.splitAssigner.noMoreSplits() || this.boundedness != Boundedness.BOUNDED) {
                    LOG.info("All splits have been assigned, will check later on.");
                    return;
                }
                LOG.info("All splits have been assigned");
                Set keySet = this.context.registeredReaders().keySet();
                SplitEnumeratorContext<BigQuerySourceSplit> splitEnumeratorContext = this.context;
                splitEnumeratorContext.getClass();
                keySet.forEach((v1) -> {
                    r1.signalNoMoreSplits(v1);
                });
                return;
            }
            it.remove();
        }
    }

    public <T> void schedule(Callable<T> callable, BiConsumer<T, Throwable> biConsumer, long j, long j2) {
        this.context.callAsync(callable, biConsumer, j, j2);
    }

    public void notifySplits() {
        assignSplits();
    }
}
