package io.ray.streaming.runtime.worker;

import io.ray.streaming.runtime.config.StreamingWorkerConfig;
import io.ray.streaming.runtime.config.types.TransferChannelType;
import io.ray.streaming.runtime.core.graph.executiongraph.ExecutionVertex;
import io.ray.streaming.runtime.core.processor.OneInputProcessor;
import io.ray.streaming.runtime.core.processor.ProcessBuilder;
import io.ray.streaming.runtime.core.processor.SourceProcessor;
import io.ray.streaming.runtime.core.processor.StreamProcessor;
import io.ray.streaming.runtime.transfer.TransferHandler;
import io.ray.streaming.runtime.util.EnvUtil;
import io.ray.streaming.runtime.worker.context.JobWorkerContext;
import io.ray.streaming.runtime.worker.tasks.OneInputStreamTask;
import io.ray.streaming.runtime.worker.tasks.SourceStreamTask;
import io.ray.streaming.runtime.worker.tasks.StreamTask;
import java.io.Serializable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/ray/streaming/runtime/worker/JobWorker.class */
/**
 * Streaming job worker: holds the worker context, the execution vertex it runs, the derived
 * worker configuration and the {@link StreamTask} built for that vertex. It also forwards
 * reader/writer transfer messages to a {@link TransferHandler} when one has been created.
 *
 * <p>Native libraries are loaded once, when this class is initialized (see the static
 * initializer at the bottom of the class).
 */
public class JobWorker implements Serializable {
    private static final Logger LOG = LoggerFactory.getLogger(JobWorker.class);

    /** Four zero bytes returned by the synchronous message handlers while no transfer handler exists. */
    private static final byte[] NOT_READY_FLAG = new byte[4];

    private JobWorkerContext workerContext;
    private ExecutionVertex executionVertex;
    private StreamingWorkerConfig workerConfig;
    private StreamTask task;

    /** Only created in {@link #init} when the configured channel type is NATIVE_CHANNEL; otherwise null. */
    private TransferHandler transferHandler;

    /** Creates an uninitialized worker; {@link #init(JobWorkerContext)} must be called before use. */
    public JobWorker() {
        LOG.info("Creating job worker succeeded.");
    }

    /**
     * Initializes the worker from the given context: stores the context and its execution vertex,
     * builds the worker config, optionally creates the transfer handler and creates the stream task.
     *
     * @param jobWorkerContext context describing this worker
     * @return {@code true} when initialization succeeded; {@code false} when the stream task could
     *     not be created or any exception (including a null context) occurred
     */
    public Boolean init(JobWorkerContext jobWorkerContext) {
        try {
            // Kept inside the try so that a null context is reported as a failed init
            // instead of escaping as a NullPointerException.
            LOG.info("Initiating job worker: {}. Worker context is: {}.", jobWorkerContext.getWorkerName(), jobWorkerContext);
            this.workerContext = jobWorkerContext;
            this.executionVertex = jobWorkerContext.getExecutionVertex();
            this.workerConfig = new StreamingWorkerConfig(this.executionVertex.getWorkerConfig());
            if (TransferChannelType.NATIVE_CHANNEL == this.workerConfig.transferConfig.channelType()) {
                this.transferHandler = new TransferHandler();
            }
            this.task = createStreamTask();
            if (this.task == null) {
                return false;
            }
            LOG.info("Initiating job worker succeeded: {}.", jobWorkerContext.getWorkerName());
            return true;
        } catch (Exception e) {
            LOG.error("Failed to initiate job worker.", e);
            return false;
        }
    }

    /**
     * Starts the stream task created by {@link #init(JobWorkerContext)}.
     *
     * @return {@code true} when the task was started; {@code false} when the worker has no task
     *     (not initialized, or initialization failed) or starting the task threw an exception
     */
    public Boolean start() {
        if (this.task == null) {
            // Without this guard the failure path below would dereference a possibly-null
            // execution vertex while building its log message.
            LOG.error("Cannot start job worker: stream task has not been created.");
            return false;
        }
        try {
            this.task.start();
            return true;
        } catch (Exception e) {
            LOG.error("Start worker [{}] occur error.", this.executionVertex.getExecutionVertexName(), e);
            return false;
        }
    }

