package org.apache.spark.sql.ai.starlake.http;

import com.sun.net.httpserver.HttpExchange;
import com.sun.net.httpserver.HttpHandler;
import com.sun.net.httpserver.HttpServer;
import com.typesafe.scalalogging.Logger;
import com.typesafe.scalalogging.StrictLogging;
import java.io.IOException;
import java.io.OutputStream;
import java.net.InetSocketAddress;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.spark.SparkContext;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SQLContext;
import org.apache.spark.sql.catalyst.InternalRow;
import org.apache.spark.sql.catalyst.InternalRow$;
import org.apache.spark.sql.connector.read.streaming.Offset;
import org.apache.spark.sql.execution.streaming.LongOffset;
import org.apache.spark.sql.execution.streaming.LongOffset$;
import org.apache.spark.sql.execution.streaming.SerializedOffset;
import org.apache.spark.sql.execution.streaming.Source;
import org.apache.spark.sql.types.StringType$;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructField$;
import org.apache.spark.sql.types.StructType;
import org.apache.spark.sql.types.StructType$;
import org.apache.spark.unsafe.types.UTF8String;
import scala.None$;
import scala.Option;
import scala.Predef$;
import scala.Some;
import scala.collection.immutable.$colon;
import scala.collection.immutable.Nil$;
import scala.collection.mutable.ListBuffer;
import scala.collection.mutable.ListBuffer$;
import scala.io.Codec$;
import scala.io.Source$;
import scala.reflect.ClassTag$;
import scala.reflect.ScalaSignature;
import scala.runtime.BoxedUnit;
import scala.runtime.BoxesRunTime;

