/*
 * Decompiled with CFR 0.152.
 */
package com.google.cloud.pubsublite.cloudpubsub.internal;

import com.google.api.core.ApiFutures;
import com.google.api.core.ApiService;
import com.google.api.core.SettableApiFuture;
import com.google.api.gax.rpc.StatusCode;
import com.google.cloud.pubsub.v1.AckReplyConsumer;
import com.google.cloud.pubsub.v1.MessageReceiver;
import com.google.cloud.pubsublite.Message;
import com.google.cloud.pubsublite.MessageTransformer;
import com.google.cloud.pubsublite.Offset;
import com.google.cloud.pubsublite.SequencedMessage;
import com.google.cloud.pubsublite.cloudpubsub.FlowControlSettings;
import com.google.cloud.pubsublite.cloudpubsub.MessageTransforms;
import com.google.cloud.pubsublite.cloudpubsub.NackHandler;
import com.google.cloud.pubsublite.cloudpubsub.internal.AckSetTracker;
import com.google.cloud.pubsublite.cloudpubsub.internal.ResettableSubscriberFactory;
import com.google.cloud.pubsublite.cloudpubsub.internal.SinglePartitionSubscriber;
import com.google.cloud.pubsublite.internal.ApiExceptionMatcher;
import com.google.cloud.pubsublite.internal.CheckedApiException;
import com.google.cloud.pubsublite.internal.testing.FakeApiService;
import com.google.cloud.pubsublite.internal.testing.RetryingConnectionHelpers;
import com.google.cloud.pubsublite.internal.wire.Subscriber;
import com.google.cloud.pubsublite.internal.wire.SubscriberResetHandler;
import com.google.cloud.pubsublite.proto.Cursor;
import com.google.cloud.pubsublite.proto.FlowControlRequest;
import com.google.protobuf.ByteString;
import com.google.protobuf.Timestamp;
import com.google.protobuf.util.Timestamps;
import com.google.pubsub.v1.PubsubMessage;
import cz.o2.proxima.internal.shaded.com.google.common.collect.ImmutableList;
import cz.o2.proxima.internal.shaded.com.google.common.truth.Truth;
import java.util.List;
import java.util.concurrent.Future;
import java.util.function.Consumer;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;
import org.mockito.ArgumentCaptor;
import org.mockito.ArgumentMatchers;
import org.mockito.Captor;
import org.mockito.Mock;
import org.mockito.Mockito;
import org.mockito.MockitoAnnotations;
import org.mockito.Spy;
import org.mockito.verification.VerificationMode;

@RunWith(value=JUnit4.class)
public class SinglePartitionSubscriberTest {
    @Mock
    private MessageReceiver receiver;
    @Captor
    private ArgumentCaptor<AckReplyConsumer> ackConsumerCaptor;
    private final MessageTransformer<SequencedMessage, PubsubMessage> transformer = MessageTransforms.toCpsSubscribeTransformer();
    @Spy
    private AckSetTrackerFakeService ackSetTracker;
    @Mock
    private NackHandler nackHandler;
    @Mock
    private ResettableSubscriberFactory subscriberFactory;
    @Spy
    private SubscriberFakeService wireSubscriber;
    private SinglePartitionSubscriber subscriber;
    private static final Offset OFFSET = Offset.of((long)1L);
    private static final long BYTE_SIZE = 1392L;
    private static final SequencedMessage MESSAGE = SequencedMessage.of((Message)Message.builder().setData(ByteString.copyFromUtf8((String)"abc")).build(), (Timestamp)Timestamps.EPOCH, (Offset)OFFSET, (long)1392L);

    @Before
    public void setUp() throws Exception {
        MockitoAnnotations.initMocks((Object)this);
        Mockito.when((Object)this.subscriberFactory.newSubscriber((Consumer)ArgumentMatchers.any(), (SubscriberResetHandler)ArgumentMatchers.any())).thenReturn((Object)this.wireSubscriber);
        this.subscriber = new SinglePartitionSubscriber(this.receiver, this.transformer, (AckSetTracker)this.ackSetTracker, this.nackHandler, this.subscriberFactory, FlowControlSettings.builder().setMessagesOutstanding(100000L).setBytesOutstanding(1000000L).build());
        this.subscriber.startAsync().awaitRunning();
        ((ResettableSubscriberFactory)Mockito.verify((Object)this.subscriberFactory)).newSubscriber((Consumer)ArgumentMatchers.any(), (SubscriberResetHandler)ArgumentMatchers.any());
        ((AckSetTrackerFakeService)((Object)Mockito.verify((Object)((Object)this.ackSetTracker)))).startAsync();
        ((SubscriberFakeService)((Object)Mockito.verify((Object)((Object)this.wireSubscriber)))).startAsync();
    }

