package pl.touk.nussknacker.engine.kafka.source;

import org.apache.flink.annotation.Public;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStreamSink;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks;
import org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks;
import org.apache.flink.streaming.api.functions.TimestampAssigner;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.api.scala.DataStream;
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.flink.streaming.connectors.kafka.KafkaDeserializationSchema;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.producer.ProducerRecord;
import pl.touk.nussknacker.engine.api.process.TestDataGenerator;
import pl.touk.nussknacker.engine.api.process.TestDataParserProvider;
import pl.touk.nussknacker.engine.api.test.TestDataParser;
import pl.touk.nussknacker.engine.api.test.TestDataSplit;
import pl.touk.nussknacker.engine.flink.api.compat.ExplicitUidInOperatorsSupport;
import pl.touk.nussknacker.engine.flink.api.process.FlinkCustomNodeContext;
import pl.touk.nussknacker.engine.flink.api.process.FlinkSource;
import pl.touk.nussknacker.engine.kafka.ConsumerGroupDeterminer$;
import pl.touk.nussknacker.engine.kafka.KafkaConfig;
import pl.touk.nussknacker.engine.kafka.KafkaUtils$;
import pl.touk.nussknacker.engine.kafka.ListUtil$;
import pl.touk.nussknacker.engine.kafka.PreparedKafkaTopic;
import pl.touk.nussknacker.engine.kafka.RecordFormatter;
import scala.MatchError;
import scala.Option;
import scala.Serializable;
import scala.Some;
import scala.collection.JavaConverters$;
import scala.collection.immutable.List;
import scala.collection.immutable.List$;
import scala.reflect.ScalaSignature;
import scala.runtime.BoxedUnit;

