package io.epiphanous.flinkrunner.util;

import com.typesafe.scalalogging.LazyLogging;
import com.typesafe.scalalogging.Logger;
import io.epiphanous.flinkrunner.model.CassandraSinkConfig;
import io.epiphanous.flinkrunner.model.CollectionSourceConfig;
import io.epiphanous.flinkrunner.model.ElasticsearchSinkConfig;
import io.epiphanous.flinkrunner.model.FileSinkConfig;
import io.epiphanous.flinkrunner.model.FileSourceConfig;
import io.epiphanous.flinkrunner.model.FlinkConfig;
import io.epiphanous.flinkrunner.model.FlinkEvent;
import io.epiphanous.flinkrunner.model.JdbcSinkConfig;
import io.epiphanous.flinkrunner.model.KafkaSinkConfig;
import io.epiphanous.flinkrunner.model.KafkaSourceConfig;
import io.epiphanous.flinkrunner.model.KinesisSinkConfig;
import io.epiphanous.flinkrunner.model.KinesisSourceConfig;
import io.epiphanous.flinkrunner.model.SinkConfig;
import io.epiphanous.flinkrunner.model.SocketSinkConfig;
import io.epiphanous.flinkrunner.model.SocketSourceConfig;
import io.epiphanous.flinkrunner.model.SourceConfig;
import io.epiphanous.flinkrunner.util.StreamUtils;
import java.io.File;
import java.io.FileNotFoundException;
import java.net.URI;
import java.net.URL;
import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.RuntimeContext;
import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.api.common.typeinfo.PrimitiveArrayTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.core.fs.Path;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStreamSink;
import org.apache.flink.streaming.api.functions.sink.filesystem.BucketAssigner;
import org.apache.flink.streaming.api.functions.sink.filesystem.RollingPolicy;
import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink;
import org.apache.flink.streaming.api.functions.sink.filesystem.bucketassigners.BasePathBucketAssigner;
import org.apache.flink.streaming.api.functions.sink.filesystem.bucketassigners.DateTimeBucketAssigner;
import org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.DefaultRollingPolicy;
import org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.OnCheckpointRollingPolicy;
import org.apache.flink.streaming.api.scala.DataStream;
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.cassandra.CassandraSink;
import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkFunction;
import org.apache.flink.streaming.connectors.elasticsearch.RequestIndexer;
import org.apache.flink.streaming.connectors.elasticsearch7.ElasticsearchSink;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;
import org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer;
import org.apache.flink.streaming.connectors.kinesis.FlinkKinesisProducer;
import org.apache.http.HttpHost;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.client.Requests;
import scala.Array$;
import scala.MatchError;
import scala.None$;
import scala.NotImplementedError;
import scala.Option;
import scala.Option$;
import scala.Predef$;
import scala.Predef$ArrowAssoc$;
import scala.Some;
import scala.Tuple2;
import scala.collection.Iterator;
import scala.collection.JavaConverters$;
import scala.collection.LinearSeqOptimized;
import scala.collection.Seq;
import scala.collection.immutable.List$;
import scala.collection.immutable.StringOps;
import scala.collection.mutable.ArrayOps;
import scala.reflect.ClassTag$;
import scala.runtime.BoxesRunTime;
import scala.util.matching.Regex;

/* compiled from: StreamUtils.scala */
/* loaded from: input_file:io/epiphanous/flinkrunner/util/StreamUtils$.class */
public final class StreamUtils$ implements LazyLogging {
    /** Singleton instance; assigned by the private constructor, which the static initializer invokes. */
    public static StreamUtils$ MODULE$;
    /** Regex built from {@code "resource://(.*)"} in the constructor; used by {@code fromFile} to detect resource paths. */
    private final Regex RESOURCE_PATTERN;
    /** Lazily created logger; populated by {@code logger$lzycompute()} on first access. */
    private transient Logger logger;
    /** Flag that is set to {@code true} once {@code logger} has been initialised. */
    private volatile transient boolean bitmap$trans$0;

    // Constructing the instance is sufficient: the constructor stores it in MODULE$.
    static {
        new StreamUtils$();
    }

