/*
 * Decompiled with CFR 0.152.
 */
package com.google.cloud.hadoop.gcsio;

import com.google.cloud.hadoop.gcsio.Watchdog;
import com.google.storage.v2.ReadObjectResponse;
import com.google.storage.v2.WriteObjectRequest;
import cz.o2.proxima.beam.io.pubsub.io.grpc.ClientCall;
import cz.o2.proxima.beam.io.pubsub.io.grpc.Context;
import cz.o2.proxima.beam.io.pubsub.io.grpc.internal.NoopClientCall;
import cz.o2.proxima.beam.io.pubsub.io.grpc.stub.StreamObserver;
import cz.o2.proxima.internal.shaded.com.google.common.collect.Lists;
import cz.o2.proxima.internal.shaded.com.google.common.flogger.GoogleLogger;
import cz.o2.proxima.internal.shaded.com.google.common.truth.Truth;
import cz.o2.proxima.internal.shaded.com.google.common.util.concurrent.Uninterruptibles;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.TimeoutException;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;

@RunWith(value=JUnit4.class)
public class WatchdogTest {
    private static final GoogleLogger logger = GoogleLogger.forEnclosingClass();
    private Watchdog watchdog;
    private final Duration waitTime = Duration.ofSeconds(5L);
    private final Duration zeroWaitTime = Duration.ofSeconds(0L);

    @Before
    public void setUp() throws Exception {
        Duration checkInterval = Duration.ofSeconds(2L);
        this.watchdog = Watchdog.create((Duration)checkInterval);
    }

    @Test
    public void watchPassThroughClientStreamingRPC() {
        NoopClientCall clientCall = new NoopClientCall();
        StreamObserverStub streamObserver = new StreamObserverStub();
        StreamObserver watch = this.watchdog.watch((ClientCall)clientCall, streamObserver, this.waitTime);
        Truth.assertThat((Object)this.watchdog).isNotNull();
        Truth.assertThat((Iterable)this.watchdog.getOpenStreams()).hasSize(1);
        WriteObjectRequest value = WriteObjectRequest.newBuilder().build();
        watch.onNext((Object)value);
        Truth.assertThat(streamObserver.getObjects()).containsExactly(new Object[]{value});
        TimeoutException t = new TimeoutException("Request timeout out");
        watch.onError((Throwable)t);
        Truth.assertThat(streamObserver.getErrors()).containsExactly(new Object[]{t});
        watch.onCompleted();
        Truth.assertThat((Boolean)streamObserver.isCompleted()).isTrue();
        Truth.assertThat((Boolean)this.watchdog.getOpenStreams().isEmpty()).isTrue();
    }

    @Test
    public void watchPassThroughServerStreamingRPC() {
        Context.CancellableContext requestContext = Context.current().withCancellation();
        ReadObjectResponse defaultInstance = ReadObjectResponse.getDefaultInstance();
        Response<ReadObjectResponse> validResponse = new Response<ReadObjectResponse>(defaultInstance);
        Response errorResponse = new Response(new RuntimeException("Read timeout out"));
        ArrayList responseList = Lists.newArrayList((Object[])new Response[]{validResponse, errorResponse});
        ResponseIteratorStub responseIterator = new ResponseIteratorStub(responseList);
        Iterator watch = this.watchdog.watch(requestContext, responseIterator, this.waitTime);
        ReadObjectResponse next = (ReadObjectResponse)watch.next();
        Truth.assertThat((Object)next).isEqualTo(((Response)validResponse).object);
        Truth.assertThat((Object)this.watchdog).isNotNull();
        Truth.assertThat((Iterable)this.watchdog.getOpenStreams()).hasSize(1);
        Assert.assertThrows(RuntimeException.class, watch::hasNext);
        Truth.assertThat((Boolean)this.watchdog.getOpenStreams().isEmpty()).isTrue();
    }

