package io.micronaut.http.netty.stream;

import io.micronaut.core.annotation.Internal;
import io.micronaut.http.exceptions.HttpStatusException;
import io.micronaut.http.netty.reactive.HandlerPublisher;
import io.micronaut.http.netty.reactive.HandlerSubscriber;
import io.netty.channel.ChannelDuplexHandler;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.ChannelPromise;
import io.netty.handler.codec.http.DefaultHttpResponse;
import io.netty.handler.codec.http.FullHttpMessage;
import io.netty.handler.codec.http.FullHttpRequest;
import io.netty.handler.codec.http.HttpContent;
import io.netty.handler.codec.http.HttpMessage;
import io.netty.handler.codec.http.HttpResponseStatus;
import io.netty.handler.codec.http.HttpVersion;
import io.netty.handler.codec.http.LastHttpContent;
import io.netty.util.ReferenceCountUtil;
import io.netty.util.concurrent.EventExecutor;
import java.util.LinkedList;
import java.util.Queue;
import java.util.concurrent.atomic.AtomicBoolean;
import org.reactivestreams.Publisher;
import org.reactivestreams.Subscriber;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.publisher.Flux;

/**
 * Duplex Netty handler that adapts HTTP messages to and from reactive streams.
 *
 * <p>Inbound: a message of type {@code In} that {@link #hasBody(HttpMessage) has a body} is
 * forwarded downstream as a streamed message (see {@link #createStreamedMessage}) whose body is
 * published by a {@link HandlerPublisher} installed in the pipeline under
 * {@link #HANDLER_BODY_PUBLISHER}. Subsequent {@link HttpContent} objects are forwarded to that
 * publisher until a {@link LastHttpContent} arrives.
 *
 * <p>Outbound: messages of type {@code Out} are queued and written one at a time while the channel
 * is writable. The body of a {@link StreamedHttpMessage} is consumed by a {@link HandlerSubscriber}
 * that is temporarily installed in the pipeline.
 *
 * <p>The mutable state below is not synchronized; callbacks that may arrive on another thread are
 * re-dispatched to the context's executor before touching it.
 *
 * @param <In>  type of the inbound HTTP message handled by this instance
 * @param <Out> type of the outbound HTTP message handled by this instance
 */
@Internal
public abstract class HttpStreamsHandler<In extends HttpMessage, Out extends HttpMessage> extends ChannelDuplexHandler {
    /** Pipeline name under which the inbound body publisher is registered. */
    public static final String HANDLER_BODY_PUBLISHER = "http-streams-codec-body-publisher";

    /** Suffix appended to this handler's pipeline name to name the outbound body subscriber. */
    private static final String BODY_SUBSCRIBER_SUFFIX = "-body-subscriber";

    private static final Logger LOG = LoggerFactory.getLogger(HttpStreamsHandler.class);

    /** Outbound messages accepted by {@link #write} that have not yet been handed to {@link #unbufferedWrite}. */
    private final Queue<Outgoing<Out>> outgoing = new LinkedList<>();
    private final Class<In> inClass;
    private final Class<Out> outClass;

    /** Inbound message whose body is currently being streamed, or {@code null} if there is none. */
    private In currentlyStreamedMessage;

    /** While {@code true}, incoming {@link HttpContent} is released instead of being forwarded. */
    private boolean ignoreBodyRead;

    /** Whether {@link #completeBody} must terminate the outbound body with an empty {@link LastHttpContent}. */
    private boolean sendLastHttpContent;

    /** {@code true} while a streamed outbound message is being written; blocks draining of {@link #outgoing}. */
    private boolean outgoingInFlight;

    /**
     * Pairs a queued outbound message with the promise to complete once it has been written.
     *
     * @param <O> type of the outbound message
     */
    public static class Outgoing<O extends HttpMessage> {
        final O message;
        final ChannelPromise promise;

        Outgoing(O o, ChannelPromise channelPromise) {
            this.message = o;
            this.promise = channelPromise;
        }
    }

    /**
     * @param cls  runtime class used to recognise and cast inbound messages
     * @param cls2 runtime class used to recognise outbound messages
     */
    public HttpStreamsHandler(Class<In> cls, Class<Out> cls2) {
        this.inClass = cls;
        this.outClass = cls2;
    }

