/*
 * Decompiled with CFR 0.152.
 */
package org.apache.spark.streaming.util;

import java.io.EOFException;
import java.io.Serializable;
import java.nio.ByteBuffer;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.spark.SparkConf;
import org.apache.spark.streaming.util.BatchedWriteAheadLog;
import org.apache.spark.streaming.util.FileBasedWriteAheadLog;
import org.apache.spark.streaming.util.FileBasedWriteAheadLogReader;
import org.apache.spark.streaming.util.FileBasedWriteAheadLogSegment;
import org.apache.spark.streaming.util.FileBasedWriteAheadLogWriter;
import org.apache.spark.streaming.util.HdfsUtils$;
import org.apache.spark.streaming.util.WriteAheadLog;
import org.apache.spark.util.ManualClock;
import org.apache.spark.util.Utils$;
import scala.Function1;
import scala.Predef$;
import scala.collection.ArrayOps$;
import scala.collection.Iterator;
import scala.collection.JavaConverters$;
import scala.collection.StringOps$;
import scala.collection.immutable.List;
import scala.collection.immutable.Seq;
import scala.collection.mutable.ArrayBuffer;
import scala.math.Ordering;
import scala.package$;
import scala.reflect.ClassTag$;
import scala.runtime.BoxedUnit;
import scala.runtime.BoxesRunTime;
import scala.runtime.RichInt$;
import scala.runtime.ScalaRunTime$;

public final class WriteAheadLogSuite$ {
    public static final WriteAheadLogSuite$ MODULE$ = new WriteAheadLogSuite$();
    private static final Configuration hadoopConf = new Configuration();

    private Configuration hadoopConf() {
        return hadoopConf;
    }

    public Seq<FileBasedWriteAheadLogSegment> writeDataManually(Seq<String> data, String file, boolean allowBatching) {
        ArrayBuffer segments = new ArrayBuffer();
        FSDataOutputStream writer = HdfsUtils$.MODULE$.getOutputStream(file, this.hadoopConf());
        if (allowBatching) {
            WriteAheadLogSuite$.writeToStream$1(this.wrapArrayArrayByte(data.toArray(ClassTag$.MODULE$.apply(String.class))).array(), writer, segments, file);
        } else {
            data.foreach((Function1 & Serializable)item -> {
                WriteAheadLogSuite$.writeToStream$1(Utils$.MODULE$.serialize((Object)item), writer, segments, file);
                return BoxedUnit.UNIT;
            });
        }
        writer.close();
        return segments.toSeq();
    }

    public Seq<FileBasedWriteAheadLogSegment> writeDataUsingWriter(String filePath, Seq<String> data) {
        FileBasedWriteAheadLogWriter writer = new FileBasedWriteAheadLogWriter(filePath, this.hadoopConf());
        Seq segments = (Seq)data.map((Function1 & Serializable)item -> writer.write(MODULE$.stringToByteBuffer((String)item)));
        writer.close();
        return segments;
    }

    public WriteAheadLog writeDataUsingWriteAheadLog(String logDirectory, Seq<String> data, boolean closeFileAfterWrite, boolean allowBatching, ManualClock manualClock, boolean closeLog, int clockAdvanceTime) {
        WriteAheadLog wal;
        block1: {
            if (manualClock.getTimeMillis() < 100000L) {
                manualClock.setTime(10000L);
            }
            wal = this.createWriteAheadLog(logDirectory, closeFileAfterWrite, allowBatching);
            data.foreach((Function1 & Serializable)item -> {
                manualClock.advance((long)clockAdvanceTime);
                return wal.write(MODULE$.stringToByteBuffer((String)item), manualClock.getTimeMillis());
            });
            if (!closeLog) break block1;
            wal.close();
        }
        return wal;
    }

    public ManualClock writeDataUsingWriteAheadLog$default$5() {
        return new ManualClock();
    }

    public boolean writeDataUsingWriteAheadLog$default$6() {
        return true;
    }

    public int writeDataUsingWriteAheadLog$default$7() {
        return 500;
    }

    public Seq<String> readDataManually(Seq<FileBasedWriteAheadLogSegment> segments) {
        return (Seq)segments.map((Function1 & Serializable)segment -> {
            void v0;
            try (FSDataInputStream reader = HdfsUtils$.MODULE$.getInputStream(segment.path(), MODULE$.hadoopConf());){
                void var3_3;
                reader.seek(segment.offset());
                byte[] bytes = new byte[segment.length()];
                reader.readInt();
                reader.readFully(bytes);
                String data = (String)Utils$.MODULE$.deserialize(bytes);
                reader.close();
                v0 = var3_3;
            }
            return v0;
        });
    }

    public <T> Seq<T> readDataManually(String file) {
        ArrayBuffer buffer = new ArrayBuffer();
        try (FSDataInputStream reader = HdfsUtils$.MODULE$.getInputStream(file, this.hadoopConf());){
            try {
                while (true) {
                    int length = reader.readInt();
                    byte[] bytes = new byte[length];
                    reader.read(bytes);
                    buffer.$plus$eq(Utils$.MODULE$.deserialize(bytes));
                }
            }
            catch (EOFException ex) {
            }
        }
        return buffer.toSeq();
    }

    /*
     * WARNING - void declaration
     */
    public Seq<String> readDataUsingReader(String file) {
        void var3_3;
        FileBasedWriteAheadLogReader reader = new FileBasedWriteAheadLogReader(file, this.hadoopConf());
        List readData = reader.toList().map((Function1 & Serializable)byteBuffer -> MODULE$.byteBufferToString((ByteBuffer)byteBuffer));
        reader.close();
        return var3_3;
    }

