/*
 * Decompiled with CFR 0.152.
 */
package com.amazonaws.athena.connector.lambda;

import com.amazonaws.athena.connector.lambda.data.BlockSpiller;
import com.amazonaws.athena.connector.lambda.exceptions.FederationThrottleException;
import com.google.common.base.MoreObjects;
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.arrow.util.VisibleForTesting;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Invokes a {@link Callable} and retries it with an adaptive delay whenever the call fails with
 * an exception that the configured {@link ExceptionFilter} recognises as a throttling event.
 *
 * <p>The delay applied before each call starts at zero ({@link State#FAST_START}). Every
 * throttling event divides the current delay by the {@code decrease} factor (or jumps to
 * {@code initialDelayMs} when the delay is still zero), capped at {@code maxDelayMs}, and moves
 * the invoker to {@link State#CONGESTED}. Every successful call made while a delay is active
 * subtracts {@code increase} milliseconds from the delay and moves the invoker to
 * {@link State#AVOIDANCE}.
 *
 * <p>The following keys are read from the config map by
 * {@link #newDefaultBuilder(ExceptionFilter, Map)}: {@code throttle_initial_delay_ms},
 * {@code throttle_max_delay_ms}, {@code throttle_decrease_factor} and
 * {@code throttle_increase_ms}.
 */
public class ThrottlingInvoker {
    private static final Logger logger = LoggerFactory.getLogger(ThrottlingInvoker.class);

    // Configuration keys understood by newDefaultBuilder(...).
    private static final String THROTTLE_INITIAL_DELAY_MS = "throttle_initial_delay_ms";
    private static final String THROTTLE_MAX_DELAY_MS = "throttle_max_delay_ms";
    private static final String THROTTLE_DECREASE_FACTOR = "throttle_decrease_factor";
    private static final String THROTTLE_INCREASE_MS = "throttle_increase_ms";

    // Values used by newDefaultBuilder(...) when the corresponding key is absent.
    private static final long DEFAULT_INITIAL_DELAY_MS = 10L;
    private static final long DEFAULT_MAX_DELAY_MS = 1000L;
    private static final double DEFAULT_DECREASE_FACTOR = 0.5;
    private static final long DEFAULT_INCREASE_MS = 10L;

    private final long initialDelayMs;
    private final long maxDelayMs;
    private final double decrease;
    private final long increase;
    private final ExceptionFilter filter;
    private final AtomicReference<BlockSpiller> spillerRef;
    /** Milliseconds slept before each call; zero means no delay is applied. */
    private final AtomicLong delay = new AtomicLong(0L);
    private volatile State state = State.FAST_START;

    /**
     * Creates an invoker from the values collected by the given builder.
     *
     * @param builder source of the configuration values
     * @throws IllegalArgumentException if any value fails the checks described on
     *     {@link #ThrottlingInvoker(long, long, double, long, ExceptionFilter, BlockSpiller)}
     */
    public ThrottlingInvoker(Builder builder) {
        this(builder.initialDelayMs, builder.maxDelayMs, builder.decrease, builder.increase, builder.filter, builder.spiller);
    }

    /**
     * Creates an invoker from explicit values.
     *
     * @param initialDelayMs delay adopted on a throttling event while the current delay is zero
     * @param maxDelayMs upper bound for the delay; must be at least 1
     * @param decrease factor the delay is divided by on a throttling event; must be within
     *     [0.001, 1]
     * @param increase milliseconds removed from the delay after a successful call; must be at
     *     least 1
     * @param filter decides which exceptions count as throttling events; when {@code null} no
     *     exception is treated as throttling
     * @param spiller optional spiller consulted on throttling events; may be {@code null}
     * @throws IllegalArgumentException if {@code decrease}, {@code maxDelayMs} or
     *     {@code increase} is out of range
     */
    @VisibleForTesting
    ThrottlingInvoker(long initialDelayMs, long maxDelayMs, double decrease, long increase, ExceptionFilter filter, BlockSpiller spiller) {
        if (decrease > 1.0 || decrease < 0.001) {
            throw new IllegalArgumentException("decrease was " + decrease + " but should be between .001 and 1");
        }
        if (maxDelayMs < 1L) {
            throw new IllegalArgumentException("maxDelayMs was " + maxDelayMs + " but must be >= 1");
        }
        if (increase < 1L) {
            throw new IllegalArgumentException("increase was " + increase + " but must be >= 1");
        }
        this.initialDelayMs = initialDelayMs;
        this.maxDelayMs = maxDelayMs;
        this.decrease = decrease;
        this.increase = increase;
        this.filter = filter;
        this.spillerRef = new AtomicReference<BlockSpiller>(spiller);
    }

    /**
     * Returns an empty builder; every numeric setting starts at zero and must be supplied
     * before {@link Builder#build()} can succeed.
     *
     * @return a new builder
     */
    public static Builder newBuilder() {
        return new Builder();
    }

    /**
     * Returns a builder pre-populated from {@code configOptions}, falling back to the built-in
     * defaults (10 ms initial delay, 1000 ms max delay, 0.5 decrease factor, 10 ms increase) for
     * every key that is absent.
     *
     * @param filter decides which exceptions count as throttling events
     * @param configOptions configuration map; a {@code null} map is treated as empty
     * @return a builder carrying the resolved settings and the given filter
     * @throws NumberFormatException if a present value cannot be parsed as a number
     */
    public static Builder newDefaultBuilder(ExceptionFilter filter, Map<String, String> configOptions) {
        long initialDelayMs = longOption(configOptions, THROTTLE_INITIAL_DELAY_MS, DEFAULT_INITIAL_DELAY_MS);
        long maxDelayMs = longOption(configOptions, THROTTLE_MAX_DELAY_MS, DEFAULT_MAX_DELAY_MS);
        // The decrease factor is fractional (valid range 0.001..1), so it must be parsed as a
        // double; parsing it as a long rejected every value except "1".
        double decreaseFactor = doubleOption(configOptions, THROTTLE_DECREASE_FACTOR, DEFAULT_DECREASE_FACTOR);
        long increase = longOption(configOptions, THROTTLE_INCREASE_MS, DEFAULT_INCREASE_MS);
        return ThrottlingInvoker.newBuilder()
                .withInitialDelayMs(initialDelayMs)
                .withMaxDelayMs(maxDelayMs)
                .withDecrease(decreaseFactor)
                .withIncrease(increase)
                .withFilter(filter);
    }

    /** Returns the raw string stored under {@code key}, or {@code null} if the map or value is absent. */
    private static String rawOption(Map<String, String> configOptions, String key) {
        return configOptions == null ? null : configOptions.get(key);
    }

    /** Parses the value stored under {@code key} as a long, or returns {@code defaultValue} if absent. */
    private static long longOption(Map<String, String> configOptions, String key, long defaultValue) {
        String raw = rawOption(configOptions, key);
        return raw != null ? Long.parseLong(raw) : defaultValue;
    }

    /** Parses the value stored under {@code key} as a double, or returns {@code defaultValue} if absent. */
    private static double doubleOption(Map<String, String> configOptions, String key, double defaultValue) {
        String raw = rawOption(configOptions, key);
        return raw != null ? Double.parseDouble(raw) : defaultValue;
    }

    /**
     * Invokes the callable with no time limit, retrying for as long as it keeps being throttled.
     *
     * @param callable the work to perform
     * @param <T> result type of the callable
     * @return the callable's result
     * @throws TimeoutException declared for signature compatibility; with no time limit the
     *     timeout check never fires
     */
    public <T> T invoke(Callable<T> callable) throws TimeoutException {
        return this.invoke(callable, 0L);
    }

    /**
     * Invokes the callable, sleeping for the current delay before each attempt and retrying
     * whenever the failure is classified as throttling by the filter.
     *
     * @param callable the work to perform
     * @param timeoutMillis maximum total time to keep retrying; values {@code <= 0} disable the
     *     limit. The limit is checked after each throttled attempt.
     * @param <T> result type of the callable
     * @return the callable's result
     * @throws TimeoutException if the time limit elapsed after a throttled attempt
     * @throws RuntimeException if the callable fails with an exception that is not a throttling
     *     event; unchecked exceptions are rethrown as-is, checked ones are wrapped
     */
    public <T> T invoke(Callable<T> callable, long timeoutMillis) throws TimeoutException {
        long startTime = System.currentTimeMillis();
        while (true) {
            try {
                this.applySleep();
                T result = callable.call();
                this.handleAvoidance();
                return result;
            }
            catch (Exception ex) {
                // Without a filter nothing can be classified as throttling, so surface the
                // original failure instead of a NullPointerException from the filter call.
                if (this.filter == null || !this.filter.isMatch(ex)) {
                    if (ex instanceof InterruptedException) {
                        // Keep the interrupt visible to callers once the exception is wrapped.
                        Thread.currentThread().interrupt();
                    }
                    throw ex instanceof RuntimeException ? (RuntimeException) ex : new RuntimeException(ex);
                }
                this.handleThrottle(ex);
                if (this.isTimedOut(startTime, timeoutMillis)) {
                    throw new TimeoutException("Timed out before call succeeded after " + (System.currentTimeMillis() - startTime) + " ms");
                }
                // Otherwise fall through and retry with the updated delay.
            }
        }
    }

    /**
     * Replaces the spiller consulted on throttling events.
     *
     * @param spiller the new spiller; may be {@code null} to clear it
     */
    public void setBlockSpiller(BlockSpiller spiller) {
        this.spillerRef.set(spiller);
    }

    /**
     * @return the current congestion-control state
     */
    public State getState() {
        return this.state;
    }

    /**
     * @return the delay, in milliseconds, that will be applied before the next call
     */
    @VisibleForTesting
    long getDelay() {
        return this.delay.get();
    }

    @Override
    public String toString() {
        return MoreObjects.toStringHelper(this)
                .add("initialDelayMs", this.initialDelayMs)
                .add("maxDelayMs", this.maxDelayMs)
                .add("decrease", this.decrease)
                .add("increase", this.increase)
                .add("delay", this.delay.get())
                .add("state", this.state)
                .toString();
    }

    /**
     * Raises the delay in response to a throttling event and marks the invoker as congested.
     *
     * @param ex the exception that was classified as a throttling event
     * @throws FederationThrottleException if a spiller is set and it reports that it has not
     *     spilled; in that case the call is not retried here
     */
    private synchronized void handleThrottle(Exception ex) {
        long newDelay = (long) Math.ceil((double) this.delay.get() / this.decrease);
        if (newDelay == 0L) {
            // No delay yet: start from the configured initial delay.
            newDelay = this.initialDelayMs;
        }
        // Always honour the cap, including when initialDelayMs itself exceeds maxDelayMs.
        if (newDelay > this.maxDelayMs) {
            newDelay = this.maxDelayMs;
        }
        logger.info("handleThrottle: Encountered a Throttling event[{}] adjusting delay to {} ms @ {} TPS", ex, newDelay, 1000.0 / (double) newDelay);
        this.state = State.CONGESTED;
        this.delay.set(newDelay);
        // Read the reference once so a concurrent setBlockSpiller(null) cannot slip in between
        // the null check and the spilled() call.
        BlockSpiller spiller = this.spillerRef.get();
        if (spiller != null && !spiller.spilled()) {
            throw new FederationThrottleException("ThrottlingInvoker requesting slow down due to " + ex, ex);
        }
    }

    /**
     * Lowers the delay after a successful call. Does nothing while no delay is active, so a
     * {@link State#FAST_START} invoker stays in that state.
     */
    private synchronized void handleAvoidance() {
        long currentDelay = this.delay.get();
        if (currentDelay <= 0L) {
            return;
        }
        long newDelay = Math.max(0L, currentDelay - this.increase);
        this.state = State.AVOIDANCE;
        Object tps = newDelay > 0L ? (Object) Long.valueOf(1000L / newDelay) : "unlimited";
        logger.info("handleAvoidance: Congestion AVOIDANCE active, decreasing delay to {} ms @ {} TPS", newDelay, tps);
        this.delay.set(newDelay);
    }

    /**
     * Sleeps for the current delay, if any.
     *
     * @throws RuntimeException wrapping the {@link InterruptedException} if the sleep is
     *     interrupted; the thread's interrupt flag is restored first
     */
    private void applySleep() {
        // Snapshot the delay so the check and the sleep use the same value.
        long sleepMs = this.delay.get();
        if (sleepMs > 0L) {
            try {
                Thread.sleep(sleepMs);
            }
            catch (InterruptedException ex) {
                Thread.currentThread().interrupt();
                throw new RuntimeException(ex);
            }
        }
    }

    /**
     * @param startTime wall-clock time, in milliseconds, at which the invocation started
     * @param timeoutMillis time limit in milliseconds; values {@code <= 0} mean no limit
     * @return {@code true} if a limit is set and more than that many milliseconds have elapsed
     */
    private boolean isTimedOut(long startTime, long timeoutMillis) {
        return timeoutMillis > 0L && System.currentTimeMillis() - startTime > timeoutMillis;
    }

    /**
     * Fluent builder for {@link ThrottlingInvoker}. Numeric settings default to zero, which the
     * invoker's constructor rejects for {@code maxDelayMs}, {@code decrease} and
     * {@code increase}, so those must be set explicitly.
     */
    public static class Builder {
        private long initialDelayMs;
        private long maxDelayMs;
        private double decrease;
        private long increase;
        private ExceptionFilter filter;
        private BlockSpiller spiller;

        /** Sets the delay adopted on the first throttling event. */
        public Builder withInitialDelayMs(long initialDelayMs) {
            this.initialDelayMs = initialDelayMs;
            return this;
        }

        /** Sets the upper bound for the delay; must be at least 1. */
        public Builder withMaxDelayMs(long maxDelayMs) {
            this.maxDelayMs = maxDelayMs;
            return this;
        }

        /** Sets the factor the delay is divided by on throttling; must be within [0.001, 1]. */
        public Builder withDecrease(double decrease) {
            this.decrease = decrease;
            return this;
        }

        /** Sets the milliseconds removed from the delay after a successful call; must be at least 1. */
        public Builder withIncrease(long increase) {
            this.increase = increase;
            return this;
        }

        /** Sets the filter that decides which exceptions are throttling events. */
        public Builder withFilter(ExceptionFilter filter) {
            this.filter = filter;
            return this;
        }

        /** Sets the optional spiller consulted on throttling events. */
        public Builder withSpiller(BlockSpiller spiller) {
            this.spiller = spiller;
            return this;
        }

        /**
         * Builds the invoker.
         *
         * @return a new invoker configured from this builder
         * @throws IllegalArgumentException if a required setting is missing or out of range
         */
        public ThrottlingInvoker build() {
            return new ThrottlingInvoker(this);
        }
    }

    /** Classifies exceptions thrown by the wrapped call. */
    public static interface ExceptionFilter {
        /**
         * @param var1 the exception thrown by the wrapped call
         * @return {@code true} if the exception is a throttling event and the call should be retried
         */
        public boolean isMatch(Exception var1);
    }

    /** Congestion-control states of the invoker. */
    public static enum State {
        /** Initial state; no throttling event has been seen and no delay is applied. */
        FAST_START,
        /** Entered on every throttling event, after the delay has been raised. */
        CONGESTED,
        /** Entered after a successful call made while a delay was active. */
        AVOIDANCE;

    }
}

