package ch.squaredesk.nova.comm.http;

import ch.squaredesk.nova.comm.retrieving.IncomingMessage;
import ch.squaredesk.nova.comm.retrieving.MessageUnmarshaller;
import ch.squaredesk.nova.comm.sending.MessageMarshaller;
import ch.squaredesk.nova.metrics.Metrics;
import io.reactivex.BackpressureStrategy;
import io.reactivex.Flowable;
import io.reactivex.subjects.PublishSubject;
import io.reactivex.subjects.Subject;
import java.io.IOException;
import java.net.MalformedURLException;
import java.net.URL;
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import org.glassfish.grizzly.ReadHandler;
import org.glassfish.grizzly.http.Method;
import org.glassfish.grizzly.http.io.NIOReader;
import org.glassfish.grizzly.http.io.NIOWriter;
import org.glassfish.grizzly.http.server.HttpHandler;
import org.glassfish.grizzly.http.server.HttpServer;
import org.glassfish.grizzly.http.server.Request;
import org.glassfish.grizzly.http.server.Response;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:ch/squaredesk/nova/comm/http/RpcServer.class */
public class RpcServer<InternalMessageType> extends ch.squaredesk.nova.comm.rpc.RpcServer<String, RpcInvocation<InternalMessageType>> {
    private static final Logger logger = LoggerFactory.getLogger(RpcServer.class);
    private final MessageMarshaller<InternalMessageType, String> messageMarshaller;
    private final MessageUnmarshaller<String, InternalMessageType> messageUnmarshaller;
    private final Map<String, Flowable<RpcInvocation<? extends InternalMessageType>>> mapDestinationToIncomingMessages;
    private final HttpServer httpServer;

    /* loaded from: input_file:ch/squaredesk/nova/comm/http/RpcServer$NonBlockingHttpHandler.class */
    private class NonBlockingHttpHandler extends HttpHandler {
        private final URL destination;
        private final Subject<RpcInvocation<? extends InternalMessageType>> stream;

        private NonBlockingHttpHandler(URL url, Subject<RpcInvocation<? extends InternalMessageType>> subject) {
            this.destination = url;
            this.stream = subject;
        }

        public void service(final Request request, final Response response) throws Exception {
            response.suspend();
            final NIOReader nIOReader = request.getNIOReader();
            nIOReader.notifyAvailable(new ReadHandler() { // from class: ch.squaredesk.nova.comm.http.RpcServer.NonBlockingHttpHandler.1
                private char[] inputBuffer = new char[0];

                public void onDataAvailable() throws Exception {
                    this.inputBuffer = RpcServer.appendAvailableDataToBuffer(nIOReader, this.inputBuffer);
                    nIOReader.notifyAvailable(this);
                }

                public void onError(Throwable th) {
                    RpcServer.logger.error("Error parsing request data", th);
                    response.setStatus(400, "Bad request");
                    response.resume();
                }

                public void onAllDataRead() throws Exception {
                    this.inputBuffer = RpcServer.appendAvailableDataToBuffer(nIOReader, this.inputBuffer);
                    String str = new String(this.inputBuffer);
                    try {
                        nIOReader.close();
                    } catch (Exception e) {
                    }
                    Object convertRequestData = str.trim().isEmpty() ? null : RpcServer.convertRequestData(str, RpcServer.this.messageUnmarshaller);
                    IncomingMessage incomingMessage = new IncomingMessage(convertRequestData, new RequestMessageMetaData(NonBlockingHttpHandler.this.destination, RpcServer.httpSpecificInfoFrom(request)));
                    Response response2 = response;
                    Consumer consumer = pair -> {
                        try {
                            try {
                                NIOWriter nIOWriter = response2.getNIOWriter();
                                Throwable th = null;
                                try {
                                    String convertResponseData = RpcServer.convertResponseData(pair._1, RpcServer.this.messageMarshaller);
                                    response2.setContentType("application/json");
                                    response2.setContentLength(convertResponseData.length());
                                    response2.setStatus(pair._2 == null ? 200 : ((ReplyInfo) pair._2).statusCode);
                                    RpcServer.writeResponse(convertResponseData, nIOWriter);
                                    RpcServer.this.metricsCollector.requestCompleted(convertRequestData, convertResponseData);
                                    if (nIOWriter != null) {
                                        if (0 != 0) {
                                            try {
                                                nIOWriter.close();
                                            } catch (Throwable th2) {
                                                th.addSuppressed(th2);
                                            }
                                        } else {
                                            nIOWriter.close();
                                        }
                                    }
                                    response2.resume();
                                } catch (Throwable th3) {
                                    if (nIOWriter != null) {
                                        if (0 != 0) {
                                            try {
                                                nIOWriter.close();
                                            } catch (Throwable th4) {
                                                th.addSuppressed(th4);
                                            }
                                        } else {
                                            nIOWriter.close();
                                        }
                                    }
                                    throw th3;
                                }
                            } catch (Exception e2) {
                                RpcServer.this.metricsCollector.requestCompletedExceptionally(convertRequestData, e2);
                                RpcServer.logger.error("An error occurred trying to send HTTP response " + pair, e2);
                                try {
                                    response2.sendError(500, "Internal server error");
                                } catch (Exception e3) {
                                    RpcServer.logger.error("Failed to send error 500 back to client", e3);
                                }
                                response2.resume();
                            }
                        } catch (Throwable th5) {
                            response2.resume();
                            throw th5;
                        }
                    };
                    Response response3 = response;
                    RpcInvocation rpcInvocation = new RpcInvocation(incomingMessage, consumer, th -> {
                        RpcServer.logger.error("An error occurred trying to process HTTP request " + str, th);
                        try {
                            response3.sendError(500, "Internal server error");
                        } catch (Exception e2) {
                            RpcServer.logger.error("Failed to send error 500 back to client", e2);
                        }
                    });
                    RpcServer.this.metricsCollector.requestReceived(rpcInvocation.request);
                    NonBlockingHttpHandler.this.stream.onNext(rpcInvocation);
                }
            });
        }
    }

