package org.apache.spark.sql.prophecy;

import akka.Done$;
import akka.actor.ActorRef;
import akka.actor.Props;
import akka.actor.Props$;
import akka.actor.ScalaActorRef;
import akka.http.scaladsl.Http$;
import akka.http.scaladsl.HttpExt;
import akka.http.scaladsl.model.Uri$;
import akka.http.scaladsl.model.ws.WebSocketRequest;
import akka.http.scaladsl.model.ws.WebSocketRequest$;
import akka.http.scaladsl.model.ws.WebSocketUpgradeResponse;
import akka.stream.Materializer$;
import akka.stream.OverflowStrategy$;
import akka.stream.scaladsl.Keep$;
import akka.stream.scaladsl.Sink;
import akka.stream.scaladsl.Sink$;
import akka.stream.scaladsl.Source;
import akka.stream.scaladsl.Source$;
import akka.stream.scaladsl.SourceQueueWithComplete;
import com.typesafe.scalalogging.LazyLogging;
import com.typesafe.scalalogging.Logger;
import java.util.UUID;
import org.apache.spark.sql.prophecy.ProphecyEventActor;
import org.apache.spark.sql.prophecy.util.CommonUtils$;
import scala.MatchError;
import scala.None$;
import scala.Predef$;
import scala.Predef$ArrowAssoc$;
import scala.Some;
import scala.StringContext;
import scala.Tuple2;
import scala.collection.mutable.Map;
import scala.concurrent.Await$;
import scala.concurrent.Future;
import scala.concurrent.duration.FiniteDuration;
import scala.concurrent.duration.package;
import scala.reflect.ClassTag$;
import scala.runtime.BoxedUnit;

/* compiled from: ProphecyEventActor.scala */
/* loaded from: input_file:org/apache/spark/sql/prophecy/ProphecyEventActor$.class */
public final class ProphecyEventActor$ implements LazyLogging {
    public static final ProphecyEventActor$ MODULE$ = null;
    private final Map<String, ActorRef> mapFromSessionIdToWs;
    private final FiniteDuration org$apache$spark$sql$prophecy$ProphecyEventActor$$ReapInterval;
    private final transient Logger logger;
    private volatile transient boolean bitmap$trans$0;

    static {
        new ProphecyEventActor$();
    }

    /* JADX WARN: Multi-variable type inference failed */
    /* JADX WARN: Type inference failed for: r0v0 */
    /* JADX WARN: Type inference failed for: r0v1, types: [java.lang.Throwable] */
    /* JADX WARN: Type inference failed for: r0v5 */
    private Logger logger$lzycompute() {
        ?? r0 = this;
        synchronized (r0) {
            if (!this.bitmap$trans$0) {
                this.logger = LazyLogging.class.logger(this);
                this.bitmap$trans$0 = true;
            }
            BoxedUnit boxedUnit = BoxedUnit.UNIT;
            r0 = r0;
            return this.logger;
        }
    }

    public Logger logger() {
        return this.bitmap$trans$0 ? this.logger : logger$lzycompute();
    }

    private Map<String, ActorRef> mapFromSessionIdToWs() {
        return this.mapFromSessionIdToWs;
    }

    public FiniteDuration org$apache$spark$sql$prophecy$ProphecyEventActor$$ReapInterval() {
        return this.org$apache$spark$sql$prophecy$ProphecyEventActor$$ReapInterval;
    }

    public Props props(String str) {
        return Props$.MODULE$.apply(new ProphecyEventActor$$anonfun$props$1(str), ClassTag$.MODULE$.apply(ProphecyEventActor.class));
    }

    public ActorRef apply(String str, String str2, boolean z) {
        ActorRef createActorRef;
        if (z) {
            if (logger().underlying().isInfoEnabled()) {
                logger().underlying().info("Creating noop - event actor for session {} ", new Object[]{str2});
                BoxedUnit boxedUnit = BoxedUnit.UNIT;
            } else {
                BoxedUnit boxedUnit2 = BoxedUnit.UNIT;
            }
            return ProphecyEventSendingListener$.MODULE$.system().actorOf(NoopEventActor$.MODULE$.props());
        }
        if (logger().underlying().isInfoEnabled()) {
            logger().underlying().info("Creating event actor for session {} ", new Object[]{str2});
            BoxedUnit boxedUnit3 = BoxedUnit.UNIT;
        } else {
            BoxedUnit boxedUnit4 = BoxedUnit.UNIT;
        }
        Some some = mapFromSessionIdToWs().get(str2);
        if (some instanceof Some) {
            ActorRef actorRef = (ActorRef) some.x();
            if (logger().underlying().isInfoEnabled()) {
                logger().underlying().info("Returning existing actor {}", new Object[]{str2});
                BoxedUnit boxedUnit5 = BoxedUnit.UNIT;
            } else {
                BoxedUnit boxedUnit6 = BoxedUnit.UNIT;
            }
            createActorRef = actorRef;
        } else {
            if (!None$.MODULE$.equals(some)) {
                throw new MatchError(some);
            }
            createActorRef = createActorRef(str, str2);
        }
        ActorRef actorRef2 = createActorRef;
        mapFromSessionIdToWs().$plus$eq(Predef$ArrowAssoc$.MODULE$.$u2192$extension(Predef$.MODULE$.ArrowAssoc(str2), actorRef2));
        return actorRef2;
    }

