package ai.mantik.mnp.server;

import ai.mantik.componently.AkkaRuntime;
import ai.mantik.componently.ComponentBase;
import ai.mantik.componently.rpc.RpcConversions$;
import ai.mantik.componently.rpc.StreamConversions$;
import ai.mantik.mnp.MnpClient;
import ai.mantik.mnp.MnpClient$;
import ai.mantik.mnp.MnpException;
import ai.mantik.mnp.MnpException$;
import ai.mantik.mnp.MnpSessionPortUrl;
import ai.mantik.mnp.protocol.mnp.AboutResponse;
import ai.mantik.mnp.protocol.mnp.InitRequest;
import ai.mantik.mnp.protocol.mnp.InitResponse;
import ai.mantik.mnp.protocol.mnp.InitResponse$;
import ai.mantik.mnp.protocol.mnp.MnpServiceGrpc;
import ai.mantik.mnp.protocol.mnp.MnpServiceGrpc$MnpService$;
import ai.mantik.mnp.protocol.mnp.PullRequest;
import ai.mantik.mnp.protocol.mnp.PullResponse;
import ai.mantik.mnp.protocol.mnp.PushRequest;
import ai.mantik.mnp.protocol.mnp.PushResponse;
import ai.mantik.mnp.protocol.mnp.PushResponse$;
import ai.mantik.mnp.protocol.mnp.QueryTaskRequest;
import ai.mantik.mnp.protocol.mnp.QueryTaskResponse;
import ai.mantik.mnp.protocol.mnp.QueryTaskResponse$;
import ai.mantik.mnp.protocol.mnp.QuitRequest;
import ai.mantik.mnp.protocol.mnp.QuitResponse;
import ai.mantik.mnp.protocol.mnp.QuitResponse$;
import ai.mantik.mnp.protocol.mnp.QuitSessionRequest;
import ai.mantik.mnp.protocol.mnp.QuitSessionResponse;
import ai.mantik.mnp.protocol.mnp.QuitSessionResponse$;
import ai.mantik.mnp.protocol.mnp.SessionState;
import ai.mantik.mnp.protocol.mnp.SessionState$SS_FAILED$;
import ai.mantik.mnp.protocol.mnp.SessionState$SS_READY$;
import ai.mantik.mnp.protocol.mnp.TaskPortStatus;
import ai.mantik.mnp.protocol.mnp.TaskPortStatus$;
import ai.mantik.mnp.protocol.mnp.TaskState;
import ai.mantik.mnp.protocol.mnp.TaskState$TS_EXISTS$;
import ai.mantik.mnp.protocol.mnp.TaskState$TS_FAILED$;
import ai.mantik.mnp.protocol.mnp.TaskState$TS_FINISHED$;
import akka.Done;
import akka.NotUsed;
import akka.NotUsed$;
import akka.stream.scaladsl.Sink$;
import akka.stream.scaladsl.Source;
import akka.util.ByteString;
import com.google.protobuf.empty.Empty;
import io.grpc.stub.StreamObserver;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.BiFunction;
import java.util.function.Function;
import scala.$less$colon$less$;
import scala.Function0;
import scala.Function1;
import scala.Function2;
import scala.MatchError;
import scala.None$;
import scala.Option;
import scala.Option$;
import scala.Some;
import scala.collection.IterableOnceOps;
import scala.collection.IterableOps;
import scala.collection.immutable.Map;
import scala.collection.immutable.Seq;
import scala.concurrent.Future;
import scala.concurrent.Future$;
import scala.package$;
import scala.reflect.ScalaSignature;
import scala.runtime.BoxedUnit;
import scala.util.Failure;
import scala.util.Success;
import scala.util.Try;
import scala.util.control.NonFatal$;