    /**
     * Decides whether an inbound message is followed by body content. When this returns
     * {@code false}, {@link #channelRead} forwards {@link #createEmptyMessage} and discards the
     * content that follows up to and including the next {@link LastHttpContent}.
     *
     * @param in the inbound message
     * @return {@code true} if the body should be streamed
     */
    protected abstract boolean hasBody(In in);

    /**
     * Creates the message that is forwarded downstream in place of an inbound message without a body.
     *
     * @param in the inbound message
     * @return the message to forward
     */
    protected abstract In createEmptyMessage(In in);

    /**
     * Creates the message that is forwarded downstream for an inbound message with a body.
     *
     * @param in        the inbound message
     * @param publisher publisher of the body content
     * @return the message to forward
     */
    protected abstract In createStreamedMessage(In in, Publisher<? extends HttpContent> publisher);

    /**
     * Hook invoked by {@link #channelRead} as soon as a valid inbound message arrives, before it is
     * forwarded. The default implementation does nothing.
     *
     * @param channelHandlerContext the handler context
     */
    protected void receivedInMessage(ChannelHandlerContext channelHandlerContext) {
    }

    /**
     * Hook invoked once an inbound message has been completely handed downstream: right after a
     * full or body-less message was forwarded, or after the last content of a streamed body was
     * forwarded. The default implementation does nothing.
     *
     * @param channelHandlerContext the handler context
     */
    protected void consumedInMessage(ChannelHandlerContext channelHandlerContext) {
    }

    /**
     * Hook invoked by {@link #write} when a valid outbound message is accepted, before it is queued.
     * The default implementation does nothing.
     *
     * @param channelHandlerContext the handler context
     */
    protected void receivedOutMessage(ChannelHandlerContext channelHandlerContext) {
    }

    /**
     * Hook invoked after an outbound message has been handled: directly after the write of a full
     * message was issued, or once the body of a streamed message has completed. The default
     * implementation does nothing.
     *
     * @param channelHandlerContext the handler context
     */
    protected void sentOutMessage(ChannelHandlerContext channelHandlerContext) {
    }

    /**
     * Subscribes the body subscriber to an outbound streamed message. The default implementation
     * subscribes directly.
     *
     * @param streamedHttpMessage the outbound message whose body is to be written
     * @param subscriber          the subscriber that writes the body to the channel
     */
    public void subscribeSubscriberToStream(StreamedHttpMessage streamedHttpMessage, Subscriber<HttpContent> subscriber) {
        streamedHttpMessage.subscribe(subscriber);
    }

    /**
     * Hook invoked each time the inbound body publisher is about to request demand. The default
     * implementation does nothing.
     *
     * @param channelHandlerContext the handler context
     */
    protected void bodyRequested(ChannelHandlerContext channelHandlerContext) {
    }

    /**
     * @return {@code true} if a cancelled body subscription should additionally fire a
     *         channel-writability-changed event (presumably for client pipelines; confirm against
     *         the subclasses)
     */
    protected abstract boolean isClient();

