/*
 * Decompiled with CFR 0.152.
 */
package com.google.cloud.spanner;

import com.google.api.core.ApiFuture;
import com.google.api.gax.core.ExecutorProvider;
import com.google.cloud.spanner.AsyncResultSet;
import com.google.cloud.spanner.AsyncResultSetImpl;
import com.google.cloud.spanner.ErrorCode;
import com.google.cloud.spanner.ResultSet;
import com.google.cloud.spanner.SpannerApiFutures;
import com.google.cloud.spanner.SpannerException;
import com.google.cloud.spanner.SpannerExceptionFactory;
import com.google.cloud.spanner.SpannerOptions;
import com.google.cloud.spanner.Struct;
import com.google.common.base.Function;
import com.google.common.collect.Range;
import com.google.common.truth.Truth;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;
import org.mockito.Mockito;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;

@RunWith(value=JUnit4.class)
public class AsyncResultSetImplTest {
    private ExecutorProvider mockedProvider;
    private ExecutorProvider simpleProvider;

    @Before
    public void setup() {
        this.mockedProvider = (ExecutorProvider)Mockito.mock(ExecutorProvider.class);
        Mockito.when((Object)this.mockedProvider.getExecutor()).thenReturn((Object)((ScheduledExecutorService)Mockito.mock(ScheduledExecutorService.class)));
        this.simpleProvider = SpannerOptions.createAsyncExecutorProvider((int)1, (long)1L, (TimeUnit)TimeUnit.SECONDS);
    }

    @Test
    public void close() {
        AsyncResultSetImpl rs = new AsyncResultSetImpl(this.mockedProvider, (ResultSet)Mockito.mock(ResultSet.class), 10);
        rs.close();
        rs.close();
        Assert.assertThrows(IllegalStateException.class, () -> rs.setCallback((Executor)Mockito.mock(Executor.class), (AsyncResultSet.ReadyCallback)Mockito.mock(AsyncResultSet.ReadyCallback.class)));
        Assert.assertThrows(IllegalStateException.class, () -> rs.toList((Function)Mockito.mock(Function.class)));
        Assert.assertThrows(IllegalStateException.class, () -> rs.toListAsync((Function)Mockito.mock(Function.class), (Executor)Mockito.mock(Executor.class)));
        AsyncResultSetImpl rs2 = new AsyncResultSetImpl(this.mockedProvider, (ResultSet)Mockito.mock(ResultSet.class), 10);
        rs2.setCallback((Executor)Mockito.mock(Executor.class), (AsyncResultSet.ReadyCallback)Mockito.mock(AsyncResultSet.ReadyCallback.class));
        rs2.close();
        rs2.cancel();
        rs2.resume();
    }

    @Test
    public void tryNextNotAllowed() {
        try (AsyncResultSetImpl rs = new AsyncResultSetImpl(this.mockedProvider, (ResultSet)Mockito.mock(ResultSet.class), 10);){
            rs.setCallback((Executor)Mockito.mock(Executor.class), (AsyncResultSet.ReadyCallback)Mockito.mock(AsyncResultSet.ReadyCallback.class));
            IllegalStateException e = (IllegalStateException)Assert.assertThrows(IllegalStateException.class, () -> rs.tryNext());
            Truth.assertThat((String)e.getMessage()).contains((CharSequence)"tryNext may only be called from a DataReady callback.");
        }
    }

    @Test
    public void toList() {
        ResultSet delegate = (ResultSet)Mockito.mock(ResultSet.class);
        Mockito.when((Object)delegate.next()).thenReturn((Object)true, (Object[])new Boolean[]{true, true, false});
        Mockito.when((Object)delegate.getCurrentRowAsStruct()).thenReturn((Object)((Struct)Mockito.mock(Struct.class)));
        try (AsyncResultSetImpl rs = new AsyncResultSetImpl(this.simpleProvider, delegate, 10);){
            List list = rs.toList(ignored -> new Object());
            Truth.assertThat((Iterable)list).hasSize(3);
        }
    }

