package org.mongoflink.source.reader;

import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
import com.google.common.collect.Queues;
import com.mongodb.client.MongoCursor;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Queue;
import javax.annotation.Nullable;
import org.apache.flink.connector.base.source.reader.RecordsWithSplitIds;
import org.apache.flink.connector.base.source.reader.splitreader.SplitReader;
import org.apache.flink.connector.base.source.reader.splitreader.SplitsAddition;
import org.apache.flink.connector.base.source.reader.splitreader.SplitsChange;
import org.bson.Document;
import org.mongoflink.internal.connection.MongoClientProvider;
import org.mongoflink.source.split.MongoRecords;
import org.mongoflink.source.split.MongoSplit;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/mongoflink/source/reader/MongoSplitReader.class */
public class MongoSplitReader implements SplitReader<Document, MongoSplit> {

    @Nullable
    private MongoSplit currentSplit;
    private final MongoClientProvider clientProvider;

    @Nullable
    private transient MongoCursor<Document> cursor;
    private static final int DEFAULT_FETCH_SIZE = 200;
    private static final Logger LOG = LoggerFactory.getLogger(MongoSplitReader.class);
    private int offset = 0;
    private int fetchSize = DEFAULT_FETCH_SIZE;
    private final Queue<MongoSplit> pendingSplits = Queues.newArrayDeque();

    public MongoSplitReader(MongoClientProvider mongoClientProvider) {
        this.clientProvider = mongoClientProvider;
    }

    public RecordsWithSplitIds<Document> fetch() throws IOException {
        prepareRead();
        Preconditions.checkNotNull(this.currentSplit);
        Preconditions.checkNotNull(this.cursor);
        ArrayList newArrayList = Lists.newArrayList();
        while (newArrayList.size() < this.fetchSize && this.cursor.hasNext()) {
            newArrayList.add(this.cursor.next());
        }
        this.offset += newArrayList.size();
        if (this.cursor.hasNext()) {
            if (LOG.isDebugEnabled()) {
                LOG.debug("Fetched {} records from split {}, current offset: {}", new Object[]{Integer.valueOf(newArrayList.size()), this.currentSplit, Integer.valueOf(this.offset)});
            }
            return MongoRecords.forRecords(this.currentSplit.splitId(), newArrayList);
        }
        String splitId = this.currentSplit.splitId();
        closeCurrentSplit();
        return MongoRecords.finishedSplit(splitId, newArrayList);
    }

    public void handleSplitsChanges(SplitsChange<MongoSplit> splitsChange) {
        if (!(splitsChange instanceof SplitsAddition)) {
            throw new UnsupportedOperationException(String.format("The SplitChange type of %s is not supported.", splitsChange.getClass()));
        }
        LOG.info("Handling split change {}", splitsChange);
        this.pendingSplits.addAll(splitsChange.splits());
    }

    public void wakeUp() {
    }

    public void close() throws Exception {
        if (this.cursor != null) {
            this.cursor.close();
        }
    }

    private void prepareRead() throws IOException {
        if (this.cursor != null) {
            return;
        }
        this.currentSplit = this.pendingSplits.poll();
        if (this.currentSplit == null) {
            throw new IOException("No more splits can be read.");
        }
        LOG.info("Prepared to read split {}", this.currentSplit.splitId());
        this.offset = 0;
        this.cursor = this.clientProvider.getDefaultCollection().find(this.currentSplit.getQuery()).projection(this.currentSplit.getProjection()).batchSize(this.fetchSize).iterator();
    }

    private void closeCurrentSplit() {
        Preconditions.checkNotNull(this.currentSplit);
        LOG.info("Finished reading split {}.", this.currentSplit.splitId());
        this.currentSplit = null;
        Preconditions.checkNotNull(this.cursor);
        this.cursor.close();
        this.cursor = null;
    }
}
