package org.factcast.client.grpc;

import io.grpc.Metadata;
import io.grpc.Status;
import io.grpc.StatusRuntimeException;
import java.io.IOException;
import java.util.Objects;
import java.util.UUID;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import org.assertj.core.util.Lists;
import org.factcast.client.grpc.ClientStreamObserver;
import org.factcast.core.Fact;
import org.factcast.core.FactValidationException;
import org.factcast.core.subscription.FactStreamInfo;
import org.factcast.core.subscription.InternalSubscription;
import org.factcast.core.subscription.StaleSubscriptionDetectedException;
import org.factcast.core.subscription.SubscriptionImpl;
import org.factcast.core.subscription.observer.FactObserver;
import org.factcast.grpc.api.conv.ProtoConverter;
import org.factcast.grpc.api.gen.FactStoreProto;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.mockito.ArgumentCaptor;
import org.mockito.ArgumentMatchers;
import org.mockito.Mock;
import org.mockito.Mockito;
import org.mockito.junit.jupiter.MockitoExtension;

@ExtendWith({MockitoExtension.class})
/* loaded from: input_file:org/factcast/client/grpc/ClientStreamObserverTest.class */
class ClientStreamObserverTest {

    @Mock
    private FactObserver factObserver;
    private ClientStreamObserver uut;
    private final ProtoConverter converter = new ProtoConverter();
    private SubscriptionImpl subscription;

    ClientStreamObserverTest() {
    }

    @BeforeEach
    void setUp() {
        this.subscription = (SubscriptionImpl) Mockito.spy(new SubscriptionImpl(this.factObserver));
        this.uut = new ClientStreamObserver(this.subscription, 0L);
    }

    @Test
    void testConstructorNull() {
        Assertions.assertThrows(NullPointerException.class, () -> {
            new ClientStreamObserver((InternalSubscription) null, 0L);
        });
    }

    @Test
    void registersForCleanup() {
        ((SubscriptionImpl) Mockito.verify(this.subscription, Mockito.times(2))).onClose((Runnable) ArgumentMatchers.any());
    }

    @Test
    void shutsdownOnSubscriptionClose() {
        this.subscription.close();
        org.assertj.core.api.Assertions.assertThat(this.uut.clientBoundExecutor().isShutdown()).isTrue();
    }

    @Test
    void shutsdownOnErrorRecieved() {
        org.assertj.core.api.Assertions.assertThat(this.uut.clientBoundExecutor().isShutdown()).isFalse();
        this.uut.onError(new IOException());
        org.assertj.core.api.Assertions.assertThat(this.uut.clientBoundExecutor().isShutdown()).isTrue();
    }

    @Test
    void shutsdownOnCompleteRecieved() {
        org.assertj.core.api.Assertions.assertThat(this.uut.clientBoundExecutor().isShutdown()).isFalse();
        this.uut.onCompleted();
        org.assertj.core.api.Assertions.assertThat(this.uut.clientBoundExecutor().isShutdown()).isTrue();
    }

    @Test
    void rethrowsProcessingError() {
        ((FactObserver) Mockito.doThrow(new Throwable[]{new UnsupportedOperationException()}).when(this.factObserver)).onNext((Fact) ArgumentMatchers.any());
        FactStoreProto.MSG_Notification createNotificationFor = this.converter.createNotificationFor(Fact.of("{\"ns\":\"ns\",\"id\":\"" + UUID.randomUUID() + "\"}", "{}"));
        org.assertj.core.api.Assertions.assertThatThrownBy(() -> {
            this.uut.onNext(createNotificationFor);
        }).isInstanceOf(UnsupportedOperationException.class);
    }

    @Test
    void testOnNext() {
        Fact of = Fact.of("{\"ns\":\"ns\",\"id\":\"" + UUID.randomUUID() + "\"}", "{}");
        this.uut.onNext(this.converter.createNotificationFor(of));
        ((FactObserver) Mockito.verify(this.factObserver)).onNext(of);
    }

