package org.apache.flink.streaming.runtime.operators.windowing;

import java.util.Collection;
import java.util.Objects;
import org.apache.flink.annotation.Internal;
import org.apache.flink.api.common.state.AppendingState;
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.StateDescriptor;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple1;
import org.apache.flink.shaded.com.google.common.annotations.VisibleForTesting;
import org.apache.flink.shaded.com.google.common.base.Function;
import org.apache.flink.shaded.com.google.common.collect.FluentIterable;
import org.apache.flink.shaded.com.google.common.collect.Iterables;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.api.windowing.assigners.MergingWindowAssigner;
import org.apache.flink.streaming.api.windowing.assigners.WindowAssigner;
import org.apache.flink.streaming.api.windowing.evictors.Evictor;
import org.apache.flink.streaming.api.windowing.triggers.Trigger;
import org.apache.flink.streaming.api.windowing.triggers.TriggerResult;
import org.apache.flink.streaming.api.windowing.windows.Window;
import org.apache.flink.streaming.runtime.operators.windowing.MergingWindowSet;
import org.apache.flink.streaming.runtime.operators.windowing.WindowOperator;
import org.apache.flink.streaming.runtime.operators.windowing.functions.InternalWindowFunction;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;

@Internal
/* loaded from: input_file:org/apache/flink/streaming/runtime/operators/windowing/EvictingWindowOperator.class */
public class EvictingWindowOperator<K, IN, OUT, W extends Window> extends WindowOperator<K, IN, Iterable<IN>, OUT, W> {
    private static final long serialVersionUID = 1;
    private final Evictor<? super IN, ? super W> evictor;
    private final StateDescriptor<? extends ListState<StreamRecord<IN>>, ?> windowStateDescriptor;

    public EvictingWindowOperator(WindowAssigner<? super IN, W> windowAssigner, TypeSerializer<W> typeSerializer, KeySelector<IN, K> keySelector, TypeSerializer<K> typeSerializer2, StateDescriptor<? extends ListState<StreamRecord<IN>>, ?> stateDescriptor, InternalWindowFunction<Iterable<IN>, OUT, K, W> internalWindowFunction, Trigger<? super IN, ? super W> trigger, Evictor<? super IN, ? super W> evictor, long j) {
        super(windowAssigner, typeSerializer, keySelector, typeSerializer2, null, internalWindowFunction, trigger, j);
        this.evictor = (Evictor) Objects.requireNonNull(evictor);
        this.windowStateDescriptor = stateDescriptor;
    }

