package pl.touk.nussknacker.engine.kafka.source;

import java.time.Duration;
import org.apache.flink.annotation.Public;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.streaming.api.datastream.DataStreamSink;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.api.scala.DataStream;
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.flink.streaming.connectors.kafka.KafkaDeserializationSchema;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import pl.touk.nussknacker.engine.api.Context;
import pl.touk.nussknacker.engine.api.deployment.TestProcess;
import pl.touk.nussknacker.engine.api.process.TestDataGenerator;
import pl.touk.nussknacker.engine.api.test.TestDataParser;
import pl.touk.nussknacker.engine.flink.api.compat.ExplicitUidInOperatorsSupport;
import pl.touk.nussknacker.engine.flink.api.process.FlinkContextInitializer;
import pl.touk.nussknacker.engine.flink.api.process.FlinkCustomNodeContext;
import pl.touk.nussknacker.engine.flink.api.process.FlinkIntermediateRawSource;
import pl.touk.nussknacker.engine.flink.api.process.FlinkSource;
import pl.touk.nussknacker.engine.flink.api.process.FlinkSourceTestSupport;
import pl.touk.nussknacker.engine.flink.api.timestampwatermark.StandardTimestampWatermarkHandler;
import pl.touk.nussknacker.engine.flink.api.timestampwatermark.TimestampWatermarkHandler;
import pl.touk.nussknacker.engine.kafka.ConsumerGroupDeterminer$;
import pl.touk.nussknacker.engine.kafka.KafkaConfig;
import pl.touk.nussknacker.engine.kafka.KafkaUtils$;
import pl.touk.nussknacker.engine.kafka.ListUtil$;
import pl.touk.nussknacker.engine.kafka.PreparedKafkaTopic;
import pl.touk.nussknacker.engine.kafka.RecordFormatter;
import scala.Option;
import scala.Serializable;
import scala.Some;
import scala.collection.JavaConverters$;
import scala.collection.immutable.List;
import scala.collection.immutable.List$;
import scala.reflect.ScalaSignature;
import scala.runtime.BoxedUnit;
import scala.runtime.BoxesRunTime;

