/*
 * Decompiled with CFR 0.152.
 */
package com.linkedin.d2.balancer.clients;

import com.linkedin.common.callback.Callback;
import com.linkedin.common.callback.FutureCallback;
import com.linkedin.common.callback.SuccessCallback;
import com.linkedin.common.util.MapUtil;
import com.linkedin.d2.balancer.D2Client;
import com.linkedin.d2.balancer.D2ClientDelegator;
import com.linkedin.d2.balancer.KeyMapper;
import com.linkedin.d2.balancer.LoadBalancer;
import com.linkedin.d2.balancer.properties.ServiceProperties;
import com.linkedin.d2.balancer.strategies.LoadBalancerStrategy;
import com.linkedin.d2.balancer.util.LoadBalancerUtil;
import com.linkedin.data.ByteString;
import com.linkedin.r2.RetriableRequestException;
import com.linkedin.r2.message.Request;
import com.linkedin.r2.message.RequestContext;
import com.linkedin.r2.message.Response;
import com.linkedin.r2.message.rest.RestRequest;
import com.linkedin.r2.message.rest.RestRequestBuilder;
import com.linkedin.r2.message.rest.RestResponse;
import com.linkedin.r2.message.stream.StreamRequest;
import com.linkedin.r2.message.stream.StreamRequestBuilder;
import com.linkedin.r2.message.stream.StreamResponse;
import com.linkedin.r2.message.stream.entitystream.ByteStringWriter;
import com.linkedin.r2.message.stream.entitystream.EntityStreams;
import com.linkedin.r2.message.stream.entitystream.FullEntityObserver;
import com.linkedin.r2.message.stream.entitystream.Observer;
import com.linkedin.r2.message.stream.entitystream.Writer;
import com.linkedin.util.clock.Clock;
import com.linkedin.util.clock.SystemClock;
import java.net.URI;
import java.util.LinkedList;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import javax.annotation.concurrent.ThreadSafe;
import org.apache.commons.lang.exception.ExceptionUtils;
import org.checkerframework.checker.lock.qual.GuardedBy;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class RetryClient
extends D2ClientDelegator {
    public static final long DEFAULT_UPDATE_INTERVAL_MS = TimeUnit.SECONDS.toMillis(1L);
    public static final int DEFAULT_AGGREGATED_INTERVAL_NUM = 5;
    public static final boolean DEFAULT_REST_RETRY_ENABLED = false;
    public static final boolean DEFAULT_STREAM_RETRY_ENABLED = false;
    private static final Logger LOG = LoggerFactory.getLogger(RetryClient.class);
    private final Clock _clock;
    private final LoadBalancer _balancer;
    private final int _limit;
    private final long _updateIntervalMs;
    private final int _aggregatedIntervalNum;
    private final boolean _restRetryEnabled;
    private final boolean _streamRetryEnabled;
    ConcurrentMap<String, ClientRetryTracker> _retryTrackerMap;

    @Deprecated
    public RetryClient(D2Client d2Client, LoadBalancer balancer, int limit) {
        this(d2Client, balancer, limit, DEFAULT_UPDATE_INTERVAL_MS, 5, (Clock)SystemClock.instance(), false, false);
    }

    @Deprecated
    public RetryClient(D2Client d2Client, LoadBalancer balancer, int limit, long updateIntervalMs, int aggregatedIntervalNum, Clock clock) {
        this(d2Client, balancer, limit, updateIntervalMs, aggregatedIntervalNum, clock, false, false);
    }

    public RetryClient(D2Client d2Client, LoadBalancer balancer, int limit, long updateIntervalMs, int aggregatedIntervalNum, Clock clock, boolean restRetryEnabled, boolean streamRetryEnabled) {
        super(d2Client);
        this._balancer = balancer;
        this._limit = limit;
        this._updateIntervalMs = updateIntervalMs;
        this._aggregatedIntervalNum = aggregatedIntervalNum;
        this._clock = clock;
        this._retryTrackerMap = new ConcurrentHashMap<String, ClientRetryTracker>();
        this._restRetryEnabled = restRetryEnabled;
        this._streamRetryEnabled = streamRetryEnabled;
        LOG.debug("Retry client created with limit={}", (Object)this._limit);
    }

    @Override
    public Future<RestResponse> restRequest(RestRequest request) {
        return this.restRequest(request, new RequestContext());
    }

    @Override
    public Future<RestResponse> restRequest(RestRequest request, RequestContext requestContext) {
        FutureCallback future = new FutureCallback();
        this.restRequest(request, requestContext, (Callback<RestResponse>)future);
        return future;
    }

    @Override
    public void restRequest(RestRequest request, Callback<RestResponse> callback) {
        this.restRequest(request, new RequestContext(), callback);
    }

    @Override
    public void restRequest(RestRequest request, RequestContext requestContext, Callback<RestResponse> callback) {
        if (this._restRetryEnabled) {
            RestRequest newRequest = ((RestRequestBuilder)request.builder().setHeader("X-Number-Of-Retry-Attempts", "0")).build();
            ClientRetryTracker retryTracker = this.updateRetryTracker(newRequest.getURI(), false);
            RestRetryRequestCallback transportCallback = new RestRetryRequestCallback(newRequest, requestContext, callback, retryTracker);
            this._d2Client.restRequest(newRequest, requestContext, transportCallback);
        } else {
            this._d2Client.restRequest(request, requestContext, callback);
        }
    }

    @Override
    public void streamRequest(StreamRequest request, Callback<StreamResponse> callback) {
        this.streamRequest(request, new RequestContext(), callback);
    }

    @Override
    public void streamRequest(StreamRequest request, RequestContext requestContext, Callback<StreamResponse> callback) {
        if (this._streamRetryEnabled) {
            StreamRequest newRequest = ((StreamRequestBuilder)request.builder().setHeader("X-Number-Of-Retry-Attempts", "0")).build(request.getEntityStream());
            ClientRetryTracker retryTracker = this.updateRetryTracker(newRequest.getURI(), false);
            StreamRetryRequestCallback transportCallback = new StreamRetryRequestCallback(newRequest, requestContext, callback, retryTracker);
            this._d2Client.streamRequest(newRequest, requestContext, transportCallback);
        } else {
            this._d2Client.streamRequest(request, requestContext, callback);
        }
    }

    private ClientRetryTracker updateRetryTracker(URI uri, boolean isRetry) {
        String serviceName = LoadBalancerUtil.getServiceNameFromUri(uri);
        ClientRetryTracker retryTracker = this._retryTrackerMap.computeIfAbsent(serviceName, k -> new ClientRetryTracker(this._aggregatedIntervalNum, this._updateIntervalMs, this._clock, (String)k));
        retryTracker.add(isRetry);
        return retryTracker;
    }

    private static class RetryCounter {
        private int _retryRequestCount = 0;
        private int _totalRequestCount = 0;

        public int getRetryRequestCount() {
            return this._retryRequestCount;
        }

        public int getTotalRequestCount() {
            return this._totalRequestCount;
        }

        public void addToRetryRequestCount(int count) {
            this._retryRequestCount += count;
        }

        public void addToTotalRequestCount(int count) {
            this._totalRequestCount += count;
        }

        public void subtractFromRetryRequestCount(int count) {
            this._retryRequestCount -= count;
        }

        public void subtractFromTotalRequestCount(int count) {
            this._totalRequestCount -= count;
        }
    }

    @ThreadSafe
    private class ClientRetryTracker {
        private final int _aggregatedIntervalNum;
        private final long _updateIntervalMs;
        private final Clock _clock;
        private final String _serviceName;
        private final Object _counterLock = new Object();
        private final Object _updateLock = new Object();
        private volatile @GuardedBy(value={"_updateLock"}) long _lastRollOverTime;
        private @GuardedBy(value={"_updateLock"}) double _currentAggregatedRetryRatio;
        private final @GuardedBy(value={"_counterLock"}) LinkedList<RetryCounter> _retryCounter;
        private final @GuardedBy(value={"_counterLock"}) RetryCounter _aggregatedRetryCounter;

        private ClientRetryTracker(int aggregatedIntervalNum, long updateIntervalMs, Clock clock, String serviceName) {
            this._aggregatedIntervalNum = aggregatedIntervalNum;
            this._updateIntervalMs = updateIntervalMs;
            this._clock = clock;
            this._serviceName = serviceName;
            this._lastRollOverTime = clock.currentTimeMillis();
            this._currentAggregatedRetryRatio = 0.0;
            this._aggregatedRetryCounter = new RetryCounter();
            this._retryCounter = new LinkedList();
            this._retryCounter.add(new RetryCounter());
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        public void add(boolean isRetry) {
            Object object = this._counterLock;
            synchronized (object) {
                if (isRetry) {
                    this._retryCounter.getLast().addToRetryRequestCount(1);
                }
                this._retryCounter.getLast().addToTotalRequestCount(1);
            }
            this.updateRetryDecision();
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        public void rollOverStats() {
            Object object = this._counterLock;
            synchronized (object) {
                RetryCounter intervalToAggregate = this._retryCounter.getLast();
                this._aggregatedRetryCounter.addToTotalRequestCount(intervalToAggregate.getTotalRequestCount());
                this._aggregatedRetryCounter.addToRetryRequestCount(intervalToAggregate.getRetryRequestCount());
                if (this._retryCounter.size() > this._aggregatedIntervalNum) {
                    RetryCounter intervalToDiscard = this._retryCounter.removeFirst();
                    this._aggregatedRetryCounter.subtractFromTotalRequestCount(intervalToDiscard.getTotalRequestCount());
                    this._aggregatedRetryCounter.subtractFromRetryRequestCount(intervalToDiscard.getRetryRequestCount());
                }
                this._retryCounter.addLast(new RetryCounter());
            }
        }

        public void isBelowRetryRatio(final SuccessCallback<Boolean> callback) {
            RetryClient.this._balancer.getLoadBalancedServiceProperties(this._serviceName, new Callback<ServiceProperties>(){

                public void onError(Throwable e) {
                    LOG.warn("Failed to fetch transportClientProperties ", e);
                    callback.onSuccess((Object)(ClientRetryTracker.this._currentAggregatedRetryRatio <= 0.2 ? 1 : 0));
                }

                public void onSuccess(ServiceProperties result) {
                    Map<String, Object> transportClientProperties = result.getTransportClientProperties();
                    double maxClientRequestRetryRatio = transportClientProperties == null ? 0.2 : (Double)MapUtil.getWithDefault(transportClientProperties, (Object)"http.maxClientRequestRetryRatio", (Object)0.2, Double.class);
                    callback.onSuccess((Object)(ClientRetryTracker.this._currentAggregatedRetryRatio <= maxClientRequestRetryRatio ? 1 : 0));
                }
            });
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        private void updateRetryDecision() {
            long currentTime = this._clock.currentTimeMillis();
            Object object = this._updateLock;
            synchronized (object) {
                if (currentTime >= this._lastRollOverTime + this._updateIntervalMs) {
                    for (long time = currentTime; time >= this._lastRollOverTime + this._updateIntervalMs; time -= this._updateIntervalMs) {
                        this.rollOverStats();
                    }
                    this._currentAggregatedRetryRatio = this.getRetryRatio();
                    this._lastRollOverTime = currentTime;
                }
            }
        }

        private double getRetryRatio() {
            int aggregatedTotalCount = this._aggregatedRetryCounter.getTotalRequestCount();
            int aggregatedRetryCount = this._aggregatedRetryCounter.getRetryRequestCount();
            return aggregatedTotalCount == 0 ? 0.0 : (double)aggregatedRetryCount / (double)aggregatedTotalCount;
        }
    }

    private static abstract class RetryRequestCallback<REQ extends Request, RESP extends Response>
    implements Callback<RESP> {
        private final REQ _request;
        private final RequestContext _context;
        private final Callback<RESP> _callback;
        private final ClientRetryTracker _retryTracker;
        final /* synthetic */ RetryClient this$0;

        public RetryRequestCallback(REQ request, RequestContext context, Callback<RESP> callback, ClientRetryTracker retryTracker) {
            this.this$0 = var1_1;
            this._request = request;
            this._context = context;
            this._callback = callback;
            this._retryTracker = retryTracker;
        }

        public void onSuccess(RESP result) {
            LoadBalancerStrategy.ExcludedHostHints.clearRequestContextExcludedHosts(this._context);
            this._callback.onSuccess(result);
        }

        public void onError(Throwable e) {
            URI targetHostUri;
            boolean retry = false;
            if (this.isRetryException(e) && (targetHostUri = KeyMapper.TargetHostHints.getRequestContextTargetHost(this._context)) == null) {
                Set<URI> exclusionSet = LoadBalancerStrategy.ExcludedHostHints.getRequestContextExcludedHosts(this._context);
                if (exclusionSet == null || exclusionSet.isEmpty()) {
                    LOG.warn("Excluded hosts hint for retry is not set or is empty. This failed request will not be retried.");
                } else {
                    int attempts = exclusionSet.size();
                    if (attempts <= this.this$0._limit) {
                        retry = true;
                        this._retryTracker.isBelowRetryRatio((SuccessCallback<Boolean>)((SuccessCallback)isBelowRetryRatio -> {
                            boolean doRetry;
                            if (isBelowRetryRatio.booleanValue()) {
                                LOG.warn("A retriable exception occurred. Going to retry. This is attempt {}. Current exclusion set: {}", (Object)attempts, (Object)exclusionSet);
                                doRetry = this.doRetryRequest(this._request, this._context, attempts);
                            } else {
                                LOG.warn("Client retry ratio exceeded. This request will fail.");
                                this.disableRetryException(e);
                                doRetry = false;
                            }
                            if (!doRetry) {
                                LoadBalancerStrategy.ExcludedHostHints.clearRequestContextExcludedHosts(this._context);
                                this._callback.onError(e);
                            }
                        }));
                    } else {
                        LOG.warn("Retry limit exceeded. This request will fail.");
                        this.disableRetryException(e);
                    }
                }
            }
            if (!retry) {
                LoadBalancerStrategy.ExcludedHostHints.clearRequestContextExcludedHosts(this._context);
                this._callback.onError(e);
            }
        }

        private boolean isRetryException(Throwable e) {
            Throwable[] throwables;
            for (Throwable throwable : throwables = ExceptionUtils.getThrowables((Throwable)e)) {
                if (!(throwable instanceof RetriableRequestException)) continue;
                return !((RetriableRequestException)throwable).getDoNotRetryOverride();
            }
            return false;
        }

        private void disableRetryException(Throwable e) {
            Throwable[] throwables;
            for (Throwable throwable : throwables = ExceptionUtils.getThrowables((Throwable)e)) {
                if (!(throwable instanceof RetriableRequestException)) continue;
                ((RetriableRequestException)throwable).setDoNotRetryOverride(true);
                return;
            }
        }

        public abstract boolean doRetryRequest(REQ var1, RequestContext var2, int var3);
    }

    private class RestRetryRequestCallback
    extends RetryRequestCallback<RestRequest, RestResponse> {
        public RestRetryRequestCallback(RestRequest request, RequestContext context, Callback<RestResponse> callback, ClientRetryTracker retryTracker) {
            super(RetryClient.this, (Request)request, context, callback, retryTracker);
        }

        @Override
        public boolean doRetryRequest(RestRequest request, RequestContext context, int numberOfRetryAttempts) {
            RestRequest newRequest = ((RestRequestBuilder)request.builder().setHeader("X-Number-Of-Retry-Attempts", Integer.toString(numberOfRetryAttempts))).build();
            RetryClient.this.updateRetryTracker(request.getURI(), true);
            RetryClient.this._d2Client.restRequest(newRequest, context, this);
            return true;
        }
    }

    private class StreamRetryRequestCallback
    extends RetryRequestCallback<StreamRequest, StreamResponse> {
        private volatile boolean _recorded;
        private ByteString _content;

        public StreamRetryRequestCallback(StreamRequest request, RequestContext context, Callback<StreamResponse> callback, ClientRetryTracker retryTracker) {
            super(RetryClient.this, (Request)request, context, callback, retryTracker);
            this._recorded = false;
            this._content = null;
            FullEntityObserver observer = new FullEntityObserver((Callback)new Callback<ByteString>(){

                public void onError(Throwable e) {
                    if (StreamRetryRequestCallback.this._recorded) {
                        return;
                    }
                    LOG.warn("Failed to record request's entity for retrying.");
                    StreamRetryRequestCallback.this._content = null;
                    StreamRetryRequestCallback.this._recorded = true;
                }

                public void onSuccess(ByteString result) {
                    if (StreamRetryRequestCallback.this._recorded) {
                        return;
                    }
                    StreamRetryRequestCallback.this._content = result;
                    StreamRetryRequestCallback.this._recorded = true;
                }
            });
            request.getEntityStream().addObserver((Observer)observer);
        }

        @Override
        public boolean doRetryRequest(StreamRequest request, RequestContext context, int numberOfRetryAttempts) {
            if (this._recorded && this._content != null) {
                StreamRequest newRequest = ((StreamRequestBuilder)request.builder().setHeader("X-Number-Of-Retry-Attempts", Integer.toString(numberOfRetryAttempts))).build(EntityStreams.newEntityStream((Writer)new ByteStringWriter(this._content)));
                RetryClient.this.updateRetryTracker(request.getURI(), true);
                RetryClient.this._d2Client.streamRequest(newRequest, new RequestContext(context), this);
                return true;
            }
            LOG.warn("Request's entity has not been recorded before retrying.");
            return false;
        }
    }
}

