package dragon;

import dragon.grouping.AbstractGrouping;
import dragon.network.DragonTopologyException;
import dragon.network.Node;
import dragon.network.operations.GroupOp;
import dragon.network.operations.TermTopoGroupOp;
import dragon.spout.SpoutOutputCollector;
import dragon.task.InputCollector;
import dragon.task.OutputCollector;
import dragon.task.TopologyContext;
import dragon.topology.BoltDeclarer;
import dragon.topology.DestComponentMap;
import dragon.topology.DragonTopology;
import dragon.topology.OutputFieldsDeclarer;
import dragon.topology.SpoutDeclarer;
import dragon.topology.StreamMap;
import dragon.topology.base.Bolt;
import dragon.topology.base.Component;
import dragon.topology.base.Spout;
import dragon.tuple.Fields;
import dragon.tuple.NetworkTask;
import dragon.tuple.Tuple;
import dragon.tuple.Values;
import dragon.utils.CircularBlockingQueue;
import dragon.utils.NetworkTaskBuffer;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Map;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

/* loaded from: input_file:dragon/LocalCluster.class */
public class LocalCluster {
    // Class-wide logger.
    private static final Logger log = LogManager.getLogger(LocalCluster.class);
    // Owning network node; null when the cluster is created with the no-arg constructor.
    private final Node node;
    // Component id -> (task index -> bolt instance) for the bolt instances allocated on this cluster.
    private HashMap<String, HashMap<Integer, Bolt>> bolts;
    // Component id -> (task index -> spout instance) for the spout instances allocated on this cluster.
    private HashMap<String, HashMap<Integer, Spout>> spouts;
    // One queue of pending output buffers per outputs-scheduler thread
    // (array length is Config.getDragonLocalclusterThreads()).
    private CircularBlockingQueue<NetworkTaskBuffer>[] outputsPending;
    // Per-spout configuration copied from the spout's getComponentConfiguration().
    private HashMap<String, Config> spoutConfs;
    // Per-bolt configuration copied from the bolt's getComponentConfiguration().
    private HashMap<String, Config> boltConfs;
    // One executor thread per locally allocated spout/bolt instance; started by runComponentThreads().
    private ArrayList<Thread> componentExecutorThreads;
    // Threads created by outputsScheduler() that drain outputsPending.
    private ArrayList<Thread> networkExecutorThreads;
    // Created empty in the constructors; presumably collects errors per component -- usage is not visible here.
    private HashMap<Component, ArrayList<ComponentError>> componentErrors;
    // Current lifecycle state; volatile because worker threads poll it.
    private volatile State state;
    // Lock guarding restartCondition.
    private final ReentrantLock haltLock;
    // Worker threads await this condition while state is HALTED.
    private final Condition restartCondition;
    // Group operations keyed by operation class; the shutdown thread reads the TermTopoGroupOp entry.
    private HashMap<Class, HashSet<GroupOp>> groupOperations;
    // Name, configuration and topology recorded by submitTopology().
    private String topologyName;
    private Config conf;
    private DragonTopology dragonTopology;
    // Increments tickTime after each 1000 ms sleep and notifies tickCounterThread.
    private Thread tickThread;
    // Advances tickCounterTime up to tickTime, counting down boltTickCount and issuing tick tuples.
    private Thread tickCounterThread;
    // Number of ticks produced so far by tickThread.
    private long tickTime;
    // Number of ticks already processed by tickCounterThread.
    private long tickCounterTime;
    // Bolt id -> ticks remaining until the next tick tuple, for bolts that set TOPOLOGY_TICK_TUPLE_FREQ_SECS.
    private HashMap<String, Integer> boltTickCount;
    // Fields("tick"); presumably the schema of tick tuples -- usage is not visible here.
    private final Fields tickFields;
    // Running total of locally allocated spout and bolt tasks.
    private int totalParallelismHint;
    // Deferred Bolt.prepare() calls; only created when submitTopology() is asked not to start immediately.
    private ArrayList<BoltPrepare> boltPrepareList;
    // Deferred Spout.open() calls; only created when submitTopology() is asked not to start immediately.
    private ArrayList<SpoutOpen> spoutOpenList;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:dragon/LocalCluster$BoltPrepare.class */
    /**
     * Holds the arguments of a {@code Bolt.prepare(...)} call that has been
     * postponed until {@link LocalCluster#openAll()} runs.
     */
    public class BoltPrepare {
        /** The bolt instance whose prepare call is pending. */
        public Bolt bolt;
        /** The topology context to hand to the bolt. */
        public TopologyContext context;
        /** The output collector to hand to the bolt. */
        public OutputCollector collector;

