package polynote.kernel.remote;

import polynote.env.ops.Location;
import polynote.kernel.logging.package$Logging$;
import polynote.kernel.logging.package$Logging$Service;
import polynote.kernel.remote.SocketTransport;
import polynote.messages.NotebookUpdate;
import polynote.messages.NotebookUpdate$;
import scala.Function1;
import scala.reflect.ScalaSignature;
import scala.runtime.BoxedUnit;
import scala.runtime.BoxesRunTime;
import scala.runtime.Nothing$;
import zio.CanFail$;
import zio.Cause;
import zio.Has;
import zio.Promise;
import zio.ZIO;
import zio.ZIO$;
import zio.blocking.package;
import zio.stream.ZStream;

/* compiled from: transport.scala */
@ScalaSignature(bytes = "\u0006\u0005\u0005\u001dc\u0001B\n\u0015\u0001mA\u0001B\n\u0001\u0003\u0002\u0003\u0006Ia\n\u0005\t]\u0001\u0011\t\u0011)A\u0005_!)A\t\u0001C\u0005\u000b\")\u0011\n\u0001C\u0001\u0015\"9q\r\u0001b\u0001\n\u0013A\u0007BB=\u0001A\u0003%\u0011\u000eC\u0004{\u0001\t\u0007I\u0011B>\t\u000f\u0005\u001d\u0001\u0001)A\u0005y\"9\u0011\u0011\u0002\u0001\u0005\u0002\u0005-\u0001\u0002CA\u000f\u0001\t\u0007I\u0011\t5\t\u000f\u0005}\u0001\u0001)A\u0005S\"A\u0011\u0011\u0005\u0001C\u0002\u0013\u00053\u0010C\u0004\u0002$\u0001\u0001\u000b\u0011\u0002?\t\u000f\u0005\u0015\u0002\u0001\"\u0001\u0002(\u001d9\u0011\u0011\u0006\u000b\t\u0002\u0005-bAB\n\u0015\u0011\u0003\ti\u0003\u0003\u0004E!\u0011\u0005\u0011q\u0006\u0005\b\u0003c\u0001B\u0011AA\u001a\u0005U\u0019vnY6fiR\u0013\u0018M\\:q_J$8\t\\5f]RT!!\u0006\f\u0002\rI,Wn\u001c;f\u0015\t9\u0002$\u0001\u0004lKJtW\r\u001c\u0006\u00023\u0005A\u0001o\u001c7z]>$Xm\u0001\u0001\u0014\u0007\u0001a\"\u0005\u0005\u0002\u001eA5\taDC\u0001 \u0003\u0015\u00198-\u00197b\u0013\t\tcD\u0001\u0004B]f\u0014VM\u001a\t\u0003G\u0011j\u0011\u0001F\u0005\u0003KQ\u0011q\u0002\u0016:b]N\u0004xN\u001d;DY&,g\u000e^\u0001\tG\"\fgN\\3mgB\u0011\u0001f\u000b\b\u0003G%J!A\u000b\u000b\u0002\u001fM{7m[3u)J\fgn\u001d9peRL!\u0001L\u0017\u0003\u0011\rC\u0017M\u001c8fYNT!A\u000b\u000b\u0002\r\rdwn]3e!\u0011\u00014'N!\u000e\u0003ER\u0011AM\u0001\u0004u&|\u0017B\u0001\u001b2\u0005\u001d\u0001&o\\7jg\u0016\u0004\"A\u000e \u000f\u0005]bdB\u0001\u001d<\u001b\u0005I$B\u0001\u001e\u001b\u0003\u0019a$o\\8u}%\tq$\u0003\u0002>=\u00059\u0001/Y2lC\u001e,\u0017BA A\u0005%!\u0006N]8xC\ndWM\u0003\u0002>=A\u0011QDQ\u0005\u0003\u0007z\u0011A!\u00168ji\u00061A(\u001b8jiz\"2AR$I!\t\u0019\u0003\u0001C\u0003'\u0007\u0001\u0007q\u0005C\u0003/\u0007\u0001\u0007q&\u0001\u0005m_\u001e,%O]8s)\tYU\r\u0005\u0003\u001e\u0019:\u000b\u0016BA'\u001f\u0005%1UO\\2uS>t\u0017\u0007E\u00021\u001fVJ!\u0001U\u0019\u0003\u000b\r\u000bWo]3\u0011\u000bA\u0012FKY!\n\u0005M\u000b$a\u0001.J\u001fB\u0011Qk\u0018\b\u0003-vs!aV.\u000f\u0005aSfB\u0001\u001dZ\u0013\u0005I\u0012BA\f\u0019\u0013\taf#A\u0004m_\u001e<\u0017N\\4\n\u0005ur&B\u0001/\u0017\u0013\t\u0001\u0017MA\u0004M_\u001e<\u0017N\\4\u000b\u0005ur\u0006CA\u000fd\u0013\t!gDA\u0004O_RD\u0017N\\4\t\u000b\u0019$\u0001\u0019A&\u0002\u0005\u0019t\u0017!\u0004:fcV,7\u000f^*ue\u0016\fW.F\u0001j!\u0015QWn\\\u001bw\u001b\u0005Y'B\u000172\u0003\u0019\u0019HO]3b[&\u0011an\u001b\u0002\b5N#(/Z1n!\t\u00018O\u0004\u0002re6\ta#\u0003\u0002>-%\u0011A/\u001e\u0002\b\u0005\u0006\u001cX-\u00128w\u0015\tid\u0003\u0005\u0002$o&\u0011\u0001\u0010\u0006\u0002\u000e%\u0016lw\u000e^3SKF,Xm\u001d;\u0002\u001dI,\u0017/^3tiN#(/Z1nA\u0005aQ\u000f\u001d3bi\u0016\u001cFO]3b[V\tA\u0010E\u0003k[>,T\u0010E\u0002\u007f\u0003\u0007i\u0011a \u0006\u0004\u0003\u0003A\u0012\u0001C7fgN\fw-Z:\n\u0007\u0005\u0015qP\u0001\bO_R,'m\\8l+B$\u0017\r^3\u0002\u001bU\u0004H-\u0019;f'R\u0014X-Y7!\u00031\u0019XM\u001c3SKN\u0004xN\\:f)\u0011\ti!a\u0005\u0011\tA\fy!Q\u0005\u0004\u0003#)(!\u0002+bg.\u0014\u0005bBA\u000b\u0013\u0001\u0007\u0011qC\u0001\u0004e\u0016\u0004\bcA\u0012\u0002\u001a%\u0019\u00111\u0004\u000b\u0003\u001dI+Wn\u001c;f%\u0016\u001c\bo\u001c8tK\u0006A!/Z9vKN$8/A\u0005sKF,Xm\u001d;tA\u00059Q\u000f\u001d3bi\u0016\u001c\u0018\u0001C;qI\u0006$Xm\u001d\u0011\u0002\u000b\rdwn]3\u0015\u0005\u00055\u0011!F*pG.,G\u000f\u0016:b]N\u0004xN\u001d;DY&,g\u000e\u001e\t\u0003GA\u0019\"\u0001\u0005\u000f\u0015\u0005\u0005-\u0012!B1qa2LH\u0003BA\u001b\u0003\u000b\u0002R!a\u000e\u0002@\u0019sA!!\u000f\u0002>9\u0019\u0001(a\u000f\n\u0003IJ!!P\u0019\n\t\u0005\u0005\u00131\t\u0002\u0005)\u0006\u001c8N\u0003\u0002>c!)aE\u0005a\u0001O\u0001")
/* loaded from: input_file:polynote/kernel/remote/SocketTransportClient.class */
public class SocketTransportClient implements TransportClient {
    private final SocketTransport.Channels channels;
    private final Promise<Throwable, BoxedUnit> closed;
    private final ZStream<Has<package.Blocking.Service>, Throwable, RemoteRequest> requestStream;
    private final ZStream<Has<package.Blocking.Service>, Throwable, NotebookUpdate> updateStream;
    private final ZStream<Has<package.Blocking.Service>, Throwable, RemoteRequest> requests = requestStream().takeUntil(remoteRequest -> {
        return BoxesRunTime.boxToBoolean($anonfun$requests$1(remoteRequest));
    });
    private final ZStream<Has<package.Blocking.Service>, Throwable, NotebookUpdate> updates;

