/*
 * Decompiled with CFR 0.152.
 */
package com.datastax.driver.core;

import com.datastax.driver.core.CCMConfig;
import com.datastax.driver.core.CCMTestsSupport;
import com.datastax.driver.core.CloseFuture;
import com.datastax.driver.core.Cluster;
import com.datastax.driver.core.HostDistance;
import com.datastax.driver.core.PoolingOptions;
import com.datastax.driver.core.Session;
import com.datastax.driver.core.TestUtils;
import com.datastax.driver.core.utils.SocketChannelMonitor;
import com.google.common.collect.Lists;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.ListeningExecutorService;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.common.util.concurrent.Uninterruptibles;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Stack;
import java.util.concurrent.Callable;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testng.Assert;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.Test;

@CCMConfig(dirtiesContext={true})
public class SessionStressTest
extends CCMTestsSupport {
    private static final Logger logger = LoggerFactory.getLogger(SessionStressTest.class);
    private ListeningExecutorService executorService;
    private Cluster stressCluster;
    private final SocketChannelMonitor channelMonitor = new SocketChannelMonitor();

    public SessionStressTest() {
        this.executorService = MoreExecutors.listeningDecorator((ExecutorService)Executors.newFixedThreadPool(8));
    }

    @AfterMethod(groups={"long"}, alwaysRun=true)
    public void shutdown() throws Exception {
        this.executorService.shutdown();
        try {
            boolean shutdown = this.executorService.awaitTermination(30L, TimeUnit.SECONDS);
            if (!shutdown) {
                Assert.fail((String)"executor ran for longer than expected");
            }
        }
        catch (InterruptedException e) {
            Assert.fail((String)"Interrupted while waiting for executor to shutdown");
        }
        finally {
            this.executorService = null;
            System.gc();
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test(groups={"long"})
    public void sessions_should_not_leak_connections() {
        this.channelMonitor.reportAtFixedInterval(1, TimeUnit.SECONDS);
        this.stressCluster = Cluster.builder().addContactPoints(this.getContactPoints()).withPort(this.ccm().getBinaryPort()).withPoolingOptions(new PoolingOptions().setCoreConnectionsPerHost(HostDistance.LOCAL, 1)).withNettyOptions(this.channelMonitor.nettyOptions()).build();
        try {
            this.stressCluster.init();
            Assert.assertEquals((int)this.stressCluster.manager.sessions.size(), (int)0);
            Assert.assertEquals((int)((Integer)this.stressCluster.getMetrics().getOpenConnections().getValue()), (int)1);
            Session session = this.stressCluster.connect();
            Assert.assertEquals((int)this.stressCluster.manager.sessions.size(), (int)1);
            int coreConnections = TestUtils.numberOfLocalCoreConnections(this.stressCluster);
            Assert.assertEquals((int)((Integer)this.stressCluster.getMetrics().getOpenConnections().getValue()), (int)(1 + coreConnections));
            Assert.assertEquals((int)this.channelMonitor.openChannels(this.getContactPointsWithPorts()).size(), (int)(1 + coreConnections));
            session.close();
            Assert.assertEquals((int)this.stressCluster.manager.sessions.size(), (int)0);
            Assert.assertEquals((int)((Integer)this.stressCluster.getMetrics().getOpenConnections().getValue()), (int)1);
            Assert.assertEquals((int)this.channelMonitor.openChannels(this.getContactPointsWithPorts()).size(), (int)1);
            int nbOfSessions = 2000;
            int halfOfTheSessions = nbOfSessions / 2;
            int nbOfIterations = 5;
            int sleepTime = 20;
            for (int iteration = 1; iteration <= nbOfIterations; ++iteration) {
                logger.info("On iteration {}/{}.", (Object)iteration, (Object)nbOfIterations);
                logger.info("Creating {} sessions.", (Object)nbOfSessions);
                this.waitFor(this.openSessionsConcurrently(nbOfSessions));
                Assert.assertEquals((int)this.stressCluster.manager.sessions.size(), (int)nbOfSessions);
                Assert.assertEquals((int)((Integer)this.stressCluster.getMetrics().getOpenConnections().getValue()), (int)(coreConnections * nbOfSessions + 1));
                Assert.assertEquals((int)this.channelMonitor.openChannels(this.getContactPointsWithPorts()).size(), (int)(coreConnections * nbOfSessions + 1));
                logger.info("Closing {}/{} sessions.", (Object)halfOfTheSessions, (Object)nbOfSessions);
                this.waitFor(this.closeSessionsConcurrently(halfOfTheSessions));
                Assert.assertEquals((int)this.stressCluster.manager.sessions.size(), (int)halfOfTheSessions);
                Assert.assertEquals((int)((Integer)this.stressCluster.getMetrics().getOpenConnections().getValue()), (int)(coreConnections * (nbOfSessions / 2) + 1));
                Assert.assertEquals((int)this.channelMonitor.openChannels(this.getContactPointsWithPorts()).size(), (int)(coreConnections * (nbOfSessions / 2) + 1));
                logger.info("Closing and Opening {} sessions concurrently.", (Object)halfOfTheSessions);
                CountDownLatch startSignal = new CountDownLatch(2);
                List openSessionFutures = this.openSessionsConcurrently(halfOfTheSessions, startSignal);
                List closeSessionsFutures = this.closeSessionsConcurrently(halfOfTheSessions, startSignal);
                startSignal.countDown();
                this.waitFor(openSessionFutures);
                this.waitFor(closeSessionsFutures);
                Assert.assertEquals((int)this.stressCluster.manager.sessions.size(), (int)halfOfTheSessions);
                Assert.assertEquals((int)((Integer)this.stressCluster.getMetrics().getOpenConnections().getValue()), (int)(coreConnections * (nbOfSessions / 2) + 1));
                Assert.assertEquals((int)this.channelMonitor.openChannels(this.getContactPointsWithPorts()).size(), (int)(coreConnections * (nbOfSessions / 2) + 1));
                logger.info("Closing remaining {} sessions.", (Object)halfOfTheSessions);
                this.waitFor(this.closeSessionsConcurrently(halfOfTheSessions));
                Assert.assertEquals((int)this.stressCluster.manager.sessions.size(), (int)0);
                Assert.assertEquals((int)((Integer)this.stressCluster.getMetrics().getOpenConnections().getValue()), (int)1);
                Assert.assertEquals((int)this.channelMonitor.openChannels(this.getContactPointsWithPorts()).size(), (int)1);
                logger.info("Sleeping {} seconds so that TCP connections are released by the OS", (Object)sleepTime);
                Uninterruptibles.sleepUninterruptibly((long)sleepTime, (TimeUnit)TimeUnit.SECONDS);
            }
        }
        finally {
            this.stressCluster.close();
            this.stressCluster = null;
            Assert.assertEquals((int)this.channelMonitor.openChannels(this.getContactPointsWithPorts()).size(), (int)0);
            this.channelMonitor.stop();
            this.channelMonitor.report();
            logger.info("Sleeping 60 extra seconds");
            Uninterruptibles.sleepUninterruptibly((long)60L, (TimeUnit)TimeUnit.SECONDS);
        }
    }

    private List<ListenableFuture<Session>> openSessionsConcurrently(int iterations) {
        CountDownLatch countDownLatch = new CountDownLatch(1);
        return this.openSessionsConcurrently(iterations, countDownLatch);
    }

    private List<ListenableFuture<Session>> openSessionsConcurrently(int iterations, CountDownLatch countDownLatch) {
        ArrayList sessionFutures = Lists.newArrayListWithCapacity((int)iterations);
        for (int i = 0; i < iterations; ++i) {
            sessionFutures.add(this.executorService.submit((Callable)new OpenSession(countDownLatch)));
        }
        countDownLatch.countDown();
        return sessionFutures;
    }

    private List<ListenableFuture<Void>> closeSessionsConcurrently(int iterations) {
        CountDownLatch countDownLatch = new CountDownLatch(1);
        return this.closeSessionsConcurrently(iterations, countDownLatch);
    }

    private List<ListenableFuture<Void>> closeSessionsConcurrently(int iterations, CountDownLatch countDownLatch) {
        Stack sessionsToClose = new Stack();
        Iterator iterator = this.stressCluster.manager.sessions.iterator();
        for (int i = 0; i < iterations; ++i) {
            sessionsToClose.push(iterator.next());
        }
        ArrayList closeFutures = Lists.newArrayListWithCapacity((int)iterations);
        for (int i = 0; i < iterations; ++i) {
            closeFutures.add(this.executorService.submit((Callable)new CloseSession((Session)sessionsToClose.pop(), countDownLatch)));
        }
        countDownLatch.countDown();
        ArrayList futures = Lists.newArrayListWithCapacity((int)iterations);
        for (ListenableFuture closeFuture : closeFutures) {
            try {
                futures.add(closeFuture.get());
            }
            catch (Exception e) {
                logger.error("Got interrupted exception while waiting on closeFuture.", (Throwable)e);
            }
        }
        return futures;
    }

    private <E> void waitFor(List<ListenableFuture<E>> futures) {
        for (Future future : futures) {
            try {
                future.get();
            }
            catch (InterruptedException e) {
                throw new RuntimeException("Interrupted while waiting for future", e);
            }
            catch (ExecutionException e) {
                e.printStackTrace();
                Assert.fail((String)e.getMessage());
            }
        }
    }

    private static class CloseSession
    implements Callable<CloseFuture> {
        private Session session;
        private final CountDownLatch startSignal;

        CloseSession(Session session, CountDownLatch startSignal) {
            this.session = session;
            this.startSignal = startSignal;
        }

        @Override
        public CloseFuture call() throws Exception {
            this.startSignal.await();
            try {
                CloseFuture closeFuture = this.session.closeAsync();
                return closeFuture;
            }
            finally {
                this.session = null;
            }
        }
    }

    private class OpenSession
    implements Callable<Session> {
        private final CountDownLatch startSignal;

        OpenSession(CountDownLatch startSignal) {
            this.startSignal = startSignal;
        }

        @Override
        public Session call() throws Exception {
            this.startSignal.await();
            return SessionStressTest.this.stressCluster.connect();
        }
    }
}

