package com.google.cloud.dataflow.sdk.util;

import com.google.cloud.dataflow.sdk.repackaged.com.google.common.annotations.VisibleForTesting;
import com.google.cloud.dataflow.sdk.repackaged.com.google.common.base.Preconditions;
import com.google.cloud.dataflow.sdk.repackaged.com.google.common.base.Throwables;
import com.google.cloud.dataflow.sdk.repackaged.com.google.common.collect.Maps;
import com.google.cloud.dataflow.sdk.repackaged.com.google.common.collect.Sets;
import com.google.cloud.dataflow.sdk.transforms.Aggregator;
import com.google.cloud.dataflow.sdk.transforms.windowing.BoundedWindow;
import com.google.cloud.dataflow.sdk.transforms.windowing.PaneInfo;
import com.google.cloud.dataflow.sdk.transforms.windowing.Trigger;
import com.google.cloud.dataflow.sdk.transforms.windowing.Window;
import com.google.cloud.dataflow.sdk.util.ActiveWindowSet;
import com.google.cloud.dataflow.sdk.util.ReduceFnContextFactory;
import com.google.cloud.dataflow.sdk.util.TimerInternals;
import com.google.cloud.dataflow.sdk.util.WindowingStrategy;
import com.google.cloud.dataflow.sdk.util.state.StateContents;
import com.google.cloud.dataflow.sdk.util.state.StateNamespaces;
import com.google.cloud.dataflow.sdk.values.KV;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import org.joda.time.Duration;
import org.joda.time.Instant;

/* loaded from: input_file:com/google/cloud/dataflow/sdk/util/ReduceFnRunner.class */
public class ReduceFnRunner<K, InputT, OutputT, W extends BoundedWindow> {
    /** Counter name; presumably used by callers to create the "closed window" drop aggregator. */
    public static final String DROPPED_DUE_TO_CLOSED_WINDOW_COUNTER = "DroppedDueToClosedWindow";
    /** Counter name; presumably used by callers to create the "lateness" drop aggregator. */
    public static final String DROPPED_DUE_TO_LATENESS_COUNTER = "DroppedDueToLateness";
    private final WindowingStrategy<Object, W> windowingStrategy;
    private final WindowingInternals<?, KV<K, OutputT>> windowingInternals;
    /** Incremented once per (element, window) dropped because the window's trigger is closed. */
    private final Aggregator<Long, Long> droppedDueToClosedWindow;
    /** Incremented once per (element, window) dropped because the window has expired. */
    private final Aggregator<Long, Long> droppedDueToLateness;
    private final K key;
    /**
     * Assigned in the constructor rather than by a field initializer: building it reads
     * {@link #windowingStrategy} and {@link #windowingInternals}, which are only set there.
     */
    private final ActiveWindowSet<W> activeWindows;
    private final ReduceFn<K, InputT, OutputT, W> reduceFn;
    private final TimerInternals timerInternals;
    private final TriggerRunner<W> triggerRunner;
    private final WatermarkHold<W> watermarkHold;
    private final ReduceFnContextFactory<K, InputT, OutputT, W> contextFactory;
    private final PaneInfoTracker paneInfoTracker;
    private final NonEmptyPanes<W> nonEmptyPanes;