/* compiled from: MnpServiceImp.scala */
@ScalaSignature(bytes = "\u0006\u0005\tEe\u0001\u0002\f\u0018\u0001\u0001B\u0001B\u0010\u0001\u0003\u0002\u0003\u0006Ia\u0010\u0005\t\u0007\u0002\u0011\t\u0011)A\u0005\t\"IQ\n\u0001B\u0001B\u0003-a*\u0015\u0005\u0006%\u0002!\ta\u0015\u0005\b3\u0002\u0011\r\u0011\"\u0003[\u0011\u0019\u0001\b\u0001)A\u00057\")\u0011\u000f\u0001C!e\"9\u0011Q\u0003\u0001\u0005B\u0005]\u0001bBA \u0001\u0011\u0005\u0013\u0011\t\u0005\b\u0003'\u0002A\u0011IA+\u0011\u001d\t9\u0007\u0001C!\u0003SBq!! \u0001\t\u0003\ny\bC\u0004\u0002\u0014\u0002!I!!&\t\u000f\u0005m\u0007\u0001\"\u0003\u0002^\"9\u00111\u001f\u0001\u0005\n\u0005U\bb\u0002B\u0001\u0001\u0011%!1\u0001\u0005\b\u0005+\u0001A\u0011\u0002B\f\u0011\u001d\u0011I\u0003\u0001C!\u0005WAqA!\u0010\u0001\t\u0013\u0011y\u0004C\u0004\u0003R\u0001!IAa\u0015\t\u000f\te\u0004\u0001\"\u0003\u0003|\tiQJ\u001c9TKJ4\u0018nY3J[BT!\u0001G\r\u0002\rM,'O^3s\u0015\tQ2$A\u0002n]BT!\u0001H\u000f\u0002\r5\fg\u000e^5l\u0015\u0005q\u0012AA1j\u0007\u0001\u00192\u0001A\u0011(!\t\u0011S%D\u0001$\u0015\t!3$A\u0006d_6\u0004xN\\3oi2L\u0018B\u0001\u0014$\u00055\u0019u.\u001c9p]\u0016tGOQ1tKB\u0011\u0001f\u000f\b\u0003Sar!A\u000b\u001c\u000f\u0005-\"dB\u0001\u00174\u001d\ti#G\u0004\u0002/c5\tqF\u0003\u00021?\u00051AH]8pizJ\u0011AH\u0005\u00039uI!AG\u000e\n\u0005UJ\u0012\u0001\u00039s_R|7m\u001c7\n\u0005i9$BA\u001b\u001a\u0013\tI$(\u0001\bN]B\u001cVM\u001d<jG\u0016<%\u000f]2\u000b\u0005i9\u0014B\u0001\u001f>\u0005)ie\u000e]*feZL7-\u001a\u0006\u0003si\nqAY1dW\u0016tG\r\u0005\u0002A\u00036\tq#\u0003\u0002C/\ti1+\u001a:wKJ\u0014\u0015mY6f]\u0012\f1\"];ji\"\u000bg\u000e\u001a7feB\u0019Q\t\u0013&\u000e\u0003\u0019S\u0011aR\u0001\u0006g\u000e\fG.Y\u0005\u0003\u0013\u001a\u0013\u0011BR;oGRLwN\u001c\u0019\u0011\u0005\u0015[\u0015B\u0001'G\u0005\u0011)f.\u001b;\u0002\u0017\u0005\\7.\u0019*v]RLW.\u001a\t\u0003E=K!\u0001U\u0012\u0003\u0017\u0005[7.\u0019*v]RLW.Z\u0005\u0003\u001b\u0016\na\u0001P5oSRtDc\u0001+X1R\u0011QK\u0016\t\u0003\u0001\u0002AQ!\u0014\u0003A\u00049CQA\u0010\u0
003A\u0002}BQa\u0011\u0003A\u0002\u0011\u000b\u0001b]3tg&|gn]\u000b\u00027B!AlY3n\u001b\u0005i&B\u00010`\u0003)\u0019wN\\2veJ,g\u000e\u001e\u0006\u0003A\u0006\fA!\u001e;jY*\t!-\u0001\u0003kCZ\f\u0017B\u00013^\u0005E\u0019uN\\2veJ,g\u000e\u001e%bg\"l\u0015\r\u001d\t\u0003M*t!a\u001a5\u0011\u000592\u0015BA5G\u0003\u0019\u0001&/\u001a3fM&\u00111\u000e\u001c\u0002\u0007'R\u0014\u0018N\\4\u000b\u0005%4\u0005C\u0001!o\u0013\tywC\u0001\nTKJ4XM]*fgNLwN\\*uCR,\u0017!C:fgNLwN\\:!\u0003\u0015\t'm\\;u)\t\u0019H\u0010E\u0002umbl\u0011!\u001e\u0006\u0003=\u001aK!a^;\u0003\r\u0019+H/\u001e:f!\tI(0D\u0001;\u0013\tY(HA\u0007BE>,HOU3ta>t7/\u001a\u0005\u0006{\u001e\u0001\rA`\u0001\be\u0016\fX/Z:u!\ry\u0018\u0011C\u0007\u0003\u0003\u0003QA!a\u0001\u0002\u0006\u0005)Q-\u001c9us*!\u0011qAA\u0005\u0003!\u0001(o\u001c;pEV4'\u0002BA\u0006\u0003\u001b\taaZ8pO2,'BAA\b\u0003\r\u0019w.\\\u0005\u0005\u0003'\t\tAA\u0003F[B$\u00180\u0001\u0003j]&$H#\u0002&\u0002\u001a\u0005\u0005\u0002BB?\t\u0001\u0004\tY\u0002E\u0002z\u0003;I1!a\b;\u0005-Ie.\u001b;SKF,Xm\u001d;\t\u000f\u0005\r\u0002\u00021\u0001\u0002&\u0005\u0001\"/Z:q_:\u001cXm\u00142tKJ4XM\u001d\t\u0007\u0003O\t)$!\u000f\u000e\u0005\u0005%\"\u0002BA\u0016\u0003[\tAa\u001d;vE*!\u0011qFA\u0019\u0003\u00119'\u000f]2\u000b\u0005\u0005M\u0012AA5p\u0013\u0011\t9$!\u000b\u0003\u001dM#(/Z1n\u001f\n\u001cXM\u001d<feB\u0019\u00110a\u000f\n\u0007\u0005u\"H\u0001\u0007J]&$(+Z:q_:\u001cX-\u0001\u0003rk&$H\u0003BA\"\u0003\u0017\u0002B\u0001\u001e<\u0002FA\u0019\u00110a\u0012\n\u0007\u0005%#H\u0001\u0007Rk&$(+Z:q_:\u001cX\r\u0003\u0004~\u0013\u0001\u0007\u0011Q\n\t\u0004s\u0006=\u0013bAA)u\tY\u0011+^5u%\u0016\fX/Z:u\u0003-\tX/\u001b;TKN\u001c\u0018n\u001c8\u0015\t\u0005]\u0013q\f\t\u0005iZ\fI\u0006E\u0002z\u00037J1!!\u0018;\u0005M\tV/\u001b;TKN\u001c\u0018n\u001c8SKN\u0004xN\\:f\u0011\u0019i(\u00021\u0001\u0002bA\u0019\u00110a\u0019\n\u0007\u0005\u0015$H\u0001\nRk&$8+Z:tS>t'+Z9vKN$\u0018\u0001\u00029vg\"$B!a\u001b\u0002tA1\u0011qEA\u001b\u0003[\u00022!_A8\u0
013\r\t\tH\u000f\u0002\f!V\u001c\bNU3rk\u0016\u001cH\u000fC\u0004\u0002$-\u0001\r!!\u001e\u0011\r\u0005\u001d\u0012QGA<!\rI\u0018\u0011P\u0005\u0004\u0003wR$\u0001\u0004)vg\"\u0014Vm\u001d9p]N,\u0017\u0001\u00029vY2$RASAA\u0003\u0013Ca! \u0007A\u0002\u0005\r\u0005cA=\u0002\u0006&\u0019\u0011q\u0011\u001e\u0003\u0017A+H\u000e\u001c*fcV,7\u000f\u001e\u0005\b\u0003Ga\u0001\u0019AAF!\u0019\t9#!\u000e\u0002\u000eB\u0019\u00110a$\n\u0007\u0005E%H\u0001\u0007Qk2d'+Z:q_:\u001cX-A\u0004qk2d'+\u001e8\u0015\r\u0005]\u0015qYAi!\u0019\tI*!(\u0002\"6\u0011\u00111\u0014\u0006\u0003A\u001aKA!a(\u0002\u001c\n\u0019AK]=\u0011\u0011\u0005\r\u0016\u0011WA[\u0003\u007fk!!!*\u000b\t\u0005\u001d\u0016\u0011V\u0001\tg\u000e\fG.\u00193tY*!\u00111VAW\u0003\u0019\u0019HO]3b[*\u0011\u0011qV\u0001\u0005C.\\\u0017-\u0003\u0003\u00024\u0006\u0015&AB*pkJ\u001cW\r\u0005\u0003\u00028\u0006mVBAA]\u0015\r\u0001\u0017QV\u0005\u0005\u0003{\u000bIL\u0001\u0006CsR,7\u000b\u001e:j]\u001e\u0004B!!1\u0002D6\u0011\u0011QV\u0005\u0005\u0003\u000b\fiKA\u0004O_R,6/\u001a3\t\u000f\u0005%W\u00021\u0001\u0002L\u0006IA/Y:l'R\fG/\u001a\t\u0004\u0001\u00065\u0017bAAh/\ty1+\u001a:wKJ$\u0016m]6Ti\u0006$X\rC\u0004\u0002T6\u0001\r!!6\u0002\tA|'\u000f\u001e\t\u0004\u000b\u0006]\u0017bAAm\r\n\u0019\u0011J\u001c;\u0002\u000f\u001d,G\u000fV1tWRA\u0011q\\Aq\u0003K\fI\u000f\u0005\u0003um\u0006-\u0007BBAr\u001d\u0001\u0007Q-A\u0005tKN\u001c\u0018n\u001c8JI\"1\u0011q\u001d\bA\u0002\u0015\fa\u0001^1tW&#\u0007bBAv\u001d\u0001\u0007\u0011Q^\u0001\u0007K:\u001cXO]3\u0011\u0007\u0015\u000by/C\u0002\u0002r\u001a\u0013qAQ8pY\u0016\fg.A\bti\u0006\u0014HOR8so\u0006\u0014H-\u001a:t)\u001dQ\u0015q_A~\u0003{Da!!?\u0010\u0001\u0004i\u0017\u0001D:fgNLwN\\*uCR,\u0007BBAt\u001f\u0001\u0007Q\rC\u0004\u0002��>\u0001\r!a3\u0002\tQ\f7o[\u0001\u0010kB$\u0017\r^3UCN\\7\u000b^1uKR1!Q\u0001B\t\u0005'!B!!<\u0003\b!9!\u0011\u0002\tA\u0002\t-\u0011!\u00014\u0011\u000f\u0015\u0013i!a3\u0002L&\u0019!q\u0002$\u0003\u0013\u0019+hn\u0019;j_:\f\u0004BBAr!\u0001\
u0007Q\r\u0003\u0004\u0002hB\u0001\r!Z\u0001\r_:$\u0016m]6GS:L7\u000f\u001b\u000b\b\u0015\ne!1\u0004B\u000f\u0011\u0019\t\u0019/\u0005a\u0001K\"1\u0011q]\tA\u0002\u0015DqAa\b\u0012\u0001\u0004\u0011\t#\u0001\u0004sKN,H\u000e\u001e\t\u0007\u00033\u000biJa\t\u0011\t\u0005\u0005'QE\u0005\u0005\u0005O\tiK\u0001\u0003E_:,\u0017!C9vKJLH+Y:l)\u0011\u0011iC!\u000e\u0011\tQ4(q\u0006\t\u0004s\nE\u0012b\u0001B\u001au\t\t\u0012+^3ssR\u000b7o\u001b*fgB|gn]3\t\ru\u0014\u0002\u0019\u0001B\u001c!\rI(\u0011H\u0005\u0004\u0005wQ$\u0001E)vKJLH+Y:l%\u0016\fX/Z:u\u0003U\u0019wN\u001c<feR$\u0016m]6Q_J$8\u000b^1ukN$BA!\u0011\u0003HA\u0019\u0011Pa\u0011\n\u0007\t\u0015#H\u0001\bUCN\\\u0007k\u001c:u'R\fG/^:\t\u000f\t%3\u00031\u0001\u0003L\u0005Q\u0001o\u001c:u'R\fG/^:\u0011\u0007\u0001\u0013i%C\u0002\u0003P]\u0011!\u0002U8siN#\u0018\r^;t\u0003)i\u0017-\u001f2f\u000bJ\u0014xN\u001d\u000b\u0005\u0005+\u0012Y\u0006\u0005\u0003F\u0005/*\u0017b\u0001B-\r\n1q\n\u001d;j_:DqA!\u0018\u0015\u0001\u0004\u0011y&A\u0001ua\u0011\u0011\tGa\u001a\u0011\r\u0005e\u0015Q\u0014B2!\u0011\u0011)Ga\u001a\r\u0001\u0011a!\u0011\u000eB.\u0003\u0003\u0005\tQ!\u0001\u0003l\t\u0019q\fJ\u0019\u0012\t\t5$1\u000f\t\u0004\u000b\n=\u0014b\u0001B9\r\n9aj\u001c;iS:<\u0007cA#\u0003v%\u0019!q\u000f$\u0003\u0007\u0005s\u00170\u0001\nfeJ|'O\u0012:p[RC'o\\<bE2,GcA3\u0003~!9!QL\u000bA\u0002\t}\u0004\u0003\u0002BA\u0005\u0017sAAa!\u0003\b:\u0019aF!\"\n\u0003\u001dK1A!#G\u0003\u001d\u0001\u0018mY6bO\u0016LAA!$\u0003\u0010\nIA\u000b\u001b:po\u0006\u0014G.\u001a\u0006\u0004\u0005\u00133\u0005")
/* loaded from: input_file:ai/mantik/mnp/server/MnpServiceImp.class */
public class MnpServiceImp extends ComponentBase implements MnpServiceGrpc.MnpService {
    // Backend that about(), init() and quit() delegate to.
    private final ServerBackend backend;
    // Callback handed in through the constructor. It is not invoked anywhere in this file;
    // NOTE(review): presumably called by the generated MnpServiceImp$$anonfun$quit$2 helper - confirm.
    public final Function0<BoxedUnit> ai$mantik$mnp$server$MnpServiceImp$$quitHandler;
    // Registry of sessions keyed by session id; filled by init() and read by the task/session lookups.
    private final ConcurrentHashMap<String, ServerSessionState> ai$mantik$mnp$server$MnpServiceImp$$sessions;

