/*
 * Decompiled with CFR 0.152.
 */
package bio.ferlab.datalake.spark3.etl;

import bio.ferlab.datalake.commons.config.Configuration;
import bio.ferlab.datalake.commons.config.DatasetConf;
import bio.ferlab.datalake.commons.file.File;
import bio.ferlab.datalake.commons.file.FileSystem;
import bio.ferlab.datalake.spark3.etl.RawToNormalizedETL;
import bio.ferlab.datalake.spark3.file.FileSystemResolver$;
import bio.ferlab.datalake.spark3.transformation.Transformation;
import bio.ferlab.datalake.spark3.transformation.Transformation$;
import java.io.Serializable;
import java.time.LocalDateTime;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.functions$;
import scala.Function0;
import scala.Function1;
import scala.MatchError;
import scala.Predef$;
import scala.collection.Seq;
import scala.collection.immutable.List;
import scala.collection.immutable.List$;
import scala.collection.immutable.Map;
import scala.collection.immutable.Nil$;
import scala.collection.mutable.ArrayOps;
import scala.reflect.ScalaSignature;
import scala.runtime.BoxedUnit;
import scala.runtime.java8.JFunction0;
import scala.util.Failure;
import scala.util.Success;
import scala.util.Try;
import scala.util.Try$;

@ScalaSignature(bytes="\u0006\u0001\u0005\rc\u0001\u0002\t\u0012\u0001qA\u0001\"\t\u0001\u0003\u0006\u0004%\tE\t\u0005\nW\u0001\u0011\t\u0011)A\u0005G1B\u0001\"\f\u0001\u0003\u0006\u0004%\tE\t\u0005\n]\u0001\u0011\t\u0011)A\u0005G=B\u0001\u0002\r\u0001\u0003\u0006\u0004%\t%\r\u0005\n\r\u0002\u0011\t\u0011)A\u0005e\u001dC\u0001\u0002\u0013\u0001\u0003\u0006\u0004%\u0019%\u0013\u0005\t\u001b\u0002\u0011\t\u0011)A\u0005\u0015\")a\n\u0001C\u0001\u001f\"9a\u000b\u0001a\u0001\n\u00139\u0006bB1\u0001\u0001\u0004%IA\u0019\u0005\u0007S\u0002\u0001\u000b\u0015\u0002-\t\u000b)\u0004A\u0011I6\t\u000f\u0005M\u0002\u0001\"\u0011\u00026!9\u00111\b\u0001\u0005B\u0005u\"A\u0006*bo\u001aKG.\u001a+p\u001d>\u0014X.\u00197ju\u0016$W\t\u0016'\u000b\u0005I\u0019\u0012aA3uY*\u0011A#F\u0001\u0007gB\f'o[\u001a\u000b\u0005Y9\u0012\u0001\u00033bi\u0006d\u0017m[3\u000b\u0005aI\u0012A\u00024fe2\f'MC\u0001\u001b\u0003\r\u0011\u0017n\\\u0002\u0001'\t\u0001Q\u0004\u0005\u0002\u001f?5\t\u0011#\u0003\u0002!#\t\u0011\"+Y<U_:{'/\\1mSj,G-\u0012+M\u0003\u0019\u0019x.\u001e:dKV\t1\u0005\u0005\u0002%S5\tQE\u0003\u0002'O\u000511m\u001c8gS\u001eT!\u0001K\u000b\u0002\u000f\r|W.\\8og&\u0011!&\n\u0002\f\t\u0006$\u0018m]3u\u0007>tg-A\u0004t_V\u00148-\u001a\u0011\n\u0005\u0005z\u0012a\u00033fgRLg.\u0019;j_:\fA\u0002Z3ti&t\u0017\r^5p]\u0002J!!L\u0010\u0002\u001fQ\u0014\u0018M\\:g_Jl\u0017\r^5p]N,\u0012A\r\t\u0004gu\u0002eB\u0001\u001b;\u001d\t)\u0004(D\u00017\u0015\t94$\u0001\u0004=e>|GOP\u0005\u0002s\u0005)1oY1mC&\u00111\bP\u0001\ba\u0006\u001c7.Y4f\u0015\u0005I\u0014B\u0001 
@\u0005\u0011a\u0015n\u001d;\u000b\u0005mb\u0004CA!E\u001b\u0005\u0011%BA\"\u0014\u00039!(/\u00198tM>\u0014X.\u0019;j_:L!!\u0012\"\u0003\u001dQ\u0013\u0018M\\:g_Jl\u0017\r^5p]\u0006\u0001BO]1og\u001a|'/\\1uS>t7\u000fI\u0005\u0003a}\tAaY8oMV\t!\n\u0005\u0002%\u0017&\u0011A*\n\u0002\u000e\u0007>tg-[4ve\u0006$\u0018n\u001c8\u0002\u000b\r|gN\u001a\u0011\u0002\rqJg.\u001b;?)\u0011\u00016\u000bV+\u0015\u0005E\u0013\u0006C\u0001\u0010\u0001\u0011\u0015A\u0015\u0002q\u0001K\u0011\u0015\t\u0013\u00021\u0001$\u0011\u0015i\u0013\u00021\u0001$\u0011\u0015\u0001\u0014\u00021\u00013\u00039\u0001(o\\2fgN,GMR5mKN,\u0012\u0001\u0017\t\u0004guJ\u0006C\u0001._\u001d\tYF\f\u0005\u00026y%\u0011Q\fP\u0001\u0007!J,G-\u001a4\n\u0005}\u0003'AB*ue&twM\u0003\u0002^y\u0005\u0011\u0002O]8dKN\u001cX\r\u001a$jY\u0016\u001cx\fJ3r)\t\u0019w\r\u0005\u0002eK6\tA(\u0003\u0002gy\t!QK\\5u\u0011\u001dA7\"!AA\u0002a\u000b1\u0001\u001f\u00132\u0003=\u0001(o\\2fgN,GMR5mKN\u0004\u0013!\u0003;sC:\u001chm\u001c:n)\u001da\u0017\u0011CA\u000e\u0003_!2!\\A\u0004!\rq\u0017\u0011\u0001\b\u0003_zt!\u0001]>\u000f\u0005EDhB\u0001:v\u001d\t)4/C\u0001u\u0003\ry'oZ\u0005\u0003m^\fa!\u00199bG\",'\"\u0001;\n\u0005eT\u0018!B:qCJ\\'B\u0001<x\u0013\taX0A\u0002tc2T!!\u001f>\n\u0005mz(B\u0001?~\u0013\u0011\t\u0019!!\u0002\u0003\u0013\u0011\u000bG/\u0019$sC6,'BA\u001e\u0000\u0011\u0019IX\u0002q\u0001\u0002\nA!\u00111BA\u0007\u001b\u0005y\u0018bAA\b\u007f\na1\u000b]1sWN+7o]5p]\"9\u00111C\u0007A\u0002\u0005U\u0011\u0001\u00023bi\u0006\u0004RAWA\f36L1!!\u0007a\u0005\ri\u0015\r\u001d\u0005\n\u0003;i\u0001\u0013!a\u0001\u0003?\tq\u0002\\1tiJ+h\u000eR1uKRKW.\u001a\t\u0005\u0003C\tY#\u0004\u0002\u0002$)!\u0011QEA\u0014\u0003\u0011!\u0018.\\3\u000b\u0005\u0005%\u0012\u0001\u00026bm\u0006LA!!\f\u0002$\tiAj\\2bY\u0012\u000bG/\u001a+j[\u0016D\u0011\"!\r\u000e!\u0003\u0005\r!a\b\u0002%\r,(O]3oiJ+h\u000eR1uKRKW.Z\u0001\baV\u0014G.[:i)\t\t9\u0004F\u0002d\u0003sAa!\u001f\bA\u0004\u0005%\u0011!\u0002:fg\u0016$HCAA 
)\r\u0019\u0017\u0011\t\u0005\u0007s>\u0001\u001d!!\u0003")
/*
 * ETL step that applies the configured transformations to the source dataset
 * and keeps track of the input files it read. publish() moves those files to
 * the path obtained by replacing "landing" with "archive"; reset() moves the
 * archived files back by applying the reverse replacement.
 *
 * Decompiled from Scala: the `$anonfun$...` static methods and the
 * `processedFiles_$eq` setter are compiler-generated and keep their names.
 */
