package io.activej.cube.service;

import io.activej.aggregation.Aggregation;
import io.activej.aggregation.AggregationChunk;
import io.activej.aggregation.ChunksAlreadyLockedException;
import io.activej.aggregation.IAggregationChunkStorage;
import io.activej.aggregation.IChunkLocker;
import io.activej.aggregation.NoOpChunkLocker;
import io.activej.aggregation.ot.AggregationDiff;
import io.activej.aggregation.util.Utils;
import io.activej.async.function.AsyncBiFunction;
import io.activej.async.function.AsyncRunnable;
import io.activej.async.function.AsyncRunnables;
import io.activej.async.util.LogUtils;
import io.activej.common.Checks;
import io.activej.common.builder.AbstractBuilder;
import io.activej.cube.Cube;
import io.activej.cube.exception.CubeException;
import io.activej.cube.ot.CubeDiff;
import io.activej.cube.ot.CubeDiffScheme;
import io.activej.jmx.api.attribute.JmxAttribute;
import io.activej.jmx.api.attribute.JmxOperation;
import io.activej.jmx.stats.ValueStats;
import io.activej.ot.OTStateManager;
import io.activej.promise.Promise;
import io.activej.promise.Promises;
import io.activej.promise.jmx.PromiseStats;
import io.activej.reactor.AbstractReactive;
import io.activej.reactor.Reactive;
import io.activej.reactor.Reactor;
import io.activej.reactor.jmx.ReactiveJmxBeanWithStats;
import java.time.Duration;
import java.util.Collection;
import java.util.HashMap;
import java.util.IdentityHashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.function.Function;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/activej/cube/service/CubeConsolidationController.class */
/**
 * Reactive controller that drives two maintenance operations on a {@link Cube}:
 * consolidation of aggregation chunks and removal of chunks the cube reports as irrelevant.
 *
 * <p>Both operations synchronize the {@link OTStateManager} before and after the work and
 * reset it when the work fails. They are mutually exclusive: starting one while the other
 * is in progress fails the state check. Each operation is wrapped with
 * {@code AsyncRunnables.reuse(...)}; the exact sharing semantics of that wrapper are defined
 * by the library and are not visible in this file.
 *
 * <p>Statistics about processed chunks and promise timings are exposed through the JMX
 * attributes at the bottom of the class.
 *
 * @param <K> revision key type of the state manager
 * @param <D> diff type of the state manager
 * @param <C> chunk id type used by the chunk storage and chunk lockers
 */
public final class CubeConsolidationController<K, D, C> extends AbstractReactive implements ReactiveJmxBeanWithStats {
    private static final Logger logger = LoggerFactory.getLogger(CubeConsolidationController.class);

    /**
     * Default chunk selection strategy. Every call to {@code get()} flips an internal flag,
     * and the returned function passes that flag to
     * {@code Aggregation.getChunksForConsolidation(lockedChunkIds, flag)}.
     *
     * <p>NOTE(review): the returned function reads the flag when it is invoked, not when
     * {@code get()} is called, and this supplier is a shared static instance. A later
     * {@code get()} therefore also affects functions handed out earlier - confirm this is intended.
     */
    public static final Supplier<AsyncBiFunction<Aggregation, Set<Object>, List<AggregationChunk>>> DEFAULT_LOCKER_STRATEGY = new Supplier<>() {
        private boolean hotSegment = false;

        @Override
        public AsyncBiFunction<Aggregation, Set<Object>, List<AggregationChunk>> get() {
            hotSegment = !hotSegment;
            return (aggregation, lockedChunkIds) ->
                    Promise.of(aggregation.getChunksForConsolidation(lockedChunkIds, hotSegment));
        }
    };

    /** Smoothing window used for every JMX statistic created by this controller. */
    public static final Duration DEFAULT_SMOOTHING_WINDOW = Duration.ofMinutes(5);