    /* JADX WARN: Multi-variable type inference failed */
    /* JADX WARN: Type inference failed for: r0v0 */
    /* JADX WARN: Type inference failed for: r0v1, types: [java.lang.Throwable] */
    /* JADX WARN: Type inference failed for: r0v8, types: [io.epiphanous.flinkrunner.util.StreamUtils$] */
    private Logger logger$lzycompute() {
        ?? r0 = this;
        synchronized (r0) {
            if (!this.bitmap$trans$0) {
                this.logger = LazyLogging.logger$(this);
                r0 = this;
                r0.bitmap$trans$0 = true;
            }
        }
        return this.logger;
    }

    /**
     * Returns the logger, initialising it through {@code logger$lzycompute()} on first access.
     *
     * @return the logger for this object
     */
    public Logger logger() {
        if (this.bitmap$trans$0) {
            return this.logger;
        }
        return logger$lzycompute();
    }

    /**
     * Accessor for the regex that recognises {@code resource://...} paths.
     *
     * @return the pattern compiled in the constructor
     */
    public Regex RESOURCE_PATTERN() {
        return this.RESOURCE_PATTERN;
    }

    /**
     * Returns its argument unchanged.
     *
     * <p>NOTE(review): presumably the erased factory of a Scala implicit value class
     * defined in StreamUtils.scala — confirm against the original source.
     *
     * @param a any value
     * @return {@code a} itself
     */
    public <A> A Pipe(A a) {
        return a;
    }

    /**
     * Builds a bounded-out-of-orderness watermark strategy from the job configuration.
     *
     * @param typeInformation type information for {@code E}; not used by the body
     * @param flinkConfig supplies {@code maxLateness()} (the out-of-orderness bound) and
     *     {@code maxIdleness()} (the idleness timeout)
     * @return the configured watermark strategy
     */
    public <E extends FlinkEvent> WatermarkStrategy<E> boundedOutofOrderness(TypeInformation<E> typeInformation, FlinkConfig flinkConfig) {
        WatermarkStrategy<E> boundedStrategy = WatermarkStrategy.forBoundedOutOfOrderness(flinkConfig.maxLateness());
        return boundedStrategy.withIdleness(flinkConfig.maxIdleness());
    }

    /**
     * Assigns timestamps and watermarks to a stream, but only when the environment runs in
     * event time.
     *
     * @param dataStream the input stream
     * @param typeInformation type information for {@code E}
     * @param flinkConfig job configuration used to build the watermark strategy
     * @param streamExecutionEnvironment environment whose time characteristic is inspected
     * @return {@code dataStream} itself when the time characteristic is not
     *     {@code EventTime}; otherwise the watermarked stream, named and uid'ed
     *     {@code "wm:" + dataStream.name()}
     */
    public <E extends FlinkEvent> DataStream<E> maybeAssignTimestampsAndWatermarks(DataStream<E> dataStream, TypeInformation<E> typeInformation, FlinkConfig flinkConfig, StreamExecutionEnvironment streamExecutionEnvironment) {
        TimeCharacteristic configured = streamExecutionEnvironment.getStreamTimeCharacteristic();
        // Anything other than event time (including null) leaves the stream untouched.
        if (!TimeCharacteristic.EventTime.equals(configured)) {
            return dataStream;
        }
        DataStream<E> watermarked = dataStream.assignTimestampsAndWatermarks(boundedOutofOrderness(typeInformation, flinkConfig));
        String label = "wm:" + dataStream.name();
        return watermarked.name(label).uid(label);
    }