    /**
     * Builds the stream task matching the processor type of this worker's stream operator.
     *
     * @return a {@link SourceStreamTask} for a {@link SourceProcessor}, a {@link OneInputStreamTask}
     *     for a {@link OneInputProcessor}, or {@code null} when the processor type is unsupported
     *     or task construction failed
     */
    private StreamTask createStreamTask() {
        StreamProcessor processor = ProcessBuilder.buildProcessor(this.executionVertex.getStreamOperator());
        LOG.debug("Stream processor created: {}.", processor);
        try {
            StreamTask streamTask;
            if (processor instanceof SourceProcessor) {
                streamTask = new SourceStreamTask(getTaskId(), processor, this);
            } else if (processor instanceof OneInputProcessor) {
                streamTask = new OneInputStreamTask(getTaskId(), processor, this);
            } else {
                throw new RuntimeException("Unsupported processor type:" + processor);
            }
            LOG.info("Stream task created: {}.", streamTask);
            return streamTask;
        } catch (Exception e) {
            // This is a real failure (init() will return false), so report it at error level.
            LOG.error("Failed to create stream task.", e);
            return null;
        }
    }

    /** @return the id of the execution vertex this worker runs */
    public int getTaskId() {
        return this.executionVertex.getExecutionVertexId();
    }

    /** @return the worker configuration built during {@link #init}, or null before initialization */
    public StreamingWorkerConfig getWorkerConfig() {
        return this.workerConfig;
    }

    /** @return the context passed to {@link #init}, or null before initialization */
    public JobWorkerContext getWorkerContext() {
        return this.workerContext;
    }

    /** @return the execution vertex taken from the worker context, or null before initialization */
    public ExecutionVertex getExecutionVertex() {
        return this.executionVertex;
    }

    /** @return the stream task created during {@link #init}, or null if none was created */
    public StreamTask getTask() {
        return this.task;
    }

    /**
     * Forwards an asynchronous reader message to the transfer handler. The message is dropped
     * when no transfer handler exists, mirroring the null guard of {@link #onReaderMessageSync}.
     *
     * @param bArr raw message bytes
     */
    public void onReaderMessage(byte[] bArr) {
        TransferHandler handler = this.transferHandler;
        if (handler == null) {
            LOG.debug("Transfer handler is not ready, dropping reader message.");
            return;
        }
        handler.onReaderMessage(bArr);
    }

    /**
     * Forwards a synchronous reader message to the transfer handler.
     *
     * @param bArr raw message bytes
     * @return the handler's reply, or the not-ready flag (four zero bytes) when no handler exists
     */
    public byte[] onReaderMessageSync(byte[] bArr) {
        TransferHandler handler = this.transferHandler;
        if (handler == null) {
            return NOT_READY_FLAG;
        }
        return handler.onReaderMessageSync(bArr);
    }

    /**
     * Forwards an asynchronous writer message to the transfer handler. The message is dropped
     * when no transfer handler exists, mirroring the null guard of {@link #onWriterMessageSync}.
     *
     * @param bArr raw message bytes
     */
    public void onWriterMessage(byte[] bArr) {
        TransferHandler handler = this.transferHandler;
        if (handler == null) {
            LOG.debug("Transfer handler is not ready, dropping writer message.");
            return;
        }
        handler.onWriterMessage(bArr);
    }

    /**
     * Forwards a synchronous writer message to the transfer handler.
     *
     * @param bArr raw message bytes
     * @return the handler's reply, or the not-ready flag (four zero bytes) when no handler exists
     */
    public byte[] onWriterMessageSync(byte[] bArr) {
        TransferHandler handler = this.transferHandler;
        if (handler == null) {
            return NOT_READY_FLAG;
        }
        return handler.onWriterMessageSync(bArr);
    }

    // Load native libraries once, when the class is initialized.
    static {
        EnvUtil.loadNativeLibraries();
    }
}
