package my.artfultom.zallak;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import my.artfultom.zallak.config.Timeout;
import my.artfultom.zallak.dto.Entry;
import my.artfultom.zallak.dto.ResultList;
import my.artfultom.zallak.dto.SortedTuple;
import my.artfultom.zallak.node.InitNode;
import my.artfultom.zallak.node.MapNode;
import my.artfultom.zallak.node.ReduceNode;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:my/artfultom/zallak/NodeStarter.class */
public class NodeStarter {
    static final Logger LOGGER = LoggerFactory.getLogger(NodeStarter.class);
    private int poolSize;
    private InitNode<?, ?> initNode;
    private Timeout timeout = Timeout.of(60, TimeUnit.SECONDS);
    private final Map<String, MapNode> mapNodes = new ConcurrentHashMap();
    private final Map<String, ReduceNode> reduceNodes = new ConcurrentHashMap();

    public NodeStarter(int i) {
        this.poolSize = i;
    }

    public void add(InitNode initNode) {
        this.initNode = initNode;
    }

    public void add(MapNode mapNode) {
        if (this.mapNodes.containsKey(mapNode.getName())) {
            LOGGER.error("Node '" + mapNode.getName() + "' is already exist.");
        } else {
            this.mapNodes.put(mapNode.getName(), mapNode);
        }
    }

    public void add(ReduceNode reduceNode) {
        if (this.reduceNodes.containsKey(reduceNode.getName())) {
            LOGGER.error("Node '" + reduceNode.getName() + "' is already exist.");
        } else {
            this.reduceNodes.put(reduceNode.getName(), reduceNode);
        }
    }

    /* JADX WARN: Multi-variable type inference failed */
    /* JADX WARN: Type inference failed for: r0v94, types: [java.util.Map] */
    public void start() {
        HashMap hashMap;
        ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(this.poolSize, this.poolSize, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue());
        ConcurrentLinkedQueue concurrentLinkedQueue = new ConcurrentLinkedQueue();
        ConcurrentLinkedQueue<Entry> concurrentLinkedQueue2 = new ConcurrentLinkedQueue();
        if (this.initNode == null) {
            LOGGER.error("Cannot find init node.");
            return;
        }
        try {
            concurrentLinkedQueue.addAll((ResultList) threadPoolExecutor.submit(() -> {
                return this.initNode.execute();
            }).get());
            while (true) {
                if (threadPoolExecutor.getActiveCount() <= 0 && threadPoolExecutor.getQueue().size() <= 0 && concurrentLinkedQueue.size() <= 0) {
                    break;
                }
                Entry entry = (Entry) concurrentLinkedQueue.poll();
                if (entry != null) {
                    MapNode mapNode = this.mapNodes.get(entry.getNodeName());
                    if (mapNode == null) {
                        LOGGER.error("Cannot find map node by name: " + entry.getNodeName());
                    } else {
                        CompletableFuture.supplyAsync(() -> {
                            return mapNode.execute(entry.getData());
                        }, threadPoolExecutor).whenComplete((resultList, th) -> {
                            if (th != null) {
                                LOGGER.error("Error occurred during execution of MapNode", th);
                            }
                        }).thenAccept(resultList2 -> {
                            Iterator<Entry<K, V>> it = resultList2.iterator();
                            while (it.hasNext()) {
                                Entry entry2 = (Entry) it.next();
                                boolean z = true;
                                if (this.mapNodes.containsKey(entry2.getNodeName())) {
                                    concurrentLinkedQueue.add(entry2);
                                    z = false;
                                }
                                if (this.reduceNodes.containsKey(entry2.getNodeName())) {
                                    concurrentLinkedQueue2.add(entry2);
                                    z = false;
                                }
                                if (z) {
                                    LOGGER.error("Unknown node: " + entry2.getNodeName());
                                }
                            }
                        });
                    }
                }
            }
            HashMap hashMap2 = new HashMap();
            for (Entry entry2 : concurrentLinkedQueue2) {
                for (SortedTuple sortedTuple : entry2.getData().keySet()) {
                    if (hashMap2.containsKey(entry2.getNodeName())) {
                        hashMap = (Map) hashMap2.get(entry2.getNodeName());
                    } else {
                        hashMap = new HashMap();
                        hashMap2.put(entry2.getNodeName(), hashMap);
                    }
                    hashMap.computeIfPresent(sortedTuple, (sortedTuple2, list) -> {
                        list.add(entry2.getData().get(sortedTuple));
                        return list;
                    });
                    hashMap.computeIfAbsent(sortedTuple, sortedTuple3 -> {
                        ArrayList arrayList = new ArrayList();
                        arrayList.add(entry2.getData().get(sortedTuple));
                        return arrayList;
                    });
                }
            }
            for (String str : hashMap2.keySet()) {
                ReduceNode reduceNode = this.reduceNodes.get(str);
                if (reduceNode == null) {
                    LOGGER.error("Cannot find reduce node by name: " + str);
                } else {
                    for (SortedTuple sortedTuple4 : ((Map) hashMap2.get(str)).keySet()) {
                        CompletableFuture.runAsync(() -> {
                            reduceNode.execute(sortedTuple4, (List) ((Map) hashMap2.get(str)).get(sortedTuple4));
                        }, threadPoolExecutor).whenComplete((r4, th2) -> {
                            if (th2 != null) {
                                LOGGER.error("Error occurred during execution of ReduceNode", th2);
                            }
                        });
                    }
                }
            }
            threadPoolExecutor.shutdown();
            threadPoolExecutor.awaitTermination(this.timeout.getTimeout(), this.timeout.getUnit());
        } catch (InterruptedException | ExecutionException e) {
            LOGGER.error(e.getMessage(), e);
        }
    }
}
