package org.apache.wayang.api.python.executor;

import com.google.protobuf.ByteString;
import java.io.BufferedOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.net.Socket;
import java.nio.charset.StandardCharsets;
import java.util.Iterator;
import java.util.Map;
import org.apache.wayang.core.api.exception.WayangException;

/* loaded from: input_file:org/apache/wayang/api/python/executor/ProcessFeeder.class */
public class ProcessFeeder<Input, Output> {
    private Socket socket;
    private ByteString serializedUDF;
    private Iterable<Input> input;
    int END_OF_DATA_SECTION = -1;
    int NULL = -5;

    public ProcessFeeder(Socket socket, ByteString byteString, Iterable<Input> iterable) {
        if (iterable == null) {
            throw new WayangException("Nothing to process with Python API");
        }
        this.socket = socket;
        this.serializedUDF = byteString;
        this.input = iterable;
    }

    public void send() {
        try {
            DataOutputStream dataOutputStream = new DataOutputStream(new BufferedOutputStream(this.socket.getOutputStream(), 65536));
            writeUDF(this.serializedUDF, dataOutputStream);
            writeIteratorToStream(this.input.iterator(), dataOutputStream);
            dataOutputStream.writeInt(this.END_OF_DATA_SECTION);
            dataOutputStream.flush();
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    public void writeUDF(ByteString byteString, DataOutputStream dataOutputStream) {
        writeBytes(byteString.toByteArray(), dataOutputStream);
    }

    public void writeIteratorToStream(Iterator<Input> it, DataOutputStream dataOutputStream) {
        while (it.hasNext()) {
            write(it.next(), dataOutputStream);
        }
    }

    public void write(Object obj, DataOutputStream dataOutputStream) {
        try {
            if (obj == null) {
                dataOutputStream.writeInt(this.NULL);
            } else if ((obj instanceof Byte[]) || (obj instanceof byte[])) {
                writeBytes(obj, dataOutputStream);
            } else if (obj instanceof String) {
                writeUTF((String) obj, dataOutputStream);
            } else if (obj instanceof Object) {
                writeUTF(String.valueOf(obj), dataOutputStream);
            } else {
                if (!(obj instanceof Map.Entry)) {
                    throw new WayangException("Unexpected element type " + obj.getClass());
                }
                writeKeyValue((Map.Entry) obj, dataOutputStream);
            }
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    public void writeBytes(Object obj, DataOutputStream dataOutputStream) {
        try {
            if (obj instanceof Byte[]) {
                int length = ((Byte[]) obj).length;
                byte[] bArr = new byte[length];
                int i = 0;
                for (Byte b : (Byte[]) obj) {
                    int i2 = i;
                    i++;
                    bArr[i2] = b.byteValue();
                }
                dataOutputStream.writeInt(length);
                dataOutputStream.write(bArr);
            } else if (obj instanceof byte[]) {
                dataOutputStream.writeInt(((byte[]) obj).length);
                dataOutputStream.write((byte[]) obj);
            }
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    public void writeUTF(String str, DataOutputStream dataOutputStream) {
        byte[] bytes = str.getBytes(StandardCharsets.UTF_8);
        try {
            dataOutputStream.writeInt(bytes.length);
            dataOutputStream.write(bytes);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    public void writeKeyValue(Map.Entry entry, DataOutputStream dataOutputStream) {
        write(entry.getKey(), dataOutputStream);
        write(entry.getValue(), dataOutputStream);
    }
}
