package pl.touk.nussknacker.engine.process.registrar;

import com.typesafe.scalalogging.LazyLogging;
import com.typesafe.scalalogging.Logger;
import java.net.URL;
import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.FlinkUserCodeClassLoaders;
import org.apache.flink.util.OutputTag;
import pl.touk.nussknacker.engine.api.StreamMetaData;
import pl.touk.nussknacker.engine.api.StreamMetaData$;
import pl.touk.nussknacker.engine.deployment.DeploymentData;
import pl.touk.nussknacker.engine.process.ExecutionConfigPreparer;
import pl.touk.nussknacker.engine.process.FlinkJobConfig;
import pl.touk.nussknacker.engine.process.FlinkJobConfig$ExecutionMode$;
import pl.touk.nussknacker.engine.process.compiler.FlinkProcessCompilerData;
import pl.touk.nussknacker.engine.process.util.StateConfiguration;
import pl.touk.nussknacker.engine.process.util.StateConfiguration$;
import pl.touk.nussknacker.engine.util.MetaDataExtractor$;
import scala.Array$;
import scala.Enumeration;
import scala.Function0;
import scala.MatchError;
import scala.None$;
import scala.Option;
import scala.Some;
import scala.Tuple2;
import scala.reflect.ClassTag$;
import scala.reflect.ScalaSignature;
import scala.runtime.BoxedUnit;
import scala.runtime.BoxesRunTime;

