package io.trino.plugin.deltalake.functions.tablechanges;

import com.google.common.collect.ImmutableList;
import com.google.common.collect.Iterables;
import io.trino.filesystem.Locations;
import io.trino.filesystem.TrinoFileSystem;
import io.trino.filesystem.TrinoFileSystemFactory;
import io.trino.plugin.deltalake.DeltaLakeErrorCode;
import io.trino.plugin.deltalake.transactionlog.AddFileEntry;
import io.trino.plugin.deltalake.transactionlog.CdcEntry;
import io.trino.plugin.deltalake.transactionlog.CommitInfoEntry;
import io.trino.plugin.deltalake.transactionlog.DeltaLakeTransactionLogEntry;
import io.trino.plugin.deltalake.transactionlog.TransactionLogUtil;
import io.trino.plugin.deltalake.transactionlog.checkpoint.TransactionLogTail;
import io.trino.spi.StandardErrorCode;
import io.trino.spi.TrinoException;
import io.trino.spi.connector.ConnectorSession;
import io.trino.spi.connector.ConnectorSplit;
import io.trino.spi.connector.ConnectorSplitSource;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.stream.LongStream;
import java.util.stream.Stream;

/* loaded from: input_file:io/trino/plugin/deltalake/functions/tablechanges/TableChangesSplitSource.class */
public class TableChangesSplitSource implements ConnectorSplitSource {
    private final String tableLocation;
    private final Iterator<ConnectorSplit> splits;

    public TableChangesSplitSource(ConnectorSession connectorSession, TrinoFileSystemFactory trinoFileSystemFactory, TableChangesTableFunctionHandle tableChangesTableFunctionHandle) {
        this.tableLocation = tableChangesTableFunctionHandle.tableLocation();
        this.splits = prepareSplits(tableChangesTableFunctionHandle.firstReadVersion(), tableChangesTableFunctionHandle.tableReadVersion(), TransactionLogUtil.getTransactionLogDir(tableChangesTableFunctionHandle.tableLocation()), trinoFileSystemFactory.create(connectorSession)).iterator();
    }

    private Stream<ConnectorSplit> prepareSplits(long j, long j2, String str, TrinoFileSystem trinoFileSystem) {
        return LongStream.range(j, j2 + 1).boxed().flatMap(l -> {
            try {
                List<DeltaLakeTransactionLogEntry> orElseThrow = TransactionLogTail.getEntriesFromJson(l.longValue(), str, trinoFileSystem).orElseThrow(() -> {
                    return new TrinoException(DeltaLakeErrorCode.DELTA_LAKE_BAD_DATA, "Delta Lake log entries are missing for version " + l);
                });
                if (orElseThrow.isEmpty()) {
                    return ImmutableList.of().stream();
                }
                List list = (List) orElseThrow.stream().map((v0) -> {
                    return v0.getCommitInfo();
                }).filter((v0) -> {
                    return Objects.nonNull(v0);
                }).collect(ImmutableList.toImmutableList());
                if (list.size() != 1) {
                    throw new TrinoException(DeltaLakeErrorCode.DELTA_LAKE_BAD_DATA, "There should be exactly 1 commitInfo present in a metadata file");
                }
                CommitInfoEntry commitInfoEntry = (CommitInfoEntry) Iterables.getOnlyElement(list);
                ArrayList arrayList = new ArrayList();
                boolean z = false;
                boolean z2 = false;
                for (DeltaLakeTransactionLogEntry deltaLakeTransactionLogEntry : orElseThrow) {
                    CdcEntry cdc = deltaLakeTransactionLogEntry.getCDC();
                    if (cdc != null) {
                        z = true;
                        arrayList.add(mapToDeltaLakeTableChangesSplit(commitInfoEntry, TableChangesFileType.CDF_FILE, cdc.getSize(), cdc.getPath(), cdc.getCanonicalPartitionValues()));
                    }
                    if (deltaLakeTransactionLogEntry.getRemove() != null && deltaLakeTransactionLogEntry.getRemove().isDataChange()) {
                        z2 = true;
                    }
                }
                if (z2 && !z) {
                    throw new TrinoException(StandardErrorCode.INVALID_FUNCTION_ARGUMENT, String.format("Change Data Feed is not enabled at version %d. Version contains 'remove' entries without 'cdc' entries", l));
                }
                if (!z2) {
                    for (DeltaLakeTransactionLogEntry deltaLakeTransactionLogEntry2 : orElseThrow) {
                        if (deltaLakeTransactionLogEntry2.getAdd() != null && deltaLakeTransactionLogEntry2.getAdd().isDataChange()) {
                            AddFileEntry add = deltaLakeTransactionLogEntry2.getAdd();
                            arrayList.add(mapToDeltaLakeTableChangesSplit(commitInfoEntry, TableChangesFileType.DATA_FILE, add.getSize(), add.getPath(), add.getCanonicalPartitionValues()));
                        }
                    }
                }
                return arrayList.stream();
            } catch (IOException e) {
                throw new TrinoException(DeltaLakeErrorCode.DELTA_LAKE_FILESYSTEM_ERROR, "Failed to access table metadata", e);
            }
        });
    }

    public CompletableFuture<ConnectorSplitSource.ConnectorSplitBatch> getNextBatch(int i) {
        ImmutableList.Builder builder = ImmutableList.builder();
        for (int i2 = 0; i2 < i && this.splits.hasNext(); i2++) {
            builder.add(this.splits.next());
        }
        return CompletableFuture.completedFuture(new ConnectorSplitSource.ConnectorSplitBatch(builder.build(), isFinished()));
    }

    private TableChangesSplit mapToDeltaLakeTableChangesSplit(CommitInfoEntry commitInfoEntry, TableChangesFileType tableChangesFileType, long j, String str, Map<String, Optional<String>> map) {
        return new TableChangesSplit(Locations.appendPath(this.tableLocation, str), j, map, commitInfoEntry.getTimestamp(), tableChangesFileType, commitInfoEntry.getVersion());
    }

    public void close() {
    }

    public boolean isFinished() {
        return !this.splits.hasNext();
    }
}