/* compiled from: KafkaSource.scala */
@ScalaSignature(bytes = "\u0006\u0001\t\u001da\u0001B\f\u0019\u0001\u0015B\u0001\u0002\u0016\u0001\u0003\u0002\u0003\u0006I!\u0016\u0005\tK\u0002\u0011\t\u0011)A\u0005M\"A\u0011\u000e\u0001B\u0001B\u0003%!\u000e\u0003\u0005y\u0001\t\u0005\t\u0015!\u0003z\u0011)\t9\u0001\u0001B\u0001B\u0003%\u0011\u0011\u0002\u0005\u000b\u0003#\u0001!\u0011!Q\u0001\n\u0005M\u0001BCA\u0010\u0001\t\u0005\t\u0015!\u0003\u0002\"!9\u00111\u0007\u0001\u0005\u0002\u0005U\u0002BCA%\u0001!\u0015\r\u0011\"\u0003\u0002L!9\u0011q\n\u0001\u0005B\u0005E\u0003\"CA9\u0001\t\u0007I\u0011IA:\u0011!\t9\t\u0001Q\u0001\n\u0005U\u0004bBAE\u0001\u0011E\u00111\u0012\u0005\b\u00037\u0003A\u0011CAO\u0011\u001d\t9\u000b\u0001C!\u0003SCq!!1\u0001\t\u0003\n\u0019\rC\u0004\u0002L\u0002!\t%!4\b\u0013\u0005=\u0007$!A\t\u0002\u0005Eg\u0001C\f\u0019\u0003\u0003E\t!a5\t\u000f\u0005M2\u0003\"\u0001\u0002V\"I\u0011q[\n\u0012\u0002\u0013\u0005\u0011\u0011\u001c\u0005\n\u0003g\u001c\u0012\u0011!C\u0005\u0003k\u00141bS1gW\u0006\u001cv.\u001e:dK*\u0011\u0011DG\u0001\u0007g>,(oY3\u000b\u0005ma\u0012!B6bM.\f'BA\u000f\u001f\u0003\u0019)gnZ5oK*\u0011q\u0004I\u0001\f]V\u001c8o\u001b8bG.,'O\u0003\u0002\"E\u0005!Ao\\;l\u0015\u0005\u0019\u0013A\u00019m\u0007\u0001)\"AJ\u001d\u0014\u000f\u00019SFQ#L\u001dB\u0011\u0001fK\u0007\u0002S)\t!&A\u0003tG\u0006d\u0017-\u0003\u0002-S\t1\u0011I\\=SK\u001a\u00042AL\u001b8\u001b\u0005y#B\u0001\u00192\u0003\u001d\u0001(o\\2fgNT!AM\u001a\u0002\u0007\u0005\u0004\u0018N\u0003\u000259\u0005)a\r\\5oW&\u0011ag\f\u0002\f\r2Lgn[*pkJ\u001cW\r\u0005\u00029s1\u0001A!\u0002\u001e\u0001\u0005\u0004Y$!\u0001+\u0012\u0005qz\u0004C\u0001\u0015>\u0013\tq\u0014FA\u0004O_RD\u0017N\\4\u0011\u0005!\u0002\u0015BA!*\u0005\r\te.\u001f\t\u0003Q\rK!\u0001R\u0015\u0003\u0019M+'/[1mSj\f'\r\\3\u0011\u0007\u0019Ku'D\u0001H\u0015\t\u0001\u0004J\u0003\u000239%\u0011!j\u0012\u0002\u0017)\u0016\u001cH\u000fR1uCB\u000b'o]3s!J|g/\u001b3feB\u0011a\tT\u0005\u0003\u001b\u001e\u0013\u0011\u0003V3ti\u0012\u000bG/Y$f]\u0016\u0014\u0018\r^8s!\ty%+D\u0001Q\u0015\t\t\u0016'\u0001\u0004d_6\u0004\u0018\r^\u0005\u0003'B\u0013Q$\u0012=qY&\u001c\u0017\u000e^+jI&sw\n]3sCR|'o]*vaB|'\u000f^\u0001\u000faJ,\u0007/\u0019:fIR{\u0007/[2t!\r1f,\u0019\b\u0003/rs!\u0001W.\u000e\u0003eS!A\u0017\u0013\u0002\rq\u0012xn\u001c;?\u0013\u0005Q\u0013BA/*\u0003\u001d\u0001\u0018mY6bO\u0016L!a\u00181\u0003\t1K7\u000f\u001e\u0006\u0003;&\u0002\"AY2\u000e\u0003iI!\u0001\u001a\u000e\u0003%A\u0013X\r]1sK\u0012\\\u0015MZ6b)>\u0004\u0018nY\u0001\fW\u000647.Y\"p]\u001aLw\r\u0005\u0002cO&\u0011\u0001N\u0007\u0002\f\u0017\u000647.Y\"p]\u001aLw-A\u000beKN,'/[1mSj\fG/[8o'\u000eDW-\\1\u0011\u0007-4x'D\u0001m\u0015\tYRN\u0003\u0002o_\u0006Q1m\u001c8oK\u000e$xN]:\u000b\u0005A\f\u0018!C:ue\u0016\fW.\u001b8h\u0015\t!$O\u0003\u0002ti\u00061\u0011\r]1dQ\u0016T\u0011!^\u0001\u0004_J<\u0017BA<m\u0005iY\u0015MZ6b\t\u0016\u001cXM]5bY&T\u0018\r^5p]N\u001b\u0007.Z7b\u0003E!\u0018.\\3ti\u0006l\u0007/Q:tS\u001etWM\u001d\t\u0004Qid\u0018BA>*\u0005\u0019y\u0005\u000f^5p]B!Q0a\u00018\u001b\u0005q(bA@\u0002\u0002\u0005Ia-\u001e8di&|gn\u001d\u0006\u0003e=L1!!\u0002\u007f\u0005E!\u0016.\\3ti\u0006l\u0007/Q:tS\u001etWM]\u0001\u0013e\u0016\u001cwN\u001d3G_Jl\u0017\r\u001e;fe>\u0003H\u000f\u0005\u0003)u\u0006-\u0001c\u00012\u0002\u000e%\u0019\u0011q\u0002\u000e\u0003\u001fI+7m\u001c:e\r>\u0014X.\u0019;uKJ\fq\u0002^3tiB\u0013X\r]1sK&sgm\u001c\t\u0005\u0003+\tY\"\u0004\u0002\u0002\u0018)\u0019\u0011\u0011\u0004%\u0002\tQ,7\u000f^\u0005\u0005\u0003;\t9BA\u0007UKN$H)\u0019;b'Bd\u0017\u000e^\u0001\u0018_Z,'O]5eI\u0016t7i\u001c8tk6,'o\u0012:pkB\u0004B\u0001\u000b>\u0002$A!\u0011QEA\u0017\u001d\u0011\t9#!\u000b\u0011\u0005aK\u0013bAA\u0016S\u00051\u0001K]3eK\u001aLA!a\f\u00022\t11\u000b\u001e:j]\u001eT1!a\u000b*\u0003\u0019a\u0014N\\5u}Q\u0001\u0012qGA\u001e\u0003{\ty$!\u0011\u0002D\u0005\u0015\u0013q\t\t\u0005\u0003s\u0001q'D\u0001\u0019\u0011\u0015!\u0006\u00021\u0001V\u0011\u0015)\u0007\u00021\u0001g\u0011\u0015I\u0007\u00021\u0001k\u0011\u0015A\b\u00021\u0001z\u0011\u001d\t9\u0001\u0003a\u0001\u0003\u0013Aq!!\u0005\t\u0001\u0004\t\u0019\u0002C\u0005\u0002 !\u0001\n\u00111\u0001\u0002\"\u00051Ao\u001c9jGN,\"!!\u0014\u0011\tYs\u00161E\u0001\rg>,(oY3TiJ,\u0017-\u001c\u000b\u0007\u0003'\ni&a\u001a\u0011\u000b\u0005U\u0013\u0011L\u001c\u000e\u0005\u0005]#b\u0001\u0016\u0002\u0002%!\u00111LA,\u0005)!\u0015\r^1TiJ,\u0017-\u001c\u0005\b\u0003?R\u0001\u0019AA1\u0003\r)gN\u001e\t\u0005\u0003+\n\u0019'\u0003\u0003\u0002f\u0005]#AG*ue\u0016\fW.\u0012=fGV$\u0018n\u001c8F]ZL'o\u001c8nK:$\bbBA5\u0015\u0001\u0007\u00111N\u0001\u0011M2Lgn\u001b(pI\u0016\u001cuN\u001c;fqR\u00042ALA7\u0013\r\tyg\f\u0002\u0017\r2Lgn[\"vgR|WNT8eK\u000e{g\u000e^3yi\u0006yA/\u001f9f\u0013:4wN]7bi&|g.\u0006\u0002\u0002vA)\u0011qOABo5\u0011\u0011\u0011\u0010\u0006\u0005\u0003w\ni(\u0001\u0005usB,\u0017N\u001c4p\u0015\u0011\ty(!!\u0002\r\r|W.\\8o\u0015\t\u0011\u0014/\u0003\u0003\u0002\u0006\u0006e$a\u0004+za\u0016LeNZ8s[\u0006$\u0018n\u001c8\u0002!QL\b/Z%oM>\u0014X.\u0019;j_:\u0004\u0013a\u00054mS:\\7k\\;sG\u00164UO\\2uS>tG\u0003BAG\u0003/\u0003R!a$\u0002\u0014^j!!!%\u000b\u0005eq\u0018\u0002BAK\u0003#\u0013abU8ve\u000e,g)\u001e8di&|g\u000eC\u0004\u0002\u001a6\u0001\r!a\t\u0002\u001f\r|gn];nKJ<%o\\;q\u0013\u0012\f\u0011c\u0019:fCR,g\t\\5oWN{WO]2f)\u0011\ty*!*\u0011\t-\f\tkN\u0005\u0004\u0003Gc'A\u0005$mS:\\7*\u00194lC\u000e{gn];nKJDq!!'\u000f\u0001\u0004\t\u0019#\u0001\thK:,'/\u0019;f)\u0016\u001cH\u000fR1uCR!\u00111VA\\!\u0015A\u0013QVAY\u0013\r\ty+\u000b\u0002\u0006\u0003J\u0014\u0018-\u001f\t\u0004Q\u0005M\u0016bAA[S\t!!)\u001f;f\u0011\u001d\tIl\u0004a\u0001\u0003w\u000bAa]5{KB\u0019\u0001&!0\n\u0007\u0005}\u0016FA\u0002J]R\fa\u0002^3ti\u0012\u000bG/\u0019)beN,'/\u0006\u0002\u0002FB)\u0011QCAdo%!\u0011\u0011ZA\f\u00059!Vm\u001d;ECR\f\u0007+\u0019:tKJ\f\u0001\u0004^5nKN$\u0018-\u001c9BgNLwM\\3s\r>\u0014H+Z:u+\u0005I\u0018aC&bM.\f7k\\;sG\u0016\u00042!!\u000f\u0014'\r\u0019rE\u0011\u000b\u0003\u0003#\f1\u0004\n7fgNLg.\u001b;%OJ,\u0017\r^3sI\u0011,g-Y;mi\u0012:T\u0003BAn\u0003c,\"!!8+\t\u0005\u0005\u0012q\\\u0016\u0003\u0003C\u0004B!a9\u0002n6\u0011\u0011Q\u001d\u0006\u0005\u0003O\fI/A\u0005v]\u000eDWmY6fI*\u0019\u00111^\u0015\u0002\u0015\u0005tgn\u001c;bi&|g.\u0003\u0003\u0002p\u0006\u0015(!E;oG\",7m[3e-\u0006\u0014\u0018.\u00198dK\u0012)!(\u0006b\u0001w\u0005Y!/Z1e%\u0016\u001cx\u000e\u001c<f)\t\t9\u0010\u0005\u0003\u0002z\n\rQBAA~\u0015\u0011\ti0a@\u0002\t1\fgn\u001a\u0006\u0003\u0005\u0003\tAA[1wC&!!QAA~\u0005\u0019y%M[3di\u0002")
/* loaded from: input_file:pl/touk/nussknacker/engine/kafka/source/KafkaSource.class */
public class KafkaSource<T> implements FlinkSource<T>, Serializable, TestDataParserProvider<T>, TestDataGenerator, ExplicitUidInOperatorsSupport {
    private List<String> pl$touk$nussknacker$engine$kafka$source$KafkaSource$$topics;
    private final List<PreparedKafkaTopic> preparedTopics;
    private final KafkaConfig kafkaConfig;
    public final KafkaDeserializationSchema<T> pl$touk$nussknacker$engine$kafka$source$KafkaSource$$deserializationSchema;
    private final Option<TimestampAssigner<T>> timestampAssigner;
    public final Option<RecordFormatter> pl$touk$nussknacker$engine$kafka$source$KafkaSource$$recordFormatterOpt;
    public final TestDataSplit pl$touk$nussknacker$engine$kafka$source$KafkaSource$$testPrepareInfo;
    private final Option<String> overriddenConsumerGroup;
    private final TypeInformation<T> typeInformation;
    private volatile boolean bitmap$0;

