package com.uber.rss;

import com.uber.rss.common.ServerDetail;
import com.uber.rss.common.ServerDetailCollection;
import com.uber.rss.decoders.StreamServerVersionDecoder;
import com.uber.rss.exceptions.RssAggregateException;
import com.uber.rss.execution.ShuffleExecutor;
import com.uber.rss.handlers.HttpChannelInboundHandler;
import com.uber.rss.handlers.UploadChannelManager;
import com.uber.rss.metadata.InMemoryServiceRegistry;
import com.uber.rss.metadata.ServiceRegistry;
import com.uber.rss.metadata.StandaloneServiceRegistryClient;
import com.uber.rss.metrics.M3Stats;
import com.uber.rss.metrics.ScheduledMetricCollector;
import com.uber.rss.storage.ShuffleFileStorage;
import com.uber.rss.util.FileUtils;
import com.uber.rss.util.NetworkUtils;
import com.uber.rss.util.ServerHostAndPort;
import com.uber.rss.util.SystemUtils;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.buffer.PooledByteBufAllocator;
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.epoll.EpollEventLoopGroup;
import io.netty.channel.epoll.EpollServerSocketChannel;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.http.HttpObjectAggregator;
import io.netty.handler.codec.http.HttpServerCodec;
import io.netty.util.concurrent.Future;
import java.net.BindException;
import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.function.Supplier;
import org.apache.commons.lang3.tuple.Pair;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/uber/rss/StreamServer.class */
/**
 * Netty-based stream server that accepts shuffle connections, optionally serves an HTTP
 * endpoint, and registers itself with a {@link ServiceRegistry}.
 *
 * <p>Typical life cycle: construct, call {@link #run()} to bind the ports and register the
 * server, and finally call {@link #shutdown()} to release the channels, event loops and the
 * shuffle executor.
 */
public class StreamServer {
    private static final Logger logger = LoggerFactory.getLogger(StreamServer.class);

    /** Client name passed to every {@link StandaloneServiceRegistryClient} created here. */
    private static final String REGISTRY_CLIENT_NAME = "streamServer";
    /** Environment variable that, when non-empty, overrides the generated server id. */
    private static final String SERVER_ID_ENV = "RSS_SERVER_ID";
    /** Thread count of the event loop group that serves the HTTP endpoint. */
    private static final int HEALTH_CHECK_THREADS = 2;
    /** Accept backlog used for the HTTP endpoint. */
    private static final int HTTP_BACKLOG = 32;
    /** Maximum aggregated HTTP message size accepted by the HTTP endpoint, in bytes. */
    private static final int HTTP_MAX_CONTENT_LENGTH = 512 * 1024;
    /** Configured HTTP port value meaning "do not start the HTTP endpoint". */
    private static final int HTTP_PORT_DISABLED = -1;

    private StreamServerConfig serverConfig;
    private final String hostName;
    private int shufflePort;
    private int httpPort;
    private ShuffleExecutor shuffleExecutor;
    // May stay null until run() when the standalone registry has no explicit registry server.
    private ServiceRegistry serviceRegistry;
    private EventLoopGroup shuffleBossGroup;
    private EventLoopGroup shuffleWorkerGroup;
    private EventLoopGroup healthCheckEventLoopGroup;
    private UploadChannelManager channelManager;
    private final List<Channel> channels;
    private final ServerDetailCollection serverDetailCollection;

    /**
     * Creates a server whose service registry is derived from the configuration.
     *
     * @param streamServerConfig server configuration
     */
    public StreamServer(StreamServerConfig streamServerConfig) {
        this(streamServerConfig, null);
    }

    /**
     * Creates a server using the given service registry.
     *
     * @param streamServerConfig server configuration
     * @param serviceRegistry registry to use, or {@code null} to create one from the configuration
     */
    public StreamServer(StreamServerConfig streamServerConfig, ServiceRegistry serviceRegistry) {
        this.channels = new ArrayList<>(2);
        this.serverDetailCollection = new ServerDetailCollection();
        this.hostName = NetworkUtils.getLocalFQDN();
        construct(streamServerConfig, serviceRegistry);
    }

    /**
     * @return the root directory taken from the server configuration
     */
    public String getRootDir() {
        return this.serverConfig.getRootDirectory();
    }

