package io.activej.cube.service;

import io.activej.aggregation.AggregationChunkStorage;
import io.activej.async.function.AsyncRunnable;
import io.activej.async.function.AsyncRunnables;
import io.activej.async.util.LogUtils;
import io.activej.common.Utils;
import io.activej.cube.exception.CubeException;
import io.activej.cube.ot.CubeDiffScheme;
import io.activej.jmx.api.attribute.JmxAttribute;
import io.activej.jmx.api.attribute.JmxOperation;
import io.activej.ot.OTAlgorithms;
import io.activej.ot.OTCommit;
import io.activej.ot.repository.AsyncOTRepository;
import io.activej.ot.system.OTSystem;
import io.activej.promise.Promise;
import io.activej.promise.Promises;
import io.activej.promise.jmx.PromiseStats;
import io.activej.reactor.AbstractReactive;
import io.activej.reactor.Reactive;
import io.activej.reactor.Reactor;
import io.activej.reactor.jmx.ReactiveJmxBeanWithStats;
import java.time.Duration;
import java.util.List;
import java.util.Set;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/activej/cube/service/CubeBackupController.class */
public final class CubeBackupController<K, D, C> extends AbstractReactive implements ReactiveJmxBeanWithStats {
    private static final Logger logger = LoggerFactory.getLogger(CubeBackupController.class);
    public static final Duration DEFAULT_SMOOTHING_WINDOW = Duration.ofMinutes(5);
    private final OTSystem<D> otSystem;
    private final AsyncOTRepository<K, D> repository;
    private final AggregationChunkStorage<C> storage;
    private final CubeDiffScheme<D> cubeDiffScheme;
    private final PromiseStats promiseBackup;
    private final PromiseStats promiseBackupDb;
    private final PromiseStats promiseBackupChunks;
    private final AsyncRunnable backup;

    CubeBackupController(Reactor reactor, CubeDiffScheme<D> cubeDiffScheme, AsyncOTRepository<K, D> asyncOTRepository, OTSystem<D> oTSystem, AggregationChunkStorage<C> aggregationChunkStorage) {
        super(reactor);
        this.promiseBackup = PromiseStats.create(DEFAULT_SMOOTHING_WINDOW);
        this.promiseBackupDb = PromiseStats.create(DEFAULT_SMOOTHING_WINDOW);
        this.promiseBackupChunks = PromiseStats.create(DEFAULT_SMOOTHING_WINDOW);
        this.backup = AsyncRunnables.reuse(this::backupHead);
        this.cubeDiffScheme = cubeDiffScheme;
        this.otSystem = oTSystem;
        this.repository = asyncOTRepository;
        this.storage = aggregationChunkStorage;
    }

    public static <K, D, C> CubeBackupController<K, D, C> create(Reactor reactor, CubeDiffScheme<D> cubeDiffScheme, AsyncOTRepository<K, D> asyncOTRepository, OTSystem<D> oTSystem, AggregationChunkStorage<C> aggregationChunkStorage) {
        return new CubeBackupController<>(reactor, cubeDiffScheme, asyncOTRepository, oTSystem, aggregationChunkStorage);
    }

    public Promise<Void> backup() {
        Reactive.checkInReactorThread(this);
        return this.backup.run();
    }

    public Promise<Void> backupHead() {
        Reactive.checkInReactorThread(this);
        return this.repository.getHeads().mapException(exc -> {
            return new CubeException("Failed to get heads", exc);
        }).whenResult(set -> {
            if (set.isEmpty()) {
                throw new CubeException("Heads are empty");
            }
        }).then(set2 -> {
            return backup(Utils.first(set2));
        }).whenComplete(this.promiseBackup.recordStats()).whenComplete(LogUtils.toLogger(logger, LogUtils.thisMethod(), new Object[0]));
    }

    public Promise<Void> backup(K k) {
        Reactive.checkInReactorThread(this);
        return Promises.toTuple(this.repository.loadCommit(k), OTAlgorithms.checkout(this.repository, this.otSystem, k)).mapException(exc -> {
            return new CubeException("Failed to check out commit '" + k + "'", exc);
        }).then(tuple2 -> {
            return Promises.sequence(() -> {
                return backupChunks(k, io.activej.cube.Utils.chunksInDiffs(this.cubeDiffScheme, (List) tuple2.value2()));
            }, () -> {
                return backupDb((OTCommit) tuple2.value1(), (List) tuple2.value2());
            });
        }).whenComplete(LogUtils.toLogger(logger, LogUtils.thisMethod(), new Object[]{k}));
    }

    private Promise<Void> backupChunks(K k, Set<C> set) {
        return this.storage.backup(String.valueOf(k), set).mapException(exc -> {
            return new CubeException("Failed to backup chunks on storage: " + this.storage, exc);
        }).whenComplete(this.promiseBackupChunks.recordStats()).whenComplete(logger.isTraceEnabled() ? LogUtils.toLogger(logger, LogUtils.Level.TRACE, LogUtils.thisMethod(), new Object[]{set}) : LogUtils.toLogger(logger, LogUtils.thisMethod(), new Object[]{Utils.toString(set)}));
    }

    private Promise<Void> backupDb(OTCommit<K, D> oTCommit, List<D> list) {
        return this.repository.backup(oTCommit, list).mapException(exc -> {
            return new CubeException("Failed to backup chunks in repository: " + this.repository, exc);
        }).whenComplete(this.promiseBackupDb.recordStats()).whenComplete(LogUtils.toLogger(logger, LogUtils.thisMethod(), new Object[]{oTCommit, list}));
    }

    @JmxOperation
    public void backupNow() {
        backup();
    }

    @JmxAttribute
    public PromiseStats getPromiseBackup() {
        return this.promiseBackup;
    }

    @JmxAttribute
    public PromiseStats getPromiseBackupDb() {
        return this.promiseBackupDb;
    }

    @JmxAttribute
    public PromiseStats getPromiseBackupChunks() {
        return this.promiseBackupChunks;
    }
}
