package pl.touk.nussknacker.engine.flink.util.transformer;

import java.time.Duration;
import org.apache.flink.api.common.state.MapState;
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;
import pl.touk.nussknacker.engine.api.Context;
import pl.touk.nussknacker.engine.api.ValueWithContext;
import scala.Option$;
import scala.Predef$;
import scala.collection.immutable.List;
import scala.reflect.ScalaSignature;
import scala.runtime.BoxesRunTime;

/* compiled from: DelayTransformer.scala */
@ScalaSignature(bytes = "\u0006\u0001\u0005uc\u0001B\u0001\u0003\u0001E\u0011Q\u0002R3mCf4UO\\2uS>t'BA\u0002\u0005\u0003-!(/\u00198tM>\u0014X.\u001a:\u000b\u0005\u00151\u0011\u0001B;uS2T!a\u0002\u0005\u0002\u000b\u0019d\u0017N\\6\u000b\u0005%Q\u0011AB3oO&tWM\u0003\u0002\f\u0019\u0005Ya.^:tW:\f7m[3s\u0015\tia\"\u0001\u0003u_V\\'\"A\b\u0002\u0005Ad7\u0001A\n\u0003\u0001I\u0001RaE\u0010\"WAj\u0011\u0001\u0006\u0006\u0003+Y\t\u0011BZ;oGRLwN\\:\u000b\u0005]A\u0012aA1qS*\u0011\u0011DG\u0001\ngR\u0014X-Y7j]\u001eT!aB\u000e\u000b\u0005qi\u0012AB1qC\u000eDWMC\u0001\u001f\u0003\ry'oZ\u0005\u0003AQ\u0011AcS3zK\u0012\u0004&o\\2fgN4UO\\2uS>t\u0007C\u0001\u0012)\u001d\t\u0019c%D\u0001%\u0015\u0005)\u0013!B:dC2\f\u0017BA\u0014%\u0003\u0019\u0001&/\u001a3fM&\u0011\u0011F\u000b\u0002\u0007'R\u0014\u0018N\\4\u000b\u0005\u001d\"\u0003c\u0001\u0017/C5\tQF\u0003\u0002\u0018\u0011%\u0011q&\f\u0002\u0011-\u0006dW/Z,ji\"\u001cuN\u001c;fqR\u00042\u0001\f\u00182!\t\u0019#'\u0003\u00024I\t1\u0011I\\=SK\u001aD\u0001\"\u000e\u0001\u0003\u0002\u0003\u0006IAN\u0001\u0006I\u0016d\u0017-\u001f\t\u0003oqj\u0011\u0001\u000f\u0006\u0003si\nA\u0001^5nK*\t1(\u0001\u0003kCZ\f\u0017BA\u001f9\u0005!!UO]1uS>t\u0007\"B \u0001\t\u0003\u0001\u0015A\u0002\u001fj]&$h\b\u0006\u0002B\u0007B\u0011!\tA\u0007\u0002\u0005!)QG\u0010a\u0001m\u0015!Q\t\u0001\u0001G\u0005!1E.\u001b8l\u0007RD\bC\u0001\nH\u0013\tAuDA\u0004D_:$X\r\u001f;\u0006\t)\u0003\u0001a\u0013\u0002\u000e\r2Lgn\u001b+j[\u0016\u00148\t\u001e=\u0011\u0005Ia\u0015BA' 
\u00059ye\u000eV5nKJ\u001cuN\u001c;fqRDqa\u0014\u0001C\u0002\u0013%\u0001+\u0001\u0006eKN\u001c'/\u001b9u_J,\u0012!\u0015\t\u0005%bSV,D\u0001T\u0015\t!V+A\u0003ti\u0006$XM\u0003\u0002W/\u000611m\\7n_:T!a\u0006\u000e\n\u0005e\u001b&AE'baN#\u0018\r^3EKN\u001c'/\u001b9u_J\u0004\"aI.\n\u0005q##\u0001\u0002'p]\u001e\u00042A\u00184j\u001d\tyFM\u0004\u0002aG6\t\u0011M\u0003\u0002c!\u00051AH]8pizJ\u0011!J\u0005\u0003K\u0012\nq\u0001]1dW\u0006<W-\u0003\u0002hQ\n!A*[:u\u0015\t)G\u0005\u0005\u0002-U&\u0011\u0001*\f\u0005\u0007Y\u0002\u0001\u000b\u0011B)\u0002\u0017\u0011,7o\u0019:jaR|'\u000f\t\u0005\u0006]\u0002!\te\\\u0001\u0005_B,g\u000e\u0006\u0002qgB\u00111%]\u0005\u0003e\u0012\u0012A!\u00168ji\")A/\u001ca\u0001k\u000611m\u001c8gS\u001e\u0004\"A^=\u000e\u0003]T!\u0001\u001f\u000e\u0002\u001b\r|gNZ5hkJ\fG/[8o\u0013\tQxOA\u0007D_:4\u0017nZ;sCRLwN\u001c\u0005\u0006y\u0002!\t%`\u0001\u000faJ|7-Z:t\u000b2,W.\u001a8u)\u0019\u0001h0!\u0001\u0002\n!)qp\u001fa\u0001W\u0005)a/\u00197vK\"9\u00111A>A\u0002\u0005\u0015\u0011aA2uqB\u0019\u0011q\u0001#\u000e\u0003\u0001Aq!a\u0003|\u0001\u0004\ti!A\u0002pkR\u0004R!a\u0004\u0002\u0014Aj!!!\u0005\u000b\u0005\u0015Q\u0012\u0002BA\u000b\u0003#\u0011\u0011bQ8mY\u0016\u001cGo\u001c:\t\u000f\u0005e\u0001\u0001\"\u0011\u0002\u001c\u00059qN\u001c+j[\u0016\u0014Hc\u00029\u0002\u001e\u0005\u0005\u0012q\u0005\u0005\b\u0003?\t9\u00021\u0001[\u0003%!\u0018.\\3ti\u0006l\u0007\u000f\u0003\u0005\u0002$\u0005]\u0001\u0019AA\u0013\u0003\u00191WO\\\"uqB\u0019\u0011qA%\t\u0011\u0005-\u0011q\u0003a\u0001\u0003\u001bAq!a\u000b\u0001\t#\ti#A\u0005f[&$h+\u00197vKR)\u0001/a\f\u00024!A\u0011\u0011GA\u0015\u0001\u0004\ti!\u0001\u0004pkR\u0004X\u000f\u001e\u0005\b\u0003\u0007\tI\u00031\u0001j\u0011\u001d\t9\u0004\u0001C\u0005\u0003s\tqC]3bIN#\u0018\r^3WC2,Xm\u0014:J]&$\u0018.\u00197\u0015\u0007u\u000bY\u0004C\u0004\u0002 
\u0005U\u0002\u0019\u0001.\t\u0015Q\u0003\u0001\u0019!a\u0001\n\u0013\ty$\u0006\u0002\u0002BA)!+a\u0011[;&\u0019\u0011QI*\u0003\u00115\u000b\u0007o\u0015;bi\u0016D1\"!\u0013\u0001\u0001\u0004\u0005\r\u0011\"\u0003\u0002L\u0005I1\u000f^1uK~#S-\u001d\u000b\u0004a\u00065\u0003BCA(\u0003\u000f\n\t\u00111\u0001\u0002B\u0005\u0019\u0001\u0010J\u0019\t\u0011\u0005M\u0003\u0001)Q\u0005\u0003\u0003\naa\u001d;bi\u0016\u0004\u0003\u0006BA)\u0003/\u00022aIA-\u0013\r\tY\u0006\n\u0002\niJ\fgn]5f]R\u0004")
/* loaded from: input_file:pl/touk/nussknacker/engine/flink/util/transformer/DelayFunction.class */
/**
 * Keyed process function that holds back the {@link Context} of every incoming element
 * for a fixed {@link Duration} before anything is emitted for it.
 *
 * <p>For each element the release time is computed as the element timestamp plus the
 * configured delay. The element's context is stored in keyed map state under that release
 * time and an event-time timer is registered for it. When the timer fires, the contexts
 * stored under the firing timestamp are handed, oldest first, to a compiler-generated
 * closure and the state entry is removed.
 */
