package io.datakernel.aggregation;

import io.datakernel.aggregation.ot.AggregationStructure;
import io.datakernel.async.service.EventloopService;
import io.datakernel.async.util.LogUtils;
import io.datakernel.bytebuf.ByteBuf;
import io.datakernel.codegen.DefiningClassLoader;
import io.datakernel.common.Initializable;
import io.datakernel.common.MemSize;
import io.datakernel.common.collection.CollectionUtils;
import io.datakernel.common.ref.RefInt;
import io.datakernel.csp.ChannelSupplier;
import io.datakernel.csp.process.ChannelByteChunker;
import io.datakernel.csp.process.ChannelLZ4Compressor;
import io.datakernel.csp.process.ChannelLZ4Decompressor;
import io.datakernel.datastream.StreamConsumer;
import io.datakernel.datastream.StreamSupplier;
import io.datakernel.datastream.csp.ChannelDeserializer;
import io.datakernel.datastream.csp.ChannelSerializer;
import io.datakernel.datastream.stats.StreamStats;
import io.datakernel.datastream.stats.StreamStatsBasic;
import io.datakernel.datastream.stats.StreamStatsDetailed;
import io.datakernel.datastream.stats.StreamStatsSizeCounter;
import io.datakernel.eventloop.Eventloop;
import io.datakernel.eventloop.jmx.EventloopJmxMBeanEx;
import io.datakernel.eventloop.jmx.ExceptionStats;
import io.datakernel.eventloop.jmx.ValueStats;
import io.datakernel.eventloop.util.ReflectionUtils;
import io.datakernel.jmx.api.JmxAttribute;
import io.datakernel.jmx.api.JmxOperation;
import io.datakernel.promise.Promise;
import io.datakernel.promise.Promises;
import io.datakernel.promise.jmx.PromiseStats;
import io.datakernel.remotefs.FsClient;
import java.io.File;
import java.nio.file.attribute.FileTime;
import java.time.Duration;
import java.time.Instant;
import java.util.List;
import java.util.Set;
import java.util.function.Predicate;
import java.util.stream.Collectors;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/datakernel/aggregation/RemoteFsChunkStorage.class */
public final class RemoteFsChunkStorage<C> implements AggregationChunkStorage<C>, EventloopService, Initializable<RemoteFsChunkStorage<C>>, EventloopJmxMBeanEx {
    private static final Logger logger;
    public static final MemSize DEFAULT_BUFFER_SIZE;
    public static final Duration DEFAULT_SMOOTHING_WINDOW;
    public static final String DEFAULT_BACKUP_FOLDER_NAME = "backups";
    public static final String LOG = ".log";
    public static final String TEMP_LOG = ".temp";
    private final Eventloop eventloop;
    private final ChunkIdCodec<C> chunkIdCodec;
    private final IdGenerator<C> idGenerator;
    private final FsClient client;
    private boolean detailed;
    private int cleanupPreservedFiles;
    private int cleanupDeletedFiles;
    private int cleanupDeletedFilesTotal;
    private int cleanupSkippedFiles;
    private int cleanupSkippedFilesTotal;
    private int finishChunks;
    static final /* synthetic */ boolean $assertionsDisabled;
    private String backupDir = DEFAULT_BACKUP_FOLDER_NAME;
    private MemSize bufferSize = DEFAULT_BUFFER_SIZE;
    private final ValueStats chunksCount = ValueStats.create(DEFAULT_SMOOTHING_WINDOW);
    private final PromiseStats promiseIdGenerator = PromiseStats.create(DEFAULT_SMOOTHING_WINDOW);
    private final PromiseStats promiseOpenR = PromiseStats.create(DEFAULT_SMOOTHING_WINDOW);
    private final PromiseStats promiseOpenW = PromiseStats.create(DEFAULT_SMOOTHING_WINDOW);
    private final PromiseStats promiseFinishChunks = PromiseStats.create(DEFAULT_SMOOTHING_WINDOW);
    private final PromiseStats promiseList = PromiseStats.create(DEFAULT_SMOOTHING_WINDOW);
    private final PromiseStats promiseBackup = PromiseStats.create(DEFAULT_SMOOTHING_WINDOW);
    private final PromiseStats promiseCleanup = PromiseStats.create(DEFAULT_SMOOTHING_WINDOW);
    private final PromiseStats promiseCleanupCheckRequiredChunks = PromiseStats.create(DEFAULT_SMOOTHING_WINDOW);
    private final StreamStatsDetailed<ByteBuf> readFile = StreamStats.detailed(StreamStatsSizeCounter.forByteBufs());
    private final StreamStatsDetailed<ByteBuf> readDecompress = StreamStats.detailed(StreamStatsSizeCounter.forByteBufs());
    private final StreamStatsBasic<?> readDeserialize = StreamStats.basic();
    private final StreamStatsDetailed<?> readDeserializeDetailed = StreamStats.detailed();
    private final StreamStatsBasic<?> writeSerialize = StreamStats.basic();
    private final StreamStatsDetailed<?> writeSerializeDetailed = StreamStats.detailed();
    private final StreamStatsDetailed<ByteBuf> writeCompress = StreamStats.detailed(StreamStatsSizeCounter.forByteBufs());
    private final StreamStatsDetailed<ByteBuf> writeChunker = StreamStats.detailed(StreamStatsSizeCounter.forByteBufs());
    private final StreamStatsDetailed<ByteBuf> writeFile = StreamStats.detailed(StreamStatsSizeCounter.forByteBufs());
    private final ExceptionStats cleanupWarnings = ExceptionStats.create();

