package io.numaproj.numaflow.sourcetransformer;

import akka.actor.ActorRef;
import akka.actor.ActorSystem;
import com.google.protobuf.Empty;
import io.grpc.Status;
import io.grpc.stub.ServerCalls;
import io.grpc.stub.StreamObserver;
import io.numaproj.numaflow.sourcetransformer.v1.SourceTransformGrpc;
import io.numaproj.numaflow.sourcetransformer.v1.Sourcetransformer;
import java.util.concurrent.CompletableFuture;
import lombok.Generated;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/numaproj/numaflow/sourcetransformer/Service.class */
class Service extends SourceTransformGrpc.SourceTransformImplBase {

    @Generated
    private static final Logger log = LoggerFactory.getLogger(Service.class);
    public static final ActorSystem transformerActorSystem = ActorSystem.create("transformer");
    private final SourceTransformer transformer;
    private final CompletableFuture<Void> shutdownSignal;

    @Override // io.numaproj.numaflow.sourcetransformer.v1.SourceTransformGrpc.AsyncService
    public StreamObserver<Sourcetransformer.SourceTransformRequest> sourceTransformFn(final StreamObserver<Sourcetransformer.SourceTransformResponse> streamObserver) {
        if (this.transformer == null) {
            return ServerCalls.asyncUnimplementedStreamingCall(SourceTransformGrpc.getSourceTransformFnMethod(), streamObserver);
        }
        final ActorRef actorOf = transformerActorSystem.actorOf(TransformSupervisorActor.props(this.transformer, streamObserver, this.shutdownSignal));
        return new StreamObserver<Sourcetransformer.SourceTransformRequest>() { // from class: io.numaproj.numaflow.sourcetransformer.Service.1
            private boolean handshakeDone = false;

            public void onNext(Sourcetransformer.SourceTransformRequest sourceTransformRequest) {
                if (this.handshakeDone) {
                    actorOf.tell(sourceTransformRequest, ActorRef.noSender());
                } else if (!sourceTransformRequest.hasHandshake() || !sourceTransformRequest.getHandshake().getSot()) {
                    streamObserver.onError(Status.INVALID_ARGUMENT.withDescription("Handshake request not received").asException());
                } else {
                    streamObserver.onNext(Sourcetransformer.SourceTransformResponse.newBuilder().setHandshake(sourceTransformRequest.getHandshake()).m1397build());
                    this.handshakeDone = true;
                }
            }

            public void onError(Throwable th) {
                actorOf.tell(new Exception(th), ActorRef.noSender());
            }

            public void onCompleted() {
                actorOf.tell("EOF", ActorRef.noSender());
            }
        };
    }

    @Override // io.numaproj.numaflow.sourcetransformer.v1.SourceTransformGrpc.AsyncService
    public void isReady(Empty empty, StreamObserver<Sourcetransformer.ReadyResponse> streamObserver) {
        streamObserver.onNext(Sourcetransformer.ReadyResponse.newBuilder().setReady(true).m1320build());
        streamObserver.onCompleted();
    }

    @Generated
    public Service(SourceTransformer sourceTransformer, CompletableFuture<Void> completableFuture) {
        this.transformer = sourceTransformer;
        this.shutdownSignal = completableFuture;
    }
}
