package ai.tripl.arc.load;

import ai.tripl.arc.api.API;
import ai.tripl.arc.load.HTTPLoadStage;
import ai.tripl.arc.util.log.logger.Logger;
import java.net.URI;
import org.apache.http.HttpEntity;
import org.apache.http.client.methods.CloseableHttpResponse;
import org.apache.http.client.methods.HttpPost;
import org.apache.http.entity.ByteArrayEntity;
import org.apache.http.entity.StringEntity;
import org.apache.http.impl.client.CloseableHttpClient;
import org.apache.http.impl.client.HttpClients;
import org.apache.http.impl.conn.PoolingHttpClientConnectionManager;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.ForeachWriter;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.SparkSession$implicits$;
import org.apache.spark.sql.types.BinaryType;
import org.apache.spark.sql.types.DataType;
import org.apache.spark.sql.types.NullType$;
import org.apache.spark.sql.types.StringType;
import org.apache.spark.sql.types.StringType$;
import scala.Function1;
import scala.MatchError;
import scala.None$;
import scala.Option;
import scala.Option$;
import scala.Serializable;
import scala.Some;
import scala.Tuple2;
import scala.Tuple9;
import scala.collection.BufferedIterator;
import scala.collection.immutable.List;
import scala.collection.immutable.Map;
import scala.io.Codec$;
import scala.io.Source$;
import scala.reflect.api.Mirror;
import scala.reflect.api.TypeCreator;
import scala.reflect.api.TypeTags;
import scala.reflect.api.Types;
import scala.reflect.api.Universe;
import scala.reflect.runtime.package$;
import scala.runtime.BoxedUnit;
import scala.runtime.BoxesRunTime;

/* compiled from: HTTPLoad.scala */
/* loaded from: input_file:ai/tripl/arc/load/HTTPLoadStage$.class */
public final class HTTPLoadStage$ implements Serializable {
    public static HTTPLoadStage$ MODULE$;

    static {
        new HTTPLoadStage$();
    }

