package org.apache.flink.streaming.api.datastream;

import org.apache.commons.configuration.tree.DefaultExpressionEngine;
import org.apache.flink.annotation.Public;
import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.api.common.functions.FoldFunction;
import org.apache.flink.api.common.functions.Function;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.common.functions.RichFunction;
import org.apache.flink.api.common.state.FoldingStateDescriptor;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.state.ReducingStateDescriptor;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.Utils;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.typeutils.TypeExtractor;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.aggregation.AggregationFunction;
import org.apache.flink.streaming.api.functions.aggregation.ComparableAggregator;
import org.apache.flink.streaming.api.functions.aggregation.SumAggregator;
import org.apache.flink.streaming.api.functions.windowing.FoldApplyWindowFunction;
import org.apache.flink.streaming.api.functions.windowing.PassThroughWindowFunction;
import org.apache.flink.streaming.api.functions.windowing.ReduceApplyWindowFunction;
import org.apache.flink.streaming.api.functions.windowing.WindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.MergingWindowAssigner;
import org.apache.flink.streaming.api.windowing.assigners.SlidingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.assigners.WindowAssigner;
import org.apache.flink.streaming.api.windowing.evictors.Evictor;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.triggers.ProcessingTimeTrigger;
import org.apache.flink.streaming.api.windowing.triggers.Trigger;
import org.apache.flink.streaming.api.windowing.windows.Window;
import org.apache.flink.streaming.runtime.operators.windowing.AccumulatingProcessingTimeWindowOperator;
import org.apache.flink.streaming.runtime.operators.windowing.AggregatingProcessingTimeWindowOperator;
import org.apache.flink.streaming.runtime.operators.windowing.EvictingWindowOperator;
import org.apache.flink.streaming.runtime.operators.windowing.WindowOperator;
import org.apache.flink.streaming.runtime.operators.windowing.functions.InternalIterableWindowFunction;
import org.apache.flink.streaming.runtime.operators.windowing.functions.InternalSingleValueWindowFunction;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecordSerializer;

/**
 * Fluent builder that turns a {@link KeyedStream} plus a {@link WindowAssigner} into windowed
 * operators.
 *
 * <p>The configuration methods ({@link #trigger}, {@link #evictor}, {@link #allowedLateness})
 * mutate this instance and return it; the transformation methods ({@code reduce}, {@code fold},
 * {@code apply} and the aggregation shortcuts) build a window operator from the current
 * configuration and attach it to the input stream via {@code input.transform(...)}.
 *
 * <p>Raw generic types are used for the operators and state descriptors throughout this class.
 *
 * <p>NOTE(review): {@code Utils.getCallLocationName()} presumably inspects the call stack, so
 * the calls are deliberately kept in the public methods rather than moved into helpers — confirm
 * before refactoring them.
 *
 * @param <T> element type of the input stream
 * @param <K> key type of the input stream
 * @param <W> window type produced by the window assigner
 */
@Public
public class WindowedStream<T, K, W extends Window> {

    /** Name under which every window operator built here registers its window state. */
    private static final String WINDOW_STATE_NAME = "window-contents";

    /** The keyed stream that is being windowed. */
    private final KeyedStream<T, K> input;

    /** Assigns incoming elements to windows. */
    private final WindowAssigner<? super T, W> windowAssigner;

    /** Trigger handed to the window operator; starts as the assigner's default trigger. */
    private Trigger<? super T, ? super W> trigger;

    /** Optional evictor; {@code null} selects the non-evicting operator variants. */
    private Evictor<? super T, ? super W> evictor;

    /** Allowed lateness in milliseconds, forwarded to the window operator. */
    private long allowedLateness = 0;

    /**
     * Creates a windowed view of {@code keyedStream} and initialises the trigger with the
     * assigner's default trigger for the stream's execution environment.
     *
     * @param keyedStream the keyed input stream
     * @param windowAssigner the assigner that maps elements to windows
     */
    @PublicEvolving
    public WindowedStream(KeyedStream<T, K> keyedStream, WindowAssigner<? super T, W> windowAssigner) {
        this.input = keyedStream;
        this.windowAssigner = windowAssigner;
        this.trigger = windowAssigner.getDefaultTrigger(keyedStream.getExecutionEnvironment());
    }

