package reactor.io.net.impl.netty.http;

import io.netty.buffer.ByteBufHolder;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.http.DefaultHttpContent;
import io.netty.handler.codec.http.DefaultLastHttpContent;
import io.netty.handler.codec.http.HttpContent;
import io.netty.handler.codec.http.HttpRequest;
import io.netty.handler.codec.http.LastHttpContent;
import org.reactivestreams.Publisher;
import org.reactivestreams.Subscription;
import reactor.fn.Consumer;
import reactor.io.buffer.Buffer;
import reactor.io.net.ChannelStream;
import reactor.io.net.ReactorChannelHandler;
import reactor.io.net.impl.netty.NettyChannelHandlerBridge;
import reactor.io.net.impl.netty.NettyChannelStream;
import reactor.rx.action.support.DefaultSubscriber;

/* loaded from: input_file:reactor/io/net/impl/netty/http/NettyHttpServerHandler.class */
public class NettyHttpServerHandler<IN, OUT> extends NettyChannelHandlerBridge<IN, OUT> {
    private final NettyChannelStream<IN, OUT> tcpStream;
    protected NettyHttpChannel<IN, OUT> request;

    public NettyHttpServerHandler(ReactorChannelHandler<IN, OUT, ChannelStream<IN, OUT>> reactorChannelHandler, NettyChannelStream<IN, OUT> nettyChannelStream) {
        super(reactorChannelHandler, nettyChannelStream);
        this.tcpStream = nettyChannelStream;
    }

    @Override // reactor.io.net.impl.netty.NettyChannelHandlerBridge
    public void channelActive(ChannelHandlerContext channelHandlerContext) throws Exception {
        channelHandlerContext.fireChannelActive();
        channelHandlerContext.read();
    }

    @Override // reactor.io.net.impl.netty.NettyChannelHandlerBridge
    public void channelRead(final ChannelHandlerContext channelHandlerContext, Object obj) throws Exception {
        Class<?> cls = obj.getClass();
        if (this.request == null && HttpRequest.class.isAssignableFrom(cls)) {
            this.request = new NettyHttpChannel<>(this.tcpStream, (HttpRequest) obj);
            ((Publisher) this.handler.apply(this.request)).subscribe(new DefaultSubscriber<Void>() { // from class: reactor.io.net.impl.netty.http.NettyHttpServerHandler.1
                Subscription subscription;

                public void onSubscribe(Subscription subscription) {
                    this.subscription = subscription;
                    subscription.request(Long.MAX_VALUE);
                }

                public void onError(Throwable th) {
                    NettyHttpServerHandler.log.error("Error processing connection. Closing the channel.", th);
                    if (NettyHttpServerHandler.CHANNEL_REF.get(NettyHttpServerHandler.this) == 0) {
                        channelHandlerContext.channel().close();
                    }
                }

                public void onComplete() {
                    this.subscription.cancel();
                    if (channelHandlerContext.channel().isOpen()) {
                        if (NettyHttpServerHandler.log.isDebugEnabled()) {
                            NettyHttpServerHandler.log.debug("Close Http Response ");
                        }
                        NettyHttpServerHandler.this.writeLast(channelHandlerContext);
                    }
                }
            });
        }
        if (HttpContent.class.isAssignableFrom(cls)) {
            super.channelRead(channelHandlerContext, ((ByteBufHolder) obj).content());
        }
        postRead(channelHandlerContext, obj);
    }

    protected void postRead(ChannelHandlerContext channelHandlerContext, Object obj) {
        if (this.channelSubscription == null || !DefaultLastHttpContent.class.equals(obj.getClass())) {
            return;
        }
        this.channelSubscription.onComplete();
        this.channelSubscription = null;
    }

    protected void writeLast(ChannelHandlerContext channelHandlerContext) {
        channelHandlerContext.writeAndFlush(this.request.checkHeader() ? this.request.getNettyResponse() : LastHttpContent.EMPTY_LAST_CONTENT).addListener(ChannelFutureListener.CLOSE);
    }

    @Override // reactor.io.net.impl.netty.NettyChannelHandlerBridge
    protected ChannelFuture doOnWrite(Object obj, ChannelHandlerContext channelHandlerContext) {
        return obj.getClass().equals(Buffer.class) ? channelHandlerContext.write(new DefaultHttpContent(convertBufferToByteBuff(channelHandlerContext, (Buffer) obj))) : channelHandlerContext.write(obj);
    }

    /* JADX INFO: Access modifiers changed from: protected */
    @Override // reactor.io.net.impl.netty.NettyChannelHandlerBridge
    public void doOnSubscribe(ChannelHandlerContext channelHandlerContext, Subscription subscription, long j, Consumer<Void> consumer) {
        if (this.request.checkHeader()) {
            channelHandlerContext.write(this.request.getNettyResponse());
        }
        super.doOnSubscribe(channelHandlerContext, subscription, j, consumer);
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public NettyHttpServerHandler<IN, OUT> withWebsocketSupport(String str, String str2) {
        if (this.request.checkHeader()) {
            return new NettyHttpWSServerHandler(str, str2, this);
        }
        log.error("Cannot enable websocket if headers have already been sent");
        return this;
    }
}
