package io.numaproj.numaflow.sink;

import com.fasterxml.jackson.databind.ObjectMapper;
import io.grpc.Server;
import io.grpc.ServerBuilder;
import io.grpc.netty.NettyServerBuilder;
import io.netty.channel.unix.DomainSocketAddress;
import io.numaproj.numaflow.info.Language;
import io.numaproj.numaflow.info.Protocol;
import io.numaproj.numaflow.info.ServerInfo;
import io.numaproj.numaflow.info.ServerInfoAccessor;
import io.numaproj.numaflow.info.ServerInfoAccessorImpl;
import io.numaproj.numaflow.sink.handler.SinkHandler;
import io.numaproj.numaflow.utils.GrpcServerUtils;
import io.numaproj.numaflow.utils.ThreadUtils;
import java.nio.file.Files;
import java.nio.file.LinkOption;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.HashMap;
import java.util.concurrent.TimeUnit;
import lombok.Generated;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/numaproj/numaflow/sink/SinkServer.class */
public class SinkServer {

    @Generated
    private static final Logger log = LoggerFactory.getLogger((Class<?>) SinkServer.class);
    private final SinkGRPCConfig grpcConfig;
    private final ServerBuilder<?> serverBuilder;
    private final SinkService sinkService;
    private final ServerInfoAccessor serverInfoAccessor;
    private Server server;

    public SinkServer() {
        this(new SinkGRPCConfig(67108864));
    }

    public SinkServer(SinkGRPCConfig sinkGRPCConfig) {
        this(NettyServerBuilder.forAddress(new DomainSocketAddress(sinkGRPCConfig.getSocketPath())).channelType(GrpcServerUtils.getChannelTypeClass()).maxInboundMessageSize(sinkGRPCConfig.getMaxMessageSize()).bossEventLoopGroup(GrpcServerUtils.createEventLoopGroup(1, "netty-boss")).workerEventLoopGroup(GrpcServerUtils.createEventLoopGroup(ThreadUtils.INSTANCE.availableProcessors(), "netty-worker")), sinkGRPCConfig);
    }

    public SinkServer(ServerBuilder<?> serverBuilder, SinkGRPCConfig sinkGRPCConfig) {
        this.sinkService = new SinkService();
        this.serverInfoAccessor = new ServerInfoAccessorImpl(new ObjectMapper());
        this.grpcConfig = sinkGRPCConfig;
        this.serverBuilder = serverBuilder;
    }

    public SinkServer registerSinker(SinkHandler sinkHandler) {
        this.sinkService.setSinkHandler(sinkHandler);
        return this;
    }

    /* JADX WARN: Type inference failed for: r1v5, types: [io.grpc.ServerBuilder] */
    public void start() throws Exception {
        String socketPath = this.grpcConfig.getSocketPath();
        String infoFilePath = this.grpcConfig.getInfoFilePath();
        if (socketPath != null) {
            Path path = Paths.get(socketPath, new String[0]);
            Files.deleteIfExists(path);
            if (Files.exists(path, new LinkOption[0])) {
                log.error("Failed to clean up socket path {}. Exiting", socketPath);
            }
        }
        ServerInfo serverInfo = new ServerInfo(Protocol.UDS_PROTOCOL, Language.JAVA, this.serverInfoAccessor.getSDKVersion(), new HashMap());
        log.info("Writing server info {} to {}", serverInfo, infoFilePath);
        this.serverInfoAccessor.write(serverInfo, infoFilePath);
        this.server = this.serverBuilder.addService(this.sinkService).build();
        this.server.start();
        log.info("Server started, listening on socket path: " + this.grpcConfig.getSocketPath());
        Runtime.getRuntime().addShutdownHook(new Thread(() -> {
            System.err.println("*** shutting down gRPC server since JVM is shutting down");
            try {
                stop();
            } catch (InterruptedException e) {
                Thread.interrupted();
                e.printStackTrace(System.err);
            }
        }));
    }

    public void stop() throws InterruptedException {
        if (this.server != null) {
            this.server.shutdown().awaitTermination(30L, TimeUnit.SECONDS);
        }
    }
}
