/*
 * Decompiled with CFR 0.152.
 */
package com.google.cloud.bigtable.data.v2.it;

import com.google.api.core.ApiFunction;
import com.google.api.gax.core.CredentialsProvider;
import com.google.api.gax.core.FixedCredentialsProvider;
import com.google.api.gax.grpc.InstantiatingGrpcChannelProvider;
import com.google.api.gax.rpc.TransportChannelProvider;
import com.google.auth.Credentials;
import com.google.auth.oauth2.ComputeEngineCredentials;
import com.google.cloud.bigtable.data.v2.BigtableDataClient;
import com.google.cloud.bigtable.data.v2.BigtableDataSettings;
import com.google.cloud.bigtable.data.v2.stub.EnhancedBigtableStubSettings;
import com.google.cloud.bigtable.test_helpers.env.AbstractTestEnv;
import com.google.cloud.bigtable.test_helpers.env.TestEnvRule;
import cz.o2.proxima.beam.io.pubsub.io.grpc.ManagedChannelBuilder;
import cz.o2.proxima.beam.io.pubsub.io.grpc.netty.shaded.io.grpc.netty.NettyChannelBuilder;
import cz.o2.proxima.beam.io.pubsub.io.grpc.netty.shaded.io.netty.channel.ChannelDuplexHandler;
import cz.o2.proxima.beam.io.pubsub.io.grpc.netty.shaded.io.netty.channel.ChannelFactory;
import cz.o2.proxima.beam.io.pubsub.io.grpc.netty.shaded.io.netty.channel.ChannelHandler;
import cz.o2.proxima.beam.io.pubsub.io.grpc.netty.shaded.io.netty.channel.ChannelHandlerContext;
import cz.o2.proxima.beam.io.pubsub.io.grpc.netty.shaded.io.netty.channel.ChannelPromise;
import cz.o2.proxima.beam.io.pubsub.io.grpc.netty.shaded.io.netty.channel.EventLoopGroup;
import cz.o2.proxima.beam.io.pubsub.io.grpc.netty.shaded.io.netty.channel.nio.NioEventLoopGroup;
import cz.o2.proxima.beam.io.pubsub.io.grpc.netty.shaded.io.netty.channel.socket.nio.NioSocketChannel;
import cz.o2.proxima.beam.io.pubsub.io.grpc.netty.shaded.io.netty.util.ReferenceCountUtil;
import cz.o2.proxima.internal.shaded.com.google.common.base.Stopwatch;
import cz.o2.proxima.internal.shaded.com.google.common.collect.ImmutableSet;
import cz.o2.proxima.internal.shaded.com.google.common.truth.Truth;
import cz.o2.proxima.internal.shaded.com.google.common.truth.TruthJUnit;
import java.io.IOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import org.junit.After;
import org.junit.Before;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;

@RunWith(value=JUnit4.class)
public class DirectPathFallbackIT {
    private static final int MIN_COMPLETE_READ_CALLS = 40;
    private static final int NUM_RPCS_TO_SEND = 20;
    private static final String DP_IPV6_PREFIX = "2001:4860:8040";
    private static final String DP_IPV4_PREFIX = "34.126";
    @ClassRule
    public static TestEnvRule testEnvRule = new TestEnvRule();
    private AtomicBoolean blackholeDpAddr = new AtomicBoolean();
    private AtomicInteger numBlocked = new AtomicInteger();
    private AtomicInteger numDpAddrRead = new AtomicInteger();
    private ChannelFactory<NioSocketChannel> channelFactory = new MyChannelFactory();
    private EventLoopGroup eventLoopGroup = new NioEventLoopGroup();
    private BigtableDataClient instrumentedClient;

    @Before
    public void setup() throws IOException {
        ImmutableSet validModes = ImmutableSet.of((Object)((Object)AbstractTestEnv.ConnectionMode.REQUIRE_DIRECT_PATH), (Object)((Object)AbstractTestEnv.ConnectionMode.REQUIRE_DIRECT_PATH_IPV4));
        TruthJUnit.assume().withMessage("DirectPathFallbackIT can only return when explicitly requested").that(Boolean.valueOf(validModes.contains((Object)testEnvRule.env().getConnectionMode()))).isTrue();
        BigtableDataSettings defaultSettings = testEnvRule.env().getDataClientSettings();
        InstantiatingGrpcChannelProvider defaultTransportProvider = (InstantiatingGrpcChannelProvider)defaultSettings.getStubSettings().getTransportChannelProvider();
        InstantiatingGrpcChannelProvider instrumentedTransportChannelProvider = defaultTransportProvider.toBuilder().setAttemptDirectPath(true).setPoolSize(1).setChannelConfigurator((ApiFunction)new ApiFunction<ManagedChannelBuilder, ManagedChannelBuilder>(){

            public ManagedChannelBuilder apply(ManagedChannelBuilder builder) {
                DirectPathFallbackIT.this.injectNettyChannelHandler(builder);
                builder.keepAliveTime(1L, TimeUnit.SECONDS);
                builder.keepAliveTimeout(1L, TimeUnit.SECONDS);
                return builder;
            }
        }).build();
        BigtableDataSettings.Builder settingsBuilder = testEnvRule.env().getDataClientSettings().toBuilder();
        ((EnhancedBigtableStubSettings.Builder)settingsBuilder.stubSettings().setTransportChannelProvider((TransportChannelProvider)instrumentedTransportChannelProvider)).setCredentialsProvider((CredentialsProvider)FixedCredentialsProvider.create((Credentials)ComputeEngineCredentials.create()));
        this.instrumentedClient = BigtableDataClient.create((BigtableDataSettings)settingsBuilder.build());
    }

