package io.ray.streaming.runtime.python;

import com.google.common.base.Preconditions;
import com.google.common.primitives.Primitives;
import io.ray.streaming.api.context.StreamingContext;
import io.ray.streaming.api.stream.DataStream;
import io.ray.streaming.api.stream.Stream;
import io.ray.streaming.python.PythonFunction;
import io.ray.streaming.python.PythonPartition;
import io.ray.streaming.python.stream.PythonDataStream;
import io.ray.streaming.python.stream.PythonStreamSource;
import io.ray.streaming.runtime.serialization.MsgPackSerializer;
import io.ray.streaming.runtime.util.ReflectionUtils;
import java.lang.reflect.Method;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.stream.Collectors;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/ray/streaming/runtime/python/PythonGateway.class */
/**
 * Entry point that lets a Python driver build and run a streaming job on the Java side.
 *
 * <p>Every public method takes and returns MsgPack-encoded bytes. Java objects that are handed
 * back to the caller as references are kept in {@link #referenceMap} under a generated id; when
 * such an id later appears as a string argument it is replaced by the stored object again (see
 * {@link #processParameter(Object)}).
 *
 * <p>The reference map is a plain {@link HashMap} and is never pruned by this class.
 */
public class PythonGateway {
    private static final String REFERENCE_ID_PREFIX = "__gateway_reference_id__";
    private static final Logger LOG = LoggerFactory.getLogger(PythonGateway.class);
    private static final MsgPackSerializer serializer = new MsgPackSerializer();

    /** Objects handed out to the caller by reference, keyed by their reference id. */
    private final Map<String, Object> referenceMap = new HashMap<>();
    private StreamingContext streamingContext;

    public PythonGateway() {
        LOG.info("PythonGateway created");
    }

    /**
     * Builds a new {@link StreamingContext}, stores it in this gateway and registers it as a
     * reference.
     *
     * @return the serialized reference id of the new context
     */
    public byte[] createStreamingContext() {
        this.streamingContext = StreamingContext.buildContext();
        LOG.info("StreamingContext created");
        return registerReference(this.streamingContext);
    }

    /**
     * @return the context created by {@link #createStreamingContext()}, or {@code null} if that
     *     method has not been called yet
     */
    public StreamingContext getStreamingContext() {
        return this.streamingContext;
    }

