/*
 * Decompiled with CFR 0.152.
 */
package io.trino.plugin.bigquery;

import com.google.cloud.bigquery.BigQueryException;
import com.google.cloud.bigquery.FieldValueList;
import com.google.cloud.bigquery.TableDefinition;
import com.google.cloud.bigquery.TableId;
import com.google.cloud.bigquery.TableInfo;
import com.google.cloud.bigquery.TableResult;
import com.google.cloud.bigquery.storage.v1.ReadSession;
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableList;
import io.airlift.log.Logger;
import io.airlift.units.Duration;
import io.trino.plugin.bigquery.BigQueryClient;
import io.trino.plugin.bigquery.BigQueryClientFactory;
import io.trino.plugin.bigquery.BigQueryColumnHandle;
import io.trino.plugin.bigquery.BigQueryConfig;
import io.trino.plugin.bigquery.BigQueryErrorCode;
import io.trino.plugin.bigquery.BigQueryFilterQueryBuilder;
import io.trino.plugin.bigquery.BigQueryReadClientFactory;
import io.trino.plugin.bigquery.BigQuerySessionProperties;
import io.trino.plugin.bigquery.BigQuerySplit;
import io.trino.plugin.bigquery.BigQueryTableHandle;
import io.trino.plugin.bigquery.BigQueryUtil;
import io.trino.plugin.bigquery.ReadSessionCreator;
import io.trino.spi.ErrorCodeSupplier;
import io.trino.spi.NodeManager;
import io.trino.spi.StandardErrorCode;
import io.trino.spi.TrinoException;
import io.trino.spi.connector.ColumnHandle;
import io.trino.spi.connector.ConnectorSession;
import io.trino.spi.connector.ConnectorSplit;
import io.trino.spi.connector.ConnectorSplitManager;
import io.trino.spi.connector.ConnectorSplitSource;
import io.trino.spi.connector.ConnectorTableHandle;
import io.trino.spi.connector.ConnectorTransactionHandle;
import io.trino.spi.connector.Constraint;
import io.trino.spi.connector.DynamicFilter;
import io.trino.spi.connector.FixedSplitSource;
import io.trino.spi.connector.SchemaTableName;
import io.trino.spi.connector.TableNotFoundException;
import io.trino.spi.predicate.TupleDomain;
import java.util.List;
import java.util.Objects;
import java.util.Optional;
import java.util.OptionalInt;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import javax.inject.Inject;

/**
 * {@link ConnectorSplitManager} implementation that turns a {@link BigQueryTableHandle}
 * into a fixed list of {@link BigQuerySplit}s.
 *
 * <p>Three kinds of splits are produced by this class:
 * <ul>
 *   <li>a single split created by {@code BigQuerySplit.forViewStream} (non-named relations,
 *       wildcard tables, materialized views, external tables, and views when view
 *       materialization is skipped for the session);</li>
 *   <li>one split per stream of a {@link ReadSession} created through {@link ReadSessionCreator};</li>
 *   <li>"empty projection" splits that only carry a row count, used when the projected
 *       column list is present but empty.</li>
 * </ul>
 */
