package io.numaproj.numaflow.sourcer;

import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.annotations.VisibleForTesting;
import io.grpc.ServerInterceptor;
import io.numaproj.numaflow.info.ContainerType;
import io.numaproj.numaflow.info.ServerInfoAccessor;
import io.numaproj.numaflow.info.ServerInfoAccessorImpl;
import io.numaproj.numaflow.shared.GrpcServerUtils;
import io.numaproj.numaflow.shared.GrpcServerWrapper;
import java.util.concurrent.CompletableFuture;
import lombok.Generated;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/numaproj/numaflow/sourcer/Server.class */
public class Server {

    @Generated
    private static final Logger log = LoggerFactory.getLogger(Server.class);
    private final GRPCConfig grpcConfig;
    private final CompletableFuture<Void> shutdownSignal;
    private final ServerInfoAccessor serverInfoAccessor;
    private final GrpcServerWrapper server;

    public Server(Sourcer sourcer) {
        this(sourcer, GRPCConfig.defaultGrpcConfig());
    }

    public Server(Sourcer sourcer, GRPCConfig gRPCConfig) {
        this.serverInfoAccessor = new ServerInfoAccessorImpl(new ObjectMapper());
        this.shutdownSignal = new CompletableFuture<>();
        this.grpcConfig = gRPCConfig;
        this.server = new GrpcServerWrapper(this.grpcConfig, new Service(sourcer, this.shutdownSignal));
    }

    @VisibleForTesting
    protected Server(GRPCConfig gRPCConfig, Sourcer sourcer, ServerInterceptor serverInterceptor, String str) {
        this.serverInfoAccessor = new ServerInfoAccessorImpl(new ObjectMapper());
        this.shutdownSignal = new CompletableFuture<>();
        this.grpcConfig = gRPCConfig;
        this.server = new GrpcServerWrapper(serverInterceptor, str, new Service(sourcer, this.shutdownSignal));
    }

    public void start() throws Exception {
        if (!this.grpcConfig.isLocal()) {
            GrpcServerUtils.writeServerInfo(this.serverInfoAccessor, this.grpcConfig.getSocketPath(), this.grpcConfig.getInfoFilePath(), ContainerType.SOURCER);
        }
        this.server.start();
        log.info("server started, listening on {}", this.grpcConfig.isLocal() ? "localhost:" + this.grpcConfig.getPort() : this.grpcConfig.getSocketPath());
        Runtime.getRuntime().addShutdownHook(new Thread(() -> {
            System.err.println("*** shutting down source gRPC server since JVM is shutting down");
            try {
                stop();
            } catch (InterruptedException e) {
                Thread.interrupted();
                e.printStackTrace(System.err);
            }
        }));
        this.shutdownSignal.whenCompleteAsync((r4, th) -> {
            if (th != null) {
                System.err.println("*** shutting down source gRPC server because of an exception - " + th.getMessage());
                try {
                    stop();
                } catch (InterruptedException e) {
                    Thread.interrupted();
                    e.printStackTrace(System.err);
                }
            }
        });
    }

    public void awaitTermination() throws InterruptedException {
        log.info("waiting for server to terminate");
        this.server.awaitTermination();
        log.info("server has terminated");
    }

    public void stop() throws InterruptedException {
        this.server.gracefullyShutdown();
    }
}
