/*
 * Decompiled with CFR 0.152.
 */
package io.activej.aggregation;

import io.activej.aggregation.AggregationChunkStorage;
import io.activej.aggregation.AggregationException;
import io.activej.aggregation.ChunkIdCodec;
import io.activej.aggregation.ot.AggregationStructure;
import io.activej.aggregation.util.Utils;
import io.activej.async.service.EventloopService;
import io.activej.async.util.LogUtils;
import io.activej.bytebuf.ByteBuf;
import io.activej.codegen.DefiningClassLoader;
import io.activej.common.Checks;
import io.activej.common.MemSize;
import io.activej.common.api.WithInitializer;
import io.activej.common.collection.CollectionUtils;
import io.activej.common.exception.MalformedDataException;
import io.activej.common.ref.RefInt;
import io.activej.csp.ChannelSupplier;
import io.activej.csp.dsl.ChannelSupplierTransformer;
import io.activej.csp.process.ChannelByteChunker;
import io.activej.csp.process.frames.ChannelFrameDecoder;
import io.activej.csp.process.frames.ChannelFrameEncoder;
import io.activej.csp.process.frames.FrameFormat;
import io.activej.datastream.StreamConsumer;
import io.activej.datastream.StreamSupplier;
import io.activej.datastream.csp.ChannelDeserializer;
import io.activej.datastream.csp.ChannelSerializer;
import io.activej.datastream.processor.StreamSupplierTransformer;
import io.activej.datastream.stats.StreamStats;
import io.activej.datastream.stats.StreamStatsBasic;
import io.activej.datastream.stats.StreamStatsDetailed;
import io.activej.datastream.stats.StreamStatsSizeCounter;
import io.activej.eventloop.Eventloop;
import io.activej.eventloop.jmx.EventloopJmxBeanEx;
import io.activej.fs.ActiveFs;
import io.activej.fs.FileMetadata;
import io.activej.jmx.api.attribute.JmxAttribute;
import io.activej.jmx.api.attribute.JmxOperation;
import io.activej.jmx.stats.ExceptionStats;
import io.activej.jmx.stats.StatsUtils;
import io.activej.jmx.stats.ValueStats;
import io.activej.ot.util.IdGenerator;
import io.activej.promise.Promise;
import io.activej.promise.jmx.PromiseStats;
import java.nio.file.attribute.FileTime;
import java.time.Duration;
import java.time.Instant;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.function.Predicate;
import java.util.stream.Collectors;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public final class ActiveFsChunkStorage<C>
implements AggregationChunkStorage<C>,
EventloopService,
WithInitializer<ActiveFsChunkStorage<C>>,
EventloopJmxBeanEx {
    private static final Logger logger = LoggerFactory.getLogger(ActiveFsChunkStorage.class);
    public static final MemSize DEFAULT_BUFFER_SIZE = MemSize.kilobytes((long)256L);
    public static final Duration DEFAULT_SMOOTHING_WINDOW = Duration.ofMinutes(5L);
    public static final String DEFAULT_BACKUP_PATH = "backups";
    public static final String SUCCESSFUL_BACKUP_FILE = "_0_SUCCESSFUL_BACKUP";
    public static final String LOG = ".log";
    public static final String TEMP_LOG = ".temp";
    private final Eventloop eventloop;
    private final ChunkIdCodec<C> chunkIdCodec;
    private final IdGenerator<C> idGenerator;
    private final FrameFormat frameFormat;
    private final ActiveFs fs;
    private String chunksPath = "";
    private String tempPath = "";
    private String backupPath = "backups";
    private MemSize bufferSize = DEFAULT_BUFFER_SIZE;
    private final ValueStats chunksCount = ValueStats.create((Duration)DEFAULT_SMOOTHING_WINDOW);
    private final PromiseStats promiseIdGenerator = PromiseStats.create((Duration)DEFAULT_SMOOTHING_WINDOW);
    private final PromiseStats promiseOpenR = PromiseStats.create((Duration)DEFAULT_SMOOTHING_WINDOW);
    private final PromiseStats promiseOpenW = PromiseStats.create((Duration)DEFAULT_SMOOTHING_WINDOW);
    private final PromiseStats promiseFinishChunks = PromiseStats.create((Duration)DEFAULT_SMOOTHING_WINDOW);
    private final PromiseStats promiseList = PromiseStats.create((Duration)DEFAULT_SMOOTHING_WINDOW);
    private final PromiseStats promiseBackup = PromiseStats.create((Duration)DEFAULT_SMOOTHING_WINDOW);
    private final PromiseStats promiseCleanup = PromiseStats.create((Duration)DEFAULT_SMOOTHING_WINDOW);
    private final PromiseStats promiseCleanupCheckRequiredChunks = PromiseStats.create((Duration)DEFAULT_SMOOTHING_WINDOW);
    private boolean detailed;
    private final StreamStatsDetailed<ByteBuf> readFile = StreamStats.detailed((StreamStatsSizeCounter)StreamStatsSizeCounter.forByteBufs());
    private final StreamStatsDetailed<ByteBuf> readDecompress = StreamStats.detailed((StreamStatsSizeCounter)StreamStatsSizeCounter.forByteBufs());
    private final StreamStatsBasic<?> readDeserialize = StreamStats.basic();
    private final StreamStatsDetailed<?> readDeserializeDetailed = StreamStats.detailed();
    private final StreamStatsBasic<?> writeSerialize = StreamStats.basic();
    private final StreamStatsDetailed<?> writeSerializeDetailed = StreamStats.detailed();
    private final StreamStatsDetailed<ByteBuf> writeCompress = StreamStats.detailed((StreamStatsSizeCounter)StreamStatsSizeCounter.forByteBufs());
    private final StreamStatsDetailed<ByteBuf> writeChunker = StreamStats.detailed((StreamStatsSizeCounter)StreamStatsSizeCounter.forByteBufs());
    private final StreamStatsDetailed<ByteBuf> writeFile = StreamStats.detailed((StreamStatsSizeCounter)StreamStatsSizeCounter.forByteBufs());
    private final ExceptionStats chunkNameWarnings = ExceptionStats.create();
    private int cleanupPreservedFiles;
    private int cleanupDeletedFiles;
    private int cleanupDeletedFilesTotal;
    private int cleanupSkippedFiles;
    private int cleanupSkippedFilesTotal;
    private int finishChunks;

    private ActiveFsChunkStorage(Eventloop eventloop, ChunkIdCodec<C> chunkIdCodec, IdGenerator<C> idGenerator, FrameFormat frameFormat, ActiveFs fs) {
        this.eventloop = eventloop;
        this.chunkIdCodec = chunkIdCodec;
        this.idGenerator = idGenerator;
        this.frameFormat = frameFormat;
        this.fs = fs;
    }

    public static <C> ActiveFsChunkStorage<C> create(Eventloop eventloop, ChunkIdCodec<C> chunkIdCodec, IdGenerator<C> idGenerator, FrameFormat frameFormat, ActiveFs fs) {
        return new ActiveFsChunkStorage<C>(eventloop, chunkIdCodec, idGenerator, frameFormat, fs);
    }

    public ActiveFsChunkStorage<C> withBufferSize(MemSize bufferSize) {
        this.bufferSize = bufferSize;
        return this;
    }

    public ActiveFsChunkStorage<C> withChunksPath(String path) {
        this.chunksPath = path;
        return this;
    }

    public ActiveFsChunkStorage<C> withTempPath(String path) {
        this.tempPath = path;
        return this;
    }

    public ActiveFsChunkStorage<C> withBackupPath(String path) {
        this.backupPath = path;
        return this;
    }

    @Override
    public <T> Promise<StreamSupplier<T>> read(AggregationStructure aggregation, List<String> fields, Class<T> recordClass, C chunkId, DefiningClassLoader classLoader) {
        return this.fs.download(this.toPath(chunkId)).thenEx(Utils.wrapException(e -> new AggregationException("Failed to download chunk '" + chunkId + '\'', (Throwable)e))).whenComplete(this.promiseOpenR.recordStats()).map(supplier -> ((StreamSupplier)((StreamSupplier)((ChannelSupplier)((ChannelSupplier)((ChannelSupplier)supplier.transformWith(this.readFile)).transformWith((ChannelSupplierTransformer)ChannelFrameDecoder.create((FrameFormat)this.frameFormat))).transformWith(this.readDecompress)).transformWith((ChannelSupplierTransformer)ChannelDeserializer.create(Utils.createBinarySerializer(aggregation, recordClass, aggregation.getKeys(), fields, classLoader)))).transformWith(this.detailed ? this.readDeserializeDetailed : this.readDeserialize)).withEndOfStream(eos -> eos.thenEx(Utils.wrapException(e -> new AggregationException("Failed to read chunk '" + chunkId + '\'', (Throwable)e)))));
    }

    @Override
    public <T> Promise<StreamConsumer<T>> write(AggregationStructure aggregation, List<String> fields, Class<T> recordClass, C chunkId, DefiningClassLoader classLoader) {
        return this.fs.upload(this.toTempPath(chunkId)).thenEx(Utils.wrapException(e -> new AggregationException("Failed to upload chunk '" + chunkId + '\'', (Throwable)e))).whenComplete(this.promiseOpenW.recordStats()).map(consumer -> StreamConsumer.ofSupplier(supplier -> ((ChannelSupplier)((ChannelSupplier)((ChannelSupplier)((ChannelSupplier)((ChannelSupplier)((ChannelSupplier)((StreamSupplier)supplier.transformWith(this.detailed ? this.writeSerializeDetailed : this.writeSerialize)).transformWith((StreamSupplierTransformer)ChannelSerializer.create(Utils.createBinarySerializer(aggregation, recordClass, aggregation.getKeys(), fields, classLoader)).withInitialBufferSize(this.bufferSize))).transformWith(this.writeCompress)).transformWith((ChannelSupplierTransformer)ChannelFrameEncoder.create((FrameFormat)this.frameFormat))).transformWith(this.writeChunker)).transformWith((ChannelSupplierTransformer)ChannelByteChunker.create((MemSize)this.bufferSize.map(bytes -> bytes / 2L), (MemSize)this.bufferSize.map(bytes -> bytes * 2L)))).transformWith(this.writeFile)).streamTo(consumer)).withAcknowledgement(ack -> ack.thenEx(Utils.wrapException(e -> new AggregationException("Failed to write chunk '" + chunkId + '\'', (Throwable)e)))));
    }

    @Override
    public Promise<Void> finish(Set<C> chunkIds) {
        return this.fs.moveAll(chunkIds.stream().collect(Collectors.toMap(this::toTempPath, this::toPath))).thenEx(Utils.wrapException(e -> new AggregationException("Failed to finalize chunks: " + CollectionUtils.toLimitedString((Collection)chunkIds, (int)10), (Throwable)e))).whenResult(() -> {
            this.finishChunks = chunkIds.size();
        }).whenComplete(this.promiseFinishChunks.recordStats());
    }

    public Promise<C> createId() {
        return this.idGenerator.createId().thenEx(Utils.wrapException(e -> new AggregationException("Could not create ID", (Throwable)e))).whenComplete(this.promiseIdGenerator.recordStats());
    }

    public Promise<Void> backup(String backupId, Set<C> chunkIds) {
        return this.fs.copyAll(chunkIds.stream().collect(Collectors.toMap(this::toPath, c -> this.toBackupPath(backupId, c)))).then(() -> ChannelSupplier.of().streamTo(this.fs.upload(this.toBackupPath(backupId, null), 0L))).thenEx(Utils.wrapException(e -> new AggregationException("Backup '" + backupId + "' of chunks " + CollectionUtils.toLimitedString((Collection)chunkIds, (int)10) + " failed", (Throwable)e))).whenComplete(this.promiseBackup.recordStats()).toVoid();
    }

    public Promise<Void> cleanup(Set<C> saveChunks) {
        return this.cleanup(saveChunks, null);
    }

    public Promise<Void> cleanup(Set<C> preserveChunks, @Nullable Instant instant) {
        long timestamp = instant != null ? instant.toEpochMilli() : -1L;
        RefInt skipped = new RefInt(0);
        RefInt deleted = new RefInt(0);
        return this.fs.list(this.toDir(this.chunksPath) + "*" + LOG).thenEx(Utils.wrapException(e -> new AggregationException("Failed to list chunks for cleanup", (Throwable)e))).then(list -> {
            Set toDelete = list.entrySet().stream().filter(entry -> {
                C id = this.fromPath((String)entry.getKey());
                if (id == null || preserveChunks.contains(id)) {
                    return false;
                }
                long fileTimestamp = ((FileMetadata)entry.getValue()).getTimestamp();
                if (timestamp == -1L || fileTimestamp <= timestamp) {
                    return true;
                }
                logger.trace("File {} timestamp {} > {}", new Object[]{entry, fileTimestamp, timestamp});
                skipped.inc();
                return false;
            }).peek(entry -> {
                if (logger.isTraceEnabled()) {
                    FileTime lastModifiedTime = FileTime.fromMillis(((FileMetadata)entry.getValue()).getTimestamp());
                    logger.trace("Delete file: {} with last modifiedTime: {}({} millis)", new Object[]{entry.getKey(), lastModifiedTime, lastModifiedTime.toMillis()});
                }
                deleted.inc();
            }).map(Map.Entry::getKey).collect(Collectors.toSet());
            if (toDelete.isEmpty()) {
                return Promise.complete();
            }
            return this.fs.deleteAll(toDelete).thenEx(Utils.wrapException(e -> new AggregationException("Failed to clean up chunks", (Throwable)e)));
        }).whenResult(() -> {
            this.cleanupPreservedFiles = preserveChunks.size();
            this.cleanupDeletedFiles = deleted.get();
            this.cleanupDeletedFilesTotal += deleted.get();
            this.cleanupSkippedFiles = skipped.get();
            this.cleanupSkippedFilesTotal += skipped.get();
        }).whenComplete(this.promiseCleanup.recordStats());
    }

    public Promise<Set<C>> list(Predicate<C> chunkIdPredicate, Predicate<Long> lastModifiedPredicate) {
        return this.fs.list(this.toDir(this.chunksPath) + "*" + LOG).thenEx(Utils.wrapException(e -> new AggregationException("Failed to list chunks", (Throwable)e))).map(list -> list.entrySet().stream().filter(entry -> lastModifiedPredicate.test(((FileMetadata)entry.getValue()).getTimestamp())).map(Map.Entry::getKey).map(this::fromPath).filter(Objects::nonNull).filter(chunkIdPredicate).collect(Collectors.toSet())).whenComplete(this.promiseList.recordStats());
    }

    public Promise<Void> checkRequiredChunks(Set<C> requiredChunks) {
        return this.list(s -> true, timestamp -> true).whenResult(actualChunks -> this.chunksCount.recordValue(actualChunks.size())).then(actualChunks -> actualChunks.containsAll(requiredChunks) ? Promise.of((Object)null) : Promise.ofException((Throwable)new AggregationException("Missed chunks from storage: " + CollectionUtils.toLimitedString((Collection)CollectionUtils.difference((Set)requiredChunks, (Set)actualChunks), (int)100)))).whenComplete(this.promiseCleanupCheckRequiredChunks.recordStats()).whenComplete(LogUtils.toLogger((Logger)logger, (String)LogUtils.thisMethod(), (Object[])new Object[]{CollectionUtils.toLimitedString(requiredChunks, (int)6)}));
    }

    private String toPath(C chunkId) {
        return this.toDir(this.chunksPath) + this.chunkIdCodec.toFileName(chunkId) + LOG;
    }

    private String toTempPath(C chunkId) {
        return this.toDir(this.tempPath) + this.chunkIdCodec.toFileName(chunkId) + TEMP_LOG;
    }

    private String toBackupPath(String backupId, @Nullable C chunkId) {
        return this.toDir(this.backupPath) + backupId + "/" + (chunkId != null ? this.chunkIdCodec.toFileName(chunkId) + LOG : SUCCESSFUL_BACKUP_FILE);
    }

    private String toDir(String path) {
        return path.isEmpty() || path.endsWith("/") ? path : path + "/";
    }

    @Nullable
    private C fromPath(String path) {
        String chunksDir = this.toDir(this.chunksPath);
        Checks.checkArgument((boolean)path.startsWith(chunksDir));
        try {
            return this.chunkIdCodec.fromFileName(path.substring(chunksDir.length(), path.length() - LOG.length()));
        }
        catch (MalformedDataException e) {
            this.chunkNameWarnings.recordException((Throwable)e);
            logger.warn("Invalid chunk filename: {}", (Object)path);
            return null;
        }
    }

    @NotNull
    public Eventloop getEventloop() {
        return this.eventloop;
    }

    @NotNull
    public Promise<Void> start() {
        return this.fs.ping().thenEx(Utils.wrapException(e -> new AggregationException("Failed to start storage", (Throwable)e)));
    }

    @NotNull
    public Promise<Void> stop() {
        return Promise.complete();
    }

    @JmxAttribute
    public PromiseStats getPromiseIdGenerator() {
        return this.promiseIdGenerator;
    }

    @JmxAttribute
    public PromiseStats getPromiseFinishChunks() {
        return this.promiseFinishChunks;
    }

    @JmxAttribute
    public PromiseStats getPromiseBackup() {
        return this.promiseBackup;
    }

    @JmxAttribute
    public PromiseStats getPromiseCleanup() {
        return this.promiseCleanup;
    }

    @JmxAttribute
    public PromiseStats getPromiseList() {
        return this.promiseList;
    }

    @JmxAttribute
    public PromiseStats getPromiseOpenR() {
        return this.promiseOpenR;
    }

    @JmxAttribute
    public PromiseStats getPromiseOpenW() {
        return this.promiseOpenW;
    }

    @JmxAttribute
    public StreamStatsDetailed getReadFile() {
        return this.readFile;
    }

    @JmxAttribute
    public StreamStatsDetailed getReadDecompress() {
        return this.readDecompress;
    }

    @JmxAttribute
    public StreamStatsBasic getReadDeserialize() {
        return this.readDeserialize;
    }

    @JmxAttribute
    public StreamStatsDetailed getReadDeserializeDetailed() {
        return this.readDeserializeDetailed;
    }

    @JmxAttribute
    public StreamStatsBasic getWriteSerialize() {
        return this.writeSerialize;
    }

    @JmxAttribute
    public StreamStatsDetailed getWriteSerializeDetailed() {
        return this.writeSerializeDetailed;
    }

    @JmxAttribute
    public StreamStatsDetailed getWriteCompress() {
        return this.writeCompress;
    }

    @JmxAttribute
    public StreamStatsDetailed getWriteChunker() {
        return this.writeChunker;
    }

    @JmxAttribute
    public StreamStatsDetailed getWriteFile() {
        return this.writeFile;
    }

    @JmxAttribute
    public int getFinishChunks() {
        return this.finishChunks;
    }

    @JmxAttribute
    public ExceptionStats getChunkNameWarnings() {
        return this.chunkNameWarnings;
    }

    @JmxAttribute
    public int getCleanupPreservedFiles() {
        return this.cleanupPreservedFiles;
    }

    @JmxAttribute
    public int getCleanupDeletedFiles() {
        return this.cleanupDeletedFiles;
    }

    @JmxAttribute
    public int getCleanupDeletedFilesTotal() {
        return this.cleanupDeletedFilesTotal;
    }

    @JmxAttribute
    public int getCleanupSkippedFiles() {
        return this.cleanupSkippedFiles;
    }

    @JmxAttribute
    public int getCleanupSkippedFilesTotal() {
        return this.cleanupSkippedFilesTotal;
    }

    @JmxAttribute
    public ValueStats getChunksCount() {
        return this.chunksCount;
    }

    @JmxAttribute
    public PromiseStats getPromiseCleanupCheckRequiredChunks() {
        return this.promiseCleanupCheckRequiredChunks;
    }

    @JmxOperation
    public void startDetailedMonitoring() {
        this.detailed = true;
    }

    @JmxOperation
    public void stopDetailedMonitoring() {
        this.detailed = false;
    }

    public void resetStats() {
        this.cleanupPreservedFiles = 0;
        this.cleanupDeletedFiles = 0;
        this.cleanupDeletedFilesTotal = 0;
        this.cleanupSkippedFiles = 0;
        this.cleanupSkippedFilesTotal = 0;
        StatsUtils.resetStats((Object)this);
    }
}