    /**
     * Replaces the trigger used by subsequently created window operators.
     *
     * @param trigger the trigger to use
     * @return this stream, for chaining
     * @throws UnsupportedOperationException if the window assigner is a
     *     {@link MergingWindowAssigner} and {@code trigger.canMerge()} is {@code false}
     */
    @PublicEvolving
    public WindowedStream<T, K, W> trigger(Trigger<? super T, ? super W> trigger) {
        boolean mergingAssigner = this.windowAssigner instanceof MergingWindowAssigner;
        if (mergingAssigner && !trigger.canMerge()) {
            throw new UnsupportedOperationException("A merging window assigner cannot be used with a trigger that does not support merging.");
        }
        this.trigger = trigger;
        return this;
    }

    /**
     * Sets the allowed lateness that is passed to subsequently created window operators.
     *
     * <p>The value being set is validated before it is stored. (Earlier code validated the
     * previously stored value instead, which let a negative lateness through and reported the
     * event-time violation only on a later call.)
     *
     * @param time the allowed lateness; converted with {@code toMilliseconds()}
     * @return this stream, for chaining
     * @throws IllegalArgumentException if the lateness is negative, or if it is non-zero while
     *     {@code windowAssigner.isEventTime()} is {@code false}
     */
    @PublicEvolving
    public WindowedStream<T, K, W> allowedLateness(Time time) {
        long milliseconds = time.toMilliseconds();
        if (milliseconds < 0) {
            throw new IllegalArgumentException("The allowed lateness cannot be negative.");
        }
        if (milliseconds != 0 && !this.windowAssigner.isEventTime()) {
            throw new IllegalArgumentException("Setting the allowed lateness is only valid for event-time windows.");
        }
        this.allowedLateness = milliseconds;
        return this;
    }

    /**
     * Sets the evictor used by subsequently created window operators.
     *
     * @param evictor the evictor to use
     * @return this stream, for chaining
     * @throws UnsupportedOperationException if the window assigner is a
     *     {@link MergingWindowAssigner}
     */
    @PublicEvolving
    public WindowedStream<T, K, W> evictor(Evictor<? super T, ? super W> evictor) {
        if (this.windowAssigner instanceof MergingWindowAssigner) {
            throw new UnsupportedOperationException("Cannot use a merging WindowAssigner with an Evictor.");
        }
        this.evictor = evictor;
        return this;
    }

    /**
     * Reduces the elements of each window with {@code reduceFunction}.
     *
     * <p>The fast processing-time operator is used when the current configuration qualifies for
     * it; otherwise this delegates to {@link #apply(ReduceFunction, WindowFunction)} with a
     * {@link PassThroughWindowFunction}.
     *
     * @param reduceFunction the reduce function; must not be a {@link RichFunction}
     * @return the resulting stream
     * @throws UnsupportedOperationException if {@code reduceFunction} is a {@link RichFunction}
     */
    public SingleOutputStreamOperator<T> reduce(ReduceFunction<T> reduceFunction) {
        if (reduceFunction instanceof RichFunction) {
            throw new UnsupportedOperationException("ReduceFunction of reduce can not be a RichFunction. Please use apply(ReduceFunction, WindowFunction) instead.");
        }
        ReduceFunction<T> cleanedReducer = (ReduceFunction) this.input.getExecutionEnvironment().clean(reduceFunction);
        SingleOutputStreamOperator<T> fastOperator = (SingleOutputStreamOperator<T>) createFastTimeOperatorIfValid(
                cleanedReducer,
                this.input.getType(),
                "WindowedStream." + Utils.getCallLocationName());
        if (fastOperator != null) {
            return fastOperator;
        }
        return (SingleOutputStreamOperator<T>) apply(cleanedReducer, new PassThroughWindowFunction());
    }