    @Test
    public void ackSetTrackerFailure() throws Exception {
        Future<Void> failed = RetryingConnectionHelpers.whenFailed((ApiService)this.subscriber);
        this.ackSetTracker.fail((Throwable)new CheckedApiException(StatusCode.Code.INVALID_ARGUMENT));
        failed.get();
        ((AckSetTrackerFakeService)((Object)Mockito.verify((Object)((Object)this.ackSetTracker)))).stopAsync();
        ((SubscriberFakeService)((Object)Mockito.verify((Object)((Object)this.wireSubscriber)))).stopAsync();
        ApiExceptionMatcher.assertThrowableMatches(this.subscriber.failureCause(), StatusCode.Code.INVALID_ARGUMENT);
    }

    @Test
    public void wireSubscriberFailure() throws Exception {
        Future<Void> failed = RetryingConnectionHelpers.whenFailed((ApiService)this.subscriber);
        this.wireSubscriber.fail((Throwable)new CheckedApiException(StatusCode.Code.INVALID_ARGUMENT));
        failed.get();
        ((AckSetTrackerFakeService)((Object)Mockito.verify((Object)((Object)this.ackSetTracker)))).stopAsync();
        ((SubscriberFakeService)((Object)Mockito.verify((Object)((Object)this.wireSubscriber)))).stopAsync();
        ApiExceptionMatcher.assertThrowableMatches(this.subscriber.failureCause(), StatusCode.Code.INVALID_ARGUMENT);
    }

    @Test
    public void singleMessageAck() throws CheckedApiException {
        Runnable ack = (Runnable)Mockito.mock(Runnable.class);
        Mockito.when((Object)this.ackSetTracker.track(MESSAGE)).thenReturn((Object)ack);
        this.subscriber.onMessages((List)ImmutableList.of((Object)MESSAGE));
        ((AckSetTrackerFakeService)((Object)Mockito.verify((Object)((Object)this.ackSetTracker)))).track(MESSAGE);
        ((MessageReceiver)Mockito.verify((Object)this.receiver)).receiveMessage((PubsubMessage)ArgumentMatchers.eq((Object)this.transformer.transform((Object)MESSAGE)), (AckReplyConsumer)this.ackConsumerCaptor.capture());
        ((AckReplyConsumer)this.ackConsumerCaptor.getValue()).ack();
        ((Runnable)Mockito.verify((Object)ack)).run();
        ((SubscriberFakeService)((Object)Mockito.verify((Object)((Object)this.wireSubscriber)))).allowFlow(FlowControlRequest.newBuilder().setAllowedMessages(1L).setAllowedBytes(1392L).build());
    }

    @Test
    public void multiMessageAck() throws CheckedApiException {
        Runnable ack1 = (Runnable)Mockito.mock(Runnable.class);
        Runnable ack2 = (Runnable)Mockito.mock(Runnable.class);
        long bytes2 = 111L;
        SequencedMessage message2 = SequencedMessage.fromProto((com.google.cloud.pubsublite.proto.SequencedMessage)MESSAGE.toProto().toBuilder().setSizeBytes(bytes2).setPublishTime(Timestamps.fromMillis((long)System.currentTimeMillis())).setCursor(Cursor.newBuilder().setOffset(OFFSET.value() + 1L)).build());
        Mockito.when((Object)this.ackSetTracker.track(MESSAGE)).thenReturn((Object)ack1);
        Mockito.when((Object)this.ackSetTracker.track(message2)).thenReturn((Object)ack2);
        this.subscriber.onMessages((List)ImmutableList.of((Object)MESSAGE, (Object)message2));
        ((AckSetTrackerFakeService)((Object)Mockito.verify((Object)((Object)this.ackSetTracker)))).track(MESSAGE);
        ((AckSetTrackerFakeService)((Object)Mockito.verify((Object)((Object)this.ackSetTracker)))).track(message2);
        ((MessageReceiver)Mockito.verify((Object)this.receiver)).receiveMessage((PubsubMessage)ArgumentMatchers.eq((Object)this.transformer.transform((Object)MESSAGE)), (AckReplyConsumer)this.ackConsumerCaptor.capture());
        ((MessageReceiver)Mockito.verify((Object)this.receiver)).receiveMessage((PubsubMessage)ArgumentMatchers.eq((Object)this.transformer.transform((Object)message2)), (AckReplyConsumer)this.ackConsumerCaptor.capture());
        ((AckReplyConsumer)this.ackConsumerCaptor.getAllValues().get(1)).ack();
        ((Runnable)Mockito.verify((Object)ack2)).run();
        ((SubscriberFakeService)((Object)Mockito.verify((Object)((Object)this.wireSubscriber)))).allowFlow(FlowControlRequest.newBuilder().setAllowedMessages(1L).setAllowedBytes(bytes2).build());
        ((AckReplyConsumer)this.ackConsumerCaptor.getAllValues().get(0)).ack();
        ((Runnable)Mockito.verify((Object)ack1)).run();
        ((SubscriberFakeService)((Object)Mockito.verify((Object)((Object)this.wireSubscriber)))).allowFlow(FlowControlRequest.newBuilder().setAllowedMessages(1L).setAllowedBytes(1392L).build());
    }

