package io.activej.aggregation;

import io.activej.aggregation.ot.AggregationStructure;
import io.activej.aggregation.util.Utils;
import io.activej.async.service.EventloopService;
import io.activej.async.util.LogUtils;
import io.activej.bytebuf.ByteBuf;
import io.activej.codegen.DefiningClassLoader;
import io.activej.common.Checks;
import io.activej.common.MemSize;
import io.activej.common.api.WithInitializer;
import io.activej.common.collection.CollectionUtils;
import io.activej.common.exception.parse.ParseException;
import io.activej.common.ref.RefInt;
import io.activej.csp.ChannelSupplier;
import io.activej.csp.process.ChannelByteChunker;
import io.activej.csp.process.ChannelLZ4Compressor;
import io.activej.csp.process.ChannelLZ4Decompressor;
import io.activej.datastream.StreamConsumer;
import io.activej.datastream.StreamSupplier;
import io.activej.datastream.csp.ChannelDeserializer;
import io.activej.datastream.csp.ChannelSerializer;
import io.activej.datastream.stats.StreamStats;
import io.activej.datastream.stats.StreamStatsBasic;
import io.activej.datastream.stats.StreamStatsDetailed;
import io.activej.datastream.stats.StreamStatsSizeCounter;
import io.activej.eventloop.Eventloop;
import io.activej.eventloop.jmx.EventloopJmxBeanEx;
import io.activej.fs.ActiveFs;
import io.activej.fs.FileMetadata;
import io.activej.jmx.api.attribute.JmxAttribute;
import io.activej.jmx.api.attribute.JmxOperation;
import io.activej.jmx.stats.ExceptionStats;
import io.activej.jmx.stats.StatsUtils;
import io.activej.jmx.stats.ValueStats;
import io.activej.ot.util.IdGenerator;
import io.activej.promise.Promise;
import io.activej.promise.jmx.PromiseStats;
import java.nio.file.attribute.FileTime;
import java.time.Duration;
import java.time.Instant;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.function.Predicate;
import java.util.stream.Collectors;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/activej/aggregation/ActiveFsChunkStorage.class */
public final class ActiveFsChunkStorage<C> implements AggregationChunkStorage<C>, EventloopService, WithInitializer<ActiveFsChunkStorage<C>>, EventloopJmxBeanEx {
    private static final Logger logger = LoggerFactory.getLogger(ActiveFsChunkStorage.class);
    public static final MemSize DEFAULT_BUFFER_SIZE = MemSize.kilobytes(256);
    public static final Duration DEFAULT_SMOOTHING_WINDOW = Duration.ofMinutes(5);
    public static final String DEFAULT_BACKUP_PATH = "backups";
    public static final String SUCCESSFUL_BACKUP_FILE = "_0_SUCCESSFUL_BACKUP";
    public static final String LOG = ".log";
    public static final String TEMP_LOG = ".temp";
    private final Eventloop eventloop;
    private final ChunkIdCodec<C> chunkIdCodec;
    private final IdGenerator<C> idGenerator;
    private final ActiveFs fs;
    private boolean detailed;
    private int cleanupPreservedFiles;
    private int cleanupDeletedFiles;
    private int cleanupDeletedFilesTotal;
    private int cleanupSkippedFiles;
    private int cleanupSkippedFilesTotal;
    private int finishChunks;
    private String chunksPath = "";
    private String tempPath = "";
    private String backupPath = DEFAULT_BACKUP_PATH;
    private MemSize bufferSize = DEFAULT_BUFFER_SIZE;
    private final ValueStats chunksCount = ValueStats.create(DEFAULT_SMOOTHING_WINDOW);
    private final PromiseStats promiseIdGenerator = PromiseStats.create(DEFAULT_SMOOTHING_WINDOW);
    private final PromiseStats promiseOpenR = PromiseStats.create(DEFAULT_SMOOTHING_WINDOW);
    private final PromiseStats promiseOpenW = PromiseStats.create(DEFAULT_SMOOTHING_WINDOW);
    private final PromiseStats promiseFinishChunks = PromiseStats.create(DEFAULT_SMOOTHING_WINDOW);
    private final PromiseStats promiseList = PromiseStats.create(DEFAULT_SMOOTHING_WINDOW);
    private final PromiseStats promiseBackup = PromiseStats.create(DEFAULT_SMOOTHING_WINDOW);
    private final PromiseStats promiseCleanup = PromiseStats.create(DEFAULT_SMOOTHING_WINDOW);
    private final PromiseStats promiseCleanupCheckRequiredChunks = PromiseStats.create(DEFAULT_SMOOTHING_WINDOW);
    private final StreamStatsDetailed<ByteBuf> readFile = StreamStats.detailed(StreamStatsSizeCounter.forByteBufs());
    private final StreamStatsDetailed<ByteBuf> readDecompress = StreamStats.detailed(StreamStatsSizeCounter.forByteBufs());
    private final StreamStatsBasic<?> readDeserialize = StreamStats.basic();
    private final StreamStatsDetailed<?> readDeserializeDetailed = StreamStats.detailed();
    private final StreamStatsBasic<?> writeSerialize = StreamStats.basic();
    private final StreamStatsDetailed<?> writeSerializeDetailed = StreamStats.detailed();
    private final StreamStatsDetailed<ByteBuf> writeCompress = StreamStats.detailed(StreamStatsSizeCounter.forByteBufs());
    private final StreamStatsDetailed<ByteBuf> writeChunker = StreamStats.detailed(StreamStatsSizeCounter.forByteBufs());
    private final StreamStatsDetailed<ByteBuf> writeFile = StreamStats.detailed(StreamStatsSizeCounter.forByteBufs());
    private final ExceptionStats chunkNameWarnings = ExceptionStats.create();

