package ai.chronon.flink;

import ai.chronon.aggregator.windowing.ResolutionUtils$;
import ai.chronon.api.Extensions$;
import ai.chronon.api.Source;
import ai.chronon.flink.window.AlwaysFireOnElementTrigger;
import ai.chronon.flink.window.FlinkRowAggProcessFunction;
import ai.chronon.flink.window.FlinkRowAggregationFunction;
import ai.chronon.flink.window.KeySelector$;
import ai.chronon.flink.window.TimestampedIR;
import ai.chronon.flink.window.TimestampedTile;
import ai.chronon.online.GroupByServingInfoParsed;
import ai.chronon.online.KVStore;
import ai.chronon.online.SparkConversions$;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeinfo.PrimitiveArrayTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.java.typeutils.TypeExtractor;
import org.apache.flink.api.scala.typeutils.CaseClassTypeInfo;
import org.apache.flink.api.scala.typeutils.OptionTypeInfo;
import org.apache.flink.api.scala.typeutils.ScalaCaseClassSerializer;
import org.apache.flink.streaming.api.functions.async.RichAsyncFunction;
import org.apache.flink.streaming.api.scala.DataStream;
import org.apache.flink.streaming.api.scala.OutputTag;
import org.apache.flink.streaming.api.scala.OutputTag$;
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.spark.sql.Encoder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import scala.Array$;
import scala.Option;
import scala.Predef$;
import scala.Tuple2;
import scala.collection.Seq;
import scala.collection.immutable.$colon;
import scala.collection.immutable.List;
import scala.collection.immutable.Nil$;
import scala.collection.immutable.StringOps;
import scala.collection.mutable.ArrayOps;
import scala.reflect.ClassTag;
import scala.reflect.ClassTag$;
import scala.reflect.ScalaSignature;
import scala.runtime.BoxesRunTime;
import scala.runtime.RichInt$;

