package io.ceresdb;

import com.codahale.metrics.Histogram;
import com.codahale.metrics.Timer;
import io.ceresdb.common.Display;
import io.ceresdb.common.Endpoint;
import io.ceresdb.common.Lifecycle;
import io.ceresdb.common.util.Clock;
import io.ceresdb.common.util.Cpus;
import io.ceresdb.common.util.MetricsUtil;
import io.ceresdb.common.util.Requires;
import io.ceresdb.common.util.SharedScheduledPool;
import io.ceresdb.common.util.Spines;
import io.ceresdb.common.util.TopKSelector;
import io.ceresdb.errors.RouteTableException;
import io.ceresdb.options.RouterOptions;
import io.ceresdb.proto.Storage;
import io.ceresdb.rpc.Context;
import io.ceresdb.rpc.Observer;
import io.ceresdb.rpc.RpcClient;
import io.ceresdb.rpc.errors.RemotingException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.Function;
import java.util.stream.Collectors;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/ceresdb/RouterClient.class */
public class RouterClient implements Lifecycle<RouterOptions>, Display, Iterable<Route> {
    // gc0 starts evicting once the cache size reaches this fraction of opts.getMaxCachedSize().
    private static final float CLEAN_CACHE_THRESHOLD = 0.75f;
    // Fraction of the current cache size that a single cleanup pass tries to evict.
    private static final float CLEAN_THRESHOLD = 0.1f;
    // Cap on consecutive cleanup passes per gc() run; gc0 stops recursing once its
    // attempt counter reaches this value.
    private static final int MAX_CONTINUOUS_GC_TIMES = 3;
    // Maximum number of metric names sent in one refresh batch (see refresh()).
    private static final int ITEM_COUNT_EACH_REFRESH = 512;
    // How long blockingRouteRefreshFor waits for one refresh batch to complete.
    private static final long BLOCKING_ROUTE_TIMEOUT_MS = 3000;
    // Executors borrowed from the shared pools in init() and handed back in shutdownGracefully();
    // each stays null when its period option is not positive.
    private ScheduledExecutorService cleaner;
    private ScheduledExecutorService refresher;
    private RouterOptions opts;
    private RpcClient rpcClient;
    private RouterByMetrics router;
    private InnerMetrics metrics;
    // Route cache keyed by metric name. The diamond replaces the raw ConcurrentHashMap
    // instantiation so the map is created without an unchecked conversion.
    private final ConcurrentMap<String, Route> routeCache = new ConcurrentHashMap<>();
    private static final Logger LOG = LoggerFactory.getLogger(RouterClient.class);
    private static final SharedScheduledPool CLEANER_POOL = Utils.getSharedScheduledPool("route_cache_cleaner", 1);
    private static final SharedScheduledPool REFRESHER_POOL = Utils.getSharedScheduledPool("route_cache_refresher", Math.min(4, Cpus.cpus()));

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:io/ceresdb/RouterClient$InnerMetrics.class */
    /**
     * Holder for the histograms and timers that the route cache reports.
     * Every metric name is paired with the string form of the endpoint passed
     * to the constructor.
     */
    public static final class InnerMetrics {
        final Histogram refreshedSize;
        final Histogram cachedSize;
        final Histogram gcTimes;
        final Histogram gcItems;
        final Timer gcTimer;
        final Timer refreshTimer;

        private InnerMetrics(Endpoint endpoint) {
            final String suffix = endpoint.toString();
            refreshedSize = histogramOf("route_for_metrics_refreshed_size", suffix);
            cachedSize = histogramOf("route_for_metrics_cached_size", suffix);
            gcTimes = histogramOf("route_for_metrics_gc_times", suffix);
            gcItems = histogramOf("route_for_metrics_gc_items", suffix);
            gcTimer = timerOf("route_for_metrics_gc_timer", suffix);
            refreshTimer = timerOf("route_for_metrics_refresh_timer", suffix);
        }