    /**
     * Applies a configuration map to the streaming context.
     *
     * @param bArr serialized configuration map
     * @return a one-element byte array used as an acknowledgement
     * @throws NullPointerException if no streaming context has been created yet
     * @throws RuntimeException wrapping any failure while deserializing or applying the config
     */
    public byte[] withConfig(byte[] bArr) {
        Preconditions.checkNotNull(this.streamingContext);
        try {
            @SuppressWarnings("unchecked")
            Map<String, String> config = (Map<String, String>) serializer.deserialize(bArr);
            LOG.info("Set config {}", config);
            this.streamingContext.withConfig(config);
            return new byte[1];
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

    /**
     * Creates a Python stream source on the current streaming context.
     *
     * @param bArr bytes passed to the {@link PythonFunction} constructor
     * @return the serialized reference id of the created {@link PythonStreamSource}
     * @throws NullPointerException if no streaming context has been created yet
     * @throws RuntimeException wrapping any failure while creating the source
     */
    public byte[] createPythonStreamSource(byte[] bArr) {
        Preconditions.checkNotNull(this.streamingContext);
        try {
            PythonStreamSource source =
                PythonStreamSource.from(this.streamingContext, new PythonFunction(bArr));
            return registerReference(source);
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

    /**
     * Executes the job described by the current streaming context.
     *
     * @param bArr serialized job name
     * @return a one-element byte array used as an acknowledgement
     * @throws NullPointerException if no streaming context has been created yet
     */
    public byte[] execute(byte[] bArr) {
        // Same guard as the other context-dependent entry points, so a missing context is
        // reported before any work is done.
        Preconditions.checkNotNull(this.streamingContext);
        LOG.info("Starting executing");
        String jobName = (String) serializer.deserialize(bArr);
        this.streamingContext.execute(jobName);
        return new byte[1];
    }

    /**
     * Wraps the given bytes in a {@link PythonFunction} and registers it as a reference.
     *
     * @param bArr bytes passed to the {@link PythonFunction} constructor
     * @return the serialized reference id of the function
     */
    public byte[] createPyFunc(byte[] bArr) {
        return registerReference(new PythonFunction(bArr));
    }

    /**
     * Wraps the given bytes in a {@link PythonPartition} and registers it as a reference.
     *
     * @param bArr bytes passed to the {@link PythonPartition} constructor
     * @return the serialized reference id of the partition
     */
    public byte[] createPyPartition(byte[] bArr) {
        return registerReference(new PythonPartition(bArr));
    }

    /**
     * Unions the first stream in the parameter list with all remaining ones.
     *
     * @param bArr serialized list of stream reference ids (at least two)
     * @return the serialized result of the union, by value or by reference id
     * @throws IllegalArgumentException if fewer than two streams are given, or the first stream
     *     is neither a {@link DataStream} nor a {@link PythonDataStream}
     */
    @SuppressWarnings({"unchecked", "rawtypes"})
    public byte[] union(byte[] bArr) {
        List<Object> streams = deserializeParameters(bArr);
        LOG.info("Call union with streams {}", streams);
        Preconditions.checkArgument(streams.size() >= 2, "Union needs at least two streams");
        Stream first = (Stream) streams.get(0);
        // Raw list on purpose: the element type is only known at runtime and differs between
        // the Java and the Python stream flavour.
        List others = streams.subList(1, streams.size());
        Stream unionStream;
        if (first instanceof DataStream) {
            unionStream = ((DataStream) first).union(others);
        } else {
            Preconditions.checkArgument(first instanceof PythonDataStream);
            unionStream = ((PythonDataStream) first).union(others);
        }
        return serialize(unionStream);
    }

    /**
     * Invokes a static method reflectively.
     *
     * @param bArr serialized list {@code [className, methodName, arg...]}
     * @return the serialized return value, by value or by reference id
     * @throws RuntimeException wrapping any failure during lookup or invocation
     */
    public byte[] callFunction(byte[] bArr) {
        try {
            List<Object> params = deserializeParameters(bArr);
            LOG.info("callFunction params {}", params);
            Preconditions.checkArgument(
                params.size() >= 2, "callFunction needs a class name and a function name");
            String className = (String) params.get(0);
            String functionName = (String) params.get(1);
            List<Object> args = params.subList(2, params.size());
            Class<?> owner = Class.forName(className, true, getClass().getClassLoader());
            Method method = findMethod(owner, functionName, argumentTypes(args));
            return serialize(method.invoke(null, args.toArray()));
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

    /**
     * Invokes an instance method reflectively.
     *
     * @param bArr serialized list {@code [target, methodName, arg...]}, where {@code target} is
     *     usually a reference id previously returned by this gateway
     * @return the serialized return value, by value or by reference id
     * @throws RuntimeException wrapping any failure during lookup or invocation
     */
    public byte[] callMethod(byte[] bArr) {
        try {
            List<Object> params = deserializeParameters(bArr);
            LOG.info("callMethod params {}", params);
            Preconditions.checkArgument(
                params.size() >= 2, "callMethod needs a target object and a method name");
            Object target = Preconditions.checkNotNull(params.get(0), "target object is null");
            String methodName = (String) params.get(1);
            List<Object> args = params.subList(2, params.size());
            Method method = findMethod(target.getClass(), methodName, argumentTypes(args));
            return serialize(method.invoke(target, args.toArray()));
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

    /** Deserializes a parameter list and resolves reference ids in it to the stored objects. */
    @SuppressWarnings("unchecked")
    private List<Object> deserializeParameters(byte[] bytes) {
        return processParameters((List<Object>) serializer.deserialize(bytes));
    }

    /** Returns the runtime class of every argument; a {@code null} argument yields {@code null}. */
    private static Class<?>[] argumentTypes(List<Object> args) {
        Class<?>[] types = new Class<?>[args.size()];
        for (int i = 0; i < types.length; i++) {
            Object arg = args.get(i);
            types[i] = arg == null ? null : arg.getClass();
        }
        return types;
    }

    /**
     * Picks the method called {@code str} on {@code cls} that can take the given argument types.
     *
     * <p>If exactly one method has that name it is returned without checking its parameters.
     * Otherwise any overload whose parameters accept the arguments is returned.
     *
     * @throws IllegalArgumentException if no suitable method exists
     */
    private static Method findMethod(Class<?> cls, String str, Class<?>[] clsArr) {
        List<Method> candidates = ReflectionUtils.findMethods(cls, str);
        if (candidates.size() == 1) {
            return candidates.get(0);
        }
        Optional<Method> match = candidates.stream()
            .filter(method -> acceptsArguments(method.getParameterTypes(), clsArr))
            .findAny();
        Preconditions.checkArgument(
            match.isPresent(),
            "Method %s with type %s doesn't exist on class %s",
            str, Arrays.toString(clsArr), cls);
        return match.get();
    }

    /**
     * Tells whether parameters of the given types can receive arguments of the given runtime
     * types. Primitive parameters are compared through their wrapper class because argument
     * types always come from boxed values; a {@code null} argument type fits any non-primitive
     * parameter.
     */
    private static boolean acceptsArguments(Class<?>[] parameterTypes, Class<?>[] argTypes) {
        if (parameterTypes.length != argTypes.length) {
            return false;
        }
        for (int i = 0; i < parameterTypes.length; i++) {
            Class<?> argType = argTypes[i];
            if (argType == null) {
                if (parameterTypes[i].isPrimitive()) {
                    return false;
                }
            } else if (!Primitives.wrap(parameterTypes[i]).isAssignableFrom(argType)) {
                return false;
            }
        }
        return true;
    }

    /**
     * Serializes a return value for the caller. Basic values are always sent by value; anything
     * else is sent by value when the serializer accepts it and by reference id otherwise.
     */
    private byte[] serialize(Object obj) {
        if (isBasic(obj)) {
            return serializer.serialize(obj);
        }
        try {
            return serializer.serialize(obj);
        } catch (Exception ignored) {
            // Not serializable by value: keep the object here and hand out a reference instead.
            return registerReference(obj);
        }
    }

    private static boolean isBasic(Object obj) {
        return obj == null
            || obj instanceof Boolean
            || obj instanceof Number
            || obj instanceof String
            || obj instanceof byte[];
    }

    /**
     * Instantiates a class through its no-argument constructor and registers the instance as a
     * reference.
     *
     * @param bArr serialized fully-qualified class name
     * @return the serialized reference id of the new instance
     * @throws IllegalArgumentException if the class cannot be found or instantiated, including
     *     when its constructor throws
     */
    public byte[] newInstance(byte[] bArr) {
        String className = (String) serializer.deserialize(bArr);
        try {
            Class<?> cls =
                Class.forName(className, true, Thread.currentThread().getContextClassLoader());
            // Constructor#newInstance instead of the deprecated Class#newInstance, which would
            // rethrow checked constructor exceptions without declaring them.
            Object instance = cls.getDeclaredConstructor().newInstance();
            return registerReference(instance);
        } catch (ReflectiveOperationException e) {
            throw new IllegalArgumentException(
                String.format("Create instance for class %s failed", className), e);
        }
    }

    private List<Object> processParameters(List<Object> list) {
        return list.stream().map(this::processParameter).collect(Collectors.toList());
    }

    /**
     * Normalizes a single deserialized parameter: a string that is a known reference id becomes
     * the referenced object, {@code Byte}/{@code Short} values are widened to {@code Integer},
     * and everything else is passed through unchanged.
     */
    private Object processParameter(Object obj) {
        if (obj instanceof String) {
            Object referenced = this.referenceMap.get(obj);
            if (referenced != null) {
                return referenced;
            }
        }
        if (obj instanceof Byte || obj instanceof Short) {
            return ((Number) obj).intValue();
        }
        return obj;
    }

    /** Stores {@code obj} under its reference id and returns that id in serialized form. */
    private byte[] registerReference(Object obj) {
        String referenceId = getReferenceId(obj);
        this.referenceMap.put(referenceId, obj);
        return serializer.serialize(referenceId);
    }

    /**
     * Derives the reference id of an object from its identity hash code.
     *
     * <p>NOTE(review): {@link System#identityHashCode(Object)} is not guaranteed to be unique, so
     * two live objects could in principle map to the same id and overwrite each other in the
     * reference map.
     */
    private String getReferenceId(Object obj) {
        return REFERENCE_ID_PREFIX + System.identityHashCode(obj);
    }
}
