/*
 * Decompiled with CFR 0.152.
 */
package io.activej.cube.service;

import io.activej.aggregation.AggregationChunk;
import io.activej.aggregation.AggregationChunkStorage;
import io.activej.aggregation.ot.AggregationDiff;
import io.activej.async.function.AsyncPredicate;
import io.activej.async.function.AsyncSupplier;
import io.activej.async.function.AsyncSuppliers;
import io.activej.async.util.LogUtils;
import io.activej.cube.Cube;
import io.activej.cube.ot.CubeDiff;
import io.activej.etl.LogDiff;
import io.activej.etl.LogOTProcessor;
import io.activej.etl.LogOTState;
import io.activej.eventloop.Eventloop;
import io.activej.eventloop.jmx.EventloopJmxBeanEx;
import io.activej.jmx.api.attribute.JmxAttribute;
import io.activej.jmx.api.attribute.JmxOperation;
import io.activej.jmx.stats.ValueStats;
import io.activej.ot.OTStateManager;
import io.activej.promise.Promise;
import io.activej.promise.Promises;
import io.activej.promise.jmx.PromiseStats;
import java.time.Duration;
import java.util.Iterator;
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;
import org.jetbrains.annotations.NotNull;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Runs a list of {@link LogOTProcessor}s against a cube and commits the diffs they
 * produce through an {@link OTStateManager}.
 *
 * <p>One processing round ({@link #process()}) does the following:
 * <ol>
 *   <li>synchronizes the state manager and reads its current commit id;</li>
 *   <li>asks the configured {@link AsyncPredicate} whether processing may proceed;</li>
 *   <li>runs every log processor, either all at once or one after another depending on
 *       {@link #isParallelRunner()};</li>
 *   <li>calls {@link AggregationChunkStorage#finish} with the ids of all added chunks,
 *       adds the diffs to the state manager and synchronizes it again.</li>
 * </ol>
 *
 * <p>Statistics about the rounds are exposed as JMX attributes.
 *
 * @param <K> commit id type of the state manager
 * @param <C> chunk id type of the chunk storage
 */
public final class CubeLogProcessorController<K, C> implements EventloopJmxBeanEx {
    private static final Logger logger = LoggerFactory.getLogger(CubeLogProcessorController.class);

    /** Smoothing window used for every statistic kept by this controller. */
    public static final Duration DEFAULT_SMOOTHING_WINDOW = Duration.ofMinutes(5L);

    private final Eventloop eventloop;
    private final List<LogOTProcessor<?, CubeDiff>> logProcessors;
    private final AggregationChunkStorage<C> chunkStorage;
    private final OTStateManager<K, LogDiff<CubeDiff>> stateManager;
    private final AsyncPredicate<K> predicate;

    /** When {@code true} all log processors are started at once; otherwise they run one at a time. */
    private boolean parallelRunner;

    private final PromiseStats promiseProcessLogs = PromiseStats.create(DEFAULT_SMOOTHING_WINDOW);
    private final PromiseStats promiseProcessLogsImpl = PromiseStats.create(DEFAULT_SMOOTHING_WINDOW);
    private final ValueStats addedChunks = ValueStats.create(DEFAULT_SMOOTHING_WINDOW);
    private final ValueStats addedChunksRecords = ValueStats.create(DEFAULT_SMOOTHING_WINDOW).withRate();

    // NOTE(review): coalesce presumably merges overlapping calls into a shared run -
    // confirm against the AsyncSuppliers documentation.
    private final AsyncSupplier<Boolean> processLogs = AsyncSuppliers.coalesce(this::doProcessLogs);

    /**
     * Stores the collaborators as given; no validation or copying is performed.
     *
     * @param eventloop     eventloop returned by {@link #getEventloop()}
     * @param logProcessors processors whose {@code processLog()} is invoked on every round
     * @param chunkStorage  storage whose {@code finish} is called with the ids of added chunks
     * @param stateManager  state manager that is synchronized and receives the produced diffs
     * @param predicate     asynchronous check of the current commit id; a {@code false} result skips the round
     */
    CubeLogProcessorController(Eventloop eventloop,
                               List<LogOTProcessor<?, CubeDiff>> logProcessors,
                               AggregationChunkStorage<C> chunkStorage,
                               OTStateManager<K, LogDiff<CubeDiff>> stateManager,
                               AsyncPredicate<K> predicate) {
        this.eventloop = eventloop;
        this.logProcessors = logProcessors;
        this.chunkStorage = chunkStorage;
        this.stateManager = stateManager;
        this.predicate = predicate;
    }

    /**
     * Creates a controller whose predicate rejects a round (and logs the reason) whenever the
     * cube held by {@code state} reports an excessive number of overlapping chunks.
     *
     * @param eventloop     eventloop the controller is bound to
     * @param state         log state whose data state is cast to {@link Cube}
     * @param stateManager  state manager used for synchronization and for adding diffs
     * @param chunkStorage  storage notified about added chunks
     * @param logProcessors processors to run on every round
     * @param <K>           commit id type
     * @param <C>           chunk id type
     * @return a new controller; the parallel runner is off until enabled
     */
    public static <K, C> CubeLogProcessorController<K, C> create(Eventloop eventloop,
                                                                 LogOTState<CubeDiff> state,
                                                                 OTStateManager<K, LogDiff<CubeDiff>> stateManager,
                                                                 AggregationChunkStorage<C> chunkStorage,
                                                                 List<LogOTProcessor<?, CubeDiff>> logProcessors) {
        Cube cube = (Cube) state.getDataState();
        // The commit id itself is not inspected; only the current shape of the cube matters.
        AsyncPredicate<K> predicate = AsyncPredicate.of(commitId -> {
            if (cube.containsExcessiveNumberOfOverlappingChunks()) {
                logger.info("Cube contains excessive number of overlapping chunks");
                return false;
            }
            return true;
        });
        return new CubeLogProcessorController<>(eventloop, logProcessors, chunkStorage, stateManager, predicate);
    }

    /**
     * Selects how the log processors are run.
     *
     * @param parallelRunner {@code true} to start all processors at once, {@code false} to run them one at a time
     * @return this controller, for chaining
     */
    public CubeLogProcessorController<K, C> withParallelRunner(boolean parallelRunner) {
        this.parallelRunner = parallelRunner;
        return this;
    }

    /**
     * Requests a processing round through the coalescing supplier wrapped around
     * {@link #doProcessLogs()}.
     *
     * @return promise of {@code true} if logs were processed and committed, {@code false} if the
     *         predicate rejected the round
     */
    public Promise<Boolean> processLogs() {
        return this.processLogs.get();
    }

    /**
     * Runs {@link #process()}, records its outcome in {@link #getPromiseProcessLogs()} and logs it.
     *
     * @return the promise produced by {@link #process()}
     */
    Promise<Boolean> doProcessLogs() {
        return process()
                .whenComplete(this.promiseProcessLogs.recordStats())
                // thisMethod() must be evaluated here so the log entry carries this method's name.
                .whenComplete(LogUtils.toLogger(logger, LogUtils.thisMethod(), this.stateManager));
    }

    /**
     * Performs one processing round: sync, predicate check, log processing, commit.
     *
     * @return promise of {@code false} when the predicate rejects the current commit id, otherwise
     *         a promise of {@code true} once the diffs have been added and the state synchronized
     */
    Promise<Boolean> process() {
        return Promise.complete()
                .then(() -> this.stateManager.sync())
                .map(ignored -> this.stateManager.getCommitId())
                .then(commitId -> this.predicate.test(commitId))
                .then(ok -> {
                    if (!ok) {
                        return Promise.of(false);
                    }
                    logger.info("Pull to commit: {}, start log processing", this.stateManager.getCommitId());
                    return runLogProcessors()
                            .whenComplete(this.promiseProcessLogsImpl.recordStats())
                            .whenResult(this::cubeDiffJmx)
                            .then(diffs -> commitDiffs(diffs));
                })
                // thisMethod() must be evaluated here so the log entry carries this method's name.
                .whenComplete(LogUtils.toLogger(logger, LogUtils.thisMethod(), this.stateManager));
    }

    /**
     * Invokes {@code processLog()} on every configured processor and gathers the resulting diffs.
     *
     * @return promise of the diffs collected into a list
     */
    private Promise<List<LogDiff<CubeDiff>>> runLogProcessors() {
        List<AsyncSupplier<LogDiff<CubeDiff>>> tasks = this.logProcessors.stream()
                .map(logProcessor -> AsyncSupplier.of(() -> logProcessor.processLog()))
                .collect(Collectors.toList());
        if (this.parallelRunner) {
            // Calling get() on every supplier up front starts all processors at once.
            return Promises.toList(tasks.stream().map(AsyncSupplier::get));
        }
        // A call limit of 1 lets only one processor run at a time.
        return Promises.reduce(Collectors.toList(), 1, Promises.asPromises(tasks));
    }

    /**
     * Finishes the added chunks in the chunk storage, adds the diffs to the state manager and
     * synchronizes it. If any of these steps fails, the state manager is reset and the failure
     * is propagated.
     *
     * @param diffs diffs produced by the log processors
     * @return promise of {@code true} on success
     */
    private Promise<Boolean> commitDiffs(List<LogDiff<CubeDiff>> diffs) {
        return Promise.complete()
                .then(() -> this.chunkStorage.finish(addedChunks(diffs)))
                .whenResult(() -> this.stateManager.addAll(diffs))
                .then(() -> this.stateManager.sync())
                .whenException(e -> this.stateManager.reset())
                .map(ignored -> true);
    }

    /**
     * Records, for one round, the number of added chunks and the sum of their record counts.
     *
     * @param logDiffs diffs produced by the log processors
     */
    private void cubeDiffJmx(List<LogDiff<CubeDiff>> logDiffs) {
        long curAddedChunks = 0L;
        long curAddedChunksRecords = 0L;
        for (LogDiff<CubeDiff> logDiff : logDiffs) {
            for (CubeDiff cubeDiff : logDiff.getDiffs()) {
                for (String key : cubeDiff.keySet()) {
                    AggregationDiff aggregationDiff = cubeDiff.get(key);
                    curAddedChunks += aggregationDiff.getAddedChunks().size();
                    for (AggregationChunk aggregationChunk : aggregationDiff.getAddedChunks()) {
                        curAddedChunksRecords += aggregationChunk.getCount();
                    }
                }
            }
        }
        // The explicit double casts keep the call bound to the recordValue(double) overload.
        this.addedChunks.recordValue((double) curAddedChunks);
        this.addedChunksRecords.recordValue((double) curAddedChunksRecords);
    }

    /**
     * Collects the ids of all chunks added by the given diffs.
     *
     * @param diffs diffs produced by the log processors
     * @return set of added chunk ids
     */
    @SuppressWarnings("unchecked") // ids are not statically typed as C here; assumed to match this controller's chunk id type
    private Set<C> addedChunks(List<LogDiff<CubeDiff>> diffs) {
        return diffs.stream()
                .flatMap(LogDiff::diffs)
                .flatMap(CubeDiff::addedChunks)
                .map(id -> (C) id)
                .collect(Collectors.toSet());
    }

    /** @return the eventloop passed at construction */
    @NotNull
    @Override
    public Eventloop getEventloop() {
        return this.eventloop;
    }

    /** @return statistics of the number of chunks added per round */
    @JmxAttribute
    public ValueStats getLastAddedChunks() {
        return this.addedChunks;
    }

    /** @return statistics (with rate) of the number of records in chunks added per round */
    @JmxAttribute
    public ValueStats getLastAddedChunksRecords() {
        return this.addedChunksRecords;
    }

    /** @return statistics of whole rounds, recorded by {@link #doProcessLogs()} */
    @JmxAttribute
    public PromiseStats getPromiseProcessLogs() {
        return this.promiseProcessLogs;
    }

    /** @return statistics of the log-processor stage only, recorded inside {@link #process()} */
    @JmxAttribute
    public PromiseStats getPromiseProcessLogsImpl() {
        return this.promiseProcessLogsImpl;
    }

    /** @return {@code true} if log processors are started all at once */
    @JmxAttribute
    public boolean isParallelRunner() {
        return this.parallelRunner;
    }

    /**
     * Switches between running processors all at once and one at a time.
     *
     * @param parallelRunner {@code true} to start all processors at once
     */
    @JmxAttribute
    public void setParallelRunner(boolean parallelRunner) {
        this.parallelRunner = parallelRunner;
    }

    /** JMX operation that calls {@link #processLogs()}; the returned promise is discarded. */
    @JmxOperation
    public void processLogsNow() {
        this.processLogs();
    }
}