    /**
     * Creates a stream from the named source, dispatching on the source config's concrete type.
     *
     * @param str source name; when empty, the first name from
     *     {@code flinkConfig.getSourceNames()} is used
     * @param typeInformation type information for {@code E}
     * @param flinkConfig job configuration holding the source definitions
     * @param streamExecutionEnvironment environment the source is added to
     * @return the source stream, named and uid'ed with the source config's label
     * @throws IllegalArgumentException if the source config is of an unsupported type
     */
    public <E extends FlinkEvent> DataStream<E> fromSource(String str, TypeInformation<E> typeInformation, FlinkConfig flinkConfig, StreamExecutionEnvironment streamExecutionEnvironment) {
        String sourceName = str;
        if (sourceName.isEmpty()) {
            sourceName = (String) flinkConfig.getSourceNames().head();
        }
        SourceConfig config = flinkConfig.getSourceConfig(sourceName);
        String label = config.label();

        DataStream<E> stream;
        if (config instanceof KafkaSourceConfig) {
            stream = fromKafka((KafkaSourceConfig) config, typeInformation, flinkConfig, streamExecutionEnvironment);
        } else if (config instanceof KinesisSourceConfig) {
            stream = fromKinesis((KinesisSourceConfig) config, typeInformation, flinkConfig, streamExecutionEnvironment);
        } else if (config instanceof FileSourceConfig) {
            stream = fromFile((FileSourceConfig) config, typeInformation, flinkConfig, streamExecutionEnvironment);
        } else if (config instanceof SocketSourceConfig) {
            stream = fromSocket((SocketSourceConfig) config, typeInformation, flinkConfig, streamExecutionEnvironment);
        } else if (config instanceof CollectionSourceConfig) {
            stream = fromCollection((CollectionSourceConfig) config, typeInformation, flinkConfig, streamExecutionEnvironment);
        } else {
            throw new IllegalArgumentException("unsupported source connector: " + config.connector());
        }
        return stream.name(label).uid(label);
    }

    /**
     * Default value for the first parameter of {@code fromSource}: the empty string, which
     * {@code fromSource} treats as "use the first configured source name".
     *
     * @return {@code ""}
     */
    public <E extends FlinkEvent> String fromSource$default$1() {
        return "";
    }

    /**
     * Adds a Kafka consumer source to the environment.
     *
     * @param kafkaSourceConfig supplies the topic and the consumer properties
     * @param typeInformation type information for the produced events
     * @param flinkConfig supplies the Kafka deserialization schema for this source
     * @param streamExecutionEnvironment environment the source is added to
     * @return the stream of events read from the topic
     */
    public <E extends FlinkEvent> DataStream<E> fromKafka(KafkaSourceConfig kafkaSourceConfig, TypeInformation<E> typeInformation, FlinkConfig flinkConfig, StreamExecutionEnvironment streamExecutionEnvironment) {
        FlinkKafkaConsumer consumer = new FlinkKafkaConsumer(
                kafkaSourceConfig.topic(),
                flinkConfig.getKafkaDeserializationSchema(kafkaSourceConfig),
                kafkaSourceConfig.properties());
        return streamExecutionEnvironment.addSource(consumer, typeInformation);
    }

    /**
     * Adds a Kinesis consumer source to the environment and names it with the source label.
     *
     * @param kinesisSourceConfig supplies the stream name, consumer properties and label
     * @param typeInformation type information for the produced events
     * @param flinkConfig supplies the deserialization schema for this source
     * @param streamExecutionEnvironment environment the source is added to
     * @return the stream of events read from the Kinesis stream
     */
    public <E extends FlinkEvent> DataStream<E> fromKinesis(KinesisSourceConfig kinesisSourceConfig, TypeInformation<E> typeInformation, FlinkConfig flinkConfig, StreamExecutionEnvironment streamExecutionEnvironment) {
        FlinkKinesisConsumer consumer = new FlinkKinesisConsumer(
                kinesisSourceConfig.stream(),
                flinkConfig.getDeserializationSchema(kinesisSourceConfig),
                kinesisSourceConfig.properties());
        DataStream<E> events = streamExecutionEnvironment.addSource(consumer, typeInformation);
        return events.name(kinesisSourceConfig.label());
    }

    /**
     * Reads a text file line by line and deserializes each line into an event.
     *
     * <p>A path matching {@code RESOURCE_PATTERN} ({@code resource://...}) is resolved to an
     * absolute file path through {@code getSourceFilePath}; any other path is used as given.
     * The raw text stream is named and uid'ed {@code "raw:" + label}.
     *
     * @param fileSourceConfig supplies the path and label
     * @param typeInformation type information for the produced events
     * @param flinkConfig supplies the deserialization schema for this source
     * @param streamExecutionEnvironment environment the source is added to
     * @return the stream of deserialized events
     */
    public <E extends FlinkEvent> DataStream<E> fromFile(FileSourceConfig fileSourceConfig, TypeInformation<E> typeInformation, FlinkConfig flinkConfig, StreamExecutionEnvironment streamExecutionEnvironment) {
        String configuredPath = fileSourceConfig.path();
        Option resourceMatch = RESOURCE_PATTERN().unapplySeq(configuredPath);
        // A resource path is one where the pattern matched with exactly one capture group.
        boolean isResource = !resourceMatch.isEmpty()
                && resourceMatch.get() != null
                && ((LinearSeqOptimized) resourceMatch.get()).lengthCompare(1) == 0;
        String resolvedPath;
        if (isResource) {
            resolvedPath = getSourceFilePath((String) ((LinearSeqOptimized) resourceMatch.get()).apply(0));
        } else {
            resolvedPath = configuredPath;
        }
        DeserializationSchema<?> schema = flinkConfig.getDeserializationSchema(fileSourceConfig);
        DataStream<String> lines = streamExecutionEnvironment.readTextFile(resolvedPath);
        String rawLabel = "raw:" + fileSourceConfig.label();
        return lines.name(rawLabel).uid(rawLabel).map(
                line -> (FlinkEvent) schema.deserialize(line.getBytes(StandardCharsets.UTF_8)),
                typeInformation);
    }