    @Test
    public void toListPropagatesError() {
        ResultSet delegate = (ResultSet)Mockito.mock(ResultSet.class);
        Mockito.when((Object)delegate.next()).thenThrow(new Throwable[]{SpannerExceptionFactory.newSpannerException((ErrorCode)ErrorCode.INVALID_ARGUMENT, (String)"invalid query")});
        try (AsyncResultSetImpl rs = new AsyncResultSetImpl(this.simpleProvider, delegate, 10);){
            SpannerException e = (SpannerException)Assert.assertThrows(SpannerException.class, () -> rs.toList(ignored -> new Object()));
            Truth.assertThat((Comparable)e.getErrorCode()).isEqualTo((Object)ErrorCode.INVALID_ARGUMENT);
            Truth.assertThat((String)e.getMessage()).contains((CharSequence)"invalid query");
        }
    }

    @Test
    public void toListAsync() throws InterruptedException, ExecutionException {
        ExecutorService executor = Executors.newFixedThreadPool(1);
        ResultSet delegate = (ResultSet)Mockito.mock(ResultSet.class);
        Mockito.when((Object)delegate.next()).thenReturn((Object)true, (Object[])new Boolean[]{true, true, false});
        Mockito.when((Object)delegate.getCurrentRowAsStruct()).thenReturn((Object)((Struct)Mockito.mock(Struct.class)));
        try (AsyncResultSetImpl rs = new AsyncResultSetImpl(this.simpleProvider, delegate, 10);){
            ApiFuture future = rs.toListAsync(ignored -> new Object(), (Executor)executor);
            Truth.assertThat((Iterable)((Iterable)future.get())).hasSize(3);
        }
        executor.shutdown();
    }

    @Test
    public void toListAsyncPropagatesError() throws InterruptedException {
        ExecutorService executor = Executors.newFixedThreadPool(1);
        ResultSet delegate = (ResultSet)Mockito.mock(ResultSet.class);
        Mockito.when((Object)delegate.next()).thenThrow(new Throwable[]{SpannerExceptionFactory.newSpannerException((ErrorCode)ErrorCode.INVALID_ARGUMENT, (String)"invalid query")});
        try (AsyncResultSetImpl rs = new AsyncResultSetImpl(this.simpleProvider, delegate, 10);){
            ExecutionException e = (ExecutionException)Assert.assertThrows(ExecutionException.class, () -> rs.toListAsync(ignored -> new Object(), (Executor)executor).get());
            Truth.assertThat((Throwable)e.getCause()).isInstanceOf(SpannerException.class);
            SpannerException se = (SpannerException)e.getCause();
            Truth.assertThat((Comparable)se.getErrorCode()).isEqualTo((Object)ErrorCode.INVALID_ARGUMENT);
            Truth.assertThat((String)se.getMessage()).contains((CharSequence)"invalid query");
        }
        executor.shutdown();
    }

    @Test
    public void withCallback() throws InterruptedException {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        ResultSet delegate = (ResultSet)Mockito.mock(ResultSet.class);
        Mockito.when((Object)delegate.next()).thenReturn((Object)true, (Object[])new Boolean[]{true, true, false});
        Mockito.when((Object)delegate.getCurrentRowAsStruct()).thenReturn((Object)((Struct)Mockito.mock(Struct.class)));
        AtomicInteger callbackCounter = new AtomicInteger();
        AtomicInteger rowCounter = new AtomicInteger();
        CountDownLatch finishedLatch = new CountDownLatch(1);
        try (AsyncResultSetImpl rs = new AsyncResultSetImpl(this.simpleProvider, delegate, 10);){
            rs.setCallback((Executor)executor, resultSet -> {
                AsyncResultSet.CursorState state;
                callbackCounter.incrementAndGet();
                while ((state = resultSet.tryNext()) == AsyncResultSet.CursorState.OK) {
                    rowCounter.incrementAndGet();
                }
                if (state == AsyncResultSet.CursorState.DONE) {
                    finishedLatch.countDown();
                }
                return AsyncResultSet.CallbackResponse.CONTINUE;
            });
        }
        finishedLatch.await();
        Truth.assertThat((Integer)callbackCounter.get()).isIn(Range.closed((Comparable)Integer.valueOf(1), (Comparable)Integer.valueOf(4)));
        Truth.assertThat((Integer)rowCounter.get()).isEqualTo((Object)3);
    }

