package reactor.aeron.netty;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufAllocator;
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelOption;
import io.netty.channel.ChannelPipeline;
import io.netty.handler.codec.LengthFieldBasedFrameDecoder;
import io.netty.handler.codec.LengthFieldPrepender;
import io.netty.handler.codec.MessageToByteEncoder;
import java.io.PrintStream;
import java.time.Duration;
import java.util.Random;
import java.util.concurrent.TimeUnit;
import org.HdrHistogram.Recorder;
import org.agrona.console.ContinueBarrier;
import reactor.aeron.Configurations;
import reactor.aeron.LatencyReporter;
import reactor.core.Disposable;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.netty.Connection;
import reactor.netty.channel.BootstrapHandlers;
import reactor.netty.resources.ConnectionProvider;
import reactor.netty.resources.LoopResources;
import reactor.netty.tcp.TcpClient;

/* loaded from: input_file:reactor/aeron/netty/ReactorNettyClientPing.class */
public class ReactorNettyClientPing {
    private static final Recorder HISTOGRAM = new Recorder(TimeUnit.SECONDS.toNanos(10), 3);
    private static final LatencyReporter latencyReporter = new LatencyReporter(HISTOGRAM);
    private static final ByteBuf PAYLOAD = ByteBufAllocator.DEFAULT.buffer(Configurations.MESSAGE_LENGTH);

    public static void main(String[] strArr) {
        System.out.println("message size: " + Configurations.MESSAGE_LENGTH + ", number of messages: " + Configurations.NUMBER_OF_MESSAGES + ", address: " + Configurations.MDC_ADDRESS + ", port: " + Configurations.MDC_PORT);
        LoopResources create = LoopResources.create("reactor-netty");
        TcpClient option = TcpClient.create(ConnectionProvider.newConnection()).runOn(create).host(Configurations.MDC_ADDRESS).port(Configurations.MDC_PORT).option(ChannelOption.TCP_NODELAY, true).option(ChannelOption.SO_KEEPALIVE, true).option(ChannelOption.SO_REUSEADDR, true);
        PrintStream printStream = System.out;
        printStream.getClass();
        Connection connectNow = option.doOnConnected((v1) -> {
            r1.println(v1);
        }).bootstrap(bootstrap -> {
            return BootstrapHandlers.updateConfiguration(bootstrap, "channel", (connectionObserver, channel) -> {
                setupChannel(channel);
            });
        }).connectNow();
        ContinueBarrier continueBarrier = new ContinueBarrier("Execute again?");
        do {
            System.out.println("Pinging " + Configurations.NUMBER_OF_MESSAGES + " messages");
            roundTripMessages(connectNow, Configurations.NUMBER_OF_MESSAGES);
            System.out.println("Histogram of RTT latencies in microseconds.");
        } while (continueBarrier.await());
        connectNow.dispose();
        connectNow.onDispose(create).onDispose().block();
    }

    private static void roundTripMessages(Connection connection, long j) {
        HISTOGRAM.reset();
        Disposable start = latencyReporter.start();
        connection.outbound().sendObject(Flux.range(0, Configurations.REQUESTED)).then().subscribe();
        connection.outbound().sendObject(connection.inbound().receive().retain().take(j).doOnNext(byteBuf -> {
            long readLong = byteBuf.readLong();
            byteBuf.readerIndex(Configurations.MESSAGE_LENGTH);
            HISTOGRAM.recordValue(System.nanoTime() - readLong);
            byteBuf.release();
        }).map(byteBuf2 -> {
            return 1;
        })).then(Mono.defer(() -> {
            return Mono.delay(Duration.ofMillis(100L)).doOnSubscribe(subscription -> {
                start.dispose();
            }).then();
        })).then().block();
    }

    /* JADX INFO: Access modifiers changed from: private */
    public static void setupChannel(Channel channel) {
        ChannelPipeline pipeline = channel.pipeline();
        pipeline.addLast(new ChannelHandler[]{new LengthFieldPrepender(2)});
        pipeline.addLast(new ChannelHandler[]{new LengthFieldBasedFrameDecoder(1048576, 0, 2, 0, 2)});
        pipeline.addLast(new ChannelHandler[]{new MessageToByteEncoder<Integer>() { // from class: reactor.aeron.netty.ReactorNettyClientPing.1
            /* JADX INFO: Access modifiers changed from: protected */
            public void encode(ChannelHandlerContext channelHandlerContext, Integer num, ByteBuf byteBuf) {
                byteBuf.writeLong(System.nanoTime());
                byteBuf.writeBytes(ReactorNettyClientPing.PAYLOAD.slice());
            }
        }});
    }

    static {
        Random random = new Random(System.nanoTime());
        byte[] bArr = new byte[Configurations.MESSAGE_LENGTH];
        random.nextBytes(bArr);
        PAYLOAD.writeBytes(bArr);
    }
}
