package org.apache.spark.streaming;

import java.util.UUID;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.TimeUnit;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.spark.internal.Logging;
import org.apache.spark.streaming.dstream.DStream;
import org.apache.spark.streaming.dstream.DStream$;
import org.scalactic.Bool$;
import org.scalactic.Prettifier$;
import org.scalactic.source.Position;
import org.scalatest.Assertions$;
import org.scalatest.compatible.Assertion;
import org.slf4j.Logger;
import scala.Function0;
import scala.Function1;
import scala.Predef$;
import scala.Some;
import scala.Tuple2;
import scala.collection.ArrayOps$;
import scala.collection.IndexedSeqOps;
import scala.collection.Iterable;
import scala.collection.IterableOnceOps;
import scala.collection.IterableOps;
import scala.collection.JavaConverters$;
import scala.collection.Map;
import scala.collection.StringOps$;
import scala.collection.immutable.IndexedSeq;
import scala.collection.immutable.Nil$;
import scala.collection.immutable.Range;
import scala.collection.immutable.Seq;
import scala.collection.immutable.Set;
import scala.collection.mutable.ArrayBuffer;
import scala.math.Numeric$LongIsIntegral$;
import scala.math.Ordering$String$;
import scala.package$;
import scala.reflect.ClassTag;
import scala.reflect.ClassTag$;
import scala.runtime.BooleanRef;
import scala.runtime.BoxesRunTime;
import scala.runtime.LongRef;
import scala.runtime.RichInt$;
import scala.runtime.RichLong;
import scala.util.Random$;

/* compiled from: MasterFailureTest.scala */
/* loaded from: input_file:org/apache/spark/streaming/MasterFailureTest$.class */
/**
 * Singleton holder ({@link #MODULE$}) for the code compiled from {@code MasterFailureTest.scala}.
 *
 * <p>The class runs a streaming computation over text files written into a temporary directory,
 * repeatedly stops and restarts the {@code StreamingContext} from its checkpoint directory, and
 * merges the output produced by every run so that callers can compare it with an expected output.
 * Two ready-made scenarios are provided: {@link #testMap} and {@link #testUpdateStateByKey}.
 *
 * <p>The {@code killed}, {@code killCount} and {@code setupCalled} flags are static, volatile and
 * exposed through public accessors; {@code KillingThread} and {@code FileGeneratingThread} are
 * declared elsewhere (presumably they are what flips {@code killed} / bumps {@code killCount} —
 * confirm against their definitions).
 */
public final class MasterFailureTest$ implements Logging {
    /** The single instance of this class. */
    public static final MasterFailureTest$ MODULE$ = new MasterFailureTest$();
    /** Polled by {@link #runStreams} to end the current run; reset to {@code false} before each run. */
    private static volatile boolean killed;
    /** Number reported as "failures" in the final log line of {@link #testOperation}. */
    private static volatile int killCount;
    /** Set to {@code true} by {@link #setupStreams}; asserted after the first {@code getOrCreate}. */
    private static volatile boolean setupCalled;
    /** Backing field for the logger managed by the {@code Logging} trait. */
    private static transient Logger org$apache$spark$internal$Logging$$log_;

    static {
        Logging.$init$(MODULE$);
        killed = false;
        killCount = 0;
        setupCalled = false;
    }

    // ------------------------------------------------------------------
    // Forwarders to the static implementations of the Logging trait.
    // ------------------------------------------------------------------

    /** Delegates to {@code Logging.logName$}. */
    public String logName() {
        return Logging.logName$(this);
    }

    /** Delegates to {@code Logging.log$}. */
    public Logger log() {
        return Logging.log$(this);
    }

    /** Delegates to {@code Logging.logInfo$}; the message is supplied lazily. */
    public void logInfo(Function0<String> function0) {
        Logging.logInfo$(this, function0);
    }

    /** Delegates to {@code Logging.logDebug$}; the message is supplied lazily. */
    public void logDebug(Function0<String> function0) {
        Logging.logDebug$(this, function0);
    }

