package io.trino.plugin.deltalake.functions.tablechanges;

import com.google.common.base.Verify;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableSet;
import io.airlift.slice.Slices;
import io.trino.filesystem.Location;
import io.trino.filesystem.TrinoFileSystemFactory;
import io.trino.filesystem.TrinoInputFile;
import io.trino.parquet.ParquetReaderOptions;
import io.trino.plugin.deltalake.DeltaLakeCdfPageSink;
import io.trino.plugin.deltalake.DeltaLakeColumnHandle;
import io.trino.plugin.deltalake.DeltaLakeColumnType;
import io.trino.plugin.deltalake.DeltaLakeMergeSink;
import io.trino.plugin.deltalake.DeltaLakePageSource;
import io.trino.plugin.deltalake.DeltaLakeSessionProperties;
import io.trino.plugin.hive.FileFormatDataSourceStats;
import io.trino.plugin.hive.ReaderPageSource;
import io.trino.plugin.hive.parquet.ParquetPageSourceFactory;
import io.trino.spi.Page;
import io.trino.spi.block.Block;
import io.trino.spi.block.RunLengthEncodedBlock;
import io.trino.spi.connector.ConnectorSession;
import io.trino.spi.function.table.TableFunctionProcessorState;
import io.trino.spi.function.table.TableFunctionSplitProcessor;
import io.trino.spi.predicate.TupleDomain;
import io.trino.spi.predicate.Utils;
import io.trino.spi.type.BigintType;
import io.trino.spi.type.DateTimeEncoding;
import io.trino.spi.type.TimeZoneKey;
import io.trino.spi.type.TimestampWithTimeZoneType;
import io.trino.spi.type.VarcharType;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.OptionalInt;
import java.util.OptionalLong;
import org.joda.time.DateTimeZone;

/* loaded from: input_file:io/trino/plugin/deltalake/functions/tablechanges/TableChangesFunctionProcessor.class */
/**
 * Split processor for the Delta Lake table-changes table function.
 *
 * <p>Each instance reads a single Parquet file described by a {@link TableChangesSplit} and
 * appends constant columns to every page read from it:
 * <ul>
 *   <li>for a {@code CDF_FILE} split: the split's current version and its commit timestamp
 *       (the change-type column is read from the file itself);</li>
 *   <li>for any other split (handled as a data file): a constant
 *       {@link DeltaLakeMergeSink#INSERT_CDF_LABEL} change type, followed by the current
 *       version and its commit timestamp.</li>
 * </ul>
 */
public class TableChangesFunctionProcessor implements TableFunctionSplitProcessor {
    /** Columns appended to pages of a CDF file: version and commit timestamp. */
    private static final int NUMBER_OF_ADDITIONAL_COLUMNS_FOR_CDF_FILE = 2;
    /** Columns appended to pages of a data file: change type, version and commit timestamp. */
    private static final int NUMBER_OF_ADDITIONAL_COLUMNS_FOR_DATA_FILE = 3;
    /** Returned when the source had no page available but has not finished yet. */
    private static final Page EMPTY_PAGE = new Page(0);

    private final TableChangesFileType fileType;
    private final DeltaLakePageSource deltaLakePageSource;
    /** Single-value block with the change-type label used for every row of a data file. */
    private final Block insertChangeTypeAsBlock;
    /** Single-value block with the split's current version. */
    private final Block currentVersionAsBlock;
    /** Single-value block with the split's commit timestamp, packed with the UTC zone key. */
    private final Block currentVersionCommitTimestampAsBlock;

