package pl.touk.nussknacker.engine.flink.util.transformer.aggregate;

import org.apache.flink.api.java.typeutils.TypeExtractor;
import org.apache.flink.streaming.api.scala.DataStream;
import org.apache.flink.streaming.api.scala.KeyedStream;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.triggers.EventTimeTrigger;
import pl.touk.nussknacker.engine.api.Context;
import pl.touk.nussknacker.engine.api.LazyParameter;
import pl.touk.nussknacker.engine.api.ValueWithContext;
import pl.touk.nussknacker.engine.api.context.ProcessCompilationError;
import pl.touk.nussknacker.engine.flink.api.process.FlinkCustomNodeContext;
import pl.touk.nussknacker.engine.flink.util.orderedmap.FlinkRangeMap$SortedMapFlinkRangeMap$;
import pl.touk.nussknacker.engine.flink.util.transformer.aggregate.transformers;
import pl.touk.nussknacker.engine.flink.util.transformer.aggregate.triggers;
import pl.touk.nussknacker.engine.flink.util.transformer.richflink$;
import scala.Function1;
import scala.MatchError;
import scala.Serializable;
import scala.concurrent.duration.Duration;
import scala.reflect.runtime.package$;
import scala.runtime.AbstractFunction2;

/* compiled from: transformers.scala */
/* loaded from: input_file:pl/touk/nussknacker/engine/flink/util/transformer/aggregate/transformers$$anonfun$tumblingTransformer$2.class */
/**
 * Synthetic two-argument function (compiled from {@code transformers.scala}) that wires a
 * tumbling-window aggregation onto a stream of {@link Context} elements.
 *
 * <p>When applied, it keys the incoming stream by the {@code groupBy} parameter, builds the
 * aggregating operator that corresponds to the configured {@link TumblingWindowTrigger}, and
 * finally passes the result through {@code ExplicitUid(...).setUidWithName(...)}.
 *
 * <p>The captured values are stored in fields whose {@code $N} suffixes come from the Scala
 * compiler; {@code aggregateBy$2} is public in the compiled output and is left that way.
 */
public final class transformers$$anonfun$tumblingTransformer$2 extends AbstractFunction2<DataStream<Context>, FlinkCustomNodeContext, DataStream<ValueWithContext<Object>>> implements Serializable {
    public static final long serialVersionUID = 0;
    private final LazyParameter groupBy$2;
    public final LazyParameter aggregateBy$2;
    private final Aggregator aggregator$2;
    private final Duration windowLength$2;
    private final TumblingWindowTrigger tumblingWindowTrigger$1;
    private final Function1 explicitUidInStatefulOperators$2;
    private final ProcessCompilationError.NodeId nodeId$2;

    /**
     * Captures the values used later by {@link #apply}.
     *
     * @param lazyParameter         stored as the {@code groupBy} parameter (used to key the stream)
     * @param lazyParameter2        stored as the {@code aggregateBy} parameter
     * @param aggregator            aggregator handed to the aggregating functions
     * @param duration              window length; converted to milliseconds when the operator is built
     * @param tumblingWindowTrigger selects which of the three operator variants is built
     * @param function1             passed to {@code setUidWithName} together with the node context
     * @param nodeId                node id passed to the extra-window process function
     */
    public transformers$$anonfun$tumblingTransformer$2(LazyParameter lazyParameter, LazyParameter lazyParameter2, Aggregator aggregator, Duration duration, TumblingWindowTrigger tumblingWindowTrigger, Function1 function1, ProcessCompilationError.NodeId nodeId) {
        this.groupBy$2 = lazyParameter;
        this.aggregateBy$2 = lazyParameter2;
        this.aggregator$2 = aggregator;
        this.windowLength$2 = duration;
        this.tumblingWindowTrigger$1 = tumblingWindowTrigger;
        this.explicitUidInStatefulOperators$2 = function1;
        this.nodeId$2 = nodeId;
    }

