/*
 * Decompiled with CFR 0.152.
 */
package com.google.cloud.bigtable.gaxx.testing;

import com.google.api.core.InternalApi;
import com.google.api.gax.rpc.ApiCallContext;
import com.google.api.gax.rpc.ApiExceptionFactory;
import com.google.api.gax.rpc.ApiStreamObserver;
import com.google.api.gax.rpc.ClientStreamingCallable;
import com.google.api.gax.rpc.ResponseObserver;
import com.google.api.gax.rpc.ServerStreamingCallable;
import com.google.api.gax.rpc.StatusCode;
import com.google.api.gax.rpc.StreamController;
import com.google.cloud.bigtable.gaxx.testing.FakeStatusCode;
import cz.o2.proxima.internal.shaded.com.google.common.base.Preconditions;
import cz.o2.proxima.internal.shaded.com.google.common.collect.Queues;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CancellationException;
import java.util.concurrent.TimeUnit;

@InternalApi(value="for testing")
public class FakeStreamingApi {

    public static class ClientStreamingStashCallable<RequestT, ResponseT>
    extends ClientStreamingCallable<RequestT, ResponseT> {
        private ApiCallContext context;
        private ApiStreamObserver<ResponseT> responseObserver;
        private AccumulatingStreamObserver<RequestT> requestObserver;
        private ResponseT response;

        public ClientStreamingStashCallable() {
        }

        public ClientStreamingStashCallable(ResponseT response) {
            this.response = response;
        }

        public ApiStreamObserver<RequestT> clientStreamingCall(ApiStreamObserver<ResponseT> responseObserver, ApiCallContext context) {
            Preconditions.checkNotNull(responseObserver);
            this.responseObserver = responseObserver;
            this.context = context;
            this.requestObserver = new AccumulatingStreamObserver();
            return this.requestObserver;
        }

        public ApiCallContext getContext() {
            return this.context;
        }

        public ApiStreamObserver<ResponseT> getActualObserver() {
            return this.responseObserver;
        }

        public List<RequestT> getActualRequests() {
            return this.requestObserver.getValues();
        }

        private void sendResponses() {
            this.responseObserver.onNext(this.response);
            this.responseObserver.onCompleted();
        }

        private class AccumulatingStreamObserver<T>
        implements ApiStreamObserver<T> {
            private List<T> requestList = new ArrayList<T>();
            private Throwable error;
            private boolean completed = false;

            private AccumulatingStreamObserver() {
            }

            public void onNext(T value) {
                this.requestList.add(value);
            }

            public void onError(Throwable t) {
                this.error = t;
            }

            public void onCompleted() {
                this.completed = true;
                ClientStreamingStashCallable.this.sendResponses();
            }

            public List<T> getValues() {
                if (!this.completed) {
                    throw new IllegalStateException("Stream not completed.");
                }
                if (this.error != null) {
                    throw ApiExceptionFactory.createException((Throwable)this.error, (StatusCode)FakeStatusCode.of(StatusCode.Code.UNKNOWN), (boolean)false);
                }
                return this.requestList;
            }
        }
    }

