package pl.touk.nussknacker.engine.process.registrar;

import com.typesafe.scalalogging.LazyLogging;
import com.typesafe.scalalogging.Logger;
import java.net.URL;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.FlinkUserCodeClassLoaders;
import org.apache.flink.util.OutputTag;
import pl.touk.nussknacker.engine.api.StreamMetaData;
import pl.touk.nussknacker.engine.deployment.DeploymentData;
import pl.touk.nussknacker.engine.process.CheckpointConfig;
import pl.touk.nussknacker.engine.process.ExecutionConfigPreparer;
import pl.touk.nussknacker.engine.process.compiler.FlinkProcessCompilerData;
import pl.touk.nussknacker.engine.process.util.StateConfiguration;
import pl.touk.nussknacker.engine.process.util.StateConfiguration$;
import pl.touk.nussknacker.engine.util.MetaDataExtractor$;
import scala.Array$;
import scala.Function0;
import scala.None$;
import scala.Option;
import scala.Some;
import scala.Tuple2;
import scala.reflect.ClassTag$;
import scala.reflect.ScalaSignature;
import scala.runtime.BoxedUnit;
import scala.runtime.BoxesRunTime;

/* compiled from: StreamExecutionEnvPreparer.scala */
@ScalaSignature(bytes = "\u0006\u0005\u0005}e\u0001\u0002\u0007\u000e\u0001iA\u0001b\f\u0001\u0003\u0002\u0003\u0006I\u0001\r\u0005\to\u0001\u0011\t\u0011)A\u0005q!A\u0011\u000b\u0001B\u0001B\u0003%!\u000bC\u0003V\u0001\u0011\u0005a\u000bC\u0003\\\u0001\u0011\u0005C\fC\u0004\u0002\u0006\u0001!\t\"a\u0002\t\u000f\u0005=\u0001\u0001\"\u0011\u0002\u0012!9\u00111\u0004\u0001\u0005\u0012\u0005u\u0001bBA\u0018\u0001\u0011\u0005\u0013\u0011\u0007\u0005\b\u0003\u0007\u0002A\u0011IA#\u0011\u001d\tY\t\u0001C\u0005\u0003\u001b\u0013\u0011\u0005R3gCVdGo\u0015;sK\u0006lW\t_3dkRLwN\\#omB\u0013X\r]1sKJT!AD\b\u0002\u0013I,w-[:ue\u0006\u0014(B\u0001\t\u0012\u0003\u001d\u0001(o\\2fgNT!AE\n\u0002\r\u0015tw-\u001b8f\u0015\t!R#A\u0006okN\u001c8N\\1dW\u0016\u0014(B\u0001\f\u0018\u0003\u0011!x.^6\u000b\u0003a\t!\u0001\u001d7\u0004\u0001M!\u0001aG\u0011&!\tar$D\u0001\u001e\u0015\u0005q\u0012!B:dC2\f\u0017B\u0001\u0011\u001e\u0005\u0019\te.\u001f*fMB\u0011!eI\u0007\u0002\u001b%\u0011A%\u0004\u0002\u001b'R\u0014X-Y7Fq\u0016\u001cW\u000f^5p]\u0016sg\u000f\u0015:fa\u0006\u0014XM\u001d\t\u0003M5j\u0011a\n\u0006\u0003Q%\nAb]2bY\u0006dwnZ4j]\u001eT!AK\u0016\u0002\u0011QL\b/Z:bM\u0016T\u0011\u0001L\u0001\u0004G>l\u0017B\u0001\u0018(\u0005-a\u0015M_=M_\u001e<\u0017N\\4\u0002!\rDWmY6q_&tGoQ8oM&<\u0007c\u0001\u000f2g%\u0011!'\b\u0002\u0007\u001fB$\u0018n\u001c8\u0011\u0005Q*T\"A\b\n\u0005Yz!\u0001E\"iK\u000e\\\u0007o\\5oi\u000e{gNZ5h\u0003e\u0011xnY6t\t\n\u001bF/\u0019;f\u0005\u0006\u001c7.\u001a8e\u0007>tg-[4\u0011\u0007q\t\u0014\b\u0005\u0002;\u001d:\u00111h\u0013\b\u0003y%s!!\u0010%\u000f\u0005y:eBA G\u001d\t\u0001UI\u0004\u0002B\t6\t!I\u0003\u0002D3\u00051AH]8pizJ\u0011\u0001G\u0005\u0003-]I!\u0001F\u000b\n\u0005I\u0019\u0012B\u0001\t\u0012\u0013\tQu\"\u0001\u0003vi&d\u0017B\u0001'N\u0003I\u0019F/\u0019;f\u0007>tg-[4ve\u0006$\u0018n\u001c8\u000b\u0005){\u0011BA(Q\u0005e\u0011vnY6t\t\n\u001bF/\u0019;f\u0005\u0006\u001c7.\u001a8e\u0007>tg-[4\u000b\u00051k\u0015aF3yK\u000e,H/[8o\u0007>tg-[4Qe\u0016\u0004\u0018M]3s!\t!4+\u0003\u0002U\u001f\t9R\t_3dkRLwN\\\"p]\u001aLw\r\u0015:fa\u0006\u0014XM]\u0001\u0007y%t\u0017\u000e\u001e \u0015\t]C\u0016L\u0017\t\u0003E\u0001AQa\f\u0003A\u0002ABQa\u000e\u0003A\u0002aBQ!\u0015\u0003A\u0002I\u000bq\u0002\u001d:f%\u0016<\u0017n\u001d;sCRLwN\u001c\u000b\u0005;\u0002\u0014(\u0010\u0005\u0002\u001d=&\u0011q,\b\u0002\u0005+:LG\u000fC\u0003b\u000b\u0001\u0007!-A\u0002f]Z\u0004\"a\u00199\u000e\u0003\u0011T!!\u001a4\u0002\u0017\u0015tg/\u001b:p]6,g\u000e\u001e\u0006\u0003O\"\f1!\u00199j\u0015\tI'.A\u0005tiJ,\u0017-\\5oO*\u00111\u000e\\\u0001\u0006M2Lgn\u001b\u0006\u0003[:\fa!\u00199bG\",'\"A8\u0002\u0007=\u0014x-\u0003\u0002rI\nQ2\u000b\u001e:fC6,\u00050Z2vi&|g.\u00128wSJ|g.\\3oi\")1/\u0002a\u0001i\u0006y\u0001O]8dKN\u001cx+\u001b;i\t\u0016\u00048\u000f\u0005\u0002vq6\taO\u0003\u0002x\u001f\u0005A1m\\7qS2,'/\u0003\u0002zm\nAb\t\\5oWB\u0013xnY3tg\u000e{W\u000e]5mKJ$\u0015\r^1\t\u000bm,\u0001\u0019\u0001?\u0002\u001d\u0011,\u0007\u000f\\8z[\u0016tG\u000fR1uCB\u0019Q0!\u0001\u000e\u0003yT!a`\t\u0002\u0015\u0011,\u0007\u000f\\8z[\u0016tG/C\u0002\u0002\u0004y\u0014a\u0002R3qY>LX.\u001a8u\t\u0006$\u0018-A\fd_:4\u0017nZ;sKJ{7m[:E\u0005\n\u000b7m[3oIR)Q,!\u0003\u0002\f!)\u0011M\u0002a\u0001E\"1\u0011Q\u0002\u0004A\u0002e\naaY8oM&<\u0017\u0001\u00059pgR\u0014VmZ5tiJ\fG/[8o)\u001di\u00161CA\u000b\u00033AQ!Y\u0004A\u0002\tDa!a\u0006\b\u0001\u0004!\u0018aF2p[BLG.\u001a3Qe>\u001cWm]:XSRDG)\u001a9t\u0011\u0015Yx\u00011\u0001}\u0003Q\u0019wN\u001c4jOV\u0014Xm\u00115fG.\u0004x.\u001b8ugR)Q,a\b\u0002\"!)\u0011\r\u0003a\u0001E\"9\u00111\u0005\u0005A\u0002\u0005\u0015\u0012AD:ue\u0016\fW.T3uC\u0012\u000bG/\u0019\t\u0005\u0003O\tY#\u0004\u0002\u0002*)\u0011q-E\u0005\u0005\u0003[\tIC\u0001\bTiJ,\u0017-\\'fi\u0006$\u0015\r^1\u00025\u0019d\u0017N\\6DY\u0006\u001c8\u000fT8bI\u0016\u00148+[7vY\u0006$\u0018n\u001c8\u0016\u0005\u0005M\u0002\u0003BA\u001b\u0003\u007fi!!a\u000e\u000b\t\u0005e\u00121H\u0001\u0005Y\u0006twM\u0003\u0002\u0002>\u0005!!.\u0019<b\u0013\u0011\t\t%a\u000e\u0003\u0017\rc\u0017m]:M_\u0006$WM]\u0001\u0011g&$WmT;uaV$x)\u001a;uKJ,B!a\u0012\u0002ZQ1\u0011\u0011JA6\u0003{\u0002b!a\u0013\u0002R\u0005USBAA'\u0015\r\tyEZ\u0001\u000bI\u0006$\u0018m\u001d;sK\u0006l\u0017\u0002BA*\u0003\u001b\u0012!\u0002R1uCN#(/Z1n!\u0011\t9&!\u0017\r\u0001\u00119\u00111\f\u0006C\u0002\u0005u#!\u0001+\u0012\t\u0005}\u0013Q\r\t\u00049\u0005\u0005\u0014bAA2;\t9aj\u001c;iS:<\u0007c\u0001\u000f\u0002h%\u0019\u0011\u0011N\u000f\u0003\u0007\u0005s\u0017\u0010C\u0004\u0002n)\u0001\r!a\u001c\u00025MLgn\u001a7f\u001fV$\b/\u001e;TiJ,\u0017-\\(qKJ\fGo\u001c:1\t\u0005E\u0014\u0011\u0010\t\u0007\u0003\u0017\n\u0019(a\u001e\n\t\u0005U\u0014Q\n\u0002\u001b'&tw\r\\3PkR\u0004X\u000f^*ue\u0016\fWn\u00149fe\u0006$xN\u001d\t\u0005\u0003/\nI\b\u0002\u0007\u0002|\u0005-\u0014\u0011!A\u0001\u0006\u0003\tiFA\u0002`IIBq!a \u000b\u0001\u0004\t\t)A\u0005pkR\u0004X\u000f\u001e+bOB1\u00111QAD\u0003+j!!!\"\u000b\u0005)S\u0017\u0002BAE\u0003\u000b\u0013\u0011bT;uaV$H+Y4\u0002\u0019]\u0014\u0018\r]%o\u0019\u0006l'\rZ1\u0016\t\u0005=\u00151\u0013\u000b\u0005\u0003#\u000b)\n\u0005\u0003\u0002X\u0005MEaBA.\u0017\t\u0007\u0011Q\f\u0005\b\u0003/[\u0001\u0019AAM\u0003\ry'M\u001b\t\u00069\u0005m\u0015\u0011S\u0005\u0004\u0003;k\"!\u0003$v]\u000e$\u0018n\u001c81\u0001")
/* loaded from: input_file:pl/touk/nussknacker/engine/process/registrar/DefaultStreamExecutionEnvPreparer.class */
public class DefaultStreamExecutionEnvPreparer implements StreamExecutionEnvPreparer, LazyLogging {
    private final Option<CheckpointConfig> checkpointConfig;
    private final Option<StateConfiguration.RocksDBStateBackendConfig> rocksDBStateBackendConfig;
    private final ExecutionConfigPreparer executionConfigPreparer;
    private transient Logger logger;
    private volatile transient boolean bitmap$trans$0;