    /**
     * Builds the aggregated stream for the given input.
     *
     * @param dataStream             input stream of contexts
     * @param flinkCustomNodeContext node context used for type information, keying and uid assignment
     * @return the aggregated stream after {@code setUidWithName} has been applied
     * @throws MatchError if the configured trigger is none of {@code OnEvent}, {@code OnEnd},
     *                    {@code OnEndWithExtraWindow}
     */
    public final DataStream<ValueWithContext<Object>> apply(DataStream<Context> dataStream, FlinkCustomNodeContext flinkCustomNodeContext) {
        // Type information is created before the stream is keyed, as in the compiled original.
        transformers.AggregatorTypeInformations typeInfos =
                new transformers.AggregatorTypeInformations(flinkCustomNodeContext, this.aggregator$2, this.aggregateBy$2);

        KeyedStream keyedByGroup = richflink$.MODULE$
                .FlinkKeyOperations(dataStream)
                .groupByWithValue(
                        this.groupBy$2,
                        (Function1) new transformers$$anonfun$tumblingTransformer$2$$anonfun$1(this),
                        package$.MODULE$.universe().TypeTag().AnyRef(),
                        TypeExtractor.createTypeInfo(Object.class),
                        flinkCustomNodeContext);

        DataStream aggregated = buildAggregatedStream(keyedByGroup, typeInfos);

        return richflink$.MODULE$
                .ExplicitUid(aggregated)
                .setUidWithName(flinkCustomNodeContext, this.explicitUidInStatefulOperators$2);
    }

    /**
     * Chooses the operator variant matching the configured trigger and applies it to the keyed stream.
     *
     * <p>{@code equals} comparisons are used on purpose instead of a {@code switch}, so that a
     * {@code null} trigger falls through to the {@link MatchError} exactly like any other
     * unmatched value.
     */
    private DataStream buildAggregatedStream(KeyedStream keyedByGroup, transformers.AggregatorTypeInformations typeInfos) {
        TumblingWindowTrigger trigger = this.tumblingWindowTrigger$1;

        if (TumblingWindowTrigger.OnEvent.equals(trigger)) {
            // Event-time tumbling window with the event-time trigger wrapped in FireOnEachEvent.
            return keyedByGroup
                    .window(tumblingWindows())
                    .trigger(new triggers.FireOnEachEvent(EventTimeTrigger.create()))
                    .aggregate(
                            new UnwrappingAggregateFunction(this.aggregator$2, this.aggregateBy$2.returnType(), new transformers$$anonfun$tumblingTransformer$2$$anonfun$apply$2(this)),
                            new EnrichingWithKeyFunction(),
                            typeInfos.storedTypeInfo(),
                            typeInfos.returnTypeInfo(),
                            typeInfos.returnedValueTypeInfo());
        }

        if (TumblingWindowTrigger.OnEnd.equals(trigger)) {
            // Same window assigner, but no trigger is set explicitly.
            return keyedByGroup
                    .window(tumblingWindows())
                    .aggregate(
                            new UnwrappingAggregateFunction(this.aggregator$2, this.aggregateBy$2.returnType(), new transformers$$anonfun$tumblingTransformer$2$$anonfun$apply$3(this)),
                            new EnrichingWithKeyFunction(),
                            typeInfos.storedTypeInfo(),
                            typeInfos.returnTypeInfo(),
                            typeInfos.returnedValueTypeInfo());
        }

        if (TumblingWindowTrigger.OnEndWithExtraWindow.equals(trigger)) {
            // No window assigner here: the keyed stream is handled by a dedicated process function.
            return keyedByGroup.process(
                    new EmitExtraWindowWhenNoDataTumblingAggregatorFunction(
                            this.aggregator$2,
                            this.windowLength$2.toMillis(),
                            this.nodeId$2,
                            this.aggregateBy$2.returnType(),
                            typeInfos.storedTypeInfo(),
                            FlinkRangeMap$SortedMapFlinkRangeMap$.MODULE$),
                    new transformers$$anonfun$tumblingTransformer$2$$anon$12(this));
        }

        throw new MatchError(trigger);
    }

    /** Creates the event-time tumbling window assigner sized to the captured window length (in milliseconds). */
    private TumblingEventTimeWindows tumblingWindows() {
        return TumblingEventTimeWindows.of(Time.milliseconds(this.windowLength$2.toMillis()));
    }
}
