/*
 * Decompiled with CFR 0.152.
 */
package com.datastax.driver.core;

import com.datastax.driver.core.Assertions;
import com.datastax.driver.core.AuthProvider;
import com.datastax.driver.core.Authenticator;
import com.datastax.driver.core.CCMConfig;
import com.datastax.driver.core.CCMTestsSupport;
import com.datastax.driver.core.Cluster;
import com.datastax.driver.core.CountingReconnectionPolicy;
import com.datastax.driver.core.CreateCCM;
import com.datastax.driver.core.Host;
import com.datastax.driver.core.HostDistance;
import com.datastax.driver.core.PlainTextAuthProvider;
import com.datastax.driver.core.SocketOptions;
import com.datastax.driver.core.Statement;
import com.datastax.driver.core.TestUtils;
import com.datastax.driver.core.policies.ConstantReconnectionPolicy;
import com.datastax.driver.core.policies.DelegatingLoadBalancingPolicy;
import com.datastax.driver.core.policies.LoadBalancingPolicy;
import com.datastax.driver.core.policies.ReconnectionPolicy;
import com.datastax.driver.core.policies.RoundRobinPolicy;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.Uninterruptibles;
import java.io.IOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.util.Collections;
import java.util.Iterator;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import org.mockito.Mockito;
import org.mockito.verification.VerificationMode;
import org.testng.annotations.Test;

/**
 * Integration tests exercising the driver's reconnection behaviour against a CCM-managed
 * cluster: nodes are stopped and restarted and the tests assert on host state, control host
 * selection, reconnection-policy invocation counts and connection reuse.
 *
 * <p>A fresh CCM cluster is created for every test method (see {@link CreateCCM}).
 */
