package reactor.io.net.impl.netty.tcp;

import io.netty.bootstrap.ServerBootstrap;
import io.netty.buffer.PooledByteBufAllocator;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.SocketChannelConfig;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.logging.LoggingHandler;
import io.netty.handler.ssl.SslHandler;
import io.netty.util.concurrent.Future;
import io.netty.util.concurrent.GenericFutureListener;
import java.net.InetSocketAddress;
import java.util.concurrent.atomic.AtomicInteger;
import javax.net.ssl.SSLEngine;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.Environment;
import reactor.core.Dispatcher;
import reactor.core.support.NamedDaemonThreadFactory;
import reactor.io.buffer.Buffer;
import reactor.io.codec.Codec;
import reactor.io.net.ChannelStream;
import reactor.io.net.ReactorChannelHandler;
import reactor.io.net.config.ServerSocketOptions;
import reactor.io.net.config.SslOptions;
import reactor.io.net.impl.netty.NettyChannelHandlerBridge;
import reactor.io.net.impl.netty.NettyChannelStream;
import reactor.io.net.impl.netty.NettyServerSocketOptions;
import reactor.io.net.tcp.TcpServer;
import reactor.io.net.tcp.ssl.SSLEngineSupplier;
import reactor.rx.Promise;
import reactor.rx.Promises;

/* loaded from: input_file:reactor/io/net/impl/netty/tcp/NettyTcpServer.class */
/**
 * Netty-backed {@link TcpServer}.
 *
 * <p>The server uses two event loop groups: a selector group, always created here, and an I/O
 * group that is either created here or taken from {@link NettyServerSocketOptions#eventLoopGroup()}
 * when the supplied options provide one. Only groups created by this class are shut down in
 * {@link #doShutdown()}.
 *
 * @param <IN>  type of the decoded inbound data
 * @param <OUT> type of the outbound data before encoding
 */
public class NettyTcpServer<IN, OUT> extends TcpServer<IN, OUT> {
    private static final Logger log = LoggerFactory.getLogger(NettyTcpServer.class);

    /** Netty-specific options, or {@code null} when plain {@link ServerSocketOptions} were supplied. */
    private final NettyServerSocketOptions nettyOptions;
    private final ServerBootstrap bootstrap;
    private final EventLoopGroup selectorGroup;
    private final EventLoopGroup ioGroup;

    /**
     * {@code true} when {@link #ioGroup} was created by this server (and must therefore be shut
     * down by it); {@code false} when it was supplied through the Netty options.
     */
    private final boolean ownsIoGroup;