    public <T> DataStream<T> setUidToNodeIdIfNeed(FlinkCustomNodeContext flinkCustomNodeContext, DataStream<T> dataStream) {
        return ExplicitUidInOperatorsSupport.setUidToNodeIdIfNeed$(this, flinkCustomNodeContext, dataStream);
    }

    public <T> DataStreamSink<T> setUidToNodeIdIfNeed(FlinkCustomNodeContext flinkCustomNodeContext, DataStreamSink<T> dataStreamSink) {
        return ExplicitUidInOperatorsSupport.setUidToNodeIdIfNeed$(this, flinkCustomNodeContext, dataStreamSink);
    }

    public <T> SingleOutputStreamOperator<T> setUidToNodeIdIfNeed(FlinkCustomNodeContext flinkCustomNodeContext, SingleOutputStreamOperator<T> singleOutputStreamOperator) {
        return ExplicitUidInOperatorsSupport.setUidToNodeIdIfNeed$(this, flinkCustomNodeContext, singleOutputStreamOperator);
    }

    @Public
    public boolean explicitUidInStatefulOperators(FlinkCustomNodeContext flinkCustomNodeContext) {
        return ExplicitUidInOperatorsSupport.explicitUidInStatefulOperators$(this, flinkCustomNodeContext);
    }