/* compiled from: FlinkJob.scala */
@ScalaSignature(bytes = "\u0006\u0001\u0005Ub\u0001\u0002\t\u0012\u0001aA\u0001\u0002\t\u0001\u0003\u0002\u0003\u0006I!\t\u0005\ta\u0001\u0011\t\u0011)A\u0005c!Aq\u000b\u0001B\u0001B\u0003%\u0001\f\u0003\u0005]\u0001\t\u0005\t\u0015!\u0003^\u0011!)\u0007A!A!\u0002\u00131\u0007\"B5\u0001\t\u0003Q\u0007BB9\u0001A\u0003%!\u000fC\u0004y\u0001\t\u0007I\u0011A=\t\u000f\u0005\u0015\u0001\u0001)A\u0005u\"I\u0011q\u0001\u0001C\u0002\u0013E\u0011\u0011\u0002\u0005\t\u0003#\u0001\u0001\u0015!\u0003\u0002\f!A\u00111\u0003\u0001C\u0002\u0013\u0005\u0011\u0010C\u0004\u0002\u0016\u0001\u0001\u000b\u0011\u0002>\t\u000f\u0005]\u0001\u0001\"\u0001\u0002\u001a!9\u0011q\u0006\u0001\u0005\u0002\u0005E\"\u0001\u0003$mS:\\'j\u001c2\u000b\u0005I\u0019\u0012!\u00024mS:\\'B\u0001\u000b\u0016\u0003\u001d\u0019\u0007N]8o_:T\u0011AF\u0001\u0003C&\u001c\u0001!\u0006\u0002\u001aOM\u0011\u0001A\u0007\t\u00037yi\u0011\u0001\b\u0006\u0002;\u0005)1oY1mC&\u0011q\u0004\b\u0002\u0007\u0003:L(+\u001a4\u0002\u0011\u00154XM\u001c;Te\u000e\u00042AI\u0012&\u001b\u0005\t\u0012B\u0001\u0013\u0012\u0005-1E.\u001b8l'>,(oY3\u0011\u0005\u0019:C\u0002\u0001\u0003\u0006Q\u0001\u0011\r!\u000b\u0002\u0002)F\u0011!&\f\t\u00037-J!\u0001\f\u000f\u0003\u000f9{G\u000f[5oOB\u00111DL\u0005\u0003_q\u00111!\u00118z\u0003\u0019\u0019\u0018N\\6G]B!!\u0007\u0011\"U\u001b\u0005\u0019$B\u0001\u001b6\u0003\u0015\t7/\u001f8d\u0015\t1t'A\u0005gk:\u001cG/[8og*\u0011\u0001(O\u0001\u0004CBL'B\u0001\u001e<\u0003%\u0019HO]3b[&twM\u0003\u0002\u0013y)\u0011QHP\u0001\u0007CB\f7\r[3\u000b\u0003}\n1a\u001c:h\u0013\t\t5GA\tSS\u000eD\u0017i]=oG\u001a+hn\u0019;j_:\u0004\"aQ)\u000f\u0005\u0011seBA#M\u001d\t15J\u0004\u0002H\u00156\t\u0001J\u0003\u0002J/\u00051AH]8pizJ\u0011AF\u0005\u0003)UI!!T\n\u0002\r=tG.\u001b8f\u0013\ty\u0005+A\u0004L-N#xN]3\u000b\u00055\u001b\u0012B\u0001*T\u0005)\u0001V\u000f\u001e*fcV,7\u000f\u001e\u0006\u0003\u001fB\u0003\"AI+\n\u0005Y\u000b\"!D,sSR,'+Z:q_:\u001cX-\u0001\rhe>,\bOQ=TKJ4\u0018N\\4J]\u001a|\u0007+\u0019:tK\u0012\u0004\"!\u0017.\u000e\u0003AK!a\u0017)\u00031\u001d\u0013x.\u001e9CsN+'O^5oO&sgm\u001c)beN,G-A\u0004f]\u000e|G-\u001a:\u0011\u0007y\u001bW%D\u0001`\u0015\t\u0001\u0017-A\u0002tc2T!A\u0019\u001f\u0002\u000bM\u0004\u0018M]6\n\u0005\u0011|&aB#oG>$WM]\u0001\fa\u0006\u0014\u0018\r\u001c7fY&\u001cX\u000e\u0005\u0002\u001cO&\u0011\u0001\u000e\b\u0002\u0004\u0013:$\u0018A\u0002\u001fj]&$h\b\u0006\u0004lY6tw\u000e\u001d\t\u0004E\u0001)\u0003\"\u0002\u0011\u0007\u0001\u0004\t\u0003\"\u0002\u0019\u0007\u0001\u0004\t\u0004\"B,\u0007\u0001\u0004A\u0006\"\u0002/\u0007\u0001\u0004i\u0006\"B3\u0007\u0001\u00041\u0017A\u00027pO\u001e,'\u000f\u0005\u0002tm6\tAO\u0003\u0002v}\u0005)1\u000f\u001c45U&\u0011q\u000f\u001e\u0002\u0007\u0019><w-\u001a:\u0002!\u0019,\u0017\r^;sK\u001e\u0013x.\u001e9OC6,W#\u0001>\u0011\u0005m|hB\u0001?~!\t9E$\u0003\u0002\u007f9\u00051\u0001K]3eK\u001aLA!!\u0001\u0002\u0004\t11\u000b\u001e:j]\u001eT!A \u000f\u0002#\u0019,\u0017\r^;sK\u001e\u0013x.\u001e9OC6,\u0007%\u0001\u0005fqB\u0014XI^1m+\t\tY\u0001\u0005\u0003#\u0003\u001b)\u0013bAA\b#\t)2\u000b]1sW\u0016C\bO]3tg&|g.\u0012<bY\u001as\u0017!C3yaJ,e/\u00197!\u0003)Y\u0017MZ6b)>\u0004\u0018nY\u0001\fW\u000647.\u0019+pa&\u001c\u0007%A\u0007sk:<%o\\;q\u0005fTuN\u0019\u000b\u0005\u00037\t)\u0003E\u0003\u0002\u001e\u0005\u0005B+\u0004\u0002\u0002 )\u0011QdN\u0005\u0005\u0003G\tyB\u0001\u0006ECR\f7\u000b\u001e:fC6Dq!a\n\u000f\u0001\u0004\tI#A\u0002f]Z\u0004B!!\b\u0002,%!\u0011QFA\u0010\u0005i\u0019FO]3b[\u0016CXmY;uS>tWI\u001c<je>tW.\u001a8u\u0003I\u0011XO\u001c+jY\u0016$wI]8va\nK(j\u001c2\u0015\t\u0005m\u00111\u0007\u0005\b\u0003Oy\u0001\u0019AA\u0015\u0001")
/* loaded from: input_file:ai/chronon/flink/FlinkJob.class */
public class FlinkJob<T> {
    private final FlinkSource<T> eventSrc;
    private final RichAsyncFunction<KVStore.PutRequest, WriteResponse> sinkFn;
    private final GroupByServingInfoParsed groupByServingInfoParsed;
    private final int parallelism;
    private final Logger logger = LoggerFactory.getLogger(getClass());
    private final String featureGroupName;
    private final SparkExpressionEvalFn<T> exprEval;
    private final String kafkaTopic;

    public String featureGroupName() {
        return this.featureGroupName;
    }

