package stream.runtime;

import java.net.InetAddress;
import java.net.URL;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import javax.xml.parsers.DocumentBuilderFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.w3c.dom.Document;
import org.w3c.dom.Element;
import org.w3c.dom.Node;
import org.w3c.dom.NodeList;
import stream.Data;
import stream.ProcessContext;
import stream.data.DataFactory;
import stream.io.BlockingQueue;
import stream.io.DataStream;
import stream.io.DataStreamQueue;
import stream.runtime.rpc.RMINamingService;
import stream.runtime.setup.ContainerRefElementHandler;
import stream.runtime.setup.DocumentHandler;
import stream.runtime.setup.LibrariesElementHandler;
import stream.runtime.setup.MonitorElementHandler;
import stream.runtime.setup.ObjectCreator;
import stream.runtime.setup.ObjectFactory;
import stream.runtime.setup.ProcessElementHandler;
import stream.runtime.setup.ProcessorFactory;
import stream.runtime.setup.PropertiesHandler;
import stream.runtime.setup.QueueElementHandler;
import stream.runtime.setup.ServiceElementHandler;
import stream.runtime.setup.ServiceInjection;
import stream.runtime.setup.ServiceReference;
import stream.runtime.setup.StreamElementHandler;
import stream.runtime.shutdown.DependencyGraph;
import stream.runtime.shutdown.LocalShutdownCondition;
import stream.runtime.shutdown.ServerShutdownCondition;
import stream.service.NamingService;

/* loaded from: input_file:stream/runtime/ProcessContainer.class */
public class ProcessContainer {
    static Logger log = LoggerFactory.getLogger(ProcessContainer.class);
    static final List<ProcessContainer> container = new ArrayList();
    private static boolean runShutdownHook = true;
    protected final DependencyGraph depGraph;
    protected final ObjectFactory objectFactory;
    protected final ProcessorFactory processorFactory;
    protected String name;
    protected final ContainerContext context;
    protected final Map<String, DataStream> streams;
    protected final Map<String, DataStreamQueue> listeners;
    protected final List<AbstractProcess> processes;
    protected final Map<AbstractProcess, ProcessContext> processContexts;
    protected final List<ServiceReference> serviceRefs;
    protected final Map<String, ElementHandler> elementHandler;
    protected final List<DocumentHandler> documentHandler;
    protected NamingService namingService;
    protected final List<LifeCycle> lifeCyleObjects;
    boolean server;
    Long startTime;
    static final String[] extensions;

    public ProcessContainer(URL url) throws Exception {
        this(url, null);
    }

