package io.numaproj.numaflow.function;

import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.annotations.VisibleForTesting;
import io.grpc.Context;
import io.grpc.Contexts;
import io.grpc.ForwardingServerCallListener;
import io.grpc.Metadata;
import io.grpc.Server;
import io.grpc.ServerBuilder;
import io.grpc.ServerCall;
import io.grpc.ServerCallHandler;
import io.grpc.ServerInterceptor;
import io.grpc.Status;
import io.grpc.netty.NettyServerBuilder;
import io.netty.channel.epoll.EpollEventLoopGroup;
import io.netty.channel.epoll.EpollServerDomainSocketChannel;
import io.netty.channel.unix.DomainSocketAddress;
import io.numaproj.numaflow.function.handlers.MapHandler;
import io.numaproj.numaflow.function.handlers.MapStreamHandler;
import io.numaproj.numaflow.function.handlers.MapTHandler;
import io.numaproj.numaflow.function.handlers.ReduceHandler;
import io.numaproj.numaflow.function.handlers.ReducerFactory;
import io.numaproj.numaflow.info.Language;
import io.numaproj.numaflow.info.Protocol;
import io.numaproj.numaflow.info.ServerInfo;
import io.numaproj.numaflow.info.ServerInfoAccessor;
import io.numaproj.numaflow.info.ServerInfoAccessorImpl;
import java.nio.file.Files;
import java.nio.file.LinkOption;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.HashMap;
import java.util.concurrent.TimeUnit;
import lombok.Generated;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/numaproj/numaflow/function/FunctionServer.class */
public class FunctionServer {

    @Generated
    private static final Logger log = LoggerFactory.getLogger((Class<?>) FunctionServer.class);
    private final FunctionGRPCConfig grpcConfig;
    private final ServerBuilder<?> serverBuilder;
    private final FunctionService functionService;
    private final ServerInfoAccessor serverInfoAccessor;
    private Server server;

    public FunctionServer() {
        this(new FunctionGRPCConfig(4194304));
    }

    public FunctionServer(FunctionGRPCConfig functionGRPCConfig) {
        this(functionGRPCConfig, new EpollEventLoopGroup());
    }

    private FunctionServer(FunctionGRPCConfig functionGRPCConfig, EpollEventLoopGroup epollEventLoopGroup) {
        this(NettyServerBuilder.forAddress(new DomainSocketAddress(functionGRPCConfig.getSocketPath())).channelType(EpollServerDomainSocketChannel.class).maxInboundMessageSize(functionGRPCConfig.getMaxMessageSize()).workerEventLoopGroup(epollEventLoopGroup).bossEventLoopGroup(epollEventLoopGroup), functionGRPCConfig);
    }

    @VisibleForTesting
    public FunctionServer(ServerBuilder<?> serverBuilder, FunctionGRPCConfig functionGRPCConfig) {
        this.functionService = new FunctionService();
        this.serverInfoAccessor = new ServerInfoAccessorImpl(new ObjectMapper());
        this.grpcConfig = functionGRPCConfig;
        this.serverBuilder = serverBuilder;
    }

    public FunctionServer registerMapHandler(MapHandler mapHandler) {
        this.functionService.setMapHandler(mapHandler);
        return this;
    }

    public FunctionServer registerMapTHandler(MapTHandler mapTHandler) {
        this.functionService.setMapTHandler(mapTHandler);
        return this;
    }

    public FunctionServer registerMapStreamHandler(MapStreamHandler mapStreamHandler) {
        this.functionService.setMapStreamHandler(mapStreamHandler);
        return this;
    }

    public FunctionServer registerReducerFactory(ReducerFactory<? extends ReduceHandler> reducerFactory) {
        this.functionService.setReduceHandler(reducerFactory);
        return this;
    }

    /* JADX WARN: Type inference failed for: r1v6, types: [io.grpc.ServerBuilder] */
    public void start() throws Exception {
        String socketPath = this.grpcConfig.getSocketPath();
        String infoFilePath = this.grpcConfig.getInfoFilePath();
        if (socketPath != null) {
            Path path = Paths.get(socketPath, new String[0]);
            Files.deleteIfExists(path);
            if (Files.exists(path, new LinkOption[0])) {
                log.error("Failed to clean up socket path {}. Exiting", socketPath);
            }
        }
        ServerInfo serverInfo = new ServerInfo(Protocol.UDS_PROTOCOL, Language.JAVA, this.serverInfoAccessor.getSDKVersion(), new HashMap());
        log.info("Writing server info {} to {}", serverInfo, infoFilePath);
        this.serverInfoAccessor.write(serverInfo, infoFilePath);
        this.server = this.serverBuilder.addService(this.functionService).intercept(new ServerInterceptor() { // from class: io.numaproj.numaflow.function.FunctionServer.1
            @Override // io.grpc.ServerInterceptor
            public <ReqT, RespT> ServerCall.Listener<ReqT> interceptCall(final ServerCall<ReqT, RespT> serverCall, final Metadata metadata, ServerCallHandler<ReqT, RespT> serverCallHandler) {
                return new ForwardingServerCallListener.SimpleForwardingServerCallListener<ReqT>(Contexts.interceptCall(Context.current().withValues(FunctionConstants.WINDOW_START_TIME, (String) metadata.get(FunctionConstants.DATUM_METADATA_WIN_START), FunctionConstants.WINDOW_END_TIME, (String) metadata.get(FunctionConstants.DATUM_METADATA_WIN_END)), serverCall, metadata, serverCallHandler)) { // from class: io.numaproj.numaflow.function.FunctionServer.1.1
                    @Override // io.grpc.ForwardingServerCallListener.SimpleForwardingServerCallListener, io.grpc.ForwardingServerCallListener, io.grpc.PartialForwardingServerCallListener, io.grpc.ServerCall.Listener
                    public void onHalfClose() {
                        try {
                            super.onHalfClose();
                        } catch (RuntimeException e) {
                            handleException(e, serverCall, metadata);
                            throw e;
                        }
                    }

                    private void handleException(RuntimeException runtimeException, ServerCall<ReqT, RespT> serverCall2, Metadata metadata2) {
                        serverCall2.close(Status.fromThrowable(Status.UNKNOWN.withDescription(runtimeException.getMessage()).withCause(runtimeException).asException()), metadata2);
                    }
                };
            }
        }).build();
        this.server.start();
        log.info("Server started, listening on socket path: " + this.grpcConfig.getSocketPath());
        Runtime.getRuntime().addShutdownHook(new Thread(() -> {
            System.err.println("*** shutting down gRPC server since JVM is shutting down");
            stop();
            System.err.println("*** server shut down");
        }));
    }

    public void stop() {
        if (this.server != null) {
            try {
                this.server.shutdown().awaitTermination(30L, TimeUnit.SECONDS);
            } catch (InterruptedException e) {
                Thread.interrupted();
                e.printStackTrace(System.err);
            }
        }
    }
}
