package datadog.trace.instrumentation.ratpack;

import datadog.appsec.api.blocking.BlockingException;
import datadog.trace.api.gateway.BlockResponseFunction;
import datadog.trace.api.gateway.Flow;
import datadog.trace.api.gateway.RequestContext;
import datadog.trace.api.http.StoredByteBody;
import datadog.trace.bootstrap.instrumentation.api.AgentSpan;
import datadog.trace.bootstrap.instrumentation.api.AgentTracer;
import io.netty.buffer.ByteBuf;
import java.nio.ByteBuffer;
import java.util.concurrent.atomic.AtomicBoolean;
import org.reactivestreams.Publisher;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;
import ratpack.stream.TransformablePublisher;

/* loaded from: input_file:inst/datadog/trace/instrumentation/ratpack/RequestBodyCollectionPublisher.classdata */
/**
 * Publisher decorator that copies every request-body chunk into a {@link StoredByteBody} before
 * forwarding the chunk, unchanged, to the downstream subscriber.
 *
 * <p>When the wrapped publisher terminates (normally or with an error), the stored body is asked
 * via {@link StoredByteBody#maybeNotify()} whether the request must be blocked. If so, a blocking
 * response is committed through the active span's {@link BlockResponseFunction} and the downstream
 * subscriber is terminated with an error. If the blocking response cannot be committed (no active
 * span, no request context, or no block-response function), the original terminal signal is
 * forwarded so the downstream subscriber is never left without a terminal event.
 *
 * <p>The write callback is a single instance field reused for each chunk.
 * NOTE(review): this presumes one subscription at a time per publisher instance — confirm.
 */
public class RequestBodyCollectionPublisher implements TransformablePublisher<ByteBuf> {
    private final ByteBufIntoByteBufferCallback cb = new ByteBufIntoByteBufferCallback();
    private final StoredByteBody storedByteBody;
    private final Publisher<ByteBuf> input;

    /**
     * Write callback that drains the currently assigned {@link ByteBuf} into the {@link ByteBuffer}
     * supplied by {@link StoredByteBody}.
     */
    public static class ByteBufIntoByteBufferCallback implements StoredByteBody.ByteBufferWriteCallback {
        /** Chunk to read from; assigned by the publisher before each {@code appendData} call. */
        ByteBuf byteBuf;

        /**
         * Reads bytes from the current {@link #byteBuf} into {@code byteBuffer}.
         *
         * @param byteBuffer destination buffer provided by the stored body
         */
        @Override
        public void put(ByteBuffer byteBuffer) {
            this.byteBuf.readBytes(byteBuffer);
        }
    }

    /**
     * @param storedByteBody accumulator that receives a copy of every chunk
     * @param publisher the request-body publisher being decorated
     */
    public RequestBodyCollectionPublisher(StoredByteBody storedByteBody, Publisher<ByteBuf> publisher) {
        this.storedByteBody = storedByteBody;
        this.input = publisher;
    }

    /**
     * Subscribes to the wrapped publisher with an intercepting subscriber that records each chunk
     * and evaluates the blocking decision on termination.
     *
     * @param subscriber the downstream subscriber receiving the (unmodified) chunks
     */
    @Override
    public void subscribe(final Subscriber<? super ByteBuf> subscriber) {
        this.input.subscribe(new Subscriber<ByteBuf>() {
            /** Set once a terminal signal has been delivered downstream; guards against duplicates. */
            private final AtomicBoolean done = new AtomicBoolean();

            @Override
            public void onSubscribe(Subscription subscription) {
                subscriber.onSubscribe(subscription);
            }

            @Override
            public void onNext(ByteBuf byteBuf) {
                // Read through a duplicate so the buffer forwarded downstream keeps its own indices.
                RequestBodyCollectionPublisher.this.cb.byteBuf = byteBuf.duplicate();
                RequestBodyCollectionPublisher.this.storedByteBody.appendData(
                        RequestBodyCollectionPublisher.this.cb,
                        RequestBodyCollectionPublisher.this.cb.byteBuf.readableBytes());
                if (this.done.get()) {
                    // NOTE(review): the chunk is dropped here without being released — confirm
                    // whether a release is needed if upstream can emit after termination.
                    return;
                }
                subscriber.onNext(byteBuf);
            }

            @Override
            public void onError(Throwable th) {
                Flow.Action action = RequestBodyCollectionPublisher.this.storedByteBody.maybeNotify().getAction();
                if (action instanceof Flow.Action.RequestBlockingAction
                        && block((Flow.Action.RequestBlockingAction) action, th)) {
                    return;
                }
                // Not blocking, or blocking could not be committed: never swallow the upstream error.
                if (this.done.compareAndSet(false, true)) {
                    subscriber.onError(th);
                }
            }

            @Override
            public void onComplete() {
                Flow.Action action = RequestBodyCollectionPublisher.this.storedByteBody.maybeNotify().getAction();
                if (action instanceof Flow.Action.RequestBlockingAction
                        && block(
                                (Flow.Action.RequestBlockingAction) action,
                                new BlockingException("Blocked request (for RequestBody/readStream)"))) {
                    return;
                }
                // Not blocking, or blocking could not be committed: still terminate the stream.
                if (this.done.compareAndSet(false, true)) {
                    subscriber.onComplete();
                }
            }

            /**
             * Attempts to commit the blocking response and terminate the downstream subscriber
             * with {@code th}.
             *
             * @param requestBlockingAction the blocking decision (status, content type, headers)
             * @param th the error to deliver downstream once the blocking response is attempted
             * @return {@code true} if a block-response function was available and the blocking
             *     path was taken; {@code false} if nothing was done and the caller must forward
             *     the original terminal signal itself
             */
            private boolean block(Flow.Action.RequestBlockingAction requestBlockingAction, Throwable th) {
                AgentSpan activeSpan = AgentTracer.activeSpan();
                if (activeSpan == null) {
                    return false;
                }
                RequestContext requestContext = activeSpan.getRequestContext();
                if (requestContext == null) {
                    return false;
                }
                BlockResponseFunction blockResponseFunction = requestContext.getBlockResponseFunction();
                if (blockResponseFunction == null) {
                    return false;
                }
                blockResponseFunction.tryCommitBlockingResponse(
                        requestContext.getTraceSegment(),
                        requestBlockingAction.getStatusCode(),
                        requestBlockingAction.getBlockingContentType(),
                        requestBlockingAction.getExtraHeaders());
                if (this.done.compareAndSet(false, true)) {
                    subscriber.onError(th);
                }
                return true;
            }
        });
    }
}