    public ProcessContainer(URL url, Map<String, ElementHandler> map) throws Exception {
        this.depGraph = new DependencyGraph();
        this.objectFactory = ObjectFactory.newInstance();
        this.processorFactory = new ProcessorFactory(this.objectFactory);
        this.name = null;
        this.streams = new LinkedHashMap();
        this.listeners = new LinkedHashMap();
        this.processes = new ArrayList();
        this.processContexts = new LinkedHashMap();
        this.serviceRefs = new ArrayList();
        this.elementHandler = new HashMap();
        this.documentHandler = new ArrayList();
        this.namingService = null;
        this.lifeCyleObjects = new ArrayList();
        this.server = true;
        this.startTime = 0L;
        LibrariesElementHandler librariesElementHandler = new LibrariesElementHandler(this.objectFactory);
        this.documentHandler.add(librariesElementHandler);
        this.documentHandler.add(new PropertiesHandler());
        this.elementHandler.put("Container-Ref", new ContainerRefElementHandler(this.objectFactory));
        this.elementHandler.put("Queue", new QueueElementHandler());
        this.elementHandler.put("Monitor", new MonitorElementHandler(this.objectFactory, this.processorFactory));
        this.elementHandler.put("Process", new ProcessElementHandler(this.objectFactory, this.processorFactory));
        this.elementHandler.put("Stream", new StreamElementHandler(this.objectFactory));
        this.elementHandler.put("Service", new ServiceElementHandler(this.objectFactory));
        this.elementHandler.put("Libs", librariesElementHandler);
        if (map != null) {
            this.elementHandler.putAll(map);
        }
        Document parse = DocumentBuilderFactory.newInstance().newDocumentBuilder().parse(url.openStream());
        Element documentElement = parse.getDocumentElement();
        Map<String, String> attributes = this.objectFactory.getAttributes(documentElement);
        if (System.getProperty("container.address") != null) {
            attributes.put("address", System.getProperty("container.address"));
        }
        if (System.getProperty("container.port") != null) {
            attributes.put("port", System.getProperty("container.port"));
        }
        try {
            this.server = new Boolean(attributes.get("server")).booleanValue();
        } catch (Exception e) {
            this.server = true;
        }
        if (!documentElement.getNodeName().equalsIgnoreCase("experiment") && !documentElement.getNodeName().equalsIgnoreCase("container")) {
            throw new Exception("Expecting root element to be 'container'!");
        }
        String hostAddress = InetAddress.getLocalHost().getHostAddress();
        this.name = InetAddress.getLocalHost().getHostName();
        if (this.name.indexOf(".") > 0) {
            this.name = this.name.substring(0, this.name.indexOf("."));
        }
        log.debug("Default hostname is: {}", hostAddress);
        if (attributes.containsKey("address") && !attributes.get("address").trim().isEmpty()) {
            hostAddress = InetAddress.getByName(attributes.get("address")).getHostAddress();
            log.debug("Container address will be {}", hostAddress);
        }
        Integer num = 0;
        if (attributes.containsKey("port") && !attributes.get("port").trim().isEmpty()) {
            num = new Integer(attributes.get("port"));
            log.debug("Container port will be {}", num);
        }
        if (documentElement.hasAttribute("id")) {
            this.name = documentElement.getAttribute("id");
        }
        try {
            String attribute = documentElement.getAttribute("namingService");
            if (attribute != null && !attribute.trim().isEmpty()) {
                this.namingService = (NamingService) this.objectFactory.create(attribute, attributes);
            }
            if (this.namingService == null) {
                if (attributes.containsKey("address")) {
                    log.debug("Creating RMI naming-service...");
                    System.setProperty("java.rmi.server.hostname", hostAddress);
                    this.namingService = new RMINamingService(this.name, hostAddress, num.intValue(), true);
                } else {
                    log.debug("No address specified, using local naming-service. Container will not be able to reference other containers!");
                    this.namingService = new DefaultNamingService();
                }
            }
            if (this.namingService instanceof LifeCycle) {
                this.lifeCyleObjects.add((LifeCycle) this.namingService);
            }
            log.debug("Using naming-service {}", this.namingService);
            this.context = new ContainerContext(this.name, this.namingService);
            init(parse);
        } catch (Exception e2) {
            log.error("Faild to instantiate naming service '{}': {}", documentElement.getAttribute("namingService"), e2.getMessage());
            throw new Exception("Faild to instantiate naming service '" + documentElement.getAttribute("namingService") + "': " + e2.getMessage());
        }
    }

    public DependencyGraph getDependencyGraph() {
        return this.depGraph;
    }

    public Set<DataStream> getStreams() {
        return new LinkedHashSet(this.streams.values());
    }

    public String getName() {
        return this.name;
    }

    public void setName(String str) {
        this.name = str;
    }

    public ContainerContext getContext() {
        return this.context;
    }

    public List<AbstractProcess> getProcesses() {
        return this.processes;
    }

    public List<ServiceReference> getServiceRefs() {
        return this.serviceRefs;
    }

    private void init(Document document) throws Exception {
        Element documentElement = document.getDocumentElement();
        if (documentElement.getAttribute("import") != null) {
            for (String str : documentElement.getAttribute("import").split(",")) {
                if (!str.trim().isEmpty()) {
                    this.objectFactory.addPackage(str.trim());
                }
            }
        }
        if (documentElement.getAttribute("name") == null) {
        }
        Iterator<DocumentHandler> it = this.documentHandler.iterator();
        while (it.hasNext()) {
            it.next().handle(this, document);
        }
        this.objectFactory.addVariables(this.context.getProperties());
        NodeList childNodes = documentElement.getChildNodes();
        if (this.context.getProperties().get("container.datafactory") != null) {
            log.debug("Using {} as default DataFactory for this container...", this.context.getProperties().get("container.datafactory"));
            DataFactory.setDefaultDataFactory((DataFactory) Class.forName(this.context.getProperties().get("container.datafactory")).newInstance());
        }
        for (int i = 0; i < childNodes.getLength(); i++) {
            Node item = childNodes.item(i);
            if (item.getNodeType() == 1) {
                Element element = (Element) item;
                for (ElementHandler elementHandler : this.elementHandler.values()) {
                    if (elementHandler.handlesElement(element)) {
                        elementHandler.handleElement(this, element);
                    }
                }
            }
        }
        connectProcesses();
    }