public class RawFileToNormalizedETL
extends RawToNormalizedETL {
    /** Configuration received by the constructor and returned by {@link #conf()}. */
    private final Configuration conf;
    /** Input file names recorded by the last transform(); emptied after a successful publish(). */
    private List<String> processedFiles;

    /** Returns the source dataset configuration held by the superclass. */
    @Override
    public DatasetConf source() {
        return super.source();
    }

    /** Returns the destination dataset configuration held by the superclass. */
    @Override
    public DatasetConf destination() {
        return super.destination();
    }

    /** Returns the transformations held by the superclass. */
    @Override
    public List<Transformation> transformations() {
        return super.transformations();
    }

    /** Returns the configuration this instance was constructed with. */
    @Override
    public Configuration conf() {
        return this.conf;
    }

    private List<String> processedFiles() {
        return this.processedFiles;
    }

    private void processedFiles_$eq(List<String> x$1) {
        this.processedFiles = x$1;
    }

    /**
     * Records the distinct input file names of the source dataset, applies the
     * configured transformations to it, and logs the number of unique keys and rows.
     *
     * @param data                datasets keyed by dataset id; the entry for {@code source().id()} is used
     * @param lastRunDateTime     not read by this implementation
     * @param currentRunDateTime  not read by this implementation
     * @param spark               session supplying the String encoder used to read the file names
     * @return the transformed, persisted dataset
     */
    @Override
    public Dataset<Row> transform(Map<String, Dataset<Row>> data, LocalDateTime lastRunDateTime, LocalDateTime currentRunDateTime, SparkSession spark) {
        this.log().info("transforming: " + this.source().id() + " to " + this.destination().id());
        Dataset sourceDf = (Dataset)data.apply((Object)this.source().id());
        // De-duplicate before collect() so the driver receives one string per
        // distinct file instead of one string per input row. The order of the
        // resulting list is whatever Spark returns, not first-seen order.
        Object distinctFiles = sourceDf.withColumn("files", functions$.MODULE$.input_file_name()).select("files", (Seq)Predef$.MODULE$.wrapRefArray((Object[])new String[0])).as(spark.implicits().newStringEncoder()).distinct().collect();
        this.processedFiles_$eq((List<String>)new ArrayOps.ofRef(Predef$.MODULE$.refArrayOps((Object[])distinctFiles)).toList());
        // Persisted before the two count() actions below, which both run on this dataset.
        Dataset finalDf = Transformation$.MODULE$.applyTransformations((Dataset<Row>)sourceDf, this.transformations()).persist();
        this.log().info("unique ids: " + finalDf.dropDuplicates((Seq)this.destination().keys()).count());
        this.log().info("rows: " + finalDf.count());
        return finalDf;
    }

    /**
     * Moves every recorded input file to the same path with "landing" replaced
     * by "archive", then empties the recorded list.
     *
     * A failure while moving is logged and not rethrown; in that case the
     * recorded list is left as it was.
     *
     * @param spark not read by this implementation
     */
    @Override
    public void publish(SparkSession spark) {
        this.log().info("moving files: \n" + this.processedFiles().mkString("\n"));
        List<String> files = this.processedFiles();
        Try try_ = Try$.MODULE$.apply((Function0)(JFunction0.mcV.sp & Serializable & scala.Serializable)() -> {
            files.foreach((Function1 & Serializable & scala.Serializable)file -> {
                RawFileToNormalizedETL.$anonfun$publish$2(this, file);
                return BoxedUnit.UNIT;
            });
            // Only reached when every move succeeded.
            this.processedFiles_$eq((List<String>)List$.MODULE$.empty());
        });
        if (try_ instanceof Success) {
            this.log().info("SUCCESS");
        } else if (try_ instanceof Failure) {
            Throwable exception = ((Failure)try_).exception();
            // Pass the throwable as well so the exception type and stack trace are logged,
            // not just its message.
            // NOTE(review): assumes log() exposes error(String, Throwable) (SLF4J-style) - confirm.
            this.log().error("FAILURE: " + exception.getLocalizedMessage(), exception);
        } else {
            throw new MatchError((Object)try_);
        }
    }

    /**
     * Lists the files under the source location with "landing" replaced by
     * "archive", moves each one back to its "landing" path, then delegates to
     * the superclass reset.
     *
     * @param spark forwarded to {@code super.reset}
     */
    @Override
    public void reset(SparkSession spark) {
        FileSystem fs = (FileSystem)FileSystemResolver$.MODULE$.resolve().apply((Object)this.conf().getStorage(this.source().storageid()).filesystem());
        // NOTE(review): the boolean passed to list() is presumably a "recursive" flag - confirm against FileSystem.
        String archiveLocation = this.source().location(this.conf()).replace("landing", "archive");
        List files = fs.list(archiveLocation, true);
        files.foreach((Function1 & Serializable & scala.Serializable)f -> {
            RawFileToNormalizedETL.$anonfun$reset$1(this, fs, f);
            return BoxedUnit.UNIT;
        });
        super.reset(spark);
    }

    /** Compiler-generated body of the publish() loop: moves one file from its path to the "archive" path. */
    public static final /* synthetic */ void $anonfun$publish$2(RawFileToNormalizedETL $this, String file) {
        // NOTE(review): the boolean passed to move() is presumably an "overwrite" flag - confirm against FileSystem.
        ((FileSystem)FileSystemResolver$.MODULE$.resolve().apply((Object)$this.conf().getStorage($this.source().storageid()).filesystem())).move(file, file.replace("landing", "archive"), true);
    }

    /** Compiler-generated body of the reset() loop: logs and moves one archived file back to its "landing" path. */
    public static final /* synthetic */ void $anonfun$reset$1(RawFileToNormalizedETL $this, FileSystem fs$1, File f) {
        $this.log().info("Moving " + f.path() + " to " + f.path().replace("archive", "landing"));
        fs$1.move(f.path(), f.path().replace("archive", "landing"), true);
    }

    /**
     * Creates the ETL with no recorded input files.
     *
     * @param source          source dataset configuration, forwarded to the superclass
     * @param destination     destination dataset configuration, forwarded to the superclass
     * @param transformations transformations applied by transform(), forwarded to the superclass
     * @param conf            configuration, forwarded to the superclass and returned by {@link #conf()}
     */
    public RawFileToNormalizedETL(DatasetConf source, DatasetConf destination, List<Transformation> transformations, Configuration conf) {
        // The decompiler emitted the `conf` assignment ahead of super(...), which javac
        // rejects before flexible constructor bodies; super(...) now comes first.
        // NOTE(review): assumes the superclass constructor does not call conf() - confirm
        // against RawToNormalizedETL.
        super(source, destination, transformations, conf);
        this.conf = conf;
        // Same empty-list expression publish() uses; Nil$.MODULE$ is not assignable to List<String> in Java.
        this.processedFiles = (List<String>)List$.MODULE$.empty();
    }
}

