/*
 * Decompiled with CFR 0.152.
 */
package com.google.cloud.pubsublite.internal.wire;

import com.google.api.core.ApiFuture;
import com.google.api.core.ApiService;
import com.google.api.gax.rpc.ResponseObserver;
import com.google.api.gax.rpc.StatusCode;
import com.google.cloud.pubsublite.CloudRegion;
import com.google.cloud.pubsublite.CloudZone;
import com.google.cloud.pubsublite.Offset;
import com.google.cloud.pubsublite.ProjectNumber;
import com.google.cloud.pubsublite.SubscriptionName;
import com.google.cloud.pubsublite.SubscriptionPath;
import com.google.cloud.pubsublite.internal.ApiExceptionMatcher;
import com.google.cloud.pubsublite.internal.CheckedApiException;
import com.google.cloud.pubsublite.internal.testing.RetryingConnectionHelpers;
import com.google.cloud.pubsublite.internal.wire.Committer;
import com.google.cloud.pubsublite.internal.wire.CommitterImpl;
import com.google.cloud.pubsublite.internal.wire.ConnectedCommitter;
import com.google.cloud.pubsublite.internal.wire.ConnectedCommitterFactory;
import com.google.cloud.pubsublite.internal.wire.StreamFactory;
import com.google.cloud.pubsublite.proto.InitialCommitCursorRequest;
import com.google.cloud.pubsublite.proto.SequencedCommitCursorResponse;
import com.google.cloud.pubsublite.proto.StreamingCommitCursorRequest;
import com.google.cloud.pubsublite.proto.StreamingCommitCursorResponse;
import cz.o2.proxima.internal.shaded.com.google.common.truth.Truth;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;
import org.mockito.ArgumentMatchers;
import org.mockito.Mock;
import org.mockito.Mockito;
import org.mockito.MockitoAnnotations;

/**
 * Unit tests for {@link CommitterImpl}, driven through a mocked {@link ConnectedCommitterFactory}.
 *
 * <p>{@link #setUp()} captures the {@link ResponseObserver} passed to the factory so that each
 * test can inject stream responses and errors directly.
 */
@RunWith(JUnit4.class)
public class CommitterImplTest {
    /** Upper bound, in seconds, on blocking waits so that a regression fails instead of hanging. */
    private static final long TIMEOUT_SECONDS = 30L;

    @Mock
    private StreamFactory<StreamingCommitCursorRequest, StreamingCommitCursorResponse> unusedStreamFactory;
    @Mock
    private ConnectedCommitter mockConnectedCommitter;
    @Mock
    private ConnectedCommitterFactory mockCommitterFactory;

    /** Runs the blocking calls (stop / waitUntilEmpty) off the test thread; shut down in tearDown. */
    private final ExecutorService executorService = Executors.newCachedThreadPool();
    private Committer committer;
    /** Observer captured from the factory call; used to feed responses and errors to the committer. */
    private ResponseObserver<SequencedCommitCursorResponse> leakedResponseObserver;

    /** Builds the initial stream request that the committer under test is configured with. */
    private static StreamingCommitCursorRequest initialRequest() {
        String subscription =
                SubscriptionPath.newBuilder()
                        .setProject(ProjectNumber.of(12345L))
                        .setLocation(CloudZone.of(CloudRegion.of("us-east1"), 'a'))
                        .setName(SubscriptionName.of("some_subscription"))
                        .build()
                        .toString();
        return StreamingCommitCursorRequest.newBuilder()
                .setInitial(
                        InitialCommitCursorRequest.newBuilder()
                                .setSubscription(subscription)
                                .setPartition(1024L))
                .build();
    }

    /**
     * Builds a commit response.
     *
     * @param count value stored in the response's acknowledged-commits field
     * @return the built response
     */
    SequencedCommitCursorResponse ResponseWithCount(long count) {
        return SequencedCommitCursorResponse.newBuilder().setAcknowledgedCommits(count).build();
    }

