package io.activej.cube.service;

import io.activej.async.function.AsyncRunnable;
import io.activej.async.function.AsyncRunnables;
import io.activej.async.util.LogUtils;
import io.activej.common.Utils;
import io.activej.common.builder.AbstractBuilder;
import io.activej.common.function.BiConsumerEx;
import io.activej.common.function.FunctionEx;
import io.activej.cube.aggregation.AggregationChunkStorage;
import io.activej.cube.exception.CubeException;
import io.activej.cube.ot.CubeDiffScheme;
import io.activej.jmx.api.attribute.JmxAttribute;
import io.activej.jmx.api.attribute.JmxOperation;
import io.activej.ot.OTAlgorithms;
import io.activej.ot.OTCommit;
import io.activej.ot.exception.GraphExhaustedException;
import io.activej.ot.reducers.DiffsReducer;
import io.activej.ot.repository.AsyncOTRepository;
import io.activej.ot.system.OTSystem;
import io.activej.promise.Promise;
import io.activej.promise.Promises;
import io.activej.promise.SettableCallback;
import io.activej.promise.jmx.PromiseStats;
import io.activej.reactor.AbstractReactive;
import io.activej.reactor.Reactive;
import io.activej.reactor.Reactor;
import io.activej.reactor.jmx.ReactiveJmxBeanWithStats;
import java.lang.invoke.MethodHandles;
import java.lang.invoke.MethodType;
import java.lang.runtime.ObjectMethods;
import java.time.Duration;
import java.time.Instant;
import java.time.temporal.TemporalAmount;
import java.util.HashSet;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.stream.Collectors;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/activej/cube/service/CubeCleanerController.class */
public final class CubeCleanerController<K, D> extends AbstractReactive implements ReactiveJmxBeanWithStats {
    public static final int DEFAULT_SNAPSHOTS_COUNT = 1;
    private final OTSystem<D> otSystem;
    private final AsyncOTRepository<K, D> repository;
    private final AggregationChunkStorage chunksStorage;
    private final CubeDiffScheme<D> cubeDiffScheme;
    private Duration freezeTimeout;
    private Duration chunksCleanupDelay;
    private int extraSnapshotsCount;
    private final PromiseStats promiseCleanup;
    private final PromiseStats promiseCleanupCollectRequiredChunks;
    private final PromiseStats promiseCleanupRepository;
    private final PromiseStats promiseCleanupChunks;
    private final AsyncRunnable cleanup;
    private static final Logger logger = LoggerFactory.getLogger(CubeCleanerController.class);
    public static final Duration DEFAULT_CHUNKS_CLEANUP_DELAY = Duration.ofMinutes(1);
    public static final Duration DEFAULT_SMOOTHING_WINDOW = Duration.ofMinutes(5);

    /* loaded from: input_file:io/activej/cube/service/CubeCleanerController$Builder.class */
    public final class Builder extends AbstractBuilder<CubeCleanerController<K, D>.Builder, CubeCleanerController<K, D>> {
        private Builder() {
        }

        public CubeCleanerController<K, D>.Builder withChunksCleanupDelay(Duration duration) {
            checkNotBuilt(this);
            CubeCleanerController.this.chunksCleanupDelay = duration;
            return this;
        }

        public CubeCleanerController<K, D>.Builder withExtraSnapshotsCount(int i) {
            checkNotBuilt(this);
            CubeCleanerController.this.extraSnapshotsCount = i;
            return this;
        }

        public CubeCleanerController<K, D>.Builder withFreezeTimeout(Duration duration) {
            checkNotBuilt(this);
            CubeCleanerController.this.freezeTimeout = duration;
            return this;
        }

        /* JADX INFO: Access modifiers changed from: protected */
        /* renamed from: doBuild, reason: merged with bridge method [inline-methods] */
        public CubeCleanerController<K, D> m63doBuild() {
            return CubeCleanerController.this;
        }
    }