    /** Delegates to {@code Logging.logTrace$}; the message is supplied lazily. */
    public void logTrace(Function0<String> function0) {
        Logging.logTrace$(this, function0);
    }

    /** Delegates to {@code Logging.logWarning$}; the message is supplied lazily. */
    public void logWarning(Function0<String> function0) {
        Logging.logWarning$(this, function0);
    }

    /** Delegates to {@code Logging.logError$}; the message is supplied lazily. */
    public void logError(Function0<String> function0) {
        Logging.logError$(this, function0);
    }

    /** Delegates to {@code Logging.logInfo$} with an associated throwable. */
    public void logInfo(Function0<String> function0, Throwable th) {
        Logging.logInfo$(this, function0, th);
    }

    /** Delegates to {@code Logging.logDebug$} with an associated throwable. */
    public void logDebug(Function0<String> function0, Throwable th) {
        Logging.logDebug$(this, function0, th);
    }

    /** Delegates to {@code Logging.logTrace$} with an associated throwable. */
    public void logTrace(Function0<String> function0, Throwable th) {
        Logging.logTrace$(this, function0, th);
    }

    /** Delegates to {@code Logging.logWarning$} with an associated throwable. */
    public void logWarning(Function0<String> function0, Throwable th) {
        Logging.logWarning$(this, function0, th);
    }

    /** Delegates to {@code Logging.logError$} with an associated throwable. */
    public void logError(Function0<String> function0, Throwable th) {
        Logging.logError$(this, function0, th);
    }

    /** Delegates to {@code Logging.isTraceEnabled$}. */
    public boolean isTraceEnabled() {
        return Logging.isTraceEnabled$(this);
    }

    /** Delegates to the one-argument {@code Logging.initializeLogIfNecessary$}. */
    public void initializeLogIfNecessary(boolean z) {
        Logging.initializeLogIfNecessary$(this, z);
    }

    /** Delegates to the two-argument {@code Logging.initializeLogIfNecessary$} and returns its result. */
    public boolean initializeLogIfNecessary(boolean z, boolean z2) {
        return Logging.initializeLogIfNecessary$(this, z, z2);
    }

    /** Delegates to {@code Logging.initializeLogIfNecessary$default$2$} (default for the second argument). */
    public boolean initializeLogIfNecessary$default$2() {
        return Logging.initializeLogIfNecessary$default$2$(this);
    }

    /** Delegates to {@code Logging.initializeForcefully$}. */
    public void initializeForcefully(boolean z, boolean z2) {
        Logging.initializeForcefully$(this, z, z2);
    }

    /** Getter for the logger field used by the {@code Logging} trait. */
    public Logger org$apache$spark$internal$Logging$$log_() {
        return org$apache$spark$internal$Logging$$log_;
    }

    /** Setter for the logger field used by the {@code Logging} trait. */
    public void org$apache$spark$internal$Logging$$log__$eq(Logger logger) {
        org$apache$spark$internal$Logging$$log_ = logger;
    }

    // ------------------------------------------------------------------
    // Accessors for the shared volatile flags.
    // ------------------------------------------------------------------

    /** Returns the current value of the {@code killed} flag. */
    public boolean killed() {
        return killed;
    }

    /** Sets the {@code killed} flag. */
    public void killed_$eq(boolean z) {
        killed = z;
    }

    /** Returns the current value of the {@code killCount} counter. */
    public int killCount() {
        return killCount;
    }

    /** Sets the {@code killCount} counter. */
    public void killCount_$eq(int i) {
        killCount = i;
    }

    /** Returns the current value of the {@code setupCalled} flag. */
    public boolean setupCalled() {
        return setupCalled;
    }

    /** Sets the {@code setupCalled} flag. */
    public void setupCalled_$eq(boolean z) {
        setupCalled = z;
    }

