package dragon.network;

import dragon.Config;
import dragon.LocalCluster;
import dragon.network.comms.DragonCommsException;
import dragon.network.comms.IComms;
import dragon.topology.DragonTopology;
import dragon.topology.base.DragonEmitRuntimeException;
import dragon.tuple.NetworkTask;
import dragon.utils.NetworkTaskBuffer;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.concurrent.LinkedBlockingQueue;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

/* loaded from: input_file:dragon/network/Router.class */
/**
 * Moves {@link NetworkTask}s between this node and other nodes.
 *
 * <p>On construction the router starts two groups of worker threads, sized by
 * the configuration:
 * <ul>
 *   <li><b>outgoing</b> threads take a buffer from the pending-output queue,
 *       poll one task from it, split the task's destination task ids by the
 *       node they are embedded on, and send one task per node through
 *       {@link IComms};</li>
 *   <li><b>incoming</b> threads receive tasks from {@link IComms}, place them
 *       on the matching input buffer and notify the owning
 *       {@link LocalCluster} when the buffer size is 1 after the put.</li>
 * </ul>
 *
 * <p>Call {@link #terminate()} to stop the worker threads.
 */
public class Router {
    private static final Logger log = LogManager.getLogger(Router.class);
    private final IComms comms;
    private final HashMap<String, LocalCluster> localClusters;
    private final TopologyQueueMap inputQueues;
    private final TopologyQueueMap outputQueues;
    private final Config conf;
    // volatile: written by the thread calling terminate(), read by every worker loop.
    private volatile boolean shouldTerminate = false;
    private final ArrayList<Thread> outgoingThreads = new ArrayList<>();
    private final ArrayList<Thread> incomingThreads = new ArrayList<>();
    private final NodeContext context = Node.inst().getNodeProcessor().getAliveContext();
    private final LinkedBlockingQueue<NetworkTaskBuffer> outputsPending = new LinkedBlockingQueue<>();

    /**
     * Creates the router and immediately starts its worker threads.
     *
     * @param config  supplies the input/output buffer sizes and thread counts
     * @param iComms  transport used to send and receive network tasks
     * @param hashMap local clusters keyed by topology id
     */
    public Router(Config config, IComms iComms, HashMap<String, LocalCluster> hashMap) {
        this.comms = iComms;
        this.localClusters = hashMap;
        this.conf = config;
        this.inputQueues = new TopologyQueueMap(Integer.valueOf(config.getDragonRouterInputBufferSize()).intValue());
        this.outputQueues = new TopologyQueueMap(Integer.valueOf(config.getDragonRouterOutputBufferSize()).intValue());
        runExecutors();
    }

    /**
     * Signals all worker threads to stop and interrupts them so that blocking
     * calls return. Logs an error if work is still queued. This method does
     * not wait for the threads to exit.
     */
    public void terminate() {
        // Raise the flag BEFORE interrupting: a worker that wakes up from the
        // interrupt re-checks the flag and must see it set, otherwise it would
        // simply loop and block again.
        this.shouldTerminate = true;
        for (Thread worker : this.outgoingThreads) {
            worker.interrupt();
        }
        for (Thread worker : this.incomingThreads) {
            worker.interrupt();
        }
        if (this.outputsPending.size() > 0) {
            log.error("there are still outputs pending");
        }
        if (!this.inputQueues.isEmpty() || !this.outputQueues.isEmpty()) {
            log.error("some io queues are not empty");
        }
    }

    /** Creates, names and starts the configured number of outgoing and incoming worker threads. */
    private void runExecutors() {
        int outputThreads = Integer.valueOf(this.conf.getDragonRouterOutputThreads()).intValue();
        for (int i = 0; i < outputThreads; i++) {
            Thread worker = new Thread() {
                @Override
                public void run() {
                    Router.this.runOutgoingLoop();
                }
            };
            worker.setName("router out " + i);
            this.outgoingThreads.add(worker);
            worker.start();
        }
        int inputThreads = Integer.valueOf(this.conf.getDragonRouterInputThreads()).intValue();
        for (int i = 0; i < inputThreads; i++) {
            Thread worker = new Thread() {
                @Override
                public void run() {
                    Router.this.runIncomingLoop();
                }
            };
            worker.setName("router in " + i);
            this.incomingThreads.add(worker);
            worker.start();
        }
    }

    /**
     * Body of an outgoing worker: repeatedly takes a pending buffer, polls a
     * single task from it while holding the buffer's lock, and dispatches it.
     */
    private void runOutgoingLoop() {
        log.info("starting up");
        while (!this.shouldTerminate) {
            try {
                NetworkTaskBuffer buffer = this.outputsPending.take();
                buffer.bufferLock.lock();
                try {
                    NetworkTask task = buffer.poll();
                    if (task != null) {
                        sendToDestinations(task);
                    }
                } finally {
                    // Exactly one unlock on every path, including exceptions.
                    buffer.bufferLock.unlock();
                }
            } catch (InterruptedException e) {
                // Not re-interrupting: the loop condition decides whether to stop.
                log.info("interrupted while taking from queue");
            }
        }
        log.info("shutting down");
    }