    /**
     * Creates a runner for a single key.
     *
     * @param k the key whose elements this runner processes; used as the key of every output
     * @param windowingStrategy strategy supplying the window function, trigger, allowed lateness,
     *     accumulation mode and closing behavior
     * @param timerInternals source of the current watermarks
     * @param windowingInternals provides the state internals and the output sink
     * @param aggregator counter incremented when an element is dropped because its window's
     *     trigger is already closed
     * @param aggregator2 counter incremented when an element is dropped because its window has
     *     expired
     * @param reduceFn the reduce function driven by this runner
     */
    public ReduceFnRunner(K k, WindowingStrategy<?, W> windowingStrategy, TimerInternals timerInternals, WindowingInternals<?, KV<K, OutputT>> windowingInternals, Aggregator<Long, Long> aggregator, Aggregator<Long, Long> aggregator2, ReduceFn<K, InputT, OutputT, W> reduceFn) {
        this.key = k;
        this.timerInternals = timerInternals;
        this.paneInfoTracker = new PaneInfoTracker(timerInternals);
        this.windowingInternals = windowingInternals;
        this.droppedDueToClosedWindow = aggregator;
        this.droppedDueToLateness = aggregator2;
        this.reduceFn = reduceFn;

        // The first type argument of the strategy is never inspected by this class, so viewing the
        // wildcard as Object is an unchecked but harmless narrowing.
        @SuppressWarnings("unchecked")
        WindowingStrategy<Object, W> objectWindowingStrategy = (WindowingStrategy<Object, W>) windowingStrategy;
        this.windowingStrategy = objectWindowingStrategy;

        this.nonEmptyPanes = NonEmptyPanes.create(this.windowingStrategy, this.reduceFn);

        // Must come after windowingStrategy/windowingInternals are assigned (both are read by
        // createActiveWindowSet) and before the factories below, which capture the window set.
        this.activeWindows = createActiveWindowSet();

        this.contextFactory = new ReduceFnContextFactory<>(k, reduceFn, this.windowingStrategy, this.windowingInternals.stateInternals(), this.activeWindows, timerInternals);
        this.watermarkHold = new WatermarkHold<>(timerInternals, windowingStrategy);
        this.triggerRunner = new TriggerRunner<>(windowingStrategy.getTrigger(), new TriggerContextFactory<>(windowingStrategy, this.windowingInternals.stateInternals(), this.activeWindows));
    }

    /**
     * Builds the window set matching the configured window function: a
     * {@code NonMergingActiveWindowSet} when the function reports itself as non-merging, otherwise a
     * {@code MergingActiveWindowSet} built from the window function and the state internals.
     */
    private ActiveWindowSet<W> createActiveWindowSet() {
        if (this.windowingStrategy.getWindowFn().isNonMerging()) {
            return new NonMergingActiveWindowSet();
        }
        return new MergingActiveWindowSet(this.windowingStrategy.getWindowFn(), this.windowingInternals.stateInternals());
    }

    /**
     * Reports whether the trigger runner considers the trigger for the given window closed.
     *
     * @param w the window to inspect
     * @return the result of {@code triggerRunner.isClosed} on that window's state
     */
    @VisibleForTesting
    boolean isFinished(W w) {
        ReduceFn<K, InputT, OutputT, W>.Context windowContext = this.contextFactory.base(w);
        return this.triggerRunner.isClosed(windowContext.state());
    }

    /**
     * Processes a batch of elements for this runner's key.
     *
     * <p>Steps, in order: (1) for merging window functions, register the elements' windows and
     * merge; (2) feed every element to {@code processElement}; (3) handle each trigger result that
     * fired during steps 1-2; (4) drop ephemeral windows from the active window set.
     *
     * @param iterable the elements to process; it is iterated more than once when the window
     *     function is merging, so it must support repeated iteration
     */
    public void processElements(Iterable<WindowedValue<InputT>> iterable) {
        // Windows whose trigger fired while merging/processing, with the result that fired.
        Map<W, Trigger.TriggerResult> firedResults = Maps.newHashMap();
        if (!this.windowingStrategy.getWindowFn().isNonMerging()) {
            collectAndMergeWindows(iterable, firedResults);
        }
        for (WindowedValue<InputT> value : iterable) {
            processElement(firedResults, value);
        }
        // Firing is deferred until every element of the batch has been added.
        for (Map.Entry<W, Trigger.TriggerResult> fired : firedResults.entrySet()) {
            handleTriggerResult(this.contextFactory.base(fired.getKey()), false, fired.getValue());
        }
        this.activeWindows.removeEphemeralWindows();
    }

    /** Asks the active window set to persist itself. */
    public void persist() {
        activeWindows.persist();
    }

    /**
     * Decides whether a window has expired relative to the input watermark.
     *
     * @param w the window to test
     * @return {@code true} only when an input watermark is known and the window's maximum
     *     timestamp plus the allowed lateness is strictly before it
     */
    private boolean canDropDueToExpiredWindow(W w) {
        Instant inputWatermark = this.timerInternals.currentInputWatermarkTime();
        if (inputWatermark == null) {
            // No watermark known: nothing can be declared expired.
            return false;
        }
        Instant expiry = w.maxTimestamp().plus(this.windowingStrategy.getAllowedLateness());
        return expiry.isBefore(inputWatermark);
    }