    private RemoteFsChunkStorage(Eventloop eventloop, ChunkIdCodec<C> chunkIdCodec, IdGenerator<C> idGenerator, FsClient fsClient) {
        this.eventloop = eventloop;
        this.chunkIdCodec = chunkIdCodec;
        this.idGenerator = idGenerator;
        this.client = fsClient;
    }

    public static <C> RemoteFsChunkStorage<C> create(Eventloop eventloop, ChunkIdCodec<C> chunkIdCodec, IdGenerator<C> idGenerator, FsClient fsClient) {
        return new RemoteFsChunkStorage<>(eventloop, chunkIdCodec, idGenerator, fsClient);
    }

    public RemoteFsChunkStorage<C> withBufferSize(MemSize memSize) {
        this.bufferSize = memSize;
        return this;
    }

    public RemoteFsChunkStorage<C> withBackupPath(String str) {
        this.backupDir = str;
        return this;
    }

    private String getPath(C c) {
        return toFileName(c) + LOG;
    }

    private String getTempPath(C c) {
        return toFileName(c) + TEMP_LOG;
    }

    private String toFileName(C c) {
        return this.chunkIdCodec.toFileName(c);
    }

    private C fromFileName(String str) {
        return this.chunkIdCodec.fromFileName(str);
    }

    @Override // io.datakernel.aggregation.AggregationChunkStorage
    public <T> Promise<StreamSupplier<T>> read(AggregationStructure aggregationStructure, List<String> list, Class<T> cls, C c, DefiningClassLoader definingClassLoader) {
        return this.client.download(getPath(c)).whenComplete(this.promiseOpenR.recordStats()).map(channelSupplier -> {
            return ((StreamSupplier) ((StreamSupplier) ((ChannelSupplier) ((ChannelSupplier) ((ChannelSupplier) channelSupplier.transformWith(this.readFile)).transformWith(ChannelLZ4Decompressor.create())).transformWith(this.readDecompress)).transformWith(ChannelDeserializer.create(AggregationUtils.createBinarySerializer(aggregationStructure, cls, aggregationStructure.getKeys(), list, definingClassLoader)))).transformWith(this.detailed ? this.readDeserializeDetailed : this.readDeserialize)).withLateBinding();
        });
    }

    @Override // io.datakernel.aggregation.AggregationChunkStorage
    public <T> Promise<StreamConsumer<T>> write(AggregationStructure aggregationStructure, List<String> list, Class<T> cls, C c, DefiningClassLoader definingClassLoader) {
        return this.client.upload(getTempPath(c)).whenComplete(this.promiseOpenW.recordStats()).map(channelConsumer -> {
            return StreamConsumer.ofSupplier(streamSupplier -> {
                return ((ChannelSupplier) ((ChannelSupplier) ((ChannelSupplier) ((ChannelSupplier) ((ChannelSupplier) ((ChannelSupplier) ((StreamSupplier) streamSupplier.transformWith(this.detailed ? this.writeSerializeDetailed : this.writeSerialize)).transformWith(ChannelSerializer.create(AggregationUtils.createBinarySerializer(aggregationStructure, cls, aggregationStructure.getKeys(), list, definingClassLoader)).withInitialBufferSize(this.bufferSize))).transformWith(this.writeCompress)).transformWith(ChannelLZ4Compressor.createFastCompressor())).transformWith(this.writeChunker)).transformWith(ChannelByteChunker.create(this.bufferSize.map(l -> {
                    return Long.valueOf(l.longValue() / 2);
                }), this.bufferSize.map(l2 -> {
                    return Long.valueOf(l2.longValue() * 2);
                })))).transformWith(this.writeFile)).streamTo(channelConsumer);
            });
        });
    }

