package io.micronaut.http.netty.stream;

import io.micronaut.core.annotation.Internal;
import io.micronaut.http.netty.reactive.HandlerPublisher;
import io.micronaut.http.netty.reactive.HandlerSubscriber;
import io.netty.channel.ChannelDuplexHandler;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.ChannelPromise;
import io.netty.handler.codec.http.DefaultHttpResponse;
import io.netty.handler.codec.http.FullHttpMessage;
import io.netty.handler.codec.http.HttpContent;
import io.netty.handler.codec.http.HttpMessage;
import io.netty.handler.codec.http.HttpResponseStatus;
import io.netty.handler.codec.http.HttpVersion;
import io.netty.handler.codec.http.LastHttpContent;
import io.netty.util.ReferenceCountUtil;
import io.netty.util.concurrent.EventExecutor;
import java.util.LinkedList;
import java.util.Queue;
import org.reactivestreams.Publisher;
import org.reactivestreams.Subscriber;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Internal
/* loaded from: input_file:io/micronaut/http/netty/stream/HttpStreamsHandler.class */
abstract class HttpStreamsHandler<In extends HttpMessage, Out extends HttpMessage> extends ChannelDuplexHandler {
    private static final Logger LOG = LoggerFactory.getLogger(HttpStreamsHandler.class);
    private final Queue<HttpStreamsHandler<In, Out>.Outgoing> outgoing = new LinkedList();
    private final Class<In> inClass;
    private final Class<Out> outClass;
    private In currentlyStreamedMessage;
    private boolean ignoreBodyRead;
    private boolean sendLastHttpContent;

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:io/micronaut/http/netty/stream/HttpStreamsHandler$Outgoing.class */
    public class Outgoing {
        final Out message;
        final ChannelPromise promise;