/* compiled from: KafkaSource.scala */
@ScalaSignature(bytes = "\u0006\u0001\tMb\u0001\u0002\u000e\u001c\u0001!B\u0001B\u0017\u0001\u0003\u0002\u0003\u0006Ia\u0017\u0005\tW\u0002\u0011\t\u0011)A\u0005Y\"Aq\u000e\u0001B\u0001B\u0003%\u0001\u000f\u0003\u0005\u007f\u0001\t\u0005\t\u0015!\u0003��\u0011)\t\t\u0002\u0001B\u0001B\u0003%\u00111\u0003\u0005\u000b\u00033\u0001!\u0011!Q\u0001\n\u0005m\u0001bBA\u0017\u0001\u0011\u0005\u0011q\u0006\u0005\u000b\u0003\u0003\u0002\u0001R1A\u0005\n\u0005\r\u0003bBA$\u0001\u0011\u0005\u0013\u0011\n\u0005\n\u0003g\u0002!\u0019!C!\u0003kB\u0001\"!#\u0001A\u0003%\u0011q\u000f\u0005\b\u0003\u0017\u0003A\u0011CAG\u0011\u001d\t\t\u000b\u0001C\t\u0003GCq!a*\u0001\t\u0003\nI\u000bC\u0004\u0002B\u0002!\t%a1\t\u000f\u0005E\u0007\u0001\"\u0011\u0002T\"9\u0011Q\u001b\u0001\u0005B\u0005M\u0007bBAl\u0001\u0011E\u0011\u0011\\\u0004\b\u0003k\\\u0002\u0012AA|\r\u0019Q2\u0004#\u0001\u0002z\"9\u0011Q\u0006\u000b\u0005\u0002\u0005m\b\"CA\u007f)\t\u0007I\u0011AA��\u0011!\u0011\t\u0001\u0006Q\u0001\n\u0005m\u0006\"\u0003B\u0002)E\u0005I\u0011\u0001B\u0003\u0011%\u0011y\u0002FA\u0001\n\u0013\u0011\tCA\u0006LC\u001a\\\u0017mU8ve\u000e,'B\u0001\u000f\u001e\u0003\u0019\u0019x.\u001e:dK*\u0011adH\u0001\u0006W\u000647.\u0019\u0006\u0003A\u0005\na!\u001a8hS:,'B\u0001\u0012$\u0003-qWo]:l]\u0006\u001c7.\u001a:\u000b\u0005\u0011*\u0013\u0001\u0002;pk.T\u0011AJ\u0001\u0003a2\u001c\u0001!\u0006\u0002*yMA\u0001A\u000b\u0019F\u0011.sE\u000b\u0005\u0002,]5\tAFC\u0001.\u0003\u0015\u00198-\u00197b\u0013\tyCF\u0001\u0004B]f\u0014VM\u001a\t\u0004caRT\"\u0001\u001a\u000b\u0005M\"\u0014a\u00029s_\u000e,7o\u001d\u0006\u0003kY\n1!\u00199j\u0015\t9t$A\u0003gY&t7.\u0003\u0002:e\tYa\t\\5oWN{WO]2f!\tYD\b\u0004\u0001\u0005\u000bu\u0002!\u0019\u0001 \u0003\u0003Q\u000b\"a\u0010\"\u0011\u0005-\u0002\u0015BA!-\u0005\u001dqu\u000e\u001e5j]\u001e\u0004\"aK\"\n\u0005\u0011c#aA!osB\u0019\u0011G\u0012\u001e\n\u0005\u001d\u0013$A\u0007$mS:\\\u0017J\u001c;fe6,G-[1uKJ\u000bwoU8ve\u000e,\u0007CA\u0016J\u0013\tQEF\u0001\u0007TKJL\u0017\r\\5{C\ndW\rE\u00022\u0019jJ!!\u0014\u001a\u0003-\u0019c\u0017N\\6T_V\u00148-\u001a+fgR\u001cV\u000f\u001d9peR\u0004\"a\u0014*\u000e\u0003AS!aM)\u000b\u0005Uz\u0012BA*Q\u0005E!Vm\u001d;ECR\fw)\u001a8fe\u0006$xN\u001d\t\u0003+bk\u0011A\u0016\u0006\u0003/R\naaY8na\u0006$\u0018BA-W\u0005u)\u0005\u0010\u001d7jG&$X+\u001b3J]>\u0003XM]1u_J\u001c8+\u001e9q_J$\u0018A\u00049sKB\f'/\u001a3U_BL7m\u001d\t\u00049\u0012<gBA/c\u001d\tq\u0016-D\u0001`\u0015\t\u0001w%\u0001\u0004=e>|GOP\u0005\u0002[%\u00111\rL\u0001\ba\u0006\u001c7.Y4f\u0013\t)gM\u0001\u0003MSN$(BA2-!\tA\u0017.D\u0001\u001e\u0013\tQWD\u0001\nQe\u0016\u0004\u0018M]3e\u0017\u000647.\u0019+pa&\u001c\u0017aC6bM.\f7i\u001c8gS\u001e\u0004\"\u0001[7\n\u00059l\"aC&bM.\f7i\u001c8gS\u001e\fQ\u0003Z3tKJL\u0017\r\\5{CRLwN\\*dQ\u0016l\u0017\rE\u0002ryjj\u0011A\u001d\u0006\u0003=MT!\u0001^;\u0002\u0015\r|gN\\3di>\u00148O\u0003\u0002wo\u0006I1\u000f\u001e:fC6Lgn\u001a\u0006\u0003oaT!!\u001f>\u0002\r\u0005\u0004\u0018m\u00195f\u0015\u0005Y\u0018aA8sO&\u0011QP\u001d\u0002\u001b\u0017\u000647.\u0019#fg\u0016\u0014\u0018.\u00197ju\u0006$\u0018n\u001c8TG\",W.Y\u0001\u000fa\u0006\u001c8/\u001a3BgNLwM\\3s!\u0015Y\u0013\u0011AA\u0003\u0013\r\t\u0019\u0001\f\u0002\u0007\u001fB$\u0018n\u001c8\u0011\u000b\u0005\u001d\u0011Q\u0002\u001e\u000e\u0005\u0005%!bAA\u0006i\u0005\u0011B/[7fgR\fW\u000e]<bi\u0016\u0014X.\u0019:l\u0013\u0011\ty!!\u0003\u00033QKW.Z:uC6\u0004x+\u0019;fe6\f'o\u001b%b]\u0012dWM]\u0001\u0010e\u0016\u001cwN\u001d3G_Jl\u0017\r\u001e;feB\u0019\u0001.!\u0006\n\u0007\u0005]QDA\bSK\u000e|'\u000f\u001a$pe6\fG\u000f^3s\u0003]yg/\u001a:sS\u0012$WM\\\"p]N,X.\u001a:He>,\b\u000fE\u0003,\u0003\u0003\ti\u0002\u0005\u0003\u0002 \u0005\u001db\u0002BA\u0011\u0003G\u0001\"A\u0018\u0017\n\u0007\u0005\u0015B&\u0001\u0004Qe\u0016$WMZ\u0005\u0005\u0003S\tYC\u0001\u0004TiJLgn\u001a\u0006\u0004\u0003Ka\u0013A\u0002\u001fj]&$h\b\u0006\b\u00022\u0005U\u0012qGA\u001d\u0003w\ti$a\u0010\u0011\t\u0005M\u0002AO\u0007\u00027!)!l\u0002a\u00017\")1n\u0002a\u0001Y\")qn\u0002a\u0001a\")ap\u0002a\u0001\u007f\"9\u0011\u0011C\u0004A\u0002\u0005M\u0001\"CA\r\u000fA\u0005\t\u0019AA\u000e\u0003\u0019!x\u000e]5dgV\u0011\u0011Q\t\t\u00059\u0012\fi\"\u0001\u0007t_V\u00148-Z*ue\u0016\fW\u000e\u0006\u0004\u0002L\u0005}\u0013\u0011\u000e\t\u0007\u0003\u001b\n\u0019&a\u0016\u000e\u0005\u0005=#bA\u0017\u0002R)\u0011Q'^\u0005\u0005\u0003+\nyE\u0001\u0006ECR\f7\u000b\u001e:fC6\u0004B!!\u0017\u0002\\5\t\u0011+C\u0002\u0002^E\u0013qaQ8oi\u0016DH\u000fC\u0004\u0002b%\u0001\r!a\u0019\u0002\u0007\u0015tg\u000f\u0005\u0003\u0002N\u0005\u0015\u0014\u0002BA4\u0003\u001f\u0012!d\u0015;sK\u0006lW\t_3dkRLwN\\#om&\u0014xN\\7f]RDq!a\u001b\n\u0001\u0004\ti'\u0001\tgY&t7NT8eK\u000e{g\u000e^3yiB\u0019\u0011'a\u001c\n\u0007\u0005E$G\u0001\fGY&t7nQ;ti>lgj\u001c3f\u0007>tG/\u001a=u\u0003=!\u0018\u0010]3J]\u001a|'/\\1uS>tWCAA<!\u0015\tI(!\";\u001b\t\tYH\u0003\u0003\u0002~\u0005}\u0014\u0001\u0003;za\u0016LgNZ8\u000b\t\u0005\u0005\u00151Q\u0001\u0007G>lWn\u001c8\u000b\u0005U:\u0018\u0002BAD\u0003w\u0012q\u0002V=qK&sgm\u001c:nCRLwN\\\u0001\u0011if\u0004X-\u00138g_Jl\u0017\r^5p]\u0002\n1C\u001a7j].\u001cv.\u001e:dK\u001a+hn\u0019;j_:$B!a$\u0002\u001eB)\u0011\u0011SAMu5\u0011\u00111\u0013\u0006\u00049\u0005U%\u0002BAL\u0003#\n\u0011BZ;oGRLwN\\:\n\t\u0005m\u00151\u0013\u0002\u000f'>,(oY3Gk:\u001cG/[8o\u0011\u001d\ty\n\u0004a\u0001\u0003;\tqbY8ogVlWM]$s_V\u0004\u0018\nZ\u0001\u0012GJ,\u0017\r^3GY&t7nU8ve\u000e,G\u0003BAH\u0003KCq!a(\u000e\u0001\u0004\ti\"\u0001\thK:,'/\u0019;f)\u0016\u001cH\u000fR1uCR!\u00111VA\\!\u0015Y\u0013QVAY\u0013\r\ty\u000b\f\u0002\u0006\u0003J\u0014\u0018-\u001f\t\u0004W\u0005M\u0016bAA[Y\t!!)\u001f;f\u0011\u001d\tIL\u0004a\u0001\u0003w\u000bAa]5{KB\u00191&!0\n\u0007\u0005}FFA\u0002J]R\fa\u0002^3ti\u0012\u000bG/\u0019)beN,'/\u0006\u0002\u0002FB)\u0011qYAgu5\u0011\u0011\u0011\u001a\u0006\u0004\u0003\u0017\f\u0016\u0001\u0002;fgRLA!a4\u0002J\nqA+Z:u\t\u0006$\u0018\rU1sg\u0016\u0014\u0018\u0001\u0007;j[\u0016\u001cH/Y7q\u0003N\u001c\u0018n\u001a8fe\u001a{'\u000fV3tiV\tq0A\tuS6,7\u000f^1na\u0006\u001b8/[4oKJ\f1\u0003Z3tKJL\u0017\r\\5{KR+7\u000f\u001e#bi\u0006$RAOAn\u0003?Dq!!8\u0013\u0001\u0004\ti\"A\u0003u_BL7\rC\u0004\u0002bJ\u0001\r!a9\u0002\rI,7m\u001c:e!!\t)/!=\u0002,\u0006-VBAAt\u0015\u0011\tI/a;\u0002\u0011\r|gn];nKJTA!!<\u0002p\u000691\r\\5f]R\u001c(B\u0001\u0010y\u0013\u0011\t\u00190a:\u0003\u001d\r{gn];nKJ\u0014VmY8sI\u0006Y1*\u00194lCN{WO]2f!\r\t\u0019\u0004F\n\u0004))BECAA|\u0003y!WMZ1vYRl\u0015\r_(vi>3wJ\u001d3fe:,7o]'jY2L7/\u0006\u0002\u0002<\u0006yB-\u001a4bk2$X*\u0019=PkR|em\u0014:eKJtWm]:NS2d\u0017n\u001d\u0011\u00027\u0011bWm]:j]&$He\u001a:fCR,'\u000f\n3fM\u0006,H\u000e\u001e\u00137+\u0011\u00119A!\b\u0016\u0005\t%!\u0006BA\u000e\u0005\u0017Y#A!\u0004\u0011\t\t=!\u0011D\u0007\u0003\u0005#QAAa\u0005\u0003\u0016\u0005IQO\\2iK\u000e\\W\r\u001a\u0006\u0004\u0005/a\u0013AC1o]>$\u0018\r^5p]&!!1\u0004B\t\u0005E)hn\u00195fG.,GMV1sS\u0006t7-\u001a\u0003\u0006{a\u0011\rAP\u0001\fe\u0016\fGMU3t_24X\r\u0006\u0002\u0003$A!!Q\u0005B\u0018\u001b\t\u00119C\u0003\u0003\u0003*\t-\u0012\u0001\u00027b]\u001eT!A!\f\u0002\t)\fg/Y\u0005\u0005\u0005c\u00119C\u0001\u0004PE*,7\r\u001e")
/* loaded from: input_file:pl/touk/nussknacker/engine/kafka/source/KafkaSource.class */
public class KafkaSource<T> implements FlinkSource<T>, Serializable, FlinkSourceTestSupport<T>, TestDataGenerator {
    private List<String> pl$touk$nussknacker$engine$kafka$source$KafkaSource$$topics;
    private final List<PreparedKafkaTopic> preparedTopics;
    private final KafkaConfig kafkaConfig;
    private final KafkaDeserializationSchema<T> deserializationSchema;
    private final Option<TimestampWatermarkHandler<T>> passedAssigner;
    public final RecordFormatter pl$touk$nussknacker$engine$kafka$source$KafkaSource$$recordFormatter;
    private final Option<String> overriddenConsumerGroup;
    private final TypeInformation<T> typeInformation;
    private final FlinkContextInitializer<T> contextInitializer;
    private volatile boolean bitmap$0;

