/*
 * Decompiled with CFR 0.152.
 */
package com.digitalasset.grpc.adapter.client.rs;

import com.digitalasset.grpc.adapter.ExecutionSequencer;
import com.digitalasset.grpc.adapter.client.rs.DownstreamEventBuffer;
import io.grpc.stub.ClientCallStreamObserver;
import java.util.function.Consumer;
import java.util.function.Supplier;
import javax.annotation.Nonnegative;
import javax.annotation.Nonnull;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

class BufferingSubscription<Resp>
implements Subscription {
    private static final Logger logger = LoggerFactory.getLogger(BufferingSubscription.class);
    @Nonnull
    private final Consumer<Subscriber<? super Resp>> onFirstDemand;
    @Nonnull
    private final Supplier<ClientCallStreamObserver> requestObserverSupplier;
    @Nonnull
    private final ExecutionSequencer es;
    @Nonnull
    private final Subscriber<? super Resp> subscriber;
    @Nonnull
    private final String logPrefix;
    @Nonnull
    private final DownstreamEventBuffer downstreamEventBuffer;
    private boolean callStarted = false;
    @Nonnegative
    private int unsatisfiedDemand = 1;
    private boolean cancelled = false;
    private boolean firstRequestPending = true;

    BufferingSubscription(@Nonnull Consumer<Subscriber<? super Resp>> onFirstDemand, @Nonnull Supplier<ClientCallStreamObserver> requestObserverSupplier, @Nonnull ExecutionSequencer es, @Nonnull Subscriber<? super Resp> subscriber, @Nonnull String logPrefix) {
        this.onFirstDemand = onFirstDemand;
        this.requestObserverSupplier = requestObserverSupplier;
        this.es = es;
        this.subscriber = subscriber;
        this.logPrefix = logPrefix;
        this.downstreamEventBuffer = new DownstreamEventBuffer(this::propagateCancellation, this::propagateIntegerDemand, logPrefix);
    }

    @Nonnull
    public Subscriber<? super Resp> getSubscriber() {
        return this.subscriber;
    }

    boolean isCancelled() {
        return this.cancelled;
    }

    boolean isFirstRequestPending() {
        return this.firstRequestPending;
    }

    @Override
    public void request(long demand) {
        logger.trace("{}RS downstream signaled demand for {} elements.", (Object)this.logPrefix, (Object)demand);
        if (demand < 1L) {
            this.terminateStream(String.format("Subscriber signaled non-positive subscription request (%d)", demand));
        } else {
            this.es.sequence(() -> {
                if (!this.cancelled) {
                    this.accumulateDemand(demand);
                    this.propagateDemandIfNoElemsPending();
                } else {
                    logger.trace("{}Swallowing demand from gRPC upstream due to previous cancellation.", (Object)this.logPrefix);
                }
            });
        }
    }

    private void propagateDemandIfNoElemsPending() {
        ClientCallStreamObserver clientCallStreamObserver = this.requestObserverSupplier.get();
        if (clientCallStreamObserver != null && this.callStarted && this.unsatisfiedDemand == 0) {
            this.unsatisfiedDemand = this.downstreamEventBuffer.propagateCancellationOrDemand(clientCallStreamObserver);
        }
    }

    private void terminateStream(String cause) {
        this.subscriber.onError(new IllegalArgumentException(cause));
        this.cancel();
    }

    private void accumulateDemand(long demand) {
        if (this.firstRequestPending) {
            this.firstRequestPending = false;
            this.onFirstDemand.accept(this.subscriber);
            this.downstreamEventBuffer.bufferDemand(demand - 1L);
        } else {
            this.downstreamEventBuffer.bufferDemand(demand);
        }
    }

    private void propagateIntegerDemand(@Nonnull ClientCallStreamObserver requestObserver, @Nonnegative int demand) {
        requestObserver.request(demand);
    }

    @Override
    public void cancel() {
        logger.trace("{}RS downstream signaled cancellation.", (Object)this.logPrefix);
        this.es.sequence(() -> {
            this.cancelled = true;
            ClientCallStreamObserver requestObserver = this.requestObserverSupplier.get();
            if (requestObserver != null && this.callStarted) {
                this.propagateCancellation(requestObserver);
            } else {
                this.downstreamEventBuffer.bufferCancellation();
            }
        });
    }

    private void propagateCancellation(@Nonnull ClientCallStreamObserver requestObserver) {
        requestObserver.cancel("Client cancelled the call.", null);
    }

    void onNextElement(@Nonnull ClientCallStreamObserver requestObserver) {
        this.callStarted = true;
        if (--this.unsatisfiedDemand == 0) {
            this.unsatisfiedDemand = this.downstreamEventBuffer.propagateCancellationOrDemand(requestObserver);
        }
    }

    void onStreamClosure() {
        this.callStarted = true;
    }
}

