package stream.scotty.beamconnector;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.values.KV;
import org.joda.time.Instant;
import stream.scotty.core.AggregateWindow;
import stream.scotty.core.windowFunction.AggregateFunction;
import stream.scotty.core.windowType.Window;
import stream.scotty.slicing.SlicingWindowOperator;
import stream.scotty.state.memory.MemoryStateFactory;

/* loaded from: input_file:stream/scotty/beamconnector/KeyedScottyWindowOperator.class */
public class KeyedScottyWindowOperator<K, V> extends DoFn<KV<K, V>, String> {
    private long lastWatermark;
    private final AggregateFunction windowFunction;
    private long allowedLateness;
    private long watermarkEvictionPeriod = 1000;
    private final List<Window> windows = new ArrayList();
    private MemoryStateFactory stateFactory = new MemoryStateFactory();
    private HashMap<K, SlicingWindowOperator<KV<K, V>>> slicingWindowOperatorMap = new HashMap<>();

    public KeyedScottyWindowOperator(long j, AggregateFunction aggregateFunction) {
        this.windowFunction = aggregateFunction;
        this.allowedLateness = j;
    }

    public void addWindow(Window window) {
        this.windows.add(window);
    }

    public SlicingWindowOperator<KV<K, V>> initWindowOperator() {
        SlicingWindowOperator<KV<K, V>> slicingWindowOperator = new SlicingWindowOperator<>(this.stateFactory);
        Iterator<Window> it = this.windows.iterator();
        while (it.hasNext()) {
            slicingWindowOperator.addWindowAssigner(it.next());
        }
        slicingWindowOperator.addAggregation(this.windowFunction);
        slicingWindowOperator.setMaxLateness(this.allowedLateness);
        return slicingWindowOperator;
    }

    /* JADX WARN: Multi-variable type inference failed */
    @DoFn.ProcessElement
    public void processElement(@DoFn.Element KV<K, V> kv, @DoFn.Timestamp Instant instant, DoFn.OutputReceiver<String> outputReceiver) {
        Object key = kv.getKey();
        if (!this.slicingWindowOperatorMap.containsKey(key)) {
            this.slicingWindowOperatorMap.put(key, initWindowOperator());
        }
        this.slicingWindowOperatorMap.get(key).processElement(kv, instant.getMillis());
        processWatermark(instant, outputReceiver);
    }

    private void processWatermark(Instant instant, DoFn.OutputReceiver<String> outputReceiver) {
        if (instant.getMillis() > this.lastWatermark + this.watermarkEvictionPeriod) {
            Iterator<SlicingWindowOperator<KV<K, V>>> it = this.slicingWindowOperatorMap.values().iterator();
            while (it.hasNext()) {
                Iterator it2 = it.next().processWatermark(instant.getMillis()).iterator();
                while (it2.hasNext()) {
                    outputReceiver.output(((AggregateWindow) it2.next()).toString());
                }
            }
            this.lastWatermark = instant.getMillis();
        }
    }
}
