package io.datakernel.aggregation;

import io.datakernel.aggregation.ot.AggregationStructure;
import io.datakernel.async.Stage;
import io.datakernel.bytebuf.ByteBuf;
import io.datakernel.codegen.DefiningClassLoader;
import io.datakernel.eventloop.Eventloop;
import io.datakernel.eventloop.EventloopService;
import io.datakernel.file.AsyncFile;
import io.datakernel.jmx.EventloopJmxMBeanEx;
import io.datakernel.jmx.ExceptionStats;
import io.datakernel.jmx.JmxAttribute;
import io.datakernel.jmx.JmxOperation;
import io.datakernel.jmx.StageStats;
import io.datakernel.stream.StreamConsumerWithResult;
import io.datakernel.stream.StreamProducerWithResult;
import io.datakernel.stream.file.StreamFileReader;
import io.datakernel.stream.file.StreamFileWriter;
import io.datakernel.stream.processor.StreamBinaryDeserializer;
import io.datakernel.stream.processor.StreamBinarySerializer;
import io.datakernel.stream.processor.StreamByteChunker;
import io.datakernel.stream.processor.StreamLZ4Compressor;
import io.datakernel.stream.processor.StreamLZ4Decompressor;
import io.datakernel.stream.processor.StreamTransformer;
import io.datakernel.stream.stats.StreamStats;
import io.datakernel.stream.stats.StreamStatsBasic;
import io.datakernel.stream.stats.StreamStatsDetailed;
import io.datakernel.stream.stats.StreamStatsSizeCounter;
import io.datakernel.util.Initializable;
import io.datakernel.util.MemSize;
import io.datakernel.util.ReflectionUtils;
import java.io.IOException;
import java.nio.file.DirectoryStream;
import java.nio.file.Files;
import java.nio.file.LinkOption;
import java.nio.file.OpenOption;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;
import java.nio.file.StandardOpenOption;
import java.nio.file.attribute.FileAttribute;
import java.nio.file.attribute.FileTime;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Set;
import java.util.concurrent.ExecutorService;
import java.util.function.Predicate;
import java.util.stream.Collectors;
import java.util.stream.StreamSupport;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/datakernel/aggregation/LocalFsChunkStorage.class */
public class LocalFsChunkStorage implements AggregationChunkStorage, EventloopService, Initializable<LocalFsChunkStorage>, EventloopJmxMBeanEx {
    public static final MemSize DEFAULT_BUFFER_SIZE;
    public static final Duration DEFAULT_SMOOTHING_WINDOW;
    public static final String DEFAULT_BACKUP_FOLDER_NAME = "backups";
    public static final String LOG = ".log";
    public static final String TEMP_LOG = ".temp";
    private final Eventloop eventloop;
    private final ExecutorService executorService;
    private final IdGenerator<Long> idGenerator;
    private final Path dir;
    private Path backupPath;
    private boolean detailed;
    private int cleanupPreservedFiles;
    private int cleanupDeletedFiles;
    private int cleanupDeletedFilesTotal;
    private long cleanupSkipTimeMin;
    private long cleanupSkipTimeMax;
    private int cleanupSkippedFiles;
    private int cleanupSkippedFilesTotal;
    private int finishChunks;
    static final /* synthetic */ boolean $assertionsDisabled;
    private final Logger logger = LoggerFactory.getLogger(getClass());
    private MemSize bufferSize = DEFAULT_BUFFER_SIZE;
    private final StageStats stageIdGenerator = StageStats.create(DEFAULT_SMOOTHING_WINDOW);
    private final StageStats stageOpenR1 = StageStats.create(DEFAULT_SMOOTHING_WINDOW);
    private final StageStats stageOpenR2 = StageStats.create(DEFAULT_SMOOTHING_WINDOW);
    private final StageStats stageOpenR3 = StageStats.create(DEFAULT_SMOOTHING_WINDOW);
    private final StageStats stageOpenW = StageStats.create(DEFAULT_SMOOTHING_WINDOW);
    private final StageStats stageFinishChunks = StageStats.create(DEFAULT_SMOOTHING_WINDOW);
    private final StageStats stageList = StageStats.create(DEFAULT_SMOOTHING_WINDOW);
    private final StageStats stageBackup = StageStats.create(DEFAULT_SMOOTHING_WINDOW);
    private final StageStats stageCleanup = StageStats.create(DEFAULT_SMOOTHING_WINDOW);
    private final StreamStatsDetailed<ByteBuf> readFile = StreamStats.detailed(StreamStatsSizeCounter.forByteBufs());
    private final StreamStatsDetailed<ByteBuf> readDecompress = StreamStats.detailed(StreamStatsSizeCounter.forByteBufs());
    private final StreamStatsBasic<?> readDeserialize = StreamStats.basic();
    private final StreamStatsDetailed<?> readDeserializeDetailed = StreamStats.detailed();
    private final StreamStatsBasic<?> writeSerialize = StreamStats.basic();
    private final StreamStatsDetailed<?> writeSerializeDetailed = StreamStats.detailed();
    private final StreamStatsDetailed<ByteBuf> writeCompress = StreamStats.detailed(StreamStatsSizeCounter.forByteBufs());
    private final StreamStatsDetailed<ByteBuf> writeChunker = StreamStats.detailed(StreamStatsSizeCounter.forByteBufs());
    private final StreamStatsDetailed<ByteBuf> writeFile = StreamStats.detailed(StreamStatsSizeCounter.forByteBufs());
    private final ExceptionStats cleanupWarnings = ExceptionStats.create();

