/*
 * Decompiled with CFR 0.152.
 */
package cz.seznam.euphoria.core.client.operator;

import cz.seznam.euphoria.core.annotation.operator.Recommended;
import cz.seznam.euphoria.core.annotation.operator.StateComplexity;
import cz.seznam.euphoria.core.annotation.stability.Experimental;
import cz.seznam.euphoria.core.client.dataset.Dataset;
import cz.seznam.euphoria.core.client.dataset.windowing.Window;
import cz.seznam.euphoria.core.client.dataset.windowing.Windowing;
import cz.seznam.euphoria.core.client.flow.Flow;
import cz.seznam.euphoria.core.client.functional.BinaryFunctor;
import cz.seznam.euphoria.core.client.functional.UnaryFunction;
import cz.seznam.euphoria.core.client.io.Collector;
import cz.seznam.euphoria.core.client.operator.Builders;
import cz.seznam.euphoria.core.client.operator.HintAware;
import cz.seznam.euphoria.core.client.operator.JoinHint;
import cz.seznam.euphoria.core.client.operator.MapElements;
import cz.seznam.euphoria.core.client.operator.Operator;
import cz.seznam.euphoria.core.client.operator.OptionalMethodBuilder;
import cz.seznam.euphoria.core.client.operator.ReduceStateByKey;
import cz.seznam.euphoria.core.client.operator.StateAwareWindowWiseOperator;
import cz.seznam.euphoria.core.client.operator.StateSupport;
import cz.seznam.euphoria.core.client.operator.Union;
import cz.seznam.euphoria.core.client.operator.state.ListStorage;
import cz.seznam.euphoria.core.client.operator.state.ListStorageDescriptor;
import cz.seznam.euphoria.core.client.operator.state.State;
import cz.seznam.euphoria.core.client.operator.state.StorageProvider;
import cz.seznam.euphoria.core.client.util.Either;
import cz.seznam.euphoria.core.client.util.Pair;
import cz.seznam.euphoria.core.executor.graph.DAG;
import cz.seznam.euphoria.shadow.com.google.common.annotations.VisibleForTesting;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.Objects;
import java.util.Set;
import javax.annotation.Nullable;

