package io.numaproj.numaflow.mapper;

import akka.actor.ActorRef;
import akka.actor.ActorSystem;
import com.google.protobuf.Empty;
import io.grpc.Status;
import io.grpc.stub.ServerCalls;
import io.grpc.stub.StreamObserver;
import io.numaproj.numaflow.map.v1.MapGrpc;
import io.numaproj.numaflow.map.v1.MapOuterClass;
import java.util.concurrent.CompletableFuture;
import lombok.Generated;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/numaproj/numaflow/mapper/Service.class */
/**
 * gRPC implementation of the map service.
 *
 * <p>Each {@code mapFn} stream gets its own supervisor actor, created on the shared
 * {@link #mapperActorSystem}. The first request on a stream must be a handshake with
 * {@code sot} set; it is echoed back to the caller. Every request after that is forwarded
 * to the supervisor actor.
 */
class Service extends MapGrpc.MapImplBase {

    @Generated
    private static final Logger log = LoggerFactory.getLogger(Service.class);

    /** Actor system on which one supervisor actor is created per {@code mapFn} stream. */
    public static final ActorSystem mapperActorSystem = ActorSystem.create("mapper");

    /** User-supplied mapper; when {@code null}, {@code mapFn} answers as an unimplemented call. */
    private final Mapper mapper;

    /** Future handed to every supervisor actor that this service creates. */
    private final CompletableFuture<Void> shutdownSignal;

    /**
     * Creates the service.
     *
     * @param mapper the mapper handed to each supervisor actor; may be {@code null}, in which
     *     case {@code mapFn} is reported as unimplemented
     * @param completableFuture the shutdown signal handed to each supervisor actor
     */
    @Generated
    public Service(Mapper mapper, CompletableFuture<Void> completableFuture) {
        this.mapper = mapper;
        this.shutdownSignal = completableFuture;
    }

    /**
     * Opens a bidirectional map stream.
     *
     * @param streamObserver observer used to send responses (and errors) back to the caller
     * @return the observer that receives the caller's requests
     */
    @Override // io.numaproj.numaflow.map.v1.MapGrpc.AsyncService
    public StreamObserver<MapOuterClass.MapRequest> mapFn(final StreamObserver<MapOuterClass.MapResponse> streamObserver) {
        if (this.mapper == null) {
            return ServerCalls.asyncUnimplementedStreamingCall(MapGrpc.getMapFnMethod(), streamObserver);
        }

        // One supervisor actor per stream; it receives every post-handshake request.
        final ActorRef supervisor = mapperActorSystem.actorOf(
                MapSupervisorActor.props(this.mapper, streamObserver, this.shutdownSignal));

        return new StreamObserver<MapOuterClass.MapRequest>() {
            /** Set once a valid handshake has been echoed back to the caller. */
            private boolean handshakeDone = false;

            /** Set once the stream has been failed because the first request was not a handshake. */
            private boolean handshakeRejected = false;

            @Override
            public void onNext(MapOuterClass.MapRequest mapRequest) {
                if (this.handshakeRejected) {
                    // onError has already been sent on the response stream; writing to it
                    // again (another error or a late handshake echo) would be invalid.
                    return;
                }
                if (this.handshakeDone) {
                    supervisor.tell(mapRequest, ActorRef.noSender());
                    return;
                }
                if (!mapRequest.hasHandshake() || !mapRequest.getHandshake().getSot()) {
                    this.handshakeRejected = true;
                    streamObserver.onError(
                            Status.INVALID_ARGUMENT
                                    .withDescription("Handshake request not received")
                                    .asException());
                    return;
                }
                // Echo the handshake back to acknowledge it.
                streamObserver.onNext(
                        MapOuterClass.MapResponse.newBuilder()
                                .setHandshake(mapRequest.getHandshake())
                                .build());
                this.handshakeDone = true;
            }

            @Override
            public void onError(Throwable th) {
                // The inbound failure is wrapped in an Exception before being sent to the supervisor.
                supervisor.tell(new Exception(th), ActorRef.noSender());
            }

            @Override
            public void onCompleted() {
                // The literal "EOF" is the message sent to the supervisor when the caller completes.
                supervisor.tell("EOF", ActorRef.noSender());
            }
        };
    }

    /**
     * Readiness probe: always replies with {@code ready = true} and completes the call.
     *
     * @param empty unused request payload
     * @param streamObserver observer that receives the single readiness response
     */
    @Override // io.numaproj.numaflow.map.v1.MapGrpc.AsyncService
    public void isReady(Empty empty, StreamObserver<MapOuterClass.ReadyResponse> streamObserver) {
        MapOuterClass.ReadyResponse ready =
                MapOuterClass.ReadyResponse.newBuilder().setReady(true).build();
        streamObserver.onNext(ready);
        streamObserver.onCompleted();
    }
}