    @After
    public void teardown() {
        if (this.instrumentedClient != null) {
            this.instrumentedClient.close();
        }
        if (this.eventLoopGroup != null) {
            this.eventLoopGroup.shutdownGracefully();
        }
    }

    @Test
    public void testFallback() throws InterruptedException, TimeoutException {
        Truth.assertWithMessage((String)"Failed to observe RPCs over DirectPath").that(Boolean.valueOf(this.exerciseDirectPath())).isTrue();
        this.blackholeDpAddr.set(true);
        this.instrumentedClient.readRow(testEnvRule.env().getTableId(), "nonexistent-row");
        Truth.assertWithMessage((String)"Failed to detect any IPv6 traffic in blackhole").that(Integer.valueOf(this.numBlocked.get())).isGreaterThan((Comparable)Integer.valueOf(0));
        this.blackholeDpAddr.set(false);
        Truth.assertWithMessage((String)"Failed to upgrade back to DirectPath").that(Boolean.valueOf(this.exerciseDirectPath())).isTrue();
    }

    private boolean exerciseDirectPath() throws InterruptedException, TimeoutException {
        Stopwatch stopwatch = Stopwatch.createStarted();
        this.numDpAddrRead.set(0);
        boolean seenEnough = false;
        while (!seenEnough && stopwatch.elapsed(TimeUnit.MINUTES) < 2L) {
            for (int i = 0; i < 20; ++i) {
                this.instrumentedClient.readRow(testEnvRule.env().getTableId(), "nonexistent-row");
            }
            Thread.sleep(100L);
            seenEnough = this.numDpAddrRead.get() >= 40;
        }
        return seenEnough;
    }

    private void injectNettyChannelHandler(ManagedChannelBuilder<?> channelBuilder) {
        NettyChannelBuilder nettyChannelBuilder = (NettyChannelBuilder)channelBuilder;
        nettyChannelBuilder.channelFactory(this.channelFactory);
        nettyChannelBuilder.eventLoopGroup(this.eventLoopGroup);
    }

    private class MyChannelHandler
    extends ChannelDuplexHandler {
        private boolean isDpAddr;

        private MyChannelHandler() {
        }

        public void connect(ChannelHandlerContext ctx, SocketAddress remoteAddress, SocketAddress localAddress, ChannelPromise promise) throws Exception {
            if (remoteAddress instanceof InetSocketAddress) {
                InetAddress inetAddress = ((InetSocketAddress)remoteAddress).getAddress();
                String addr = inetAddress.getHostAddress();
                boolean bl = this.isDpAddr = addr.startsWith(DirectPathFallbackIT.DP_IPV6_PREFIX) || addr.startsWith(DirectPathFallbackIT.DP_IPV4_PREFIX);
            }
            if (!this.isDpAddr || !DirectPathFallbackIT.this.blackholeDpAddr.get()) {
                super.connect(ctx, remoteAddress, localAddress, promise);
            } else {
                promise.setFailure((Throwable)new IOException("fake error"));
            }
        }

        public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
            boolean dropCall;
            boolean bl = dropCall = this.isDpAddr && DirectPathFallbackIT.this.blackholeDpAddr.get();
            if (dropCall) {
                DirectPathFallbackIT.this.numBlocked.incrementAndGet();
                ReferenceCountUtil.release((Object)msg);
            } else {
                super.channelRead(ctx, msg);
            }
        }

        public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
            boolean dropCall;
            boolean bl = dropCall = this.isDpAddr && DirectPathFallbackIT.this.blackholeDpAddr.get();
            if (dropCall) {
                DirectPathFallbackIT.this.numBlocked.incrementAndGet();
            } else {
                if (this.isDpAddr) {
                    DirectPathFallbackIT.this.numDpAddrRead.incrementAndGet();
                }
                super.channelReadComplete(ctx);
            }
        }
    }

    private class MyChannelFactory
    implements ChannelFactory<NioSocketChannel> {
        private MyChannelFactory() {
        }

        public NioSocketChannel newChannel() {
            NioSocketChannel channel = new NioSocketChannel();
            channel.pipeline().addLast(new ChannelHandler[]{new MyChannelHandler()});
            return channel;
        }
    }
}