@CreateCCM(CreateCCM.TestMode.PER_METHOD)
public class ReconnectionTest
extends CCMTestsSupport {
    /** Base delay, in milliseconds, of the constant reconnection policies used by these tests. */
    private final int reconnectionDelayMillis = 1000;

    /**
     * Stops both nodes of a two-node cluster, restarts only node 2, and checks that the driver
     * marks node 2 up again and ends up using it as the control host.
     *
     * @throws InterruptedException if the thread is interrupted while sleeping
     */
    @CCMConfig(dirtiesContext = true, numberOfNodes = 2, createCluster = false)
    @Test(groups = "long")
    public void should_reconnect_after_full_connectivity_loss() throws InterruptedException {
        Cluster cluster = this.register(Cluster.builder()
                .addContactPoints(this.getContactPoints().get(0))
                .withPort(this.ccm().getBinaryPort())
                .withReconnectionPolicy(new ConstantReconnectionPolicy(this.reconnectionDelayMillis))
                .build());
        cluster.connect();
        Assertions.assertThat(cluster).usesControlHost(1);

        // Take down the whole cluster, then bring back node 2 only.
        this.ccm().stop(2);
        this.ccm().stop(1);
        this.ccm().waitForDown(2);
        this.ccm().start(2);
        this.ccm().waitForUp(2);
        Assertions.assertThat(cluster).host(2).comesUpWithin(Cluster.NEW_NODE_DELAY_SECONDS * 2, TimeUnit.SECONDS);

        // Allow two reconnection delays before checking which host the control connection uses.
        TimeUnit.MILLISECONDS.sleep(2 * this.reconnectionDelayMillis);
        Assertions.assertThat(cluster).usesControlHost(2);
    }

    /**
     * Restarts the single node while the auth provider is configured with a wrong password and
     * checks that new authenticators keep being requested and that the authentication-error
     * metric grows; once the correct password is restored the host must come back up.
     *
     * @throws InterruptedException if the thread is interrupted while sleeping
     */
    @CCMConfig(dirtiesContext = true, config = "authenticator:PasswordAuthenticator", jvmArgs = "-Dcassandra.superuser_setup_delay_ms=0", createCluster = false)
    @Test(groups = "long")
    public void should_keep_reconnecting_on_authentication_error() throws InterruptedException {
        // NOTE(review): presumably pre-2.0 Cassandra needs a short grace period before the
        // default superuser is usable -- confirm.
        if (this.ccm().getCassandraVersion().getMajor() < 2) {
            Uninterruptibles.sleepUninterruptibly(1, TimeUnit.SECONDS);
        }
        CountingReconnectionPolicy reconnectionPolicy =
                new CountingReconnectionPolicy(new ConstantReconnectionPolicy(this.reconnectionDelayMillis));
        CountingAuthProvider authProvider = new CountingAuthProvider("cassandra", "cassandra");
        Cluster cluster = this.register(Cluster.builder()
                .addContactPoints(this.getContactPoints().get(0))
                .withPort(this.ccm().getBinaryPort())
                .withAuthProvider(authProvider)
                .withReconnectionPolicy(reconnectionPolicy)
                .build());
        cluster.init();
        Assertions.assertThat(cluster).usesControlHost(1);

        // Stop the node, switch to a wrong password, and restart it.
        this.ccm().stop(1);
        this.ccm().waitForDown(1);
        authProvider.setPassword("wrongPassword");
        this.ccm().start(1);
        this.ccm().waitForUp(1);

        int initialCount = authProvider.count.get();
        long initialMetricCount = cluster.getMetrics().getErrorMetrics().getAuthenticationErrors().getCount();

        // Poll every 5 seconds (at most maxIterations times) until at least two more
        // authenticators have been requested since the restart.
        int iterations = 0;
        int maxIterations = 12;
        do {
            TimeUnit.SECONDS.sleep(5L);
        } while (++iterations < maxIterations && authProvider.count.get() <= initialCount + 1);
        Assertions.assertThat(iterations).isLessThan(maxIterations);
        Assertions.assertThat(cluster.getMetrics().getErrorMetrics().getAuthenticationErrors().getCount())
                .isGreaterThan(initialMetricCount);

        // With the right password back in place the host must eventually be marked up.
        authProvider.setPassword("cassandra");
        Assertions.assertThat(cluster).host(1).comesUpWithin(Cluster.NEW_NODE_DELAY_SECONDS * 2, TimeUnit.SECONDS);
    }

    /**
     * Stops node 2, cancels its reconnection-attempt future, and checks that the reconnection
     * policy's counter does not move afterwards; the host must still come up once the node is
     * restarted.
     *
     * @throws InterruptedException if the thread is interrupted while sleeping
     */
    @CCMConfig(dirtiesContext = true, numberOfNodes = 2, createCluster = false)
    @Test(groups = "long")
    public void should_cancel_reconnection_attempts() throws InterruptedException {
        CountingReconnectionPolicy reconnectionPolicy =
                new CountingReconnectionPolicy(new ConstantReconnectionPolicy(this.reconnectionDelayMillis));
        Cluster cluster = this.register(Cluster.builder()
                .addContactPoints(this.getContactPoints().get(0))
                .withPort(this.ccm().getBinaryPort())
                .withReconnectionPolicy(reconnectionPolicy)
                .build());
        cluster.connect();

        this.ccm().stop(2);
        Host host2 = TestUtils.findHost(cluster, 2);
        // NOTE(review): unlike the other tests in this class, this neither waits for the host
        // to be marked down nor null-checks the future before cancelling it. If the future can
        // be null at this point (as the guard in should_trigger_one_time_reconnect suggests),
        // this line would throw -- confirm.
        host2.getReconnectionAttemptFuture().cancel(false);

        // The counter must stay put for two reconnection delays after the cancellation.
        int initialCount = reconnectionPolicy.count.get();
        TimeUnit.MILLISECONDS.sleep(2 * this.reconnectionDelayMillis);
        Assertions.assertThat(reconnectionPolicy.count.get()).isEqualTo(initialCount);

        this.ccm().start(2);
        this.ccm().waitForUp(2);
        Assertions.assertThat(cluster).host(2).comesUpWithin(Cluster.NEW_NODE_DELAY_SECONDS * 2, TimeUnit.SECONDS);
    }

    /**
     * Checks {@code Host.tryReconnectOnce()}: with the host's distance forced to
     * {@link HostDistance#IGNORED} and any pending reconnection cancelled, a one-time attempt
     * against a stopped node leaves the host down, the host stays down after the node is
     * restarted, and a second one-time attempt brings it up.
     *
     * @throws InterruptedException if the thread is interrupted while sleeping
     * @throws IOException declared by the original test signature
     */
    @CCMConfig(dirtiesContext = true, createCluster = false)
    @Test(groups = "long")
    public void should_trigger_one_time_reconnect() throws InterruptedException, IOException {
        TogglabePolicy loadBalancingPolicy = new TogglabePolicy(new RoundRobinPolicy());
        Cluster cluster = this.register(Cluster.builder()
                .addContactPointsWithPorts(this.ccm().addressOfNode(1))
                .withPort(this.ccm().getBinaryPort())
                .withLoadBalancingPolicy(loadBalancingPolicy)
                .withReconnectionPolicy(new ConstantReconnectionPolicy(this.reconnectionDelayMillis))
                .build());
        cluster.connect();

        // Make the policy hand out empty query plans from here on.
        loadBalancingPolicy.returnEmptyQueryPlan = true;

        this.ccm().stop(1);
        this.ccm().waitForDown(1);
        Assertions.assertThat(cluster).host(1).goesDownWithin(20L, TimeUnit.SECONDS);

        Host host1 = TestUtils.findHost(cluster, 1);
        loadBalancingPolicy.setDistance(host1, HostDistance.IGNORED);

        // Cancel any reconnection attempt that is already scheduled for the host.
        ListenableFuture<?> reconnectionAttemptFuture = host1.getReconnectionAttemptFuture();
        if (reconnectionAttemptFuture != null) {
            reconnectionAttemptFuture.cancel(false);
        }

        // First one-time attempt: the node is still stopped, so the host must remain down.
        host1.tryReconnectOnce();
        TimeUnit.MILLISECONDS.sleep(2 * this.reconnectionDelayMillis);
        Assertions.assertThat(cluster).host(1).hasState(Host.State.DOWN);

        // Restarting the node alone must not bring the host up, even after waiting.
        this.ccm().start(1);
        this.ccm().waitForUp(1);
        Assertions.assertThat(cluster).host(1).hasState(Host.State.DOWN);
        TimeUnit.SECONDS.sleep(Cluster.NEW_NODE_DELAY_SECONDS);
        Assertions.assertThat(cluster).host(1).hasState(Host.State.DOWN);

        // Second one-time attempt: the node is running again, so the host must come up.
        host1.tryReconnectOnce();
        Assertions.assertThat(cluster).host(1).comesUpWithin(Cluster.NEW_NODE_DELAY_SECONDS * 2, TimeUnit.SECONDS);
    }

    /**
     * Uses the number of {@code SocketOptions.getKeepAlive()} calls on a Mockito spy to count
     * connection openings, and checks that after a one-time reconnection the number of calls
     * is exactly {@code corePoolSize * 2} for the two open sessions.
     */
    @CCMConfig(dirtiesContext = true, createCluster = false)
    @Test(groups = "long")
    public void should_use_connection_from_reconnection_in_pool() {
        TogglabePolicy loadBalancingPolicy = new TogglabePolicy(new RoundRobinPolicy());
        SocketOptions socketOptions = Mockito.spy(new SocketOptions());
        Cluster cluster = this.register(Cluster.builder()
                .addContactPoints(this.getContactPoints().get(0))
                .withPort(this.ccm().getBinaryPort())
                .withReconnectionPolicy(new ConstantReconnectionPolicy(5 * this.reconnectionDelayMillis))
                .withLoadBalancingPolicy(loadBalancingPolicy)
                .withSocketOptions(socketOptions)
                .withProtocolVersion(this.ccm().getProtocolVersion())
                .build());

        // Open two sessions against the same cluster.
        cluster.connect();
        cluster.connect();

        int corePoolSize = TestUtils.numberOfLocalCoreConnections(cluster);
        // NOTE(review): presumably one call for the control connection plus corePoolSize
        // calls for each of the two sessions -- confirm.
        Mockito.verify(socketOptions, Mockito.times(1 + corePoolSize * 2)).getKeepAlive();

        // Make the policy hand out empty query plans from here on.
        loadBalancingPolicy.returnEmptyQueryPlan = true;

        this.ccm().stop(1);
        this.ccm().waitForDown(1);
        Assertions.assertThat(cluster).host(1).goesDownWithin(20L, TimeUnit.SECONDS);

        Host host1 = TestUtils.findHost(cluster, 1);
        host1.getReconnectionAttemptFuture().cancel(false);

        this.ccm().start(1);
        this.ccm().waitForUp(1);

        // Start counting getKeepAlive() calls afresh, then reconnect exactly once.
        Mockito.reset(socketOptions);
        host1.tryReconnectOnce();
        Assertions.assertThat(cluster).host(1).comesUpWithin(Cluster.NEW_NODE_DELAY_SECONDS * 2, TimeUnit.SECONDS);
        Mockito.verify(socketOptions, Mockito.times(corePoolSize * 2)).getKeepAlive();
    }

    /**
     * Load balancing policy wrapper that can be switched to return empty query plans and that
     * lets tests override the distance reported for individual hosts.
     */
    public static class TogglabePolicy
    extends DelegatingLoadBalancingPolicy {
        /** When {@code true}, {@link #newQueryPlan} returns an empty iterator. */
        volatile boolean returnEmptyQueryPlan;
        /** Per-host distance overrides; hosts without an entry use the delegate's distance. */
        final ConcurrentMap<Host, HostDistance> distances = new ConcurrentHashMap<Host, HostDistance>();

        /**
         * @param delegate the policy that handles everything not overridden here
         */
        public TogglabePolicy(LoadBalancingPolicy delegate) {
            super(delegate);
        }

        /**
         * Returns the distance registered through {@link #setDistance} for the host, or the
         * parent policy's answer when no override is registered.
         */
        @Override
        public HostDistance distance(Host host) {
            HostDistance override = this.distances.get(host);
            if (override != null) {
                return override;
            }
            return super.distance(host);
        }

        /**
         * Registers a distance override for the given host.
         *
         * @param host the host whose distance is overridden
         * @param distance the distance to report for that host
         */
        public void setDistance(Host host, HostDistance distance) {
            this.distances.put(host, distance);
        }

        /**
         * Returns an empty plan while {@link #returnEmptyQueryPlan} is set; otherwise defers
         * to the parent policy.
         */
        @Override
        public Iterator<Host> newQueryPlan(String loggedKeyspace, Statement statement) {
            if (this.returnEmptyQueryPlan) {
                // The explicit type witness is required: a method-call receiver gets no target
                // typing, so a bare emptyList() would be inferred as List<Object>.
                return Collections.<Host>emptyList().iterator();
            }
            return super.newQueryPlan(loggedKeyspace, statement);
        }
    }

    /** Plain-text auth provider that counts how many authenticators it has been asked for. */
    static class CountingAuthProvider
    extends PlainTextAuthProvider {
        /** Number of {@link #newAuthenticator} invocations so far. */
        final AtomicInteger count = new AtomicInteger();

        /**
         * @param username the user name passed to the parent provider
         * @param password the password passed to the parent provider
         */
        CountingAuthProvider(String username, String password) {
            super(username, password);
        }

        /** Increments {@link #count}, then delegates to the parent provider. */
        @Override
        public Authenticator newAuthenticator(InetSocketAddress host, String authenticator) {
            this.count.incrementAndGet();
            return super.newAuthenticator(host, authenticator);
        }
    }
}