    /* loaded from: input_file:io/activej/cube/service/CubeCleanerController$Tuple.class */
    public static final class Tuple<K, D> extends Record {
        private final Set<Long> collectedChunks;
        private final OTCommit<K, D> lastSnapshot;

        public Tuple(Set<Long> set, OTCommit<K, D> oTCommit) {
            this.collectedChunks = set;
            this.lastSnapshot = oTCommit;
        }

        @Override // java.lang.Record
        public final String toString() {
            return (String) ObjectMethods.bootstrap(MethodHandles.lookup(), "toString", MethodType.methodType(String.class, Tuple.class), Tuple.class, "collectedChunks;lastSnapshot", "FIELD:Lio/activej/cube/service/CubeCleanerController$Tuple;->collectedChunks:Ljava/util/Set;", "FIELD:Lio/activej/cube/service/CubeCleanerController$Tuple;->lastSnapshot:Lio/activej/ot/OTCommit;").dynamicInvoker().invoke(this) /* invoke-custom */;
        }

        @Override // java.lang.Record
        public final int hashCode() {
            return (int) ObjectMethods.bootstrap(MethodHandles.lookup(), "hashCode", MethodType.methodType(Integer.TYPE, Tuple.class), Tuple.class, "collectedChunks;lastSnapshot", "FIELD:Lio/activej/cube/service/CubeCleanerController$Tuple;->collectedChunks:Ljava/util/Set;", "FIELD:Lio/activej/cube/service/CubeCleanerController$Tuple;->lastSnapshot:Lio/activej/ot/OTCommit;").dynamicInvoker().invoke(this) /* invoke-custom */;
        }

        @Override // java.lang.Record
        public final boolean equals(Object obj) {
            return (boolean) ObjectMethods.bootstrap(MethodHandles.lookup(), "equals", MethodType.methodType(Boolean.TYPE, Tuple.class, Object.class), Tuple.class, "collectedChunks;lastSnapshot", "FIELD:Lio/activej/cube/service/CubeCleanerController$Tuple;->collectedChunks:Ljava/util/Set;", "FIELD:Lio/activej/cube/service/CubeCleanerController$Tuple;->lastSnapshot:Lio/activej/ot/OTCommit;").dynamicInvoker().invoke(this, obj) /* invoke-custom */;
        }

        public Set<Long> collectedChunks() {
            return this.collectedChunks;
        }

        public OTCommit<K, D> lastSnapshot() {
            return this.lastSnapshot;
        }
    }

    private CubeCleanerController(Reactor reactor, CubeDiffScheme<D> cubeDiffScheme, AsyncOTRepository<K, D> asyncOTRepository, OTSystem<D> oTSystem, AggregationChunkStorage aggregationChunkStorage) {
        super(reactor);
        this.chunksCleanupDelay = DEFAULT_CHUNKS_CLEANUP_DELAY;
        this.extraSnapshotsCount = 1;
        this.promiseCleanup = PromiseStats.create(DEFAULT_SMOOTHING_WINDOW);
        this.promiseCleanupCollectRequiredChunks = PromiseStats.create(DEFAULT_SMOOTHING_WINDOW);
        this.promiseCleanupRepository = PromiseStats.create(DEFAULT_SMOOTHING_WINDOW);
        this.promiseCleanupChunks = PromiseStats.create(DEFAULT_SMOOTHING_WINDOW);
        this.cleanup = AsyncRunnables.reuse(this::doCleanup);
        this.cubeDiffScheme = cubeDiffScheme;
        this.otSystem = oTSystem;
        this.repository = asyncOTRepository;
        this.chunksStorage = aggregationChunkStorage;
    }

    public static <K, D> CubeCleanerController<K, D> create(Reactor reactor, CubeDiffScheme<D> cubeDiffScheme, AsyncOTRepository<K, D> asyncOTRepository, OTSystem<D> oTSystem, AggregationChunkStorage aggregationChunkStorage) {
        return (CubeCleanerController) builder(reactor, cubeDiffScheme, asyncOTRepository, oTSystem, aggregationChunkStorage).build();
    }

