package io.numaproj.numaflow.function;

import akka.actor.ActorRef;
import akka.actor.ActorSystem;
import akka.actor.AllDeadLetters;
import com.google.protobuf.ByteString;
import com.google.protobuf.Empty;
import com.google.protobuf.Timestamp;
import io.grpc.stub.ServerCalls;
import io.grpc.stub.StreamObserver;
import io.numaproj.numaflow.function.handlers.MapHandler;
import io.numaproj.numaflow.function.handlers.MapTHandler;
import io.numaproj.numaflow.function.handlers.ReduceHandler;
import io.numaproj.numaflow.function.handlers.ReducerFactory;
import io.numaproj.numaflow.function.metadata.IntervalWindowImpl;
import io.numaproj.numaflow.function.metadata.MetadataImpl;
import io.numaproj.numaflow.function.types.MessageList;
import io.numaproj.numaflow.function.types.MessageTList;
import io.numaproj.numaflow.function.v1.Udfunction;
import io.numaproj.numaflow.function.v1.UserDefinedFunctionGrpc;
import java.time.Instant;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import lombok.Generated;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/numaproj/numaflow/function/FunctionService.class */
/**
 * gRPC service implementation that dispatches incoming user-defined-function calls to the
 * registered handlers.
 *
 * <p>Unary {@code mapFn} / {@code mapTFn} calls are served synchronously by the configured
 * {@link MapHandler} / {@link MapTHandler}. Streaming {@code reduceFn} calls are forwarded,
 * message by message, to a supervisor actor created on {@link #functionActorSystem}.
 * An RPC whose handler has not been set is answered through gRPC's "unimplemented" helpers.
 */
class FunctionService extends UserDefinedFunctionGrpc.UserDefinedFunctionImplBase {

    @Generated
    private static final Logger log = LoggerFactory.getLogger(FunctionService.class);

    /** Actor system hosting the actors created for every reduce stream. */
    public static final ActorSystem functionActorSystem = ActorSystem.create("reduce");

    private MapHandler mapHandler;
    private MapTHandler mapTHandler;
    private ReducerFactory<? extends ReduceHandler> reducerFactory;

    /**
     * Registers the handler used by {@link #mapFn}.
     *
     * @param mapHandler handler to invoke; while {@code null}, {@code mapFn} is reported as unimplemented
     */
    public void setMapHandler(MapHandler mapHandler) {
        this.mapHandler = mapHandler;
    }

    /**
     * Registers the handler used by {@link #mapTFn}.
     *
     * @param mapTHandler handler to invoke; while {@code null}, {@code mapTFn} is reported as unimplemented
     */
    public void setMapTHandler(MapTHandler mapTHandler) {
        this.mapTHandler = mapTHandler;
    }

    /**
     * Registers the reducer factory used by {@link #reduceFn}.
     *
     * @param reducerFactory factory handed to the reduce supervisor actor; while {@code null},
     *                       {@code reduceFn} is reported as unimplemented
     */
    public void setReduceHandler(ReducerFactory<? extends ReduceHandler> reducerFactory) {
        this.reducerFactory = reducerFactory;
    }

    /**
     * Applies the registered {@link MapHandler} to a single datum and responds with the
     * resulting message list.
     *
     * @param datumRequest   the incoming datum (keys, value, watermark, event time, metadata)
     * @param streamObserver observer that receives exactly one response followed by completion
     */
    @Override
    public void mapFn(
            Udfunction.DatumRequest datumRequest,
            StreamObserver<Udfunction.DatumResponseList> streamObserver) {
        if (this.mapHandler == null) {
            ServerCalls.asyncUnimplementedUnaryCall(
                    UserDefinedFunctionGrpc.getMapFnMethod(),
                    streamObserver);
            return;
        }

        MessageList messageList = this.mapHandler.processMessage(
                toKeys(datumRequest),
                toHandlerDatum(datumRequest));

        streamObserver.onNext(buildDatumListResponse(messageList));
        streamObserver.onCompleted();
    }

