/*
 * Decompiled with CFR 0.152.
 */
package com.google.cloud.bigtable.data.v2.stub;

import com.google.api.core.ApiFuture;
import com.google.api.core.ApiFutures;
import com.google.api.gax.batching.DynamicFlowControlSettings;
import com.google.api.gax.batching.FlowControlEventStats;
import com.google.api.gax.batching.FlowController;
import com.google.api.gax.grpc.GrpcCallContext;
import com.google.api.gax.grpc.GrpcStatusCode;
import com.google.api.gax.rpc.ApiCallContext;
import com.google.api.gax.rpc.DeadlineExceededException;
import com.google.api.gax.rpc.StatusCode;
import com.google.api.gax.rpc.UnaryCallable;
import com.google.bigtable.v2.MutateRowsRequest;
import com.google.bigtable.v2.MutateRowsResponse;
import com.google.cloud.bigtable.data.v2.stub.DynamicFlowControlCallable;
import com.google.cloud.bigtable.data.v2.stub.DynamicFlowControlStats;
import cz.o2.proxima.beam.io.pubsub.io.grpc.Status;
import cz.o2.proxima.internal.shaded.com.google.common.collect.Lists;
import cz.o2.proxima.internal.shaded.com.google.common.truth.Truth;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.Timeout;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;

@RunWith(value=JUnit4.class)
public class DynamicFlowControlCallableTest {
    @Rule
    public Timeout timeout = new Timeout(1L, TimeUnit.MINUTES);
    private static final int TARGET_LATENCY_MS = 100;
    private static final long ADJUSTING_INTERVAL_MS = TimeUnit.SECONDS.toMillis(20L);
    private static final String LATENCY_HEADER = "latency";
    private static final long INITIAL_ELEMENT = 20L;
    private static final long MAX_ELEMENT = 30L;
    private static final long MIN_ELEMENT = 5L;
    private static final int DEADLINE_EXCEEDED_LATENCY = 501;
    private FlowController flowController;
    private FlowControlEventStats flowControlEvents;
    private DynamicFlowControlStats stats;
    private UnaryCallable innerCallable;
    private ApiCallContext context;
    private MutateRowsRequest request;
    private DynamicFlowControlCallable callableToTest;

    @Before
    public void setup() {
        this.flowController = new FlowController(DynamicFlowControlSettings.newBuilder().setInitialOutstandingElementCount(Long.valueOf(20L)).setMaxOutstandingElementCount(Long.valueOf(30L)).setMinOutstandingElementCount(Long.valueOf(5L)).setInitialOutstandingRequestBytes(Long.valueOf(15L)).setMaxOutstandingRequestBytes(Long.valueOf(15L)).setMinOutstandingRequestBytes(Long.valueOf(15L)).setLimitExceededBehavior(FlowController.LimitExceededBehavior.Block).build());
        this.flowControlEvents = this.flowController.getFlowControlEventStats();
        this.stats = new DynamicFlowControlStats();
        this.context = GrpcCallContext.createDefault();
        this.innerCallable = new MockInnerCallable();
        this.request = MutateRowsRequest.newBuilder().addEntries(MutateRowsRequest.Entry.getDefaultInstance()).build();
        this.callableToTest = new DynamicFlowControlCallable(this.innerCallable, this.flowController, this.stats, 100L, ADJUSTING_INTERVAL_MS);
    }

    @Test
    public void testLatenciesAreRecorded() throws Exception {
        HashMap<String, List<String>> extraHeaders = new HashMap<String, List<String>>();
        extraHeaders.put(LATENCY_HEADER, Arrays.asList("5"));
        ApiCallContext newContext = this.context.withExtraHeaders(extraHeaders);
        ApiFuture future = this.callableToTest.futureCall((Object)this.request, newContext);
        future.get();
        Truth.assertThat((Double)this.stats.getMeanLatency()).isNonZero();
        Truth.assertThat((Long)this.stats.getLastAdjustedTimestampMs()).isEqualTo((Object)0);
    }

