package io.numaproj.numaflow.shared;

import io.grpc.Context;
import io.grpc.Contexts;
import io.grpc.ForwardingServerCallListener;
import io.grpc.Metadata;
import io.grpc.ServerBuilder;
import io.grpc.ServerCall;
import io.grpc.ServerCallHandler;
import io.grpc.ServerInterceptor;
import io.grpc.Status;
import io.grpc.netty.NettyServerBuilder;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.ServerChannel;
import io.netty.channel.epoll.EpollEventLoopGroup;
import io.netty.channel.epoll.EpollServerDomainSocketChannel;
import io.netty.channel.kqueue.KQueue;
import io.netty.channel.kqueue.KQueueEventLoopGroup;
import io.netty.channel.kqueue.KQueueServerDomainSocketChannel;
import io.netty.channel.unix.DomainSocketAddress;
import io.numaproj.numaflow.info.Language;
import io.numaproj.numaflow.info.Protocol;
import io.numaproj.numaflow.info.ServerInfo;
import io.numaproj.numaflow.info.ServerInfoAccessor;
import java.nio.file.Files;
import java.nio.file.LinkOption;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.HashMap;
import lombok.Generated;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/numaproj/numaflow/shared/GrpcServerUtils.class */
public class GrpcServerUtils {
    public static final String WIN_START_KEY = "x-numaflow-win-start-time";
    public static final String WIN_END_KEY = "x-numaflow-win-end-time";

    @Generated
    private static final Logger log = LoggerFactory.getLogger((Class<?>) GrpcServerUtils.class);
    public static final Context.Key<String> WINDOW_START_TIME = Context.keyWithDefault("x-numaflow-win-start-time", "");
    public static final Context.Key<String> WINDOW_END_TIME = Context.keyWithDefault("x-numaflow-win-end-time", "");
    public static final Metadata.Key<String> DATUM_METADATA_WIN_START = Metadata.Key.of("x-numaflow-win-start-time", Metadata.ASCII_STRING_MARSHALLER);
    public static final Metadata.Key<String> DATUM_METADATA_WIN_END = Metadata.Key.of("x-numaflow-win-end-time", Metadata.ASCII_STRING_MARSHALLER);

    public static Class<? extends ServerChannel> getChannelTypeClass() {
        return KQueue.isAvailable() ? KQueueServerDomainSocketChannel.class : EpollServerDomainSocketChannel.class;
    }

    public static EventLoopGroup createEventLoopGroup(int i, String str) {
        return KQueue.isAvailable() ? new KQueueEventLoopGroup(i, ThreadUtils.INSTANCE.newThreadFactory(str)) : new EpollEventLoopGroup(i, ThreadUtils.INSTANCE.newThreadFactory(str));
    }

    public static void writeServerInfo(ServerInfoAccessor serverInfoAccessor, String str, String str2) throws Exception {
        if (str != null) {
            Path path = Paths.get(str, new String[0]);
            Files.deleteIfExists(path);
            if (Files.exists(path, new LinkOption[0])) {
                log.error("Failed to clean up socket path {}. Exiting", str);
            }
        }
        ServerInfo serverInfo = new ServerInfo(Protocol.UDS_PROTOCOL, Language.JAVA, serverInfoAccessor.getSDKVersion(), new HashMap());
        log.info("Writing server info {} to {}", serverInfo, str2);
        serverInfoAccessor.write(serverInfo, str2);
    }

    public static ServerBuilder<?> createServerBuilder(String str, int i) {
        return NettyServerBuilder.forAddress(new DomainSocketAddress(str)).channelType(getChannelTypeClass()).maxInboundMessageSize(i).bossEventLoopGroup(createEventLoopGroup(1, "netty-boss")).workerEventLoopGroup(createEventLoopGroup(ThreadUtils.INSTANCE.availableProcessors(), "netty-worker")).intercept(new ServerInterceptor() { // from class: io.numaproj.numaflow.shared.GrpcServerUtils.1
            @Override // io.grpc.ServerInterceptor
            public <ReqT, RespT> ServerCall.Listener<ReqT> interceptCall(final ServerCall<ReqT, RespT> serverCall, final Metadata metadata, ServerCallHandler<ReqT, RespT> serverCallHandler) {
                return new ForwardingServerCallListener.SimpleForwardingServerCallListener<ReqT>(Contexts.interceptCall(Context.current().withValues(GrpcServerUtils.WINDOW_START_TIME, (String) metadata.get(GrpcServerUtils.DATUM_METADATA_WIN_START), GrpcServerUtils.WINDOW_END_TIME, (String) metadata.get(GrpcServerUtils.DATUM_METADATA_WIN_END)), serverCall, metadata, serverCallHandler)) { // from class: io.numaproj.numaflow.shared.GrpcServerUtils.1.1
                    @Override // io.grpc.ForwardingServerCallListener.SimpleForwardingServerCallListener, io.grpc.ForwardingServerCallListener, io.grpc.PartialForwardingServerCallListener, io.grpc.ServerCall.Listener
                    public void onHalfClose() {
                        try {
                            super.onHalfClose();
                        } catch (RuntimeException e) {
                            handleException(e, serverCall, metadata);
                            throw e;
                        }
                    }

                    private void handleException(RuntimeException runtimeException, ServerCall<ReqT, RespT> serverCall2, Metadata metadata2) {
                        serverCall2.close(Status.fromThrowable(Status.UNKNOWN.withDescription(runtimeException.getMessage()).withCause(runtimeException).asException()), metadata2);
                    }
                };
            }
        });
    }
}
