package io.activej.cube.service;

import io.activej.aggregation.AggregationChunk;
import io.activej.aggregation.AggregationChunkStorage;
import io.activej.aggregation.util.Utils;
import io.activej.async.function.AsyncPredicate;
import io.activej.async.function.AsyncSupplier;
import io.activej.async.function.AsyncSuppliers;
import io.activej.async.util.LogUtils;
import io.activej.cube.Cube;
import io.activej.cube.exception.CubeException;
import io.activej.cube.ot.CubeDiff;
import io.activej.etl.LogDiff;
import io.activej.etl.LogOTProcessor;
import io.activej.etl.LogOTState;
import io.activej.eventloop.Eventloop;
import io.activej.eventloop.jmx.EventloopJmxBeanEx;
import io.activej.jmx.api.attribute.JmxAttribute;
import io.activej.jmx.api.attribute.JmxOperation;
import io.activej.jmx.stats.ValueStats;
import io.activej.ot.OTStateManager;
import io.activej.promise.CompleteNullPromise;
import io.activej.promise.Promise;
import io.activej.promise.Promises;
import io.activej.promise.jmx.PromiseStats;
import java.time.Duration;
import java.util.Iterator;
import java.util.List;
import java.util.Objects;
import java.util.Set;
import java.util.stream.Collectors;
import org.jetbrains.annotations.NotNull;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/activej/cube/service/CubeLogProcessorController.class */
/**
 * Drives one round of cube log processing: synchronizes the OT state, checks a
 * predicate against the current commit, runs every configured
 * {@link LogOTProcessor}, finishes the produced chunks in the
 * {@link AggregationChunkStorage} and pushes the resulting diffs through the
 * {@link OTStateManager}.
 *
 * <p>Timing and chunk statistics are exposed through the JMX attributes declared
 * at the bottom of the class.
 *
 * @param <K> commit id type of the state manager
 * @param <C> chunk id type of the chunk storage
 */
public final class CubeLogProcessorController<K, C> implements EventloopJmxBeanEx {
    private static final Logger logger = LoggerFactory.getLogger(CubeLogProcessorController.class);

    /** Smoothing window used for every stats object created by this class. */
    public static final Duration DEFAULT_SMOOTHING_WINDOW = Duration.ofMinutes(5);

    private final Eventloop eventloop;
    private final List<LogOTProcessor<?, CubeDiff>> logProcessors;
    private final AggregationChunkStorage<C> chunkStorage;
    private final OTStateManager<K, LogDiff<CubeDiff>> stateManager;
    private final AsyncPredicate<K> predicate;

    /** When {@code true} all log processors are started at once; otherwise they run one at a time. */
    private boolean parallelRunner;

    // Stats for the whole round (sync + predicate + processing + commit).
    private final PromiseStats promiseProcessLogs = PromiseStats.create(DEFAULT_SMOOTHING_WINDOW);
    // Stats for the log processors stage only.
    private final PromiseStats promiseProcessLogsImpl = PromiseStats.create(DEFAULT_SMOOTHING_WINDOW);
    private final ValueStats addedChunks = ValueStats.create(DEFAULT_SMOOTHING_WINDOW);
    private final ValueStats addedChunksRecords = ValueStats.create(DEFAULT_SMOOTHING_WINDOW).withRate();

    // NOTE(review): AsyncSuppliers.coalesce presumably merges overlapping calls into a
    // single doProcessLogs() run - confirm against the ActiveJ documentation.
    private final AsyncSupplier<Boolean> processLogs = AsyncSuppliers.coalesce(this::doProcessLogs);

    /**
     * Stores the collaborators as given; no validation or copying is performed.
     *
     * @param eventloop               event loop reported by {@link #getEventloop()}
     * @param list                    log processors to run on every round
     * @param aggregationChunkStorage storage in which produced chunks are finished
     * @param oTStateManager          state manager that is synced before and after processing
     * @param asyncPredicate          check applied to the current commit id before processing
     */
    CubeLogProcessorController(Eventloop eventloop, List<LogOTProcessor<?, CubeDiff>> list, AggregationChunkStorage<C> aggregationChunkStorage, OTStateManager<K, LogDiff<CubeDiff>> oTStateManager, AsyncPredicate<K> asyncPredicate) {
        this.eventloop = eventloop;
        this.logProcessors = list;
        this.chunkStorage = aggregationChunkStorage;
        this.stateManager = oTStateManager;
        this.predicate = asyncPredicate;
    }

