package io.streamthoughts.azkarra.http.query;

import io.streamthoughts.azkarra.api.errors.AzkarraRetriableException;
import io.streamthoughts.azkarra.api.query.Queried;
import io.streamthoughts.azkarra.api.query.QueryInfo;
import io.streamthoughts.azkarra.api.query.RemoteQueryClient;
import io.streamthoughts.azkarra.api.query.result.ErrorResultSet;
import io.streamthoughts.azkarra.api.query.result.QueryError;
import io.streamthoughts.azkarra.api.query.result.QueryResult;
import io.streamthoughts.azkarra.api.query.result.QueryResultBuilder;
import io.streamthoughts.azkarra.api.query.result.QueryStatus;
import io.streamthoughts.azkarra.api.streams.StreamsServerInfo;
import io.streamthoughts.azkarra.serialization.Serdes;
import java.io.IOException;
import java.util.Arrays;
import java.util.Objects;
import java.util.concurrent.CompletableFuture;
import okhttp3.Call;
import okhttp3.Callback;
import okhttp3.MediaType;
import okhttp3.OkHttpClient;
import okhttp3.Request;
import okhttp3.RequestBody;
import okhttp3.Response;
import okhttp3.ResponseBody;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/streamthoughts/azkarra/http/query/HttpRemoteQueryClient.class */
public class HttpRemoteQueryClient implements RemoteQueryClient {
    private static Logger LOG = LoggerFactory.getLogger(HttpRemoteQueryClient.class);
    private OkHttpClient client;
    private final QueryURLBuilder queryURLBuilder;
    private final Serdes<QueryResult> serdes;
    private final MediaType mediaType;

    /* loaded from: input_file:io/streamthoughts/azkarra/http/query/HttpRemoteQueryClient$AsyncQueryCallback.class */
    private static class AsyncQueryCallback<K, V> implements Callback {
        private final String remoteServerName;
        private final QueryResultBuilder<K, V> builder;
        private final CompletableFuture<QueryResult<K, V>> completableFuture;
        private final Serdes<QueryResult> serdes;

        AsyncQueryCallback(String str, CompletableFuture<QueryResult<K, V>> completableFuture, QueryResultBuilder<K, V> queryResultBuilder, Serdes<QueryResult> serdes) {
            this.remoteServerName = str;
            this.completableFuture = completableFuture;
            this.builder = queryResultBuilder;
            this.serdes = serdes;
        }

        public void onFailure(Call call, IOException iOException) {
            this.completableFuture.completeExceptionally(new AzkarraRetriableException(iOException));
        }

        public void onResponse(Call call, Response response) {
            ResponseBody body = response.body();
            try {
                try {
                    byte[] bytes = body.bytes();
                    int code = response.code();
                    if (code < 200 || code >= 300) {
                        this.completableFuture.complete(buildQueryResultFor(this.remoteServerName, new QueryError("Invalid response from remote server (code:'" + code + "') : " + Arrays.toString(bytes))));
                    } else {
                        this.completableFuture.complete((QueryResult) this.serdes.deserialize(bytes));
                    }
                } catch (Exception e) {
                    this.completableFuture.complete(buildQueryResultFor(this.remoteServerName, QueryError.of(e)));
                }
                if (body != null) {
                    body.close();
                }
            } catch (Throwable th) {
                if (body != null) {
                    try {
                        body.close();
                    } catch (Throwable th2) {
                        th.addSuppressed(th2);
                    }
                }
                throw th;
            }
        }

        private QueryResult<K, V> buildQueryResultFor(String str, QueryError queryError) {
            return this.builder.setStatus(QueryStatus.ERROR).setFailedResultSet(new ErrorResultSet(str, true, queryError)).setTook(0L).build();
        }
    }

    public HttpRemoteQueryClient(OkHttpClient okHttpClient, QueryURLBuilder queryURLBuilder, Serdes<QueryResult> serdes) {
        this.client = (OkHttpClient) Objects.requireNonNull(okHttpClient, "httpClient cannot be null");
        this.queryURLBuilder = (QueryURLBuilder) Objects.requireNonNull(queryURLBuilder, "queryURLBuilder cannot be null");
        this.serdes = (Serdes) Objects.requireNonNull(serdes, "serdes cannot be null");
        this.mediaType = MediaType.get(serdes.contentType());
    }

    public <K, V> CompletableFuture<QueryResult<K, V>> query(StreamsServerInfo streamsServerInfo, QueryInfo queryInfo, Queried queried) {
        String hostAndPort = streamsServerInfo.hostAndPort();
        Request build = new Request.Builder().url(this.queryURLBuilder.buildURL(hostAndPort, streamsServerInfo.id(), queryInfo.storeName())).addHeader("Accept", this.mediaType.toString()).addHeader("Content-type", this.mediaType.toString()).post(RequestBody.create(JsonQuerySerde.serialize(queryInfo, queried), this.mediaType)).build();
        QueryResultBuilder storeType = QueryResultBuilder.newBuilder().setServer(hostAndPort).setStoreName(queryInfo.storeName()).setStoreType(queryInfo.type().prettyName());
        CompletableFuture<QueryResult<K, V>> completableFuture = new CompletableFuture<>();
        LOG.debug("Forwarding state store query to remote server {}", hostAndPort);
        this.client.newCall(build).enqueue(new AsyncQueryCallback(hostAndPort, completableFuture, storeType, this.serdes));
        return completableFuture;
    }
}
