/*
 * Decompiled with CFR 0.152.
 */
package com.datastax.driver.core;

import com.datastax.driver.core.Assertions;
import com.datastax.driver.core.CCMConfig;
import com.datastax.driver.core.CCMTestsSupport;
import com.datastax.driver.core.Cluster;
import com.datastax.driver.core.HostDistance;
import com.datastax.driver.core.PoolingOptions;
import com.datastax.driver.core.PreparedStatement;
import com.datastax.driver.core.ResultSet;
import com.datastax.driver.core.ResultSetFuture;
import com.datastax.driver.core.Session;
import com.datastax.driver.core.Statement;
import com.datastax.driver.core.TestUtils;
import com.datastax.driver.core.exceptions.NoHostAvailableException;
import com.datastax.driver.core.policies.ConstantReconnectionPolicy;
import com.datastax.driver.core.policies.ReconnectionPolicy;
import com.datastax.driver.core.utils.SocketChannelMonitor;
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import com.google.common.util.concurrent.Uninterruptibles;
import io.netty.channel.socket.SocketChannel;
import java.util.Collection;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import org.assertj.core.api.AbstractIntegerAssert;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testng.annotations.Test;

/**
 * Stress test that runs queries against a 3-node cluster with very low connect and read
 * timeouts, then checks that the number of open socket channels stays bounded and that every
 * host is reported up once the timeouts have been raised again.
 */
@CCMConfig(numberOfNodes={3})
public class TimeoutStressTest extends CCMTestsSupport {
    static final Logger logger = LoggerFactory.getLogger(TimeoutStressTest.class);

    /** Maximum number of queries allowed in flight at once, across all workers. */
    static final int CONCURRENT_QUERIES = 25;

    /** How long the query load is kept running, in milliseconds. */
    static final long DURATION = 60000L;

    /** Read timeout applied while the load is running, in milliseconds. */
    static final int READ_TIMEOUT_IN_MS = 50;

    /** Connect timeout applied while the load is running, in milliseconds. */
    static final int CONNECTION_TIMEOUT_IN_MS = 20;

    // Timeouts restored after the load stops so the pools can recover.
    // NOTE(review): presumably these equal the driver's SocketOptions defaults - confirm.
    private static final int RECOVERY_CONNECT_TIMEOUT_IN_MS = 5000;
    private static final int RECOVERY_READ_TIMEOUT_IN_MS = 12000;

    /** How long to wait after the load stops before checking the open channels. */
    private static final long RECOVERY_WAIT_IN_SECONDS = 20L;

    /** Count of successfully completed queries, shared by all workers. */
    private static final AtomicInteger executedQueries = new AtomicInteger(0);

    private SocketChannelMonitor channelMonitor;

    /**
     * Builds the cluster used by this test: a registered {@link SocketChannelMonitor} supplies
     * the Netty options, LOCAL hosts get 8 for both connection-count arguments, and a
     * {@link ConstantReconnectionPolicy} constructed with 1000 is used.
     */
    @Override
    public Cluster.Builder createClusterBuilder() {
        channelMonitor = register(new SocketChannelMonitor());
        PoolingOptions poolingOptions = new PoolingOptions().setConnectionsPerHost(HostDistance.LOCAL, 8, 8);
        return Cluster.builder()
                .withPoolingOptions(poolingOptions)
                .withNettyOptions(channelMonitor.nettyOptions())
                .withReconnectionPolicy(new ConstantReconnectionPolicy(1000L));
    }

    /** Creates the {@code record} table that the test inserts into and reads from. */
    @Override
    public void onTestContextInitialized() {
        execute("create table record (\n  name text,\n  phone text,\n  value text,\n  PRIMARY KEY (name, phone)\n)");
    }

    /**
     * Inserts the test data, lowers the socket timeouts, and runs one
     * {@link TimeoutStressWorker} per available processor for {@link #DURATION} milliseconds
     * while watching the open channels. Whether or not the load phase fails, the workers are
     * then stopped, the recovery checks are run, and the worker pool is shut down.
     *
     * @throws Exception if the load phase or any recovery assertion fails
     */
    @Test(groups={"stress"})
    public void host_state_should_be_maintained_with_timeouts() throws Exception {
        insertRecords();
        session().close();

        // Lower the timeouts before opening the session that the workers will use.
        cluster().getConfiguration().getSocketOptions().setConnectTimeoutMillis(CONNECTION_TIMEOUT_IN_MS);
        cluster().getConfiguration().getSocketOptions().setReadTimeoutMillis(READ_TIMEOUT_IN_MS);
        Session newSession = cluster().connect(keyspace);
        PreparedStatement statement = newSession.prepare("select * from record where name=? limit 1000;");

        int workers = Runtime.getRuntime().availableProcessors();
        ExecutorService workerPool = Executors.newFixedThreadPool(
                workers,
                new ThreadFactoryBuilder().setNameFormat("timeout-stress-test-worker-%d").setDaemon(true).build());
        AtomicBoolean stopped = new AtomicBoolean(false);

        // NOTE(review): the "+ 1" presumably accounts for the control connection - confirm.
        int maxConnections = TestUtils.numberOfLocalCoreConnections(cluster()) * getContactPoints().size() + 1;

        try {
            Semaphore concurrentQueries = new Semaphore(CONCURRENT_QUERIES);
            for (int i = 0; i < workers; ++i) {
                workerPool.submit(new TimeoutStressWorker(newSession, statement, concurrentQueries, stopped));
            }
            monitorOpenChannels(maxConnections);
        } finally {
            stopped.set(true);
            try {
                verifyRecovery(newSession, maxConnections);
            } finally {
                // Shut the pool down even when a recovery assertion fails.
                workerPool.shutdown();
            }
        }
    }