    public static int defaultMaxOutOfOrdernessMillis() {
        return KafkaSource$.MODULE$.defaultMaxOutOfOrdernessMillis();
    }

    public DataStream<Context> prepareSourceStream(StreamExecutionEnvironment streamExecutionEnvironment, FlinkCustomNodeContext flinkCustomNodeContext, SourceFunction<T> sourceFunction) {
        return FlinkIntermediateRawSource.prepareSourceStream$(this, streamExecutionEnvironment, flinkCustomNodeContext, sourceFunction);
    }

    public <T> DataStream<T> setUidToNodeIdIfNeed(FlinkCustomNodeContext flinkCustomNodeContext, DataStream<T> dataStream) {
        return ExplicitUidInOperatorsSupport.setUidToNodeIdIfNeed$(this, flinkCustomNodeContext, dataStream);
    }

    public <T> DataStreamSink<T> setUidToNodeIdIfNeed(FlinkCustomNodeContext flinkCustomNodeContext, DataStreamSink<T> dataStreamSink) {
        return ExplicitUidInOperatorsSupport.setUidToNodeIdIfNeed$(this, flinkCustomNodeContext, dataStreamSink);
    }

    public <T> SingleOutputStreamOperator<T> setUidToNodeIdIfNeed(FlinkCustomNodeContext flinkCustomNodeContext, SingleOutputStreamOperator<T> singleOutputStreamOperator) {
        return ExplicitUidInOperatorsSupport.setUidToNodeIdIfNeed$(this, flinkCustomNodeContext, singleOutputStreamOperator);
    }

