package kafka4m.io;

import com.typesafe.config.Config;
import java.nio.file.FileVisitOption;
import java.nio.file.Files;
import java.nio.file.LinkOption;
import java.nio.file.Path;
import java.nio.file.Paths;
import kafka4m.io.FileSource;
import monix.eval.Task;
import monix.eval.Task$;
import monix.reactive.Observable;
import monix.reactive.Observable$;
import scala.MatchError;
import scala.Option;
import scala.Predef$;
import scala.Predef$ArrowAssoc$;
import scala.Tuple2;
import scala.collection.Iterator;
import scala.collection.JavaConverters$;
import scala.collection.LinearSeqOps;
import scala.collection.StringOps$;
import scala.collection.immutable.List;
import scala.concurrent.duration.package;
import scala.concurrent.duration.package$;
import scala.runtime.BoxesRunTime;
import scala.util.matching.Regex;

/* compiled from: FileSource.scala */
/* loaded from: input_file:kafka4m/io/FileSource$.class */
public final class FileSource$ {
    public static final FileSource$ MODULE$ = new FileSource$();

    public Observable<Tuple2<String, byte[]>> apply(Config config) {
        return keysAndData(FileSource$EtlConfig$.MODULE$.apply(config));
    }

    public Observable<Tuple2<String, byte[]>> keysAndData(FileSource.EtlConfig etlConfig) {
        Observable<Tuple2<String, byte[]>> unlimited = unlimited(etlConfig);
        Observable observable = (Observable) etlConfig.limit().fold(() -> {
            return unlimited;
        }, obj -> {
            return unlimited.take(BoxesRunTime.unboxToLong(obj));
        });
        return (Observable) etlConfig.rateLimitPerSecond().fold(() -> {
            return observable;
        }, obj2 -> {
            return $anonfun$keysAndData$4(observable, BoxesRunTime.unboxToInt(obj2));
        });
    }

    private Observable<Tuple2<String, byte[]>> unlimited(FileSource.EtlConfig etlConfig) {
        Path path = Paths.get(etlConfig.dataDir(), new String[0]);
        if (!etlConfig.repeat()) {
            return all$1(etlConfig, path);
        }
        Regex r$extension = StringOps$.MODULE$.r$extension(Predef$.MODULE$.augmentString("(.*?)\\.(.*)"));
        return all$1(etlConfig, path).zipWithIndex().map(tuple2 -> {
            Tuple2 tuple2;
            String sb;
            if (tuple2 != null) {
                Tuple2 tuple22 = (Tuple2) tuple2._1();
                long _2$mcJ$sp = tuple2._2$mcJ$sp();
                if (tuple22 != null) {
                    String str = (String) tuple22._1();
                    byte[] bArr = (byte[]) tuple22._2();
                    if (etlConfig.fileNamesAsKeys()) {
                        if (str != null) {
                            Option unapplySeq = r$extension.unapplySeq(str);
                            if (!unapplySeq.isEmpty() && unapplySeq.get() != null && ((List) unapplySeq.get()).lengthCompare(2) == 0) {
                                String str2 = (String) ((LinearSeqOps) unapplySeq.get()).apply(0);
                                sb = new StringBuilder(2).append(str2).append("-").append(_2$mcJ$sp).append(".").append((String) ((LinearSeqOps) unapplySeq.get()).apply(1)).toString();
                                tuple2 = new Tuple2(sb, bArr);
                                return tuple2;
                            }
                        }
                        sb = new StringBuilder(1).append(str).append("-").append(_2$mcJ$sp).toString();
                        tuple2 = new Tuple2(sb, bArr);
                        return tuple2;
                    }
                }
            }
            if (tuple2 != null) {
                Tuple2 tuple23 = (Tuple2) tuple2._1();
                long _2$mcJ$sp2 = tuple2._2$mcJ$sp();
                if (tuple23 != null) {
                    tuple2 = new Tuple2(BoxesRunTime.boxToLong(_2$mcJ$sp2).toString(), (byte[]) tuple23._2());
                    return tuple2;
                }
            }
            throw new MatchError(tuple2);
        });
    }

    public List<Tuple2<String, byte[]>> cacheDirContents(Path path) {
        return listChildren(path).toList().map(path2 -> {
            return new Tuple2(path2.getFileName().toString(), Files.readAllBytes(path2));
        });
    }

    public Observable<Path> listChildrenObservable(Path path, boolean z) {
        Task delay = Task$.MODULE$.delay(() -> {
            return MODULE$.listChildren(path);
        });
        return z ? Observable$.MODULE$.repeatEval(() -> {
            return delay;
        }).flatMap(task -> {
            return Observable$.MODULE$.fromIterator(task);
        }) : Observable$.MODULE$.fromIterator(delay);
    }

    public Iterator<Path> listChildren(Path path) {
        return ((Iterator) JavaConverters$.MODULE$.asScalaIteratorConverter(Files.walk(path, new FileVisitOption[0]).iterator()).asScala()).filter(path2 -> {
            return BoxesRunTime.boxToBoolean($anonfun$listChildren$1(path2));
        });
    }

    public static final /* synthetic */ Observable $anonfun$keysAndData$4(Observable observable, int i) {
        return observable.bufferTimedWithPressure(new package.DurationInt(package$.MODULE$.DurationInt(1)).second().$div(i), 1, observable.bufferTimedWithPressure$default$3()).flatMap(seq -> {
            return Observable$.MODULE$.fromIterable(seq);
        });
    }

    private final Observable all$1(FileSource.EtlConfig etlConfig, Path path) {
        if (!etlConfig.cache()) {
            return listChildrenObservable(path, etlConfig.repeat()).map(path2 -> {
                return Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(path2.getFileName().toString()), Files.readAllBytes(path2));
            });
        }
        List<Tuple2<String, byte[]>> cacheDirContents = cacheDirContents(path);
        return etlConfig.repeat() ? Observable$.MODULE$.fromIterable(cacheDirContents).repeat() : Observable$.MODULE$.fromIterable(cacheDirContents);
    }

    public static final /* synthetic */ boolean $anonfun$listChildren$1(Path path) {
        return Files.isRegularFile(path, new LinkOption[0]);
    }

    private FileSource$() {
    }
}
