package io.servicetalk.http.netty;

import io.netty.buffer.ByteBufAllocator;
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelPipeline;
import io.netty.handler.codec.DecoderException;
import io.servicetalk.buffer.netty.BufferUtils;
import io.servicetalk.concurrent.Cancellable;
import io.servicetalk.concurrent.CompletableSource;
import io.servicetalk.concurrent.PublisherSource;
import io.servicetalk.concurrent.api.AsyncCloseable;
import io.servicetalk.concurrent.api.AsyncCloseables;
import io.servicetalk.concurrent.api.Completable;
import io.servicetalk.concurrent.api.Executor;
import io.servicetalk.concurrent.api.ListenableAsyncCloseable;
import io.servicetalk.concurrent.api.Publisher;
import io.servicetalk.concurrent.api.Single;
import io.servicetalk.concurrent.api.SourceAdapters;
import io.servicetalk.concurrent.api.internal.SubscribableCompletable;
import io.servicetalk.concurrent.internal.DuplicateSubscribeException;
import io.servicetalk.concurrent.internal.RejectedSubscribeError;
import io.servicetalk.concurrent.internal.TerminalNotification;
import io.servicetalk.http.api.DefaultHttpExecutionContext;
import io.servicetalk.http.api.EmptyHttpHeaders;
import io.servicetalk.http.api.HttpExecutionContext;
import io.servicetalk.http.api.HttpExecutionStrategy;
import io.servicetalk.http.api.HttpHeaderNames;
import io.servicetalk.http.api.HttpHeaderValues;
import io.servicetalk.http.api.HttpHeaders;
import io.servicetalk.http.api.HttpHeadersFactory;
import io.servicetalk.http.api.HttpProtocolVersion;
import io.servicetalk.http.api.HttpRequestMethod;
import io.servicetalk.http.api.HttpResponseMetaData;
import io.servicetalk.http.api.HttpServiceContext;
import io.servicetalk.http.api.StreamingHttpRequest;
import io.servicetalk.http.api.StreamingHttpRequests;
import io.servicetalk.http.api.StreamingHttpResponse;
import io.servicetalk.http.api.StreamingHttpService;
import io.servicetalk.tcp.netty.internal.ReadOnlyTcpServerConfig;
import io.servicetalk.tcp.netty.internal.TcpServerBinder;
import io.servicetalk.tcp.netty.internal.TcpServerChannelInitializer;
import io.servicetalk.transport.api.ConnectionAcceptor;
import io.servicetalk.transport.api.ConnectionObserver;
import io.servicetalk.transport.api.ExecutionContext;
import io.servicetalk.transport.api.ServerContext;
import io.servicetalk.transport.netty.internal.ChannelInitializer;
import io.servicetalk.transport.netty.internal.CloseHandler;
import io.servicetalk.transport.netty.internal.CopyByteBufHandlerChannelInitializer;
import io.servicetalk.transport.netty.internal.DefaultNettyConnection;
import io.servicetalk.transport.netty.internal.FlushStrategy;
import io.servicetalk.transport.netty.internal.NettyConnection;
import io.servicetalk.transport.netty.internal.NettyConnectionContext;
import io.servicetalk.transport.netty.internal.SplittingFlushStrategy;
import java.net.SocketAddress;
import java.net.SocketOption;
import java.nio.channels.ClosedChannelException;
import java.util.ArrayDeque;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import javax.net.ssl.SSLSession;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* JADX INFO: Access modifiers changed from: package-private */
/* loaded from: input_file:io/servicetalk/http/netty/NettyHttpServer.class */
public final class NettyHttpServer {
    /** Logger for server-level messages; assigned in the static initializer of this class. */
    private static final Logger LOGGER;
    /**
     * Explicit (decompiled) form of the flag backing assertion checks in this class. It is set in the static
     * initializer to the negation of {@code NettyHttpServer.class.desiredAssertionStatus()}.
     */
    static final /* synthetic */ boolean $assertionsDisabled;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:io/servicetalk/http/netty/NettyHttpServer$ErrorLoggingHttpSubscriber.class */
    /**
     * {@link CompletableSource.Subscriber} whose only job is to log the error a connection terminates with.
     * {@code onSubscribe} and {@code onComplete} are intentionally no-ops.
     */
    public static final class ErrorLoggingHttpSubscriber implements CompletableSource.Subscriber {
        private static final Logger LOGGER = LoggerFactory.getLogger(ErrorLoggingHttpSubscriber.class);

