/*
 * Decompiled with CFR 0.152.
 */
package org.apache.spark.shuffle.sort;

import java.io.File;
import java.io.IOException;
import java.util.LinkedList;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import javax.annotation.Nullable;
import org.apache.comet.CometConf$;
import org.apache.comet.Native;
import org.apache.spark.SparkConf;
import org.apache.spark.TaskContext;
import org.apache.spark.executor.ShuffleWriteMetrics;
import org.apache.spark.memory.MemoryConsumer;
import org.apache.spark.memory.SparkOutOfMemoryError;
import org.apache.spark.memory.TaskMemoryManager;
import org.apache.spark.shuffle.ShuffleWriteMetricsReporter;
import org.apache.spark.shuffle.comet.CometShuffleChecksumSupport;
import org.apache.spark.shuffle.comet.CometShuffleMemoryAllocator;
import org.apache.spark.shuffle.comet.CometShuffleMemoryAllocatorTrait;
import org.apache.spark.shuffle.comet.TooLargePageException;
import org.apache.spark.shuffle.sort.RowPartition;
import org.apache.spark.shuffle.sort.ShuffleInMemorySorter;
import org.apache.spark.sql.comet.execution.shuffle.ShuffleThreadPool;
import org.apache.spark.sql.comet.execution.shuffle.SpillInfo;
import org.apache.spark.sql.comet.execution.shuffle.SpillWriter;
import org.apache.spark.sql.types.StructType;
import org.apache.spark.storage.BlockManager;
import org.apache.spark.storage.TempShuffleBlockId;
import org.apache.spark.unsafe.Platform;
import org.apache.spark.unsafe.UnsafeAlignedOffset;
import org.apache.spark.unsafe.array.LongArray;
import org.apache.spark.util.Utils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import scala.Tuple2;

/**
 * External sorter used by Comet's columnar shuffle writer.
 *
 * <p>Records are copied into pages owned by the active {@link SpillSorter}, and a packed pointer
 * for each record is kept in that sorter's pointer array. When the configured record-count
 * threshold is reached, or memory cannot be obtained, the active sorter is sorted by partition
 * and written to a temporary shuffle block ("spill"). In async mode the write is handed to a
 * shared thread pool and a fresh sorter becomes active; otherwise the write happens inline and
 * the same sorter is reset and reused.
 *
 * <p>The public methods are driven by the task thread. The only state touched by async spill
 * tasks is {@link #spills} (guarded by its own monitor), {@link #spillingSorters}, the task and
 * write metrics (guarded by the monitors used below) and the per-partition checksum array.
 */