/* compiled from: StreamExecutionEnvPreparer.scala */
@ScalaSignature(bytes = "\u0006\u0005\u0005Ef\u0001\u0002\u0007\u000e\u0001iA\u0001b\f\u0001\u0003\u0002\u0003\u0006I\u0001\r\u0005\ti\u0001\u0011\t\u0011)A\u0005k!)\u0001\b\u0001C\u0001s!)Q\b\u0001C!}!)A\r\u0001C\u0005K\"9\u00111\u0001\u0001\u0005\u0012\u0005\u0015\u0001bBA\u0012\u0001\u0011\u0005\u0013Q\u0005\u0005\b\u0003[\u0001A\u0011CA\u0018\u0011\u001d\t\t\u0005\u0001C!\u0003\u0007Bq!!\u0016\u0001\t\u0003\n9\u0006C\u0004\u0002\u001e\u0002!I!a(\u0003C\u0011+g-Y;miN#(/Z1n\u000bb,7-\u001e;j_:,eN\u001e)sKB\f'/\u001a:\u000b\u00059y\u0011!\u0003:fO&\u001cHO]1s\u0015\t\u0001\u0012#A\u0004qe>\u001cWm]:\u000b\u0005I\u0019\u0012AB3oO&tWM\u0003\u0002\u0015+\u0005Ya.^:tW:\f7m[3s\u0015\t1r#\u0001\u0003u_V\\'\"\u0001\r\u0002\u0005Ad7\u0001A\n\u0005\u0001m\tS\u0005\u0005\u0002\u001d?5\tQDC\u0001\u001f\u0003\u0015\u00198-\u00197b\u0013\t\u0001SD\u0001\u0004B]f\u0014VM\u001a\t\u0003E\rj\u0011!D\u0005\u0003I5\u0011!d\u0015;sK\u0006lW\t_3dkRLwN\\#omB\u0013X\r]1sKJ\u0004\"AJ\u0017\u000e\u0003\u001dR!\u0001K\u0015\u0002\u0019M\u001c\u0017\r\\1m_\u001e<\u0017N\\4\u000b\u0005)Z\u0013\u0001\u0003;za\u0016\u001c\u0018MZ3\u000b\u00031\n1aY8n\u0013\tqsEA\u0006MCjLHj\\4hS:<\u0017!\u00036pE\u000e{gNZ5h!\t\t$'D\u0001\u0010\u0013\t\u0019tB\u0001\bGY&t7NS8c\u0007>tg-[4\u0002/\u0015DXmY;uS>t7i\u001c8gS\u001e\u0004&/\u001a9be\u0016\u0014\bCA\u00197\u0013\t9tBA\fFq\u0016\u001cW\u000f^5p]\u000e{gNZ5h!J,\u0007/\u0019:fe\u00061A(\u001b8jiz\"2AO\u001e=!\t\u0011\u0003\u0001C\u00030\u0007\u0001\u0007\u0001\u0007C\u00035\u0007\u0001\u0007Q'A\bqe\u0016\u0014VmZ5tiJ\fG/[8o)\u0011y$\t\u0016/\u0011\u0005q\u0001\u0015BA!\u001e\u0005\u0011)f.\u001b;\t\u000b\r#\u0001\u0019\u0001#\u0002\u0007\u0015tg\u000f\u0005\u0002F%6\taI\u0003\u0002H\u0011\u0006YQM\u001c<je>tW.\u001a8u\u0015\tI%*A\u0002ba&T!a\u0013'\u0002\u0013M$(/Z1nS:<'BA'O\u0003\u00151G.\u001b8l\u0015\ty\u0005+\u0001\u0004ba\u0006\u001c\u0007.\u001a\u0006\u0002#\u0006\u0019qN]4\n\u0005M3%AG*ue\u0016\fW.\u0012=fGV$\u0018n\u001c8F]ZL'o\u001c8nK:$\b\"B+\u0005\u0001\u00041\u0016\u0001D2p[BLG.\u001a:ECR\f\u0007CA,[\u001b\u0005A&BA-\u0010\u0003!\u0019w.\u001c9jY\u0016\u0014\u0018BA.Y\u0005a1E.\u001b8l!J|7-Z:t\u0007>l\u0007/\u001b7fe\u0012\u000bG/\u0019\u0005\u0006;\u0012\u0001\rAX\u0001\u000fI\u0016\u0004Hn\\=nK:$H)\u0019;b!\ty&-D\u0001a\u0015\t\t\u0017#\u0001\u0006eKBdw._7f]RL!a\u00191\u0003\u001d\u0011+\u0007\u000f\\8z[\u0016tG\u000fR1uC\u000612m\u001c8gS\u001e,(/Z#yK\u000e,H/[8o\u001b>$W\rF\u0002@M\u001eDQaQ\u0003A\u0002\u0011CQ\u0001[\u0003A\u0002%\f1#\u001a=fGV$\u0018n\u001c8N_\u0012,7i\u001c8gS\u001e\u0004\"A\u001b@\u000f\u0005-\\hB\u00017z\u001d\ti\u0007P\u0004\u0002oo:\u0011qN\u001e\b\u0003aVt!!\u001d;\u000e\u0003IT!a]\r\u0002\rq\u0012xn\u001c;?\u0013\u0005A\u0012B\u0001\f\u0018\u0013\t!R#\u0003\u0002\u0013'%\u0011\u0001#E\u0005\u0003u>\taB\u00127j].TuNY\"p]\u001aLw-\u0003\u0002}{\u0006iQ\t_3dkRLwN\\'pI\u0016T!A_\b\n\u0007}\f\tAA\u0007Fq\u0016\u001cW\u000f^5p]6{G-\u001a\u0006\u0003yv\fqcY8oM&<WO]3S_\u000e\\7\u000f\u0012\"CC\u000e\\WM\u001c3\u0015\u000b}\n9!!\u0003\t\u000b\r3\u0001\u0019\u0001#\t\u000f\u0005-a\u00011\u0001\u0002\u000e\u000511m\u001c8gS\u001e\u0004B!a\u0004\u0002\u001e9!\u0011\u0011CA\f\u001d\ra\u00171C\u0005\u0004\u0003+y\u0011\u0001B;uS2LA!!\u0007\u0002\u001c\u0005\u00112\u000b^1uK\u000e{gNZ5hkJ\fG/[8o\u0015\r\t)bD\u0005\u0005\u0003?\t\tCA\rS_\u000e\\7\u000f\u0012\"Ti\u0006$XMQ1dW\u0016tGmQ8oM&<'\u0002BA\r\u00037\t\u0001\u0003]8tiJ+w-[:ue\u0006$\u0018n\u001c8\u0015\u000f}\n9#!\u000b\u0002,!)1i\u0002a\u0001\t\")Qk\u0002a\u0001-\")Ql\u0002a\u0001=\u0006!2m\u001c8gS\u001e,(/Z\"iK\u000e\\\u0007o\\5oiN$RaPA\u0019\u0003gAQa\u0011\u0005A\u0002\u0011Cq!!\u000e\t\u0001\u0004\t9$\u0001\btiJ,\u0017-\\'fi\u0006$\u0015\r^1\u0011\t\u0005e\u0012QH\u0007\u0003\u0003wQ!!S\t\n\t\u0005}\u00121\b\u0002\u000f'R\u0014X-Y7NKR\fG)\u0019;b\u0003i1G.\u001b8l\u00072\f7o\u001d'pC\u0012,'oU5nk2\fG/[8o+\t\t)\u0005\u0005\u0003\u0002H\u0005ESBAA%\u0015\u0011\tY%!\u0014\u0002\t1\fgn\u001a\u0006\u0003\u0003\u001f\nAA[1wC&!\u00111KA%\u0005-\u0019E.Y:t\u0019>\fG-\u001a:\u0002!MLG-Z(viB,HoR3ui\u0016\u0014X\u0003BA-\u0003W\"b!a\u0017\u0002~\u0005=\u0005CBA/\u0003G\n9'\u0004\u0002\u0002`)\u0019\u0011\u0011\r%\u0002\u0015\u0011\fG/Y:ue\u0016\fW.\u0003\u0003\u0002f\u0005}#A\u0003#bi\u0006\u001cFO]3b[B!\u0011\u0011NA6\u0019\u0001!q!!\u001c\u000b\u0005\u0004\tyGA\u0001U#\u0011\t\t(a\u001e\u0011\u0007q\t\u0019(C\u0002\u0002vu\u0011qAT8uQ&tw\rE\u0002\u001d\u0003sJ1!a\u001f\u001e\u0005\r\te.\u001f\u0005\b\u0003\u007fR\u0001\u0019AAA\u0003i\u0019\u0018N\\4mK>+H\u000f];u'R\u0014X-Y7Pa\u0016\u0014\u0018\r^8sa\u0011\t\u0019)a#\u0011\r\u0005u\u0013QQAE\u0013\u0011\t9)a\u0018\u00035MKgn\u001a7f\u001fV$\b/\u001e;TiJ,\u0017-\\(qKJ\fGo\u001c:\u0011\t\u0005%\u00141\u0012\u0003\r\u0003\u001b\u000bi(!A\u0001\u0002\u000b\u0005\u0011q\u000e\u0002\u0004?\u0012\u0012\u0004bBAI\u0015\u0001\u0007\u00111S\u0001\n_V$\b/\u001e;UC\u001e\u0004b!!&\u0002\u001a\u0006\u001dTBAAL\u0015\r\t)\u0002T\u0005\u0005\u00037\u000b9JA\u0005PkR\u0004X\u000f\u001e+bO\u0006aqO]1q\u0013:d\u0015-\u001c2eCV!\u0011\u0011UAS)\u0011\t\u0019+a*\u0011\t\u0005%\u0014Q\u0015\u0003\b\u0003[Z!\u0019AA8\u0011\u001d\tIk\u0003a\u0001\u0003W\u000b1a\u001c2k!\u0015a\u0012QVAR\u0013\r\ty+\b\u0002\n\rVt7\r^5p]B\u0002")
/* loaded from: input_file:pl/touk/nussknacker/engine/process/registrar/DefaultStreamExecutionEnvPreparer.class */
public class DefaultStreamExecutionEnvPreparer implements StreamExecutionEnvPreparer, LazyLogging {
    private final FlinkJobConfig jobConfig;
    private final ExecutionConfigPreparer executionConfigPreparer;
    private transient Logger logger;
    private volatile transient boolean bitmap$trans$0;