        /**
         * @param bolt the bolt to prepare later
         * @param topologyContext the context to pass to prepare
         * @param outputCollector the collector to pass to prepare
         */
        public BoltPrepare(Bolt bolt, TopologyContext topologyContext, OutputCollector outputCollector) {
            this.collector = outputCollector;
            this.context = topologyContext;
            this.bolt = bolt;
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:dragon/LocalCluster$SpoutOpen.class */
    /**
     * Holds the arguments of a {@code Spout.open(...)} call that has been
     * postponed until {@link LocalCluster#openAll()} runs.
     */
    public class SpoutOpen {
        /** The spout instance whose open call is pending. */
        public Spout spout;
        /** The topology context to hand to the spout. */
        public TopologyContext context;
        /** The output collector to hand to the spout. */
        public SpoutOutputCollector collector;

        /**
         * @param spout the spout to open later
         * @param topologyContext the context to pass to open
         * @param spoutOutputCollector the collector to pass to open
         */
        public SpoutOpen(Spout spout, TopologyContext topologyContext, SpoutOutputCollector spoutOutputCollector) {
            this.collector = spoutOutputCollector;
            this.context = topologyContext;
            this.spout = spout;
        }
    }

    /* loaded from: input_file:dragon/LocalCluster$State.class */
    /** Lifecycle states of a local cluster. */
    public enum State {
        // Initial state assigned by both constructors.
        ALLOCATED,
        // Set by submitTopology() once components and threads have been created; required by openAll().
        SUBMITTED,
        // NOTE(review): not referenced in the visible code -- meaning to be confirmed.
        PREPROCESSING,
        // Set once the component executor threads have been started.
        RUNNING,
        // The tick and tick-counter threads leave their loops when they observe this state.
        TERMINATING,
        // Worker threads block on the restart condition while this state is observed.
        HALTED,
        // NOTE(review): not referenced in the visible code -- meaning to be confirmed.
        FAULT
    }

    /**
     * Creates a cluster that is not attached to any network node
     * (the {@code node} field is left {@code null}).
     */
    public LocalCluster() {
        // Same initialisation as the Node constructor, with no node.
        this((Node) null);
    }

    /**
     * Creates a cluster attached to the given network node.
     *
     * @param node the owning node; stored as-is
     */
    public LocalCluster(Node node) {
        this.node = node;
        this.state = State.ALLOCATED;

        // Halt/restart coordination for worker threads.
        this.haltLock = new ReentrantLock();
        this.restartCondition = this.haltLock.newCondition();

        // Tick bookkeeping starts from zero.
        this.tickTime = 0L;
        this.tickCounterTime = 0L;
        this.tickFields = new Fields("tick");

        this.totalParallelismHint = 0;
        this.groupOperations = new HashMap<>();
        this.componentErrors = new HashMap<>();
    }

    /**
     * Submits a topology and starts it immediately.
     *
     * <p>A base configuration is loaded from {@code Constants.DRAGON_PROPERTIES},
     * the supplied {@code config} is layered on top of it, and the four-argument
     * overload is invoked with the start flag set to {@code true}. Any failure is
     * printed to standard error and the JVM exits with status {@code -1}.
     *
     * @param str the topology name
     * @param config caller-supplied settings that override the loaded defaults
     * @param dragonTopology the topology to run
     */
    public void submitTopology(String str, Config config, DragonTopology dragonTopology) {
        Config effectiveConf = null;
        try {
            effectiveConf = new Config(Constants.DRAGON_PROPERTIES, true);
        } catch (IOException loadFailure) {
            loadFailure.printStackTrace();
            System.exit(-1);
        }
        // Caller-supplied entries take precedence over the loaded defaults.
        effectiveConf.putAll(config);
        try {
            submitTopology(str, effectiveConf, dragonTopology, true);
        } catch (DragonRequiresClonableException | DragonTopologyException submitFailure) {
            submitFailure.printStackTrace();
            System.exit(-1);
        }
    }

    /**
     * Builds the runtime for a topology on this cluster and optionally starts it.
     *
     * <p>Steps, in order:
     * <ol>
     *   <li>prepare every grouping on every edge leaving a spout or a bolt;</li>
     *   <li>clone one spout/bolt instance per task, wire its declarer, context and
     *       collectors, and register the instances that belong to this cluster;</li>
     *   <li>size the pending-output queues and create one executor thread per
     *       registered instance;</li>
     *   <li>start the tick and tick-counter threads and the outputs scheduler.</li>
     * </ol>
     *
     * <p>An instance belongs to this cluster when there is no node, when the topology
     * has no reverse embedding, or when the reverse embedding maps it to this node.
     *
     * @param str the topology name; also used as the prefix of executor thread names
     * @param config the configuration passed to {@code open}/{@code prepare}
     * @param dragonTopology the topology to instantiate
     * @param z when {@code true}, spouts are opened and bolts prepared immediately and
     *        the component threads are started; when {@code false} those calls are
     *        queued for {@link #openAll()}
     * @throws DragonRequiresClonableException if a spout or bolt cannot be cloned
     * @throws DragonTopologyException if a spout has no listeners, or if a spout's
     *         {@code open} or a bolt's {@code prepare} throws
     */
    public void submitTopology(String str, Config config, DragonTopology dragonTopology, boolean z) throws DragonRequiresClonableException, DragonTopologyException {
        this.topologyName = str;
        this.conf = config;
        this.dragonTopology = dragonTopology;

        // Prepare groupings on every spout -> bolt edge.
        for (String spoutId : dragonTopology.getSpoutMap().keySet()) {
            log.debug("preparing groupings for spout[" + spoutId + "]");
            DestComponentMap destinations = dragonTopology.getDestComponentMap(spoutId);
            if (destinations == null) {
                throw new DragonTopologyException("spout [" + spoutId + "] has no components listening to it");
            }
            for (String destId : destinations.keySet()) {
                StreamMap streams = dragonTopology.getStreamMap(spoutId, destId);
                BoltDeclarer destDeclarer = dragonTopology.getBoltMap().get(destId);
                ArrayList<Integer> destTaskIds = new ArrayList<>();
                for (int taskId = 0; taskId < destDeclarer.getNumTasks(); taskId++) {
                    destTaskIds.add(Integer.valueOf(taskId));
                }
                for (String streamId : streams.keySet()) {
                    for (AbstractGrouping grouping : dragonTopology.getGroupingsSet(spoutId, destId, streamId)) {
                        grouping.prepare(null, null, destTaskIds);
                    }
                }
            }
        }

        // Prepare groupings on every bolt -> bolt edge; a bolt without destinations is a sink.
        for (String boltId : dragonTopology.getBoltMap().keySet()) {
            log.debug("preparing groupings for bolt[" + boltId + "]");
            DestComponentMap destinations = dragonTopology.getDestComponentMap(boltId);
            if (destinations == null) {
                log.debug(boltId + " is a sink");
                continue;
            }
            log.debug(destinations);
            for (String destId : destinations.keySet()) {
                StreamMap streams = dragonTopology.getStreamMap(boltId, destId);
                BoltDeclarer destDeclarer = dragonTopology.getBoltMap().get(destId);
                ArrayList<Integer> destTaskIds = new ArrayList<>();
                for (int taskId = 0; taskId < destDeclarer.getNumTasks(); taskId++) {
                    destTaskIds.add(Integer.valueOf(taskId));
                }
                for (String streamId : streams.keySet()) {
                    for (AbstractGrouping grouping : dragonTopology.getGroupingsSet(boltId, destId, streamId)) {
                        grouping.prepare(null, null, destTaskIds);
                    }
                }
            }
        }

        this.networkExecutorThreads = new ArrayList<>();
        if (!z) {
            // Deferred start: open/prepare calls are queued for openAll().
            this.spoutOpenList = new ArrayList<>();
            this.boltPrepareList = new ArrayList<>();
        }
        int totalOutputsBufferSize = 0;
        int totalInputsBufferSize = 0;

        // Instantiate spouts.
        this.spouts = new HashMap<>();
        this.spoutConfs = new HashMap<>();
        for (String spoutId : dragonTopology.getSpoutMap().keySet()) {
            boolean localComponent = this.node == null
                    || dragonTopology.getReverseEmbedding() == null
                    || dragonTopology.getReverseEmbedding().contains(this.node.getComms().getMyNodeDesc(), spoutId);
            HashMap<Integer, Spout> localInstances = null;
            SpoutDeclarer spoutDeclarer = dragonTopology.getSpoutMap().get(spoutId);
            ArrayList<Integer> taskIds = new ArrayList<>();
            for (int taskId = 0; taskId < spoutDeclarer.getNumTasks(); taskId++) {
                taskIds.add(Integer.valueOf(taskId));
            }
            if (localComponent) {
                log.debug("allocating spout [" + spoutId + "]");
                localInstances = new HashMap<>();
                this.spouts.put(spoutId, localInstances);
                Map<String, Object> componentConfiguration = spoutDeclarer.getSpout().getComponentConfiguration();
                Config spoutConf = new Config();
                spoutConf.putAll(componentConfiguration);
                this.spoutConfs.put(spoutId, spoutConf);
            }
            int localTaskCount = 0;
            for (int taskId = 0; taskId < spoutDeclarer.getNumTasks(); taskId++) {
                boolean localTask = this.node == null
                        || dragonTopology.getReverseEmbedding() == null
                        || dragonTopology.getReverseEmbedding().contains(this.node.getComms().getMyNodeDesc(), spoutId, Integer.valueOf(taskId));
                if (localTask) {
                    localTaskCount++;
                }
                // The clone must be inside the try: it is the only call that can raise
                // CloneNotSupportedException.
                final Spout spout;
                try {
                    spout = (Spout) spoutDeclarer.getSpout().clone();
                } catch (CloneNotSupportedException e) {
                    throw new DragonRequiresClonableException("bolts and spouts must be cloneable: " + spoutDeclarer.getSpout().getComponentId());
                }
                if (localTask) {
                    localInstances.put(Integer.valueOf(taskId), spout);
                }
                // Non-local instances are still wired up; only registration, buffer
                // accounting and open() are restricted to local tasks.
                OutputFieldsDeclarer fieldsDeclarer = new OutputFieldsDeclarer(this, spoutId);
                spout.declareOutputFields(fieldsDeclarer);
                spout.setOutputFieldsDeclarer(fieldsDeclarer);
                TopologyContext context = new TopologyContext(spoutId, taskId, taskIds);
                spout.setTopologyContext(context);
                spout.setLocalCluster(this);
                SpoutOutputCollector collector = new SpoutOutputCollector(this, spout);
                if (localTask) {
                    totalOutputsBufferSize += collector.getTotalBufferSpace();
                }
                spout.setOutputCollector(collector);
                if (localTask) {
                    if (z) {
                        try {
                            spout.open(config, context, collector);
                        } catch (Throwable th) {
                            log.error("spout [" + spout.getInstanceId() + "] failed to open", th);
                            throw new DragonTopologyException("spout [" + spout.getInstanceId() + "] throw exception when opening: " + th.getMessage());
                        }
                    } else {
                        openLater(spout, context, collector);
                    }
                }
            }
            this.totalParallelismHint += localTaskCount;
        }

        // Instantiate bolts. The locality checks carry the same node == null guard as
        // the spout checks so that a node-less cluster never dereferences a null node.
        this.bolts = new HashMap<>();
        this.boltConfs = new HashMap<>();
        for (String boltId : dragonTopology.getBoltMap().keySet()) {
            boolean localComponent = this.node == null
                    || dragonTopology.getReverseEmbedding() == null
                    || dragonTopology.getReverseEmbedding().contains(this.node.getComms().getMyNodeDesc(), boltId);
            HashMap<Integer, Bolt> localInstances = null;
            BoltDeclarer boltDeclarer = dragonTopology.getBoltMap().get(boltId);
            ArrayList<Integer> taskIds = new ArrayList<>();
            for (int taskId = 0; taskId < boltDeclarer.getNumTasks(); taskId++) {
                taskIds.add(Integer.valueOf(taskId));
            }
            if (localComponent) {
                log.debug("allocating bolt [" + boltId + "]");
                localInstances = new HashMap<>();
                this.bolts.put(boltId, localInstances);
                Map<String, Object> componentConfiguration = boltDeclarer.getBolt().getComponentConfiguration();
                Config boltConf = new Config();
                boltConf.putAll(componentConfiguration);
                this.boltConfs.put(boltId, boltConf);
            }
            int localTaskCount = 0;
            for (int taskId = 0; taskId < boltDeclarer.getNumTasks(); taskId++) {
                boolean localTask = this.node == null
                        || dragonTopology.getReverseEmbedding() == null
                        || dragonTopology.getReverseEmbedding().contains(this.node.getComms().getMyNodeDesc(), boltId, Integer.valueOf(taskId));
                if (localTask) {
                    localTaskCount++;
                }
                final Bolt bolt;
                try {
                    bolt = (Bolt) boltDeclarer.getBolt().clone();
                } catch (CloneNotSupportedException e) {
                    throw new DragonRequiresClonableException("bolts and spouts must be cloneable: " + boltDeclarer.getBolt().getComponentId());
                }
                if (localTask) {
                    localInstances.put(Integer.valueOf(taskId), bolt);
                }
                OutputFieldsDeclarer fieldsDeclarer = new OutputFieldsDeclarer(this, boltId);
                bolt.declareOutputFields(fieldsDeclarer);
                bolt.setOutputFieldsDeclarer(fieldsDeclarer);
                TopologyContext context = new TopologyContext(boltId, taskId, taskIds);
                bolt.setTopologyContext(context);
                InputCollector inputCollector = new InputCollector(this, bolt);
                if (localTask) {
                    totalInputsBufferSize += getConf().getDragonInputBufferSize();
                }
                bolt.setInputCollector(inputCollector);
                bolt.setLocalCluster(this);
                OutputCollector collector = new OutputCollector(this, bolt);
                if (localTask) {
                    totalOutputsBufferSize += collector.getTotalBufferSpace();
                }
                bolt.setOutputCollector(collector);
                if (localTask) {
                    if (z) {
                        try {
                            bolt.prepare(config, context, collector);
                        } catch (Throwable th) {
                            log.error("bolt [" + bolt.getInstanceId() + "] failed to prepare", th);
                            throw new DragonTopologyException("bolt [" + bolt.getInstanceId() + "] threw exception when preparing: " + th.getMessage());
                        }
                    } else {
                        prepareLater(bolt, context, collector);
                    }
                }
            }
            this.totalParallelismHint += localTaskCount;
        }

        // One pending-output queue per scheduler thread, each twice the summed output buffer space.
        log.info("total outputs buffer size is " + totalOutputsBufferSize);
        this.outputsPending = new CircularBlockingQueue[config.getDragonLocalclusterThreads()];
        for (int i = 0; i < config.getDragonLocalclusterThreads(); i++) {
            this.outputsPending[i] = new CircularBlockingQueue<>(2 * totalOutputsBufferSize);
        }
        log.info("total inputs buffer size is " + totalInputsBufferSize);

        // Executor threads are created here and started by runComponentThreads().
        // Inside the Thread subclasses below the enum is written LocalCluster.State,
        // because a bare "State" there resolves to the inherited Thread.State.
        this.componentExecutorThreads = new ArrayList<>();
        for (HashMap<Integer, Spout> instances : this.spouts.values()) {
            for (final Spout spout : instances.values()) {
                Thread executor = new Thread() {
                    @Override
                    public void run() {
                        LocalCluster.log.info("starting up");
                        // A spout executor also stops once its spout reports closed.
                        while (!isInterrupted() && !spout.isClosed()) {
                            if (LocalCluster.this.state == LocalCluster.State.HALTED) {
                                LocalCluster.log.info("halted");
                                // Acquire before the try so that finally only unlocks a held lock.
                                LocalCluster.this.haltLock.lock();
                                try {
                                    LocalCluster.this.restartCondition.await();
                                    LocalCluster.log.info("resuming");
                                } catch (InterruptedException e) {
                                    LocalCluster.log.info("interrupted");
                                } finally {
                                    LocalCluster.this.haltLock.unlock();
                                }
                            } else {
                                spout.run();
                            }
                        }
                        LocalCluster.log.info("shutting down");
                    }
                };
                executor.setName(str + ":" + spout.getComponentId() + ":" + spout.getTaskId());
                this.componentExecutorThreads.add(executor);
            }
        }
        for (HashMap<Integer, Bolt> instances : this.bolts.values()) {
            for (final Bolt bolt : instances.values()) {
                Thread executor = new Thread() {
                    @Override
                    public void run() {
                        LocalCluster.log.info("starting up");
                        while (!isInterrupted()) {
                            if (LocalCluster.this.state == LocalCluster.State.HALTED) {
                                LocalCluster.log.info("halted");
                                LocalCluster.this.haltLock.lock();
                                try {
                                    LocalCluster.this.restartCondition.await();
                                    LocalCluster.log.info("resuming");
                                } catch (InterruptedException e) {
                                    LocalCluster.log.info("interrupted");
                                } finally {
                                    LocalCluster.this.haltLock.unlock();
                                }
                            } else {
                                bolt.run();
                            }
                        }
                        LocalCluster.log.info("shutting down");
                    }
                };
                executor.setName(str + ":" + bolt.getComponentId() + ":" + bolt.getTaskId());
                this.componentExecutorThreads.add(executor);
            }
        }

        // Seed the countdown for every local bolt that requests tick tuples.
        this.boltTickCount = new HashMap<>();
        for (String boltId : this.bolts.keySet()) {
            Config boltConf = this.boltConfs.get(boltId);
            if (boltConf.containsKey(Config.TOPOLOGY_TICK_TUPLE_FREQ_SECS)) {
                this.boltTickCount.put(boltId, (Integer) boltConf.get(Config.TOPOLOGY_TICK_TUPLE_FREQ_SECS));
            }
        }

        // Tick counter: catches tickCounterTime up with tickTime, one countdown step per tick.
        this.tickCounterThread = new Thread() {
            @Override
            public void run() {
                setName("tick counter");
                while (LocalCluster.this.state != LocalCluster.State.TERMINATING && !isInterrupted()) {
                    if (LocalCluster.this.state == LocalCluster.State.HALTED) {
                        LocalCluster.log.info(getName() + " halted");
                        LocalCluster.this.haltLock.lock();
                        try {
                            LocalCluster.this.restartCondition.await();
                            LocalCluster.log.info(getName() + " resuming");
                        } catch (InterruptedException e) {
                            LocalCluster.log.info(getName() + " interrupted");
                            return;
                        } finally {
                            LocalCluster.this.haltLock.unlock();
                        }
                    }
                    if (LocalCluster.this.tickCounterTime == LocalCluster.this.tickTime) {
                        // Nothing to process: sleep until the tick thread notifies.
                        synchronized (LocalCluster.this.tickCounterThread) {
                            try {
                                LocalCluster.this.tickCounterThread.wait();
                            } catch (InterruptedException e) {
                                LocalCluster.log.info("tick counter thread exiting");
                                return;
                            }
                        }
                    }
                    while (LocalCluster.this.tickCounterTime < LocalCluster.this.tickTime) {
                        for (String boltId : LocalCluster.this.boltTickCount.keySet()) {
                            Integer remaining = Integer.valueOf(LocalCluster.this.boltTickCount.get(boltId).intValue() - 1);
                            if (remaining.intValue() == 0) {
                                // Countdown expired: deliver a tick tuple and restart from the configured frequency.
                                LocalCluster.this.issueTickTuple(boltId);
                                remaining = (Integer) LocalCluster.this.boltConfs.get(boltId).get(Config.TOPOLOGY_TICK_TUPLE_FREQ_SECS);
                            }
                            LocalCluster.this.boltTickCount.put(boltId, remaining);
                        }
                        LocalCluster.this.tickCounterTime++;
                    }
                }
            }
        };
        this.tickCounterThread.setName("tick counter");
        this.tickCounterThread.start();

        // Tick source: one tick per 1000 ms sleep, then wake the tick counter.
        this.tickThread = new Thread() {
            @Override
            public void run() {
                setName("tick");
                while (LocalCluster.this.state != LocalCluster.State.TERMINATING && !isInterrupted()) {
                    if (LocalCluster.this.state == LocalCluster.State.HALTED) {
                        LocalCluster.log.info(getName() + " halted");
                        LocalCluster.this.haltLock.lock();
                        try {
                            LocalCluster.this.restartCondition.await();
                            LocalCluster.log.info(getName() + " resuming");
                        } catch (InterruptedException e) {
                            LocalCluster.log.info(getName() + " interrupted");
                            return;
                        } finally {
                            LocalCluster.this.haltLock.unlock();
                        }
                    }
                    try {
                        Thread.sleep(1000L);
                        LocalCluster.this.tickTime++;
                        synchronized (LocalCluster.this.tickCounterThread) {
                            LocalCluster.this.tickCounterThread.notify();
                        }
                    } catch (InterruptedException e) {
                        LocalCluster.log.info("terminating tick thread");
                        return;
                    }
                }
            }
        };
        this.tickThread.setName("tick");
        this.tickThread.start();

        outputsScheduler();
        this.state = State.SUBMITTED;
        if (z) {
            runComponentThreads();
            this.state = State.RUNNING;
        }
    }

    /**
     * Queues a bolt's prepare call so that {@link #openAll()} can perform it later.
     *
     * @param bolt the bolt to prepare
     * @param topologyContext the context to pass to prepare
     * @param outputCollector the collector to pass to prepare
     */
    private void prepareLater(Bolt bolt, TopologyContext topologyContext, OutputCollector outputCollector) {
        BoltPrepare pending = new BoltPrepare(bolt, topologyContext, outputCollector);
        this.boltPrepareList.add(pending);
    }

    /**
     * Queues a spout's open call so that {@link #openAll()} can perform it later.
     *
     * @param spout the spout to open
     * @param topologyContext the context to pass to open
     * @param spoutOutputCollector the collector to pass to open
     */
    private void openLater(Spout spout, TopologyContext topologyContext, SpoutOutputCollector spoutOutputCollector) {
        SpoutOpen pending = new SpoutOpen(spout, topologyContext, spoutOutputCollector);
        this.spoutOpenList.add(pending);
    }

    /**
     * Performs the queued prepare and open calls, then starts the component
     * threads and moves the cluster to {@code RUNNING}.
     *
     * <p>Bolts are prepared first, then spouts are opened, each in queue order.
     * The first component that throws aborts the sequence.
     *
     * @throws DragonInvalidStateException if the cluster is not in state {@code SUBMITTED}
     * @throws DragonTopologyException if any bolt's prepare or spout's open throws
     */
    public void openAll() throws DragonTopologyException, DragonInvalidStateException {
        if (this.state != State.SUBMITTED) {
            throw new DragonInvalidStateException("state must be " + State.SUBMITTED.name());
        }
        log.info("opening bolts");
        for (BoltPrepare pendingBolt : this.boltPrepareList) {
            try {
                pendingBolt.bolt.prepare(this.conf, pendingBolt.context, pendingBolt.collector);
            } catch (Throwable failure) {
                failure.printStackTrace();
                throw new DragonTopologyException("bolt [" + pendingBolt.bolt.getInstanceId() + "] threw: " + failure.toString());
            }
        }
        for (SpoutOpen pendingSpout : this.spoutOpenList) {
            try {
                pendingSpout.spout.open(this.conf, pendingSpout.context, pendingSpout.collector);
            } catch (Throwable failure) {
                failure.printStackTrace();
                throw new DragonTopologyException("spout [" + pendingSpout.spout.getInstanceId() + "] threw: " + failure.toString());
            }
        }
        runComponentThreads();
        this.state = State.RUNNING;
    }

    /**
     * Hands one shared tick tuple to every local instance of the given bolt.
     *
     * <p>The tuple carries the single value {@code "0"} and is stamped with the
     * system component id and the system tick stream id. Each instance is
     * locked while its tick tuple is set.
     *
     * @param str the id of the bolt to tick
     */
    private void issueTickTuple(String str) {
        Tuple tick = new Tuple();
        tick.setValues(new Values("0"));
        tick.setSourceComponent(Constants.SYSTEM_COMPONENT_ID);
        tick.setSourceStreamId(Constants.SYSTEM_TICK_STREAM_ID);

        Iterator<Bolt> instances = this.bolts.get(str).values().iterator();
        while (instances.hasNext()) {
            Bolt target = instances.next();
            synchronized (target) {
                target.setTickTuple(tick);
            }
        }
    }

    /**
     * Starts a background "shutdown" thread that drains and stops the topology.
     *
     * <p>The thread performs these steps in order:
     * <ol>
     *   <li>polls once a second until every local spout reports closed;</li>
     *   <li>has every spout emit a terminate tuple and expire its tuple bundles;</li>
     *   <li>polls once a second until every local bolt reports closed;</li>
     *   <li>calls {@code interruptAll()} and joins the component, network and tick threads;</li>
     *   <li>reports each registered {@code TermTopoGroupOp} to the node, when both a
     *       node and such operations exist.</li>
     * </ol>
     *
     * <p>Interrupts received while polling are logged and polling continues.
     */
    private void checkCloseCondition() {
        log.debug("starting shutdown thread");
        Thread shutdownThread = new Thread() {
            @Override
            public void run() {
                LocalCluster.log.debug("waiting for spouts to close");
                while (true) {
                    boolean allSpoutsClosed = true;
                    scanSpouts:
                    for (HashMap<Integer, Spout> instances : LocalCluster.this.spouts.values()) {
                        for (Spout spout : instances.values()) {
                            if (!spout.isClosed()) {
                                // Report the spout that is actually still open.
                                LocalCluster.log.debug("not closed yet: " + spout.getInstanceId());
                                allSpoutsClosed = false;
                                break scanSpouts;
                            }
                        }
                    }
                    if (allSpoutsClosed) {
                        break;
                    }
                    LocalCluster.log.info("waiting for spouts to close...");
                    try {
                        Thread.sleep(1000L);
                    } catch (InterruptedException e) {
                        LocalCluster.log.warn("interrupted while waiting for spouts to close");
                    }
                }

                LocalCluster.log.debug("emitting terminate tuples");
                for (HashMap<Integer, Spout> instances : LocalCluster.this.spouts.values()) {
                    for (Spout spout : instances.values()) {
                        LocalCluster.log.debug("spout [" + spout.getInstanceId() + "] emitting terminate tuple");
                        spout.getOutputCollector().emitTerminateTuple();
                        spout.getOutputCollector().expireAllTupleBundles();
                    }
                }

                LocalCluster.log.debug("waiting for bolts to close");
                while (true) {
                    boolean allBoltsClosed = true;
                    scanBolts:
                    for (HashMap<Integer, Bolt> instances : LocalCluster.this.bolts.values()) {
                        for (Bolt bolt : instances.values()) {
                            if (!bolt.isClosed()) {
                                allBoltsClosed = false;
                                break scanBolts;
                            }
                        }
                    }
                    if (allBoltsClosed) {
                        break;
                    }
                    LocalCluster.log.info("waiting for work to complete...");
                    try {
                        Thread.sleep(1000L);
                    } catch (InterruptedException e) {
                        LocalCluster.log.warn("interrupted while waiting for work to complete");
                    }
                }

                LocalCluster.this.interruptAll();
                try {
                    for (Thread executor : LocalCluster.this.componentExecutorThreads) {
                        executor.join();
                    }
                    for (Thread executor : LocalCluster.this.networkExecutorThreads) {
                        executor.join();
                    }
                } catch (InterruptedException e) {
                    LocalCluster.log.warn("threads may not have terminated");
                }
                try {
                    LocalCluster.this.tickThread.join();
                    LocalCluster.this.tickCounterThread.join();
                } catch (InterruptedException e) {
                    LocalCluster.log.warn("interrupted while waiting for tick thread");
                }

                synchronized (LocalCluster.this.groupOperations) {
                    HashSet<GroupOp> terminateOps = LocalCluster.this.groupOperations.get(TermTopoGroupOp.class);
                    // A node-less cluster, or one with no terminate operation registered,
                    // has nobody to notify.
                    if (terminateOps == null || LocalCluster.this.node == null) {
                        return;
                    }
                    for (GroupOp op : terminateOps) {
                        LocalCluster.this.node.localClusterTerminated((TermTopoGroupOp) op);
                    }
                }
            }
        };
        shutdownThread.setName("shutdown");
        shutdownThread.start();
    }

    /**
     * Starts every component executor thread, in the order the threads were created.
     */
    private void runComponentThreads() {
        int threadCount = this.componentExecutorThreads.size();
        log.info("starting " + threadCount + " component executor threads");
        for (Thread executor : this.componentExecutorThreads) {
            executor.start();
        }
    }

    /**
     * Creates and starts one network executor thread per configured
     * local-cluster thread ({@code conf.getDragonLocalclusterThreads()}).
     *
     * <p>Thread {@code i} repeatedly takes a {@link NetworkTaskBuffer} from
     * {@code outputsPending[i]} and, for each {@link NetworkTask} at the head of
     * that buffer, offers the task's tuples to the input queue of every
     * destination bolt instance. Task ids whose queue accepted the tuples are
     * removed from the task. If any remain, the buffer is handed back through
     * {@link #outputPending(NetworkTaskBuffer)} and the thread moves on to the
     * next pending buffer; otherwise the task is polled off and the next one is
     * processed until the buffer is drained.
     *
     * <p>While the cluster state is {@code HALTED} the thread parks on
     * {@code restartCondition} (signalled by {@code resumeTopology()} and
     * {@code setShouldTerminate()}). An interrupt ends the thread.
     */
    private void outputsScheduler() {
        log.debug("starting the outputs scheduler with " + this.conf.getDragonLocalclusterThreads() + " threads");
        for (int i = 0; i < this.conf.getDragonLocalclusterThreads(); i++) {
            final int queueIndex = i;
            final Thread worker = new Thread() {
                @Override
                public void run() {
                    LocalCluster.log.info("starting up");
                    // Task ids that accepted the current task's tuples; reused across tasks.
                    final HashSet<Integer> delivered = new HashSet<>();
                    while (!isInterrupted()) {
                        if (LocalCluster.this.state == State.HALTED) {
                            LocalCluster.log.info("halted");
                            // Acquire outside the try so that unlock() in finally only
                            // ever runs when the lock is actually held.
                            LocalCluster.this.haltLock.lock();
                            try {
                                // Re-check under the lock: a resume/terminate that ran between
                                // the unlocked check above and lock() has already signalled,
                                // and waiting unconditionally would miss that wake-up.
                                while (LocalCluster.this.state == State.HALTED) {
                                    LocalCluster.this.restartCondition.await();
                                }
                                LocalCluster.log.info("resuming");
                            } catch (InterruptedException e) {
                                LocalCluster.log.info("interrupted");
                                // await() cleared the interrupt flag; restore it so the
                                // outer loop condition observes the shutdown request.
                                Thread.currentThread().interrupt();
                            } finally {
                                LocalCluster.this.haltLock.unlock();
                            }
                            continue;
                        }
                        try {
                            final NetworkTaskBuffer buffer = LocalCluster.this.outputsPending[queueIndex].take();
                            NetworkTask task = buffer.peek();
                            // Stop as soon as the buffer is drained (peek() == null) instead
                            // of spinning forever on an empty buffer.
                            while (task != null) {
                                final Tuple[] tuples = task.getTuples();
                                final String componentId = task.getComponentId();
                                delivered.clear();
                                for (Integer taskId : task.getTaskIds()) {
                                    if (LocalCluster.this.bolts.get(componentId).get(taskId)
                                            .getInputCollector().getQueue().offer(tuples)) {
                                        delivered.add(taskId);
                                    }
                                }
                                task.getTaskIds().removeAll(delivered);
                                if (!task.getTaskIds().isEmpty()) {
                                    // Some destinations were full: re-queue the buffer and
                                    // retry later rather than blocking this thread.
                                    LocalCluster.this.outputPending(buffer);
                                    break;
                                }
                                buffer.poll();
                                task = buffer.peek();
                            }
                        } catch (InterruptedException e) {
                            LocalCluster.log.info("interrupted");
                            // take() cleared the interrupt flag; restore it so the loop exits.
                            Thread.currentThread().interrupt();
                        }
                    }
                    LocalCluster.log.info("shutting down");
                }
            };
            // Work with the local reference rather than get(i), which is only
            // correct when the list was empty before this method was called.
            this.networkExecutorThreads.add(worker);
            worker.setName("netex " + i);
            worker.start();
        }
    }

    /**
     * Queues a buffer that has pending network tasks onto one of the
     * {@code outputsPending} queues, chosen from the buffer's hash code and the
     * current number of network executor threads.
     *
     * <p>{@link Math#floorMod(int, int)} is used instead of {@code %} because
     * {@code hashCode()} may be negative, which would otherwise yield a negative
     * array index.
     *
     * @param networkTaskBuffer the buffer to schedule for delivery
     */
    public void outputPending(NetworkTaskBuffer networkTaskBuffer) {
        final int queueIndex = Math.floorMod(networkTaskBuffer.hashCode(), this.networkExecutorThreads.size());
        // NOTE(review): the result of offer() is ignored here, as before — confirm
        // the queue cannot reject the buffer.
        this.outputsPending[queueIndex].offer(networkTaskBuffer);
    }

    /**
     * Returns the identifier of this topology, which is its topology name.
     *
     * @return the value of the {@code topologyName} field
     */
    public String getTopologyId() {
        return topologyName;
    }

    /**
     * Returns the configuration object held by this local cluster.
     *
     * @return the value of the {@code conf} field
     */
    public Config getConf() {
        return conf;
    }

    /**
     * Returns the topology held by this local cluster.
     *
     * @return the value of the {@code dragonTopology} field
     */
    public DragonTopology getTopology() {
        return dragonTopology;
    }

    /**
     * Moves the topology into the {@code TERMINATING} state and closes every spout.
     *
     * <p>Does nothing if the topology is already terminating. If the topology
     * was halted, threads waiting on {@code restartCondition} are woken after
     * the state has changed. Each spout is then marked closed, the component
     * executor thread at the matching position is interrupted, and the spout's
     * {@code close()} is called; anything thrown by {@code close()} is logged
     * and does not stop the remaining spouts from being closed. Finally
     * {@code checkCloseCondition()} is invoked.
     *
     * @throws DragonInvalidStateException if the state is {@code FAULT},
     *         {@code ALLOCATED} or {@code SUBMITTED}
     */
    public synchronized void setShouldTerminate() throws DragonInvalidStateException {
        if (this.state == State.TERMINATING) {
            return;
        }
        if (this.state == State.FAULT || this.state == State.ALLOCATED || this.state == State.SUBMITTED) {
            throw new DragonInvalidStateException("cannot change to terminate state");
        }
        log.info("terminating [" + getTopologyId() + "]");
        final boolean wasHalted = this.state == State.HALTED;
        // Change state before signalling so woken threads no longer observe HALTED.
        this.state = State.TERMINATING;
        if (wasHalted) {
            // Acquire outside the try so unlock() only runs when the lock is held.
            this.haltLock.lock();
            try {
                this.restartCondition.signalAll();
            } finally {
                this.haltLock.unlock();
            }
        }
        // NOTE(review): assumes the leading entries of componentExecutorThreads line up
        // with the spouts in this same iteration order — confirm where the list is built.
        int threadIndex = 0;
        for (HashMap<Integer, Spout> instances : this.spouts.values()) {
            for (Spout spout : instances.values()) {
                spout.setClosed();
                this.componentExecutorThreads.get(threadIndex).interrupt();
                try {
                    spout.close();
                } catch (Throwable th) {
                    // Pass the throwable to the logger so the stack trace lands in the
                    // configured log output rather than on stderr.
                    log.error("throwable thrown by spout [" + spout.getInstanceId() + "] when closing: " + th.getMessage(), th);
                }
                log.debug(spout.getComponentId() + ":" + spout.getTaskId() + " closed");
                threadIndex++;
            }
        }
        checkCloseCondition();
    }

    /**
     * Puts a running topology into the {@code HALTED} state.
     *
     * <p>Calling this while already halted is a no-op.
     *
     * @throws DragonInvalidStateException if the topology is neither halted nor running
     */
    public void haltTopology() throws DragonInvalidStateException {
        if (this.state != State.HALTED) {
            if (this.state != State.RUNNING) {
                throw new DragonInvalidStateException("must be running to halt the topology");
            }
            log.info("halting [" + getTopologyId() + "]");
            this.state = State.HALTED;
        }
    }

    /**
     * Returns a halted topology to the {@code RUNNING} state and wakes every
     * thread waiting on {@code restartCondition}.
     *
     * @throws DragonInvalidStateException if the topology is not currently halted
     */
    public void resumeTopology() throws DragonInvalidStateException {
        if (this.state != State.HALTED) {
            throw new DragonInvalidStateException("can only resume from halted state");
        }
        log.info("resuming topology");
        // Change state before signalling so woken threads no longer observe HALTED.
        this.state = State.RUNNING;
        // Acquire outside the try: if lock() itself failed, the finally block would
        // otherwise call unlock() on a lock this thread does not hold.
        this.haltLock.lock();
        try {
            this.restartCondition.signalAll();
        } finally {
            this.haltLock.unlock();
        }
    }

    /**
     * Puts the topology into the {@code FAULT} state: closes all components,
     * interrupts all threads, then records the new state. A no-op when the
     * topology is already in {@code FAULT}.
     */
    public void setFault() {
        if (this.state != State.FAULT) {
            log.info("topology fault has occurred");
            closeAll();
            interruptAll();
            // Assigned last on purpose: interruptAll() returns immediately once
            // the state is FAULT.
            this.state = State.FAULT;
        }
    }

    /**
     * Interrupts the threads owned by this cluster. Does nothing when the state
     * is {@code FAULT}.
     *
     * <p>The component and network executor threads are skipped while the state
     * is {@code ALLOCATED} or {@code SUBMITTED}; the tick thread and tick counter
     * thread are always interrupted.
     */
    public void interruptAll() {
        if (this.state == State.FAULT) {
            return;
        }
        log.debug("interrupting all threads");
        final boolean skipExecutors = this.state == State.ALLOCATED || this.state == State.SUBMITTED;
        if (!skipExecutors) {
            for (Thread componentExecutor : this.componentExecutorThreads) {
                componentExecutor.interrupt();
            }
            for (Thread networkExecutor : this.networkExecutorThreads) {
                networkExecutor.interrupt();
            }
        }
        this.tickThread.interrupt();
        this.tickCounterThread.interrupt();
    }

    /**
     * Closes every spout and then every bolt that is not already closing or closed.
     *
     * <p>For each such component the sequence is {@code setClosing()},
     * {@code close()}, {@code setClosed()}. Anything thrown during that sequence
     * is logged with its stack trace and does not prevent the remaining
     * components from being closed.
     */
    public void closeAll() {
        for (HashMap<Integer, Spout> spoutInstances : this.spouts.values()) {
            for (Spout spout : spoutInstances.values()) {
                if (spout.isClosing() || spout.isClosed()) {
                    log.debug(spout.getInstanceId() + " already closed");
                    continue;
                }
                try {
                    spout.setClosing();
                    spout.close();
                    spout.setClosed();
                } catch (Throwable th) {
                    // Pass the throwable to the logger so the stack trace lands in the
                    // configured log output rather than on stderr.
                    log.error("throwable thrown by spout [" + spout.getInstanceId() + "] when closing: " + th.getMessage(), th);
                }
                // NOTE(review): logged even when close() threw, in which case
                // setClosed() was skipped — confirm that is intended.
                log.debug(spout.getInstanceId() + " closed");
            }
        }
        for (HashMap<Integer, Bolt> boltInstances : this.bolts.values()) {
            for (Bolt bolt : boltInstances.values()) {
                if (bolt.isClosing() || bolt.isClosed()) {
                    log.debug(bolt.getInstanceId() + " already closed");
                    continue;
                }
                try {
                    bolt.setClosing();
                    bolt.close();
                    bolt.setClosed();
                } catch (Throwable th) {
                    log.error("throwable thrown by bolt [" + bolt.getInstanceId() + "] when closing: " + th.getMessage(), th);
                }
                log.debug(bolt.getInstanceId() + " closed");
            }
        }
    }

    /**
     * Returns the bolt instances of this cluster as a two-level map.
     * The internal map itself is returned, not a copy.
     *
     * @return the value of the {@code bolts} field
     */
    public HashMap<String, HashMap<Integer, Bolt>> getBolts() {
        return bolts;
    }

    /**
     * Returns the spout instances of this cluster as a two-level map.
     * The internal map itself is returned, not a copy.
     *
     * @return the value of the {@code spouts} field
     */
    public HashMap<String, HashMap<Integer, Spout>> getSpouts() {
        return spouts;
    }

    /**
     * Returns the node associated with this local cluster.
     *
     * @return the value of the {@code node} field; {@code componentException}
     *         treats a {@code null} node as a distinct case, so callers should
     *         expect that it may be {@code null}
     */
    public Node getNode() {
        return node;
    }

    /**
     * Registers a group operation under its concrete class, creating the set
     * for that class on first use. The update is performed while holding the
     * monitor of {@code groupOperations}.
     *
     * @param groupOp the group operation to record
     */
    public void setGroupOperation(GroupOp groupOp) {
        synchronized (this.groupOperations) {
            final Class<? extends GroupOp> opType = groupOp.getClass();
            if (!this.groupOperations.containsKey(opType)) {
                this.groupOperations.put(opType, new HashSet<>());
            }
            this.groupOperations.get(opType).add(groupOp);
        }
    }

    /**
     * Records an error raised by a component and reacts once the component has
     * exceeded its fault tolerance.
     *
     * <p>The tolerance defaults to the cluster configuration's value and is
     * overridden by the component's spout configuration, and then by its bolt
     * configuration, when that configuration contains
     * {@code Config.DRAGON_FAULTS_COMPONENT_TOLERANCE}. When the number of
     * recorded errors is greater than the tolerance, a fatal message is logged
     * and either the JVM exits with status -1 (no node) or, if the topology is
     * running, the node is asked to halt the topology.
     *
     * @param component the component that failed
     * @param str the error message
     * @param stackTraceElementArr the stack trace captured for the error
     */
    public synchronized void componentException(Component component, String str, StackTraceElement[] stackTraceElementArr) {
        final ComponentError error = new ComponentError(str, stackTraceElementArr);
        if (!this.componentErrors.containsKey(component)) {
            this.componentErrors.put(component, new ArrayList<>());
        }
        final ArrayList<ComponentError> errors = this.componentErrors.get(component);
        errors.add(error);

        int tolerance = this.conf.getDragonFaultsComponentTolerance();
        if (this.spoutConfs.containsKey(component.getComponentId())
                && this.spoutConfs.get(component.getComponentId()).containsKey(Config.DRAGON_FAULTS_COMPONENT_TOLERANCE)) {
            tolerance = this.spoutConfs.get(component.getComponentId()).getDragonFaultsComponentTolerance();
        }
        if (this.boltConfs.containsKey(component.getComponentId())
                && this.boltConfs.get(component.getComponentId()).containsKey(Config.DRAGON_FAULTS_COMPONENT_TOLERANCE)) {
            tolerance = this.boltConfs.get(component.getComponentId()).getDragonFaultsComponentTolerance();
        }

        if (errors.size() <= tolerance) {
            return;
        }
        log.fatal("component [" + component.getComponentId() + ":" + component.getTaskId() + "] has failed more than [" + tolerance + "] times");
        if (this.node == null) {
            System.exit(-1);
            return;
        }
        if (this.state == State.RUNNING) {
            this.node.signalHaltTopology(this.topologyName);
        }
    }

    /**
     * Returns the errors recorded so far, grouped by component.
     * The internal map itself is returned, not a copy.
     *
     * @return the value of the {@code componentErrors} field
     */
    public HashMap<Component, ArrayList<ComponentError>> getComponentErrors() {
        return componentErrors;
    }

    /**
     * Returns the current state of this local cluster.
     *
     * @return the value of the volatile {@code state} field at the time of the call
     */
    public State getState() {
        return state;
    }
}