    @Test
    public void callbackReceivesError() throws InterruptedException {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        ResultSet delegate = (ResultSet)Mockito.mock(ResultSet.class);
        Mockito.when((Object)delegate.next()).thenThrow(new Throwable[]{SpannerExceptionFactory.newSpannerException((ErrorCode)ErrorCode.INVALID_ARGUMENT, (String)"invalid query")});
        LinkedBlockingDeque receivedErr = new LinkedBlockingDeque(1);
        try (AsyncResultSetImpl rs = new AsyncResultSetImpl(this.simpleProvider, delegate, 10);){
            rs.setCallback((Executor)executor, resultSet -> {
                try {
                    resultSet.tryNext();
                    receivedErr.push(new Exception("missing expected exception"));
                }
                catch (SpannerException e) {
                    receivedErr.push(e);
                }
                return AsyncResultSet.CallbackResponse.DONE;
            });
        }
        Exception e = (Exception)receivedErr.take();
        Truth.assertThat((Throwable)e).isInstanceOf(SpannerException.class);
        SpannerException se = (SpannerException)((Object)e);
        Truth.assertThat((Comparable)se.getErrorCode()).isEqualTo((Object)ErrorCode.INVALID_ARGUMENT);
        Truth.assertThat((String)se.getMessage()).contains((CharSequence)"invalid query");
    }

    @Test
    public void callbackReceivesErrorHalfwayThrough() throws InterruptedException {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        ResultSet delegate = (ResultSet)Mockito.mock(ResultSet.class);
        Mockito.when((Object)delegate.next()).thenReturn((Object)true).thenThrow(new Throwable[]{SpannerExceptionFactory.newSpannerException((ErrorCode)ErrorCode.INVALID_ARGUMENT, (String)"invalid query")});
        Mockito.when((Object)delegate.getCurrentRowAsStruct()).thenReturn((Object)((Struct)Mockito.mock(Struct.class)));
        AtomicInteger rowCount = new AtomicInteger();
        LinkedBlockingDeque receivedErr = new LinkedBlockingDeque(1);
        try (AsyncResultSetImpl rs = new AsyncResultSetImpl(this.simpleProvider, delegate, 10);){
            rs.setCallback((Executor)executor, resultSet -> {
                try {
                    if (resultSet.tryNext() != AsyncResultSet.CursorState.DONE) {
                        rowCount.incrementAndGet();
                        return AsyncResultSet.CallbackResponse.CONTINUE;
                    }
                }
                catch (SpannerException e) {
                    receivedErr.push(e);
                }
                return AsyncResultSet.CallbackResponse.DONE;
            });
        }
        Exception e = (Exception)receivedErr.take();
        Truth.assertThat((Throwable)e).isInstanceOf(SpannerException.class);
        SpannerException se = (SpannerException)((Object)e);
        Truth.assertThat((Comparable)se.getErrorCode()).isEqualTo((Object)ErrorCode.INVALID_ARGUMENT);
        Truth.assertThat((String)se.getMessage()).contains((CharSequence)"invalid query");
        Truth.assertThat((Integer)rowCount.get()).isEqualTo((Object)1);
    }

    @Test
    public void pauseResume() throws InterruptedException {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        ResultSet delegate = (ResultSet)Mockito.mock(ResultSet.class);
        Mockito.when((Object)delegate.next()).thenReturn((Object)true, (Object[])new Boolean[]{true, true, false});
        Mockito.when((Object)delegate.getCurrentRowAsStruct()).thenReturn((Object)((Struct)Mockito.mock(Struct.class)));
        AtomicInteger callbackCounter = new AtomicInteger();
        LinkedBlockingDeque queue = new LinkedBlockingDeque(1);
        AtomicBoolean finished = new AtomicBoolean(false);
        try (AsyncResultSetImpl rs = new AsyncResultSetImpl(this.simpleProvider, delegate, 10);){
            rs.setCallback((Executor)executor, resultSet -> {
                callbackCounter.incrementAndGet();
                AsyncResultSet.CursorState state = resultSet.tryNext();
                if (state == AsyncResultSet.CursorState.OK) {
                    try {
                        queue.put(new Object());
                    }
                    catch (InterruptedException e) {
                        return AsyncResultSet.CallbackResponse.DONE;
                    }
                    return AsyncResultSet.CallbackResponse.PAUSE;
                }
                finished.set(true);
                return AsyncResultSet.CallbackResponse.DONE;
            });
            int rowCounter = 0;
            while (!finished.get()) {
                Object o = queue.poll(1L, TimeUnit.MILLISECONDS);
                if (o != null) {
                    ++rowCounter;
                }
                rs.resume();
            }
            Truth.assertThat((Integer)callbackCounter.get()).isEqualTo((Object)4);
            Truth.assertThat((Integer)rowCounter).isEqualTo((Object)3);
        }
    }