    /* JADX WARN: Multi-variable type inference failed */
    /* JADX WARN: Type inference failed for: r0v0 */
    /* JADX WARN: Type inference failed for: r0v1, types: [java.lang.Throwable] */
    /* JADX WARN: Type inference failed for: r0v8, types: [pl.touk.nussknacker.engine.process.registrar.DefaultStreamExecutionEnvPreparer] */
    private Logger logger$lzycompute() {
        ?? r0 = this;
        synchronized (r0) {
            if (!this.bitmap$trans$0) {
                this.logger = LazyLogging.logger$(this);
                r0 = this;
                r0.bitmap$trans$0 = true;
            }
        }
        return this.logger;
    }

    public Logger logger() {
        return !this.bitmap$trans$0 ? logger$lzycompute() : this.logger;
    }

    @Override // pl.touk.nussknacker.engine.process.registrar.StreamExecutionEnvPreparer
    public void preRegistration(StreamExecutionEnvironment streamExecutionEnvironment, FlinkProcessCompilerData flinkProcessCompilerData, DeploymentData deploymentData) {
        this.executionConfigPreparer.prepareExecutionConfig(streamExecutionEnvironment.getConfig(), flinkProcessCompilerData.jobData(), deploymentData);
        StreamMetaData streamMetaData = (StreamMetaData) MetaDataExtractor$.MODULE$.extractTypeSpecificDataOrFail(flinkProcessCompilerData.metaData(), ClassTag$.MODULE$.apply(StreamMetaData.class));
        streamExecutionEnvironment.setRestartStrategy(flinkProcessCompilerData.restartStrategy());
        streamMetaData.parallelism().foreach(obj -> {
            return streamExecutionEnvironment.setParallelism(BoxesRunTime.unboxToInt(obj));
        });
        configureCheckpoints(streamExecutionEnvironment, streamMetaData);
        Tuple2 tuple2 = new Tuple2(this.rocksDBStateBackendConfig, streamMetaData.spillStateToDisk());
        if (tuple2 != null) {
            Some some = (Option) tuple2._1();
            Some some2 = (Option) tuple2._2();
            if (some instanceof Some) {
                StateConfiguration.RocksDBStateBackendConfig rocksDBStateBackendConfig = (StateConfiguration.RocksDBStateBackendConfig) some.value();
                if ((some2 instanceof Some) && true == BoxesRunTime.unboxToBoolean(some2.value()) && rocksDBStateBackendConfig.enable()) {
                    if (logger().underlying().isInfoEnabled()) {
                        logger().underlying().info("Using RocksDB state backend");
                        BoxedUnit boxedUnit = BoxedUnit.UNIT;
                    } else {
                        BoxedUnit boxedUnit2 = BoxedUnit.UNIT;
                    }
                    configureRocksDBBackend(streamExecutionEnvironment, rocksDBStateBackendConfig);
                    BoxedUnit boxedUnit3 = BoxedUnit.UNIT;
                    return;
                }
            }
        }
        if (tuple2 != null) {
            Option option = (Option) tuple2._1();
            Some some3 = (Option) tuple2._2();
            if (None$.MODULE$.equals(option) && (some3 instanceof Some) && true == BoxesRunTime.unboxToBoolean(some3.value())) {
                if (!logger().underlying().isWarnEnabled()) {
                    BoxedUnit boxedUnit4 = BoxedUnit.UNIT;
                    return;
                } else {
                    logger().underlying().warn("RocksDB not configured, cannot use spillStateToDisk");
                    BoxedUnit boxedUnit5 = BoxedUnit.UNIT;
                    return;
                }
            }
        }
        if (!logger().underlying().isInfoEnabled()) {
            BoxedUnit boxedUnit6 = BoxedUnit.UNIT;
        } else {
            logger().underlying().info("Using default state backend configured by cluster");
            BoxedUnit boxedUnit7 = BoxedUnit.UNIT;
        }
    }