    private LocalFsChunkStorage(Eventloop eventloop, ExecutorService executorService, IdGenerator<Long> idGenerator, Path path, Path path2) {
        this.eventloop = eventloop;
        this.executorService = executorService;
        this.dir = path;
        this.idGenerator = idGenerator;
        this.backupPath = path2;
    }

    public static LocalFsChunkStorage create(Eventloop eventloop, ExecutorService executorService, IdGenerator<Long> idGenerator, Path path) {
        return new LocalFsChunkStorage(eventloop, executorService, idGenerator, path, path.resolve("backups/"));
    }

    public LocalFsChunkStorage withBufferSize(MemSize memSize) {
        this.bufferSize = memSize;
        return this;
    }

    public LocalFsChunkStorage withBackupPath(Path path) {
        this.backupPath = path;
        return this;
    }

    @Override // io.datakernel.aggregation.AggregationChunkStorage
    public <T> Stage<StreamProducerWithResult<T, Void>> read(AggregationStructure aggregationStructure, List<String> list, Class<T> cls, long j, DefiningClassLoader definingClassLoader) {
        return AsyncFile.openAsync(this.executorService, this.dir.resolve(j + ".log"), new OpenOption[]{StandardOpenOption.READ}).whenComplete(this.stageOpenR1.recordStats()).thenApply(asyncFile -> {
            return StreamFileReader.readFile(asyncFile).withBufferSize(this.bufferSize).with(this.readFile).with(StreamLZ4Decompressor.create()).with(this.readDecompress).with(StreamBinaryDeserializer.create(AggregationUtils.createBufferSerializer(aggregationStructure, cls, aggregationStructure.getKeys(), list, definingClassLoader))).with(this.detailed ? this.readDeserializeDetailed : this.readDeserialize).withEndOfStreamAsResult().withLateBinding();
        });
    }

    @Override // io.datakernel.aggregation.AggregationChunkStorage
    public <T> Stage<StreamConsumerWithResult<T, Void>> write(AggregationStructure aggregationStructure, List<String> list, Class<T> cls, long j, DefiningClassLoader definingClassLoader) {
        return AsyncFile.openAsync(this.executorService, this.dir.resolve(j + ".temp"), StreamFileWriter.CREATE_OPTIONS).whenComplete(this.stageOpenW.recordStats()).thenApply(asyncFile -> {
            return StreamTransformer.idenity().with(this.detailed ? this.writeSerializeDetailed : this.writeSerialize).with(StreamBinarySerializer.create(AggregationUtils.createBufferSerializer(aggregationStructure, cls, aggregationStructure.getKeys(), list, definingClassLoader)).withInitialBufferSize(this.bufferSize)).with(this.writeCompress).with(StreamLZ4Compressor.fastCompressor()).with(this.writeChunker).with(StreamByteChunker.create(this.bufferSize.map(l -> {
                return Long.valueOf(l.longValue() / 2);
            }), this.bufferSize.map(l2 -> {
                return Long.valueOf(l2.longValue() * 2);
            }))).with(this.writeFile).applyTo(StreamFileWriter.create(asyncFile).withForceOnClose(true).withFlushAsResult());
        });
    }

