package io.trino.plugin.deltalake;

import com.google.common.base.Verify;
import com.google.common.collect.ImmutableList;
import io.airlift.json.JsonCodec;
import io.airlift.slice.Slice;
import io.airlift.slice.Slices;
import io.trino.hdfs.HdfsContext;
import io.trino.hdfs.HdfsEnvironment;
import io.trino.parquet.ParquetReaderOptions;
import io.trino.parquet.writer.ParquetSchemaConverter;
import io.trino.parquet.writer.ParquetWriterOptions;
import io.trino.plugin.hive.FileFormatDataSourceStats;
import io.trino.plugin.hive.FileWriter;
import io.trino.plugin.hive.ReaderPageSource;
import io.trino.plugin.hive.parquet.ParquetFileWriter;
import io.trino.plugin.hive.parquet.ParquetPageSourceFactory;
import io.trino.spi.Page;
import io.trino.spi.TrinoException;
import io.trino.spi.block.ColumnarRow;
import io.trino.spi.connector.ConnectorMergeSink;
import io.trino.spi.connector.ConnectorPageSink;
import io.trino.spi.connector.ConnectorPageSource;
import io.trino.spi.connector.ConnectorSession;
import io.trino.spi.connector.MergePage;
import io.trino.spi.predicate.TupleDomain;
import io.trino.spi.type.BigintType;
import io.trino.spi.type.TimestampType;
import io.trino.spi.type.TimestampWithTimeZoneType;
import io.trino.spi.type.Type;
import io.trino.spi.type.VarcharType;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.UUID;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletableFuture;
import java.util.stream.IntStream;
import java.util.stream.Stream;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.parquet.hadoop.metadata.CompressionCodecName;
import org.joda.time.DateTimeZone;
import org.roaringbitmap.longlong.ImmutableLongBitmapDataProvider;
import org.roaringbitmap.longlong.LongBitmapDataProvider;
import org.roaringbitmap.longlong.Roaring64Bitmap;

/**
 * Merge sink for Delta Lake tables.
 *
 * <p>For each merge page, inserted rows are forwarded to {@code insertPageSink}. Deleted rows are
 * collected per data file as a bitmap of row numbers. In {@link #finish()}:
 * <ul>
 *   <li>each fragment produced by the insert sink becomes a {@link DeltaLakeMergeResult} with no
 *       old file;</li>
 *   <li>every data file with deletions is rewritten without the deleted rows into a new Parquet
 *       file in the same directory. Its result names the old file (relative to the table root)
 *       and describes the new one.</li>
 * </ul>
 */
public class DeltaLakeMergeSink implements ConnectorMergeSink {
    private static final JsonCodec<List<String>> PARTITIONS_CODEC = JsonCodec.listJsonCodec(String.class);

    private final HdfsEnvironment hdfsEnvironment;
    private final ConnectorSession session;
    private final DateTimeZone parquetDateTimeZone;
    private final String trinoVersion;
    private final JsonCodec<DataFileInfo> dataFileInfoCodec;
    private final JsonCodec<DeltaLakeMergeResult> mergeResultJsonCodec;
    private final DeltaLakeWriterStats writerStats;
    private final String rootTableLocation;
    private final ConnectorPageSink insertPageSink;
    // Only the REGULAR columns of the table; these are the columns read from and written to data files.
    private final List<DeltaLakeColumnHandle> dataColumns;
    private final int tableColumnCount;
    // Pending deletions, keyed by the data file path (UTF-8) the deleted rows belong to.
    private final Map<Slice, FileDeletion> fileDeletions = new HashMap<>();

    /** Deletions collected for one data file: its partition values and the row numbers to drop. */
    private static class FileDeletion {
        private final List<String> partitionValues;
        private final LongBitmapDataProvider rowsToDelete = new Roaring64Bitmap();

        private FileDeletion(List<String> partitionValues) {
            this.partitionValues = ImmutableList.copyOf(Objects.requireNonNull(partitionValues, "partitionValues is null"));
        }

        public List<String> partitionValues() {
            return partitionValues;
        }