    public RpcServer(HttpServer httpServer, MessageMarshaller<InternalMessageType, String> messageMarshaller, MessageUnmarshaller<String, InternalMessageType> messageUnmarshaller, Metrics metrics) {
        this(null, httpServer, messageMarshaller, messageUnmarshaller, metrics);
    }

    public RpcServer(String str, HttpServer httpServer, MessageMarshaller<InternalMessageType, String> messageMarshaller, MessageUnmarshaller<String, InternalMessageType> messageUnmarshaller, Metrics metrics) {
        super(str, metrics);
        this.mapDestinationToIncomingMessages = new ConcurrentHashMap();
        Objects.requireNonNull(httpServer, "httpServer must not be null");
        Objects.requireNonNull(messageMarshaller, "messageMarshaller must not be null");
        Objects.requireNonNull(messageUnmarshaller, "messageUnmarshaller must not be null");
        this.httpServer = httpServer;
        this.messageUnmarshaller = messageUnmarshaller;
        this.messageMarshaller = messageMarshaller;
    }

    public Flowable<RpcInvocation<InternalMessageType>> requests(String str) {
        try {
            URL url = new URL("http", "localhost", str);
            return this.mapDestinationToIncomingMessages.computeIfAbsent(str, str2 -> {
                logger.info("Listening to requests on " + str);
                Subject serialized = PublishSubject.create().toSerialized();
                NonBlockingHttpHandler nonBlockingHttpHandler = new NonBlockingHttpHandler(url, serialized);
                this.httpServer.getServerConfiguration().addHttpHandler(nonBlockingHttpHandler, new String[]{str});
                return serialized.toFlowable(BackpressureStrategy.BUFFER).doFinally(() -> {
                    this.mapDestinationToIncomingMessages.remove(str);
                    this.httpServer.getServerConfiguration().removeHttpHandler(nonBlockingHttpHandler);
                    logger.info("Stopped listening to requests on " + str);
                }).share();
            });
        } catch (MalformedURLException e) {
            throw new RuntimeException(e);
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    public static RequestInfo httpSpecificInfoFrom(Request request) throws Exception {
        HashMap hashMap = new HashMap();
        for (Map.Entry entry : request.getParameterMap().entrySet()) {
            String[] strArr = (String[]) entry.getValue();
            String str = null;
            if (strArr != null && strArr.length > 0) {
                str = strArr[0];
            }
            hashMap.put(entry.getKey(), str);
        }
        return new RequestInfo(convert(request.getMethod()), hashMap);
    }

    private static HttpRequestMethod convert(Method method) {
        return method == Method.POST ? HttpRequestMethod.POST : method == Method.DELETE ? HttpRequestMethod.DELETE : method == Method.PUT ? HttpRequestMethod.PUT : HttpRequestMethod.GET;
    }

    /* JADX INFO: Access modifiers changed from: private */
    public static <T> T convertRequestData(String str, MessageUnmarshaller<String, T> messageUnmarshaller) throws Exception {
        return (T) messageUnmarshaller.unmarshal(str);
    }

    /* JADX INFO: Access modifiers changed from: private */
    public static <T> String convertResponseData(T t, MessageMarshaller<T, String> messageMarshaller) throws Exception {
        return (String) messageMarshaller.marshal(t);
    }

    /* JADX INFO: Access modifiers changed from: private */
    public static void writeResponse(String str, NIOWriter nIOWriter) throws Exception {
        nIOWriter.write(str);
        nIOWriter.flush();
        nIOWriter.close();
    }

    /* JADX INFO: Access modifiers changed from: private */
    public static char[] appendAvailableDataToBuffer(NIOReader nIOReader, char[] cArr) throws IOException {
        char[] cArr2 = new char[nIOReader.readyData()];
        int read = nIOReader.read(cArr2);
        if (read <= 0) {
            return cArr;
        }
        char[] cArr3 = new char[cArr.length + read];
        System.arraycopy(cArr, 0, cArr3, 0, cArr.length);
        System.arraycopy(cArr2, 0, cArr3, cArr.length, read);
        return cArr3;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public void start() throws IOException {
        this.httpServer.start();
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public void shutdown() {
        try {
            this.httpServer.shutdown(2L, TimeUnit.SECONDS).get();
        } catch (Exception e) {
            logger.info("An error occurred, trying to shutdown REST HTTP server", e);
        }
    }
}
