package org.apache.spark.streaming.mqtt;

import java.io.File;
import java.net.URI;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.TransportConnector;
import org.apache.commons.lang3.RandomUtils;
import org.apache.spark.SparkConf;
import org.apache.spark.streaming.Duration;
import org.apache.spark.streaming.Milliseconds$;
import org.apache.spark.streaming.StreamingContext;
import org.apache.spark.streaming.scheduler.StreamingListener;
import org.apache.spark.streaming.scheduler.StreamingListenerBatchCompleted;
import org.apache.spark.streaming.scheduler.StreamingListenerBatchStarted;
import org.apache.spark.streaming.scheduler.StreamingListenerBatchSubmitted;
import org.apache.spark.streaming.scheduler.StreamingListenerReceiverError;
import org.apache.spark.streaming.scheduler.StreamingListenerReceiverStarted;
import org.apache.spark.streaming.scheduler.StreamingListenerReceiverStopped;
import org.apache.spark.util.Utils$;
import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.eclipse.paho.client.mqttv3.MqttTopic;
import org.eclipse.paho.client.mqttv3.persist.MqttDefaultFilePersistence;
import org.scalactic.Bool$;
import org.scalatest.Args;
import org.scalatest.BeforeAndAfter;
import org.scalatest.FunSuite;
import org.scalatest.FunSuiteLike;
import org.scalatest.Status;
import org.scalatest.Tag;
import org.scalatest.concurrent.AbstractPatienceConfiguration;
import org.scalatest.concurrent.AbstractPatienceConfiguration$PatienceConfig$;
import org.scalatest.concurrent.Eventually;
import org.scalatest.concurrent.PatienceConfiguration;
import org.scalatest.concurrent.ScaledTimeSpans;
import org.scalatest.time.Span;
import scala.Function0;
import scala.Option;
import scala.Predef$;
import scala.collection.mutable.StringBuilder;
import scala.reflect.ScalaSignature;
import scala.runtime.BoxedUnit;
import scala.runtime.BoxesRunTime;
import scala.runtime.RichInt$;

