/*
 * Decompiled with CFR 0.152.
 */
package com.google.cloud.pubsublite.internal;

import com.google.api.core.ApiFuture;
import com.google.api.core.ApiService;
import com.google.api.gax.rpc.StatusCode;
import com.google.cloud.pubsublite.Message;
import com.google.cloud.pubsublite.Offset;
import com.google.cloud.pubsublite.SequencedMessage;
import com.google.cloud.pubsublite.cloudpubsub.FlowControlSettings;
import com.google.cloud.pubsublite.internal.BlockingPullSubscriberImpl;
import com.google.cloud.pubsublite.internal.CheckedApiException;
import com.google.cloud.pubsublite.internal.wire.Subscriber;
import com.google.cloud.pubsublite.internal.wire.SubscriberFactory;
import com.google.cloud.pubsublite.proto.FlowControlRequest;
import com.google.protobuf.Timestamp;
import com.google.protobuf.util.Timestamps;
import cz.o2.proxima.internal.shaded.com.google.common.collect.ImmutableList;
import cz.o2.proxima.internal.shaded.com.google.common.truth.Truth;
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;
import org.mockito.ArgumentMatchers;
import org.mockito.InOrder;
import org.mockito.Mockito;

/**
 * Unit tests for {@link BlockingPullSubscriberImpl}.
 *
 * <p>The wire-level {@link Subscriber} and its {@link SubscriberFactory} are Mockito mocks. During
 * {@link #setUp()} the message consumer handed to the factory and the listener registered on the
 * underlying subscriber are captured, so each test can push messages or a failure into the
 * subscriber under test directly.
 */
@RunWith(JUnit4.class)
public class BlockingPullSubscriberImplTest {
    private final SubscriberFactory underlyingFactory = Mockito.mock(SubscriberFactory.class);
    private final Subscriber underlying = Mockito.mock(Subscriber.class);
    private final FlowControlSettings flowControlSettings =
        FlowControlSettings.builder().setBytesOutstanding(10L).setMessagesOutstanding(20L).build();
    private final ExecutorService executorService = Executors.newCachedThreadPool();

    private BlockingPullSubscriberImpl subscriber;
    /** Consumer passed to {@code underlyingFactory.newSubscriber}, captured in {@link #setUp()}. */
    private Consumer<ImmutableList<SequencedMessage>> messageConsumer;
    /** Listener passed to {@code underlying.addListener}, captured in {@link #setUp()}. */
    private ApiService.Listener errorListener;

    /** Builds a message at offset 12 with an empty payload and the given reported byte size. */
    private static SequencedMessage messageOfSize(long byteSize) {
        return SequencedMessage.of(
            Message.builder().build(), Timestamps.EPOCH, Offset.of(12L), byteSize);
    }

    /**
     * Wires the mocks, constructs the subscriber under test, and verifies the order of the calls
     * the constructor makes on the factory and the underlying subscriber.
     */
    @Before
    public void setUp() throws Exception {
        Mockito.when(underlying.startAsync()).thenReturn(underlying);
        FlowControlRequest initialFlow =
            FlowControlRequest.newBuilder()
                .setAllowedBytes(flowControlSettings.bytesOutstanding())
                .setAllowedMessages(flowControlSettings.messagesOutstanding())
                .build();
        // Capture the consumer so tests can deliver messages as the wire subscriber would.
        Mockito.when(underlyingFactory.newSubscriber(ArgumentMatchers.any()))
            .thenAnswer(
                args -> {
                    messageConsumer = args.getArgument(0);
                    return underlying;
                });
        // Capture the listener so tests can signal a failure of the wire subscriber.
        Mockito.doAnswer(
                args -> {
                    errorListener = args.getArgument(0);
                    return null;
                })
            .when(underlying)
            .addListener(ArgumentMatchers.any(), ArgumentMatchers.any());

        subscriber = new BlockingPullSubscriberImpl(underlyingFactory, flowControlSettings);

        InOrder inOrder = Mockito.inOrder(underlyingFactory, underlying);
        inOrder.verify(underlyingFactory).newSubscriber(ArgumentMatchers.any());
        inOrder.verify(underlying).addListener(ArgumentMatchers.any(), ArgumentMatchers.any());
        inOrder.verify(underlying).startAsync();
        inOrder.verify(underlying).awaitRunning();
        inOrder.verify(underlying).allowFlow(initialFlow);
        Truth.assertThat(messageConsumer).isNotNull();
        Truth.assertThat(errorListener).isNotNull();
    }

    /**
     * Stops the per-test thread pool. JUnit creates a fresh test instance (and therefore a fresh
     * pool) for every test method, so without this each test would leave a pool behind.
     */
    @After
    public void tearDown() {
        executorService.shutdownNow();
    }