/* compiled from: HttpIngestionJob.scala */
@ScalaSignature(bytes = "\u0006\u0001\u0005%f\u0001\u0002\r\u001a\u0001!B\u0001b\u0011\u0001\u0003\u0002\u0003\u0006I\u0001\u0012\u0005\t\u0011\u0002\u0011\t\u0011)A\u0005\u0013\")q\n\u0001C\u0001!\")Q\u000b\u0001C!-\"9Q\f\u0001a\u0001\n\u0003q\u0006b\u00022\u0001\u0001\u0004%\ta\u0019\u0005\u0007S\u0002\u0001\u000b\u0015B0\t\u000f)\u0004\u0001\u0019!C\u0001W\"9A\u000e\u0001a\u0001\n\u0003i\u0007BB8\u0001A\u0003&\u0011\nC\u0004q\u0001\t\u0007I\u0011A9\t\u000f\u0005-\u0001\u0001)A\u0005e\"I\u0011Q\u0002\u0001C\u0002\u0013\u0005\u0011q\u0002\u0005\t\u0003K\u0001\u0001\u0015!\u0003\u0002\u0012!9\u0011q\u0005\u0001\u0005\n\u0005%\u0002bBA&\u0001\u0011%\u0011Q\n\u0005\n\u0003+\u0002!\u0019!C\u0005\u0003/B\u0001\"!\u0017\u0001A\u0003%\u0011q\n\u0005\b\u00037\u0002A\u0011IA/\u0011\u001d\tY\u0007\u0001C\u0001\u0003[Bq!!\u001e\u0001\t\u0003\n9\bC\u0004\u0002 \u0002!\t%!)\t\u000f\u0005\u0015\u0006\u0001\"\u0011\u0002(\n\u0001\u0002\n\u001e;q'R\u0014X-Y7T_V\u00148-\u001a\u0006\u00035m\tA\u0001\u001b;ua*\u0011A$H\u0001\tgR\f'\u000f\\1lK*\u0011adH\u0001\u0003C&T!\u0001I\u0011\u0002\u0007M\fHN\u0003\u0002#G\u0005)1\u000f]1sW*\u0011A%J\u0001\u0007CB\f7\r[3\u000b\u0003\u0019\n1a\u001c:h\u0007\u0001\u0019B\u0001A\u00152sA\u0011!fL\u0007\u0002W)\u0011A&L\u0001\u0005Y\u0006twMC\u0001/\u0003\u0011Q\u0017M^1\n\u0005AZ#AB(cU\u0016\u001cG\u000f\u0005\u00023o5\t1G\u0003\u00025k\u0005I1\u000f\u001e:fC6Lgn\u001a\u0006\u0003m}\t\u0011\"\u001a=fGV$\u0018n\u001c8\n\u0005a\u001a$AB*pkJ\u001cW\r\u0005\u0002;\u00036\t1H\u0003\u0002={\u0005a1oY1mC2|wmZ5oO*\u0011ahP\u0001\tif\u0004Xm]1gK*\t\u0001)A\u0002d_6L!AQ\u001e\u0003\u001bM#(/[2u\u0019><w-\u001b8h\u0003)\u0019\u0018\u000f\\\"p]R,\u0007\u0010\u001e\t\u0003\u000b\u001ak\u0011aH\u0005\u0003\u000f~\u0011!bU)M\u0007>tG/\u001a=u\u0003\u0011\u0001xN\u001d;\u0011\u0005)kU\"A&\u000b\u00031\u000bQa]2bY\u0006L!AT&\u0003\u0007%sG/\u0001\u0004=S:LGO\u0010\u000b\u0004#N#\u0006C\u0001*\u0001\u001b\u0005I\u0002\"B\"\u0004\u0001\u0004!\u0005\"\u0002%\u0004\u00
01\u0004I\u0015AB:dQ\u0016l\u0017-F\u0001X!\tA6,D\u0001Z\u0015\tQv$A\u0003usB,7/\u0003\u0002]3\nQ1\u000b\u001e:vGR$\u0016\u0010]3\u0002\u001dA\u0014x\u000eZ;dKJ|eMZ:fiV\tq\f\u0005\u00023A&\u0011\u0011m\r\u0002\u000b\u0019>twm\u00144gg\u0016$\u0018A\u00059s_\u0012,8-\u001a:PM\u001a\u001cX\r^0%KF$\"\u0001Z4\u0011\u0005)+\u0017B\u00014L\u0005\u0011)f.\u001b;\t\u000f!4\u0011\u0011!a\u0001?\u0006\u0019\u0001\u0010J\u0019\u0002\u001fA\u0014x\u000eZ;dKJ|eMZ:fi\u0002\nabY8ogVlWM](gMN,G/F\u0001J\u0003I\u0019wN\\:v[\u0016\u0014xJ\u001a4tKR|F%Z9\u0015\u0005\u0011t\u0007b\u00025\n\u0003\u0003\u0005\r!S\u0001\u0010G>t7/^7fe>3gm]3uA\u0005a1\u000f\u001e:fC6\u0014UO\u001a4feV\t!\u000fE\u0002tqjl\u0011\u0001\u001e\u0006\u0003kZ\fq!\\;uC\ndWM\u0003\u0002x\u0017\u0006Q1m\u001c7mK\u000e$\u0018n\u001c8\n\u0005e$(A\u0003'jgR\u0014UO\u001a4feB\u001910!\u0002\u000f\u0007q\f\t\u0001\u0005\u0002~\u00176\taP\u0003\u0002��O\u00051AH]8pizJ1!a\u0001L\u0003\u0019\u0001&/\u001a3fM&!\u0011qAA\u0005\u0005\u0019\u0019FO]5oO*\u0019\u00111A&\u0002\u001bM$(/Z1n\u0005V4g-\u001a:!\u0003!1G.Y4Ti>\u0004XCAA\t!\u0011\t\u0019\"!\t\u000e\u0005\u0005U!\u0002BA\f\u00033\ta!\u0019;p[&\u001c'\u0002BA\u000e\u0003;\t!bY8oGV\u0014(/\u001a8u\u0015\r\ty\"L\u0001\u0005kRLG.\u0003\u0003\u0002$\u0005U!!D!u_6L7MQ8pY\u0016\fg.A\u0005gY\u0006<7\u000b^8qA\u0005a1/\u001a8e%\u0016\u001c\bo\u001c8tKR9A-a\u000b\u0002D\u0005\u001d\u0003bBA\u0017\u001f\u0001\u0007\u0011qF\u0001\u0003Q\u0016\u0004B!!\r\u0002@5\u0011\u00111\u0007\u0006\u0005\u0003k\t9$\u0001\u0006iiR\u00048/\u001a:wKJTA!!\u000f\u0002<\u0005\u0019a.\u001a;\u000b\u0007\u0005ur(A\u0002tk:LA!!\u0011\u00024\ta\u0001\n\u001e;q\u000bb\u001c\u0007.\u00198hK\"1\u0011QI\bA\u0002%\u000baa\u001d;biV\u001c\bBBA%\u001f\u0001\u0007!0\u0001\u0005sKN\u0004xN\\:f\u0003-\u0019H/\u0019:u'\u0016\u0014h/\u001a:\u0015\u0005\u0005=\u0003\u0003BA\u0019\u0003#JA!a\u0015\u00024\tQ\u0001\n\u001e;q'\u0016\u0014h/\u001a:\u0002\rM,'O^3s+\t\ty%A\u0004tKJ4XM\u001d\u0011\u0002\u0013\u001d,Go\u00144gg
\u0016$XCAA0!\u0015Q\u0015\u0011MA3\u0013\r\t\u0019g\u0013\u0002\u0007\u001fB$\u0018n\u001c8\u0011\u0007I\n9'C\u0002\u0002jM\u0012aa\u00144gg\u0016$\u0018aE2p]Z,'\u000f\u001e+p\u0019>twm\u00144gg\u0016$H\u0003BA8\u0003c\u0002BASA1?\"9\u00111\u000f\u000bA\u0002\u0005\u0015\u0014AB8gMN,G/\u0001\u0005hKR\u0014\u0015\r^2i)\u0019\tI(a&\u0002\u001cB!\u00111PAI\u001d\u0011\ti(!$\u000f\t\u0005}\u00141\u0012\b\u0005\u0003\u0003\u000bII\u0004\u0003\u0002\u0004\u0006\u001debA?\u0002\u0006&\ta%\u0003\u0002%K%\u0011!eI\u0005\u0003A\u0005J1!a$ \u0003\u001d\u0001\u0018mY6bO\u0016LA!a%\u0002\u0016\nIA)\u0019;b\rJ\fW.\u001a\u0006\u0004\u0003\u001f{\u0002bBAM+\u0001\u0007\u0011qL\u0001\u0006gR\f'\u000f\u001e\u0005\b\u0003;+\u0002\u0019AA3\u0003\r)g\u000eZ\u0001\u0007G>lW.\u001b;\u0015\u0007\u0011\f\u0019\u000bC\u0004\u0002\u001eZ\u0001\r!!\u001a\u0002\tM$x\u000e\u001d\u000b\u0002I\u0002")
/* loaded from: input_file:org/apache/spark/sql/ai/starlake/http/HttpStreamSource.class */
/**
 * Structured Streaming {@link Source} fed by an embedded HTTP server.
 *
 * <p>Every request received on {@code "/"} has its body appended to an in-memory buffer as one
 * record, and the producer offset is advanced by one. {@link #getBatch} serves slices of that
 * buffer as single-column ({@code value}) rows, and {@code commit} drops the records that have
 * been committed.
 *
 * <p>Offsets are zero-based record numbers; {@code -1} means "nothing yet". The record stored at
 * buffer index {@code k} has offset {@code consumerOffset + 1 + k}.
 *
 * <p>Thread-safety: {@code producerOffset}, {@code consumerOffset} and {@code streamBuffer} are
 * guarded by this instance's monitor, both in the streaming-engine callbacks and in the HTTP
 * handler (which runs on the server's executor threads).
 */
