package io.reactivesocket.test;

import io.reactivesocket.Payload;
import io.reactivesocket.ReactiveSocket;
import io.reactivesocket.client.KeepAliveProvider;
import io.reactivesocket.client.ReactiveSocketClient;
import io.reactivesocket.client.SetupProvider;
import io.reactivesocket.transport.TransportClient;
import io.reactivesocket.util.PayloadImpl;
import io.reactivex.subscribers.TestSubscriber;
import java.net.SocketAddress;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.Callable;
import java.util.concurrent.CountDownLatch;
import java.util.function.Function;
import org.junit.Assert;
import org.junit.rules.ExternalResource;
import org.junit.runner.Description;
import org.junit.runners.model.Statement;
import reactor.core.publisher.Flux;

/* loaded from: input_file:io/reactivesocket/test/ClientSetupRule.class */
public class ClientSetupRule extends ExternalResource {
    private final Callable<SocketAddress> serverStarter;
    private final Function<SocketAddress, TransportClient> clientFactory;
    private SocketAddress serverAddress;
    private ReactiveSocket reactiveSocket;
    private ReactiveSocketClient reactiveSocketClient;

    public ClientSetupRule(Function<SocketAddress, TransportClient> function, Callable<SocketAddress> callable) {
        this.clientFactory = function;
        this.serverStarter = callable;
    }

    public Statement apply(final Statement statement, Description description) {
        return new Statement() { // from class: io.reactivesocket.test.ClientSetupRule.1
            public void evaluate() throws Throwable {
                ClientSetupRule.this.serverAddress = (SocketAddress) ClientSetupRule.this.serverStarter.call();
                TransportClient transportClient = (TransportClient) ClientSetupRule.this.clientFactory.apply(ClientSetupRule.this.serverAddress);
                SetupProvider disableLease = SetupProvider.keepAlive(KeepAliveProvider.never()).disableLease();
                ClientSetupRule.this.reactiveSocketClient = ReactiveSocketClient.create(transportClient, disableLease);
                statement.evaluate();
            }
        };
    }

    public ReactiveSocketClient getClient() {
        return this.reactiveSocketClient;
    }

    public SocketAddress getServerAddress() {
        return this.serverAddress;
    }

    public ReactiveSocket getReactiveSocket() {
        if (null == this.reactiveSocket) {
            this.reactiveSocket = (ReactiveSocket) this.reactiveSocketClient.connect().block();
        }
        return this.reactiveSocket;
    }

    public void testFireAndForget(int i) {
        TestSubscriber create = TestSubscriber.create();
        Flux.range(1, i).flatMap(num -> {
            return getReactiveSocket().fireAndForget(new PayloadImpl("hello", "metadata"));
        }).doOnError((v0) -> {
            v0.printStackTrace();
        }).subscribe(create);
        await(create);
        create.assertTerminated();
        create.assertNoErrors();
        create.assertTerminated();
    }

    public void testMetadata(int i) {
        TestSubscriber create = TestSubscriber.create();
        Flux.range(1, i).flatMap(num -> {
            return getReactiveSocket().metadataPush(new PayloadImpl("", "metadata"));
        }).doOnError((v0) -> {
            v0.printStackTrace();
        }).subscribe(create);
        await(create);
        create.assertTerminated();
        create.assertNoErrors();
        create.assertTerminated();
    }

    public void testRequestResponseN(int i) {
        TestSubscriber create = TestSubscriber.create();
        Flux.range(1, i).flatMap(num -> {
            return getReactiveSocket().requestResponse(new PayloadImpl("hello", "metadata")).map(payload -> {
                return StandardCharsets.UTF_8.decode(payload.getData()).toString();
            });
        }).doOnError((v0) -> {
            v0.printStackTrace();
        }).subscribe(create);
        await(create);
        create.assertTerminated();
        create.assertValueCount(i);
        create.assertNoErrors();
        create.assertTerminated();
    }

    public void testRequestStream() {
        testStream(reactiveSocket -> {
            return reactiveSocket.requestStream(new PayloadImpl("hello", "metadata"));
        });
    }

    public void testRequestStreamWithRequestN() {
        testStreamRequestN(reactiveSocket -> {
            return reactiveSocket.requestStream(new PayloadImpl("hello", "metadata"));
        });
    }

    private void testStreamRequestN(Function<ReactiveSocket, Flux<Payload>> function) {
        CountDownLatch countDownLatch = new CountDownLatch(10);
        TestSubscriber create = TestSubscriber.create(10 / 2);
        function.apply(getReactiveSocket()).doOnNext(payload -> {
            countDownLatch.countDown();
        }).subscribe(create);
        create.request(10 / 2);
        try {
            countDownLatch.await();
        } catch (InterruptedException e) {
            Assert.fail(e.getMessage());
        }
        create.assertNoErrors();
        create.assertValueCount(10);
        create.assertNotTerminated();
    }

    private void testStream(Function<ReactiveSocket, Flux<Payload>> function) {
        TestSubscriber create = TestSubscriber.create();
        function.apply(getReactiveSocket()).take(5L).subscribe(create);
        await(create);
        create.assertTerminated();
        create.assertNoErrors();
        create.assertValueCount(5);
        create.assertTerminated();
    }

    private static void await(TestSubscriber<?> testSubscriber) {
        try {
            testSubscriber.await();
        } catch (InterruptedException e) {
            Assert.fail("Interrupted while waiting for completion.");
        }
    }
}