    @Test
    public void singleMessageNackHandlerSuccessFuture() throws Exception {
        Runnable ack = (Runnable)Mockito.mock(Runnable.class);
        SettableApiFuture ackDone = SettableApiFuture.create();
        ((Runnable)Mockito.doAnswer(args -> ackDone.set(null)).when((Object)ack)).run();
        Mockito.when((Object)this.ackSetTracker.track(MESSAGE)).thenReturn((Object)ack);
        this.subscriber.onMessages((List)ImmutableList.of((Object)MESSAGE));
        ((AckSetTrackerFakeService)((Object)Mockito.verify((Object)((Object)this.ackSetTracker)))).track(MESSAGE);
        ((MessageReceiver)Mockito.verify((Object)this.receiver)).receiveMessage((PubsubMessage)ArgumentMatchers.eq((Object)this.transformer.transform((Object)MESSAGE)), (AckReplyConsumer)this.ackConsumerCaptor.capture());
        Mockito.when((Object)this.nackHandler.nack((PubsubMessage)this.transformer.transform((Object)MESSAGE))).thenReturn((Object)ApiFutures.immediateFuture(null));
        ((AckReplyConsumer)this.ackConsumerCaptor.getValue()).nack();
        ackDone.get();
        ((Runnable)Mockito.verify((Object)ack)).run();
        ((SubscriberFakeService)((Object)Mockito.verify((Object)((Object)this.wireSubscriber)))).allowFlow(FlowControlRequest.newBuilder().setAllowedMessages(1L).setAllowedBytes(1392L).build());
    }

    @Test
    public void singleMessageNackHandlerFailedFuture() throws Exception {
        Runnable ack = (Runnable)Mockito.mock(Runnable.class);
        Future<Void> trackerTerminated = RetryingConnectionHelpers.whenTerminated((ApiService)this.ackSetTracker);
        Future<Void> wireSubscriberTerminated = RetryingConnectionHelpers.whenTerminated((ApiService)this.wireSubscriber);
        Mockito.when((Object)this.ackSetTracker.track(MESSAGE)).thenReturn((Object)ack);
        this.subscriber.onMessages((List)ImmutableList.of((Object)MESSAGE));
        ((AckSetTrackerFakeService)((Object)Mockito.verify((Object)((Object)this.ackSetTracker)))).track(MESSAGE);
        ((MessageReceiver)Mockito.verify((Object)this.receiver)).receiveMessage((PubsubMessage)ArgumentMatchers.eq((Object)this.transformer.transform((Object)MESSAGE)), (AckReplyConsumer)this.ackConsumerCaptor.capture());
        Mockito.when((Object)this.nackHandler.nack((PubsubMessage)this.transformer.transform((Object)MESSAGE))).thenReturn((Object)ApiFutures.immediateFailedFuture((Throwable)new CheckedApiException(StatusCode.Code.INVALID_ARGUMENT)));
        ((AckReplyConsumer)this.ackConsumerCaptor.getValue()).nack();
        trackerTerminated.get();
        ((AckSetTrackerFakeService)((Object)Mockito.verify((Object)((Object)this.ackSetTracker)))).stopAsync();
        wireSubscriberTerminated.get();
        ((SubscriberFakeService)((Object)Mockito.verify((Object)((Object)this.wireSubscriber)))).stopAsync();
        ApiExceptionMatcher.assertThrowableMatches(this.subscriber.failureCause(), StatusCode.Code.INVALID_ARGUMENT);
    }

    @Test
    public void singleMessageNackAfterShutdownNoNackHandler() throws Exception {
        Runnable ack = (Runnable)Mockito.mock(Runnable.class);
        Mockito.when((Object)this.ackSetTracker.track(MESSAGE)).thenReturn((Object)ack);
        this.subscriber.onMessages((List)ImmutableList.of((Object)MESSAGE));
        ((AckSetTrackerFakeService)((Object)Mockito.verify((Object)((Object)this.ackSetTracker)))).track(MESSAGE);
        ((MessageReceiver)Mockito.verify((Object)this.receiver)).receiveMessage((PubsubMessage)ArgumentMatchers.eq((Object)this.transformer.transform((Object)MESSAGE)), (AckReplyConsumer)this.ackConsumerCaptor.capture());
        this.subscriber.stopAsync().awaitTerminated();
        ((AckReplyConsumer)this.ackConsumerCaptor.getValue()).nack();
        ((NackHandler)Mockito.verify((Object)this.nackHandler, (VerificationMode)Mockito.times((int)0))).nack((PubsubMessage)ArgumentMatchers.any());
    }

    @Test
    public void onSubscriberResetWaitsForAckSetTracker() throws CheckedApiException {
        Truth.assertThat((Boolean)this.subscriber.onSubscriberReset()).isTrue();
        ((AckSetTrackerFakeService)((Object)Mockito.verify((Object)((Object)this.ackSetTracker)))).waitUntilCommitted();
    }

    static abstract class SubscriberFakeService
    extends FakeApiService
    implements Subscriber {
        SubscriberFakeService() {
        }
    }

    static abstract class AckSetTrackerFakeService
    extends FakeApiService
    implements AckSetTracker {
        AckSetTrackerFakeService() {
        }
    }
}