    /**
     * Initializes the event loop groups, the service registry, the shuffle executor and the
     * upload channel manager from the configuration.
     */
    private void construct(StreamServerConfig streamServerConfig, ServiceRegistry serviceRegistry) {
        this.serverConfig = streamServerConfig;

        int acceptThreads = streamServerConfig.getNettyAcceptThreads();
        int workerThreads = streamServerConfig.getNettyWorkerThreads();
        if (streamServerConfig.isUseEpoll()) {
            this.shuffleBossGroup = new EpollEventLoopGroup(acceptThreads);
            this.shuffleWorkerGroup = new EpollEventLoopGroup(workerThreads);
            this.healthCheckEventLoopGroup = new EpollEventLoopGroup(HEALTH_CHECK_THREADS);
        } else {
            this.shuffleBossGroup = new NioEventLoopGroup(acceptThreads);
            this.shuffleWorkerGroup = new NioEventLoopGroup(workerThreads);
            this.healthCheckEventLoopGroup = new NioEventLoopGroup(HEALTH_CHECK_THREADS);
        }

        if (serviceRegistry == null) {
            createServiceRegistry(streamServerConfig);
        } else {
            this.serviceRegistry = serviceRegistry;
        }

        this.shuffleExecutor = new ShuffleExecutor(
                streamServerConfig.getRootDirectory(),
                streamServerConfig.getStorage(),
                streamServerConfig.getAppMemoryRetentionMillis(),
                streamServerConfig.getAppMaxWriteBytes());
        this.channelManager = new UploadChannelManager();
        this.channelManager.setMaxConnections(streamServerConfig.getMaxConnections());
    }

    /**
     * Creates the service registry selected by the configured registry type.
     *
     * <p>For the standalone type without a configured registry server, the registry is left
     * unset here and created in {@link #run()} once the local shuffle port is known.
     *
     * @throws RuntimeException if the configured registry type is not supported
     */
    private void createServiceRegistry(StreamServerConfig streamServerConfig) {
        String registryType = streamServerConfig.getServiceRegistryType();
        switch (registryType) {
            case ServiceRegistry.TYPE_INMEMORY:
                this.serviceRegistry = new InMemoryServiceRegistry();
                return;
            case ServiceRegistry.TYPE_STANDALONE:
                String registryServer = streamServerConfig.getRegistryServer();
                if (registryServer == null || registryServer.isEmpty()) {
                    logger.info("Registry server is not specified, will use localhost as registry server and create registry client when local stream server is started (need to get port at that time)");
                    return;
                }
                ServerHostAndPort hostAndPort = ServerHostAndPort.fromString(registryServer);
                logger.info("Creating registry client connecting to registry server: {}:{}",
                        hostAndPort.getHost(), hostAndPort.getPort());
                this.serviceRegistry = new StandaloneServiceRegistryClient(
                        hostAndPort.getHost(),
                        hostAndPort.getPort(),
                        streamServerConfig.getNetworkTimeout(),
                        REGISTRY_CLIENT_NAME);
                return;
            default:
                throw new RuntimeException("Unknown service registry type: " + registryType);
        }
    }

    /**
     * Binds the bootstrap to the given port and waits for the bind to complete.
     *
     * @return the bound channel paired with the port reported by the channel's local address
     */
    private Pair<Channel, Integer> bindPort(ServerBootstrap serverBootstrap, int port) throws InterruptedException, BindException {
        logger.info("Binding to specified port: {}", port);
        Channel channel = serverBootstrap.bind(port).sync().channel();
        InetSocketAddress localAddress = (InetSocketAddress) channel.localAddress();
        logger.info("Bound to local address: {}", localAddress);
        return Pair.of(channel, localAddress.getPort());
    }