    /**
     * Folds the elements of each window, deriving the result type with
     * {@code TypeExtractor.getFoldReturnTypes}.
     *
     * @param r the initial accumulator value
     * @param foldFunction the fold function; must not be a {@link RichFunction}
     * @return the resulting stream
     * @throws UnsupportedOperationException if {@code foldFunction} is a {@link RichFunction}
     */
    public <R> SingleOutputStreamOperator<R> fold(R r, FoldFunction<T, R> foldFunction) {
        if (foldFunction instanceof RichFunction) {
            throw new UnsupportedOperationException("FoldFunction can not be a RichFunction. Please use apply(FoldFunction, WindowFunction) instead.");
        }
        TypeInformation<R> resultType =
                TypeExtractor.getFoldReturnTypes(foldFunction, this.input.getType(), Utils.getCallLocationName(), true);
        return fold(r, foldFunction, resultType);
    }

    /**
     * Folds the elements of each window using an explicitly supplied result type; delegates to
     * the four-argument {@code apply} with a {@link PassThroughWindowFunction}.
     *
     * @param r the initial accumulator value
     * @param foldFunction the fold function; must not be a {@link RichFunction}
     * @param typeInformation type information of the fold result
     * @return the resulting stream
     * @throws UnsupportedOperationException if {@code foldFunction} is a {@link RichFunction}
     */
    public <R> SingleOutputStreamOperator<R> fold(R r, FoldFunction<T, R> foldFunction, TypeInformation<R> typeInformation) {
        if (foldFunction instanceof RichFunction) {
            throw new UnsupportedOperationException("FoldFunction can not be a RichFunction. Please use apply(FoldFunction, WindowFunction) instead.");
        }
        return apply(r, foldFunction, new PassThroughWindowFunction(), typeInformation);
    }

    /**
     * Applies {@code windowFunction} to each window, deriving the result type with
     * {@code TypeExtractor.getUnaryOperatorReturnType}.
     *
     * @param windowFunction the function evaluated per window
     * @return the resulting stream
     */
    public <R> SingleOutputStreamOperator<R> apply(WindowFunction<T, R, K, W> windowFunction) {
        TypeInformation<R> resultType = TypeExtractor.getUnaryOperatorReturnType(
                windowFunction, WindowFunction.class, true, true, getInputType(), (String) null, false);
        return apply(windowFunction, resultType);
    }

    /**
     * Applies {@code windowFunction} to each window using an explicitly supplied result type.
     *
     * <p>Uses the fast processing-time operator when the configuration qualifies; otherwise
     * builds an {@link EvictingWindowOperator} (evictor set) or a {@link WindowOperator}
     * (no evictor), both backed by a list state of the window contents.
     *
     * @param windowFunction the function evaluated per window
     * @param typeInformation type information of the result
     * @return the resulting stream
     */
    public <R> SingleOutputStreamOperator<R> apply(WindowFunction<T, R, K, W> windowFunction, TypeInformation<R> typeInformation) {
        WindowFunction cleanedFunction = (WindowFunction) this.input.getExecutionEnvironment().clean(windowFunction);
        String udfName = "WindowedStream." + Utils.getCallLocationName();

        SingleOutputStreamOperator<R> fastOperator = createFastTimeOperatorIfValid(cleanedFunction, typeInformation, udfName);
        if (fastOperator != null) {
            return fastOperator;
        }

        KeySelector<T, K> keySelector = this.input.getKeySelector();
        String operatorName;
        WindowOperator operator;
        if (this.evictor != null) {
            ListStateDescriptor contents = newEvictingContentsDescriptor();
            operatorName = triggerWindowName(contents, udfName);
            operator = new EvictingWindowOperator(
                    this.windowAssigner,
                    this.windowAssigner.getWindowSerializer(getExecutionEnvironment().getConfig()),
                    keySelector,
                    this.input.getKeyType().createSerializer(getExecutionEnvironment().getConfig()),
                    contents,
                    new InternalIterableWindowFunction(cleanedFunction),
                    this.trigger,
                    this.evictor,
                    this.allowedLateness);
        } else {
            ListStateDescriptor contents = new ListStateDescriptor(
                    WINDOW_STATE_NAME,
                    this.input.getType().createSerializer(getExecutionEnvironment().getConfig()));
            operatorName = triggerWindowName(contents, udfName);
            operator = new WindowOperator(
                    this.windowAssigner,
                    this.windowAssigner.getWindowSerializer(getExecutionEnvironment().getConfig()),
                    keySelector,
                    this.input.getKeyType().createSerializer(getExecutionEnvironment().getConfig()),
                    contents,
                    new InternalIterableWindowFunction(cleanedFunction),
                    this.trigger,
                    this.allowedLateness);
        }
        return this.input.transform(operatorName, typeInformation, operator);
    }