    private final CubeDiffScheme<D> cubeDiffScheme;
    private final Cube cube;
    private final OTStateManager<K, D> stateManager;
    private final IAggregationChunkStorage<C> aggregationChunkStorage;
    /** Identity-based reverse lookup: aggregation instance to its id within the cube. */
    private final Map<Aggregation, String> aggregationsMapReversed;
    private final PromiseStats promiseConsolidate;
    private final PromiseStats promiseConsolidateImpl;
    private final PromiseStats promiseCleanupIrrelevantChunks;
    private final ValueStats removedChunks;
    private final ValueStats removedChunksRecords;
    private final ValueStats addedChunks;
    private final ValueStats addedChunksRecords;
    /** Chunk lockers created lazily, one per aggregation id (see {@link #ensureLocker}). */
    private final Map<String, IChunkLocker<Object>> lockers;
    private Supplier<AsyncBiFunction<Aggregation, Set<Object>, List<AggregationChunk>>> strategy;
    private Function<String, IChunkLocker<C>> chunkLockerFactory;
    /** {@code true} while {@link #doConsolidate()} is in progress. */
    private boolean consolidating;
    /** {@code true} while {@link #doCleanupIrrelevantChunks()} is in progress. */
    private boolean cleaning;
    private final AsyncRunnable consolidate;
    private final AsyncRunnable cleanupIrrelevantChunks;

    /**
     * Builder bound to an already constructed controller; its setters mutate that controller
     * and {@code doBuild()} returns it.
     */
    public final class Builder extends AbstractBuilder<CubeConsolidationController<K, D, C>.Builder, CubeConsolidationController<K, D, C>> {
        private Builder() {
        }

        /**
         * Replaces the strategy that supplies the chunk selection function for each consolidation.
         *
         * @param supplier supplier queried once per consolidation run
         * @return this builder
         */
        public CubeConsolidationController<K, D, C>.Builder withLockerStrategy(Supplier<AsyncBiFunction<Aggregation, Set<Object>, List<AggregationChunk>>> supplier) {
            checkNotBuilt(this);
            CubeConsolidationController.this.strategy = supplier;
            return this;
        }

        /**
         * Replaces the factory used to create a chunk locker for an aggregation id.
         *
         * @param function maps an aggregation id to its chunk locker
         * @return this builder
         */
        public CubeConsolidationController<K, D, C>.Builder withChunkLockerFactory(Function<String, IChunkLocker<C>> function) {
            checkNotBuilt(this);
            CubeConsolidationController.this.chunkLockerFactory = function;
            return this;
        }

        /** Returns the enclosing controller that this builder has been configuring. */
        @Override
        protected CubeConsolidationController<K, D, C> doBuild() {
            return CubeConsolidationController.this;
        }
    }

    CubeConsolidationController(Reactor reactor, CubeDiffScheme<D> cubeDiffScheme, Cube cube, OTStateManager<K, D> oTStateManager, IAggregationChunkStorage<C> iAggregationChunkStorage, Map<Aggregation, String> map) {
        super(reactor);
        this.promiseConsolidate = PromiseStats.create(DEFAULT_SMOOTHING_WINDOW);
        this.promiseConsolidateImpl = PromiseStats.create(DEFAULT_SMOOTHING_WINDOW);
        this.promiseCleanupIrrelevantChunks = PromiseStats.create(DEFAULT_SMOOTHING_WINDOW);
        this.removedChunks = ValueStats.create(DEFAULT_SMOOTHING_WINDOW);
        this.removedChunksRecords = (ValueStats) ValueStats.builder(DEFAULT_SMOOTHING_WINDOW).withRate().build();
        this.addedChunks = ValueStats.create(DEFAULT_SMOOTHING_WINDOW);
        this.addedChunksRecords = (ValueStats) ValueStats.builder(DEFAULT_SMOOTHING_WINDOW).withRate().build();
        this.lockers = new HashMap<>();
        this.strategy = DEFAULT_LOCKER_STRATEGY;
        // Default factory ignores the aggregation id and hands out a no-op locker.
        this.chunkLockerFactory = aggregationId -> NoOpChunkLocker.create(this.reactor);
        this.consolidate = AsyncRunnables.reuse(this::doConsolidate);
        this.cleanupIrrelevantChunks = AsyncRunnables.reuse(this::doCleanupIrrelevantChunks);
        this.cubeDiffScheme = cubeDiffScheme;
        this.cube = cube;
        this.stateManager = oTStateManager;
        this.aggregationChunkStorage = iAggregationChunkStorage;
        this.aggregationsMapReversed = map;
    }