    @Test
    public void testTriggeringAdjustingThreshold() throws Exception {
        HashMap<String, List<String>> extraHeaders = new HashMap<String, List<String>>();
        extraHeaders.put(LATENCY_HEADER, Arrays.asList(String.valueOf(400)));
        long currentTimeMs = System.currentTimeMillis();
        ApiCallContext newContext = this.context.withExtraHeaders(extraHeaders);
        ApiFuture future = this.callableToTest.futureCall((Object)this.request, newContext);
        future.get();
        Truth.assertThat((Double)this.stats.getMeanLatency()).isAtLeast((Comparable)Double.valueOf(300.0));
        Truth.assertThat((Long)this.stats.getLastAdjustedTimestampMs()).isGreaterThan((Comparable)Long.valueOf(currentTimeMs));
        long expectedStep = Math.round(9.0);
        Truth.assertThat((Long)this.flowController.getCurrentElementCountLimit()).isEqualTo((Object)(20L - expectedStep));
    }

    @Test
    public void testNoConsecutiveUpdatesToThreshold() throws Exception {
        HashMap<String, List<String>> extraHeaders = new HashMap<String, List<String>>();
        extraHeaders.put(LATENCY_HEADER, Arrays.asList(String.valueOf(400)));
        long firstRequest = System.currentTimeMillis();
        ApiCallContext newContext = this.context.withExtraHeaders(extraHeaders);
        ApiFuture future = this.callableToTest.futureCall((Object)this.request, newContext);
        future.get();
        long secondRequest = System.currentTimeMillis();
        future = this.callableToTest.futureCall((Object)this.request, newContext);
        future.get();
        Truth.assertThat((Double)this.stats.getMeanLatency()).isAtLeast((Comparable)Double.valueOf(300.0));
        Truth.assertThat((Long)this.stats.getLastAdjustedTimestampMs()).isGreaterThan((Comparable)Long.valueOf(firstRequest));
        Truth.assertThat((Long)this.stats.getLastAdjustedTimestampMs()).isAtMost((Comparable)Long.valueOf(secondRequest));
        long expectedStep = Math.round(9.0);
        Truth.assertThat((Long)this.flowController.getCurrentElementCountLimit()).isEqualTo((Object)(20L - expectedStep));
    }

    @Test
    public void testDecreasingThresholdsCantGoOverLimit() throws Exception {
        this.callableToTest = new DynamicFlowControlCallable(this.innerCallable, this.flowController, this.stats, 100L, 0L);
        HashMap<String, List<String>> extraHeaders = new HashMap<String, List<String>>();
        extraHeaders.put(LATENCY_HEADER, Arrays.asList(String.valueOf(400)));
        ApiCallContext newContext = this.context.withExtraHeaders(extraHeaders);
        ArrayList<ApiFuture> futures = new ArrayList<ApiFuture>();
        for (int i = 0; i < 3; ++i) {
            ApiFuture apiFuture = this.callableToTest.futureCall((Object)this.request, newContext);
            futures.add(apiFuture);
        }
        for (Future future : futures) {
            future.get();
        }
        long expectedStep = Math.round(9.0) * 3L;
        Truth.assertThat((Long)(20L - expectedStep)).isLessThan((Comparable)Long.valueOf(5L));
        Truth.assertThat((Long)this.flowController.getCurrentElementCountLimit()).isEqualTo((Object)5L);
    }

    @Test
    public void testIncreasingThreshold() throws Exception {
        this.callableToTest = new DynamicFlowControlCallable(this.innerCallable, this.flowController, this.stats, 1000L, ADJUSTING_INTERVAL_MS);
        this.createFlowControlEvent(this.flowController);
        ApiFuture future = this.callableToTest.futureCall((Object)this.request, this.context);
        future.get();
        long expectedIncrease = Math.round(1.5);
        Truth.assertThat((Long)expectedIncrease).isNotEqualTo((Object)0);
        Truth.assertThat((Long)(20L + expectedIncrease)).isLessThan((Comparable)Long.valueOf(30L));
        Truth.assertThat((Long)this.flowController.getCurrentElementCountLimit()).isEqualTo((Object)(20L + expectedIncrease));
    }