    /* JADX WARN: Multi-variable type inference failed */
    /* JADX WARN: Type inference failed for: r0v0 */
    /* JADX WARN: Type inference failed for: r0v1, types: [java.lang.Throwable] */
    /* JADX WARN: Type inference failed for: r0v8, types: [pl.touk.nussknacker.engine.process.registrar.DefaultStreamExecutionEnvPreparer] */
    private Logger logger$lzycompute() {
        ?? r0 = this;
        synchronized (r0) {
            if (!this.bitmap$trans$0) {
                this.logger = LazyLogging.logger$(this);
                r0 = this;
                r0.bitmap$trans$0 = true;
            }
        }
        return this.logger;
    }

    public Logger logger() {
        return !this.bitmap$trans$0 ? logger$lzycompute() : this.logger;
    }

    @Override // pl.touk.nussknacker.engine.process.registrar.StreamExecutionEnvPreparer
    public void preRegistration(StreamExecutionEnvironment streamExecutionEnvironment, FlinkProcessCompilerData flinkProcessCompilerData, DeploymentData deploymentData) {
        this.executionConfigPreparer.prepareExecutionConfig(streamExecutionEnvironment.getConfig(), flinkProcessCompilerData.jobData(), deploymentData);
        StreamMetaData streamMetaData = (StreamMetaData) MetaDataExtractor$.MODULE$.extractTypeSpecificDataOrDefault(flinkProcessCompilerData.jobData().metaData(), new StreamMetaData(StreamMetaData$.MODULE$.apply$default$1(), StreamMetaData$.MODULE$.apply$default$2(), StreamMetaData$.MODULE$.apply$default$3(), StreamMetaData$.MODULE$.apply$default$4()), ClassTag$.MODULE$.apply(StreamMetaData.class));
        streamExecutionEnvironment.setRestartStrategy(flinkProcessCompilerData.restartStrategy());
        streamMetaData.parallelism().foreach(obj -> {
            return streamExecutionEnvironment.setParallelism(BoxesRunTime.unboxToInt(obj));
        });
        configureCheckpoints(streamExecutionEnvironment, streamMetaData);
        configureExecutionMode(streamExecutionEnvironment, (Enumeration.Value) this.jobConfig.executionMode().getOrElse(() -> {
            return FlinkJobConfig$ExecutionMode$.MODULE$.m8default();
        }));
        Tuple2 tuple2 = new Tuple2(this.jobConfig.rocksDB(), streamMetaData.spillStateToDisk());
        if (tuple2 != null) {
            Some some = (Option) tuple2._1();
            Some some2 = (Option) tuple2._2();
            if (some instanceof Some) {
                StateConfiguration.RocksDBStateBackendConfig rocksDBStateBackendConfig = (StateConfiguration.RocksDBStateBackendConfig) some.value();
                if ((some2 instanceof Some) && true == BoxesRunTime.unboxToBoolean(some2.value()) && rocksDBStateBackendConfig.enable()) {
                    if (logger().underlying().isInfoEnabled()) {
                        logger().underlying().info("Using RocksDB state backend");
                        BoxedUnit boxedUnit = BoxedUnit.UNIT;
                    } else {
                        BoxedUnit boxedUnit2 = BoxedUnit.UNIT;
                    }
                    configureRocksDBBackend(streamExecutionEnvironment, rocksDBStateBackendConfig);
                    BoxedUnit boxedUnit3 = BoxedUnit.UNIT;
                    return;
                }
            }
        }
        if (tuple2 != null) {
            Option option = (Option) tuple2._1();
            Some some3 = (Option) tuple2._2();
            if (None$.MODULE$.equals(option) && (some3 instanceof Some) && true == BoxesRunTime.unboxToBoolean(some3.value())) {
                if (!logger().underlying().isWarnEnabled()) {
                    BoxedUnit boxedUnit4 = BoxedUnit.UNIT;
                    return;
                } else {
                    logger().underlying().warn("RocksDB not configured, cannot use spillStateToDisk");
                    BoxedUnit boxedUnit5 = BoxedUnit.UNIT;
                    return;
                }
            }
        }
        if (!logger().underlying().isInfoEnabled()) {
            BoxedUnit boxedUnit6 = BoxedUnit.UNIT;
        } else {
            logger().underlying().info("Using default state backend configured by cluster");
            BoxedUnit boxedUnit7 = BoxedUnit.UNIT;
        }
    }

