/*
 * Decompiled with CFR 0.152.
 */
package io.activej.cube.service;

import io.activej.aggregation.Aggregation;
import io.activej.aggregation.AggregationChunk;
import io.activej.aggregation.AggregationChunkStorage;
import io.activej.aggregation.ot.AggregationDiff;
import io.activej.async.function.AsyncSupplier;
import io.activej.async.function.AsyncSuppliers;
import io.activej.async.util.LogUtils;
import io.activej.cube.Cube;
import io.activej.cube.ot.CubeDiff;
import io.activej.cube.ot.CubeDiffScheme;
import io.activej.eventloop.Eventloop;
import io.activej.eventloop.jmx.EventloopJmxBeanEx;
import io.activej.jmx.api.attribute.JmxAttribute;
import io.activej.jmx.api.attribute.JmxOperation;
import io.activej.jmx.stats.ValueStats;
import io.activej.ot.OTStateManager;
import io.activej.promise.Promise;
import io.activej.promise.jmx.PromiseStats;
import java.time.Duration;
import java.util.Set;
import java.util.function.Function;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import org.jetbrains.annotations.NotNull;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Drives consolidation of a {@link Cube}: synchronizes the {@link OTStateManager}, asks the cube
 * to consolidate using a strategy function, finishes the newly written chunks in the
 * {@link AggregationChunkStorage} and pushes the resulting diff back through the state manager.
 * Statistics about each run are exposed through JMX attributes.
 *
 * @param <K> first type argument of the {@link OTStateManager} used by this controller
 * @param <D> diff type handled by the state manager and produced by the {@link CubeDiffScheme}
 * @param <C> chunk id type accepted by the {@link AggregationChunkStorage}
 */