    private ActiveFsChunkStorage(Eventloop eventloop, ChunkIdCodec<C> chunkIdCodec, IdGenerator<C> idGenerator, ActiveFs activeFs) {
        this.eventloop = eventloop;
        this.chunkIdCodec = chunkIdCodec;
        this.idGenerator = idGenerator;
        this.fs = activeFs;
    }

    public static <C> ActiveFsChunkStorage<C> create(Eventloop eventloop, ChunkIdCodec<C> chunkIdCodec, IdGenerator<C> idGenerator, ActiveFs activeFs) {
        return new ActiveFsChunkStorage<>(eventloop, chunkIdCodec, idGenerator, activeFs);
    }

    public ActiveFsChunkStorage<C> withBufferSize(MemSize memSize) {
        this.bufferSize = memSize;
        return this;
    }

    public ActiveFsChunkStorage<C> withChunksPath(String str) {
        this.chunksPath = str;
        return this;
    }

    public ActiveFsChunkStorage<C> withTempPath(String str) {
        this.tempPath = str;
        return this;
    }

    public ActiveFsChunkStorage<C> withBackupPath(String str) {
        this.backupPath = str;
        return this;
    }

    @Override // io.activej.aggregation.AggregationChunkStorage
    public <T> Promise<StreamSupplier<T>> read(AggregationStructure aggregationStructure, List<String> list, Class<T> cls, C c, DefiningClassLoader definingClassLoader) {
        return this.fs.download(toPath(c)).whenComplete(this.promiseOpenR.recordStats()).map(channelSupplier -> {
            return (StreamSupplier) ((StreamSupplier) ((ChannelSupplier) ((ChannelSupplier) ((ChannelSupplier) channelSupplier.transformWith(this.readFile)).transformWith(ChannelLZ4Decompressor.create())).transformWith(this.readDecompress)).transformWith(ChannelDeserializer.create(Utils.createBinarySerializer(aggregationStructure, cls, aggregationStructure.getKeys(), list, definingClassLoader)))).transformWith(this.detailed ? this.readDeserializeDetailed : this.readDeserialize);
        });
    }

    @Override // io.activej.aggregation.AggregationChunkStorage
    public <T> Promise<StreamConsumer<T>> write(AggregationStructure aggregationStructure, List<String> list, Class<T> cls, C c, DefiningClassLoader definingClassLoader) {
        return this.fs.upload(toTempPath(c)).whenComplete(this.promiseOpenW.recordStats()).map(channelConsumer -> {
            return StreamConsumer.ofSupplier(streamSupplier -> {
                return ((ChannelSupplier) ((ChannelSupplier) ((ChannelSupplier) ((ChannelSupplier) ((ChannelSupplier) ((ChannelSupplier) ((StreamSupplier) streamSupplier.transformWith(this.detailed ? this.writeSerializeDetailed : this.writeSerialize)).transformWith(ChannelSerializer.create(Utils.createBinarySerializer(aggregationStructure, cls, aggregationStructure.getKeys(), list, definingClassLoader)).withInitialBufferSize(this.bufferSize))).transformWith(this.writeCompress)).transformWith(ChannelLZ4Compressor.createFastCompressor())).transformWith(this.writeChunker)).transformWith(ChannelByteChunker.create(this.bufferSize.map(l -> {
                    return Long.valueOf(l.longValue() / 2);
                }), this.bufferSize.map(l2 -> {
                    return Long.valueOf(l2.longValue() * 2);
                })))).transformWith(this.writeFile)).streamTo(channelConsumer);
            });
        });
    }