    /* JADX WARN: Multi-variable type inference failed */
    @Override // org.apache.flink.streaming.runtime.operators.windowing.WindowOperator, org.apache.flink.streaming.api.operators.OneInputStreamOperator
    public void processElement(StreamRecord<IN> streamRecord) throws Exception {
        Collection<W> assignWindows = this.windowAssigner.assignWindows(streamRecord.getValue(), streamRecord.getTimestamp(), this.windowAssignerContext);
        final K k = (K) getStateBackend().getCurrentKey();
        if (!(this.windowAssigner instanceof MergingWindowAssigner)) {
            for (W w : assignWindows) {
                if (!isLate(w)) {
                    ListState partitionedState = getPartitionedState(w, this.windowSerializer, this.windowStateDescriptor);
                    partitionedState.add(streamRecord);
                    this.context.key = k;
                    this.context.window = w;
                    TriggerResult onElement = this.context.onElement(streamRecord);
                    if (onElement.isFire()) {
                        Iterable iterable = (Iterable) partitionedState.get();
                        if (iterable != null) {
                            fire((EvictingWindowOperator<K, IN, OUT, W>) w, iterable);
                        }
                    }
                    if (onElement.isPurge()) {
                        cleanup((EvictingWindowOperator<K, IN, OUT, W>) w, partitionedState, (MergingWindowSet<EvictingWindowOperator<K, IN, OUT, W>>) null);
                    } else {
                        registerCleanupTimer(w);
                    }
                }
            }
            return;
        }
        MergingWindowSet mergingWindowSet = getMergingWindowSet();
        for (W w2 : assignWindows) {
            final Tuple1 tuple1 = new Tuple1(TriggerResult.CONTINUE);
            W w3 = (W) mergingWindowSet.addWindow(w2, new MergingWindowSet.MergeFunction<W>() { // from class: org.apache.flink.streaming.runtime.operators.windowing.EvictingWindowOperator.1
                public void merge(W w4, Collection<W> collection, W w5, Collection<W> collection2) throws Exception {
                    EvictingWindowOperator.this.context.key = (K) k;
                    EvictingWindowOperator.this.context.window = w4;
                    tuple1.f0 = EvictingWindowOperator.this.context.onMerge(collection);
                    for (W w6 : collection) {
                        EvictingWindowOperator.this.context.window = w6;
                        EvictingWindowOperator.this.context.clear();
                        EvictingWindowOperator.this.deleteCleanupTimer(w6);
                    }
                    EvictingWindowOperator.this.getStateBackend().mergePartitionedStates(w5, collection2, EvictingWindowOperator.this.windowSerializer, EvictingWindowOperator.this.windowStateDescriptor);
                }

                /* JADX WARN: Multi-variable type inference failed */
                @Override // org.apache.flink.streaming.runtime.operators.windowing.MergingWindowSet.MergeFunction
                public /* bridge */ /* synthetic */ void merge(Object obj, Collection collection, Object obj2, Collection collection2) throws Exception {
                    merge((Collection) obj, (Collection<Collection>) collection, (Collection) obj2, (Collection<Collection>) collection2);
                }
            });
            if (isLate(w3)) {
                mergingWindowSet.retireWindow(w3);
            } else {
                Window stateWindow = mergingWindowSet.getStateWindow(w3);
                if (stateWindow == null) {
                    throw new IllegalStateException("Window " + w2 + " is not in in-flight window set.");
                }
                ListState partitionedState2 = getPartitionedState(stateWindow, this.windowSerializer, this.windowStateDescriptor);
                partitionedState2.add(streamRecord);
                this.context.key = k;
                this.context.window = w3;
                TriggerResult merge = TriggerResult.merge(this.context.onElement(streamRecord), (TriggerResult) tuple1.f0);
                if (merge.isFire()) {
                    Iterable iterable2 = (Iterable) partitionedState2.get();
                    if (iterable2 != null) {
                        fire((EvictingWindowOperator<K, IN, OUT, W>) w3, iterable2);
                    }
                }
                if (merge.isPurge()) {
                    cleanup((EvictingWindowOperator<K, IN, OUT, W>) w3, partitionedState2, (MergingWindowSet<EvictingWindowOperator<K, IN, OUT, W>>) mergingWindowSet);
                } else {
                    registerCleanupTimer(w3);
                }
            }
        }
    }

    @Override // org.apache.flink.streaming.runtime.operators.windowing.WindowOperator, org.apache.flink.streaming.api.operators.OneInputStreamOperator
    public void processWatermark(Watermark watermark) throws Exception {
        boolean z;
        ListState<StreamRecord<IN>> listState;
        do {
            WindowOperator.Timer<K, W> peek = this.watermarkTimersQueue.peek();
            if (peek == null || peek.timestamp > watermark.getTimestamp()) {
                z = false;
            } else {
                z = true;
                this.watermarkTimers.remove(peek);
                this.watermarkTimersQueue.remove();
                this.context.key = peek.key;
                this.context.window = peek.window;
                setKeyContext(peek.key);
                MergingWindowSet<W> mergingWindowSet = null;
                if (this.windowAssigner instanceof MergingWindowAssigner) {
                    mergingWindowSet = getMergingWindowSet();
                    W stateWindow = mergingWindowSet.getStateWindow(this.context.window);
                    if (stateWindow != null) {
                        listState = (ListState) getPartitionedState(stateWindow, this.windowSerializer, this.windowStateDescriptor);
                    }
                } else {
                    listState = (ListState) getPartitionedState(this.context.window, this.windowSerializer, this.windowStateDescriptor);
                }
                Iterable<StreamRecord<IN>> iterable = (Iterable) listState.get();
                if (iterable != null) {
                    TriggerResult onEventTime = this.context.onEventTime(peek.timestamp);
                    if (onEventTime.isFire()) {
                        fire((EvictingWindowOperator<K, IN, OUT, W>) this.context.window, (Iterable) iterable);
                    }
                    if (onEventTime.isPurge() || (this.windowAssigner.isEventTime() && isCleanupTime(this.context.window, peek.timestamp))) {
                        cleanup((EvictingWindowOperator<K, IN, OUT, W>) this.context.window, (ListState) listState, (MergingWindowSet<EvictingWindowOperator<K, IN, OUT, W>>) mergingWindowSet);
                    }
                }
            }
        } while (z);
        this.output.emitWatermark(watermark);
        this.currentWatermark = watermark.getTimestamp();
    }