    /**
     * Handles inbound traffic. Valid inbound messages are forwarded as-is, as an empty message, or
     * as a streamed message backed by a newly installed body publisher; {@link HttpContent} is
     * routed through {@link #handleReadHttpContent}.
     *
     * @param channelHandlerContext the handler context
     * @param obj                   the inbound object
     * @throws Exception declared by the overridden Netty callback
     */
    @Override
    public void channelRead(final ChannelHandlerContext channelHandlerContext, Object obj) throws Exception {
        if (!isValidInMessage(obj)) {
            if (obj instanceof HttpContent) {
                handleReadHttpContent(channelHandlerContext, (HttpContent) obj);
            }
            // NOTE(review): anything that is neither an In message nor HttpContent is dropped here
            // without being released or forwarded - confirm this is intended.
            return;
        }

        receivedInMessage(channelHandlerContext);
        final In inMessage = this.inClass.cast(obj);

        if (inMessage instanceof FullHttpMessage) {
            final FullHttpMessage fullHttpMessage = (FullHttpMessage) inMessage;
            if (fullHttpMessage instanceof FullHttpRequest && fullHttpMessage.content().readableBytes() != 0) {
                // A full request carrying bytes is exposed as a single-element body stream.
                channelHandlerContext.fireChannelRead(createStreamedMessage(inMessage, Flux.just(fullHttpMessage)));
            } else {
                channelHandlerContext.fireChannelRead(inMessage);
            }
            consumedInMessage(channelHandlerContext);
            return;
        }

        if (!hasBody(inMessage)) {
            channelHandlerContext.fireChannelRead(createEmptyMessage(inMessage));
            consumedInMessage(channelHandlerContext);
            // Discard whatever content still follows this message.
            this.ignoreBodyRead = true;
            return;
        }

        this.currentlyStreamedMessage = inMessage;
        HandlerPublisher<HttpContent> handlerPublisher = new HandlerPublisher<HttpContent>(channelHandlerContext.executor()) {
            @Override
            protected boolean acceptInboundMessage(Object obj2) {
                return obj2 instanceof HttpContent;
            }

            @Override
            protected void cancelled() {
                // Handler state may only be touched from the context's executor.
                if (channelHandlerContext.executor().inEventLoop()) {
                    HttpStreamsHandler.this.handleCancelled(channelHandlerContext, inMessage);
                } else {
                    channelHandlerContext.executor().execute(() -> {
                        HttpStreamsHandler.this.handleCancelled(channelHandlerContext, inMessage);
                    });
                }
            }

            @Override
            public void requestDemand() {
                HttpStreamsHandler.this.bodyRequested(channelHandlerContext);
                super.requestDemand();
            }
        };
        channelHandlerContext.channel().pipeline().addAfter(channelHandlerContext.name(), HANDLER_BODY_PUBLISHER, handlerPublisher);
        channelHandlerContext.fireChannelRead(createStreamedMessage(inMessage, handlerPublisher));
    }

    /**
     * Reacts to the cancellation of an inbound body subscription. If the cancelled message is still
     * the one being streamed, the rest of its body is discarded and a read is triggered.
     *
     * @param channelHandlerContext the handler context
     * @param in                    the message whose body subscription was cancelled
     */
    private void handleCancelled(ChannelHandlerContext channelHandlerContext, In in) {
        if (this.currentlyStreamedMessage != in) {
            // A different (or no) message is being streamed by now; nothing to do.
            return;
        }
        this.ignoreBodyRead = true;
        if (LOG.isTraceEnabled()) {
            LOG.trace("Calling ctx.read() for cancelled subscription");
        }
        channelHandlerContext.read();
        if (isClient()) {
            channelHandlerContext.fireChannelWritabilityChanged();
        }
    }

    /**
     * Routes a chunk of inbound body content: releases it while the body is being ignored or when
     * no body publisher is installed, otherwise forwards it to the body publisher. A
     * {@link LastHttpContent} ends the current body.
     *
     * @param channelHandlerContext the handler context
     * @param httpContent           the received content
     */
    private void handleReadHttpContent(ChannelHandlerContext channelHandlerContext, HttpContent httpContent) {
        final boolean last = httpContent instanceof LastHttpContent;

        if (this.ignoreBodyRead) {
            releaseFully(httpContent);
            if (last) {
                this.ignoreBodyRead = false;
                if (this.currentlyStreamedMessage != null) {
                    removeHandlerIfActive(channelHandlerContext, HANDLER_BODY_PUBLISHER);
                }
                this.currentlyStreamedMessage = null;
            }
            // Keep pulling so the discarded body drains.
            channelHandlerContext.read();
            return;
        }

        if (channelHandlerContext.pipeline().get(HANDLER_BODY_PUBLISHER) == null) {
            // Nobody is listening for this content.
            releaseFully(httpContent);
            return;
        }

        channelHandlerContext.fireChannelRead(httpContent);
        if (last) {
            this.currentlyStreamedMessage = null;
            removeHandlerIfActive(channelHandlerContext, HANDLER_BODY_PUBLISHER);
            consumedInMessage(channelHandlerContext);
        }
    }

    /**
     * Drops every outstanding reference to the given content. Content whose reference count is
     * already zero is left alone, because releasing with a non-positive decrement is rejected by
     * Netty with an {@link IllegalArgumentException}.
     *
     * @param httpContent the content to release
     */
    private static void releaseFully(HttpContent httpContent) {
        final int refCnt = httpContent.refCnt();
        if (refCnt > 0) {
            ReferenceCountUtil.release(httpContent, refCnt);
        }
    }

