package pl.touk.nussknacker.engine.process.registrar;

import com.typesafe.scalalogging.LazyLogging;
import com.typesafe.scalalogging.Logger;
import java.net.URL;
import java.util.function.Consumer;
import org.apache.flink.runtime.execution.librarycache.FlinkUserCodeClassLoaders;
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment;
import pl.touk.nussknacker.engine.api.StreamMetaData;
import pl.touk.nussknacker.engine.deployment.DeploymentData;
import pl.touk.nussknacker.engine.process.CheckpointConfig;
import pl.touk.nussknacker.engine.process.ExecutionConfigPreparer;
import pl.touk.nussknacker.engine.process.compiler.FlinkProcessCompilerData;
import pl.touk.nussknacker.engine.process.util.StateConfiguration;
import pl.touk.nussknacker.engine.process.util.StateConfiguration$;
import pl.touk.nussknacker.engine.util.MetaDataExtractor$;
import scala.Array$;
import scala.None$;
import scala.Option;
import scala.Some;
import scala.Tuple2;
import scala.reflect.ClassTag$;
import scala.reflect.ScalaSignature;
import scala.runtime.BoxedUnit;
import scala.runtime.BoxesRunTime;

/* compiled from: StreamExecutionEnvPreparer.scala */
@ScalaSignature(bytes = "\u0006\u0001\u0005ub\u0001\u0002\u0006\f\u0001aA\u0001\"\f\u0001\u0003\u0002\u0003\u0006IA\f\u0005\tk\u0001\u0011\t\u0011)A\u0005m!Aq\n\u0001B\u0001B\u0003%\u0001\u000bC\u0003T\u0001\u0011\u0005A\u000bC\u0003Z\u0001\u0011\u0005#\f\u0003\u0004��\u0001\u0011E\u0011\u0011\u0001\u0005\b\u0003\u0013\u0001A\u0011IA\u0006\u0011\u001d\t)\u0002\u0001C\t\u0003/Aq!!\u000b\u0001\t\u0003\nYCA\u0011EK\u001a\fW\u000f\u001c;TiJ,\u0017-\\#yK\u000e,H/[8o\u000b:4\bK]3qCJ,'O\u0003\u0002\r\u001b\u0005I!/Z4jgR\u0014\u0018M\u001d\u0006\u0003\u001d=\tq\u0001\u001d:pG\u0016\u001c8O\u0003\u0002\u0011#\u00051QM\\4j]\u0016T!AE\n\u0002\u00179,8o]6oC\u000e\\WM\u001d\u0006\u0003)U\tA\u0001^8vW*\ta#\u0001\u0002qY\u000e\u00011\u0003\u0002\u0001\u001a?\r\u0002\"AG\u000f\u000e\u0003mQ\u0011\u0001H\u0001\u0006g\u000e\fG.Y\u0005\u0003=m\u0011a!\u00118z%\u00164\u0007C\u0001\u0011\"\u001b\u0005Y\u0011B\u0001\u0012\f\u0005i\u0019FO]3b[\u0016CXmY;uS>tWI\u001c<Qe\u0016\u0004\u0018M]3s!\t!3&D\u0001&\u0015\t1s%\u0001\u0007tG\u0006d\u0017\r\\8hO&twM\u0003\u0002)S\u0005AA/\u001f9fg\u00064WMC\u0001+\u0003\r\u0019w.\\\u0005\u0003Y\u0015\u00121\u0002T1{s2{wmZ5oO\u0006\u00012\r[3dWB|\u0017N\u001c;D_:4\u0017n\u001a\t\u00045=\n\u0014B\u0001\u0019\u001c\u0005\u0019y\u0005\u000f^5p]B\u0011!gM\u0007\u0002\u001b%\u0011A'\u0004\u0002\u0011\u0007\",7m\u001b9pS:$8i\u001c8gS\u001e\f\u0011D]8dWN$%i\u0015;bi\u0016\u0014\u0015mY6f]\u0012\u001cuN\u001c4jOB\u0019!dL\u001c\u0011\u0005abeBA\u001dJ\u001d\tQtI\u0004\u0002<\r:\u0011A(\u0012\b\u0003{\u0011s!AP\"\u000f\u0005}\u0012U\"\u0001!\u000b\u0005\u0005;\u0012A\u0002\u001fs_>$h(C\u0001\u0017\u0013\t!R#\u0003\u0002\u0013'%\u0011\u0001#E\u0005\u0003\u001d=I!\u0001S\u0007\u0002\tU$\u0018\u000e\\\u0005\u0003\u0015.\u000b!c\u0015;bi\u0016\u001cuN\u001c4jOV\u0014\u0018\r^5p]*\u0011\u0001*D\u0005\u0003\u001b:\u0013\u0011DU8dWN$%i\u0015;bi\u0016\u0014\u0015mY6f]\u0012\u001cuN\u001c4jO*\u0011!jS\u0001\u0018Kb,7-\u001e;j_:\u001cuN\u001c4jOB\u0013X\r]1sKJ\u0004\"AM)\n\u0005Ik!aF#yK\u000e,H/[8o\u0007>tg-[4Qe\u0016\u0004\u0018M]3s\u0003\u0019a\u0014N\\5u}Q!QKV,Y!\t\u0001\u0003\u0001C\u0003.\t\u0001\u0007a\u0006C\u00036\t\u0001\u0007a\u0007C\u0003P\t\u0001\u0007\u0001+A\bqe\u0016\u0014VmZ5tiJ\fG/[8o)\u0011Yfl\\<\u0011\u0005ia\u0016BA/\u001c\u0005\u0011)f.\u001b;\t\u000b}+\u0001\u0019\u00011\u0002\u0007\u0015tg\u000f\u0005\u0002b[6\t!M\u0003\u0002\u001dG*\u0011A-Z\u0001\u0004CBL'B\u00014h\u0003%\u0019HO]3b[&twM\u0003\u0002iS\u0006)a\r\\5oW*\u0011!n[\u0001\u0007CB\f7\r[3\u000b\u00031\f1a\u001c:h\u0013\tq'M\u0001\u000eTiJ,\u0017-\\#yK\u000e,H/[8o\u000b:4\u0018N]8o[\u0016tG\u000fC\u0003q\u000b\u0001\u0007\u0011/A\bqe>\u001cWm]:XSRDG)\u001a9t!\t\u0011X/D\u0001t\u0015\t!X\"\u0001\u0005d_6\u0004\u0018\u000e\\3s\u0013\t18O\u0001\rGY&t7\u000e\u0015:pG\u0016\u001c8oQ8na&dWM\u001d#bi\u0006DQ\u0001_\u0003A\u0002e\fa\u0002Z3qY>LX.\u001a8u\t\u0006$\u0018\r\u0005\u0002{{6\t1P\u0003\u0002}\u001f\u0005QA-\u001a9m_flWM\u001c;\n\u0005y\\(A\u0004#fa2|\u00170\\3oi\u0012\u000bG/Y\u0001\u0018G>tg-[4ve\u0016\u0014vnY6t\t\n\u0013\u0015mY6f]\u0012$RaWA\u0002\u0003\u000bAQa\u0018\u0004A\u0002\u0001Da!a\u0002\u0007\u0001\u00049\u0014AB2p]\u001aLw-\u0001\tq_N$(+Z4jgR\u0014\u0018\r^5p]R91,!\u0004\u0002\u0010\u0005M\u0001\"B0\b\u0001\u0004\u0001\u0007BBA\t\u000f\u0001\u0007\u0011/A\fd_6\u0004\u0018\u000e\\3e!J|7-Z:t/&$\b\u000eR3qg\")\u0001p\u0002a\u0001s\u0006!2m\u001c8gS\u001e,(/Z\"iK\u000e\\\u0007o\\5oiN$RaWA\r\u00037AQa\u0018\u0005A\u0002\u0001Dq!!\b\t\u0001\u0004\ty\"\u0001\btiJ,\u0017-\\'fi\u0006$\u0015\r^1\u0011\t\u0005\u0005\u0012QE\u0007\u0003\u0003GQ!\u0001Z\b\n\t\u0005\u001d\u00121\u0005\u0002\u000f'R\u0014X-Y7NKR\fG)\u0019;b\u0003i1G.\u001b8l\u00072\f7o\u001d'pC\u0012,'oU5nk2\fG/[8o+\t\ti\u0003\u0005\u0003\u00020\u0005eRBAA\u0019\u0015\u0011\t\u0019$!\u000e\u0002\t1\fgn\u001a\u0006\u0003\u0003o\tAA[1wC&!\u00111HA\u0019\u0005-\u0019E.Y:t\u0019>\fG-\u001a:")
/* loaded from: input_file:pl/touk/nussknacker/engine/process/registrar/DefaultStreamExecutionEnvPreparer.class */
public class DefaultStreamExecutionEnvPreparer implements StreamExecutionEnvPreparer, LazyLogging {
    private final Option<CheckpointConfig> checkpointConfig;
    private final Option<StateConfiguration.RocksDBStateBackendConfig> rocksDBStateBackendConfig;
    private final ExecutionConfigPreparer executionConfigPreparer;
    private transient Logger logger;
    private volatile transient boolean bitmap$trans$0;

