package io.ray.streaming.runtime.worker;

import io.ray.api.Ray;
import io.ray.streaming.runtime.config.StreamingWorkerConfig;
import io.ray.streaming.runtime.config.types.TransferChannelType;
import io.ray.streaming.runtime.context.ContextBackend;
import io.ray.streaming.runtime.context.ContextBackendFactory;
import io.ray.streaming.runtime.core.graph.executiongraph.ExecutionVertex;
import io.ray.streaming.runtime.core.processor.OneInputProcessor;
import io.ray.streaming.runtime.core.processor.ProcessBuilder;
import io.ray.streaming.runtime.core.processor.SourceProcessor;
import io.ray.streaming.runtime.core.processor.StreamProcessor;
import io.ray.streaming.runtime.master.coordinator.command.WorkerRollbackRequest;
import io.ray.streaming.runtime.message.CallResult;
import io.ray.streaming.runtime.rpc.RemoteCallMaster;
import io.ray.streaming.runtime.transfer.TransferHandler;
import io.ray.streaming.runtime.transfer.channel.ChannelRecoverInfo;
import io.ray.streaming.runtime.util.CheckpointStateUtil;
import io.ray.streaming.runtime.util.EnvUtil;
import io.ray.streaming.runtime.util.Serializer;
import io.ray.streaming.runtime.worker.context.JobWorkerContext;
import io.ray.streaming.runtime.worker.tasks.OneInputStreamTask;
import io.ray.streaming.runtime.worker.tasks.SourceStreamTask;
import io.ray.streaming.runtime.worker.tasks.StreamTask;
import java.io.Serializable;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.commons.lang3.exception.ExceptionUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/ray/streaming/runtime/worker/JobWorker.class */
public class JobWorker implements Serializable {
    private static final Logger LOG = LoggerFactory.getLogger(JobWorker.class);
    private static final byte[] NOT_READY_FLAG = new byte[4];
    public ContextBackend contextBackend;
    private JobWorkerContext workerContext;
    private ExecutionVertex executionVertex;
    private StreamingWorkerConfig workerConfig;
    private StreamTask task;
    private TransferHandler transferHandler;
    public final Object initialStateChangeLock = new Object();
    public AtomicBoolean isRecreate = new AtomicBoolean(false);
    private boolean isNeedRollback = false;
    private int rollbackCount = 0;

    public JobWorker(ExecutionVertex executionVertex) {
        LOG.info("Creating job worker.");
        this.executionVertex = executionVertex;
        this.workerConfig = new StreamingWorkerConfig(executionVertex.getWorkerConfig());
        this.contextBackend = ContextBackendFactory.getContextBackend(this.workerConfig);
        LOG.info("Ray.getRuntimeContext().wasCurrentActorRestarted()={}", Boolean.valueOf(Ray.getRuntimeContext().wasCurrentActorRestarted()));
        if (!Ray.getRuntimeContext().wasCurrentActorRestarted()) {
            saveContext();
            LOG.info("Job worker is fresh started, init success.");
            return;
        }
        LOG.info("Begin load job worker checkpoint state.");
        byte[] bArr = CheckpointStateUtil.get(this.contextBackend, getJobWorkerContextKey());
        if (bArr == null) {
            LOG.error("Worker is reconstructed, but can't load checkpoint. Check whether you checkpoint state is reliable. Current checkpoint state is {}.", this.contextBackend.getClass().getName());
            return;
        }
        JobWorkerContext jobWorkerContext = (JobWorkerContext) Serializer.decode(bArr);
        LOG.info("Worker recover from checkpoint state, byte len={}, context={}.", Integer.valueOf(bArr.length), jobWorkerContext);
        init(jobWorkerContext);
        requestRollback("LoadCheckpoint request rollback in new actor.");
    }

    public synchronized void saveContext() {
        byte[] encode = Serializer.encode(this.workerContext);
        String jobWorkerContextKey = getJobWorkerContextKey();
        LOG.info("Saving context, worker context={}, serialized byte length={}, key={}.", new Object[]{this.workerContext, Integer.valueOf(encode.length), jobWorkerContextKey});
        CheckpointStateUtil.put(this.contextBackend, jobWorkerContextKey, encode);
    }

    public Boolean init(JobWorkerContext jobWorkerContext) {
        LOG.info("Initiating job worker: {}. Worker context is: {}, pid={}.", new Object[]{jobWorkerContext.getWorkerName(), jobWorkerContext, EnvUtil.getJvmPid()});
        this.workerContext = jobWorkerContext;
        this.executionVertex = jobWorkerContext.getExecutionVertex();
        this.workerConfig = new StreamingWorkerConfig(this.executionVertex.getWorkerConfig());
        this.contextBackend = ContextBackendFactory.getContextBackend(this.workerConfig);
        LOG.info("Initiating job worker succeeded: {}.", jobWorkerContext.getWorkerName());
        saveContext();
        return true;
    }

