package io.datakernel.datastream.processor;

import io.datakernel.common.MemSize;
import io.datakernel.common.Preconditions;
import io.datakernel.csp.ChannelSupplier;
import io.datakernel.csp.file.ChannelFileReader;
import io.datakernel.csp.file.ChannelFileWriter;
import io.datakernel.csp.process.ChannelByteChunker;
import io.datakernel.csp.process.ChannelLZ4Compressor;
import io.datakernel.csp.process.ChannelLZ4Decompressor;
import io.datakernel.datastream.StreamConsumer;
import io.datakernel.datastream.StreamSupplier;
import io.datakernel.datastream.csp.ChannelDeserializer;
import io.datakernel.datastream.csp.ChannelSerializer;
import io.datakernel.promise.Promise;
import io.datakernel.serializer.BinarySerializer;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.attribute.FileAttribute;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicInteger;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/datakernel/datastream/processor/StreamSorterStorageImpl.class */
public final class StreamSorterStorageImpl<T> implements StreamSorterStorage<T> {
    public static final String DEFAULT_FILE_PATTERN = "%d";
    private final Executor executor;
    private final BinarySerializer<T> serializer;
    private final Path path;
    private String filePattern = DEFAULT_FILE_PATTERN;
    private MemSize readBlockSize = ChannelSerializer.DEFAULT_INITIAL_BUFFER_SIZE;
    private MemSize writeBlockSize = DEFAULT_SORTER_BLOCK_SIZE;
    private int compressionLevel = 0;
    private static final Logger logger = LoggerFactory.getLogger(StreamSorterStorageImpl.class);
    public static final MemSize DEFAULT_SORTER_BLOCK_SIZE = MemSize.kilobytes(256);
    private static final AtomicInteger PARTITION = new AtomicInteger();

    private StreamSorterStorageImpl(Executor executor, BinarySerializer<T> binarySerializer, Path path) {
        this.executor = executor;
        this.serializer = binarySerializer;
        this.path = path;
    }

    public static <T> StreamSorterStorageImpl<T> create(Executor executor, BinarySerializer<T> binarySerializer, Path path) {
        Preconditions.checkArgument(!path.getFileName().toString().contains(DEFAULT_FILE_PATTERN), "Filename should not contain '%d'");
        try {
            Files.createDirectories(path, new FileAttribute[0]);
            return new StreamSorterStorageImpl<>(executor, binarySerializer, path);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public StreamSorterStorageImpl<T> withFilePattern(String str) {
        Preconditions.checkArgument(!str.contains(DEFAULT_FILE_PATTERN), "File pattern should not contain '%d'");
        this.filePattern = str;
        return this;
    }

    public StreamSorterStorageImpl<T> withReadBlockSize(MemSize memSize) {
        this.readBlockSize = memSize;
        return this;
    }

    public StreamSorterStorageImpl<T> withWriteBlockSize(MemSize memSize) {
        this.writeBlockSize = memSize;
        return this;
    }

    public StreamSorterStorageImpl<T> withCompressionLevel(int i) {
        this.compressionLevel = i;
        return this;
    }

    private Path partitionPath(int i) {
        return this.path.resolve(String.format(this.filePattern, Integer.valueOf(i)));
    }

    @Override // io.datakernel.datastream.processor.StreamSorterStorage
    public Promise<Integer> newPartitionId() {
        return Promise.of(Integer.valueOf(PARTITION.incrementAndGet()));
    }

    @Override // io.datakernel.datastream.processor.StreamSorterStorage
    public Promise<StreamConsumer<T>> write(int i) {
        Path partitionPath = partitionPath(i);
        return Promise.of(StreamConsumer.ofSupplier(streamSupplier -> {
            return ((ChannelSupplier) ((ChannelSupplier) ((ChannelSupplier) ((ChannelSupplier) streamSupplier.transformWith(ChannelSerializer.create(this.serializer).withInitialBufferSize(this.readBlockSize))).transformWith(ChannelByteChunker.create(this.writeBlockSize.map(l -> {
                return Long.valueOf(l.longValue() / 2);
            }), this.writeBlockSize))).transformWith(ChannelLZ4Compressor.create(this.compressionLevel))).transformWith(ChannelByteChunker.create(this.writeBlockSize.map(l2 -> {
                return Long.valueOf(l2.longValue() / 2);
            }), this.writeBlockSize))).streamTo(ChannelFileWriter.open(this.executor, partitionPath));
        }).withLateBinding());
    }

    @Override // io.datakernel.datastream.processor.StreamSorterStorage
    public Promise<StreamSupplier<T>> read(int i) {
        return ChannelFileReader.open(this.executor, partitionPath(i)).map(channelFileReader -> {
            return ((StreamSupplier) ((ChannelSupplier) channelFileReader.transformWith(ChannelLZ4Decompressor.create())).transformWith(ChannelDeserializer.create(this.serializer))).withLateBinding();
        });
    }

    @Override // io.datakernel.datastream.processor.StreamSorterStorage
    public Promise<Void> cleanup(List<Integer> list) {
        return Promise.ofBlockingCallable(this.executor, () -> {
            Iterator it = list.iterator();
            while (it.hasNext()) {
                Path partitionPath = partitionPath(((Integer) it.next()).intValue());
                try {
                    Files.delete(partitionPath);
                } catch (IOException e) {
                    logger.warn("Could not delete {} : {}", partitionPath, e.toString());
                }
            }
            return null;
        });
    }
}