    /* JADX WARN: Multi-variable type inference failed */
    /* JADX WARN: Type inference failed for: r0v0 */
    /* JADX WARN: Type inference failed for: r0v1, types: [java.lang.Throwable] */
    /* JADX WARN: Type inference failed for: r0v9, types: [pl.touk.nussknacker.engine.kafka.source.KafkaSource] */
    private List<String> topics$lzycompute() {
        ?? r0 = this;
        synchronized (r0) {
            if (!this.bitmap$0) {
                this.pl$touk$nussknacker$engine$kafka$source$KafkaSource$$topics = (List) this.preparedTopics.map(preparedKafkaTopic -> {
                    return preparedKafkaTopic.prepared();
                }, List$.MODULE$.canBuildFrom());
                r0 = this;
                r0.bitmap$0 = true;
            }
        }
        this.preparedTopics = null;
        return this.pl$touk$nussknacker$engine$kafka$source$KafkaSource$$topics;
    }

    public List<String> pl$touk$nussknacker$engine$kafka$source$KafkaSource$$topics() {
        return !this.bitmap$0 ? topics$lzycompute() : this.pl$touk$nussknacker$engine$kafka$source$KafkaSource$$topics;
    }

    public DataStream<T> sourceStream(StreamExecutionEnvironment streamExecutionEnvironment, FlinkCustomNodeContext flinkCustomNodeContext) {
        String str = (String) this.overriddenConsumerGroup.getOrElse(() -> {
            return ConsumerGroupDeterminer$.MODULE$.apply(this.kafkaConfig).consumerGroup(flinkCustomNodeContext);
        });
        streamExecutionEnvironment.setStreamTimeCharacteristic(this.timestampAssigner.isDefined() ? TimeCharacteristic.EventTime : TimeCharacteristic.IngestionTime);
        DataStream<T> uidToNodeIdIfNeed = setUidToNodeIdIfNeed(flinkCustomNodeContext, streamExecutionEnvironment.addSource(flinkSourceFunction(str), typeInformation()).name(new StringBuilder(8).append(flinkCustomNodeContext.metaData().id()).append("-").append(flinkCustomNodeContext.nodeId()).append("-source").toString()));
        return (DataStream) this.timestampAssigner.map(timestampAssigner -> {
            DataStream assignTimestampsAndWatermarks;
            if (timestampAssigner instanceof AssignerWithPeriodicWatermarks) {
                assignTimestampsAndWatermarks = uidToNodeIdIfNeed.assignTimestampsAndWatermarks((AssignerWithPeriodicWatermarks) timestampAssigner);
            } else {
                if (!(timestampAssigner instanceof AssignerWithPunctuatedWatermarks)) {
                    throw new MatchError(timestampAssigner);
                }
                assignTimestampsAndWatermarks = uidToNodeIdIfNeed.assignTimestampsAndWatermarks((AssignerWithPunctuatedWatermarks) timestampAssigner);
            }
            return assignTimestampsAndWatermarks;
        }).getOrElse(() -> {
            return uidToNodeIdIfNeed;
        });
    }