    @Test
    void testFastForward() {
        UUID randomUUID = UUID.randomUUID();
        this.uut.onNext(this.converter.createNotificationForFastForward(randomUUID));
        ((FactObserver) Mockito.verify(this.factObserver)).onFastForward(randomUUID);
    }

    @Test
    void testOnNextList() {
        this.uut.onNext(this.converter.createNotificationFor(Lists.newArrayList(new Fact[]{Fact.of("{\"ns\":\"ns\",\"id\":\"" + UUID.randomUUID() + "\"}", "{}"), Fact.of("{\"ns\":\"ns\",\"id\":\"" + UUID.randomUUID() + "\"}", "{}")})));
        ((FactObserver) Mockito.verify(this.factObserver, Mockito.times(2))).onNext((Fact) ArgumentMatchers.any(Fact.class));
    }

    @Test
    void testOnNextFailsOnUnknownMessage() {
        Assertions.assertThrows(IllegalArgumentException.class, () -> {
            this.uut.onNext(FactStoreProto.MSG_Notification.newBuilder().setType(FactStoreProto.MSG_Notification.Type.UNRECOGNIZED).build());
        });
    }

    @Test
    void testOnCatchup() {
        this.uut.onNext(this.converter.createCatchupNotification());
        ((FactObserver) Mockito.verify(this.factObserver)).onCatchup();
    }

    @Test
    void testFailOnUnknownType() {
        this.uut.onNext(FactStoreProto.MSG_Notification.newBuilder().setTypeValue(999).build());
        ((SubscriptionImpl) Mockito.verify(this.subscription)).notifyError((Throwable) ArgumentMatchers.any(RuntimeException.class));
    }

    @Test
    void testOnComplete() {
        this.uut.onNext(this.converter.createCompleteNotification());
        ((FactObserver) Mockito.verify(this.factObserver)).onComplete();
    }

    @Test
    void testOnTransportComplete() {
        this.uut.onCompleted();
        ((FactObserver) Mockito.verify(this.factObserver)).onComplete();
    }

    @Test
    void testOnError() {
        this.uut.onError(new IOException());
        ((FactObserver) Mockito.verify(this.factObserver)).onError((Throwable) ArgumentMatchers.any());
    }

    @Test
    void translatesException() {
        FactValidationException factValidationException = new FactValidationException("disappointed");
        Metadata metadata = new Metadata();
        metadata.put(Metadata.Key.of("msg-bin", Metadata.BINARY_BYTE_MARSHALLER), factValidationException.getMessage().getBytes());
        metadata.put(Metadata.Key.of("exc-bin", Metadata.BINARY_BYTE_MARSHALLER), factValidationException.getClass().getName().getBytes());
        this.uut.onError(new StatusRuntimeException(Status.UNKNOWN, metadata));
        ArgumentCaptor forClass = ArgumentCaptor.forClass(Throwable.class);
        ((FactObserver) Mockito.verify(this.factObserver)).onError((Throwable) forClass.capture());
        org.assertj.core.api.Assertions.assertThat((Throwable) forClass.getValue()).isInstanceOf(FactValidationException.class);
    }

    @Test
    void detectsStillbirth() throws InterruptedException {
        final CountDownLatch countDownLatch = new CountDownLatch(1);
        this.uut = new ClientStreamObserver(this.subscription, 10L) { // from class: org.factcast.client.grpc.ClientStreamObserverTest.1
            public void onError(Throwable th) {
                super.onError(th);
                countDownLatch.countDown();
            }
        };
        org.assertj.core.api.Assertions.assertThat(countDownLatch.await(300L, TimeUnit.MILLISECONDS)).isTrue();
        ((SubscriptionImpl) Mockito.verify(this.subscription)).notifyError((Throwable) ArgumentMatchers.any(StaleSubscriptionDetectedException.class));
    }

