package io.reactivesocket.transport.netty.server;

import io.reactivesocket.DuplexConnection;
import io.reactivesocket.transport.TransportServer;
import io.reactivesocket.transport.netty.NettyDuplexConnection;
import io.reactivesocket.transport.netty.ReactiveSocketLengthCodec;
import java.net.SocketAddress;
import java.util.concurrent.TimeUnit;
import reactor.ipc.netty.NettyContext;
import reactor.ipc.netty.tcp.TcpServer;

/* loaded from: input_file:io/reactivesocket/transport/netty/server/TcpTransportServer.class */
public class TcpTransportServer implements TransportServer {
    TcpServer server;

    /* loaded from: input_file:io/reactivesocket/transport/netty/server/TcpTransportServer$StartServerImpl.class */
    static class StartServerImpl implements TransportServer.StartedServer {
        NettyContext context;

        StartServerImpl(NettyContext nettyContext) {
            this.context = nettyContext;
        }

        @Override // io.reactivesocket.transport.TransportServer.StartedServer
        public SocketAddress getServerAddress() {
            return this.context.address();
        }

        @Override // io.reactivesocket.transport.TransportServer.StartedServer
        public int getServerPort() {
            return this.context.address().getPort();
        }

        @Override // io.reactivesocket.transport.TransportServer.StartedServer
        public void awaitShutdown() {
            this.context.onClose().block();
        }

        @Override // io.reactivesocket.transport.TransportServer.StartedServer
        public void awaitShutdown(long j, TimeUnit timeUnit) {
            this.context.onClose().blockMillis(TimeUnit.MILLISECONDS.convert(j, timeUnit));
        }

        @Override // io.reactivesocket.transport.TransportServer.StartedServer
        public void shutdown() {
            this.context.dispose();
        }
    }

    public TcpTransportServer(TcpServer tcpServer) {
        this.server = tcpServer;
    }

    public static TcpTransportServer create(TcpServer tcpServer) {
        return new TcpTransportServer(tcpServer);
    }

    @Override // io.reactivesocket.transport.TransportServer
    public TransportServer.StartedServer start(TransportServer.ConnectionAcceptor connectionAcceptor) {
        return new StartServerImpl(this.server.newHandler((nettyInbound, nettyOutbound) -> {
            nettyInbound.context().addHandler("server-length-codec", new ReactiveSocketLengthCodec());
            connectionAcceptor.apply((DuplexConnection) new NettyDuplexConnection(nettyInbound, nettyOutbound, nettyInbound.context())).subscribe();
            return nettyOutbound.neverComplete();
        }).block());
    }
}