public class HttpStreamSource implements Source, StrictLogging {
    private final SQLContext sqlContext;
    private final int port;
    /** Offset of the most recently received record, {@code -1} while nothing was received. */
    private LongOffset producerOffset;
    /** Offset of the most recently committed record, {@code -1} while nothing was committed. */
    private int consumerOffset;
    /** Received but not yet committed request bodies, oldest first. */
    private final ListBuffer<String> streamBuffer;
    /** Exposed through {@link #flagStop()}; this class itself never reads or updates it. */
    private final AtomicBoolean flagStop;
    private final HttpServer server;
    /** Not final: it is assigned through the {@code StrictLogging} trait setter below. */
    private Logger logger;

    /** Forwards to the default implementation provided by the {@code Source} trait. */
    public Offset initialOffset() {
        return Source.initialOffset$(this);
    }

    /** Forwards to the default implementation provided by the {@code Source} trait. */
    public Offset deserializeOffset(String str) {
        return Source.deserializeOffset$(this, str);
    }

    /** Forwards to the default implementation provided by the {@code Source} trait. */
    public void commit(Offset offset) {
        Source.commit$(this, offset);
    }

    /** @return the logger installed by the {@code StrictLogging} trait initializer */
    public Logger logger() {
        return this.logger;
    }

    /** Trait setter invoked by {@code StrictLogging.$init$}; not meant to be called directly. */
    public void com$typesafe$scalalogging$StrictLogging$_setter_$logger_$eq(Logger logger) {
        this.logger = logger;
    }