    @Public
    public boolean explicitUidInStatefulOperators(FlinkCustomNodeContext flinkCustomNodeContext) {
        return ExplicitUidInOperatorsSupport.explicitUidInStatefulOperators$(this, flinkCustomNodeContext);
    }

    public FlinkContextInitializer<T> contextInitializer() {
        return this.contextInitializer;
    }

    public void pl$touk$nussknacker$engine$flink$api$process$FlinkIntermediateRawSource$_setter_$contextInitializer_$eq(FlinkContextInitializer<T> flinkContextInitializer) {
        this.contextInitializer = flinkContextInitializer;
    }

    /* JADX WARN: Multi-variable type inference failed */
    /* JADX WARN: Type inference failed for: r0v0 */
    /* JADX WARN: Type inference failed for: r0v1, types: [java.lang.Throwable] */
    /* JADX WARN: Type inference failed for: r0v9, types: [pl.touk.nussknacker.engine.kafka.source.KafkaSource] */
    private List<String> topics$lzycompute() {
        ?? r0 = this;
        synchronized (r0) {
            if (!this.bitmap$0) {
                this.pl$touk$nussknacker$engine$kafka$source$KafkaSource$$topics = (List) this.preparedTopics.map(preparedKafkaTopic -> {
                    return preparedKafkaTopic.prepared();
                }, List$.MODULE$.canBuildFrom());
                r0 = this;
                r0.bitmap$0 = true;
            }
        }
        this.preparedTopics = null;
        return this.pl$touk$nussknacker$engine$kafka$source$KafkaSource$$topics;
    }