    public void configureRocksDBBackend(StreamExecutionEnvironment streamExecutionEnvironment, StateConfiguration.RocksDBStateBackendConfig rocksDBStateBackendConfig) {
        streamExecutionEnvironment.setStateBackend(StateConfiguration$.MODULE$.prepareRocksDBStateBackend(rocksDBStateBackendConfig));
    }

    @Override // pl.touk.nussknacker.engine.process.registrar.StreamExecutionEnvPreparer
    public void postRegistration(StreamExecutionEnvironment streamExecutionEnvironment, FlinkProcessCompilerData flinkProcessCompilerData, DeploymentData deploymentData) {
    }

    public void configureCheckpoints(StreamExecutionEnvironment streamExecutionEnvironment, StreamMetaData streamMetaData) {
        streamMetaData.checkpointIntervalDuration().orElse(() -> {
            return this.checkpointConfig.map(checkpointConfig -> {
                return checkpointConfig.checkpointInterval();
            });
        }).map(duration -> {
            return BoxesRunTime.boxToLong(duration.toMillis());
        }).foreach(j -> {
            streamExecutionEnvironment.enableCheckpointing(j);
            streamExecutionEnvironment.getCheckpointConfig().setMinPauseBetweenCheckpoints(BoxesRunTime.unboxToLong(this.checkpointConfig.flatMap(checkpointConfig -> {
                return checkpointConfig.minPauseBetweenCheckpoints();
            }).map(finiteDuration -> {
                return BoxesRunTime.boxToLong(finiteDuration.toMillis());
            }).getOrElse(() -> {
                return j / 2;
            })));
            streamExecutionEnvironment.getCheckpointConfig().setMaxConcurrentCheckpoints(BoxesRunTime.unboxToInt(this.checkpointConfig.flatMap(checkpointConfig2 -> {
                return checkpointConfig2.maxConcurrentCheckpoints();
            }).getOrElse(() -> {
                return 1;
            })));
            Option flatMap = this.checkpointConfig.flatMap(checkpointConfig3 -> {
                return checkpointConfig3.tolerableCheckpointFailureNumber();
            });
            org.apache.flink.streaming.api.environment.CheckpointConfig checkpointConfig4 = streamExecutionEnvironment.getCheckpointConfig();
            flatMap.foreach(i -> {
                checkpointConfig4.setTolerableCheckpointFailureNumber(i);
            });
        });
    }