    /**
     * Returns the service companion by delegating to the static forwarder of the
     * {@code MnpService} interface.
     */
    @Override // ai.mantik.mnp.protocol.mnp.MnpServiceGrpc.MnpService
    /* renamed from: serviceCompanion, reason: merged with bridge method [inline-methods] */
    public MnpServiceGrpc$MnpService$ m149serviceCompanion() {
        MnpServiceGrpc$MnpService$ companion = MnpServiceGrpc.MnpService.serviceCompanion$(this);
        return companion;
    }

    /**
     * Accessor for the session registry (session id to session state).
     *
     * @return the live map instance held by this service, not a copy
     */
    public ConcurrentHashMap<String, ServerSessionState> ai$mantik$mnp$server$MnpServiceImp$$sessions() {
        final ConcurrentHashMap<String, ServerSessionState> registry = this.ai$mantik$mnp$server$MnpServiceImp$$sessions;
        return registry;
    }

    /**
     * Answers the "about" RPC by delegating to the backend.
     *
     * @param empty the (content-free) request; it is not inspected
     * @return whatever future the backend's {@code about()} produces
     */
    @Override // ai.mantik.mnp.protocol.mnp.MnpServiceGrpc.MnpService
    public Future<AboutResponse> about(Empty empty) {
        final ServerBackend delegate = this.backend;
        return delegate.about();
    }