    public CallResult<ChannelRecoverInfo> rollback(Long l, Long l2) {
        synchronized (this.initialStateChangeLock) {
            if (this.task != null && this.task.isAlive() && l.longValue() == this.task.lastCheckpointId && this.task.isInitialState) {
                return CallResult.skipped("Task is already in initial state, skip this rollback.");
            }
            LOG.info("Start rollback[{}], checkpoint is {}, remote call cost {}ms.", new Object[]{this.executionVertex.getExecutionJobVertexName(), l, Long.valueOf(System.currentTimeMillis() - l2.longValue())});
            this.rollbackCount++;
            if (this.rollbackCount > 1) {
                this.isRecreate.set(true);
            }
            try {
                if (TransferChannelType.NATIVE_CHANNEL == this.workerConfig.transferConfig.channelType()) {
                    this.transferHandler = new TransferHandler();
                }
                if (this.task != null) {
                    this.task.close();
                    this.task = null;
                }
                this.task = createStreamTask(l.longValue());
                ChannelRecoverInfo recover = this.task.recover(this.isRecreate.get());
                this.isNeedRollback = false;
                LOG.info("Rollback job worker success, checkpoint is {}, channelRecoverInfo is {}.", l, recover);
                return CallResult.success(recover);
            } catch (Exception e) {
                LOG.error("Rollback job worker has exception.", e);
                return CallResult.fail(ExceptionUtils.getStackTrace(e));
            }
        }
    }

    private StreamTask createStreamTask(long j) {
        StreamTask oneInputStreamTask;
        StreamProcessor buildProcessor = ProcessBuilder.buildProcessor(this.executionVertex.getStreamOperator());
        LOG.debug("Stream processor created: {}.", buildProcessor);
        if (buildProcessor instanceof SourceProcessor) {
            oneInputStreamTask = new SourceStreamTask(buildProcessor, this, j);
        } else {
            if (!(buildProcessor instanceof OneInputProcessor)) {
                throw new RuntimeException("Unsupported processor type:" + buildProcessor);
            }
            oneInputStreamTask = new OneInputStreamTask(buildProcessor, this, j);
        }
        LOG.info("Stream task created: {}.", oneInputStreamTask);
        return oneInputStreamTask;
    }

    public Boolean triggerCheckpoint(Long l) {
        LOG.info("Receive trigger, barrierId is {}.", l);
        if (this.task != null) {
            return Boolean.valueOf(this.task.triggerCheckpoint(l));
        }
        return false;
    }

    public Boolean notifyCheckpointTimeout(Long l) {
        LOG.info("Notify checkpoint timeout, checkpoint id is {}.", l);
        if (this.task != null) {
            this.task.notifyCheckpointTimeout(l.longValue());
        }
        return true;
    }

    public Boolean clearExpiredCheckpoint(Long l, Long l2) {
        LOG.info("Clear expired checkpoint state, checkpoint id is {}; Clear expired queue msg, checkpoint id is {}", l, l2);
        if (this.task != null) {
            if (l.longValue() > 0) {
                this.task.clearExpiredCpState(l.longValue());
            }
            this.task.clearExpiredQueueMsg(l2.longValue());
        }
        return true;
    }

    public void requestRollback(String str) {
        LOG.info("Request rollback.");
        this.isNeedRollback = true;
        this.isRecreate.set(true);
        if (RemoteCallMaster.requestJobWorkerRollback(this.workerContext.getMaster(), new WorkerRollbackRequest(this.workerContext.getWorkerActorId(), str, EnvUtil.getHostName(), EnvUtil.getJvmPid())).booleanValue()) {
            return;
        }
        LOG.warn("Job worker request rollback failed! exceptionMsg={}.", str);
    }

    public Boolean checkIfNeedRollback(Long l) {
        LOG.info("Finished checking if need to rollback with result: {}, rpc delay={}ms.", Boolean.valueOf(this.isNeedRollback), Long.valueOf(System.currentTimeMillis() - l.longValue()));
        return Boolean.valueOf(this.isNeedRollback);
    }

    public StreamingWorkerConfig getWorkerConfig() {
        return this.workerConfig;
    }

    public JobWorkerContext getWorkerContext() {
        return this.workerContext;
    }

    public ExecutionVertex getExecutionVertex() {
        return this.executionVertex;
    }

    public StreamTask getTask() {
        return this.task;
    }

    private String getJobWorkerContextKey() {
        return this.workerConfig.checkpointConfig.jobWorkerContextCpPrefixKey() + this.workerConfig.commonConfig.jobName() + "_" + this.executionVertex.getExecutionVertexId();
    }

    public void onReaderMessage(byte[] bArr) {
        if (this.transferHandler != null) {
            this.transferHandler.onReaderMessage(bArr);
        }
    }

    public byte[] onReaderMessageSync(byte[] bArr) {
        return this.transferHandler == null ? NOT_READY_FLAG : this.transferHandler.onReaderMessageSync(bArr);
    }

    public void onWriterMessage(byte[] bArr) {
        if (this.transferHandler != null) {
            this.transferHandler.onWriterMessage(bArr);
        }
    }

    public byte[] onWriterMessageSync(byte[] bArr) {
        return this.transferHandler == null ? NOT_READY_FLAG : this.transferHandler.onWriterMessageSync(bArr);
    }

    static {
        EnvUtil.loadNativeLibraries();
    }
}