    /**
     * Creates the server and configures (but does not bind) the underlying {@link ServerBootstrap}.
     *
     * <p>Thread counts are read from the environment properties
     * {@code reactor.tcp.selectThreadCount} (default {@code PROCESSORS / 2}) and
     * {@code reactor.tcp.ioThreadCount} (default {@code PROCESSORS}).
     *
     * @param environment         passed through to the superclass
     * @param dispatcher          passed through to the superclass
     * @param inetSocketAddress   address to bind to; when {@code null} the bootstrap binds to an
     *                            ephemeral port ({@code new InetSocketAddress(0)})
     * @param serverSocketOptions socket options; dereferenced here, so must not be {@code null}
     * @param sslOptions          SSL options, or {@code null} for plain TCP; child channels get
     *                            {@code AUTO_READ} enabled only when this is non-null
     * @param codec               passed through to the superclass
     */
    /* JADX INFO: Access modifiers changed from: protected */
    public NettyTcpServer(Environment environment, Dispatcher dispatcher, InetSocketAddress inetSocketAddress, ServerSocketOptions serverSocketOptions, SslOptions sslOptions, Codec<Buffer, IN, OUT> codec) {
        super(environment, dispatcher, inetSocketAddress, serverSocketOptions, sslOptions, codec);
        if (serverSocketOptions instanceof NettyServerSocketOptions) {
            this.nettyOptions = (NettyServerSocketOptions) serverSocketOptions;
        } else {
            this.nettyOptions = null;
        }

        int selectThreadCount = ((Integer) getDefaultEnvironment().getProperty("reactor.tcp.selectThreadCount", Integer.class, Integer.valueOf(Environment.PROCESSORS / 2))).intValue();
        int ioThreadCount = ((Integer) getDefaultEnvironment().getProperty("reactor.tcp.ioThreadCount", Integer.class, Integer.valueOf(Environment.PROCESSORS))).intValue();

        this.selectorGroup = new NioEventLoopGroup(selectThreadCount, new NamedDaemonThreadFactory("reactor-tcp-select"));
        if (null == this.nettyOptions || null == this.nettyOptions.eventLoopGroup()) {
            this.ioGroup = new NioEventLoopGroup(ioThreadCount, new NamedDaemonThreadFactory("reactor-tcp-io"));
            this.ownsIoGroup = true;
        } else {
            // Caller-supplied group: use it, but never shut it down on the caller's behalf.
            this.ioGroup = this.nettyOptions.eventLoopGroup();
            this.ownsIoGroup = false;
        }

        InetSocketAddress bindAddress = null == inetSocketAddress ? new InetSocketAddress(0) : inetSocketAddress;
        this.bootstrap = new ServerBootstrap()
                .group(this.selectorGroup, this.ioGroup)
                .channel(NioServerSocketChannel.class)
                .option(ChannelOption.SO_BACKLOG, Integer.valueOf(serverSocketOptions.backlog()))
                .option(ChannelOption.SO_RCVBUF, Integer.valueOf(serverSocketOptions.rcvbuf()))
                .option(ChannelOption.SO_SNDBUF, Integer.valueOf(serverSocketOptions.sndbuf()))
                .option(ChannelOption.SO_REUSEADDR, Boolean.valueOf(serverSocketOptions.reuseAddr()))
                .localAddress(bindAddress)
                .childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT)
                .childOption(ChannelOption.AUTO_READ, Boolean.valueOf(sslOptions != null));
    }

    /**
     * Installs the per-connection channel initializer and binds the server socket.
     *
     * <p>For every accepted connection the initializer applies the Netty socket options (when
     * present), adds an {@link SslHandler} if SSL options are configured, runs the optional
     * pipeline configurer and finally calls {@link #bindChannel}.
     *
     * @param reactorChannelHandler handler attached to each accepted channel
     * @return a promise completed once the bind succeeds, or failed with the bind error
     */
    @Override // reactor.io.net.ReactorPeer
    protected Promise<Void> doStart(final ReactorChannelHandler<IN, OUT, ChannelStream<IN, OUT>> reactorChannelHandler) {
        this.bootstrap.childHandler(new ChannelInitializer<SocketChannel>() {
            @Override
            public void initChannel(SocketChannel socketChannel) throws Exception {
                if (NettyTcpServer.this.nettyOptions != null) {
                    SocketChannelConfig config = socketChannel.config();
                    config.setReceiveBufferSize(NettyTcpServer.this.nettyOptions.rcvbuf());
                    config.setSendBufferSize(NettyTcpServer.this.nettyOptions.sndbuf());
                    config.setKeepAlive(NettyTcpServer.this.nettyOptions.keepAlive());
                    config.setReuseAddress(NettyTcpServer.this.nettyOptions.reuseAddr());
                    config.setSoLinger(NettyTcpServer.this.nettyOptions.linger());
                    config.setTcpNoDelay(NettyTcpServer.this.nettyOptions.tcpNoDelay());
                }
                if (NettyTcpServer.log.isDebugEnabled()) {
                    NettyTcpServer.log.debug("CONNECT {}", socketChannel);
                }
                if (null != NettyTcpServer.this.getSslOptions()) {
                    // The SSL handler goes first so that everything after it sees decrypted data.
                    SSLEngine sslEngine = new SSLEngineSupplier(NettyTcpServer.this.getSslOptions(), false).m29get();
                    if (NettyTcpServer.log.isDebugEnabled()) {
                        NettyTcpServer.log.debug("SSL enabled using keystore {}", null != NettyTcpServer.this.getSslOptions().keystoreFile() ? NettyTcpServer.this.getSslOptions().keystoreFile() : "<DEFAULT>");
                    }
                    socketChannel.pipeline().addLast(new SslHandler(sslEngine));
                }
                if (null != NettyTcpServer.this.nettyOptions && null != NettyTcpServer.this.nettyOptions.pipelineConfigurer()) {
                    NettyTcpServer.this.nettyOptions.pipelineConfigurer().accept(socketChannel.pipeline());
                }
                NettyTcpServer.this.bindChannel(reactorChannelHandler, socketChannel);
            }
        });

        final Promise<Void> started = Promises.prepare();
        ChannelFuture bindFuture = this.bootstrap.bind();
        bindFuture.addListener(new ChannelFutureListener() {
            @Override
            public void operationComplete(ChannelFuture channelFuture) throws Exception {
                NettyTcpServer.log.info("BIND {}", channelFuture.channel().localAddress());
                if (!channelFuture.isSuccess()) {
                    started.onError(channelFuture.cause());
                    return;
                }
                // The constructor accepts a null bind address (it then binds to an ephemeral
                // port), so the inherited listenAddress may presumably be null here; treat that
                // like port 0 and record the address actually bound instead of throwing, which
                // would leave the promise unresolved.
                if (NettyTcpServer.this.listenAddress == null || NettyTcpServer.this.listenAddress.getPort() == 0) {
                    NettyTcpServer.this.listenAddress = (InetSocketAddress) channelFuture.channel().localAddress();
                }
                started.onComplete();
            }
        });
        return started;
    }

    /**
     * Gracefully shuts down the event loop groups created by this server.
     *
     * <p>The selector group is always shut down. The I/O group is shut down only when this server
     * created it; a group supplied through {@link NettyServerSocketOptions#eventLoopGroup()} is
     * left running. The returned promise completes once every shutdown started here has finished.
     *
     * @return a promise completed when all owned groups have terminated
     */
    @Override // reactor.io.net.ReactorPeer
    public Promise<Void> doShutdown() {
        final Promise<Void> stopped = Promises.prepare();
        // Count only the shutdowns we actually start; otherwise the promise would never complete
        // when the I/O group is externally owned.
        final AtomicInteger pending = new AtomicInteger(this.ownsIoGroup ? 2 : 1);
        GenericFutureListener<Future<Object>> onGroupTerminated = new GenericFutureListener<Future<Object>>() {
            @Override
            public void operationComplete(Future<Object> future) throws Exception {
                if (pending.decrementAndGet() == 0) {
                    stopped.onComplete();
                }
            }
        };
        this.selectorGroup.shutdownGracefully().addListener(onGroupTerminated);
        if (this.ownsIoGroup) {
            this.ioGroup.shutdownGracefully().addListener(onGroupTerminated);
        }
        return stopped;
    }

    /**
     * Wires a newly accepted channel to the given Reactor handler.
     *
     * <p>Creates a {@link NettyChannelStream} for the channel and appends a
     * {@link NettyChannelHandlerBridge} to its pipeline, preceded by a {@link LoggingHandler}
     * when debug logging is enabled.
     *
     * @param reactorChannelHandler handler to bridge the channel to
     * @param socketChannel         the accepted channel
     */
    protected void bindChannel(ReactorChannelHandler<IN, OUT, ChannelStream<IN, OUT>> reactorChannelHandler, SocketChannel socketChannel) {
        // NOTE(review): raw types kept as-is; the generic signatures of NettyChannelStream and
        // NettyChannelHandlerBridge are not visible from this file.
        NettyChannelStream channelStream = new NettyChannelStream(getDefaultEnvironment(), getDefaultCodec(), getDefaultPrefetchSize(), getDefaultDispatcher(), socketChannel);
        ChannelPipeline pipeline = socketChannel.pipeline();
        if (log.isDebugEnabled()) {
            pipeline.addLast(new LoggingHandler(NettyTcpServer.class));
        }
        pipeline.addLast(new NettyChannelHandlerBridge(reactorChannelHandler, channelStream));
    }
}
