package io.ray.streaming.runtime.rpc.async;

import io.ray.api.ObjectRef;
import io.ray.api.Ray;
import java.util.Collections;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/ray/streaming/runtime/rpc/async/RemoteCallPool.class */
/**
 * Polls pending Ray object references on a dedicated "remote-pool-loop" thread and, once every
 * reference of a bundle is ready, runs the bound callback on a separate callback thread pool.
 *
 * <p>A "bundle" is either a single object reference (singleton bundle) or a list of references
 * whose callback receives all results at once. Failures raised while fetching results or running
 * the callback are logged and forwarded to the optional exception handler bound with the bundle.
 *
 * <p>The polling thread is started by the constructor and runs until {@link #stop()} is called
 * or the thread is interrupted; the callback pool is shut down when the loop exits.
 */
public class RemoteCallPool implements Runnable {
    private static final Logger LOG = LoggerFactory.getLogger(RemoteCallPool.class);

    /** Timeout (ms) of each {@code Ray.wait} poll and of the idle sleep when nothing is pending. */
    private static final int WAIT_TIME_MS = 5;

    /** Minimum age of a bundle, and minimum gap between warnings, before a "waiting" warning (ms). */
    private static final long WARNING_PERIOD = 10000;

    /** Core size of the callback pool. */
    private static final int CALLBACK_CORE_THREADS = 2;

    /** Bundles not yet dispatched; every access is guarded by {@code synchronized} on the list. */
    private final List<RemoteCallBundle> pendingObjectBundles = new LinkedList<>();
    private final Map<RemoteCallBundle, Callback<Object>> singletonHandlerMap = new ConcurrentHashMap<>();
    private final Map<RemoteCallBundle, Callback<List<Object>>> bundleHandlerMap = new ConcurrentHashMap<>();
    /** Holds only non-null handlers: {@link ConcurrentHashMap} rejects null values. */
    private final Map<RemoteCallBundle, ExceptionHandler<Throwable>> bundleExceptionHandlerMap = new ConcurrentHashMap<>();

    // The maximum is clamped to the core size: ThreadPoolExecutor throws IllegalArgumentException
    // when maximumPoolSize < corePoolSize, which would happen on a single-CPU host.
    // Note: with an unbounded work queue the pool does not grow beyond its core size.
    private final ThreadPoolExecutor callBackPool = new ThreadPoolExecutor(
            CALLBACK_CORE_THREADS,
            Math.max(CALLBACK_CORE_THREADS, Runtime.getRuntime().availableProcessors()),
            1,
            TimeUnit.MINUTES,
            new LinkedBlockingQueue<>(),
            new CallbackThreadFactory());

    private volatile boolean stop = false;

    /**
     * Callback invoked with the result of a completed remote call.
     *
     * @param <T> type of the delivered result
     */
    @FunctionalInterface
    public interface Callback<T> {
        void handle(T t) throws Throwable;
    }

    /** Creates the named callback threads and logs any exception that escapes a callback task. */
    static class CallbackThreadFactory implements ThreadFactory {
        private final AtomicInteger cnt = new AtomicInteger(0);

        CallbackThreadFactory() {
        }

        @Override
        public Thread newThread(Runnable runnable) {
            Thread thread = new Thread(runnable);
            thread.setUncaughtExceptionHandler((t, th) -> RemoteCallPool.LOG.error("Callback err.", th));
            thread.setName("callback-thread-" + this.cnt.getAndIncrement());
            return thread;
        }
    }

    /**
     * Handler invoked when fetching a result or running a callback fails.
     *
     * @param <T> type of the failure passed to the handler
     */
    @FunctionalInterface
    public interface ExceptionHandler<T> {
        void handle(T t);
    }

    /**
     * A group of object references that must all be ready before the bound callback runs.
     * Instances are compared by identity (no {@code equals}/{@code hashCode} override), so each
     * bind call gets its own map entries.
     */
    private static class RemoteCallBundle {
        final List<ObjectRef<Object>> objects;
        final boolean isSingletonBundle;
        /** Time of the last "waiting" warning; read and written only by the polling thread. */
        long lastWarnTs = System.currentTimeMillis();
        final long createTime = System.currentTimeMillis();

        RemoteCallBundle(List<ObjectRef<Object>> objects, boolean isSingletonBundle) {
            this.objects = objects;
            this.isSingletonBundle = isSingletonBundle;
        }

        @Override
        public String toString() {
            StringBuilder sb = new StringBuilder();
            sb.append("[");
            this.objects.forEach(objectRef -> sb.append(objectRef.toString()).append(","));
            sb.append("]");
            return sb.toString();
        }
    }

    /** Creates the pool and immediately starts the "remote-pool-loop" polling thread. */
    public RemoteCallPool() {
        Thread thread = new Thread(Ray.wrapRunnable(this), "remote-pool-loop");
        thread.setUncaughtExceptionHandler((t, th) -> LOG.error("Error in remote call pool thread.", th));
        thread.start();
    }

    /**
     * Binds a callback to a single object reference.
     *
     * @param objectRef reference to wait for
     * @param callback invoked on a callback thread with the fetched value; must not be null
     * @param exceptionHandler invoked if fetching the value or the callback fails; may be null
     * @param <T> type of the referenced value
     */
    @SuppressWarnings("unchecked") // Erasure-safe: the value is only ever handed back to this callback.
    public <T> void bindCallback(ObjectRef<T> objectRef, Callback<T> callback, ExceptionHandler<Throwable> exceptionHandler) {
        List<ObjectRef<Object>> refs = Collections.singletonList((ObjectRef<Object>) objectRef);
        RemoteCallBundle bundle = new RemoteCallBundle(refs, true);
        this.singletonHandlerMap.put(bundle, (Callback<Object>) callback);
        enqueue(bundle, exceptionHandler);
    }