    public boolean apply$default$3() {
        return false;
    }

    public void remove(String str) {
        mapFromSessionIdToWs().$minus$eq(str);
    }

    private ActorRef createActorRef(String str, String str2) {
        String s = new StringContext(Predef$.MODULE$.wrapRefArray(new String[]{"", "/", ""})).s(Predef$.MODULE$.genericWrapArray(new Object[]{str, str2}));
        Source queue = Source$.MODULE$.queue(1024, OverflowStrategy$.MODULE$.backpressure());
        ActorRef actorOf = ProphecyEventSendingListener$.MODULE$.system().actorOf(props(str2), new StringContext(Predef$.MODULE$.wrapRefArray(new String[]{"", "-", ""})).s(Predef$.MODULE$.genericWrapArray(new Object[]{str2, UUID.randomUUID().toString()})));
        Sink actorRef = Sink$.MODULE$.actorRef(actorOf, Done$.MODULE$);
        if (logger().underlying().isInfoEnabled()) {
            logger().underlying().info("Connecting to websocket at url {}", new Object[]{s});
            BoxedUnit boxedUnit = BoxedUnit.UNIT;
        } else {
            BoxedUnit boxedUnit2 = BoxedUnit.UNIT;
        }
        HttpExt apply = Http$.MODULE$.apply(ProphecyEventSendingListener$.MODULE$.system());
        Tuple2 tuple2 = (Tuple2) queue.viaMat(apply.webSocketClientFlow(new WebSocketRequest(Uri$.MODULE$.apply(s), WebSocketRequest$.MODULE$.apply$default$2(), WebSocketRequest$.MODULE$.apply$default$3()), apply.webSocketClientFlow$default$2(), apply.webSocketClientFlow$default$3(), apply.webSocketClientFlow$default$4(), apply.webSocketClientFlow$default$5()), Keep$.MODULE$.both()).toMat(actorRef, Keep$.MODULE$.left()).run(Materializer$.MODULE$.matFromSystem(ProphecyEventSendingListener$.MODULE$.system()));
        if (tuple2 != null) {
            SourceQueueWithComplete sourceQueueWithComplete = (SourceQueueWithComplete) tuple2._1();
            Future future = (Future) tuple2._2();
            if (sourceQueueWithComplete != null && future != null) {
                Tuple2 tuple22 = new Tuple2(sourceQueueWithComplete, future);
                SourceQueueWithComplete sourceQueueWithComplete2 = (SourceQueueWithComplete) tuple22._1();
                WebSocketUpgradeResponse webSocketUpgradeResponse = (WebSocketUpgradeResponse) Await$.MODULE$.result((Future) tuple22._2(), new package.DurationInt(scala.concurrent.duration.package$.MODULE$.DurationInt(2)).minute());
                ScalaActorRef actorRef2Scala = akka.actor.package$.MODULE$.actorRef2Scala(actorOf);
                ProphecyEventActor.Initialize initialize = new ProphecyEventActor.Initialize(sourceQueueWithComplete2);
                actorRef2Scala.$bang(initialize, actorRef2Scala.$bang$default$2(initialize));
                if (logger().underlying().isInfoEnabled()) {
                    logger().underlying().info("Websocket upgrade successful: {}", new Object[]{webSocketUpgradeResponse});
                    BoxedUnit boxedUnit3 = BoxedUnit.UNIT;
                } else {
                    BoxedUnit boxedUnit4 = BoxedUnit.UNIT;
                }
                return actorOf;
            }
        }
        throw new MatchError(tuple2);
    }

    private ProphecyEventActor$() {
        MODULE$ = this;
        LazyLogging.class.$init$(this);
        this.mapFromSessionIdToWs = CommonUtils$.MODULE$.emptyMMap();
        this.org$apache$spark$sql$prophecy$ProphecyEventActor$$ReapInterval = new package.DurationInt(scala.concurrent.duration.package$.MODULE$.DurationInt(30)).minutes();
    }
}
