package stream.runtime;

import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import org.apache.spark.storage.StorageLevel;
import org.apache.spark.streaming.receiver.Receiver;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import stream.Data;
import stream.Process;
import stream.io.ReceiverSink;
import stream.io.Stream;
import stream.io.multi.LazySeqMultiStream;
import stream.io.multi.MultiStream;
import stream.runtime.setup.factory.ObjectFactory;
import stream.runtime.setup.factory.ProcessorFactory;
import stream.runtime.setup.handler.DProcessElementHandler;
import stream.util.Variables;

/* loaded from: input_file:stream/runtime/StreamingReceiver.class */
public class StreamingReceiver extends Receiver<Data> {
    private static transient Logger log = LoggerFactory.getLogger(StreamingReceiver.class);
    private static final long serialVersionUID = -3536119501663636816L;
    private DProcessContext context;
    private List<String> substreamIds;

    public StreamingReceiver(DProcessContext dProcessContext) {
        this(dProcessContext, null);
    }

    public StreamingReceiver(DProcessContext dProcessContext, List<String> list) {
        super(StorageLevel.MEMORY_AND_DISK());
        this.context = dProcessContext;
        this.substreamIds = list;
    }

    /* JADX WARN: Type inference failed for: r0v0, types: [stream.runtime.StreamingReceiver$1] */
    public void onStart() {
        new Thread() { // from class: stream.runtime.StreamingReceiver.1
            @Override // java.lang.Thread, java.lang.Runnable
            public void run() {
                StreamingReceiver.this.receive();
            }
        }.start();
    }

    public void onStop() {
    }

    /* JADX INFO: Access modifiers changed from: private */
    public void receive() {
        try {
            Variables loadUserProperties = StreamRuntime.loadUserProperties();
            HashMap hashMap = new HashMap();
            ObjectFactory newInstance = ObjectFactory.newInstance();
            hashMap.put(DProcessElementHandler.ELEMENT_TAG, new DProcessElementHandler(newInstance, new ProcessorFactory(newInstance), true));
            ProcessContainer processContainer = new ProcessContainer(this.context.getXmlDoc(), hashMap, loadUserProperties);
            Process process = null;
            Iterator it = processContainer.getProcesses().iterator();
            while (true) {
                if (!it.hasNext()) {
                    break;
                }
                Process process2 = (Process) it.next();
                if (((String) process2.getProperties().get("id")).equals(this.context.getProcessId())) {
                    process = process2;
                    break;
                }
            }
            if (process == null) {
                stop("Could not find input of process with ID '" + this.context.getProcessId() + "'!");
            }
            processContainer.getProcesses().clear();
            processContainer.getProcesses().add(process);
            if (this.substreamIds != null && !(process.getInput() instanceof MultiStream)) {
                log.error("Process source with ID {} is no instance of MultiStream! Proceeding without limiting this source to substream IDs.", this.context.getProcessId());
                this.substreamIds = null;
            }
            LazySeqMultiStream lazySeqMultiStream = new LazySeqMultiStream() { // from class: stream.runtime.StreamingReceiver.2
                @Override // stream.io.multi.LazySeqMultiStream
                public Data readNext() throws Exception {
                    Data readNext = super.readNext();
                    if (readNext != null) {
                        readNext.put(StreamingReceiver.this.context.getWorkerIdKey(), Integer.valueOf(StreamingReceiver.this.streamId()));
                    }
                    return readNext;
                }
            };
            if (this.substreamIds != null) {
                for (String str : this.substreamIds) {
                    lazySeqMultiStream.addStream(str, (Stream) process.getInput().getStreams().get(str));
                }
            } else {
                lazySeqMultiStream.addStream(process.getInput().getId(), (Stream) process.getInput());
            }
            process.setInput(lazySeqMultiStream);
            processContainer.streams.clear();
            processContainer.streams.put("Receiver" + streamId(), lazySeqMultiStream);
            log.info("Receiver {} STARTING to receive from {} substream(s).", Integer.valueOf(streamId()), Integer.valueOf(lazySeqMultiStream.getStreams().size()));
            process.setOutput(new ReceiverSink(streamId(), this));
            processContainer.execute();
            log.info("FINISHED receiver {}", Integer.valueOf(streamId()));
        } catch (Exception e) {
            stop("Error reading input data", e);
        }
    }
}
