/*
 * Decompiled with CFR 0.152.
 */
package com.datastax.driver.core;

import com.datastax.driver.core.AbstractSession;
import com.datastax.driver.core.AsyncContinuousPagingResult;
import com.datastax.driver.core.CCMTestsSupport;
import com.datastax.driver.core.CloseFuture;
import com.datastax.driver.core.Cluster;
import com.datastax.driver.core.Connection;
import com.datastax.driver.core.ContinuousPagingOptions;
import com.datastax.driver.core.ContinuousPagingResult;
import com.datastax.driver.core.ContinuousPagingSession;
import com.datastax.driver.core.GuavaCompatibility;
import com.datastax.driver.core.Host;
import com.datastax.driver.core.HostConnectionPool;
import com.datastax.driver.core.PreparedStatement;
import com.datastax.driver.core.ResultSet;
import com.datastax.driver.core.ResultSetFuture;
import com.datastax.driver.core.Row;
import com.datastax.driver.core.Session;
import com.datastax.driver.core.SessionManager;
import com.datastax.driver.core.SimpleStatement;
import com.datastax.driver.core.Statement;
import com.datastax.driver.core.exceptions.InvalidQueryException;
import com.datastax.driver.core.utils.CassandraVersion;
import com.google.common.base.Function;
import com.google.common.base.Throwables;
import com.google.common.collect.Lists;
import com.google.common.util.concurrent.AsyncFunction;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.Uninterruptibles;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.util.Collection;
import java.util.Map;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import org.assertj.core.api.AbstractThrowableAssert;
import org.assertj.core.api.Assertions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testng.Assert;
import org.testng.annotations.DataProvider;
import org.testng.annotations.Test;