    /**
     * While a body is being ignored, requests more data instead of propagating the event; otherwise
     * propagates read-complete downstream.
     *
     * @param channelHandlerContext the handler context
     * @throws Exception declared by the overridden Netty callback
     */
    @Override
    public void channelReadComplete(ChannelHandlerContext channelHandlerContext) throws Exception {
        if (this.ignoreBodyRead) {
            channelHandlerContext.read();
        } else {
            channelHandlerContext.fireChannelReadComplete();
        }
    }

    /**
     * Handles outbound traffic. Valid outbound messages are queued and written via
     * {@link #proceedWriteOutgoing}; everything else is passed through unchanged. Seeing a
     * {@link LastHttpContent} pass through means the body terminates itself, so no extra empty last
     * content needs to be appended by {@link #completeBody}.
     *
     * @param channelHandlerContext the handler context
     * @param obj                   the outbound object
     * @param channelPromise        promise to complete once the object has been written
     * @throws Exception declared by the overridden Netty callback
     */
    @Override
    public void write(ChannelHandlerContext channelHandlerContext, Object obj, ChannelPromise channelPromise) throws Exception {
        if (isValidOutMessage(obj)) {
            receivedOutMessage(channelHandlerContext);
            @SuppressWarnings("unchecked") // type was vetted by isValidOutMessage
            final Out outMessage = (Out) obj;
            this.outgoing.add(new Outgoing<>(outMessage, channelPromise));
            proceedWriteOutgoing(channelHandlerContext);
            return;
        }
        if (obj instanceof LastHttpContent) {
            this.sendLastHttpContent = false;
        }
        channelHandlerContext.write(obj, channelPromise);
    }

    /**
     * Resumes draining the outbound queue when the channel's writability changes.
     *
     * @param channelHandlerContext the handler context
     */
    @Override
    public void channelWritabilityChanged(ChannelHandlerContext channelHandlerContext) {
        // NOTE(review): the event is not propagated via fireChannelWritabilityChanged() - confirm
        // that downstream handlers do not rely on it.
        proceedWriteOutgoing(channelHandlerContext);
    }

    /**
     * Writes queued outbound messages for as long as no streamed message is in flight, the channel
     * is writable and the queue is not empty.
     *
     * @param channelHandlerContext the handler context
     */
    private void proceedWriteOutgoing(ChannelHandlerContext channelHandlerContext) {
        while (!this.outgoingInFlight && channelHandlerContext.channel().isWritable() && !this.outgoing.isEmpty()) {
            Outgoing<Out> next = this.outgoing.remove();
            unbufferedWrite(channelHandlerContext, next.message, next.promise);
        }
    }

