package io.activej.cube;

import io.activej.async.util.LogUtils;
import io.activej.common.Checks;
import io.activej.common.builder.AbstractBuilder;
import io.activej.cube.aggregation.AggregationChunk;
import io.activej.cube.aggregation.ChunksAlreadyLockedException;
import io.activej.cube.aggregation.IChunkLocker;
import io.activej.cube.aggregation.NoOpChunkLocker;
import io.activej.cube.aggregation.ot.ProtoAggregationDiff;
import io.activej.cube.exception.CubeException;
import io.activej.cube.ot.CubeDiff;
import io.activej.cube.ot.ProtoCubeDiff;
import io.activej.etl.LogDiff;
import io.activej.etl.LogState;
import io.activej.jmx.api.attribute.JmxOperation;
import io.activej.ot.StateManager;
import io.activej.promise.Promise;
import io.activej.promise.Promises;
import io.activej.reactor.AbstractReactive;
import io.activej.reactor.Reactive;
import io.activej.reactor.jmx.ReactiveJmxBeanWithStats;
import java.util.Collection;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.function.Function;
import java.util.stream.Collectors;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/activej/cube/CubeConsolidator.class */
/**
 * Reactive service that consolidates chunks of cube aggregations.
 *
 * <p>A consolidation run locks the chunks picked by a {@link ConsolidationStrategy},
 * consolidates them through the per-aggregation executors, finalizes the produced
 * chunks in the chunk storage, pushes the resulting {@link CubeDiff} to the
 * {@link StateManager} and finally releases the locks it has taken.
 *
 * <p>Instances are obtained through {@link #create} or {@link #builder}.
 */
public final class CubeConsolidator extends AbstractReactive implements ReactiveJmxBeanWithStats {
    private static final Logger logger = LoggerFactory.getLogger(CubeConsolidator.class);

    private final StateManager<LogDiff<CubeDiff>, LogState<CubeDiff, CubeState>> stateManager;
    private final CubeExecutor executor;

    /** Chunk lockers created so far, keyed by aggregation id (filled lazily by {@link #ensureLocker}). */
    private final Map<String, IChunkLocker> lockers = new HashMap<>();

    /** Creates the locker of an aggregation on first use; replaced via {@link Builder#withChunkLockerFactory}. */
    private Function<String, IChunkLocker> chunkLockerFactory;

    /**
     * Builder bound to the {@link CubeConsolidator} instance it configures;
     * {@code build()} hands out that very instance.
     */
    public final class Builder extends AbstractBuilder<Builder, CubeConsolidator> {
        private Builder() {
        }

        /**
         * Replaces the factory used to create the chunk locker of each aggregation.
         *
         * @param function maps an aggregation id to its chunk locker, checked with {@code Checks.checkNotNull}
         * @return this builder
         */
        public Builder withChunkLockerFactory(Function<String, IChunkLocker> function) {
            checkNotBuilt(this);
            CubeConsolidator.this.chunkLockerFactory = Checks.checkNotNull(function);
            return this;
        }

        /**
         * Returns the enclosing, already configured consolidator.
         */
        @Override
        protected CubeConsolidator doBuild() {
            return CubeConsolidator.this;
        }
    }

    /**
     * Chooses which chunks of an aggregation should be consolidated together.
     */
    @FunctionalInterface
    public interface ConsolidationStrategy {
        /**
         * Selects chunks of a single aggregation for consolidation.
         *
         * @param str              id of the aggregation
         * @param aggregationState current state of that aggregation
         * @param i                maximal number of chunks to consolidate, as configured on the aggregation executor
         * @param i2               chunk size configured on the aggregation executor
         * @param set              ids of the chunks that are currently locked
         * @return chunks to consolidate, possibly empty
         */
        List<AggregationChunk> getChunksForConsolidation(String str, AggregationState aggregationState, int i, int i2, Set<Long> set);

        /**
         * Strategy that delegates to {@code AggregationState.findChunksForConsolidationMinKey}.
         */
        static ConsolidationStrategy minKey() {
            return (aggregationId, state, maxChunks, chunkSize, lockedChunkIds) ->
                state.findChunksForConsolidationMinKey(maxChunks, chunkSize, lockedChunkIds);
        }

        /**
         * Strategy that delegates to {@code AggregationState.findChunksForConsolidationHotSegment};
         * the chunk size is not used by this strategy.
         */
        static ConsolidationStrategy hotSegment() {
            return (aggregationId, state, maxChunks, chunkSize, lockedChunkIds) ->
                state.findChunksForConsolidationHotSegment(maxChunks, lockedChunkIds);
        }
    }

    private CubeConsolidator(StateManager<LogDiff<CubeDiff>, LogState<CubeDiff, CubeState>> stateManager, CubeExecutor cubeExecutor) {
        super(cubeExecutor.getReactor());
        this.stateManager = stateManager;
        this.executor = cubeExecutor;
        // Default: no real locking unless a factory is supplied through the builder.
        this.chunkLockerFactory = aggregationId -> NoOpChunkLocker.create(this.reactor);
    }