    public TypeInformation<T> typeInformation() {
        return this.typeInformation;
    }

    public SourceFunction<T> flinkSourceFunction(String str) {
        pl$touk$nussknacker$engine$kafka$source$KafkaSource$$topics().foreach(str2 -> {
            $anonfun$flinkSourceFunction$1(this, str, str2);
            return BoxedUnit.UNIT;
        });
        return createFlinkSource(str);
    }

    public FlinkKafkaConsumer<T> createFlinkSource(String str) {
        return new FlinkKafkaConsumer<>((java.util.List) JavaConverters$.MODULE$.seqAsJavaListConverter(pl$touk$nussknacker$engine$kafka$source$KafkaSource$$topics()).asJava(), this.pl$touk$nussknacker$engine$kafka$source$KafkaSource$$deserializationSchema, KafkaUtils$.MODULE$.toProperties(this.kafkaConfig, new Some(str)));
    }

    public byte[] generateTestData(int i) {
        List<T> mergeListsFromTopics = ListUtil$.MODULE$.mergeListsFromTopics((List) pl$touk$nussknacker$engine$kafka$source$KafkaSource$$topics().map(str -> {
            return KafkaUtils$.MODULE$.readLastMessages(str, i, this.kafkaConfig);
        }, List$.MODULE$.canBuildFrom()), i);
        return this.pl$touk$nussknacker$engine$kafka$source$KafkaSource$$testPrepareInfo.joinData((List) this.pl$touk$nussknacker$engine$kafka$source$KafkaSource$$recordFormatterOpt.map(recordFormatter -> {
            return (List) mergeListsFromTopics.map(consumerRecord -> {
                return recordFormatter.formatRecord(consumerRecord);
            }, List$.MODULE$.canBuildFrom());
        }).getOrElse(() -> {
            return (List) mergeListsFromTopics.map(consumerRecord -> {
                return (byte[]) consumerRecord.value();
            }, List$.MODULE$.canBuildFrom());
        }));
    }