    public static <K, D> CubeCleanerController<K, D>.Builder builder(Reactor reactor, CubeDiffScheme<D> cubeDiffScheme, AsyncOTRepository<K, D> asyncOTRepository, OTSystem<D> oTSystem, AggregationChunkStorage aggregationChunkStorage) {
        return new Builder();
    }

    public Promise<Void> cleanup() {
        Reactive.checkInReactorThread(this);
        return this.cleanup.run();
    }

    private Promise<Void> doCleanup() {
        return this.repository.getHeads().then(set -> {
            return OTAlgorithms.excludeParents(this.repository, this.otSystem, set);
        }).mapException(exc -> {
            return exc instanceof GraphExhaustedException ? exc : new CubeException("Failed to get heads", exc);
        }).then(set2 -> {
            return findFrozenCut(set2, this.reactor.currentInstant().minus((TemporalAmount) this.freezeTimeout));
        }).then(this::cleanupFrozenCut).then((r3, exc2) -> {
            return exc2 instanceof GraphExhaustedException ? Promise.of((Object) null) : Promise.of(r3, exc2);
        }).whenComplete(this.promiseCleanup.recordStats()).whenComplete(LogUtils.toLogger(logger, LogUtils.thisMethod(), new Object[0]));
    }

    private Promise<Set<K>> findFrozenCut(Set<K> set, Instant instant) {
        return OTAlgorithms.findCut(this.repository, this.otSystem, set, collection -> {
            return collection.stream().allMatch(oTCommit -> {
                return oTCommit.getInstant().compareTo(instant) < 0;
            });
        }).mapException(exc -> {
            return exc instanceof GraphExhaustedException ? exc : new CubeException("Failed to find frozen cut, freeze timestamp: " + instant, exc);
        }).whenComplete(LogUtils.toLogger(logger, LogUtils.thisMethod(), new Object[]{set, instant}));
    }

    private Promise<Void> cleanupFrozenCut(Set<K> set) {
        return OTAlgorithms.findAllCommonParents(this.repository, this.otSystem, set).then(set2 -> {
            return OTAlgorithms.findAnyCommonParent(this.repository, this.otSystem, set2);
        }).then(obj -> {
            return this.repository.hasSnapshot(obj).then(bool -> {
                if (!bool.booleanValue()) {
                    return trySaveSnapshotAndCleanupChunks(obj);
                }
                logger.info("Snapshot already exists, skip cleanup");
                return Promise.complete();
            });
        }).mapException(exc -> {
            return exc instanceof GraphExhaustedException ? exc : new CubeException("Failed to cleanup frozen cut: " + Utils.toString(set), exc);
        }).whenComplete(LogUtils.toLogger(logger, LogUtils.thisMethod(), new Object[]{set}));
    }

    private Promise<Void> trySaveSnapshotAndCleanupChunks(K k) {
        return OTAlgorithms.checkout(this.repository, this.otSystem, k).then(list -> {
            return this.repository.saveSnapshot(k, list).then(() -> {
                return findSnapshot(Set.of(k), this.extraSnapshotsCount);
            }).then(optional -> {
                if (!optional.isEmpty()) {
                    return Promises.toTuple(Tuple::new, collectRequiredChunks(k), this.repository.loadCommit(optional.get())).then(tuple -> {
                        return cleanup(optional.get(), Utils.union(io.activej.cube.Utils.chunksInDiffs(this.cubeDiffScheme, list), tuple.collectedChunks), tuple.lastSnapshot.getInstant().minus((TemporalAmount) this.chunksCleanupDelay));
                    });
                }
                logger.info("Not enough snapshots, skip cleanup");
                return Promise.complete();
            });
        }).whenComplete(LogUtils.toLogger(logger, LogUtils.thisMethod(), new Object[]{k}));
    }

    private Promise<Optional<K>> findSnapshot(Set<K> set, int i) {
        return Promise.ofCallback(settableCallback -> {
            findSnapshotImpl(set, i, settableCallback);
        });
    }

