/*
 * Decompiled with CFR 0.152.
 */
package cz.seznam.euphoria.core.executor.greduce;

import cz.seznam.euphoria.core.client.accumulators.AccumulatorProvider;
import cz.seznam.euphoria.core.client.accumulators.Counter;
import cz.seznam.euphoria.core.client.accumulators.Histogram;
import cz.seznam.euphoria.core.client.accumulators.Timer;
import cz.seznam.euphoria.core.client.dataset.windowing.MergingWindowing;
import cz.seznam.euphoria.core.client.dataset.windowing.Window;
import cz.seznam.euphoria.core.client.dataset.windowing.WindowedElement;
import cz.seznam.euphoria.core.client.dataset.windowing.Windowing;
import cz.seznam.euphoria.core.client.functional.BinaryFunction;
import cz.seznam.euphoria.core.client.io.Context;
import cz.seznam.euphoria.core.client.operator.state.ListStorage;
import cz.seznam.euphoria.core.client.operator.state.ListStorageDescriptor;
import cz.seznam.euphoria.core.client.operator.state.MergingStorageDescriptor;
import cz.seznam.euphoria.core.client.operator.state.State;
import cz.seznam.euphoria.core.client.operator.state.StateContext;
import cz.seznam.euphoria.core.client.operator.state.StateFactory;
import cz.seznam.euphoria.core.client.operator.state.StateMerger;
import cz.seznam.euphoria.core.client.operator.state.Storage;
import cz.seznam.euphoria.core.client.operator.state.StorageDescriptor;
import cz.seznam.euphoria.core.client.operator.state.ValueStorage;
import cz.seznam.euphoria.core.client.operator.state.ValueStorageDescriptor;
import cz.seznam.euphoria.core.client.triggers.Trigger;
import cz.seznam.euphoria.core.client.triggers.TriggerContext;
import cz.seznam.euphoria.core.client.util.Pair;
import cz.seznam.euphoria.core.executor.greduce.TimerSupport;
import cz.seznam.euphoria.core.executor.greduce.TriggerStorage;
import cz.seznam.euphoria.shadow.com.google.common.base.Preconditions;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Objects;

public class GroupReducer<WID extends Window, KEY, I> {
    private final StateFactory<I, ?, State<I, ?>> stateFactory;
    private final StateMerger<I, ?, State<I, ?>> stateCombiner;
    private final WindowedElementFactory<WID, Object> elementFactory;
    private final StateContext stateContext;
    private final Collector<WindowedElement<?, Pair<KEY, ?>>> collector;
    private final Windowing windowing;
    private final Trigger trigger;
    private final AccumulatorProvider accumulators;
    final TriggerStorage triggerStorage;
    final TimerSupport<WID> clock = new TimerSupport();
    final HashMap<WID, State> states = new HashMap();
    KEY key;

    public GroupReducer(StateFactory<I, ?, State<I, ?>> stateFactory, StateMerger<I, ?, State<I, ?>> stateCombiner, StateContext stateContext, WindowedElementFactory<WID, Object> elementFactory, Windowing windowing, Trigger trigger, Collector<WindowedElement<?, Pair<KEY, ?>>> collector, AccumulatorProvider accumulators) {
        this.stateFactory = Objects.requireNonNull(stateFactory);
        this.elementFactory = Objects.requireNonNull(elementFactory);
        this.stateCombiner = Objects.requireNonNull(stateCombiner);
        this.stateContext = Objects.requireNonNull(stateContext);
        this.collector = Objects.requireNonNull(collector);
        this.windowing = Objects.requireNonNull(windowing);
        this.trigger = Objects.requireNonNull(trigger);
        this.accumulators = Objects.requireNonNull(accumulators);
        this.triggerStorage = new TriggerStorage(stateContext.getStorageProvider());
    }

    public void process(WindowedElement<WID, Pair<KEY, I>> elem) {
        this.updateKey(elem);
        this.clock.updateStamp(elem.getTimestamp(), this::onTimerCallback);
        WID window = elem.getWindow();
        if (this.windowing instanceof MergingWindowing) {
            window = this.mergeWindows(window);
        }
        State state = this.getStateForUpdate(window);
        state.add(elem.getElement().getSecond());
        ElementTriggerContext trgCtx = new ElementTriggerContext((Window)window);
        Trigger.TriggerResult windowTr = this.trigger.onElement(elem.getTimestamp(), window, trgCtx);
        this.processTriggerResult(window, trgCtx, windowTr);
    }

    private State getStateForUpdate(WID window) {
        return this.states.computeIfAbsent(window, w -> this.stateFactory.createState(this.stateContext, null));
    }

    public void close() {
        this.clock.updateStamp(Long.MAX_VALUE, this::onTimerCallback);
        for (Window window : new ArrayList<WID>(this.states.keySet())) {
            this.processTriggerResult(window, new ElementTriggerContext(window), Trigger.TriggerResult.FLUSH_AND_PURGE);
        }
    }

    private void onTimerCallback(long stamp, WID window) {
        ElementTriggerContext trgCtx = new ElementTriggerContext((Window)window);
        this.processTriggerResult(window, trgCtx, this.trigger.onTimer(stamp, window, trgCtx));
    }