    /**
     * Reduces each window with {@code reduceFunction} and passes the result to
     * {@code windowFunction}, deriving the result type with
     * {@code TypeExtractor.getUnaryOperatorReturnType}.
     *
     * @param reduceFunction the reduce function; must not be a {@link RichFunction}
     * @param windowFunction the function evaluated on the reduced value
     * @return the resulting stream
     */
    public <R> SingleOutputStreamOperator<R> apply(ReduceFunction<T> reduceFunction, WindowFunction<T, R, K, W> windowFunction) {
        TypeInformation<R> resultType = TypeExtractor.getUnaryOperatorReturnType(
                windowFunction, WindowFunction.class, true, true, this.input.getType(), (String) null, false);
        return apply(reduceFunction, windowFunction, resultType);
    }

    /**
     * Reduces each window with {@code reduceFunction} and passes the result to
     * {@code windowFunction}, using an explicitly supplied result type.
     *
     * <p>With an evictor the window contents are kept in a list state and reduced through a
     * {@link ReduceApplyWindowFunction}; without one a {@link ReducingStateDescriptor} is used
     * and the window function is wrapped in an {@link InternalSingleValueWindowFunction}.
     *
     * @param reduceFunction the reduce function; must not be a {@link RichFunction}
     * @param windowFunction the function evaluated on the reduced value
     * @param typeInformation type information of the result
     * @return the resulting stream
     * @throws UnsupportedOperationException if {@code reduceFunction} is a {@link RichFunction}
     */
    public <R> SingleOutputStreamOperator<R> apply(ReduceFunction<T> reduceFunction, WindowFunction<T, R, K, W> windowFunction, TypeInformation<R> typeInformation) {
        if (reduceFunction instanceof RichFunction) {
            throw new UnsupportedOperationException("ReduceFunction of apply can not be a RichFunction.");
        }
        WindowFunction cleanedFunction = (WindowFunction) this.input.getExecutionEnvironment().clean(windowFunction);
        ReduceFunction cleanedReducer = (ReduceFunction) this.input.getExecutionEnvironment().clean(reduceFunction);
        String udfName = "WindowedStream." + Utils.getCallLocationName();
        KeySelector<T, K> keySelector = this.input.getKeySelector();

        String operatorName;
        WindowOperator operator;
        if (this.evictor != null) {
            ListStateDescriptor contents = newEvictingContentsDescriptor();
            operatorName = triggerWindowName(contents, udfName);
            operator = new EvictingWindowOperator(
                    this.windowAssigner,
                    this.windowAssigner.getWindowSerializer(getExecutionEnvironment().getConfig()),
                    keySelector,
                    this.input.getKeyType().createSerializer(getExecutionEnvironment().getConfig()),
                    contents,
                    new InternalIterableWindowFunction(new ReduceApplyWindowFunction(cleanedReducer, cleanedFunction)),
                    this.trigger,
                    this.evictor,
                    this.allowedLateness);
        } else {
            ReducingStateDescriptor reducingState = new ReducingStateDescriptor(
                    WINDOW_STATE_NAME,
                    cleanedReducer,
                    this.input.getType().createSerializer(getExecutionEnvironment().getConfig()));
            operatorName = triggerWindowName(reducingState, udfName);
            operator = new WindowOperator(
                    this.windowAssigner,
                    this.windowAssigner.getWindowSerializer(getExecutionEnvironment().getConfig()),
                    keySelector,
                    this.input.getKeyType().createSerializer(getExecutionEnvironment().getConfig()),
                    reducingState,
                    new InternalSingleValueWindowFunction(cleanedFunction),
                    this.trigger,
                    this.allowedLateness);
        }
        return this.input.transform(operatorName, typeInformation, operator);
    }

