package tech.mlsql.arrow.python.runner;

import java.io.BufferedOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.net.ServerSocket;
import java.net.Socket;
import org.apache.arrow.memory.BufferAllocator;
import org.apache.arrow.vector.VectorSchemaRoot;
import org.apache.arrow.vector.ipc.ArrowStreamReader;
import org.apache.spark.sql.catalyst.InternalRow;
import org.apache.spark.sql.types.StructType;
import org.apache.spark.sql.vectorized.ArrowColumnVector;
import org.apache.spark.sql.vectorized.ColumnVector;
import org.apache.spark.sql.vectorized.ColumnarBatch;
import scala.Array$;
import scala.Function1;
import scala.MatchError;
import scala.PartialFunction;
import scala.Predef$;
import scala.Tuple3;
import scala.collection.Iterator;
import scala.collection.JavaConverters$;
import scala.collection.TraversableLike;
import scala.collection.TraversableOnce;
import scala.collection.mutable.Buffer$;
import scala.reflect.ClassTag$;
import scala.reflect.ScalaSignature;
import scala.runtime.BoxedUnit;
import scala.runtime.BoxesRunTime;
import tech.mlsql.arrow.ArrowBatchStreamWriter;
import tech.mlsql.arrow.ArrowConverters$;
import tech.mlsql.arrow.ArrowUtils$;
import tech.mlsql.arrow.Utils$;
import tech.mlsql.arrow.context.CommonTaskContext;
import tech.mlsql.common.utils.distribute.socket.server.SocketServerInExecutor$;

