/*
 * Decompiled with CFR 0.152.
 */
package com.digitalasset.grpc.adapter.server.rs;

import com.digitalasset.grpc.adapter.CallCounter;
import com.digitalasset.grpc.adapter.ExecutionSequencer;
import io.grpc.Status;
import io.grpc.stub.ServerCallStreamObserver;
import java.util.concurrent.CompletableFuture;
import java.util.function.Function;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class ServerSubscriber<Resp>
implements Subscriber<Resp> {
    private static final Logger logger = LoggerFactory.getLogger(ServerSubscriber.class);
    @Nonnull
    private final ServerCallStreamObserver<Resp> responseObserver;
    @Nonnull
    private final CompletableFuture<Void> completionPromise = new CompletableFuture();
    @Nonnull
    private final ExecutionSequencer executionSequencer;
    @Nonnull
    private final String logPrefix = String.format("Call %d: ", CallCounter.getNewCallId());
    private final BufferingEventHandler onReadyHandler = new BufferingEventHandler(subscription -> () -> {
        logger.trace("{}gRPC downstream is ready. Demanding new response from RS upstream.", (Object)this.logPrefix);
        subscription.request(1L);
    }, "demand");
    private final BufferingEventHandler onCancelHandler = new BufferingEventHandler(subscription -> () -> {
        logger.trace("{}gRPC downstream canceled.", (Object)this.logPrefix);
        subscription.cancel();
        this.completionPromise.complete(null);
    }, "cancellation");
    private boolean subscribed = false;
    public CompletableFuture<Void> completionFuture = this.completionPromise;

    public ServerSubscriber(@Nonnull ServerCallStreamObserver<Resp> responseObserver, @Nonnull ExecutionSequencer executionSequencer) {
        this.responseObserver = responseObserver;
        this.executionSequencer = executionSequencer;
        responseObserver.disableAutoInboundFlowControl();
        responseObserver.setOnReadyHandler(this.onReadyHandler);
        responseObserver.setOnCancelHandler(this.onCancelHandler);
    }

    @Override
    public void onSubscribe(Subscription subscription) {
        if (subscription == null) {
            throw new NullPointerException("RS upstream called onSubscribe with null argument.");
        }
        logger.trace("{}RS upstream subscription registered. Setting up onCancelHandler and onReadyHandler handlers.", (Object)this.logPrefix);
        this.executionSequencer.sequence(() -> {
            if (!this.subscribed) {
                this.onCancelHandler.onSubscribe(subscription);
                this.onReadyHandler.onSubscribe(subscription);
                this.subscribed = true;
            } else {
                subscription.cancel();
            }
        });
    }

    @Override
    public void onNext(Resp response) {
        if (response == null) {
            throw new NullPointerException("RS upstream called onNext with null argument.");
        }
        logger.trace("{}RS upstream emitted response message {}.", (Object)this.logPrefix, (Object)response);
        this.executionSequencer.sequence(() -> {
            if (!this.responseObserver.isCancelled()) {
                this.responseObserver.onNext(response);
                if (this.responseObserver.isReady()) {
                    this.onReadyHandler.run();
                } else {
                    logger.trace("{}Backpressuring as gRPC downstream is not ready for next element.", (Object)this.logPrefix);
                }
            }
        });
    }

    @Override
    public void onError(Throwable throwable2) {
        if (throwable2 == null) {
            throw new NullPointerException("RS upstream called onError with null argument.");
        }
        logger.trace("{}RS upstream failed.", (Object)this.logPrefix, (Object)throwable2);
        this.executionSequencer.sequence(() -> {
            if (!this.responseObserver.isCancelled()) {
                this.responseObserver.onError(Status.fromThrowable(throwable2).asRuntimeException());
                this.completionPromise.completeExceptionally(throwable2);
            }
        });
    }

    @Override
    public void onComplete() {
        logger.trace("{}RS upstream completed.", (Object)this.logPrefix);
        this.executionSequencer.sequence(() -> {
            if (!this.responseObserver.isCancelled()) {
                this.responseObserver.onCompleted();
                this.completionPromise.complete(null);
            }
        });
    }

    private class BufferingEventHandler
    implements Runnable {
        private boolean eventBuffered = false;
        @Nonnull
        private final Function<Subscription, Runnable> getPropagatingEventHandler;
        @Nullable
        private Runnable propagatingEventHandler;
        @Nonnull
        private final String eventKind;
        @Nonnull
        private volatile Runnable currentEventHandler;

        @Override
        public void run() {
            this.currentEventHandler.run();
        }

        BufferingEventHandler(@Nonnull Function<Subscription, Runnable> getPropagatingEventHandler, String eventKind) {
            this.getPropagatingEventHandler = getPropagatingEventHandler;
            this.eventKind = eventKind;
            this.currentEventHandler = () -> ServerSubscriber.this.executionSequencer.sequence(() -> {
                if (this.propagatingEventHandler == null) {
                    this.bufferEvent();
                } else {
                    this.propagatingEventHandler.run();
                }
            });
        }

        private void bufferEvent() {
            logger.trace("{}Buffered {} event from gRPC downstream.", (Object)ServerSubscriber.this.logPrefix, (Object)this.eventKind);
            this.eventBuffered = true;
        }

        void onSubscribe(@Nonnull Subscription subscription) {
            this.propagatingEventHandler = this.getPropagatingEventHandler.apply(subscription);
            this.currentEventHandler = () -> ServerSubscriber.this.executionSequencer.sequence(this.propagatingEventHandler);
            if (this.eventBuffered) {
                logger.trace("{}Flushing {} event to RS upstream.", (Object)ServerSubscriber.this.logPrefix, (Object)this.eventKind);
                this.propagatingEventHandler.run();
                this.eventBuffered = false;
            }
        }
    }
}

