package io.epiphanous.flinkrunner.flink;

import com.typesafe.scalalogging.LazyLogging;
import com.typesafe.scalalogging.Logger;
import io.epiphanous.flinkrunner.model.FlinkConfig;
import io.epiphanous.flinkrunner.model.FlinkEvent;
import io.epiphanous.flinkrunner.util.StreamUtils$;
import io.epiphanous.flinkrunner.util.StreamUtils$Pipe$;
import org.apache.flink.api.common.JobExecutionResult;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.streaming.api.datastream.DataStreamUtils;
import org.apache.flink.streaming.api.scala.DataStream;
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment;
import scala.Predef$;
import scala.collection.Iterator;
import scala.collection.JavaConverters$;
import scala.package$;
import scala.reflect.ScalaSignature;
import scala.runtime.BoxedUnit;
import scala.util.Either;

/* compiled from: BaseFlinkJob.scala */
@ScalaSignature(bytes = "\u0006\u0001\u0005\u0015c!B\u0001\u0003\u0003\u0003Y!\u0001\u0004\"bg\u00164E.\u001b8l\u0015>\u0014'BA\u0002\u0005\u0003\u00151G.\u001b8l\u0015\t)a!A\u0006gY&t7N];o]\u0016\u0014(BA\u0004\t\u0003))\u0007/\u001b9iC:|Wo\u001d\u0006\u0002\u0013\u0005\u0011\u0011n\\\u0002\u0001+\ra1\tM\n\u0004\u00015\u0019\u0002C\u0001\b\u0012\u001b\u0005y!\"\u0001\t\u0002\u000bM\u001c\u0017\r\\1\n\u0005Iy!AB!osJ+g\r\u0005\u0002\u001575\tQC\u0003\u0002\u0017/\u0005a1oY1mC2|wmZ5oO*\u0011\u0001$G\u0001\tif\u0004Xm]1gK*\t!$A\u0002d_6L!\u0001H\u000b\u0003\u00171\u000b'0\u001f'pO\u001eLgn\u001a\u0005\t=\u0001\u0011\u0019\u0011)A\u0006?\u0005QQM^5eK:\u001cW\rJ\u0019\u0011\u0007\u0001bc&D\u0001\"\u0015\t\u00113%\u0001\u0005usB,\u0017N\u001c4p\u0015\t!S%\u0001\u0004d_6lwN\u001c\u0006\u0003M\u001d\n1!\u00199j\u0015\t\u0019\u0001F\u0003\u0002*U\u00051\u0011\r]1dQ\u0016T\u0011aK\u0001\u0004_J<\u0017BA\u0017\"\u0005=!\u0016\u0010]3J]\u001a|'/\\1uS>t\u0007CA\u00181\u0019\u0001!Q!\r\u0001C\u0002I\u00121aT+U#\t\u0019d\u0007\u0005\u0002\u000fi%\u0011Qg\u0004\u0002\b\u001d>$\b.\u001b8h!\t9$(D\u00019\u0015\tID!A\u0003n_\u0012,G.\u0003\u0002<q\tQa\t\\5oW\u00163XM\u001c;\t\u000bu\u0002A\u0011\u0001 \u0002\rqJg.\u001b;?)\u0005yDC\u0001!J!\u0011\t\u0005A\u0011\u0018\u000e\u0003\t\u0001\"aL\"\u0005\u000b\u0011\u0003!\u0019A#\u0003\u0005\u0011\u001b\u0016CA\u001aG!\tqq)\u0003\u0002I\u001f\t\u0019\u0011I\\=\t\u000bya\u00049A\u0010\t\u000b-\u0003A\u0011\u0001'\u0002\t\u0019dwn\u001e\u000b\u0002\u001bR\u0019aJV.\u0011\u0007=#f&D\u0001Q\u0015\t\u0001\u0012K\u0003\u0002'%*\u00111kJ\u0001\ngR\u0014X-Y7j]\u001eL!!\u0016)\u0003\u0015\u0011\u000bG/Y*ue\u0016\fW\u000eC\u0003X\u0015\u0002\u000f\u0001,\u0001\u0004d_:4\u0017n\u001a\t\u0003oeK!A\u0017\u001d\u0003\u0017\u0019c\u0017N\\6D_:4\u0017n\u001a\u0005\u00069*\u0003\u001d!X\u0001\u0004K:4\bC\u00010k\u001d\ty\u0006N\u0004\u0002aO:\u0011\u0011M\u001a\b\u0003E\u0016l\u0011a\u0019\u0006\u0003I*\ta\u0001\u0010:p_Rt\u0014\"A\u0005\n\u0005\u001dA\u0011BA\u0003\u0007\u0013\tIG!A\u0004qC\u000e\\\u0017mZ3\n\u0005-d'aA*F\u000b*\u0011\u0011\u000e\u0002\u0005\u0006]\u0002!\ta\\\u0001\u0004eVtG#\u00019\u0015\u000bE\f\t!a\u0001\u0011\tI4\u0018\u0010 \b\u0003gVt!A\u0019;\n\u0003AI!![\b\n\u0005]D(AB#ji\",'O\u0003\u0002j\u001fA\u0019!O\u001f\u0018\n\u0005mD(\u0001C%uKJ\fGo\u001c:\u0011\u0005utX\"A\u0012\n\u0005}\u001c#A\u0005&pE\u0016CXmY;uS>t'+Z:vYRDQaV7A\u0004aCQ\u0001X7A\u0004uCq!a\u0002\u0001\r\u0003\tI!\u0001\u0004t_V\u00148-\u001a\u000b\u0003\u0003\u0017!RAQA\u0007\u0003\u001fAaaVA\u0003\u0001\bA\u0006B\u0002/\u0002\u0006\u0001\u000fQ\fC\u0004\u0002\u0014\u00011\t!!\u0006\u0002\u0013Q\u0014\u0018M\\:g_JlG\u0003BA\f\u0003;!RATA\r\u00037AaaVA\t\u0001\bA\u0006B\u0002/\u0002\u0012\u0001\u000fQ\fC\u0004\u0002 \u0005E\u0001\u0019\u0001\"\u0002\u0005%t\u0007bBA\u0012\u0001\u0011\u0005\u0011QE\u0001\u0005g&t7\u000e\u0006\u0003\u0002(\u0005MBCBA\u0015\u0003_\t\t\u0004E\u0002\u000f\u0003WI1!!\f\u0010\u0005\u0011)f.\u001b;\t\r]\u000b\t\u0003q\u0001Y\u0011\u0019a\u0016\u0011\u0005a\u0002;\"9\u0011QGA\u0011\u0001\u0004q\u0015aA8vi\"9\u0011\u0011\b\u0001\u0005\u0002\u0005m\u0012!C7bs\n,7+\u001b8l)\u0011\ti$a\u0011\u0015\r\u0005%\u0012qHA!\u0011\u00199\u0016q\u0007a\u00021\"1A,a\u000eA\u0004uCq!!\u000e\u00028\u0001\u0007a\n")
/* loaded from: input_file:io/epiphanous/flinkrunner/flink/BaseFlinkJob.class */
public abstract class BaseFlinkJob<DS, OUT extends FlinkEvent> implements LazyLogging {
    public final TypeInformation<OUT> io$epiphanous$flinkrunner$flink$BaseFlinkJob$$evidence$1;
    private final transient Logger logger;
    private volatile transient boolean bitmap$trans$0;

