package io.drasi.source.sdk;

import com.fasterxml.jackson.databind.ObjectMapper;
import io.drasi.source.sdk.models.BootstrapRequest;
import io.drasi.source.sdk.models.SourceElement;
import io.undertow.io.IoCallback;
import io.undertow.io.Sender;
import io.undertow.server.HttpHandler;
import io.undertow.server.HttpServerExchange;
import io.undertow.util.Headers;
import java.io.IOException;
import java.util.List;
import java.util.function.Function;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/drasi/source/sdk/JsonStreamingHandler.class */
public class JsonStreamingHandler implements HttpHandler {
    private static final ObjectMapper objectMapper = new ObjectMapper();
    private final Logger log = LoggerFactory.getLogger(JsonStreamingHandler.class);
    private final Function<BootstrapRequest, BootstrapStream> streamFunction;

    public JsonStreamingHandler(Function<BootstrapRequest, BootstrapStream> function) {
        this.streamFunction = function;
    }

    public void handleRequest(HttpServerExchange httpServerExchange) {
        httpServerExchange.getRequestReceiver().receiveFullBytes((httpServerExchange2, bArr) -> {
            try {
                try {
                    final BootstrapStream apply = this.streamFunction.apply((BootstrapRequest) objectMapper.readValue(bArr, BootstrapRequest.class));
                    List<String> validate = apply.validate();
                    if (validate.isEmpty()) {
                        httpServerExchange.getResponseHeaders().put(Headers.CONTENT_TYPE, "application/json");
                        httpServerExchange.getResponseHeaders().remove(Headers.CONTENT_LENGTH);
                        httpServerExchange.getResponseSender().send("", new IoCallback(this) { // from class: io.drasi.source.sdk.JsonStreamingHandler.1
                            final /* synthetic */ JsonStreamingHandler this$0;

                            {
                                this.this$0 = this;
                            }

                            public void onComplete(HttpServerExchange httpServerExchange2, Sender sender) {
                                this.this$0.streamJsonArray(httpServerExchange2, apply);
                            }

                            public void onException(HttpServerExchange httpServerExchange2, Sender sender, IOException iOException) {
                                this.this$0.log.error("Error sending response", iOException);
                                httpServerExchange2.setStatusCode(500);
                                httpServerExchange2.getResponseSender().send(iOException.getMessage());
                                httpServerExchange2.endExchange();
                                try {
                                    apply.close();
                                } catch (Exception e) {
                                    this.this$0.log.error("Error closing stream", e);
                                }
                            }
                        });
                    } else {
                        String join = String.join(", ", validate);
                        httpServerExchange2.setStatusCode(400);
                        httpServerExchange2.getResponseSender().send("Error validating request: " + join);
                        httpServerExchange2.endExchange();
                    }
                } catch (Exception e) {
                    this.log.error("Error creating stream", e);
                    httpServerExchange2.setStatusCode(500);
                    httpServerExchange2.getResponseSender().send(e.getMessage());
                    httpServerExchange2.endExchange();
                }
            } catch (Exception e2) {
                this.log.error("Error parsing request", e2);
                httpServerExchange2.setStatusCode(400);
                httpServerExchange2.getResponseSender().send("Error parsing request: " + e2.getMessage());
                httpServerExchange2.endExchange();
            }
        });
    }

    private void streamJsonArray(HttpServerExchange httpServerExchange, final BootstrapStream bootstrapStream) {
        SourceElement next = bootstrapStream.next();
        if (next != null) {
            httpServerExchange.getResponseSender().send(next.toJson() + "\n", new IoCallback(this) { // from class: io.drasi.source.sdk.JsonStreamingHandler.2
                final /* synthetic */ JsonStreamingHandler this$0;

                {
                    this.this$0 = this;
                }

                public void onComplete(HttpServerExchange httpServerExchange2, Sender sender) {
                    this.this$0.streamJsonArray(httpServerExchange2, bootstrapStream);
                }

                public void onException(HttpServerExchange httpServerExchange2, Sender sender, IOException iOException) {
                    this.this$0.log.error("Error sending response", iOException);
                    httpServerExchange2.setStatusCode(500);
                    httpServerExchange2.getResponseSender().send(iOException.getMessage());
                    httpServerExchange2.endExchange();
                    try {
                        bootstrapStream.close();
                    } catch (Exception e) {
                        this.this$0.log.error("Error closing stream", e);
                    }
                }
            });
            return;
        }
        httpServerExchange.endExchange();
        try {
            bootstrapStream.close();
        } catch (Exception e) {
            this.log.error("Error closing stream", e);
        }
    }
}