        public LongBitmapDataProvider rowsToDelete() {
            return rowsToDelete;
        }
    }

    public DeltaLakeMergeSink(
            HdfsEnvironment hdfsEnvironment,
            ConnectorSession session,
            DateTimeZone parquetDateTimeZone,
            String trinoVersion,
            JsonCodec<DataFileInfo> dataFileInfoCodec,
            JsonCodec<DeltaLakeMergeResult> mergeResultJsonCodec,
            DeltaLakeWriterStats writerStats,
            String rootTableLocation,
            ConnectorPageSink insertPageSink,
            List<DeltaLakeColumnHandle> tableColumns) {
        this.hdfsEnvironment = Objects.requireNonNull(hdfsEnvironment, "hdfsEnvironment is null");
        this.session = Objects.requireNonNull(session, "session is null");
        this.parquetDateTimeZone = Objects.requireNonNull(parquetDateTimeZone, "parquetDateTimeZone is null");
        this.trinoVersion = Objects.requireNonNull(trinoVersion, "trinoVersion is null");
        this.dataFileInfoCodec = Objects.requireNonNull(dataFileInfoCodec, "dataFileInfoCodec is null");
        this.mergeResultJsonCodec = Objects.requireNonNull(mergeResultJsonCodec, "mergeResultJsonCodec is null");
        this.writerStats = Objects.requireNonNull(writerStats, "writerStats is null");
        this.rootTableLocation = Objects.requireNonNull(rootTableLocation, "rootTableLocation is null");
        this.insertPageSink = Objects.requireNonNull(insertPageSink, "insertPageSink is null");
        Objects.requireNonNull(tableColumns, "tableColumns is null");
        this.tableColumnCount = tableColumns.size();
        this.dataColumns = tableColumns.stream()
                .filter(column -> column.getColumnType() == DeltaLakeColumnType.REGULAR)
                .collect(ImmutableList.toImmutableList());
    }

    /**
     * Splits a merge page into insertions and deletions.
     *
     * <p>Insertions are appended to the insert sink immediately. Deletions are recorded and applied
     * in {@link #finish()}.
     */
    @Override
    public void storeMergedRows(Page page) {
        MergePage mergePage = MergePage.createDeleteAndInsertPages(page, tableColumnCount);
        mergePage.getInsertionsPage().ifPresent(insertPageSink::appendPage);
        mergePage.getDeletionsPage().ifPresent(this::recordDeletions);
    }

    /**
     * Records every row of a deletions page.
     *
     * <p>The last channel is a row block whose fields are read as:
     * <ol>
     *   <li>the data file path (VARCHAR);</li>
     *   <li>the row number within that file (BIGINT);</li>
     *   <li>the file's partition values, as a JSON list of strings (VARCHAR).</li>
     * </ol>
     */
    private void recordDeletions(Page deletions) {
        ColumnarRow rowIds = ColumnarRow.toColumnarRow(deletions.getBlock(deletions.getChannelCount() - 1));
        for (int position = 0; position < rowIds.getPositionCount(); position++) {
            Slice filePath = VarcharType.VARCHAR.getSlice(rowIds.getField(0), position);
            long rowNumber = BigintType.BIGINT.getLong(rowIds.getField(1), position);
            Slice partitionsJson = VarcharType.VARCHAR.getSlice(rowIds.getField(2), position);
            // Partition values are only decoded the first time a file is seen.
            fileDeletions
                    .computeIfAbsent(filePath, ignored -> new FileDeletion(PARTITIONS_CODEC.fromJson(partitionsJson.toStringUtf8())))
                    .rowsToDelete()
                    .addLong(rowNumber);
        }
    }

