package com.ceresdb;

import com.ceresdb.CeresDBLimiter;
import com.ceresdb.common.Display;
import com.ceresdb.common.Endpoint;
import com.ceresdb.common.Lifecycle;
import com.ceresdb.common.VisibleForTest;
import com.ceresdb.common.util.Clock;
import com.ceresdb.common.util.MetricsUtil;
import com.ceresdb.common.util.Requires;
import com.ceresdb.common.util.SerializingExecutor;
import com.ceresdb.errors.StreamException;
import com.ceresdb.models.Err;
import com.ceresdb.models.QueryOk;
import com.ceresdb.models.QueryRequest;
import com.ceresdb.models.Result;
import com.ceresdb.options.QueryOptions;
import com.ceresdb.proto.Storage;
import com.ceresdb.rpc.Context;
import com.ceresdb.rpc.Observer;
import com.codahale.metrics.Histogram;
import com.codahale.metrics.Meter;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.Executor;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.stream.Collectors;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/ceresdb/QueryClient.class */
public class QueryClient implements Query, Lifecycle<QueryOptions>, Display {
    private static final Logger LOG = LoggerFactory.getLogger(QueryClient.class);
    private QueryOptions opts;
    private RouterClient routerClient;
    private Executor asyncPool;
    private QueryLimiter queryLimiter;

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Limiter applied to queries issued through {@link QueryClient}.
     *
     * <p>Every query costs exactly one permit, and a rejected query is answered
     * with a {@code FLOW_CONTROL} error result that carries the limiter's permit
     * counters in its message.
     */
    @VisibleForTest
    public static class DefaultQueryLimiter extends QueryLimiter {

        /**
         * @param i             value forwarded as the first constructor argument of
         *                      {@link QueryLimiter}; {@code init} passes
         *                      {@code getMaxInFlightQueryRequests()} here
         * @param limitedPolicy policy forwarded unchanged to {@link QueryLimiter}
         */
        public DefaultQueryLimiter(int i, LimitedPolicy limitedPolicy) {
            super(i, limitedPolicy, "query_limiter_acquire");
        }

        /**
         * A query always consumes a single permit, whatever its content.
         *
         * @param queryRequest the request about to be executed (not inspected)
         * @return always {@code 1}
         */
        @Override // com.ceresdb.CeresDBLimiter
        public int calculatePermits(QueryRequest queryRequest) {
            return 1;
        }