    /**
     * Creates a processor for one split.
     *
     * @param connectorSession session used to create the file system and to read Parquet session properties
     * @param trinoFileSystemFactory factory for the file system the split's file is read from
     * @param dateTimeZone time zone handed to the Parquet page source factory
     * @param i forwarded unchanged to {@code ParquetPageSourceFactory.createPageSource}
     *          (presumably the domain compaction threshold; confirm against that method's signature)
     * @param fileFormatDataSourceStats stats collector handed to the Parquet page source factory
     * @param parquetReaderOptions base reader options; session-level overrides are applied on top
     * @param tableChangesTableFunctionHandle function handle supplying the columns to read
     * @param tableChangesSplit split describing the file, its type, version and commit timestamp
     * @throws NullPointerException if any reference argument is {@code null}
     */
    public TableChangesFunctionProcessor(ConnectorSession connectorSession, TrinoFileSystemFactory trinoFileSystemFactory, DateTimeZone dateTimeZone, int i, FileFormatDataSourceStats fileFormatDataSourceStats, ParquetReaderOptions parquetReaderOptions, TableChangesTableFunctionHandle tableChangesTableFunctionHandle, TableChangesSplit tableChangesSplit) {
        Objects.requireNonNull(connectorSession, "session is null");
        Objects.requireNonNull(trinoFileSystemFactory, "fileSystemFactory is null");
        Objects.requireNonNull(dateTimeZone, "parquetDateTimeZone is null");
        Objects.requireNonNull(fileFormatDataSourceStats, "fileFormatDataSourceStats is null");
        Objects.requireNonNull(parquetReaderOptions, "parquetReaderOptions is null");
        Objects.requireNonNull(tableChangesTableFunctionHandle, "handle is null");
        Objects.requireNonNull(tableChangesSplit, "tableChangesSplit is null");
        this.fileType = tableChangesSplit.fileType();
        this.deltaLakePageSource = createDeltaLakePageSource(connectorSession, trinoFileSystemFactory, dateTimeZone, i, fileFormatDataSourceStats, parquetReaderOptions, tableChangesTableFunctionHandle, tableChangesSplit);
        // The constant values are identical for every page, so build their blocks once up front.
        this.insertChangeTypeAsBlock = Utils.nativeValueToBlock(VarcharType.VARCHAR, Slices.utf8Slice(DeltaLakeMergeSink.INSERT_CDF_LABEL));
        this.currentVersionAsBlock = Utils.nativeValueToBlock(BigintType.BIGINT, tableChangesSplit.currentVersion());
        this.currentVersionCommitTimestampAsBlock = Utils.nativeValueToBlock(
                TimestampWithTimeZoneType.TIMESTAMP_TZ_MILLIS,
                DateTimeEncoding.packDateTimeWithZone(tableChangesSplit.currentVersionCommitTimestamp(), TimeZoneKey.UTC_KEY));
    }

    /**
     * Reads the next page from the underlying file and returns it with the constant columns appended.
     *
     * @return a produced page, a produced empty page when the source had nothing available yet,
     *         or {@code FINISHED} once the source reports it is finished
     */
    @Override
    public TableFunctionProcessorState process() {
        if (this.fileType == TableChangesFileType.CDF_FILE) {
            return processCdfFile();
        }
        return processDataFile();
    }

    /** Appends the version and commit timestamp columns to the next page of a CDF file. */
    private TableFunctionProcessorState processCdfFile() {
        Page page = this.deltaLakePageSource.getNextPage();
        if (page == null) {
            return stateWithoutPage();
        }
        int positionCount = page.getPositionCount();
        int fileColumnCount = page.getChannelCount();
        Block[] blocks = copyBlocksWithRoom(page, NUMBER_OF_ADDITIONAL_COLUMNS_FOR_CDF_FILE);
        blocks[fileColumnCount] = RunLengthEncodedBlock.create(this.currentVersionAsBlock, positionCount);
        blocks[fileColumnCount + 1] = RunLengthEncodedBlock.create(this.currentVersionCommitTimestampAsBlock, positionCount);
        return TableFunctionProcessorState.Processed.produced(new Page(positionCount, blocks));
    }

    /** Appends the change type, version and commit timestamp columns to the next page of a data file. */
    private TableFunctionProcessorState processDataFile() {
        Page page = this.deltaLakePageSource.getNextPage();
        if (page == null) {
            return stateWithoutPage();
        }
        int positionCount = page.getPositionCount();
        int fileColumnCount = page.getChannelCount();
        Block[] blocks = copyBlocksWithRoom(page, NUMBER_OF_ADDITIONAL_COLUMNS_FOR_DATA_FILE);
        blocks[fileColumnCount] = RunLengthEncodedBlock.create(this.insertChangeTypeAsBlock, positionCount);
        blocks[fileColumnCount + 1] = RunLengthEncodedBlock.create(this.currentVersionAsBlock, positionCount);
        blocks[fileColumnCount + 2] = RunLengthEncodedBlock.create(this.currentVersionCommitTimestampAsBlock, positionCount);
        return TableFunctionProcessorState.Processed.produced(new Page(positionCount, blocks));
    }