    public SparkExpressionEvalFn<T> exprEval() {
        return this.exprEval;
    }

    public String kafkaTopic() {
        return this.kafkaTopic;
    }

    public DataStream<WriteResponse> runGroupByJob(StreamExecutionEnvironment streamExecutionEnvironment) {
        this.logger.info(new StringBuilder(0).append(new StringOps("Running Flink job for featureGroupName=%s, kafkaTopic=%s. ").format(Predef$.MODULE$.genericWrapArray(new Object[]{featureGroupName(), kafkaTopic()}))).append("Tiling is disabled.").toString());
        DataStream<T> dataStream = this.eventSrc.getDataStream(kafkaTopic(), featureGroupName(), streamExecutionEnvironment, this.parallelism);
        final FlinkJob flinkJob = null;
        final FlinkJob flinkJob2 = null;
        return AsyncKVStoreWriter$.MODULE$.withUnorderedWaits(dataStream.flatMap(exprEval(), new FlinkJob$$anon$3(null, new CaseClassTypeInfo<Tuple2<String, Object>>(flinkJob) { // from class: ai.chronon.flink.FlinkJob$$anon$1
            public /* synthetic */ TypeInformation[] protected$types(FlinkJob$$anon$1 flinkJob$$anon$1) {
                return flinkJob$$anon$1.types;
            }

            public TypeSerializer<Tuple2<String, Object>> createSerializer(ExecutionConfig executionConfig) {
                final TypeSerializer[] typeSerializerArr = new TypeSerializer[getArity()];
                RichInt$.MODULE$.until$extension0(Predef$.MODULE$.intWrapper(0), getArity()).foreach$mVc$sp(i -> {
                    typeSerializerArr[i] = this.protected$types(this)[i].createSerializer(executionConfig);
                });
                new ScalaCaseClassSerializer<Tuple2<String, Object>>(this, typeSerializerArr) { // from class: ai.chronon.flink.FlinkJob$$anon$1$$anon$2
                    /* renamed from: createInstance, reason: merged with bridge method [inline-methods] and merged with bridge method [inline-methods] */
                    public Tuple2<String, Object> m3createInstance(Object[] objArr) {
                        return new Tuple2<>((String) objArr[0], objArr[1]);
                    }

                    {
                        Class typeClass = this.getTypeClass();
                    }
                };
                return new ScalaCaseClassSerializer(getTypeClass(), typeSerializerArr);
            }

            {
                super(Tuple2.class, (TypeInformation[]) new $colon.colon(BasicTypeInfo.getInfoFor(String.class), new $colon.colon(TypeExtractor.createTypeInfo(Object.class), Nil$.MODULE$)).toArray((ClassTag) Predef$.MODULE$.implicitly(ClassTag$.MODULE$.apply(TypeInformation.class))), new $colon.colon(BasicTypeInfo.getInfoFor(String.class), new $colon.colon(TypeExtractor.createTypeInfo(Object.class), Nil$.MODULE$)), new $colon.colon("_1", new $colon.colon("_2", Nil$.MODULE$)));
            }
        })).uid(new StringBuilder(24).append("spark-expr-eval-flatmap-").append(featureGroupName()).toString()).name(new StringBuilder(26).append("Spark expression eval for ").append(featureGroupName()).toString()).setParallelism(dataStream.parallelism()).flatMap(new AvroCodecFn(this.groupByServingInfoParsed), new CaseClassTypeInfo<KVStore.PutRequest>(flinkJob2) { // from class: ai.chronon.flink.FlinkJob$$anon$5
            public /* synthetic */ TypeInformation[] protected$types(FlinkJob$$anon$5 flinkJob$$anon$5) {
                return flinkJob$$anon$5.types;
            }

            public TypeSerializer<KVStore.PutRequest> createSerializer(ExecutionConfig executionConfig) {
                final TypeSerializer[] typeSerializerArr = new TypeSerializer[getArity()];
                RichInt$.MODULE$.until$extension0(Predef$.MODULE$.intWrapper(0), getArity()).foreach$mVc$sp(i -> {
                    typeSerializerArr[i] = this.protected$types(this)[i].createSerializer(executionConfig);
                });
                new ScalaCaseClassSerializer<KVStore.PutRequest>(this, typeSerializerArr) { // from class: ai.chronon.flink.FlinkJob$$anon$5$$anon$6
                    /* renamed from: createInstance, reason: merged with bridge method [inline-methods] and merged with bridge method [inline-methods] */
                    public KVStore.PutRequest m20createInstance(Object[] objArr) {
                        return new KVStore.PutRequest((byte[]) objArr[0], (byte[]) objArr[1], (String) objArr[2], (Option) objArr[3]);
                    }

                    {
                        Class typeClass = this.getTypeClass();
                    }
                };
                return new ScalaCaseClassSerializer(getTypeClass(), typeSerializerArr);
            }

            {
                super(KVStore.PutRequest.class, (TypeInformation[]) Nil$.MODULE$.toArray((ClassTag) Predef$.MODULE$.implicitly(ClassTag$.MODULE$.apply(TypeInformation.class))), new $colon.colon(PrimitiveArrayTypeInfo.getInfoFor(byte[].class), new $colon.colon(PrimitiveArrayTypeInfo.getInfoFor(byte[].class), new $colon.colon(BasicTypeInfo.getInfoFor(String.class), new $colon.colon(new OptionTypeInfo(BasicTypeInfo.getInfoFor(Long.TYPE)), Nil$.MODULE$)))), new $colon.colon("keyBytes", new $colon.colon("valueBytes", new $colon.colon("dataset", new $colon.colon("tsMillis", Nil$.MODULE$)))));
            }
        }).uid(new StringBuilder(16).append("avro-conversion-").append(featureGroupName()).toString()).name(new StringBuilder(20).append("Avro conversion for ").append(featureGroupName()).toString()).setParallelism(dataStream.parallelism()), this.sinkFn, featureGroupName(), AsyncKVStoreWriter$.MODULE$.withUnorderedWaits$default$4(), AsyncKVStoreWriter$.MODULE$.withUnorderedWaits$default$5());
    }

