package io.activej.cube;

import io.activej.codegen.DefiningClassLoader;
import io.activej.common.Checks;
import io.activej.csp.process.frame.FrameFormat;
import io.activej.cube.aggregation.AggregationChunk;
import io.activej.cube.aggregation.AggregationChunker;
import io.activej.cube.aggregation.AggregationException;
import io.activej.cube.aggregation.AggregationGroupReducer;
import io.activej.cube.aggregation.AggregationStats;
import io.activej.cube.aggregation.IAggregationChunkStorage;
import io.activej.cube.aggregation.PrimaryKey;
import io.activej.cube.aggregation.QueryPlan;
import io.activej.cube.aggregation.fieldtype.FieldType;
import io.activej.cube.aggregation.ot.AggregationDiff;
import io.activej.cube.aggregation.predicate.AggregationPredicate;
import io.activej.cube.aggregation.predicate.AggregationPredicates;
import io.activej.datastream.consumer.StreamConsumer;
import io.activej.datastream.consumer.StreamConsumerWithResult;
import io.activej.datastream.processor.reducer.StreamReducer;
import io.activej.datastream.processor.transformer.StreamTransformers;
import io.activej.datastream.processor.transformer.sort.StreamSorter;
import io.activej.datastream.processor.transformer.sort.StreamSorterStorage;
import io.activej.datastream.supplier.StreamSupplier;
import io.activej.datastream.supplier.StreamSuppliers;
import io.activej.jmx.api.attribute.JmxAttribute;
import io.activej.promise.Promise;
import io.activej.reactor.AbstractReactive;
import io.activej.reactor.Reactive;
import io.activej.reactor.Reactor;
import io.activej.reactor.jmx.ReactiveJmxBeanWithStats;
import io.activej.serializer.BinarySerializer;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.attribute.FileAttribute;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.TreeMap;
import java.util.concurrent.Executor;
import java.util.function.Function;
import java.util.function.Predicate;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.jetbrains.annotations.Nullable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/activej/cube/AggregationExecutor.class */
public final class AggregationExecutor extends AbstractReactive implements ReactiveJmxBeanWithStats {
    private final Logger logger;
    public static final int DEFAULT_CHUNK_SIZE = 1000000;
    public static final int DEFAULT_REDUCER_BUFFER_SIZE = 2000;
    public static final int DEFAULT_SORTER_ITEMS_IN_MEMORY = 1000000;
    public static final Duration DEFAULT_MAX_INCREMENTAL_RELOAD_PERIOD = Duration.ofMinutes(10);
    public static final int DEFAULT_MAX_CHUNKS_TO_CONSOLIDATE = 1000;
    private final Executor executor;
    private final DefiningClassLoader classLoader;
    private final IAggregationChunkStorage aggregationChunkStorage;
    private final FrameFormat frameFormat;

    @Nullable
    private Path temporarySortDir;
    private final AggregationStructure structure;
    private int chunkSize;
    private int reducerBufferSize;
    private int sorterItemsInMemory;
    private Duration maxIncrementalReloadPeriod;
    private boolean ignoreChunkReadingExceptions;
    private int maxChunksToConsolidate;
    private AggregationStats stats;
    private long consolidationStarted;
    private long consolidationLastTimeMillis;
    private int consolidations;
    private Exception consolidationLastError;

    /* loaded from: input_file:io/activej/cube/AggregationExecutor$SequenceStream.class */
    public static final class SequenceStream<S> {
        final StreamSupplier<S> stream;
        final List<String> fields;
        final Class<S> type;

