package com.azure.core.http.vertx;

import com.azure.core.http.HttpClient;
import com.azure.core.http.HttpRequest;
import com.azure.core.http.HttpResponse;
import com.azure.core.http.vertx.implementation.BufferedVertxHttpResponse;
import com.azure.core.util.Context;
import com.azure.core.util.Contexts;
import com.azure.core.util.FluxUtil;
import com.azure.core.util.ProgressReporter;
import io.netty.buffer.Unpooled;
import io.vertx.core.Vertx;
import io.vertx.core.buffer.Buffer;
import io.vertx.core.http.HttpClientRequest;
import io.vertx.core.http.HttpClientResponse;
import io.vertx.core.http.HttpMethod;
import io.vertx.core.http.RequestOptions;
import java.nio.ByteBuffer;
import java.util.Objects;
import java.util.function.Consumer;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Scheduler;
import reactor.core.scheduler.Schedulers;

/* loaded from: input_file:com/azure/core/http/vertx/VertxAsyncHttpClient.class */
class VertxAsyncHttpClient implements HttpClient {
    private final Scheduler scheduler;
    final io.vertx.core.http.HttpClient client;

    /* JADX INFO: Access modifiers changed from: package-private */
    public VertxAsyncHttpClient(io.vertx.core.http.HttpClient httpClient, Vertx vertx) {
        Objects.requireNonNull(httpClient, "client cannot be null");
        Objects.requireNonNull(vertx, "vertx cannot be null");
        this.client = httpClient;
        this.scheduler = Schedulers.fromExecutor(vertx.nettyEventLoopGroup());
    }

    public Mono<HttpResponse> send(HttpRequest httpRequest) {
        return send(httpRequest, Context.NONE);
    }

    public Mono<HttpResponse> send(HttpRequest httpRequest, Context context) {
        ProgressReporter httpRequestProgressReporter = Contexts.with(context).getHttpRequestProgressReporter();
        RequestOptions absoluteURI = new RequestOptions().setMethod(HttpMethod.valueOf(httpRequest.getHttpMethod().name())).setAbsoluteURI(httpRequest.getUrl());
        return Mono.create(monoSink -> {
            this.client.request(absoluteURI, asyncResult -> {
                if (asyncResult.failed()) {
                    monoSink.error(asyncResult.cause());
                    return;
                }
                HttpClientRequest httpClientRequest = (HttpClientRequest) asyncResult.result();
                Objects.requireNonNull(monoSink);
                httpClientRequest.exceptionHandler(monoSink::error);
                httpRequest.getHeaders().stream().forEach(httpHeader -> {
                    httpClientRequest.putHeader(httpHeader.getName(), httpHeader.getValuesList());
                });
                if (httpRequest.getHeaders().get("Content-Length") == null) {
                    httpClientRequest.setChunked(true);
                }
                httpClientRequest.response(asyncResult -> {
                    if (!asyncResult.succeeded()) {
                        monoSink.error(asyncResult.cause());
                        return;
                    }
                    HttpClientResponse httpClientResponse = (HttpClientResponse) asyncResult.result();
                    Objects.requireNonNull(monoSink);
                    httpClientResponse.exceptionHandler(monoSink::error);
                    httpClientResponse.body(asyncResult -> {
                        if (asyncResult.succeeded()) {
                            monoSink.success(new BufferedVertxHttpResponse(httpRequest, httpClientResponse, (Buffer) asyncResult.result()));
                        } else {
                            monoSink.error(asyncResult.cause());
                        }
                    });
                });
                Flux body = httpRequest.getBody();
                if (body == null) {
                    httpClientRequest.end();
                    return;
                }
                if (httpRequestProgressReporter != null) {
                    body = body.map(byteBuffer -> {
                        httpRequestProgressReporter.reportProgress(byteBuffer.remaining());
                        return byteBuffer;
                    });
                }
                Mono subscribeOn = FluxUtil.collectBytesFromNetworkResponse(body, httpRequest.getHeaders()).subscribeOn(this.scheduler);
                Consumer consumer = bArr -> {
                    httpClientRequest.write(Buffer.buffer(Unpooled.wrappedBuffer(bArr)));
                };
                Objects.requireNonNull(monoSink);
                Consumer consumer2 = monoSink::error;
                Objects.requireNonNull(httpClientRequest);
                subscribeOn.subscribe(consumer, consumer2, httpClientRequest::end);
            });
        });
    }
}