/* compiled from: MQTTStreamSuite.scala */
@ScalaSignature(bytes = "\u0006\u0001\u0005]b\u0001B\u0001\u0003\u00015\u0011q\"T)U)N#(/Z1n'VLG/\u001a\u0006\u0003\u0007\u0011\tA!\\9ui*\u0011QAB\u0001\ngR\u0014X-Y7j]\u001eT!a\u0002\u0005\u0002\u000bM\u0004\u0018M]6\u000b\u0005%Q\u0011AB1qC\u000eDWMC\u0001\f\u0003\ry'oZ\u0002\u0001'\u0011\u0001a\u0002\u0006\u000e\u0011\u0005=\u0011R\"\u0001\t\u000b\u0005EQ\u0011!C:dC2\fG/Z:u\u0013\t\u0019\u0002C\u0001\u0005Gk:\u001cV/\u001b;f!\t)\u0002$D\u0001\u0017\u0015\t9\u0002#\u0001\u0006d_:\u001cWO\u001d:f]RL!!\u0007\f\u0003\u0015\u00153XM\u001c;vC2d\u0017\u0010\u0005\u0002\u00107%\u0011A\u0004\u0005\u0002\u000f\u0005\u00164wN]3B]\u0012\fe\r^3s\u0011\u0015q\u0002\u0001\"\u0001 \u0003\u0019a\u0014N\\5u}Q\t\u0001\u0005\u0005\u0002\"\u00015\t!\u0001C\u0004$\u0001\t\u0007I\u0011\u0002\u0013\u0002\u001b\t\fGo\u00195EkJ\fG/[8o+\u0005)\u0003C\u0001\u0014(\u001b\u0005!\u0011B\u0001\u0015\u0005\u0005!!UO]1uS>t\u0007B\u0002\u0016\u0001A\u0003%Q%\u0001\bcCR\u001c\u0007\u000eR;sCRLwN\u001c\u0011\t\u000f1\u0002!\u0019!C\u0005[\u00051Q.Y:uKJ,\u0012A\f\t\u0003_Qj\u0011\u0001\r\u0006\u0003cI\nA\u0001\\1oO*\t1'\u0001\u0003kCZ\f\u0017BA\u001b1\u0005\u0019\u0019FO]5oO\"1q\u0007\u0001Q\u0001\n9\nq!\\1ti\u0016\u0014\b\u0005C\u0004:\u0001\t\u0007I\u0011B\u0017\u0002\u0013\u0019\u0014\u0018-\\3x_J\\\u0007BB\u001e\u0001A\u0003%a&\u0001\u0006ge\u0006lWm^8sW\u0002Bq!\u0010\u0001C\u0002\u0013%a(\u0001\u0005ge\u0016,\u0007k\u001c:u+\u0005y\u0004C\u0001!D\u001b\u0005\t%\"\u0001\"\u0002\u000bM\u001c\u0017\r\\1\n\u0005\u0011\u000b%aA%oi\"1a\t\u0001Q\u0001\n}\n\u0011B\u001a:fKB{'\u000f\u001e\u0011\t\u000f!\u0003!\u0019!C\u0005[\u0005I!M]8lKJ,&/\u001b\u0005\u0007\u0015\u0002\u0001\u000b\u0011\u0002\u0018\u0002\u0015\t\u0014xn[3s+JL\u0007\u0005C\u0004M\u0001\t\u0007I\u0011B\u0017\u0002\u000bQ|\u0007/[2\t\r9\u0003\u0001\u0015!\u0003/\u0003\u0019!x\u000e]5dA!9\u0001\u000b\u0001b\u0001\n\u0013\t\u0016A\u00049feNL7\u000f^3oG\u0016$\u0015N]\u000b\u0002%B\u00111KV\u0007\u0002)*\u0011QKM\u0001\u0003S>L!a\u0016+\u0003\t\u0019KG.\u001a\u0005\u00073\u0002\u0001\u000b\u0011\u0002*\u0002\u001fA,'o]5ti\u0016t7-\u001a#je\u0002B\u0011b\u0017\u0001A\u0002\u0003\u0007I\u0011\u0002/\u0002\u0007M\u001c8-F\u0001^!\t1c,\u0003\u0002`\t\t\u00012\u000b\u001e:fC6LgnZ\"p]R,\u0007\u0010\u001e\u0005\nC\u0002\u0001\r\u00111A\u0005\n\t\fqa]:d?\u0012*\u0017\u000f\u0006\u0002dMB\u0011\u0001\tZ\u0005\u0003K\u0006\u0013A!\u00168ji\"9q\rYA\u0001\u0002\u0004i\u0016a\u0001=%c!1\u0011\u000e\u0001Q!\nu\u000bAa]:dA!I1\u000e\u0001a\u0001\u0002\u0004%I\u0001\\\u0001\u0007EJ|7.\u001a:\u0016\u00035\u0004\"A\u001c:\u000e\u0003=T!a\u001b9\u000b\u0005ED\u0011\u0001C1di&4X-\\9\n\u0005M|'!\u0004\"s_.,'oU3sm&\u001cW\rC\u0005v\u0001\u0001\u0007\t\u0019!C\u0005m\u0006Q!M]8lKJ|F%Z9\u0015\u0005\r<\bbB4u\u0003\u0003\u0005\r!\u001c\u0005\u0007s\u0002\u0001\u000b\u0015B7\u0002\u000f\t\u0014xn[3sA!I1\u0010\u0001a\u0001\u0002\u0004%I\u0001`\u0001\nG>tg.Z2u_J,\u0012! \t\u0003]zL!a`8\u0003%Q\u0013\u0018M\\:q_J$8i\u001c8oK\u000e$xN\u001d\u0005\f\u0003\u0007\u0001\u0001\u0019!a\u0001\n\u0013\t)!A\u0007d_:tWm\u0019;pe~#S-\u001d\u000b\u0004G\u0006\u001d\u0001\u0002C4\u0002\u0002\u0005\u0005\t\u0019A?\t\u000f\u0005-\u0001\u0001)Q\u0005{\u0006Q1m\u001c8oK\u000e$xN\u001d\u0011\t\u000f\u0005=\u0001\u0001\"\u0003\u0002\u0012\u0005I1/\u001a;va6\u000bF\u000b\u0016\u000b\u0002G\"9\u0011Q\u0003\u0001\u0005\n\u0005E\u0011\u0001\u0004;fCJ$un\u001e8N#R#\u0006bBA\r\u0001\u0011%\u00111D\u0001\rM&tGM\u0012:fKB{'\u000f\u001e\u000b\u0002\u007f!9\u0011q\u0004\u0001\u0005\u0002\u0005\u0005\u0012a\u00039vE2L7\u000f\u001b#bi\u0006$2aYA\u0012\u0011!\t)#!\bA\u0002\u0005\u001d\u0012\u0001\u00023bi\u0006\u0004B!!\u000b\u000209\u0019\u0001)a\u000b\n\u0007\u00055\u0012)\u0001\u0004Qe\u0016$WMZ\u0005\u0004k\u0005E\"bAA\u0017\u0003\"9\u0011Q\u0007\u0001\u0005\n\u0005E\u0011AF<bSR4uN\u001d*fG\u0016Lg/\u001a:U_N#\u0018M\u001d;")
/* loaded from: input_file:org/apache/spark/streaming/mqtt/MQTTStreamSuite.class */
public class MQTTStreamSuite extends FunSuite implements Eventually, BeforeAndAfter {
    private final Duration org$apache$spark$streaming$mqtt$MQTTStreamSuite$$batchDuration;
    private final String org$apache$spark$streaming$mqtt$MQTTStreamSuite$$master;
    private final String org$apache$spark$streaming$mqtt$MQTTStreamSuite$$framework;
    private final int freePort;
    private final String org$apache$spark$streaming$mqtt$MQTTStreamSuite$$brokerUri;
    private final String org$apache$spark$streaming$mqtt$MQTTStreamSuite$$topic;
    private final File org$apache$spark$streaming$mqtt$MQTTStreamSuite$$persistenceDir;
    private StreamingContext org$apache$spark$streaming$mqtt$MQTTStreamSuite$$ssc;
    private BrokerService broker;
    private TransportConnector connector;
    private final AtomicReference<Option<Function0<Object>>> org$scalatest$BeforeAndAfter$$beforeFunctionAtomic;
    private final AtomicReference<Option<Function0<Object>>> org$scalatest$BeforeAndAfter$$afterFunctionAtomic;
    private volatile boolean org$scalatest$BeforeAndAfter$$runHasBeenInvoked;
    private final AbstractPatienceConfiguration.PatienceConfig org$scalatest$concurrent$PatienceConfiguration$$defaultPatienceConfig;
    private volatile AbstractPatienceConfiguration$PatienceConfig$ PatienceConfig$module;