    /** @return the schema of the produced rows: a single nullable string column named {@code value} */
    public StructType schema() {
        return StructType$.MODULE$.apply(new $colon.colon(new StructField("value", StringType$.MODULE$, true, StructField$.MODULE$.apply$default$4()), Nil$.MODULE$));
    }

    public LongOffset producerOffset() {
        return this.producerOffset;
    }

    public void producerOffset_$eq(LongOffset longOffset) {
        this.producerOffset = longOffset;
    }

    public int consumerOffset() {
        return this.consumerOffset;
    }

    public void consumerOffset_$eq(int i) {
        this.consumerOffset = i;
    }

    public ListBuffer<String> streamBuffer() {
        return this.streamBuffer;
    }

    public AtomicBoolean flagStop() {
        return this.flagStop;
    }

    /**
     * Writes a complete response and closes the response body.
     *
     * @param httpExchange exchange to answer
     * @param i            HTTP status code
     * @param str          response body, sent UTF-8 encoded
     * @throws IOException if the headers or the body cannot be written
     */
    public void org$apache$spark$sql$ai$starlake$http$HttpStreamSource$$sendResponse(HttpExchange httpExchange, int i, String str) throws IOException {
        // The declared length must be the number of bytes actually written, not the number of
        // UTF-16 chars, and the encoding must not depend on the platform default charset.
        byte[] payload = str.getBytes(StandardCharsets.UTF_8);
        httpExchange.sendResponseHeaders(i, payload.length);
        try (OutputStream responseBody = httpExchange.getResponseBody()) {
            responseBody.write(payload);
        }
    }

    /**
     * Creates and starts the HTTP server bound to {@code port}, with a single handler on
     * {@code "/"} that buffers each request body as one record.
     *
     * @return the started server
     * @throws IOException if the server socket cannot be created
     */
    private HttpServer startServer() throws IOException {
        HttpServer httpServer = HttpServer.create(new InetSocketAddress(this.port), 0);
        httpServer.setExecutor(Executors.newCachedThreadPool());
        httpServer.createContext("/", new HttpHandler() {
            @Override
            public void handle(HttpExchange httpExchange) throws IOException {
                String payload = Source$.MODULE$.fromInputStream(httpExchange.getRequestBody(), Codec$.MODULE$.fallbackSystemCodec()).mkString();
                // Requests are handled concurrently by the executor; take the same monitor as
                // getOffset/getBatch/commit so the offset and the buffer always move together.
                synchronized (HttpStreamSource.this) {
                    HttpStreamSource.this.producerOffset_$eq(HttpStreamSource.this.producerOffset().$plus(1L));
                    HttpStreamSource.this.streamBuffer().$plus$eq(payload);
                }
                HttpStreamSource.this.org$apache$spark$sql$ai$starlake$http$HttpStreamSource$$sendResponse(httpExchange, 200, "{\"success\": true}");
            }
        });
        httpServer.start();
        return httpServer;
    }

    private HttpServer server() {
        return this.server;
    }

    /**
     * @return the offset of the latest received record, or an empty option while no record has
     *         been received
     */
    public Option<org.apache.spark.sql.execution.streaming.Offset> getOffset() {
        LongOffset latest;
        synchronized (this) {
            latest = producerOffset();
        }
        if (latest.offset() == -1) {
            return Option.empty();
        }
        return new Some<>(latest);
    }

    /**
     * Converts an engine offset to a {@link LongOffset}.
     *
     * @param offset a {@link LongOffset}, a {@link SerializedOffset}, or anything else
     * @return the converted offset, or an empty option for any other kind of offset
     */
    public Option<LongOffset> convertToLongOffset(org.apache.spark.sql.execution.streaming.Offset offset) {
        if (offset instanceof LongOffset) {
            return new Some<>((LongOffset) offset);
        }
        if (offset instanceof SerializedOffset) {
            return new Some<>(LongOffset$.MODULE$.apply((SerializedOffset) offset));
        }
        return Option.empty();
    }

