package io.ray.streaming.runtime.worker.tasks;

import com.google.common.base.MoreObjects;
import io.ray.streaming.runtime.core.processor.Processor;
import io.ray.streaming.runtime.serialization.CrossLangSerializer;
import io.ray.streaming.runtime.serialization.JavaSerializer;
import io.ray.streaming.runtime.serialization.Serializer;
import io.ray.streaming.runtime.transfer.DataMessage;
import io.ray.streaming.runtime.worker.JobWorker;

/* loaded from: input_file:io/ray/streaming/runtime/worker/tasks/InputStreamTask.class */
/**
 * Stream task that consumes input: it repeatedly reads messages from the inherited
 * {@code reader}, deserializes each message body and hands the result to the inherited
 * {@code processor}.
 *
 * <p>A message body is read as one leading type-id byte followed by the serialized payload.
 * A type id of {@code 1} selects the Java serializer; any other value selects the
 * cross-language serializer.
 *
 * <p>The read loop ({@link #run()}) and {@link #cancelTask()} coordinate through two
 * {@code volatile} flags, so they may be invoked from different threads.
 */
public abstract class InputStreamTask extends StreamTask {
    /** Type-id byte marking a payload that must be decoded with the Java serializer. */
    private static final byte JAVA_TYPE_ID = 1;

    /** Pause, in milliseconds, between checks of {@code stopped} while cancelling. */
    private static final long CANCEL_POLL_INTERVAL_MILLIS = 1L;

    /** Cleared by {@link #cancelTask()} to ask the read loop to exit. */
    private volatile boolean running;

    /** Set by {@link #run()} once the read loop has exited, normally or exceptionally. */
    private volatile boolean stopped;

    /** Timeout handed to every {@code reader.read(...)} call. */
    private final long readTimeoutMillis;
    private final Serializer javaSerializer;
    private final Serializer crossLangSerializer;

    /**
     * Creates the task and captures the reader timeout from the worker's transfer config.
     *
     * @param i forwarded to the superclass constructor (presumably the task id — confirm
     *     against {@code StreamTask})
     * @param processor forwarded to the superclass constructor
     * @param jobWorker worker whose {@code transferConfig.readerTimerIntervalMs()} supplies
     *     the read timeout; also forwarded to the superclass constructor
     */
    public InputStreamTask(int i, Processor processor, JobWorker jobWorker) {
        super(i, processor, jobWorker);
        this.running = true;
        this.stopped = false;
        this.readTimeoutMillis = jobWorker.getWorkerConfig().transferConfig.readerTimerIntervalMs();
        this.javaSerializer = new JavaSerializer();
        this.crossLangSerializer = new CrossLangSerializer();
    }

    /** No initialization is performed by this class. */
    @Override
    protected void init() {
    }

    /**
     * Reads, deserializes and processes messages until {@link #cancelTask()} clears the
     * {@code running} flag.
     *
     * <p>{@code stopped} is set in a {@code finally} block so that a thread waiting in
     * {@link #cancelTask()} is released even when reading, deserializing or processing
     * throws; the exception itself still propagates to the caller of {@code run()}.
     */
    @Override
    public void run() {
        try {
            while (this.running) {
                DataMessage message = this.reader.read(this.readTimeoutMillis);
                if (message == null) {
                    // Nothing was returned for this read; re-check the running flag.
                    continue;
                }
                // The payload is everything after the leading type-id byte. The array is
                // sized before the type id is consumed, hence the "- 1".
                byte[] payload = new byte[message.body().remaining() - 1];
                byte typeId = message.body().get();
                message.body().get(payload);
                Object value = typeId == JAVA_TYPE_ID
                        ? this.javaSerializer.deserialize(payload)
                        : this.crossLangSerializer.deserialize(payload);
                this.processor.process(value);
            }
        } finally {
            this.stopped = true;
        }
    }

    /**
     * Asks the read loop to exit and blocks until {@link #run()} has finished.
     *
     * <p>The wait sleeps briefly between checks instead of spinning, so it does not occupy
     * a CPU core while the loop completes its current read.
     *
     * @throws Exception an {@link InterruptedException} if the calling thread is
     *     interrupted while waiting
     */
    @Override
    protected void cancelTask() throws Exception {
        this.running = false;
        while (!this.stopped) {
            Thread.sleep(CANCEL_POLL_INTERVAL_MILLIS);
        }
    }

    /** Returns a string containing this task's {@code taskId} and {@code processor}. */
    @Override
    public String toString() {
        return MoreObjects.toStringHelper(this).add("taskId", this.taskId).add("processor", this.processor).toString();
    }
}