    /**
     * Binds a callback to a list of object references; it fires once all of them are ready.
     *
     * @param list references to wait for
     * @param callback invoked on a callback thread with the fetched values; must not be null
     * @param exceptionHandler invoked if fetching a value or the callback fails; may be null
     */
    public void bindCallback(List<ObjectRef<Object>> list, Callback<List<Object>> callback, ExceptionHandler<Throwable> exceptionHandler) {
        RemoteCallBundle bundle = new RemoteCallBundle(list, false);
        this.bundleHandlerMap.put(bundle, callback);
        enqueue(bundle, exceptionHandler);
    }

    /**
     * Requests termination of the polling loop. The loop observes the flag at the start of its
     * next pass and shuts the callback pool down on exit; callbacks already queued still run.
     */
    public void stop() {
        this.stop = true;
    }

    /** Polling loop: repeatedly checks pending bundles until stopped or interrupted. */
    @Override
    public void run() {
        try {
            while (!this.stop) {
                try {
                    if (!pollPendingBundles()) {
                        Thread.sleep(WAIT_TIME_MS);
                    }
                } catch (InterruptedException e) {
                    // Restore the flag and leave; continuing would make every later sleep fail at once.
                    Thread.currentThread().interrupt();
                    LOG.warn("Wait loop interrupted, exiting.");
                    break;
                } catch (Exception e) {
                    LOG.error("Exception in wait loop.", e);
                }
            }
        } finally {
            // Release the callback threads; tasks that were already submitted still execute.
            this.callBackPool.shutdown();
        }
        LOG.info("Wait loop finished.");
    }

    /** Registers the optional exception handler, then makes the bundle visible to the poller. */
    private void enqueue(RemoteCallBundle bundle, ExceptionHandler<Throwable> exceptionHandler) {
        if (exceptionHandler != null) {
            this.bundleExceptionHandlerMap.put(bundle, exceptionHandler);
        }
        synchronized (this.pendingObjectBundles) {
            this.pendingObjectBundles.add(bundle);
        }
    }

    /**
     * Polls every currently pending bundle once and dispatches those that are fully ready.
     *
     * @return {@code false} if nothing was pending (caller should idle), {@code true} otherwise
     */
    private boolean pollPendingBundles() {
        // Work on a snapshot so that binders are not blocked while Ray.wait is in progress.
        List<RemoteCallBundle> snapshot;
        synchronized (this.pendingObjectBundles) {
            if (this.pendingObjectBundles.isEmpty()) {
                return false;
            }
            snapshot = new LinkedList<>(this.pendingObjectBundles);
        }
        for (RemoteCallBundle bundle : snapshot) {
            List<ObjectRef<Object>> ready =
                    Ray.wait(bundle.objects, bundle.objects.size(), WAIT_TIME_MS).getReady();
            if (ready.size() != bundle.objects.size()) {
                warnIfSlow(bundle);
                continue;
            }
            // Remove before dispatching so a bundle can never be dispatched twice.
            synchronized (this.pendingObjectBundles) {
                this.pendingObjectBundles.remove(bundle);
            }
            dispatch(bundle, ready);
        }
        return true;
    }

    /** Emits a rate-limited warning for a bundle that has been pending longer than the period. */
    private void warnIfSlow(RemoteCallBundle bundle) {
        long now = System.currentTimeMillis();
        long waited = now - bundle.createTime;
        if (waited > WARNING_PERIOD && now - bundle.lastWarnTs > WARNING_PERIOD) {
            bundle.lastWarnTs = now;
            LOG.warn("Bundle has being waiting for {} ms, bundle = {}.", waited, bundle);
        }
    }

    /**
     * Unregisters the bundle's handlers and submits a task that fetches the results and runs the
     * callback. Fetching happens inside the task so a failed fetch reaches the exception handler
     * instead of breaking the polling loop.
     */
    private void dispatch(RemoteCallBundle bundle, List<ObjectRef<Object>> ready) {
        // Handlers are removed up front so map entries are released on success and on failure.
        ExceptionHandler<Throwable> onError = this.bundleExceptionHandlerMap.remove(bundle);
        if (bundle.isSingletonBundle) {
            Callback<Object> callback = this.singletonHandlerMap.remove(bundle);
            this.callBackPool.execute(Ray.wrapRunnable(() -> {
                try {
                    callback.handle(ready.get(0).get());
                } catch (Throwable th) {
                    LOG.error("Error when get object, objectId = {}.", ready.get(0).toString(), th);
                    if (onError != null) {
                        onError.handle(th);
                    }
                }
            }));
        } else {
            Callback<List<Object>> callback = this.bundleHandlerMap.remove(bundle);
            this.callBackPool.execute(Ray.wrapRunnable(() -> {
                try {
                    List<Object> results = ready.stream().map(ObjectRef::get).collect(Collectors.toList());
                    callback.handle(results);
                } catch (Throwable th) {
                    List<String> ids = ready.stream().map(Object::toString).collect(Collectors.toList());
                    LOG.error("Error when get object, objectIds = {}.", ids, th);
                    if (onError != null) {
                        onError.handle(th);
                    }
                }
            }));
        }
    }
}