    public static ZIO<Object, Throwable, SocketTransportClient> apply(SocketTransport.Channels channels) {
        return SocketTransportClient$.MODULE$.apply(channels);
    }

    public Function1<Cause<Throwable>, ZIO<Has<package$Logging$Service>, Nothing$, BoxedUnit>> logError(Function1<Cause<Throwable>, ZIO<Has<package$Logging$Service>, Nothing$, BoxedUnit>> function1) {
        return cause -> {
            return ZIO$.MODULE$.when(() -> {
                return !cause.interruptedOnly();
            }, () -> {
                return (ZIO) function1.apply(cause);
            });
        };
    }

    private ZStream<Has<package.Blocking.Service>, Throwable, RemoteRequest> requestStream() {
        return this.requestStream;
    }

    private ZStream<Has<package.Blocking.Service>, Throwable, NotebookUpdate> updateStream() {
        return this.updateStream;
    }

    @Override // polynote.kernel.remote.TransportClient
    public ZIO<Has<package.Blocking.Service>, Throwable, BoxedUnit> sendResponse(RemoteResponse remoteResponse) {
        return ZIO$.MODULE$.fromEither(() -> {
            return RemoteResponse$.MODULE$.codec().encode(remoteResponse).toEither();
        }).mapError(err -> {
            return new RuntimeException(err.message());
        }, CanFail$.MODULE$.canFail()).flatMap(bitVector -> {
            return this.channels.mainChannel().write(bitVector).onError(this.logError(cause -> {
                return package$Logging$.MODULE$.error("Remote kernel client had an error sending a response (it will probably die now)", (Cause<Throwable>) cause, new Location("transport.scala", 181, "sendResponse", "polynote.kernel.remote.SocketTransportClient"));
            })).map(boxedUnit -> {
                BoxedUnit.UNIT;
                return BoxedUnit.UNIT;
            });
        });
    }