    @Override // io.activej.aggregation.AggregationChunkStorage
    public Promise<Void> finish(Set<C> set) {
        return this.fs.moveAll((Map) set.stream().collect(Collectors.toMap(this::toTempPath, this::toPath))).whenResult(() -> {
            this.finishChunks = set.size();
        }).whenComplete(this.promiseFinishChunks.recordStats());
    }

    public Promise<C> createId() {
        return this.idGenerator.createId().whenComplete(this.promiseIdGenerator.recordStats());
    }

    public Promise<Void> backup(String str, Set<C> set) {
        return this.fs.copyAll((Map) set.stream().collect(Collectors.toMap(this::toPath, obj -> {
            return toBackupPath(str, obj);
        }))).then(() -> {
            return ChannelSupplier.of().streamTo(this.fs.upload(toBackupPath(str, null), 0L));
        }).whenComplete(this.promiseBackup.recordStats()).toVoid();
    }

    public Promise<Void> cleanup(Set<C> set) {
        return cleanup(set, null);
    }

    public Promise<Void> cleanup(Set<C> set, @Nullable Instant instant) {
        long epochMilli = instant != null ? instant.toEpochMilli() : -1L;
        RefInt refInt = new RefInt(0);
        RefInt refInt2 = new RefInt(0);
        return this.fs.list(toDir(this.chunksPath) + "*" + LOG).then(map -> {
            Set set2 = (Set) map.entrySet().stream().filter(entry -> {
                C fromPath = fromPath((String) entry.getKey());
                if (fromPath == null || set.contains(fromPath)) {
                    return false;
                }
                long timestamp = ((FileMetadata) entry.getValue()).getTimestamp();
                if (epochMilli == -1 || timestamp <= epochMilli) {
                    return true;
                }
                logger.trace("File {} timestamp {} > {}", new Object[]{entry, Long.valueOf(timestamp), Long.valueOf(epochMilli)});
                refInt.inc();
                return false;
            }).peek(entry2 -> {
                if (logger.isTraceEnabled()) {
                    FileTime fromMillis = FileTime.fromMillis(((FileMetadata) entry2.getValue()).getTimestamp());
                    logger.trace("Delete file: {} with last modifiedTime: {}({} millis)", new Object[]{entry2.getKey(), fromMillis, Long.valueOf(fromMillis.toMillis())});
                }
                refInt2.inc();
            }).map((v0) -> {
                return v0.getKey();
            }).collect(Collectors.toSet());
            return set2.isEmpty() ? Promise.complete() : this.fs.deleteAll(set2);
        }).whenResult(() -> {
            this.cleanupPreservedFiles = set.size();
            this.cleanupDeletedFiles = refInt2.get();
            this.cleanupDeletedFilesTotal += refInt2.get();
            this.cleanupSkippedFiles = refInt.get();
            this.cleanupSkippedFilesTotal += refInt.get();
        }).whenComplete(this.promiseCleanup.recordStats());
    }

    public Promise<Set<C>> list(Predicate<C> predicate, Predicate<Long> predicate2) {
        return this.fs.list(toDir(this.chunksPath) + "*" + LOG).map(map -> {
            return (Set) map.entrySet().stream().filter(entry -> {
                return predicate2.test(Long.valueOf(((FileMetadata) entry.getValue()).getTimestamp()));
            }).map((v0) -> {
                return v0.getKey();
            }).map(this::fromPath).filter(Objects::nonNull).filter(predicate).collect(Collectors.toSet());
        }).whenComplete(this.promiseList.recordStats());
    }

    public Promise<Void> checkRequiredChunks(Set<C> set) {
        return list(obj -> {
            return true;
        }, l -> {
            return true;
        }).whenResult(set2 -> {
            this.chunksCount.recordValue(set2.size());
        }).then(set3 -> {
            return set3.containsAll(set) ? Promise.of((Void) null) : Promise.ofException(new IllegalStateException("Missed chunks from storage: " + CollectionUtils.toLimitedString(CollectionUtils.difference(set, set3), 100)));
        }).whenComplete(this.promiseCleanupCheckRequiredChunks.recordStats()).whenComplete(LogUtils.toLogger(logger, LogUtils.thisMethod(), new Object[]{CollectionUtils.toLimitedString(set, 6)}));
    }

    private String toPath(C c) {
        return toDir(this.chunksPath) + this.chunkIdCodec.toFileName(c) + LOG;
    }

    private String toTempPath(C c) {
        return toDir(this.tempPath) + this.chunkIdCodec.toFileName(c) + TEMP_LOG;
    }

