package com.daml.grpc.adapter.server.rs;

import com.daml.grpc.adapter.CallCounter;
import com.daml.grpc.adapter.ExecutionSequencer;
import io.grpc.Status;
import io.grpc.stub.ServerCallStreamObserver;
import java.util.concurrent.CompletableFuture;
import java.util.function.Function;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Reactive Streams {@link Subscriber} that forwards everything it receives from an RS upstream to
 * a gRPC {@link ServerCallStreamObserver}.
 *
 * <p>Demand flows the other way: each time gRPC reports the downstream as ready, one element is
 * requested from the upstream {@link Subscription}; a gRPC cancellation cancels the subscription.
 * gRPC may fire those events before the upstream has called {@link #onSubscribe(Subscription)}, so
 * both are routed through a {@link BufferingEventHandler} that remembers an early event and
 * replays it once the subscription is known.
 *
 * <p>All work that touches the response observer, the subscription or this object's mutable state
 * is submitted through the supplied {@link ExecutionSequencer}.
 * NOTE(review): this presumably serializes the submitted tasks; confirm against ExecutionSequencer.
 *
 * @param <Resp> type of the response messages sent to the gRPC client
 */
public class ServerSubscriber<Resp> implements Subscriber<Resp> {
    private static final Logger logger = LoggerFactory.getLogger(ServerSubscriber.class);

    @Nonnull
    private final ServerCallStreamObserver<Resp> responseObserver;

    @Nonnull
    private final ExecutionSequencer executionSequencer;

    @Nonnull
    private final CompletableFuture<Void> completionPromise = new CompletableFuture<>();

    /** Prefix prepended to every log line of this call; built from a fresh call id. */
    @Nonnull
    private final String logPrefix = String.format("Call %d: ", CallCounter.getNewCallId());

    /** Turns a "downstream is ready" signal into a demand of exactly one element from upstream. */
    private final BufferingEventHandler onReadyHandler = new BufferingEventHandler(
            subscription -> () -> {
                logger.trace("{}gRPC downstream is ready. Demanding new response from RS upstream.", this.logPrefix);
                subscription.request(1L);
            },
            "demand");

    /** Turns a downstream cancellation into an upstream cancel and completes the promise. */
    private final BufferingEventHandler onCancelHandler = new BufferingEventHandler(
            subscription -> () -> {
                logger.trace("{}gRPC downstream canceled.", this.logPrefix);
                subscription.cancel();
                this.completionPromise.complete(null);
            },
            "cancellation");

    /** Set once the first subscription has been wired in; later subscriptions are cancelled. */
    private boolean subscribed = false;

    /**
     * Completes normally after the upstream completed or the downstream cancelled, and
     * exceptionally after the upstream failed while the downstream was not cancelled.
     *
     * <p>NOTE(review): this public field is not final; it is left that way so existing callers
     * keep compiling, but nothing in this class ever reassigns it.
     */
    public CompletableFuture<Void> completionFuture = this.completionPromise;

    /**
     * gRPC-facing event handler that can be registered before the RS subscription exists.
     *
     * <p>Until {@link #onSubscribe(Subscription)} is called, a fired event is only recorded. Once
     * the subscription is supplied, a recorded event is replayed and every later event is
     * forwarded to the handler derived from that subscription.
     */
    private class BufferingEventHandler implements Runnable {

        /** Builds the real handler once the subscription is available. */
        @Nonnull
        private final Function<Subscription, Runnable> getPropagatingEventHandler;

        /** The real handler; {@code null} until {@link #onSubscribe(Subscription)} has run. */
        @Nullable
        private Runnable propagatingEventHandler;

        /** Human-readable event name, used in log messages only. */
        @Nonnull
        private final String eventKind;

        /** Whether an event arrived before the subscription did. Multiple early events collapse into one. */
        private boolean eventBuffered = false;

        /**
         * Handler currently in effect. The initial one decides inside the sequencer whether to
         * buffer or forward, because the subscription may arrive between the event firing and the
         * sequenced task running.
         */
        @Nonnull
        private volatile Runnable currentEventHandler = () ->
                ServerSubscriber.this.executionSequencer.sequence(() -> {
                    if (this.propagatingEventHandler == null) {
                        bufferEvent();
                    } else {
                        this.propagatingEventHandler.run();
                    }
                });

        /**
         * @param function factory producing the real handler for a given subscription
         * @param str name of the event kind, for logging
         */
        BufferingEventHandler(@Nonnull Function<Subscription, Runnable> function, @Nonnull String str) {
            this.getPropagatingEventHandler = function;
            this.eventKind = str;
        }

        /** Invoked by gRPC; delegates to whichever handler is currently installed. */
        @Override
        public void run() {
            this.currentEventHandler.run();
        }

        /** Records that an event fired before the subscription was available. */
        private void bufferEvent() {
            ServerSubscriber.logger.trace("{}Buffered {} event from gRPC downstream.", ServerSubscriber.this.logPrefix, this.eventKind);
            this.eventBuffered = true;
        }

        /**
         * Installs the real handler for {@code subscription} and replays a buffered event, if any.
         * The replay runs the handler directly rather than through the sequencer; the only caller
         * in this class invokes this method from inside a sequenced task.
         *
         * @param subscription the upstream subscription events should be propagated to
         */
        void onSubscribe(@Nonnull Subscription subscription) {
            this.propagatingEventHandler = this.getPropagatingEventHandler.apply(subscription);
            // From now on events skip the null check and go straight to the real handler.
            this.currentEventHandler = () ->
                    ServerSubscriber.this.executionSequencer.sequence(this.propagatingEventHandler);
            if (this.eventBuffered) {
                ServerSubscriber.logger.trace("{}Flushing {} event to RS upstream.", ServerSubscriber.this.logPrefix, this.eventKind);
                this.propagatingEventHandler.run();
                this.eventBuffered = false;
            }
        }
    }

    /**
     * Creates the subscriber and registers its ready and cancel handlers on the gRPC observer.
     *
     * @param serverCallStreamObserver gRPC observer the responses are written to
     * @param executionSequencer sequencer through which all state-touching work is submitted
     */
    public ServerSubscriber(@Nonnull ServerCallStreamObserver<Resp> serverCallStreamObserver, @Nonnull ExecutionSequencer executionSequencer) {
        this.responseObserver = serverCallStreamObserver;
        this.executionSequencer = executionSequencer;
        serverCallStreamObserver.disableAutoInboundFlowControl();
        serverCallStreamObserver.setOnReadyHandler(this.onReadyHandler);
        serverCallStreamObserver.setOnCancelHandler(this.onCancelHandler);
    }

    /**
     * Wires the upstream subscription into the cancel and ready handlers. A second subscription is
     * cancelled immediately and otherwise ignored.
     *
     * @param subscription the upstream subscription
     * @throws NullPointerException if {@code subscription} is {@code null}
     */
    @Override
    public void onSubscribe(Subscription subscription) {
        if (subscription == null) {
            throw new NullPointerException("RS upstream called onSubscribe with null argument.");
        }
        logger.trace("{}RS upstream subscription registered. Setting up onCancelHandler and onReadyHandler handlers.", this.logPrefix);
        this.executionSequencer.sequence(() -> {
            if (this.subscribed) {
                subscription.cancel();
                return;
            }
            // Cancellation is wired first so that a buffered cancel is replayed before a buffered demand.
            this.onCancelHandler.onSubscribe(subscription);
            this.onReadyHandler.onSubscribe(subscription);
            this.subscribed = true;
        });
    }

    /**
     * Forwards one response to gRPC unless the call was cancelled, then demands the next element
     * straight away if the downstream is still ready.
     *
     * @param resp the response emitted by the upstream
     * @throws NullPointerException if {@code resp} is {@code null}
     */
    @Override
    public void onNext(Resp resp) {
        if (resp == null) {
            throw new NullPointerException("RS upstream called onNext with null argument.");
        }
        logger.trace("{}RS upstream emitted response message {}.", this.logPrefix, resp);
        this.executionSequencer.sequence(() -> {
            if (this.responseObserver.isCancelled()) {
                return;
            }
            this.responseObserver.onNext(resp);
            if (this.responseObserver.isReady()) {
                this.onReadyHandler.run();
            } else {
                // No demand is signalled here; the ready handler will request once gRPC fires it.
                logger.trace("{}Backpressuring as gRPC downstream is not ready for next element.", this.logPrefix);
            }
        });
    }

    /**
     * Fails the gRPC call with the status derived from {@code th} and completes the promise
     * exceptionally with the original throwable. Nothing happens if the call was cancelled.
     *
     * @param th the upstream failure
     * @throws NullPointerException if {@code th} is {@code null}
     */
    @Override
    public void onError(Throwable th) {
        if (th == null) {
            throw new NullPointerException("RS upstream called onError with null argument.");
        }
        logger.trace("{}RS upstream failed.", this.logPrefix, th);
        this.executionSequencer.sequence(() -> {
            if (this.responseObserver.isCancelled()) {
                return;
            }
            // Carry over any trailers attached to a Status(Runtime)Exception in the causal chain;
            // rebuilding the exception from the Status alone would silently drop them.
            // trailersFromThrowable yields null when there are none, which asRuntimeException accepts.
            this.responseObserver.onError(
                    Status.fromThrowable(th).asRuntimeException(Status.trailersFromThrowable(th)));
            this.completionPromise.completeExceptionally(th);
        });
    }

    /**
     * Completes the gRPC call and the promise. Nothing happens if the call was cancelled.
     */
    @Override
    public void onComplete() {
        logger.trace("{}RS upstream completed.", this.logPrefix);
        this.executionSequencer.sequence(() -> {
            if (this.responseObserver.isCancelled()) {
                return;
            }
            this.responseObserver.onCompleted();
            this.completionPromise.complete(null);
        });
    }
}
