package stream.runtime;

import java.io.Serializable;
import org.apache.spark.api.java.JavaRDD;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import stream.Data;
import stream.io.LinkedListSink;
import stream.io.RddSink;
import stream.io.multi.MultiStream;
import streams.spark.StreamsSparkContext;

/* loaded from: input_file:stream/runtime/DProcess.class */
/**
 * A process that is executed through a {@link DProcessExecution} when its input is a
 * {@link MultiStream}, and falls back to the regular {@link AbstractProcess#execute()}
 * behaviour otherwise.
 *
 * <p>When running in fallback mode with an {@link RddSink} as output, the items produced by
 * the process are collected locally, turned into an RDD and then written to that sink.
 */
public class DProcess extends AbstractProcess implements Serializable {
    /** Worker-id key used as long as no other key has been configured. */
    public static final String DEFAULT_WORKER_ID_KEY = "streams-spark-workerId";
    private static final long serialVersionUID = 1666190685883273119L;
    private static final Logger log = LoggerFactory.getLogger(DProcess.class);

    /** Key configured on this process; only consulted while {@link #context} is unset. */
    private String workerIdKey = DEFAULT_WORKER_ID_KEY;
    /** Created in {@link #execute()} once the process is set up for distributed execution. */
    private DProcessContext context;
    /** Execution strategy; stays {@code null} when the process runs as a default process. */
    private DProcessExecution executor;

    /**
     * Runs this process.
     *
     * <p>If the input is a {@link MultiStream}, a {@link StreamingExecution} or
     * {@link BatchExecution} is chosen depending on
     * {@code StreamsSparkContext.getInstance().isStreamingJob()}, initialized with a fresh
     * {@link DProcessContext}, and executed. Otherwise a warning is logged and the process
     * is executed as a default process.
     *
     * @throws Exception if the selected execution or the default execution fails
     */
    public void execute() throws Exception {
        if (getInput() instanceof MultiStream) {
            // The instanceof check above makes this narrowing cast safe.
            MultiStream input = (MultiStream) getInput();
            if (input.getStreams().size() < 2) {
                log.warn("Process {} will run on a single executor because its input {} contains less than 2 sub-streams", getId(), input.getId());
            }
            if (StreamsSparkContext.getInstance().isStreamingJob()) {
                this.executor = new StreamingExecution();
            } else {
                this.executor = new BatchExecution();
            }
            this.context = new DProcessContext(this);
            this.executor.init(this.context, input, getOutput());
        } else {
            // instanceof is false for null as well, so guard the id lookup to avoid
            // failing with a NullPointerException while merely building a log message.
            Object inputId = getInput() != null ? getInput().getId() : null;
            log.warn("Process {} can not be distributed because its input {} is not a MultiStream!", getId(), inputId);
        }
        if (this.executor != null) {
            this.executor.execute();
        } else {
            executeAsDefaultProcess();
        }
    }

    /**
     * Executes this process through {@link AbstractProcess#execute()}.
     *
     * <p>If the configured output is an {@link RddSink}, the output is temporarily replaced
     * by a {@link LinkedListSink} that collects the results; afterwards the collected items
     * are parallelized into an RDD and written to the original {@link RddSink}.
     *
     * <p>NOTE(review): the output of this process is left pointing at the temporary
     * {@link LinkedListSink} after this method returns - confirm whether it should be
     * restored to the original sink.
     *
     * @throws Exception if the underlying execution or writing to the sink fails
     */
    private void executeAsDefaultProcess() throws Exception {
        if (getOutput() == null || !(getOutput() instanceof RddSink)) {
            // Pass the process id so the "{}" placeholder is actually filled in.
            log.info("Process {} executing as default process..", getId());
            super.execute();
            return;
        }
        RddSink rddSink = (RddSink) getOutput();
        LinkedListSink linkedListSink = new LinkedListSink();
        log.info("Process {} executing as default process...", getId());
        setOutput(linkedListSink);
        super.execute();
        log.info("Distributing results of {}...", getId());
        JavaRDD<Data> parallelize = StreamsSparkContext.sc().parallelize(linkedListSink);
        log.info("Process {} forwarding result RDD to sink {}...", getId(), rddSink.getId());
        rddSink.write(parallelize);
    }

    /**
     * Returns the worker-id key in effect.
     *
     * @return the key reported by the {@link DProcessContext} once it exists, otherwise the
     *         key configured on this process
     */
    public String getWorkerIdKey() {
        return this.context != null ? this.context.getWorkerIdKey() : this.workerIdKey;
    }

    /**
     * Sets the worker-id key. The value is only stored while no {@link DProcessContext} has
     * been created; afterwards the call is ignored and a warning is logged.
     *
     * @param str the new worker-id key
     */
    public void setWorkerIdKey(String str) {
        if (this.context == null) {
            this.workerIdKey = str;
        } else {
            log.warn("Setting workerIdKey of {} to {} will not have an effect because the process is already initialized", getId(), str);
        }
    }
}