@Recommended(reason="Might be useful to override because of performance reasons in a specific join types (e.g. sort join), which might reduce the space complexity", state=StateComplexity.LINEAR, repartitions=1)
public class Join<LEFT, RIGHT, KEY, OUT, W extends Window<W>>
extends StateAwareWindowWiseOperator<Object, Either<LEFT, RIGHT>, Either<LEFT, RIGHT>, KEY, Pair<KEY, OUT>, W, Join<LEFT, RIGHT, KEY, OUT, W>>
implements HintAware<JoinHint>,
Builders.OutputValues<KEY, OUT> {
    /** Left-hand side input dataset of the join. */
    private final Dataset<LEFT> left;
    /** Right-hand side input dataset of the join. */
    private final Dataset<RIGHT> right;
    /** Output dataset of this operator; created in the constructor via {@code createOutput}. */
    private final Dataset<Pair<KEY, OUT>> output;
    /** Join function applied to a left and a right element; exposed through {@code getJoiner()}. */
    private final BinaryFunctor<LEFT, RIGHT, OUT> functor;
    /** Extracts the join key from elements of the left input. */
    @VisibleForTesting
    final UnaryFunction<LEFT, KEY> leftKeyExtractor;
    /** Extracts the join key from elements of the right input. */
    @VisibleForTesting
    final UnaryFunction<RIGHT, KEY> rightKeyExtractor;
    /** Kind of join; the state classes consult it to decide whether unjoined elements are emitted. */
    private final Type type;
    /** Hints supplied at construction time; checked to be non-null in the constructor. */
    private final Set<JoinHint> hints;
    /** Descriptor of the list storage named "left", in which the join state keeps left elements. */
    private static final ListStorageDescriptor LEFT_STATE_DESCR = ListStorageDescriptor.of("left", Object.class);
    /** Descriptor of the list storage named "right", in which the join state keeps right elements. */
    private static final ListStorageDescriptor RIGHT_STATE_DESCR = ListStorageDescriptor.of("right", Object.class);

    /**
     * Starts building a join of the two given datasets using the default
     * operator name {@code "Join"}.
     *
     * @param left the left-hand side input
     * @param right the right-hand side input
     * @param <LEFT> element type of the left input
     * @param <RIGHT> element type of the right input
     * @return the builder step on which the key extractors are specified
     */
    public static <LEFT, RIGHT> ByBuilder<LEFT, RIGHT> of(Dataset<LEFT> left, Dataset<RIGHT> right) {
        OfBuilder defaultNamed = new OfBuilder("Join");
        return defaultNamed.of(left, right);
    }

    /**
     * Starts building a join operator carrying the given name.
     *
     * @param name the name to give to the operator
     * @return the builder step on which the two inputs are specified
     */
    public static OfBuilder named(String name) {
        final OfBuilder builder = new OfBuilder(name);
        return builder;
    }

    /**
     * Creates the join operator.
     *
     * <p>The key extractor handed to the superclass dispatches on the side of
     * the incoming {@code Either}: left elements are keyed by
     * {@code leftKeyExtractor}, right elements by {@code rightKeyExtractor}.
     *
     * @param name name of the operator
     * @param flow flow the operator belongs to
     * @param left left-hand side input
     * @param right right-hand side input
     * @param leftKeyExtractor extracts the join key from left elements
     * @param rightKeyExtractor extracts the join key from right elements
     * @param functor join function applied to a left and a right element
     * @param type kind of join
     * @param windowing windowing to apply, may be {@code null}
     * @param hints hints for this operator
     * @throws NullPointerException if {@code hints} is {@code null}
     */
    Join(String name, Flow flow, Dataset<LEFT> left, Dataset<RIGHT> right, UnaryFunction<LEFT, KEY> leftKeyExtractor, UnaryFunction<RIGHT, KEY> rightKeyExtractor, BinaryFunctor<LEFT, RIGHT, OUT> functor, Type type, @Nullable Windowing<Either<LEFT, RIGHT>, W> windowing, Set<JoinHint> hints) {
        super(name, flow, windowing, elem -> elem.isLeft()
                ? leftKeyExtractor.apply(elem.left())
                : rightKeyExtractor.apply(elem.right()));
        // Inputs.
        this.left = left;
        this.right = right;
        // User supplied functions.
        this.leftKeyExtractor = leftKeyExtractor;
        this.rightKeyExtractor = rightKeyExtractor;
        this.functor = functor;
        // Join configuration.
        this.type = type;
        // The output is created before the hints check, as before.
        Dataset created = this.createOutput(left);
        this.output = created;
        this.hints = Objects.requireNonNull(hints);
    }

    /**
     * Lists the inputs of this operator.
     *
     * @return a fixed-size list holding the left input followed by the right input
     */
    @Override
    public Collection<Dataset<Object>> listInputs() {
        return Arrays.asList(left, right);
    }

    /**
     * Returns the output dataset created for this operator in its constructor.
     *
     * @return the dataset of key/joined-value pairs
     */
    @Override
    public Dataset<Pair<KEY, OUT>> output() {
        return output;
    }

    /**
     * Returns the kind of join this operator was constructed with.
     *
     * @return the join type
     */
    public Type getType() {
        return type;
    }

    /**
     * Returns the function extracting the join key from left elements.
     *
     * @return the left key extractor
     */
    public UnaryFunction<LEFT, KEY> getLeftKeyExtractor() {
        return leftKeyExtractor;
    }

    /**
     * Returns the function extracting the join key from right elements.
     *
     * @return the right key extractor
     */
    public UnaryFunction<RIGHT, KEY> getRightKeyExtractor() {
        return rightKeyExtractor;
    }

    /**
     * Returns the join function applied to a left and a right element.
     *
     * @return the joining functor
     */
    public BinaryFunctor<LEFT, RIGHT, OUT> getJoiner() {
        return functor;
    }

    /**
     * Returns the hints this operator was constructed with.
     *
     * @return the set of join hints, never {@code null}
     */
    @Override
    public Set<JoinHint> getHints() {
        return hints;
    }

    /**
     * Expresses this join as a DAG of more basic operators:
     * two {@code MapElements} (one per input), a {@code Union} of their
     * outputs and a {@code ReduceStateByKey} over the union.
     *
     * @return the DAG with both map operators as roots, the union depending
     *     on both of them, and the reduce depending on the union
     */
    @Override
    public DAG<Operator<?, ?>> getBasicOps() {
        Flow flow = this.getFlow();
        // Wrap the elements of each input into an Either so both sides can share one stream.
        MapElements<Object, Either> leftMap = new MapElements<Object, Either>(this.getName() + "::Map-left", flow, this.left, Either::left);
        MapElements<Object, Either> rightMap = new MapElements<Object, Either>(this.getName() + "::Map-right", flow, this.right, Either::right);
        Union union = new Union(this.getName() + "::Union", flow, Arrays.asList(leftMap.output(), rightMap.output()));
        // Key by this operator's key extractor, keep the Either itself as the value (e -> e)
        // and reuse this operator's windowing. The state factory picks the state implementation:
        // StableJoinState when the second argument is null, EarlyEmittingJoinState otherwise.
        // NOTE(review): the second argument is presumably the collector for early emission - confirm
        // against ReduceStateByKey's state factory contract.
        ReduceStateByKey reduce = new ReduceStateByKey(this.getName() + "::ReduceStateByKey", flow, union.output(), this.keyExtractor, e -> e, this.getWindowing(), (context, ctx) -> {
            StorageProvider storages = context.getStorageProvider();
            return ctx == null ? new StableJoinState(storages) : new EarlyEmittingJoinState(storages, ctx);
        }, new StateSupport.MergeFromStateMerger());
        // Wire the operators together: maps -> union -> reduce.
        DAG<Operator<?, ?>> dag = DAG.of(leftMap, rightMap);
        dag.add(union, (Operator[])new Operator[]{leftMap, rightMap});
        dag.add(reduce, (Operator[])new Operator[]{union});
        return dag;
    }

    /**
     * Join state that emits joined pairs eagerly: every added element is
     * immediately joined with the elements stored for the opposite side and
     * the results are written to the collector supplied at construction time.
     * {@link #flush} therefore does nothing; unjoined elements of non-inner
     * joins are emitted when the state is closed.
     */
    @Experimental
    private class EarlyEmittingJoinState
    extends AbstractJoinState
    implements State<Either<LEFT, RIGHT>, OUT>,
    StateSupport.MergeFrom<EarlyEmittingJoinState> {
        /** Collector all eagerly produced results are written to; never null. */
        private final Collector<OUT> context;

        /**
         * @param storageProvider provider of the left/right element storages
         * @param context collector receiving the eagerly emitted results
         * @throws NullPointerException if {@code context} is {@code null}
         */
        public EarlyEmittingJoinState(StorageProvider storageProvider, Collector<OUT> context) {
            super(storageProvider);
            this.context = Objects.requireNonNull(context);
        }

        /**
         * Stores the element on its own side and joins it right away with
         * everything stored so far for the opposite side.
         *
         * @param elem the incoming left or right element
         */
        @Override
        public void add(Either<LEFT, RIGHT> elem) {
            if (elem.isLeft()) {
                LEFT l = elem.left();
                this.leftElements.add(l);
                this.joinWithRights(l, this.rightElements.get());
            } else {
                RIGHT r = elem.right();
                this.rightElements.add(r);
                this.joinWithLefts(r, this.leftElements.get());
            }
        }

        /** Applies the join function to {@code l} paired with each of {@code rights}. */
        private void joinWithRights(LEFT l, Iterable<RIGHT> rights) {
            for (RIGHT r : rights) {
                Join.this.functor.apply(l, r, this.context);
            }
        }

        /** Applies the join function to each of {@code lefts} paired with {@code r}. */
        private void joinWithLefts(RIGHT r, Iterable<LEFT> lefts) {
            for (LEFT l : lefts) {
                Join.this.functor.apply(l, r, this.context);
            }
        }

        /**
         * Intentionally empty: joined pairs are already emitted in
         * {@link #add} and {@link #mergeFrom}.
         *
         * @param context ignored
         */
        @Override
        public void flush(Collector<OUT> context) {
        }

        /**
         * For non-inner joins emits the unjoined elements to the collector
         * given at construction time, then clears the storages.
         */
        @Override
        public void close() {
            if (Join.this.type != Type.INNER) {
                this.flushUnjoinedElems(this.context, this.leftElements.get(), this.rightElements.get());
            }
            super.close();
        }

        /**
         * Merges {@code other} into this state. Pairs that cross the two
         * states (other's lefts with this state's rights, and this state's
         * lefts with other's rights) are emitted first, because neither state
         * could have produced them on its own; afterwards other's elements
         * are appended to this state's storages.
         *
         * @param other the state whose elements are merged into this one
         */
        @Override
        public void mergeFrom(EarlyEmittingJoinState other) {
            Iterable<LEFT> otherLefts = other.leftElements.get();
            Iterable<RIGHT> thisRights = this.rightElements.get();
            for (LEFT l : otherLefts) {
                for (RIGHT r : thisRights) {
                    Join.this.functor.apply(l, r, this.context);
                }
            }
            Iterable<RIGHT> otherRights = other.rightElements.get();
            Iterable<LEFT> thisLefts = this.leftElements.get();
            for (RIGHT r : otherRights) {
                for (LEFT l : thisLefts) {
                    Join.this.functor.apply(l, r, this.context);
                }
            }
            this.leftElements.addAll(otherLefts);
            this.rightElements.addAll(otherRights);
        }
    }

    /**
     * Join state that only collects elements while they arrive and produces
     * all output when flushed.
     */
    private class StableJoinState
    extends AbstractJoinState
    implements StateSupport.MergeFrom<StableJoinState> {
        /**
         * @param storageProvider provider of the left/right element storages
         */
        StableJoinState(StorageProvider storageProvider) {
            super(storageProvider);
        }

        /**
         * Stores the element in the storage of its own side.
         *
         * @param elem the incoming left or right element
         */
        @Override
        public void add(Either<LEFT, RIGHT> elem) {
            if (elem.isLeft()) {
                this.leftElements.add(elem.left());
            } else {
                this.rightElements.add(elem.right());
            }
        }

        /**
         * Emits the join function's results for every left/right combination
         * stored in this state and, for non-inner joins, the unjoined
         * elements as well.
         *
         * @param context collector receiving the results
         */
        @Override
        public void flush(Collector<OUT> context) {
            Iterable<LEFT> lefts = this.leftElements.get();
            Iterable<RIGHT> rights = this.rightElements.get();
            for (LEFT l : lefts) {
                for (RIGHT r : rights) {
                    Join.this.functor.apply(l, r, context);
                }
            }
            if (Join.this.type != Type.INNER) {
                this.flushUnjoinedElems(context, lefts, rights);
            }
        }

        /**
         * Appends all elements stored in {@code other} to this state.
         *
         * @param other the state whose elements are merged into this one
         */
        @Override
        public void mergeFrom(StableJoinState other) {
            this.leftElements.addAll(other.leftElements.get());
            this.rightElements.addAll(other.rightElements.get());
        }
    }

    /**
     * Common base of the join states: owns the list storages holding the
     * left and right elements and knows how to emit elements that found no
     * counterpart on the other side.
     */
    private abstract class AbstractJoinState
    implements State<Either<LEFT, RIGHT>, OUT> {
        /** Elements received from the left input. */
        final ListStorage<LEFT> leftElements;
        /** Elements received from the right input. */
        final ListStorage<RIGHT> rightElements;

        /**
         * Obtains the left and right list storages from the given provider.
         *
         * @param storageProvider provider of the element storages
         */
        // The shared descriptors are declared raw (over Object.class), hence the unchecked assignments.
        @SuppressWarnings("unchecked")
        AbstractJoinState(StorageProvider storageProvider) {
            this.leftElements = storageProvider.getListStorage(LEFT_STATE_DESCR);
            this.rightElements = storageProvider.getListStorage(RIGHT_STATE_DESCR);
        }

        /** Clears both element storages. */
        @Override
        public void close() {
            this.leftElements.clear();
            this.rightElements.clear();
        }

        /**
         * Emits elements of one side paired with {@code null} when the other
         * side is empty. Nothing is emitted when both sides are empty or both
         * are non-empty. Which side may be emitted depends on the join type:
         * {@code LEFT} emits lefts, {@code RIGHT} emits rights and
         * {@code FULL} emits whichever side is non-empty.
         *
         * @param context collector receiving the results
         * @param lefts the stored left elements
         * @param rights the stored right elements
         * @throws IllegalArgumentException if exactly one side is empty and
         *     the join type is none of {@code LEFT}, {@code RIGHT}, {@code FULL}
         */
        void flushUnjoinedElems(Collector<OUT> context, Iterable<LEFT> lefts, Iterable<RIGHT> rights) {
            boolean leftEmpty = !lefts.iterator().hasNext();
            boolean rightEmpty = !rights.iterator().hasNext();
            // Only the case where exactly one of the sides is empty is of interest.
            if (leftEmpty != rightEmpty) {
                switch (Join.this.getType()) {
                    case LEFT: {
                        if (rightEmpty) {
                            for (LEFT elem : lefts) {
                                Join.this.functor.apply(elem, null, context);
                            }
                        }
                        break;
                    }
                    case RIGHT: {
                        if (leftEmpty) {
                            for (RIGHT elem : rights) {
                                Join.this.functor.apply(null, elem, context);
                            }
                        }
                        break;
                    }
                    case FULL: {
                        if (leftEmpty) {
                            for (RIGHT elem : rights) {
                                Join.this.functor.apply(null, elem, context);
                            }
                        } else {
                            for (LEFT elem : lefts) {
                                Join.this.functor.apply(elem, null, context);
                            }
                        }
                        break;
                    }
                    default: {
                        throw new IllegalArgumentException("Unsupported type: " + Join.this.getType());
                    }
                }
            }
        }
    }

    /**
     * Final builder step: holds every setting of the join and creates the
     * operator when {@link #output()} is called.
     */
    public static class OutputBuilder<LEFT, RIGHT, KEY, OUT, W extends Window<W>>
    implements Builders.OutputValues<KEY, OUT>,
    Builders.Output<Pair<KEY, OUT>> {
        private final String name;
        private final Dataset<LEFT> left;
        private final Dataset<RIGHT> right;
        private final UnaryFunction<LEFT, KEY> leftKeyExtractor;
        private final UnaryFunction<RIGHT, KEY> rightKeyExtractor;
        private final BinaryFunctor<LEFT, RIGHT, OUT> joinFunc;
        private final Type type;
        @Nullable
        private final Windowing<Either<LEFT, RIGHT>, W> windowing;
        private final Set<JoinHint> hints;

        /**
         * @throws NullPointerException if any argument other than
         *     {@code windowing} is {@code null}
         */
        OutputBuilder(String name, Dataset<LEFT> left, Dataset<RIGHT> right, UnaryFunction<LEFT, KEY> leftKeyExtractor, UnaryFunction<RIGHT, KEY> rightKeyExtractor, BinaryFunctor<LEFT, RIGHT, OUT> joinFunc, Type type, @Nullable Windowing<Either<LEFT, RIGHT>, W> windowing, Set<JoinHint> hints) {
            // Validate first (windowing is optional), then store.
            Objects.requireNonNull(name);
            Objects.requireNonNull(left);
            Objects.requireNonNull(right);
            Objects.requireNonNull(leftKeyExtractor);
            Objects.requireNonNull(rightKeyExtractor);
            Objects.requireNonNull(joinFunc);
            Objects.requireNonNull(type);
            Objects.requireNonNull(hints);
            this.name = name;
            this.left = left;
            this.right = right;
            this.leftKeyExtractor = leftKeyExtractor;
            this.rightKeyExtractor = rightKeyExtractor;
            this.joinFunc = joinFunc;
            this.type = type;
            this.windowing = windowing;
            this.hints = hints;
        }

        /**
         * Creates the join operator, adds it to the flow of the left input
         * and returns the operator's output.
         *
         * @return the dataset produced by the newly created join operator
         */
        @Override
        public Dataset<Pair<KEY, OUT>> output() {
            final Flow targetFlow = left.getFlow();
            final Join<LEFT, RIGHT, KEY, OUT, W> operator = new Join<>(
                    name, targetFlow, left, right, leftKeyExtractor, rightKeyExtractor,
                    joinFunc, type, windowing, hints);
            targetFlow.add(operator);
            return operator.output();
        }
    }

    /**
     * Builder step holding everything but the hints: the caller either adds
     * hints via {@link #withHints} or builds the output directly, in which
     * case an empty hint set is used.
     */
    public static class HintBuilderOutput<LEFT, RIGHT, KEY, OUT, W extends Window<W>>
    implements Builders.OutputWithHint<Pair<KEY, OUT>, JoinHint>,
    Builders.OutputValues<KEY, OUT> {
        private final String name;
        private final Dataset<LEFT> left;
        private final Dataset<RIGHT> right;
        private final UnaryFunction<LEFT, KEY> leftKeyExtractor;
        private final UnaryFunction<RIGHT, KEY> rightKeyExtractor;
        private final BinaryFunctor<LEFT, RIGHT, OUT> joinFunc;
        private final Type type;
        @Nullable
        private final Windowing<Either<LEFT, RIGHT>, W> windowing;

        /**
         * @throws NullPointerException if any argument other than
         *     {@code windowing} is {@code null}
         */
        HintBuilderOutput(String name, Dataset<LEFT> left, Dataset<RIGHT> right, UnaryFunction<LEFT, KEY> leftKeyExtractor, UnaryFunction<RIGHT, KEY> rightKeyExtractor, BinaryFunctor<LEFT, RIGHT, OUT> joinFunc, Type type, @Nullable Windowing<Either<LEFT, RIGHT>, W> windowing) {
            // Validate first (windowing is optional), then store.
            Objects.requireNonNull(name);
            Objects.requireNonNull(left);
            Objects.requireNonNull(right);
            Objects.requireNonNull(leftKeyExtractor);
            Objects.requireNonNull(rightKeyExtractor);
            Objects.requireNonNull(joinFunc);
            Objects.requireNonNull(type);
            this.name = name;
            this.left = left;
            this.right = right;
            this.leftKeyExtractor = leftKeyExtractor;
            this.rightKeyExtractor = rightKeyExtractor;
            this.joinFunc = joinFunc;
            this.type = type;
            this.windowing = windowing;
        }

        /**
         * Builds the join without any hints.
         *
         * @return the dataset produced by the join operator
         */
        @Override
        public Dataset<Pair<KEY, OUT>> output() {
            return withHints(Collections.<JoinHint>emptySet()).output();
        }

        /**
         * Proceeds to the final builder step with the given hints.
         *
         * @param hints hints for the join operator
         * @return the final builder step
         */
        public OutputBuilder<LEFT, RIGHT, KEY, OUT, W> withHints(Set<JoinHint> hints) {
            return new OutputBuilder<>(
                    name, left, right, leftKeyExtractor, rightKeyExtractor,
                    joinFunc, type, windowing, hints);
        }
    }

    /**
     * Builder step on which windowing may optionally be specified. Building
     * the output or adding hints without calling {@link #windowBy} proceeds
     * with a {@code null} windowing.
     */
    public static class WindowingBuilder<LEFT, RIGHT, KEY, OUT>
    implements Builders.OutputWithHint<Pair<KEY, OUT>, JoinHint>,
    Builders.OutputValues<KEY, OUT>,
    OptionalMethodBuilder<WindowingBuilder<LEFT, RIGHT, KEY, OUT>> {
        private final String name;
        private final Dataset<LEFT> left;
        private final Dataset<RIGHT> right;
        private final UnaryFunction<LEFT, KEY> leftKeyExtractor;
        private final UnaryFunction<RIGHT, KEY> rightKeyExtractor;
        private final BinaryFunctor<LEFT, RIGHT, OUT> joinFunc;
        private final Type type;

        /**
         * @throws NullPointerException if any argument is {@code null}
         */
        WindowingBuilder(String name, Dataset<LEFT> left, Dataset<RIGHT> right, UnaryFunction<LEFT, KEY> leftKeyExtractor, UnaryFunction<RIGHT, KEY> rightKeyExtractor, BinaryFunctor<LEFT, RIGHT, OUT> joinFunc, Type type) {
            // Validate first, then store.
            Objects.requireNonNull(name);
            Objects.requireNonNull(left);
            Objects.requireNonNull(right);
            Objects.requireNonNull(leftKeyExtractor);
            Objects.requireNonNull(rightKeyExtractor);
            Objects.requireNonNull(joinFunc);
            Objects.requireNonNull(type);
            this.name = name;
            this.left = left;
            this.right = right;
            this.leftKeyExtractor = leftKeyExtractor;
            this.rightKeyExtractor = rightKeyExtractor;
            this.joinFunc = joinFunc;
            this.type = type;
        }

        /**
         * Builds the join with neither windowing nor hints.
         *
         * @return the dataset produced by the join operator
         */
        @Override
        public Dataset<Pair<KEY, OUT>> output() {
            return windowBy(null).withHints(Collections.<JoinHint>emptySet()).output();
        }

        /**
         * Skips windowing and proceeds to the final builder step with the
         * given hints.
         *
         * @param hints hints for the join operator
         * @return the final builder step
         */
        public OutputBuilder<LEFT, RIGHT, KEY, OUT, ?> withHints(Set<JoinHint> hints) {
            return windowBy(null).withHints(hints);
        }

        /**
         * Specifies the windowing to apply to the joined elements.
         *
         * @param windowing the windowing; {@code null} is passed by this
         *     builder itself when no windowing was requested
         * @param <W> type of the windows
         * @return the builder step on which hints may be specified
         */
        public <W extends Window<W>> HintBuilderOutput<LEFT, RIGHT, KEY, OUT, W> windowBy(Windowing<Either<LEFT, RIGHT>, W> windowing) {
            return new HintBuilderOutput<>(
                    name, left, right, leftKeyExtractor, rightKeyExtractor,
                    joinFunc, type, windowing);
        }
    }

    /**
     * Builder step on which the join function is specified.
     */
    public static class UsingBuilder<LEFT, RIGHT, KEY> {
        private final String name;
        private final Dataset<LEFT> left;
        private final Dataset<RIGHT> right;
        private final UnaryFunction<LEFT, KEY> leftKeyExtractor;
        private final UnaryFunction<RIGHT, KEY> rightKeyExtractor;

        /**
         * Stores the settings gathered so far. All arguments are validated
         * here, consistently with the other builder steps, so that a missing
         * key extractor is reported at the call that supplied it rather than
         * at a later step.
         *
         * @throws NullPointerException if any argument is {@code null}
         */
        UsingBuilder(String name, Dataset<LEFT> left, Dataset<RIGHT> right, UnaryFunction<LEFT, KEY> leftKeyExtractor, UnaryFunction<RIGHT, KEY> rightKeyExtractor) {
            this.name = Objects.requireNonNull(name);
            this.left = Objects.requireNonNull(left);
            this.right = Objects.requireNonNull(right);
            this.leftKeyExtractor = Objects.requireNonNull(leftKeyExtractor);
            this.rightKeyExtractor = Objects.requireNonNull(rightKeyExtractor);
        }

        /**
         * Specifies the join function; the resulting join is of type
         * {@code INNER}.
         *
         * @param functor function applied to a left and a right element
         * @param <OUT> type of the values produced by the join function
         * @return the builder step on which windowing may be specified
         * @throws NullPointerException if {@code functor} is {@code null}
         */
        public <OUT> WindowingBuilder<LEFT, RIGHT, KEY, OUT> using(BinaryFunctor<LEFT, RIGHT, OUT> functor) {
            return new WindowingBuilder<LEFT, RIGHT, KEY, OUT>(this.name, this.left, this.right, this.leftKeyExtractor, this.rightKeyExtractor, functor, Type.INNER);
        }
    }

    /**
     * Builder step on which the key extractors of both inputs are specified.
     */
    public static class ByBuilder<LEFT, RIGHT> {
        private final String name;
        private final Dataset<LEFT> left;
        private final Dataset<RIGHT> right;

        /**
         * @throws NullPointerException if any argument is {@code null}
         */
        ByBuilder(String name, Dataset<LEFT> left, Dataset<RIGHT> right) {
            // Validate first, then store.
            Objects.requireNonNull(name);
            Objects.requireNonNull(left);
            Objects.requireNonNull(right);
            this.name = name;
            this.left = left;
            this.right = right;
        }

        /**
         * Specifies how the join key is extracted from the elements of each input.
         *
         * @param leftKeyExtractor extracts the key from left elements
         * @param rightKeyExtractor extracts the key from right elements
         * @param <KEY> type of the join key
         * @return the builder step on which the join function is specified
         */
        public <KEY> UsingBuilder<LEFT, RIGHT, KEY> by(UnaryFunction<LEFT, KEY> leftKeyExtractor, UnaryFunction<RIGHT, KEY> rightKeyExtractor) {
            return new UsingBuilder<>(name, left, right, leftKeyExtractor, rightKeyExtractor);
        }
    }

    /**
     * First builder step: carries the operator name and accepts the two inputs.
     */
    public static class OfBuilder {
        private final String name;

        /**
         * @param name name of the operator being built
         */
        OfBuilder(String name) {
            this.name = name;
        }

        /**
         * Specifies the two datasets to join; both must belong to the same flow.
         *
         * @param left the left-hand side input
         * @param right the right-hand side input
         * @param <LEFT> element type of the left input
         * @param <RIGHT> element type of the right input
         * @return the builder step on which the key extractors are specified
         * @throws IllegalArgumentException if the inputs come from different flows
         */
        public <LEFT, RIGHT> ByBuilder<LEFT, RIGHT> of(Dataset<LEFT> left, Dataset<RIGHT> right) {
            if (right.getFlow() == left.getFlow()) {
                return new ByBuilder<>(name, left, right);
            }
            throw new IllegalArgumentException("Pass inputs from the same flow");
        }
    }

    /**
     * Kind of join. It determines which elements lacking a counterpart on
     * the other side are still emitted (paired with {@code null}).
     */
    public enum Type {
        /** Unjoined elements are never emitted. */
        INNER,
        /** Left elements are emitted when the right side is empty. */
        LEFT,
        /** Right elements are emitted when the left side is empty. */
        RIGHT,
        /** Elements of whichever side is non-empty are emitted when the other side is empty. */
        FULL
    }
}