    /**
     * Applies the registered {@link MapTHandler} to a single datum and responds with the
     * resulting message list, which additionally carries an event time per message.
     *
     * @param datumRequest   the incoming datum (keys, value, watermark, event time, metadata)
     * @param streamObserver observer that receives exactly one response followed by completion
     */
    @Override
    public void mapTFn(
            Udfunction.DatumRequest datumRequest,
            StreamObserver<Udfunction.DatumResponseList> streamObserver) {
        if (this.mapTHandler == null) {
            // Report the method that was actually called (MapTFn, not MapFn) so the
            // UNIMPLEMENTED status names the right RPC.
            ServerCalls.asyncUnimplementedUnaryCall(
                    UserDefinedFunctionGrpc.getMapTFnMethod(),
                    streamObserver);
            return;
        }

        MessageTList messageTList = this.mapTHandler.processMessage(
                toKeys(datumRequest),
                toHandlerDatum(datumRequest));

        streamObserver.onNext(buildDatumListResponse(messageTList));
        streamObserver.onCompleted();
    }

    /**
     * Opens a reduce stream.
     *
     * <p>The window boundaries are parsed as epoch milliseconds from
     * {@code FunctionConstants.WINDOW_START_TIME} and {@code FunctionConstants.WINDOW_END_TIME}.
     * A shutdown actor (subscribed to all dead letters) and a supervisor actor are created for
     * the stream; every request is forwarded to the supervisor, and the end of the client
     * stream is signalled to it with the {@code "EOF"} message.
     *
     * @param streamObserver observer used to send the reduce result (or an error) to the client
     * @return observer that consumes the client's request stream
     */
    @Override
    public StreamObserver<Udfunction.DatumRequest> reduceFn(
            final StreamObserver<Udfunction.DatumResponseList> streamObserver) {
        if (this.reducerFactory == null) {
            return ServerCalls.asyncUnimplementedStreamingCall(
                    UserDefinedFunctionGrpc.getReduceFnMethod(),
                    streamObserver);
        }

        Instant windowStart =
                Instant.ofEpochMilli(Long.parseLong(FunctionConstants.WINDOW_START_TIME.get()));
        Instant windowEnd =
                Instant.ofEpochMilli(Long.parseLong(FunctionConstants.WINDOW_END_TIME.get()));
        MetadataImpl metadata = new MetadataImpl(new IntervalWindowImpl(windowStart, windowEnd));

        // The future is shared with the shutdown actor; handleFailure() terminates the JVM
        // if it ever completes exceptionally.
        CompletableFuture<Void> failureFuture = new CompletableFuture<>();
        ActorRef shutdownActor = functionActorSystem.actorOf(
                ReduceShutdownActor.props(streamObserver, failureFuture));
        functionActorSystem.getEventStream().subscribe(shutdownActor, AllDeadLetters.class);
        handleFailure(failureFuture);

        final ActorRef supervisorActor = functionActorSystem.actorOf(
                ReduceSupervisorActor.props(
                        this.reducerFactory,
                        metadata,
                        shutdownActor,
                        streamObserver));

        return new StreamObserver<Udfunction.DatumRequest>() {
            @Override
            public void onNext(Udfunction.DatumRequest datumRequest) {
                if (supervisorActor.isTerminated()) {
                    streamObserver.onError(new Throwable("Supervisor actor was terminated"));
                } else {
                    supervisorActor.tell(datumRequest, ActorRef.noSender());
                }
            }

            @Override
            public void onError(Throwable th) {
                // Trailing throwable argument makes SLF4J log the stack trace as well.
                log.error("Error from the client - {}", th.getMessage(), th);
                streamObserver.onError(th);
            }

            @Override
            public void onCompleted() {
                supervisorActor.tell("EOF", ActorRef.noSender());
            }
        };
    }

    /**
     * Readiness probe: always answers {@code ready = true}.
     *
     * @param empty          unused request payload
     * @param streamObserver observer that receives the single readiness response
     */
    @Override
    public void isReady(Empty empty, StreamObserver<Udfunction.ReadyResponse> streamObserver) {
        Udfunction.ReadyResponse response =
                Udfunction.ReadyResponse.newBuilder().setReady(true).build();
        streamObserver.onNext(response);
        streamObserver.onCompleted();
    }