        /* JADX INFO: Access modifiers changed from: package-private */
        public Outgoing(Out out, ChannelPromise channelPromise) {
            this.message = out;
            this.promise = channelPromise;
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public HttpStreamsHandler(Class<In> cls, Class<Out> cls2) {
        this.inClass = cls;
        this.outClass = cls2;
    }

    protected abstract boolean hasBody(In in);

    protected abstract In createEmptyMessage(In in);

    protected abstract In createStreamedMessage(In in, Publisher<HttpContent> publisher);

    protected void receivedInMessage(ChannelHandlerContext channelHandlerContext) {
    }

    protected void consumedInMessage(ChannelHandlerContext channelHandlerContext) {
    }

    protected void receivedOutMessage(ChannelHandlerContext channelHandlerContext) {
    }

    protected void sentOutMessage(ChannelHandlerContext channelHandlerContext) {
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public void subscribeSubscriberToStream(StreamedHttpMessage streamedHttpMessage, Subscriber<HttpContent> subscriber) {
        streamedHttpMessage.subscribe(subscriber);
    }

    protected void bodyRequested(ChannelHandlerContext channelHandlerContext) {
    }

    public void channelRead(final ChannelHandlerContext channelHandlerContext, Object obj) throws Exception {
        if (!isValidInMessage(obj)) {
            if (obj instanceof HttpContent) {
                handleReadHttpContent(channelHandlerContext, (HttpContent) obj);
                return;
            }
            return;
        }
        receivedInMessage(channelHandlerContext);
        final In cast = this.inClass.cast(obj);
        if (cast instanceof FullHttpMessage) {
            channelHandlerContext.fireChannelRead(cast);
            consumedInMessage(channelHandlerContext);
        } else if (!hasBody(cast)) {
            channelHandlerContext.fireChannelRead(createEmptyMessage(cast));
            consumedInMessage(channelHandlerContext);
            this.ignoreBodyRead = true;
        } else {
            this.currentlyStreamedMessage = cast;
            Publisher<HttpContent> publisher = new HandlerPublisher<HttpContent>(channelHandlerContext.executor(), HttpContent.class) { // from class: io.micronaut.http.netty.stream.HttpStreamsHandler.1
                @Override // io.micronaut.http.netty.reactive.HandlerPublisher
                protected void cancelled() {
                    if (channelHandlerContext.executor().inEventLoop()) {
                        HttpStreamsHandler.this.handleCancelled(channelHandlerContext, cast);
                        return;
                    }
                    EventExecutor executor = channelHandlerContext.executor();
                    ChannelHandlerContext channelHandlerContext2 = channelHandlerContext;
                    HttpMessage httpMessage = cast;
                    executor.execute(() -> {
                        HttpStreamsHandler.this.handleCancelled(channelHandlerContext2, httpMessage);
                    });
                }

                /* JADX INFO: Access modifiers changed from: protected */
                @Override // io.micronaut.http.netty.reactive.HandlerPublisher
                public void requestDemand() {
                    HttpStreamsHandler.this.bodyRequested(channelHandlerContext);
                    super.requestDemand();
                }
            };
            channelHandlerContext.channel().pipeline().addAfter(channelHandlerContext.name(), channelHandlerContext.name() + "-body-publisher", publisher);
            channelHandlerContext.fireChannelRead(createStreamedMessage(cast, publisher));
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    public void handleCancelled(ChannelHandlerContext channelHandlerContext, In in) {
        if (this.currentlyStreamedMessage == in) {
            this.ignoreBodyRead = true;
            if (LOG.isTraceEnabled()) {
                LOG.trace("Calling ctx.read() for cancelled subscription");
            }
            channelHandlerContext.read();
        }
    }

    private void handleReadHttpContent(ChannelHandlerContext channelHandlerContext, HttpContent httpContent) {
        if (this.ignoreBodyRead) {
            ReferenceCountUtil.release(httpContent);
            if (httpContent instanceof LastHttpContent) {
                this.ignoreBodyRead = false;
                if (this.currentlyStreamedMessage != null) {
                    removeHandlerIfActive(channelHandlerContext, channelHandlerContext.name() + "-body-publisher");
                }
                this.currentlyStreamedMessage = null;
                return;
            }
            return;
        }
        if (!(httpContent instanceof LastHttpContent)) {
            channelHandlerContext.fireChannelRead(httpContent);
            return;
        }
        if (httpContent.content().readableBytes() > 0 || !((LastHttpContent) httpContent).trailingHeaders().isEmpty()) {
            channelHandlerContext.fireChannelRead(httpContent);
        } else {
            ReferenceCountUtil.release(httpContent);
        }
        removeHandlerIfActive(channelHandlerContext, channelHandlerContext.name() + "-body-publisher");
        this.currentlyStreamedMessage = null;
        consumedInMessage(channelHandlerContext);
    }

    public void channelReadComplete(ChannelHandlerContext channelHandlerContext) throws Exception {
        if (this.ignoreBodyRead) {
            channelHandlerContext.read();
        } else {
            channelHandlerContext.fireChannelReadComplete();
        }
    }

    public void write(ChannelHandlerContext channelHandlerContext, Object obj, ChannelPromise channelPromise) throws Exception {
        if (!isValidOutMessage(obj)) {
            if (!(obj instanceof LastHttpContent)) {
                channelHandlerContext.write(obj, channelPromise);
                return;
            } else {
                this.sendLastHttpContent = false;
                channelHandlerContext.write(obj, channelPromise);
                return;
            }
        }
        HttpStreamsHandler<In, Out>.Outgoing outgoing = new Outgoing(this.outClass.cast(obj), channelPromise);
        receivedOutMessage(channelHandlerContext);
        if (!this.outgoing.isEmpty()) {
            this.outgoing.add(outgoing);
        } else {
            this.outgoing.add(outgoing);
            flushNext(channelHandlerContext);
        }
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public void unbufferedWrite(final ChannelHandlerContext channelHandlerContext, HttpStreamsHandler<In, Out>.Outgoing outgoing) {
        if (outgoing.message instanceof FullHttpMessage) {
            channelHandlerContext.writeAndFlush(outgoing.message, outgoing.promise);
            outgoing.promise.addListener(channelFuture -> {
                executeInEventLoop(channelHandlerContext, () -> {
                    sentOutMessage(channelHandlerContext);
                    this.outgoing.remove();
                    flushNext(channelHandlerContext);
                });
            });
        } else if (outgoing.message instanceof StreamedHttpMessage) {
            StreamedHttpMessage streamedHttpMessage = (StreamedHttpMessage) outgoing.message;
            HandlerSubscriber<HttpContent> handlerSubscriber = new HandlerSubscriber<HttpContent>(channelHandlerContext.executor()) { // from class: io.micronaut.http.netty.stream.HttpStreamsHandler.2
                @Override // io.micronaut.http.netty.reactive.HandlerSubscriber
                protected void error(Throwable th) {
                    try {
                        if (HttpStreamsHandler.LOG.isErrorEnabled()) {
                            HttpStreamsHandler.LOG.error("Error occurred writing stream response: " + th.getMessage(), th);
                        }
                        channelHandlerContext.writeAndFlush(new DefaultHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.INTERNAL_SERVER_ERROR)).addListener(ChannelFutureListener.CLOSE);
                    } finally {
                        channelHandlerContext.read();
                    }
                }

                @Override // io.micronaut.http.netty.reactive.HandlerSubscriber
                protected void complete() {
                    HttpStreamsHandler httpStreamsHandler = HttpStreamsHandler.this;
                    ChannelHandlerContext channelHandlerContext2 = channelHandlerContext;
                    ChannelHandlerContext channelHandlerContext3 = channelHandlerContext;
                    httpStreamsHandler.executeInEventLoop(channelHandlerContext2, () -> {
                        HttpStreamsHandler.this.completeBody(channelHandlerContext3);
                    });
                }
            };
            this.sendLastHttpContent = true;
            channelHandlerContext.writeAndFlush(outgoing.message);
            channelHandlerContext.pipeline().addAfter(channelHandlerContext.name(), channelHandlerContext.name() + "-body-subscriber", handlerSubscriber);
            subscribeSubscriberToStream(streamedHttpMessage, handlerSubscriber);
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    public void completeBody(ChannelHandlerContext channelHandlerContext) {
        removeHandlerIfActive(channelHandlerContext, channelHandlerContext.name() + "-body-subscriber");
        if (this.sendLastHttpContent) {
            channelHandlerContext.writeAndFlush(LastHttpContent.EMPTY_LAST_CONTENT, this.outgoing.peek().promise).addListener(channelFuture -> {
                executeInEventLoop(channelHandlerContext, () -> {
                    this.outgoing.remove();
                    sentOutMessage(channelHandlerContext);
                    flushNext(channelHandlerContext);
                });
            });
            channelHandlerContext.read();
        } else {
            this.outgoing.remove().promise.setSuccess();
            sentOutMessage(channelHandlerContext);
            flushNext(channelHandlerContext);
            channelHandlerContext.read();
        }
    }

    private void removeHandlerIfActive(ChannelHandlerContext channelHandlerContext, String str) {
        if (channelHandlerContext.channel().isActive()) {
            ChannelPipeline pipeline = channelHandlerContext.pipeline();
            if (pipeline.get(str) != null) {
                pipeline.remove(str);
            }
        }
    }

    private void flushNext(ChannelHandlerContext channelHandlerContext) {
        if (this.outgoing.isEmpty()) {
            channelHandlerContext.fireChannelWritabilityChanged();
        } else {
            unbufferedWrite(channelHandlerContext, this.outgoing.element());
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    public void executeInEventLoop(ChannelHandlerContext channelHandlerContext, Runnable runnable) {
        if (channelHandlerContext.executor().inEventLoop()) {
            runnable.run();
        } else {
            channelHandlerContext.executor().execute(runnable);
        }
    }

    protected boolean isValidOutMessage(Object obj) {
        return this.outClass.isInstance(obj);
    }

    protected boolean isValidInMessage(Object obj) {
        return this.inClass.isInstance(obj);
    }
}
