package io.numaproj.numaflow.sinker;

import akka.actor.ActorRef;
import akka.actor.ActorSystem;
import akka.actor.AllDeadLetters;
import com.google.protobuf.Empty;
import io.grpc.Status;
import io.grpc.stub.ServerCalls;
import io.grpc.stub.StreamObserver;
import io.numaproj.numaflow.sink.v1.SinkGrpc;
import io.numaproj.numaflow.sink.v1.SinkOuterClass;
import java.util.concurrent.CompletableFuture;
import lombok.Generated;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/numaproj/numaflow/sinker/Service.class */
/**
 * gRPC implementation of the sink service.
 *
 * <p>Each {@code sinkFn} stream is served by a pair of actors created in the shared
 * {@link #sinkActorSystem}: a shutdown actor that is handed a failure future and is subscribed
 * to {@link AllDeadLetters}, and a supervisor actor that receives every inbound request.
 * When the failure future completes exceptionally the client stream is terminated with
 * {@link Status#UNKNOWN}.
 */
class Service extends SinkGrpc.SinkImplBase {

    @Generated
    private static final Logger log = LoggerFactory.getLogger(Service.class);

    /** Actor system in which the per-stream shutdown and supervisor actors are created. */
    public static final ActorSystem sinkActorSystem = ActorSystem.create("sink");

    /** User-provided sink handler; when {@code null}, {@code sinkFn} answers as unimplemented. */
    private final Sinker sinker;

    /**
     * Creates the service.
     *
     * @param sinker the sink handler handed to the supervisor actor of every stream; may be
     *     {@code null}, in which case {@link #sinkFn} responds with the gRPC "unimplemented" call
     */
    public Service(Sinker sinker) {
        this.sinker = sinker;
    }

    /**
     * Opens a bidirectional sink stream.
     *
     * @param streamObserver the outbound observer used to send responses and errors to the client
     * @return the inbound observer that forwards each request to the supervisor actor and sends
     *     the string {@code "EOF"} to it once the client half-closes the stream
     */
    @Override
    public StreamObserver<SinkOuterClass.SinkRequest> sinkFn(final StreamObserver<SinkOuterClass.SinkResponse> streamObserver) {
        if (this.sinker == null) {
            return ServerCalls.asyncUnimplementedStreamingCall(SinkGrpc.getSinkFnMethod(), streamObserver);
        }

        // The shutdown actor owns the failure future and also listens for dead letters.
        final CompletableFuture<Void> failureFuture = new CompletableFuture<>();
        final ActorRef shutdownActor = sinkActorSystem.actorOf(SinkShutdownActor.props(failureFuture));
        sinkActorSystem.getEventStream().subscribe(shutdownActor, AllDeadLetters.class);

        // Register the failure hook before the supervisor exists so no failure can be missed.
        handleFailure(failureFuture, streamObserver);

        final ActorRef supervisorActor =
                sinkActorSystem.actorOf(SinkSupervisorActor.props(this.sinker, shutdownActor, streamObserver));

        return new StreamObserver<SinkOuterClass.SinkRequest>() {
            @Override
            public void onNext(SinkOuterClass.SinkRequest sinkRequest) {
                supervisorActor.tell(sinkRequest, ActorRef.noSender());
            }

            @Override
            public void onError(Throwable th) {
                // Trailing throwable argument makes SLF4J record the stack trace as well.
                log.error("Encountered error in sinkFn - {}", th.getMessage(), th);
                streamObserver.onError(th);
            }

            @Override
            public void onCompleted() {
                supervisorActor.tell("EOF", ActorRef.noSender());
            }
        };
    }

    /**
     * Readiness probe; always reports ready.
     *
     * @param empty unused request payload
     * @param streamObserver observer that receives a single {@code ready=true} response
     */
    @Override
    public void isReady(Empty empty, StreamObserver<SinkOuterClass.ReadyResponse> streamObserver) {
        streamObserver.onNext(SinkOuterClass.ReadyResponse.newBuilder().setReady(true).build());
        streamObserver.onCompleted();
    }

    /**
     * Terminates the client stream with {@link Status#UNKNOWN} if the given future completes
     * exceptionally. Normal completion is ignored.
     *
     * <p>The reaction is registered as a completion callback, so no thread is parked waiting on
     * the future; the callback runs on whichever thread completes it.
     *
     * @param completableFuture future that signals a stream failure when completed exceptionally
     * @param streamObserver outbound observer to notify of the failure
     */
    private void handleFailure(CompletableFuture<Void> completableFuture, StreamObserver<SinkOuterClass.SinkResponse> streamObserver) {
        completableFuture.whenComplete((ignored, failure) -> {
            if (failure == null) {
                return;
            }
            log.error("Sink stream failed", failure);
            try {
                // toString() keeps the "ExceptionClass: message" description that clients
                // previously received via the wrapping ExecutionException's message.
                streamObserver.onError(
                        Status.UNKNOWN.withDescription(failure.toString()).withCause(failure).asException());
            } catch (RuntimeException notifyFailure) {
                // Exceptions thrown inside a completion callback are otherwise invisible.
                log.warn("Unable to report sink failure to the client", notifyFailure);
            }
        });
    }
}
