package io.numaproj.numaflow.sessionreducer;

import akka.actor.ActorRef;
import akka.actor.ActorSystem;
import akka.actor.AllDeadLetters;
import akka.pattern.Patterns;
import akka.util.Timeout;
import com.google.protobuf.Empty;
import io.grpc.Status;
import io.grpc.stub.ServerCalls;
import io.grpc.stub.StreamObserver;
import io.numaproj.numaflow.reduce.v1.ReduceGrpc;
import io.numaproj.numaflow.sessionreduce.v1.SessionReduceGrpc;
import io.numaproj.numaflow.sessionreduce.v1.Sessionreduce;
import io.numaproj.numaflow.sessionreducer.model.SessionReducer;
import io.numaproj.numaflow.sessionreducer.model.SessionReducerFactory;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import lombok.Generated;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import scala.concurrent.Await;
import scala.concurrent.Future;
import scala.concurrent.duration.Duration;

/* loaded from: input_file:io/numaproj/numaflow/sessionreducer/Service.class */
/**
 * gRPC service implementation for the session-reduce API.
 *
 * <p>Each {@code sessionReduceFn} stream gets its own set of actors created on the shared
 * {@link #sessionReduceActorSystem}: a shutdown actor (subscribed to dead letters and bound to a
 * failure future), an output actor wrapping the response observer, and a supervisor actor that
 * receives every incoming request.
 */
class Service extends SessionReduceGrpc.SessionReduceImplBase {

    @Generated
    private static final Logger log = LoggerFactory.getLogger(Service.class);

    /** Actor system shared by all session-reduce streams served by this process. */
    public static final ActorSystem sessionReduceActorSystem = ActorSystem.create("sessionreduce");

    /** Upper bound on how long a single MERGE request may take before it is reported as failed. */
    private static final Timeout MERGE_TIMEOUT = new Timeout(Duration.create(1L, TimeUnit.HOURS));

    private final SessionReducerFactory<? extends SessionReducer> sessionReducerFactory;

    /**
     * Creates the service.
     *
     * @param sessionReducerFactory factory handed to the supervisor actor; when {@code null},
     *     {@link #sessionReduceFn(StreamObserver)} answers as unimplemented
     */
    public Service(SessionReducerFactory<? extends SessionReducer> sessionReducerFactory) {
        this.sessionReducerFactory = sessionReducerFactory;
    }

    /**
     * Watches the given future on a dedicated thread and, if waiting on it fails, reports the
     * failure to the client as an {@code UNKNOWN} gRPC status.
     *
     * @param completableFuture future whose exceptional completion signals a stream failure
     * @param streamObserver response observer that receives the error
     */
    static void handleFailure(CompletableFuture<Void> completableFuture, StreamObserver<Sessionreduce.SessionReduceResponse> streamObserver) {
        new Thread(() -> {
            try {
                completableFuture.get();
            } catch (Exception e) {
                if (e instanceof InterruptedException) {
                    // Preserve the interrupt flag for whoever owns this thread.
                    Thread.currentThread().interrupt();
                }
                log.error("Session reduce stream failed", e);
                streamObserver.onError(Status.UNKNOWN.withDescription(e.getMessage()).withCause(e).asException());
            }
        }, "sessionreduce-failure-watcher").start();
    }

    /**
     * Opens a bidirectional session-reduce stream.
     *
     * <p>Non-MERGE requests are forwarded to the supervisor actor fire-and-forget. MERGE requests
     * are sent with an ask and the calling thread blocks until the supervisor replies or
     * {@link #MERGE_TIMEOUT} elapses.
     *
     * @param streamObserver observer used to send responses and errors back to the client
     * @return observer that consumes the client's request stream
     */
    @Override // io.numaproj.numaflow.sessionreduce.v1.SessionReduceGrpc.AsyncService
    public StreamObserver<Sessionreduce.SessionReduceRequest> sessionReduceFn(final StreamObserver<Sessionreduce.SessionReduceResponse> streamObserver) {
        if (this.sessionReducerFactory == null) {
            // Report the descriptor of the method actually being called, not the plain reduce one.
            return ServerCalls.asyncUnimplementedStreamingCall(SessionReduceGrpc.getSessionReduceFnMethod(), streamObserver);
        }

        CompletableFuture<Void> failureFuture = new CompletableFuture<>();
        ActorRef shutdownActor = sessionReduceActorSystem.actorOf(ShutdownActor.props(failureFuture));
        sessionReduceActorSystem.getEventStream().subscribe(shutdownActor, AllDeadLetters.class);
        handleFailure(failureFuture, streamObserver);

        ActorRef outputActor = sessionReduceActorSystem.actorOf(OutputActor.props(streamObserver));
        final ActorRef supervisorActor = sessionReduceActorSystem.actorOf(
                SupervisorActor.props(this.sessionReducerFactory, shutdownActor, outputActor));

        return new StreamObserver<Sessionreduce.SessionReduceRequest>() {
            @Override // io.grpc.stub.StreamObserver
            public void onNext(Sessionreduce.SessionReduceRequest sessionReduceRequest) {
                if (supervisorActor.isTerminated()) {
                    streamObserver.onError(
                            Status.UNKNOWN.withDescription("Supervisor actor was terminated").asException());
                    return;
                }
                if (sessionReduceRequest.getOperation().getEvent() != Sessionreduce.SessionReduceRequest.WindowOperation.Event.MERGE) {
                    supervisorActor.tell(sessionReduceRequest, ActorRef.noSender());
                    return;
                }
                try {
                    // MERGE is handled synchronously: wait for the supervisor's reply before
                    // accepting the next request from the stream.
                    // NOTE(review): assumes SupervisorActor replies to the sender of a MERGE
                    // request; otherwise this blocks for the full timeout - confirm.
                    Future<Object> mergeReply = Patterns.ask(supervisorActor, sessionReduceRequest, MERGE_TIMEOUT);
                    Await.result(mergeReply, MERGE_TIMEOUT.duration());
                } catch (Exception e) {
                    if (e instanceof InterruptedException) {
                        Thread.currentThread().interrupt();
                    }
                    streamObserver.onError(Status.UNKNOWN
                            .withDescription("Supervisor actor failed processing a MERGE request: " + e.getMessage())
                            .withCause(e)
                            .asException());
                }
            }

            @Override // io.grpc.stub.StreamObserver
            public void onError(Throwable th) {
                Service.log.error("Error from the client - {}", th.getMessage());
                streamObserver.onError(th);
            }

            @Override // io.grpc.stub.StreamObserver
            public void onCompleted() {
                // Signal end of the request stream to the supervisor.
                supervisorActor.tell("EOF", ActorRef.noSender());
            }
        };
    }

    /**
     * Readiness probe: always answers {@code ready = true} and completes the call.
     *
     * @param empty unused request payload
     * @param streamObserver observer that receives the single readiness response
     */
    @Override // io.numaproj.numaflow.sessionreduce.v1.SessionReduceGrpc.AsyncService
    public void isReady(Empty empty, StreamObserver<Sessionreduce.ReadyResponse> streamObserver) {
        streamObserver.onNext(Sessionreduce.ReadyResponse.newBuilder().setReady(true).build());
        streamObserver.onCompleted();
    }
}