    /**
     * Command-line entry point: runs {@link #testMap} followed by {@link #testUpdateStateByKey}.
     *
     * <p>Prints a usage message and exits with status 1 when fewer than two arguments are given.
     *
     * @param strArr {@code [0]} local/HDFS directory, {@code [1]} number of batches (parsed as an
     *               int), optional {@code [2]} batch size in milliseconds (parsed as an int);
     *               the batch duration is one second when {@code [2]} is absent
     */
    public void main(String[] strArr) {
        if (ArrayOps$.MODULE$.size$extension(Predef$.MODULE$.refArrayOps(strArr)) < 2) {
            Predef$.MODULE$.println("Usage: MasterFailureTest <local/HDFS directory> <# batches> [<batch size in milliseconds>]");
            System.exit(1);
        }
        String str = strArr[0];
        int int$extension = StringOps$.MODULE$.toInt$extension(Predef$.MODULE$.augmentString(strArr[1]));
        Duration apply = ArrayOps$.MODULE$.size$extension(Predef$.MODULE$.refArrayOps(strArr)) > 2 ? Milliseconds$.MODULE$.apply(StringOps$.MODULE$.toInt$extension(Predef$.MODULE$.augmentString(strArr[2]))) : Seconds$.MODULE$.apply(1L);
        Predef$.MODULE$.println("\n\n========================= MAP TEST =========================\n\n");
        testMap(str, int$extension, apply);
        Predef$.MODULE$.println("\n\n================= UPDATE-STATE-BY-KEY TEST =================\n\n");
        testUpdateStateByKey(str, int$extension, apply);
        Predef$.MODULE$.println("\n\nSUCCESS\n\n");
    }

    /**
     * Runs a {@code map} scenario through {@link #testOperation}.
     *
     * <p>The input is the strings {@code "1"} to {@code i}; the operation parses every string to
     * an int; the expected output is the range {@code 1..i}. After the run the distinct output
     * values, as a set, must equal the expected values as a set.
     *
     * @param str      directory under which the run's working directories are created
     * @param i        number of input elements
     * @param duration batch duration
     */
    public void testMap(String str, int i, Duration duration) {
        IndexedSeq seq = RichInt$.MODULE$.to$extension(Predef$.MODULE$.intWrapper(1), i).map(obj -> {
            return Integer.toString(BoxesRunTime.unboxToInt(obj));
        }).toSeq();
        Range.Inclusive inclusive = RichInt$.MODULE$.to$extension(Predef$.MODULE$.intWrapper(1), i);
        Seq testOperation = testOperation(str, duration, seq, dStream -> {
            return dStream.map(str2 -> {
                return BoxesRunTime.boxToInteger($anonfun$testMap$3(str2));
            }, ClassTag$.MODULE$.Int());
        }, inclusive, ClassTag$.MODULE$.Int());
        logInfo(() -> {
            return new StringBuilder(24).append("Expected output, size = ").append(inclusive.size()).toString();
        });
        logInfo(() -> {
            return inclusive.mkString("[", ",", "]");
        });
        logInfo(() -> {
            return new StringBuilder(15).append("Output, size = ").append(testOperation.size()).toString();
        });
        logInfo(() -> {
            return testOperation.mkString("[", ",", "]");
        });
        // Output may contain repeats (it is merged across restarts), so compare as sets.
        Set set = ((IterableOnceOps) testOperation.distinct()).toSet();
        Set set2 = inclusive.toSet();
        Assertions$.MODULE$.assertionsHelper().macroAssert(Bool$.MODULE$.binaryMacroBool(set, "==", set2, set != null ? set.equals(set2) : set2 == null, Prettifier$.MODULE$.default()), "", Prettifier$.MODULE$.default(), new Position("MasterFailureTest.scala", "Please set the environment variable SCALACTIC_FILL_FILE_PATHNAMES to yes at compile time to enable this feature.", 86));
    }

