package pl.touk.nussknacker.engine.flink.util.signal;

import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.streaming.api.functions.IngestionTimeExtractor;
import org.apache.flink.streaming.api.scala.ConnectedStreams;
import org.apache.flink.streaming.api.scala.DataStream;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import pl.touk.nussknacker.engine.kafka.KafkaConfig;
import pl.touk.nussknacker.engine.kafka.KafkaUtils$;
import scala.Some;
import scala.reflect.ScalaSignature;

/* compiled from: KafkaSignalStreamConnector.scala */
@ScalaSignature(bytes = "\u0006\u0001\u0005%aa\u0002\u0004\b!\u0003\r\tA\u0006\u0005\u0006;\u0001!\tA\b\u0005\bE\u0001\u0011\rQ\"\u0001$\u0011\u001dQ\u0003A1A\u0007\u0002-BQa\u000e\u0001\u0005\u0002aBQa\u001d\u0001\u0005\u0012Q\u0014!dS1gW\u0006\u001c\u0016n\u001a8bYN#(/Z1n\u0007>tg.Z2u_JT!\u0001C\u0005\u0002\rMLwM\\1m\u0015\tQ1\"\u0001\u0003vi&d'B\u0001\u0007\u000e\u0003\u00151G.\u001b8l\u0015\tqq\"\u0001\u0004f]\u001eLg.\u001a\u0006\u0003!E\t1B\\;tg.t\u0017mY6fe*\u0011!cE\u0001\u0005i>,8NC\u0001\u0015\u0003\t\u0001Hn\u0001\u0001\u0014\u0005\u00019\u0002C\u0001\r\u001c\u001b\u0005I\"\"\u0001\u000e\u0002\u000bM\u001c\u0017\r\\1\n\u0005qI\"AB!osJ+g-\u0001\u0004%S:LG\u000f\n\u000b\u0002?A\u0011\u0001\u0004I\u0005\u0003Ce\u0011A!\u00168ji\u0006Y1.\u00194lC\u000e{gNZ5h+\u0005!\u0003CA\u0013)\u001b\u00051#BA\u0014\u000e\u0003\u0015Y\u0017MZ6b\u0013\tIcEA\u0006LC\u001a\\\u0017mQ8oM&<\u0017\u0001D:jO:\fGn\u001d+pa&\u001cW#\u0001\u0017\u0011\u00055\"dB\u0001\u00183!\ty\u0013$D\u00011\u0015\t\tT#\u0001\u0004=e>|GOP\u0005\u0003ge\ta\u0001\u0015:fI\u00164\u0017BA\u001b7\u0005\u0019\u0019FO]5oO*\u00111'G\u0001\u0013G>tg.Z2u/&$\bnU5h]\u0006d7/F\u0002:\u0017V#RA\u000f2hS.$\"aO,\u0011\tq:\u0015\nV\u0007\u0002{)\u0011!D\u0010\u0006\u0003\u007f\u0001\u000b1!\u00199j\u0015\t\t%)A\u0005tiJ,\u0017-\\5oO*\u0011Ab\u0011\u0006\u0003\t\u0016\u000ba!\u00199bG\",'\"\u0001$\u0002\u0007=\u0014x-\u0003\u0002I{\t\u00012i\u001c8oK\u000e$X\rZ*ue\u0016\fWn\u001d\t\u0003\u0015.c\u0001\u0001B\u0003M\t\t\u0007QJA\u0001B#\tq\u0015\u000b\u0005\u0002\u0019\u001f&\u0011\u0001+\u0007\u0002\b\u001d>$\b.\u001b8h!\tA\"+\u0003\u0002T3\t\u0019\u0011I\\=\u0011\u0005)+F!\u0002,\u0005\u0005\u0004i%!\u0001\"\t\u000fa#\u0011\u0011!a\u00023\u0006QQM^5eK:\u001cW\rJ\u0019\u0011\u0007i\u0003G+D\u0001\\\u0015\taV,\u0001\u0005usB,\u0017N\u001c4p\u0015\tqv,\u0001\u0004d_6lwN\u001c\u0006\u0003\u007f\tK!!Y.\u0003\u001fQK\b/Z%oM>\u0014X.\u0019;j_:DQa\u0019\u0003A\u0002\u0011\fQa\u001d;beR\u00042\u0001P3J\u0013\t1WH\u0001\u000
6ECR\f7\u000b\u001e:fC6DQ\u0001\u001b\u0003A\u00021\n\u0011\u0002\u001d:pG\u0016\u001c8/\u00133\t\u000b)$\u0001\u0019\u0001\u0017\u0002\r9|G-Z%e\u0011\u0015aG\u00011\u0001n\u0003\u0019\u00198\r[3nCB\u0019a.\u001d+\u000e\u0003=T!\u0001]/\u0002\u001bM,'/[1mSj\fG/[8o\u0013\t\u0011xNA\u000bEKN,'/[1mSj\fG/[8o'\u000eDW-\\1\u0002;\u0005\u001c8/[4o)&lWm\u001d;b[B\u001c\u0018I\u001c3XCR,'/\\1sWN,\"!\u001e=\u0015\u0005YL\bc\u0001\u001ffoB\u0011!\n\u001f\u0003\u0006-\u0016\u0011\r!\u0014\u0005\u0006u\u0016\u0001\rA^\u0001\u000bI\u0006$\u0018m\u0015;sK\u0006l\u0007\u0006B\u0003}\u0003\u000b\u00012!`A\u0001\u001b\u0005q(BA@\u001a\u0003)\tgN\\8uCRLwN\\\u0005\u0004\u0003\u0007q(A\u00028po\u0006\u0014h.\t\u0002\u0002\b\u0005y1-\u0019;>I\u0016\u0004(/Z2bi&|g\u000e")
/* loaded from: input_file:pl/touk/nussknacker/engine/flink/util/signal/KafkaSignalStreamConnector.class */
/**
 * Mixin that attaches a Kafka-backed "signals" stream to an existing data stream.
 *
 * <p>Implementors supply the Kafka configuration and the topic name; the default methods
 * build a {@link FlinkKafkaConsumer} source for that topic and connect it to the main stream.
 */
public interface KafkaSignalStreamConnector {
    /**
     * Returns the Kafka configuration that is handed to {@code KafkaUtils.toProperties}
     * when the signal consumer is created.
     */
    KafkaConfig kafkaConfig();

    /** Returns the name of the Kafka topic the signal consumer subscribes to. */
    String signalsTopic();

    /**
     * Connects {@code dataStream} with a stream of signals read from {@link #signalsTopic()}.
     *
     * <p>The signal source is registered on the execution environment of {@code dataStream},
     * named {@code "signals-<str>-<str2>"}, passed through
     * {@link #assignTimestampsAndWatermarks(DataStream)} and then connected to {@code dataStream}.
     *
     * @param dataStream            main stream; its execution environment hosts the signal source
     * @param str                   first component of the source name and of the consumer tag
     * @param str2                  second component of the source name and of the consumer tag
     * @param deserializationSchema schema used by the Kafka consumer to decode signal records
     * @param typeInformation       type information of the signal elements
     * @param <A>                   element type of the main stream
     * @param <B>                   element type of the signal stream
     * @return the main stream connected with the signal stream
     */
    default <A, B> ConnectedStreams<A, B> connectWithSignals(DataStream<A> dataStream, String str, String str2, DeserializationSchema<B> deserializationSchema, TypeInformation<B> typeInformation) {
        // Plain concatenation yields the same text as the original StringBuilder chains
        // (a null component is rendered as "null" in both forms).
        // NOTE(review): presumably used as the Kafka consumer group id - confirm against KafkaUtils.toProperties.
        String consumerTag = str + "-" + str2 + "-signal";
        String sourceName = "signals-" + str + "-" + str2;

        // Diamond instantiations replace the raw types so the compiler checks that the
        // schema, the type information and the resulting stream all agree on B.
        DataStream<B> signals = dataStream.executionEnvironment()
                .addSource(
                        new FlinkKafkaConsumer<>(
                                signalsTopic(),
                                deserializationSchema,
                                KafkaUtils$.MODULE$.toProperties(kafkaConfig(), new Some<>(consumerTag))),
                        typeInformation)
                .name(sourceName);

        return dataStream.connect(assignTimestampsAndWatermarks(signals));
    }

    /**
     * Assigns timestamps and watermarks to the given stream using an {@link IngestionTimeExtractor}.
     *
     * <p>Declared as a separate default method, so implementors can override it to plug in a
     * different assigner.
     *
     * @param dataStream stream to decorate
     * @param <B>        element type of the stream
     * @return the stream returned by {@code dataStream.assignTimestampsAndWatermarks(...)}
     */
    default <B> DataStream<B> assignTimestampsAndWatermarks(DataStream<B> dataStream) {
        // NOTE(review): IngestionTimeExtractor is deprecated in newer Flink releases - confirm
        // the target Flink version before replacing it.
        return dataStream.assignTimestampsAndWatermarks(new IngestionTimeExtractor<>());
    }

    /**
     * Initializer hook taking the connector instance; intentionally empty.
     *
     * @param kafkaSignalStreamConnector instance being initialized (unused)
     */
    static void $init$(KafkaSignalStreamConnector kafkaSignalStreamConnector) {
    }
}