    public AtomicReference<Option<Function0<Object>>> org$scalatest$BeforeAndAfter$$beforeFunctionAtomic() {
        return this.org$scalatest$BeforeAndAfter$$beforeFunctionAtomic;
    }

    public AtomicReference<Option<Function0<Object>>> org$scalatest$BeforeAndAfter$$afterFunctionAtomic() {
        return this.org$scalatest$BeforeAndAfter$$afterFunctionAtomic;
    }

    public boolean org$scalatest$BeforeAndAfter$$runHasBeenInvoked() {
        return this.org$scalatest$BeforeAndAfter$$runHasBeenInvoked;
    }

    public void org$scalatest$BeforeAndAfter$$runHasBeenInvoked_$eq(boolean z) {
        this.org$scalatest$BeforeAndAfter$$runHasBeenInvoked = z;
    }

    public /* synthetic */ Status org$scalatest$BeforeAndAfter$$super$runTest(String str, Args args) {
        return FunSuiteLike.class.runTest(this, str, args);
    }

    public /* synthetic */ Status org$scalatest$BeforeAndAfter$$super$run(Option option, Args args) {
        return FunSuiteLike.class.run(this, option, args);
    }

    public void org$scalatest$BeforeAndAfter$_setter_$org$scalatest$BeforeAndAfter$$beforeFunctionAtomic_$eq(AtomicReference atomicReference) {
        this.org$scalatest$BeforeAndAfter$$beforeFunctionAtomic = atomicReference;
    }