    /**
     * Fake server-streaming callable for tests.
     *
     * <p>Every invocation of {@link #call} stashes the request, observer and context of that
     * invocation (overwriting the previous ones), wraps the configured response list in a
     * {@link StreamControllerStash}, records that controller in a queue and starts it.
     */
    public static class ServerStreamingStashCallable<RequestT, ResponseT>
    extends ServerStreamingCallable<RequestT, ResponseT> {
        private ApiCallContext context;
        private ResponseObserver<ResponseT> actualObserver;
        private RequestT actualRequest;
        private List<ResponseT> responseList;
        private final BlockingQueue<StreamControllerStash<ResponseT>> calls = Queues.newLinkedBlockingQueue();

        /** Creates a callable that streams no responses. */
        public ServerStreamingStashCallable() {
            this(new ArrayList<ResponseT>());
        }

        /**
         * Creates a callable that streams the given responses on every call.
         *
         * @param responseList responses handed to each new stream controller
         */
        public ServerStreamingStashCallable(List<ResponseT> responseList) {
            this.responseList = responseList;
        }

        /**
         * Records the call arguments and starts a new fake stream towards {@code responseObserver}.
         *
         * @param request the request; must not be null
         * @param responseObserver the observer to deliver to; must not be null
         * @param context call context, stashed as-is
         */
        public void call(RequestT request, ResponseObserver<ResponseT> responseObserver, ApiCallContext context) {
            Preconditions.checkNotNull(request);
            Preconditions.checkNotNull(responseObserver);

            this.context = context;
            this.actualObserver = responseObserver;
            this.actualRequest = request;

            StreamControllerStash<ResponseT> stash = new StreamControllerStash<ResponseT>(this.responseList, responseObserver);
            // Register before starting so the controller is already queued while it delivers.
            this.calls.add(stash);
            stash.start();
        }

        /**
         * Removes and returns the head of the recorded-call queue, waiting up to one second for
         * one to appear.
         *
         * @return the controller at the head of the queue, or {@code null} if the wait timed out
         * @throws RuntimeException wrapping the {@link InterruptedException} if the wait is
         *     interrupted; the thread's interrupt flag is restored first
         */
        public StreamControllerStash<ResponseT> popLastCall() {
            StreamControllerStash<ResponseT> head;
            try {
                head = this.calls.poll(1L, TimeUnit.SECONDS);
            }
            catch (InterruptedException interrupted) {
                Thread.currentThread().interrupt();
                throw new RuntimeException(interrupted);
            }
            return head;
        }

        /** Returns the context of the most recent call, or {@code null} if none was made. */
        public ApiCallContext getContext() {
            return this.context;
        }

        /** Returns the observer of the most recent call, or {@code null} if none was made. */
        public ResponseObserver<ResponseT> getActualObserver() {
            return this.actualObserver;
        }

        /** Returns the request of the most recent call, or {@code null} if none was made. */
        public RequestT getActualRequest() {
            return this.actualRequest;
        }

        /**
         * Stream controller that replays a fixed list of responses to an observer while honouring
         * the demand signalled through {@link #request(int)} and stopping on {@link #cancel()}.
         */
        public static class StreamControllerStash<ResponseT>
        implements StreamController {
            final ResponseObserver<ResponseT> observer;
            final Queue<ResponseT> queue;
            boolean autoFlowControl = true;
            long numPending;
            long numDelivered;
            Throwable error;
            boolean delivering;
            boolean closed;

            /**
             * @param responseList responses to replay; copied into an internal queue
             * @param observer receiver of the replayed responses and the terminal signal
             */
            public StreamControllerStash(List<ResponseT> responseList, ResponseObserver<ResponseT> observer) {
                this.queue = Queues.newArrayDeque(responseList);
                this.observer = observer;
            }

            /** Returns the error recorded by {@link #cancel()}, or {@code null} if there is none. */
            public Throwable getError() {
                return this.error;
            }

            /** Returns how many responses have been handed to the observer so far. */
            public long getNumDelivered() {
                return this.numDelivered;
            }

            /**
             * Announces the stream to the observer and begins delivery. If the observer left
             * automatic flow control on during {@code onStart}, demand is set to
             * {@code Integer.MAX_VALUE}.
             */
            public void start() {
                this.observer.onStart(this);
                if (this.autoFlowControl) {
                    this.numPending = Integer.MAX_VALUE;
                }
                this.deliver();
            }

            /** Switches to manual flow control; responses then flow only after {@link #request(int)}. */
            public void disableAutoInboundFlowControl() {
                this.autoFlowControl = false;
            }

            /** Adds {@code count} to the outstanding demand and attempts delivery. */
            public void request(int count) {
                this.numPending += count;
                this.deliver();
            }

            /** Records a {@link CancellationException} as the stream error and attempts delivery. */
            public void cancel() {
                this.error = new CancellationException("User cancelled stream");
                this.deliver();
            }

            /**
             * Drives the stream: emits what current demand allows, then terminates the stream if
             * it is finished. Returns immediately when a delivery is already in progress (so calls
             * made from inside observer callbacks are picked up by the running loop) or when the
             * stream is already closed.
             */
            private void deliver() {
                if (this.closed || this.delivering) {
                    return;
                }
                this.delivering = true;
                try {
                    this.emitWhileDemanded();
                    this.terminateIfFinished();
                }
                finally {
                    this.delivering = false;
                }
            }

            /** Hands queued responses to the observer while there is no error, demand remains and the queue is non-empty. */
            private void emitWhileDemanded() {
                while (this.error == null && this.numPending > 0L && !this.queue.isEmpty()) {
                    this.numPending--;
                    this.numDelivered++;
                    this.observer.onResponse(this.queue.poll());
                }
            }

            /** Sends the terminal signal (error takes precedence over completion) and marks the stream closed. */
            private void terminateIfFinished() {
                if (this.error != null) {
                    this.observer.onError(this.error);
                } else if (this.queue.isEmpty()) {
                    this.observer.onComplete();
                } else {
                    // Responses remain but demand is exhausted: stay open.
                    return;
                }
                this.closed = true;
            }
        }
    }
}

