/*
 * Decompiled with CFR 0.152.
 */
package io.micronaut.http.netty.stream;

import io.micronaut.core.annotation.Internal;
import io.micronaut.http.netty.AbstractNettyHttpRequest;
import io.micronaut.http.netty.reactive.HandlerPublisher;
import io.micronaut.http.netty.reactive.HandlerSubscriber;
import io.micronaut.http.netty.stream.Http2Content;
import io.micronaut.http.netty.stream.StreamedHttpMessage;
import io.netty.channel.ChannelDuplexHandler;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.ChannelPromise;
import io.netty.handler.codec.http.DefaultHttpResponse;
import io.netty.handler.codec.http.FullHttpMessage;
import io.netty.handler.codec.http.HttpContent;
import io.netty.handler.codec.http.HttpMessage;
import io.netty.handler.codec.http.HttpResponseStatus;
import io.netty.handler.codec.http.HttpVersion;
import io.netty.handler.codec.http.LastHttpContent;
import io.netty.util.ReferenceCountUtil;
import io.netty.util.concurrent.GenericFutureListener;
import java.util.LinkedList;
import java.util.Queue;
import org.reactivestreams.Publisher;
import org.reactivestreams.Subscriber;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Internal
abstract class HttpStreamsHandler<In extends HttpMessage, Out extends HttpMessage>
extends ChannelDuplexHandler {
    public static final String HANDLER_BODY_PUBLISHER = "http-streams-codec-body-publisher";
    private static final Logger LOG = LoggerFactory.getLogger(HttpStreamsHandler.class);
    private final Queue<Outgoing> outgoing = new LinkedList<Outgoing>();
    private final Class<In> inClass;
    private final Class<Out> outClass;
    private In currentlyStreamedMessage;
    private boolean ignoreBodyRead;
    private boolean sendLastHttpContent;

    HttpStreamsHandler(Class<In> inClass, Class<Out> outClass) {
        this.inClass = inClass;
        this.outClass = outClass;
    }

    protected abstract boolean hasBody(In var1);

    protected abstract In createEmptyMessage(In var1);

    protected abstract In createStreamedMessage(In var1, Publisher<? extends HttpContent> var2);

    protected void receivedInMessage(ChannelHandlerContext ctx) {
    }

    protected void consumedInMessage(ChannelHandlerContext ctx) {
    }

    protected void receivedOutMessage(ChannelHandlerContext ctx) {
    }

    protected void sentOutMessage(ChannelHandlerContext ctx) {
    }

    protected void subscribeSubscriberToStream(StreamedHttpMessage msg, Subscriber<HttpContent> subscriber) {
        msg.subscribe(subscriber);
    }

    protected void bodyRequested(ChannelHandlerContext ctx) {
    }

    protected abstract boolean isClient();

    public void channelRead(final ChannelHandlerContext ctx, Object msg) throws Exception {
        if (this.isValidInMessage(msg)) {
            this.receivedInMessage(ctx);
            final HttpMessage inMsg = (HttpMessage)this.inClass.cast(msg);
            if (inMsg instanceof FullHttpMessage) {
                ctx.fireChannelRead((Object)inMsg);
                this.consumedInMessage(ctx);
            } else if (!this.hasBody(inMsg)) {
                ctx.fireChannelRead((Object)this.createEmptyMessage(inMsg));
                this.consumedInMessage(ctx);
                this.ignoreBodyRead = true;
            } else {
                this.currentlyStreamedMessage = inMsg;
                final int streamId = this.getStreamId(msg);
                HandlerPublisher<Object> publisher = streamId > -1 ? new HandlerPublisher<Http2Content>(ctx.executor(), Http2Content.class){

                    @Override
                    protected boolean acceptInboundMessage(Object msg) {
                        return super.acceptInboundMessage(msg) && ((Http2Content)msg).stream().id() == streamId;
                    }

                    @Override
                    protected void cancelled() {
                        if (ctx.executor().inEventLoop()) {
                            HttpStreamsHandler.this.handleCancelled(ctx, inMsg);
                        } else {
                            ctx.executor().execute(() -> HttpStreamsHandler.this.handleCancelled(ctx, inMsg));
                        }
                    }

                    @Override
                    protected void requestDemand() {
                        HttpStreamsHandler.this.bodyRequested(ctx);
                        super.requestDemand();
                    }
                } : new HandlerPublisher<HttpContent>(ctx.executor(), HttpContent.class){

                    @Override
                    protected void cancelled() {
                        if (ctx.executor().inEventLoop()) {
                            HttpStreamsHandler.this.handleCancelled(ctx, inMsg);
                        } else {
                            ctx.executor().execute(() -> HttpStreamsHandler.this.handleCancelled(ctx, inMsg));
                        }
                    }

                    @Override
                    protected void requestDemand() {
                        HttpStreamsHandler.this.bodyRequested(ctx);
                        super.requestDemand();
                    }
                };
                ctx.channel().pipeline().addAfter(ctx.name(), HANDLER_BODY_PUBLISHER, (ChannelHandler)publisher);
                ctx.fireChannelRead((Object)this.createStreamedMessage(inMsg, (Publisher<HttpContent>)publisher));
            }
        } else if (msg instanceof HttpContent) {
            this.handleReadHttpContent(ctx, (HttpContent)msg);
        }
    }

    protected int getStreamId(Object msg) {
        if (msg instanceof HttpMessage) {
            return ((HttpMessage)msg).headers().getInt((CharSequence)AbstractNettyHttpRequest.STREAM_ID, -1);
        }
        return -1;
    }

    private void handleCancelled(ChannelHandlerContext ctx, In msg) {
        if (this.currentlyStreamedMessage == msg) {
            this.ignoreBodyRead = true;
            if (LOG.isTraceEnabled()) {
                LOG.trace("Calling ctx.read() for cancelled subscription");
            }
            if (this.isClient()) {
                ctx.read();
            } else {
                ctx.fireChannelWritabilityChanged();
            }
        }
    }

    private void handleReadHttpContent(ChannelHandlerContext ctx, HttpContent content) {
        if (!this.ignoreBodyRead) {
            ChannelHandler bodyPublisher = ctx.pipeline().get(HANDLER_BODY_PUBLISHER);
            if (bodyPublisher != null) {
                ctx.fireChannelRead((Object)content);
                if (content instanceof LastHttpContent) {
                    this.removeHandlerIfActive(ctx, HANDLER_BODY_PUBLISHER);
                    this.currentlyStreamedMessage = null;
                    this.consumedInMessage(ctx);
                }
            } else {
                ReferenceCountUtil.release((Object)content, (int)content.refCnt());
            }
        } else {
            ReferenceCountUtil.release((Object)content, (int)content.refCnt());
            if (content instanceof LastHttpContent) {
                this.ignoreBodyRead = false;
                if (this.currentlyStreamedMessage != null) {
                    this.removeHandlerIfActive(ctx, HANDLER_BODY_PUBLISHER);
                }
                this.currentlyStreamedMessage = null;
            }
        }
    }

    public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
        if (this.ignoreBodyRead) {
            ctx.read();
            this.ignoreBodyRead = false;
        } else {
            ctx.fireChannelReadComplete();
        }
    }

    public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
        if (this.isValidOutMessage(msg)) {
            Outgoing out = new Outgoing(this, (HttpMessage)this.outClass.cast(msg), promise);
            this.receivedOutMessage(ctx);
            if (this.outgoing.isEmpty()) {
                this.outgoing.add(out);
                this.flushNext(ctx);
            } else {
                this.outgoing.add(out);
            }
        } else if (msg instanceof LastHttpContent) {
            this.sendLastHttpContent = false;
            ctx.write(msg, promise);
        } else {
            ctx.write(msg, promise);
        }
    }

    protected void unbufferedWrite(final ChannelHandlerContext ctx, Outgoing out) {
        if (out.message instanceof FullHttpMessage) {
            ctx.writeAndFlush(out.message, out.promise);
            out.promise.addListener(channelFuture -> this.executeInEventLoop(ctx, () -> {
                this.sentOutMessage(ctx);
                this.outgoing.remove();
                this.flushNext(ctx);
            }));
        } else if (out.message instanceof StreamedHttpMessage) {
            StreamedHttpMessage streamed = (StreamedHttpMessage)out.message;
            HandlerSubscriber<HttpContent> subscriber = new HandlerSubscriber<HttpContent>(ctx.executor()){

                @Override
                protected void error(Throwable error) {
                    try {
                        if (LOG.isErrorEnabled()) {
                            LOG.error("Error occurred writing stream response: " + error.getMessage(), error);
                        }
                        ctx.writeAndFlush((Object)new DefaultHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.INTERNAL_SERVER_ERROR)).addListener((GenericFutureListener)ChannelFutureListener.CLOSE);
                    }
                    finally {
                        ctx.read();
                    }
                }

                @Override
                protected void complete() {
                    HttpStreamsHandler.this.executeInEventLoop(ctx, () -> HttpStreamsHandler.this.completeBody(ctx));
                }
            };
            this.sendLastHttpContent = true;
            ctx.writeAndFlush(out.message);
            ctx.pipeline().addAfter(ctx.name(), ctx.name() + "-body-subscriber", (ChannelHandler)subscriber);
            this.subscribeSubscriberToStream(streamed, (Subscriber<HttpContent>)subscriber);
        }
    }

    private void completeBody(ChannelHandlerContext ctx) {
        this.removeHandlerIfActive(ctx, ctx.name() + "-body-subscriber");
        if (this.sendLastHttpContent) {
            ChannelPromise promise = this.outgoing.peek().promise;
            ctx.writeAndFlush((Object)LastHttpContent.EMPTY_LAST_CONTENT, promise).addListener(channelFuture -> this.executeInEventLoop(ctx, () -> {
                this.outgoing.remove();
                this.sentOutMessage(ctx);
                this.flushNext(ctx);
            }));
            ctx.read();
        } else {
            this.outgoing.remove().promise.setSuccess();
            this.sentOutMessage(ctx);
            this.flushNext(ctx);
            ctx.read();
        }
    }

    private void removeHandlerIfActive(ChannelHandlerContext ctx, String name) {
        ChannelPipeline pipeline;
        ChannelHandler handler;
        if (ctx.channel().isActive() && (handler = (pipeline = ctx.pipeline()).get(name)) != null) {
            pipeline.remove(name);
        }
    }

    private void flushNext(ChannelHandlerContext ctx) {
        if (!this.outgoing.isEmpty()) {
            this.unbufferedWrite(ctx, this.outgoing.element());
        } else {
            ctx.fireChannelWritabilityChanged();
        }
    }

    private void executeInEventLoop(ChannelHandlerContext ctx, Runnable runnable) {
        if (ctx.executor().inEventLoop()) {
            runnable.run();
        } else {
            ctx.executor().execute(runnable);
        }
    }

    protected boolean isValidOutMessage(Object msg) {
        return this.outClass.isInstance(msg);
    }

    protected boolean isValidInMessage(Object msg) {
        return this.inClass.isInstance(msg);
    }

    static class Outgoing {
        final Out message;
        final ChannelPromise promise;
        final /* synthetic */ HttpStreamsHandler this$0;

        Outgoing(Out message, ChannelPromise promise) {
            this.this$0 = this$0;
            this.message = message;
            this.promise = promise;
        }
    }
}