    public void org$scalatest$BeforeAndAfter$_setter_$org$scalatest$BeforeAndAfter$$afterFunctionAtomic_$eq(AtomicReference atomicReference) {
        this.org$scalatest$BeforeAndAfter$$afterFunctionAtomic = atomicReference;
    }

    public void before(Function0<Object> function0) {
        BeforeAndAfter.class.before(this, function0);
    }

    public void after(Function0<Object> function0) {
        BeforeAndAfter.class.after(this, function0);
    }

    public Status runTest(String str, Args args) {
        return BeforeAndAfter.class.runTest(this, str, args);
    }

    public Status run(Option<String> option, Args args) {
        return BeforeAndAfter.class.run(this, option, args);
    }

    public <T> T eventually(PatienceConfiguration.Timeout timeout, PatienceConfiguration.Interval interval, Function0<T> function0) {
        return (T) Eventually.class.eventually(this, timeout, interval, function0);
    }

    public <T> T eventually(PatienceConfiguration.Timeout timeout, Function0<T> function0, AbstractPatienceConfiguration.PatienceConfig patienceConfig) {
        return (T) Eventually.class.eventually(this, timeout, function0, patienceConfig);
    }

    public <T> T eventually(PatienceConfiguration.Interval interval, Function0<T> function0, AbstractPatienceConfiguration.PatienceConfig patienceConfig) {
        return (T) Eventually.class.eventually(this, interval, function0, patienceConfig);
    }

    public <T> T eventually(Function0<T> function0, AbstractPatienceConfiguration.PatienceConfig patienceConfig) {
        return (T) Eventually.class.eventually(this, function0, patienceConfig);
    }

    public AbstractPatienceConfiguration.PatienceConfig org$scalatest$concurrent$PatienceConfiguration$$defaultPatienceConfig() {
        return this.org$scalatest$concurrent$PatienceConfiguration$$defaultPatienceConfig;
    }

    public void org$scalatest$concurrent$PatienceConfiguration$_setter_$org$scalatest$concurrent$PatienceConfiguration$$defaultPatienceConfig_$eq(AbstractPatienceConfiguration.PatienceConfig patienceConfig) {
        this.org$scalatest$concurrent$PatienceConfiguration$$defaultPatienceConfig = patienceConfig;
    }

    public AbstractPatienceConfiguration.PatienceConfig patienceConfig() {
        return PatienceConfiguration.class.patienceConfig(this);
    }

    public PatienceConfiguration.Timeout timeout(Span span) {
        return PatienceConfiguration.class.timeout(this, span);
    }

    public PatienceConfiguration.Interval interval(Span span) {
        return PatienceConfiguration.class.interval(this, span);
    }

    /* JADX WARN: Multi-variable type inference failed */
    /* JADX WARN: Type inference failed for: r0v0 */
    /* JADX WARN: Type inference failed for: r0v1, types: [java.lang.Throwable] */
    /* JADX WARN: Type inference failed for: r0v5 */
    private AbstractPatienceConfiguration$PatienceConfig$ PatienceConfig$lzycompute() {
        ?? r0 = this;
        synchronized (r0) {
            if (this.PatienceConfig$module == null) {
                this.PatienceConfig$module = new AbstractPatienceConfiguration$PatienceConfig$(this);
            }
            BoxedUnit boxedUnit = BoxedUnit.UNIT;
            r0 = r0;
            return this.PatienceConfig$module;
        }
    }

    public AbstractPatienceConfiguration$PatienceConfig$ PatienceConfig() {
        return this.PatienceConfig$module == null ? PatienceConfig$lzycompute() : this.PatienceConfig$module;
    }

    public final Span scaled(Span span) {
        return ScaledTimeSpans.class.scaled(this, span);
    }

