package io.ray.streaming.runtime.worker.tasks;

import io.ray.api.Ray;
import io.ray.streaming.api.collector.Collector;
import io.ray.streaming.api.partition.Partition;
import io.ray.streaming.runtime.config.worker.WorkerInternalConfig;
import io.ray.streaming.runtime.core.collector.OutputCollector;
import io.ray.streaming.runtime.core.graph.executiongraph.ExecutionEdge;
import io.ray.streaming.runtime.core.graph.executiongraph.ExecutionVertex;
import io.ray.streaming.runtime.core.processor.Processor;
import io.ray.streaming.runtime.transfer.ChannelId;
import io.ray.streaming.runtime.transfer.DataReader;
import io.ray.streaming.runtime.transfer.DataWriter;
import io.ray.streaming.runtime.worker.JobWorker;
import io.ray.streaming.runtime.worker.context.StreamingRuntimeContext;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/ray/streaming/runtime/worker/tasks/StreamTask.class */
/**
 * Base class for a streaming task that runs on its own daemon thread.
 *
 * <p>On construction the task wires one {@link OutputCollector} per downstream operator (all sharing
 * a single {@link DataWriter}), creates a {@link DataReader} over the input channels when the vertex
 * has input edges, and opens the {@link Processor}. The thread is created in the constructor but only
 * started by {@link #start()}.
 */
public abstract class StreamTask implements Runnable {
    private static final Logger LOG = LoggerFactory.getLogger(StreamTask.class);

    protected int taskId;
    protected Processor processor;
    protected JobWorker jobWorker;

    /** Reader over the input channels; stays {@code null} when the vertex has no input edges. */
    protected DataReader reader;

    /** One collector per distinct downstream operator name; handed to {@link Processor#open}. */
    List<Collector> collectors = new ArrayList<>();

    /** Set to {@code false} by {@link #close()}. */
    protected volatile boolean running = true;

    /** Not modified in this class; presumably maintained by subclasses (verify against them). */
    protected volatile boolean stopped = false;

    private final Thread thread;

    /**
     * Prepares the task (channels, collectors, processor) and creates its daemon thread without
     * starting it.
     *
     * @param i the id of this task, used when generating channel ids
     * @param processor the processor opened with this task's collectors
     * @param jobWorker the worker that owns this task and supplies the execution vertex and config
     */
    public StreamTask(int i, Processor processor, JobWorker jobWorker) {
        this.taskId = i;
        this.processor = processor;
        this.jobWorker = jobWorker;
        prepareTask();
        this.thread = new Thread(Ray.wrapRunnable(this), getClass().getName() + "-" + System.currentTimeMillis());
        this.thread.setDaemon(true);
    }

    /**
     * Tags the worker config with the vertex names, wires outputs and inputs, then opens the
     * processor. The order matters: the processor must be opened after the collectors exist.
     */
    private void prepareTask() {
        LOG.debug("Preparing stream task.");
        ExecutionVertex executionVertex = this.jobWorker.getExecutionVertex();
        this.jobWorker.getWorkerConfig().workerInternalConfig.setProperty(
                WorkerInternalConfig.WORKER_NAME_INTERNAL, executionVertex.getExecutionVertexName());
        this.jobWorker.getWorkerConfig().workerInternalConfig.setProperty(
                WorkerInternalConfig.OP_NAME_INTERNAL, executionVertex.getExecutionJobVertexName());

        prepareOutputCollectors(executionVertex);
        prepareInputReader(executionVertex);

        this.processor.open(
                this.collectors,
                new StreamingRuntimeContext(
                        executionVertex,
                        this.jobWorker.getWorkerConfig().configMap,
                        executionVertex.getParallelism()));
        LOG.debug("Finished preparing stream task.");
    }