    /**
     * Splits the task's destination task ids by the node each is embedded on
     * and sends the task once per node, re-initialised with only that node's
     * task ids. Nodes absent from the alive context are skipped with an error
     * log; a send failure is reported through {@code Node.inst().nodeFault}.
     *
     * @param task the task polled from an output buffer; it is re-initialised
     *             in place for each destination
     */
    private void sendToDestinations(NetworkTask task) {
        try {
            HashSet<Integer> taskIds = task.getTaskIds();
            HashMap<Integer, NodeDescriptor> embedding = this.localClusters.get(task.getTopologyId())
                    .getTopology().getEmbedding().get(task.getComponentId());
            HashMap<NodeDescriptor, HashSet<Integer>> taskIdsByNode = new HashMap<>();
            for (Integer taskId : taskIds) {
                NodeDescriptor destination = embedding.get(taskId);
                HashSet<Integer> idsForNode = taskIdsByNode.get(destination);
                if (idsForNode == null) {
                    idsForNode = new HashSet<>();
                    taskIdsByNode.put(destination, idsForNode);
                }
                idsForNode.add(taskId);
            }
            for (NodeDescriptor destination : taskIdsByNode.keySet()) {
                task.init(task.getTuples(), taskIdsByNode.get(destination), task.getComponentId(), task.getTopologyId());
                try {
                    if (this.context.containsKey(destination.toString())) {
                        this.comms.sendNetworkTask(destination, task);
                    } else {
                        log.error("dopping network task since receiver [" + destination + "] is no longer alive");
                    }
                } catch (DragonCommsException e) {
                    log.error("failed to send network task to [" + destination + "]");
                    Node.inst().nodeFault(destination);
                }
            }
        } catch (NullPointerException e) {
            // Best-effort, kept from the original: a missing cluster/embedding
            // lookup surfaces as an NPE and the task is dropped.
            log.error("topology [" + task.getTopologyId() + "] no longer exists, dropping network task");
        }
    }

    /**
     * Body of an incoming worker: repeatedly receives a task, queues it on
     * its input buffer, and notifies the local cluster when the buffer size
     * is 1 after the put.
     */
    private void runIncomingLoop() {
        log.info("starting up");
        while (!this.shouldTerminate) {
            try {
                NetworkTask task = this.comms.receiveNetworkTask();
                try {
                    NetworkTaskBuffer buffer = this.inputQueues.getBuffer(task);
                    String topologyId = task.getTopologyId();
                    buffer.put(task);
                    if (buffer.size() == 1) {
                        this.localClusters.get(topologyId).outputPending(buffer);
                    }
                } catch (NullPointerException e) {
                    // Best-effort, kept from the original: a failed buffer or
                    // cluster lookup surfaces as an NPE.
                    log.error("received a network task for a non-existant topology [" + task.getTopologyId() + "]");
                }
            } catch (InterruptedException e) {
                log.info("interrupted");
            }
        }
        log.info("shutting down");
    }

    /**
     * Prepares the router queues for a topology: input queues for bolts
     * embedded on this node, output queues for bolts embedded on other nodes.
     *
     * @param str            the topology id
     * @param dragonTopology the topology whose reverse embedding and bolt
     *                       groupings determine which queues are prepared
     */
    public void submitTopology(String str, DragonTopology dragonTopology) {
        updateQueues(str, dragonTopology, true);
    }

    /**
     * Drops the router queues that {@link #submitTopology} prepares for the
     * given topology.
     *
     * @param str            the topology id
     * @param dragonTopology the topology whose reverse embedding and bolt
     *                       groupings determine which queues are dropped
     */
    public void terminateTopology(String str, DragonTopology dragonTopology) {
        updateQueues(str, dragonTopology, false);
    }

    /**
     * Walks every (node, bolt component, grouping source, stream) entry of the
     * topology and prepares or drops the corresponding queue. Entries on this
     * node use the input queues; entries on other nodes use the output queues.
     *
     * @param topologyId the topology id
     * @param topology   the topology to walk
     * @param prepare    {@code true} to prepare queues, {@code false} to drop them
     */
    private void updateQueues(String topologyId, DragonTopology topology, boolean prepare) {
        String action = prepare ? "preparing " : "dropping ";
        for (NodeDescriptor node : topology.getReverseEmbedding().keySet()) {
            boolean local = node.equals(this.comms.getMyNodeDesc());
            TopologyQueueMap queues = local ? this.inputQueues : this.outputQueues;
            String direction = local ? "input" : "output";
            for (String componentId : topology.getReverseEmbedding().get(node).keySet()) {
                // Only components present in the bolt map have queues.
                if (!topology.getBoltMap().containsKey(componentId)) {
                    continue;
                }
                for (String fromComponentId : topology.getBoltMap().get(componentId).groupings.keySet()) {
                    for (String streamId : topology.getBoltMap().get(componentId).groupings.get(fromComponentId).keySet()) {
                        log.debug(action + direction + " queue [" + topologyId + "," + componentId + "," + streamId + "]");
                        if (prepare) {
                            queues.prepare(topologyId, componentId, streamId);
                        } else {
                            queues.drop(topologyId, componentId, streamId);
                        }
                    }
                }
            }
        }
    }

    /**
     * Queues a task for sending by an outgoing worker.
     *
     * @param networkTask the task to send
     * @throws InterruptedException       if interrupted while blocked on a queue
     * @throws DragonEmitRuntimeException if a {@link NullPointerException} is
     *                                    raised while looking up or filling
     *                                    the output buffer
     */
    public void put(NetworkTask networkTask) throws InterruptedException {
        try {
            NetworkTaskBuffer buffer = this.outputQueues.getBuffer(networkTask);
            buffer.put(networkTask);
            this.outputsPending.put(buffer);
        } catch (NullPointerException e) {
            throw new DragonEmitRuntimeException("could not put task on outgoing router queue: " + e.getMessage());
        }
    }
}
