package stream.runtime;

import java.util.LinkedList;
import java.util.List;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.storage.StorageLevel;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import stream.Data;
import stream.io.RddSink;
import stream.io.Sink;
import stream.io.multi.MultiStream;
import streams.spark.StreamsSparkContext;

/* loaded from: input_file:stream/runtime/BatchExecution.class */
public class BatchExecution implements DProcessExecution {
    private static final transient Logger log = LoggerFactory.getLogger(BatchExecution.class);
    private DProcessContext context;
    private transient JavaRDD<String> idRdd;
    private transient Sink output;

    @Override // stream.runtime.DProcessExecution
    public void init(DProcessContext dProcessContext, MultiStream multiStream, Sink sink) throws Exception {
        log.info("Initializing {} for {} workers...", dProcessContext.getProcessId(), Integer.valueOf(multiStream.getStreams().size()));
        this.context = dProcessContext;
        LinkedList linkedList = new LinkedList(multiStream.getStreams().keySet());
        this.idRdd = StreamsSparkContext.sc().parallelize(linkedList, linkedList.size());
        this.output = sink;
    }

    @Override // stream.runtime.DProcessExecution
    public void execute() throws Exception {
        log.info("\n\n==== BATCH executor of {} STARTED ====\n", this.context.getProcessId());
        JavaRDD<Data> persist = this.idRdd.flatMap(new BatchTask(this.context)).persist(StorageLevel.MEMORY_AND_DISK_SER());
        this.idRdd.unpersist();
        if (this.output == null || !(this.output instanceof RddSink)) {
            log.info("{} collecting results...", this.context.getProcessId());
            List collect = persist.collect();
            if (this.output != null) {
                log.info("{} writing {} collected data items to sink {}...", new Object[]{this.context.getProcessId(), Integer.valueOf(collect.size()), this.output.getId()});
                this.output.write(collect);
            } else {
                log.info("{} collected {} data items not writing to any sink.", this.context.getProcessId(), Integer.valueOf(collect.size()));
            }
        } else {
            log.info("{} forwarding result RDD to sink {}...", this.context.getProcessId(), this.output.getId());
            ((RddSink) this.output).write(persist);
        }
        log.info("\n\n==== executor of {} FINISHED ====\n", this.context.getProcessId());
    }
}
