package io.vertx.reactivex.test;

import io.reactivex.rxjava3.core.Completable;
import io.reactivex.rxjava3.core.Flowable;
import io.vertx.core.Handler;
import io.vertx.core.http.HttpMethod;
import io.vertx.core.http.HttpServerOptions;
import io.vertx.core.impl.NoStackTraceThrowable;
import io.vertx.rxjava3.core.AbstractVerticle;
import io.vertx.rxjava3.core.RxHelper;
import io.vertx.rxjava3.core.Vertx;
import io.vertx.rxjava3.core.buffer.Buffer;
import io.vertx.rxjava3.core.eventbus.DeliveryContext;
import io.vertx.rxjava3.core.eventbus.EventBus;
import io.vertx.rxjava3.core.http.HttpClient;
import io.vertx.rxjava3.core.http.HttpServerResponse;
import io.vertx.test.core.VertxTestBase;
import java.util.UUID;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import org.junit.Test;

/* loaded from: input_file:io/vertx/reactivex/test/CoreApiTest.class */
public class CoreApiTest extends VertxTestBase {
    private Vertx vertx;

    public void setUp() throws Exception {
        super.setUp();
        this.vertx = new Vertx(((VertxTestBase) this).vertx);
    }

    @Test
    public void testDeployVerticle() throws Exception {
        final CountDownLatch countDownLatch = new CountDownLatch(2);
        RxHelper.deployVerticle(this.vertx, new AbstractVerticle() { // from class: io.vertx.reactivex.test.CoreApiTest.1
            public void start() {
                countDownLatch.countDown();
            }
        }).subscribe(str -> {
            countDownLatch.countDown();
        });
        awaitLatch(countDownLatch);
    }

    @Test
    public void testWebSocket() {
        waitFor(2);
        AtomicLong atomicLong = new AtomicLong();
        this.vertx.createHttpServer(new HttpServerOptions().setIdleTimeout(2)).webSocketHandler(serverWebSocket -> {
            serverWebSocket.toFlowable().subscribe(buffer -> {
                atomicLong.incrementAndGet();
                serverWebSocket.writeTextMessage("pong");
            }, th -> {
                assertEquals(1L, atomicLong.get());
                complete();
            }, this::fail);
        }).listen(8080, "localhost").blockingGet();
        HttpClient createHttpClient = this.vertx.createHttpClient();
        AtomicLong atomicLong2 = new AtomicLong();
        createHttpClient.rxWebSocket(8080, "localhost", "/").doAfterSuccess(webSocket -> {
            webSocket.writeTextMessage("ping");
        }).flatMapPublisher((v0) -> {
            return v0.toFlowable();
        }).subscribe(buffer -> {
            atomicLong2.incrementAndGet();
        }, th -> {
            assertEquals(1L, atomicLong2.get());
            complete();
        }, this::fail);
        await();
    }

    @Test
    public void testHttpClient() {
        this.vertx.createHttpServer().requestHandler(httpServerRequest -> {
            httpServerRequest.response().end("Hello World");
        }).listen(8080, "localhost").blockingGet();
        assertEquals("Hello World", ((Buffer) this.vertx.createHttpClient().rxRequest(HttpMethod.GET, 8080, "localhost", "/").flatMap(httpClientRequest -> {
            return httpClientRequest.rxSend().flatMap((v0) -> {
                return v0.body();
            });
        }).blockingGet()).toString());
    }

    @Test
    public void testHttpClientResponseStream() {
        this.vertx.createHttpServer().requestHandler(httpServerRequest -> {
            AtomicInteger atomicInteger = new AtomicInteger();
            HttpServerResponse chunked = httpServerRequest.response().setChunked(true);
            this.vertx.setPeriodic(10L, l -> {
                int andIncrement = atomicInteger.getAndIncrement();
                if (andIncrement < 10) {
                    chunked.write("" + andIncrement);
                } else {
                    this.vertx.cancelTimer(l.longValue());
                    chunked.end();
                }
            });
        }).listen(8080, "localhost").blockingGet();
        assertEquals("0123456789", (String) this.vertx.createHttpClient().rxRequest(HttpMethod.GET, 8080, "localhost", "/").flatMapPublisher(httpClientRequest -> {
            return httpClientRequest.rxSend().flatMapPublisher((v0) -> {
                return v0.toFlowable();
            });
        }).reduce("", (str, buffer) -> {
            return str + buffer;
        }).blockingGet());
    }

    @Test
    public void shouldRemoveInterceptor() {
        String uuid = UUID.randomUUID().toString();
        String uuid2 = UUID.randomUUID().toString();
        Handler handler = deliveryContext -> {
            deliveryContext.message().headers().add(uuid, uuid2);
            deliveryContext.next();
        };
        EventBus eventBus = this.vertx.eventBus();
        eventBus.addInboundInterceptor(handler);
        eventBus.consumer("foo", message -> {
            message.reply(message.headers().get(uuid));
        }).completionHandler().andThen(eventBus.rxRequest("foo", "bar").flatMapCompletable(message2 -> {
            return message2.body().equals(uuid2) ? Completable.complete() : Completable.error(new NoStackTraceThrowable("Expected msg to be intercepted"));
        })).andThen(Completable.fromAction(() -> {
            eventBus.removeInboundInterceptor(handler);
        })).andThen(eventBus.rxRequest("foo", "bar").flatMapCompletable(message3 -> {
            return message3.body() == null ? Completable.complete() : Completable.error(new NoStackTraceThrowable("Expected msg not to be intercepted"));
        })).subscribe(() -> {
            testComplete();
        }, th -> {
            fail(th);
        });
        await();
    }

    @Test
    public void testPipeFailureShouldUnsubscribe() throws Exception {
        this.vertx.createHttpServer().requestHandler(httpServerRequest -> {
            httpServerRequest.response().send(Flowable.generate(() -> {
                return 0L;
            }, (l, emitter) -> {
                emitter.onNext(Buffer.buffer("Chunk " + l + "\n"));
                return Long.valueOf(l.longValue() + 1);
            }).delay(100L, TimeUnit.MILLISECONDS).rebatchRequests(1).doOnCancel(this::testComplete));
        }).rxListen(8080, "localhost").blockingGet();
        this.vertx.createHttpClient().rxRequest(HttpMethod.GET, 8080, "localhost", "/").flatMap((v0) -> {
            return v0.rxSend();
        }).subscribe(httpClientResponse -> {
            httpClientResponse.toFlowable().take(5L).subscribe(buffer -> {
            }, this::fail, () -> {
                httpClientResponse.request().reset();
            });
        }, this::fail);
        await();
    }
}