    /**
     * Registers the windows of all incoming elements as new active windows, merges the active
     * window set, and schedules a timer for every window that is active afterwards but was not
     * active before.
     *
     * <p>Windows that have expired (see {@code canDropDueToExpiredWindow}) or whose trigger is
     * already closed are not registered.
     *
     * @param iterable the incoming elements whose windows are collected
     * @param map receives the trigger results that fire while merging
     */
    private void collectAndMergeWindows(Iterable<WindowedValue<InputT>> iterable, Map<W, Trigger.TriggerResult> map) {
        // Snapshot taken before any additions so newly active windows can be identified below.
        HashSet<W> originalWindows = Sets.newHashSet(this.activeWindows.getActiveWindows());
        for (WindowedValue<InputT> value : iterable) {
            for (BoundedWindow untypedWindow : value.getWindows()) {
                // Elements reaching this runner are windowed by this runner's strategy, so their
                // windows are of type W; the cast is erased at runtime.
                @SuppressWarnings("unchecked")
                W window = (W) untypedWindow;
                if (canDropDueToExpiredWindow(window)) {
                    continue;
                }
                if (!this.triggerRunner.isClosed(this.contextFactory.base(window).state())) {
                    this.activeWindows.addNew(window);
                }
            }
        }
        mergeActiveWindows(map);
        for (W newlyActive : Sets.difference(this.activeWindows.getActiveWindows(), originalWindows)) {
            scheduleEndOfWindowOrGarbageCollectionTimer(this.contextFactory.base(newlyActive));
        }
    }

    /**
     * Asks the active window set to merge, and reacts to every merge it reports.
     *
     * <p>For each reported merge the callback: builds a merge context from the first collection and
     * the result window, prefetches trigger state, runs {@code reduceFn.onMerge},
     * {@code watermarkHold.onMerge} and {@code triggerRunner.onMerge}, records the trigger result in
     * {@code map} if it fired, and then cleans up every window in the second collection that is not
     * the result window (timers cancelled, trigger state and pane info cleared).
     *
     * <p>NOTE(review): this is decompiler output (JADX reported a type-inference failure here).
     * As written, the inner "Failed to merge the triggers" handler sits inside the outer
     * {@code try}, so whatever it throws is caught again and routed through
     * {@code wrapMaybeUserException}. Confirm against the original source whether that nesting is
     * intended.
     *
     * @param map receives {@code result window -> trigger result} for merges whose trigger fired
     */
    private void mergeActiveWindows(final Map<W, Trigger.TriggerResult> map) {
        try {
            this.activeWindows.merge(new ActiveWindowSet.MergeCallback<W>() { // from class: com.google.cloud.dataflow.sdk.util.ReduceFnRunner.1
                @Override // com.google.cloud.dataflow.sdk.util.ActiveWindowSet.MergeCallback
                public void onMerge(Collection<W> collection, Collection<W> collection2, W w) throws Exception {
                    // Context spanning the windows in `collection`, merging into `w`.
                    ReduceFn<K, InputT, OutputT, W>.OnMergeContext forMerge = ReduceFnRunner.this.contextFactory.forMerge(collection, w);
                    ReduceFnRunner.this.triggerRunner.prefetchForMerge(forMerge.state());
                    try {
                        ReduceFnRunner.this.reduceFn.onMerge(forMerge);
                        ReduceFnRunner.this.watermarkHold.onMerge(forMerge);
                        try {
                            Trigger.TriggerResult onMerge = ReduceFnRunner.this.triggerRunner.onMerge(forMerge);
                            if (onMerge.isFire()) {
                                // Remember the firing; the caller handles it after the batch.
                                map.put(w, onMerge);
                            }
                            // Clean up every window in `collection2` other than the merge result.
                            for (W w2 : collection2) {
                                if (!w2.equals(w)) {
                                    WindowTracing.debug("ReduceFnRunner.mergeActiveWindows/onMerge: Merging {} into {}", w2, w);
                                    ReduceFn<K, InputT, OutputT, W>.Context base = ReduceFnRunner.this.contextFactory.base(w2);
                                    ReduceFnRunner.this.cancelEndOfWindowAndGarbageCollectionTimers(base);
                                    ReduceFnRunner.this.triggerRunner.clearEverything(base);
                                    ReduceFnRunner.this.paneInfoTracker.clear(base.state());
                                }
                            }
                        } catch (Exception e) {
                            // Unchecked exceptions are rethrown as-is; checked ones get wrapped.
                            Throwables.propagateIfPossible(e);
                            throw new RuntimeException("Failed to merge the triggers", e);
                        }
                    } catch (Exception e2) {
                        throw ReduceFnRunner.this.wrapMaybeUserException(e2);
                    }
                }
            });
        } catch (Exception e) {
            // Unchecked exceptions are rethrown as-is; checked ones get wrapped.
            Throwables.propagateIfPossible(e);
            throw new RuntimeException("Exception while merging windows", e);
        }
    }

