package io.scalecube.gateway.clientsdk.http;

import io.netty.buffer.ByteBuf;
import io.netty.handler.codec.http.HttpHeaders;
import io.scalecube.gateway.clientsdk.ClientCodec;
import io.scalecube.gateway.clientsdk.ClientMessage;
import io.scalecube.gateway.clientsdk.ClientSettings;
import io.scalecube.gateway.clientsdk.ClientTransport;
import io.scalecube.services.api.Qualifier;
import java.util.Optional;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Scheduler;
import reactor.netty.http.client.HttpClient;
import reactor.netty.http.client.HttpClientResponse;
import reactor.netty.resources.ConnectionProvider;
import reactor.netty.resources.LoopResources;

/* loaded from: input_file:io/scalecube/gateway/clientsdk/http/HttpClientTransport.class */
public final class HttpClientTransport implements ClientTransport {
    private static final Logger LOGGER = LoggerFactory.getLogger(HttpClientTransport.class);
    private static final String SERVICE_RECV_TIME = "service-recv-time";
    private static final String SERVICE_SEND_TIME = "service-send-time";
    private static final String CLIENT_RECV_TIME = "client-recv-time";
    private static final String CLIENT_SEND_TIME = "client-send-time";
    private final ClientCodec<ByteBuf> codec;
    private final HttpClient httpClient;
    private final ConnectionProvider connectionProvider = ConnectionProvider.elastic("http-client-sdk");

    public HttpClientTransport(ClientSettings clientSettings, ClientCodec<ByteBuf> clientCodec, LoopResources loopResources) {
        this.codec = clientCodec;
        this.httpClient = HttpClient.create(this.connectionProvider).tcpConfiguration(tcpClient -> {
            return tcpClient.runOn(loopResources).host(clientSettings.host()).port(clientSettings.port());
        });
    }

    @Override // io.scalecube.gateway.clientsdk.ClientTransport
    public Mono<ClientMessage> requestResponse(ClientMessage clientMessage, Scheduler scheduler) {
        return Mono.defer(() -> {
            ByteBuf encode = this.codec.encode(clientMessage);
            return this.httpClient.post().uri(clientMessage.qualifier()).send((httpClientRequest, nettyOutbound) -> {
                LOGGER.debug("Sending request {}", clientMessage);
                httpClientRequest.header(CLIENT_SEND_TIME, String.valueOf(System.currentTimeMillis()));
                return nettyOutbound.sendObject(encode).then();
            }).responseSingle((httpClientResponse, byteBufMono) -> {
                return byteBufMono.map((v0) -> {
                    return v0.retain();
                }).publishOn(scheduler).map(byteBuf -> {
                    return toClientMessage(httpClientResponse, byteBuf, clientMessage.qualifier());
                });
            });
        });
    }

    @Override // io.scalecube.gateway.clientsdk.ClientTransport
    public Flux<ClientMessage> requestStream(ClientMessage clientMessage, Scheduler scheduler) {
        return Flux.error(new UnsupportedOperationException("Request stream is not supported by HTTP/1.x"));
    }

    @Override // io.scalecube.gateway.clientsdk.ClientTransport
    public Mono<Void> close() {
        return this.connectionProvider.disposeLater().doOnTerminate(() -> {
            LOGGER.info("Closed http-client-sdk transport");
        });
    }

    private ClientMessage toClientMessage(HttpClientResponse httpClientResponse, ByteBuf byteBuf, String str) {
        int code = httpClientResponse.status().code();
        String asError = isError(code) ? Qualifier.asError(code) : str;
        HttpHeaders responseHeaders = httpClientResponse.responseHeaders();
        ClientMessage.Builder header = ClientMessage.builder().qualifier(asError).header(CLIENT_RECV_TIME, Long.valueOf(System.currentTimeMillis()));
        Optional.ofNullable(responseHeaders.get(CLIENT_SEND_TIME)).ifPresent(str2 -> {
            header.header(CLIENT_SEND_TIME, str2);
        });
        Optional.ofNullable(responseHeaders.get(SERVICE_RECV_TIME)).ifPresent(str3 -> {
            header.header(SERVICE_RECV_TIME, str3);
        });
        Optional.ofNullable(responseHeaders.get(SERVICE_SEND_TIME)).ifPresent(str4 -> {
            header.header(SERVICE_SEND_TIME, str4);
        });
        ClientMessage build = header.data(byteBuf).build();
        LOGGER.debug("Received response {}", build);
        return build;
    }

    private boolean isError(int i) {
        return i >= 400 && i <= 599;
    }
}