    @Test
    public void watchOnClientStreamingRPCTimeout() {
        NoopClientCallStub clientCall = new NoopClientCallStub();
        StreamObserver<WriteObjectRequest> timeoutStreamObserver = new StreamObserver<WriteObjectRequest>(){

            public void onNext(WriteObjectRequest value) {
                ((GoogleLogger.Api)logger.atInfo()).log("Sleeping for 10 seconds");
                Uninterruptibles.sleepUninterruptibly((Duration)Duration.ofSeconds(10L));
            }

            public void onError(Throwable t) {
            }

            public void onCompleted() {
            }
        };
        StreamObserver watch = this.watchdog.watch(clientCall, (StreamObserver)timeoutStreamObserver, this.waitTime);
        WriteObjectRequest value = WriteObjectRequest.newBuilder().build();
        watch.onNext((Object)value);
        Truth.assertThat((Boolean)clientCall.cancelled).isTrue();
        Truth.assertThat((Throwable)clientCall.cause).isInstanceOf(TimeoutException.class);
    }

    @Test
    public void watchOnServerStreamingRPCTimeout() {
        Context.CancellableContext requestContext = Context.current().withCancellation();
        Iterator<ReadObjectResponse> responseIterator = new Iterator<ReadObjectResponse>(){

            @Override
            public boolean hasNext() {
                ((GoogleLogger.Api)logger.atInfo()).log("Sleeping for 10 seconds");
                Uninterruptibles.sleepUninterruptibly((Duration)Duration.ofSeconds(10L));
                return true;
            }

            @Override
            public ReadObjectResponse next() {
                return null;
            }
        };
        Iterator watch = this.watchdog.watch(requestContext, (Iterator)responseIterator, this.waitTime);
        Truth.assertThat((Boolean)watch.hasNext()).isTrue();
        Truth.assertThat((Boolean)requestContext.isCancelled()).isTrue();
    }

    @Test
    public void watchMultipleStreams() {
        NoopClientCallStub clientCall = new NoopClientCallStub();
        StreamObserver<WriteObjectRequest> timeoutStreamObserver = new StreamObserver<WriteObjectRequest>(){

            public void onNext(WriteObjectRequest value) {
                ((GoogleLogger.Api)logger.atInfo()).log("Sleeping for 10 seconds");
                Uninterruptibles.sleepUninterruptibly((Duration)Duration.ofSeconds(10L));
            }

            public void onError(Throwable t) {
            }

            public void onCompleted() {
            }
        };
        Context.CancellableContext requestContext = Context.current().withCancellation();
        Iterator<ReadObjectResponse> responseIterator = new Iterator<ReadObjectResponse>(){

            @Override
            public boolean hasNext() {
                ((GoogleLogger.Api)logger.atInfo()).log("Sleeping for 10 seconds");
                Uninterruptibles.sleepUninterruptibly((Duration)Duration.ofSeconds(10L));
                return true;
            }

            @Override
            public ReadObjectResponse next() {
                return null;
            }
        };
        Iterator clientStreamingRPCWatch = this.watchdog.watch(requestContext, (Iterator)responseIterator, this.waitTime);
        StreamObserver serverStreamingRPCWatch = this.watchdog.watch(clientCall, (StreamObserver)timeoutStreamObserver, this.waitTime);
        Truth.assertThat((Object)this.watchdog).isNotNull();
        Truth.assertThat((Iterable)this.watchdog.getOpenStreams()).hasSize(2);
        boolean actual = clientStreamingRPCWatch.hasNext();
        WriteObjectRequest value = WriteObjectRequest.newBuilder().build();
        serverStreamingRPCWatch.onNext((Object)value);
        Truth.assertThat((Boolean)actual).isTrue();
        Truth.assertThat((Boolean)requestContext.isCancelled()).isTrue();
        Truth.assertThat((Boolean)clientCall.cancelled).isTrue();
        Truth.assertThat((Throwable)clientCall.cause).isInstanceOf(TimeoutException.class);
        Truth.assertThat((Boolean)this.watchdog.getOpenStreams().isEmpty()).isTrue();
    }

