package io.numaproj.numaflow.shared;

import com.google.common.annotations.VisibleForTesting;
import io.grpc.BindableService;
import io.grpc.Context;
import io.grpc.Contexts;
import io.grpc.ForwardingServerCallListener;
import io.grpc.Metadata;
import io.grpc.Server;
import io.grpc.ServerBuilder;
import io.grpc.ServerCall;
import io.grpc.ServerCallHandler;
import io.grpc.ServerInterceptor;
import io.grpc.Status;
import io.grpc.inprocess.InProcessServerBuilder;
import io.grpc.netty.NettyServerBuilder;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.unix.DomainSocketAddress;
import java.util.concurrent.TimeUnit;
import lombok.Generated;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/numaproj/numaflow/shared/GrpcServerWrapper.class */
public class GrpcServerWrapper {

    @Generated
    private static final Logger log = LoggerFactory.getLogger(GrpcServerWrapper.class);
    private final Server server;
    private EventLoopGroup bossEventLoopGroup;
    private EventLoopGroup workerEventLoopGroup;

    public GrpcServerWrapper(GrpcConfigRetriever grpcConfigRetriever, BindableService bindableService) {
        this.server = createServer(grpcConfigRetriever.getSocketPath(), grpcConfigRetriever.getMaxMessageSize(), grpcConfigRetriever.isLocal(), grpcConfigRetriever.getPort(), bindableService);
    }

    @VisibleForTesting
    public GrpcServerWrapper(ServerInterceptor serverInterceptor, String str, BindableService bindableService) {
        if (serverInterceptor == null) {
            this.server = InProcessServerBuilder.forName(str).directExecutor().addService(bindableService).build();
        } else {
            this.server = InProcessServerBuilder.forName(str).intercept(serverInterceptor).directExecutor().addService(bindableService).build();
        }
    }

    public void start() throws Exception {
        if (this.server == null) {
            throw new IllegalStateException("Server is not initialized");
        }
        this.server.start();
    }

    public void awaitTermination() throws InterruptedException {
        this.server.awaitTermination();
        if (this.workerEventLoopGroup.awaitTermination(30L, TimeUnit.SECONDS) && this.bossEventLoopGroup.awaitTermination(30L, TimeUnit.SECONDS)) {
            return;
        }
        log.error("Timed out to gracefully shutdown event loop groups");
        throw new InterruptedException("Timed out to gracefully shutdown event loop groups");
    }

    public void gracefullyShutdown() throws InterruptedException {
        if (this.server == null || this.server.isTerminated()) {
            return;
        }
        log.info("stopping gRPC server...");
        this.server.shutdown().awaitTermination(30L, TimeUnit.SECONDS);
        if (!this.server.isTerminated()) {
            this.server.shutdownNow();
        }
        log.info("gracefully shutting down event loop groups...");
        gracefullyShutdownEventLoopGroups();
    }

    private void gracefullyShutdownEventLoopGroups() {
        if (this.bossEventLoopGroup != null) {
            this.bossEventLoopGroup.shutdownGracefully();
        }
        if (this.workerEventLoopGroup != null) {
            this.workerEventLoopGroup.shutdownGracefully();
        }
    }

    private Server createServer(String str, int i, boolean z, int i2, BindableService bindableService) {
        ServerInterceptor serverInterceptor = new ServerInterceptor() { // from class: io.numaproj.numaflow.shared.GrpcServerWrapper.1
            public <ReqT, RespT> ServerCall.Listener<ReqT> interceptCall(final ServerCall<ReqT, RespT> serverCall, final Metadata metadata, ServerCallHandler<ReqT, RespT> serverCallHandler) {
                return new ForwardingServerCallListener.SimpleForwardingServerCallListener<ReqT>(Contexts.interceptCall(Context.current().withValues(GrpcServerUtils.WINDOW_START_TIME, (String) metadata.get(GrpcServerUtils.DATUM_METADATA_WIN_START), GrpcServerUtils.WINDOW_END_TIME, (String) metadata.get(GrpcServerUtils.DATUM_METADATA_WIN_END)), serverCall, metadata, serverCallHandler)) { // from class: io.numaproj.numaflow.shared.GrpcServerWrapper.1.1
                    public void onHalfClose() {
                        try {
                            super.onHalfClose();
                        } catch (RuntimeException e) {
                            handleException(e, serverCall, metadata);
                            throw e;
                        }
                    }

                    private void handleException(RuntimeException runtimeException, ServerCall<ReqT, RespT> serverCall2, Metadata metadata2) {
                        serverCall2.close(Status.fromThrowable(Status.UNKNOWN.withDescription(runtimeException.getMessage()).withCause(runtimeException).asException()), metadata2);
                        runtimeException.printStackTrace();
                        System.exit(1);
                    }
                };
            }
        };
        if (z) {
            return ServerBuilder.forPort(i2).maxInboundMessageSize(i).intercept(serverInterceptor).addService(bindableService).build();
        }
        this.bossEventLoopGroup = GrpcServerUtils.createEventLoopGroup(1, "netty-boss");
        this.workerEventLoopGroup = GrpcServerUtils.createEventLoopGroup(ThreadUtils.INSTANCE.availableProcessors(), "netty-worker");
        return NettyServerBuilder.forAddress(new DomainSocketAddress(str)).channelType(GrpcServerUtils.getChannelTypeClass()).maxInboundMessageSize(i).bossEventLoopGroup(this.bossEventLoopGroup).workerEventLoopGroup(this.workerEventLoopGroup).intercept(serverInterceptor).addService(bindableService).build();
    }
}