    @Override // org.apache.flink.streaming.runtime.operators.windowing.WindowOperator, org.apache.flink.streaming.runtime.operators.Triggerable
    public void trigger(long j) throws Exception {
        boolean z;
        ListState<StreamRecord<IN>> listState;
        this.processingTimeTimerFutures.remove(Long.valueOf(j));
        this.processingTimeTimerTimestamps.remove(Long.valueOf(j), this.processingTimeTimerTimestamps.count(Long.valueOf(j)));
        do {
            WindowOperator.Timer<K, W> peek = this.processingTimeTimersQueue.peek();
            if (peek == null || peek.timestamp > j) {
                z = false;
            } else {
                z = true;
                this.processingTimeTimers.remove(peek);
                this.processingTimeTimersQueue.remove();
                this.context.key = peek.key;
                this.context.window = peek.window;
                setKeyContext(peek.key);
                MergingWindowSet<W> mergingWindowSet = null;
                if (this.windowAssigner instanceof MergingWindowAssigner) {
                    mergingWindowSet = getMergingWindowSet();
                    W stateWindow = mergingWindowSet.getStateWindow(this.context.window);
                    if (stateWindow != null) {
                        listState = (ListState) getPartitionedState(stateWindow, this.windowSerializer, this.windowStateDescriptor);
                    }
                } else {
                    listState = (ListState) getPartitionedState(this.context.window, this.windowSerializer, this.windowStateDescriptor);
                }
                Iterable<StreamRecord<IN>> iterable = (Iterable) listState.get();
                if (iterable != null) {
                    TriggerResult onProcessingTime = this.context.onProcessingTime(peek.timestamp);
                    if (onProcessingTime.isFire()) {
                        fire((EvictingWindowOperator<K, IN, OUT, W>) this.context.window, (Iterable) iterable);
                    }
                    if (onProcessingTime.isPurge() || (!this.windowAssigner.isEventTime() && isCleanupTime(this.context.window, peek.timestamp))) {
                        cleanup((EvictingWindowOperator<K, IN, OUT, W>) this.context.window, (ListState) listState, (MergingWindowSet<EvictingWindowOperator<K, IN, OUT, W>>) mergingWindowSet);
                    }
                }
            }
        } while (z);
    }

    private void fire(W w, Iterable<StreamRecord<IN>> iterable) throws Exception {
        this.timestampedCollector.setAbsoluteTimestamp(w.maxTimestamp());
        ((InternalWindowFunction) this.userFunction).apply(this.context.key, this.context.window, FluentIterable.from(iterable).skip(this.evictor.evict(iterable, Iterables.size(iterable), (Object) this.context.window)).transform(new Function<StreamRecord<IN>, IN>() { // from class: org.apache.flink.streaming.runtime.operators.windowing.EvictingWindowOperator.2
            public IN apply(StreamRecord<IN> streamRecord) {
                return streamRecord.getValue();
            }
        }), this.timestampedCollector);
    }

    private void cleanup(W w, ListState<StreamRecord<IN>> listState, MergingWindowSet<W> mergingWindowSet) throws Exception {
        listState.clear();
        if (mergingWindowSet != null) {
            mergingWindowSet.retireWindow(w);
        }
        this.context.clear();
    }

    @VisibleForTesting
    public Evictor<? super IN, ? super W> getEvictor() {
        return this.evictor;
    }

    @Override // org.apache.flink.streaming.runtime.operators.windowing.WindowOperator
    @VisibleForTesting
    public StateDescriptor<? extends AppendingState<IN, Iterable<IN>>, ?> getStateDescriptor() {
        return this.windowStateDescriptor;
    }
}