    /**
     * Initializes the mocks, starts the committer and verifies that it created exactly the expected
     * connection, capturing the response observer on the way.
     */
    @Before
    public void setUp() throws CheckedApiException {
        MockitoAnnotations.initMocks(this);
        Mockito.doAnswer(invocation -> {
                    // Argument 1 is the ResponseObserver handed to the factory.
                    leakedResponseObserver = invocation.getArgument(1);
                    return mockConnectedCommitter;
                })
                .when(mockCommitterFactory)
                .New(ArgumentMatchers.any(), ArgumentMatchers.any(), ArgumentMatchers.eq(initialRequest()));
        committer = new CommitterImpl(unusedStreamFactory, mockCommitterFactory, initialRequest().getInitial());
        committer.startAsync().awaitRunning();
        Mockito.verify(mockCommitterFactory)
                .New(ArgumentMatchers.any(), ArgumentMatchers.any(), ArgumentMatchers.eq(initialRequest()));
    }

    /** Stops the helper thread pool so worker threads do not outlive the test instance. */
    @After
    public void tearDown() {
        executorService.shutdownNow();
    }

    /** After startup the factory has no further calls and the connection is untouched. */
    @Test
    public void construct_CallsFactoryNew() {
        Mockito.verifyNoMoreInteractions(mockCommitterFactory);
        Mockito.verifyNoInteractions(mockConnectedCommitter);
    }

    /**
     * The commit future is still pending while a stop is in progress, is done once termination has
     * been awaited, and later commits fail with FAILED_PRECONDITION.
     */
    @Test
    public void stopWaitsForCommit() throws Exception {
        Offset commitOffset = Offset.of(101L);
        ApiFuture<Void> future = committer.commitOffset(commitOffset);
        Mockito.verify(mockConnectedCommitter).commit(commitOffset);
        CountDownLatch latch = new CountDownLatch(1);
        Future<?> closeFuture = executorService.submit(() -> {
            latch.countDown();
            committer.stopAsync();
            committer.awaitTerminated();
            Truth.assertThat(future.isDone()).isTrue();
        });
        latch.await();
        // Give the stopping thread a chance to run before checking the commit is still pending.
        Thread.sleep(100L);
        Truth.assertThat(future.isDone()).isFalse();
        leakedResponseObserver.onResponse(ResponseWithCount(1L));
        closeFuture.get(TIMEOUT_SECONDS, TimeUnit.SECONDS);
        ApiExceptionMatcher.assertFutureThrowsCode(
                committer.commitOffset(Offset.of(1L)), StatusCode.Code.FAILED_PRECONDITION);
    }

    /** A response acknowledging two commits when only one was sent fails the service and the commit. */
    @Test
    public void responseMoreThanSentError() throws Exception {
        Future<Void> failed = RetryingConnectionHelpers.whenFailed(committer);
        ApiFuture<Void> future = committer.commitOffset(Offset.of(10L));
        leakedResponseObserver.onResponse(ResponseWithCount(2L));
        failed.get(TIMEOUT_SECONDS, TimeUnit.SECONDS);
        ApiExceptionMatcher.assertThrowableMatches(committer.failureCause(), StatusCode.Code.FAILED_PRECONDITION);
        ApiExceptionMatcher.assertFutureThrowsCode(future, StatusCode.Code.FAILED_PRECONDITION);
    }

    /**
     * With three commits outstanding, a response of count 1 completes only the first and a following
     * response of count 2 completes the other two; the committer stays running.
     */
    @Test
    public void multipleSentCompletedInOrder() {
        ApiFuture<Void> future1 = committer.commitOffset(Offset.of(10L));
        ApiFuture<Void> future2 = committer.commitOffset(Offset.of(1L));
        ApiFuture<Void> future3 = committer.commitOffset(Offset.of(87L));
        Truth.assertThat(future1.isDone()).isFalse();
        Truth.assertThat(future2.isDone()).isFalse();
        Truth.assertThat(future3.isDone()).isFalse();

        leakedResponseObserver.onResponse(ResponseWithCount(1L));
        Truth.assertThat(future1.isDone()).isTrue();
        Truth.assertThat(future2.isDone()).isFalse();
        Truth.assertThat(future3.isDone()).isFalse();

        leakedResponseObserver.onResponse(ResponseWithCount(2L));
        Truth.assertThat(future2.isDone()).isTrue();
        Truth.assertThat(future3.isDone()).isTrue();
        Truth.assertThat(committer.isRunning()).isTrue();
    }

