package reactor.aeron;

import java.time.Duration;
import java.util.concurrent.TimeUnit;
import org.HdrHistogram.Recorder;
import org.agrona.BufferUtil;
import org.agrona.DirectBuffer;
import org.agrona.concurrent.UnsafeBuffer;
import org.agrona.console.ContinueBarrier;
import reactor.aeron.mdc.AeronClient;
import reactor.aeron.mdc.AeronResources;
import reactor.core.Disposable;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

/* loaded from: input_file:reactor/aeron/AeronPingClient.class */
/**
 * Ping side of a round-trip latency measurement over an Aeron MDC connection.
 *
 * <p>Each outgoing message carries the sender's {@link System#nanoTime()} in its first eight
 * bytes. When a message comes back, the difference between the current nano time and the
 * embedded value is recorded into {@link #HISTOGRAM}.
 */
public final class AeronPingClient {
    /**
     * Recorder for round-trip times in nanoseconds: highest trackable value is 10 seconds,
     * kept with 3 significant decimal digits.
     */
    private static final Recorder HISTOGRAM = new Recorder(TimeUnit.SECONDS.toNanos(10), 3);

    /** Reporter bound to {@link #HISTOGRAM}; started and disposed once per measurement round. */
    private static final LatencyReporter reporter = new LatencyReporter(HISTOGRAM);

    /**
     * Maps any element to the shared offer buffer after stamping the current
     * {@link System#nanoTime()} at offset 0. The element itself is ignored.
     *
     * <p>NOTE(review): every instance writes to and returns the same static buffer, so a returned
     * buffer is overwritten by the next {@link #map(Object)} call. This presumably relies on the
     * single-worker configuration used in {@code main} - confirm before using it elsewhere.
     */
    public static class NanoTimeGeneratorHandler implements DirectBufferHandler<Object> {
        /** Direct buffer of {@code Configurations.MESSAGE_LENGTH} bytes, aligned to 64 bytes. */
        private static final UnsafeBuffer OFFER_BUFFER = new UnsafeBuffer(BufferUtil.allocateDirectAligned(Configurations.MESSAGE_LENGTH, 64));

        private NanoTimeGeneratorHandler() {
        }

        /**
         * Writes the current nano time into the first eight bytes of the shared buffer.
         *
         * @param obj ignored
         * @return the shared offer buffer
         */
        public DirectBuffer map(Object obj) {
            OFFER_BUFFER.putLong(0, System.nanoTime());
            return OFFER_BUFFER;
        }
    }

    /**
     * Starts the Aeron resources, connects the client, prints the effective configuration and
     * runs measurement rounds for as long as the {@link ContinueBarrier} returns {@code true}.
     *
     * <p>The connection is disposed (and its disposal awaited) even when a round fails, so a
     * failure does not leave the connection open.
     *
     * @param strArr command-line arguments (unused)
     */
    public static void main(String... strArr) {
        AeronResources aeronResources = (AeronResources) new AeronResources()
                .useTmpDir()
                .pollFragmentLimit(Configurations.FRAGMENT_COUNT_LIMIT)
                .singleWorker()
                .workerIdleStrategySupplier(Configurations::idleStrategy)
                .start()
                .block();
        // Declared with its type argument instead of as a raw type: roundTripMessages expects
        // AeronDuplex<DirectBuffer>.
        AeronDuplex<DirectBuffer> aeronDuplex = (AeronDuplex<DirectBuffer>) AeronClient.create(aeronResources)
                .options(Configurations.MDC_ADDRESS, Configurations.MDC_PORT, Configurations.MDC_CONTROL_PORT)
                .connect()
                .block();
        try {
            System.out.println("address: " + Configurations.MDC_ADDRESS + ", port: " + Configurations.MDC_PORT + ", controlPort: " + Configurations.MDC_CONTROL_PORT);
            System.out.println("Message length of " + Configurations.MESSAGE_LENGTH + " bytes");
            System.out.println("pollFragmentLimit of " + Configurations.FRAGMENT_COUNT_LIMIT);
            System.out.println("Using worker idle strategy " + Configurations.idleStrategy().getClass() + "(" + Configurations.IDLE_STRATEGY + ")");
            System.out.println("Request " + Configurations.REQUESTED);
            ContinueBarrier continueBarrier = new ContinueBarrier("Execute again?");
            do {
                System.out.println("Pinging " + Configurations.NUMBER_OF_MESSAGES + " messages");
                roundTripMessages(aeronDuplex, Configurations.NUMBER_OF_MESSAGES);
                System.out.println("Histogram of RTT latencies in microseconds.");
            } while (continueBarrier.await());
        } finally {
            // Same shutdown sequence as on the regular path, now also executed when a round throws.
            aeronDuplex.dispose();
            aeronDuplex.onDispose(aeronResources).onDispose().block();
        }
    }

    /**
     * Runs one measurement round and blocks until it has finished.
     *
     * <p>The round is primed with {@code Configurations.REQUESTED} timestamped messages. Every
     * received message has its round-trip time recorded and triggers a fresh timestamped message,
     * until {@code j} messages have been received.
     *
     * @param aeronDuplex connected duplex used for both sending and receiving
     * @param j number of inbound messages to take before the round ends
     */
    private static void roundTripMessages(AeronDuplex<DirectBuffer> aeronDuplex, long j) {
        HISTOGRAM.reset();
        Disposable reporting = reporter.start();
        try {
            NanoTimeGeneratorHandler handler = new NanoTimeGeneratorHandler();

            // Priming batch: fire-and-forget, as in the original flow.
            aeronDuplex.outbound()
                    .send(Flux.range(0, Configurations.REQUESTED), handler)
                    .then()
                    .subscribe();

            // On success the reporter is stopped first, then the round lingers for 100 ms.
            Mono<Void> stopReportingThenLinger = Mono.defer(() ->
                    Mono.delay(Duration.ofMillis(100L))
                            .doOnSubscribe(subscription -> reporting.dispose())
                            .then());

            aeronDuplex.outbound()
                    .send(
                            aeronDuplex.inbound()
                                    .receive()
                                    .take(j)
                                    .doOnNext(directBuffer ->
                                            HISTOGRAM.recordValue(System.nanoTime() - directBuffer.getLong(0))),
                            handler)
                    .then(stopReportingThenLinger)
                    .then()
                    .block();
        } finally {
            // block() rethrows pipeline errors, in which case the success continuation above never
            // runs; stop the reporter here as well. Relies on dispose() being idempotent.
            reporting.dispose();
        }
    }
}