    @Test
    public void testIncreasingThresholdCantGoOverLimit() throws Exception {
        this.callableToTest = new DynamicFlowControlCallable(this.innerCallable, this.flowController, this.stats, 1000L, 0L);
        this.createFlowControlEvent(this.flowController);
        ArrayList<ApiFuture> futures = new ArrayList<ApiFuture>();
        for (int i = 0; i < 20; ++i) {
            ApiFuture apiFuture = this.callableToTest.futureCall((Object)this.request, this.context);
            futures.add(apiFuture);
        }
        for (Future future : futures) {
            future.get();
        }
        long expectedIncrease = Math.round(1.5) * 20L;
        Truth.assertThat((Long)(20L + expectedIncrease)).isGreaterThan((Comparable)Long.valueOf(30L));
        Truth.assertThat((Long)this.flowController.getCurrentElementCountLimit()).isEqualTo((Object)30L);
    }

    @Test
    public void testConcurrentUpdates() throws Exception {
        this.callableToTest = new DynamicFlowControlCallable(this.innerCallable, this.flowController, this.stats, 1000L, ADJUSTING_INTERVAL_MS);
        this.createFlowControlEvent(this.flowController);
        ArrayList<ApiFuture> futures = new ArrayList<ApiFuture>();
        for (int i = 0; i < 20; ++i) {
            ApiFuture apiFuture = this.callableToTest.futureCall((Object)this.request, this.context);
            futures.add(apiFuture);
        }
        for (Future future : futures) {
            future.get();
        }
        long expectedIncrease = Math.round(1.5);
        Truth.assertThat((Long)expectedIncrease).isNotEqualTo((Object)0);
        Truth.assertThat((Long)(20L + expectedIncrease)).isLessThan((Comparable)Long.valueOf(30L));
        Truth.assertThat((Long)this.flowController.getCurrentElementCountLimit()).isEqualTo((Object)(20L + expectedIncrease));
    }

    @Test
    public void testDeadlineExceeded() throws Exception {
        HashMap<String, List<String>> extraHeaders = new HashMap<String, List<String>>();
        extraHeaders.put(LATENCY_HEADER, Arrays.asList(String.valueOf(501)));
        this.callableToTest.futureCall((Object)this.request, this.context.withExtraHeaders(extraHeaders));
        Truth.assertThat((Long)this.flowController.getCurrentElementCountLimit()).isEqualTo((Object)(20L - Math.round(9.0)));
    }

    private void createFlowControlEvent(final FlowController flowController) throws Exception {
        flowController.reserve(20L, 0L);
        final AtomicBoolean threadStarted = new AtomicBoolean(false);
        Thread t = new Thread(new Runnable(){

            @Override
            public void run() {
                try {
                    threadStarted.set(true);
                    flowController.reserve(1L, 0L);
                }
                catch (Exception exception) {
                    // empty catch block
                }
            }
        });
        t.start();
        for (int i = 0; i < 1000 && !threadStarted.get(); ++i) {
            Thread.sleep(5L);
        }
        Thread.sleep(50L);
        flowController.release(20L, 0L);
        t.join();
        flowController.release(1L, 0L);
        Truth.assertThat((Comparable)flowController.getFlowControlEventStats().getLastFlowControlEvent()).isNotNull();
    }

    static class MockInnerCallable
    extends UnaryCallable<MutateRowsRequest, List<MutateRowsResponse>> {
        List<MutateRowsResponse> response = Lists.newArrayList();

        MockInnerCallable() {
        }

        public ApiFuture<List<MutateRowsResponse>> futureCall(MutateRowsRequest request, ApiCallContext context) {
            List latencyHeader = (List)context.getExtraHeaders().get(DynamicFlowControlCallableTest.LATENCY_HEADER);
            if (latencyHeader != null) {
                try {
                    Thread.sleep(Integer.valueOf((String)latencyHeader.get(0)).intValue());
                }
                catch (InterruptedException interruptedException) {
                    // empty catch block
                }
                if (Integer.valueOf((String)latencyHeader.get(0)) == 501) {
                    return ApiFutures.immediateFailedFuture((Throwable)new DeadlineExceededException("deadline exceeded", null, (StatusCode)GrpcStatusCode.of((Status.Code)Status.Code.DEADLINE_EXCEEDED), false));
                }
            }
            return ApiFutures.immediateFuture(this.response);
        }
    }
}