    /**
     * Creates a controller whose predicate rejects a round (and logs a message) whenever the
     * cube held by {@code logOTState} reports an excessive number of overlapping chunks.
     *
     * @param eventloop               event loop reported by {@link #getEventloop()}
     * @param logOTState              state whose data state is cast to {@link Cube}
     * @param oTStateManager          state manager that is synced before and after processing
     * @param aggregationChunkStorage storage in which produced chunks are finished
     * @param list                    log processors to run on every round
     * @return a new controller with the parallel runner disabled
     * @throws ClassCastException if the data state of {@code logOTState} is not a {@link Cube}
     */
    public static <K, C> CubeLogProcessorController<K, C> create(Eventloop eventloop, LogOTState<CubeDiff> logOTState, OTStateManager<K, LogDiff<CubeDiff>> oTStateManager, AggregationChunkStorage<C> aggregationChunkStorage, List<LogOTProcessor<?, CubeDiff>> list) {
        Cube cube = (Cube) logOTState.getDataState();
        AsyncPredicate<K> overlapCheck = AsyncPredicate.of(commitId -> {
            if (cube.containsExcessiveNumberOfOverlappingChunks()) {
                logger.info("Cube contains excessive number of overlapping chunks");
                return false;
            }
            return true;
        });
        return new CubeLogProcessorController<>(eventloop, list, aggregationChunkStorage, oTStateManager, overlapCheck);
    }

    /**
     * Fluent setter for the parallel-runner flag.
     *
     * @param z {@code true} to start all log processors at once, {@code false} to run them one at a time
     * @return this controller
     */
    public CubeLogProcessorController<K, C> withParallelRunner(boolean z) {
        this.parallelRunner = z;
        return this;
    }

    /**
     * Requests a processing round through the coalescing supplier.
     *
     * @return promise of {@code true} if logs were processed and committed, {@code false} if the
     *         predicate rejected the current commit
     */
    public Promise<Boolean> processLogs() {
        return this.processLogs.get();
    }

    /** Runs {@link #process()} and records its outcome in {@code promiseProcessLogs} and the log. */
    Promise<Boolean> doProcessLogs() {
        return process()
                .whenComplete(this.promiseProcessLogs.recordStats())
                .whenComplete(LogUtils.toLogger(logger, LogUtils.thisMethod(), this.stateManager));
    }

    /**
     * Performs a single processing round.
     *
     * <ol>
     *   <li>sync the state manager;</li>
     *   <li>test the current commit id with the predicate - a negative answer ends the round with {@code false};</li>
     *   <li>run the log processors (all at once or sequentially, depending on {@code parallelRunner});</li>
     *   <li>finish the added chunks in storage, add the diffs to the state manager and sync again.</li>
     * </ol>
     *
     * Failures of each stage are wrapped into a {@link CubeException} with a stage-specific message.
     *
     * @return promise of {@code true} when the round committed, {@code false} when it was skipped
     */
    Promise<Boolean> process() {
        return Promise.complete()
                .then(this.stateManager::sync)
                .thenEx(Utils.wrapException(e -> new CubeException("Failed to synchronize state prior to log processing", e)))
                .map(ignored -> this.stateManager.getCommitId())
                .then(commitId -> this.predicate.test(commitId)
                        .thenEx(Utils.wrapException(e -> new CubeException("Failed to test commit '" + commitId + "' with predicate", e))))
                .then(ok -> {
                    if (!ok) {
                        return Promise.of(false);
                    }

                    logger.info("Pull to commit: {}, start log processing", this.stateManager.getCommitId());

                    List<AsyncSupplier<LogDiff<CubeDiff>>> tasks = this.logProcessors.stream()
                            .map(logProcessor -> AsyncSupplier.of(logProcessor::processLog))
                            .collect(Collectors.toList());

                    // Parallel: start every task immediately. Sequential: reduce with at most 1 task in flight.
                    Promise<List<LogDiff<CubeDiff>>> diffsPromise = this.parallelRunner
                            ? Promises.toList(tasks.stream().map(AsyncSupplier::get))
                            : Promises.reduce(Collectors.toList(), 1, Promises.asPromises(tasks));

                    return diffsPromise
                            .thenEx(Utils.wrapException(e -> new CubeException("Failed to process logs", e)))
                            .whenComplete(this.promiseProcessLogsImpl.recordStats())
                            .whenResult(this::cubeDiffJmx)
                            .then(diffs -> Promise.complete()
                                    .then(() -> this.chunkStorage.finish(addedChunks(diffs)))
                                    .thenEx(Utils.wrapException(e -> new CubeException("Failed to finalize chunks in storage", e)))
                                    .whenResult(() -> this.stateManager.addAll(diffs))
                                    .then(() -> this.stateManager.sync()
                                            .thenEx(Utils.wrapException(e -> new CubeException("Failed to synchronize state after log processing, resetting", e))))
                                    // Any failure while finishing chunks or syncing resets the state manager.
                                    .whenException(e -> this.stateManager.reset())
                                    .map(ignored -> true));
                })
                .whenComplete(LogUtils.toLogger(logger, LogUtils.thisMethod(), this.stateManager));
    }