    public List<String> pl$touk$nussknacker$engine$kafka$source$KafkaSource$$topics() {
        return !this.bitmap$0 ? topics$lzycompute() : this.pl$touk$nussknacker$engine$kafka$source$KafkaSource$$topics;
    }

    public DataStream<Context> sourceStream(StreamExecutionEnvironment streamExecutionEnvironment, FlinkCustomNodeContext flinkCustomNodeContext) {
        return prepareSourceStream(streamExecutionEnvironment, flinkCustomNodeContext, flinkSourceFunction((String) this.overriddenConsumerGroup.getOrElse(() -> {
            return ConsumerGroupDeterminer$.MODULE$.apply(this.kafkaConfig).consumerGroup(flinkCustomNodeContext);
        })));
    }

    public TypeInformation<T> typeInformation() {
        return this.typeInformation;
    }

    public SourceFunction<T> flinkSourceFunction(String str) {
        pl$touk$nussknacker$engine$kafka$source$KafkaSource$$topics().foreach(str2 -> {
            $anonfun$flinkSourceFunction$1(this, str, str2);
            return BoxedUnit.UNIT;
        });
        return createFlinkSource(str);
    }

    public SourceFunction<T> createFlinkSource(String str) {
        return new FlinkKafkaConsumer((java.util.List) JavaConverters$.MODULE$.seqAsJavaListConverter(pl$touk$nussknacker$engine$kafka$source$KafkaSource$$topics()).asJava(), this.deserializationSchema, KafkaUtils$.MODULE$.toProperties(this.kafkaConfig, new Some(str)));
    }