public class BigQuerySplitManager
        implements ConnectorSplitManager {
    private static final Logger log = Logger.get(BigQuerySplitManager.class);

    private final BigQueryClientFactory bigQueryClientFactory;
    private final BigQueryReadClientFactory bigQueryReadClientFactory;
    // Configured parallelism; when absent the number of required worker nodes is used instead.
    private final Optional<Integer> parallelism;
    private final boolean viewEnabled;
    private final boolean arrowSerializationEnabled;
    private final Duration viewExpiration;
    private final NodeManager nodeManager;
    private final int maxReadRowsRetries;

    /**
     * Creates the split manager, copying the relevant settings out of {@code config}.
     *
     * @param config connector configuration; must not be null
     * @param bigQueryClientFactory factory for per-session {@link BigQueryClient}s; must not be null
     * @param bigQueryReadClientFactory factory handed to {@link ReadSessionCreator}; must not be null
     * @param nodeManager used to derive the default parallelism; must not be null
     * @throws NullPointerException if any argument is null
     */
    @Inject
    public BigQuerySplitManager(BigQueryConfig config, BigQueryClientFactory bigQueryClientFactory, BigQueryReadClientFactory bigQueryReadClientFactory, NodeManager nodeManager) {
        Objects.requireNonNull(config, "config cannot be null");
        this.bigQueryClientFactory = Objects.requireNonNull(bigQueryClientFactory, "bigQueryClientFactory cannot be null");
        this.bigQueryReadClientFactory = Objects.requireNonNull(bigQueryReadClientFactory, "bigQueryReadClientFactory cannot be null");
        this.parallelism = config.getParallelism();
        this.viewEnabled = config.isViewsEnabled();
        this.arrowSerializationEnabled = config.isArrowSerializationEnabled();
        this.viewExpiration = config.getViewExpireDuration();
        this.nodeManager = Objects.requireNonNull(nodeManager, "nodeManager cannot be null");
        this.maxReadRowsRetries = config.getMaxReadRowsRetries();
    }

    /**
     * Computes the splits for the given table handle.
     *
     * <p>The filter is derived from the constraint stored on the table handle; the
     * {@code dynamicFilter} and {@code constraint} arguments are not consulted.
     *
     * @param transaction current transaction handle (only logged)
     * @param session current session
     * @param table table handle; must be a {@link BigQueryTableHandle}
     * @param dynamicFilter unused
     * @param constraint unused
     * @return a {@link FixedSplitSource} over the computed splits
     */
    public ConnectorSplitSource getSplits(ConnectorTransactionHandle transaction, ConnectorSession session, ConnectorTableHandle table, DynamicFilter dynamicFilter, Constraint constraint) {
        log.debug("getSplits(transaction=%s, session=%s, table=%s)", transaction, session, table);
        BigQueryTableHandle bigQueryTableHandle = (BigQueryTableHandle) table;

        int actualParallelism = this.parallelism.orElseGet(() -> this.nodeManager.getRequiredWorkerNodes().size());
        TupleDomain<ColumnHandle> tableConstraint = bigQueryTableHandle.getConstraint();
        Optional<String> filter = BigQueryFilterQueryBuilder.buildFilter(tableConstraint);

        if (!bigQueryTableHandle.isNamedRelation()) {
            List<BigQueryColumnHandle> columns = bigQueryTableHandle.getProjectedColumns().orElse(ImmutableList.of());
            return new FixedSplitSource(BigQuerySplit.forViewStream(columns, filter));
        }

        TableId remoteTableId = bigQueryTableHandle.asPlainTable().getRemoteTableName().toTableId();
        Optional<List<BigQueryColumnHandle>> projectedColumns = bigQueryTableHandle.getProjectedColumns();

        List<BigQuerySplit> splits;
        if (emptyProjectionIsRequired(projectedColumns)) {
            splits = createEmptyProjection(session, remoteTableId, actualParallelism, filter);
        }
        else {
            TableDefinition.Type type = TableDefinition.Type.valueOf(bigQueryTableHandle.asPlainTable().getType());
            splits = readFromBigQuery(session, type, remoteTableId, projectedColumns, actualParallelism, filter);
        }
        return new FixedSplitSource(splits);
    }

    /**
     * Returns {@code true} when a projection is present but contains no columns.
     */
    private static boolean emptyProjectionIsRequired(Optional<List<BigQueryColumnHandle>> projectedColumns) {
        return projectedColumns.isPresent() && projectedColumns.get().isEmpty();
    }

    /**
     * Builds the splits for a non-empty projection.
     *
     * @param session current session
     * @param type BigQuery table type of the remote object
     * @param remoteTableId identifier of the remote table
     * @param projectedColumns projected columns; must be present and non-empty
     * @param actualParallelism parallelism passed to the read session creator
     * @param filter optional filter expression
     * @return either a single {@code forViewStream} split or one split per read-session stream
     * @throws IllegalArgumentException if {@code projectedColumns} is absent or empty
     */
    private List<BigQuerySplit> readFromBigQuery(ConnectorSession session, TableDefinition.Type type, TableId remoteTableId, Optional<List<BigQueryColumnHandle>> projectedColumns, int actualParallelism, Optional<String> filter) {
        Preconditions.checkArgument(projectedColumns.isPresent() && !projectedColumns.get().isEmpty(), "Projected column is empty");
        log.debug("readFromBigQuery(tableId=%s, projectedColumns=%s, actualParallelism=%s, filter=[%s])", remoteTableId, projectedColumns, actualParallelism, filter);

        List<BigQueryColumnHandle> columns = projectedColumns.get();
        if (requiresViewStream(session, type, remoteTableId)) {
            return ImmutableList.of(BigQuerySplit.forViewStream(columns, filter));
        }

        List<String> projectedColumnsNames = columns.stream()
                .map(BigQueryColumnHandle::getName)
                .collect(ImmutableList.toImmutableList());

        ReadSessionCreator readSessionCreator = new ReadSessionCreator(this.bigQueryClientFactory, this.bigQueryReadClientFactory, this.viewEnabled, this.arrowSerializationEnabled, this.viewExpiration, this.maxReadRowsRetries);
        ReadSession readSession = readSessionCreator.create(session, remoteTableId, projectedColumnsNames, filter, actualParallelism);

        return readSession.getStreamsList().stream()
                .map(stream -> BigQuerySplit.forStream(
                        stream.getName(),
                        readSessionCreator.getSchemaAsString(readSession),
                        columns,
                        OptionalInt.of(stream.getSerializedSize())))
                .collect(ImmutableList.toImmutableList());
    }

    /**
     * Decides whether the relation is served through a single {@code forViewStream} split
     * rather than through a read session: wildcard tables, materialized views, external
     * tables, and views when the session asks to skip view materialization.
     */
    private static boolean requiresViewStream(ConnectorSession session, TableDefinition.Type type, TableId remoteTableId) {
        if (BigQueryUtil.isWildcardTable(type, remoteTableId.getTable())) {
            return true;
        }
        if (type == TableDefinition.Type.MATERIALIZED_VIEW || type == TableDefinition.Type.EXTERNAL) {
            return true;
        }
        return BigQuerySessionProperties.isSkipViewMaterialization(session) && type == TableDefinition.Type.VIEW;
    }

    /**
     * Builds {@code actualParallelism} row-count-only splits whose counts add up to the number
     * of rows matching {@code filter} (or all rows when no filter is present).
     *
     * <p>Without a filter the table is looked up first and rejected unless its type is one of
     * {@code BigQueryClient.TABLE_TYPES}.
     *
     * @param session current session
     * @param remoteTableId identifier of the remote table
     * @param actualParallelism number of splits to produce; must be positive
     * @param filter optional filter expression applied to the row count
     * @return the splits; the first one also carries the remainder of the integer division
     * @throws IllegalArgumentException if {@code actualParallelism} is not positive
     * @throws TableNotFoundException if no filter is given and the table does not exist
     * @throws TrinoException with {@code NOT_SUPPORTED} for unsupported table types, or with
     *         {@code BIGQUERY_FAILED_TO_EXECUTE_QUERY} when BigQuery reports a failure
     */
    private List<BigQuerySplit> createEmptyProjection(ConnectorSession session, TableId remoteTableId, int actualParallelism, Optional<String> filter) {
        // Fail with a clear message instead of an ArithmeticException from the division below.
        Preconditions.checkArgument(actualParallelism > 0, "actualParallelism must be positive: %s", actualParallelism);

        BigQueryClient client = this.bigQueryClientFactory.create(session);
        log.debug("createEmptyProjection(tableId=%s, actualParallelism=%s, filter=[%s])", remoteTableId, actualParallelism, filter);
        try {
            if (!filter.isPresent()) {
                TableInfo tableInfo = client.getTable(remoteTableId)
                        .orElseThrow(() -> new TableNotFoundException(new SchemaTableName(remoteTableId.getDataset(), remoteTableId.getTable())));
                TableDefinition.Type tableType = tableInfo.getDefinition().getType();
                if (!BigQueryClient.TABLE_TYPES.contains(tableType)) {
                    throw new TrinoException(StandardErrorCode.NOT_SUPPORTED, "Unsupported table type: " + tableType);
                }
            }

            long numberOfRows = countRows(client, session, remoteTableId, filter);
            long rowsPerSplit = numberOfRows / actualParallelism;
            // Integer division drops the remainder; it is added to the first split.
            long remainingRows = numberOfRows - rowsPerSplit * actualParallelism;

            return IntStream.range(0, actualParallelism)
                    .mapToObj(index -> BigQuerySplit.emptyProjection(index == 0 ? rowsPerSplit + remainingRows : rowsPerSplit))
                    .collect(ImmutableList.toImmutableList());
        }
        catch (BigQueryException e) {
            throw new TrinoException(BigQueryErrorCode.BIGQUERY_FAILED_TO_EXECUTE_QUERY, "Failed to compute empty projection", e);
        }
    }

    /**
     * Runs a {@code COUNT(*)} query against the table, restricted by {@code filter} when present,
     * and returns the value of the first column of the first result row.
     */
    private static long countRows(BigQueryClient client, ConnectorSession session, TableId remoteTableId, Optional<String> filter) {
        String sql = client.selectSql(remoteTableId, "COUNT(*)");
        if (filter.isPresent()) {
            // The filter must be part of the count, otherwise the splits report the unfiltered row count.
            // NOTE(review): assumes selectSql returns a bare "SELECT ... FROM ..." statement with no
            // trailing clause, so a WHERE clause can be appended — confirm against BigQueryClient.
            sql = sql + " WHERE " + filter.get();
        }
        TableResult result = client.query(sql, BigQuerySessionProperties.isQueryResultsCacheEnabled(session), BigQuerySessionProperties.createDisposition(session));
        FieldValueList firstRow = result.iterateAll().iterator().next();
        return firstRow.get(0).getLongValue();
    }
}