    public TestDataParser<T> testDataParser() {
        return new TestDataParser<T>(this) { // from class: pl.touk.nussknacker.engine.kafka.source.KafkaSource$$anon$1
            private final /* synthetic */ KafkaSource $outer;

            public List<T> parseTestData(byte[] bArr) {
                return (List) this.$outer.pl$touk$nussknacker$engine$kafka$source$KafkaSource$$testPrepareInfo.splitData(bArr).map(bArr2 -> {
                    String str = (String) this.$outer.pl$touk$nussknacker$engine$kafka$source$KafkaSource$$topics().head();
                    ProducerRecord producerRecord = (ProducerRecord) this.$outer.pl$touk$nussknacker$engine$kafka$source$KafkaSource$$recordFormatterOpt.map(recordFormatter -> {
                        return recordFormatter.parseRecord(bArr2);
                    }).getOrElse(() -> {
                        return new ProducerRecord(str, bArr2);
                    });
                    return this.$outer.pl$touk$nussknacker$engine$kafka$source$KafkaSource$$deserializationSchema.deserialize(new ConsumerRecord(str, -1, -1L, producerRecord.key(), producerRecord.value()));
                }, List$.MODULE$.canBuildFrom());
            }

            {
                if (this == null) {
                    throw null;
                }
                this.$outer = this;
            }
        };
    }

    public Option<TimestampAssigner<T>> timestampAssignerForTest() {
        return this.timestampAssigner;
    }

    public static final /* synthetic */ void $anonfun$flinkSourceFunction$1(KafkaSource kafkaSource, String str, String str2) {
        KafkaUtils$.MODULE$.setToLatestOffsetIfNeeded(kafkaSource.kafkaConfig, str2, str);
    }

    public KafkaSource(List<PreparedKafkaTopic> list, KafkaConfig kafkaConfig, KafkaDeserializationSchema<T> kafkaDeserializationSchema, Option<TimestampAssigner<T>> option, Option<RecordFormatter> option2, TestDataSplit testDataSplit, Option<String> option3) {
        this.preparedTopics = list;
        this.kafkaConfig = kafkaConfig;
        this.pl$touk$nussknacker$engine$kafka$source$KafkaSource$$deserializationSchema = kafkaDeserializationSchema;
        this.timestampAssigner = option;
        this.pl$touk$nussknacker$engine$kafka$source$KafkaSource$$recordFormatterOpt = option2;
        this.pl$touk$nussknacker$engine$kafka$source$KafkaSource$$testPrepareInfo = testDataSplit;
        this.overriddenConsumerGroup = option3;
        ExplicitUidInOperatorsSupport.$init$(this);
        this.typeInformation = kafkaDeserializationSchema.getProducedType();
    }
}