    public double spanScaleFactor() {
        return ScaledTimeSpans.class.spanScaleFactor(this);
    }

    public Duration org$apache$spark$streaming$mqtt$MQTTStreamSuite$$batchDuration() {
        return this.org$apache$spark$streaming$mqtt$MQTTStreamSuite$$batchDuration;
    }

    public String org$apache$spark$streaming$mqtt$MQTTStreamSuite$$master() {
        return this.org$apache$spark$streaming$mqtt$MQTTStreamSuite$$master;
    }

    public String org$apache$spark$streaming$mqtt$MQTTStreamSuite$$framework() {
        return this.org$apache$spark$streaming$mqtt$MQTTStreamSuite$$framework;
    }

    private int freePort() {
        return this.freePort;
    }

    public String org$apache$spark$streaming$mqtt$MQTTStreamSuite$$brokerUri() {
        return this.org$apache$spark$streaming$mqtt$MQTTStreamSuite$$brokerUri;
    }

    public String org$apache$spark$streaming$mqtt$MQTTStreamSuite$$topic() {
        return this.org$apache$spark$streaming$mqtt$MQTTStreamSuite$$topic;
    }

    public File org$apache$spark$streaming$mqtt$MQTTStreamSuite$$persistenceDir() {
        return this.org$apache$spark$streaming$mqtt$MQTTStreamSuite$$persistenceDir;
    }

    public StreamingContext org$apache$spark$streaming$mqtt$MQTTStreamSuite$$ssc() {
        return this.org$apache$spark$streaming$mqtt$MQTTStreamSuite$$ssc;
    }

    public void org$apache$spark$streaming$mqtt$MQTTStreamSuite$$ssc_$eq(StreamingContext streamingContext) {
        this.org$apache$spark$streaming$mqtt$MQTTStreamSuite$$ssc = streamingContext;
    }

    private BrokerService broker() {
        return this.broker;
    }

    private void broker_$eq(BrokerService brokerService) {
        this.broker = brokerService;
    }

    private TransportConnector connector() {
        return this.connector;
    }

    private void connector_$eq(TransportConnector transportConnector) {
        this.connector = transportConnector;
    }

    public void org$apache$spark$streaming$mqtt$MQTTStreamSuite$$setupMQTT() {
        broker_$eq(new BrokerService());
        broker().setDataDirectoryFile(Utils$.MODULE$.createTempDir(Utils$.MODULE$.createTempDir$default$1(), Utils$.MODULE$.createTempDir$default$2()));
        connector_$eq(new TransportConnector());
        connector().setName("mqtt");
        connector().setUri(new URI(new StringBuilder().append("mqtt:").append(org$apache$spark$streaming$mqtt$MQTTStreamSuite$$brokerUri()).toString()));
        broker().addConnector(connector());
        broker().start();
    }

    public void org$apache$spark$streaming$mqtt$MQTTStreamSuite$$tearDownMQTT() {
        if (broker() != null) {
            broker().stop();
            broker_$eq(null);
        }
        if (connector() != null) {
            connector().stop();
            connector_$eq(null);
        }
    }

    private int findFreePort() {
        return Utils$.MODULE$.startServiceOnPort(RandomUtils.nextInt(1024, 65536), new MQTTStreamSuite$$anonfun$findFreePort$1(this), new SparkConf(), Utils$.MODULE$.startServiceOnPort$default$4())._2$mcI$sp();
    }

    public void publishData(String str) {
        MqttClient mqttClient = null;
        try {
            mqttClient = new MqttClient(new StringBuilder().append("tcp:").append(org$apache$spark$streaming$mqtt$MQTTStreamSuite$$brokerUri()).toString(), MqttClient.generateClientId(), new MqttDefaultFilePersistence(org$apache$spark$streaming$mqtt$MQTTStreamSuite$$persistenceDir().getAbsolutePath()));
            mqttClient.connect();
            if (mqttClient.isConnected()) {
                MqttTopic topic = mqttClient.getTopic(org$apache$spark$streaming$mqtt$MQTTStreamSuite$$topic());
                MqttMessage mqttMessage = new MqttMessage(str.getBytes("utf-8"));
                mqttMessage.setQos(1);
                mqttMessage.setRetained(true);
                RichInt$.MODULE$.to$extension0(Predef$.MODULE$.intWrapper(0), 10).foreach(new MQTTStreamSuite$$anonfun$publishData$1(this, topic, mqttMessage));
            }
            mqttClient.disconnect();
            mqttClient.close();
        } catch (Throwable th) {
            mqttClient.disconnect();
            mqttClient.close();
            throw th;
        }
    }

