/*
 * Decompiled with CFR 0.152.
 */
package cz.seznam.euphoria.core.client.operator;

import cz.seznam.euphoria.core.annotation.operator.Recommended;
import cz.seznam.euphoria.core.annotation.operator.StateComplexity;
import cz.seznam.euphoria.core.client.dataset.Dataset;
import cz.seznam.euphoria.core.client.dataset.windowing.Window;
import cz.seznam.euphoria.core.client.dataset.windowing.Windowing;
import cz.seznam.euphoria.core.client.flow.Flow;
import cz.seznam.euphoria.core.client.functional.BinaryFunction;
import cz.seznam.euphoria.core.client.functional.CombinableReduceFunction;
import cz.seznam.euphoria.core.client.functional.ReduceFunction;
import cz.seznam.euphoria.core.client.functional.ReduceFunctor;
import cz.seznam.euphoria.core.client.functional.UnaryFunction;
import cz.seznam.euphoria.core.client.io.Collector;
import cz.seznam.euphoria.core.client.io.ExternalIterable;
import cz.seznam.euphoria.core.client.io.SpillTools;
import cz.seznam.euphoria.core.client.operator.Builders;
import cz.seznam.euphoria.core.client.operator.Operator;
import cz.seznam.euphoria.core.client.operator.ReduceStateByKey;
import cz.seznam.euphoria.core.client.operator.StateAwareWindowWiseSingleInputOperator;
import cz.seznam.euphoria.core.client.operator.StateSupport;
import cz.seznam.euphoria.core.client.operator.state.ListStorage;
import cz.seznam.euphoria.core.client.operator.state.ListStorageDescriptor;
import cz.seznam.euphoria.core.client.operator.state.State;
import cz.seznam.euphoria.core.client.operator.state.StateContext;
import cz.seznam.euphoria.core.client.operator.state.StateFactory;
import cz.seznam.euphoria.core.client.operator.state.StorageProvider;
import cz.seznam.euphoria.core.client.operator.state.ValueStorage;
import cz.seznam.euphoria.core.client.operator.state.ValueStorageDescriptor;
import cz.seznam.euphoria.core.client.type.TypeAwareReduceFunctor;
import cz.seznam.euphoria.core.client.type.TypeAwareUnaryFunction;
import cz.seznam.euphoria.core.client.type.TypeHint;
import cz.seznam.euphoria.core.client.util.Pair;
import cz.seznam.euphoria.core.executor.graph.DAG;
import cz.seznam.euphoria.core.executor.util.SingleValueContext;
import java.util.Comparator;
import java.util.Objects;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;
import javax.annotation.Nullable;