    /**
     * Polls the channel monitor once per second for {@link #DURATION} milliseconds, logging a
     * warning each time more than {@code maxConnections} channels are open.
     */
    private void monitorOpenChannels(int maxConnections) {
        long startTime = System.currentTimeMillis();
        while (System.currentTimeMillis() - startTime < DURATION) {
            Uninterruptibles.sleepUninterruptibly(1L, TimeUnit.SECONDS);
            channelMonitor.report();
            Collection<SocketChannel> openChannels = channelMonitor.openChannels(getContactPointsWithPorts());
            if (openChannels.size() > maxConnections) {
                // Argument order follows the placeholders: count, the channels, then the limit.
                logger.warn("{} of open channels: {} exceeds maximum expected: {}.  This could be because there are connections to be cleaned up in the reaper.",
                        openChannels.size(), openChannels, maxConnections);
            }
        }
    }

    /**
     * Raises the socket timeouts again, waits {@link #RECOVERY_WAIT_IN_SECONDS} seconds, then
     * asserts that at most {@code maxConnections} channels are open and that hosts 1 to 3 are
     * up. Finally closes {@code newSession} and asserts that exactly one channel is still open.
     */
    private void verifyRecovery(Session newSession, int maxConnections) {
        cluster().getConfiguration().getSocketOptions().setConnectTimeoutMillis(RECOVERY_CONNECT_TIMEOUT_IN_MS);
        cluster().getConfiguration().getSocketOptions().setReadTimeoutMillis(RECOVERY_READ_TIMEOUT_IN_MS);

        logger.debug("Sleeping 20 seconds to allow connection reaper to clean up connections and for the pools to recover.");
        Uninterruptibles.sleepUninterruptibly(RECOVERY_WAIT_IN_SECONDS, TimeUnit.SECONDS);

        Collection<SocketChannel> openChannels = channelMonitor.openChannels(getContactPointsWithPorts());
        Assertions.assertThat(openChannels.size())
                .as("Number of open connections does not meet expected: %s", openChannels)
                .isLessThanOrEqualTo(maxConnections);

        Assertions.assertThat(cluster()).host(1).comesUpWithin(0L, TimeUnit.SECONDS);
        Assertions.assertThat(cluster()).host(2).comesUpWithin(0L, TimeUnit.SECONDS);
        Assertions.assertThat(cluster()).host(3).comesUpWithin(0L, TimeUnit.SECONDS);

        newSession.close();

        // NOTE(review): the single remaining channel is presumably the control connection - confirm.
        openChannels = channelMonitor.openChannels(getContactPointsWithPorts());
        Assertions.assertThat(openChannels.size())
                .as("Number of open connections does not meet expected: %s", openChannels)
                .isEqualTo(1);
    }

    /**
     * Inserts 30000 rows into {@code record}, all with name {@code "0"}, the row index as
     * phone, and {@code "test"} as value.
     */
    private void insertRecords() {
        int records = 30000;
        PreparedStatement insertStmt = session().prepare("insert into record (name, phone, value) values (?, ?, ?)");
        for (int i = 0; i < records; ++i) {
            if (i % 1000 == 0) {
                logger.debug("Inserting record {}.", i);
            }
            session().execute(insertStmt.bind("0", Integer.toString(i), "test"));
        }
        logger.debug("Inserts complete.");
    }

    /**
     * Worker that keeps submitting asynchronous queries until told to stop, taking a permit
     * from a shared semaphore before each query to bound the number in flight.
     */
    public static class TimeoutStressWorker implements Runnable {
        private final Semaphore concurrentQueries;
        private final AtomicBoolean stopped;
        private final Session session;
        private final PreparedStatement statement;

        /**
         * @param session           session the queries are executed on
         * @param statement         prepared statement, bound with {@code "0"} for every query
         * @param concurrentQueries semaphore limiting the number of in-flight queries
         * @param stopped           flag checked before each query; the worker exits once it is true
         */
        public TimeoutStressWorker(Session session, PreparedStatement statement, Semaphore concurrentQueries, AtomicBoolean stopped) {
            this.session = session;
            this.statement = statement;
            this.concurrentQueries = concurrentQueries;
            this.stopped = stopped;
        }

        /**
         * Submits queries until {@code stopped} is set or the thread is interrupted. The
         * permit taken for a query is released by its completion callback, or by this method
         * when the query could not be submitted.
         */
        @Override
        public void run() {
            while (!stopped.get()) {
                try {
                    concurrentQueries.acquire();
                } catch (InterruptedException e) {
                    // Restore the flag and stop instead of swallowing the interrupt.
                    Thread.currentThread().interrupt();
                    return;
                }

                boolean callbackRegistered = false;
                try {
                    ResultSetFuture future = session.executeAsync(statement.bind("0"));
                    Futures.addCallback(future, new FutureCallback<ResultSet>() {
                        @Override
                        public void onSuccess(ResultSet result) {
                            concurrentQueries.release();
                            // Log the value this call produced, not a later re-read.
                            int executed = executedQueries.incrementAndGet();
                            if (executed % 1000 == 0) {
                                logger.debug("Successfully executed {}.  rows: {}", executed, result.getAvailableWithoutFetching());
                            }
                        }

                        /**
                         * Only releases the permit. Failures, including
                         * {@link NoHostAvailableException}, are deliberately not logged.
                         */
                        @Override
                        public void onFailure(Throwable t) {
                            concurrentQueries.release();
                        }
                    });
                    callbackRegistered = true;
                } catch (Exception e) {
                    logger.error("Failure while submitting query.", e);
                } finally {
                    if (!callbackRegistered) {
                        // No callback will run for this attempt, so return the permit here.
                        concurrentQueries.release();
                    }
                }
            }
        }
    }
}

