/*
 * Decompiled with CFR 0.152.
 */
package org.finos.tracdap.test.grpc;

import io.grpc.StatusRuntimeException;
import io.grpc.stub.StreamObserver;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Flow;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import org.finos.tracdap.common.exception.ETracInternal;
import org.finos.tracdap.common.exception.EUnexpected;
import org.finos.tracdap.common.grpc.GrpcErrorMapping;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class GrpcTestStreams {
    public static <T> Flow.Subscriber<T> clientRequestStream(StreamObserver<T> observer) {
        return new ClientRequestStream<T>(observer);
    }

    public static <T> StreamObserver<T> clientResponseStream(Flow.Subscriber<T> subscriber) {
        return new ClientResponseStream<T>(subscriber);
    }

    public static <T> StreamObserver<T> clientResponseHandler(CompletableFuture<T> result) {
        return new ClientResultHandler<T>(result);
    }

    public static class ClientRequestStream<T>
    implements Flow.Subscriber<T> {
        private final Logger log = LoggerFactory.getLogger(this.getClass());
        private final StreamObserver<T> grpcObserver;
        private final AtomicBoolean subscribed = new AtomicBoolean(false);
        private Flow.Subscription subscription;

        public ClientRequestStream(StreamObserver<T> grpcObserver) {
            this.grpcObserver = grpcObserver;
        }

        @Override
        public void onSubscribe(Flow.Subscription subscription) {
            boolean subscribedOk = this.subscribed.compareAndSet(false, true);
            if (!subscribedOk) {
                throw new ETracInternal("Multiple subscriptions on gRPC observer wrapper");
            }
            this.subscription = subscription;
            subscription.request(1L);
        }

        @Override
        public void onNext(T item) {
            this.grpcObserver.onNext(item);
            this.subscription.request(1L);
        }

        @Override
        public void onError(Throwable error) {
            StatusRuntimeException grpcError = GrpcErrorMapping.processError((Throwable)error);
            this.log.error("Client streaming failed in client: {}", (Object)grpcError.getMessage(), (Object)grpcError);
            this.grpcObserver.onError((Throwable)grpcError);
            this.subscription.cancel();
        }

        @Override
        public void onComplete() {
            this.log.info("Client streaming succeeded in client");
            this.grpcObserver.onCompleted();
        }
    }

    public static class ClientResponseStream<T>
    implements StreamObserver<T> {
        private final Logger log = LoggerFactory.getLogger(this.getClass());
        private final Flow.Subscriber<T> subscriber;

        public ClientResponseStream(Flow.Subscriber<T> subscriber) {
            this.subscriber = subscriber;
            Subscription subscription = new Subscription();
            subscriber.onSubscribe(subscription);
        }

        public void onNext(T value) {
            this.subscriber.onNext(value);
        }

        public void onError(Throwable error) {
            this.log.error("Server streaming failed in client: {}", (Object)error.getMessage(), (Object)error);
            this.subscriber.onError(error);
        }

        public void onCompleted() {
            this.log.info("Server streaming succeeded in client");
            this.subscriber.onComplete();
        }

        public static class Subscription
        implements Flow.Subscription {
            @Override
            public void request(long n) {
            }

            @Override
            public void cancel() {
            }
        }
    }

    public static class ClientResultHandler<T>
    implements StreamObserver<T> {
        private final CompletableFuture<T> resultFuture;
        private final AtomicReference<T> resultBuffer;

        public ClientResultHandler(CompletableFuture<T> result) {
            if (result.isDone()) {
                throw new EUnexpected();
            }
            this.resultFuture = result;
            this.resultBuffer = new AtomicReference<Object>(null);
        }

        public void onNext(T resultValue) {
            boolean bufferedOk = this.resultBuffer.compareAndSet(null, resultValue);
            if (!bufferedOk) {
                throw new EUnexpected();
            }
        }

        public void onError(Throwable error) {
            boolean errorOk = this.resultFuture.completeExceptionally(error);
            if (!errorOk) {
                throw new EUnexpected();
            }
        }

        public void onCompleted() {
            T resultValue = this.resultBuffer.get();
            if (resultValue == null) {
                throw new EUnexpected();
            }
            boolean resultOk = this.resultFuture.complete(resultValue);
            if (!resultOk) {
                throw new EUnexpected();
            }
        }
    }
}

