package io.numaproj.numaflow.sink;

import akka.actor.ActorRef;
import akka.actor.ActorSystem;
import akka.actor.AllDeadLetters;
import com.google.protobuf.Empty;
import io.grpc.Status;
import io.grpc.stub.ServerCalls;
import io.grpc.stub.StreamObserver;
import io.numaproj.numaflow.function.v1.UserDefinedFunctionGrpc;
import io.numaproj.numaflow.sink.handler.SinkHandler;
import io.numaproj.numaflow.sink.v1.Udsink;
import io.numaproj.numaflow.sink.v1.UserDefinedSinkGrpc;
import java.util.concurrent.CompletableFuture;
import lombok.Generated;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/numaproj/numaflow/sink/SinkService.class */
/**
 * gRPC service implementation for the user-defined sink.
 *
 * <p>Each {@code sinkFn} stream is served by a pair of actors created in the shared
 * {@link #sinkActorSystem}: a shutdown actor that is subscribed to dead letters and is handed a
 * failure future, and a supervisor actor that receives every incoming datum followed by an
 * {@code "EOF"} marker once the client half-closes the stream.
 *
 * <p>NOTE(review): the per-stream actors and the dead-letter subscription are not released by
 * this class; presumably the actors stop themselves - confirm against SinkSupervisorActor and
 * SinkShutdownActor.
 */
class SinkService extends UserDefinedSinkGrpc.UserDefinedSinkImplBase {

    @Generated
    private static final Logger log = LoggerFactory.getLogger(SinkService.class);

    /** Actor system shared by every {@code sinkFn} stream handled by this service. */
    public static final ActorSystem sinkActorSystem = ActorSystem.create("sink");

    /** User-supplied handler; while {@code null}, {@code sinkFn} answers UNIMPLEMENTED. */
    private SinkHandler sinkHandler;

    /**
     * Registers the handler that is passed to the supervisor actor of each new stream.
     *
     * @param sinkHandler the user-defined sink handler
     */
    public void setSinkHandler(SinkHandler sinkHandler) {
        this.sinkHandler = sinkHandler;
    }

    /**
     * Opens a bidirectional sink stream.
     *
     * @param streamObserver observer used to send the response list (or an error) to the client
     * @return observer that forwards each incoming datum to the stream's supervisor actor, or an
     *     observer that rejects the call as UNIMPLEMENTED when no handler has been registered
     */
    @Override // io.numaproj.numaflow.sink.v1.UserDefinedSinkGrpc.UserDefinedSinkImplBase
    public StreamObserver<Udsink.DatumRequest> sinkFn(final StreamObserver<Udsink.ResponseList> streamObserver) {
        if (this.sinkHandler == null) {
            // Report the sink method itself as unimplemented so the status names the right RPC.
            return ServerCalls.asyncUnimplementedStreamingCall(UserDefinedSinkGrpc.getSinkFnMethod(), streamObserver);
        }

        final CompletableFuture<Void> failureFuture = new CompletableFuture<>();
        final ActorRef shutdownActor = sinkActorSystem.actorOf(SinkShutdownActor.props(failureFuture));
        sinkActorSystem.getEventStream().subscribe(shutdownActor, AllDeadLetters.class);
        handleFailure(failureFuture, streamObserver);

        final ActorRef supervisorActor =
                sinkActorSystem.actorOf(SinkSupervisorActor.props(this.sinkHandler, shutdownActor, streamObserver));

        return new StreamObserver<Udsink.DatumRequest>() {
            @Override // io.grpc.stub.StreamObserver
            public void onNext(Udsink.DatumRequest datumRequest) {
                supervisorActor.tell(datumRequest, ActorRef.noSender());
            }

            @Override // io.grpc.stub.StreamObserver
            public void onError(Throwable th) {
                // The trailing throwable makes SLF4J log the stack trace along with the message.
                SinkService.log.error("Encountered error in sinkFn - {}", th.getMessage(), th);
                streamObserver.onError(th);
            }

            @Override // io.grpc.stub.StreamObserver
            public void onCompleted() {
                // "EOF" tells the supervisor that no further datums will arrive on this stream.
                supervisorActor.tell("EOF", ActorRef.noSender());
            }
        };
    }

    /**
     * Readiness probe; always answers with {@code ready = true}.
     *
     * @param empty unused request payload
     * @param streamObserver observer that receives the single readiness response
     */
    @Override // io.numaproj.numaflow.sink.v1.UserDefinedSinkGrpc.UserDefinedSinkImplBase
    public void isReady(Empty empty, StreamObserver<Udsink.ReadyResponse> streamObserver) {
        streamObserver.onNext(Udsink.ReadyResponse.newBuilder().setReady(true).build());
        streamObserver.onCompleted();
    }

    /**
     * Propagates a failure signalled through {@code completableFuture} to the client as an
     * {@code UNKNOWN} status.
     *
     * <p>A completion callback is used instead of a thread blocked on {@code get()}, so a stream
     * whose future never completes does not keep a thread parked. The callback runs on whichever
     * thread completes the future.
     *
     * @param completableFuture future that is completed exceptionally when the stream fails
     * @param streamObserver observer of the stream that should receive the error
     */
    private void handleFailure(CompletableFuture<Void> completableFuture, StreamObserver<Udsink.ResponseList> streamObserver) {
        completableFuture.whenComplete((ignored, failure) -> {
            if (failure == null) {
                return;
            }
            log.error("Sink stream failed", failure);
            try {
                // toString() keeps the exception type in the description (and is never null),
                // matching the text produced when the failure was wrapped by Future.get().
                streamObserver.onError(
                        Status.UNKNOWN.withDescription(failure.toString()).withCause(failure).asException());
            } catch (RuntimeException notifyFailure) {
                // The call may already be closed; log rather than lose the exception silently.
                log.warn("Unable to report sink failure to the client", notifyFailure);
            }
        });
    }
}