    private String toBackupPath(String str, @Nullable C c) {
        return toDir(this.backupPath) + str + "/" + (c != null ? this.chunkIdCodec.toFileName(c) + LOG : SUCCESSFUL_BACKUP_FILE);
    }

    private String toDir(String str) {
        return (str.isEmpty() || str.endsWith("/")) ? str : str + "/";
    }

    @Nullable
    private C fromPath(String str) {
        String dir = toDir(this.chunksPath);
        Checks.checkArgument(str.startsWith(dir));
        try {
            return this.chunkIdCodec.fromFileName(str.substring(dir.length(), str.length() - LOG.length()));
        } catch (ParseException e) {
            this.chunkNameWarnings.recordException(e);
            logger.warn("Invalid chunk filename: {}", str);
            return null;
        }
    }

    @NotNull
    public Eventloop getEventloop() {
        return this.eventloop;
    }

    @NotNull
    public Promise<Void> start() {
        return this.fs.ping();
    }

    @NotNull
    public Promise<Void> stop() {
        return Promise.complete();
    }

    @JmxAttribute
    public PromiseStats getPromiseIdGenerator() {
        return this.promiseIdGenerator;
    }

    @JmxAttribute
    public PromiseStats getPromiseFinishChunks() {
        return this.promiseFinishChunks;
    }

    @JmxAttribute
    public PromiseStats getPromiseBackup() {
        return this.promiseBackup;
    }

    @JmxAttribute
    public PromiseStats getPromiseCleanup() {
        return this.promiseCleanup;
    }

    @JmxAttribute
    public PromiseStats getPromiseList() {
        return this.promiseList;
    }

    @JmxAttribute
    public PromiseStats getPromiseOpenR() {
        return this.promiseOpenR;
    }

    @JmxAttribute
    public PromiseStats getPromiseOpenW() {
        return this.promiseOpenW;
    }

    @JmxAttribute
    public StreamStatsDetailed getReadFile() {
        return this.readFile;
    }

    @JmxAttribute
    public StreamStatsDetailed getReadDecompress() {
        return this.readDecompress;
    }

    @JmxAttribute
    public StreamStatsBasic getReadDeserialize() {
        return this.readDeserialize;
    }

    @JmxAttribute
    public StreamStatsDetailed getReadDeserializeDetailed() {
        return this.readDeserializeDetailed;
    }

    @JmxAttribute
    public StreamStatsBasic getWriteSerialize() {
        return this.writeSerialize;
    }

    @JmxAttribute
    public StreamStatsDetailed getWriteSerializeDetailed() {
        return this.writeSerializeDetailed;
    }

    @JmxAttribute
    public StreamStatsDetailed getWriteCompress() {
        return this.writeCompress;
    }

    @JmxAttribute
    public StreamStatsDetailed getWriteChunker() {
        return this.writeChunker;
    }

    @JmxAttribute
    public StreamStatsDetailed getWriteFile() {
        return this.writeFile;
    }

    @JmxAttribute
    public int getFinishChunks() {
        return this.finishChunks;
    }

    @JmxAttribute
    public ExceptionStats getChunkNameWarnings() {
        return this.chunkNameWarnings;
    }

    @JmxAttribute
    public int getCleanupPreservedFiles() {
        return this.cleanupPreservedFiles;
    }

    @JmxAttribute
    public int getCleanupDeletedFiles() {
        return this.cleanupDeletedFiles;
    }

    @JmxAttribute
    public int getCleanupDeletedFilesTotal() {
        return this.cleanupDeletedFilesTotal;
    }

    @JmxAttribute
    public int getCleanupSkippedFiles() {
        return this.cleanupSkippedFiles;
    }

    @JmxAttribute
    public int getCleanupSkippedFilesTotal() {
        return this.cleanupSkippedFilesTotal;
    }

    @JmxAttribute
    public ValueStats getChunksCount() {
        return this.chunksCount;
    }

    @JmxAttribute
    public PromiseStats getPromiseCleanupCheckRequiredChunks() {
        return this.promiseCleanupCheckRequiredChunks;
    }

    @JmxOperation
    public void startDetailedMonitoring() {
        this.detailed = true;
    }

    @JmxOperation
    public void stopDetailedMonitoring() {
        this.detailed = false;
    }

    public void resetStats() {
        this.cleanupPreservedFiles = 0;
        this.cleanupDeletedFiles = 0;
        this.cleanupDeletedFilesTotal = 0;
        this.cleanupSkippedFiles = 0;
        this.cleanupSkippedFilesTotal = 0;
        StatsUtils.resetStats(this);
    }
}
