package io.numaproj.numaflow.reducer;

import akka.actor.ActorRef;
import akka.actor.ActorSystem;
import akka.actor.AllDeadLetters;
import com.google.protobuf.Empty;
import io.grpc.Status;
import io.grpc.stub.ServerCalls;
import io.grpc.stub.StreamObserver;
import io.numaproj.numaflow.reduce.v1.ReduceGrpc;
import io.numaproj.numaflow.reduce.v1.ReduceOuterClass;
import io.numaproj.numaflow.reducer.metadata.IntervalWindowImpl;
import io.numaproj.numaflow.reducer.metadata.MetadataImpl;
import io.numaproj.numaflow.shared.GrpcServerUtils;
import java.time.Instant;
import java.util.concurrent.CompletableFuture;
import lombok.Generated;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/numaproj/numaflow/reducer/Service.class */
class Service extends ReduceGrpc.ReduceImplBase {

    @Generated
    private static final Logger log = LoggerFactory.getLogger((Class<?>) Service.class);
    public static final ActorSystem reduceActorSystem = ActorSystem.create("reduce");
    private ReducerFactory<? extends Reducer> reducerFactory;

    public Service(ReducerFactory<? extends Reducer> reducerFactory) {
        this.reducerFactory = reducerFactory;
    }

    @Override // io.numaproj.numaflow.reduce.v1.ReduceGrpc.ReduceImplBase
    public StreamObserver<ReduceOuterClass.ReduceRequest> reduceFn(final StreamObserver<ReduceOuterClass.ReduceResponse> streamObserver) {
        if (this.reducerFactory == null) {
            return ServerCalls.asyncUnimplementedStreamingCall(ReduceGrpc.getReduceFnMethod(), streamObserver);
        }
        MetadataImpl metadataImpl = new MetadataImpl(new IntervalWindowImpl(Instant.ofEpochMilli(Long.parseLong(GrpcServerUtils.WINDOW_START_TIME.get())), Instant.ofEpochMilli(Long.parseLong(GrpcServerUtils.WINDOW_END_TIME.get()))));
        CompletableFuture completableFuture = new CompletableFuture();
        ActorRef actorOf = reduceActorSystem.actorOf(ReduceShutdownActor.props(completableFuture));
        reduceActorSystem.getEventStream().subscribe(actorOf, AllDeadLetters.class);
        handleFailure(completableFuture, streamObserver);
        final ActorRef actorOf2 = reduceActorSystem.actorOf(ReduceSupervisorActor.props(this.reducerFactory, metadataImpl, actorOf, streamObserver));
        return new StreamObserver<ReduceOuterClass.ReduceRequest>() { // from class: io.numaproj.numaflow.reducer.Service.1
            @Override // io.grpc.stub.StreamObserver
            public void onNext(ReduceOuterClass.ReduceRequest reduceRequest) {
                if (actorOf2.isTerminated()) {
                    streamObserver.onError(new Throwable("Supervisor actor was terminated"));
                } else {
                    actorOf2.tell(reduceRequest, ActorRef.noSender());
                }
            }

            @Override // io.grpc.stub.StreamObserver
            public void onError(Throwable th) {
                Service.log.error("Error from the client - {}", th.getMessage());
                streamObserver.onError(th);
            }

            @Override // io.grpc.stub.StreamObserver
            public void onCompleted() {
                actorOf2.tell("EOF", ActorRef.noSender());
            }
        };
    }

    @Override // io.numaproj.numaflow.reduce.v1.ReduceGrpc.ReduceImplBase
    public void isReady(Empty empty, StreamObserver<ReduceOuterClass.ReadyResponse> streamObserver) {
        streamObserver.onNext(ReduceOuterClass.ReadyResponse.newBuilder().setReady(true).build());
        streamObserver.onCompleted();
    }

    static void handleFailure(CompletableFuture<Void> completableFuture, StreamObserver<ReduceOuterClass.ReduceResponse> streamObserver) {
        new Thread(() -> {
            try {
                completableFuture.get();
            } catch (Exception e) {
                e.printStackTrace();
                streamObserver.onError(Status.UNKNOWN.withDescription(e.getMessage()).withCause(e).asException());
            }
        }).start();
    }
}