    @Test
    void detectsStaleness() throws InterruptedException {
        final CountDownLatch countDownLatch = new CountDownLatch(1);
        this.uut = (ClientStreamObserver) Mockito.spy(new ClientStreamObserver(this.subscription, 10L) { // from class: org.factcast.client.grpc.ClientStreamObserverTest.2
            public void onError(Throwable th) {
                super.onError(th);
                countDownLatch.countDown();
            }
        });
        this.uut.onNext(FactStoreProto.MSG_Notification.newBuilder().setType(FactStoreProto.MSG_Notification.Type.KeepAlive).build());
        for (int i = 0; i < 10; i++) {
            this.uut.onNext(FactStoreProto.MSG_Notification.newBuilder().setType(FactStoreProto.MSG_Notification.Type.KeepAlive).build());
            sleep(10L);
        }
        org.assertj.core.api.Assertions.assertThat(countDownLatch.await(300L, TimeUnit.MILLISECONDS)).isTrue();
        ((SubscriptionImpl) Mockito.verify(this.subscription)).notifyError((Throwable) ArgumentMatchers.any(StaleSubscriptionDetectedException.class));
    }

    @Test
    void disablesKeepaliveCheckingOnError() {
        this.uut = (ClientStreamObserver) Mockito.spy(new ClientStreamObserver(this.subscription, 10L));
        this.uut.onError(new RuntimeException());
        ((ClientStreamObserver) Mockito.verify(this.uut)).disableKeepalive();
    }

    @Test
    void disablesKeepaliveCheckingOnComplete() {
        this.uut = (ClientStreamObserver) Mockito.spy(new ClientStreamObserver(this.subscription, 10L));
        this.uut.onCompleted();
        ((ClientStreamObserver) Mockito.verify(this.uut)).disableKeepalive();
    }

    @Test
    void detectsMissingKeepalives() {
        this.uut = (ClientStreamObserver) Mockito.spy(new ClientStreamObserver(this.subscription, 10L));
        ClientStreamObserver clientStreamObserver = this.uut;
        Objects.requireNonNull(clientStreamObserver);
        new ClientStreamObserver.ClientKeepalive(clientStreamObserver, 50);
        ((ClientStreamObserver) Mockito.verify(this.uut, Mockito.after(150L).never())).onError((Throwable) ArgumentMatchers.any());
        ((ClientStreamObserver) Mockito.verify(this.uut, Mockito.timeout(250L).times(1))).onError((Throwable) ArgumentMatchers.any(StaleSubscriptionDetectedException.class));
    }

    @Test
    void detectSubscriptionStarvation() {
        long j = (2 * 100) + 200;
        this.uut = new ClientStreamObserver(this.subscription, 100L);
        sleep(100 / 2);
        ((SubscriptionImpl) Mockito.verify(this.subscription, Mockito.never())).notifyError((Throwable) ArgumentMatchers.any());
        for (int i = 0; i < 5; i++) {
            sleep(100L);
            this.uut.onNext(this.converter.createKeepaliveNotification());
        }
        ((SubscriptionImpl) Mockito.verify(this.subscription, Mockito.after(j - 50).never())).notifyError((Throwable) ArgumentMatchers.any());
        ((SubscriptionImpl) Mockito.verify(this.subscription, Mockito.timeout(j).times(1))).notifyError((Throwable) ArgumentMatchers.any(StaleSubscriptionDetectedException.class));
    }

    @Test
    void handlesFactStreamInfo() {
        FactStreamInfo factStreamInfo = new FactStreamInfo(1L, 10L);
        this.uut.onNext(this.converter.createInfoNotification(factStreamInfo));
        ((SubscriptionImpl) Mockito.verify(this.subscription)).notifyFactStreamInfo(factStreamInfo);
    }

    private void sleep(long j) {
        Thread.sleep(j);
    }
}