    private void configureExecutionMode(StreamExecutionEnvironment streamExecutionEnvironment, Enumeration.Value value) {
        Enumeration.Value Streaming = FlinkJobConfig$ExecutionMode$.MODULE$.Streaming();
        if (Streaming != null ? Streaming.equals(value) : value == null) {
            streamExecutionEnvironment.setRuntimeMode(RuntimeExecutionMode.STREAMING);
            BoxedUnit boxedUnit = BoxedUnit.UNIT;
            return;
        }
        Enumeration.Value Batch = FlinkJobConfig$ExecutionMode$.MODULE$.Batch();
        if (Batch != null ? !Batch.equals(value) : value != null) {
            throw new MatchError(value);
        }
        streamExecutionEnvironment.setRuntimeMode(RuntimeExecutionMode.BATCH);
        BoxedUnit boxedUnit2 = BoxedUnit.UNIT;
    }

    public void configureRocksDBBackend(StreamExecutionEnvironment streamExecutionEnvironment, StateConfiguration.RocksDBStateBackendConfig rocksDBStateBackendConfig) {
        streamExecutionEnvironment.setStateBackend(StateConfiguration$.MODULE$.prepareRocksDBStateBackend(rocksDBStateBackendConfig));
    }

    @Override // pl.touk.nussknacker.engine.process.registrar.StreamExecutionEnvPreparer
    public void postRegistration(StreamExecutionEnvironment streamExecutionEnvironment, FlinkProcessCompilerData flinkProcessCompilerData, DeploymentData deploymentData) {
    }

