/*
 * Decompiled with CFR 0.152.
 */
package com.linkedin.d2.balancer.util;

import com.linkedin.common.callback.Callback;
import com.linkedin.common.callback.SuccessCallback;
import com.linkedin.common.util.None;
import com.linkedin.d2.balancer.LoadBalancerWithFacilities;
import com.linkedin.d2.balancer.LoadBalancerWithFacilitiesDelegator;
import com.linkedin.d2.balancer.ServiceUnavailableException;
import com.linkedin.d2.balancer.WarmUpService;
import com.linkedin.d2.balancer.properties.ServiceProperties;
import com.linkedin.d2.balancer.util.ClusterInfoProvider;
import com.linkedin.d2.balancer.util.FileSystemDirectory;
import com.linkedin.d2.balancer.util.LoadBalancerUtil;
import com.linkedin.d2.balancer.util.downstreams.DownstreamServicesFetcher;
import com.linkedin.d2.discovery.event.PropertyEventThread;
import com.linkedin.r2.message.Request;
import com.linkedin.r2.message.RequestContext;
import com.linkedin.r2.transport.common.bridge.client.TransportClient;
import com.linkedin.r2.transport.http.client.TimeoutCallback;
import com.linkedin.util.clock.SystemClock;
import java.util.HashSet;
import java.util.List;
import java.util.Queue;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedDeque;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;
import java.util.stream.IntStream;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Decorator around a {@link LoadBalancerWithFacilities} that warms up downstream services after
 * the wrapped balancer has started and before the caller's start callback is completed.
 *
 * <p>Service names are obtained from the {@link DownstreamServicesFetcher}; each one is handed to
 * the {@link WarmUpService}. The start callback is always completed successfully once the wrapped
 * balancer has started: either when every service has been processed, or when the
 * {@link TimeoutCallback} configured with {@code warmUpTimeoutSeconds} reports an error, in which
 * case already scheduled warm-ups are not cancelled.
 *
 * <p>Every service name seen during warm-up or requested through
 * {@link #getClient(Request, RequestContext)} is recorded. On shutdown, if no warm-up request is
 * outstanding, that set is passed to {@link FileSystemDirectory} as the exclusion set for its
 * service and cluster clean-up.
 *
 * <p>The set of used services is a concurrent set because it is written from request threads
 * ({@code getClient}) and from the warm-up executor.
 */
public class WarmUpLoadBalancer extends LoadBalancerWithFacilitiesDelegator {
    private static final Logger LOG = LoggerFactory.getLogger(WarmUpLoadBalancer.class);
    public static final int DEFAULT_CONCURRENT_REQUESTS = 1;
    public static final int DEFAULT_SEND_REQUESTS_TIMEOUT_SECONDS = 60;

    /** Futures of the warm-up tasks submitted so far; cleared when warm-up completes or on shutdown. */
    private final ConcurrentLinkedDeque<Future<?>> _outstandingRequests;
    private final WarmUpService _serviceWarmupper;
    private final String _d2FsDirPath;
    private final String _d2ServicePath;
    private final int _warmUpTimeoutSeconds;
    private final int _concurrentRequests;
    private final ScheduledExecutorService _executorService;
    private final DownstreamServicesFetcher _downstreamServicesFetcher;
    /** Set by {@link #shutdown}; makes warm-up tasks that have not started yet return immediately. */
    private volatile boolean _shuttingDown = false;
    /** Names of services that were warmed up or requested through {@link #getClient}. */
    private final Set<String> _usedServices;

    /**
     * @param balancer                  load balancer every call is delegated to
     * @param serviceWarmupper          performs the actual warm-up of a single service
     * @param executorService           runs the warm-up tasks and backs the warm-up timeout
     * @param d2FsDirPath               first argument given to {@link FileSystemDirectory} on shutdown
     * @param d2ServicePath             second argument given to {@link FileSystemDirectory} on shutdown
     * @param downstreamServicesFetcher supplies the names of the services to warm up
     * @param warmUpTimeoutSeconds      timeout, in seconds, given to the {@link TimeoutCallback}
     *                                  guarding the warm-up
     * @param concurrentRequests        upper bound on the number of warm-up chains started in parallel.
     *                                  NOTE(review): a non-positive value starts no chain at all, so the
     *                                  start callback would then only be completed through the timeout
     */
    public WarmUpLoadBalancer(LoadBalancerWithFacilities balancer, WarmUpService serviceWarmupper, ScheduledExecutorService executorService, String d2FsDirPath, String d2ServicePath, DownstreamServicesFetcher downstreamServicesFetcher, int warmUpTimeoutSeconds, int concurrentRequests) {
        super(balancer);
        this._serviceWarmupper = serviceWarmupper;
        this._executorService = executorService;
        this._d2FsDirPath = d2FsDirPath;
        this._d2ServicePath = d2ServicePath;
        this._downstreamServicesFetcher = downstreamServicesFetcher;
        this._warmUpTimeoutSeconds = warmUpTimeoutSeconds;
        this._concurrentRequests = concurrentRequests;
        this._outstandingRequests = new ConcurrentLinkedDeque<>();
        // Written by getClient() on caller threads and by the warm-up callback on executor threads.
        this._usedServices = ConcurrentHashMap.newKeySet();
    }

    /**
     * Starts the wrapped load balancer and, once it has started, schedules the warm-up on the
     * executor. A failure of the wrapped balancer is forwarded to {@code callback} unchanged.
     *
     * @param callback completed by the warm-up (always with success) or failed by the wrapped balancer
     */
    @Override
    public void start(final Callback<None> callback) {
        LOG.info("D2 WarmUp enabled");
        this._loadBalancer.start(new Callback<None>() {

            @Override
            public void onError(Throwable e) {
                callback.onError(e);
            }

            @Override
            public void onSuccess(None result) {
                WarmUpLoadBalancer.this._executorService.execute(() -> WarmUpLoadBalancer.this.warmUpServices(callback));
            }
        });
    }

    /**
     * Wraps {@code startUpCallback} in a {@link TimeoutCallback} and asks the fetcher for the
     * service names to warm up. Both outcomes of the wrapper complete {@code startUpCallback}
     * successfully, so a slow or failed warm-up never fails startup.
     */
    private void warmUpServices(final Callback<None> startUpCallback) {
        Callback<None> timeoutCallback = new TimeoutCallback<>(this._executorService, this._warmUpTimeoutSeconds, TimeUnit.SECONDS, new Callback<None>() {

            @Override
            public void onError(Throwable e) {
                LOG.info("D2 WarmUp hit timeout, continuing startup. The WarmUp will continue in background", e);
                startUpCallback.onSuccess(None.none());
            }

            @Override
            public void onSuccess(None result) {
                LOG.info("D2 WarmUp completed");
                startUpCallback.onSuccess(None.none());
            }
        }, "This message will never be used, even in case of timeout, no exception should be passed up");
        this._downstreamServicesFetcher.getServiceNames(serviceNames -> this.startWarmUp(serviceNames, timeoutCallback));
    }

    /**
     * Records the fetched service names as used and submits the initial warm-up tasks. Any
     * exception is logged and turned into a successful completion of {@code timeoutCallback}.
     *
     * @param serviceNames    names of the services to warm up
     * @param timeoutCallback callback to complete when warm-up is done or cannot be started
     */
    private void startWarmUp(List<String> serviceNames, Callback<None> timeoutCallback) {
        try {
            this._usedServices.addAll(serviceNames);
            LOG.info("Trying to warmup {} services: [{}]", serviceNames.size(), String.join(", ", serviceNames));
            if (serviceNames.isEmpty()) {
                timeoutCallback.onSuccess(None.none());
                return;
            }
            WarmUpTask warmUpTask = new WarmUpTask(serviceNames, timeoutCallback);
            // Never start more chains than there are services to warm up.
            int concurrentRequests = Math.min(serviceNames.size(), this._concurrentRequests);
            for (int i = 0; i < concurrentRequests; i++) {
                this._outstandingRequests.add(this._executorService.submit(warmUpTask::execute));
            }
        }
        catch (Exception e) {
            LOG.error("D2 WarmUp Failed, continuing start up.", e);
            timeoutCallback.onSuccess(None.none());
        }
    }

    @Override
    public ClusterInfoProvider getClusterInfoProvider() {
        return this._loadBalancer.getClusterInfoProvider();
    }

    /**
     * Runs the file-system clean-up (only when no warm-up request is outstanding), cancels the
     * outstanding warm-up tasks and shuts down the wrapped load balancer.
     *
     * @param shutdown callback handed to the wrapped load balancer's shutdown
     */
    @Override
    public void shutdown(PropertyEventThread.PropertyEventShutdownCallback shutdown) {
        // The deque is only empty when warm-up never submitted a task or has fully completed.
        if (this._outstandingRequests.isEmpty()) {
            FileSystemDirectory fsDirectory = new FileSystemDirectory(this._d2FsDirPath, this._d2ServicePath);
            fsDirectory.removeAllServicesWithExcluded(this._usedServices);
            fsDirectory.removeAllClustersWithExcluded(this.getUsedClusters());
        }
        this._shuttingDown = true;
        this._outstandingRequests.forEach(future -> future.cancel(true));
        this._outstandingRequests.clear();
        this._loadBalancer.shutdown(shutdown);
    }

    /**
     * Resolves the cluster name of every used service. Services whose properties cannot be
     * loaded are logged and left out of the result.
     *
     * @return cluster names of the used services
     */
    private Set<String> getUsedClusters() {
        Set<String> usedClusters = new HashSet<>();
        for (String usedService : this._usedServices) {
            try {
                ServiceProperties loadBalancedServiceProperties = this.getLoadBalancedServiceProperties(usedService);
                usedClusters.add(loadBalancedServiceProperties.getClusterName());
            }
            catch (ServiceUnavailableException e) {
                LOG.error("This exception shouldn't happen at this point because all the data should be valid", e);
            }
        }
        return usedClusters;
    }

    /**
     * Delegates to the wrapped load balancer and, when that succeeds, records the service the
     * request is addressed to as used.
     *
     * @throws ServiceUnavailableException propagated from the wrapped load balancer
     */
    @Override
    public TransportClient getClient(Request request, RequestContext requestContext) throws ServiceUnavailableException {
        TransportClient client = this._loadBalancer.getClient(request, requestContext);
        String serviceName = LoadBalancerUtil.getServiceNameFromUri(request.getURI());
        // The concurrent set rejects null elements; skip recording instead of failing a request
        // for which the delegate already produced a client.
        if (serviceName != null) {
            this._usedServices.add(serviceName);
        }
        return client;
    }

    /**
     * Shared state of one warm-up run. Each call to {@link #execute()} takes the next service
     * name from the queue and warms it up; when that warm-up ends (successfully or not) the
     * callback either completes the run or submits another {@code execute()} to the executor.
     */
    private class WarmUpTask {
        private final AtomicInteger _requestCompletedCount;
        private final AtomicInteger _requestStartedCount;
        private final Queue<String> _serviceNamesQueue;
        private final Callback<None> _callback;
        private final List<String> _serviceNames;

        /**
         * @param serviceNames services to warm up; also used as the total for progress logging
         * @param callback     completed with success once every service has been processed
         */
        WarmUpTask(List<String> serviceNames, Callback<None> callback) {
            this._serviceNames = serviceNames;
            this._requestStartedCount = new AtomicInteger(0);
            this._requestCompletedCount = new AtomicInteger(0);
            this._serviceNamesQueue = new ConcurrentLinkedDeque<>(serviceNames);
            this._callback = callback;
        }

        /**
         * Warms up the next queued service. Does nothing when the queue is exhausted or the
         * load balancer is shutting down.
         */
        void execute() {
            final long startTime = SystemClock.instance().currentTimeMillis();
            final String serviceName = this._serviceNamesQueue.poll();
            if (serviceName == null || WarmUpLoadBalancer.this._shuttingDown) {
                return;
            }
            LOG.info("{}/{} Starting to warm up service {}", new Object[]{this._requestStartedCount.incrementAndGet(), this._serviceNames.size(), serviceName});
            WarmUpLoadBalancer.this._serviceWarmupper.warmUpService(serviceName, new Callback<None>() {

                /** Completes the run after the last service, otherwise schedules the next warm-up. */
                private void executeNextTask() {
                    if (WarmUpTask.this._requestCompletedCount.incrementAndGet() == WarmUpTask.this._serviceNames.size()) {
                        WarmUpTask.this._callback.onSuccess(None.none());
                        // An empty deque is what allows shutdown() to run its clean-up.
                        WarmUpLoadBalancer.this._outstandingRequests.clear();
                        return;
                    }
                    WarmUpLoadBalancer.this._outstandingRequests.add(WarmUpLoadBalancer.this._executorService.submit(() -> WarmUpTask.this.execute()));
                }

                @Override
                public void onError(Throwable e) {
                    // A failed warm-up is only logged; the remaining services are still processed.
                    LOG.info(String.format("%s/%s Service %s failed to warm up, continuing with warm up", WarmUpTask.this._requestCompletedCount.get() + 1, WarmUpTask.this._serviceNames.size(), serviceName), e);
                    this.executeNextTask();
                }

                @Override
                public void onSuccess(None result) {
                    LOG.info("{}/{} Service {} warmed up in {}ms", new Object[]{WarmUpTask.this._requestCompletedCount.get() + 1, WarmUpTask.this._serviceNames.size(), serviceName, SystemClock.instance().currentTimeMillis() - startTime});
                    this.executeNextTask();
                }
            });
        }
    }
}