public class DelayFunction extends KeyedProcessFunction<String, ValueWithContext<String>, ValueWithContext<Object>> {
    /** Amount of time added to an element's timestamp to obtain its release time. */
    private final Duration delay;

    /**
     * Descriptor of the map state named {@code "state"}: keys are release times (boxed longs),
     * values are Scala lists of the contexts waiting for that release time. The value type
     * information comes from compiler-generated helper classes that are not visible here.
     */
    private final MapStateDescriptor<Object, List<Context>> descriptor = new MapStateDescriptor<>(
            "state",
            (TypeInformation) Predef$.MODULE$.implicitly(BasicTypeInfo.getInfoFor(Long.TYPE)),
            (TypeInformation) Predef$.MODULE$.implicitly(new DelayFunction$$anon$32(this, new DelayFunction$$anon$24(this))));

    /** State handle obtained in {@link #open(Configuration)}; transient, so not serialized with the function. */
    private transient MapState<Object, List<Context>> state;

    /**
     * Creates a function that delays elements by the given duration.
     *
     * @param duration delay added to each element's timestamp
     */
    public DelayFunction(Duration duration) {
        this.delay = duration;
    }

    /** Scala-style accessor for the state descriptor. */
    private MapStateDescriptor<Object, List<Context>> descriptor() {
        return this.descriptor;
    }

