/*
 * Decompiled with CFR 0.152.
 */
package io.datakernel.stream.processor;

import io.datakernel.async.Stage;
import io.datakernel.eventloop.Eventloop;
import io.datakernel.file.AsyncFile;
import io.datakernel.serializer.BufferSerializer;
import io.datakernel.stream.StreamConsumerWithResult;
import io.datakernel.stream.StreamProducerWithResult;
import io.datakernel.stream.file.StreamFileReader;
import io.datakernel.stream.file.StreamFileWriter;
import io.datakernel.stream.processor.StreamBinaryDeserializer;
import io.datakernel.stream.processor.StreamBinarySerializer;
import io.datakernel.stream.processor.StreamByteChunker;
import io.datakernel.stream.processor.StreamLZ4Compressor;
import io.datakernel.stream.processor.StreamLZ4Decompressor;
import io.datakernel.stream.processor.StreamSorterStorage;
import io.datakernel.stream.processor.StreamTransformer;
import io.datakernel.util.MemSize;
import io.datakernel.util.Preconditions;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.OpenOption;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;
import java.nio.file.attribute.FileAttribute;
import java.util.List;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.atomic.AtomicInteger;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public final class StreamSorterStorageImpl<T>
implements StreamSorterStorage<T> {
    private final Logger logger = LoggerFactory.getLogger(this.getClass());
    public static final String DEFAULT_FILE_PATTERN = "%d";
    public static final MemSize DEFAULT_SORTER_BLOCK_SIZE = MemSize.kilobytes((long)256L);
    private static final AtomicInteger PARTITION = new AtomicInteger();
    private final Eventloop eventloop = Eventloop.getCurrentEventloop();
    private final ExecutorService executorService;
    private final BufferSerializer<T> serializer;
    private final Path path;
    private String filePattern = "%d";
    private MemSize readBlockSize = DEFAULT_SORTER_BLOCK_SIZE;
    private MemSize writeBlockSize = DEFAULT_SORTER_BLOCK_SIZE;
    private int compressionLevel = 0;

    private StreamSorterStorageImpl(ExecutorService executorService, BufferSerializer<T> serializer, Path path) {
        this.executorService = executorService;
        this.serializer = serializer;
        this.path = path;
    }

    public static <T> StreamSorterStorageImpl<T> create(ExecutorService executorService, BufferSerializer<T> serializer, Path path) {
        Preconditions.checkArgument((!path.getFileName().toString().contains(DEFAULT_FILE_PATTERN) ? 1 : 0) != 0);
        try {
            Files.createDirectories(path, new FileAttribute[0]);
        }
        catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return new StreamSorterStorageImpl<T>(executorService, serializer, path);
    }

    public StreamSorterStorageImpl<T> withFilePattern(String filePattern) {
        Preconditions.checkArgument((!filePattern.contains(DEFAULT_FILE_PATTERN) ? 1 : 0) != 0);
        this.filePattern = filePattern;
        return this;
    }

    public StreamSorterStorageImpl<T> withReadBlockSize(MemSize readBlockSize) {
        this.readBlockSize = readBlockSize;
        return this;
    }

    public StreamSorterStorageImpl<T> withWriteBlockSize(MemSize writeBlockSize) {
        this.writeBlockSize = writeBlockSize;
        return this;
    }

    public StreamSorterStorageImpl<T> withCompressionLevel(int compressionLevel) {
        this.compressionLevel = compressionLevel;
        return this;
    }

    private Path partitionPath(int i) {
        return this.path.resolve(String.format(this.filePattern, i));
    }

    @Override
    public Stage<StreamConsumerWithResult<T, Integer>> write() {
        int partition = PARTITION.incrementAndGet();
        Path path = this.partitionPath(partition);
        return AsyncFile.openAsync((ExecutorService)this.executorService, (Path)path, (OpenOption[])StreamFileWriter.CREATE_OPTIONS).thenApply(file -> StreamTransformer.idenity().with(StreamBinarySerializer.create(this.serializer)).with(StreamByteChunker.create(this.writeBlockSize.map(bytes -> bytes / 2L), this.writeBlockSize)).with(StreamLZ4Compressor.create(this.compressionLevel)).with(StreamByteChunker.create(this.writeBlockSize.map(bytes -> bytes / 2L), this.writeBlockSize)).applyTo(StreamFileWriter.create(file).withFlushAsResult()).thenApply($ -> partition).withLateBinding());
    }

    @Override
    public Stage<StreamProducerWithResult<T, Void>> read(int partition) {
        Path path = this.partitionPath(partition);
        return AsyncFile.openAsync((ExecutorService)this.executorService, (Path)path, (OpenOption[])new OpenOption[]{StandardOpenOption.READ}).thenApply(file -> StreamFileReader.readFile(file).withBufferSize(this.readBlockSize).with(StreamLZ4Decompressor.create()).with(StreamBinaryDeserializer.create(this.serializer)).withEndOfStreamAsResult().withLateBinding());
    }

    @Override
    public Stage<Void> cleanup(List<Integer> partitionsToDelete) {
        return Stage.ofCallable((Executor)this.executorService, () -> {
            for (Integer partitionToDelete : partitionsToDelete) {
                Path path1 = this.partitionPath(partitionToDelete);
                try {
                    Files.delete(path1);
                }
                catch (IOException e) {
                    this.logger.warn("Could not delete {} : {}", (Object)path1, (Object)e.toString());
                }
            }
            return null;
        });
    }
}