    /**
     * Finishes the insert sink (blocking on it), rewrites every file that has deletions, and
     * returns one JSON-encoded {@link DeltaLakeMergeResult} fragment per added or rewritten file.
     */
    @Override
    public CompletableFuture<Collection<Slice>> finish() {
        List<Slice> fragments = new ArrayList<>();

        // Files written by the insert sink are reported as pure additions (no old file).
        Collection<Slice> insertFragments = insertPageSink.finish().join();
        for (Slice insertFragment : insertFragments) {
            DataFileInfo dataFileInfo = dataFileInfoCodec.fromJson(insertFragment.getBytes());
            DeltaLakeMergeResult result = new DeltaLakeMergeResult(Optional.empty(), Optional.of(dataFileInfo));
            fragments.add(Slices.wrappedBuffer(mergeResultJsonCodec.toJsonBytes(result)));
        }

        fileDeletions.forEach((filePath, deletion) -> fragments.addAll(rewriteFile(new Path(filePath.toStringUtf8()), deletion)));
        return CompletableFuture.completedFuture(fragments);
    }

    /**
     * Rewrites {@code sourcePath} without the rows in {@code deletion} into a new file next to it.
     *
     * @return a single JSON fragment naming the old file (relative to the table root) and
     *         describing the new file
     * @throws TrinoException with {@code DELTA_LAKE_BAD_WRITE} if an {@link IOException} occurs
     */
    private List<Slice> rewriteFile(Path sourcePath, FileDeletion deletion) {
        try {
            Path rootTablePath = new Path(rootTableLocation);
            String sourceRelativePath = rootTablePath.toUri().relativize(sourcePath.toUri()).toString();
            FileSystem fileSystem = hdfsEnvironment.getFileSystem(new HdfsContext(session.getIdentity()), rootTablePath);

            // The replacement file lives in the same directory, named "<queryId>_<random UUID>".
            Path targetPath = new Path(sourcePath.getParent(), session.getQueryId() + "_" + UUID.randomUUID());
            FileWriter parquetWriter = createParquetFileWriter(fileSystem, targetPath, dataColumns);
            String targetRelativePath = rootTablePath.toUri().relativize(targetPath.toUri()).toString();
            DeltaLakeWriter writer = new DeltaLakeWriter(
                    fileSystem,
                    parquetWriter,
                    rootTablePath,
                    targetRelativePath,
                    deletion.partitionValues(),
                    writerStats,
                    dataColumns);

            DataFileInfo newFileInfo = rewriteParquetFile(sourcePath, deletion.rowsToDelete(), writer);
            DeltaLakeMergeResult result = new DeltaLakeMergeResult(Optional.of(sourceRelativePath), Optional.of(newFileInfo));
            return ImmutableList.of(Slices.utf8Slice(mergeResultJsonCodec.toJson(result)));
        } catch (IOException e) {
            throw new TrinoException(DeltaLakeErrorCode.DELTA_LAKE_BAD_WRITE, "Unable to rewrite Parquet file", e);
        }
    }

    /**
     * Creates a Parquet writer at {@code path} for {@code columns}.
     *
     * <p>Block size, page size and compression come from the session properties.
     *
     * @throws TrinoException with {@code DELTA_LAKE_BAD_WRITE} if the file cannot be created
     */
    private FileWriter createParquetFileWriter(FileSystem fileSystem, Path path, List<DeltaLakeColumnHandle> columns) {
        ParquetWriterOptions writerOptions = ParquetWriterOptions.builder()
                .setMaxBlockSize(DeltaLakeSessionProperties.getParquetWriterBlockSize(session))
                .setMaxPageSize(DeltaLakeSessionProperties.getParquetWriterPageSize(session))
                .build();
        CompressionCodecName compressionCodec = DeltaLakeSessionProperties.getCompressionCodec(session).getParquetCompressionCodec();

        try {
            // Deletes the file being written; presumably run by ParquetFileWriter on rollback (TODO confirm).
            Callable<Void> rollbackAction = () -> {
                fileSystem.delete(path, false);
                return null;
            };
            List<Type> parquetTypes = columns.stream()
                    .map(column -> toParquetType(column.getType()))
                    .collect(ImmutableList.toImmutableList());
            List<String> columnNames = columns.stream()
                    .map(DeltaLakeColumnHandle::getName)
                    .collect(ImmutableList.toImmutableList());
            ParquetSchemaConverter schemaConverter = new ParquetSchemaConverter(parquetTypes, columnNames, false, false);

            return new ParquetFileWriter(
                    fileSystem.create(path),
                    rollbackAction,
                    parquetTypes,
                    schemaConverter.getMessageType(),
                    schemaConverter.getPrimitiveTypes(),
                    writerOptions,
                    IntStream.range(0, columns.size()).toArray(),
                    compressionCodec,
                    trinoVersion,
                    Optional.empty());
        } catch (IOException e) {
            throw new TrinoException(DeltaLakeErrorCode.DELTA_LAKE_BAD_WRITE, "Error creating Parquet file", e);
        }
    }