public final class CubeConsolidationController<K, D, C>
        implements EventloopJmxBeanEx {
    private static final Logger logger = LoggerFactory.getLogger(CubeConsolidationController.class);

    /**
     * Default strategy supplier. Each call to {@code get()} flips an internal flag and returns
     * {@code Aggregation::consolidateHotSegment} or {@code Aggregation::consolidateMinKey}
     * alternately, starting with the hot-segment variant.
     *
     * <p>NOTE: this is a single static instance holding mutable state, and {@link #create} passes
     * it to every controller, so the alternation is shared by all controllers that use it.
     */
    public static final Supplier<Function<Aggregation, Promise<AggregationDiff>>> DEFAULT_STRATEGY =
            new Supplier<Function<Aggregation, Promise<AggregationDiff>>>() {
                private boolean hotSegment = false;

                @Override
                public Function<Aggregation, Promise<AggregationDiff>> get() {
                    this.hotSegment = !this.hotSegment;
                    return this.hotSegment ? Aggregation::consolidateHotSegment : Aggregation::consolidateMinKey;
                }
            };

    /** Smoothing window passed to every stats object created by this controller. */
    public static final Duration DEFAULT_SMOOTHING_WINDOW = Duration.ofMinutes(5L);

    private final Eventloop eventloop;
    private final CubeDiffScheme<D> cubeDiffScheme;
    private final Cube cube;
    private final OTStateManager<K, D> stateManager;
    private final AggregationChunkStorage<C> aggregationChunkStorage;
    private final Supplier<Function<Aggregation, Promise<AggregationDiff>>> strategy;

    // Stats for the whole consolidation run and for the cube.consolidate(...) step alone.
    private final PromiseStats promiseConsolidate = PromiseStats.create(DEFAULT_SMOOTHING_WINDOW);
    private final PromiseStats promiseConsolidateImpl = PromiseStats.create(DEFAULT_SMOOTHING_WINDOW);

    // Per-run counters of chunks (and of the records they hold) removed and added by a diff.
    private final ValueStats removedChunks = ValueStats.create(DEFAULT_SMOOTHING_WINDOW);
    private final ValueStats removedChunksRecords = ValueStats.create(DEFAULT_SMOOTHING_WINDOW).withRate();
    private final ValueStats addedChunks = ValueStats.create(DEFAULT_SMOOTHING_WINDOW);
    private final ValueStats addedChunksRecords = ValueStats.create(DEFAULT_SMOOTHING_WINDOW).withRate();

    // Entry point used by consolidate(); wraps doConsolidate() with AsyncSuppliers.reuse
    // (presumably so that overlapping calls share one in-flight run -- confirm against ActiveJ docs).
    private final AsyncSupplier<Void> consolidate = AsyncSuppliers.reuse(this::doConsolidate);

    CubeConsolidationController(Eventloop eventloop,
                                CubeDiffScheme<D> cubeDiffScheme,
                                Cube cube,
                                OTStateManager<K, D> stateManager,
                                AggregationChunkStorage<C> aggregationChunkStorage,
                                Supplier<Function<Aggregation, Promise<AggregationDiff>>> strategy) {
        this.eventloop = eventloop;
        this.cubeDiffScheme = cubeDiffScheme;
        this.cube = cube;
        this.stateManager = stateManager;
        this.aggregationChunkStorage = aggregationChunkStorage;
        this.strategy = strategy;
    }

    /**
     * Creates a controller that uses {@link #DEFAULT_STRATEGY}.
     *
     * @param eventloop               eventloop returned by {@link #getEventloop()}
     * @param cubeDiffScheme          scheme used to wrap a {@link CubeDiff} into a {@code D}
     * @param cube                    cube to consolidate
     * @param stateManager            state manager that is synced before and after consolidation
     * @param aggregationChunkStorage storage in which added chunks are finished
     * @return a new controller
     */
    public static <K, D, C> CubeConsolidationController<K, D, C> create(Eventloop eventloop,
                                                                        CubeDiffScheme<D> cubeDiffScheme,
                                                                        Cube cube,
                                                                        OTStateManager<K, D> stateManager,
                                                                        AggregationChunkStorage<C> aggregationChunkStorage) {
        return new CubeConsolidationController<>(eventloop, cubeDiffScheme, cube, stateManager, aggregationChunkStorage, DEFAULT_STRATEGY);
    }

    /**
     * Returns a new controller that shares this controller's collaborators but uses the given
     * strategy supplier. This instance is not modified; the returned controller has its own,
     * freshly created stats objects.
     *
     * @param strategy supplier of the consolidation function to apply on each run
     * @return a new controller using {@code strategy}
     */
    public CubeConsolidationController<K, D, C> withStrategy(Supplier<Function<Aggregation, Promise<AggregationDiff>>> strategy) {
        return new CubeConsolidationController<>(this.eventloop, this.cubeDiffScheme, this.cube, this.stateManager, this.aggregationChunkStorage, strategy);
    }

    /**
     * Requests a consolidation run through the reusing supplier.
     *
     * @return promise of the run's completion
     */
    public Promise<Void> consolidate() {
        return this.consolidate.get();
    }

    /**
     * Performs one consolidation run:
     * <ol>
     *   <li>syncs the state manager;</li>
     *   <li>calls {@code cube.consolidate} with the next function from the strategy supplier,
     *       recording that step in {@code promiseConsolidateImpl};</li>
     *   <li>on success updates the JMX chunk counters, and in any case logs the outcome;</li>
     *   <li>if the diff is non-empty: finishes the added chunks in the chunk storage, adds the
     *       wrapped diff to the state manager and syncs again; if any of that fails the state
     *       manager is reset.</li>
     * </ol>
     * The whole run is recorded in {@code promiseConsolidate}.
     *
     * <p>NOTE(review): lambdas are deliberately kept as lambdas (not method references or
     * extracted helpers) because {@code LogUtils.thisMethod()} is called inside one of them and
     * presumably derives its result from the call site -- confirm before restructuring.
     */
    Promise<Void> doConsolidate() {
        return Promise.complete()
                .then(() -> this.stateManager.sync())
                .then(() -> this.cube.consolidate(this.strategy.get())
                        .whenComplete(this.promiseConsolidateImpl.recordStats()))
                .whenResult(this::cubeDiffJmx)
                .whenComplete(this::logCubeDiff)
                .then(cubeDiff -> {
                    if (cubeDiff.isEmpty()) {
                        // Nothing was merged: there are no chunks to finish and no diff to push.
                        return Promise.complete();
                    }
                    return Promise.complete()
                            .then(() -> this.aggregationChunkStorage.finish(CubeConsolidationController.addedChunks(cubeDiff)))
                            .whenResult(() -> this.stateManager.add(this.cubeDiffScheme.wrap(cubeDiff)))
                            .then(() -> this.stateManager.sync())
                            .whenException(e -> this.stateManager.reset())
                            .whenComplete(LogUtils.toLogger(logger, LogUtils.thisMethod(), cubeDiff));
                })
                .whenComplete(this.promiseConsolidate.recordStats())
                .whenComplete(LogUtils.toLogger(logger, LogUtils.thisMethod(), this.stateManager));
    }

    /**
     * Sums, over every aggregation diff in {@code cubeDiff}, the number of added/removed chunks
     * and the record counts of those chunks, then records the four totals in the JMX stats.
     */
    private void cubeDiffJmx(CubeDiff cubeDiff) {
        long addedCount = 0L;
        long addedRecords = 0L;
        long removedCount = 0L;
        long removedRecords = 0L;
        for (String aggregationId : cubeDiff.keySet()) {
            AggregationDiff diff = cubeDiff.get(aggregationId);

            addedCount += (long) diff.getAddedChunks().size();
            for (AggregationChunk chunk : diff.getAddedChunks()) {
                addedRecords += (long) chunk.getCount();
            }

            removedCount += (long) diff.getRemovedChunks().size();
            for (AggregationChunk chunk : diff.getRemovedChunks()) {
                removedRecords += (long) chunk.getCount();
            }
        }
        // Explicit double conversions keep the same recordValue overload as the compiled original.
        this.addedChunks.recordValue((double) addedCount);
        this.addedChunksRecords.recordValue((double) addedRecords);
        this.removedChunks.recordValue((double) removedCount);
        this.removedChunksRecords.recordValue((double) removedRecords);
    }

    /**
     * Collects the ids of all chunks added by {@code cubeDiff} into a set typed with the caller's
     * chunk id type.
     *
     * <p>{@link CubeDiff} is not generic, so its ids cannot be statically typed as {@code C};
     * the unchecked cast is required for this method to compile and relies on the caller's
     * storage using the same id type as the cube.
     */
    @SuppressWarnings("unchecked")
    private static <C> Set<C> addedChunks(CubeDiff cubeDiff) {
        return cubeDiff.addedChunks().map(id -> (C) id).collect(Collectors.toSet());
    }

    /**
     * Logs the outcome of the {@code cube.consolidate} step: a warning with the exception on
     * failure, otherwise an info message that depends on whether the diff is empty.
     */
    private void logCubeDiff(CubeDiff cubeDiff, Throwable e) {
        if (e != null) {
            logger.warn("Consolidation failed", e);
        } else if (cubeDiff.isEmpty()) {
            logger.info("Previous consolidation did not merge any chunks");
        } else {
            logger.info("Consolidation finished. Launching consolidation task again.");
        }
    }

    /** @return stats of the number of chunks removed per consolidation diff */
    @JmxAttribute
    public ValueStats getRemovedChunks() {
        return this.removedChunks;
    }

    /** @return stats of the number of chunks added per consolidation diff */
    @JmxAttribute
    public ValueStats getAddedChunks() {
        return this.addedChunks;
    }

    /** @return stats of the total record count of chunks removed per consolidation diff */
    @JmxAttribute
    public ValueStats getRemovedChunksRecords() {
        return this.removedChunksRecords;
    }

    /** @return stats of the total record count of chunks added per consolidation diff */
    @JmxAttribute
    public ValueStats getAddedChunksRecords() {
        return this.addedChunksRecords;
    }

    /** @return stats of the whole consolidation run */
    @JmxAttribute
    public PromiseStats getPromiseConsolidate() {
        return this.promiseConsolidate;
    }

    /** @return stats of the {@code cube.consolidate} step alone */
    @JmxAttribute
    public PromiseStats getPromiseConsolidateImpl() {
        return this.promiseConsolidateImpl;
    }

    /** JMX operation that triggers {@link #consolidate()}; the returned promise is not observed. */
    @JmxOperation
    public void consolidateNow() {
        this.consolidate();
    }

    /** @return the eventloop this controller was created with */
    @NotNull
    public Eventloop getEventloop() {
        return this.eventloop;
    }
}

