package pl.touk.nussknacker.engine.flink.util.transformer.aggregate;

import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.streaming.api.scala.DataStream;
import org.apache.flink.streaming.api.windowing.assigners.EventTimeSessionWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.triggers.EventTimeTrigger;
import pl.touk.nussknacker.engine.api.Context;
import pl.touk.nussknacker.engine.api.LazyParameter;
import pl.touk.nussknacker.engine.api.ValueWithContext;
import pl.touk.nussknacker.engine.flink.api.process.FlinkCustomNodeContext;
import pl.touk.nussknacker.engine.flink.util.transformer.aggregate.transformers;
import pl.touk.nussknacker.engine.flink.util.transformer.aggregate.triggers;
import pl.touk.nussknacker.engine.flink.util.transformer.richflink;
import pl.touk.nussknacker.engine.flink.util.transformer.richflink$;
import scala.Function1;
import scala.MatchError;
import scala.Predef$;
import scala.Serializable;
import scala.collection.immutable.List$;
import scala.collection.immutable.Nil$;
import scala.concurrent.duration.Duration;
import scala.reflect.api.Mirror;
import scala.reflect.api.TypeCreator;
import scala.reflect.api.TypeTags;
import scala.reflect.api.Types;
import scala.reflect.api.Universe;
import scala.reflect.runtime.package$;
import scala.runtime.AbstractFunction2;

/* compiled from: transformers.scala */
/* loaded from: input_file:pl/touk/nussknacker/engine/flink/util/transformer/aggregate/transformers$$anonfun$sessionWindowTransformer$1.class */
/**
 * Decompiled Scala closure (a two-argument function object) that, given an input
 * {@code DataStream<Context>} and a {@code FlinkCustomNodeContext}, wires up an
 * event-time session-window aggregation and returns the resulting
 * {@code DataStream<ValueWithContext<Object>>}.
 *
 * <p>The instance only stores the values handed to its constructor and uses them when
 * {@link #apply} is invoked. Judging by the synthetic class name, this is presumably the
 * function literal created inside {@code transformers.sessionWindowTransformer} - confirm
 * against the original {@code transformers.scala}.
 *
 * <p>NOTE(review): this is decompiler output. Raw generic types, the {@code Serializable}-typed
 * trigger locals and the {@code $}-suffixed names are artifacts of Scala compilation and
 * decompilation and are left untouched on purpose.
 */
public final class transformers$$anonfun$sessionWindowTransformer$1 extends AbstractFunction2<DataStream<Context>, FlinkCustomNodeContext, DataStream<ValueWithContext<Object>>> implements Serializable {
    public static final long serialVersionUID = 0;
    /** Captured parameter passed to {@code groupByWithValue} as the grouping key expression. */
    private final LazyParameter groupBy$3;
    /**
     * Captured parameter whose {@code returnType()} is handed to the aggregate function.
     * Public, presumably so the nested synthetic function classes can read it - TODO confirm.
     */
    public final LazyParameter aggregateBy$3;
    /** Captured aggregator used for both the type-information helper and the aggregate function. */
    private final Aggregator aggregator$3;
    /** Captured session gap; converted to milliseconds when the session window is created. */
    private final Duration sessionTimeout$1;
    /**
     * Captured parameter that is not read directly in this class.
     * Presumably consumed by the nested synthetic function classes (e.g. the one given to
     * {@code ClosingEndEventTrigger}) - TODO confirm.
     */
    public final LazyParameter endSessionCondition$1;
    /** Captured trigger mode; {@link #apply} matches it against {@code OnEvent} and {@code OnEnd}. */
    private final SessionWindowTrigger sessionWindowTrigger$1;