    /**
     * Reads text lines from a socket and deserializes each line into an event.
     *
     * <p>The deserialization schema is looked up once, before the stream is built, instead of
     * once per record inside the map function; this matches {@code fromFile}. The raw text
     * stream is named and uid'ed {@code "raw:" + label}.
     *
     * @param socketSourceConfig supplies host, port and label
     * @param typeInformation type information for the produced events
     * @param flinkConfig supplies the deserialization schema for this source
     * @param streamExecutionEnvironment environment the source is added to
     * @return the stream of deserialized events
     */
    public <E extends FlinkEvent> DataStream<E> fromSocket(SocketSourceConfig socketSourceConfig, TypeInformation<E> typeInformation, FlinkConfig flinkConfig, StreamExecutionEnvironment streamExecutionEnvironment) {
        // Resolve the schema up front so the map function captures it rather than the whole config.
        DeserializationSchema<?> schema = flinkConfig.getDeserializationSchema(socketSourceConfig);
        String rawLabel = "raw:" + socketSourceConfig.label();
        return streamExecutionEnvironment
                .socketTextStream(socketSourceConfig.host(), socketSourceConfig.port(), streamExecutionEnvironment.socketTextStream$default$3(), streamExecutionEnvironment.socketTextStream$default$4())
                .name(rawLabel)
                .uid(rawLabel)
                .map(line -> (FlinkEvent) schema.deserialize(line.getBytes(StandardCharsets.UTF_8)), typeInformation);
    }

    /**
     * Creates a stream from an in-memory collection of byte arrays and deserializes each
     * element into an event.
     *
     * <p>The deserialization schema is looked up once, before the stream is built, instead of
     * once per record inside the map function; this matches {@code fromFile}. The raw byte
     * stream is named and uid'ed {@code "raw:" + label}.
     *
     * @param collectionSourceConfig supplies the topic (used to look up the collection) and label
     * @param typeInformation type information for the produced events
     * @param flinkConfig supplies the collection and the deserialization schema
     * @param streamExecutionEnvironment environment the source is added to
     * @return the stream of deserialized events
     */
    public <E extends FlinkEvent> DataStream<E> fromCollection(CollectionSourceConfig collectionSourceConfig, TypeInformation<E> typeInformation, FlinkConfig flinkConfig, StreamExecutionEnvironment streamExecutionEnvironment) {
        // Resolve the schema up front so the map function captures it rather than the whole config.
        DeserializationSchema<?> schema = flinkConfig.getDeserializationSchema(collectionSourceConfig);
        String rawLabel = "raw:" + collectionSourceConfig.label();
        return streamExecutionEnvironment
                .fromCollection(flinkConfig.getCollectionSource(collectionSourceConfig.topic()), PrimitiveArrayTypeInfo.getInfoFor(byte[].class))
                .name(rawLabel)
                .uid(rawLabel)
                .map(bytes -> (FlinkEvent) schema.deserialize(bytes), typeInformation);
    }

    /**
     * Resolves a classpath resource name to an absolute file-system path.
     *
     * <p>Looks the name up with {@code getClass().getResource}; if that yields nothing, tries
     * the same name with a {@code ".gz"} suffix.
     *
     * @param str resource name to resolve
     * @return absolute path of the file backing the resource
     * @throws FileNotFoundException if neither {@code str} nor {@code str + ".gz"} resolves
     */
    public String getSourceFilePath(String str) throws FileNotFoundException {
        Class<?> owner = getClass();
        URL location = owner.getResource(str);
        if (location == null) {
            // Fall back to a gzipped variant of the same resource.
            location = owner.getResource(str + ".gz");
        }
        if (location == null) {
            throw new FileNotFoundException("can't load resource " + str);
        }
        return new File(location.toURI()).getAbsolutePath();
    }