public class AsyncQueryTest
extends CCMTestsSupport {
    // SLF4J logger for this test class; isFailed() uses it to warn when a transform callback
    // ran on the calling thread and the attempt has to be repeated.
    Logger logger = LoggerFactory.getLogger(AsyncQueryTest.class);

    /**
     * Supplies the keyspace names used by the data-driven tests: one plain lower-case
     * identifier and one double-quoted, mixed-case identifier.
     *
     * @return one single-element row per keyspace name
     */
    @DataProvider(name="keyspace")
    public static Object[][] keyspace() {
        Object[] unquotedName = {"asyncquerytest"};
        Object[] quotedName = {"\"AsyncQueryTest\""};
        return new Object[][]{unquotedName, quotedName};
    }

    /**
     * Prepares the schema and data for the tests: for every keyspace name returned by
     * {@link #keyspace()}, creates the keyspace and a table {@code foo}, then inserts the
     * rows {@code (1, 1)} through {@code (1, 100)}.
     */
    @Override
    public void onTestContextInitialized() {
        for (Object[] row : AsyncQueryTest.keyspace()) {
            String name = (String)row[0];
            String createKeyspace = String.format("create keyspace %s WITH replication = {'class': 'SimpleStrategy', 'replication_factor': 1}", name);
            String createTable = String.format("create table %s.foo(k int, v int, primary key (k, v))", name);
            // Both DDL statements are submitted in a single execute(...) call.
            this.execute(createKeyspace, createTable);
            int value = 1;
            while (value <= 100) {
                this.execute(String.format("insert into %s.foo (k, v) values (1, %d)", name, value));
                value++;
            }
        }
    }

    /**
     * Cancels an asynchronous query right after submitting it and checks that, a short
     * while later, no connection of the session's single pool reports in-flight requests.
     *
     * @throws InterruptedException if the thread is interrupted during the pause
     */
    @Test(groups={"short"})
    public void cancelled_query_should_release_the_connection() throws InterruptedException {
        ResultSetFuture query = this.session().executeAsync("select release_version from system.local");
        query.cancel(true);
        Assert.assertTrue(query.isCancelled());
        // Pause briefly before inspecting the in-flight counters.
        Thread.sleep(100L);
        for (Connection connection : AsyncQueryTest.getPool(this.session()).connections) {
            Assert.assertEquals(connection.inFlight.get(), 0);
        }
    }

    /**
     * Builds a second cluster pointing at one of the hosts of the test cluster, creates a
     * session from it with {@code newSession()}, asserts that the cluster manager's metadata
     * is still {@code null} at that point, and then checks that an asynchronous query on
     * that session nevertheless returns a non-empty release version.
     *
     * <p>The second cluster is always closed, whether or not the assertions pass.
     */
    @Test(groups={"short"})
    public void should_init_cluster_and_session_if_needed() throws Exception {
        Host anyHost = this.cluster().getMetadata().allHosts().iterator().next();
        Collection<InetSocketAddress> contactPoints = Lists.newArrayList(anyHost.getSocketAddress());
        Cluster freshCluster = this.register(Cluster.builder().addContactPointsWithPorts(contactPoints).build());
        try {
            Session freshSession = freshCluster.newSession();
            Assertions.assertThat((Object)freshCluster.manager.metadata).isNull();
            ResultSet rows = Uninterruptibles.getUninterruptibly(freshSession.executeAsync("select release_version from system.local"));
            Assertions.assertThat(rows.one().getString(0)).isNotEmpty();
        }
        finally {
            freshCluster.close();
        }
    }

    /**
     * Connects asynchronously to the given keyspace and chains a query onto the session
     * future, running the chained steps on the same-thread executor; expects the query to
     * yield {@code 1}. Currently disabled (see the annotation's description).
     *
     * @param keyspace keyspace name supplied by the {@code keyspace} data provider
     */
    @Test(groups={"short"}, dataProvider="keyspace", enabled=false, description="disabled because the blocking USE call in the current pool implementation makes it deadlock")
    public void should_chain_query_on_async_session_init_with_same_executor(String keyspace) throws Exception {
        Executor callingThreadExecutor = GuavaCompatibility.INSTANCE.sameThreadExecutor();
        Integer firstValue = Uninterruptibles.getUninterruptibly(this.connectAndQuery(keyspace, callingThreadExecutor));
        Assertions.assertThat(firstValue).isEqualTo(1);
    }

    /**
     * Connects asynchronously to the given keyspace and chains a query onto the session
     * future, running the chained steps on a dedicated single-thread executor; expects the
     * query to yield {@code 1}.
     *
     * @param keyspace keyspace name supplied by the {@code keyspace} data provider
     */
    @Test(groups={"short"}, dataProvider="keyspace")
    public void should_chain_query_on_async_session_init_with_different_executor(String keyspace) throws Exception {
        ExecutorService executor = Executors.newFixedThreadPool(1);
        try {
            ListenableFuture<Integer> resultFuture = this.connectAndQuery(keyspace, executor);
            Integer result = Uninterruptibles.getUninterruptibly(resultFuture);
            Assertions.assertThat(result).isEqualTo(1);
        }
        finally {
            // Shut the pool down even when the future or the assertion fails, so a failing
            // run does not leave its worker thread behind.
            executor.shutdownNow();
        }
    }

    /**
     * Connects asynchronously to a keyspace named {@code wrong_keyspace} and checks that the
     * chained result future fails with an {@link InvalidQueryException} carrying the
     * message {@code Keyspace 'wrong_keyspace' does not exist}.
     *
     * <p>The test fails explicitly if the future completes without an exception; otherwise
     * a missing error would go unnoticed.
     */
    @Test(groups={"short"})
    public void should_propagate_error_to_chained_query_if_session_init_fails() throws Exception {
        ListenableFuture<Integer> resultFuture = this.connectAndQuery("wrong_keyspace", GuavaCompatibility.INSTANCE.sameThreadExecutor());
        try {
            Uninterruptibles.getUninterruptibly(resultFuture);
            // The AssertionError raised here is not an ExecutionException, so the catch
            // clause below does not swallow it.
            Assertions.fail("Expected the chained query to fail because keyspace 'wrong_keyspace' does not exist");
        }
        catch (ExecutionException e) {
            Assertions.assertThat(e.getCause()).isInstanceOf(InvalidQueryException.class).hasMessage("Keyspace 'wrong_keyspace' does not exist");
        }
    }

    /**
     * Issues a blocking {@code execute} from inside a callback chained onto an asynchronous
     * query and expects the transformed future to fail with an
     * {@link IllegalStateException} mentioning a synchronous call on an I/O thread.
     *
     * <p>The scenario is attempted up to 1000 times; the test passes on the first attempt
     * for which {@code isFailed} reports the expected failure, and fails if none does.
     */
    @Test(groups={"short"})
    public void should_fail_when_synchronous_call_on_io_thread() throws Exception {
        int attempt = 0;
        while (attempt < 1000) {
            ResultSetFuture query = this.session().executeAsync("select release_version from system.local");
            ListenableFuture<Thread> callbackThread = Futures.transform(query, new Function<ResultSet, Thread>(){
                @Override
                public Thread apply(ResultSet input) {
                    // Blocking call made from within the callback.
                    AsyncQueryTest.this.session().execute("select release_version from system.local");
                    return Thread.currentThread();
                }
            });
            if (this.isFailed(callbackThread, IllegalStateException.class, "Detected a synchronous call on an I/O thread")) {
                return;
            }
            attempt++;
        }
        Assertions.fail("callback was not executed on io thread in 1000 attempts, something may be wrong.");
    }

    /**
     * Same scenario as {@code should_fail_when_synchronous_call_on_io_thread}, but all calls
     * go through a {@code SessionWrapper} around the test session: a blocking
     * {@code execute} issued from a callback chained onto an asynchronous query must make
     * the transformed future fail with an {@link IllegalStateException} mentioning a
     * synchronous call on an I/O thread.
     *
     * <p>The scenario is attempted up to 1000 times; the test passes on the first attempt
     * for which {@code isFailed} reports the expected failure, and fails if none does.
     */
    @Test(groups={"short"})
    public void should_fail_when_synchronous_call_on_io_thread_with_session_wrapper() throws Exception {
        // Declared final so the anonymous callback below can capture it directly. An
        // anonymous class that implements an interface cannot take constructor arguments.
        final Session session = new SessionWrapper(this.session());
        for (int i = 0; i < 1000; ++i) {
            ResultSetFuture f = session.executeAsync("select release_version from system.local");
            ListenableFuture<Thread> f2 = Futures.transform(f, new Function<ResultSet, Thread>(){
                @Override
                public Thread apply(ResultSet input) {
                    // Blocking call made from within the callback, through the wrapper.
                    session.execute("select release_version from system.local");
                    return Thread.currentThread();
                }
            });
            if (this.isFailed(f2, IllegalStateException.class, "Detected a synchronous call on an I/O thread")) {
                return;
            }
        }
        Assertions.fail("callback was not executed on io thread in 1000 attempts, something may be wrong.");
    }

    /**
     * Calls {@code ResultSet.all()} from a callback chained onto an asynchronous query whose
     * fetch size (10) is smaller than the 100 rows inserted during setup, and expects the
     * transformed future to fail with an {@link IllegalStateException} mentioning a
     * synchronous call on an I/O thread.
     *
     * <p>The scenario is attempted up to 1000 times; the test passes on the first attempt
     * for which {@code isFailed} reports the expected failure, and fails if none does.
     */
    @Test(groups={"short"})
    @CassandraVersion(value="2.0.0", description="Paging is not supported until 2.0")
    public void should_fail_when_auto_paging_on_io_thread() throws Exception {
        int attempt = 0;
        while (attempt < 1000) {
            SimpleStatement pagedQuery = new SimpleStatement("select v from asyncquerytest.foo where k = 1");
            pagedQuery.setFetchSize(10);
            ResultSetFuture query = this.session().executeAsync(pagedQuery);
            ListenableFuture<Thread> callbackThread = Futures.transform(query, new Function<ResultSet, Thread>(){
                @Override
                public Thread apply(ResultSet rs) {
                    // Consumes the whole result set from within the callback.
                    rs.all();
                    return Thread.currentThread();
                }
            });
            if (this.isFailed(callbackThread, IllegalStateException.class, "Detected a synchronous call on an I/O thread")) {
                return;
            }
            attempt++;
        }
        Assertions.fail("callback was not executed on io thread in 1000 attempts, something may be wrong.");
    }

    /**
     * Waits for {@code future} and reports whether it failed in the expected way.
     *
     * <ul>
     *   <li>If waiting throws an exception, its root cause must be an instance of
     *       {@code expectedException} whose message contains {@code expectedMessage};
     *       the method then returns {@code true}.
     *   <li>If the future yields the current thread, a warning is logged and the method
     *       returns {@code false} so that the caller can try again.
     *   <li>If the future yields any other thread, the test is failed.
     * </ul>
     *
     * @param future future yielding the thread that ran the transform callback
     * @param expectedException type the root cause of the failure must have
     * @param expectedMessage text the root cause's message must contain
     * @return {@code true} if the future failed as expected, {@code false} if the attempt
     *     is inconclusive
     */
    private boolean isFailed(ListenableFuture<Thread> future, Class<?> expectedException, String expectedMessage) {
        try {
            Thread callbackThread = future.get();
            if (callbackThread == Thread.currentThread()) {
                this.logger.warn("Future completed before transform callback registered, will try again.");
                return false;
            }
            Assertions.fail("Expected a failed future, callback was executed on " + callbackThread);
        }
        catch (Exception e) {
            Throwable rootCause = Throwables.getRootCause(e);
            Assertions.assertThat(rootCause).isInstanceOf(expectedException).hasMessageContaining(expectedMessage);
            return true;
        }
        return false;
    }

    /**
     * Connects asynchronously to {@code keyspace}, then runs
     * {@code select v from foo where k = 1} on the resulting session, and finally extracts
     * the first column of the first row as an int. Both chained steps are submitted to
     * {@code executor}.
     *
     * @param keyspace keyspace passed to {@code connectAsync}
     * @param executor executor used for both chained transformations
     * @return a future holding the int value read from the first row
     */
    private ListenableFuture<Integer> connectAndQuery(String keyspace, Executor executor) {
        ListenableFuture<Session> connecting = this.cluster().connectAsync(keyspace);

        AsyncFunction<Session, ResultSet> runQuery = new AsyncFunction<Session, ResultSet>(){
            @Override
            public ListenableFuture<ResultSet> apply(Session session) throws Exception {
                return session.executeAsync("select v from foo where k = 1");
            }
        };
        Function<ResultSet, Integer> firstValue = new Function<ResultSet, Integer>(){
            @Override
            public Integer apply(ResultSet rs) {
                return rs.one().getInt(0);
            }
        };

        ListenableFuture<ResultSet> querying = GuavaCompatibility.INSTANCE.transformAsync(connecting, runQuery, executor);
        return Futures.transform(querying, firstValue, executor);
    }

    /**
     * Returns the only connection pool of {@code session}, asserting that there is exactly
     * one.
     *
     * @param session session to inspect; it is cast to {@code SessionManager}
     * @return the single pool held by the session
     */
    private static HostConnectionPool getPool(Session session) {
        SessionManager manager = (SessionManager)session;
        Collection<HostConnectionPool> pools = manager.pools.values();
        Assert.assertEquals(pools.size(), 1);
        return pools.iterator().next();
    }

    /**
     * {@link AbstractSession} subclass that forwards its operations to another
     * {@link Session}. Only {@link #execute(Statement)} has logic of its own: it calls
     * {@code checkNotInEventLoop()} and then blocks on {@link #executeAsync(Statement)}.
     */
    private static class SessionWrapper
    extends AbstractSession {
        /** Session that receives every forwarded call. */
        private final Session delegate;

        public SessionWrapper(Session session) {
            this.delegate = session;
        }

        // --- query execution ---

        public ResultSet execute(Statement statement) {
            this.checkNotInEventLoop();
            ResultSetFuture pending = this.executeAsync(statement);
            return pending.getUninterruptibly();
        }

        public ResultSetFuture executeAsync(Statement statement) {
            return this.delegate.executeAsync(statement);
        }

        public ListenableFuture<AsyncContinuousPagingResult> executeContinuouslyAsync(Statement statement, ContinuousPagingOptions options) {
            return this.continuousPagingDelegate().executeContinuouslyAsync(statement, options);
        }

        public ContinuousPagingResult executeContinuously(Statement statement, ContinuousPagingOptions options) {
            return this.continuousPagingDelegate().executeContinuously(statement, options);
        }

        protected ListenableFuture<PreparedStatement> prepareAsync(String query, Map<String, ByteBuffer> customPayload) {
            return this.managerDelegate().prepareAsync(query, customPayload);
        }

        // --- lifecycle and state ---

        public Session init() {
            return this.delegate.init();
        }

        public ListenableFuture<Session> initAsync() {
            return this.delegate.initAsync();
        }

        public CloseFuture closeAsync() {
            return this.delegate.closeAsync();
        }

        public boolean isClosed() {
            return this.delegate.isClosed();
        }

        public String getLoggedKeyspace() {
            return this.delegate.getLoggedKeyspace();
        }

        public Cluster getCluster() {
            return this.delegate.getCluster();
        }

        public Session.State getState() {
            return this.delegate.getState();
        }

        // --- casts of the delegate to the types the forwarded calls require ---

        /** Views the delegate as a {@code ContinuousPagingSession} via an unchecked-at-compile-time cast. */
        private ContinuousPagingSession continuousPagingDelegate() {
            return (ContinuousPagingSession)this.delegate;
        }

        /** Views the delegate as a {@code SessionManager} via a cast. */
        private SessionManager managerDelegate() {
            return (SessionManager)this.delegate;
        }
    }
}