    /**
     * Creates a consolidator with the default (no-op) chunk locker factory.
     *
     * @param stateManager source of the cube state and sink for the consolidation diffs
     * @param cubeExecutor executor providing the reactor, aggregation executors and chunk storage
     */
    public static CubeConsolidator create(StateManager<LogDiff<CubeDiff>, LogState<CubeDiff, CubeState>> stateManager, CubeExecutor cubeExecutor) {
        return builder(stateManager, cubeExecutor).build();
    }

    /**
     * Starts building a consolidator for the given state manager and executor.
     *
     * @param stateManager source of the cube state and sink for the consolidation diffs
     * @param cubeExecutor executor providing the reactor, aggregation executors and chunk storage
     */
    public static Builder builder(StateManager<LogDiff<CubeDiff>, LogState<CubeDiff, CubeState>> stateManager, CubeExecutor cubeExecutor) {
        // Builder is an inner class, so it has to be created on the instance it configures.
        return new CubeConsolidator(stateManager, cubeExecutor).new Builder();
    }

    public StateManager<LogDiff<CubeDiff>, LogState<CubeDiff, CubeState>> getStateManager() {
        return this.stateManager;
    }

    /**
     * Consolidates the given aggregations. Must be called from the reactor thread.
     *
     * <p>Chunks are first found and locked for every aggregation, then the aggregations are
     * consolidated one after another, the result is finalized and pushed to the state manager,
     * and the locked chunks are released whether or not the preceding steps succeeded.
     *
     * @param list                  ids of the aggregations to consolidate
     * @param consolidationStrategy strategy that picks the chunks of each aggregation
     * @return promise of the applied diff; an empty diff when {@code list} is empty
     *         or nothing was consolidated
     */
    public Promise<CubeDiff> consolidate(List<String> list, ConsolidationStrategy consolidationStrategy) {
        Reactive.checkInReactorThread(this);
        logger.info("Launching consolidation for aggregations {}", list);
        if (list.isEmpty()) {
            return Promise.of(CubeDiff.empty());
        }

        Map<String, List<AggregationChunk>> lockedChunks = new HashMap<>();
        Map<String, ProtoAggregationDiff> diffs = new HashMap<>();

        return Promises.toList(list.stream()
                .map(aggregationId -> findAndLockChunksForConsolidation(aggregationId, consolidationStrategy)
                    .map(chunks -> {
                        // Remember the chunks as soon as they are locked, so that they are released
                        // even if consolidation of an earlier aggregation fails before this one starts.
                        if (!chunks.isEmpty()) {
                            lockedChunks.put(aggregationId, chunks);
                        }
                        return Map.entry(aggregationId, chunks);
                    })))
            .then(lockedPerAggregation -> consolidateSequentially(lockedPerAggregation, diffs))
            .then(() -> finishConsolidation(diffs))
            .then((cubeDiff, e) -> releaseChunks(lockedChunks)
                // Restore the original outcome (result or failure) after the release attempt.
                .then(() -> Promise.of(cubeDiff, e)))
            .whenComplete(LogUtils.toLogger(logger, LogUtils.thisMethod(), list));
    }

    /**
     * Consolidates the locked chunks of each aggregation one after another, collecting
     * every non-empty diff into {@code diffs}. A failure stops the remaining consolidations.
     */
    private Promise<Void> consolidateSequentially(
        List<Map.Entry<String, List<AggregationChunk>>> lockedPerAggregation,
        Map<String, ProtoAggregationDiff> diffs
    ) {
        Promise<Void> chain = Promise.complete();
        for (Map.Entry<String, List<AggregationChunk>> entry : lockedPerAggregation) {
            List<AggregationChunk> chunks = entry.getValue();
            if (chunks.isEmpty()) {
                continue;
            }
            String aggregationId = entry.getKey();
            chain = chain.then(() -> consolidateAggregationChunks(aggregationId, chunks)
                .whenResult(diff -> {
                    if (!diff.isEmpty()) {
                        diffs.put(aggregationId, diff);
                    }
                })
                .toVoid());
        }
        return chain;
    }

    /**
     * Asks the strategy for chunks of the aggregation, using the limits configured on its
     * aggregation executor and the current cube state.
     */
    private List<AggregationChunk> findChunksForConsolidation(String aggregationId, Set<Long> lockedChunkIds, ConsolidationStrategy strategy) {
        AggregationExecutor aggregationExecutor = this.executor.getAggregationExecutors().get(aggregationId);
        int maxChunksToConsolidate = aggregationExecutor.getMaxChunksToConsolidate();
        int chunkSize = aggregationExecutor.getChunkSize();
        return this.stateManager.query(state -> strategy.getChunksForConsolidation(
            aggregationId,
            state.getDataState().getAggregationState(aggregationId),
            maxChunksToConsolidate,
            chunkSize,
            lockedChunkIds));
    }