        /**
         * Builds the error result returned for a query the limiter refused.
         *
         * @param queryRequest  the refused request; its QL text and metrics are
         *                      copied into the error
         * @param rejectedState permit counters reported in the error message
         * @return an error result with code {@code Result.FLOW_CONTROL}
         */
        @Override // com.ceresdb.CeresDBLimiter
        public Result<QueryOk, Err> rejected(QueryRequest queryRequest, CeresDBLimiter.RejectedState rejectedState) {
            final String message = String.format(
                "Query limited by client, acquirePermits=%d, maxPermits=%d, availablePermits=%d.",
                rejectedState.acquirePermits(),
                rejectedState.maxPermits(),
                rejectedState.availablePermits());
            final Err flowControlErr = Err.queryErr(
                Result.FLOW_CONTROL, message, null, queryRequest.getQl(), queryRequest.getMetrics());
            return Result.err(flowControlErr);
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:com/ceresdb/QueryClient$ErrHandler.class */
    public static final class ErrHandler implements Runnable {
        private final QueryRequest req;

        private ErrHandler(QueryRequest queryRequest) {
            this.req = queryRequest;
        }

        @Override // java.lang.Runnable
        public void run() {
            QueryClient.LOG.error("Fail to query by request: {}.", this.req);
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:com/ceresdb/QueryClient$InnerMetrics.class */
    /**
     * Holder for the metrics recorded by the query path.
     *
     * <p>The row-count histogram and the failure/QPS meters are resolved once,
     * when this class is initialised; the per-retry meter is looked up through
     * {@link MetricsUtil} on every call.
     */
    public static final class InnerMetrics {
        static final Histogram READ_ROW_COUNT = MetricsUtil.histogram("read_row_count");
        static final Meter READ_FAILED = MetricsUtil.meter("read_failed");
        static final Meter READ_QPS = MetricsUtil.meter("read_qps");

        InnerMetrics() {
        }

        /** @return histogram of the row counts returned by queries */
        static Histogram readRowCount() {
            return READ_ROW_COUNT;
        }

        /** @return meter marked for every query that did not end in an OK result */
        static Meter readFailed() {
            return READ_FAILED;
        }

        /** @return meter marked once per completed query */
        static Meter readQps() {
            return READ_QPS;
        }

        /**
         * Returns the meter for the given retry count.
         *
         * @param i current retry count; values above 3 are reported under 3, so
         *          all higher counts share one meter name
         * @return the meter named from {@code "read_by_retries"} and the capped count
         */
        static Meter readByRetries(int i) {
            final int cappedRetries = i < 3 ? i : 3;
            final Object[] nameParts = new Object[]{"read_by_retries", Integer.valueOf(cappedRetries)};
            return MetricsUtil.meter(nameParts);
        }
    }

    /**
     * Configures this client from the given options.
     *
     * <p>Takes the router client from the options, uses the options' async pool
     * (or a new {@link SerializingExecutor} named {@code "query_client"} when none
     * is supplied), and creates the query limiter.
     *
     * @param queryOptions options to initialise from; validated by
     *                     {@code Requires.requireNonNull}
     * @return always {@code true}
     */
    public boolean init(QueryOptions queryOptions) {
        this.opts = (QueryOptions) Requires.requireNonNull(queryOptions, "QueryOptions.opts");
        this.routerClient = this.opts.getRouterClient();

        final Executor configuredPool = this.opts.getAsyncPool();
        if (configuredPool == null) {
            // No pool supplied by the caller: fall back to a client-owned executor.
            this.asyncPool = new SerializingExecutor("query_client");
        } else {
            this.asyncPool = configuredPool;
        }

        final int maxInFlightQueries = this.opts.getMaxInFlightQueryRequests();
        this.queryLimiter = new DefaultQueryLimiter(maxInFlightQueries, this.opts.getLimitedPolicy());
        return true;
    }

    /**
     * Shuts this client down. Currently a no-op: nothing is released here, and
     * the executor and router client chosen in {@code init} are left untouched.
     */
    public void shutdownGracefully() {
        // NO-OP
    }

    /**
     * Executes a query through the query limiter and records read metrics when
     * it completes.
     *
     * <p>On completion the QPS meter is always marked. When a result is present
     * its row count (0 for an error result) is added to the row-count histogram
     * and, if read/write logging is enabled, the duration is logged. The failure
     * meter is marked when there is no result or the result is not OK.
     *
     * @param queryRequest the query to run; must not be {@code null}. Its metrics
     *                     are filled in from the QL text when absent.
     * @param context      call context forwarded to the RPC layer
     * @return a future holding either the query result or an error
     */
    @Override // com.ceresdb.Query
    public CompletableFuture<Result<QueryOk, Err>> query(QueryRequest queryRequest, Context context) {
        Requires.requireNonNull(queryRequest, "Null.request");
        final long startTick = Clock.defaultClock().getTick();
        setMetricsIfAbsent(queryRequest);

        return this.queryLimiter.acquireAndDo(queryRequest,
            () -> query0(queryRequest, context, 0).whenCompleteAsync((result, error) -> {
                InnerMetrics.readQps().mark();

                boolean succeeded = false;
                if (result != null) {
                    final Integer rowCount = result.mapOr(0, ok -> ok.getRowCount());
                    InnerMetrics.readRowCount().update(rowCount.intValue());
                    if (Utils.isRwLogging()) {
                        LOG.info("Read from {}, duration={} ms, rowCount={}.",
                            Utils.DB_NAME, Clock.defaultClock().duration(startTick), rowCount);
                    }
                    succeeded = result.isOk();
                }

                // A missing result (exceptional completion) counts as a failure too.
                if (!succeeded) {
                    InnerMetrics.readFailed().mark();
                }
            }, this.asyncPool));
    }

    /**
     * Executes a streaming query and delivers the results to {@code observer}.
     *
     * <p>The target endpoint is taken from any route returned for the request's
     * metrics, falling back to the router client's cluster route when no route
     * is returned. If route resolution (or starting the stream) fails, the
     * failure is reported through {@code observer.onError} instead of being
     * dropped, so the caller always receives a terminal signal.
     *
     * @param queryRequest the query to run; must not be {@code null}. Its metrics
     *                     are filled in from the QL text when absent.
     * @param context      call context forwarded to the RPC layer
     * @param observer     receiver of the streamed results; must not be {@code null}
     */
    @Override // com.ceresdb.Query
    public void streamQuery(QueryRequest queryRequest, Context context, Observer<QueryOk> observer) {
        Requires.requireNonNull(queryRequest, "Null.request");
        Requires.requireNonNull(observer, "Null.observer");
        setMetricsIfAbsent(queryRequest);
        this.routerClient.routeFor(queryRequest.getMetrics())
            .thenApply(map -> (Route) map.values().stream().findAny().orElse(this.routerClient.clusterRoute()))
            .thenAccept(route -> streamQueryFrom(route.getEndpoint(), queryRequest, context, observer))
            .exceptionally(th -> {
                // Dependent stages wrap the original failure in a CompletionException;
                // hand the observer the underlying cause when there is one.
                final boolean wrapped = th instanceof java.util.concurrent.CompletionException
                        && th.getCause() != null;
                observer.onError(wrapped ? th.getCause() : th);
                return null;
            });
    }

    /**
     * Runs one query attempt and, when the failure indicates stale routing,
     * refreshes the affected routes and tries again.
     *
     * <p>Flow of a single attempt:
     * <ol>
     *   <li>mark the per-retry meter;</li>
     *   <li>resolve a route for the request's metrics (any returned route, else
     *       the cluster route) and query that endpoint;</li>
     *   <li>return an OK result as is;</li>
     *   <li>on error, stop once {@code i} exceeds {@code getMaxRetries()};</li>
     *   <li>otherwise collect the failed metrics of every error for which
     *       {@code Utils.shouldRefreshRouteTable} holds; if there are none the
     *       error is returned, else those routes are refreshed and the query is
     *       re-issued with {@code i + 1}.</li>
     * </ol>
     *
     * <p>NOTE(review): the bound is a strict {@code >}, so attempts numbered
     * {@code 0..getMaxRetries()} may all schedule a further retry -- confirm this
     * matches the intended meaning of "max retries".
     *
     * @param queryRequest the query to run
     * @param context      call context forwarded to the RPC layer
     * @param i            number of retries performed so far (0 for the first attempt)
     * @return a future holding the final result or error
     */
    private CompletableFuture<Result<QueryOk, Err>> query0(QueryRequest queryRequest, Context context, int i) {
        InnerMetrics.readByRetries(i).mark();
        return this.routerClient.routeFor(queryRequest.getMetrics())
            .thenApplyAsync(
                map -> (Route) map.values().stream().findAny().orElse(this.routerClient.clusterRoute()),
                this.asyncPool)
            .thenComposeAsync(route -> queryFrom(route.getEndpoint(), queryRequest, context, i), this.asyncPool)
            .thenComposeAsync(result -> {
                if (result.isOk()) {
                    LOG.debug("Success to read from {}, ok={}.", Utils.DB_NAME, result.getOk());
                    return Utils.completedCf(result);
                }

                final Err err = result.getErr();
                LOG.warn("Failed to read from {}, retries={}, err={}.", Utils.DB_NAME, i, err);
                if (i > this.opts.getMaxRetries()) {
                    LOG.error("Retried {} times still failed.", i);
                    return Utils.completedCf(result);
                }

                // Only errors that call for a route-table refresh are worth retrying.
                final Set<String> metricsToRefresh = err.stream()
                    .filter(Utils::shouldRefreshRouteTable)
                    .flatMap(failed -> failed.getFailedMetrics().stream())
                    .collect(Collectors.toSet());
                if (metricsToRefresh.isEmpty()) {
                    return Utils.completedCf(result);
                }

                return this.routerClient.routeRefreshFor(metricsToRefresh)
                    .thenComposeAsync(refreshed -> query0(queryRequest, context, i + 1), this.asyncPool);
            }, this.asyncPool);
    }

    /**
     * Fills in the request's metrics from its QL text when the request carries
     * none ({@code null} or empty). Requests that already name metrics are left
     * unchanged.
     *
     * @param queryRequest the request to complete in place
     */
    private void setMetricsIfAbsent(QueryRequest queryRequest) {
        final boolean hasMetrics = queryRequest.getMetrics() != null && !queryRequest.getMetrics().isEmpty();
        if (hasMetrics) {
            return;
        }
        queryRequest.setMetrics(
            MetricParserFactoryProvider.getMetricParserFactory()
                .getParser(queryRequest.getQl())
                .metricNames());
    }

    /**
     * Sends the query to one endpoint and converts the RPC response into a
     * {@link Result}.
     *
     * @param endpoint     the server to query
     * @param queryRequest source of the QL text and metrics for the RPC request
     * @param context      call context; the retry count is attached under the
     *                     key {@code "retries"}
     * @param i            number of retries performed so far
     * @return a future holding the converted result
     */
    private CompletableFuture<Result<QueryOk, Err>> queryFrom(Endpoint endpoint, QueryRequest queryRequest, Context context, int i) {
        final Storage.QueryRequest rpcRequest = Storage.QueryRequest.newBuilder()
            .addAllMetrics(queryRequest.getMetrics())
            .setQl(queryRequest.getQl())
            .build();
        return this.routerClient
            .invoke(endpoint, rpcRequest, context.with("retries", Integer.valueOf(i)))
            .thenApplyAsync(
                queryResponse -> Utils.toResult(
                    queryResponse, queryRequest.getQl(), endpoint, queryRequest.getMetrics(), new ErrHandler(queryRequest)),
                this.asyncPool);
    }

    /**
     * Starts a server-streaming query against one endpoint and bridges the raw
     * RPC responses to the caller's observer.
     *
     * <p>Each response is converted with {@code Utils.toResult}: an OK result is
     * passed to {@code observer.onNext}, an error result is reported through
     * {@code observer.onError} as a {@link StreamException}. Stream errors,
     * completion and the callback executor are delegated to {@code observer}
     * unchanged. This method does not itself cancel the stream after a
     * failed response.
     *
     * @param endpoint     the server to query
     * @param queryRequest source of the QL text and metrics for the RPC request
     * @param context      call context forwarded to the RPC layer
     * @param observer     receiver of the converted results
     */
    private void streamQueryFrom(final Endpoint endpoint, final QueryRequest queryRequest, Context context, final Observer<QueryOk> observer) {
        final Storage.QueryRequest rpcRequest = Storage.QueryRequest.newBuilder()
            .addAllMetrics(queryRequest.getMetrics())
            .setQl(queryRequest.getQl())
            .build();

        final Observer<Storage.QueryResponse> bridge = new Observer<Storage.QueryResponse>() {
            @Override
            public void onNext(Storage.QueryResponse queryResponse) {
                final Result<QueryOk, Err> converted = Utils.toResult(
                    queryResponse, queryRequest.getQl(), endpoint, queryRequest.getMetrics(), new ErrHandler(queryRequest));
                if (!converted.isOk()) {
                    observer.onError(new StreamException("Failed to do stream query: " + converted.getErr()));
                    return;
                }
                observer.onNext(converted.getOk());
            }

            @Override
            public void onError(Throwable th) {
                observer.onError(th);
            }

            @Override
            public void onCompleted() {
                observer.onCompleted();
            }

            @Override
            public Executor executor() {
                return observer.executor();
            }
        };

        this.routerClient.invokeServerStreaming(endpoint, rpcRequest, context, bridge);
    }

    /**
     * Prints a short summary of this client: a header line, the configured
     * maximum number of retries, and the executor in use.
     *
     * @param printer destination for the summary
     */
    public void display(Display.Printer printer) {
        printer.println("--- QueryClient ---")
            .print("maxRetries=")
            .println(Integer.valueOf(this.opts.getMaxRetries()))
            .print("asyncPool=")
            .println(this.asyncPool);
    }

    /**
     * @return a debug string listing the options, router client and executor
     *         (the query limiter is not included)
     */
    @Override
    public String toString() {
        return new StringBuilder("QueryClient{")
            .append("opts=").append(this.opts)
            .append(", routerClient=").append(this.routerClient)
            .append(", asyncPool=").append(this.asyncPool)
            .append('}')
            .toString();
    }
}