    /**
     * Builds the session-window aggregation pipeline on top of {@code dataStream}.
     *
     * @param dataStream             input stream of contexts to be keyed, windowed and aggregated
     * @param flinkCustomNodeContext node context passed to the type-information helper,
     *                               to {@code groupByWithValue} and to {@code setUidWithName}
     * @return the stream produced by the aggregation after {@code setUidWithName} was applied
     * @throws MatchError if the captured trigger mode equals neither
     *                    {@code SessionWindowTrigger.OnEvent} nor {@code SessionWindowTrigger.OnEnd}
     */
    public final DataStream<ValueWithContext<Object>> apply(DataStream<Context> dataStream, FlinkCustomNodeContext flinkCustomNodeContext) {
        Serializable serializable;
        // Helper supplying the stored / return / returned-value type infos used by aggregate(...) below.
        transformers.AggregatorTypeInformations aggregatorTypeInformations = new transformers.AggregatorTypeInformations(flinkCustomNodeContext, this.aggregator$3, this.aggregateBy$3);
        // Base trigger: wraps Flink's EventTimeTrigger together with a synthetic function object.
        // NOTE(review): that function presumably decides when an event ends the session - confirm in transformers.scala.
        Serializable closingEndEventTrigger = new triggers.ClosingEndEventTrigger(EventTimeTrigger.create(), new transformers$$anonfun$sessionWindowTransformer$1$$anonfun$2(this));
        // Decompiled Scala pattern match on the trigger mode.
        SessionWindowTrigger sessionWindowTrigger = this.sessionWindowTrigger$1;
        if (SessionWindowTrigger.OnEvent.equals(sessionWindowTrigger)) {
            // OnEvent: additionally wrap the base trigger in FireOnEachEvent.
            serializable = new triggers.FireOnEachEvent(closingEndEventTrigger);
        } else {
            if (!SessionWindowTrigger.OnEnd.equals(sessionWindowTrigger)) {
                // Neither known case matched - same failure a non-exhaustive Scala match produces.
                throw new MatchError(sessionWindowTrigger);
            }
            // OnEnd: use the base trigger as-is.
            serializable = closingEndEventTrigger;
        }
        richflink$ richflink_ = richflink$.MODULE$;
        richflink.FlinkKeyOperations FlinkKeyOperations = richflink$.MODULE$.FlinkKeyOperations(dataStream);
        LazyParameter<CharSequence> lazyParameter = this.groupBy$3;
        transformers$$anonfun$sessionWindowTransformer$1$$anonfun$apply$4 transformers__anonfun_sessionwindowtransformer_1__anonfun_apply_4 = new transformers$$anonfun$sessionWindowTransformer$1$$anonfun$apply$4(this);
        TypeTags universe = package$.MODULE$.universe();
        // Single fluent chain, in call order:
        //   1. groupByWithValue(groupBy, synthetic function, TypeTag, TypeInformation, node context)
        //   2. window(...) with an event-time session window whose gap is sessionTimeout in milliseconds
        //   3. trigger(...) with the trigger selected above
        //   4. aggregate(...) with UnwrappingAggregateFunction + EnrichingWithKeyFunction and the three type infos
        //   5. ExplicitUid(...).setUidWithName(node context, synthetic function)
        return richflink_.ExplicitUid(FlinkKeyOperations.groupByWithValue(lazyParameter, (Function1) transformers__anonfun_sessionwindowtransformer_1__anonfun_apply_4, universe.TypeTag().apply(package$.MODULE$.universe().runtimeMirror(transformers$.MODULE$.getClass().getClassLoader()), new TypeCreator(this) { // from class: pl.touk.nussknacker.engine.flink.util.transformer.aggregate.transformers$$anonfun$sessionWindowTransformer$1$$typecreator1$1
            // Reified Scala type: scala.Tuple2 applied to the type arguments (scala.AnyRef, java.lang.Boolean).
            public <U extends Universe> Types.TypeApi apply(Mirror<U> mirror) {
                Universe universe2 = mirror.universe();
                return universe2.internal().reificationSupport().TypeRef(universe2.internal().reificationSupport().ThisType(mirror.staticPackage("scala").asModule().moduleClass()), mirror.staticClass("scala.Tuple2"), List$.MODULE$.apply(Predef$.MODULE$.wrapRefArray(new Types.TypeApi[]{universe2.internal().reificationSupport().TypeRef(universe2.internal().reificationSupport().ThisType(mirror.staticPackage("scala").asModule().moduleClass()), universe2.internal().reificationSupport().selectType(mirror.staticPackage("scala").asModule().moduleClass(), "AnyRef"), Nil$.MODULE$), mirror.staticClass("java.lang.Boolean").asType().toTypeConstructor()})));
            }
        }), (TypeInformation) new transformers$$anonfun$sessionWindowTransformer$1$$anon$16(this), flinkCustomNodeContext).window(EventTimeSessionWindows.withGap(Time.milliseconds(this.sessionTimeout$1.toMillis()))).trigger(serializable).aggregate(new UnwrappingAggregateFunction(this.aggregator$3, this.aggregateBy$3.returnType(), new transformers$$anonfun$sessionWindowTransformer$1$$anonfun$apply$5(this)), new EnrichingWithKeyFunction(), aggregatorTypeInformations.storedTypeInfo(), aggregatorTypeInformations.returnTypeInfo(), aggregatorTypeInformations.returnedValueTypeInfo())).setUidWithName(flinkCustomNodeContext, new transformers$$anonfun$sessionWindowTransformer$1$$anonfun$apply$6(this));
    }

    /**
     * Stores the captured values; performs no validation or other work.
     *
     * @param lazyParameter        stored as {@code groupBy$3}
     * @param lazyParameter2       stored as {@code aggregateBy$3}
     * @param aggregator           stored as {@code aggregator$3}
     * @param duration             stored as {@code sessionTimeout$1}
     * @param lazyParameter3       stored as {@code endSessionCondition$1}
     * @param sessionWindowTrigger stored as {@code sessionWindowTrigger$1}
     */
    public transformers$$anonfun$sessionWindowTransformer$1(LazyParameter lazyParameter, LazyParameter lazyParameter2, Aggregator aggregator, Duration duration, LazyParameter lazyParameter3, SessionWindowTrigger sessionWindowTrigger) {
        this.groupBy$3 = lazyParameter;
        this.aggregateBy$3 = lazyParameter2;
        this.aggregator$3 = aggregator;
        this.sessionTimeout$1 = duration;
        this.endSessionCondition$1 = lazyParameter3;
        this.sessionWindowTrigger$1 = sessionWindowTrigger;
    }
}