    /**
     * Runs an {@code updateStateByKey} scenario through {@link #testOperation}.
     *
     * <p>Input element {@code k} (for {@code k} in {@code 1..i}) is the word {@code "a"} repeated
     * {@code k} times, separated by single spaces. The operation splits each line on {@code " "},
     * maps every word to {@code (word, 1L)} and keeps a running sum per key, checkpointing at five
     * times the batch duration. The expected output is {@code ("a", 1 + 2 + ... + k)} for each
     * {@code k}. Every output element must appear in the expected output, and the last output
     * element must equal the last expected element.
     *
     * @param str      directory under which the run's working directories are created
     * @param i        number of input elements
     * @param duration batch duration
     */
    public void testUpdateStateByKey(String str, int i, Duration duration) {
        IndexedSeq seq = RichInt$.MODULE$.to$extension(Predef$.MODULE$.intWrapper(1), i).map(obj -> {
            return $anonfun$testUpdateStateByKey$1(BoxesRunTime.unboxToInt(obj));
        }).toSeq();
        // Expected running totals: the k-th entry is the sum 1..k, paired with the key "a".
        IndexedSeq indexedSeq = (IndexedSeq) ((IndexedSeqOps) new RichLong(Predef$.MODULE$.longWrapper(1L)).to(BoxesRunTime.boxToLong(i)).map(j -> {
            return BoxesRunTime.unboxToLong(new RichLong(Predef$.MODULE$.longWrapper(1L)).to(BoxesRunTime.boxToLong(j)).sum(Numeric$LongIsIntegral$.MODULE$));
        })).map(obj2 -> {
            return $anonfun$testUpdateStateByKey$4(BoxesRunTime.unboxToLong(obj2));
        });
        Seq testOperation = testOperation(str, duration, seq, dStream -> {
            return DStream$.MODULE$.toPairDStreamFunctions(dStream.flatMap(str2 -> {
                return Predef$.MODULE$.wrapRefArray(str2.split(" "));
            }, ClassTag$.MODULE$.apply(String.class)).map(str3 -> {
                return new Tuple2(str3, BoxesRunTime.boxToLong(1L));
            }, ClassTag$.MODULE$.apply(Tuple2.class)), ClassTag$.MODULE$.apply(String.class), ClassTag$.MODULE$.Long(), Ordering$String$.MODULE$).updateStateByKey((seq2, option) -> {
                // New state = sum of the batch's values + previous state (0 when there is none).
                return new Some(BoxesRunTime.boxToLong(BoxesRunTime.unboxToLong(seq2.foldLeft(BoxesRunTime.boxToLong(0L), (j2, j3) -> {
                    return j2 + j3;
                })) + BoxesRunTime.unboxToLong(option.getOrElse(() -> {
                    return 0L;
                }))));
            }, ClassTag$.MODULE$.Long()).checkpoint(duration.$times(5));
        }, indexedSeq, ClassTag$.MODULE$.apply(Tuple2.class));
        logInfo(() -> {
            return new StringBuilder(25).append("Expected output, size = ").append(indexedSeq.size()).append("\n").append(indexedSeq).toString();
        });
        logInfo(() -> {
            return new StringBuilder(16).append("Output, size = ").append(testOperation.size()).append("\n").append(testOperation).toString();
        });
        testOperation.foreach(tuple2 -> {
            return Assertions$.MODULE$.assertionsHelper().macroAssert(Bool$.MODULE$.binaryMacroBool(indexedSeq, "contains", tuple2, indexedSeq.contains(tuple2), Prettifier$.MODULE$.default()), new StringBuilder(25).append("Expected value ").append(tuple2).append(" not found").toString(), Prettifier$.MODULE$.default(), new Position("MasterFailureTest.scala", "Please set the environment variable SCALACTIC_FILL_FILE_PATHNAMES to yes at compile time to enable this feature.", 114));
        });
        Tuple2 tuple22 = (Tuple2) testOperation.last();
        Tuple2 tuple23 = (Tuple2) indexedSeq.last();
        Assertions$.MODULE$.assertionsHelper().macroAssert(Bool$.MODULE$.binaryMacroBool(tuple22, "==", tuple23, tuple22 != null ? tuple22.equals(tuple23) : tuple23 == null, Prettifier$.MODULE$.default()), "", Prettifier$.MODULE$.default(), new Position("MasterFailureTest.scala", "Please set the environment variable SCALACTIC_FILL_FILE_PATHNAMES to yes at compile time to enable this feature.", 119));
    }