    /**
     * Handles the "init" RPC: asks the backend to create a session and streams the
     * session state to the caller.
     *
     * <p>If the session id is already registered, a single {@code SS_FAILED} response
     * ("Session already exists") is sent and the stream is completed. Otherwise the backend
     * is asked to initialise the session; intermediate states are forwarded through a callback
     * and the final {@code SS_READY} / {@code SS_FAILED} response is sent once the backend's
     * future completes.
     *
     * @param initRequest    the request carrying the session id, inputs and outputs
     * @param streamObserver the observer that receives state updates and the final result
     */
    @Override // ai.mantik.mnp.protocol.mnp.MnpServiceGrpc.MnpService
    public void init(InitRequest initRequest, StreamObserver<InitResponse> streamObserver) {
        // State callback handed to the backend; $anonfun$init$1 forwards every state except ready/failed.
        Function2<SessionState, Option<String>, BoxedUnit> function2 = (sessionState, option) -> {
            $anonfun$init$1(streamObserver, sessionState, option);
            return BoxedUnit.UNIT;
        };
        if (!ai$mantik$mnp$server$MnpServiceImp$$sessions().containsKey(initRequest.sessionId())) {
            // Collected from the index-zipped outputs by a generated partial function that is not visible here.
            // NOTE(review): presumably the per-output forwarding definitions used by startForwarders - confirm.
            Map map = ((IterableOnceOps) ((IterableOps) initRequest.outputs().zipWithIndex()).collect(new MnpServiceImp$$anonfun$1(null))).toMap($less$colon$less$.MODULE$.refl());
            this.backend.init(initRequest, function2).onComplete(r12 -> {
                Future future;
                Future future2;
                if (r12 instanceof Success) {
                    ServerSession serverSession = (ServerSession) ((Success) r12).value();
                    // putIfAbsent returns the previous value: a defined Option means another init registered the id first.
                    if (Option$.MODULE$.apply(this.ai$mantik$mnp$server$MnpServiceImp$$sessions().putIfAbsent(initRequest.sessionId(), new ServerSessionState(initRequest.sessionId(), serverSession, initRequest, map, ServerSessionState$.MODULE$.apply$default$5()))).isDefined()) {
                        // Lost the race: shut the freshly created session down again; the generated callback
                        // (not visible here) receives the observer - presumably to report the failure.
                        future2 = serverSession.shutdown().andThen(new MnpServiceImp$$anonfun$$nestedInanonfun$init$3$1(this, initRequest, streamObserver), this.executionContext());
                    } else {
                        if (this.logger().underlying().isInfoEnabled()) {
                            this.logger().underlying().info("Session {} created", initRequest.sessionId());
                            BoxedUnit boxedUnit = BoxedUnit.UNIT;
                        } else {
                            BoxedUnit boxedUnit2 = BoxedUnit.UNIT;
                        }
                        // Session registered: report SS_READY and close the response stream.
                        streamObserver.onNext(new InitResponse(SessionState$SS_READY$.MODULE$, InitResponse$.MODULE$.apply$default$2(), InitResponse$.MODULE$.apply$default$3()));
                        streamObserver.onCompleted();
                        future2 = BoxedUnit.UNIT;
                    }
                    future = future2;
                } else {
                    if (!(r12 instanceof Failure)) {
                        throw new MatchError(r12);
                    }
                    // Backend initialisation failed: log it and report SS_FAILED with the exception message.
                    Throwable exception = ((Failure) r12).exception();
                    if (this.logger().underlying().isWarnEnabled()) {
                        this.logger().underlying().warn(new StringBuilder(24).append("Session ").append(initRequest.sessionId()).append(" failed to start").toString(), exception);
                        BoxedUnit boxedUnit3 = BoxedUnit.UNIT;
                    } else {
                        BoxedUnit boxedUnit4 = BoxedUnit.UNIT;
                    }
                    streamObserver.onNext(new InitResponse(SessionState$SS_FAILED$.MODULE$, this.ai$mantik$mnp$server$MnpServiceImp$$errorFromThrowable(exception), InitResponse$.MODULE$.apply$default$3()));
                    streamObserver.onCompleted();
                    future = BoxedUnit.UNIT;
                }
                return future;
            }, executionContext());
            return;
        }
        // The session id is already registered: reject the request right away.
        if (logger().underlying().isErrorEnabled()) {
            logger().underlying().error("Session {} already exists", initRequest.sessionId());
            BoxedUnit boxedUnit = BoxedUnit.UNIT;
        } else {
            BoxedUnit boxedUnit2 = BoxedUnit.UNIT;
        }
        streamObserver.onNext(new InitResponse(SessionState$SS_FAILED$.MODULE$, "Session already exists", InitResponse$.MODULE$.apply$default$3()));
        streamObserver.onCompleted();
    }