        /** Looks up the histogram identified by the given name and endpoint suffix. */
        private static Histogram histogramOf(String name, String suffix) {
            return MetricsUtil.histogram(new Object[]{name, suffix});
        }

        /** Looks up the timer identified by the given name and endpoint suffix. */
        private static Timer timerOf(String name, String suffix) {
            return MetricsUtil.timer(new Object[]{name, suffix});
        }

        /** @return histogram fed with the number of routes returned by each refresh */
        Histogram refreshedSize() {
            return refreshedSize;
        }

        /** @return histogram fed with the route cache size after each refresh */
        Histogram cachedSize() {
            return cachedSize;
        }

        /** @return histogram fed with the attempt counter returned by each gc run */
        Histogram gcTimes() {
            return gcTimes;
        }

        /** @return histogram fed with the number of entries selected for eviction */
        Histogram gcItems() {
            return gcItems;
        }

        /** @return timer wrapping each gc run */
        Timer gcTimer() {
            return gcTimer;
        }

        /** @return timer fed with the duration of each successful refresh */
        Timer refreshTimer() {
            return refreshTimer;
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:io/ceresdb/RouterClient$RouterByMetrics.class */
    /**
     * Router that asks the cluster (or, when it is unreachable, one of the
     * endpoints already known from the route cache) for the routes of a set
     * of metric names.
     */
    public class RouterByMetrics implements Router<Collection<String>, Map<String, Route>> {
        private final Endpoint endpoint;

        private RouterByMetrics(Endpoint endpoint) {
            this.endpoint = endpoint;
        }

        /**
         * Fetches the routes for the given metric names.
         *
         * @param collection metric names; {@code null} or empty yields an empty map
         * @return future holding a map from metric name to route; it fails with a
         *         {@link RouteTableException} when the response header is not a success
         */
        @Override
        public CompletableFuture<Map<String, Route>> routeFor(Collection<String> collection) {
            if (collection == null || collection.isEmpty()) {
                return Utils.completedCf(Collections.emptyMap());
            }

            final Storage.RouteRequest request = Storage.RouteRequest.newBuilder().addAllMetrics(collection).build();
            final Context ctx = Context.of("call_priority", "100");

            return invokeRpc(request, ctx).thenCompose(response -> {
                if (!Utils.isSuccess(response.getHeader())) {
                    return Utils.errorCf(new RouteTableException("Fail to get route table: " + response.getHeader()));
                }
                final Map<String, Route> routes = response.getRoutesList().stream()
                        .collect(Collectors.toMap(Storage.Route::getMetric, this::toRouteObj));
                return Utils.completedCf(routes);
            });
        }

        /**
         * Sends the route request to the cluster address when its connection
         * check passes; otherwise walks the endpoints from
         * {@code reserveAddresses()} and uses the first one whose check passes.
         */
        private CompletableFuture<Storage.RouteResponse> invokeRpc(Storage.RouteRequest routeRequest, Context context) {
            final RouterClient outer = RouterClient.this;
            // NOTE(review): the boolean passed to checkConn is true for the cluster address
            // and false for fallbacks; its exact meaning is defined by RpcClient.
            if (outer.checkConn(this.endpoint, true)) {
                return outer.invoke(this.endpoint, routeRequest, context);
            }

            LOG.warn("Fail to connect to the cluster address: {}.", this.endpoint);

            int attempt = 0;
            for (final Endpoint candidate : outer.reserveAddresses()) {
                attempt++;
                LOG.warn("Try to invoke to the {}th server {}.", attempt, candidate);
                if (outer.checkConn(candidate, false)) {
                    return outer.invoke(candidate, routeRequest, context);
                }
            }

            return Utils.errorCf(new RouteTableException("Fail to connect to: " + this.endpoint));
        }

        /** Converts a protobuf route into the client-side {@link Route}. */
        private Route toRouteObj(Storage.Route route) {
            final Storage.Endpoint target =
                    (Storage.Endpoint) Requires.requireNonNull(route.getEndpoint(), "CeresDB.Endpoint");
            final Endpoint address = Endpoint.of(target.getIp(), target.getPort());
            return Route.of(route.getMetric(), address, route.getExt());
        }
    }

    /**
     * Initializes this client from a private copy of the given options and,
     * when the respective period is positive, schedules the cache cleaner
     * and the cache refresher on their shared pools.
     *
     * @param routerOptions options to copy; must not be {@code null} and must
     *                      carry a non-null cluster address
     * @return always {@code true}
     */
    public boolean init(RouterOptions routerOptions) {
        final RouterOptions checked = (RouterOptions) Requires.requireNonNull(routerOptions, "RouterClient.opts");
        this.opts = checked.m33copy();
        this.rpcClient = this.opts.getRpcClient();

        final Endpoint clusterAddress =
                (Endpoint) Requires.requireNonNull(this.opts.getClusterAddress(), "Null.clusterAddress");
        this.router = new RouterByMetrics(clusterAddress);
        this.metrics = new InnerMetrics(clusterAddress);

        final long gcPeriod = this.opts.getGcPeriodSeconds();
        if (gcPeriod > 0) {
            this.cleaner = (ScheduledExecutorService) CLEANER_POOL.getObject();
            this.cleaner.scheduleWithFixedDelay(this::gc, Utils.randomInitialDelay(300L), gcPeriod, TimeUnit.SECONDS);
            LOG.info("Route table cache cleaner has been started.");
        }

        final long refreshPeriod = this.opts.getRefreshPeriodSeconds();
        if (refreshPeriod > 0) {
            this.refresher = (ScheduledExecutorService) REFRESHER_POOL.getObject();
            this.refresher.scheduleWithFixedDelay(this::refresh, Utils.randomInitialDelay(180L), refreshPeriod, TimeUnit.SECONDS);
            LOG.info("Route table cache refresher has been started.");
        }

        return true;
    }

    /**
     * Shuts down the RPC client, hands the cleaner and refresher executors
     * back to their shared pools, and empties the route cache. Each step is
     * skipped when the corresponding resource was never set.
     */
    public void shutdownGracefully() {
        final RpcClient client = this.rpcClient;
        if (client != null) {
            client.shutdownGracefully();
        }

        final ScheduledExecutorService gcExecutor = this.cleaner;
        if (gcExecutor != null) {
            CLEANER_POOL.returnObject(gcExecutor);
            this.cleaner = null;
        }

        final ScheduledExecutorService refreshExecutor = this.refresher;
        if (refreshExecutor != null) {
            REFRESHER_POOL.returnObject(refreshExecutor);
            this.refresher = null;
        }

        clearRouteCache();
    }

    /**
     * Iterates over the routes currently held in the cache. The iterator is
     * the one supplied by the backing concurrent map's value view.
     */
    @Override
    public Iterator<Route> iterator() {
        final Collection<Route> cachedRoutes = this.routeCache.values();
        return cachedRoutes.iterator();
    }

    /**
     * Builds a route that points at the configured cluster address.
     *
     * @return a new route created from {@code opts.getClusterAddress()}
     */
    public Route clusterRoute() {
        final Endpoint clusterAddress = this.opts.getClusterAddress();
        return Route.of(clusterAddress);
    }

    /**
     * Resolves the routes for the given metric names, answering from the
     * local cache where possible and fetching only the missing names
     * remotely.
     *
     * <p>When every name is already cached, the cached routes are returned
     * directly and their hit timestamps are not touched. Otherwise the
     * remote result is merged with the cached routes and every route in the
     * merged map is offered the current tick via {@code tryWeekSetHit}.
     *
     * @param collection metric names; {@code null} or empty yields an empty map
     * @return future holding a map from metric name to route; it completes
     *         exceptionally when the remote lookup fails
     */
    public CompletableFuture<Map<String, Route>> routeFor(Collection<String> collection) {
        if (collection == null || collection.isEmpty()) {
            return Utils.completedCf(Collections.emptyMap());
        }

        final Map<String, Route> cached = new HashMap<>();
        final List<String> misses = new ArrayList<>();
        collection.forEach(metric -> {
            final Route route = this.routeCache.get(metric);
            if (route == null) {
                misses.add(metric);
            } else {
                cached.put(metric, route);
            }
        });

        if (misses.isEmpty()) {
            return Utils.completedCf(cached);
        }

        return routeRefreshFor(misses)
                .thenApply(remote -> {
                    // Copy the smaller map into the larger one to keep the merge cheap.
                    final Map<String, Route> merged;
                    if (remote.size() > cached.size()) {
                        remote.putAll(cached);
                        merged = remote;
                    } else {
                        cached.putAll(remote);
                        merged = cached;
                    }
                    return merged;
                })
                .thenApply(routes -> {
                    final long now = Clock.defaultClock().getTick();
                    routes.values().forEach(route -> route.tryWeekSetHit(now));
                    return routes;
                });
    }

    /**
     * Fetches the routes for the given metric names from the router and, on
     * success, stores them in the cache and records refresh metrics. A
     * failure is only logged here; the returned future still carries it.
     *
     * @param collection metric names to refresh
     * @return the router's future, completing after the cache update or the
     *         failure log has run
     */
    public CompletableFuture<Map<String, Route>> routeRefreshFor(Collection<String> collection) {
        final long startTick = Clock.defaultClock().getTick();
        final CompletableFuture<Map<String, Route>> pending = this.router.routeFor(collection);

        return pending.whenComplete((routes, error) -> {
            if (error == null) {
                this.routeCache.putAll(routes);
                this.metrics.refreshedSize().update(routes.size());
                this.metrics.cachedSize().update(this.routeCache.size());
                this.metrics.refreshTimer().update(Clock.defaultClock().duration(startTick), TimeUnit.MILLISECONDS);
                LOG.info("Route refreshed: {}, cached_size={}.", collection, this.routeCache.size());
            } else {
                LOG.warn("Route refresh failed: {}.", collection, error);
            }
        });
    }

    /**
     * Refreshes the routes for the given metric names and waits up to
     * {@code BLOCKING_ROUTE_TIMEOUT_MS} milliseconds for the result.
     * Failures are logged and not propagated.
     *
     * <p>If the waiting thread is interrupted, its interrupt flag is restored
     * so that the caller and the owning executor can still observe the
     * interruption instead of having it silently consumed here.
     *
     * @param collection metric names to refresh
     */
    private void blockingRouteRefreshFor(Collection<String> collection) {
        try {
            routeRefreshFor(collection).get(BLOCKING_ROUTE_TIMEOUT_MS, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            // Catching InterruptedException clears the flag; set it again before returning.
            Thread.currentThread().interrupt();
            LOG.error("Fail to blocking refresh route.", e);
        } catch (ExecutionException | TimeoutException e) {
            LOG.error("Fail to blocking refresh route.", e);
        }
    }

    /**
     * Removes the cache entries for the given metric names.
     *
     * @param collection metric names to evict; {@code null} or empty is a no-op
     */
    public void clearRouteCacheBy(Collection<String> collection) {
        if (collection == null || collection.isEmpty()) {
            return;
        }
        // The decompiled lambda referenced an undefined temporary; a bound method
        // reference on the cache removes each name as intended.
        collection.forEach(this.routeCache::remove);
    }

    /**
     * Empties the route cache.
     *
     * @return the cache size read just before clearing
     */
    public int clearRouteCache() {
        final int sizeBeforeClear = this.routeCache.size();
        this.routeCache.clear();
        return sizeBeforeClear;
    }

    /**
     * Refreshes every cached route. When the cache holds at most
     * {@code ITEM_COUNT_EACH_REFRESH} names they are refreshed in a single
     * blocking call; otherwise the names are refreshed in consecutive
     * batches of that size, with a final smaller batch for the remainder.
     */
    public void refresh() {
        final Set<String> cachedNames = this.routeCache.keySet();
        if (cachedNames.size() <= ITEM_COUNT_EACH_REFRESH) {
            blockingRouteRefreshFor(cachedNames);
            return;
        }

        final Collection<String> batch = Spines.newBuf(ITEM_COUNT_EACH_REFRESH);
        for (final String name : cachedNames) {
            batch.add(name);
            if (batch.size() >= ITEM_COUNT_EACH_REFRESH) {
                blockingRouteRefreshFor(batch);
                // The same buffer is reused for the next batch.
                batch.clear();
            }
        }

        if (!batch.isEmpty()) {
            blockingRouteRefreshFor(batch);
        }
    }

    /**
     * Runs one cache cleanup under the gc timer and records the attempt
     * counter returned by {@code gc0} in the gc-times histogram.
     */
    public void gc() {
        final Timer timer = this.metrics.gcTimer();
        timer.time(() -> {
            final int attempts = gc0(0);
            this.metrics.gcTimes().update(attempts);
        });
    }

    /**
     * Performs one cleanup pass and recurses while the cache is still above
     * its cleaning threshold, up to {@code MAX_CONTINUOUS_GC_TIMES} extra
     * attempts.
     *
     * <p>A pass is skipped when the cache size is below
     * {@code maxCachedSize * CLEAN_CACHE_THRESHOLD}. Otherwise
     * {@code CLEAN_THRESHOLD} of the current entries, chosen by
     * {@code TopKSelector} with a comparator that reverses the order of the
     * routes' last-hit ticks, are removed.
     *
     * @param i number of passes already performed in this gc run
     * @return the attempt counter at the point where cleaning stopped
     */
    private int gc0(int i) {
        final float cleanTrigger = this.opts.getMaxCachedSize() * CLEAN_CACHE_THRESHOLD;
        if (this.routeCache.size() < cleanTrigger) {
            LOG.info("Now that the number of cached entries is {}.", this.routeCache.size());
            return i;
        }

        LOG.warn("Now that the number of cached entries [{}] is about to exceed its limit [{}], we need to clean up.",
                this.routeCache.size(), this.opts.getMaxCachedSize());

        final int itemsToClean = (int) (this.routeCache.size() * CLEAN_THRESHOLD);
        if (itemsToClean <= 0) {
            LOG.warn("No more need to be clean.");
            return i;
        }

        final List<String> victims = TopKSelector.selectTopK(
                        this.routeCache.entrySet(),
                        itemsToClean,
                        (left, right) -> -Long.compare(left.getValue().getLastHit(), right.getValue().getLastHit()))
                .map(Map.Entry::getKey)
                .collect(Collectors.toList());
        this.metrics.gcItems().update(victims.size());
        clearRouteCacheBy(victims);
        LOG.warn("Cleaned {} entries from route cache, now entries size {}.", itemsToClean, this.routeCache.size());

        // Use the named limit instead of a bare literal so the cap is defined in one place.
        if (this.routeCache.size() <= cleanTrigger || i >= MAX_CONTINUOUS_GC_TIMES) {
            return i;
        }

        LOG.warn("Now we need to work continuously, this will be the {}th attempt.", i + 1);
        return gc0(i + 1);
    }

    /**
     * Sends an asynchronous request to the endpoint, delegating to the
     * four-argument overload with {@code -1} as the last argument.
     * NOTE(review): {@code -1} presumably selects the RPC client's default
     * timeout; confirm against RpcClient.
     *
     * @return future completed with the response or the failure
     */
    public <Req, Resp> CompletableFuture<Resp> invoke(Endpoint endpoint, Req req, Context context) {
        return this.<Req, Resp>invoke(endpoint, req, context, -1L);
    }

    /**
     * Sends an asynchronous request through the RPC client and adapts its
     * observer callbacks to a future.
     *
     * @param endpoint target server
     * @param req      request payload
     * @param context  invocation context
     * @param j        value forwarded as the last argument of
     *                 {@code RpcClient.invokeAsync} (presumably a timeout in
     *                 milliseconds; confirm against RpcClient)
     * @return future completed by {@code onNext}, or completed exceptionally
     *         by {@code onError} or by a {@link RemotingException} thrown
     *         while submitting the call
     */
    public <Req, Resp> CompletableFuture<Resp> invoke(Endpoint endpoint, Req req, Context context, long j) {
        final CompletableFuture<Resp> promise = new CompletableFuture<>();
        final Observer<Resp> bridge = new Observer<Resp>() {
            public void onNext(Resp resp) {
                promise.complete(resp);
            }

            public void onError(Throwable th) {
                promise.completeExceptionally(th);
            }
        };

        try {
            this.rpcClient.invokeAsync(endpoint, req, context, bridge, j);
        } catch (RemotingException submitFailure) {
            promise.completeExceptionally(submitFailure);
        }
        return promise;
    }

    /**
     * Starts a server-streaming call through the RPC client. A
     * {@link RemotingException} thrown while starting the call is delivered
     * to the observer's {@code onError} instead of being thrown.
     *
     * @param observer receiver of the streamed responses and of any start-up failure
     */
    public <Req, Resp> void invokeServerStreaming(Endpoint endpoint, Req req, Context context, Observer<Resp> observer) {
        final RpcClient client = this.rpcClient;
        try {
            client.invokeServerStreaming(endpoint, req, context, observer);
        } catch (RemotingException startFailure) {
            observer.onError(startFailure);
        }
    }

    /**
     * Starts a client-streaming call through the RPC client.
     *
     * @param observer receiver of the response and of any start-up failure
     * @return the request observer supplied by the RPC client; when starting
     *         the call throws a {@link RemotingException}, the failure is
     *         passed to {@code observer.onError} and an
     *         {@code Observer.RejectedObserver} built from it is returned
     */
    public <Req, Resp> Observer<Req> invokeClientStreaming(Endpoint endpoint, Req req, Context context, Observer<Resp> observer) {
        Observer<Req> requestObserver;
        try {
            requestObserver = this.rpcClient.invokeClientStreaming(endpoint, req, context, observer);
        } catch (RemotingException startFailure) {
            observer.onError(startFailure);
            requestObserver = new Observer.RejectedObserver(startFailure);
        }
        return requestObserver;
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Collects the distinct endpoints of all routes currently in the cache.
     * {@code RouterByMetrics} uses them as fallbacks when the cluster
     * address cannot be reached.
     *
     * @return a set of the cached routes' endpoints
     */
    public Collection<Endpoint> reserveAddresses() {
        return this.routeCache.values()
                .stream()
                .map(Route::getEndpoint)
                .collect(Collectors.toSet());
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Asks the RPC client whether a connection to the endpoint is available.
     *
     * @param endpoint server to check
     * @param z        flag forwarded unchanged to {@code RpcClient.checkConnection}
     *                 (NOTE(review): meaning is defined by RpcClient; confirm there)
     * @return the RPC client's answer
     */
    public boolean checkConn(Endpoint endpoint, boolean z) {
        final boolean connected = this.rpcClient.checkConnection(endpoint, z);
        return connected;
    }

    /**
     * Prints this client's options and cache size, followed by the RPC
     * client's own display output when an RPC client is set.
     *
     * @param printer sink for the diagnostic output
     */
    public void display(Display.Printer printer) {
        printer.println("--- RouterClient ---")
                .print("opts=")
                .println(this.opts)
                .print("routeCache.size=")
                .println(Integer.valueOf(this.routeCache.size()));

        if (this.rpcClient == null) {
            return;
        }
        printer.println("");
        this.rpcClient.display(printer);
    }

    /**
     * @return a description listing the options, the RPC client and the router
     */
    @Override
    public String toString() {
        final StringBuilder text = new StringBuilder("RouterClient{");
        text.append("opts=").append(this.opts);
        text.append(", rpcClient=").append(this.rpcClient);
        text.append(", router=").append(this.router);
        text.append('}');
        return text.toString();
    }
}