    /* JADX WARN: Multi-variable type inference failed */
    /* JADX WARN: Type inference failed for: r0v0 */
    /* JADX WARN: Type inference failed for: r0v1, types: [java.lang.Throwable] */
    /* JADX WARN: Type inference failed for: r0v5 */
    private Logger logger$lzycompute() {
        ?? r0 = this;
        synchronized (r0) {
            if (!this.bitmap$trans$0) {
                this.logger = LazyLogging.class.logger(this);
                this.bitmap$trans$0 = true;
            }
            BoxedUnit boxedUnit = BoxedUnit.UNIT;
            r0 = r0;
            return this.logger;
        }
    }

    public Logger logger() {
        return this.bitmap$trans$0 ? this.logger : logger$lzycompute();
    }

    public DataStream<OUT> flow(FlinkConfig flinkConfig, StreamExecutionEnvironment streamExecutionEnvironment) {
        return (DataStream) StreamUtils$Pipe$.MODULE$.$bar$hash$extension(StreamUtils$.MODULE$.Pipe(StreamUtils$Pipe$.MODULE$.$bar$greater$extension(StreamUtils$.MODULE$.Pipe(source(flinkConfig, streamExecutionEnvironment)), new BaseFlinkJob$$anonfun$flow$1(this, flinkConfig, streamExecutionEnvironment))), new BaseFlinkJob$$anonfun$flow$2(this, flinkConfig, streamExecutionEnvironment));
    }

    public Either<Iterator<OUT>, JobExecutionResult> run(FlinkConfig flinkConfig, StreamExecutionEnvironment streamExecutionEnvironment) {
        if (logger().underlying().isInfoEnabled()) {
            logger().underlying().info("\nSTARTING FLINK JOB: {} {}\n", new String[]{flinkConfig.jobName(), Predef$.MODULE$.refArrayOps(flinkConfig.jobArgs()).mkString(" ")});
            BoxedUnit boxedUnit = BoxedUnit.UNIT;
        } else {
            BoxedUnit boxedUnit2 = BoxedUnit.UNIT;
        }
        DataStream<OUT> flow = flow(flinkConfig, streamExecutionEnvironment);
        if (!flinkConfig.showPlan()) {
            BoxedUnit boxedUnit3 = BoxedUnit.UNIT;
        } else if (logger().underlying().isInfoEnabled()) {
            logger().underlying().info("PLAN:\n{}\n", new Object[]{streamExecutionEnvironment.getExecutionPlan()});
            BoxedUnit boxedUnit4 = BoxedUnit.UNIT;
        } else {
            BoxedUnit boxedUnit5 = BoxedUnit.UNIT;
        }
        return flinkConfig.mockEdges() ? package$.MODULE$.Left().apply(JavaConverters$.MODULE$.asScalaIteratorConverter(DataStreamUtils.collect(flow.javaStream())).asScala()) : package$.MODULE$.Right().apply(streamExecutionEnvironment.execute(flinkConfig.jobName()));
    }

    public abstract DS source(FlinkConfig flinkConfig, StreamExecutionEnvironment streamExecutionEnvironment);

    public abstract DataStream<OUT> transform(DS ds, FlinkConfig flinkConfig, StreamExecutionEnvironment streamExecutionEnvironment);

    public void sink(DataStream<OUT> dataStream, FlinkConfig flinkConfig, StreamExecutionEnvironment streamExecutionEnvironment) {
        flinkConfig.getSinkNames().foreach(new BaseFlinkJob$$anonfun$sink$1(this, dataStream, flinkConfig));
    }

    public void maybeSink(DataStream<OUT> dataStream, FlinkConfig flinkConfig, StreamExecutionEnvironment streamExecutionEnvironment) {
        if (flinkConfig.mockEdges()) {
            return;
        }
        sink(dataStream, flinkConfig, streamExecutionEnvironment);
    }

    public BaseFlinkJob(TypeInformation<OUT> typeInformation) {
        this.io$epiphanous$flinkrunner$flink$BaseFlinkJob$$evidence$1 = typeInformation;
        LazyLogging.class.$init$(this);
    }
}