    public DataStream<WriteResponse> runTiledGroupByJob(StreamExecutionEnvironment streamExecutionEnvironment) {
        this.logger.info(new StringBuilder(0).append(new StringOps("Running Flink job for featureGroupName=%s, kafkaTopic=%s. ").format(Predef$.MODULE$.genericWrapArray(new Object[]{featureGroupName(), kafkaTopic()}))).append("Tiling is enabled.").toString());
        Option smallestWindowResolutionInMillis = ResolutionUtils$.MODULE$.getSmallestWindowResolutionInMillis(this.groupByServingInfoParsed.groupBy);
        DataStream<T> dataStream = this.eventSrc.getDataStream(kafkaTopic(), featureGroupName(), streamExecutionEnvironment, this.parallelism);
        final FlinkJob flinkJob = null;
        DataStream parallelism = dataStream.flatMap(exprEval(), new FlinkJob$$anon$9(null, new CaseClassTypeInfo<Tuple2<String, Object>>(flinkJob) { // from class: ai.chronon.flink.FlinkJob$$anon$7
            public /* synthetic */ TypeInformation[] protected$types(FlinkJob$$anon$7 flinkJob$$anon$7) {
                return flinkJob$$anon$7.types;
            }

            public TypeSerializer<Tuple2<String, Object>> createSerializer(ExecutionConfig executionConfig) {
                final TypeSerializer[] typeSerializerArr = new TypeSerializer[getArity()];
                RichInt$.MODULE$.until$extension0(Predef$.MODULE$.intWrapper(0), getArity()).foreach$mVc$sp(i -> {
                    typeSerializerArr[i] = this.protected$types(this)[i].createSerializer(executionConfig);
                });
                new ScalaCaseClassSerializer<Tuple2<String, Object>>(this, typeSerializerArr) { // from class: ai.chronon.flink.FlinkJob$$anon$7$$anon$8
                    /* renamed from: createInstance, reason: merged with bridge method [inline-methods] and merged with bridge method [inline-methods] */
                    public Tuple2<String, Object> m22createInstance(Object[] objArr) {
                        return new Tuple2<>((String) objArr[0], objArr[1]);
                    }

                    {
                        Class typeClass = this.getTypeClass();
                    }
                };
                return new ScalaCaseClassSerializer(getTypeClass(), typeSerializerArr);
            }

            {
                super(Tuple2.class, (TypeInformation[]) new $colon.colon(BasicTypeInfo.getInfoFor(String.class), new $colon.colon(TypeExtractor.createTypeInfo(Object.class), Nil$.MODULE$)).toArray((ClassTag) Predef$.MODULE$.implicitly(ClassTag$.MODULE$.apply(TypeInformation.class))), new $colon.colon(BasicTypeInfo.getInfoFor(String.class), new $colon.colon(TypeExtractor.createTypeInfo(Object.class), Nil$.MODULE$)), new $colon.colon("_1", new $colon.colon("_2", Nil$.MODULE$)));
            }
        })).uid(new StringBuilder(24).append("spark-expr-eval-flatmap-").append(featureGroupName()).toString()).name(new StringBuilder(26).append("Spark expression eval for ").append(featureGroupName()).toString()).setParallelism(dataStream.parallelism());
        Seq seq = new ArrayOps.ofRef(Predef$.MODULE$.refArrayOps((Object[]) new ArrayOps.ofRef(Predef$.MODULE$.refArrayOps(exprEval().getOutputSchema().fields())).map(structField -> {
            return new Tuple2(structField.name(), SparkConversions$.MODULE$.toChrononType(structField.name(), structField.dataType()));
        }, Array$.MODULE$.canBuildFrom(ClassTag$.MODULE$.apply(Tuple2.class))))).toSeq();
        TumblingEventTimeWindows of = TumblingEventTimeWindows.of(Time.milliseconds(BoxesRunTime.unboxToLong(smallestWindowResolutionInMillis.get())));
        AlwaysFireOnElementTrigger alwaysFireOnElementTrigger = new AlwaysFireOnElementTrigger();
        final FlinkJob flinkJob2 = null;
        OutputTag apply = OutputTag$.MODULE$.apply("tiling-late-events", new FlinkJob$$anon$13(null, new CaseClassTypeInfo<Tuple2<String, Object>>(flinkJob2) { // from class: ai.chronon.flink.FlinkJob$$anon$11
            public /* synthetic */ TypeInformation[] protected$types(FlinkJob$$anon$11 flinkJob$$anon$11) {
                return flinkJob$$anon$11.types;
            }

            public TypeSerializer<Tuple2<String, Object>> createSerializer(ExecutionConfig executionConfig) {
                final TypeSerializer[] typeSerializerArr = new TypeSerializer[getArity()];
                RichInt$.MODULE$.until$extension0(Predef$.MODULE$.intWrapper(0), getArity()).foreach$mVc$sp(i -> {
                    typeSerializerArr[i] = this.protected$types(this)[i].createSerializer(executionConfig);
                });
                new ScalaCaseClassSerializer<Tuple2<String, Object>>(this, typeSerializerArr) { // from class: ai.chronon.flink.FlinkJob$$anon$11$$anon$12
                    /* renamed from: createInstance, reason: merged with bridge method [inline-methods] and merged with bridge method [inline-methods] */
                    public Tuple2<String, Object> m5createInstance(Object[] objArr) {
                        return new Tuple2<>((String) objArr[0], objArr[1]);
                    }

                    {
                        Class typeClass = this.getTypeClass();
                    }
                };
                return new ScalaCaseClassSerializer(getTypeClass(), typeSerializerArr);
            }

            {
                super(Tuple2.class, (TypeInformation[]) new $colon.colon(BasicTypeInfo.getInfoFor(String.class), new $colon.colon(TypeExtractor.createTypeInfo(Object.class), Nil$.MODULE$)).toArray((ClassTag) Predef$.MODULE$.implicitly(ClassTag$.MODULE$.apply(TypeInformation.class))), new $colon.colon(BasicTypeInfo.getInfoFor(String.class), new $colon.colon(TypeExtractor.createTypeInfo(Object.class), Nil$.MODULE$)), new $colon.colon("_1", new $colon.colon("_2", Nil$.MODULE$)));
            }
        }));
        final FlinkJob flinkJob3 = null;
        final FlinkJob flinkJob4 = null;
        DataStream parallelism2 = parallelism.keyBy(KeySelector$.MODULE$.getKeySelectionFunction(this.groupByServingInfoParsed.groupBy), new FlinkJob$$anon$15(null, TypeExtractor.createTypeInfo(Object.class))).window(of).trigger(alwaysFireOnElementTrigger).sideOutputLateData(apply).aggregate(new FlinkRowAggregationFunction(this.groupByServingInfoParsed.groupBy, seq), new FlinkRowAggProcessFunction(this.groupByServingInfoParsed.groupBy, seq), new CaseClassTypeInfo<TimestampedIR>(flinkJob3) { // from class: ai.chronon.flink.FlinkJob$$anon$17
            public /* synthetic */ TypeInformation[] protected$types(FlinkJob$$anon$17 flinkJob$$anon$17) {
                return flinkJob$$anon$17.types;
            }

            public TypeSerializer<TimestampedIR> createSerializer(ExecutionConfig executionConfig) {
                final TypeSerializer[] typeSerializerArr = new TypeSerializer[getArity()];
                RichInt$.MODULE$.until$extension0(Predef$.MODULE$.intWrapper(0), getArity()).foreach$mVc$sp(i -> {
                    typeSerializerArr[i] = this.protected$types(this)[i].createSerializer(executionConfig);
                });
                new ScalaCaseClassSerializer<TimestampedIR>(this, typeSerializerArr) { // from class: ai.chronon.flink.FlinkJob$$anon$17$$anon$18
                    /* renamed from: createInstance, reason: merged with bridge method [inline-methods] and merged with bridge method [inline-methods] */
                    public TimestampedIR m7createInstance(Object[] objArr) {
                        return new TimestampedIR((Object[]) objArr[0], (Option) objArr[1]);
                    }

                    {
                        Class typeClass = this.getTypeClass();
                    }
                };
                return new ScalaCaseClassSerializer(getTypeClass(), typeSerializerArr);
            }

            /* JADX WARN: Illegal instructions before constructor call */
            {
                /*
                    Method dump skipped, instructions count: 451
                    To view this dump add '--comments-level debug' option
                */
                throw new UnsupportedOperationException("Method not decompiled: ai.chronon.flink.FlinkJob$$anon$17.<init>(ai.chronon.flink.FlinkJob):void");
            }
        }, new CaseClassTypeInfo<TimestampedIR>(flinkJob4) { // from class: ai.chronon.flink.FlinkJob$$anon$19
            public /* synthetic */ TypeInformation[] protected$types(FlinkJob$$anon$19 flinkJob$$anon$19) {
                return flinkJob$$anon$19.types;
            }

            public TypeSerializer<TimestampedIR> createSerializer(ExecutionConfig executionConfig) {
                final TypeSerializer[] typeSerializerArr = new TypeSerializer[getArity()];
                RichInt$.MODULE$.until$extension0(Predef$.MODULE$.intWrapper(0), getArity()).foreach$mVc$sp(i -> {
                    typeSerializerArr[i] = this.protected$types(this)[i].createSerializer(executionConfig);
                });
                new ScalaCaseClassSerializer<TimestampedIR>(this, typeSerializerArr) { // from class: ai.chronon.flink.FlinkJob$$anon$19$$anon$20
                    /* renamed from: createInstance, reason: merged with bridge method [inline-methods] and merged with bridge method [inline-methods] */
                    public TimestampedIR m9createInstance(Object[] objArr) {
                        return new TimestampedIR((Object[]) objArr[0], (Option) objArr[1]);
                    }

                    {
                        Class typeClass = this.getTypeClass();
                    }
                };
                return new ScalaCaseClassSerializer(getTypeClass(), typeSerializerArr);
            }

            /* JADX WARN: Illegal instructions before constructor call */
            {
                /*
                    Method dump skipped, instructions count: 451
                    To view this dump add '--comments-level debug' option
                */
                throw new UnsupportedOperationException("Method not decompiled: ai.chronon.flink.FlinkJob$$anon$19.<init>(ai.chronon.flink.FlinkJob):void");
            }
        }, new CaseClassTypeInfo<TimestampedTile>(this) { // from class: ai.chronon.flink.FlinkJob$$anon$21
            public /* synthetic */ TypeInformation[] protected$types(FlinkJob$$anon$21 flinkJob$$anon$21) {
                return flinkJob$$anon$21.types;
            }

            public TypeSerializer<TimestampedTile> createSerializer(ExecutionConfig executionConfig) {
                final TypeSerializer[] typeSerializerArr = new TypeSerializer[getArity()];
                RichInt$.MODULE$.until$extension0(Predef$.MODULE$.intWrapper(0), getArity()).foreach$mVc$sp(i -> {
                    typeSerializerArr[i] = this.protected$types(this)[i].createSerializer(executionConfig);
                });
                new ScalaCaseClassSerializer<TimestampedTile>(this, typeSerializerArr) { // from class: ai.chronon.flink.FlinkJob$$anon$21$$anon$24
                    /* renamed from: createInstance, reason: merged with bridge method [inline-methods] and merged with bridge method [inline-methods] */
                    public TimestampedTile m12createInstance(Object[] objArr) {
                        return new TimestampedTile((List) objArr[0], (byte[]) objArr[1], BoxesRunTime.unboxToLong(objArr[2]));
                    }

                    {
                        Class typeClass = this.getTypeClass();
                    }
                };
                return new ScalaCaseClassSerializer(getTypeClass(), typeSerializerArr);
            }

            {
                super(TimestampedTile.class, (TypeInformation[]) Nil$.MODULE$.toArray((ClassTag) Predef$.MODULE$.implicitly(ClassTag$.MODULE$.apply(TypeInformation.class))), new $colon.colon(new FlinkJob$$anon$21$$anon$22(null, TypeExtractor.createTypeInfo(Object.class)), new $colon.colon(PrimitiveArrayTypeInfo.getInfoFor(byte[].class), new $colon.colon(BasicTypeInfo.getInfoFor(Long.TYPE), Nil$.MODULE$))), new $colon.colon("keys", new $colon.colon("tileBytes", new $colon.colon("latestTsMillis", Nil$.MODULE$))));
            }
        }).uid(new StringBuilder(10).append("tiling-01-").append(featureGroupName()).toString()).name(new StringBuilder(11).append("Tiling for ").append(featureGroupName()).toString()).setParallelism(dataStream.parallelism());
        final FlinkJob flinkJob5 = null;
        final FlinkJob flinkJob6 = null;
        parallelism2.getSideOutput(apply, new FlinkJob$$anon$27(null, new CaseClassTypeInfo<Tuple2<String, Object>>(flinkJob5) { // from class: ai.chronon.flink.FlinkJob$$anon$25
            public /* synthetic */ TypeInformation[] protected$types(FlinkJob$$anon$25 flinkJob$$anon$25) {
                return flinkJob$$anon$25.types;
            }

            public TypeSerializer<Tuple2<String, Object>> createSerializer(ExecutionConfig executionConfig) {
                final TypeSerializer[] typeSerializerArr = new TypeSerializer[getArity()];
                RichInt$.MODULE$.until$extension0(Predef$.MODULE$.intWrapper(0), getArity()).foreach$mVc$sp(i -> {
                    typeSerializerArr[i] = this.protected$types(this)[i].createSerializer(executionConfig);
                });
                new ScalaCaseClassSerializer<Tuple2<String, Object>>(this, typeSerializerArr) { // from class: ai.chronon.flink.FlinkJob$$anon$25$$anon$26
                    /* renamed from: createInstance, reason: merged with bridge method [inline-methods] and merged with bridge method [inline-methods] */
                    public Tuple2<String, Object> m14createInstance(Object[] objArr) {
                        return new Tuple2<>((String) objArr[0], objArr[1]);
                    }

                    {
                        Class typeClass = this.getTypeClass();
                    }
                };
                return new ScalaCaseClassSerializer(getTypeClass(), typeSerializerArr);
            }

            {
                super(Tuple2.class, (TypeInformation[]) new $colon.colon(BasicTypeInfo.getInfoFor(String.class), new $colon.colon(TypeExtractor.createTypeInfo(Object.class), Nil$.MODULE$)).toArray((ClassTag) Predef$.MODULE$.implicitly(ClassTag$.MODULE$.apply(TypeInformation.class))), new $colon.colon(BasicTypeInfo.getInfoFor(String.class), new $colon.colon(TypeExtractor.createTypeInfo(Object.class), Nil$.MODULE$)), new $colon.colon("_1", new $colon.colon("_2", Nil$.MODULE$)));
            }
        })).flatMap(new LateEventCounter(featureGroupName()), new FlinkJob$$anon$31(null, new CaseClassTypeInfo<Tuple2<String, Object>>(flinkJob6) { // from class: ai.chronon.flink.FlinkJob$$anon$29
            public /* synthetic */ TypeInformation[] protected$types(FlinkJob$$anon$29 flinkJob$$anon$29) {
                return flinkJob$$anon$29.types;
            }

            public TypeSerializer<Tuple2<String, Object>> createSerializer(ExecutionConfig executionConfig) {
                final TypeSerializer[] typeSerializerArr = new TypeSerializer[getArity()];
                RichInt$.MODULE$.until$extension0(Predef$.MODULE$.intWrapper(0), getArity()).foreach$mVc$sp(i -> {
                    typeSerializerArr[i] = this.protected$types(this)[i].createSerializer(executionConfig);
                });
                new ScalaCaseClassSerializer<Tuple2<String, Object>>(this, typeSerializerArr) { // from class: ai.chronon.flink.FlinkJob$$anon$29$$anon$30
                    /* renamed from: createInstance, reason: merged with bridge method [inline-methods] and merged with bridge method [inline-methods] */
                    public Tuple2<String, Object> m16createInstance(Object[] objArr) {
                        return new Tuple2<>((String) objArr[0], objArr[1]);
                    }

                    {
                        Class typeClass = this.getTypeClass();
                    }
                };
                return new ScalaCaseClassSerializer(getTypeClass(), typeSerializerArr);
            }

            {
                super(Tuple2.class, (TypeInformation[]) new $colon.colon(BasicTypeInfo.getInfoFor(String.class), new $colon.colon(TypeExtractor.createTypeInfo(Object.class), Nil$.MODULE$)).toArray((ClassTag) Predef$.MODULE$.implicitly(ClassTag$.MODULE$.apply(TypeInformation.class))), new $colon.colon(BasicTypeInfo.getInfoFor(String.class), new $colon.colon(TypeExtractor.createTypeInfo(Object.class), Nil$.MODULE$)), new $colon.colon("_1", new $colon.colon("_2", Nil$.MODULE$)));
            }
        })).uid(new StringBuilder(22).append("tiling-side-output-01-").append(featureGroupName()).toString()).name(new StringBuilder(33).append("Tiling Side Output Late Data for ").append(featureGroupName()).toString()).setParallelism(dataStream.parallelism());
        final FlinkJob flinkJob7 = null;
        return AsyncKVStoreWriter$.MODULE$.withUnorderedWaits(parallelism2.flatMap(new TiledAvroCodecFn(this.groupByServingInfoParsed), new CaseClassTypeInfo<KVStore.PutRequest>(flinkJob7) { // from class: ai.chronon.flink.FlinkJob$$anon$33
            public /* synthetic */ TypeInformation[] protected$types(FlinkJob$$anon$33 flinkJob$$anon$33) {
                return flinkJob$$anon$33.types;
            }

            public TypeSerializer<KVStore.PutRequest> createSerializer(ExecutionConfig executionConfig) {
                final TypeSerializer[] typeSerializerArr = new TypeSerializer[getArity()];
                RichInt$.MODULE$.until$extension0(Predef$.MODULE$.intWrapper(0), getArity()).foreach$mVc$sp(i -> {
                    typeSerializerArr[i] = this.protected$types(this)[i].createSerializer(executionConfig);
                });
                new ScalaCaseClassSerializer<KVStore.PutRequest>(this, typeSerializerArr) { // from class: ai.chronon.flink.FlinkJob$$anon$33$$anon$34
                    /* renamed from: createInstance, reason: merged with bridge method [inline-methods] and merged with bridge method [inline-methods] */
                    public KVStore.PutRequest m18createInstance(Object[] objArr) {
                        return new KVStore.PutRequest((byte[]) objArr[0], (byte[]) objArr[1], (String) objArr[2], (Option) objArr[3]);
                    }

                    {
                        Class typeClass = this.getTypeClass();
                    }
                };
                return new ScalaCaseClassSerializer(getTypeClass(), typeSerializerArr);
            }

            {
                super(KVStore.PutRequest.class, (TypeInformation[]) Nil$.MODULE$.toArray((ClassTag) Predef$.MODULE$.implicitly(ClassTag$.MODULE$.apply(TypeInformation.class))), new $colon.colon(PrimitiveArrayTypeInfo.getInfoFor(byte[].class), new $colon.colon(PrimitiveArrayTypeInfo.getInfoFor(byte[].class), new $colon.colon(BasicTypeInfo.getInfoFor(String.class), new $colon.colon(new OptionTypeInfo(BasicTypeInfo.getInfoFor(Long.TYPE)), Nil$.MODULE$)))), new $colon.colon("keyBytes", new $colon.colon("valueBytes", new $colon.colon("dataset", new $colon.colon("tsMillis", Nil$.MODULE$)))));
            }
        }).uid(new StringBuilder(19).append("avro-conversion-01-").append(featureGroupName()).toString()).name(new StringBuilder(20).append("Avro conversion for ").append(featureGroupName()).toString()).setParallelism(dataStream.parallelism()), this.sinkFn, featureGroupName(), AsyncKVStoreWriter$.MODULE$.withUnorderedWaits$default$4(), AsyncKVStoreWriter$.MODULE$.withUnorderedWaits$default$5());
    }