    /**
     * Adds one element to each of its non-expired windows and runs the trigger for each.
     *
     * <p>Three passes: (1) map each of the element's windows to its representative in the active
     * window set, counting and tracing windows dropped as expired; (2) prefetch trigger state for
     * windows that do not already have a pending result in {@code map}; (3) for each window, either
     * process the value (record content, set a timer for non-merging window functions, add
     * watermark holds, call the reduce function, run the trigger) or, when the trigger is closed
     * and no result is pending, count and trace the drop.
     *
     * <p>NOTE(review): this is decompiler output (JADX reported a type-inference failure here), so
     * locals declared as {@code BoundedWindow} stand in for {@code W}. As written, the
     * "Failed to run trigger" handler is nested inside the outer {@code try}, so its exception is
     * also passed to {@code wrapMaybeUserException}. Confirm against the original source.
     *
     * @param map pending trigger results by window; a window already present is processed without
     *     re-running its trigger, and windows whose trigger fires here are added
     * @param windowedValue the element to process
     */
    private void processElement(Map<W, Trigger.TriggerResult> map, WindowedValue<InputT> windowedValue) {
        // Pass 1: resolve each window to its representative, skipping expired windows.
        ArrayList<BoundedWindow> arrayList = new ArrayList();
        for (BoundedWindow boundedWindow : windowedValue.getWindows()) {
            if (canDropDueToExpiredWindow(boundedWindow)) {
                this.droppedDueToLateness.addValue(1L);
                WindowTracing.debug("ReduceFnRunner.processElement: Dropping element at {} for key:{}; window:{} since too far behind inputWatermark:{}; outputWatermark:{}", windowedValue.getTimestamp(), this.key, boundedWindow, this.timerInternals.currentInputWatermarkTime(), this.timerInternals.currentOutputWatermarkTime());
            } else {
                BoundedWindow representative = this.activeWindows.representative(boundedWindow);
                Preconditions.checkState(representative != null, "Window %s should have been added", boundedWindow);
                arrayList.add(representative);
            }
        }
        // Pass 2: prefetch trigger state for windows whose trigger will actually be run.
        for (BoundedWindow boundedWindow2 : arrayList) {
            if (!map.containsKey(boundedWindow2)) {
                this.triggerRunner.prefetchForValue(this.contextFactory.forValue(boundedWindow2, windowedValue.getValue(), windowedValue.getTimestamp()).state());
            }
        }
        // Pass 3: process the value in each window.
        for (BoundedWindow boundedWindow3 : arrayList) {
            ReduceFn<?, ?, ?, W>.ProcessValueContext forValue = this.contextFactory.forValue(boundedWindow3, windowedValue.getValue(), windowedValue.getTimestamp());
            // A window with a pending result is processed regardless of the trigger's closed state.
            if (map.containsKey(boundedWindow3) || !this.triggerRunner.isClosed(forValue.state())) {
                this.nonEmptyPanes.recordContent(forValue);
                if (this.windowingStrategy.getWindowFn().isNonMerging()) {
                    // Merging window functions get their timers from collectAndMergeWindows.
                    scheduleEndOfWindowOrGarbageCollectionTimer(forValue);
                }
                this.watermarkHold.addHolds(forValue);
                try {
                    this.reduceFn.processValue(forValue);
                    if (map.containsKey(boundedWindow3)) {
                        // A result is already pending for this window; do not run the trigger.
                        continue;
                    } else {
                        try {
                            Trigger.TriggerResult processValue = this.triggerRunner.processValue(forValue);
                            if (processValue.isFire()) {
                                map.put(boundedWindow3, processValue);
                            }
                        } catch (Exception e) {
                            // Unchecked exceptions are rethrown as-is; checked ones get wrapped.
                            Throwables.propagateIfPossible(e);
                            throw new RuntimeException("Failed to run trigger", e);
                        }
                    }
                } catch (Exception e2) {
                    throw wrapMaybeUserException(e2);
                }
            } else {
                // Trigger is closed and nothing is pending: drop the element for this window.
                this.droppedDueToClosedWindow.addValue(1L);
                WindowTracing.debug("ReduceFnRunner.processElement: Dropping element at {} for key:{}; window:{} since window is no longer active at inputWatermark:{}; outputWatermark:{}", windowedValue.getTimestamp(), this.key, boundedWindow3, this.timerInternals.currentInputWatermarkTime(), this.timerInternals.currentOutputWatermarkTime());
            }
        }
    }

