/*
 * Decompiled with CFR 0.152.
 */
package com.google.cloud.pubsublite.internal.wire;

import com.google.api.gax.rpc.ClientStream;
import com.google.api.gax.rpc.ResponseObserver;
import com.google.api.gax.rpc.StatusCode;
import com.google.cloud.pubsublite.CloudRegion;
import com.google.cloud.pubsublite.CloudZone;
import com.google.cloud.pubsublite.Message;
import com.google.cloud.pubsublite.Offset;
import com.google.cloud.pubsublite.ProjectNumber;
import com.google.cloud.pubsublite.SequencedMessage;
import com.google.cloud.pubsublite.SubscriptionName;
import com.google.cloud.pubsublite.SubscriptionPath;
import com.google.cloud.pubsublite.internal.ApiExceptionMatcher;
import com.google.cloud.pubsublite.internal.CheckedApiException;
import com.google.cloud.pubsublite.internal.wire.ConnectedSubscriberImpl;
import com.google.cloud.pubsublite.internal.wire.StreamFactories;
import com.google.cloud.pubsublite.internal.wire.StreamFactory;
import com.google.cloud.pubsublite.proto.Cursor;
import com.google.cloud.pubsublite.proto.FlowControlRequest;
import com.google.cloud.pubsublite.proto.InitialSubscribeRequest;
import com.google.cloud.pubsublite.proto.InitialSubscribeResponse;
import com.google.cloud.pubsublite.proto.MessageResponse;
import com.google.cloud.pubsublite.proto.SeekResponse;
import com.google.cloud.pubsublite.proto.SubscribeRequest;
import com.google.cloud.pubsublite.proto.SubscribeResponse;
import com.google.protobuf.ByteString;
import com.google.protobuf.Timestamp;
import com.google.protobuf.util.Timestamps;
import cz.o2.proxima.internal.shaded.com.google.common.base.Preconditions;
import cz.o2.proxima.internal.shaded.com.google.common.collect.ImmutableList;
import java.io.IOException;
import java.util.List;
import java.util.Optional;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;
import org.mockito.ArgumentMatcher;
import org.mockito.ArgumentMatchers;
import org.mockito.Mock;
import org.mockito.Mockito;
import org.mockito.MockitoAnnotations;
import org.mockito.stubbing.Answer;
import org.mockito.verification.VerificationMode;

@RunWith(value=JUnit4.class)
public class ConnectedSubscriberImplTest {
    private static final ConnectedSubscriberImpl.Factory FACTORY = new ConnectedSubscriberImpl.Factory();
    private static final Offset INITIAL_OFFSET = Offset.of((long)9000L);
    @Mock
    StreamFactories.SubscribeStreamFactory streamFactory;
    @Mock
    private ClientStream<SubscribeRequest> mockRequestStream;
    @Mock
    private ResponseObserver<List<SequencedMessage>> mockOutputStream;
    private Optional<ResponseObserver<SubscribeResponse>> leakedResponseStream = Optional.empty();
    private ConnectedSubscriberImpl subscriber;

    private static SubscribeRequest initialRequest() {
        return SubscribeRequest.newBuilder().setInitial(InitialSubscribeRequest.newBuilder().setSubscription(((SubscriptionPath.Builder)((SubscriptionPath.Builder)SubscriptionPath.newBuilder().setProject(ProjectNumber.of((long)12345L))).setLocation(CloudZone.of((CloudRegion)CloudRegion.of((String)"us-east1"), (char)'a'))).setName(SubscriptionName.of((String)"some_subscription")).build().toString()).setPartition(1024L)).build();
    }

    @Before
    public void setUp() throws IOException {
        MockitoAnnotations.initMocks((Object)this);
        Mockito.when((Object)this.streamFactory.New((ResponseObserver)ArgumentMatchers.any())).then(args -> {
            Preconditions.checkArgument((!this.leakedResponseStream.isPresent() ? 1 : 0) != 0);
            ResponseObserver responseObserver = (ResponseObserver)args.getArgument(0);
            this.leakedResponseStream = Optional.of(responseObserver);
            return this.mockRequestStream;
        });
    }

    @After
    public void tearDown() {
        if (this.leakedResponseStream.isPresent()) {
            this.leakedResponseStream.get().onComplete();
        }
    }

    private Answer<Void> AnswerWith(SubscribeResponse response) {
        return invocation -> {
            Preconditions.checkArgument((boolean)this.leakedResponseStream.isPresent());
            this.leakedResponseStream.get().onResponse((Object)response);
            ((ClientStream)Mockito.verify(this.mockRequestStream)).send((Object)ConnectedSubscriberImplTest.initialRequest());
            return null;
        };
    }

    private Answer<Void> AnswerWith(SubscribeResponse.Builder response) {
        return this.AnswerWith(response.build());
    }