    @Override // io.datakernel.aggregation.IdGenerator
    public Stage<Long> createId() {
        return this.idGenerator.createId().whenComplete(this.stageIdGenerator.recordStats());
    }

    @Override // io.datakernel.aggregation.AggregationChunkStorage
    public Stage<Void> finish(Set<Long> set) {
        this.finishChunks = set.size();
        return Stage.ofCallable(this.executorService, () -> {
            Iterator it = set.iterator();
            while (it.hasNext()) {
                Long l = (Long) it.next();
                Path resolve = this.dir.resolve(l + ".temp");
                Path resolve2 = this.dir.resolve(l + ".log");
                Files.setLastModifiedTime(resolve, FileTime.fromMillis(System.currentTimeMillis()));
                Files.move(resolve, resolve2, StandardCopyOption.ATOMIC_MOVE);
            }
            return (Void) null;
        }).whenComplete(this.stageFinishChunks.recordStats());
    }

    public Stage<Void> backup(String str, Set<Long> set) {
        return Stage.ofCallable(this.executorService, () -> {
            Path resolve = this.backupPath.resolve(str + "_tmp/");
            Files.createDirectories(resolve, new FileAttribute[0]);
            Iterator it = set.iterator();
            while (it.hasNext()) {
                long longValue = ((Long) it.next()).longValue();
                Files.createLink(resolve.resolve(longValue + ".log").toAbsolutePath(), this.dir.resolve(longValue + ".log").toAbsolutePath());
            }
            Files.move(resolve, this.backupPath.resolve(str + "/"), StandardCopyOption.ATOMIC_MOVE);
            return (Void) null;
        }).whenComplete(this.stageBackup.recordStats());
    }

    public Stage<Void> cleanup(Set<Long> set) {
        return cleanupBeforeTimestamp(set, -1L);
    }