    @Test
    public void testCallbackIsNotCalledWhilePaused() throws InterruptedException, ExecutionException {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        int simulatedRows = 100;
        ResultSet delegate = (ResultSet)Mockito.mock(ResultSet.class);
        Mockito.when((Object)delegate.next()).thenAnswer((Answer)new Answer<Boolean>(){
            int row = 0;

            public Boolean answer(InvocationOnMock invocation) throws Throwable {
                ++this.row;
                if (this.row > 100) {
                    return false;
                }
                return true;
            }
        });
        Mockito.when((Object)delegate.getCurrentRowAsStruct()).thenReturn((Object)((Struct)Mockito.mock(Struct.class)));
        AtomicInteger callbackCounter = new AtomicInteger();
        LinkedBlockingDeque queue = new LinkedBlockingDeque(1);
        AtomicBoolean paused = new AtomicBoolean();
        try (AsyncResultSetImpl rs = new AsyncResultSetImpl(this.simpleProvider, delegate, 10);){
            ApiFuture callbackResult = rs.setCallback((Executor)executor, resultSet -> {
                Assert.assertFalse((boolean)paused.get());
                callbackCounter.incrementAndGet();
                try {
                    while (true) {
                        switch (resultSet.tryNext()) {
                            case OK: {
                                paused.set(true);
                                queue.put(new Object());
                                return AsyncResultSet.CallbackResponse.PAUSE;
                            }
                            case DONE: {
                                return AsyncResultSet.CallbackResponse.DONE;
                            }
                            case NOT_READY: {
                                return AsyncResultSet.CallbackResponse.CONTINUE;
                            }
                        }
                    }
                }
                catch (InterruptedException e) {
                    throw SpannerExceptionFactory.propagateInterrupt((InterruptedException)e);
                }
            });
            int rowCounter = 0;
            while (!callbackResult.isDone()) {
                Object o = queue.poll(1L, TimeUnit.MILLISECONDS);
                if (o != null) {
                    ++rowCounter;
                }
                Thread.yield();
                paused.set(false);
                rs.resume();
            }
            while (queue.poll() != null) {
                ++rowCounter;
            }
            Assert.assertNull((Object)callbackResult.get());
            Truth.assertThat((Integer)callbackCounter.get()).isEqualTo((Object)101);
            Truth.assertThat((Integer)rowCounter).isEqualTo((Object)100);
        }
    }

    @Test
    public void testCallbackIsNotCalledWhilePausedAndCanceled() throws InterruptedException, ExecutionException {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        ResultSet delegate = (ResultSet)Mockito.mock(ResultSet.class);
        AtomicInteger callbackCounter = new AtomicInteger();
        try (AsyncResultSetImpl rs = new AsyncResultSetImpl(this.simpleProvider, delegate, 10);){
            ApiFuture callbackResult = rs.setCallback((Executor)executor, resultSet -> {
                callbackCounter.getAndIncrement();
                return AsyncResultSet.CallbackResponse.PAUSE;
            });
            rs.cancel();
            SpannerException exception = (SpannerException)Assert.assertThrows(SpannerException.class, () -> SpannerApiFutures.get((ApiFuture)callbackResult));
            Assert.assertEquals((Object)ErrorCode.CANCELLED, (Object)exception.getErrorCode());
            Assert.assertEquals((long)1L, (long)callbackCounter.get());
        }
    }