    @Override // io.datakernel.aggregation.AggregationChunkStorage
    public Promise<Void> finish(Set<C> set) {
        this.finishChunks = set.size();
        return Promises.all(set.stream().map(obj -> {
            return this.client.move(getTempPath(obj), getPath(obj)).toTry();
        })).whenComplete(this.promiseFinishChunks.recordStats());
    }

    @Override // io.datakernel.aggregation.IdGenerator
    public Promise<C> createId() {
        return this.idGenerator.createId().whenComplete(this.promiseIdGenerator.recordStats());
    }

    public Promise<Void> backup(String str, Set<C> set) {
        String str2 = this.backupDir + File.separator + str + "_tmp";
        return Promises.all(set.stream().map(obj -> {
            return this.client.copy(obj + LOG, str2 + File.separator + obj + LOG);
        })).then(r8 -> {
            return this.client.moveDir(str2, this.backupDir + File.separator + str);
        }).whenComplete(this.promiseBackup.recordStats());
    }

    public Promise<Void> cleanup(Set<C> set) {
        return cleanup(set, null);
    }

    public Promise<Void> cleanup(Set<C> set, @Nullable Instant instant) {
        long epochMilli = instant != null ? instant.toEpochMilli() : -1L;
        RefInt refInt = new RefInt(0);
        RefInt refInt2 = new RefInt(0);
        return this.client.list("*.log").then(list -> {
            return Promises.all(list.stream().filter(fileMetadata -> {
                try {
                    String name = fileMetadata.getName();
                    if (set.contains(fromFileName(name.substring(0, name.length() - LOG.length())))) {
                        return false;
                    }
                    long timestamp = fileMetadata.getTimestamp();
                    if (epochMilli == -1 || timestamp <= epochMilli) {
                        return true;
                    }
                    long j = timestamp - epochMilli;
                    if (!$assertionsDisabled && j <= 0) {
                        throw new AssertionError();
                    }
                    logger.trace("File {} timestamp {} > {}", new Object[]{fileMetadata, Long.valueOf(timestamp), Long.valueOf(epochMilli)});
                    refInt.inc();
                    return false;
                } catch (NumberFormatException e) {
                    this.cleanupWarnings.recordException(e);
                    logger.warn("Invalid chunk filename: " + fileMetadata);
                    return false;
                }
            }).map(fileMetadata2 -> {
                if (logger.isTraceEnabled()) {
                    FileTime fromMillis = FileTime.fromMillis(fileMetadata2.getTimestamp());
                    logger.trace("Delete file: {} with last modifiedTime: {}({} millis)", new Object[]{fileMetadata2.getName(), fromMillis, Long.valueOf(fromMillis.toMillis())});
                }
                refInt2.inc();
                return this.client.delete(fileMetadata2.getName());
            })).whenResult(r8 -> {
                this.cleanupPreservedFiles = set.size();
                this.cleanupDeletedFiles = refInt2.get();
                this.cleanupDeletedFilesTotal += refInt2.get();
                this.cleanupSkippedFiles = refInt.get();
                this.cleanupSkippedFilesTotal += refInt.get();
            });
        }).whenComplete(this.promiseCleanup.recordStats());
    }

    public Promise<Set<Long>> list(Predicate<String> predicate, Predicate<Long> predicate2) {
        return this.client.list("*.log").map(list -> {
            return (Set) list.stream().filter(fileMetadata -> {
                return predicate2.test(Long.valueOf(fileMetadata.getTimestamp()));
            }).map((v0) -> {
                return v0.getName();
            }).filter(predicate).map(str -> {
                return Long.valueOf(Long.parseLong(str.substring(0, str.length() - LOG.length())));
            }).collect(Collectors.toSet());
        }).whenComplete(this.promiseList.recordStats());
    }

    public Promise<Void> checkRequiredChunks(Set<C> set) {
        return list(str -> {
            return true;
        }, l -> {
            return true;
        }).whenResult(set2 -> {
            this.chunksCount.recordValue(set2.size());
        }).then(set3 -> {
            return set3.containsAll(set) ? Promise.of((Void) null) : Promise.ofException(new IllegalStateException("Missed chunks from storage: " + CollectionUtils.toLimitedString(CollectionUtils.difference(set, set3), 100)));
        }).whenComplete(this.promiseCleanupCheckRequiredChunks.recordStats()).whenComplete(LogUtils.toLogger(logger, LogUtils.thisMethod(), new Object[]{CollectionUtils.toLimitedString(set, 6)}));
    }