    /** State to report when the page source returned no page: finished, or an empty page to be polled again. */
    private TableFunctionProcessorState stateWithoutPage() {
        if (this.deltaLakePageSource.isFinished()) {
            return TableFunctionProcessorState.Finished.FINISHED;
        }
        return TableFunctionProcessorState.Processed.produced(EMPTY_PAGE);
    }

    /** Copies the page's blocks into a new array that has {@code additionalColumns} free trailing slots. */
    private static Block[] copyBlocksWithRoom(Page page, int additionalColumns) {
        int fileColumnCount = page.getChannelCount();
        Block[] blocks = new Block[fileColumnCount + additionalColumns];
        for (int channel = 0; channel < fileColumnCount; channel++) {
            blocks[channel] = page.getBlock(channel);
        }
        return blocks;
    }

    /**
     * Builds the page source that reads the split's Parquet file from offset 0 over its whole size.
     *
     * <p>For a CDF file the change-type column is added to the handle's columns so that it is read
     * from the file; for a data file only the handle's columns are read.
     *
     * @throws IllegalStateException if the split's file type is neither {@code CDF_FILE} nor {@code DATA_FILE}
     */
    private static DeltaLakePageSource createDeltaLakePageSource(ConnectorSession session, TrinoFileSystemFactory fileSystemFactory, DateTimeZone parquetDateTimeZone, int domainCompactionThreshold, FileFormatDataSourceStats fileFormatDataSourceStats, ParquetReaderOptions parquetReaderOptions, TableChangesTableFunctionHandle handle, TableChangesSplit split) {
        TrinoInputFile inputFile = fileSystemFactory.create(session).newInputFile(Location.of(split.path()), split.fileSize());
        Map<String, Optional<String>> partitionKeys = split.partitionKeys();

        // Session properties override the corresponding values of the supplied reader options.
        ParquetReaderOptions readerOptions = parquetReaderOptions
                .withMaxReadBlockSize(DeltaLakeSessionProperties.getParquetMaxReadBlockSize(session))
                .withMaxReadBlockRowCount(DeltaLakeSessionProperties.getParquetMaxReadBlockRowCount(session))
                .withUseColumnIndex(DeltaLakeSessionProperties.isParquetUseColumnIndex(session));

        List<DeltaLakeColumnHandle> columns;
        switch (split.fileType()) {
            case CDF_FILE:
                columns = ImmutableList.<DeltaLakeColumnHandle>builder()
                        .addAll(handle.columns())
                        .add(new DeltaLakeColumnHandle(
                                DeltaLakeCdfPageSink.CHANGE_TYPE_COLUMN_NAME,
                                VarcharType.VARCHAR,
                                OptionalInt.empty(),
                                DeltaLakeCdfPageSink.CHANGE_TYPE_COLUMN_NAME,
                                VarcharType.VARCHAR,
                                DeltaLakeColumnType.REGULAR,
                                Optional.empty()))
                        .build();
                break;
            case DATA_FILE:
                columns = handle.columns();
                break;
            default:
                throw new IllegalStateException("Unsupported table changes file type: " + split.fileType());
        }

        // Only REGULAR columns are requested from the Parquet reader.
        ReaderPageSource pageSource = ParquetPageSourceFactory.createPageSource(
                inputFile,
                0L,
                split.fileSize(),
                columns.stream()
                        .filter(column -> column.getColumnType() == DeltaLakeColumnType.REGULAR)
                        .map(DeltaLakeColumnHandle::toHiveColumnHandle)
                        .collect(ImmutableList.toImmutableList()),
                ImmutableList.of(TupleDomain.all()),
                true,
                parquetDateTimeZone,
                fileFormatDataSourceStats,
                readerOptions,
                Optional.empty(),
                domainCompactionThreshold,
                OptionalLong.empty());
        Verify.verify(pageSource.getReaderColumns().isEmpty(), "Unexpected reader columns: %s", pageSource.getReaderColumns().orElse(null));

        return new DeltaLakePageSource(
                columns,
                ImmutableSet.of(),
                partitionKeys,
                Optional.empty(),
                pageSource.get(),
                Optional.empty(),
                split.path(),
                split.fileSize(),
                0L,
                Optional::empty);
    }
}