    protected void connectProcesses() throws Exception {
        log.debug("Wiring process inputs to data-streams...");
        for (AbstractProcess abstractProcess : this.processes) {
            if (abstractProcess instanceof Process) {
                Process process = (Process) abstractProcess;
                String input = process.getInput();
                if (input == null) {
                    throw new RuntimeException("Process '" + process + "' is not connected to any input-stream!");
                }
                BlockingQueue blockingQueue = (DataStream) this.streams.get(input);
                if (blockingQueue == null) {
                    log.debug("No stream defined for name '{}' - creating a listener-queue for key '{}'", input, input);
                    BlockingQueue blockingQueue2 = new BlockingQueue();
                    registerQueue(input, blockingQueue2, false);
                    blockingQueue = blockingQueue2;
                }
                this.depGraph.add(process, blockingQueue);
                process.setDataStream(blockingQueue);
            }
        }
    }

    public void registerQueue(String str, DataStreamQueue dataStreamQueue, boolean z) throws Exception {
        log.debug("A new queue '{}' is registered for id '{}'", dataStreamQueue, str);
        if (z) {
            this.listeners.put(str, dataStreamQueue);
        }
        setStream(str, dataStreamQueue);
        this.context.register(str, dataStreamQueue);
    }

    protected void injectServices() throws Exception {
        ServiceInjection.injectServices(getServiceRefs(), getContext(), this.depGraph);
    }

    public void setStream(String str, DataStream dataStream) {
        this.streams.put(str, dataStream);
    }

    public void run() throws Exception {
        if (!container.contains(this)) {
            container.add(this);
        }
        this.startTime = Long.valueOf(System.currentTimeMillis());
        ContainerController containerController = new ContainerController(this);
        log.debug("Registering container-controller {}", containerController);
        this.namingService.register(".ctrl", containerController);
        injectServices();
        if (!this.server && this.streams.isEmpty() && this.listeners.isEmpty()) {
            throw new Exception("No data-stream defined!");
        }
        log.debug("Need to handle {} sources: {}", Integer.valueOf(this.streams.size()), this.streams.keySet());
        log.debug("Experiment contains {} stream processes", Integer.valueOf(this.processes.size()));
        log.debug("Initializing all DataStreams...");
        for (String str : this.streams.keySet()) {
            DataStream dataStream = this.streams.get(str);
            log.debug("Initializing stream '{}'", str);
            dataStream.init();
        }
        log.debug("Creating {} active processes...", Integer.valueOf(this.processes.size()));
        long currentTimeMillis = System.currentTimeMillis();
        for (AbstractProcess abstractProcess : this.processes) {
            abstractProcess.setDaemon(true);
            ProcessContext processContext = this.processContexts.get(abstractProcess);
            if (processContext == null) {
                processContext = new ProcessContextImpl(this.context);
                this.processContexts.put(abstractProcess, processContext);
            }
            log.debug("Initializing process with process-context...");
            abstractProcess.init(processContext);
            abstractProcess.addListener(new ProcessListener() { // from class: stream.runtime.ProcessContainer.2
                @Override // stream.runtime.ProcessListener
                public void processStarted(AbstractProcess abstractProcess2) {
                    ProcessContainer.log.debug("Starting process {}", abstractProcess2);
                }

                @Override // stream.runtime.ProcessListener
                public void processFinished(AbstractProcess abstractProcess2) {
                    ProcessContainer.log.debug("Process {} finished, removing from dependency-graph.", abstractProcess2);
                    ProcessContainer.this.depGraph.remove(abstractProcess2);
                    List<LifeCycle> remove = ProcessContainer.this.depGraph.remove(abstractProcess2);
                    ProcessContainer.log.debug("End-of-life for: {}", remove);
                    for (LifeCycle lifeCycle : remove) {
                        try {
                            ProcessContainer.log.debug("Calling finish() for LifeCycle object {}", lifeCycle);
                            lifeCycle.finish();
                        } catch (Exception e) {
                            e.printStackTrace();
                        }
                    }
                }
            });
            log.debug("Starting stream-process [{}]", abstractProcess);
            abstractProcess.start();
            log.debug("Stream-process started.");
        }
        log.debug("Waiting for container to finish...");
        (this.server ? new ServerShutdownCondition() : new LocalShutdownCondition()).waitForCondition(this.depGraph);
        runShutdownHook = false;
        long currentTimeMillis2 = System.currentTimeMillis();
        log.trace("Running processes: {}", this.processes);
        log.info("ProcessContainer finished all processes after {} ms", Long.valueOf(currentTimeMillis2 - currentTimeMillis));
    }