    @NotNull
    public Eventloop getEventloop() {
        return this.eventloop;
    }

    @NotNull
    public Promise<Void> start() {
        return this.client.ping();
    }

    @NotNull
    public Promise<Void> stop() {
        return Promise.complete();
    }

    @JmxAttribute
    public PromiseStats getPromiseIdGenerator() {
        return this.promiseIdGenerator;
    }

    @JmxAttribute
    public PromiseStats getPromiseFinishChunks() {
        return this.promiseFinishChunks;
    }

    @JmxAttribute
    public PromiseStats getPromiseBackup() {
        return this.promiseBackup;
    }

    @JmxAttribute
    public PromiseStats getPromiseCleanup() {
        return this.promiseCleanup;
    }

    @JmxAttribute
    public PromiseStats getPromiseList() {
        return this.promiseList;
    }

    @JmxAttribute
    public PromiseStats getPromiseOpenR() {
        return this.promiseOpenR;
    }

    @JmxAttribute
    public PromiseStats getPromiseOpenW() {
        return this.promiseOpenW;
    }

    @JmxAttribute
    public StreamStatsDetailed getReadFile() {
        return this.readFile;
    }

    @JmxAttribute
    public StreamStatsDetailed getReadDecompress() {
        return this.readDecompress;
    }

    @JmxAttribute
    public StreamStatsBasic getReadDeserialize() {
        return this.readDeserialize;
    }

    @JmxAttribute
    public StreamStatsDetailed getReadDeserializeDetailed() {
        return this.readDeserializeDetailed;
    }

    @JmxAttribute
    public StreamStatsBasic getWriteSerialize() {
        return this.writeSerialize;
    }

    @JmxAttribute
    public StreamStatsDetailed getWriteSerializeDetailed() {
        return this.writeSerializeDetailed;
    }

    @JmxAttribute
    public StreamStatsDetailed getWriteCompress() {
        return this.writeCompress;
    }

    @JmxAttribute
    public StreamStatsDetailed getWriteChunker() {
        return this.writeChunker;
    }

    @JmxAttribute
    public StreamStatsDetailed getWriteFile() {
        return this.writeFile;
    }

    @JmxAttribute
    public int getFinishChunks() {
        return this.finishChunks;
    }

    @JmxAttribute
    public ExceptionStats getCleanupWarnings() {
        return this.cleanupWarnings;
    }

    @JmxAttribute
    public int getCleanupPreservedFiles() {
        return this.cleanupPreservedFiles;
    }

    @JmxAttribute
    public int getCleanupDeletedFiles() {
        return this.cleanupDeletedFiles;
    }

    @JmxAttribute
    public int getCleanupDeletedFilesTotal() {
        return this.cleanupDeletedFilesTotal;
    }

    @JmxAttribute
    public int getCleanupSkippedFiles() {
        return this.cleanupSkippedFiles;
    }

    @JmxAttribute
    public int getCleanupSkippedFilesTotal() {
        return this.cleanupSkippedFilesTotal;
    }

    @JmxAttribute
    public ValueStats getChunksCount() {
        return this.chunksCount;
    }

    @JmxAttribute
    public PromiseStats getPromiseCleanupCheckRequiredChunks() {
        return this.promiseCleanupCheckRequiredChunks;
    }

    @JmxOperation
    public void startDetailedMonitoring() {
        this.detailed = true;
    }

    @JmxOperation
    public void stopDetailedMonitoring() {
        this.detailed = false;
    }

    public void resetStats() {
        this.cleanupPreservedFiles = 0;
        this.cleanupDeletedFiles = 0;
        this.cleanupDeletedFilesTotal = 0;
        this.cleanupSkippedFiles = 0;
        this.cleanupSkippedFilesTotal = 0;
        ReflectionUtils.resetStats(this);
    }

    static {
        $assertionsDisabled = !RemoteFsChunkStorage.class.desiredAssertionStatus();
        logger = LoggerFactory.getLogger(RemoteFsChunkStorage.class);
        DEFAULT_BUFFER_SIZE = MemSize.kilobytes(256L);
        DEFAULT_SMOOTHING_WINDOW = Duration.ofMinutes(5L);
    }
}