        private SequenceStream(StreamSupplier<S> streamSupplier, List<String> list, Class<S> cls) {
            this.stream = streamSupplier;
            this.fields = list;
            this.type = cls;
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public AggregationExecutor(Reactor reactor, Executor executor, DefiningClassLoader definingClassLoader, IAggregationChunkStorage iAggregationChunkStorage, FrameFormat frameFormat, AggregationStructure aggregationStructure) {
        super(reactor);
        this.logger = LoggerFactory.getLogger(getClass());
        this.chunkSize = 1000000;
        this.reducerBufferSize = DEFAULT_REDUCER_BUFFER_SIZE;
        this.sorterItemsInMemory = 1000000;
        this.maxIncrementalReloadPeriod = DEFAULT_MAX_INCREMENTAL_RELOAD_PERIOD;
        this.ignoreChunkReadingExceptions = false;
        this.maxChunksToConsolidate = DEFAULT_MAX_CHUNKS_TO_CONSOLIDATE;
        this.stats = new AggregationStats();
        this.executor = executor;
        this.classLoader = definingClassLoader;
        this.aggregationChunkStorage = iAggregationChunkStorage;
        this.frameFormat = frameFormat;
        this.structure = aggregationStructure;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public void setReducerBufferSize(int i) {
        this.reducerBufferSize = i;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public void setTemporarySortDir(@Nullable Path path) {
        this.temporarySortDir = path;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public void setStats(AggregationStats aggregationStats) {
        this.stats = aggregationStats;
    }

    public AggregationStructure getStructure() {
        return this.structure;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public <T, K extends Comparable> StreamConsumerWithResult<T, AggregationDiff> consume(Class<T> cls, Map<String, String> map, Map<String, String> map2) {
        Reactive.checkInReactorThread(this);
        Checks.checkArgument(new HashSet(this.structure.getKeys()).equals(map.keySet()), "Expected keys: %s, actual keyFields: %s", new Object[]{this.structure.getKeys(), map});
        Checks.checkArgument(this.structure.getMeasureTypes().keySet().containsAll(map2.keySet()), "Unknown measures: %s", new Object[]{io.activej.common.Utils.difference(map2.keySet(), this.structure.getMeasureTypes().keySet())});
        this.logger.info("Started consuming data in aggregation {}. Keys: {} Measures: {}", new Object[]{this, map.keySet(), map2.keySet()});
        Stream<String> stream = this.structure.getKeys().stream();
        Map<String, FieldType> keyTypes = this.structure.getKeyTypes();
        Objects.requireNonNull(keyTypes);
        Class createKeyClass = io.activej.cube.aggregation.util.Utils.createKeyClass((Map) stream.collect(io.activej.common.Utils.toLinkedHashMap((v1) -> {
            return r1.get(v1);
        })), this.classLoader);
        Set<String> keySet = map2.keySet();
        Stream<String> stream2 = this.structure.getMeasureTypes().keySet().stream();
        Objects.requireNonNull(keySet);
        List list = (List) stream2.filter((v1) -> {
            return r1.contains(v1);
        }).collect(Collectors.toList());
        Class createRecordClass = io.activej.cube.aggregation.util.Utils.createRecordClass(this.structure, this.structure.getKeys(), list, this.classLoader);
        AggregationGroupReducer aggregationGroupReducer = new AggregationGroupReducer(this.aggregationChunkStorage, this.structure, list, createRecordClass, io.activej.cube.aggregation.util.Utils.createPartitionPredicate(createRecordClass, this.structure.getPartitioningKey(), this.classLoader), io.activej.cube.aggregation.util.Utils.createKeyFunction(cls, createKeyClass, this.structure.getKeys(), this.classLoader), io.activej.cube.aggregation.util.Utils.createPreaggregator(this.structure, cls, createRecordClass, map, map2, this.classLoader), this.chunkSize, this.classLoader);
        return StreamConsumerWithResult.of(aggregationGroupReducer, aggregationGroupReducer.getResult().map(list2 -> {
            return AggregationDiff.of(new HashSet(list2));
        }).mapException(exc -> {
            return new AggregationException("Failed to consume data", exc);
        }));
    }

    <T> StreamConsumerWithResult<T, AggregationDiff> consume(Class<T> cls) {
        Reactive.checkInReactorThread(this);
        return consume(cls, io.activej.cube.aggregation.util.Utils.scanKeyFields(cls), io.activej.cube.aggregation.util.Utils.scanMeasureFields(cls));
    }

    <T> StreamSupplier<T> query(List<AggregationChunk> list, AggregationQuery aggregationQuery, Class<T> cls) {
        Reactive.checkInReactorThread(this);
        return query(list, aggregationQuery, cls, this.classLoader);
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public <T> StreamSupplier<T> query(List<AggregationChunk> list, AggregationQuery aggregationQuery, Class<T> cls, DefiningClassLoader definingClassLoader) {
        Reactive.checkInReactorThread(this);
        Checks.checkArgument(io.activej.common.Utils.iterate(definingClassLoader, (v0) -> {
            return Objects.nonNull(v0);
        }, (v0) -> {
            return v0.getParent();
        }).anyMatch(Predicate.isEqual(this.classLoader)), "Unrelated queryClassLoader");
        return consolidatedSupplier(aggregationQuery.getKeys(), this.structure.findFields(aggregationQuery.getMeasures()), cls, aggregationQuery.getPredicate(), aggregationQuery.getPrecondition(), list, definingClassLoader).withEndOfStream(promise -> {
            return promise.mapException(exc -> {
                return new AggregationException("Query " + aggregationQuery + " failed", exc);
            });
        });
    }

    private <T> StreamSupplier<T> sortStream(StreamSupplier<T> streamSupplier, Class<T> cls, List<String> list, List<String> list2, DefiningClassLoader definingClassLoader) {
        Path createSortDir;
        Comparator createKeyComparator = io.activej.cube.aggregation.util.Utils.createKeyComparator(cls, list, definingClassLoader);
        BinarySerializer createBinarySerializer = io.activej.cube.aggregation.util.Utils.createBinarySerializer(this.structure, cls, this.structure.getKeys(), list2, definingClassLoader);
        if (this.temporarySortDir != null) {
            createSortDir = this.temporarySortDir;
        } else {
            try {
                createSortDir = createSortDir();
            } catch (AggregationException e) {
                return StreamSuppliers.closingWithError(e);
            }
        }
        StreamSorter create = StreamSorter.create(StreamSorterStorage.create(this.reactor, this.executor, createBinarySerializer, this.frameFormat, createSortDir), Function.identity(), createKeyComparator, false, this.sorterItemsInMemory);
        Path path = createSortDir;
        create.getInput().getAcknowledgement().whenComplete(() -> {
            if (this.temporarySortDir == null) {
                deleteSortDirSilent(path);
            }
        });
        return (StreamSupplier) streamSupplier.transformWith(create);
    }

    private Promise<List<AggregationChunk>> doConsolidation(List<AggregationChunk> list) {
        HashSet hashSet = new HashSet(this.structure.getMeasures());
        Stream<R> flatMap = list.stream().flatMap(aggregationChunk -> {
            return aggregationChunk.getMeasures().stream();
        });
        Objects.requireNonNull(hashSet);
        Set set = (Set) flatMap.filter((v1) -> {
            return r1.contains(v1);
        }).collect(Collectors.toSet());
        Stream<String> stream = this.structure.getMeasures().stream();
        Objects.requireNonNull(set);
        List<String> list2 = (List) stream.filter((v1) -> {
            return r1.contains(v1);
        }).collect(Collectors.toList());
        Class createRecordClass = io.activej.cube.aggregation.util.Utils.createRecordClass(this.structure, this.structure.getKeys(), list2, this.classLoader);
        StreamSupplier consolidatedSupplier = consolidatedSupplier(this.structure.getKeys(), list2, createRecordClass, AggregationPredicates.alwaysTrue(), AggregationPredicates.alwaysTrue(), list, this.classLoader);
        AggregationChunker create = AggregationChunker.create(this.structure, list2, createRecordClass, io.activej.cube.aggregation.util.Utils.createPartitionPredicate(createRecordClass, this.structure.getPartitioningKey(), this.classLoader), this.aggregationChunkStorage, this.classLoader, this.chunkSize);
        Promise streamTo = consolidatedSupplier.streamTo(create);
        Objects.requireNonNull(create);
        return streamTo.then(create::getResult);
    }

    private static void addChunkToPlan(Map<List<String>, TreeMap<PrimaryKey, List<QueryPlan.Sequence>>> map, AggregationChunk aggregationChunk, List<String> list) {
        QueryPlan.Sequence remove;
        ArrayList arrayList = new ArrayList(list);
        arrayList.retainAll(aggregationChunk.getMeasures());
        Checks.checkArgument(!arrayList.isEmpty(), "All of query fields are contained in measures of a chunk");
        TreeMap<PrimaryKey, List<QueryPlan.Sequence>> computeIfAbsent = map.computeIfAbsent(arrayList, list2 -> {
            return new TreeMap();
        });
        Map.Entry<PrimaryKey, List<QueryPlan.Sequence>> lowerEntry = computeIfAbsent.lowerEntry(aggregationChunk.getMinPrimaryKey());
        if (lowerEntry == null) {
            remove = new QueryPlan.Sequence(arrayList);
        } else {
            List<QueryPlan.Sequence> value = lowerEntry.getValue();
            remove = value.remove(value.size() - 1);
            if (value.isEmpty()) {
                computeIfAbsent.remove(lowerEntry.getKey());
            }
        }
        remove.add(aggregationChunk);
        ((List) computeIfAbsent.computeIfAbsent(aggregationChunk.getMaxPrimaryKey(), primaryKey -> {
            return new ArrayList();
        })).add(remove);
    }

    private static QueryPlan createPlan(List<AggregationChunk> list, List<String> list2) {
        HashMap hashMap = new HashMap();
        ArrayList arrayList = new ArrayList(list);
        arrayList.sort(Comparator.comparing((v0) -> {
            return v0.getMinPrimaryKey();
        }));
        Iterator it = arrayList.iterator();
        while (it.hasNext()) {
            addChunkToPlan(hashMap, (AggregationChunk) it.next(), list2);
        }
        ArrayList arrayList2 = new ArrayList();
        Iterator it2 = hashMap.values().iterator();
        while (it2.hasNext()) {
            Iterator it3 = ((TreeMap) it2.next()).values().iterator();
            while (it3.hasNext()) {
                arrayList2.addAll((List) it3.next());
            }
        }
        return new QueryPlan(arrayList2);
    }

    private <R, S> StreamSupplier<R> consolidatedSupplier(List<String> list, List<String> list2, Class<R> cls, AggregationPredicate aggregationPredicate, AggregationPredicate aggregationPredicate2, List<AggregationChunk> list3, DefiningClassLoader definingClassLoader) {
        QueryPlan createPlan = createPlan(list3, list2);
        this.logger.info("Query plan for {} in aggregation {}: {}", new Object[]{list, this, createPlan});
        boolean equals = this.structure.getKeys().subList(0, Math.min(this.structure.getKeys().size(), list.size())).equals(list);
        ArrayList arrayList = new ArrayList();
        for (QueryPlan.Sequence sequence : createPlan.getSequences()) {
            Class createRecordClass = io.activej.cube.aggregation.util.Utils.createRecordClass(this.structure, this.structure.getKeys(), sequence.getChunksFields(), this.classLoader);
            StreamSupplier sequenceStream = sequenceStream(aggregationPredicate, aggregationPredicate2, sequence.getChunks(), createRecordClass, definingClassLoader);
            if (!equals) {
                sequenceStream = sortStream(sequenceStream, createRecordClass, list, sequence.getQueryFields(), this.classLoader);
            }
            arrayList.add(new SequenceStream<>(sequenceStream, sequence.getQueryFields(), createRecordClass));
        }
        return mergeSequences(list, list2, cls, arrayList, definingClassLoader);
    }

    private <S, R, K extends Comparable> StreamSupplier<R> mergeSequences(List<String> list, List<String> list2, Class<R> cls, List<SequenceStream<S>> list3, DefiningClassLoader definingClassLoader) {
        if (list3.size() == 1 && new HashSet(list).equals(new HashSet(this.structure.getKeys()))) {
            SequenceStream<S> sequenceStream = list3.get(0);
            Class<S> cls2 = sequenceStream.type;
            Stream<String> stream = list2.stream();
            List<String> list4 = sequenceStream.fields;
            Objects.requireNonNull(list4);
            return (StreamSupplier) ((StreamSupplier) sequenceStream.stream.transformWith(StreamTransformers.mapper(io.activej.cube.aggregation.util.Utils.createMapper(cls2, cls, list, (List) stream.filter((v1) -> {
                return r4.contains(v1);
            }).collect(Collectors.toList()), definingClassLoader)))).transformWith(this.stats.getMergeMapOutput());
        }
        StreamReducer.Builder builder = StreamReducer.builder();
        if (this.reducerBufferSize != 0 && this.reducerBufferSize != 2000) {
            builder.withBufferSize(this.reducerBufferSize);
        }
        StreamReducer streamReducer = (StreamReducer) builder.build();
        Stream<String> stream2 = list.stream();
        Map<String, FieldType> keyTypes = this.structure.getKeyTypes();
        Objects.requireNonNull(keyTypes);
        Class createKeyClass = io.activej.cube.aggregation.util.Utils.createKeyClass((Map) stream2.collect(io.activej.common.Utils.toLinkedHashMap((v1) -> {
            return r1.get(v1);
        })), this.classLoader);
        for (SequenceStream<S> sequenceStream2 : list3) {
            Function createKeyFunction = io.activej.cube.aggregation.util.Utils.createKeyFunction(sequenceStream2.type, createKeyClass, list, this.classLoader);
            ArrayList arrayList = new ArrayList();
            LinkedHashMap linkedHashMap = new LinkedHashMap();
            for (String str : list2) {
                if (sequenceStream2.fields.contains(str)) {
                    arrayList.add(str);
                } else {
                    linkedHashMap.put(str, this.structure.getMeasure(str));
                }
            }
            sequenceStream2.stream.streamTo((StreamConsumer) streamReducer.newInput(createKeyFunction, io.activej.cube.aggregation.util.Utils.aggregationReducer(this.structure, sequenceStream2.type, cls, list, arrayList, linkedHashMap, definingClassLoader)).transformWith(this.stats.getMergeReducerInput()));
        }
        return (StreamSupplier) streamReducer.getOutput().transformWith(this.stats.getMergeReducerOutput());
    }

    private <T> StreamSupplier<T> sequenceStream(final AggregationPredicate aggregationPredicate, final AggregationPredicate aggregationPredicate2, List<AggregationChunk> list, final Class<T> cls, final DefiningClassLoader definingClassLoader) {
        final Iterator<AggregationChunk> it = list.iterator();
        return StreamSuppliers.concat(new Iterator<StreamSupplier<T>>() { // from class: io.activej.cube.AggregationExecutor.1
            @Override // java.util.Iterator
            public boolean hasNext() {
                return it.hasNext();
            }

            @Override // java.util.Iterator
            public StreamSupplier<T> next() {
                return AggregationExecutor.this.chunkReaderWithFilter(aggregationPredicate, aggregationPredicate2, (AggregationChunk) it.next(), cls, definingClassLoader);
            }
        });
    }

    private <T> StreamSupplier<T> chunkReaderWithFilter(AggregationPredicate aggregationPredicate, AggregationPredicate aggregationPredicate2, AggregationChunk aggregationChunk, Class<T> cls, DefiningClassLoader definingClassLoader) {
        StreamSupplier<T> ofPromise = StreamSuppliers.ofPromise(this.aggregationChunkStorage.read(this.structure, aggregationChunk.getMeasures(), cls, aggregationChunk.getChunkId(), this.classLoader));
        return (aggregationPredicate.equals(AggregationPredicates.alwaysTrue()) && aggregationPredicate2.equals(AggregationPredicates.alwaysTrue())) ? ofPromise : (StreamSupplier) ofPromise.transformWith(StreamTransformers.filter(io.activej.cube.aggregation.util.Utils.createPredicateWithPrecondition(cls, aggregationPredicate, aggregationPredicate2, this.structure.getKeyTypes(), definingClassLoader, str -> {
            return AggregationPredicates.alwaysTrue();
        })));
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public Promise<AggregationDiff> consolidate(List<AggregationChunk> list) {
        Reactive.checkInReactorThread(this);
        this.consolidationStarted = this.reactor.currentTimeMillis();
        this.logger.info("Starting consolidation of aggregation '{}'", this);
        return doConsolidation(list).map(list2 -> {
            return AggregationDiff.of(new LinkedHashSet(list2), new LinkedHashSet(list));
        }).whenResult(() -> {
            this.consolidationLastTimeMillis = this.reactor.currentTimeMillis() - this.consolidationStarted;
            this.consolidations++;
        }).whenException(exc -> {
            this.consolidationStarted = 0L;
            this.consolidationLastError = exc;
        });
    }

    private Path createSortDir() throws AggregationException {
        try {
            return Files.createTempDirectory("aggregation_sort_dir", new FileAttribute[0]);
        } catch (IOException e) {
            throw new AggregationException("Could not create sort dir", e);
        }
    }

    private void deleteSortDirSilent(Path path) {
        try {
            Files.delete(path);
        } catch (IOException e) {
            this.logger.warn("Could not delete temporal directory {}", this.temporarySortDir, e);
        }
    }

    @JmxAttribute
    public Duration getMaxIncrementalReloadPeriod() {
        return this.maxIncrementalReloadPeriod;
    }

    @JmxAttribute
    public void setMaxIncrementalReloadPeriod(Duration duration) {
        this.maxIncrementalReloadPeriod = duration;
    }

    @JmxAttribute
    public int getChunkSize() {
        return this.chunkSize;
    }

    @JmxAttribute
    public void setChunkSize(int i) {
        this.chunkSize = i;
    }

    @JmxAttribute
    public int getSorterItemsInMemory() {
        return this.sorterItemsInMemory;
    }

    @JmxAttribute
    public void setSorterItemsInMemory(int i) {
        this.sorterItemsInMemory = i;
    }

    @JmxAttribute
    public boolean isIgnoreChunkReadingExceptions() {
        return this.ignoreChunkReadingExceptions;
    }

    @JmxAttribute
    public void setIgnoreChunkReadingExceptions(boolean z) {
        this.ignoreChunkReadingExceptions = z;
    }

    @JmxAttribute
    public int getMaxChunksToConsolidate() {
        return this.maxChunksToConsolidate;
    }

    @JmxAttribute
    public void setMaxChunksToConsolidate(int i) {
        this.maxChunksToConsolidate = i;
    }

    @JmxAttribute
    @Nullable
    public Integer getConsolidationSeconds() {
        if (this.consolidationStarted == 0) {
            return null;
        }
        return Integer.valueOf((int) ((this.reactor.currentTimeMillis() - this.consolidationStarted) / 1000));
    }

    @JmxAttribute
    @Nullable
    public Integer getConsolidationLastTimeSeconds() {
        if (this.consolidationLastTimeMillis == 0) {
            return null;
        }
        return Integer.valueOf((int) (this.consolidationLastTimeMillis / 1000));
    }

    @JmxAttribute
    public int getConsolidations() {
        return this.consolidations;
    }

    @JmxAttribute
    public Exception getConsolidationLastError() {
        return this.consolidationLastError;
    }

    @JmxAttribute
    public AggregationStats getStats() {
        return this.stats;
    }

    public String toString() {
        return "{" + this.structure.getKeyTypes().keySet() + " " + this.structure.getMeasures() + "}";
    }
}