/* compiled from: SparkSocketRunner.scala */
@ScalaSignature(bytes = "\u0006\u0001\u0005ea\u0001B\u0006\r\u0001]A\u0001B\b\u0001\u0003\u0002\u0003\u0006Ia\b\u0005\tU\u0001\u0011\t\u0011)A\u0005?!A1\u0006\u0001B\u0001B\u0003%q\u0004C\u0003-\u0001\u0011\u0005Q\u0006C\u00034\u0001\u0011\u0005A\u0007C\u0003O\u0001\u0011\u0005q\nC\u0003~\u0001\u0011\u0005apB\u0004\u0002\u00121A\t!a\u0005\u0007\r-a\u0001\u0012AA\u000b\u0011\u0019a\u0013\u0002\"\u0001\u0002\u0018\t\t2\u000b]1sWN{7m[3u%Vtg.\u001a:\u000b\u00055q\u0011A\u0002:v]:,'O\u0003\u0002\u0010!\u00051\u0001/\u001f;i_:T!!\u0005\n\u0002\u000b\u0005\u0014(o\\<\u000b\u0005M!\u0012!B7mgFd'\"A\u000b\u0002\tQ,7\r[\u0002\u0001'\t\u0001\u0001\u0004\u0005\u0002\u001a95\t!DC\u0001\u001c\u0003\u0015\u00198-\u00197b\u0013\ti\"D\u0001\u0004B]f\u0014VMZ\u0001\u000beVtg.\u001a:OC6,\u0007C\u0001\u0011(\u001d\t\tS\u0005\u0005\u0002#55\t1E\u0003\u0002%-\u00051AH]8pizJ!A\n\u000e\u0002\rA\u0013X\rZ3g\u0013\tA\u0013F\u0001\u0004TiJLgn\u001a\u0006\u0003Mi\tA\u0001[8ti\u0006QA/[7f5>tW-\u00133\u0002\rqJg.\u001b;?)\u0011q\u0003'\r\u001a\u0011\u0005=\u0002Q\"\u0001\u0007\t\u000by!\u0001\u0019A\u0010\t\u000b)\"\u0001\u0019A\u0010\t\u000b-\"\u0001\u0019A\u0010\u0002\u001bM,'O^3U_N#(/Z1n)\t)D\n\u0006\u00027yA\u0019\u0011dN\u001d\n\u0005aR\"!B!se\u0006L\bCA\r;\u0013\tY$DA\u0002B]fDQ!P\u0003A\u0002y\n\u0011b\u001e:ji\u00164UO\\2\u0011\tey\u0014)S\u0005\u0003\u0001j\u0011\u0011BR;oGRLwN\\\u0019\u0011\u0005\t;U\"A\"\u000b\u0005\u0011+\u0015AA5p\u0015\u00051\u0015\u0001\u00026bm\u0006L!\u0001S\"\u0003\u0019=+H\u000f];u'R\u0014X-Y7\u0011\u0005eQ\u0015BA&\u001b\u0005\u0011)f.\u001b;\t\u000b5+\u0001\u0019A\u0010\u0002\u0015QD'/Z1e\u001d\u0006lW-\u0001\ftKJ4X\rV8TiJ,\u0017-\\,ji\"\f%O]8x)\u00151\u0004+[9w\u0011\u0015\tf\u00011\u0001S\u0003\u0011IG/\u001a:\u0011\u0007MC6L\u0004\u0002U-:\u0011!%V\u0005\u00027%\u0011qKG\u0001\ba\u0006\u001c7.Y4f\u0013\tI&L\u0001\u0005Ji\u0016\u0014\u0018\r^8s\u0015\t9&\u0004\u0005\u0002]O6\tQL\u0003\u0002_?\u0006A1-\u0019;bYf\u001cHO\u0003\u0002aC\u0006\u00191/\u001d7\u000b\u0005\t\u001c\u0017!B:qCJ\\'B\u00013f\u0003\u0019\t\u0007/Y2iK*\ta-A\u0002pe\u001eL!\u0001[/\u0003\u0017%sG/\u001a:oC2\u0014vn\u001e\u0005\u0006U\u001a\u0001\ra[\u0001\u0007g\u000eDW-\\1\u0011\u00051|W\"A7\u000b\u00059|\u0016!\u0002;za\u0016\u001c\u0018B\u00019n\u0005)\u0019FO];diRK\b/\u001a\u0005\u0006e\u001a\u0001\ra]\u0001\u0013[\u0006D(+Z2pe\u0012\u001c\b+\u001a:CCR\u001c\u0007\u000e\u0005\u0002\u001ai&\u0011QO\u0007\u0002\u0004\u0013:$\b\"B<\u0007\u0001\u0004A\u0018aB2p]R,\u0007\u0010\u001e\t\u0003snl\u0011A\u001f\u0006\u0003oBI!\u0001 >\u0003#\r{W.\\8o)\u0006\u001c8nQ8oi\u0016DH/A\fsK\u0006$gI]8n'R\u0014X-Y7XSRD\u0017I\u001d:poR9q0!\u0003\u0002\f\u0005=\u0001#BA\u0001\u0003\u000fYVBAA\u0002\u0015\r\t)AG\u0001\u000bG>dG.Z2uS>t\u0017bA-\u0002\u0004!)!f\u0002a\u0001?!1\u0011QB\u0004A\u0002M\fA\u0001]8si\")qo\u0002a\u0001q\u0006\t2\u000b]1sWN{7m[3u%Vtg.\u001a:\u0011\u0005=J1CA\u0005\u0019)\t\t\u0019\u0002")
/* loaded from: input_file:tech/mlsql/arrow/python/runner/SparkSocketRunner.class */
public class SparkSocketRunner {
    private final String runnerName;
    private final String host;
    private final String timeZoneId;