    /* JADX WARN: Multi-variable type inference failed */
    /* JADX WARN: Type inference failed for: r0v0 */
    /* JADX WARN: Type inference failed for: r0v1, types: [java.lang.Throwable] */
    /* JADX WARN: Type inference failed for: r0v8, types: [pl.touk.nussknacker.engine.process.registrar.DefaultStreamExecutionEnvPreparer] */
    private Logger logger$lzycompute() {
        ?? r0 = this;
        synchronized (r0) {
            if (!this.bitmap$trans$0) {
                this.logger = LazyLogging.logger$(this);
                r0 = this;
                r0.bitmap$trans$0 = true;
            }
        }
        return this.logger;
    }

    public Logger logger() {
        return !this.bitmap$trans$0 ? logger$lzycompute() : this.logger;
    }

    @Override // pl.touk.nussknacker.engine.process.registrar.StreamExecutionEnvPreparer
    public void preRegistration(StreamExecutionEnvironment streamExecutionEnvironment, FlinkProcessCompilerData flinkProcessCompilerData, DeploymentData deploymentData) {
        BoxedUnit boxedUnit;
        BoxedUnit boxedUnit2;
        this.executionConfigPreparer.prepareExecutionConfig(streamExecutionEnvironment.getConfig(), flinkProcessCompilerData.jobData(), deploymentData);
        StreamMetaData streamMetaData = (StreamMetaData) MetaDataExtractor$.MODULE$.extractTypeSpecificDataOrFail(flinkProcessCompilerData.metaData(), ClassTag$.MODULE$.apply(StreamMetaData.class));
        streamExecutionEnvironment.setRestartStrategy(flinkProcessCompilerData.restartStrategy());
        streamMetaData.parallelism().foreach(i -> {
            streamExecutionEnvironment.setParallelism(i);
        });
        configureCheckpoints(streamExecutionEnvironment, streamMetaData);
        Tuple2 tuple2 = new Tuple2(this.rocksDBStateBackendConfig, streamMetaData.spillStateToDisk());
        if (tuple2 != null) {
            Some some = (Option) tuple2._1();
            Some some2 = (Option) tuple2._2();
            if (some instanceof Some) {
                StateConfiguration.RocksDBStateBackendConfig rocksDBStateBackendConfig = (StateConfiguration.RocksDBStateBackendConfig) some.value();
                if ((some2 instanceof Some) && true == BoxesRunTime.unboxToBoolean(some2.value()) && rocksDBStateBackendConfig.enable()) {
                    if (logger().underlying().isInfoEnabled()) {
                        logger().underlying().info("Using RocksDB state backend");
                        BoxedUnit boxedUnit3 = BoxedUnit.UNIT;
                    } else {
                        BoxedUnit boxedUnit4 = BoxedUnit.UNIT;
                    }
                    configureRocksDBBackend(streamExecutionEnvironment, rocksDBStateBackendConfig);
                    BoxedUnit boxedUnit5 = BoxedUnit.UNIT;
                    return;
                }
            }
        }
        if (tuple2 != null) {
            Option option = (Option) tuple2._1();
            Some some3 = (Option) tuple2._2();
            if (None$.MODULE$.equals(option) && (some3 instanceof Some) && true == BoxesRunTime.unboxToBoolean(some3.value())) {
                if (logger().underlying().isWarnEnabled()) {
                    logger().underlying().warn("RocksDB not configured, cannot use spillStateToDisk");
                    boxedUnit2 = BoxedUnit.UNIT;
                } else {
                    boxedUnit2 = BoxedUnit.UNIT;
                }
                return;
            }
        }
        if (logger().underlying().isInfoEnabled()) {
            logger().underlying().info("Using default state backend configured by cluster");
            boxedUnit = BoxedUnit.UNIT;
        } else {
            boxedUnit = BoxedUnit.UNIT;
        }
    }