    @Test
    public void cancel() throws InterruptedException {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        ResultSet delegate = (ResultSet)Mockito.mock(ResultSet.class);
        Mockito.when((Object)delegate.next()).thenReturn((Object)true, (Object[])new Boolean[]{true, true, false});
        Mockito.when((Object)delegate.getCurrentRowAsStruct()).thenReturn((Object)((Struct)Mockito.mock(Struct.class)));
        AtomicInteger callbackCounter = new AtomicInteger();
        LinkedBlockingDeque queue = new LinkedBlockingDeque(1);
        AtomicBoolean finished = new AtomicBoolean(false);
        try (AsyncResultSetImpl rs = new AsyncResultSetImpl(this.simpleProvider, delegate, 10);){
            rs.setCallback((Executor)executor, resultSet -> {
                callbackCounter.incrementAndGet();
                try {
                    AsyncResultSet.CursorState state = resultSet.tryNext();
                    if (state == AsyncResultSet.CursorState.OK) {
                        try {
                            queue.put(new Object());
                        }
                        catch (InterruptedException e) {
                            return AsyncResultSet.CallbackResponse.DONE;
                        }
                    }
                    return callbackCounter.get() == 2 ? AsyncResultSet.CallbackResponse.PAUSE : AsyncResultSet.CallbackResponse.CONTINUE;
                }
                catch (SpannerException e) {
                    if (e.getErrorCode() == ErrorCode.CANCELLED) {
                        finished.set(true);
                    }
                    return AsyncResultSet.CallbackResponse.DONE;
                }
            });
            int rowCounter = 0;
            while (!finished.get()) {
                Object o = queue.poll(1L, TimeUnit.MILLISECONDS);
                if (o != null) {
                    ++rowCounter;
                }
                if (rowCounter != 2) continue;
                rs.cancel();
                rs.resume();
            }
            Truth.assertThat((Integer)callbackCounter.get()).isIn(Range.closed((Comparable)Integer.valueOf(2), (Comparable)Integer.valueOf(4)));
            Truth.assertThat((Integer)rowCounter).isIn(Range.closed((Comparable)Integer.valueOf(2), (Comparable)Integer.valueOf(3)));
        }
    }

    @Test
    public void callbackReturnsError() throws InterruptedException {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        ResultSet delegate = (ResultSet)Mockito.mock(ResultSet.class);
        Mockito.when((Object)delegate.next()).thenReturn((Object)true, (Object[])new Boolean[]{true, true, false});
        Mockito.when((Object)delegate.getCurrentRowAsStruct()).thenReturn((Object)((Struct)Mockito.mock(Struct.class)));
        AtomicInteger callbackCounter = new AtomicInteger();
        try (AsyncResultSetImpl rs = new AsyncResultSetImpl(this.simpleProvider, delegate, 10);){
            rs.setCallback((Executor)executor, resultSet -> {
                callbackCounter.incrementAndGet();
                throw new RuntimeException("async test");
            });
            ExecutionException e = (ExecutionException)Assert.assertThrows(ExecutionException.class, () -> rs.getResult().get());
            Truth.assertThat((Throwable)e.getCause()).isInstanceOf(SpannerException.class);
            SpannerException se = (SpannerException)e.getCause();
            Truth.assertThat((Comparable)se.getErrorCode()).isEqualTo((Object)ErrorCode.UNKNOWN);
            Truth.assertThat((String)se.getMessage()).contains((CharSequence)"async test");
            Truth.assertThat((Integer)callbackCounter.get()).isEqualTo((Object)1);
        }
    }

    @Test
    public void callbackReturnsDoneBeforeEnd_shouldStopIteration() throws Exception {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        ResultSet delegate = (ResultSet)Mockito.mock(ResultSet.class);
        Mockito.when((Object)delegate.next()).thenReturn((Object)true, (Object[])new Boolean[]{true, true, false});
        Mockito.when((Object)delegate.getCurrentRowAsStruct()).thenReturn((Object)((Struct)Mockito.mock(Struct.class)));
        try (AsyncResultSetImpl rs = new AsyncResultSetImpl(this.simpleProvider, delegate, 10);){
            rs.setCallback((Executor)executor, ignored -> AsyncResultSet.CallbackResponse.DONE);
            rs.getResult().get(10L, TimeUnit.SECONDS);
        }
    }
}