    private void findSnapshotImpl(Set<K> set, int i, SettableCallback<Optional<K>> settableCallback) {
        Promise whenResult = OTAlgorithms.findParent(this.repository, this.otSystem, set, DiffsReducer.toVoid(), oTCommit -> {
            return this.repository.hasSnapshot(oTCommit.getId());
        }).async().whenResult(findResult -> {
            if (i <= 0) {
                settableCallback.set(Optional.of(findResult.getCommit()));
            } else if (findResult.getCommitParents().isEmpty()) {
                settableCallback.set(Optional.empty());
            } else {
                findSnapshotImpl(findResult.getCommitParents(), i - 1, settableCallback);
            }
        });
        Objects.requireNonNull(settableCallback);
        whenResult.whenException(settableCallback::setException);
    }

    private Promise<Set<Long>> collectRequiredChunks(K k) {
        return this.repository.getHeads().then(set -> {
            return OTAlgorithms.reduceEdges(this.repository, this.otSystem, set, k, DiffsReducer.of(new HashSet(), (set, list) -> {
                return Utils.union(set, io.activej.cube.Utils.chunksInDiffs(this.cubeDiffScheme, list));
            }, Utils::union)).whenComplete(this.promiseCleanupCollectRequiredChunks.recordStats());
        }).map(map -> {
            return (Set) map.values().stream().flatMap((v0) -> {
                return v0.stream();
            }).collect(Collectors.toSet());
        }).whenComplete(transform((v0) -> {
            return v0.size();
        }, LogUtils.toLogger(logger, LogUtils.thisMethod(), new Object[]{k})));
    }

    private Promise<Void> cleanup(K k, Set<Long> set, Instant instant) {
        return this.chunksStorage.checkRequiredChunks(set).then(() -> {
            return this.repository.cleanup(k).whenComplete(this.promiseCleanupRepository.recordStats());
        }).then(() -> {
            return this.chunksStorage.cleanup(set, instant).whenComplete(this.promiseCleanupChunks.recordStats());
        }).whenComplete(logger.isTraceEnabled() ? LogUtils.toLogger(logger, LogUtils.Level.TRACE, LogUtils.thisMethod(), new Object[]{k, instant, set}) : LogUtils.toLogger(logger, LogUtils.thisMethod(), new Object[]{k, instant, Utils.toString(set)}));
    }

    @JmxAttribute
    public Duration getChunksCleanupDelay() {
        return this.chunksCleanupDelay;
    }

    @JmxAttribute
    public void setChunksCleanupDelay(Duration duration) {
        this.chunksCleanupDelay = duration;
    }

    @JmxAttribute
    public int getExtraSnapshotsCount() {
        return this.extraSnapshotsCount;
    }

    @JmxAttribute
    public void setExtraSnapshotsCount(int i) {
        this.extraSnapshotsCount = i;
    }

    @JmxAttribute
    public Duration getFreezeTimeout() {
        return this.freezeTimeout;
    }

    @JmxAttribute
    public void setFreezeTimeout(Duration duration) {
        this.freezeTimeout = duration;
    }

    @JmxAttribute
    public PromiseStats getPromiseCleanup() {
        return this.promiseCleanup;
    }

    @JmxAttribute
    public PromiseStats getPromiseCleanupCollectRequiredChunks() {
        return this.promiseCleanupCollectRequiredChunks;
    }

    @JmxAttribute
    public PromiseStats getPromiseCleanupRepository() {
        return this.promiseCleanupRepository;
    }

    @JmxAttribute
    public PromiseStats getPromiseCleanupChunks() {
        return this.promiseCleanupChunks;
    }

    @JmxOperation
    public void cleanupNow() {
        cleanup();
    }

    private static <T, R> BiConsumerEx<R, Exception> transform(FunctionEx<? super R, ? extends T> functionEx, BiConsumerEx<? super T, Exception> biConsumerEx) {
        return (obj, exc) -> {
            biConsumerEx.accept(obj != null ? functionEx.apply(obj) : null, exc);
        };
    }
}