    public void configureRocksDBBackend(StreamExecutionEnvironment streamExecutionEnvironment, StateConfiguration.RocksDBStateBackendConfig rocksDBStateBackendConfig) {
        streamExecutionEnvironment.setStateBackend(StateConfiguration$.MODULE$.prepareRocksDBStateBackend(rocksDBStateBackendConfig));
    }

    @Override // pl.touk.nussknacker.engine.process.registrar.StreamExecutionEnvPreparer
    public void postRegistration(StreamExecutionEnvironment streamExecutionEnvironment, FlinkProcessCompilerData flinkProcessCompilerData, DeploymentData deploymentData) {
    }

    public void configureCheckpoints(StreamExecutionEnvironment streamExecutionEnvironment, StreamMetaData streamMetaData) {
        streamMetaData.checkpointIntervalDuration().orElse(() -> {
            return this.checkpointConfig.map(checkpointConfig -> {
                return checkpointConfig.checkpointInterval();
            });
        }).map(duration -> {
            return BoxesRunTime.boxToLong(duration.toMillis());
        }).foreach(j -> {
            streamExecutionEnvironment.enableCheckpointing(j);
            streamExecutionEnvironment.getCheckpointConfig().setMinPauseBetweenCheckpoints(BoxesRunTime.unboxToLong(this.checkpointConfig.flatMap(checkpointConfig -> {
                return checkpointConfig.minPauseBetweenCheckpoints();
            }).map(finiteDuration -> {
                return BoxesRunTime.boxToLong(finiteDuration.toMillis());
            }).getOrElse(() -> {
                return j / 2;
            })));
            streamExecutionEnvironment.getCheckpointConfig().setMaxConcurrentCheckpoints(BoxesRunTime.unboxToInt(this.checkpointConfig.flatMap(checkpointConfig2 -> {
                return checkpointConfig2.maxConcurrentCheckpoints();
            }).getOrElse(() -> {
                return 1;
            })));
            Option flatMap = this.checkpointConfig.flatMap(checkpointConfig3 -> {
                return checkpointConfig3.tolerableCheckpointFailureNumber();
            });
            org.apache.flink.streaming.api.environment.CheckpointConfig checkpointConfig4 = streamExecutionEnvironment.getCheckpointConfig();
            flatMap.foreach(i -> {
                checkpointConfig4.setTolerableCheckpointFailureNumber(i);
            });
        });
    }