    /**
     * Runs {@code function1} over a stream of text files and returns the output merged across all
     * restarts of the streaming context.
     *
     * <p>Steps: reset the shared flags; create {@code <str>/<random UUID>/checkpoint} and
     * {@code <str>/<random UUID>/test}; obtain a context via {@code StreamingContext.getOrCreate}
     * (which must invoke {@link #setupStreams}); start a {@code FileGeneratingThread} over
     * {@code seq}; call {@link #runStreams} with the last expected value and a time budget of
     * {@code seq2.size() * batch millis * 2}; join the generator thread, stop the context and
     * remove the working directories.
     *
     * @param str       parent directory for the run's working directories
     * @param duration  batch duration
     * @param seq       input handed to the {@code FileGeneratingThread}
     * @param function1 operation applied to the text-file stream
     * @param seq2      expected output; must be non-empty because its last element is used
     * @param classTag  class tag of the output element type
     * @return the output collected by {@link #runStreams}
     */
    /* JADX WARN: Multi-variable type inference failed */
    public <T> Seq<T> testOperation(String str, Duration duration, Seq<String> seq, Function1<DStream<String>, DStream<T>> function1, Seq<T> seq2, ClassTag<T> classTag) {
        // NOTE(review): both sides derive from seq2 and converting to a set already drops
        // duplicates, so this comparison cannot fail — confirm whether a real duplicate check
        // (e.g. comparing sizes) was intended before tightening it.
        Set set = ((IterableOnceOps) seq2.distinct()).toSet();
        Set set2 = seq2.toSet();
        Assertions$.MODULE$.assertionsHelper().macroAssert(Bool$.MODULE$.binaryMacroBool(set, "==", set2, set != null ? set.equals(set2) : set2 == null, Prettifier$.MODULE$.default()), "", Prettifier$.MODULE$.default(), new Position("MasterFailureTest.scala", "Please set the environment variable SCALACTIC_FILL_FILE_PATHNAMES to yes at compile time to enable this feature.", 135));
        reset();
        // A fresh UUID-named directory keeps this run's files separate from any other run.
        Path rootDir = new Path(str, UUID.randomUUID().toString());
        FileSystem fileSystem = rootDir.getFileSystem(new Configuration());
        Path checkpointDir = new Path(rootDir, "checkpoint");
        Path testDir = new Path(rootDir, "test");
        fileSystem.mkdirs(checkpointDir);
        fileSystem.mkdirs(testDir);
        StreamingContext ssc = StreamingContext$.MODULE$.getOrCreate(checkpointDir.toString(), () -> {
            return MODULE$.setupStreams(duration, function1, checkpointDir, testDir, classTag);
        }, StreamingContext$.MODULE$.getOrCreate$default$3(), StreamingContext$.MODULE$.getOrCreate$default$4());
        Assertions$.MODULE$.assertionsHelper().macroAssert(Bool$.MODULE$.simpleMacroBool(setupCalled(), "MasterFailureTest.this.setupCalled", Prettifier$.MODULE$.default()), "Setup was not called in the first call to StreamingContext.getOrCreate", Prettifier$.MODULE$.default(), new Position("MasterFailureTest.scala", "Please set the environment variable SCALACTIC_FILL_FILE_PATHNAMES to yes at compile time to enable this feature.", 156));
        FileGeneratingThread fileGeneratingThread = new FileGeneratingThread(seq, testDir, duration.milliseconds());
        fileGeneratingThread.start();
        Seq<T> runStreams = runStreams(ssc, seq2.last(), seq2.size() * duration.milliseconds() * 2, classTag);
        fileGeneratingThread.join();
        ssc.stop(ssc.stop$default$1());
        fileSystem.delete(checkpointDir, true);
        fileSystem.delete(testDir, true);
        // Also remove the UUID-named parent created implicitly by mkdirs above; otherwise every
        // call leaves an empty directory behind under the caller-supplied root.
        fileSystem.delete(rootDir, true);
        logInfo(() -> {
            return new StringBuilder(29).append("Finished test after ").append(MODULE$.killCount()).append(" failures").toString();
        });
        return runStreams;
    }

