package reactor.aeron.rsocket.netty;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufAllocator;
import io.netty.channel.ChannelOption;
import io.rsocket.Payload;
import io.rsocket.RSocket;
import io.rsocket.RSocketFactory;
import io.rsocket.frame.decoder.PayloadDecoder;
import io.rsocket.transport.netty.client.TcpClientTransport;
import io.rsocket.util.ByteBufPayload;
import java.io.PrintStream;
import java.util.Random;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;
import org.HdrHistogram.Recorder;
import reactor.aeron.Configurations;
import reactor.aeron.LatencyReporter;
import reactor.core.Disposable;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.netty.resources.ConnectionProvider;
import reactor.netty.resources.LoopResources;
import reactor.netty.tcp.TcpClient;

/**
 * Latency "ping" client that talks RSocket over a Reactor Netty TCP connection.
 *
 * <p>The client connects to {@link Configurations#MDC_ADDRESS}:{@link Configurations#MDC_PORT},
 * issues {@link Configurations#NUMBER_OF_MESSAGES} request-response calls carrying a fixed
 * random payload of {@link Configurations#MESSAGE_LENGTH} bytes, and records the elapsed
 * nanoseconds of every call into a shared {@link Recorder}.
 */
public final class RSocketNettyPing {
    /** Upper bound on in-flight request-response calls (the {@code flatMap} concurrency). */
    private static final int MAX_CONCURRENT_REQUESTS = 64;

    private static final Recorder HISTOGRAM = new Recorder(TimeUnit.SECONDS.toNanos(10), 3);
    private static final LatencyReporter latencyReporter = new LatencyReporter(HISTOGRAM);
    private static final ByteBuf BUFFER = ByteBufAllocator.DEFAULT.buffer(Configurations.MESSAGE_LENGTH);

    static {
        // Fill the shared payload buffer once with random bytes; every request sends a slice of it.
        Random random = new Random(System.nanoTime());
        byte[] bytes = new byte[Configurations.MESSAGE_LENGTH];
        random.nextBytes(bytes);
        BUFFER.writeBytes(bytes);
    }

    /**
     * Connects to the configured server, sends the configured number of requests and blocks
     * until the whole run has terminated.
     *
     * @param strArr command-line arguments; not used, all settings come from {@link Configurations}
     */
    public static void main(String... strArr) {
        System.out.println("message size: " + Configurations.MESSAGE_LENGTH + ", number of messages: " + Configurations.NUMBER_OF_MESSAGES + ", address: " + Configurations.MDC_ADDRESS + ", port: " + Configurations.MDC_PORT);

        TcpClient tcpClient =
            TcpClient.create(ConnectionProvider.newConnection())
                .runOn(LoopResources.create("rsocket-netty"))
                .host(Configurations.MDC_ADDRESS)
                .port(Configurations.MDC_PORT)
                .option(ChannelOption.TCP_NODELAY, true)
                .option(ChannelOption.SO_KEEPALIVE, true)
                .option(ChannelOption.SO_REUSEADDR, true)
                .doOnConnected(System.out::println);

        RSocket rSocket =
            RSocketFactory.connect()
                .frameDecoder(PayloadDecoder.ZERO_COPY)
                .transport(() -> TcpClientTransport.create(tcpClient))
                .start()
                .doOnSuccess(System.out::println)
                .block();

        // NOTE(review): presumably starts periodic reporting of HISTOGRAM; confirm in LatencyReporter.
        Disposable reporter = latencyReporter.start();

        // retainedSlice() hands each request its own reference-counted view of the shared bytes,
        // so BUFFER itself stays alive for the whole run.
        Supplier<Payload> payloadSupplier = () -> ByteBufPayload.create(BUFFER.retainedSlice());

        Flux.range(1, (int) Configurations.NUMBER_OF_MESSAGES)
            .flatMap(
                i -> {
                    long start = System.nanoTime();
                    return rSocket
                        .requestResponse(payloadSupplier.get())
                        // ZERO_COPY frame decoding leaves the response ref-counted: release it here.
                        .doOnNext(Payload::release)
                        // Record elapsed time however the call ends (complete, error or cancel).
                        .doFinally(signalType -> HISTOGRAM.recordValue(System.nanoTime() - start));
                },
                MAX_CONCURRENT_REQUESTS)
            .doOnError(Throwable::printStackTrace)
            // doOnTerminate fires on completion and on error, so this line is printed either way.
            .doOnTerminate(() -> System.out.println("Sent " + Configurations.NUMBER_OF_MESSAGES + " messages"))
            .doFinally(signalType -> reporter.dispose())
            .then()
            .block();
    }
}