    public Object[] serveToStream(String str, Function1<OutputStream, BoxedUnit> function1) {
        Tuple3 tuple3 = SocketServerInExecutor$.MODULE$.setupOneConnectionServer(this.host, this.runnerName, socket -> {
            $anonfun$serveToStream$1(function1, socket);
            return BoxedUnit.UNIT;
        });
        if (tuple3 == null) {
            throw new MatchError(tuple3);
        }
        Tuple3 tuple32 = new Tuple3((ServerSocket) tuple3._1(), (String) tuple3._2(), BoxesRunTime.boxToInteger(BoxesRunTime.unboxToInt(tuple3._3())));
        return (Object[]) Array$.MODULE$.apply(Predef$.MODULE$.genericWrapArray(new Object[]{(ServerSocket) tuple32._1(), (String) tuple32._2(), BoxesRunTime.boxToInteger(BoxesRunTime.unboxToInt(tuple32._3()))}), ClassTag$.MODULE$.Any());
    }

    public Object[] serveToStreamWithArrow(Iterator<InternalRow> iterator, StructType structType, int i, CommonTaskContext commonTaskContext) {
        return serveToStream(this.runnerName, outputStream -> {
            $anonfun$serveToStreamWithArrow$1(this, structType, iterator, i, commonTaskContext, outputStream);
            return BoxedUnit.UNIT;
        });
    }