    /**
     * Builds a new {@code StreamingContext} for the test and records that it was called.
     *
     * <p>The context uses master {@code "local[4]"} and application name
     * {@code "MasterFailureTest"}, checkpoints into {@code path}, reads text files from
     * {@code path2}, applies {@code function1} and registers a {@code TestOutputStream} on the
     * result.
     *
     * @param duration  batch duration
     * @param function1 operation applied to the text-file stream
     * @param path      checkpoint directory
     * @param path2     directory watched for input files
     * @param classTag  class tag of the output element type
     * @return the configured, not yet started, context
     */
    /* JADX INFO: Access modifiers changed from: private */
    public <T> StreamingContext setupStreams(Duration duration, Function1<DStream<String>, DStream<T>> function1, Path path, Path path2, ClassTag<T> classTag) {
        setupCalled_$eq(true);
        StreamingContext streamingContext = new StreamingContext("local[4]", "MasterFailureTest", duration, (String) null, package$.MODULE$.Nil(), (Map) Predef$.MODULE$.Map().apply(Nil$.MODULE$));
        streamingContext.checkpoint(path.toString());
        new TestOutputStream((DStream) function1.apply(streamingContext.textFileStream(path2.toString())), TestOutputStream$.MODULE$.$lessinit$greater$default$2(), classTag).register();
        return streamingContext;
    }