    /**
     * Handles a fired timer, which must belong to a window namespace.
     *
     * <p>Two cases, decided by the timer's domain and timestamp:
     * <ul>
     *   <li><b>Cleanup timer</b> (event time, at window max timestamp + allowed lateness): if the
     *       window is active and its trigger is not closed, emit a final pane via
     *       {@code onTrigger}; then clear all state for the window and return.
     *   <li><b>Any other timer</b>: if the window is active and its trigger is not closed, run the
     *       trigger and handle its result. If the timer is the end-of-window timer (event time, at
     *       window max timestamp), additionally schedule the cleanup timer.
     * </ul>
     *
     * <p>NOTE(review): this is decompiler output (JADX reported a type-inference failure here), so
     * {@code window} is declared as {@code BoundedWindow} where {@code W} is required.
     *
     * @param timerData the timer that fired
     * @throws IllegalArgumentException if the timer is not in a {@code WindowNamespace}
     * @throws RuntimeException if clearing state fails; a {@code UserCodeException} is rethrown
     *     as-is, anything else is wrapped
     */
    public void onTimer(TimerInternals.TimerData timerData) {
        Preconditions.checkArgument(timerData.getNamespace() instanceof StateNamespaces.WindowNamespace, "Expected timer to be in WindowNamespace, but was in %s", timerData.getNamespace());
        StateNamespaces.WindowNamespace windowNamespace = (StateNamespaces.WindowNamespace) timerData.getNamespace();
        BoundedWindow window = windowNamespace.getWindow();
        if (!this.activeWindows.isActive(window)) {
            // Only traced; processing continues for non-active windows.
            WindowTracing.debug("ReduceFnRunner.onTimer: Note that timer {} is for non-ACTIVE window {}", timerData, window);
        }
        ReduceFn<K, InputT, OutputT, W>.Context base = this.contextFactory.base(window);
        // z: this is the end-of-window timer (event time, exactly at the window's max timestamp).
        boolean z = TimeDomain.EVENT_TIME == timerData.getDomain() && timerData.getTimestamp().equals(window.maxTimestamp());
        // plus: the cleanup time, i.e. end of window plus allowed lateness.
        Instant plus = window.maxTimestamp().plus(this.windowingStrategy.getAllowedLateness());
        if (TimeDomain.EVENT_TIME == timerData.getDomain() && timerData.getTimestamp().equals(plus)) {
            // Cleanup timer. With zero allowed lateness this is also the end-of-window timer.
            WindowTracing.debug("ReduceFnRunner.onTimer: Cleaning up for key:{}; window:{} at {} with inputWatermark:{}; outputWatermark:{}", this.key, window, timerData.getTimestamp(), this.timerInternals.currentInputWatermarkTime(), this.timerInternals.currentOutputWatermarkTime());
            if (this.activeWindows.isActive(window) && !this.triggerRunner.isClosed(base.state())) {
                // The trigger is only run when this is also the end-of-window timer (short-circuit).
                onTrigger(base, z && runTriggersForTimer(base, timerData).isFire(), true, false);
            }
            try {
                clearAllState(base);
                return;
            } catch (Exception e) {
                Throwables.propagateIfInstanceOf(e, UserCodeException.class);
                String valueOf = String.valueOf(windowNamespace.getWindow());
                // Compiler-expanded string concatenation; 42 is the length of the literal prefix.
                throw new RuntimeException(new StringBuilder(42 + String.valueOf(valueOf).length()).append("Exception while garbage collecting window ").append(valueOf).toString(), e);
            }
        }
        WindowTracing.debug("ReduceFnRunner.onTimer: Triggering for key:{}; window:{} at {} with inputWatermark:{}; outputWatermark:{}", this.key, window, timerData.getTimestamp(), this.timerInternals.currentInputWatermarkTime(), this.timerInternals.currentOutputWatermarkTime());
        if (this.activeWindows.isActive(window) && !this.triggerRunner.isClosed(base.state())) {
            handleTriggerResult(base, z, runTriggersForTimer(base, timerData));
        }
        if (z) {
            // Reaching here with an end-of-window timer means it was not the cleanup timer, so
            // allowed lateness must be positive.
            Preconditions.checkState(this.windowingStrategy.getAllowedLateness().isLongerThan(Duration.ZERO), "Unexpected zero getAllowedLateness");
            WindowTracing.debug("ReduceFnRunner.onTimer: Scheduling cleanup timer for key:{}; window:{} at {} with inputWatermark:{}; outputWatermark:{}", this.key, base.window(), plus, this.timerInternals.currentInputWatermarkTime(), this.timerInternals.currentOutputWatermarkTime());
            base.timers().setTimer(plus, TimeDomain.EVENT_TIME);
        }
    }