    /**
     * Creates the shared writer over all output channels and one collector per downstream operator.
     * Does nothing when the vertex has no output edges.
     */
    // The actor handle type is not imported in this file, so the actor lists are deliberately kept
    // raw; they are only forwarded to DataWriter / OutputCollector unchanged.
    @SuppressWarnings({"rawtypes", "unchecked"})
    private void prepareOutputCollectors(ExecutionVertex executionVertex) {
        List<ExecutionEdge> outputEdges = executionVertex.getOutputEdges();
        List<String> channelIds = new ArrayList<>();
        List targetActors = new ArrayList();
        for (ExecutionEdge edge : outputEdges) {
            channelIds.add(ChannelId.genIdStr(
                    this.taskId,
                    edge.getTargetExecutionVertex().getExecutionVertexId(),
                    executionVertex.getBuildTime()));
            targetActors.add(edge.getTargetExecutionVertex().getWorkerActor());
        }
        if (targetActors.isEmpty()) {
            return;
        }

        DataWriter writer = new DataWriter(channelIds, targetActors, this.jobWorker.getWorkerConfig());

        // Group channels and actors by the name of the downstream operator they feed.
        Map<String, List<String>> channelIdsByOperator = new HashMap<>();
        Map<String, List> actorsByOperator = new HashMap<>();
        Map<String, Partition> partitionByOperator = new HashMap<>();
        for (int i = 0; i < outputEdges.size(); i++) {
            ExecutionEdge edge = outputEdges.get(i);
            String operatorName = edge.getTargetExecutionJobVertexName();
            channelIdsByOperator.computeIfAbsent(operatorName, key -> new ArrayList<>()).add(channelIds.get(i));
            actorsByOperator.computeIfAbsent(operatorName, key -> new ArrayList()).add(targetActors.get(i));
            // When several edges target the same operator, the partition of the last edge wins.
            partitionByOperator.put(operatorName, edge.getPartition());
        }

        for (Map.Entry<String, Partition> entry : partitionByOperator.entrySet()) {
            String operatorName = entry.getKey();
            this.collectors.add(new OutputCollector(
                    writer,
                    channelIdsByOperator.get(operatorName),
                    actorsByOperator.get(operatorName),
                    entry.getValue()));
        }
    }

    /**
     * Creates the reader over all input channels. Leaves {@link #reader} untouched when the vertex
     * has no input edges.
     */
    // Actor list kept raw for the same reason as in prepareOutputCollectors.
    @SuppressWarnings({"rawtypes", "unchecked"})
    private void prepareInputReader(ExecutionVertex executionVertex) {
        List<String> channelIds = new ArrayList<>();
        List sourceActors = new ArrayList();
        for (ExecutionEdge edge : executionVertex.getInputEdges()) {
            channelIds.add(ChannelId.genIdStr(
                    edge.getSourceExecutionVertex().getExecutionVertexId(),
                    this.taskId,
                    executionVertex.getBuildTime()));
            sourceActors.add(edge.getSourceExecutionVertex().getWorkerActor());
        }
        if (!sourceActors.isEmpty()) {
            LOG.info("Register queue consumer, channels {}.", channelIds);
            this.reader = new DataReader(channelIds, sourceActors, this.jobWorker.getWorkerConfig());
        }
    }

    /**
     * Subclass initialization hook. It is not invoked anywhere in this class.
     *
     * @throws Exception if initialization fails
     */
    protected abstract void init() throws Exception;

    /**
     * Subclass cancellation hook. It is not invoked anywhere in this class.
     *
     * @throws Exception if cancellation fails
     */
    protected abstract void cancelTask() throws Exception;

    /** Starts the task thread created in the constructor. */
    public void start() {
        LOG.info("Start stream task: {}-{}", getClass().getSimpleName(), this.taskId);
        this.thread.start();
    }

    /**
     * Clears the {@link #running} flag and, when the task thread is still alive and Ray is not
     * running in single-process mode, forcibly terminates the JVM with status 0.
     */
    public void close() {
        this.running = false;
        if (this.thread.isAlive() && !Ray.getRuntimeContext().isSingleProcess()) {
            // Log BEFORE halting: Runtime.halt never returns normally, so a statement placed after
            // it would never run. The message is best-effort, as halt does not run shutdown hooks.
            LOG.warn("runtime halt 0");
            Runtime.getRuntime().halt(0);
        }
        LOG.info("Stream task close success.");
    }
}