    /**
     * Builds a server bootstrap on the given event loop groups.
     *
     * @param bossGroup group passed as the parent (accepting) group; its type selects epoll vs NIO
     * @param workerGroup group passed as the child group
     * @param backlog value for {@code SO_BACKLOG}
     * @param timeoutMillis value for {@code CONNECT_TIMEOUT_MILLIS} on parent and child channels
     * @param handlers supplies the handlers added to the pipeline of every new child channel
     */
    private ServerBootstrap bootstrapChannel(EventLoopGroup bossGroup, EventLoopGroup workerGroup, int backlog, int timeoutMillis, final Supplier<ChannelHandler[]> handlers) {
        ServerBootstrap bootstrap = new ServerBootstrap().group(bossGroup, workerGroup);
        if (bossGroup instanceof EpollEventLoopGroup) {
            bootstrap.channel(EpollServerSocketChannel.class);
        } else {
            bootstrap.channel(NioServerSocketChannel.class);
        }
        return bootstrap
                .childHandler(new ChannelInitializer<SocketChannel>() {
                    @Override
                    public void initChannel(SocketChannel socketChannel) {
                        socketChannel.pipeline().addLast(handlers.get());
                    }
                })
                .option(ChannelOption.SO_BACKLOG, backlog)
                .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, timeoutMillis)
                .option(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT)
                .childOption(ChannelOption.CONNECT_TIMEOUT_MILLIS, timeoutMillis)
                .childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT);
    }

    /**
     * Starts the server: binds the shuffle port, registers the server with the service registry,
     * binds the HTTP port unless it is configured as {@code -1}, and kicks off an asynchronous
     * cleanup of old files when the configured storage is a {@link ShuffleFileStorage}.
     *
     * @throws InterruptedException if interrupted while waiting for a bind to complete
     * @throws BindException declared for callers that handle port binding failures
     */
    public void run() throws InterruptedException, BindException {
        logger.info("Number of opened files: {}", SystemUtils.getFileDescriptorCount());
        String serverId = getServerId();

        ServerBootstrap shuffleBootstrap = bootstrapChannel(
                this.shuffleBossGroup,
                this.shuffleWorkerGroup,
                this.serverConfig.getNetworkBacklog(),
                this.serverConfig.getNetworkTimeout(),
                () -> new ChannelHandler[] {
                    new StreamServerVersionDecoder(
                            serverId,
                            this.serverConfig.getIdleTimeoutMillis(),
                            this.shuffleExecutor,
                            this.channelManager,
                            this.serverDetailCollection)
                });
        ServerBootstrap httpBootstrap = bootstrapChannel(
                this.healthCheckEventLoopGroup,
                this.healthCheckEventLoopGroup,
                HTTP_BACKLOG,
                this.serverConfig.getNetworkTimeout() * 3,
                () -> new ChannelHandler[] {
                    new HttpServerCodec(),
                    new HttpObjectAggregator(HTTP_MAX_CONTENT_LENGTH),
                    new HttpChannelInboundHandler()
                });

        Pair<Channel, Integer> shuffleBinding = bindPort(shuffleBootstrap, this.serverConfig.getShufflePort());
        this.channels.add(shuffleBinding.getKey());
        this.shufflePort = shuffleBinding.getValue();
        logger.info("ShuffleServer: {}:{}", this.hostName, this.shufflePort);

        // Deferred standalone registry: the client targets this server's own shuffle port,
        // which is only known after the bind above.
        if (this.serviceRegistry == null
                && this.serverConfig.getServiceRegistryType().equalsIgnoreCase(ServiceRegistry.TYPE_STANDALONE)) {
            logger.info("Creating registry client connecting to local stream server: {}:{}",
                    this.hostName, this.shufflePort);
            this.serviceRegistry = new StandaloneServiceRegistryClient(
                    this.hostName, this.shufflePort, this.serverConfig.getNetworkTimeout(), REGISTRY_CLIENT_NAME);
        }

        String dataCenter = this.serverConfig.getDataCenterOrDefault();
        String cluster = this.serverConfig.getClusterOrDefault();
        String hostAndPort = getShuffleConnectionString();
        logger.info("Registering shuffle server, data center: {}, cluster: {}, server id: {}, host and port: {}",
                dataCenter, cluster, serverId, hostAndPort);
        this.serviceRegistry.registerServer(dataCenter, cluster, serverId, hostAndPort);

        int configuredHttpPort = this.serverConfig.getHttpPort();
        if (configuredHttpPort != HTTP_PORT_DISABLED) {
            Pair<Channel, Integer> httpBinding = bindPort(httpBootstrap, configuredHttpPort);
            this.channels.add(httpBinding.getKey());
            this.httpPort = httpBinding.getValue();
            logger.info("HttpServer: {}:{}", this.hostName, this.httpPort);
        } else {
            this.httpPort = configuredHttpPort;
        }

        if (this.serverConfig.getStorage() instanceof ShuffleFileStorage) {
            CompletableFuture.runAsync(() -> FileUtils.cleanupOldFiles(
                    this.serverConfig.getRootDirectory(),
                    System.currentTimeMillis() - this.serverConfig.getAppFileRetentionMillis()));
        }

        M3Stats.getDefaultScope().counter("serverStart").inc(1L);
    }

    /**
     * @return the bound shuffle port; only set once {@link #run()} has bound it
     */
    public int getShufflePort() {
        return this.shufflePort;
    }

    /**
     * @return {@code host:port} built from the local host name and the shuffle port
     */
    public String getShuffleConnectionString() {
        return String.format("%s:%s", this.hostName, Integer.valueOf(this.shufflePort));
    }

    /**
     * @return the bound HTTP port, or the configured value when the HTTP endpoint is disabled;
     *     only set once {@link #run()} has executed
     */
    public int getHttpPort() {
        return this.httpPort;
    }

    /**
     * Returns the server id: the value of the {@code RSS_SERVER_ID} environment variable when it
     * is non-empty, otherwise {@code hostName:rootDirectory}.
     */
    public String getServerId() {
        String configuredId = System.getenv(SERVER_ID_ENV);
        if (configuredId == null || configuredId.isEmpty()) {
            return String.format("%s:%s", this.hostName, this.serverConfig.getRootDirectory());
        }
        return configuredId;
    }

    /**
     * @return a new {@link ServerDetail} built from the server id and shuffle connection string
     */
    public ServerDetail getServerDetail() {
        return new ServerDetail(getServerId(), getShuffleConnectionString());
    }

    /**
     * @return the service registry in use; may be {@code null} before {@link #run()} when the
     *     standalone registry is created lazily
     */
    public ServiceRegistry getServiceRegistry() {
        return this.serviceRegistry;
    }

    /**
     * @return the shuffle executor owned by this server
     */
    public ShuffleExecutor getShuffleExecutor() {
        return this.shuffleExecutor;
    }

    /**
     * Shuts the server down, passing {@code false} to {@link #shutdown(boolean)}.
     */
    public void shutdown() {
        shutdown(false);
    }

    /**
     * Shuts the server down: closes the service registry and the bound channels, shuts down all
     * event loop groups and stops the shuffle executor. Every step is attempted even if an
     * earlier one fails.
     *
     * @param wait forwarded to {@link ShuffleExecutor#stop(boolean)}
     * @throws RssAggregateException carrying every failure collected during shutdown
     */
    public void shutdown(boolean wait) {
        List<Throwable> failures = new ArrayList<>();

        // The registry is still unset if run() never created the deferred standalone client;
        // there is nothing to close in that case.
        if (this.serviceRegistry != null) {
            try {
                this.serviceRegistry.close();
            } catch (Throwable e) {
                logger.warn("Unable to shutdown metadata store:", e);
                failures.add(e);
            }
        }

        for (Channel channel : this.channels) {
            try {
                channel.close();
            } catch (Throwable e) {
                logger.warn(String.format("Unable to shutdown channel %s:", channel), e);
                failures.add(e);
            }
        }

        // Trigger all shutdowns first so the groups wind down concurrently, then wait for each.
        Future<?> healthCheckShutdown = this.healthCheckEventLoopGroup.shutdownGracefully();
        Future<?> bossShutdown = this.shuffleBossGroup.shutdownGracefully();
        Future<?> workerShutdown = this.shuffleWorkerGroup.shutdownGracefully();
        awaitEventLoopShutdown(healthCheckShutdown, "health check", failures);
        awaitEventLoopShutdown(bossShutdown, "shuffle boss", failures);
        awaitEventLoopShutdown(workerShutdown, "shuffle worker", failures);

        try {
            this.shuffleExecutor.stop(wait);
        } catch (Throwable e) {
            logger.warn("Unable to shutdown writer executor:", e);
            failures.add(e);
        }

        logger.info("Number of opened files: {}", SystemUtils.getFileDescriptorCount());
        if (!failures.isEmpty()) {
            throw new RssAggregateException(failures);
        }
    }

    /**
     * Waits for an event loop group's shutdown future, logging and recording any failure.
     */
    private static void awaitEventLoopShutdown(Future<?> shutdownFuture, String groupName, List<Throwable> failures) {
        try {
            shutdownFuture.get();
        } catch (Throwable e) {
            logger.warn("Hit exception when shutting down " + groupName + " event loop group", e);
            failures.add(e);
        }
    }

    @Override
    public String toString() {
        return "StreamServer{serverId='" + getServerId() + "', hostName='" + this.hostName + "', shufflePort=" + this.shufflePort + '}';
    }

    /**
     * Registers a JVM shutdown hook that shuts the given server down.
     *
     * @return the registered hook thread
     */
    private static Thread addShutdownHook(StreamServer streamServer) {
        Thread hook = new Thread(() -> {
            logger.info("Started shutting down server in shutdown hook");
            streamServer.shutdown();
            logger.info("Finished shutting down server in shutdown hook");
        });
        Runtime.getRuntime().addShutdownHook(hook);
        return hook;
    }

    /**
     * Command line entry point: builds the configuration from the arguments, starts the server,
     * installs the shutdown hook and schedules metric collection.
     */
    public static void main(String[] strArr) throws Exception {
        StreamServerConfig config = StreamServerConfig.buildFromArgs(strArr);
        logger.info("Starting server (version: {}, revision: {}) with config: {}",
                RssBuildInfo.Version, RssBuildInfo.Revision, config);
        StreamServer streamServer = new StreamServer(config);
        streamServer.run();
        addShutdownHook(streamServer);
        new ScheduledMetricCollector(streamServer.serviceRegistry).scheduleCollectingMetrics(
                streamServer.shuffleExecutor.getLowPriorityExecutorService(),
                config.getDataCenterOrDefault(),
                config.getClusterOrDefault());
    }
}
