/*
 * Decompiled with CFR 0.152.
 */
package com.google.cloud.pubsublite.internal.wire;

import com.google.api.gax.rpc.ClientStream;
import com.google.api.gax.rpc.ResponseObserver;
import com.google.api.gax.rpc.StatusCode;
import com.google.cloud.pubsublite.CloudRegion;
import com.google.cloud.pubsublite.CloudZone;
import com.google.cloud.pubsublite.Offset;
import com.google.cloud.pubsublite.ProjectNumber;
import com.google.cloud.pubsublite.SubscriptionName;
import com.google.cloud.pubsublite.SubscriptionPath;
import com.google.cloud.pubsublite.internal.ApiExceptionMatcher;
import com.google.cloud.pubsublite.internal.CheckedApiException;
import com.google.cloud.pubsublite.internal.wire.ConnectedCommitter;
import com.google.cloud.pubsublite.internal.wire.ConnectedCommitterImpl;
import com.google.cloud.pubsublite.internal.wire.StreamFactory;
import com.google.cloud.pubsublite.proto.Cursor;
import com.google.cloud.pubsublite.proto.InitialCommitCursorRequest;
import com.google.cloud.pubsublite.proto.InitialCommitCursorResponse;
import com.google.cloud.pubsublite.proto.SequencedCommitCursorResponse;
import com.google.cloud.pubsublite.proto.StreamingCommitCursorRequest;
import com.google.cloud.pubsublite.proto.StreamingCommitCursorResponse;
import cz.o2.proxima.internal.shaded.com.google.common.base.Preconditions;
import cz.o2.proxima.internal.shaded.com.google.common.truth.Truth;
import java.io.IOException;
import java.time.Duration;
import java.util.Optional;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;
import org.mockito.ArgumentMatcher;
import org.mockito.ArgumentMatchers;
import org.mockito.Mock;
import org.mockito.Mockito;
import org.mockito.MockitoAnnotations;
import org.mockito.stubbing.Answer;
import org.mockito.verification.VerificationMode;

@RunWith(value=JUnit4.class)
public class ConnectedCommitterImplTest {
    private static final ConnectedCommitterImpl.Factory FACTORY = new ConnectedCommitterImpl.Factory();
    @Mock
    private StreamFactory<StreamingCommitCursorRequest, StreamingCommitCursorResponse> streamFactory;
    @Mock
    private ClientStream<StreamingCommitCursorRequest> mockRequestStream;
    @Mock
    private ResponseObserver<SequencedCommitCursorResponse> mockOutputStream;
    private Optional<ResponseObserver<StreamingCommitCursorResponse>> leakedResponseStream = Optional.empty();
    private ConnectedCommitter committer;

    private static StreamingCommitCursorRequest initialRequest() {
        return StreamingCommitCursorRequest.newBuilder().setInitial(InitialCommitCursorRequest.newBuilder().setSubscription(((SubscriptionPath.Builder)((SubscriptionPath.Builder)SubscriptionPath.newBuilder().setProject(ProjectNumber.of((long)12345L))).setLocation(CloudZone.of((CloudRegion)CloudRegion.of((String)"us-east1"), (char)'a'))).setName(SubscriptionName.of((String)"some_subscription")).build().toString()).setPartition(1024L)).build();
    }

    @Before
    public void setUp() throws IOException {
        MockitoAnnotations.initMocks((Object)this);
        ((StreamFactory)Mockito.doAnswer(args -> {
            Preconditions.checkArgument((!this.leakedResponseStream.isPresent() ? 1 : 0) != 0);
            ResponseObserver responseObserver = (ResponseObserver)args.getArgument(0);
            this.leakedResponseStream = Optional.of(responseObserver);
            return this.mockRequestStream;
        }).when(this.streamFactory)).New((ResponseObserver)ArgumentMatchers.any());
    }

    @After
    public void tearDown() {
        this.leakedResponseStream.ifPresent(ResponseObserver::onComplete);
    }

    private Answer<Void> AnswerWith(StreamingCommitCursorResponse response) {
        return invocation -> {
            Preconditions.checkArgument((boolean)this.leakedResponseStream.isPresent());
            this.leakedResponseStream.get().onResponse((Object)response);
            return null;
        };
    }

    private Answer<Void> AnswerWith(StreamingCommitCursorResponse.Builder response) {
        return this.AnswerWith(response.build());
    }