    /**
     * Wraps a stream in a {@code StreamUtils.EventStreamOps}.
     *
     * @param dataStream the stream to wrap
     * @param typeInformation type information for {@code E}, passed to the wrapper
     * @return a new {@code EventStreamOps} around {@code dataStream}
     */
    public <E extends FlinkEvent> StreamUtils.EventStreamOps<E> EventStreamOps(DataStream<E> dataStream, TypeInformation<E> typeInformation) {
        return new StreamUtils.EventStreamOps<>(dataStream, typeInformation);
    }

    /**
     * Sends a stream to the named sink, dispatching on the sink config's concrete type.
     *
     * @param dataStream the stream to write
     * @param str sink name; when empty, the first name from {@code flinkConfig.getSinkNames()}
     *     is used
     * @param typeInformation type information for {@code E}
     * @param flinkConfig job configuration holding the sink definitions
     * @return the sink object created by the matching {@code to*} method (a
     *     {@code DataStreamSink}, or a {@code CassandraSink} for Cassandra configs)
     * @throws IllegalArgumentException if the sink config is of an unsupported type
     */
    public <E extends FlinkEvent> Object toSink(DataStream<E> dataStream, String str, TypeInformation<E> typeInformation, FlinkConfig flinkConfig) {
        String sinkName = str.isEmpty() ? (String) flinkConfig.getSinkNames().head() : str;
        SinkConfig sinkConfig = flinkConfig.getSinkConfig(sinkName);
        if (sinkConfig instanceof KafkaSinkConfig) {
            return toKafka(dataStream, (KafkaSinkConfig) sinkConfig, typeInformation, flinkConfig);
        }
        if (sinkConfig instanceof KinesisSinkConfig) {
            return toKinesis(dataStream, (KinesisSinkConfig) sinkConfig, typeInformation, flinkConfig);
        }
        if (sinkConfig instanceof FileSinkConfig) {
            return toFile(dataStream, (FileSinkConfig) sinkConfig, typeInformation, flinkConfig);
        }
        if (sinkConfig instanceof SocketSinkConfig) {
            return toSocket(dataStream, (SocketSinkConfig) sinkConfig, typeInformation, flinkConfig);
        }
        if (sinkConfig instanceof JdbcSinkConfig) {
            return toJdbc(dataStream, (JdbcSinkConfig) sinkConfig, typeInformation, flinkConfig);
        }
        if (sinkConfig instanceof CassandraSinkConfig) {
            return toCassandraSink(dataStream, (CassandraSinkConfig) sinkConfig, typeInformation);
        }
        if (sinkConfig instanceof ElasticsearchSinkConfig) {
            return toElasticsearchSink(dataStream, (ElasticsearchSinkConfig) sinkConfig, typeInformation);
        }
        // This is the sink dispatcher, so the message must say "sink", not "source".
        throw new IllegalArgumentException("unsupported sink connector: " + sinkConfig.connector());
    }

    /**
     * Default value for the second parameter of {@code toSink}: the empty string, which
     * {@code toSink} treats as "use the first configured sink name".
     *
     * @return {@code ""}
     */
    public <E extends FlinkEvent> String toSink$default$2() {
        return "";
    }

    /**
     * Writes a stream to Kafka with at-least-once semantics.
     *
     * @param dataStream the stream to write
     * @param kafkaSinkConfig supplies the topic, producer properties and label
     * @param typeInformation type information for {@code E}; not used by the body
     * @param flinkConfig supplies the Kafka serialization schema for this sink
     * @return the sink, with uid and name set to the sink label
     */
    public <E extends FlinkEvent> DataStreamSink<E> toKafka(DataStream<E> dataStream, KafkaSinkConfig kafkaSinkConfig, TypeInformation<E> typeInformation, FlinkConfig flinkConfig) {
        FlinkKafkaProducer producer = new FlinkKafkaProducer(
                kafkaSinkConfig.topic(),
                flinkConfig.getKafkaSerializationSchema(kafkaSinkConfig),
                kafkaSinkConfig.properties(),
                FlinkKafkaProducer.Semantic.AT_LEAST_ONCE);
        DataStreamSink<E> sink = dataStream.addSink(producer);
        return sink.uid(kafkaSinkConfig.label()).name(kafkaSinkConfig.label());
    }