    private WID mergeWindows(WID newWindow) {
        if (this.states.containsKey(newWindow)) {
            return newWindow;
        }
        Collection<Pair<Collection<WID>, WID>> merges = ((MergingWindowing)this.windowing).mergeWindows(this.getActivesWindowsPlus(newWindow));
        for (Pair<Collection<WID>, WID> merge : merges) {
            Collection<WID> sources = merge.getFirst();
            Window target = (Window)merge.getSecond();
            if (sources.contains(newWindow)) {
                newWindow = target;
            }
            sources.remove(target);
            if (sources.isEmpty()) continue;
            State targetState = this.getStateForUpdate(target);
            List<State> sourceStates = this.removeStatesForMerging(sources);
            this.stateCombiner.merge(targetState, sourceStates);
            this.trigger.onMerge(target, new MergingTriggerContext(sources, target));
            for (Window source : sources) {
                if (source.equals(newWindow)) continue;
                this.trigger.onClear(source, new ElementTriggerContext(source));
            }
        }
        return newWindow;
    }

    private List<WID> getActivesWindowsPlus(WID newWindow) {
        ArrayList<WID> actives = new ArrayList<WID>(this.states.keySet().size() + 1);
        actives.addAll(this.states.keySet());
        actives.add(newWindow);
        return actives;
    }

    private List<State> removeStatesForMerging(Collection<WID> windows) {
        ArrayList<State> xs = new ArrayList<State>(windows.size());
        for (Window window : windows) {
            State x = this.states.remove(window);
            if (x == null) continue;
            xs.add(x);
        }
        return xs;
    }

    private void updateKey(WindowedElement<WID, Pair<KEY, I>> elem) {
        if (this.key == null) {
            this.key = elem.getElement().getFirst();
        } else {
            Preconditions.checkState(this.key.equals(elem.getElement().getFirst()));
        }
    }

    private void processTriggerResult(WID window, ElementTriggerContext trgCtx, Trigger.TriggerResult tr) {
        if (tr.isFlush() && tr.isPurge()) {
            State state = this.states.remove(window);
            if (state != null) {
                state.flush(new ElementCollector(this, this.collector, window));
                state.close();
            }
            this.trigger.onClear(window, trgCtx);
        }
    }

    class MergingTriggerContext
    extends ElementTriggerContext
    implements TriggerContext.TriggerMergeContext {
        private Collection<? extends Window> sources;

        MergingTriggerContext(Collection<? extends Window> srcs, Window trgt) {
            super(trgt);
            this.sources = srcs;
        }

        @Override
        public void mergeStoredState(StorageDescriptor descriptor) {
            Storage merged;
            if (!(descriptor instanceof MergingStorageDescriptor)) {
                throw new IllegalStateException("Storage descriptor must support merging!");
            }
            MergingStorageDescriptor descr = (MergingStorageDescriptor)((Object)descriptor);
            BinaryFunction mergeFn = descr.getMerger();
            if (descr instanceof ValueStorageDescriptor) {
                merged = this.getValueStorage((ValueStorageDescriptor)((Object)descr));
            } else if (descr instanceof ListStorageDescriptor) {
                merged = this.getListStorage((ListStorageDescriptor)((Object)descr));
            } else {
                throw new IllegalStateException("Cannot merge states for " + descr);
            }
            for (Window window : this.sources) {
                Storage<?> s = GroupReducer.this.triggerStorage.getStorage(window, descriptor);
                if (s == null) continue;
                mergeFn.apply(merged, s);
            }
        }
    }

    class ElementTriggerContext
    implements TriggerContext {
        protected final Window window;

        ElementTriggerContext(Window window) {
            this.window = Objects.requireNonNull(window);
        }

        @Override
        public boolean registerTimer(long stamp, Window window) {
            GroupReducer.this.clock.registerTimer(stamp, window);
            return true;
        }

        @Override
        public void deleteTimer(long stamp, Window window) {
            GroupReducer.this.clock.deleteTimer(stamp, window);
        }

        @Override
        public long getCurrentTimestamp() {
            return GroupReducer.this.clock.getStamp();
        }

        @Override
        public <T> ValueStorage<T> getValueStorage(ValueStorageDescriptor<T> descriptor) {
            return GroupReducer.this.triggerStorage.getValueStorage(this.window, descriptor);
        }

        @Override
        public <T> ListStorage<T> getListStorage(ListStorageDescriptor<T> descriptor) {
            return GroupReducer.this.triggerStorage.getListStorage(this.window, descriptor);
        }
    }

    static class ElementCollector<T>
    implements Context,
    cz.seznam.euphoria.core.client.io.Collector<T> {
        final Collector<WindowedElement<WID, Pair<KEY, T>>> out;
        final WID window;
        final /* synthetic */ GroupReducer this$0;

        ElementCollector(Collector<WindowedElement<WID, Pair<KEY, T>>> out, WID window) {
            this.this$0 = this$0;
            this.out = out;
            this.window = window;
        }

        @Override
        public void collect(T elem) {
            this.out.collect(this.this$0.elementFactory.create(this.window, ((Window)this.window).maxTimestamp() - 1L, Pair.of(this.this$0.key, elem)));
        }

        @Override
        public Context asContext() {
            return this;
        }

        @Override
        public Window<?> getWindow() {
            return this.window;
        }

        @Override
        public Counter getCounter(String name) {
            return this.this$0.accumulators.getCounter(name);
        }

        @Override
        public Histogram getHistogram(String name) {
            return this.this$0.accumulators.getHistogram(name);
        }

        @Override
        public Timer getTimer(String name) {
            return this.this$0.accumulators.getTimer(name);
        }
    }

    @FunctionalInterface
    public static interface WindowedElementFactory<W extends Window, T> {
        public WindowedElement<W, T> create(W var1, long var2, T var4);
    }

    @FunctionalInterface
    public static interface Collector<T> {
        public void collect(T var1);
    }
}