    /**
     * Handles the "quit" RPC: asks the backend to quit and answers with a default
     * {@link QuitResponse}.
     *
     * <p>After the response future completes, the generated {@code MnpServiceImp$$anonfun$quit$2}
     * callback runs (not visible here; NOTE(review): presumably triggers the quit handler - confirm).
     *
     * @param quitRequest the request; it is not inspected
     * @return a future of the quit response
     */
    @Override // ai.mantik.mnp.protocol.mnp.MnpServiceGrpc.MnpService
    public Future<QuitResponse> quit(QuitRequest quitRequest) {
        Future<QuitResponse> response = this.backend.quit().map(ignored -> new QuitResponse(QuitResponse$.MODULE$.apply$default$1()), executionContext());
        return response.andThen(new MnpServiceImp$$anonfun$quit$2(this), executionContext());
    }

    /**
     * Handles the "quitSession" RPC: shuts down the session named in the request.
     *
     * <p>The generated {@code MnpServiceImp$$anonfun$quitSession$1} callback runs once the
     * shutdown future completes (not visible here; NOTE(review): presumably removes the session
     * from the registry - confirm).
     *
     * @param quitSessionRequest the request carrying the session id
     * @return a future of a default {@link QuitSessionResponse}, or a future failed with an
     *         {@link MnpException} when no session with that id is registered
     */
    @Override // ai.mantik.mnp.protocol.mnp.MnpServiceGrpc.MnpService
    public Future<QuitSessionResponse> quitSession(QuitSessionRequest quitSessionRequest) {
        ServerSessionState sessionState = ai$mantik$mnp$server$MnpServiceImp$$sessions().get(quitSessionRequest.sessionId());
        if (sessionState == null) {
            // Unknown session id.
            return Future$.MODULE$.failed(new MnpException("Session doesn't exist", MnpException$.MODULE$.$lessinit$greater$default$2()));
        }
        return sessionState.session().shutdown().andThen(new MnpServiceImp$$anonfun$quitSession$1(this, quitSessionRequest), executionContext()).map(ignored -> new QuitSessionResponse(QuitSessionResponse$.MODULE$.apply$default$1()), executionContext());
    }