    private Answer<Void> AnswerWith(StatusCode.Code error) {
        return invocation -> {
            Preconditions.checkArgument((boolean)this.leakedResponseStream.isPresent());
            this.leakedResponseStream.get().onError((Throwable)new CheckedApiException((StatusCode.Code)error).underlying);
            this.leakedResponseStream = Optional.empty();
            ((ClientStream)Mockito.verify(this.mockRequestStream)).closeSendWithError((Throwable)ArgumentMatchers.argThat((ArgumentMatcher)new ApiExceptionMatcher(error)));
            ((ResponseObserver)Mockito.verify(this.mockOutputStream)).onError((Throwable)ArgumentMatchers.argThat((ArgumentMatcher)new ApiExceptionMatcher(error)));
            Mockito.verifyNoMoreInteractions((Object[])new Object[]{this.mockOutputStream});
            return null;
        };
    }

    @Test
    public void construct_SendsInitialThenResponse() throws Exception {
        ((ClientStream)Mockito.doAnswer(this.AnswerWith(StreamingCommitCursorResponse.newBuilder().setInitial(InitialCommitCursorResponse.getDefaultInstance()))).when(this.mockRequestStream)).send((Object)ConnectedCommitterImplTest.initialRequest());
        ConnectedCommitter committer = FACTORY.New(this.streamFactory, this.mockOutputStream, ConnectedCommitterImplTest.initialRequest());
        Throwable throwable = null;
        if (committer != null) {
            if (throwable != null) {
                try {
                    committer.close();
                }
                catch (Throwable throwable2) {
                    throwable.addSuppressed(throwable2);
                }
            } else {
                committer.close();
            }
        }
    }

    @Test
    public void construct_SendsInitialThenError() throws Exception {
        ((ClientStream)Mockito.doAnswer(this.AnswerWith(StatusCode.Code.INTERNAL)).when(this.mockRequestStream)).send((Object)ConnectedCommitterImplTest.initialRequest());
        ConnectedCommitter committer = FACTORY.New(this.streamFactory, this.mockOutputStream, ConnectedCommitterImplTest.initialRequest());
        Throwable throwable = null;
        if (committer != null) {
            if (throwable != null) {
                try {
                    committer.close();
                }
                catch (Throwable throwable2) {
                    throwable.addSuppressed(throwable2);
                }
            } else {
                committer.close();
            }
        }
    }

    @Test
    public void construct_SendsCommitResponseError() throws Exception {
        ((ClientStream)Mockito.doAnswer(this.AnswerWith(StreamingCommitCursorResponse.newBuilder().setCommit(SequencedCommitCursorResponse.getDefaultInstance()))).when(this.mockRequestStream)).send((Object)ConnectedCommitterImplTest.initialRequest());
        try (ConnectedCommitter committer = FACTORY.New(this.streamFactory, this.mockOutputStream, ConnectedCommitterImplTest.initialRequest());){
            ((ResponseObserver)Mockito.verify(this.mockOutputStream)).onError((Throwable)ArgumentMatchers.argThat((ArgumentMatcher)new ApiExceptionMatcher(StatusCode.Code.FAILED_PRECONDITION)));
            Mockito.verifyNoMoreInteractions((Object[])new Object[]{this.mockOutputStream});
        }
        this.leakedResponseStream = Optional.empty();
    }

    private void initialize() {
        ((ClientStream)Mockito.doAnswer(this.AnswerWith(StreamingCommitCursorResponse.newBuilder().setInitial(InitialCommitCursorResponse.getDefaultInstance()))).when(this.mockRequestStream)).send((Object)ConnectedCommitterImplTest.initialRequest());
        this.committer = FACTORY.New(this.streamFactory, this.mockOutputStream, ConnectedCommitterImplTest.initialRequest());
        ((ClientStream)Mockito.verify(this.mockRequestStream)).send((Object)ConnectedCommitterImplTest.initialRequest());
    }

    @Test
    public void responseAfterClose_Dropped() throws Exception {
        this.initialize();
        this.committer.close();
        ((ClientStream)Mockito.verify(this.mockRequestStream)).closeSend();
        this.committer.commit(Offset.of((long)10L));
        ((ResponseObserver)Mockito.verify(this.mockOutputStream, (VerificationMode)Mockito.never())).onResponse(ArgumentMatchers.any());
    }

