package io.datakernel.cube.service;

import io.datakernel.aggregation.Aggregation;
import io.datakernel.aggregation.AggregationChunk;
import io.datakernel.aggregation.AggregationChunkStorage;
import io.datakernel.aggregation.ot.AggregationDiff;
import io.datakernel.async.AsyncCallable;
import io.datakernel.async.AsyncFunction;
import io.datakernel.async.Stage;
import io.datakernel.cube.Cube;
import io.datakernel.cube.ot.CubeDiff;
import io.datakernel.eventloop.Eventloop;
import io.datakernel.jmx.EventloopJmxMBeanEx;
import io.datakernel.jmx.JmxAttribute;
import io.datakernel.jmx.JmxOperation;
import io.datakernel.jmx.StageStats;
import io.datakernel.jmx.ValueStats;
import io.datakernel.logfs.ot.LogDiff;
import io.datakernel.logfs.ot.LogOTState;
import io.datakernel.ot.OTStateManager;
import io.datakernel.util.LogUtils;
import java.time.Duration;
import java.util.Iterator;
import java.util.Set;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/datakernel/cube/service/CubeConsolidationController.class */
/**
 * Drives consolidation of a {@link Cube}: synchronizes the OT state, asks the cube to
 * consolidate its aggregations using a pluggable strategy, and pushes the resulting
 * {@link CubeDiff} back through the {@link OTStateManager}.
 *
 * <p>Statistics about each run (stage timings and added/removed chunk counts) are exposed
 * through JMX attributes.
 */
public final class CubeConsolidationController implements EventloopJmxMBeanEx {
    /**
     * Default strategy supplier. Successive calls to {@code get()} alternate between
     * {@code Aggregation::consolidateHotSegment} (returned first) and
     * {@code Aggregation::consolidateMinKey}.
     *
     * <p>The toggle lives in this single static instance, so every controller that uses the
     * default strategy shares the same alternation state.
     */
    public static final Supplier<AsyncFunction<Aggregation, AggregationDiff>> DEFAULT_STRATEGY = new Supplier<AsyncFunction<Aggregation, AggregationDiff>>() {
        private boolean hotSegment = false;

        @Override
        public AsyncFunction<Aggregation, AggregationDiff> get() {
            // Flip first, then choose: the very first call yields the hot-segment strategy.
            this.hotSegment = !this.hotSegment;
            return this.hotSegment ? Aggregation::consolidateHotSegment : Aggregation::consolidateMinKey;
        }
    };

    /** Smoothing window used for all JMX statistics of this controller. */
    public static final Duration DEFAULT_SMOOTHING_WINDOW = Duration.ofMinutes(5);

    private final Eventloop eventloop;
    private final Cube cube;
    private final OTStateManager<Integer, LogDiff<CubeDiff>> stateManager;
    private final AggregationChunkStorage aggregationChunkStorage;
    private final Supplier<AsyncFunction<Aggregation, AggregationDiff>> strategy;
    private final Logger logger = LoggerFactory.getLogger(CubeConsolidationController.class);

    // Timing of the whole consolidation run, including the push of the result.
    private final StageStats stageConsolidate = StageStats.create(DEFAULT_SMOOTHING_WINDOW);
    // Timing of the cube.consolidate(...) call only.
    private final StageStats stageConsolidateImpl = StageStats.create(DEFAULT_SMOOTHING_WINDOW);
    private final ValueStats removedChunks = ValueStats.create(DEFAULT_SMOOTHING_WINDOW);
    private final ValueStats removedChunksRecords = ValueStats.create(DEFAULT_SMOOTHING_WINDOW);
    private final ValueStats addedChunks = ValueStats.create(DEFAULT_SMOOTHING_WINDOW);
    private final ValueStats addedChunksRecords = ValueStats.create(DEFAULT_SMOOTHING_WINDOW);

    // NOTE(review): sharedCall presumably coalesces overlapping invocations into one
    // in-flight doConsolidate() run - confirm against AsyncCallable.
    private final AsyncCallable<Void> consolidate = AsyncCallable.sharedCall(this::doConsolidate);