    /**
     * Writes a stream to Kinesis.
     *
     * <p>The producer is configured with the sink's stream as default stream, fail-on-error
     * enabled, and the fixed default partition {@code "0"}.
     *
     * @param dataStream the stream to write
     * @param kinesisSinkConfig supplies the stream name, producer properties and label
     * @param typeInformation type information for {@code E}; not used by the body
     * @param flinkConfig supplies the serialization schema for this sink
     * @return the sink, with uid and name set to the sink label
     */
    public <E extends FlinkEvent> DataStreamSink<E> toKinesis(DataStream<E> dataStream, KinesisSinkConfig kinesisSinkConfig, TypeInformation<E> typeInformation, FlinkConfig flinkConfig) {
        FlinkKinesisProducer producer = new FlinkKinesisProducer(
                flinkConfig.getSerializationSchema(kinesisSinkConfig),
                kinesisSinkConfig.properties());
        producer.setDefaultStream(kinesisSinkConfig.stream());
        producer.setFailOnError(true);
        producer.setDefaultPartition("0");

        DataStreamSink<E> sink = dataStream.addSink(producer);
        return sink.uid(kinesisSinkConfig.label()).name(kinesisSinkConfig.label());
    }

    /**
     * Writes a stream to a JDBC sink.
     *
     * @param dataStream the stream to write
     * @param jdbcSinkConfig supplies the sink properties and label
     * @param typeInformation type information for {@code E}, passed to the {@code JdbcSink}
     * @param flinkConfig supplies the add-to-batch function for this sink
     * @return the sink, with uid and name set to the sink label
     */
    public <E extends FlinkEvent> DataStreamSink<E> toJdbc(DataStream<E> dataStream, JdbcSinkConfig jdbcSinkConfig, TypeInformation<E> typeInformation, FlinkConfig flinkConfig) {
        JdbcSink jdbcSink = new JdbcSink(
                flinkConfig.getAddToJdbcBatchFunction(jdbcSinkConfig),
                jdbcSinkConfig.properties(),
                typeInformation);
        DataStreamSink<E> sink = dataStream.addSink(jdbcSink);
        return sink.uid(jdbcSinkConfig.label()).name(jdbcSinkConfig.label());
    }