        private ErrorLoggingHttpSubscriber() {
            // Instantiated only by the enclosing class.
        }

        /** No-op: the {@link Cancellable} is not retained. */
        public void onSubscribe(Cancellable cancellable) {
        }

        /** No-op: normal completion is not logged. */
        public void onComplete() {
        }

        /**
         * Logs the failure at a level that depends on its kind:
         * <ul>
         * <li>{@code TRACE} for a {@link CloseHandler.CloseEventObservedException} whose event is
         * {@code CHANNEL_CLOSED_INBOUND} and whose cause is a {@link ClosedChannelException};</li>
         * <li>{@code WARN} for a {@link CloseHandler.CloseEventObservedException} whose cause is a
         * {@link DecoderException};</li>
         * <li>{@code DEBUG} for everything else.</li>
         * </ul>
         *
         * @param th the error the processing terminated with.
         */
        public void onError(Throwable th) {
            if (th instanceof CloseHandler.CloseEventObservedException) {
                final boolean closedInbound = ((CloseHandler.CloseEventObservedException) th).event()
                        == CloseHandler.CloseEvent.CHANNEL_CLOSED_INBOUND;
                final Throwable cause = th.getCause();
                if (closedInbound && cause instanceof ClosedChannelException) {
                    LOGGER.trace("Client closed the connection without sending 'Connection: close' header", th);
                    return;
                }
                if (cause instanceof DecoderException) {
                    LOGGER.warn("Can not decode HTTP message, no more requests will be received on this connection.", th);
                    return;
                }
            }
            LOGGER.debug("Unexpected error received while processing connection, {}", "no more requests will be received on this connection.", th);
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:io/servicetalk/http/netty/NettyHttpServer$NettyHttpServerConnection.class */
    public static final class NettyHttpServerConnection extends HttpServiceContext implements NettyConnectionContext {
        /** Logger used when building error responses. */
        private static final Logger LOGGER = LoggerFactory.getLogger(NettyHttpServerConnection.class);
        /** Service whose {@code handle} method is invoked for each request. */
        private final StreamingHttpService service;
        /** Execution strategy used to invoke {@link #service}. */
        private final HttpExecutionStrategy strategy;
        /** Underlying connection that requests are read from and responses are written to. */
        private final NettyConnection<Object, Object> connection;
        /** Headers factory passed along when transport requests are created. */
        private final HttpHeadersFactory headersFactory;
        /** Execution context built in the constructor from the connection's execution context and {@link #strategy}. */
        private final HttpExecutionContext executionContext;
        /** Flush strategy wrapper installed on {@link #connection} in the constructor. */
        private final SplittingFlushStrategy splittingFlushStrategy;
        /** When {@code true}, the request message body is drained after the response if nothing subscribed to it. */
        private final boolean drainRequestPayloadBody;
        /** Flag forwarded to {@code StreamingHttpRequests.newTransportRequest} for every request read. */
        private final boolean requireTrailerHeader;

        /* JADX INFO: Access modifiers changed from: package-private */
        /**
         * Creates a server-side HTTP connection on top of {@code nettyConnection} and installs a
         * {@link SplittingFlushStrategy} on it.
         *
         * @param nettyConnection the underlying connection.
         * @param streamingHttpService the service that handles requests.
         * @param httpExecutionStrategy the strategy used to invoke the service.
         * @param httpProtocolVersion the protocol version handed to the response factories.
         * @param httpHeadersFactory the headers factory for responses and transport requests.
         * @param z stored as {@code drainRequestPayloadBody}.
         * @param z2 stored as {@code requireTrailerHeader}.
         */
        public NettyHttpServerConnection(NettyConnection<Object, Object> nettyConnection, StreamingHttpService streamingHttpService, HttpExecutionStrategy httpExecutionStrategy, HttpProtocolVersion httpProtocolVersion, HttpHeadersFactory httpHeadersFactory, boolean z, boolean z2) {
            super(httpHeadersFactory, new DefaultHttpResponseFactory(httpHeadersFactory, nettyConnection.executionContext().bufferAllocator(), httpProtocolVersion), new DefaultStreamingHttpResponseFactory(httpHeadersFactory, nettyConnection.executionContext().bufferAllocator(), httpProtocolVersion), new DefaultBlockingStreamingHttpResponseFactory(httpHeadersFactory, nettyConnection.executionContext().bufferAllocator(), httpProtocolVersion));
            this.service = streamingHttpService;
            this.strategy = httpExecutionStrategy;
            this.connection = nettyConnection;
            this.headersFactory = httpHeadersFactory;
            this.drainRequestPayloadBody = z;
            this.requireTrailerHeader = z2;
            this.executionContext = new DefaultHttpExecutionContext(nettyConnection.executionContext().bufferAllocator(), nettyConnection.executionContext().ioExecutor(), nettyConnection.executionContext().executor(), httpExecutionStrategy);
            // Response meta-data opens a flush boundary, HttpHeaders closes it, anything else is in progress.
            this.splittingFlushStrategy = new SplittingFlushStrategy(nettyConnection.defaultFlushStrategy(), written -> {
                if (written instanceof HttpResponseMetaData) {
                    return SplittingFlushStrategy.FlushBoundaryProvider.FlushBoundary.Start;
                }
                if (written instanceof HttpHeaders) {
                    return SplittingFlushStrategy.FlushBoundaryProvider.FlushBoundary.End;
                }
                return SplittingFlushStrategy.FlushBoundaryProvider.FlushBoundary.InProgress;
            });
            // Whatever strategy is currently in place is replaced by the splitting strategy.
            nettyConnection.updateFlushStrategy((current, isCurrentOriginal) -> this.splittingFlushStrategy);
        }

        /* JADX INFO: Access modifiers changed from: package-private */
        /**
         * Starts request processing: reads from the connection, turns the flat stream into a
         * {@link StreamingHttpRequest}, hands it to {@code handleRequestAndWriteResponse} and subscribes with an
         * {@link ErrorLoggingHttpSubscriber}.
         *
         * @param z forwarded unchanged to {@code handleRequestAndWriteResponse}.
         */
        public void process(boolean z) {
            final Single<StreamingHttpRequest> requests = this.connection.read().liftSyncToSingle(new SpliceFlatStreamToMetaSingle((metaData, payload) ->
                    StreamingHttpRequests.newTransportRequest(metaData.method(), metaData.requestTarget(), metaData.version(), metaData.headers(), m245executionContext().bufferAllocator(), payload, this.requireTrailerHeader, this.headersFactory)));
            final Completable responseWrite = handleRequestAndWriteResponse(requests, z);
            SourceAdapters.toSource(responseWrite).subscribe(new ErrorLoggingHttpSubscriber());
        }

        /**
         * Updates the flush strategy through the {@link SplittingFlushStrategy} installed by this connection.
         *
         * @param flushStrategyProvider provider of the new strategy.
         * @return the {@link Cancellable} returned by the splitting flush strategy.
         */
        public Cancellable updateFlushStrategy(NettyConnectionContext.FlushStrategyProvider flushStrategyProvider) {
            final Cancellable revert = splittingFlushStrategy.updateFlushStrategy(flushStrategyProvider);
            return revert;
        }

        /**
         * @return the default flush strategy of the underlying connection.
         */
        public FlushStrategy defaultFlushStrategy() {
            return connection.defaultFlushStrategy();
        }

        /**
         * Builds the stream that handles a request, flattens the response into the objects to be written, and
         * writes that stream to the connection.
         * <p>
         * For each request the resulting stream is: the flattened response (or an error response), then, when
         * {@code drainRequestPayloadBody} is set, a drain of the request message body if nobody subscribed to it,
         * and finally a processor that is completed once the request message body terminates.
         * <p>
         * The lambda handed to {@code invokeService} names its parameter {@code serviceRequest}: a lambda
         * parameter may not reuse the name of the enclosing lambda's parameter.
         *
         * @param single source of the request to handle.
         * @param z when {@code true} the per-request stream is repeated unconditionally after it completes;
         * when {@code false} it is written once.
         * @return the {@link Completable} returned by {@code connection.write(...)}.
         */
        private Completable handleRequestAndWriteResponse(Single<StreamingHttpRequest> single, boolean z) {
            Publisher flatMapPublisher = single.flatMapPublisher(streamingHttpRequest -> {
                // Completed when the request message body terminates (see the subscriber below).
                SingleSubscriberProcessor singleSubscriberProcessor = new SingleSubscriberProcessor();
                // Records whether anything subscribed to the request message body; only tracked when draining.
                AtomicBoolean atomicBoolean = this.drainRequestPayloadBody ? new AtomicBoolean() : null;
                StreamingHttpRequest transformMessageBody = streamingHttpRequest.transformMessageBody(publisher -> {
                    return publisher.afterSubscriber(() -> {
                        if (this.drainRequestPayloadBody) {
                            atomicBoolean.set(true);
                        }
                        return new PublisherSource.Subscriber<Object>() {
                            public void onSubscribe(PublisherSource.Subscription subscription) {
                            }

                            public void onNext(Object obj) {
                            }

                            public void onError(Throwable th) {
                                // When draining is enabled a RejectedSubscribeError does not complete the processor.
                                // NOTE(review): presumably this is the drain subscription being rejected because
                                // the body already had a subscriber - confirm.
                                if (NettyHttpServerConnection.this.drainRequestPayloadBody && (th instanceof RejectedSubscribeError)) {
                                    return;
                                }
                                singleSubscriberProcessor.onComplete();
                            }

                            public void onComplete() {
                                singleSubscriberProcessor.onComplete();
                            }
                        };
                    });
                });
                HttpRequestMethod method = transformMessageBody.method();
                HttpKeepAlive responseKeepAlive = HttpKeepAlive.responseKeepAlive(transformMessageBody);
                Publisher invokeService = this.strategy.invokeService(m245executionContext().executor(), transformMessageBody, serviceRequest -> {
                    return this.service.handle(this, serviceRequest, streamingResponseFactory()).recoverWith(th -> {
                        // A failed service call is turned into an error response instead of failing the stream.
                        return Single.succeeded(newErrorResponse(th, this.executionContext.executor(), serviceRequest.version(), responseKeepAlive));
                    }).flatMapPublisher(streamingHttpResponse -> {
                        responseKeepAlive.addConnectionHeaderIfNecessary(streamingHttpResponse);
                        FlushStrategy determineFlushStrategyForApi = AbstractStreamingHttpConnection.determineFlushStrategyForApi(streamingHttpResponse);
                        if (determineFlushStrategyForApi != null) {
                            this.splittingFlushStrategy.updateFlushStrategy((flushStrategy, z2) -> {
                                return z2 ? determineFlushStrategyForApi : flushStrategy;
                            }, 1);
                        }
                        return handleResponse(method, streamingHttpResponse);
                    });
                }, (th, executor) -> {
                    // Error path of invokeService: emit an error response followed by empty trailers.
                    return Publisher.from(new Object[]{newErrorResponse(th, executor, transformMessageBody.version(), responseKeepAlive), EmptyHttpHeaders.INSTANCE});
                });
                if (this.drainRequestPayloadBody) {
                    // After the response: if the body was never subscribed to, consume and discard it,
                    // ignoring any error raised while doing so.
                    invokeService = invokeService.concat(Completable.defer(() -> {
                        return atomicBoolean.get() ? Completable.completed() : transformMessageBody.messageBody().ignoreElements().onErrorResume(th2 -> {
                            return Completable.completed();
                        });
                    }));
                }
                // The per-request stream does not complete before the processor has been completed.
                return invokeService.concat(singleSubscriberProcessor);
            });
            return this.connection.write(z ? flatMapPublisher.repeat(i -> {
                return true;
            }) : flatMapPublisher);
        }

        /**
         * Flattens a response into the stream of objects to write.
         * <p>
         * If {@code HeaderUtils.canAddResponseContentLength} allows it, the result of
         * {@code HeaderUtils.setResponseContentLength} is returned. Otherwise the response meta-data is
         * concatenated with its message body, passed through {@code HeaderUtils::insertTrailersMapper}, and
         * {@code HeaderUtils.addResponseTransferEncodingIfNecessary} is applied to the response.
         *
         * @param httpRequestMethod method of the request being answered.
         * @param streamingHttpResponse the response to flatten.
         * @return the flattened response stream.
         */
        @Nonnull
        private static Publisher<Object> handleResponse(HttpRequestMethod httpRequestMethod, StreamingHttpResponse streamingHttpResponse) {
            if (!HeaderUtils.canAddResponseContentLength(streamingHttpResponse, httpRequestMethod)) {
                final Publisher<Object> flattened = Publisher.from(streamingHttpResponse)
                        .concat(streamingHttpResponse.messageBody())
                        .scanWith(HeaderUtils::insertTrailersMapper);
                HeaderUtils.addResponseTransferEncodingIfNecessary(streamingHttpResponse, httpRequestMethod);
                return flattened;
            }
            return HeaderUtils.setResponseContentLength(streamingHttpResponse);
        }

        /**
         * Logs {@code th} and creates the matching error response: "service unavailable" for a
         * {@link RejectedExecutionException}, "internal server error" for anything else. The response gets the
         * given protocol version, a zero {@code content-length} header and, if necessary, a connection header.
         *
         * @param th the failure being reported.
         * @param executor the executor named in the log message for rejected tasks.
         * @param httpProtocolVersion the version to set on the response.
         * @param httpKeepAlive used to add the connection header if necessary.
         * @return the error response.
         */
        private StreamingHttpResponse newErrorResponse(Throwable th, Executor executor, HttpProtocolVersion httpProtocolVersion, HttpKeepAlive httpKeepAlive) {
            final StreamingHttpResponse errorResponse;
            if (!(th instanceof RejectedExecutionException)) {
                LOGGER.error("Internal server error service={} connection={}", this.service, this, th);
                errorResponse = streamingResponseFactory().internalServerError();
            } else {
                LOGGER.error("Task rejected by Executor {} for service={}, connection={}", executor, this.service, this, th);
                errorResponse = streamingResponseFactory().serviceUnavailable();
            }
            errorResponse.version(httpProtocolVersion).setHeader(HttpHeaderNames.CONTENT_LENGTH, HttpHeaderValues.ZERO);
            httpKeepAlive.addConnectionHeaderIfNecessary(errorResponse);
            return errorResponse;
        }

        /**
         * @return the local address reported by the underlying connection.
         */
        public SocketAddress localAddress() {
            return connection.localAddress();
        }

        /**
         * @return the remote address reported by the underlying connection.
         */
        public SocketAddress remoteAddress() {
            return connection.remoteAddress();
        }

        /**
         * @return the {@link SSLSession} reported by the underlying connection, possibly {@code null}.
         */
        @Nullable
        public SSLSession sslSession() {
            return connection.sslSession();
        }

        /* renamed from: executionContext, reason: merged with bridge method [inline-methods] */
        /**
         * @return the {@link HttpExecutionContext} created for this connection in the constructor.
         */
        public HttpExecutionContext m245executionContext() {
            return executionContext;
        }

        /**
         * Looks up a socket option on the underlying connection.
         *
         * @param socketOption the option to query.
         * @param <T> the option's value type.
         * @return the value reported by the underlying connection, possibly {@code null}.
         */
        @Nullable
        public <T> T socketOption(SocketOption<T> socketOption) {
            return (T) connection.socketOption(socketOption);
        }

        /* renamed from: protocol, reason: merged with bridge method [inline-methods] */
        /**
         * @return the protocol reported by the underlying connection.
         */
        public HttpProtocolVersion m244protocol() {
            return connection.protocol();
        }

        /**
         * @return the transport error {@link Single} of the underlying connection.
         */
        public Single<Throwable> transportError() {
            return connection.transportError();
        }

        /**
         * @return the {@code onClosing} {@link Completable} of the underlying connection.
         */
        public Completable onClosing() {
            return connection.onClosing();
        }

        /**
         * @return the {@code onClose} {@link Completable} of the underlying connection.
         */
        public Completable onClose() {
            return connection.onClose();
        }

        /**
         * @return the {@link Completable} returned by {@code closeAsync()} on the underlying connection.
         */
        public Completable closeAsync() {
            return connection.closeAsync();
        }

        /**
         * @return the {@link Completable} returned by {@code closeAsyncGracefully()} on the underlying connection.
         */
        public Completable closeAsyncGracefully() {
            return connection.closeAsyncGracefully();
        }

        /**
         * @return the Netty {@link Channel} of the underlying connection.
         */
        public Channel nettyChannel() {
            return connection.nettyChannel();
        }

        /**
         * @return the simple class name followed by the underlying connection in parentheses.
         */
        public String toString() {
            final String simpleName = getClass().getSimpleName();
            return simpleName + "(" + connection + ")";
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:io/servicetalk/http/netty/NettyHttpServer$NettyHttpServerContext.class */
    /**
     * {@link ServerContext} that answers address and execution-context queries from a delegate context, and
     * routes close operations through a composite closeable made of the service and that delegate.
     */
    public static final class NettyHttpServerContext implements ServerContext {
        private final ServerContext delegate;
        private final ListenableAsyncCloseable asyncCloseable;

        /**
         * @param serverContext the context to delegate to.
         * @param streamingHttpService the service to close together with {@code serverContext}.
         */
        /* JADX INFO: Access modifiers changed from: package-private */
        /* JADX WARN: Multi-variable type inference failed */
        public NettyHttpServerContext(ServerContext serverContext, StreamingHttpService streamingHttpService) {
            this.delegate = serverContext;
            final AsyncCloseable[] closeables = {streamingHttpService, serverContext};
            this.asyncCloseable = AsyncCloseables.toListenableAsyncCloseable(
                    AsyncCloseables.newCompositeCloseable().appendAll(closeables));
        }

        /** @return the listen address of the delegate context. */
        public SocketAddress listenAddress() {
            return delegate.listenAddress();
        }

        /** @return the execution context of the delegate context. */
        public ExecutionContext executionContext() {
            return delegate.executionContext();
        }

        /**
         * Closes the composite closeable and logs a debug message once that finishes.
         *
         * @return the resulting {@link Completable}.
         */
        public Completable closeAsync() {
            return asyncCloseable.closeAsync().whenFinally(
                    () -> NettyHttpServer.LOGGER.debug("Stopped HTTP server for address {}.", listenAddress()));
        }

        /** @return the graceful-close {@link Completable} of the composite closeable. */
        public Completable closeAsyncGracefully() {
            return asyncCloseable.closeAsyncGracefully();
        }

        /** @return the {@code onClose} {@link Completable} of the composite closeable. */
        public Completable onClose() {
            return asyncCloseable.onClose();
        }

        /** @return the string form of the delegate context. */
        public String toString() {
            return delegate.toString();
        }
    }

    /* loaded from: input_file:io/servicetalk/http/netty/NettyHttpServer$SingleSubscriberProcessor.class */
    private static final class SingleSubscriberProcessor extends SubscribableCompletable implements CompletableSource.Processor, Cancellable {
        private static final Object CANCELLED = new Object();
        private static final AtomicReferenceFieldUpdater<SingleSubscriberProcessor, Object> stateUpdater = AtomicReferenceFieldUpdater.newUpdater(SingleSubscriberProcessor.class, Object.class, "state");

        @Nullable
        private volatile Object state;

        private SingleSubscriberProcessor() {
        }

        protected void handleSubscribe(CompletableSource.Subscriber subscriber) {
            subscriber.onSubscribe(this);
            while (true) {
                Object obj = this.state;
                if (obj instanceof TerminalNotification) {
                    ((TerminalNotification) obj).terminate(subscriber);
                    return;
                }
                if (obj instanceof CompletableSource.Subscriber) {
                    subscriber.onError(new DuplicateSubscribeException(obj, subscriber));
                    return;
                } else {
                    if (obj == CANCELLED) {
                        return;
                    }
                    if (obj == null && stateUpdater.compareAndSet(this, null, subscriber)) {
                        return;
                    }
                }
            }
        }

        public void onSubscribe(Cancellable cancellable) {
        }

        public void onComplete() {
            Object andSet = stateUpdater.getAndSet(this, TerminalNotification.complete());
            if (andSet instanceof CompletableSource.Subscriber) {
                ((CompletableSource.Subscriber) andSet).onComplete();
            }
        }

        public void onError(Throwable th) {
            Object andSet = stateUpdater.getAndSet(this, TerminalNotification.error(th));
            if (andSet instanceof CompletableSource.Subscriber) {
                ((CompletableSource.Subscriber) andSet).onError(th);
            }
        }

        public void cancel() {
            this.state = CANCELLED;
        }
    }

    /** Not instantiable: this class only exposes static members. */
    private NettyHttpServer() {
        // No instances.
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Binds an HTTP/1.1 server through {@link TcpServerBinder}. Each accepted channel is initialized with
     * {@code initChannel} and then started with {@code process(true)}.
     *
     * @param httpExecutionContext the execution context for the server.
     * @param readOnlyHttpServerConfig the server configuration; its {@code h1Config()} is asserted non-null.
     * @param socketAddress the address to bind to.
     * @param connectionAcceptor optional acceptor passed to the TCP binder.
     * @param streamingHttpService the service that handles requests.
     * @param z forwarded to {@code initChannel}.
     * @return a {@link Single} that yields a {@link NettyHttpServerContext} wrapping the bound context.
     */
    public static Single<ServerContext> bind(HttpExecutionContext httpExecutionContext, ReadOnlyHttpServerConfig readOnlyHttpServerConfig, SocketAddress socketAddress, @Nullable ConnectionAcceptor connectionAcceptor, StreamingHttpService streamingHttpService, boolean z) {
        if (!$assertionsDisabled && readOnlyHttpServerConfig.h1Config() == null) {
            throw new AssertionError();
        }
        final ReadOnlyTcpServerConfig tcpConfig = readOnlyHttpServerConfig.tcpConfig();
        return TcpServerBinder.bind(socketAddress, tcpConfig, false, httpExecutionContext, connectionAcceptor,
                (ch, observer) -> initChannel(ch, httpExecutionContext, readOnlyHttpServerConfig, new TcpServerChannelInitializer(tcpConfig, observer), streamingHttpService, z, observer),
                serverConnection -> serverConnection.process(true))
                .map(boundContext -> {
                    LOGGER.debug("Started HTTP/1.1 server for address {}.", boundContext.listenAddress());
                    return new NettyHttpServerContext(boundContext, streamingHttpService);
                });
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Initializes a channel with a close handler obtained from
     * {@code CloseHandler.forPipelinedRequestResponse(false, channel.config())}, then delegates to the overload
     * that takes the close handler explicitly.
     *
     * @param channel the channel to initialize.
     * @param httpExecutionContext the execution context.
     * @param readOnlyHttpServerConfig the server configuration.
     * @param channelInitializer the initializer to run before the HTTP handlers are added.
     * @param streamingHttpService the service that handles requests.
     * @param z forwarded to the delegate overload.
     * @param connectionObserver the observer for the connection.
     * @return a {@link Single} that yields the initialized connection.
     */
    public static Single<NettyHttpServerConnection> initChannel(Channel channel, HttpExecutionContext httpExecutionContext, ReadOnlyHttpServerConfig readOnlyHttpServerConfig, ChannelInitializer channelInitializer, StreamingHttpService streamingHttpService, boolean z, ConnectionObserver connectionObserver) {
        final CloseHandler closeHandler = CloseHandler.forPipelinedRequestResponse(false, channel.config());
        return initChannel(channel, httpExecutionContext, readOnlyHttpServerConfig, channelInitializer, streamingHttpService, z, connectionObserver, closeHandler);
    }

    /**
     * Initializes a channel as an HTTP/1.1 server connection: creates a {@link DefaultNettyConnection} with the
     * HTTP codec handlers appended to {@code channelInitializer}, wraps it in a
     * {@link NettyHttpServerConnection} and passes the result through {@code HttpDebugUtils.showPipeline}.
     *
     * @param channel the channel to initialize.
     * @param httpExecutionContext the execution context.
     * @param readOnlyHttpServerConfig the server configuration; its {@code h1Config()} is asserted non-null.
     * @param channelInitializer the initializer to run before the HTTP handlers are added.
     * @param streamingHttpService the service that handles requests.
     * @param z passed to the connection as its {@code drainRequestPayloadBody} flag.
     * @param connectionObserver the observer for the connection.
     * @param closeHandler the close handler shared by the connection and the codec handlers.
     * @return a {@link Single} that yields the initialized connection.
     */
    static Single<NettyHttpServerConnection> initChannel(Channel channel, HttpExecutionContext httpExecutionContext, ReadOnlyHttpServerConfig readOnlyHttpServerConfig, ChannelInitializer channelInitializer, StreamingHttpService streamingHttpService, boolean z, ConnectionObserver connectionObserver, CloseHandler closeHandler) {
        final H1ProtocolConfig h1Config = readOnlyHttpServerConfig.h1Config();
        if (!$assertionsDisabled && h1Config == null) {
            throw new AssertionError();
        }
        final Single<NettyHttpServerConnection> serverConnection = DefaultNettyConnection.initChannel(channel, httpExecutionContext.bufferAllocator(), httpExecutionContext.executor(), HeaderUtils.LAST_CHUNK_PREDICATE, closeHandler, readOnlyHttpServerConfig.tcpConfig().flushStrategy(), readOnlyHttpServerConfig.tcpConfig().idleTimeoutMs(), channelInitializer.andThen(getChannelInitializer(BufferUtils.getByteBufAllocator(httpExecutionContext.bufferAllocator()), h1Config, closeHandler)), httpExecutionContext.executionStrategy(), HttpProtocolVersion.HTTP_1_1, connectionObserver, false)
                .map(conn -> new NettyHttpServerConnection(conn, streamingHttpService, httpExecutionContext.executionStrategy(), HttpProtocolVersion.HTTP_1_1, h1Config.headersFactory(), z, readOnlyHttpServerConfig.allowDropTrailersReadFromTransport()));
        return HttpDebugUtils.showPipeline(serverConnection, HttpProtocolVersion.HTTP_1_1, channel);
    }

    /**
     * Creates the initializer that appends the HTTP request decoder and response encoder to a channel's
     * pipeline, after a {@link CopyByteBufHandlerChannelInitializer}. Decoder and encoder share one queue.
     *
     * @param byteBufAllocator allocator handed to the copy handler and the decoder.
     * @param h1ProtocolConfig source of the decoder and encoder settings.
     * @param closeHandler close handler handed to both the decoder and the encoder.
     * @return the combined channel initializer.
     */
    private static ChannelInitializer getChannelInitializer(ByteBufAllocator byteBufAllocator, H1ProtocolConfig h1ProtocolConfig, CloseHandler closeHandler) {
        return new CopyByteBufHandlerChannelInitializer(byteBufAllocator).andThen(ch -> {
            final ArrayDeque sharedQueue = new ArrayDeque(2);
            final ChannelPipeline channelPipeline = ch.pipeline();
            final HttpRequestDecoder decoder = new HttpRequestDecoder(sharedQueue, byteBufAllocator, h1ProtocolConfig.headersFactory(), h1ProtocolConfig.maxStartLineLength(), h1ProtocolConfig.maxHeaderFieldLength(), h1ProtocolConfig.specExceptions().allowPrematureClosureBeforePayloadBody(), closeHandler);
            channelPipeline.addLast(decoder);
            final HttpResponseEncoder encoder = new HttpResponseEncoder(sharedQueue, h1ProtocolConfig.headersEncodedSizeEstimate(), h1ProtocolConfig.trailersEncodedSizeEstimate(), closeHandler);
            channelPipeline.addLast(encoder);
        });
    }

    static {
        $assertionsDisabled = !NettyHttpServer.class.desiredAssertionStatus();
        LOGGER = LoggerFactory.getLogger(NettyHttpServer.class);
    }
}