    @Override // polynote.kernel.remote.TransportClient
    public ZStream<Has<package.Blocking.Service>, Throwable, RemoteRequest> requests() {
        return this.requests;
    }

    @Override // polynote.kernel.remote.TransportClient
    public ZStream<Has<package.Blocking.Service>, Throwable, NotebookUpdate> updates() {
        return this.updates;
    }

    @Override // polynote.kernel.remote.TransportClient
    public ZIO<Has<package.Blocking.Service>, Throwable, BoxedUnit> close() {
        return this.closed.succeed(BoxedUnit.UNIT).$times$greater(() -> {
            return this.channels.close();
        });
    }

    public static final /* synthetic */ boolean $anonfun$requests$1(RemoteRequest remoteRequest) {
        return remoteRequest instanceof ShutdownRequest;
    }

    public SocketTransportClient(SocketTransport.Channels channels, Promise<Throwable, BoxedUnit> promise) {
        this.channels = channels;
        this.closed = promise;
        this.requestStream = (ZStream) polynote.kernel.package$.MODULE$.streamCodec(RemoteRequest$.MODULE$.codec()).apply(channels.mainChannel().bitVectors().haltWhen(promise.await().run()).onError(logError(cause -> {
            return package$Logging$.MODULE$.error("Remote kernel client's request stream had an networking error (it will probably die now)", (Cause<Throwable>) cause, new Location("transport.scala", 170, "<unknown>", "polynote.kernel.remote.SocketTransportClient"));
        })));
        this.updateStream = (ZStream) polynote.kernel.package$.MODULE$.streamCodec(NotebookUpdate$.MODULE$.codec()).apply(channels.notebookUpdatesChannel().bitVectors().haltWhen(promise.await().run()).onError(logError(cause2 -> {
            return package$Logging$.MODULE$.error("Remote kernel client's update stream had an networking error (it will probably die now)", (Cause<Throwable>) cause2, new Location("transport.scala", 175, "<unknown>", "polynote.kernel.remote.SocketTransportClient"));
        })));
        this.updates = updateStream().haltWhen(promise.await().run());
    }
}
