package reactor.aeron.rsocket.netty;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufAllocator;
import io.netty.channel.ChannelOption;
import io.rsocket.AbstractRSocket;
import io.rsocket.Payload;
import io.rsocket.RSocketFactory;
import io.rsocket.frame.decoder.PayloadDecoder;
import io.rsocket.transport.netty.server.CloseableChannel;
import io.rsocket.transport.netty.server.TcpServerTransport;
import io.rsocket.util.ByteBufPayload;
import java.io.PrintStream;
import java.util.Random;
import reactor.aeron.Configurations;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.netty.resources.LoopResources;
import reactor.netty.tcp.TcpServer;

/* loaded from: input_file:reactor/aeron/rsocket/netty/RSocketNettyServerTps.class */
public final class RSocketNettyServerTps {
    private static final ByteBuf BUFFER = ByteBufAllocator.DEFAULT.buffer(Configurations.MESSAGE_LENGTH);

    public static void main(String... strArr) {
        System.out.println("message size: " + Configurations.MESSAGE_LENGTH + ", number of messages: " + Configurations.NUMBER_OF_MESSAGES + ", address: " + Configurations.MDC_ADDRESS + ", port: " + Configurations.MDC_PORT);
        TcpServer option = TcpServer.create().runOn(LoopResources.create("rsocket-netty")).host(Configurations.MDC_ADDRESS).port(Configurations.MDC_PORT).option(ChannelOption.TCP_NODELAY, true).option(ChannelOption.SO_KEEPALIVE, true).option(ChannelOption.SO_REUSEADDR, true);
        PrintStream printStream = System.out;
        printStream.getClass();
        TcpServer doOnConnection = option.doOnConnection((v1) -> {
            r1.println(v1);
        });
        ((CloseableChannel) RSocketFactory.receive().frameDecoder(PayloadDecoder.ZERO_COPY).acceptor((connectionSetupPayload, rSocket) -> {
            System.out.println(rSocket);
            return Mono.just(new AbstractRSocket() { // from class: reactor.aeron.rsocket.netty.RSocketNettyServerTps.1
                public Flux<Payload> requestStream(Payload payload) {
                    payload.release();
                    System.out.println("streaming " + Configurations.NUMBER_OF_MESSAGES + " messages ...");
                    return Flux.range(0, Integer.MAX_VALUE).map(num -> {
                        return ByteBufPayload.create(RSocketNettyServerTps.BUFFER.retainedSlice());
                    });
                }
            });
        }).transport(() -> {
            return TcpServerTransport.create(doOnConnection);
        }).start().block()).onClose().block();
    }

    static {
        Random random = new Random(System.nanoTime());
        byte[] bArr = new byte[Configurations.MESSAGE_LENGTH];
        random.nextBytes(bArr);
        BUFFER.writeBytes(bArr);
    }
}