    /** Scala-style accessor for the state handle. */
    private MapState<Object, List<Context>> state() {
        return this.state;
    }

    /** Scala-style setter for the state handle. */
    private void state_$eq(MapState<Object, List<Context>> newState) {
        this.state = newState;
    }

    /**
     * Obtains the map state from the runtime context using {@link #descriptor()}.
     *
     * @param configuration not read by this method
     */
    public void open(Configuration configuration) {
        final MapState<Object, List<Context>> handle = getRuntimeContext().getMapState(descriptor());
        state_$eq(handle);
    }

    /**
     * Stores the element's context under its release time and registers an event-time timer.
     *
     * @param valueWithContext incoming element; only its context is kept
     * @param context          supplies the element timestamp and the timer service
     * @param collector        not used here; nothing is emitted until the timer fires
     */
    public void processElement(ValueWithContext<String> valueWithContext, KeyedProcessFunction<String, ValueWithContext<String>, ValueWithContext<Object>>.Context context, Collector<ValueWithContext<Object>> collector) {
        // Unboxing happens here, so a missing (null) timestamp fails on this line.
        final long elementTimestamp = Predef$.MODULE$.Long2long(context.timestamp());
        final long fireTime = elementTimestamp + this.delay.toMillis();

        // Several elements may share a release time; the newest context is prepended.
        final List<Context> alreadyWaiting = readStateValueOrInitial(fireTime);
        final List<Context> nowWaiting = alreadyWaiting.$colon$colon(valueWithContext.context());

        state().put(BoxesRunTime.boxToLong(fireTime), nowWaiting);
        context.timerService().registerEventTimeTimer(fireTime);
    }

    /**
     * Releases everything stored under the firing timestamp and clears that state entry.
     *
     * @param j              timestamp of the firing timer, used as the state key
     * @param onTimerContext not used
     * @param collector      passed to the per-context closure
     */
    public void onTimer(long j, KeyedProcessFunction<String, ValueWithContext<String>, ValueWithContext<Object>>.OnTimerContext onTimerContext, Collector<ValueWithContext<Object>> collector) {
        // Contexts were prepended on arrival; reversing restores arrival order.
        final List<Context> inArrivalOrder = readStateValueOrInitial(j).reverse();
        // NOTE(review): the closure body is not visible here; presumably it calls emitValue(collector, ctx).
        inArrivalOrder.foreach(new DelayFunction$$anonfun$onTimer$1(this, collector));
        state().remove(BoxesRunTime.boxToLong(j));
    }

    /**
     * Emits a record that carries the given context and a {@code null} value.
     *
     * @param collector sink for the emitted record
     * @param context   context attached to the emitted record
     */
    public void emitValue(Collector<ValueWithContext<Object>> collector, Context context) {
        final ValueWithContext<Object> emitted = new ValueWithContext((Object) null, context);
        collector.collect(emitted);
    }

    /**
     * Reads the contexts stored under the given release time.
     *
     * @param fireTime release time used as the state key
     * @return the stored list, or the fallback produced by a compiler-generated closure when
     *         the state holds no entry (presumably an empty list; the closure is not visible here)
     */
    private List<Context> readStateValueOrInitial(long fireTime) {
        final List<Context> stored = state().get(BoxesRunTime.boxToLong(fireTime));
        return (List) Option$.MODULE$.apply(stored).getOrElse(new DelayFunction$$anonfun$readStateValueOrInitial$1(this));
    }

    /**
     * Erased-signature overload (marked bridge/synthetic in the decompiled output) that casts
     * its arguments and forwards to the typed {@code processElement}.
     */
    public /* bridge */ /* synthetic */ void processElement(Object obj, KeyedProcessFunction.Context context, Collector collector) {
        processElement((ValueWithContext<String>) obj, (KeyedProcessFunction<String, ValueWithContext<String>, ValueWithContext<Object>>.Context) context, (Collector<ValueWithContext<Object>>) collector);
    }
}