    /**
     * Creates a controller with the default strategy and the default (no-op) chunk locker factory.
     */
    public static <K, D, C> CubeConsolidationController<K, D, C> create(Reactor reactor, CubeDiffScheme<D> cubeDiffScheme, Cube cube, OTStateManager<K, D> oTStateManager, IAggregationChunkStorage<C> iAggregationChunkStorage) {
        return CubeConsolidationController.<K, D, C>builder(reactor, cubeDiffScheme, cube, oTStateManager, iAggregationChunkStorage).build();
    }

    /**
     * Creates a builder for a new controller.
     *
     * <p>The reverse aggregation lookup is keyed by identity, so it is built from the very
     * instances returned by {@code cube.getAggregation(id)}.
     */
    public static <K, D, C> CubeConsolidationController<K, D, C>.Builder builder(Reactor reactor, CubeDiffScheme<D> cubeDiffScheme, Cube cube, OTStateManager<K, D> oTStateManager, IAggregationChunkStorage<C> iAggregationChunkStorage) {
        Map<Aggregation, String> aggregationsMapReversed = new IdentityHashMap<>();
        for (String aggregationId : cube.getAggregationIds()) {
            aggregationsMapReversed.put(cube.getAggregation(aggregationId), aggregationId);
        }
        // Builder is an inner class: it must be created from the controller instance it configures.
        CubeConsolidationController<K, D, C> controller = new CubeConsolidationController<K, D, C>(
                reactor, cubeDiffScheme, cube, oTStateManager, iAggregationChunkStorage, aggregationsMapReversed);
        return controller.new Builder();
    }

    /**
     * Runs consolidation through the reusing wrapper. Must be called from the reactor thread.
     *
     * @return promise of the consolidation run
     */
    public Promise<Void> consolidate() {
        Reactive.checkInReactorThread(this);
        return this.consolidate.run();
    }

    /**
     * Runs the irrelevant-chunk cleanup through the reusing wrapper. Must be called from the reactor thread.
     *
     * @return promise of the cleanup run
     */
    public Promise<Void> cleanupIrrelevantChunks() {
        Reactive.checkInReactorThread(this);
        return this.cleanupIrrelevantChunks.run();
    }

    /**
     * Performs one consolidation run:
     * sync state, select and lock chunks per aggregation, consolidate, finish the added chunks in
     * storage, push the resulting diff, sync again, and finally release the locked chunks.
     *
     * <p>Fails the state check if a cleanup is currently in progress.
     */
    Promise<Void> doConsolidate() {
        Reactive.checkInReactorThread(this);
        Checks.checkState(!this.cleaning, "Cannot consolidate and clean up irrelevant chunks at the same time");
        this.consolidating = true;
        AsyncBiFunction<Aggregation, Set<Object>, List<AggregationChunk>> chunksFn = this.strategy.get();
        // Chunks locked during this run, keyed by aggregation id; also drives the final release.
        Map<String, List<AggregationChunk>> chunksForConsolidation = new HashMap<>();
        return Promise.complete()
                .then(this.stateManager::sync)
                .mapException(e -> new CubeException("Failed to synchronize state prior to consolidation", e))
                .then(() -> Promises.all(this.cube.getAggregationIds().stream()
                        .map(aggregationId -> findAndLockChunksForConsolidation(aggregationId, chunksFn)
                                .whenResult(chunks -> {
                                    if (!chunks.isEmpty()) {
                                        chunksForConsolidation.put(aggregationId, chunks);
                                    }
                                }))))
                .then(() -> this.cube.consolidate(aggregation -> {
                            String aggregationId = this.aggregationsMapReversed.get(aggregation);
                            List<AggregationChunk> chunks = chunksForConsolidation.get(aggregationId);
                            if (chunks == null) {
                                return Promise.of(AggregationDiff.empty());
                            }
                            return aggregation.consolidate(chunks);
                        })
                        .whenComplete(this.promiseConsolidateImpl.recordStats()))
                .whenResult(this::cubeDiffJmx)
                .whenComplete(this::logCubeDiff)
                .then(cubeDiff -> {
                    if (cubeDiff.isEmpty()) {
                        return Promise.complete();
                    }
                    return this.aggregationChunkStorage.finish(addedChunks(cubeDiff))
                            .mapException(e -> new CubeException("Failed to finalize chunks in storage", e))
                            .whenResult(() -> this.stateManager.add(this.cubeDiffScheme.wrap(cubeDiff)))
                            .then(() -> this.stateManager.sync()
                                    .mapException(e -> new CubeException("Failed to synchronize state after consolidation, resetting", e)))
                            // Registered after both finish and sync, so a failure of either resets the state manager.
                            .whenException(e -> this.stateManager.reset())
                            .whenComplete(LogUtils.toLogger(logger, LogUtils.thisMethod(), cubeDiff));
                })
                // Release locks regardless of the outcome, then re-emit the original result or exception.
                .then((result, e) -> releaseChunks(chunksForConsolidation)
                        .then(() -> Promise.of(result, e)))
                .whenComplete(this.promiseConsolidate.recordStats())
                .whenComplete(LogUtils.toLogger(logger, LogUtils.thisMethod(), this.stateManager))
                .whenComplete(() -> this.consolidating = false);
    }