    CubeConsolidationController(Eventloop eventloop, Cube cube, OTStateManager<Integer, LogDiff<CubeDiff>> oTStateManager, AggregationChunkStorage aggregationChunkStorage, Supplier<AsyncFunction<Aggregation, AggregationDiff>> supplier) {
        this.eventloop = eventloop;
        this.cube = cube;
        this.stateManager = oTStateManager;
        this.aggregationChunkStorage = aggregationChunkStorage;
        this.strategy = supplier;
    }

    /**
     * Creates a controller that uses {@link #DEFAULT_STRATEGY}.
     *
     * <p>The cube is taken from the state manager: its state is cast to {@link LogOTState}
     * and that object's data state is cast to {@link Cube}.
     *
     * @param eventloop               event loop reported by {@link #getEventloop()}
     * @param oTStateManager          state manager used to pull, commit and push diffs
     * @param aggregationChunkStorage storage on which newly added chunks are finished
     * @return a new controller
     * @throws ClassCastException if the state manager's state is not a {@link LogOTState}
     *                            or its data state is not a {@link Cube}
     */
    public static CubeConsolidationController create(Eventloop eventloop, OTStateManager<Integer, LogDiff<CubeDiff>> oTStateManager, AggregationChunkStorage aggregationChunkStorage) {
        return new CubeConsolidationController(eventloop, (Cube) ((LogOTState) oTStateManager.getState()).getDataState(), oTStateManager, aggregationChunkStorage, DEFAULT_STRATEGY);
    }

    /**
     * Returns a new controller that shares this controller's event loop, cube, state manager
     * and chunk storage but uses the given strategy supplier. This instance is not modified,
     * and the new instance starts with its own fresh statistics.
     *
     * @param supplier supplier of the consolidation function applied on each run
     * @return a new controller using {@code supplier}
     */
    public CubeConsolidationController withStrategy(Supplier<AsyncFunction<Aggregation, AggregationDiff>> supplier) {
        return new CubeConsolidationController(this.eventloop, this.cube, this.stateManager, this.aggregationChunkStorage, supplier);
    }

    /**
     * Requests a consolidation run through the shared callable.
     *
     * @return a stage for the outcome of the consolidation run
     */
    public Stage<Void> consolidate() {
        return this.consolidate.call();
    }

    /**
     * Performs one consolidation run: pull, merge heads and push, pull the merged revision,
     * pull again, consolidate the cube with the next strategy, record JMX statistics, log the
     * outcome and finally push the consolidation diff.
     *
     * @return a stage for the outcome of the whole run
     */
    Stage<Void> doConsolidate() {
        return this.stateManager.pull()
                .thenCompose(pulled -> this.stateManager.getAlgorithms().mergeHeadsAndPush())
                // Bound method reference: pulls the revision produced by the merge.
                .thenCompose(this.stateManager::pull)
                .thenCompose(merged -> this.stateManager.pull())
                .thenCompose(pulled -> this.cube.consolidate(this.strategy.get())
                        .whenComplete(this.stageConsolidateImpl.recordStats()))
                .whenResult(this::cubeDiffJmx)
                .whenComplete(this::logCubeDiff)
                .thenCompose(this::tryPushConsolidation)
                .whenComplete(this.stageConsolidate.recordStats())
                .whenComplete(LogUtils.toLogger(this.logger, LogUtils.thisMethod(), new Object[]{this.stateManager}));
    }

    /**
     * Records, for one consolidation result, the number of added and removed chunks and the
     * sum of {@code getCount()} over those chunks, totalled across all aggregations in the diff.
     *
     * @param cubeDiff the consolidation result to account for
     */
    private void cubeDiffJmx(CubeDiff cubeDiff) {
        long addedCount = 0;
        long addedRecords = 0;
        long removedCount = 0;
        long removedRecords = 0;
        for (String aggregationId : cubeDiff.keySet()) {
            AggregationDiff aggregationDiff = cubeDiff.get(aggregationId);
            addedCount += aggregationDiff.getAddedChunks().size();
            for (AggregationChunk chunk : aggregationDiff.getAddedChunks()) {
                addedRecords += chunk.getCount();
            }
            removedCount += aggregationDiff.getRemovedChunks().size();
            for (AggregationChunk chunk : aggregationDiff.getRemovedChunks()) {
                removedRecords += chunk.getCount();
            }
        }
        this.addedChunks.recordValue(addedCount);
        this.addedChunksRecords.recordValue(addedRecords);
        this.removedChunks.recordValue(removedCount);
        this.removedChunksRecords.recordValue(removedRecords);
    }