    private Answer<Void> AnswerWith(StatusCode.Code error) {
        return invocation -> {
            Preconditions.checkArgument((boolean)this.leakedResponseStream.isPresent());
            this.leakedResponseStream.get().onError((Throwable)new CheckedApiException((StatusCode.Code)error).underlying);
            this.leakedResponseStream = Optional.empty();
            ((ClientStream)Mockito.verify(this.mockRequestStream)).closeSendWithError((Throwable)ArgumentMatchers.argThat((ArgumentMatcher)new ApiExceptionMatcher(error)));
            ((ResponseObserver)Mockito.verify(this.mockOutputStream)).onError((Throwable)ArgumentMatchers.argThat((ArgumentMatcher)new ApiExceptionMatcher(error)));
            Mockito.verifyNoMoreInteractions((Object[])new Object[]{this.mockOutputStream});
            return null;
        };
    }

    @Test
    public void construct_SendsInitialThenResponse() {
        ((ClientStream)Mockito.doAnswer(this.AnswerWith(SubscribeResponse.newBuilder().setInitial(InitialSubscribeResponse.getDefaultInstance()))).when(this.mockRequestStream)).send((Object)ConnectedSubscriberImplTest.initialRequest());
        ConnectedSubscriberImpl subscriber = FACTORY.New((StreamFactory)this.streamFactory, this.mockOutputStream, ConnectedSubscriberImplTest.initialRequest());
        Throwable throwable = null;
        if (subscriber != null) {
            if (throwable != null) {
                try {
                    subscriber.close();
                }
                catch (Throwable throwable2) {
                    throwable.addSuppressed(throwable2);
                }
            } else {
                subscriber.close();
            }
        }
    }

    @Test
    public void construct_SendsInitialThenError() {
        ((ClientStream)Mockito.doAnswer(this.AnswerWith(StatusCode.Code.INTERNAL)).when(this.mockRequestStream)).send((Object)ConnectedSubscriberImplTest.initialRequest());
        ConnectedSubscriberImpl subscriber = FACTORY.New((StreamFactory)this.streamFactory, this.mockOutputStream, ConnectedSubscriberImplTest.initialRequest());
        Throwable throwable = null;
        if (subscriber != null) {
            if (throwable != null) {
                try {
                    subscriber.close();
                }
                catch (Throwable throwable2) {
                    throwable.addSuppressed(throwable2);
                }
            } else {
                subscriber.close();
            }
        }
    }

    @Test
    public void construct_SendsMessageResponseError() {
        ((ClientStream)Mockito.doAnswer(this.AnswerWith(SubscribeResponse.newBuilder().setMessages(MessageResponse.getDefaultInstance()))).when(this.mockRequestStream)).send((Object)ConnectedSubscriberImplTest.initialRequest());
        try (ConnectedSubscriberImpl subscriber = FACTORY.New((StreamFactory)this.streamFactory, this.mockOutputStream, ConnectedSubscriberImplTest.initialRequest());){
            ((ResponseObserver)Mockito.verify(this.mockOutputStream)).onError((Throwable)ArgumentMatchers.argThat((ArgumentMatcher)new ApiExceptionMatcher(StatusCode.Code.FAILED_PRECONDITION)));
            Mockito.verifyNoMoreInteractions((Object[])new Object[]{this.mockOutputStream});
        }
        this.leakedResponseStream = Optional.empty();
    }

    @Test
    public void construct_SendsSeekResponseError() {
        ((ClientStream)Mockito.doAnswer(this.AnswerWith(SubscribeResponse.newBuilder().setSeek(SeekResponse.getDefaultInstance()))).when(this.mockRequestStream)).send((Object)ConnectedSubscriberImplTest.initialRequest());
        try (ConnectedSubscriberImpl subscriber = FACTORY.New((StreamFactory)this.streamFactory, this.mockOutputStream, ConnectedSubscriberImplTest.initialRequest());){
            ((ResponseObserver)Mockito.verify(this.mockOutputStream)).onError((Throwable)ArgumentMatchers.argThat((ArgumentMatcher)new ApiExceptionMatcher(StatusCode.Code.FAILED_PRECONDITION)));
            Mockito.verifyNoMoreInteractions((Object[])new Object[]{this.mockOutputStream});
        }
        this.leakedResponseStream = Optional.empty();
    }

    private void initialize() {
        ((ClientStream)Mockito.doAnswer(this.AnswerWith(SubscribeResponse.newBuilder().setInitial(InitialSubscribeResponse.newBuilder().setCursor(Cursor.newBuilder().setOffset(INITIAL_OFFSET.value()))))).when(this.mockRequestStream)).send((Object)ConnectedSubscriberImplTest.initialRequest());
        this.subscriber = FACTORY.New((StreamFactory)this.streamFactory, this.mockOutputStream, ConnectedSubscriberImplTest.initialRequest());
    }