    /**
     * Handles the streaming "push" RPC: feeds the pushed data into an input port of a task.
     *
     * <p>The first request acts as header (session id, task id, port). The task is looked up and
     * created when missing. The port must be within the task's inputs and not yet claimed;
     * otherwise an {@link MnpException} is thrown inside the future chain. Every pushed chunk
     * updates the port's message and byte counters before its payload is decoded and handed to
     * the task.
     *
     * @param streamObserver the observer that receives the single {@link PushResponse}
     * @return the observer through which the client streams its {@link PushRequest}s
     */
    @Override // ai.mantik.mnp.protocol.mnp.MnpServiceGrpc.MnpService
    public StreamObserver<PushRequest> push(StreamObserver<PushResponse> streamObserver) {
        return StreamConversions$.MODULE$.respondMultiInSingleOutWithHeader(new MnpServiceImp$$anonfun$push$1(null), streamObserver, (header, chunks) -> {
            return this.getTask(header.sessionId(), header.taskId(), true).flatMap(taskState -> {
                boolean portInRange = header.port() >= 0 && header.port() < taskState.inputs().length();
                if (!portInRange) {
                    throw new MnpException("Invalid port, expected [0," + (taskState.inputs().length() - 1) + "]", MnpException$.MODULE$.$lessinit$greater$default$2());
                }
                PortStatus inputStatus = (PortStatus) taskState.inputs().apply(header.port());
                // Claim the port atomically; a second pusher on the same port is rejected.
                boolean claimed = inputStatus.inUse().compareAndSet(false, true);
                if (!claimed) {
                    throw new MnpException("Port already in use", MnpException$.MODULE$.$lessinit$greater$default$2());
                }
                return taskState.task().push(header.port(), chunks.map(chunk -> {
                    // Account for the chunk before decoding it.
                    inputStatus.messages().incrementAndGet();
                    inputStatus.bytes().addAndGet(chunk.data().size());
                    return RpcConversions$.MODULE$.decodeByteString(chunk.data());
                }).mapMaterializedValue(ignored -> NotUsed$.MODULE$)).map(done -> new PushResponse(PushResponse$.MODULE$.apply$default$1()), this.executionContext()).andThen(new MnpServiceImp$$anonfun$$nestedInanonfun$push$3$1(this, inputStatus), this.executionContext());
            }, this.executionContext());
        }, materializer(), executionContext());
    }

    /**
     * Handles the "pull" RPC: looks up the task (creating it when missing) and hands the
     * outcome of the lookup to the generated {@code MnpServiceImp$$anonfun$pull$1} callback,
     * which receives the observer and the request (its body is not visible here).
     *
     * @param pullRequest    the request carrying session id and task id
     * @param streamObserver the observer passed on to the callback
     */
    @Override // ai.mantik.mnp.protocol.mnp.MnpServiceGrpc.MnpService
    public void pull(PullRequest pullRequest, StreamObserver<PullResponse> streamObserver) {
        Future<ServerTaskState> taskLookup = getTask(pullRequest.sessionId(), pullRequest.taskId(), true);
        taskLookup.andThen(new MnpServiceImp$$anonfun$pull$1(this, streamObserver, pullRequest), executionContext());
    }

    /**
     * Claims an output port of a task and returns the port's data source.
     *
     * <p>The returned source is wire-tapped so that each emitted chunk updates the port's
     * message and byte counters. When the tap finishes, the generated
     * {@code MnpServiceImp$$anonfun$$nestedInanonfun$pullRun$2$1} callback runs (not visible here).
     *
     * @param serverTaskState the task whose output is pulled
     * @param i               the output port index
     * @return a {@code Success} with the source, or a {@code Failure} holding an
     *         {@link MnpException} when the port is out of range or already claimed
     */
    public Try<Source<ByteString, NotUsed>> ai$mantik$mnp$server$MnpServiceImp$$pullRun(ServerTaskState serverTaskState, int i) {
        boolean portInRange = i >= 0 && i < serverTaskState.outputs().length();
        if (!portInRange) {
            return new Failure(new MnpException("Invalid port, expected [0," + (serverTaskState.outputs().length() - 1) + "]", MnpException$.MODULE$.$lessinit$greater$default$2()));
        }
        PortStatus outputStatus = (PortStatus) serverTaskState.outputs().apply(i);
        // Claim the port atomically; only one puller per port is allowed.
        if (!outputStatus.inUse().compareAndSet(false, true)) {
            return new Failure(new MnpException("Port already in use", MnpException$.MODULE$.$lessinit$greater$default$2()));
        }
        return new Success(serverTaskState.task().pull(i).wireTap(Sink$.MODULE$.foreach(chunk -> {
            $anonfun$pullRun$1(outputStatus, chunk);
            return BoxedUnit.UNIT;
        }).mapMaterializedValue(completion -> completion.andThen(new MnpServiceImp$$anonfun$$nestedInanonfun$pullRun$2$1(this, outputStatus), this.executionContext()))));
    }