    /**
     * Pushes a consolidation diff. An empty diff completes immediately. Otherwise the diff is
     * added to the state manager, the state is synchronized (pull, merge heads and push, pull
     * the merged revision, pull), committed, the added chunks are finished in the chunk
     * storage, and the commit is pushed.
     *
     * @param cubeDiff the consolidation result to publish
     * @return a stage for the outcome of the push
     */
    private Stage<Void> tryPushConsolidation(CubeDiff cubeDiff) {
        if (cubeDiff.isEmpty()) {
            return Stage.of(null);
        }
        this.stateManager.add(LogDiff.forCurrentPosition(cubeDiff));
        return this.stateManager.pull()
                .thenCompose(pulled -> this.stateManager.getAlgorithms().mergeHeadsAndPush())
                // Bound method reference: pulls the revision produced by the merge.
                .thenCompose(this.stateManager::pull)
                .thenCompose(merged -> this.stateManager.pull())
                .thenCompose(pulled -> this.stateManager.commit())
                // Chunks are finished in storage after the commit and before the push.
                .thenCompose(committed -> this.aggregationChunkStorage.finish(addedChunks(cubeDiff)))
                .thenCompose(finished -> this.stateManager.push())
                .whenComplete(LogUtils.toLogger(this.logger, LogUtils.thisMethod(), new Object[]{cubeDiff}));
    }

    /** Collects the values of {@code cubeDiff.addedChunks()} into a set. */
    private static Set<Long> addedChunks(CubeDiff cubeDiff) {
        return cubeDiff.addedChunks().collect(Collectors.toSet());
    }

    /**
     * Logs the outcome of the cube consolidation step.
     *
     * @param cubeDiff the consolidation result; only inspected when {@code th} is null
     * @param th       the failure, or {@code null} on success
     */
    private void logCubeDiff(CubeDiff cubeDiff, Throwable th) {
        if (th != null) {
            this.logger.warn("Consolidation failed", th);
        } else if (cubeDiff.isEmpty()) {
            this.logger.info("Previous consolidation did not merge any chunks");
        } else {
            this.logger.info("Consolidation finished. Launching consolidation task again.");
        }
    }

    /** @return statistics of the number of chunks removed per consolidation */
    @JmxAttribute
    public ValueStats getRemovedChunks() {
        return this.removedChunks;
    }

    /** @return statistics of the number of chunks added per consolidation */
    @JmxAttribute
    public ValueStats getAddedChunks() {
        return this.addedChunks;
    }

    /** @return statistics of the summed {@code getCount()} of removed chunks per consolidation */
    @JmxAttribute
    public ValueStats getRemovedChunksRecords() {
        return this.removedChunksRecords;
    }

    /** @return statistics of the summed {@code getCount()} of added chunks per consolidation */
    @JmxAttribute
    public ValueStats getAddedChunksRecords() {
        return this.addedChunksRecords;
    }

    /** @return stage statistics of the whole consolidation run */
    @JmxAttribute
    public StageStats getStageConsolidate() {
        return this.stageConsolidate;
    }

    /** @return stage statistics of the {@code cube.consolidate(...)} call alone */
    @JmxAttribute
    public StageStats getStageConsolidateImpl() {
        return this.stageConsolidateImpl;
    }

    /** JMX operation that triggers {@link #consolidate()}; the returned stage is discarded. */
    @JmxOperation
    public void consolidateNow() {
        consolidate();
    }

    /** @return the event loop this controller was created with */
    public Eventloop getEventloop() {
        return this.eventloop;
    }
}