    /** Closing the subscriber stops the underlying subscriber and waits for its termination. */
    @Test
    public void closeStops() {
        Mockito.when(underlying.stopAsync()).thenReturn(underlying);
        subscriber.close();
        Mockito.verify(underlying).stopAsync();
        Mockito.verify(underlying).awaitTerminated();
    }

    /** A future requested after a failure completes exceptionally with that failure as cause. */
    @Test
    public void onDataAfterErrorThrows() {
        CheckedApiException expected = new CheckedApiException(StatusCode.Code.INTERNAL);
        errorListener.failed(null, expected);
        ExecutionException e =
            Assert.assertThrows(ExecutionException.class, () -> subscriber.onData().get());
        Truth.assertThat(e.getCause()).isEqualTo(expected);
    }

    /** A future requested before a failure stays pending, then fails once the failure arrives. */
    @Test
    public void onDataBeforeErrorThrows() throws Exception {
        CheckedApiException expected = new CheckedApiException(StatusCode.Code.INTERNAL);
        ApiFuture<Void> future = subscriber.onData();
        // Give the future time to (incorrectly) complete before asserting it is still pending.
        Thread.sleep(1000L);
        Truth.assertThat(future.isDone()).isFalse();
        errorListener.failed(null, expected);
        ExecutionException e = Assert.assertThrows(ExecutionException.class, future::get);
        Truth.assertThat(e.getCause()).isEqualTo(expected);
    }

    /** A thread blocked on {@code onData().get()} is released once a message is delivered. */
    @Test
    public void onDataSuccess() throws Exception {
        SequencedMessage message = messageOfSize(30L);
        Future<?> future = executorService.submit(() -> subscriber.onData().get());
        messageConsumer.accept(ImmutableList.of(message));
        future.get();
    }

    /** Pulling a delivered message returns it and sends flow control tokens for that message. */
    @Test
    public void pullMessage() throws Exception {
        long byteSize = 30L;
        SequencedMessage message = messageOfSize(byteSize);
        messageConsumer.accept(ImmutableList.of(message));
        Truth.assertThat(subscriber.messageIfAvailable()).isEqualTo(Optional.of(message));
        Mockito.verify(underlying)
            .allowFlow(
                FlowControlRequest.newBuilder()
                    .setAllowedBytes(byteSize)
                    .setAllowedMessages(1L)
                    .build());
    }

    /** With nothing delivered, pulling yields an empty {@link Optional}. */
    @Test
    public void pullMessageNoMessage() throws Exception {
        Truth.assertThat(subscriber.messageIfAvailable()).isEqualTo(Optional.empty());
    }

    /** After a failure, pulling throws the failure that was reported. */
    @Test
    public void pullMessageWhenError() {
        CheckedApiException expected = new CheckedApiException(StatusCode.Code.INTERNAL);
        errorListener.failed(null, expected);
        CheckedApiException e =
            Assert.assertThrows(CheckedApiException.class, () -> subscriber.messageIfAvailable());
        Truth.assertThat(e).isEqualTo(expected);
    }

    /** A reported failure is thrown from a pull even when a message has also been delivered. */
    @Test
    public void pullMessagePrioritizeErrorOverExistingMessage() {
        CheckedApiException expected = new CheckedApiException(StatusCode.Code.INTERNAL);
        errorListener.failed(null, expected);
        messageConsumer.accept(ImmutableList.of(messageOfSize(30L)));
        CheckedApiException e =
            Assert.assertThrows(CheckedApiException.class, () -> subscriber.messageIfAvailable());
        Truth.assertThat(e).isEqualTo(expected);
    }

    /** With one message delivered, exactly one of ten concurrent pulls receives it. */
    @Test
    public void onlyOneMessageDeliveredWhenMultiCalls() throws Exception {
        messageConsumer.accept(ImmutableList.of(messageOfSize(30L)));
        AtomicInteger deliveredCount = new AtomicInteger(0);
        // All workers wait on this latch so their pulls start as close together as possible.
        CountDownLatch startSignal = new CountDownLatch(1);
        List<Future<?>> pending = new ArrayList<>();
        for (int i = 0; i < 10; ++i) {
            pending.add(
                executorService.submit(
                    () -> {
                        try {
                            startSignal.await();
                            if (subscriber.messageIfAvailable().isPresent()) {
                                deliveredCount.incrementAndGet();
                            }
                        } catch (Exception e) {
                            throw new IllegalStateException(e);
                        }
                    }));
        }
        startSignal.countDown();
        for (Future<?> task : pending) {
            task.get();
        }
        Truth.assertThat(deliveredCount.get()).isEqualTo(1);
    }
}

