package stream.runtime;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import stream.Context;
import stream.Data;
import stream.Process;
import stream.ProcessContext;
import stream.Processor;
import stream.StatefulProcessor;
import stream.io.Sink;
import stream.io.Source;

/* loaded from: input_file:stream/runtime/AbstractProcess.class */
/**
 * Base implementation of a {@link Process}: reads items from a {@link Source},
 * passes each item through an ordered list of {@link Processor}s and, if a
 * {@link Sink} is attached, writes every non-null result to that sink.
 *
 * <p>
 * Processors that are {@link StatefulProcessor}s are initialized in
 * {@link #init(Context)} and finished in {@link #finish()}.
 * </p>
 */
public abstract class AbstractProcess implements Process {
    static Logger log = LoggerFactory.getLogger(AbstractProcess.class);

    /** Optional identifier of this process, used by {@link #toString()}. */
    private String id;

    /** The context handed to {@link #init(Context)}. */
    protected Context parentContext;

    /** The process-local context created in {@link #init(Context)}. */
    protected ProcessContext processContext;

    /** The source from which {@link #execute()} reads items. */
    protected Source source;

    /** The optional sink to which {@link #execute()} writes processed items. */
    protected Sink sink;

    /** The processors applied to each item, in insertion order. */
    protected final List<Processor> processors = new ArrayList<Processor>();

    /** Properties of this process, kept in insertion order. */
    protected final Map<String, String> properties = new LinkedHashMap<String, String>();

    protected Priority priority = new Priority();

    /**
     * Sets the source that this process reads its items from.
     *
     * @param source the input source
     */
    public void setInput(Source source) {
        this.source = source;
    }

    /**
     * @return the source that this process reads from, or {@code null} if none
     *         has been set
     */
    public Source getInput() {
        return this.source;
    }

    /**
     * Sets the sink that processed items are written to.
     *
     * @param sink the output sink
     */
    public void setOutput(Sink sink) {
        this.sink = sink;
    }

    /**
     * @return the sink that processed items are written to, or {@code null} if
     *         none has been set
     */
    public Sink getOutput() {
        return this.sink;
    }

    /**
     * @return the identifier of this process, or {@code null} if none was set
     */
    public String getId() {
        return this.id;
    }

    /**
     * Sets the identifier of this process.
     *
     * @param str the new identifier
     */
    public void setId(String str) {
        this.id = str;
    }

    /**
     * Applies all processors, in order, to the given item. The result of each
     * processor is the input of the next one.
     *
     * @param data the item to process
     * @return the result of the last processor, or {@code null} as soon as any
     *         processor returns {@code null} (remaining processors are skipped)
     */
    public Data process(Data data) {
        log.debug("processing data {}", data);
        Data item = data;
        for (Processor processor : this.processors) {
            item = processor.process(item);
            if (item == null) {
                // a processor dropped the item, nothing left to hand on
                return null;
            }
        }
        return item;
    }

    /**
     * Stores the parent context, creates the process context on top of it and
     * initializes every {@link StatefulProcessor} in the processor list.
     *
     * @param context the parent context of this process
     * @throws Exception if the initialization of a processor fails
     */
    public void init(Context context) throws Exception {
        this.parentContext = context;
        this.processContext = new ProcessContextImpl(context);
        for (Processor processor : this.processors) {
            if (processor instanceof StatefulProcessor) {
                ((StatefulProcessor) processor).init(this.processContext);
            }
        }
        log.debug("Process {} (source: {}) initialized, processors: ", this, getInput());
    }

    /**
     * Finishes every {@link StatefulProcessor} in the processor list. A failure
     * of one processor is logged and does not prevent the remaining processors
     * from being finished.
     *
     * @throws Exception declared for subclasses; failures of individual
     *                   processors are caught and logged here
     */
    public void finish() throws Exception {
        log.debug("Finishing process {} (source: {})...", this, getInput());
        for (Processor processor : this.processors) {
            if (!(processor instanceof StatefulProcessor)) {
                continue;
            }
            try {
                log.debug("Finishing processor {}", processor);
                ((StatefulProcessor) processor).finish();
            } catch (Exception e) {
                // hand the exception to the logger so the stack trace ends up
                // in the log instead of on stderr
                log.error("Failed to finish processor '" + processor + "': " + e.getMessage(), e);
            }
        }
    }

    /**
     * Runs this process: reads items from the input until it returns
     * {@code null}, processes each item and writes non-null results to the
     * output, if one is set. Any error while reading, processing or writing
     * aborts the loop and is logged. {@link #finish()} is called afterwards in
     * either case.
     */
    public void execute() {
        try {
            if (getInput() == null) {
                log.error("Could not read from input!");
                throw new IOException("Can't read from input");
            }
            Data item = getInput().read();
            while (item != null) {
                Data result = process(item);
                if (result != null && getOutput() != null) {
                    log.debug("Sending process output to connected sink {}", getOutput());
                    getOutput().write(result);
                }
                item = getInput().read();
            }
            log.debug("No more items could be read, exiting this process.");
        } catch (Exception e) {
            log.error("Aborting process due to errors: " + e.getMessage(), e);
        }

        // finish outside of the try block above, so that processors are
        // finished even if the processing loop was aborted
        try {
            finish();
        } catch (Exception e) {
            log.warn("Error while finishing process: " + e.getMessage(), e);
        }
    }

    /**
     * @return the process context created in {@link #init(Context)}, or
     *         {@code null} if this process has not been initialized
     */
    public ProcessContext getContext() {
        return this.processContext;
    }

    /**
     * Appends a processor to the end of the processor list.
     *
     * @param processor the processor to add
     */
    public void add(Processor processor) {
        this.processors.add(processor);
    }

    /**
     * Removes the given processor from the processor list.
     *
     * @param processor the processor to remove
     */
    public void remove(Processor processor) {
        this.processors.remove(processor);
    }

    /**
     * @return the live (not copied) list of processors of this process
     */
    public List<Processor> getProcessors() {
        return this.processors;
    }

    /**
     * @return the live (not copied) property map of this process
     */
    public Map<String, String> getProperties() {
        return this.properties;
    }

    /**
     * @return the priority of this process
     */
    public Priority getPriority() {
        return this.priority;
    }

    /**
     * Sets the priority of this process.
     *
     * @param priority the new priority
     */
    public void setPriority(Priority priority) {
        this.priority = priority;
    }

    /**
     * @return the canonical class name followed by the id in brackets, or by
     *         the default {@code Object} string form if no id has been set
     */
    public String toString() {
        String label = this.id != null ? this.id : super.toString();
        return getClass().getCanonicalName() + "[" + label + "]";
    }
}
