package io.ray.streaming.runtime.worker.tasks;

import com.google.common.base.MoreObjects;
import io.ray.streaming.runtime.core.processor.Processor;
import io.ray.streaming.runtime.generated.RemoteCall;
import io.ray.streaming.runtime.serialization.CrossLangSerializer;
import io.ray.streaming.runtime.serialization.JavaSerializer;
import io.ray.streaming.runtime.serialization.Serializer;
import io.ray.streaming.runtime.transfer.exception.ChannelInterruptException;
import io.ray.streaming.runtime.transfer.message.BarrierMessage;
import io.ray.streaming.runtime.transfer.message.ChannelMessage;
import io.ray.streaming.runtime.transfer.message.DataMessage;
import io.ray.streaming.runtime.worker.JobWorker;
import org.apache.commons.lang3.exception.ExceptionUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/ray/streaming/runtime/worker/tasks/InputStreamTask.class */
public abstract class InputStreamTask extends StreamTask {
    private static final Logger LOG = LoggerFactory.getLogger(InputStreamTask.class);
    private final Serializer javaSerializer;
    private final Serializer crossLangSerializer;
    private final long readTimeoutMillis;

    public InputStreamTask(Processor processor, JobWorker jobWorker, long j) {
        super(processor, jobWorker, j);
        this.readTimeoutMillis = jobWorker.getWorkerConfig().transferConfig.readerTimerIntervalMs();
        this.javaSerializer = new JavaSerializer();
        this.crossLangSerializer = new CrossLangSerializer();
    }

    @Override // io.ray.streaming.runtime.worker.tasks.StreamTask
    protected void init() {
    }

    /* JADX WARN: Multi-variable type inference failed */
    /* JADX WARN: Type inference failed for: r0v14, types: [java.lang.Object] */
    /* JADX WARN: Type inference failed for: r0v15, types: [java.lang.Throwable] */
    /* JADX WARN: Type inference failed for: r0v19 */
    @Override // java.lang.Runnable
    public void run() {
        while (this.running) {
            try {
                ?? r0 = this.jobWorker.initialStateChangeLock;
                synchronized (r0) {
                    ChannelMessage read = this.reader.read(this.readTimeoutMillis);
                    r0 = read;
                    if (r0 != 0) {
                        this.isInitialState = false;
                        if (read instanceof DataMessage) {
                            DataMessage dataMessage = (DataMessage) read;
                            byte[] bArr = new byte[dataMessage.body().remaining() - 1];
                            byte b = dataMessage.body().get();
                            dataMessage.body().get(bArr);
                            this.processor.process(b == 1 ? this.javaSerializer.deserialize(bArr) : this.crossLangSerializer.deserialize(bArr));
                        } else if (read instanceof BarrierMessage) {
                            BarrierMessage barrierMessage = (BarrierMessage) read;
                            byte[] bArr2 = new byte[barrierMessage.getData().remaining()];
                            barrierMessage.getData().get(bArr2);
                            long id = RemoteCall.Barrier.parseFrom(bArr2).getId();
                            LOG.info("Start to do checkpoint {}, worker name is {}.", Long.valueOf(id), this.jobWorker.getWorkerContext().getWorkerName());
                            doCheckpoint(id, barrierMessage.getInputOffsets());
                            LOG.info("Do checkpoint {} success.", Long.valueOf(id));
                        }
                    }
                }
            } catch (Throwable th) {
                if ((th instanceof ChannelInterruptException) || (ExceptionUtils.getRootCause(th) instanceof ChannelInterruptException)) {
                    LOG.info("queue has stopped.");
                } else {
                    LOG.error("Last success checkpointId={}, now occur error.", Long.valueOf(this.lastCheckpointId), th);
                    requestRollback(ExceptionUtils.getStackTrace(th));
                }
            }
        }
        LOG.info("Input stream task thread exit.");
        this.stopped = true;
    }

    public String toString() {
        return MoreObjects.toStringHelper(this).add("processor", this.processor).toString();
    }
}