    @Test
    public void watchOnClientStreamingRPCWithoutTimeout() {
        NoopClientCallStub clientCall = new NoopClientCallStub();
        StreamObserver<WriteObjectRequest> timeoutStreamObserver = new StreamObserver<WriteObjectRequest>(){

            public void onNext(WriteObjectRequest value) {
                ((GoogleLogger.Api)logger.atInfo()).log("Sleeping for 10 seconds");
                Uninterruptibles.sleepUninterruptibly((Duration)Duration.ofSeconds(10L));
            }

            public void onError(Throwable t) {
            }

            public void onCompleted() {
            }
        };
        StreamObserver watch = this.watchdog.watch(clientCall, (StreamObserver)timeoutStreamObserver, this.zeroWaitTime);
        WriteObjectRequest value = WriteObjectRequest.newBuilder().build();
        watch.onNext((Object)value);
        Truth.assertThat((Boolean)clientCall.cancelled).isFalse();
        Truth.assertThat((Throwable)clientCall.cause).isNull();
    }

    @Test
    public void watchOnServerStreamingRPCWithoutTimeout() {
        Context.CancellableContext requestContext = Context.current().withCancellation();
        Iterator<ReadObjectResponse> responseIterator = new Iterator<ReadObjectResponse>(){

            @Override
            public boolean hasNext() {
                ((GoogleLogger.Api)logger.atInfo()).log("Sleeping for 10 seconds");
                Uninterruptibles.sleepUninterruptibly((Duration)Duration.ofSeconds(10L));
                return true;
            }

            @Override
            public ReadObjectResponse next() {
                return null;
            }
        };
        Iterator watch = this.watchdog.watch(requestContext, (Iterator)responseIterator, this.zeroWaitTime);
        Truth.assertThat((Boolean)watch.hasNext()).isTrue();
        Truth.assertThat((Boolean)requestContext.isCancelled()).isFalse();
    }

    static final class NoopClientCallStub<ReqT, ResT>
    extends NoopClientCall<ReqT, ResT> {
        boolean cancelled;
        Throwable cause;

        NoopClientCallStub() {
        }

        public void cancel(String message, Throwable cause) {
            this.cancelled = true;
            this.cause = cause;
        }
    }

    static final class Response<T> {
        private final T object;
        private final RuntimeException throwable;

        public Response(T object) {
            this.object = object;
            this.throwable = null;
        }

        public Response(RuntimeException throwable) {
            this.throwable = throwable;
            this.object = null;
        }
    }

    static final class ResponseIteratorStub<T>
    implements Iterator<T> {
        private final Iterator<Response<T>> objects;

        public ResponseIteratorStub(List<Response<T>> objects) {
            this.objects = objects.listIterator();
        }

        @Override
        public boolean hasNext() {
            Response<T> next;
            boolean hasNext = this.objects.hasNext();
            if (hasNext && ((Response)(next = this.objects.next())).throwable != null) {
                throw ((Response)next).throwable;
            }
            return hasNext;
        }

        @Override
        public T next() {
            return (T)((Response)this.objects.next()).object;
        }
    }

    static final class StreamObserverStub<T>
    implements StreamObserver<T> {
        List<T> objects = new ArrayList<T>();
        List<Throwable> errors = new ArrayList<Throwable>();
        boolean completed;

        StreamObserverStub() {
        }

        public void onNext(T t) {
            this.objects.add(t);
        }

        public void onError(Throwable throwable) {
            this.errors.add(throwable);
        }

        public void onCompleted() {
            this.completed = true;
        }

        public List<T> getObjects() {
            return this.objects;
        }

        public List<Throwable> getErrors() {
            return this.errors;
        }

        public boolean isCompleted() {
            return this.completed;
        }
    }
}