    /**
     * Repeatedly runs the streaming context until the last expected value shows up in the output
     * or the time budget is exhausted, and returns the output accumulated over all runs.
     *
     * <p>Each iteration starts a {@code KillingThread} for the current context, starts the
     * context, and polls every 100 ms until {@link #killed()} becomes true, the last element of
     * the current output equals {@code t}, or the accumulated run time exceeds {@code j}
     * milliseconds. The context is then stopped and the run's output appended to the merged
     * result. If neither end condition holds, the method sleeps for a random number of
     * milliseconds below ten batch durations and re-creates the context from the checkpoint
     * directory; the creating function passed to {@code getOrCreate} throws, so a missing
     * checkpoint fails the test.
     *
     * @param streamingContext the context for the first run
     * @param t                last expected output value; signals completion
     * @param j                maximum total run time in milliseconds (sleep between runs excluded)
     * @param classTag         class tag of the output element type
     * @return the concatenation of the output seen at the end of every run
     */
    private <T> Seq<T> runStreams(StreamingContext streamingContext, T t, long j, ClassTag<T> classTag) {
        StreamingContext streamingContext2 = streamingContext;
        // create  = total run time so far (ns); create2 = last output generated; create3 = timed out.
        LongRef create = LongRef.create(0L);
        BooleanRef create2 = BooleanRef.create(false);
        BooleanRef create3 = BooleanRef.create(false);
        ArrayBuffer arrayBuffer = new ArrayBuffer();
        String checkpointDir = streamingContext2.checkpointDir();
        Duration batchDuration = streamingContext2.graph().batchDuration();
        while (!create2.elem && !create3.elem) {
            // The first registered output stream is the TestOutputStream added by setupStreams.
            ConcurrentLinkedQueue<Seq<T>> output = ((TestOutputStream) ArrayOps$.MODULE$.head$extension(Predef$.MODULE$.refArrayOps(streamingContext2.graph().getOutputStreams()))).output();
            killed_$eq(false);
            KillingThread killingThread = new KillingThread(streamingContext2, batchDuration.milliseconds() * 10);
            killingThread.start();
            // Run time of this iteration only (ns).
            LongRef create4 = LongRef.create(0L);
            try {
                try {
                    System.clearProperty("spark.streaming.clock");
                    System.clearProperty("spark.driver.port");
                    streamingContext2.start();
                    long nanoTime = System.nanoTime();
                    while (!killed() && !create2.elem && !create3.elem) {
                        Thread.sleep(100L);
                        create4.elem = System.nanoTime() - nanoTime;
                        create2.elem = output$3(output).nonEmpty() && BoxesRunTime.equals(output$3(output).last(), t);
                        create3.elem = create4.elem + create.elem > TimeUnit.MILLISECONDS.toNanos(j);
                    }
                } catch (Exception e) {
                    // A failure inside a run is logged and treated like the end of that run.
                    logError(() -> {
                        return "Error running streaming context";
                    }, e);
                }
                StreamingContext streamingContext3 = streamingContext2;
                streamingContext3.stop(streamingContext3.stop$default$1());
                if (killingThread.isAlive()) {
                    killingThread.interrupt();
                    killingThread.join();
                }
                logInfo(() -> {
                    return new StringBuilder(18).append("Has been killed = ").append(MODULE$.killed()).toString();
                });
                logInfo(() -> {
                    return new StringBuilder(27).append("Is last output generated = ").append(create2.elem).toString();
                });
                logInfo(() -> {
                    return new StringBuilder(15).append("Is timed out = ").append(create3.elem).toString();
                });
                arrayBuffer.$plus$plus$eq(output$3(output).toSeq());
                create.elem += create4.elem;
                logInfo(() -> {
                    return new StringBuilder(13).append("New output = ").append(output$3(output).toSeq()).toString();
                });
                logInfo(() -> {
                    return new StringBuilder(16).append("Merged output = ").append(arrayBuffer).toString();
                });
                logInfo(() -> {
                    return new StringBuilder(11).append("Time ran = ").append(create4.elem).toString();
                });
                logInfo(() -> {
                    return new StringBuilder(17).append("Total time ran = ").append(TimeUnit.NANOSECONDS.toMillis(create.elem)).toString();
                });
                if (!create2.elem && !create3.elem) {
                    int nextInt = Random$.MODULE$.nextInt(((int) batchDuration.milliseconds()) * 10);
                    logInfo(() -> {
                        return new StringBuilder(132).append("\n-------------------------------------------\n   Restarting stream computation in ").append(nextInt).append(" ms   ").append("\n-------------------------------------------\n").toString();
                    });
                    Thread.sleep(nextInt);
                    // Must recover from the checkpoint; building a brand-new context is an error.
                    streamingContext2 = StreamingContext$.MODULE$.getOrCreate(checkpointDir, () -> {
                        throw new Exception("Trying to create new context when it should be reading from checkpoint file");
                    }, StreamingContext$.MODULE$.getOrCreate$default$3(), StreamingContext$.MODULE$.getOrCreate$default$4());
                }
            } catch (Throwable th) {
                // Make sure the current context is stopped before the failure propagates.
                StreamingContext streamingContext4 = streamingContext2;
                streamingContext4.stop(streamingContext4.stop$default$1());
                throw th;
            }
        }
        return arrayBuffer.toSeq();
    }