    /**
     * Prefetches trigger state for a timer and then runs the trigger's timer handling.
     *
     * @param context the context of the window the timer belongs to
     * @param timerData the timer that fired
     * @return the trigger's result
     * @throws RuntimeException wrapping any checked exception from the trigger; unchecked
     *     exceptions propagate unchanged
     */
    private Trigger.TriggerResult runTriggersForTimer(ReduceFn<K, InputT, OutputT, W>.Context context, TimerInternals.TimerData timerData) {
        this.triggerRunner.prefetchForTimer(context.state());
        final Trigger.TriggerResult result;
        try {
            result = this.triggerRunner.onTimer(context, timerData);
        } catch (Exception failure) {
            Throwables.propagateIfPossible(failure);
            throw new RuntimeException("Exception in onTimer for trigger", failure);
        }
        return result;
    }

    /**
     * Clears everything held for the context's window.
     *
     * <p>Watermark holds are always cleared. Reduce-function state, the non-empty-pane marker and
     * the active-window entry are cleared only when the window is active. Trigger state and pane
     * info are always cleared last.
     *
     * @param context the context of the window to clear
     * @throws Exception if clearing fails; failures while clearing an active window are routed
     *     through {@code wrapMaybeUserException}
     */
    private void clearAllState(ReduceFn<K, InputT, OutputT, W>.Context context) throws Exception {
        final boolean windowIsActive = this.activeWindows.isActive(context.window());
        this.watermarkHold.clearHolds(context, windowIsActive);

        if (windowIsActive) {
            try {
                this.reduceFn.clearState(context);
                this.nonEmptyPanes.clearPane(context);
                this.activeWindows.remove(context.window());
            } catch (Exception failure) {
                throw wrapMaybeUserException(failure);
            }
        }

        this.triggerRunner.clearEverything(context);
        this.paneInfoTracker.clear(context.state());
    }

    /**
     * Decides whether accumulated state must be discarded after a firing.
     *
     * @param triggerResult the result that caused the firing
     * @return {@code true} if the result finishes the trigger, or the accumulation mode is
     *     {@code DISCARDING_FIRED_PANES}
     */
    private boolean shouldDiscardAfterFiring(Trigger.TriggerResult triggerResult) {
        if (triggerResult.isFinish()) {
            return true;
        }
        return WindowingStrategy.AccumulationMode.DISCARDING_FIRED_PANES == this.windowingStrategy.getMode();
    }