    /**
     * Selects chunks of one aggregation for consolidation and locks them.
     *
     * <p>The locker's currently locked chunk ids are passed to the selection function. The whole
     * attempt is repeated while it fails with {@link ChunksAlreadyLockedException}.
     *
     * @param aggregationId id of the aggregation
     * @param chunksFn      selection function obtained from the strategy
     * @return promise of the selected (and locked) chunks, possibly empty
     */
    private Promise<List<AggregationChunk>> findAndLockChunksForConsolidation(String aggregationId, AsyncBiFunction<Aggregation, Set<Object>, List<AggregationChunk>> chunksFn) {
        IChunkLocker<Object> locker = ensureLocker(aggregationId);
        Aggregation aggregation = this.cube.getAggregation(aggregationId);
        return Promises.retry(
                // Stop retrying on success or on any failure other than "already locked".
                (chunks, e) -> !(e instanceof ChunksAlreadyLockedException),
                () -> locker.getLockedChunks()
                        .then(lockedChunkIds -> chunksFn.apply(aggregation, lockedChunkIds))
                        .then(chunks -> {
                            if (chunks.isEmpty()) {
                                logger.info("Nothing to consolidate in aggregation '{}'", aggregationId);
                                return Promise.of(chunks);
                            }
                            return locker.lockChunks(Utils.collectChunkIds(chunks))
                                    .map(ignored -> chunks);
                        }));
    }

    /**
     * Releases the locks of all given chunks. A failed release is logged and otherwise ignored,
     * so one failing locker does not fail the returned promise.
     *
     * @param chunksForConsolidation locked chunks keyed by aggregation id
     */
    private Promise<Void> releaseChunks(Map<String, List<AggregationChunk>> chunksForConsolidation) {
        return Promises.all(chunksForConsolidation.entrySet().stream()
                .map(entry -> {
                    String aggregationId = entry.getKey();
                    Set<Object> chunkIds = Utils.collectChunkIds(entry.getValue());
                    return ensureLocker(aggregationId).releaseChunks(chunkIds)
                            .map((ignored, e) -> {
                                if (e != null) {
                                    logger.warn("Failed to release chunks: {} in aggregation {}", chunkIds, aggregationId, e);
                                }
                                return null;
                            });
                }));
    }

    /**
     * Performs one cleanup run: sync state, ask the cube for irrelevant chunks, push a diff that
     * removes them and sync again. The state manager is reset if the final sync fails.
     *
     * <p>Fails the state check if a consolidation is currently in progress.
     */
    private Promise<Void> doCleanupIrrelevantChunks() {
        Checks.checkState(!this.consolidating, "Cannot consolidate and clean up irrelevant chunks at the same time");
        this.cleaning = true;
        return this.stateManager.sync()
                .mapException(e -> new CubeException("Failed to synchronize state prior to cleaning up irrelevant chunks", e))
                .then(() -> {
                    Map<String, Set<AggregationChunk>> irrelevantChunks = this.cube.getIrrelevantChunks();
                    if (irrelevantChunks.isEmpty()) {
                        logger.info("Found no irrelevant chunks");
                        return Promise.complete();
                    }
                    logger.info("Removing irrelevant chunks: {}", irrelevantChunks.keySet());
                    // Each aggregation gets a diff with nothing added and its irrelevant chunks removed.
                    Map<String, AggregationDiff> diffs = irrelevantChunks.entrySet().stream()
                            .collect(io.activej.common.Utils.entriesToLinkedHashMap(
                                    chunksToRemove -> AggregationDiff.of(Set.of(), chunksToRemove)));
                    CubeDiff cubeDiff = CubeDiff.of(diffs);
                    cubeDiffJmx(cubeDiff);
                    this.stateManager.add(this.cubeDiffScheme.wrap(cubeDiff));
                    return this.stateManager.sync()
                            .mapException(e -> new CubeException("Failed to synchronize state after cleaning up irrelevant chunks, resetting", e))
                            .whenException(e -> this.stateManager.reset());
                })
                .whenComplete(this.promiseCleanupIrrelevantChunks.recordStats())
                .whenComplete(LogUtils.toLogger(logger, LogUtils.thisMethod(), this.stateManager))
                .whenComplete(() -> this.cleaning = false);
    }