    /**
     * Finds chunks to consolidate, skipping those already locked, and locks them.
     *
     * <p>The attempt goes through {@code Promises.retry}; its predicate holds for every outcome
     * except a {@link ChunksAlreadyLockedException} failure, so (assuming the predicate is the
     * stop condition) only lock conflicts lead to another attempt.
     *
     * @return promise of the locked chunks; an empty list when there is nothing to consolidate
     */
    private Promise<List<AggregationChunk>> findAndLockChunksForConsolidation(String aggregationId, ConsolidationStrategy strategy) {
        IChunkLocker locker = ensureLocker(aggregationId);
        return Promises.retry(
            (result, e) -> !(e instanceof ChunksAlreadyLockedException),
            () -> locker.getLockedChunks()
                .map(lockedChunkIds -> findChunksForConsolidation(aggregationId, lockedChunkIds, strategy))
                .then(chunks -> {
                    if (chunks.isEmpty()) {
                        logger.info("Nothing to consolidate in aggregation '{}'", aggregationId);
                        return Promise.of(chunks);
                    }
                    return locker.lockChunks(io.activej.cube.aggregation.util.Utils.collectChunkIds(chunks))
                        .map(ignored -> chunks);
                }));
    }

    /**
     * Consolidates the given chunks with the executor of the aggregation, wrapping any failure
     * into a {@link CubeException}.
     */
    private Promise<ProtoAggregationDiff> consolidateAggregationChunks(String aggregationId, List<AggregationChunk> chunks) {
        return this.executor.getAggregationExecutors().get(aggregationId)
            .consolidate(chunks)
            .mapException(e -> new CubeException("Failed to consolidate aggregation '" + aggregationId + "'", e))
            .whenComplete(LogUtils.toLogger(logger, LogUtils.thisMethod(), aggregationId));
    }

    /**
     * Returns the chunk locker of the aggregation, creating it with the configured factory on first use.
     */
    private IChunkLocker ensureLocker(String aggregationId) {
        return this.lockers.computeIfAbsent(aggregationId, this.chunkLockerFactory);
    }

    /**
     * Collects the proto chunks added by the diff into a set.
     */
    private static Set<String> addedProtoChunks(ProtoCubeDiff protoCubeDiff) {
        return protoCubeDiff.addedProtoChunks().collect(Collectors.toSet());
    }

    /**
     * Finalizes the chunks produced by consolidation in the chunk storage, materializes the
     * proto diff with the ids returned by the storage and pushes it to the state manager.
     *
     * @param diffs non-empty consolidation results keyed by aggregation id
     * @return promise of the pushed diff, or of an empty diff when {@code diffs} is empty
     */
    private Promise<CubeDiff> finishConsolidation(Map<String, ProtoAggregationDiff> diffs) {
        if (diffs.isEmpty()) {
            return Promise.of(CubeDiff.empty());
        }
        ProtoCubeDiff protoCubeDiff = new ProtoCubeDiff(diffs);
        return this.executor.getAggregationChunkStorage()
            .finish(addedProtoChunks(protoCubeDiff))
            .mapException(e -> new CubeException("Failed to finalize chunks in storage", e))
            .then(chunkIds -> {
                CubeDiff cubeDiff = io.activej.cube.aggregation.util.Utils.materializeProtoDiff(protoCubeDiff, chunkIds);
                return this.stateManager.push(List.of(LogDiff.forCurrentPosition(cubeDiff)))
                    .mapException(e -> new CubeException("Failed to synchronize state after consolidation, resetting", e))
                    .map(ignored -> cubeDiff);
            });
    }

    /**
     * Releases the given chunks on the lockers of their aggregations. Release failures are
     * logged and otherwise ignored, so they never replace the outcome of the consolidation.
     *
     * @param lockedChunks chunks to release keyed by aggregation id
     */
    private Promise<Void> releaseChunks(Map<String, List<AggregationChunk>> lockedChunks) {
        if (lockedChunks.isEmpty()) {
            return Promise.complete();
        }
        return Promises.all(lockedChunks.entrySet().stream().map(entry -> {
            String aggregationId = entry.getKey();
            Set<Long> chunkIds = io.activej.cube.aggregation.util.Utils.collectChunkIds(entry.getValue());
            return ensureLocker(aggregationId).releaseChunks(chunkIds).map((ignored, e) -> {
                if (e != null) {
                    logger.warn("Failed to release chunks: {} in aggregation {}", chunkIds, aggregationId, e);
                }
                return null;
            });
        }));
    }

    /**
     * JMX operation listing the irrelevant chunks reported by the cube state.
     *
     * @return map whose values are comma-separated chunk ids, in the iteration order
     *         of the map returned by {@code CubeState.getIrrelevantChunks()}
     */
    @JmxOperation
    public Map<String, String> getIrrelevantChunksIds() {
        return this.stateManager.query(state ->
            state.getDataState().getIrrelevantChunks().entrySet().stream()
                .collect(io.activej.common.Utils.entriesToLinkedHashMap(chunks ->
                    chunks.stream()
                        .map(chunk -> String.valueOf(chunk.getChunkId()))
                        .collect(Collectors.joining(", ")))));
    }
}