    /**
     * Acts on a trigger result; does nothing unless the result is a firing.
     *
     * <p>On a firing: emits via {@code onTrigger}, clears the non-empty-pane marker, discards
     * reduce-function state when required (removing the window from the active set and re-adding
     * it as active unless the trigger finished), and finally clears trigger state and pane info if
     * the trigger finished.
     *
     * @param context the context of the window whose trigger produced the result
     * @param z forwarded unchanged as the second argument of {@code onTrigger}
     * @param triggerResult the result to act on
     */
    private void handleTriggerResult(ReduceFn<K, InputT, OutputT, W>.Context context, boolean z, Trigger.TriggerResult triggerResult) {
        if (!triggerResult.isFire()) {
            return;
        }

        final boolean discard = shouldDiscardAfterFiring(triggerResult);
        final boolean keepAfterFiring = !discard;
        onTrigger(context, z, triggerResult.isFinish(), keepAfterFiring);
        this.nonEmptyPanes.clearPane(context);

        if (discard) {
            try {
                this.reduceFn.clearState(context);
                WindowTracing.debug("ReduceFnRunner.handleTriggerResult: removing {}", context.window());
                this.activeWindows.remove(context.window());
                if (!triggerResult.isFinish()) {
                    // Not finished: the window goes straight back into the active set.
                    this.activeWindows.addActive(context.window());
                }
            } catch (Exception failure) {
                throw wrapMaybeUserException(failure);
            }
        }

        if (!triggerResult.isFinish()) {
            return;
        }
        try {
            this.triggerRunner.clearState(context);
            this.paneInfoTracker.clear(context.state());
        } catch (Exception failure) {
            Throwables.propagateIfPossible(failure);
            throw new RuntimeException("Exception while clearing trigger state", failure);
        }
    }

    /**
     * Decides whether a pane must be emitted.
     *
     * @param z the first flag; when {@code false} a pane is always emitted (the caller passes the
     *     pane's "is empty" reading)
     * @param z2 second flag; together with an {@code ON_TIME} timing it forces emission
     * @param z3 third flag; together with closing behavior {@code FIRE_ALWAYS} it forces emission
     * @param timing the timing of the pane under consideration
     * @return whether to emit
     */
    private boolean needToEmit(boolean z, boolean z2, boolean z3, PaneInfo.Timing timing) {
        return !z
                || (z2 && timing == PaneInfo.Timing.ON_TIME)
                || (z3 && this.windowingStrategy.getClosingBehavior() == Window.ClosingBehavior.FIRE_ALWAYS);
    }

    /**
     * Emits a pane for the context's window if one is needed.
     *
     * <p>All state reads are requested up front (watermark hold, next pane info, emptiness, and the
     * reduce function's own prefetch) before any of them is read. If {@code needToEmit} agrees,
     * the reduce function's {@code onTrigger} is invoked with a callback that stores the pane info
     * and outputs the value keyed by this runner's key, in the context's window, at the extracted
     * hold timestamp.
     *
     * @param context the context of the window being fired
     * @param z passed to {@code getNextPaneInfo} and as the second argument of {@code needToEmit}
     * @param z2 passed to {@code extractAndRelease}, {@code getNextPaneInfo} and as the third
     *     argument of {@code needToEmit}
     * @param z3 passed as the last argument of {@code extractAndRelease}
     */
    private void onTrigger(final ReduceFn<K, InputT, OutputT, W>.Context context, boolean z, boolean z2, boolean z3) {
        // Request everything first; read afterwards.
        StateContents<Instant> holdContents = this.watermarkHold.extractAndRelease(context, z2, z3);
        StateContents<PaneInfo> paneContents = this.paneInfoTracker.getNextPaneInfo(context, z, z2);
        StateContents<Boolean> emptyContents = this.nonEmptyPanes.isEmpty(context);
        this.reduceFn.prefetchOnTrigger(context.state());

        final PaneInfo pane = paneContents.read();
        final Instant outputTimestamp = holdContents.read();
        boolean paneIsEmpty = emptyContents.read().booleanValue();
        if (!needToEmit(paneIsEmpty, z, z2, pane.getTiming())) {
            return;
        }

        final List<W> outputWindows = Collections.singletonList(context.window());
        ReduceFnContextFactory.OnTriggerCallbacks<OutputT> callbacks = new ReduceFnContextFactory.OnTriggerCallbacks<OutputT>() {
            @Override
            public void output(OutputT outputt) {
                ReduceFnRunner.this.paneInfoTracker.storeCurrentPaneInfo(context, pane);
                ReduceFnRunner.this.windowingInternals.outputWindowedValue(KV.of(ReduceFnRunner.this.key, outputt), outputTimestamp, outputWindows, pane);
            }
        };
        try {
            this.reduceFn.onTrigger(this.contextFactory.forTrigger(context.window(), paneContents, callbacks));
        } catch (Exception failure) {
            throw wrapMaybeUserException(failure);
        }
    }

