package ai.tripl.arc.load;

import ai.tripl.arc.api.API;
import ai.tripl.arc.util.log.logger.Logger;
import java.net.URI;
import org.apache.http.client.methods.CloseableHttpResponse;
import org.apache.http.client.methods.HttpPost;
import org.apache.http.entity.StringEntity;
import org.apache.http.impl.client.CloseableHttpClient;
import org.apache.http.impl.client.HttpClients;
import org.apache.http.impl.conn.PoolingHttpClientConnectionManager;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.ForeachWriter;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.SparkSession$implicits$;
import org.apache.spark.sql.types.DataType;
import org.apache.spark.sql.types.StringType$;
import scala.None$;
import scala.Option;
import scala.Option$;
import scala.Predef$;
import scala.Serializable;
import scala.Some;
import scala.StringContext;
import scala.Tuple8;
import scala.collection.immutable.List;
import scala.collection.immutable.Map;
import scala.reflect.api.Mirror;
import scala.reflect.api.TypeCreator;
import scala.reflect.api.TypeTags;
import scala.reflect.api.Types;
import scala.reflect.api.Universe;
import scala.reflect.runtime.package$;
import scala.runtime.BoxedUnit;
import scala.runtime.BoxesRunTime;

/* compiled from: HTTPLoad.scala */
/* loaded from: input_file:ai/tripl/arc/load/HTTPLoadStage$.class */
/**
 * Singleton companion of {@code HTTPLoadStage} (decompiled from {@code HTTPLoad.scala}).
 *
 * <p>Provides the stage's {@link #execute} implementation plus the case-class style
 * {@link #apply} / {@link #unapply} factory and extractor.
 */
public final class HTTPLoadStage$ implements Serializable {
    /** The single instance of this companion object. */
    public static final HTTPLoadStage$ MODULE$ = new HTTPLoadStage$();

    private HTTPLoadStage$() {
    }

    /**
     * Sends every row of the stage's input view to the stage's output URI via HTTP POST.
     *
     * <p>The input view must have exactly one column of string type. In streaming mode a
     * streaming query is started that posts each row and nothing is returned; otherwise the
     * rows are processed per partition and the resulting rows are returned as a DataFrame.
     *
     * @param hTTPLoadStage stage configuration (input view, output URI, headers, valid status codes)
     * @param sparkSession  session used to resolve the input view and the result encoder
     * @param logger        not used by this method
     * @param aRCContext    context consulted to choose between streaming and batch execution
     * @return an empty option in streaming mode, otherwise the per-partition result as a DataFrame
     * @throws HTTPLoadStage$$anon$1 if the input view is not a single string column
     * @throws HTTPLoadStage$$anon$2 wrapping any exception raised while setting up the load
     */
    public Option<Dataset<Row>> execute(HTTPLoadStage hTTPLoadStage, SparkSession sparkSession, Logger logger, API.ARCContext aRCContext) {
        Dataset table = sparkSession.table(hTTPLoadStage.inputView());
        final URI outputURI = hTTPLoadStage.outputURI();
        final Map<String, String> headers = hTTPLoadStage.headers();
        final List<Object> validStatusCodes = hTTPLoadStage.validStatusCodes();

        // Reject anything that is not a single string column before doing any work.
        if (!hasSingleStringColumn(table)) {
            throw new HTTPLoadStage$$anon$1(hTTPLoadStage, "HTTPLoad requires inputView to be dataset with [value: string] signature.", table);
        }

        try {
            Option<Dataset<Row>> result;
            if (aRCContext.isStreaming()) {
                table.writeStream().foreach(new ForeachWriter<Row>() { // from class: ai.tripl.arc.load.HTTPLoadStage$$anon$3
                    // Connection manager and client are created in open() and released in close().
                    private PoolingHttpClientConnectionManager poolingHttpClientConnectionManager;
                    private CloseableHttpClient httpClient;
                    private final String uri;
                    private final Map stageHeaders$1;
                    private final List stageValidStatusCodes$1;

                    private PoolingHttpClientConnectionManager poolingHttpClientConnectionManager() {
                        return this.poolingHttpClientConnectionManager;
                    }

                    private void poolingHttpClientConnectionManager_$eq(PoolingHttpClientConnectionManager poolingHttpClientConnectionManager) {
                        this.poolingHttpClientConnectionManager = poolingHttpClientConnectionManager;
                    }

                    private CloseableHttpClient httpClient() {
                        return this.httpClient;
                    }

                    private void httpClient_$eq(CloseableHttpClient closeableHttpClient) {
                        this.httpClient = closeableHttpClient;
                    }

                    private String uri() {
                        return this.uri;
                    }

                    /** Builds a fresh pooled HTTP client; always reports success. */
                    public boolean open(long j, long j2) {
                        poolingHttpClientConnectionManager_$eq(new PoolingHttpClientConnectionManager());
                        httpClient_$eq(HttpClients.custom().setConnectionManager(poolingHttpClientConnectionManager()).build());
                        return true;
                    }

                    /**
                     * Posts the row's first (string) column to the target URI and fails when the
                     * response status code is not one of the configured valid codes.
                     */
                    public void process(Row row) {
                        HttpPost httpPost = new HttpPost(uri());
                        // NOTE(review): the closure classes are not visible here; presumably they
                        // copy each stage header onto the request - confirm against HTTPLoad.scala.
                        this.stageHeaders$1.withFilter(new HTTPLoadStage$$anon$3$$anonfun$process$1(this)).foreach(new HTTPLoadStage$$anon$3$$anonfun$process$2(this, httpPost));
                        httpPost.setEntity(new StringEntity(row.getString(0)));
                        CloseableHttpResponse response = null;
                        try {
                            response = httpClient().execute(httpPost);
                            int statusCode = response.getStatusLine().getStatusCode();
                            if (!this.stageValidStatusCodes$1.contains(BoxesRunTime.boxToInteger(statusCode))) {
                                throw new Exception(new StringContext(Predef$.MODULE$.wrapRefArray(new String[]{"HTTPLoad expects all response StatusCode(s) in [", "] but server responded with ", " (", ")."})).s(Predef$.MODULE$.genericWrapArray(new Object[]{this.stageValidStatusCodes$1.mkString(", "), BoxesRunTime.boxToInteger(statusCode), response.getStatusLine().getReasonPhrase()})));
                            }
                        } finally {
                            // Release the response and connection on every path, including the
                            // invalid-status failure above, so the pool is not exhausted.
                            try {
                                if (response != null) {
                                    response.close();
                                }
                            } finally {
                                httpPost.releaseConnection();
                            }
                        }
                    }

                    /** Releases the client and connection manager, then rethrows any reported error. */
                    public void close(Throwable th) {
                        // Either field may still be null if open() failed part-way through.
                        try {
                            if (httpClient() != null) {
                                httpClient().close();
                            }
                        } finally {
                            if (poolingHttpClientConnectionManager() != null) {
                                poolingHttpClientConnectionManager().close();
                            }
                        }
                        if (th != null) {
                            throw new Exception(th);
                        }
                    }

                    {
                        this.stageHeaders$1 = headers;
                        this.stageValidStatusCodes$1 = validStatusCodes;
                        this.uri = outputURI.toString();
                    }
                }).start();
                result = Option$.MODULE$.empty();
            } else {
                HTTPLoadStage$$anonfun$4 partitionFunction = new HTTPLoadStage$$anonfun$4(outputURI, headers, validStatusCodes);
                SparkSession$implicits$ implicits = sparkSession.implicits();
                TypeTags universe = package$.MODULE$.universe();
                // Encoder for the per-row result type produced by the partition function.
                result = Option$.MODULE$.apply(table.mapPartitions(partitionFunction, implicits.newProductEncoder(universe.TypeTag().apply(package$.MODULE$.universe().runtimeMirror(getClass().getClassLoader()), new TypeCreator() { // from class: ai.tripl.arc.load.HTTPLoadStage$$typecreator4$1
                    public <U extends Universe> Types.TypeApi apply(Mirror<U> mirror) {
                        mirror.universe();
                        return mirror.staticClass("ai.tripl.arc.load.HTTPLoadStage.Response").asType().toTypeConstructor();
                    }
                }))).toDF());
            }
            return result;
        } catch (Exception e) {
            throw new HTTPLoadStage$$anon$2(hTTPLoadStage, e);
        }
    }