@Recommended(reason="Is very recommended to override because of performance in a specific area of (mostly) batch calculations where combiners can be efficiently used in the executor-specific implementation", state=StateComplexity.CONSTANT_IF_COMBINABLE, repartitions=1)
public class ReduceByKey<IN, KEY, VALUE, OUT, W extends Window<W>>
extends StateAwareWindowWiseSingleInputOperator<IN, IN, IN, KEY, Pair<KEY, OUT>, W, ReduceByKey<IN, KEY, VALUE, OUT, W>>
implements Builders.OutputValues<KEY, OUT> {
    final ReduceFunctor<VALUE, OUT> reducer;
    final UnaryFunction<IN, VALUE> valueExtractor;
    @Nullable
    final BinaryFunction<VALUE, VALUE, Integer> valueComparator;

    public static <IN> KeyByBuilder<IN> of(Dataset<IN> input) {
        return new KeyByBuilder<IN>("ReduceByKey", input);
    }

    public static OfBuilder named(String name) {
        return new OfBuilder(name);
    }

    ReduceByKey(String name, Flow flow, Dataset<IN> input, UnaryFunction<IN, KEY> keyExtractor, UnaryFunction<IN, VALUE> valueExtractor, @Nullable Windowing<IN, W> windowing, CombinableReduceFunction<OUT> reducer) {
        this(name, flow, input, keyExtractor, valueExtractor, windowing, ReduceByKey.toReduceFunctor(reducer), null);
    }

    ReduceByKey(String name, Flow flow, Dataset<IN> input, UnaryFunction<IN, KEY> keyExtractor, UnaryFunction<IN, VALUE> valueExtractor, @Nullable Windowing<IN, W> windowing, ReduceFunctor<VALUE, OUT> reducer, @Nullable BinaryFunction<VALUE, VALUE, Integer> valueComparator) {
        super(name, flow, input, keyExtractor, windowing);
        this.reducer = reducer;
        this.valueExtractor = valueExtractor;
        this.valueComparator = valueComparator;
    }

    public ReduceFunctor<VALUE, OUT> getReducer() {
        return this.reducer;
    }

    public boolean isCombinable() {
        return this.reducer.isCombinable();
    }

    public UnaryFunction<IN, VALUE> getValueExtractor() {
        return this.valueExtractor;
    }

    @Override
    public DAG<Operator<?, ?>> getBasicOps() {
        StateSupport.MergeFromStateMerger stateCombine = new StateSupport.MergeFromStateMerger();
        StateFactory stateFactory = this.reducer.isCombinable() ? new CombiningReduceState.Factory<VALUE>(this.reducer) : new NonCombiningReduceState.Factory<VALUE, OUT>(this.reducer, this.valueComparator);
        Flow flow = this.getFlow();
        ReduceStateByKey reduceState = new ReduceStateByKey(this.getName(), flow, this.input, this.keyExtractor, this.valueExtractor, this.windowing, stateFactory, stateCombine);
        return DAG.of(reduceState);
    }

    static <VALUE> ReduceFunctor<VALUE, VALUE> toReduceFunctor(final CombinableReduceFunction<VALUE> reducer1) {
        return new ReduceFunctor<VALUE, VALUE>(){

            @Override
            public boolean isCombinable() {
                return true;
            }

            @Override
            public void apply(Stream<VALUE> elem, Collector<VALUE> context) {
                context.collect(reducer1.apply(elem));
            }
        };
    }

    private static class NonCombiningReduceState<IN, OUT>
    implements State<IN, OUT>,
    StateSupport.MergeFrom<NonCombiningReduceState<IN, OUT>> {
        private static final ListStorageDescriptor STORAGE_DESC = ListStorageDescriptor.of("values", Object.class);
        private final ReduceFunctor<IN, OUT> reducer;
        private final ListStorage<IN> reducibleValues;
        private final SpillTools spill;
        @Nullable
        private final BinaryFunction<IN, IN, Integer> comparator;

        NonCombiningReduceState(StateContext context, ReduceFunctor<IN, OUT> reducer, BinaryFunction<IN, IN, Integer> comparator) {
            this.reducer = Objects.requireNonNull(reducer);
            this.comparator = comparator;
            ListStorage ls = context.getStorageProvider().getListStorage(STORAGE_DESC);
            this.reducibleValues = ls;
            this.spill = context.getSpillTools();
        }

        @Override
        public void add(IN element) {
            this.reducibleValues.add(element);
        }

        @Override
        public void flush(Collector<OUT> ctx) {
            block15: {
                if (this.comparator != null) {
                    try {
                        Comparator c = this.comparator::apply;
                        Iterable<IN> values = this.reducibleValues.get();
                        try (ExternalIterable<IN> sorted = this.spill.sorted(values, c);){
                            this.reducer.apply(StreamSupport.stream(sorted.spliterator(), false), ctx);
                            break block15;
                        }
                    }
                    catch (InterruptedException ex) {
                        throw new RuntimeException(ex);
                    }
                }
                this.reducer.apply(StreamSupport.stream(this.reducibleValues.get().spliterator(), false), ctx);
            }
        }

        @Override
        public void close() {
            this.reducibleValues.clear();
        }

        @Override
        public void mergeFrom(NonCombiningReduceState<IN, OUT> other) {
            this.reducibleValues.addAll(other.reducibleValues.get());
        }

        static final class Factory<IN, OUT>
        implements StateFactory<IN, OUT, NonCombiningReduceState<IN, OUT>> {
            private final ReduceFunctor<IN, OUT> r;
            private final BinaryFunction<IN, IN, Integer> comparator;

            Factory(ReduceFunctor<IN, OUT> r, @Nullable BinaryFunction<IN, IN, Integer> comparator) {
                this.r = Objects.requireNonNull(r);
                this.comparator = comparator;
            }

            @Override
            public NonCombiningReduceState<IN, OUT> createState(StateContext context, Collector<OUT> collector) {
                return new NonCombiningReduceState<IN, OUT>(context, this.r, this.comparator);
            }
        }
    }

    static class CombiningReduceState<E>
    implements State<E, E>,
    StateSupport.MergeFrom<CombiningReduceState<E>> {
        private static final ValueStorageDescriptor STORAGE_DESC = ValueStorageDescriptor.of("rbsk-value", Object.class, null);
        private final ReduceFunctor<E, E> reducer;
        private final ValueStorage<E> storage;
        private final SingleValueContext<E> context = new SingleValueContext();

        CombiningReduceState(StorageProvider storageProvider, ReduceFunctor<E, E> reducer) {
            this.reducer = Objects.requireNonNull(reducer);
            ValueStorage vs = storageProvider.getValueStorage(STORAGE_DESC);
            this.storage = vs;
        }

        @Override
        public void add(E element) {
            E v = this.storage.get();
            if (v == null) {
                this.storage.set(element);
            } else {
                this.reducer.apply(Stream.of(v, element), this.context);
                this.storage.set(this.context.getAndResetValue());
            }
        }

        @Override
        public void flush(Collector<E> context) {
            context.collect(this.storage.get());
        }

        @Override
        public void close() {
            this.storage.clear();
        }

        @Override
        public void mergeFrom(CombiningReduceState<E> other) {
            this.add(other.storage.get());
        }

        static final class Factory<E>
        implements StateFactory<E, E, State<E, E>> {
            private final ReduceFunctor<E, E> r;

            Factory(ReduceFunctor<E, E> r) {
                this.r = Objects.requireNonNull(r);
            }

            @Override
            public State<E, E> createState(StateContext context, Collector<E> collector) {
                return new CombiningReduceState<E>(context.getStorageProvider(), this.r);
            }
        }
    }

    public static class DatasetBuilder5<IN, KEY, VALUE, OUT, W extends Window<W>>
    extends DatasetBuilder4<IN, KEY, VALUE, OUT>
    implements Builders.OutputValues<KEY, OUT> {
        @Nullable
        private final Windowing<IN, W> windowing;

        DatasetBuilder5(String name, Dataset<IN> input, UnaryFunction<IN, KEY> keyExtractor, UnaryFunction<IN, VALUE> valueExtractor, ReduceFunctor<VALUE, OUT> reducer, @Nullable Windowing<IN, W> windowing, @Nullable BinaryFunction<VALUE, VALUE, Integer> valuesComparator) {
            super(name, input, keyExtractor, valueExtractor, reducer, valuesComparator);
            this.windowing = windowing;
        }

        @Override
        public Dataset<Pair<KEY, OUT>> output() {
            Flow flow = this.input.getFlow();
            ReduceByKey reduce = new ReduceByKey(this.name, flow, this.input, this.keyExtractor, this.valueExtractor, this.windowing, this.reducer, this.valuesComparator);
            flow.add(reduce);
            return reduce.output();
        }
    }

    public static class SortableDatasetBuilder4<IN, KEY, VALUE, OUT>
    extends DatasetBuilder4<IN, KEY, VALUE, OUT> {
        SortableDatasetBuilder4(String name, Dataset<IN> input, UnaryFunction<IN, KEY> keyExtractor, UnaryFunction<IN, VALUE> valueExtractor, ReduceFunctor<VALUE, OUT> reducer, @Nullable BinaryFunction<VALUE, VALUE, Integer> valuesComparator) {
            super(name, input, keyExtractor, valueExtractor, reducer, valuesComparator);
        }

        public DatasetBuilder4<IN, KEY, VALUE, OUT> withSortedValues(BinaryFunction<VALUE, VALUE, Integer> comparator) {
            return new SortableDatasetBuilder4<IN, KEY, VALUE, OUT>(this.name, this.input, this.keyExtractor, this.valueExtractor, this.reducer, comparator);
        }
    }

    public static class DatasetBuilder4<IN, KEY, VALUE, OUT>
    implements Builders.Output<Pair<KEY, OUT>>,
    Builders.OutputValues<KEY, OUT>,
    Builders.WindowBy<IN, DatasetBuilder4<IN, KEY, VALUE, OUT>> {
        final String name;
        final Dataset<IN> input;
        final UnaryFunction<IN, KEY> keyExtractor;
        final UnaryFunction<IN, VALUE> valueExtractor;
        final ReduceFunctor<VALUE, OUT> reducer;
        @Nullable
        final BinaryFunction<VALUE, VALUE, Integer> valuesComparator;

        DatasetBuilder4(String name, Dataset<IN> input, UnaryFunction<IN, KEY> keyExtractor, UnaryFunction<IN, VALUE> valueExtractor, ReduceFunctor<VALUE, OUT> reducer, @Nullable BinaryFunction<VALUE, VALUE, Integer> valuesComparator) {
            this.name = Objects.requireNonNull(name);
            this.input = Objects.requireNonNull(input);
            this.keyExtractor = Objects.requireNonNull(keyExtractor);
            this.valueExtractor = Objects.requireNonNull(valueExtractor);
            this.reducer = Objects.requireNonNull(reducer);
            this.valuesComparator = valuesComparator;
        }

        @Override
        public <W extends Window<W>> DatasetBuilder5<IN, KEY, VALUE, OUT, W> windowBy(Windowing<IN, W> windowing) {
            return new DatasetBuilder5<IN, KEY, VALUE, OUT, W>(this.name, this.input, this.keyExtractor, this.valueExtractor, this.reducer, Objects.requireNonNull(windowing), this.valuesComparator);
        }

        @Override
        public Dataset<Pair<KEY, OUT>> output() {
            return new DatasetBuilder5(this.name, this.input, this.keyExtractor, this.valueExtractor, this.reducer, null, this.valuesComparator).output();
        }
    }

    public static class DatasetBuilder3<IN, KEY, VALUE>
    implements ReduceBy<IN, KEY, VALUE> {
        private final String name;
        private final Dataset<IN> input;
        private final UnaryFunction<IN, KEY> keyExtractor;
        private final UnaryFunction<IN, VALUE> valueExtractor;

        DatasetBuilder3(String name, Dataset<IN> input, UnaryFunction<IN, KEY> keyExtractor, UnaryFunction<IN, VALUE> valueExtractor) {
            this.name = Objects.requireNonNull(name);
            this.input = Objects.requireNonNull(input);
            this.keyExtractor = Objects.requireNonNull(keyExtractor);
            this.valueExtractor = Objects.requireNonNull(valueExtractor);
        }

        @Override
        public <OUT> SortableDatasetBuilder4<IN, KEY, VALUE, OUT> reduceBy(ReduceFunctor<VALUE, OUT> reducer) {
            return new SortableDatasetBuilder4<IN, KEY, VALUE, OUT>(this.name, this.input, this.keyExtractor, this.valueExtractor, reducer, null);
        }
    }

    public static class DatasetBuilder2<IN, KEY>
    implements ReduceBy<IN, KEY, IN> {
        private final String name;
        private final Dataset<IN> input;
        private final UnaryFunction<IN, KEY> keyExtractor;

        DatasetBuilder2(String name, Dataset<IN> input, UnaryFunction<IN, KEY> keyExtractor) {
            this.name = Objects.requireNonNull(name);
            this.input = Objects.requireNonNull(input);
            this.keyExtractor = Objects.requireNonNull(keyExtractor);
        }

        public <VALUE> DatasetBuilder3<IN, KEY, VALUE> valueBy(UnaryFunction<IN, VALUE> valueExtractor) {
            return new DatasetBuilder3<IN, KEY, VALUE>(this.name, this.input, this.keyExtractor, valueExtractor);
        }

        public <VALUE> DatasetBuilder3<IN, KEY, VALUE> valueBy(UnaryFunction<IN, VALUE> valueExtractor, TypeHint<VALUE> typeHint) {
            return this.valueBy(TypeAwareUnaryFunction.of(valueExtractor, typeHint));
        }

        @Override
        public <OUT> SortableDatasetBuilder4<IN, KEY, IN, OUT> reduceBy(ReduceFunctor<IN, OUT> reducer) {
            return new SortableDatasetBuilder4<Object, KEY, Object, OUT>(this.name, this.input, this.keyExtractor, e -> e, reducer, null);
        }
    }

    public static interface ReduceBy<IN, KEY, VALUE> {
        default public <OUT> SortableDatasetBuilder4<IN, KEY, VALUE, OUT> reduceBy(ReduceFunction<VALUE, OUT> reducer) {
            return this.reduceBy((in, ctx) -> ctx.collect(reducer.apply(in)));
        }

        public <OUT> SortableDatasetBuilder4<IN, KEY, VALUE, OUT> reduceBy(ReduceFunctor<VALUE, OUT> var1);

        default public DatasetBuilder4<IN, KEY, VALUE, VALUE> combineBy(CombinableReduceFunction<VALUE> reducer) {
            return this.reduceBy(ReduceByKey.toReduceFunctor(reducer));
        }

        default public DatasetBuilder4<IN, KEY, VALUE, VALUE> combineBy(CombinableReduceFunction<VALUE> reducer, TypeHint<VALUE> typeHint) {
            return this.reduceBy(TypeAwareReduceFunctor.of(ReduceByKey.toReduceFunctor(reducer), typeHint));
        }
    }

    public static class KeyByBuilder<IN>
    implements Builders.KeyBy<IN> {
        private final String name;
        private final Dataset<IN> input;

        KeyByBuilder(String name, Dataset<IN> input) {
            this.name = Objects.requireNonNull(name);
            this.input = Objects.requireNonNull(input);
        }

        @Override
        public <KEY> DatasetBuilder2<IN, KEY> keyBy(UnaryFunction<IN, KEY> keyExtractor) {
            return new DatasetBuilder2<IN, KEY>(this.name, this.input, keyExtractor);
        }

        @Override
        public <KEY> DatasetBuilder2<IN, KEY> keyBy(UnaryFunction<IN, KEY> keyExtractor, TypeHint<KEY> typeHint) {
            return new DatasetBuilder2<IN, KEY>(this.name, this.input, TypeAwareUnaryFunction.of(keyExtractor, typeHint));
        }
    }

    public static class OfBuilder
    implements Builders.Of {
        private final String name;

        OfBuilder(String name) {
            this.name = name;
        }

        @Override
        public <IN> KeyByBuilder<IN> of(Dataset<IN> input) {
            return new KeyByBuilder<IN>(this.name, input);
        }
    }
}

