package org.apache.wayang.api.python.executor;

import org.apache.wayang.api.python.function.PythonCode;
import org.apache.wayang.api.python.function.PythonUDF;
import org.apache.wayang.core.api.exception.WayangException;

/* loaded from: input_file:org/apache/wayang/api/python/executor/PythonWorkerManager.class */
/**
 * Wires together the pieces needed to run a Python UDF over an input iterable:
 * a {@link PythonProcessCaller} created from the serialized UDF, a
 * {@code ProcessFeeder} that sends the input, and a {@code ProcessReceiver}
 * that exposes the results.
 *
 * <p>Instances hold their three collaborators in {@code final} fields; the
 * references never change after construction.
 *
 * @param <Input>  element type of the input iterable
 * @param <Output> element type of the iterable returned by {@link #execute()}
 */
public class PythonWorkerManager<Input, Output> {
    private final PythonUDF<Input, Output> udf;
    private final PythonCode serializedUDF;
    private final Iterable<Input> inputIterator;

    /**
     * Stores the given collaborators; no process is started here.
     *
     * @param pythonUDF  the UDF handed to the feeder
     * @param pythonCode the serialized UDF, used both to create the process caller
     *                   and by the feeder
     * @param iterable   the input elements handed to the feeder
     */
    public PythonWorkerManager(PythonUDF<Input, Output> pythonUDF, PythonCode pythonCode, Iterable<Input> iterable) {
        this.udf = pythonUDF;
        this.serializedUDF = pythonCode;
        this.inputIterator = iterable;
    }

    /**
     * Creates a {@link PythonProcessCaller} for the serialized UDF and, if it reports
     * ready, starts a new thread that calls {@code send()} on a {@code ProcessFeeder}
     * and returns the iterable obtained from a {@code ProcessReceiver} built on the
     * same socket.
     *
     * <p>The feeder runs on its own thread, so an exception thrown by {@code send()}
     * is not propagated to the caller of this method. On the success path this method
     * does not close the process caller.
     *
     * @return the iterable provided by the {@code ProcessReceiver}
     * @throws WayangException if the process caller does not report ready; the caller
     *                         is closed before the exception is thrown
     */
    public Iterable<Output> execute() {
        PythonProcessCaller pythonProcessCaller = new PythonProcessCaller(this.serializedUDF);
        if (pythonProcessCaller.isReady()) {
            // Feed on a separate thread so this method can return the receiving
            // iterable without waiting for send() to finish.
            new Thread(() -> {
                new ProcessFeeder(pythonProcessCaller.getSocket(), this.udf, this.serializedUDF, this.inputIterator).send();
            }).start();
            return new ProcessReceiver(pythonProcessCaller.getSocket()).getIterable();
        }
        // Read the port before close(); presumably it is not meaningful afterwards
        // (TODO confirm against PythonProcessCaller.close()).
        int localPort = pythonProcessCaller.getSocket().getLocalPort();
        pythonProcessCaller.close();
        throw new WayangException("Not possible to work with the Socket provided on port: " + localPort);
    }
}
