/*
 * Decompiled with CFR 0.152.
 */
package com.datastax.driver.core;

import com.datastax.driver.core.CCMTestsSupport;
import com.datastax.driver.core.GuavaCompatibility;
import com.datastax.driver.core.ResultSet;
import com.datastax.driver.core.Row;
import com.datastax.driver.core.SimpleStatement;
import com.datastax.driver.core.Statement;
import com.datastax.driver.core.utils.CassandraVersion;
import com.google.common.util.concurrent.AsyncFunction;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import java.util.Set;
import java.util.concurrent.ConcurrentSkipListSet;
import java.util.concurrent.Future;
import org.assertj.core.api.Assertions;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;

@CassandraVersion(value="2.0.0", description="uses paging")
public class AsyncResultSetTest
extends CCMTestsSupport {
    @Override
    public void onTestContextInitialized() {
        this.execute("create table ints (i int primary key)");
    }

    @BeforeMethod(groups={"short"})
    public void cleanup() {
        this.session().execute("truncate ints");
    }

    @Test(groups={"short"})
    public void should_iterate_single_page_result_set_asynchronously() {
        this.should_iterate_result_set_asynchronously(100, 500);
    }

    @Test(groups={"short"})
    public void should_iterate_multi_page_result_set_asynchronously() {
        this.should_iterate_result_set_asynchronously(1000, 20);
    }

    private void should_iterate_result_set_asynchronously(int totalCount, int fetchSize) {
        for (int i = 0; i < totalCount; ++i) {
            this.session().execute(String.format("insert into ints (i) values (%d)", i));
        }
        Statement statement = new SimpleStatement("select * from ints").setFetchSize(fetchSize);
        ResultsAccumulator results = new ResultsAccumulator();
        ListenableFuture future = GuavaCompatibility.INSTANCE.transformAsync((ListenableFuture)this.session().executeAsync(statement), (AsyncFunction)results);
        Futures.getUnchecked((Future)future);
        Assertions.assertThat((int)results.all.size()).isEqualTo(totalCount);
    }

    static class ResultsAccumulator
    implements AsyncFunction<ResultSet, ResultSet> {
        final Set<Integer> all = new ConcurrentSkipListSet<Integer>();

        ResultsAccumulator() {
        }

        public ListenableFuture<ResultSet> apply(ResultSet rs) throws Exception {
            boolean wasLastPage;
            int remainingInPage = rs.getAvailableWithoutFetching();
            for (Row row : rs) {
                this.all.add(row.getInt(0));
                if (--remainingInPage != 0) continue;
                break;
            }
            boolean bl = wasLastPage = rs.getExecutionInfo().getPagingState() == null;
            if (wasLastPage) {
                return Futures.immediateFuture((Object)rs);
            }
            return GuavaCompatibility.INSTANCE.transformAsync(rs.fetchMoreResults(), (AsyncFunction)this);
        }
    }
}