    @Override // pl.touk.nussknacker.engine.process.registrar.StreamExecutionEnvPreparer
    public ClassLoader flinkClassLoaderSimulation() {
        return (ClassLoader) wrapInLambda(() -> {
            return FlinkUserCodeClassLoaders.childFirst((URL[]) Array$.MODULE$.empty(ClassTag$.MODULE$.apply(URL.class)), Thread.currentThread().getContextClassLoader(), (String[]) Array$.MODULE$.empty(ClassTag$.MODULE$.apply(String.class)), th -> {
                throw th;
            }, true);
        });
    }

    @Override // pl.touk.nussknacker.engine.process.registrar.StreamExecutionEnvPreparer
    public <T> DataStream<T> sideOutputGetter(SingleOutputStreamOperator<?> singleOutputStreamOperator, OutputTag<T> outputTag) {
        return (DataStream) wrapInLambda(() -> {
            return singleOutputStreamOperator.getSideOutput(outputTag);
        });
    }

    private <T> T wrapInLambda(Function0<T> function0) {
        return (T) function0.apply();
    }

    public DefaultStreamExecutionEnvPreparer(Option<CheckpointConfig> option, Option<StateConfiguration.RocksDBStateBackendConfig> option2, ExecutionConfigPreparer executionConfigPreparer) {
        this.checkpointConfig = option;
        this.rocksDBStateBackendConfig = option2;
        this.executionConfigPreparer = executionConfigPreparer;
        LazyLogging.$init$(this);
    }
}