    public void configureCheckpoints(StreamExecutionEnvironment streamExecutionEnvironment, StreamMetaData streamMetaData) {
        streamMetaData.checkpointIntervalDuration().orElse(() -> {
            return this.jobConfig.checkpointConfig().map(checkpointConfig -> {
                return checkpointConfig.checkpointInterval();
            });
        }).map(duration -> {
            return BoxesRunTime.boxToLong(duration.toMillis());
        }).foreach(j -> {
            streamExecutionEnvironment.enableCheckpointing(j);
            streamExecutionEnvironment.getCheckpointConfig().setMinPauseBetweenCheckpoints(BoxesRunTime.unboxToLong(this.jobConfig.checkpointConfig().flatMap(checkpointConfig -> {
                return checkpointConfig.minPauseBetweenCheckpoints();
            }).map(finiteDuration -> {
                return BoxesRunTime.boxToLong(finiteDuration.toMillis());
            }).getOrElse(() -> {
                return j / 2;
            })));
            streamExecutionEnvironment.getCheckpointConfig().setMaxConcurrentCheckpoints(BoxesRunTime.unboxToInt(this.jobConfig.checkpointConfig().flatMap(checkpointConfig2 -> {
                return checkpointConfig2.maxConcurrentCheckpoints();
            }).getOrElse(() -> {
                return 1;
            })));
            Option flatMap = this.jobConfig.checkpointConfig().flatMap(checkpointConfig3 -> {
                return checkpointConfig3.tolerableCheckpointFailureNumber();
            });
            CheckpointConfig checkpointConfig4 = streamExecutionEnvironment.getCheckpointConfig();
            flatMap.foreach(i -> {
                checkpointConfig4.setTolerableCheckpointFailureNumber(i);
            });
        });
    }

    @Override // pl.touk.nussknacker.engine.process.registrar.StreamExecutionEnvPreparer
    public ClassLoader flinkClassLoaderSimulation() {
        return (ClassLoader) wrapInLambda(() -> {
            return FlinkUserCodeClassLoaders.childFirst((URL[]) Array$.MODULE$.empty(ClassTag$.MODULE$.apply(URL.class)), Thread.currentThread().getContextClassLoader(), (String[]) Array$.MODULE$.empty(ClassTag$.MODULE$.apply(String.class)), th -> {
                throw th;
            }, true);
        });
    }

    @Override // pl.touk.nussknacker.engine.process.registrar.StreamExecutionEnvPreparer
    public <T> DataStream<T> sideOutputGetter(SingleOutputStreamOperator<?> singleOutputStreamOperator, OutputTag<T> outputTag) {
        return (DataStream) wrapInLambda(() -> {
            return singleOutputStreamOperator.getSideOutput(outputTag);
        });
    }

    private <T> T wrapInLambda(Function0<T> function0) {
        return (T) function0.apply();
    }

    public DefaultStreamExecutionEnvPreparer(FlinkJobConfig flinkJobConfig, ExecutionConfigPreparer executionConfigPreparer) {
        this.jobConfig = flinkJobConfig;
        this.executionConfigPreparer = executionConfigPreparer;
        LazyLogging.$init$(this);
    }
}