    /**
     * Looks up the state of a task, optionally creating the task when it does not exist.
     *
     * <p>When the task is created, its state is registered with {@code putIfAbsent}. If another
     * caller registered the same task id in the meantime, the freshly started task is shut down
     * and the already registered state is returned. Otherwise forwarders are started and a
     * completion callback is attached to the task's {@code finished()} future.
     *
     * @param str  the session id
     * @param str2 the task id
     * @param z    whether to create the task when it is not registered yet
     * @return a future of the task state; failed with an {@link MnpException} when the session
     *         is unknown, or when the task is unknown and {@code z} is {@code false}
     */
    private Future<ServerTaskState> getTask(String str, String str2, boolean z) {
        ServerSessionState sessionState = ai$mantik$mnp$server$MnpServiceImp$$sessions().get(str);
        if (sessionState == null) {
            return Future$.MODULE$.failed(new MnpException("Session " + str + " not found", MnpException$.MODULE$.$lessinit$greater$default$2()));
        }
        ServerTaskState known = (ServerTaskState) sessionState.tasks().get(str2);
        if (known != null) {
            return Future$.MODULE$.successful(known);
        }
        if (!z) {
            return Future$.MODULE$.failed(new MnpException("Task " + str2 + " not found", MnpException$.MODULE$.$lessinit$greater$default$2()));
        }
        return sessionState.session().runTask(str2).map(serverTask -> {
            // One fresh PortStatus per declared input and output of the session.
            ServerTaskState created = new ServerTaskState(serverTask, TaskState$TS_EXISTS$.MODULE$, ServerTaskState$.MODULE$.apply$default$3(), package$.MODULE$.Vector().fill(sessionState.initRequest().inputs().size(), () -> new PortStatus()), package$.MODULE$.Vector().fill(sessionState.initRequest().outputs().size(), () -> new PortStatus()));
            ServerTaskState winner = (ServerTaskState) sessionState.tasks().putIfAbsent(str2, created);
            if (winner != null) {
                // Somebody else registered the task first: discard ours and use theirs.
                if (this.logger().underlying().isInfoEnabled()) {
                    this.logger().underlying().info("Race condition on creating task {}/{}", new Object[]{str, str2});
                }
                serverTask.shutdown();
                return winner;
            }
            this.startForwarders(sessionState, str2, created);
            serverTask.finished().andThen(new MnpServiceImp$$anonfun$$nestedInanonfun$getTask$1$1(this, str, str2), this.executionContext());
            return created;
        }, executionContext());
    }

    /**
     * Starts one asynchronous forwarder per forward definition of the session.
     *
     * <p>Each forwarder connects a client to the target address, pulls the given output port of
     * the task and pushes the data into the same task id of the target session/port. When the
     * transfer completes, the generated
     * {@code MnpServiceImp$$anonfun$$nestedInanonfun$startForwarders$2$1} callback, which
     * receives the client, runs (not visible here). If setting up the transfer throws a
     * non-fatal error, the client's channel is shut down immediately; fatal errors are rethrown.
     *
     * @param serverSessionState the session whose forward definitions are used
     * @param str                the task id, used for the remote task as well
     * @param serverTaskState    the local task whose outputs are forwarded
     */
    private void startForwarders(ServerSessionState serverSessionState, String str, ServerTaskState serverTaskState) {
        serverSessionState.forwardDefinitions().foreach(forward -> {
            if (forward == null) {
                throw new MatchError(forward);
            }
            int outputPort = forward._1$mcI$sp();
            MnpSessionPortUrl target = (MnpSessionPortUrl) forward._2();
            return Future$.MODULE$.apply(() -> {
                MnpClient client = MnpClient$.MODULE$.connect(target.address());
                try {
                    Source outbound = (Source) this.ai$mantik$mnp$server$MnpServiceImp$$pullRun(serverTaskState, outputPort).getOrElse(() -> {
                        throw new IllegalStateException("Could not issue forwarding pull run");
                    });
                    Future transfer = (Future) outbound.runWith(client.joinSession(target.session()).task(str).push(target.port(), this.executionContext()), this.materializer());
                    return transfer.andThen(new MnpServiceImp$$anonfun$$nestedInanonfun$startForwarders$2$1(null, client), this.executionContext());
                } catch (Throwable failure) {
                    boolean nonFatal = failure != null && !NonFatal$.MODULE$.unapply(failure).isEmpty();
                    if (nonFatal) {
                        // The transfer never started, so release the connection right away.
                        return client.channel().shutdownNow();
                    }
                    throw failure;
                }
            }, this.executionContext());
        });
    }

    /**
     * Applies a transformation to the registered state of a task via the task map's
     * {@code computeIfPresent}.
     *
     * @param str       the session id
     * @param str2      the task id
     * @param function1 the transformation from the current state to the new state
     * @return {@code true} when the session exists and the task map holds a (non-null) state
     *         for the task after the update; {@code false} otherwise
     */
    private boolean updateTaskState(String str, String str2, final Function1<ServerTaskState, ServerTaskState> function1) {
        ServerSessionState sessionState = ai$mantik$mnp$server$MnpServiceImp$$sessions().get(str);
        if (sessionState == null) {
            return false;
        }
        // computeIfPresent yields null when the task is not registered (or the function returns null).
        return sessionState.tasks().computeIfPresent(str2, (taskId, current) -> (ServerTaskState) function1.apply(current)) != null;
    }