    /**
     * Writes a single outbound message. A {@link FullHttpMessage} is written and flushed directly.
     * For a {@link StreamedHttpMessage} a body subscriber is installed in the pipeline; the message
     * head is written lazily with the first body element (or on completion if the body is empty),
     * and the message is finished through {@link #completeBody}.
     *
     * @param channelHandlerContext the handler context
     * @param out                   the message to write
     * @param channelPromise        promise to complete once the message has been written
     */
    public void unbufferedWrite(final ChannelHandlerContext channelHandlerContext, final Out out, final ChannelPromise channelPromise) {
        if (out instanceof FullHttpMessage) {
            channelHandlerContext.writeAndFlush(out, channelPromise);
            sentOutMessage(channelHandlerContext);
            return;
        }
        if (!(out instanceof StreamedHttpMessage)) {
            // NOTE(review): other message kinds are dropped and their promise is never completed -
            // presumably subclasses intercept them before delegating here; confirm.
            return;
        }

        this.outgoingInFlight = true;
        HandlerSubscriber<HttpContent> handlerSubscriber = new HandlerSubscriber<HttpContent>(channelHandlerContext.executor()) {
            /** Flipped by whichever of onNext/complete gets to write the message head. */
            private final AtomicBoolean messageWritten = new AtomicBoolean();

            @Override
            public void onNext(HttpContent httpContent) {
                if (!this.messageWritten.compareAndSet(false, true)) {
                    super.onNext(httpContent);
                    return;
                }
                // First element: write the head, then the element once the head write has finished.
                ChannelPromise newPromise = channelHandlerContext.newPromise();
                this.lastWriteFuture = newPromise;
                channelHandlerContext.writeAndFlush(out).addListener(future -> {
                    onNext(httpContent, newPromise);
                });
            }

            @Override
            protected void error(Throwable th) {
                try {
                    if (HttpStreamsHandler.LOG.isErrorEnabled()) {
                        HttpStreamsHandler.LOG.error("Error occurred writing stream response: " + th.getMessage(), th);
                    }
                    HttpResponseStatus status = th instanceof HttpStatusException
                            ? HttpResponseStatus.valueOf(((HttpStatusException) th).getStatus().getCode(), th.getMessage())
                            : HttpResponseStatus.INTERNAL_SERVER_ERROR;
                    // Report the failure to the peer and close the connection afterwards.
                    channelHandlerContext.writeAndFlush(new DefaultHttpResponse(HttpVersion.HTTP_1_1, status))
                            .addListener(ChannelFutureListener.CLOSE);
                } finally {
                    channelHandlerContext.read();
                }
            }

            @Override
            protected void complete() {
                if (this.messageWritten.compareAndSet(false, true)) {
                    // Empty body: the head has not been written yet.
                    channelHandlerContext.writeAndFlush(out).addListener(future -> {
                        doOnComplete();
                    });
                } else {
                    doOnComplete();
                }
            }

            private void doOnComplete() {
                // Handler state may only be touched from the context's executor.
                if (channelHandlerContext.executor().inEventLoop()) {
                    HttpStreamsHandler.this.completeBody(channelHandlerContext, channelPromise);
                } else {
                    channelHandlerContext.executor().execute(() -> {
                        HttpStreamsHandler.this.completeBody(channelHandlerContext, channelPromise);
                    });
                }
            }
        };
        this.sendLastHttpContent = true;
        channelHandlerContext.pipeline().addAfter(channelHandlerContext.name(), channelHandlerContext.name() + BODY_SUBSCRIBER_SUFFIX, handlerSubscriber);
        subscribeSubscriberToStream((StreamedHttpMessage) out, handlerSubscriber);
    }

    /**
     * Finishes a streamed outbound message: removes the body subscriber and either appends an empty
     * {@link LastHttpContent} (completing the promise through that write) or completes the promise
     * directly when a last content has already passed through {@link #write}.
     *
     * @param channelHandlerContext the handler context
     * @param channelPromise        promise of the message being completed
     */
    private void completeBody(ChannelHandlerContext channelHandlerContext, ChannelPromise channelPromise) {
        removeHandlerIfActive(channelHandlerContext, channelHandlerContext.name() + BODY_SUBSCRIBER_SUFFIX);
        if (this.sendLastHttpContent) {
            channelHandlerContext.writeAndFlush(LastHttpContent.EMPTY_LAST_CONTENT, channelPromise).addListener(future -> {
                finishOutgoingWrite(channelHandlerContext);
            });
            return;
        }
        channelPromise.setSuccess();
        finishOutgoingWrite(channelHandlerContext);
    }

    /**
     * Bookkeeping after a streamed outbound message has been written: notifies the hook, triggers a
     * read, clears the in-flight flag and resumes draining the outbound queue.
     *
     * @param channelHandlerContext the handler context
     */
    private void finishOutgoingWrite(ChannelHandlerContext channelHandlerContext) {
        sentOutMessage(channelHandlerContext);
        channelHandlerContext.read();
        this.outgoingInFlight = false;
        proceedWriteOutgoing(channelHandlerContext);
    }

    /**
     * Removes the named handler from the pipeline if the channel is active and the handler is
     * present.
     *
     * @param channelHandlerContext the handler context
     * @param str                   pipeline name of the handler to remove
     */
    private void removeHandlerIfActive(ChannelHandlerContext channelHandlerContext, String str) {
        if (channelHandlerContext.channel().isActive()) {
            ChannelPipeline pipeline = channelHandlerContext.pipeline();
            if (pipeline.get(str) != null) {
                pipeline.remove(str);
            }
        }
    }

    /**
     * @param obj an outbound object
     * @return {@code true} if the object is an instance of the outbound message class
     */
    protected boolean isValidOutMessage(Object obj) {
        return this.outClass.isInstance(obj);
    }

    /**
     * @param obj an inbound object
     * @return {@code true} if the object is an instance of the inbound message class
     */
    protected boolean isValidInMessage(Object obj) {
        return this.inClass.isInstance(obj);
    }
}