    /**
     * Checks {@code seq} (actual output) against {@code seq2} (expected output).
     *
     * <p>First asserts that no two neighbouring expected values are equal, then prints both
     * sequences to standard output, and finally asserts that every actual value is contained in
     * the expected sequence. This method is not called from anywhere else in this class.
     *
     * @param seq      actual output
     * @param seq2     expected output
     * @param classTag class tag of the element type (unused in the body)
     */
    private <T> void verifyOutput(Seq<T> seq, Seq<T> seq2, ClassTag<T> classTag) {
        RichInt$.MODULE$.until$extension(Predef$.MODULE$.intWrapper(0), seq2.size() - 1).foreach(obj -> {
            return $anonfun$verifyOutput$1(seq2, BoxesRunTime.unboxToInt(obj));
        });
        Predef$.MODULE$.println(new StringBuilder(24).append("Expected output, size = ").append(seq2.size()).toString());
        Predef$.MODULE$.println(seq2.mkString("[", ",", "]"));
        Predef$.MODULE$.println(new StringBuilder(15).append("Output, size = ").append(seq.size()).toString());
        Predef$.MODULE$.println(seq.mkString("[", ",", "]"));
        seq.foreach(obj2 -> {
            return Assertions$.MODULE$.assertionsHelper().macroAssert(Bool$.MODULE$.binaryMacroBool(seq2, "contains", obj2, seq2.contains(obj2), Prettifier$.MODULE$.default()), new StringBuilder(25).append("Expected value ").append(obj2).append(" not found").toString(), Prettifier$.MODULE$.default(), new Position("MasterFailureTest.scala", "Please set the environment variable SCALACTIC_FILL_FILE_PATHNAMES to yes at compile time to enable this feature.", 314));
        });
    }

    /** Restores {@code killed}, {@code killCount} and {@code setupCalled} to their initial values. */
    private void reset() {
        killed_$eq(false);
        killCount_$eq(0);
        setupCalled_$eq(false);
    }

    // ------------------------------------------------------------------
    // Synthetic lambda bodies and local helpers emitted by the Scala compiler.
    // ------------------------------------------------------------------

    /** Lambda body for {@link #testMap}: parses a decimal string to an int. */
    public static final /* synthetic */ int $anonfun$testMap$3(String str) {
        return StringOps$.MODULE$.toInt$extension(Predef$.MODULE$.augmentString(str));
    }

    /** Lambda body for {@link #testUpdateStateByKey}: always yields the word {@code "a"}. */
    public static final /* synthetic */ String $anonfun$testUpdateStateByKey$2(int i) {
        return "a";
    }

    /** Lambda body for {@link #testUpdateStateByKey}: {@code "a"} repeated {@code i} times, space-separated. */
    public static final /* synthetic */ String $anonfun$testUpdateStateByKey$1(int i) {
        return RichInt$.MODULE$.to$extension(Predef$.MODULE$.intWrapper(1), i).map(obj -> {
            return $anonfun$testUpdateStateByKey$2(BoxesRunTime.unboxToInt(obj));
        }).mkString(" ");
    }

    /** Lambda body for {@link #testUpdateStateByKey}: pairs the key {@code "a"} with {@code j}. */
    public static final /* synthetic */ Tuple2 $anonfun$testUpdateStateByKey$4(long j) {
        return new Tuple2("a", BoxesRunTime.boxToLong(j));
    }

    /** Local helper of {@link #runStreams}: flattens the queue of per-batch sequences into one iterable. */
    private static final Iterable output$3(ConcurrentLinkedQueue concurrentLinkedQueue) {
        return (Iterable) ((IterableOps) JavaConverters$.MODULE$.collectionAsScalaIterableConverter(concurrentLinkedQueue).asScala()).flatten(Predef$.MODULE$.$conforms());
    }

    /** Lambda body for {@link #verifyOutput}: asserts that elements {@code i} and {@code i + 1} differ. */
    public static final /* synthetic */ Assertion $anonfun$verifyOutput$1(Seq seq, int i) {
        Object apply = seq.apply(i);
        Object apply2 = seq.apply(i + 1);
        return Assertions$.MODULE$.assertionsHelper().macroAssert(Bool$.MODULE$.binaryMacroBool(apply, "!=", apply2, !BoxesRunTime.equals(apply, apply2), Prettifier$.MODULE$.default()), "Expected output has consecutive duplicate sequence of values", Prettifier$.MODULE$.default(), new Position("MasterFailureTest.scala", "Please set the environment variable SCALACTIC_FILL_FILE_PATHNAMES to yes at compile time to enable this feature.", 300));
    }

    /** Private: the only instance is {@link #MODULE$}. */
    private MasterFailureTest$() {
    }
}