    public Iterator<InternalRow> readFromStreamWithArrow(String str, int i, final CommonTaskContext commonTaskContext) {
        Socket socket = new Socket(str, i);
        final DataInputStream dataInputStream = new DataInputStream(socket.getInputStream());
        final DataOutputStream dataOutputStream = new DataOutputStream(socket.getOutputStream());
        final SparkSocketRunner sparkSocketRunner = null;
        return new ReaderIterator<ColumnarBatch>(sparkSocketRunner, dataInputStream, commonTaskContext, dataOutputStream) { // from class: tech.mlsql.arrow.python.runner.SparkSocketRunner$$anon$1
            private final BufferAllocator allocator;
            private ArrowStreamReader reader;
            private VectorSchemaRoot root;
            private StructType schema;
            private ColumnVector[] vectors;
            private boolean batchLoaded;
            private final DataInputStream stream$1;
            private final DataOutputStream outfile$1;

            private BufferAllocator allocator() {
                return this.allocator;
            }

            private ArrowStreamReader reader() {
                return this.reader;
            }

            private void reader_$eq(ArrowStreamReader arrowStreamReader) {
                this.reader = arrowStreamReader;
            }

            private VectorSchemaRoot root() {
                return this.root;
            }

            private void root_$eq(VectorSchemaRoot vectorSchemaRoot) {
                this.root = vectorSchemaRoot;
            }

            private StructType schema() {
                return this.schema;
            }

            private void schema_$eq(StructType structType) {
                this.schema = structType;
            }

            private ColumnVector[] vectors() {
                return this.vectors;
            }

            private void vectors_$eq(ColumnVector[] columnVectorArr) {
                this.vectors = columnVectorArr;
            }

            private boolean batchLoaded() {
                return this.batchLoaded;
            }

            private void batchLoaded_$eq(boolean z) {
                this.batchLoaded = z;
            }

            /* JADX WARN: Can't rename method to resolve collision */
            @Override // tech.mlsql.arrow.python.runner.ReaderIterator
            public ColumnarBatch read() {
                ColumnarBatch read;
                ColumnarBatch columnarBatch;
                ColumnarBatch columnarBatch2;
                try {
                    if (reader() == null || !batchLoaded()) {
                        int readInt = this.stream$1.readInt();
                        if (SpecialLengths$.MODULE$.START_ARROW_STREAM() == readInt) {
                            try {
                                reader_$eq(new ArrowStreamReader(this.stream$1, allocator()));
                                root_$eq(reader().getVectorSchemaRoot());
                                schema_$eq(ArrowUtils$.MODULE$.fromArrowSchema(root().getSchema()));
                                vectors_$eq((ColumnVector[]) ((TraversableOnce) ((TraversableLike) JavaConverters$.MODULE$.asScalaBufferConverter(root().getFieldVectors()).asScala()).map(fieldVector -> {
                                    return new ArrowColumnVector(fieldVector);
                                }, Buffer$.MODULE$.canBuildFrom())).toArray(ClassTag$.MODULE$.apply(ColumnVector.class)));
                                read = read();
                            } catch (Throwable th) {
                                if (th instanceof IOException) {
                                    IOException iOException = (IOException) th;
                                    if (iOException.getMessage().contains("Missing schema") || iOException.getMessage().contains("Expected schema but header was")) {
                                        logInfo(() -> {
                                            return "Arrow read schema fail";
                                        }, iOException);
                                        reader_$eq(null);
                                        read = read();
                                    }
                                }
                                throw th;
                            }
                            columnarBatch = read;
                        } else if (SpecialLengths$.MODULE$.ARROW_STREAM_CRASH() == readInt) {
                            columnarBatch = read();
                        } else {
                            if (SpecialLengths$.MODULE$.PYTHON_EXCEPTION_THROWN() == readInt) {
                                throw handlePythonException(this.outfile$1);
                            }
                            if (SpecialLengths$.MODULE$.END_OF_DATA_SECTION() != readInt) {
                                throw new MatchError(BoxesRunTime.boxToInteger(readInt));
                            }
                            handleEndOfDataSection(this.outfile$1);
                            columnarBatch = null;
                        }
                        columnarBatch2 = columnarBatch;
                    } else {
                        batchLoaded_$eq(reader().loadNextBatch());
                        if (batchLoaded()) {
                            ColumnarBatch columnarBatch3 = new ColumnarBatch(vectors());
                            columnarBatch3.setNumRows(root().getRowCount());
                            columnarBatch2 = columnarBatch3;
                        } else {
                            reader().close(false);
                            allocator().close();
                            columnarBatch2 = read();
                        }
                    }
                    return columnarBatch2;
                } catch (Throwable th2) {
                    PartialFunction handleException = handleException();
                    if (handleException.isDefinedAt(th2)) {
                        return (ColumnarBatch) handleException.apply(th2);
                    }
                    throw th2;
                }
            }

            /* JADX WARN: 'super' call moved to the top of the method (can break code semantics) */
            {
                super(dataInputStream, System.currentTimeMillis(), commonTaskContext);
                this.stream$1 = dataInputStream;
                this.outfile$1 = dataOutputStream;
                this.allocator = ArrowUtils$.MODULE$.rootAllocator().newChildAllocator("stdin reader ", 0L, Long.MAX_VALUE);
                commonTaskContext.readerRegister(() -> {
                }).apply(reader(), allocator());
                this.batchLoaded = true;
            }
        }.flatMap(columnarBatch -> {
            return (Iterator) JavaConverters$.MODULE$.asScalaIteratorConverter(columnarBatch.rowIterator()).asScala();
        });
    }

    public static final /* synthetic */ void $anonfun$serveToStream$1(Function1 function1, Socket socket) {
        BufferedOutputStream bufferedOutputStream = new BufferedOutputStream(socket.getOutputStream());
        Utils$.MODULE$.tryWithSafeFinally(() -> {
            function1.apply(bufferedOutputStream);
        }, () -> {
            bufferedOutputStream.close();
        });
    }

    public static final /* synthetic */ void $anonfun$serveToStreamWithArrow$1(SparkSocketRunner sparkSocketRunner, StructType structType, Iterator iterator, int i, CommonTaskContext commonTaskContext, OutputStream outputStream) {
        ArrowBatchStreamWriter arrowBatchStreamWriter = new ArrowBatchStreamWriter(structType, outputStream, sparkSocketRunner.timeZoneId);
        arrowBatchStreamWriter.writeBatches(ArrowConverters$.MODULE$.toBatchIterator(iterator, structType, i, sparkSocketRunner.timeZoneId, commonTaskContext));
        arrowBatchStreamWriter.end();
    }

    public SparkSocketRunner(String str, String str2, String str3) {
        this.runnerName = str;
        this.host = str2;
        this.timeZoneId = str3;
    }
}