    /**
     * Writes a stream to a row-format {@code StreamingFileSink}.
     *
     * <p>Behaviour is driven by the sink's properties:
     * <ul>
     *   <li>{@code bucket.check.interval} — bucket check interval, default {@code 60000}</li>
     *   <li>{@code bucket.assigner.type} — {@code none}, {@code datetime} (default) or
     *       {@code custom} (assigner obtained from {@code flinkConfig.getBucketAssigner})</li>
     *   <li>{@code bucket.assigner.datetime.format} — date pattern for the datetime assigner,
     *       default {@code yyyy/MM/dd/HH}</li>
     *   <li>{@code encoder.format} — only {@code row} (default) is supported</li>
     *   <li>{@code bucket.rolling.policy} — {@code default} or {@code checkpoint}; the default
     *       policy reads {@code bucket.rolling.policy.inactivity.interval} (60000),
     *       {@code bucket.rolling.policy.max.part.size} (134217728) and
     *       {@code bucket.rolling.policy.rollover.interval} ({@code Long.MAX_VALUE})</li>
     * </ul>
     *
     * @param dataStream the stream to write
     * @param fileSinkConfig supplies the base path, properties and label
     * @param typeInformation type information for {@code E}; not used by the body
     * @param flinkConfig supplies the encoder and, for {@code custom}, the bucket assigner
     * @return the sink, with uid and name set to the sink label
     * @throws IllegalArgumentException for an unknown assigner type, encoder format or rolling policy
     * @throws NotImplementedError when {@code encoder.format} is {@code bulk}
     * @throws NumberFormatException when a numeric property is not a valid long
     */
    public <E extends FlinkEvent> DataStreamSink<E> toFile(DataStream<E> dataStream, FileSinkConfig fileSinkConfig, TypeInformation<E> typeInformation, FlinkConfig flinkConfig) {
        String path = fileSinkConfig.path();
        Properties properties = fileSinkConfig.properties();
        long bucketCheckInterval = longProperty(properties, "bucket.check.interval", 60000L);

        BucketAssigner<E, String> bucketAssigner;
        String assignerType = properties.getProperty("bucket.assigner.type", "datetime");
        if ("none".equals(assignerType)) {
            bucketAssigner = new BasePathBucketAssigner<>();
        } else if ("datetime".equals(assignerType)) {
            // Calendar year / day-of-month. The previous default "YYYY/MM/DD/HH" used the
            // week-based year (Y) and day-of-year (D), which yields wrong bucket paths.
            bucketAssigner = new DateTimeBucketAssigner<>(properties.getProperty("bucket.assigner.datetime.format", "yyyy/MM/dd/HH"));
        } else if ("custom".equals(assignerType)) {
            bucketAssigner = flinkConfig.getBucketAssigner(properties);
        } else {
            throw new IllegalArgumentException("Unknown bucket assigner type '" + assignerType + "'.");
        }

        String encoderFormat = properties.getProperty("encoder.format", "row");
        if (!"row".equals(encoderFormat)) {
            if ("bulk".equals(encoderFormat)) {
                throw new NotImplementedError("Bulk file sink not implemented yet");
            }
            throw new IllegalArgumentException("Unknown file sink encoder format: '" + encoderFormat + "'");
        }
        StreamingFileSink.DefaultRowFormatBuilder<E> builder = StreamingFileSink.forRowFormat(new Path(path), flinkConfig.getEncoder(fileSinkConfig));

        RollingPolicy<E, String> rollingPolicy;
        String policyType = properties.getProperty("bucket.rolling.policy", "default");
        if ("default".equals(policyType)) {
            rollingPolicy = DefaultRollingPolicy.builder()
                    .withInactivityInterval(longProperty(properties, "bucket.rolling.policy.inactivity.interval", 60000L))
                    .withMaxPartSize(longProperty(properties, "bucket.rolling.policy.max.part.size", 134217728L))
                    .withRolloverInterval(longProperty(properties, "bucket.rolling.policy.rollover.interval", Long.MAX_VALUE))
                    .build();
        } else if ("checkpoint".equals(policyType)) {
            rollingPolicy = OnCheckpointRollingPolicy.build();
        } else {
            throw new IllegalArgumentException("Unknown bucket rolling policy type: '" + policyType + "'");
        }

        StreamingFileSink<E> fileSink = builder
                .withBucketAssigner(bucketAssigner)
                .withRollingPolicy(rollingPolicy)
                .withBucketCheckInterval(bucketCheckInterval)
                .build();
        return dataStream.addSink(fileSink).uid(fileSinkConfig.label()).name(fileSinkConfig.label());
    }

    /** Reads {@code key} from {@code properties} as a long, using {@code defaultValue} when the key is absent. */
    private static long longProperty(Properties properties, String key, long defaultValue) {
        return Long.parseLong(properties.getProperty(key, String.valueOf(defaultValue)));
    }

    /**
     * Writes a stream to a socket.
     *
     * @param dataStream the stream to write
     * @param socketSinkConfig supplies host, port and label
     * @param typeInformation type information for {@code E}; not used by the body
     * @param flinkConfig supplies the serialization schema for this sink
     * @return the sink, with uid and name set to the sink label
     */
    public <E extends FlinkEvent> DataStreamSink<E> toSocket(DataStream<E> dataStream, SocketSinkConfig socketSinkConfig, TypeInformation<E> typeInformation, FlinkConfig flinkConfig) {
        DataStreamSink<E> sink = dataStream.writeToSocket(
                socketSinkConfig.host(),
                Predef$.MODULE$.int2Integer(socketSinkConfig.port()),
                flinkConfig.getSerializationSchema(socketSinkConfig));
        return sink.uid(socketSinkConfig.label()).name(socketSinkConfig.label());
    }