    /**
     * Records the outcome of a finished task in its registered state.
     *
     * <p>The state becomes {@code TS_FINISHED} for a successful result and {@code TS_FAILED}
     * otherwise; the error field is set from the failure (empty for a success). The return
     * value of the update is ignored, so nothing happens when the session or task is no longer
     * registered.
     *
     * @param str  the session id
     * @param str2 the task id
     * @param r10  the result the task finished with
     */
    public void ai$mantik$mnp$server$MnpServiceImp$$onTaskFinish(String str, String str2, Try<Done> r10) {
        final TaskState.Recognized finalState;
        if (r10.isSuccess()) {
            finalState = TaskState$TS_FINISHED$.MODULE$;
        } else {
            finalState = TaskState$TS_FAILED$.MODULE$;
        }
        updateTaskState(str, str2, current -> current.copy(current.copy$default$1(), (TaskState) finalState, this.maybeError(r10), current.copy$default$4(), current.copy$default$5()));
    }

    /**
     * Handles the "queryTask" RPC: reports the state, error text and per-port statistics of a
     * task.
     *
     * @param queryTaskRequest the request carrying session id, task id and the {@code ensure}
     *                         flag, which is passed to the task lookup as "create if missing"
     * @return a future of the response; it fails when the task lookup fails
     */
    @Override // ai.mantik.mnp.protocol.mnp.MnpServiceGrpc.MnpService
    public Future<QueryTaskResponse> queryTask(QueryTaskRequest queryTaskRequest) {
        Future<ServerTaskState> taskLookup = getTask(queryTaskRequest.sessionId(), queryTaskRequest.taskId(), queryTaskRequest.ensure());
        return taskLookup.map(taskState -> {
            TaskState currentState = taskState.state();
            // A missing error is reported as the empty string.
            String errorText = (String) taskState.error().getOrElse(() -> "");
            Seq inputPorts = (Seq) taskState.inputs().map(this::convertTaskPortStatus);
            Seq outputPorts = (Seq) taskState.outputs().map(this::convertTaskPortStatus);
            return new QueryTaskResponse(currentState, errorText, inputPorts, outputPorts, QueryTaskResponse$.MODULE$.apply$default$5());
        }, executionContext());
    }

    /**
     * Converts the internal counters of a port into the protocol's {@link TaskPortStatus}.
     *
     * @param portStatus the port whose message count, byte count, error and done flag are read
     * @return a snapshot of those values; a missing (null) error is reported as the empty string
     */
    /* JADX INFO: Access modifiers changed from: private */
    public TaskPortStatus convertTaskPortStatus(PortStatus portStatus) {
        return new TaskPortStatus(
                portStatus.messages().get(),
                portStatus.bytes().get(),
                (String) Option$.MODULE$.apply(portStatus.error().get()).getOrElse(() -> ""),
                portStatus.done().get(),
                TaskPortStatus$.MODULE$.apply$default$5());
    }

    /**
     * Extracts an error text from a result.
     *
     * @param r4 the result to inspect
     * @return the error text derived from the failure's exception, or an empty option when the
     *         result is a success
     */
    private Option<String> maybeError(Try<?> r4) {
        Try<String> errorText = r4.failed().map(this::ai$mantik$mnp$server$MnpServiceImp$$errorFromThrowable);
        return errorText.toOption();
    }

    /**
     * Derives a human-readable error text from a throwable.
     *
     * @param th the throwable; must not be {@code null}
     * @return the throwable's message, or {@code "Unknown Error"} when it has none
     */
    public String ai$mantik$mnp$server$MnpServiceImp$$errorFromThrowable(Throwable th) {
        String message = th.getMessage();
        return message != null ? message : "Unknown Error";
    }

    /**
     * State callback used by {@code init}: forwards a session state to the observer unless it is
     * the ready or the failed state (those are sent by {@code init} itself once the backend's
     * future completes).
     *
     * @param streamObserver the observer that receives the state update
     * @param sessionState   the reported state
     * @param option         an optional detail text; an empty option is sent as the empty string
     */
    public static final /* synthetic */ void $anonfun$init$1(StreamObserver streamObserver, SessionState sessionState, Option option) {
        if (!sessionState.isSsReady() && !sessionState.isSsFailed()) {
            String detail = (String) option.getOrElse(() -> "");
            streamObserver.onNext(new InitResponse(sessionState, detail, InitResponse$.MODULE$.apply$default$3()));
        }
    }

    /**
     * Wire-tap body used by the pull run: counts one message and adds the chunk's length to the
     * port's byte counter.
     *
     * @param portStatus the port whose counters are updated
     * @param byteString the chunk that passed through the port
     */
    public static final /* synthetic */ void $anonfun$pullRun$1(PortStatus portStatus, ByteString byteString) {
        portStatus.messages().incrementAndGet();
        int chunkLength = byteString.length();
        portStatus.bytes().addAndGet(chunkLength);
    }

    /**
     * Creates the service.
     *
     * @param serverBackend the backend that about(), init() and quit() delegate to
     * @param function0     the quit handler; stored in a public field and not invoked in this
     *                      file (NOTE(review): presumably called by a generated helper - confirm)
     * @param akkaRuntime   the runtime handed to the {@code ComponentBase} superclass
     */
    /* JADX WARN: 'super' call moved to the top of the method (can break code semantics) */
    public MnpServiceImp(ServerBackend serverBackend, Function0<BoxedUnit> function0, AkkaRuntime akkaRuntime) {
        super(akkaRuntime);
        this.backend = serverBackend;
        this.ai$mantik$mnp$server$MnpServiceImp$$quitHandler = function0;
        // Runs the initialiser of the MnpService interface (compiled from a Scala trait).
        MnpServiceGrpc.MnpService.$init$(this);
        // The session registry starts out empty.
        this.ai$mantik$mnp$server$MnpServiceImp$$sessions = new ConcurrentHashMap<>();
    }
}