    /**
     * Sets one event-time timer for the context's window.
     *
     * <p>Normally the timer is set at the window's maximum timestamp ("end-of-window"). If the
     * input watermark is known and already past that point, the timer is instead set at the
     * maximum timestamp plus the allowed lateness ("garbage collection"), which must not itself be
     * behind the input watermark.
     *
     * @param context the context of the window to schedule for
     * @throws IllegalStateException if the garbage-collection time is behind the input watermark
     */
    private void scheduleEndOfWindowOrGarbageCollectionTimer(ReduceFn<?, ?, ?, W>.Context context) {
        final Instant endOfWindow = context.window().maxTimestamp();
        final Instant inputWatermark = this.timerInternals.currentInputWatermarkTime();
        final boolean endOfWindowPassed = inputWatermark != null && endOfWindow.isBefore(inputWatermark);

        final Instant fireTime;
        final String timerKind;
        if (endOfWindowPassed) {
            fireTime = endOfWindow.plus(this.windowingStrategy.getAllowedLateness());
            timerKind = "garbage collection";
            Preconditions.checkState(!fireTime.isBefore(inputWatermark), "Asking to set a timer at %s behind input watermark %s", fireTime, inputWatermark);
        } else {
            fireTime = endOfWindow;
            timerKind = "end-of-window";
        }

        WindowTracing.trace("ReduceFnRunner.scheduleEndOfWindowOrGarbageCollectionTimer: Scheduling {} timer at {} for key:{}; window:{} where inputWatermark:{}; outputWatermark:{}", timerKind, fireTime, this.key, context.window(), inputWatermark, this.timerInternals.currentOutputWatermarkTime());
        context.timers().setTimer(fireTime, TimeDomain.EVENT_TIME);
    }

    /**
     * Deletes the event-time timers this runner may have set for the context's window: the one at
     * the window's maximum timestamp and, when the allowed lateness is positive, the one at the
     * maximum timestamp plus the allowed lateness.
     *
     * <p>Declared {@code public} by the decompiler so the anonymous merge callback can reach it.
     *
     * @param context the context of the window whose timers are deleted
     */
    public void cancelEndOfWindowAndGarbageCollectionTimers(ReduceFn<?, ?, ?, W>.Context context) {
        WindowTracing.debug("ReduceFnRunner.cancelEndOfWindowAndGarbageCollectionTimers: Deleting timers for key:{}; window:{} where inputWatermark:{}; outputWatermark:{}", this.key, context.window(), this.timerInternals.currentInputWatermarkTime(), this.timerInternals.currentOutputWatermarkTime());
        final Instant endOfWindow = context.window().maxTimestamp();
        context.timers().deleteTimer(endOfWindow, TimeDomain.EVENT_TIME);

        final Duration allowedLateness = this.windowingStrategy.getAllowedLateness();
        if (!allowedLateness.isLongerThan(Duration.ZERO)) {
            // With zero lateness the cleanup timer coincides with the one just deleted.
            return;
        }
        context.timers().deleteTimer(endOfWindow.plus(allowedLateness), TimeDomain.EVENT_TIME);
    }

    /**
     * Rethrows a failure raised while running the reduce function. This method never returns
     * normally; the declared return type only lets callers write {@code throw wrap...(e)}.
     *
     * <p>Declared {@code public} by the decompiler so the anonymous merge callback can reach it.
     *
     * @param th the failure to rethrow
     * @return never returns
     * @throws UserCodeException wrapping {@code th} when the reduce function is not a
     *     {@code SystemReduceFn}
     * @throws RuntimeException via {@code Throwables.propagate} otherwise
     */
    public RuntimeException wrapMaybeUserException(Throwable th) {
        if (!(this.reduceFn instanceof SystemReduceFn)) {
            throw new UserCodeException(th);
        }
        throw Throwables.propagate(th);
    }
}
