package io.reactivesocket.examples.transport.tcp.stress;

import io.reactivesocket.ReactiveSocket;
import io.reactivesocket.client.LoadBalancingClient;
import io.reactivesocket.exceptions.RejectedException;
import io.reactivesocket.server.ReactiveSocketServer;
import io.reactivesocket.transport.tcp.server.TcpTransportServer;
import io.reactivex.Flowable;
import io.reactivex.Single;
import io.reactivex.disposables.Disposable;
import io.reactivex.functions.Function;
import java.net.SocketAddress;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import org.HdrHistogram.Recorder;
import org.reactivestreams.Publisher;

/* loaded from: input_file:io/reactivesocket/examples/transport/tcp/stress/StressTest.class */
class StressTest {
    private final TestConfig config;
    private volatile long testStartTime;
    private ReactiveSocket clientSocket;
    private Disposable printDisposable;
    private final AtomicInteger serverCount = new AtomicInteger(0);
    private final AtomicInteger outstandings = new AtomicInteger();
    private final AtomicInteger successes = new AtomicInteger(0);
    private final AtomicInteger failures = new AtomicInteger(0);
    private final AtomicInteger leaseExhausted = new AtomicInteger();
    private final AtomicInteger timeouts = new AtomicInteger();
    private final Recorder histogram = new Recorder(TimeUnit.MINUTES.toNanos(1), 4);

    /* JADX INFO: Access modifiers changed from: package-private */
    public StressTest(TestConfig testConfig) {
        this.config = testConfig;
    }

    public StressTest printStatsEvery(Duration duration) {
        this.printDisposable = Flowable.interval(duration.getSeconds(), TimeUnit.SECONDS).forEach(l -> {
            printTestStats(false);
        });
        return this;
    }

    public void printTestStats(boolean z) {
        System.out.println("==============================================================");
        long nanoTime = (System.nanoTime() - this.testStartTime) / 1000000;
        System.out.println(this.successes.get() + " events in " + nanoTime + " ms. Test time remaining(ms): " + (this.config.getTestDuration().toMillis() - nanoTime));
        System.out.println(((1.0E9d * this.successes.get()) / (System.nanoTime() - this.testStartTime)) + " rps");
        System.out.println("successes: " + this.successes.get() + ", failures: " + this.failures.get() + ", timeouts: " + this.timeouts.get() + ", lease exhaustion: " + this.leaseExhausted.get() + ", success rate: " + (this.successes.get() / (this.successes.get() + this.failures.get())));
        if (z) {
            System.out.println("Latency distribution in us");
            this.histogram.getIntervalHistogram().outputPercentileDistribution(System.out, Double.valueOf(1000.0d));
        }
        System.out.println("==============================================================");
        System.out.flush();
    }

    public StressTest startClient() {
        this.clientSocket = (ReactiveSocket) Single.fromPublisher(LoadBalancingClient.create(getServerList(), socketAddress -> {
            return this.config.newClientForServer(socketAddress);
        }).connect()).blockingGet();
        System.out.println("Client ready!");
        return this;
    }

    private Publisher<? extends Collection<SocketAddress>> getServerList() {
        return this.config.serverListChangeTicks().map(l -> {
            return startServer();
        }).map(new Function<SocketAddress, Collection<SocketAddress>>() { // from class: io.reactivesocket.examples.transport.tcp.stress.StressTest.1
            private final List<SocketAddress> addresses = new ArrayList();

            public Collection<SocketAddress> apply(SocketAddress socketAddress) {
                System.out.println("Adding server " + socketAddress);
                this.addresses.add(socketAddress);
                if (this.addresses.size() > 15) {
                    System.out.println("Removed server " + this.addresses.remove(0));
                }
                return this.addresses;
            }
        });
    }

    public void startTest(java.util.function.Function<ReactiveSocket, Publisher<?>> function) {
        if (this.clientSocket == null) {
            System.err.println("Client not connected. Call startClient() first.");
            System.exit(-1);
        }
        this.testStartTime = System.nanoTime();
        while (System.nanoTime() - this.testStartTime < this.config.getTestDuration().toNanos()) {
            if (this.outstandings.get() <= this.config.getMaxConcurrency()) {
                AtomicLong atomicLong = new AtomicLong();
                Flowable.defer(() -> {
                    return (Publisher) function.apply(this.clientSocket);
                }).doOnSubscribe(subscription -> {
                    atomicLong.set(System.nanoTime());
                    this.outstandings.incrementAndGet();
                }).doAfterTerminate(() -> {
                    this.histogram.recordValue((System.nanoTime() - atomicLong.get()) / 1000);
                    this.outstandings.decrementAndGet();
                }).doOnComplete(() -> {
                    this.successes.incrementAndGet();
                }).onErrorResumeNext(th -> {
                    this.failures.incrementAndGet();
                    if (th instanceof RejectedException) {
                        this.leaseExhausted.incrementAndGet();
                    } else if (th instanceof TimeoutException) {
                        this.timeouts.incrementAndGet();
                    }
                    if (this.failures.get() % 1000 == 0) {
                        th.printStackTrace();
                    }
                    return Flowable.empty();
                }).subscribe();
            } else {
                try {
                    Thread.sleep(1L);
                } catch (InterruptedException e) {
                    System.out.println("Interrupted while waiting for lowering concurrency.");
                    Thread.currentThread().interrupt();
                }
            }
        }
        System.out.println("Stress test finished. Duration (minutes): " + Duration.ofNanos(System.nanoTime() - this.testStartTime).toMinutes());
        printTestStats(true);
        Flowable.fromPublisher(this.clientSocket.close()).ignoreElements().blockingGet();
        if (null != this.printDisposable) {
            this.printDisposable.dispose();
        }
    }

    private SocketAddress startServer() {
        return ReactiveSocketServer.create(TcpTransportServer.create()).start((connectionSetupPayload, reactiveSocket) -> {
            return this.config.nextServerHandler(this.serverCount.incrementAndGet());
        }).getServerAddress();
    }
}