    /**
     * Records, for the given diffs, the total number of added chunks and the sum of their
     * record counts into the {@code addedChunks} / {@code addedChunksRecords} stats.
     * Values are recorded even when both totals are zero.
     *
     * @param list diffs produced by the log processors
     */
    private void cubeDiffJmx(List<LogDiff<CubeDiff>> list) {
        long chunkCount = 0;
        long recordCount = 0;

        for (LogDiff<CubeDiff> logDiff : list) {
            for (CubeDiff cubeDiff : logDiff.getDiffs()) {
                for (String aggregationId : cubeDiff.keySet()) {
                    // Counting while iterating is equivalent to adding size() and avoids a second lookup.
                    for (AggregationChunk chunk : cubeDiff.get(aggregationId).getAddedChunks()) {
                        chunkCount++;
                        recordCount += chunk.getCount();
                    }
                }
            }
        }

        this.addedChunks.recordValue(chunkCount);
        this.addedChunksRecords.recordValue(recordCount);
    }

    /**
     * Collects what {@code CubeDiff.addedChunks()} yields for every cube diff in {@code list}.
     *
     * @param list diffs produced by the log processors
     * @return set of the collected values, cast to {@code C}
     */
    @SuppressWarnings("unchecked") // elements are cast to C without a runtime check - presumably chunk ids; verify against CubeDiff
    private Set<C> addedChunks(List<LogDiff<CubeDiff>> list) {
        return list.stream()
                .flatMap(LogDiff::diffs)
                .flatMap(CubeDiff::addedChunks)
                .map(id -> (C) id)
                .collect(Collectors.toSet());
    }

    /** @return the event loop passed at construction */
    @NotNull
    @Override
    public Eventloop getEventloop() {
        return this.eventloop;
    }

    /** @return stats of the number of chunks added per round */
    @JmxAttribute
    public ValueStats getLastAddedChunks() {
        return this.addedChunks;
    }

    /** @return stats of the number of records in chunks added per round */
    @JmxAttribute
    public ValueStats getLastAddedChunksRecords() {
        return this.addedChunksRecords;
    }

    /** @return stats of complete processing rounds */
    @JmxAttribute
    public PromiseStats getPromiseProcessLogs() {
        return this.promiseProcessLogs;
    }

    /** @return stats of the log processors stage alone */
    @JmxAttribute
    public PromiseStats getPromiseProcessLogsImpl() {
        return this.promiseProcessLogsImpl;
    }

    /** @return whether log processors are started all at once */
    @JmxAttribute
    public boolean isParallelRunner() {
        return this.parallelRunner;
    }

    /** @param z {@code true} to start all log processors at once on subsequent rounds */
    @JmxAttribute
    public void setParallelRunner(boolean z) {
        this.parallelRunner = z;
    }

    /** JMX operation that requests a processing round; the resulting promise is not observed here. */
    @JmxOperation
    public void processLogsNow() {
        processLogs();
    }
}