    public byte[] generateTestData(int i) {
        return this.pl$touk$nussknacker$engine$kafka$source$KafkaSource$$recordFormatter.prepareGeneratedTestData(ListUtil$.MODULE$.mergeListsFromTopics((List) pl$touk$nussknacker$engine$kafka$source$KafkaSource$$topics().map(str -> {
            return KafkaUtils$.MODULE$.readLastMessages(str, i, this.kafkaConfig);
        }, List$.MODULE$.canBuildFrom()), i));
    }

    public TestDataParser<T> testDataParser() {
        return new TestDataParser<T>(this) { // from class: pl.touk.nussknacker.engine.kafka.source.KafkaSource$$anon$1
            private final /* synthetic */ KafkaSource $outer;

            public List<T> parseTestData(TestProcess.TestData testData) {
                String str = (String) this.$outer.pl$touk$nussknacker$engine$kafka$source$KafkaSource$$topics().head();
                return (List) this.$outer.pl$touk$nussknacker$engine$kafka$source$KafkaSource$$recordFormatter.parseDataForTest(str, testData.testData()).map(consumerRecord -> {
                    return this.$outer.deserializeTestData(str, consumerRecord);
                }, List$.MODULE$.canBuildFrom());
            }

            {
                if (this == null) {
                    throw null;
                }
                this.$outer = this;
            }
        };
    }

    public Option<TimestampWatermarkHandler<T>> timestampAssignerForTest() {
        return timestampAssigner();
    }

    public Option<TimestampWatermarkHandler<T>> timestampAssigner() {
        return new Some(this.passedAssigner.getOrElse(() -> {
            return new StandardTimestampWatermarkHandler(WatermarkStrategy.forBoundedOutOfOrderness(Duration.ofMillis(BoxesRunTime.unboxToLong(this.kafkaConfig.defaultMaxOutOfOrdernessMillis().getOrElse(() -> {
                return KafkaSource$.MODULE$.defaultMaxOutOfOrdernessMillis();
            })))));
        }));
    }

    public T deserializeTestData(String str, ConsumerRecord<byte[], byte[]> consumerRecord) {
        return (T) this.deserializationSchema.deserialize(consumerRecord);
    }

    public static final /* synthetic */ void $anonfun$flinkSourceFunction$1(KafkaSource kafkaSource, String str, String str2) {
        KafkaUtils$.MODULE$.setToLatestOffsetIfNeeded(kafkaSource.kafkaConfig, str2, str);
    }

    public KafkaSource(List<PreparedKafkaTopic> list, KafkaConfig kafkaConfig, KafkaDeserializationSchema<T> kafkaDeserializationSchema, Option<TimestampWatermarkHandler<T>> option, RecordFormatter recordFormatter, Option<String> option2) {
        this.preparedTopics = list;
        this.kafkaConfig = kafkaConfig;
        this.deserializationSchema = kafkaDeserializationSchema;
        this.passedAssigner = option;
        this.pl$touk$nussknacker$engine$kafka$source$KafkaSource$$recordFormatter = recordFormatter;
        this.overriddenConsumerGroup = option2;
        ExplicitUidInOperatorsSupport.$init$(this);
        FlinkIntermediateRawSource.$init$(this);
        this.typeInformation = kafkaDeserializationSchema.getProducedType();
    }
}
