package org.finos.tracdap.webserver;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.handler.codec.http.DefaultFullHttpResponse;
import io.netty.handler.codec.http.DefaultHttpContent;
import io.netty.handler.codec.http.DefaultHttpResponse;
import io.netty.handler.codec.http.DefaultLastHttpContent;
import io.netty.handler.codec.http.HttpMethod;
import io.netty.handler.codec.http.HttpObject;
import io.netty.handler.codec.http.HttpRequest;
import io.netty.handler.codec.http.HttpResponseStatus;
import io.netty.handler.codec.http.LastHttpContent;
import io.netty.util.ReferenceCountUtil;
import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.concurrent.Flow;
import javax.annotation.Nonnull;
import org.apache.arrow.memory.BufferAllocator;
import org.finos.tracdap.common.data.DataContext;
import org.finos.tracdap.common.exception.EUnexpected;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/finos/tracdap/webserver/Http1Server.class */
public class Http1Server extends ChannelInboundHandlerAdapter {
    private static final List<HttpMethod> SUPPORTED_METHODS = List.of(HttpMethod.HEAD, HttpMethod.GET);
    private static final Logger log = LoggerFactory.getLogger(Http1Server.class);
    private final ContentServer contentServer;
    private final BufferAllocator arrowAllocator;
    private HttpRequest currentRequest;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/finos/tracdap/webserver/Http1Server$ResponseSender.class */
    public static class ResponseSender implements Flow.Subscriber<ByteBuf> {
        private static final long REQUEST_BUFFER = 32;
        private final ChannelHandlerContext ctx;
        private Flow.Subscription subscription;
        private long nPending;

        ResponseSender(ChannelHandlerContext channelHandlerContext) {
            this.ctx = channelHandlerContext;
        }

        @Override // java.util.concurrent.Flow.Subscriber
        public void onSubscribe(Flow.Subscription subscription) {
            this.subscription = subscription;
            this.nPending += REQUEST_BUFFER;
            subscription.request(REQUEST_BUFFER);
        }

        @Override // java.util.concurrent.Flow.Subscriber
        public void onNext(ByteBuf byteBuf) {
            this.ctx.write(new DefaultHttpContent(byteBuf));
            this.nPending--;
            if (this.nPending < 16) {
                long j = REQUEST_BUFFER - this.nPending;
                this.nPending += j;
                this.subscription.request(j);
            }
        }

        @Override // java.util.concurrent.Flow.Subscriber
        public void onError(Throwable th) {
            this.ctx.fireExceptionCaught(th);
        }

        @Override // java.util.concurrent.Flow.Subscriber
        public void onComplete() {
            this.ctx.write(new DefaultLastHttpContent());
            this.ctx.flush();
        }
    }

    public Http1Server(ContentServer contentServer, BufferAllocator bufferAllocator) {
        this.contentServer = contentServer;
        this.arrowAllocator = bufferAllocator;
    }

    public void channelRead(@Nonnull ChannelHandlerContext channelHandlerContext, @Nonnull Object obj) {
        try {
            if (!(obj instanceof HttpObject)) {
                throw new EUnexpected();
            }
            boolean z = false;
            if (obj instanceof HttpRequest) {
                if (this.currentRequest != null) {
                    throw new EUnexpected();
                }
                this.currentRequest = (HttpRequest) obj;
                z = true;
            }
            if ((obj instanceof LastHttpContent) && this.currentRequest != null) {
                serveRequest(channelHandlerContext, this.currentRequest);
                this.currentRequest = null;
                z = true;
            }
            if (!z) {
                if (this.currentRequest == null || SUPPORTED_METHODS.contains(this.currentRequest.method())) {
                    channelHandlerContext.write(new DefaultFullHttpResponse(this.currentRequest.protocolVersion(), HttpResponseStatus.INTERNAL_SERVER_ERROR, Unpooled.copiedBuffer("Request could not be understood", StandardCharsets.UTF_8)));
                    channelHandlerContext.flush();
                } else {
                    channelHandlerContext.write(new DefaultFullHttpResponse(this.currentRequest.protocolVersion(), HttpResponseStatus.METHOD_NOT_ALLOWED, Unpooled.copiedBuffer(String.format("HTTP method not supported: %s [%s]", this.currentRequest.method().name(), this.currentRequest.uri()), StandardCharsets.UTF_8)));
                    channelHandlerContext.flush();
                }
            }
        } finally {
            ReferenceCountUtil.release(obj);
        }
    }

    public void exceptionCaught(ChannelHandlerContext channelHandlerContext, Throwable th) throws Exception {
        super.exceptionCaught(channelHandlerContext, th);
    }

    private void serveRequest(ChannelHandlerContext channelHandlerContext, HttpRequest httpRequest) {
        log.info("Request {} [{}]", httpRequest.method(), httpRequest.uri());
        if (httpRequest.method().equals(HttpMethod.HEAD)) {
            serveHeadRequest(channelHandlerContext, httpRequest);
        } else if (httpRequest.method().equals(HttpMethod.GET)) {
            serveGetRequest(channelHandlerContext, httpRequest);
        } else {
            channelHandlerContext.write(new DefaultFullHttpResponse(httpRequest.protocolVersion(), HttpResponseStatus.METHOD_NOT_ALLOWED));
            channelHandlerContext.flush();
        }
    }

    private void serveHeadRequest(ChannelHandlerContext channelHandlerContext, HttpRequest httpRequest) {
        this.contentServer.headRequest(httpRequest.uri(), new DataContext(channelHandlerContext.executor(), this.arrowAllocator)).thenAccept(contentResponse -> {
            serverHeadResponse(channelHandlerContext, httpRequest, contentResponse);
        }).exceptionally(th -> {
            return unexpectedError(channelHandlerContext, th);
        });
    }

    private void serverHeadResponse(ChannelHandlerContext channelHandlerContext, HttpRequest httpRequest, ContentResponse contentResponse) {
        DefaultFullHttpResponse defaultFullHttpResponse = new DefaultFullHttpResponse(httpRequest.protocolVersion(), contentResponse.statusCode);
        defaultFullHttpResponse.headers().setAll(contentResponse.headers);
        channelHandlerContext.write(defaultFullHttpResponse);
        channelHandlerContext.flush();
    }

    private void serveGetRequest(ChannelHandlerContext channelHandlerContext, HttpRequest httpRequest) {
        this.contentServer.getRequest(httpRequest.uri(), new DataContext(channelHandlerContext.executor(), this.arrowAllocator)).thenAccept(contentResponse -> {
            serveGetResponse(channelHandlerContext, httpRequest, contentResponse);
        }).exceptionally(th -> {
            return unexpectedError(channelHandlerContext, th);
        });
    }

    private void serveGetResponse(ChannelHandlerContext channelHandlerContext, HttpRequest httpRequest, ContentResponse contentResponse) {
        DefaultHttpResponse defaultHttpResponse = new DefaultHttpResponse(httpRequest.protocolVersion(), contentResponse.statusCode);
        defaultHttpResponse.headers().setAll(contentResponse.headers);
        channelHandlerContext.write(defaultHttpResponse);
        if (contentResponse.statusCode == HttpResponseStatus.OK) {
            contentResponse.reader.subscribe(new ResponseSender(channelHandlerContext));
        } else {
            channelHandlerContext.write(new DefaultLastHttpContent());
            channelHandlerContext.flush();
        }
    }

    private Void unexpectedError(ChannelHandlerContext channelHandlerContext, Throwable th) {
        channelHandlerContext.fireExceptionCaught(th);
        return null;
    }
}