    /** Numeric value of {@code offset}, or {@code -1} when it cannot be converted. */
    private long offsetValueOrInitial(org.apache.spark.sql.execution.streaming.Offset offset) {
        Option<LongOffset> converted = convertToLongOffset(offset);
        return converted.isDefined() ? converted.get().offset() : -1L;
    }

    /**
     * Returns the records after the start offset up to and including the end offset.
     *
     * @param option start offset (exclusive); empty or unconvertible means {@code -1}
     * @param offset end offset (inclusive); unconvertible means {@code -1}
     * @return a streaming DataFrame with one {@code value} row per buffered record in the range
     */
    public Dataset<Row> getBatch(Option<org.apache.spark.sql.execution.streaming.Offset> option, org.apache.spark.sql.execution.streaming.Offset offset) {
        long startOffset = option.isDefined() ? offsetValueOrInitial(option.get()) : -1L;
        long endOffset = offsetValueOrInitial(offset);
        ListBuffer<String> batch;
        synchronized (this) {
            // Buffer indices are relative to the last committed offset.
            int committed = consumerOffset();
            @SuppressWarnings("unchecked") // slice of a ListBuffer<String> is a ListBuffer<String>
            ListBuffer<String> slice = (ListBuffer<String>) streamBuffer().slice(((int) startOffset) - committed, ((int) endOffset) - committed);
            batch = slice;
        }
        SparkContext sparkContext = this.sqlContext.sparkContext();
        // NOTE(review): a plain Java lambda is not Serializable; if this file is ever compiled
        // as Java the mapping function presumably needs a Serializable cast — confirm.
        return this.sqlContext.sparkSession().internalCreateDataFrame(sparkContext.parallelize(batch, sparkContext.parallelize$default$2(), ClassTag$.MODULE$.apply(String.class)).map(str -> {
            return InternalRow$.MODULE$.apply(Predef$.MODULE$.genericWrapArray(new Object[]{UTF8String.fromString(str)}));
        }, ClassTag$.MODULE$.apply(InternalRow.class)), schema(), true);
    }

    /**
     * Drops every buffered record up to and including {@code offset}.
     *
     * @param offset end offset of the processed data; negative offsets are ignored
     * @throws IllegalArgumentException if {@code offset} cannot be converted to a {@link LongOffset}
     */
    public void commit(org.apache.spark.sql.execution.streaming.Offset offset) {
        Option<LongOffset> converted = convertToLongOffset(offset);
        if (converted.isEmpty() || converted.get() == null) {
            // Unchecked on purpose: the Source contract gives commit no checked exceptions.
            throw new IllegalArgumentException("Cannot commit with end offset => " + offset);
        }
        long committed = converted.get().offset();
        if (committed < 0) {
            return;
        }
        synchronized (this) {
            streamBuffer().trimStart(((int) committed) - consumerOffset());
            consumerOffset_$eq((int) committed);
        }
    }

    /**
     * Stops the HTTP server, waiting up to 30 seconds for in-flight exchanges, then shuts down
     * the executor that was installed by {@code startServer()} so its worker threads are released.
     */
    public void stop() {
        HttpServer httpServer = server();
        Executor executor = httpServer.getExecutor();
        httpServer.stop(30);
        // Stopping the server leaves the executor running; release it explicitly.
        if (executor instanceof ExecutorService) {
            ((ExecutorService) executor).shutdown();
        }
    }

    /**
     * Creates the source and immediately starts listening for HTTP requests.
     *
     * @param sQLContext context used to build the batches
     * @param i          TCP port the HTTP server binds to
     * @throws IOException if the HTTP server cannot be created on that port
     */
    public HttpStreamSource(SQLContext sQLContext, int i) throws IOException {
        this.sqlContext = sQLContext;
        this.port = i;
        Source.$init$(this);
        StrictLogging.$init$(this);
        this.producerOffset = new LongOffset(-1L);
        this.consumerOffset = -1;
        this.streamBuffer = new ListBuffer<>();
        this.flagStop = new AtomicBoolean(false);
        this.server = startServer();
        // Logged only once the server is really up, so a failed bind is not reported as started.
        logger().underlying().info("Http Server started on port {}", Integer.valueOf(i));
    }
}
