/*
 * Decompiled with CFR 0.152.
 */
package io.activej.cube.service;

import io.activej.aggregation.ActiveFsChunkStorage;
import io.activej.async.callback.Callback;
import io.activej.async.function.AsyncSupplier;
import io.activej.async.function.AsyncSuppliers;
import io.activej.async.util.LogUtils;
import io.activej.common.collection.CollectionUtils;
import io.activej.cube.Utils;
import io.activej.cube.ot.CubeDiffScheme;
import io.activej.eventloop.Eventloop;
import io.activej.eventloop.jmx.EventloopJmxBeanEx;
import io.activej.jmx.api.attribute.JmxAttribute;
import io.activej.jmx.api.attribute.JmxOperation;
import io.activej.ot.OTAlgorithms;
import io.activej.ot.OTCommit;
import io.activej.ot.reducers.DiffsReducer;
import io.activej.ot.repository.OTRepositoryEx;
import io.activej.ot.system.OTSystem;
import io.activej.promise.Promise;
import io.activej.promise.Promises;
import io.activej.promise.SettablePromise;
import io.activej.promise.jmx.PromiseStats;
import java.time.Duration;
import java.time.Instant;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.Optional;
import java.util.Set;
import java.util.function.Function;
import java.util.stream.Collectors;
import org.jetbrains.annotations.NotNull;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public final class CubeCleanerController<K, D, C>
implements EventloopJmxBeanEx {
    private static final Logger logger = LoggerFactory.getLogger(CubeCleanerController.class);
    public static final Duration DEFAULT_CHUNKS_CLEANUP_DELAY = Duration.ofMinutes(1L);
    public static final int DEFAULT_SNAPSHOTS_COUNT = 1;
    public static final Duration DEFAULT_SMOOTHING_WINDOW = Duration.ofMinutes(5L);
    private final Eventloop eventloop;
    private final OTSystem<D> otSystem;
    private final OTRepositoryEx<K, D> repository;
    private final ActiveFsChunkStorage<C> chunksStorage;
    private final CubeDiffScheme<D> cubeDiffScheme;
    private Duration freezeTimeout;
    private Duration chunksCleanupDelay = DEFAULT_CHUNKS_CLEANUP_DELAY;
    private int extraSnapshotsCount = 1;
    private final PromiseStats promiseCleanup = PromiseStats.create((Duration)DEFAULT_SMOOTHING_WINDOW);
    private final PromiseStats promiseCleanupCollectRequiredChunks = PromiseStats.create((Duration)DEFAULT_SMOOTHING_WINDOW);
    private final PromiseStats promiseCleanupRepository = PromiseStats.create((Duration)DEFAULT_SMOOTHING_WINDOW);
    private final PromiseStats promiseCleanupChunks = PromiseStats.create((Duration)DEFAULT_SMOOTHING_WINDOW);
    private final AsyncSupplier<Void> cleanup = AsyncSuppliers.reuse(this::doCleanup);

    CubeCleanerController(Eventloop eventloop, CubeDiffScheme<D> cubeDiffScheme, OTRepositoryEx<K, D> repository, OTSystem<D> otSystem, ActiveFsChunkStorage<C> chunksStorage) {
        this.eventloop = eventloop;
        this.cubeDiffScheme = cubeDiffScheme;
        this.otSystem = otSystem;
        this.repository = repository;
        this.chunksStorage = chunksStorage;
    }

    public static <K, D, C> CubeCleanerController<K, D, C> create(Eventloop eventloop, CubeDiffScheme<D> cubeDiffScheme, OTRepositoryEx<K, D> repository, OTSystem<D> otSystem, ActiveFsChunkStorage<C> storage) {
        return new CubeCleanerController<K, D, C>(eventloop, cubeDiffScheme, repository, otSystem, storage);
    }

    public CubeCleanerController<K, D, C> withChunksCleanupDelay(Duration chunksCleanupDelay) {
        this.chunksCleanupDelay = chunksCleanupDelay;
        return this;
    }

    public CubeCleanerController<K, D, C> withExtraSnapshotsCount(int extraSnapshotsCount) {
        this.extraSnapshotsCount = extraSnapshotsCount;
        return this;
    }

    public CubeCleanerController<K, D, C> withFreezeTimeout(Duration freezeTimeout) {
        this.freezeTimeout = freezeTimeout;
        return this;
    }

    public Promise<Void> cleanup() {
        return this.cleanup.get();
    }

    Promise<Void> doCleanup() {
        return this.repository.getHeads().then(heads -> OTAlgorithms.excludeParents(this.repository, this.otSystem, (Set)heads)).then(heads -> this.findFrozenCut((Set<K>)heads, this.eventloop.currentInstant().minus(this.freezeTimeout))).then(this::cleanupFrozenCut).thenEx((v, e) -> {
            if (e == OTAlgorithms.GRAPH_EXHAUSTED) {
                return Promise.of(null);
            }
            return Promise.of((Object)v, (Throwable)e);
        }).whenComplete(this.promiseCleanup.recordStats()).whenComplete(LogUtils.toLogger((Logger)logger, (String)LogUtils.thisMethod(), (Object[])new Object[0]));
    }

    Promise<Set<K>> findFrozenCut(Set<K> heads, Instant freezeTimestamp) {
        return OTAlgorithms.findCut(this.repository, this.otSystem, heads, commits -> commits.stream().allMatch(commit -> commit.getInstant().compareTo(freezeTimestamp) < 0)).whenComplete(LogUtils.toLogger((Logger)logger, (String)LogUtils.thisMethod(), (Object[])new Object[]{heads, freezeTimestamp}));
    }

    Promise<Void> cleanupFrozenCut(Set<K> frozenCut) {
        return OTAlgorithms.findAllCommonParents(this.repository, this.otSystem, frozenCut).then(parents -> OTAlgorithms.findAnyCommonParent(this.repository, this.otSystem, (Set)parents)).then(checkpointNode -> this.repository.hasSnapshot(checkpointNode).then(snapshot -> {
            if (snapshot.booleanValue()) {
                logger.info("Snapshot already exists, skip cleanup");
                return Promise.complete();
            }
            return this.trySaveSnapshotAndCleanupChunks(checkpointNode);
        })).whenComplete(LogUtils.toLogger((Logger)logger, (String)LogUtils.thisMethod(), (Object[])new Object[]{frozenCut}));
    }

    Promise<Void> trySaveSnapshotAndCleanupChunks(K checkpointNode) {
        return OTAlgorithms.checkout(this.repository, this.otSystem, checkpointNode).then(checkpointDiffs -> this.repository.saveSnapshot(checkpointNode, checkpointDiffs).then(() -> this.findSnapshot(Collections.singleton(checkpointNode), this.extraSnapshotsCount)).then(lastSnapshot -> {
            if (lastSnapshot.isPresent()) {
                return Promises.toTuple(Tuple::new, this.collectRequiredChunks(checkpointNode), (Promise)this.repository.loadCommit(lastSnapshot.get())).then(tuple -> this.cleanup(lastSnapshot.get(), CollectionUtils.union(Utils.chunksInDiffs(this.cubeDiffScheme, checkpointDiffs), tuple.collectedChunks), tuple.lastSnapshot.getInstant().minus(this.chunksCleanupDelay)));
            }
            logger.info("Not enough snapshots, skip cleanup");
            return Promise.complete();
        })).whenComplete(LogUtils.toLogger((Logger)logger, (String)LogUtils.thisMethod(), (Object[])new Object[]{checkpointNode}));
    }

    Promise<Optional<K>> findSnapshot(Set<K> heads, int skipSnapshots) {
        return Promise.ofCallback(cb -> this.findSnapshotImpl(heads, skipSnapshots, (SettablePromise<Optional<K>>)cb));
    }

    private void findSnapshotImpl(Set<K> heads, int skipSnapshots, SettablePromise<Optional<K>> cb) {
        OTAlgorithms.findParent(this.repository, this.otSystem, heads, (DiffsReducer)DiffsReducer.toVoid(), commit -> this.repository.hasSnapshot(commit.getId())).async().whenResult(findResult -> {
            if (skipSnapshots <= 0) {
                cb.set(Optional.of(findResult.getCommit()));
            } else if (findResult.getCommitParents().isEmpty()) {
                cb.set(Optional.empty());
            } else {
                this.findSnapshotImpl(findResult.getCommitParents(), skipSnapshots - 1, cb);
            }
        }).whenException(arg_0 -> cb.setException(arg_0));
    }

    private Promise<Set<C>> collectRequiredChunks(K checkpointNode) {
        return this.repository.getHeads().then(heads -> OTAlgorithms.reduceEdges(this.repository, this.otSystem, (Set)heads, (Object)checkpointNode, (DiffsReducer)DiffsReducer.of(new HashSet(), (accumulatedChunks, diffs) -> CollectionUtils.union((Set)accumulatedChunks, Utils.chunksInDiffs(this.cubeDiffScheme, diffs)), CollectionUtils::union)).whenComplete(this.promiseCleanupCollectRequiredChunks.recordStats())).map(accumulators -> accumulators.values().stream().flatMap(Collection::stream).collect(Collectors.toSet())).whenComplete(CubeCleanerController.transform(Set::size, LogUtils.toLogger((Logger)logger, (String)LogUtils.thisMethod(), (Object[])new Object[]{checkpointNode})));
    }

    private Promise<Void> cleanup(K checkpointNode, Set<C> requiredChunks, Instant chunksCleanupTimestamp) {
        return this.chunksStorage.checkRequiredChunks(requiredChunks).then(() -> this.repository.cleanup(checkpointNode).whenComplete(this.promiseCleanupRepository.recordStats())).then(() -> this.chunksStorage.cleanup(requiredChunks, chunksCleanupTimestamp).whenComplete(this.promiseCleanupChunks.recordStats())).whenComplete(logger.isTraceEnabled() ? LogUtils.toLogger((Logger)logger, (LogUtils.Level)LogUtils.Level.TRACE, (String)LogUtils.thisMethod(), (Object[])new Object[]{checkpointNode, chunksCleanupTimestamp, requiredChunks}) : LogUtils.toLogger((Logger)logger, (String)LogUtils.thisMethod(), (Object[])new Object[]{checkpointNode, chunksCleanupTimestamp, CollectionUtils.toLimitedString(requiredChunks, (int)6)}));
    }

    @JmxAttribute
    public Duration getChunksCleanupDelay() {
        return this.chunksCleanupDelay;
    }

    @JmxAttribute
    public void setChunksCleanupDelay(Duration chunksCleanupDelay) {
        this.chunksCleanupDelay = chunksCleanupDelay;
    }

    @JmxAttribute
    public int getExtraSnapshotsCount() {
        return this.extraSnapshotsCount;
    }

    @JmxAttribute
    public void setExtraSnapshotsCount(int extraSnapshotsCount) {
        this.extraSnapshotsCount = extraSnapshotsCount;
    }

    @JmxAttribute
    public Duration getFreezeTimeout() {
        return this.freezeTimeout;
    }

    @JmxAttribute
    public void setFreezeTimeout(Duration freezeTimeout) {
        this.freezeTimeout = freezeTimeout;
    }

    @JmxAttribute
    public PromiseStats getPromiseCleanup() {
        return this.promiseCleanup;
    }

    @JmxAttribute
    public PromiseStats getPromiseCleanupCollectRequiredChunks() {
        return this.promiseCleanupCollectRequiredChunks;
    }

    @JmxAttribute
    public PromiseStats getPromiseCleanupRepository() {
        return this.promiseCleanupRepository;
    }

    @JmxAttribute
    public PromiseStats getPromiseCleanupChunks() {
        return this.promiseCleanupChunks;
    }

    @JmxOperation
    public void cleanupNow() {
        this.cleanup();
    }

    @NotNull
    public Eventloop getEventloop() {
        return this.eventloop;
    }

    private static <T, R> Callback<R> transform(Function<? super R, ? extends T> fn, Callback<? super T> toConsumer) {
        return (value, e) -> toConsumer.accept(value != null ? fn.apply((Object)value) : null, e);
    }

    static class Tuple<K, D, C> {
        final Set<C> collectedChunks;
        final OTCommit<K, D> lastSnapshot;

        Tuple(Set<C> collectedChunks, OTCommit<K, D> lastSnapshot) {
            this.collectedChunks = collectedChunks;
            this.lastSnapshot = lastSnapshot;
        }
    }
}