    /**
     * Records, for the given diff, the number of added/removed chunks and the sum of their
     * {@code getCount()} values into the corresponding JMX statistics.
     */
    private void cubeDiffJmx(CubeDiff cubeDiff) {
        long curAddedChunks = 0;
        long curAddedChunksRecords = 0;
        long curRemovedChunks = 0;
        long curRemovedChunksRecords = 0;
        for (String aggregationId : cubeDiff.keySet()) {
            AggregationDiff aggregationDiff = cubeDiff.get(aggregationId);
            curAddedChunks += aggregationDiff.getAddedChunks().size();
            for (AggregationChunk chunk : aggregationDiff.getAddedChunks()) {
                curAddedChunksRecords += chunk.getCount();
            }
            curRemovedChunks += aggregationDiff.getRemovedChunks().size();
            for (AggregationChunk chunk : aggregationDiff.getRemovedChunks()) {
                curRemovedChunksRecords += chunk.getCount();
            }
        }
        this.addedChunks.recordValue(curAddedChunks);
        this.addedChunksRecords.recordValue(curAddedChunksRecords);
        this.removedChunks.recordValue(curRemovedChunks);
        this.removedChunksRecords.recordValue(curRemovedChunksRecords);
    }

    /**
     * Collects the added chunk ids of a diff into a set typed with the storage's chunk id type.
     */
    @SuppressWarnings("unchecked") // ids are stored untyped in the diff; the caller's storage fixes C
    private static <C> Set<C> addedChunks(CubeDiff cubeDiff) {
        return cubeDiff.addedChunks()
                .map(id -> (C) id)
                .collect(Collectors.toSet());
    }

    /** Logs the outcome of the consolidation step: failure, empty diff, or non-empty diff. */
    private void logCubeDiff(CubeDiff cubeDiff, Exception exc) {
        if (exc != null) {
            logger.warn("Consolidation failed", exc);
        } else if (cubeDiff.isEmpty()) {
            logger.info("Previous consolidation did not merge any chunks");
        } else {
            logger.info("Consolidation finished. Launching consolidation task again.");
        }
    }

    /**
     * Returns the chunk locker for the given aggregation id, creating it through the configured
     * factory on first use.
     */
    @SuppressWarnings("unchecked") // lockers are stored with ids widened to Object
    private IChunkLocker<Object> ensureLocker(String aggregationId) {
        return this.lockers.computeIfAbsent(aggregationId,
                id -> (IChunkLocker<Object>) this.chunkLockerFactory.apply(id));
    }

    @JmxAttribute
    public ValueStats getRemovedChunks() {
        return this.removedChunks;
    }

    @JmxAttribute
    public ValueStats getAddedChunks() {
        return this.addedChunks;
    }

    @JmxAttribute
    public ValueStats getRemovedChunksRecords() {
        return this.removedChunksRecords;
    }

    @JmxAttribute
    public ValueStats getAddedChunksRecords() {
        return this.addedChunksRecords;
    }

    @JmxAttribute
    public PromiseStats getPromiseConsolidate() {
        return this.promiseConsolidate;
    }

    @JmxAttribute
    public PromiseStats getPromiseConsolidateImpl() {
        return this.promiseConsolidateImpl;
    }

    @JmxAttribute
    public PromiseStats getPromiseCleanupIrrelevantChunks() {
        return this.promiseCleanupIrrelevantChunks;
    }

    /** JMX trigger for {@link #consolidate()}; the returned promise is intentionally not awaited. */
    @JmxOperation
    public void consolidateNow() {
        consolidate();
    }

    /** JMX trigger for {@link #cleanupIrrelevantChunks()}; the returned promise is intentionally not awaited. */
    @JmxOperation
    public void cleanupIrrelevantChunksNow() {
        cleanupIrrelevantChunks();
    }
}