    /**
     * Converts the output of a map handler into the protobuf response. A {@code null} value
     * becomes an empty byte string; {@code null} keys/tags become empty lists.
     */
    private Udfunction.DatumResponseList buildDatumListResponse(MessageList messageList) {
        Udfunction.DatumResponseList.Builder responseBuilder =
                Udfunction.DatumResponseList.newBuilder();
        messageList.getMessages().forEach(message ->
                responseBuilder.addElements(
                        Udfunction.DatumResponse.newBuilder()
                                .setValue(toByteString(message.getValue()))
                                .addAllKeys(toList(message.getKeys()))
                                .addAllTags(toList(message.getTags()))
                                .build()));
        return responseBuilder.build();
    }

    /**
     * Converts the output of a mapT handler into the protobuf response. In addition to the
     * value/keys/tags handling of the map variant, a {@code null} event time is encoded as the
     * default {@link Timestamp} instance.
     */
    private Udfunction.DatumResponseList buildDatumListResponse(MessageTList messageTList) {
        Udfunction.DatumResponseList.Builder responseBuilder =
                Udfunction.DatumResponseList.newBuilder();
        messageTList.getMessages().forEach(messageT ->
                responseBuilder.addElements(
                        Udfunction.DatumResponse.newBuilder()
                                .setEventTime(toEventTime(messageT.getEventTime()))
                                .addAllKeys(toList(messageT.getKeys()))
                                .addAllTags(toList(messageT.getTags()))
                                .setValue(toByteString(messageT.getValue()))
                                .build()));
        return responseBuilder.build();
    }

    /**
     * Watches the given future on a dedicated thread and terminates the JVM with exit code 1
     * if waiting on it fails for any reason.
     */
    private void handleFailure(CompletableFuture<Void> completableFuture) {
        new Thread(() -> {
            try {
                completableFuture.get();
            } catch (Exception e) {
                log.error("Reduce processing failed, shutting down", e);
                System.exit(1);
            }
        }).start();
    }

    /** Copies the request keys into a {@code String[]} as expected by the handlers. */
    private static String[] toKeys(Udfunction.DatumRequest datumRequest) {
        return datumRequest.getKeysList().toArray(new String[0]);
    }

    /** Builds the handler-facing datum (value, watermark, event time, metadata) from a request. */
    private static HandlerDatum toHandlerDatum(Udfunction.DatumRequest datumRequest) {
        return new HandlerDatum(
                datumRequest.getValue().toByteArray(),
                toInstant(datumRequest.getWatermark().getWatermark()),
                toInstant(datumRequest.getEventTime().getEventTime()),
                new HandlerDatumMetadata(
                        datumRequest.getMetadata().getId(),
                        datumRequest.getMetadata().getNumDelivered()));
    }

    /** Converts a protobuf timestamp (seconds + nanos) into an {@link Instant}. */
    private static Instant toInstant(Timestamp timestamp) {
        return Instant.ofEpochSecond(timestamp.getSeconds(), timestamp.getNanos());
    }

    /** Wraps an optional event time; {@code null} maps to the default protobuf timestamp. */
    private static Udfunction.EventTime toEventTime(Instant eventTime) {
        Udfunction.EventTime.Builder eventTimeBuilder = Udfunction.EventTime.newBuilder();
        if (eventTime == null) {
            eventTimeBuilder.setEventTime(Timestamp.getDefaultInstance());
        } else {
            eventTimeBuilder.setEventTime(
                    Timestamp.newBuilder()
                            .setSeconds(eventTime.getEpochSecond())
                            .setNanos(eventTime.getNano()));
        }
        return eventTimeBuilder.build();
    }

    /** Copies an optional payload; {@code null} maps to {@link ByteString#EMPTY}. */
    private static ByteString toByteString(byte[] value) {
        return value == null ? ByteString.EMPTY : ByteString.copyFrom(value);
    }

    /** Null-safe, correctly typed conversion of a keys/tags array into a list. */
    private static List<String> toList(String[] values) {
        return values == null ? List.of() : List.of(values);
    }

    @Generated
    public FunctionService() {
    }
}
