package com.google.cloud.flink.bigquery.source.split.assigner;

import com.google.cloud.bigquery.storage.v1.DataFormat;
import com.google.cloud.flink.bigquery.common.config.BigQueryConnectOptions;
import com.google.cloud.flink.bigquery.common.utils.BigQueryPartitionUtils;
import com.google.cloud.flink.bigquery.services.BigQueryServicesFactory;
import com.google.cloud.flink.bigquery.source.config.BigQueryReadOptions;
import com.google.cloud.flink.bigquery.source.enumerator.BigQuerySourceEnumState;
import com.google.cloud.flink.bigquery.source.split.SplitDiscoverer;
import com.google.cloud.flink.bigquery.source.split.SplitDiscoveryScheduler;
import java.time.Duration;
import java.util.Arrays;
import java.util.List;
import java.util.Optional;
import java.util.stream.Collectors;
import org.apache.flink.annotation.Internal;
import org.apache.flink.annotation.VisibleForTesting;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Split assigner for unbounded reads of a partitioned BigQuery table.
 *
 * <p>It repeatedly looks for completed table partitions that have not been read yet, creates
 * read streams restricted to each of them and hands those streams over for assignment. Because
 * new partitions may always show up later, {@link #noMoreSplits()} never reports completion.
 *
 * <p>{@code readOptions}, {@code lastSeenPartitions} and {@code remainingTableStreams} are not
 * declared in this class; presumably they are inherited from
 * {@link BigQuerySourceSplitAssigner}.
 */
@Internal
public class UnboundedSplitAssigner extends BigQuerySourceSplitAssigner {
    private static final Logger LOG = LoggerFactory.getLogger(UnboundedSplitAssigner.class);

    /** Schedules the discovery task and is notified once new splits have been registered. */
    private final SplitDiscoveryScheduler observer;

    /** Outcome of one discovery run: the newly found partition ids and their read streams. */
    static class DiscoveryResult {
        private final List<String> newPartitions;
        private final List<String> readStreams;

        /**
         * @param list ids of the partitions found by this discovery run
         * @param list2 names of the read streams created for those partitions
         */
        public DiscoveryResult(List<String> list, List<String> list2) {
            this.newPartitions = list;
            this.readStreams = list2;
        }

        /** Returns the ids of the partitions found by this discovery run. */
        public List<String> getNewPartitions() {
            return this.newPartitions;
        }

        /** Returns the read streams created for the newly found partitions. */
        public List<String> getReadStreams() {
            return this.readStreams;
        }
    }

    /**
     * Creates the assigner.
     *
     * @param splitDiscoveryScheduler scheduler used to run discovery and to signal new splits
     * @param bigQueryReadOptions read configuration, forwarded to the superclass
     * @param bigQuerySourceEnumState enumerator state, forwarded to the superclass
     */
    public UnboundedSplitAssigner(SplitDiscoveryScheduler splitDiscoveryScheduler, BigQueryReadOptions bigQueryReadOptions, BigQuerySourceEnumState bigQuerySourceEnumState) {
        super(bigQueryReadOptions, bigQuerySourceEnumState);
        this.observer = splitDiscoveryScheduler;
    }

    /**
     * Looks up the table's partitions and creates read streams for every partition that is at
     * or after the configured oldest partition id (if one is set), is reported as completed and
     * is not yet contained in {@code lastSeenPartitions}.
     *
     * <p>This method does not modify the assigner's state; the result is applied by
     * {@link #handlePartitionSplitDiscovery(DiscoveryResult, Throwable)}.
     *
     * @return the newly found partition ids together with the read streams created for them
     * @throws RuntimeException wrapping any exception raised during discovery
     */
    @VisibleForTesting
    DiscoveryResult discoverNewSplits() {
        LOG.info("Searching for new data to read.");
        BigQueryConnectOptions bigQueryConnectOptions = this.readOptions.getBigQueryConnectOptions();
        try {
            List retrievePartitionsStatus = BigQueryServicesFactory.instance(bigQueryConnectOptions).queryClient().retrievePartitionsStatus(bigQueryConnectOptions.getProjectId(), bigQueryConnectOptions.getDataset(), bigQueryConnectOptions.getTable());
            LOG.info("Table partitions and their status: {}", retrievePartitionsStatus);
            LOG.info("Already seen partition ids: {}", this.lastSeenPartitions);
            // Keep partitions that are not older than the configured oldest partition id
            // (every partition passes when none is configured), are completed and are new.
            List list = (List) retrievePartitionsStatus.stream().filter(partitionIdWithInfoAndStatus -> {
                return ((Boolean) this.readOptions.getOldestPartitionId().map(str -> {
                    return Boolean.valueOf(partitionIdWithInfoAndStatus.getPartitionId().compareTo(str) >= 0);
                }).orElse(true)).booleanValue();
            }).filter(partitionIdWithInfoAndStatus2 -> {
                return partitionIdWithInfoAndStatus2.isCompleted().booleanValue();
            }).filter(partitionIdWithInfoAndStatus3 -> {
                return !this.lastSeenPartitions.contains(partitionIdWithInfoAndStatus3.getPartitionId());
            }).collect(Collectors.toList());
            // First argument: the partition ids. Second argument: for each partition, build a
            // restriction on its partition column and discover the read streams (Avro format)
            // for the configured row restriction combined with that partition restriction.
            return new DiscoveryResult((List) list.stream().map(partitionIdWithInfoAndStatus4 -> {
                return partitionIdWithInfoAndStatus4.getPartitionId();
            }).collect(Collectors.toList()), (List) list.stream().map(partitionIdWithInfoAndStatus5 -> {
                return BigQueryPartitionUtils.formatPartitionRestrictionBasedOnInfo(Optional.of(partitionIdWithInfoAndStatus5.getInfo()), partitionIdWithInfoAndStatus5.getInfo().getColumnName(), (String) BigQueryPartitionUtils.partitionValuesFromIdAndDataType(Arrays.asList(partitionIdWithInfoAndStatus5.getPartitionId()), partitionIdWithInfoAndStatus5.getInfo().getColumnType()).get(0));
            }).flatMap(str -> {
                return SplitDiscoverer.discoverSplits(bigQueryConnectOptions, DataFormat.AVRO, this.readOptions.getColumnNames(), combineRestrictions(this.readOptions.getRowRestriction(), str), this.readOptions.getSnapshotTimestampInMillis(), this.readOptions.getMaxStreamCount()).stream();
            }).collect(Collectors.toList()));
        } catch (Exception e) {
            throw new RuntimeException("Problems while trying to discover new splits.", e);
        }
    }

    /**
     * Combines an existing row restriction with an additional one using {@code AND}.
     *
     * <p>The existing restriction is wrapped in parentheses. Without them a restriction that
     * contains {@code OR} (for example {@code a = 1 OR b = 2}) would be combined as
     * {@code a = 1 OR b = 2 AND <new>}, where {@code AND} binds tighter than {@code OR} and the
     * new restriction would only apply to the last operand.
     *
     * @param str the existing restriction; {@code null} or blank means "no restriction"
     * @param str2 the restriction to add; NOTE(review): used as-is, assumed to be a single
     *     predicate that needs no parentheses of its own
     * @return {@code str2} alone when there is no existing restriction, otherwise
     *     {@code (str) AND str2}
     */
    @VisibleForTesting
    String combineRestrictions(String str, String str2) {
        String existing = str == null ? "" : str.trim();
        if (existing.isEmpty()) {
            return str2;
        }
        return "(" + existing + ") AND " + str2;
    }

    /**
     * Applies the outcome of a discovery run to the assigner's state.
     *
     * @param discoveryResult the result of the run; only read when {@code th} is {@code null}
     * @param th the failure of the run, or {@code null} when it succeeded
     * @throws RuntimeException wrapping {@code th} when the run failed and there are no
     *     remaining read streams
     */
    @VisibleForTesting
    void handlePartitionSplitDiscovery(DiscoveryResult discoveryResult, Throwable th) {
        if (th != null) {
            if (this.remainingTableStreams.isEmpty()) {
                // Nothing is left to hand out, so the failure is surfaced instead of hidden.
                throw new RuntimeException(th);
            }
            // Streams are still pending: log the failure and keep working with those.
            LOG.error("Failed to poll for new read streams, continuing", th);
            return;
        }
        if (discoveryResult.getReadStreams().isEmpty() && discoveryResult.getNewPartitions().isEmpty()) {
            LOG.info("No new partitions for now.");
            return;
        }
        LOG.info("Discovered new partitions: {}", discoveryResult.getNewPartitions());
        LOG.info("Discovered new read streams: {}", discoveryResult.getReadStreams());
        // Remember the partitions so they are skipped by later discovery runs, queue their
        // read streams and let the scheduler know that new splits are available.
        this.lastSeenPartitions.addAll(discoveryResult.getNewPartitions());
        this.remainingTableStreams.addAll(discoveryResult.getReadStreams());
        this.observer.notifySplits();
    }

    /**
     * Registers the discovery task and its result handler with the scheduler, passing an
     * initial delay of {@code 0} and the configured partition discovery refresh interval
     * converted from minutes to milliseconds.
     */
    @Override // com.google.cloud.flink.bigquery.source.split.assigner.BigQuerySourceSplitAssigner
    public void discoverSplits() {
        this.observer.schedule(this::discoverNewSplits, this::handlePartitionSplitDiscovery, 0L, Duration.ofMinutes(this.readOptions.getPartitionDiscoveryRefreshIntervalInMinutes().intValue()).toMillis());
    }

    /** Always returns {@code false}: this assigner never declares that it is out of splits. */
    @Override // com.google.cloud.flink.bigquery.source.split.assigner.BigQuerySourceSplitAssigner
    public boolean noMoreSplits() {
        return false;
    }
}