    public Stage<Void> cleanupBeforeTimestamp(Set<Long> set, long j) {
        return Stage.ofCallable(this.executorService, () -> {
            this.logger.trace("Cleanup before timestamp, save chunks size: {}, timestamp {}", Integer.valueOf(set.size()), Long.valueOf(j));
            int i = 0;
            DirectoryStream<Path> newDirectoryStream = Files.newDirectoryStream(this.dir);
            Throwable th = null;
            try {
                ArrayList<Path> arrayList = new ArrayList();
                for (Path path : newDirectoryStream) {
                    if (path.toString().endsWith(".log")) {
                        try {
                            String path2 = path.getFileName().toString();
                            if (!set.contains(Long.valueOf(Long.parseLong(path2.substring(0, path2.length() - ".log".length()))))) {
                                FileTime lastModifiedTime = Files.getLastModifiedTime(path, new LinkOption[0]);
                                if (j == -1 || lastModifiedTime.toMillis() <= j) {
                                    arrayList.add(path);
                                } else {
                                    long millis = lastModifiedTime.toMillis() - j;
                                    if (!$assertionsDisabled && millis <= 0) {
                                        throw new AssertionError();
                                    }
                                    if (this.cleanupSkipTimeMin == 0 || millis < this.cleanupSkipTimeMin) {
                                        this.cleanupSkipTimeMin = millis;
                                    }
                                    if (this.cleanupSkipTimeMax == 0 || millis > this.cleanupSkipTimeMax) {
                                        this.cleanupSkipTimeMax = millis;
                                    }
                                    this.logger.warn("File {} timestamp {} > {}", new Object[]{path, Long.valueOf(lastModifiedTime.toMillis()), Long.valueOf(j)});
                                    i++;
                                }
                            }
                        } catch (NumberFormatException e) {
                            this.cleanupWarnings.recordException(e);
                            this.logger.warn("Invalid chunk filename: " + path);
                        }
                    }
                }
                for (Path path3 : arrayList) {
                    try {
                        if (this.logger.isTraceEnabled()) {
                            FileTime lastModifiedTime2 = Files.getLastModifiedTime(path3, new LinkOption[0]);
                            this.logger.trace("Delete file: {} with last modifiedTime: {}({} millis)", new Object[]{path3, lastModifiedTime2, Long.valueOf(lastModifiedTime2.toMillis())});
                        }
                        Files.delete(path3);
                    } catch (IOException e2) {
                        this.cleanupWarnings.recordException(e2);
                        this.logger.warn("Could not delete file: " + path3);
                    }
                }
                this.cleanupPreservedFiles = set.size();
                this.cleanupDeletedFiles = arrayList.size();
                this.cleanupDeletedFilesTotal += arrayList.size();
                this.cleanupSkippedFiles = i;
                this.cleanupSkippedFilesTotal += i;
                if (newDirectoryStream != null) {
                    if (0 != 0) {
                        try {
                            newDirectoryStream.close();
                        } catch (Throwable th2) {
                            th.addSuppressed(th2);
                        }
                    } else {
                        newDirectoryStream.close();
                    }
                }
                return (Void) null;
            } catch (Throwable th3) {
                if (newDirectoryStream != null) {
                    if (0 != 0) {
                        try {
                            newDirectoryStream.close();
                        } catch (Throwable th4) {
                            th.addSuppressed(th4);
                        }
                    } else {
                        newDirectoryStream.close();
                    }
                }
                throw th3;
            }
        }).whenComplete(this.stageCleanup.recordStats());
    }

    public Stage<Set<Long>> list(Predicate<String> predicate, Predicate<Long> predicate2) {
        return Stage.ofCallable(this.executorService, () -> {
            DirectoryStream<Path> newDirectoryStream = Files.newDirectoryStream(this.dir);
            Throwable th = null;
            try {
                try {
                    Set set = (Set) StreamSupport.stream(newDirectoryStream.spliterator(), false).filter(path -> {
                        return lastModifiedFilter(predicate2, path);
                    }).map(path2 -> {
                        return path2.getFileName().toString();
                    }).filter(str -> {
                        return str.endsWith(".log") && predicate.test(str);
                    }).map(str2 -> {
                        return Long.valueOf(Long.parseLong(str2.substring(0, str2.length() - ".log".length())));
                    }).collect(Collectors.toSet());
                    if (newDirectoryStream != null) {
                        if (0 != 0) {
                            try {
                                newDirectoryStream.close();
                            } catch (Throwable th2) {
                                th.addSuppressed(th2);
                            }
                        } else {
                            newDirectoryStream.close();
                        }
                    }
                    return set;
                } finally {
                }
            } catch (Throwable th3) {
                if (newDirectoryStream != null) {
                    if (th != null) {
                        try {
                            newDirectoryStream.close();
                        } catch (Throwable th4) {
                            th.addSuppressed(th4);
                        }
                    } else {
                        newDirectoryStream.close();
                    }
                }
                throw th3;
            }
        }).whenComplete(this.stageList.recordStats());
    }

    private boolean lastModifiedFilter(Predicate<Long> predicate, Path path) {
        try {
            return predicate.test(Long.valueOf(Files.getLastModifiedTime(path, new LinkOption[0]).toMillis()));
        } catch (IOException e) {
            return false;
        }
    }

    public Eventloop getEventloop() {
        return this.eventloop;
    }

    public Stage<Void> start() {
        return Stage.ofCallable(this.executorService, () -> {
            return Files.createDirectories(this.dir, new FileAttribute[0]);
        }).toVoid();
    }

    public Stage<Void> stop() {
        return Stage.of((Object) null);
    }