    /**
     * Folds each window starting from {@code r} and passes the result to {@code windowFunction},
     * deriving the result type with {@code TypeExtractor.getFoldReturnTypes}.
     *
     * @param r the initial accumulator value
     * @param foldFunction the fold function; must not be a {@link RichFunction}
     * @param windowFunction the function evaluated on the folded value
     * @return the resulting stream
     */
    public <R> SingleOutputStreamOperator<R> apply(R r, FoldFunction<T, R> foldFunction, WindowFunction<R, R, K, W> windowFunction) {
        TypeInformation<R> resultType =
                TypeExtractor.getFoldReturnTypes(foldFunction, this.input.getType(), Utils.getCallLocationName(), true);
        return apply(r, foldFunction, windowFunction, resultType);
    }

    /**
     * Folds each window starting from {@code r} and passes the result to {@code windowFunction},
     * using an explicitly supplied result type.
     *
     * <p>With an evictor the window contents are kept in a list state and folded through a
     * {@link FoldApplyWindowFunction}; without one a {@link FoldingStateDescriptor} is used and
     * the window function is wrapped in an {@link InternalSingleValueWindowFunction}.
     *
     * @param r the initial accumulator value
     * @param foldFunction the fold function; must not be a {@link RichFunction}
     * @param windowFunction the function evaluated on the folded value
     * @param typeInformation type information of the result
     * @return the resulting stream
     * @throws UnsupportedOperationException if {@code foldFunction} is a {@link RichFunction} or
     *     the window assigner is a {@link MergingWindowAssigner}
     */
    public <R> SingleOutputStreamOperator<R> apply(R r, FoldFunction<T, R> foldFunction, WindowFunction<R, R, K, W> windowFunction, TypeInformation<R> typeInformation) {
        if (foldFunction instanceof RichFunction) {
            throw new UnsupportedOperationException("FoldFunction of apply can not be a RichFunction.");
        }
        if (this.windowAssigner instanceof MergingWindowAssigner) {
            throw new UnsupportedOperationException("Fold cannot be used with a merging WindowAssigner.");
        }
        WindowFunction cleanedFunction = (WindowFunction) this.input.getExecutionEnvironment().clean(windowFunction);
        FoldFunction cleanedFolder = (FoldFunction) this.input.getExecutionEnvironment().clean(foldFunction);
        String udfName = "WindowedStream." + Utils.getCallLocationName();
        KeySelector<T, K> keySelector = this.input.getKeySelector();

        String operatorName;
        WindowOperator operator;
        if (this.evictor != null) {
            ListStateDescriptor contents = newEvictingContentsDescriptor();
            operatorName = triggerWindowName(contents, udfName);
            operator = new EvictingWindowOperator(
                    this.windowAssigner,
                    this.windowAssigner.getWindowSerializer(getExecutionEnvironment().getConfig()),
                    keySelector,
                    this.input.getKeyType().createSerializer(getExecutionEnvironment().getConfig()),
                    contents,
                    new InternalIterableWindowFunction(new FoldApplyWindowFunction(r, cleanedFolder, cleanedFunction)),
                    this.trigger,
                    this.evictor,
                    this.allowedLateness);
        } else {
            FoldingStateDescriptor foldingState = new FoldingStateDescriptor(
                    WINDOW_STATE_NAME,
                    r,
                    cleanedFolder,
                    typeInformation.createSerializer(getExecutionEnvironment().getConfig()));
            operatorName = triggerWindowName(foldingState, udfName);
            operator = new WindowOperator(
                    this.windowAssigner,
                    this.windowAssigner.getWindowSerializer(getExecutionEnvironment().getConfig()),
                    keySelector,
                    this.input.getKeyType().createSerializer(getExecutionEnvironment().getConfig()),
                    foldingState,
                    new InternalSingleValueWindowFunction(cleanedFunction),
                    this.trigger,
                    this.allowedLateness);
        }
        return this.input.transform(operatorName, typeInformation, operator);
    }

    /** Reduces each window with a {@link SumAggregator} configured with {@code i}. */
    public SingleOutputStreamOperator<T> sum(int i) {
        return aggregate(new SumAggregator(i, this.input.getType(), this.input.getExecutionConfig()));
    }