public final class CometShuffleExternalSorter
implements CometShuffleChecksumSupport {
    private static final Logger logger = LoggerFactory.getLogger(CometShuffleExternalSorter.class);
    private final int numPartitions;
    private final BlockManager blockManager;
    private final TaskContext taskContext;
    private final ShuffleWriteMetricsReporter writeMetrics;
    private final StructType schema;
    /** Record count at which {@link #insertRecord} forces a spill of the active sorter. */
    private final int numElementsForSpillThreshold;
    /** Size passed to the allocator for each new pointer array. */
    private final int initialSize;
    /** Sorters whose async spill has been submitted and has not yet finished. */
    private final ConcurrentLinkedQueue<SpillSorter> spillingSorters = new ConcurrentLinkedQueue<>();
    private SpillSorter activeSpillSorter;
    /** Completed spills; async tasks append under {@code synchronized (spills)}. */
    private final LinkedList<SpillInfo> spills = new LinkedList<>();
    private long peakMemoryUsedBytes;
    /** Running per-partition checksums; an empty array means checksumming is not in use. */
    private final long[] partitionChecksums;
    private final String checksumAlgorithm;
    private final CometShuffleMemoryAllocatorTrait allocator;
    private final boolean isAsync;
    private final ExecutorService threadPool;
    /** Maximum number of outstanding async spill tasks (0 when async mode is off). */
    private final int threadNum;
    private final ConcurrentLinkedQueue<Future<Void>> asyncSpillTasks = new ConcurrentLinkedQueue<>();
    /** Re-entrancy guard: {@code true} while {@link #spill()} is executing. */
    private boolean spilling = false;
    private final int uaoSize = UnsafeAlignedOffset.getUaoSize();
    private final double preferDictionaryRatio;

    /**
     * Creates the sorter and its first active {@link SpillSorter}.
     *
     * @param memoryManager task memory manager backing the shuffle allocator
     * @param blockManager used to create temporary shuffle blocks for spill files
     * @param taskContext task whose metrics receive spill statistics
     * @param initialSize size requested for each new pointer array
     * @param numPartitions number of shuffle partitions
     * @param conf Spark configuration (checksum settings, allocator settings)
     * @param writeMetrics reporter that receives shuffle write metrics
     * @param schema schema of the rows being shuffled
     */
    public CometShuffleExternalSorter(TaskMemoryManager memoryManager, BlockManager blockManager, TaskContext taskContext, int initialSize, int numPartitions, SparkConf conf, ShuffleWriteMetricsReporter writeMetrics, StructType schema) {
        // Page size is capped at 0x8000000 bytes (128 MiB).
        this.allocator = CometShuffleMemoryAllocator.getInstance(conf, memoryManager, Math.min(0x8000000L, memoryManager.pageSizeBytes()));
        this.blockManager = blockManager;
        this.taskContext = taskContext;
        this.numPartitions = numPartitions;
        this.schema = schema;
        this.numElementsForSpillThreshold = (Integer)CometConf$.MODULE$.COMET_COLUMNAR_SHUFFLE_SPILL_THRESHOLD().get();
        this.writeMetrics = writeMetrics;
        // No sorter exists yet, so this evaluates to 0.
        this.peakMemoryUsedBytes = this.getMemoryUsage();
        this.partitionChecksums = this.createPartitionChecksums(numPartitions, conf);
        this.checksumAlgorithm = this.getChecksumAlgorithm(conf);
        this.initialSize = initialSize;
        this.isAsync = (Boolean)CometConf$.MODULE$.COMET_COLUMNAR_SHUFFLE_ASYNC_ENABLED().get();
        if (this.isAsync) {
            this.threadNum = (Integer)CometConf$.MODULE$.COMET_COLUMNAR_SHUFFLE_ASYNC_THREAD_NUM().get();
            assert (this.threadNum > 0);
            this.threadPool = ShuffleThreadPool.getThreadPool();
        } else {
            this.threadNum = 0;
            this.threadPool = null;
        }
        this.activeSpillSorter = new SpillSorter();
        this.preferDictionaryRatio = (Double)CometConf$.MODULE$.COMET_SHUFFLE_PREFER_DICTIONARY_RATIO().get();
    }

    /**
     * Returns the per-partition checksum array. This is the internal array, not a copy.
     */
    public long[] getChecksums() {
        return this.partitionChecksums;
    }

    /**
     * Sorts the active sorter's records and writes them to a new temporary shuffle block.
     *
     * <p>Does nothing when a spill is already in progress on this object, or when the active
     * sorter holds no records. In async mode the write is submitted to the thread pool, the
     * caller waits until fewer than {@code threadNum} tasks are outstanding, and a new sorter
     * becomes active. In sync mode the write happens inline and the sorter is reset.
     *
     * @throws IOException if the spill fails, if a previously submitted async spill task is
     *     found to have failed, or if the thread is interrupted while waiting for a free slot
     */
    public void spill() throws IOException {
        if (this.spilling || this.activeSpillSorter == null || this.activeSpillSorter.numRecords() == 0) {
            return;
        }
        this.spilling = true;
        try {
            logger.info("Thread {} spilling sort data of {} to disk ({} {} so far)", new Object[]{Thread.currentThread().getId(), Utils.bytesToString((long)this.getMemoryUsage()), this.spills.size(), this.spills.size() > 1 ? " times" : " time"});
            Tuple2<TempShuffleBlockId, File> spilledFileInfo = this.blockManager.diskBlockManager().createTempShuffleBlock();
            SpillInfo spillInfo = new SpillInfo(this.numPartitions, spilledFileInfo._2(), spilledFileInfo._1());
            this.activeSpillSorter.setSpillInfo(spillInfo);
            if (this.isAsync) {
                this.spillAsync();
            } else {
                this.spillSync();
            }
        } finally {
            // Always clear the guard; otherwise one failed spill would turn every later
            // spill() call into a silent no-op.
            this.spilling = false;
        }
    }

    /** Submits the active sorter to the thread pool and installs a fresh active sorter. */
    private void spillAsync() throws IOException {
        final SpillSorter inFlight = this.activeSpillSorter;
        Callable<Void> task = () -> {
            inFlight.writeSortedFileNative(false);
            long spillSize = inFlight.freeMemory();
            inFlight.freeArray();
            this.spillingSorters.remove(inFlight);
            synchronized (this) {
                this.taskContext.taskMetrics().incMemoryBytesSpilled(spillSize);
            }
            return null;
        };
        this.spillingSorters.add(inFlight);
        this.asyncSpillTasks.add(this.threadPool.submit(task));
        // Back-pressure: wait before allocating the next sorter.
        this.awaitFreeAsyncSlot();
        this.activeSpillSorter = new SpillSorter();
    }

    /** Writes the active sorter inline, releases its pages and resets it for reuse. */
    private void spillSync() throws IOException {
        this.activeSpillSorter.writeSortedFileNative(false);
        long spillSize = this.activeSpillSorter.freeMemory();
        this.activeSpillSorter.reset();
        synchronized (this) {
            this.taskContext.taskMetrics().incMemoryBytesSpilled(spillSize);
        }
    }

    /**
     * Blocks until fewer than {@code threadNum} async spill tasks are outstanding.
     *
     * <p>A task that has already finished is reaped first; if none has, the caller blocks on
     * the oldest task instead of spinning. Every reaped task's outcome is checked, so a failed
     * spill is reported here rather than being dropped.
     */
    private void awaitFreeAsyncSlot() throws IOException {
        while (this.asyncSpillTasks.size() >= this.threadNum) {
            Future<Void> reaped = null;
            for (Future<Void> candidate : this.asyncSpillTasks) {
                if (candidate.isDone()) {
                    reaped = candidate;
                    break;
                }
            }
            if (reaped == null) {
                reaped = this.asyncSpillTasks.peek();
                if (reaped == null) {
                    // Nothing outstanding at all; there is nothing to wait for.
                    return;
                }
            }
            awaitSpillTask(reaped);
            this.asyncSpillTasks.remove(reaped);
        }
    }

    /**
     * Waits for {@code task} and converts any failure into an {@link IOException}. The
     * thread's interrupt status is restored if the wait is interrupted.
     */
    private static void awaitSpillTask(Future<Void> task) throws IOException {
        try {
            task.get();
        }
        catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IOException(e);
        }
        catch (Exception e) {
            throw new IOException(e);
        }
    }

    /** Sums the memory reported by every in-flight sorter and by the active sorter. */
    private long getMemoryUsage() {
        long totalPageSize = 0L;
        for (SpillSorter sorter : this.spillingSorters) {
            totalPageSize += sorter.getMemoryUsage();
        }
        if (this.activeSpillSorter != null) {
            totalPageSize += this.activeSpillSorter.getMemoryUsage();
        }
        return totalPageSize;
    }

    private void updatePeakMemoryUsed() {
        long mem = this.getMemoryUsage();
        if (mem > this.peakMemoryUsedBytes) {
            this.peakMemoryUsedBytes = mem;
        }
    }

    /** Returns the largest memory usage observed so far, after sampling current usage. */
    public long getPeakMemoryUsedBytes() {
        this.updatePeakMemoryUsed();
        return this.peakMemoryUsedBytes;
    }

    /**
     * Releases the pages and pointer arrays of the active sorter and, in async mode, of every
     * sorter still registered as spilling.
     *
     * @return the number of bytes reported freed by the sorters' page release
     */
    private long freeMemory() {
        this.updatePeakMemoryUsed();
        long memoryFreed = 0L;
        if (this.isAsync) {
            for (SpillSorter sorter : this.spillingSorters) {
                memoryFreed += sorter.freeMemory();
                sorter.freeArray();
            }
        }
        this.activeSpillSorter.freeArray();
        memoryFreed += this.activeSpillSorter.freeMemory();
        return memoryFreed;
    }

    /**
     * Frees all sorter memory and deletes every recorded spill file that still exists. A file
     * that cannot be deleted is logged and otherwise ignored.
     */
    public void cleanupResources() {
        this.freeMemory();
        for (SpillInfo spill : this.spills) {
            if (spill.file.exists() && !spill.file.delete()) {
                logger.error("Unable to delete spill file {}", (Object)spill.file.getPath());
            }
        }
    }

    /**
     * Ensures the active sorter's pointer array can take one more record, doubling it when it
     * is full. If the larger array cannot be allocated, the sorter is spilled instead.
     */
    private void growPointerArrayIfNecessary() throws IOException {
        assert (this.activeSpillSorter != null);
        if (this.activeSpillSorter.hasSpaceForAnotherRecord()) {
            return;
        }
        long used = this.activeSpillSorter.getMemoryUsage();
        LongArray array;
        try {
            // Request twice the current usage, expressed in 8-byte entries.
            array = this.allocator.allocateArray(used / 8L * 2L);
        }
        catch (TooLargePageException e) {
            // The pointer array cannot grow any further; spill the current records instead.
            this.spill();
            return;
        }
        catch (SparkOutOfMemoryError e) {
            try {
                this.spill();
            }
            catch (SparkOutOfMemoryError e2) {
                // Only fatal if the failed spill also left no room for the next record.
                if (!this.activeSpillSorter.hasSpaceForAnotherRecord()) {
                    logger.error("Unable to grow the pointer array");
                    throw e2;
                }
            }
            return;
        }
        if (this.activeSpillSorter.hasSpaceForAnotherRecord()) {
            // Space appeared while allocating (the allocation itself may have triggered a
            // spill), so the new array is not needed.
            this.allocator.freeArray(array);
        } else {
            this.activeSpillSorter.expandPointerArray(array);
        }
    }

    /**
     * Copies one record into the active sorter.
     *
     * @param recordBase base object of the source record (as used by {@code Platform.copyMemory})
     * @param recordOffset offset of the record relative to {@code recordBase}
     * @param length number of bytes to copy
     * @param partitionId shuffle partition the record belongs to
     * @throws IOException if a spill triggered by this insert fails
     */
    public void insertRecord(Object recordBase, long recordOffset, int length, int partitionId) throws IOException {
        assert (this.activeSpillSorter != null);
        int threshold = this.numElementsForSpillThreshold;
        if (this.activeSpillSorter.numRecords() >= threshold) {
            logger.info("Spilling data because number of spilledRecords crossed the threshold " + threshold);
            this.spill();
        }
        this.growPointerArrayIfNecessary();
        // Each stored record is preceded by a size header of uaoSize bytes.
        int required = length + this.uaoSize;
        if (!this.activeSpillSorter.acquireNewPageIfNecessary(required)) {
            this.activeSpillSorter.initialCurrentPage(required);
        }
        this.activeSpillSorter.insertRecord(recordBase, recordOffset, length, partitionId);
    }

    /**
     * Waits for outstanding async spills, writes the remaining in-memory records as the final
     * spill, frees sorter memory and returns every spill produced.
     *
     * @return the recorded spills, in the order they were added
     * @throws IOException if an async spill task failed, the wait was interrupted, or the
     *     final write fails
     */
    public SpillInfo[] closeAndGetSpills() throws IOException {
        if (this.activeSpillSorter != null) {
            Tuple2<TempShuffleBlockId, File> spilledFileInfo = this.blockManager.diskBlockManager().createTempShuffleBlock();
            SpillInfo spillInfo = new SpillInfo(this.numPartitions, spilledFileInfo._2(), spilledFileInfo._1());
            if (this.isAsync) {
                for (Future<Void> task : this.asyncSpillTasks) {
                    awaitSpillTask(task);
                }
                this.asyncSpillTasks.clear();
            }
            this.activeSpillSorter.setSpillInfo(spillInfo);
            this.activeSpillSorter.writeSortedFileNative(true);
            this.freeMemory();
        }
        return this.spills.toArray(new SpillInfo[this.spills.size()]);
    }

    /**
     * One in-memory batch of records: the data pages inherited from {@link SpillWriter} plus
     * a pointer array that is sorted by partition id before the batch is written out.
     */
    class SpillSorter
    extends SpillWriter {
        /** Set once the pointer array has been released via {@link #freeArray()}. */
        private boolean freed = false;
        private SpillInfo spillInfo = null;
        @Nullable
        private ShuffleInMemorySorter inMemSorter;
        private LongArray sorterArray;

        SpillSorter() {
            this.allocator = CometShuffleExternalSorter.this.allocator;
            try {
                this.inMemSorter = new ShuffleInMemorySorter((MemoryConsumer)this.allocator, 1, true);
            }
            catch (IllegalAccessError e) {
                throw new RuntimeException("Error loading in-memory sorter check class path -- see https://github.com/apache/arrow-datafusion-comet?tab=readme-ov-file#enable-comet-shuffle", e);
            }
            this.sorterArray = this.allocator.allocateArray(CometShuffleExternalSorter.this.initialSize);
            this.inMemSorter.expandPointerArray(this.sorterArray);
            this.allocatedPages = new LinkedList();
            this.nativeLib = new Native();
            this.dataTypes = this.serializeSchema(CometShuffleExternalSorter.this.schema);
        }

        /** Releases this sorter's pages while holding this sorter's monitor. */
        @Override
        public long freeMemory() {
            synchronized (this) {
                return super.freeMemory();
            }
        }

        /**
         * Returns page memory plus, unless the pointer array has already been freed, the
         * memory reported by the in-memory sorter.
         */
        @Override
        public long getMemoryUsage() {
            synchronized (this) {
                long totalPageSize = super.getMemoryUsage();
                if (this.freed || this.inMemSorter == null) {
                    return totalPageSize;
                }
                return this.inMemSorter.getMemoryUsage() + totalPageSize;
            }
        }

        /** Delegates to the enclosing sorter's {@link CometShuffleExternalSorter#spill()}. */
        @Override
        protected void spill(int required) throws IOException {
            CometShuffleExternalSorter.this.spill();
        }

        /** Frees the pointer array and marks this sorter as freed. */
        public void freeArray() {
            synchronized (this) {
                this.inMemSorter.free();
                this.freed = true;
            }
        }

        /** Resets the in-memory sorter and installs a newly allocated pointer array. */
        public void reset() {
            this.inMemSorter.reset();
            this.sorterArray = this.allocator.allocateArray(CometShuffleExternalSorter.this.initialSize);
            this.inMemSorter.expandPointerArray(this.sorterArray);
        }

        void setSpillInfo(SpillInfo spillInfo) {
            this.spillInfo = spillInfo;
        }

        public int numRecords() {
            return this.inMemSorter.numRecords();
        }

        /**
         * Sorts the pointer array by partition natively and writes each partition's rows to
         * the spill file set through {@link #setSpillInfo}. Returns without registering a
         * spill when there are no records.
         *
         * @param isLastFile when {@code true}, metrics go straight to the task's write-metrics
         *     reporter; otherwise they are gathered locally and then added to the task's
         *     records-written and disk-bytes-spilled counters
         */
        public void writeSortedFileNative(boolean isLastFile) throws IOException {
            long arrayAddr = this.sorterArray.getBaseOffset();
            int pos = this.inMemSorter.numRecords();
            this.nativeLib.sortRowPartitionsNative(arrayAddr, pos);
            ShuffleInMemorySorter.ShuffleSorterIterator sortedRecords = new ShuffleInMemorySorter.ShuffleSorterIterator(pos, this.sorterArray, 0);
            if (!sortedRecords.hasNext()) {
                return;
            }
            ShuffleWriteMetricsReporter writeMetricsToUse = isLastFile ? CometShuffleExternalSorter.this.writeMetrics : new ShuffleWriteMetrics();
            int currentPartition = -1;
            RowPartition rowPartition = new RowPartition(CometShuffleExternalSorter.this.initialSize);
            while (sortedRecords.hasNext()) {
                sortedRecords.loadNext();
                int partition = sortedRecords.packedRecordPointer.getPartitionId();
                assert (partition >= currentPartition);
                if (partition != currentPartition) {
                    // Partition boundary: flush the rows gathered for the previous partition.
                    if (currentPartition != -1) {
                        this.writePartition(currentPartition, rowPartition, writeMetricsToUse);
                    }
                    currentPartition = partition;
                }
                long recordPointer = sortedRecords.packedRecordPointer.getRecordPointer();
                long recordOffsetInPage = this.allocator.getOffsetInPage(recordPointer);
                // NOTE(review): the stored size is reduced by 4 and the read position skips
                // 4 bytes past the size header; this looks like a 4-byte prefix written by
                // the producer of the record. Confirm against the caller of insertRecord.
                int recordSizeInBytes = UnsafeAlignedOffset.getSize(null, (long)recordOffsetInPage) - 4;
                long recordReadPosition = recordOffsetInPage + (long)CometShuffleExternalSorter.this.uaoSize + 4L;
                rowPartition.addRow(recordReadPosition, recordSizeInBytes);
            }
            if (currentPartition != -1) {
                // The trailing partition takes the same path as every other partition, so its
                // checksum is seeded and recorded too.
                this.writePartition(currentPartition, rowPartition, writeMetricsToUse);
                synchronized (CometShuffleExternalSorter.this.spills) {
                    CometShuffleExternalSorter.this.spills.add(this.spillInfo);
                }
            }
            if (!isLastFile) {
                ShuffleWriteMetrics localMetrics = (ShuffleWriteMetrics)writeMetricsToUse;
                synchronized (CometShuffleExternalSorter.this.writeMetrics) {
                    CometShuffleExternalSorter.this.writeMetrics.incRecordsWritten(localMetrics.recordsWritten());
                    CometShuffleExternalSorter.this.taskContext.taskMetrics().incDiskBytesSpilled(localMetrics.bytesWritten());
                }
            }
        }

        /**
         * Writes the rows gathered for one partition and records the number of bytes written.
         * When the checksum array is non-empty, the partition's running checksum is handed to
         * the writer before the write and stored back afterwards. When it is empty, the
         * checksum array is not touched at all.
         */
        private void writePartition(int partitionId, RowPartition rows, ShuffleWriteMetricsReporter metrics) throws IOException {
            boolean checksumsInUse = CometShuffleExternalSorter.this.partitionChecksums.length > 0;
            if (checksumsInUse) {
                this.setChecksum(CometShuffleExternalSorter.this.partitionChecksums[partitionId]);
                this.setChecksumAlgo(CometShuffleExternalSorter.this.checksumAlgorithm);
            }
            long written = this.doSpilling(this.dataTypes, this.spillInfo.file, rows, metrics, CometShuffleExternalSorter.this.preferDictionaryRatio);
            this.spillInfo.partitionLengths[partitionId] = written;
            if (checksumsInUse) {
                CometShuffleExternalSorter.this.partitionChecksums[partitionId] = this.getChecksum();
            }
        }

        public boolean hasSpaceForAnotherRecord() {
            return this.inMemSorter.hasSpaceForAnotherRecord();
        }

        /** Hands a larger pointer array to the in-memory sorter and remembers it. */
        public void expandPointerArray(LongArray newArray) {
            this.inMemSorter.expandPointerArray(newArray);
            this.sorterArray = newArray;
        }

        /**
         * Appends a record to the current page as a size header of {@code uaoSize} bytes
         * followed by {@code length} payload bytes, then registers its encoded address under
         * {@code partitionId}. The caller must already have ensured the page has room.
         */
        public void insertRecord(Object recordBase, long recordOffset, int length, int partitionId) {
            Object base = this.currentPage.getBaseObject();
            // The encoded address points at the size header, not at the payload.
            long recordAddress = this.allocator.encodePageNumberAndOffset(this.currentPage, this.pageCursor);
            UnsafeAlignedOffset.putSize((Object)base, (long)this.pageCursor, (int)length);
            this.pageCursor += (long)CometShuffleExternalSorter.this.uaoSize;
            Platform.copyMemory((Object)recordBase, (long)recordOffset, (Object)base, (long)this.pageCursor, (long)length);
            this.pageCursor += (long)length;
            this.inMemSorter.insertRecord(recordAddress, partitionId);
        }
    }
}