    public Option<Dataset<Row>> execute(HTTPLoadStage hTTPLoadStage, SparkSession sparkSession, Logger logger, API.ARCContext aRCContext) {
        None$ apply;
        Dataset table = sparkSession.table(hTTPLoadStage.inputView());
        final URI outputURI = hTTPLoadStage.outputURI();
        final Map<String, String> headers = hTTPLoadStage.headers();
        final List<Object> validStatusCodes = hTTPLoadStage.validStatusCodes();
        if (table.schema().length() == 1) {
            DataType dataType = table.schema().apply(0).dataType();
            StringType$ stringType$ = StringType$.MODULE$;
            if (dataType != null ? dataType.equals(stringType$) : stringType$ == null) {
                try {
                    if (aRCContext.isStreaming()) {
                        table.writeStream().foreach(new ForeachWriter<Row>(outputURI, headers, validStatusCodes) { // from class: ai.tripl.arc.load.HTTPLoadStage$$anon$2
                            private PoolingHttpClientConnectionManager poolingHttpClientConnectionManager;
                            private CloseableHttpClient httpClient;
                            private final String uri;
                            private final Map stageHeaders$1;
                            private final List stageValidStatusCodes$1;

                            private PoolingHttpClientConnectionManager poolingHttpClientConnectionManager() {
                                return this.poolingHttpClientConnectionManager;
                            }

                            private void poolingHttpClientConnectionManager_$eq(PoolingHttpClientConnectionManager poolingHttpClientConnectionManager) {
                                this.poolingHttpClientConnectionManager = poolingHttpClientConnectionManager;
                            }

                            private CloseableHttpClient httpClient() {
                                return this.httpClient;
                            }

                            private void httpClient_$eq(CloseableHttpClient closeableHttpClient) {
                                this.httpClient = closeableHttpClient;
                            }

                            private String uri() {
                                return this.uri;
                            }

                            public boolean open(long j, long j2) {
                                poolingHttpClientConnectionManager_$eq(new PoolingHttpClientConnectionManager());
                                httpClient_$eq(HttpClients.custom().setConnectionManager(poolingHttpClientConnectionManager()).build());
                                return true;
                            }

                            public void process(Row row) {
                                HttpPost httpPost = new HttpPost(uri());
                                this.stageHeaders$1.withFilter(tuple2 -> {
                                    return BoxesRunTime.boxToBoolean($anonfun$process$1(tuple2));
                                }).foreach(tuple22 -> {
                                    $anonfun$process$2(httpPost, tuple22);
                                    return BoxedUnit.UNIT;
                                });
                                httpPost.setEntity(new StringEntity(row.getString(0)));
                                CloseableHttpResponse execute = httpClient().execute(httpPost);
                                if (!this.stageValidStatusCodes$1.contains(BoxesRunTime.boxToInteger(execute.getStatusLine().getStatusCode()))) {
                                    throw new Exception(new StringBuilder(80).append("HTTPLoad expects all response StatusCode(s) in [").append(this.stageValidStatusCodes$1.mkString(", ")).append("] but server responded with ").append(execute.getStatusLine().getStatusCode()).append(" (").append(execute.getStatusLine().getReasonPhrase()).append(").").toString());
                                }
                                execute.close();
                                httpPost.releaseConnection();
                            }

                            public void close(Throwable th) {
                                httpClient().close();
                                poolingHttpClientConnectionManager().close();
                                if (th != null) {
                                    throw new Exception(th);
                                }
                                BoxedUnit boxedUnit = BoxedUnit.UNIT;
                            }

                            public static final /* synthetic */ boolean $anonfun$process$1(Tuple2 tuple2) {
                                return tuple2 != null;
                            }

                            public static final /* synthetic */ void $anonfun$process$2(HttpPost httpPost, Tuple2 tuple2) {
                                if (tuple2 == null) {
                                    throw new MatchError(tuple2);
                                }
                                httpPost.addHeader((String) tuple2._1(), (String) tuple2._2());
                                BoxedUnit boxedUnit = BoxedUnit.UNIT;
                            }

                            {
                                this.stageHeaders$1 = headers;
                                this.stageValidStatusCodes$1 = validStatusCodes;
                                this.uri = outputURI.toString();
                            }
                        }).start();
                        apply = None$.MODULE$;
                    } else {
                        Function1 function1 = iterator -> {
                            int i;
                            DataType dataType2;
                            PoolingHttpClientConnectionManager poolingHttpClientConnectionManager = new PoolingHttpClientConnectionManager();
                            poolingHttpClientConnectionManager.setMaxTotal(50);
                            CloseableHttpClient build = HttpClients.custom().setConnectionManager(poolingHttpClientConnectionManager).build();
                            String uri = outputURI.toString();
                            BufferedIterator buffered = iterator.buffered();
                            boolean hasNext = buffered.hasNext();
                            if (true == hasNext) {
                                i = ((Row) buffered.head()).fieldIndex("value");
                            } else {
                                if (false != hasNext) {
                                    throw new MatchError(BoxesRunTime.boxToBoolean(hasNext));
                                }
                                i = 0;
                            }
                            int i2 = i;
                            boolean hasNext2 = buffered.hasNext();
                            if (true == hasNext2) {
                                dataType2 = ((Row) buffered.head()).schema().apply(i2).dataType();
                            } else {
                                if (false != hasNext2) {
                                    throw new MatchError(BoxesRunTime.boxToBoolean(hasNext2));
                                }
                                dataType2 = NullType$.MODULE$;
                            }
                            DataType dataType3 = dataType2;
                            return buffered.map(row -> {
                                HttpEntity byteArrayEntity;
                                HttpPost httpPost = new HttpPost(uri);
                                headers.withFilter(tuple2 -> {
                                    return BoxesRunTime.boxToBoolean($anonfun$execute$3(tuple2));
                                }).foreach(tuple22 -> {
                                    $anonfun$execute$4(httpPost, tuple22);
                                    return BoxedUnit.UNIT;
                                });
                                if (dataType3 instanceof StringType) {
                                    byteArrayEntity = new StringEntity(row.getString(i2));
                                } else {
                                    if (!(dataType3 instanceof BinaryType)) {
                                        throw new MatchError(dataType3);
                                    }
                                    byteArrayEntity = new ByteArrayEntity((byte[]) row.get(i2));
                                }
                                httpPost.setEntity(byteArrayEntity);
                                try {
                                    CloseableHttpResponse execute = build.execute(httpPost);
                                    if (!validStatusCodes.contains(BoxesRunTime.boxToInteger(execute.getStatusLine().getStatusCode()))) {
                                        throw new Exception(new StringBuilder(80).append("HTTPLoad expects all response StatusCode(s) in [").append(validStatusCodes.mkString(", ")).append("] but server responded with ").append(execute.getStatusLine().getStatusCode()).append(" (").append(execute.getStatusLine().getReasonPhrase()).append(").").toString());
                                    }
                                    String mkString = Source$.MODULE$.fromInputStream(execute.getEntity().getContent(), Codec$.MODULE$.fallbackSystemCodec()).mkString();
                                    execute.close();
                                    return new HTTPLoadStage.Response(execute.getStatusLine().getStatusCode(), execute.getStatusLine().getReasonPhrase(), mkString);
                                } finally {
                                    httpPost.releaseConnection();
                                    poolingHttpClientConnectionManager.close();
                                }
                            });
                        };
                        SparkSession$implicits$ implicits = sparkSession.implicits();
                        TypeTags universe = package$.MODULE$.universe();
                        apply = Option$.MODULE$.apply(table.mapPartitions(function1, implicits.newProductEncoder(universe.TypeTag().apply(package$.MODULE$.universe().runtimeMirror(getClass().getClassLoader()), new TypeCreator() { // from class: ai.tripl.arc.load.HTTPLoadStage$$typecreator4$1
                            public <U extends Universe> Types.TypeApi apply(Mirror<U> mirror) {
                                mirror.universe();
                                return mirror.staticClass("ai.tripl.arc.load.HTTPLoadStage.Response").asType().toTypeConstructor();
                            }
                        }))).toDF());
                    }
                    return apply;
                } catch (Exception e) {
                    throw new HTTPLoadStage$$anon$3(e, hTTPLoadStage);
                }
            }
        }
        throw new HTTPLoadStage$$anon$1("HTTPLoad requires inputView to be dataset with [value: string] signature.", hTTPLoadStage, table);
    }