    /**
     * Writes a stream to Cassandra using the configured host and query.
     *
     * @param dataStream the stream to write
     * @param cassandraSinkConfig supplies host, query and label
     * @param typeInformation type information for {@code E}; not used by the body
     * @return the Cassandra sink, with uid and name set to the sink label
     */
    public <E extends FlinkEvent> CassandraSink<E> toCassandraSink(DataStream<E> dataStream, CassandraSinkConfig cassandraSinkConfig, TypeInformation<E> typeInformation) {
        CassandraSink<E> sink = CassandraSink.addSink(dataStream)
                .setHost(cassandraSinkConfig.host())
                .setQuery(cassandraSinkConfig.query())
                .build();
        return sink.uid(cassandraSinkConfig.label()).name(cassandraSinkConfig.label());
    }

    /**
     * Writes a stream to Elasticsearch.
     *
     * <p>Each entry of {@code elasticsearchSinkConfig.transports()} is parsed as
     * {@code "https://" + entry} and turned into an {@code HttpHost} with scheme
     * {@code "https"}; port 9200 is used when the entry carries no port. Every event is
     * indexed into the configured index and type, with a document built from the event's
     * declared field names and its product elements.
     *
     * @param dataStream the stream to write
     * @param elasticsearchSinkConfig supplies transports, index, type and label
     * @param typeInformation type information for {@code E}; not used by the body
     * @return the sink, with uid and name set to the sink label
     */
    public <E extends FlinkEvent> DataStreamSink<E> toElasticsearchSink(DataStream<E> dataStream, final ElasticsearchSinkConfig elasticsearchSinkConfig, TypeInformation<E> typeInformation) {
        // Convert the Scala list of transport strings into a java.util.List of HttpHost.
        return dataStream.addSink(new ElasticsearchSink.Builder((List) JavaConverters$.MODULE$.seqAsJavaListConverter((Seq) elasticsearchSinkConfig.transports().map(str -> {
            URL url = new URL(new StringBuilder(8).append("https://").append(str).toString());
            // URL.getPort() is negative when no port was given; fall back to 9200.
            return new HttpHost(url.getHost(), url.getPort() < 0 ? 9200 : url.getPort(), "https");
        }, List$.MODULE$.canBuildFrom())).asJava(), new ElasticsearchSinkFunction<E>(elasticsearchSinkConfig) { // from class: io.epiphanous.flinkrunner.util.StreamUtils$$anon$1
            // Captured copy of the sink config, set in the instance initializer below.
            private final ElasticsearchSinkConfig sinkConfig$1;

            public void open() {
                super.open();
            }

            /* JADX WARN: Incorrect types in method signature: (TE;Lorg/apache/flink/api/common/functions/RuntimeContext;Lorg/apache/flink/streaming/connectors/elasticsearch/RequestIndexer;)V */
            public void process(FlinkEvent flinkEvent, RuntimeContext runtimeContext, RequestIndexer requestIndexer) {
                Iterator productIterator = flinkEvent.productIterator();
                // Build one index request whose source maps each declared field name to the next
                // product element.
                // NOTE(review): the pairing is positional — assumes getDeclaredFields() yields the
                // same count and order as productIterator; confirm for the event classes in use.
                requestIndexer.add(new IndexRequest[]{Requests.indexRequest(this.sinkConfig$1.index()).type(this.sinkConfig$1.type()).source((Map) JavaConverters$.MODULE$.mapAsJavaMapConverter(new ArrayOps.ofRef(Predef$.MODULE$.refArrayOps((Object[]) new ArrayOps.ofRef(Predef$.MODULE$.refArrayOps(flinkEvent.getClass().getDeclaredFields())).map(field -> {
                    return Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(field.getName()), productIterator.next());
                }, Array$.MODULE$.canBuildFrom(ClassTag$.MODULE$.apply(Tuple2.class))))).toMap(Predef$.MODULE$.$conforms())).asJava())});
            }

            {
                this.sinkConfig$1 = elasticsearchSinkConfig;
            }
        }).build()).uid(elasticsearchSinkConfig.label()).name(elasticsearchSinkConfig.label());
    }

    /**
     * Private constructor, invoked once from the static initializer.
     *
     * <p>Publishes the instance in {@code MODULE$}, runs the {@code LazyLogging} trait
     * initializer, and compiles the {@code resource://(.*)} pattern.
     */
    private StreamUtils$() {
        // Publish the singleton before any further initialisation runs.
        MODULE$ = this;
        LazyLogging.$init$(this);
        this.RESOURCE_PATTERN = new StringOps(Predef$.MODULE$.augmentString("resource://(.*)")).r();
    }
}