    public Set<String> getStreamListenerNames() {
        return this.listeners.keySet();
    }

    public ObjectFactory getObjectFactory() {
        return this.objectFactory;
    }

    public void dataArrived(String str, Data data) {
        if (!this.listeners.containsKey(str)) {
            log.warn("No listener defined for {}", str);
        } else {
            log.debug("Adding item {} into queue {}", data, str);
            this.listeners.get(str).process(data);
        }
    }

    public void shutdown() {
        if (runShutdownHook) {
            synchronized (this.processes) {
                for (AbstractProcess abstractProcess : this.processes) {
                    log.debug("Sending SHUTDOWN signal to process {}", abstractProcess);
                    try {
                        abstractProcess.finish();
                    } catch (Exception e) {
                        log.error("Failed to properly shutdown process: {}", e.getMessage());
                    }
                }
            }
            log.debug("Sending finish() signal to life-cycle objects...");
            for (LifeCycle lifeCycle : this.lifeCyleObjects) {
                try {
                    log.debug("   sending finish() to {}", lifeCycle);
                    lifeCycle.finish();
                } catch (Exception e2) {
                    e2.printStackTrace();
                }
            }
            while (!this.processes.isEmpty()) {
                log.debug("Waiting for {} processes to finish...", Integer.valueOf(this.processes.size()));
                try {
                    Iterator<AbstractProcess> it = this.processes.iterator();
                    while (it.hasNext()) {
                        AbstractProcess next = it.next();
                        if (!next.isAlive()) {
                            log.debug("another process finished...");
                            it.remove();
                            for (LifeCycle lifeCycle2 : this.depGraph.remove(next)) {
                                try {
                                    log.info("Calling finish() for LifeCycle object {}", lifeCycle2);
                                    lifeCycle2.finish();
                                } catch (Exception e3) {
                                    e3.printStackTrace();
                                }
                            }
                        }
                    }
                    log.debug("Waiting for {} processes to finish...", Integer.valueOf(this.processes.size()));
                    log.debug("   processes: {}", this.processes);
                    try {
                        Thread.sleep(500L);
                    } catch (Exception e4) {
                    }
                } catch (Exception e5) {
                    e5.printStackTrace();
                }
            }
            log.info("Container shut down.");
        }
    }

    public void setProcessContext(Process process, ProcessContext processContext) {
        this.processContexts.put(process, processContext);
    }

    static {
        log.debug("Adding container shutdown-hook");
        Runtime.getRuntime().addShutdownHook(new Thread() { // from class: stream.runtime.ProcessContainer.1
            @Override // java.lang.Thread, java.lang.Runnable
            public void run() {
                if (ProcessContainer.runShutdownHook) {
                    if ("disabled".equalsIgnoreCase(System.getProperty("container.shutdown-hook"))) {
                        ProcessContainer.log.warn("Shutdown-hook disabled...");
                        return;
                    }
                    ProcessContainer.log.debug("Running shutdown-hook...");
                    for (ProcessContainer processContainer : ProcessContainer.container) {
                        ProcessContainer.log.debug("Sending shutdown signal to {}", processContainer);
                        processContainer.shutdown();
                    }
                }
            }
        });
        extensions = new String[]{"stream.moa.MoaObjectFactory", "stream.script.JavaScriptProcessorFactory"};
        for (String str : extensions) {
            try {
                ObjectFactory.registerObjectCreator((ObjectCreator) Class.forName(str).newInstance());
                log.debug("Registered extension {}", str);
            } catch (Exception e) {
                log.debug("Failed to register extension '{}': {}", str, e.getMessage());
                if (log.isTraceEnabled()) {
                    e.printStackTrace();
                }
            }
        }
    }
}