    /** Reduces each window with a {@link SumAggregator} configured with {@code str}. */
    public SingleOutputStreamOperator<T> sum(String str) {
        return aggregate(new SumAggregator(str, this.input.getType(), this.input.getExecutionConfig()));
    }

    /** Reduces each window with a {@code MIN} {@link ComparableAggregator} configured with {@code i}. */
    public SingleOutputStreamOperator<T> min(int i) {
        return aggregate(new ComparableAggregator(i, this.input.getType(), AggregationFunction.AggregationType.MIN, this.input.getExecutionConfig()));
    }

    /**
     * Reduces each window with a {@code MIN} {@link ComparableAggregator} configured with
     * {@code str}; the aggregator's boolean flag is passed as {@code false}.
     */
    public SingleOutputStreamOperator<T> min(String str) {
        return aggregate(new ComparableAggregator(str, (TypeInformation) this.input.getType(), AggregationFunction.AggregationType.MIN, false, this.input.getExecutionConfig()));
    }

    /** Shorthand for {@code minBy(i, true)}. */
    public SingleOutputStreamOperator<T> minBy(int i) {
        return minBy(i, true);
    }

    /** Shorthand for {@code minBy(str, true)}. */
    public SingleOutputStreamOperator<T> minBy(String str) {
        return minBy(str, true);
    }

    /**
     * Reduces each window with a {@code MINBY} {@link ComparableAggregator} configured with
     * {@code i}.
     *
     * @param z forwarded to the aggregator (presumably "return the first element on ties" —
     *     confirm against ComparableAggregator)
     */
    public SingleOutputStreamOperator<T> minBy(int i, boolean z) {
        return aggregate(new ComparableAggregator(i, this.input.getType(), AggregationFunction.AggregationType.MINBY, z, this.input.getExecutionConfig()));
    }

    /**
     * Reduces each window with a {@code MINBY} {@link ComparableAggregator} configured with
     * {@code str}.
     *
     * @param z forwarded to the aggregator (presumably "return the first element on ties" —
     *     confirm against ComparableAggregator)
     */
    public SingleOutputStreamOperator<T> minBy(String str, boolean z) {
        return aggregate(new ComparableAggregator(str, this.input.getType(), AggregationFunction.AggregationType.MINBY, z, this.input.getExecutionConfig()));
    }

    /** Reduces each window with a {@code MAX} {@link ComparableAggregator} configured with {@code i}. */
    public SingleOutputStreamOperator<T> max(int i) {
        return aggregate(new ComparableAggregator(i, this.input.getType(), AggregationFunction.AggregationType.MAX, this.input.getExecutionConfig()));
    }

    /**
     * Reduces each window with a {@code MAX} {@link ComparableAggregator} configured with
     * {@code str}; the aggregator's boolean flag is passed as {@code false}.
     */
    public SingleOutputStreamOperator<T> max(String str) {
        return aggregate(new ComparableAggregator(str, (TypeInformation) this.input.getType(), AggregationFunction.AggregationType.MAX, false, this.input.getExecutionConfig()));
    }

    /** Shorthand for {@code maxBy(i, true)}. */
    public SingleOutputStreamOperator<T> maxBy(int i) {
        return maxBy(i, true);
    }

    /** Shorthand for {@code maxBy(str, true)}. */
    public SingleOutputStreamOperator<T> maxBy(String str) {
        return maxBy(str, true);
    }

    /**
     * Reduces each window with a {@code MAXBY} {@link ComparableAggregator} configured with
     * {@code i}.
     *
     * @param z forwarded to the aggregator (presumably "return the first element on ties" —
     *     confirm against ComparableAggregator)
     */
    public SingleOutputStreamOperator<T> maxBy(int i, boolean z) {
        return aggregate(new ComparableAggregator(i, this.input.getType(), AggregationFunction.AggregationType.MAXBY, z, this.input.getExecutionConfig()));
    }