    /** Returns true when the dataset's schema is exactly one column whose data type equals StringType. */
    private static boolean hasSingleStringColumn(Dataset<?> table) {
        if (table.schema().length() != 1) {
            return false;
        }
        DataType columnType = table.schema().apply(0).dataType();
        StringType$ stringType = StringType$.MODULE$;
        return columnType != null ? columnType.equals(stringType) : stringType == null;
    }

    /**
     * Factory mirroring the {@code HTTPLoadStage} constructor; arguments are passed through unchanged.
     *
     * @return a new stage built from the given values
     */
    public HTTPLoadStage apply(HTTPLoad hTTPLoad, String str, Option<String> option, String str2, URI uri, Map<String, String> map, List<Object> list, Map<String, String> map2) {
        return new HTTPLoadStage(hTTPLoad, str, option, str2, uri, map, list, map2);
    }

    /**
     * Extractor for {@code HTTPLoadStage}.
     *
     * @param hTTPLoadStage the stage to take apart, may be {@code null}
     * @return an empty option for {@code null}, otherwise the stage's plugin, name, description,
     *         input view, output URI, headers, valid status codes and params as a tuple
     */
    public Option<Tuple8<HTTPLoad, String, Option<String>, String, URI, Map<String, String>, List<Object>, Map<String, String>>> unapply(HTTPLoadStage hTTPLoadStage) {
        if (hTTPLoadStage == null) {
            return Option$.MODULE$.empty();
        }
        return new Some<>(new Tuple8<>(hTTPLoadStage.plugin(), hTTPLoadStage.name(), hTTPLoadStage.description(), hTTPLoadStage.inputView(), hTTPLoadStage.outputURI(), hTTPLoadStage.headers(), hTTPLoadStage.validStatusCodes(), hTTPLoadStage.params()));
    }

    /** Makes deserialization resolve to the singleton instead of a new instance. */
    private Object readResolve() {
        return MODULE$;
    }
}