    public FlinkJob(FlinkSource<T> flinkSource, RichAsyncFunction<KVStore.PutRequest, WriteResponse> richAsyncFunction, GroupByServingInfoParsed groupByServingInfoParsed, Encoder<T> encoder, int i) {
        this.eventSrc = flinkSource;
        this.sinkFn = richAsyncFunction;
        this.groupByServingInfoParsed = groupByServingInfoParsed;
        this.parallelism = i;
        this.featureGroupName = groupByServingInfoParsed.groupBy.getMetaData().getName();
        this.logger.info(new StringOps("Creating Flink job. featureGroupName=%s").format(Predef$.MODULE$.genericWrapArray(new Object[]{featureGroupName()})));
        this.exprEval = new SparkExpressionEvalFn<>(encoder, groupByServingInfoParsed.groupBy);
        if (Extensions$.MODULE$.GroupByOps(groupByServingInfoParsed.groupBy).streamingSource().isEmpty()) {
            throw new IllegalArgumentException(new StringBuilder(44).append("Invalid feature group: ").append(featureGroupName()).append(". No streaming source").toString());
        }
        this.kafkaTopic = Extensions$.MODULE$.SourceOps((Source) Extensions$.MODULE$.GroupByOps(groupByServingInfoParsed.groupBy).streamingSource().get()).topic();
    }
}