    /**
     * A FAILED_PRECONDITION error delivered to the response observer fails both the pending commit
     * and the service. NOTE(review): despite the method name, this exercises the stream-error path.
     */
    @Test
    public void stopInCommitCallback() throws Exception {
        ApiFuture<Void> future = committer.commitOffset(Offset.of(10L));
        Future<Void> failed = RetryingConnectionHelpers.whenFailed(committer);
        leakedResponseObserver.onError(new CheckedApiException(StatusCode.Code.FAILED_PRECONDITION).underlying);
        ApiExceptionMatcher.assertFutureThrowsCode(future, StatusCode.Code.FAILED_PRECONDITION);
        failed.get(TIMEOUT_SECONDS, TimeUnit.SECONDS);
        ApiExceptionMatcher.assertThrowableMatches(committer.failureCause(), StatusCode.Code.FAILED_PRECONDITION);
    }

    /** waitUntilEmpty stays blocked after the first of two acknowledgements and returns after the second. */
    @Test
    public void waitUntilEmptyReturnsWhenEmpty() throws Exception {
        ApiFuture<Void> future1 = committer.commitOffset(Offset.of(10L));
        ApiFuture<Void> future2 = committer.commitOffset(Offset.of(1L));
        Future<?> waitFuture = executorService.submit(() -> {
            try {
                committer.waitUntilEmpty();
            } catch (Throwable e) {
                throw new IllegalStateException(e);
            }
        });
        Truth.assertThat(waitFuture.isDone()).isFalse();

        leakedResponseObserver.onResponse(ResponseWithCount(1L));
        Truth.assertThat(waitFuture.isDone()).isFalse();
        Truth.assertThat(future1.isDone()).isTrue();
        Truth.assertThat(future2.isDone()).isFalse();

        leakedResponseObserver.onResponse(ResponseWithCount(1L));
        Truth.assertThat(future1.isDone()).isTrue();
        Truth.assertThat(future2.isDone()).isTrue();
        waitFuture.get(TIMEOUT_SECONDS, TimeUnit.SECONDS);
    }

    /** A waitUntilEmpty call started alongside a stop returns once the stop completes. */
    @Test
    public void waitUntilEmptyReturnsOnShutdown() throws Exception {
        ApiFuture<Void> unusedFuture = committer.commitOffset(Offset.of(10L));
        CountDownLatch latch = new CountDownLatch(1);
        Future<?> waitFuture = executorService.submit(() -> {
            try {
                // Held back until the stopping task is about to call stopAsync.
                latch.await();
                committer.waitUntilEmpty();
            } catch (Throwable e) {
                throw new IllegalStateException(e);
            }
        });
        Truth.assertThat(waitFuture.isDone()).isFalse();
        Future<?> stopFuture = executorService.submit(() -> {
            latch.countDown();
            committer.stopAsync().awaitTerminated();
        });
        leakedResponseObserver.onResponse(ResponseWithCount(1L));
        stopFuture.get(TIMEOUT_SECONDS, TimeUnit.SECONDS);
        waitFuture.get(TIMEOUT_SECONDS, TimeUnit.SECONDS);
    }

    /** waitUntilEmpty throws CheckedApiException when an over-counting response fails the service. */
    @Test
    public void waitUntilEmptyThrowsOnPermanentError() throws Exception {
        ApiFuture<Void> unusedFuture = committer.commitOffset(Offset.of(10L));
        Future<CheckedApiException> waitFuture = executorService.submit(
                () -> Assert.assertThrows(CheckedApiException.class, () -> committer.waitUntilEmpty()));
        Truth.assertThat(waitFuture.isDone()).isFalse();
        Future<Void> failed = RetryingConnectionHelpers.whenFailed(committer);
        // Two acknowledgements for a single outstanding commit.
        leakedResponseObserver.onResponse(ResponseWithCount(2L));
        failed.get(TIMEOUT_SECONDS, TimeUnit.SECONDS);
        waitFuture.get(TIMEOUT_SECONDS, TimeUnit.SECONDS);
    }
}