    @Override // pl.touk.nussknacker.engine.process.registrar.StreamExecutionEnvPreparer
    public ClassLoader flinkClassLoaderSimulation() {
        final DefaultStreamExecutionEnvPreparer defaultStreamExecutionEnvPreparer = null;
        return FlinkUserCodeClassLoaders.childFirst((URL[]) Array$.MODULE$.empty(ClassTag$.MODULE$.apply(URL.class)), Thread.currentThread().getContextClassLoader(), (String[]) Array$.MODULE$.empty(ClassTag$.MODULE$.apply(String.class)), new Consumer<Throwable>(defaultStreamExecutionEnvPreparer) { // from class: pl.touk.nussknacker.engine.process.registrar.DefaultStreamExecutionEnvPreparer$$anon$1
            @Override // java.util.function.Consumer
            public Consumer<Throwable> andThen(Consumer<? super Throwable> consumer) {
                return super.andThen(consumer);
            }

            @Override // java.util.function.Consumer
            public void accept(Throwable th) {
                throw th;
            }
        }, true);
    }

    public DefaultStreamExecutionEnvPreparer(Option<CheckpointConfig> option, Option<StateConfiguration.RocksDBStateBackendConfig> option2, ExecutionConfigPreparer executionConfigPreparer) {
        this.checkpointConfig = option;
        this.rocksDBStateBackendConfig = option2;
        this.executionConfigPreparer = executionConfigPreparer;
        LazyLogging.$init$(this);
    }
}
