package io.scalecube.transport.netty.tcp;

import io.netty.channel.ChannelOption;
import io.scalecube.cluster.transport.api.TransportConfig;
import io.scalecube.transport.netty.Receiver;
import io.scalecube.transport.netty.TransportImpl;
import reactor.core.publisher.Mono;
import reactor.netty.ByteBufFlux;
import reactor.netty.DisposableServer;
import reactor.netty.tcp.TcpServer;

/* loaded from: input_file:io/scalecube/transport/netty/tcp/TcpReceiver.class */
/**
 * TCP implementation of {@link Receiver}: binds a Reactor Netty {@link TcpServer} on the
 * configured port and forwards every inbound buffer to the {@code ReceiverContext} found in the
 * Reactor subscriber context.
 */
public final class TcpReceiver implements Receiver {
    private final TransportConfig config;

    /**
     * Creates a receiver that reads its port and maximum frame length from the given config.
     *
     * <p>NOTE(review): the decompiler reported that this constructor was originally
     * package-private; it is kept public here so existing callers are unaffected.
     *
     * @param transportConfig transport configuration supplying {@code port()} and
     *     {@code maxFrameLength()}
     */
    public TcpReceiver(TransportConfig transportConfig) {
        this.config = transportConfig;
    }

    /**
     * Binds the TCP server.
     *
     * <p>The {@code ReceiverContext} is looked up in the subscriber context under the key
     * {@code TransportImpl.ReceiverContext.class}, so the subscriber must have supplied it.
     * Each accepted connection streams its inbound buffers to {@code ReceiverContext.onMessage}.
     *
     * @return a {@link Mono} emitting the bound server
     */
    @Override // io.scalecube.transport.netty.Receiver
    public Mono<DisposableServer> bind() {
        return Mono.deferContextual(
                contextView -> Mono.just(contextView.get(TransportImpl.ReceiverContext.class)))
            .flatMap(receiverContext ->
                newTcpServer(receiverContext)
                    .handle((nettyInbound, nettyOutbound) ->
                        nettyInbound
                            .receive()
                            // Retain each buffer before handing it to the callback.
                            // NOTE(review): presumably onMessage (or its consumer) releases
                            // the buffer - confirm against ReceiverContext.
                            .retain()
                            .doOnNext(receiverContext::onMessage)
                            .then())
                    .bind()
                    .cast(DisposableServer.class));
    }

    /**
     * Builds the (not yet bound) TCP server: event loops come from the receiver context, the port
     * and maximum frame length from the transport config.
     *
     * @param receiverContext context providing the {@code loopResources()} to run on
     * @return the configured server
     */
    private TcpServer newTcpServer(TransportImpl.ReceiverContext receiverContext) {
        return TcpServer.create()
            .runOn(receiverContext.loopResources())
            .port(this.config.port())
            .childOption(ChannelOption.TCP_NODELAY, true)
            .childOption(ChannelOption.SO_KEEPALIVE, true)
            .childOption(ChannelOption.SO_REUSEADDR, true)
            // A fresh initializer is created for every accepted channel.
            .doOnChannelInit((connectionObserver, channel, socketAddress) ->
                new TcpChannelInitializer(this.config.maxFrameLength())
                    .accept(connectionObserver, channel));
    }
}