    /**
     * Reduces each window with a {@code MAXBY} {@link ComparableAggregator} configured with
     * {@code str}.
     *
     * @param z forwarded to the aggregator (presumably "return the first element on ties" —
     *     confirm against ComparableAggregator)
     */
    public SingleOutputStreamOperator<T> maxBy(String str, boolean z) {
        return aggregate(new ComparableAggregator(str, this.input.getType(), AggregationFunction.AggregationType.MAXBY, z, this.input.getExecutionConfig()));
    }

    /** Runs an aggregation function through {@link #reduce(ReduceFunction)}. */
    private SingleOutputStreamOperator<T> aggregate(AggregationFunction<T> aggregationFunction) {
        return reduce(aggregationFunction);
    }

    /**
     * Builds the list-state descriptor used by the evicting operator variants: the window
     * contents are stored with a {@link StreamRecordSerializer} wrapping the input serializer.
     */
    private ListStateDescriptor newEvictingContentsDescriptor() {
        return new ListStateDescriptor(
                WINDOW_STATE_NAME,
                new StreamRecordSerializer(this.input.getType().createSerializer(getExecutionEnvironment().getConfig())));
    }

    /**
     * Builds the operator name
     * {@code TriggerWindow(assigner, state, trigger[, evictor], udfName)}; the evictor segment
     * is present only when an evictor is configured.
     */
    private String triggerWindowName(Object stateDescriptor, String udfName) {
        String name = "TriggerWindow(" + this.windowAssigner + ", " + stateDescriptor + ", " + this.trigger + ", ";
        if (this.evictor != null) {
            name = name + this.evictor + ", ";
        }
        return name + udfName + ")";
    }

    /**
     * Returns a specialised processing-time operator when the configuration allows it, or
     * {@code null} when the generic window operator has to be used.
     *
     * <p>The fast path requires a {@link ProcessingTimeTrigger}, no evictor, and a
     * {@link SlidingProcessingTimeWindows} or {@link TumblingProcessingTimeWindows} assigner
     * (for tumbling windows the slide equals the size). A {@link ReduceFunction} yields an
     * {@link AggregatingProcessingTimeWindowOperator}, a {@link WindowFunction} an
     * {@link AccumulatingProcessingTimeWindowOperator}; any other function type yields
     * {@code null}.
     */
    private <R> SingleOutputStreamOperator<R> createFastTimeOperatorIfValid(Function function, TypeInformation<R> typeInformation, String udfName) {
        if (!(this.trigger instanceof ProcessingTimeTrigger) || this.evictor != null) {
            return null;
        }

        final long size;
        final long slide;
        if (this.windowAssigner instanceof SlidingProcessingTimeWindows) {
            SlidingProcessingTimeWindows sliding = (SlidingProcessingTimeWindows) this.windowAssigner;
            size = sliding.getSize();
            slide = sliding.getSlide();
        } else if (this.windowAssigner instanceof TumblingProcessingTimeWindows) {
            size = ((TumblingProcessingTimeWindows) this.windowAssigner).getSize();
            slide = size;
        } else {
            return null;
        }

        String operatorName = "Fast " + this.windowAssigner + " of " + udfName;
        if (function instanceof ReduceFunction) {
            return this.input.transform(operatorName, typeInformation, new AggregatingProcessingTimeWindowOperator(
                    (ReduceFunction) function,
                    this.input.getKeySelector(),
                    this.input.getKeyType().createSerializer(getExecutionEnvironment().getConfig()),
                    this.input.getType().createSerializer(getExecutionEnvironment().getConfig()),
                    size,
                    slide));
        }
        if (function instanceof WindowFunction) {
            return this.input.transform(operatorName, typeInformation, new AccumulatingProcessingTimeWindowOperator(
                    (WindowFunction) function,
                    this.input.getKeySelector(),
                    this.input.getKeyType().createSerializer(getExecutionEnvironment().getConfig()),
                    this.input.getType().createSerializer(getExecutionEnvironment().getConfig()),
                    size,
                    slide));
        }
        return null;
    }

    /** Returns the execution environment of the input stream. */
    public StreamExecutionEnvironment getExecutionEnvironment() {
        return this.input.getExecutionEnvironment();
    }

    /** Returns the type information of the input stream's elements. */
    public TypeInformation<T> getInputType() {
        return this.input.getType();
    }
}