    /**
     * Maps a column type to the type written to Parquet.
     *
     * <p>{@code TIMESTAMP WITH TIME ZONE} is written as {@code TIMESTAMP(3)}, and only millisecond
     * precision is accepted. All other types are returned unchanged.
     */
    private static Type toParquetType(Type type) {
        if (!(type instanceof TimestampWithTimeZoneType)) {
            return type;
        }
        TimestampWithTimeZoneType timestampWithTimeZone = (TimestampWithTimeZoneType) type;
        Verify.verify(timestampWithTimeZone.getPrecision() == 3, "Unsupported type: %s", type);
        return TimestampType.TIMESTAMP_MILLIS;
    }

    /**
     * Copies every row of the Parquet file at {@code path} whose row number is not in
     * {@code rowsToDelete} into {@code fileWriter}, then commits the writer.
     *
     * <p>Rows are numbered from 0 in the order the page source returns them. The page source is
     * always closed. On any failure the writer is rolled back and the original exception is
     * rethrown, with any rollback failure attached as suppressed.
     */
    private DataFileInfo rewriteParquetFile(Path path, ImmutableLongBitmapDataProvider rowsToDelete, DeltaLakeWriter fileWriter) throws IOException {
        try {
            try (ConnectorPageSource pageSource = createParquetPageSource(path).get()) {
                long rowNumber = 0;
                while (!pageSource.isFinished()) {
                    Page page = pageSource.getNextPage();
                    if (page == null) {
                        continue;
                    }
                    int positionCount = page.getPositionCount();
                    int[] retainedPositions = new int[positionCount];
                    int retainedCount = 0;
                    for (int position = 0; position < positionCount; position++) {
                        if (!rowsToDelete.contains(rowNumber)) {
                            retainedPositions[retainedCount] = position;
                            retainedCount++;
                        }
                        rowNumber++;
                    }
                    // Only copy the page when some of its rows were actually deleted.
                    if (retainedCount != positionCount) {
                        page = page.getPositions(retainedPositions, 0, retainedCount);
                    }
                    fileWriter.appendRows(page);
                }
                fileWriter.commit();
            }
            return fileWriter.getDataFileInfo();
        } catch (Throwable t) {
            try {
                fileWriter.rollback();
            } catch (RuntimeException e) {
                if (!t.equals(e)) {
                    t.addSuppressed(e);
                }
            }
            throw t;
        }
    }

    /**
     * Opens a page source over the whole Parquet file at {@code path}, projecting {@code dataColumns}.
     *
     * <p>NOTE(review): the {@code 0, fileSize, fileSize} arguments presumably mean
     * start/length/estimated file size, i.e. the entire file is read — confirm against
     * {@code ParquetPageSourceFactory.createPageSource}.
     */
    private ReaderPageSource createParquetPageSource(Path path) throws IOException {
        HdfsContext context = new HdfsContext(session);
        Configuration configuration = hdfsEnvironment.getConfiguration(context, path);
        long fileSize = hdfsEnvironment.getFileSystem(context, path).getFileStatus(path).getLen();
        return ParquetPageSourceFactory.createPageSource(
                path,
                0L,
                fileSize,
                fileSize,
                dataColumns.stream()
                        .map(DeltaLakeColumnHandle::toHiveColumnHandle)
                        .collect(ImmutableList.toImmutableList()),
                TupleDomain.all(),
                true,
                hdfsEnvironment,
                configuration,
                session.getIdentity(),
                parquetDateTimeZone,
                new FileFormatDataSourceStats(),
                new ParquetReaderOptions());
    }
}