    @Test
    public void responseAfterClose_Dropped() {
        this.initialize();
        this.subscriber.close();
        ((ClientStream)Mockito.verify(this.mockRequestStream)).closeSend();
        this.leakedResponseStream.get().onResponse((Object)SubscribeResponse.newBuilder().setMessages(MessageResponse.newBuilder().addMessages(this.messageWithOffset(Offset.of((long)20L)).toProto())).build());
        ((ResponseObserver)Mockito.verify(this.mockOutputStream, (VerificationMode)Mockito.never())).onResponse(ArgumentMatchers.any());
    }

    @Test
    public void duplicateInitial_Abort() {
        this.initialize();
        SubscribeResponse.Builder builder = SubscribeResponse.newBuilder().setInitial(InitialSubscribeResponse.getDefaultInstance());
        this.leakedResponseStream.get().onResponse((Object)builder.build());
        ((ResponseObserver)Mockito.verify(this.mockOutputStream)).onError((Throwable)ArgumentMatchers.argThat((ArgumentMatcher)new ApiExceptionMatcher(StatusCode.Code.FAILED_PRECONDITION)));
        this.leakedResponseStream = Optional.empty();
    }

    @Test
    public void emptyMessagesResponse_Abort() {
        this.initialize();
        SubscribeResponse.Builder builder = SubscribeResponse.newBuilder().setMessages(MessageResponse.getDefaultInstance());
        this.leakedResponseStream.get().onResponse((Object)builder.build());
        ((ResponseObserver)Mockito.verify(this.mockOutputStream)).onError((Throwable)ArgumentMatchers.argThat((ArgumentMatcher)new ApiExceptionMatcher(StatusCode.Code.FAILED_PRECONDITION)));
        this.leakedResponseStream = Optional.empty();
    }

    private SequencedMessage messageWithOffset(Offset offset) {
        return SequencedMessage.of((Message)Message.builder().setData(ByteString.copyFromUtf8((String)"abc")).build(), (Timestamp)Timestamps.EPOCH, (Offset)offset, (long)123L);
    }

    @Test
    public void outOfOrderMessagesResponse_Abort() {
        this.initialize();
        SubscribeResponse.Builder builder = SubscribeResponse.newBuilder();
        builder.getMessagesBuilder().addMessages(this.messageWithOffset(Offset.of((long)10L)).toProto());
        builder.getMessagesBuilder().addMessages(this.messageWithOffset(Offset.of((long)10L)).toProto());
        this.leakedResponseStream.get().onResponse((Object)builder.build());
        ((ResponseObserver)Mockito.verify(this.mockOutputStream)).onError((Throwable)ArgumentMatchers.argThat((ArgumentMatcher)new ApiExceptionMatcher(StatusCode.Code.FAILED_PRECONDITION)));
        this.leakedResponseStream = Optional.empty();
    }

    @Test
    public void validMessagesResponse() {
        this.initialize();
        SubscribeResponse.Builder builder = SubscribeResponse.newBuilder();
        builder.getMessagesBuilder().addMessages(this.messageWithOffset(Offset.of((long)10L)).toProto());
        builder.getMessagesBuilder().addMessages(this.messageWithOffset(Offset.of((long)11L)).toProto());
        this.leakedResponseStream.get().onResponse((Object)builder.build());
        ((ResponseObserver)Mockito.verify(this.mockOutputStream)).onResponse((Object)ImmutableList.of((Object)this.messageWithOffset(Offset.of((long)10L)), (Object)this.messageWithOffset(Offset.of((long)11L))));
    }

    @Test
    public void allowFlowRequest() {
        this.initialize();
        FlowControlRequest request = FlowControlRequest.newBuilder().setAllowedBytes(2L).setAllowedMessages(3L).build();
        this.subscriber.allowFlow(request);
        ((ClientStream)Mockito.verify(this.mockRequestStream)).send((Object)SubscribeRequest.newBuilder().setFlowControl(request).build());
    }

    @Test
    public void seekResponse_Aborts() {
        this.initialize();
        this.leakedResponseStream.get().onResponse((Object)SubscribeResponse.newBuilder().setSeek(SeekResponse.newBuilder().setCursor(Cursor.newBuilder().setOffset(10L))).build());
        ((ResponseObserver)Mockito.verify(this.mockOutputStream)).onError((Throwable)ArgumentMatchers.argThat((ArgumentMatcher)new ApiExceptionMatcher(StatusCode.Code.FAILED_PRECONDITION)));
        ((ClientStream)Mockito.verify(this.mockRequestStream)).closeSendWithError((Throwable)ArgumentMatchers.argThat((ArgumentMatcher)new ApiExceptionMatcher(StatusCode.Code.FAILED_PRECONDITION)));
        this.leakedResponseStream = Optional.empty();
    }
}

