package io.servicetalk.http.netty;

import io.servicetalk.concurrent.PublisherSource;
import io.servicetalk.concurrent.api.Publisher;
import io.servicetalk.concurrent.api.Single;
import io.servicetalk.context.api.ContextMap;
import io.servicetalk.http.api.HttpExecutionStrategies;
import io.servicetalk.http.api.HttpExecutionStrategy;
import io.servicetalk.http.api.HttpLifecycleObserver;
import io.servicetalk.http.api.HttpRequestMetaData;
import io.servicetalk.http.api.HttpResponseMetaData;
import io.servicetalk.http.api.HttpServiceContext;
import io.servicetalk.http.api.StreamingHttpRequest;
import io.servicetalk.http.api.StreamingHttpResponse;
import io.servicetalk.http.api.StreamingHttpResponseFactory;
import io.servicetalk.http.api.StreamingHttpService;
import io.servicetalk.http.api.StreamingHttpServiceFilter;
import io.servicetalk.http.api.StreamingHttpServiceFilterFactory;
import io.servicetalk.http.netty.NoopHttpLifecycleObserver;
import io.servicetalk.transport.api.ConnectionInfo;
import java.util.concurrent.atomic.AtomicReference;
import javax.annotation.Nullable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* JADX INFO: Access modifiers changed from: package-private */
/* loaded from: input_file:io/servicetalk/http/netty/HttpMessageDiscardWatchdogServiceFilter.class */
public final class HttpMessageDiscardWatchdogServiceFilter implements StreamingHttpServiceFilterFactory {
    private static final Logger LOGGER = LoggerFactory.getLogger(HttpMessageDiscardWatchdogServiceFilter.class);
    static final StreamingHttpServiceFilterFactory INSTANCE = new HttpMessageDiscardWatchdogServiceFilter();
    static final StreamingHttpServiceFilterFactory CLEANER = new HttpLifecycleObserverServiceFilter(new CleanerHttpLifecycleObserver());
    private static final ContextMap.Key<AtomicReference<Publisher<?>>> MESSAGE_PUBLISHER_KEY = ContextMap.Key.newKey(HttpMessageDiscardWatchdogServiceFilter.class.getName() + ".messagePublisher", generifyAtomicReference());

    /* loaded from: input_file:io/servicetalk/http/netty/HttpMessageDiscardWatchdogServiceFilter$CleanerHttpLifecycleObserver.class */
    private static final class CleanerHttpLifecycleObserver implements HttpLifecycleObserver {
        private CleanerHttpLifecycleObserver() {
        }

        public HttpLifecycleObserver.HttpExchangeObserver onNewExchange() {
            return new HttpLifecycleObserver.HttpExchangeObserver() { // from class: io.servicetalk.http.netty.HttpMessageDiscardWatchdogServiceFilter.CleanerHttpLifecycleObserver.1

                @Nullable
                private ContextMap requestContext;

                public HttpLifecycleObserver.HttpRequestObserver onRequest(HttpRequestMetaData httpRequestMetaData) {
                    this.requestContext = httpRequestMetaData.context();
                    return NoopHttpLifecycleObserver.NoopHttpRequestObserver.INSTANCE;
                }

                public HttpLifecycleObserver.HttpResponseObserver onResponse(HttpResponseMetaData httpResponseMetaData) {
                    return NoopHttpLifecycleObserver.NoopHttpResponseObserver.INSTANCE;
                }

                public void onExchangeFinally() {
                    AtomicReference atomicReference;
                    if (this.requestContext == null || (atomicReference = (AtomicReference) this.requestContext.get(HttpMessageDiscardWatchdogServiceFilter.MESSAGE_PUBLISHER_KEY)) == null || atomicReference.get() == null) {
                        return;
                    }
                    HttpMessageDiscardWatchdogServiceFilter.LOGGER.warn("Discovered un-drained HTTP response message body which has been dropped by user code - this is a strong indication of a bug in a user-defined filter. Responses (or their message body) must be fully consumed before discarding.");
                }

                public void onConnectionSelected(ConnectionInfo connectionInfo) {
                }

                public void onResponseError(Throwable th) {
                }

                public void onResponseCancel() {
                }
            };
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:io/servicetalk/http/netty/HttpMessageDiscardWatchdogServiceFilter$NoopSubscriber.class */
    public static final class NoopSubscriber implements PublisherSource.Subscriber<Object> {
        static final NoopSubscriber INSTANCE = new NoopSubscriber();

        private NoopSubscriber() {
        }

        public void onSubscribe(PublisherSource.Subscription subscription) {
        }

        public void onNext(@Nullable Object obj) {
        }

        public void onError(Throwable th) {
        }

        public void onComplete() {
        }
    }

    private HttpMessageDiscardWatchdogServiceFilter() {
    }

    public StreamingHttpServiceFilter create(StreamingHttpService streamingHttpService) {
        return new StreamingHttpServiceFilter(streamingHttpService) { // from class: io.servicetalk.http.netty.HttpMessageDiscardWatchdogServiceFilter.1
            static final /* synthetic */ boolean $assertionsDisabled;

            public Single<StreamingHttpResponse> handle(HttpServiceContext httpServiceContext, StreamingHttpRequest streamingHttpRequest, StreamingHttpResponseFactory streamingHttpResponseFactory) {
                return delegate().handle(httpServiceContext, streamingHttpRequest, streamingHttpResponseFactory).map(streamingHttpResponse -> {
                    AtomicReference atomicReference = (AtomicReference) streamingHttpRequest.context().computeIfAbsent(HttpMessageDiscardWatchdogServiceFilter.MESSAGE_PUBLISHER_KEY, key -> {
                        return new AtomicReference();
                    });
                    if (!$assertionsDisabled && atomicReference == null) {
                        throw new AssertionError();
                    }
                    if (atomicReference.getAndSet(streamingHttpResponse.messageBody()) != null) {
                        HttpMessageDiscardWatchdogServiceFilter.LOGGER.warn("Discovered un-drained HTTP response message body which has been dropped by user code - this is a strong indication of a bug in a user-defined filter. Responses (or their message body) must be fully consumed before retrying.");
                    }
                    return streamingHttpResponse.transformMessageBody(publisher -> {
                        return publisher.beforeSubscriber(() -> {
                            atomicReference.set(null);
                            return NoopSubscriber.INSTANCE;
                        });
                    });
                });
            }

            static {
                $assertionsDisabled = !HttpMessageDiscardWatchdogServiceFilter.class.desiredAssertionStatus();
            }
        };
    }

    /* renamed from: requiredOffloads, reason: merged with bridge method [inline-methods] */
    public HttpExecutionStrategy m112requiredOffloads() {
        return HttpExecutionStrategies.offloadNone();
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public static <T> Class<T> generifyAtomicReference() {
        return AtomicReference.class;
    }
}