    @Test
    public void duplicateInitial_Abort() {
        this.initialize();
        StreamingCommitCursorResponse.Builder builder = StreamingCommitCursorResponse.newBuilder();
        builder.getInitialBuilder();
        this.leakedResponseStream.get().onResponse((Object)builder.build());
        ((ResponseObserver)Mockito.verify(this.mockOutputStream)).onError((Throwable)ArgumentMatchers.argThat((ArgumentMatcher)new ApiExceptionMatcher(StatusCode.Code.FAILED_PRECONDITION)));
        this.leakedResponseStream = Optional.empty();
    }

    @Test
    public void commitRequestProxied() {
        this.initialize();
        StreamingCommitCursorRequest.Builder builder = StreamingCommitCursorRequest.newBuilder();
        builder.getCommitBuilder().setCursor(Cursor.newBuilder().setOffset(154L));
        this.committer.commit(Offset.of((long)154L));
        ((ClientStream)Mockito.verify(this.mockRequestStream)).send((Object)builder.build());
    }

    @Test
    public void commitResponseProxied() {
        this.initialize();
        this.leakedResponseStream.get().onResponse((Object)StreamingCommitCursorResponse.newBuilder().setCommit(SequencedCommitCursorResponse.newBuilder().setAcknowledgedCommits(10L)).build());
        ((ResponseObserver)Mockito.verify(this.mockOutputStream)).onResponse((Object)SequencedCommitCursorResponse.newBuilder().setAcknowledgedCommits(10L).build());
    }

    @Test
    public void receiveTimeout_closesConnection() throws Exception {
        CountDownLatch connectionClosed = new CountDownLatch(1);
        ((ResponseObserver)Mockito.doAnswer(args -> {
            connectionClosed.countDown();
            return null;
        }).when(this.mockOutputStream)).onError((Throwable)ArgumentMatchers.any());
        ((ClientStream)Mockito.doAnswer(this.AnswerWith(StreamingCommitCursorResponse.newBuilder().setInitial(InitialCommitCursorResponse.getDefaultInstance()))).when(this.mockRequestStream)).send((Object)ConnectedCommitterImplTest.initialRequest());
        this.committer = new ConnectedCommitterImpl(this.streamFactory, this.mockOutputStream, ConnectedCommitterImplTest.initialRequest(), Duration.ofMillis(100L));
        ((ClientStream)Mockito.verify(this.mockRequestStream)).send((Object)ConnectedCommitterImplTest.initialRequest());
        Truth.assertThat((Boolean)connectionClosed.await(30L, TimeUnit.SECONDS)).isTrue();
        ((ClientStream)Mockito.verify(this.mockRequestStream)).closeSendWithError((Throwable)ArgumentMatchers.argThat((ArgumentMatcher)new ApiExceptionMatcher(StatusCode.Code.ABORTED)));
        ((ResponseObserver)Mockito.verify(this.mockOutputStream)).onError((Throwable)ArgumentMatchers.argThat((ArgumentMatcher)new ApiExceptionMatcher(StatusCode.Code.ABORTED)));
        this.committer.close();
        Mockito.verifyNoMoreInteractions((Object[])new Object[]{this.mockRequestStream});
        Mockito.verifyNoMoreInteractions((Object[])new Object[]{this.mockOutputStream});
    }

    @Test
    public void initializationTimeout_closesConnection() throws Exception {
        CountDownLatch connectionClosed = new CountDownLatch(1);
        ((ResponseObserver)Mockito.doAnswer(args -> {
            connectionClosed.countDown();
            return null;
        }).when(this.mockOutputStream)).onError((Throwable)ArgumentMatchers.any());
        this.committer = new ConnectedCommitterImpl(this.streamFactory, this.mockOutputStream, ConnectedCommitterImplTest.initialRequest(), Duration.ofMillis(100L));
        Truth.assertThat((Boolean)connectionClosed.await(30L, TimeUnit.SECONDS)).isTrue();
        ((ClientStream)Mockito.verify(this.mockRequestStream)).send((Object)ConnectedCommitterImplTest.initialRequest());
        ((ClientStream)Mockito.verify(this.mockRequestStream)).closeSendWithError((Throwable)ArgumentMatchers.argThat((ArgumentMatcher)new ApiExceptionMatcher(StatusCode.Code.ABORTED)));
        ((ResponseObserver)Mockito.verify(this.mockOutputStream)).onError((Throwable)ArgumentMatchers.argThat((ArgumentMatcher)new ApiExceptionMatcher(StatusCode.Code.ABORTED)));
        this.committer.close();
        Mockito.verifyNoMoreInteractions((Object[])new Object[]{this.mockRequestStream});
        Mockito.verifyNoMoreInteractions((Object[])new Object[]{this.mockOutputStream});
    }
}