    @JmxAttribute
    public StageStats getStageIdGenerator() {
        return this.stageIdGenerator;
    }

    @JmxAttribute
    public StageStats getStageFinishChunks() {
        return this.stageFinishChunks;
    }

    @JmxAttribute
    public StageStats getStageBackup() {
        return this.stageBackup;
    }

    @JmxAttribute
    public StageStats getStageCleanup() {
        return this.stageCleanup;
    }

    @JmxAttribute
    public StageStats getStageList() {
        return this.stageList;
    }

    @JmxAttribute
    public StageStats getStageOpenR1() {
        return this.stageOpenR1;
    }

    @JmxAttribute
    public StageStats getStageOpenR2() {
        return this.stageOpenR2;
    }

    @JmxAttribute
    public StageStats getStageOpenR3() {
        return this.stageOpenR3;
    }

    @JmxAttribute
    public StageStats getStageOpenW() {
        return this.stageOpenW;
    }

    @JmxAttribute
    public StreamStatsDetailed getReadFile() {
        return this.readFile;
    }

    @JmxAttribute
    public StreamStatsDetailed getReadDecompress() {
        return this.readDecompress;
    }

    @JmxAttribute
    public StreamStatsBasic getReadDeserialize() {
        return this.readDeserialize;
    }

    @JmxAttribute
    public StreamStatsDetailed getReadDeserializeDetailed() {
        return this.readDeserializeDetailed;
    }

    @JmxAttribute
    public StreamStatsBasic getWriteSerialize() {
        return this.writeSerialize;
    }

    @JmxAttribute
    public StreamStatsDetailed getWriteSerializeDetailed() {
        return this.writeSerializeDetailed;
    }

    @JmxAttribute
    public StreamStatsDetailed getWriteCompress() {
        return this.writeCompress;
    }

    @JmxAttribute
    public StreamStatsDetailed getWriteChunker() {
        return this.writeChunker;
    }

    @JmxAttribute
    public StreamStatsDetailed getWriteFile() {
        return this.writeFile;
    }

    @JmxAttribute
    public int getFinishChunks() {
        return this.finishChunks;
    }

    @JmxAttribute
    public ExceptionStats getCleanupWarnings() {
        return this.cleanupWarnings;
    }

    @JmxAttribute
    public int getCleanupPreservedFiles() {
        return this.cleanupPreservedFiles;
    }

    @JmxAttribute
    public int getCleanupDeletedFiles() {
        return this.cleanupDeletedFiles;
    }

    @JmxAttribute
    public int getCleanupDeletedFilesTotal() {
        return this.cleanupDeletedFilesTotal;
    }

    @JmxAttribute
    public int getCleanupSkippedFiles() {
        return this.cleanupSkippedFiles;
    }

    @JmxAttribute
    public int getCleanupSkippedFilesTotal() {
        return this.cleanupSkippedFilesTotal;
    }

    @JmxAttribute
    public long getCleanupSkipTimeMin() {
        return this.cleanupSkipTimeMin;
    }

    @JmxAttribute
    public long getCleanupSkipTimeMax() {
        return this.cleanupSkipTimeMax;
    }

    @JmxOperation
    public void startDetailedMonitoring() {
        this.detailed = true;
    }

    @JmxOperation
    public void stopDetailedMonitoring() {
        this.detailed = false;
    }

    public void resetStats() {
        this.cleanupPreservedFiles = 0;
        this.cleanupDeletedFiles = 0;
        this.cleanupDeletedFilesTotal = 0;
        this.cleanupSkippedFiles = 0;
        this.cleanupSkippedFilesTotal = 0;
        this.cleanupSkipTimeMin = 0L;
        this.cleanupSkipTimeMax = 0L;
        ReflectionUtils.resetStats(this);
    }

    static {
        $assertionsDisabled = !LocalFsChunkStorage.class.desiredAssertionStatus();
        DEFAULT_BUFFER_SIZE = MemSize.kilobytes(256L);
        DEFAULT_SMOOTHING_WINDOW = Duration.ofMinutes(5L);
    }
}