    public HTTPLoadStage apply(HTTPLoad hTTPLoad, Option<String> option, String str, Option<String> option2, String str2, URI uri, Map<String, String> map, List<Object> list, Map<String, String> map2) {
        return new HTTPLoadStage(hTTPLoad, option, str, option2, str2, uri, map, list, map2);
    }

    public Option<Tuple9<HTTPLoad, Option<String>, String, Option<String>, String, URI, Map<String, String>, List<Object>, Map<String, String>>> unapply(HTTPLoadStage hTTPLoadStage) {
        return hTTPLoadStage == null ? None$.MODULE$ : new Some(new Tuple9(hTTPLoadStage.plugin(), hTTPLoadStage.id(), hTTPLoadStage.name(), hTTPLoadStage.description(), hTTPLoadStage.inputView(), hTTPLoadStage.outputURI(), hTTPLoadStage.headers(), hTTPLoadStage.validStatusCodes(), hTTPLoadStage.params()));
    }

    private Object readResolve() {
        return MODULE$;
    }

    public static final /* synthetic */ boolean $anonfun$execute$3(Tuple2 tuple2) {
        return tuple2 != null;
    }

    public static final /* synthetic */ void $anonfun$execute$4(HttpPost httpPost, Tuple2 tuple2) {
        if (tuple2 == null) {
            throw new MatchError(tuple2);
        }
        httpPost.addHeader((String) tuple2._1(), (String) tuple2._2());
        BoxedUnit boxedUnit = BoxedUnit.UNIT;
    }

    private HTTPLoadStage$() {
        MODULE$ = this;
    }
}