    public Seq<String> readDataUsingWriteAheadLog(String logDirectory, boolean closeFileAfterWrite, boolean allowBatching) {
        WriteAheadLog wal = this.createWriteAheadLog(logDirectory, closeFileAfterWrite, allowBatching);
        String[] data = (String[])((Iterator)JavaConverters$.MODULE$.asScalaIteratorConverter(wal.readAll()).asScala()).map((Function1 & Serializable)byteBuffer -> MODULE$.byteBufferToString((ByteBuffer)byteBuffer)).toArray(ClassTag$.MODULE$.apply(String.class));
        wal.close();
        return Predef$.MODULE$.copyArrayToImmutableIndexedSeq((Object)data);
    }

    public Seq<String> getLogFilesInDirectory(String directory) {
        Path logDirectoryPath = new Path(directory);
        FileSystem fileSystem = HdfsUtils$.MODULE$.getFileSystemForPath(logDirectoryPath, this.hadoopConf());
        return fileSystem.exists(logDirectoryPath) && fileSystem.getFileStatus(logDirectoryPath).isDirectory() ? Predef$.MODULE$.copyArrayToImmutableIndexedSeq(ArrayOps$.MODULE$.map$extension(Predef$.MODULE$.refArrayOps((Object[])ArrayOps$.MODULE$.sortBy$extension(Predef$.MODULE$.refArrayOps((Object[])ArrayOps$.MODULE$.map$extension(Predef$.MODULE$.refArrayOps((Object[])fileSystem.listStatus(logDirectoryPath)), (Function1 & Serializable)x$8 -> x$8.getPath(), ClassTag$.MODULE$.apply(Path.class))), (Function1 & Serializable)x$9 -> BoxesRunTime.boxToLong((long)StringOps$.MODULE$.toLong$extension(Predef$.MODULE$.augmentString(x$9.getName().split("-")[1]))), (Ordering)Ordering.Long$.MODULE$)), (Function1 & Serializable)x$10 -> StringOps$.MODULE$.stripPrefix$extension(Predef$.MODULE$.augmentString(x$10.toString()), "file:"), ClassTag$.MODULE$.apply(String.class))) : (Seq)package$.MODULE$.Seq().empty();
    }

    public WriteAheadLog createWriteAheadLog(String logDirectory, boolean closeFileAfterWrite, boolean allowBatching) {
        SparkConf sparkConf = new SparkConf();
        FileBasedWriteAheadLog wal = new FileBasedWriteAheadLog(sparkConf, logDirectory, this.hadoopConf(), 1, 1, closeFileAfterWrite);
        return allowBatching ? new BatchedWriteAheadLog((WriteAheadLog)wal, sparkConf) : wal;
    }

    public Seq<String> generateRandomData() {
        return RichInt$.MODULE$.to$extension(Predef$.MODULE$.intWrapper(1), 100).map((Function1 & Serializable)x$11 -> WriteAheadLogSuite$.$anonfun$generateRandomData$1(BoxesRunTime.unboxToInt((Object)x$11)));
    }

    public Seq<String> readAndDeserializeDataManually(Seq<String> logFiles, boolean allowBatching) {
        return allowBatching ? (Seq)logFiles.flatMap((Function1 & Serializable)file -> {
            Seq data = MODULE$.readDataManually((String)file);
            return (Seq)data.flatMap((Function1 & Serializable)byteArray -> Predef$.MODULE$.wrapRefArray((Object[])ArrayOps$.MODULE$.map$extension(Predef$.MODULE$.refArrayOps((Object[])byteArray), (Function1 & Serializable)bytes -> (String)Utils$.MODULE$.deserialize(bytes), ClassTag$.MODULE$.apply(String.class))));
        }) : (Seq)logFiles.flatMap((Function1 & Serializable)file -> MODULE$.readDataManually((String)file));
    }

    public ByteBuffer stringToByteBuffer(String str) {
        return ByteBuffer.wrap(Utils$.MODULE$.serialize((Object)str));
    }

    public String byteBufferToString(ByteBuffer byteBuffer) {
        return (String)Utils$.MODULE$.deserialize(byteBuffer.array());
    }

    public <T> ByteBuffer wrapArrayArrayByte(Object records) {
        return ByteBuffer.wrap(Utils$.MODULE$.serialize(ArrayOps$.MODULE$.map$extension(Predef$.MODULE$.genericArrayOps(records), (Function1 & Serializable)o -> Utils$.MODULE$.serialize(o), ClassTag$.MODULE$.apply(ScalaRunTime$.MODULE$.arrayClass(Byte.TYPE)))));
    }

    private static final void writeToStream$1(byte[] bytes, FSDataOutputStream writer$2, ArrayBuffer segments$1, String file$1) {
        long offset = writer$2.getPos();
        writer$2.writeInt(ArrayOps$.MODULE$.size$extension(Predef$.MODULE$.byteArrayOps(bytes)));
        writer$2.write(bytes);
        segments$1.$plus$eq((Object)new FileBasedWriteAheadLogSegment(file$1, offset, ArrayOps$.MODULE$.size$extension(Predef$.MODULE$.byteArrayOps(bytes))));
    }

    public static final /* synthetic */ String $anonfun$generateRandomData$1(int x$11) {
        return Integer.toString(x$11);
    }

    private WriteAheadLogSuite$() {
    }
}