    public void org$apache$spark$streaming$mqtt$MQTTStreamSuite$$waitForReceiverToStart() {
        final CountDownLatch countDownLatch = new CountDownLatch(1);
        org$apache$spark$streaming$mqtt$MQTTStreamSuite$$ssc().addStreamingListener(new StreamingListener(this, countDownLatch) { // from class: org.apache.spark.streaming.mqtt.MQTTStreamSuite$$anon$1
            private final CountDownLatch latch$1;

            public void onReceiverError(StreamingListenerReceiverError streamingListenerReceiverError) {
                StreamingListener.class.onReceiverError(this, streamingListenerReceiverError);
            }

            public void onReceiverStopped(StreamingListenerReceiverStopped streamingListenerReceiverStopped) {
                StreamingListener.class.onReceiverStopped(this, streamingListenerReceiverStopped);
            }

            public void onBatchSubmitted(StreamingListenerBatchSubmitted streamingListenerBatchSubmitted) {
                StreamingListener.class.onBatchSubmitted(this, streamingListenerBatchSubmitted);
            }

            public void onBatchStarted(StreamingListenerBatchStarted streamingListenerBatchStarted) {
                StreamingListener.class.onBatchStarted(this, streamingListenerBatchStarted);
            }

            public void onBatchCompleted(StreamingListenerBatchCompleted streamingListenerBatchCompleted) {
                StreamingListener.class.onBatchCompleted(this, streamingListenerBatchCompleted);
            }

            public void onReceiverStarted(StreamingListenerReceiverStarted streamingListenerReceiverStarted) {
                this.latch$1.countDown();
            }

            {
                this.latch$1 = countDownLatch;
                StreamingListener.class.$init$(this);
            }
        });
        assertionsHelper().macroAssert(Bool$.MODULE$.simpleMacroBool(countDownLatch.await(10L, TimeUnit.SECONDS), "latch.await(10L, SECONDS)"), "Timeout waiting for receiver to start.");
    }

    public MQTTStreamSuite() {
        ScaledTimeSpans.class.$init$(this);
        AbstractPatienceConfiguration.class.$init$(this);
        PatienceConfiguration.class.$init$(this);
        Eventually.class.$init$(this);
        BeforeAndAfter.class.$init$(this);
        this.org$apache$spark$streaming$mqtt$MQTTStreamSuite$$batchDuration = Milliseconds$.MODULE$.apply(500L);
        this.org$apache$spark$streaming$mqtt$MQTTStreamSuite$$master = "local[2]";
        this.org$apache$spark$streaming$mqtt$MQTTStreamSuite$$framework = getClass().getSimpleName();
        this.freePort = findFreePort();
        this.org$apache$spark$streaming$mqtt$MQTTStreamSuite$$brokerUri = new StringBuilder().append("//localhost:").append(BoxesRunTime.boxToInteger(freePort())).toString();
        this.org$apache$spark$streaming$mqtt$MQTTStreamSuite$$topic = "def";
        this.org$apache$spark$streaming$mqtt$MQTTStreamSuite$$persistenceDir = Utils$.MODULE$.createTempDir(Utils$.MODULE$.createTempDir$default$1(), Utils$.MODULE$.createTempDir$default$2());
        before(new MQTTStreamSuite$$anonfun$1(this));
        after(new MQTTStreamSuite$$anonfun$2(this));
        test("mqtt input stream", Predef$.MODULE$.wrapRefArray(new Tag[0]), new MQTTStreamSuite$$anonfun$3(this));
    }
}
