package io.ray.streaming.runtime.worker.tasks;

import io.ray.api.Ray;
import io.ray.streaming.api.collector.Collector;
import io.ray.streaming.api.partition.Partition;
import io.ray.streaming.runtime.config.worker.WorkerInternalConfig;
import io.ray.streaming.runtime.context.ContextBackend;
import io.ray.streaming.runtime.context.OperatorCheckpointInfo;
import io.ray.streaming.runtime.core.collector.OutputCollector;
import io.ray.streaming.runtime.core.graph.executiongraph.ExecutionEdge;
import io.ray.streaming.runtime.core.graph.executiongraph.ExecutionVertex;
import io.ray.streaming.runtime.core.processor.Processor;
import io.ray.streaming.runtime.generated.RemoteCall;
import io.ray.streaming.runtime.master.coordinator.command.WorkerCommitReport;
import io.ray.streaming.runtime.rpc.RemoteCallMaster;
import io.ray.streaming.runtime.transfer.DataReader;
import io.ray.streaming.runtime.transfer.DataWriter;
import io.ray.streaming.runtime.transfer.channel.ChannelRecoverInfo;
import io.ray.streaming.runtime.transfer.channel.OffsetInfo;
import io.ray.streaming.runtime.util.CheckpointStateUtil;
import io.ray.streaming.runtime.util.Serializer;
import io.ray.streaming.runtime.worker.JobWorker;
import io.ray.streaming.runtime.worker.context.JobWorkerContext;
import io.ray.streaming.runtime.worker.context.StreamingRuntimeContext;
import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/ray/streaming/runtime/worker/tasks/StreamTask.class */
public abstract class StreamTask implements Runnable {
    private static final Logger LOG = LoggerFactory.getLogger(StreamTask.class);
    private final ContextBackend checkpointState;
    public long lastCheckpointId;
    protected Processor processor;
    protected JobWorker jobWorker;
    protected DataReader reader;
    protected DataWriter writer;
    public volatile boolean isInitialState = true;
    protected volatile boolean running = true;
    protected volatile boolean stopped = false;
    List<Collector> collectors = new ArrayList();
    private Set<Long> outdatedCheckpoints = new HashSet();
    private Thread thread = new Thread(Ray.wrapRunnable(this), String.valueOf(getClass().getName()) + "-" + System.currentTimeMillis());

    /* JADX INFO: Access modifiers changed from: protected */
    public StreamTask(Processor processor, JobWorker jobWorker, long j) {
        this.processor = processor;
        this.jobWorker = jobWorker;
        this.checkpointState = jobWorker.contextBackend;
        this.lastCheckpointId = j;
        this.thread.setDaemon(true);
    }

    public ChannelRecoverInfo recover(boolean z) {
        if (z) {
            LOG.info("Stream task begin recover.");
        } else {
            LOG.info("Stream task first start begin.");
        }
        prepareTask(z);
        ChannelRecoverInfo channelRecoverInfo = new ChannelRecoverInfo(new HashMap());
        if (this.reader != null) {
            channelRecoverInfo = this.reader.getQueueRecoverInfo();
        }
        this.thread.setUncaughtExceptionHandler((thread, th) -> {
            LOG.error("Uncaught exception in runner thread.", th);
        });
        LOG.info("Start stream task: {}.", getClass().getSimpleName());
        this.thread.start();
        if (z) {
            LOG.info("Stream task recover end.");
        } else {
            LOG.info("Stream task first start finished.");
        }
        return channelRecoverInfo;
    }

    private void prepareTask(boolean z) {
        LOG.info("Preparing stream task, isRecreate={}.", Boolean.valueOf(z));
        ExecutionVertex executionVertex = this.jobWorker.getExecutionVertex();
        this.jobWorker.getWorkerConfig().workerInternalConfig.setProperty(WorkerInternalConfig.WORKER_NAME_INTERNAL, executionVertex.getExecutionVertexName());
        this.jobWorker.getWorkerConfig().workerInternalConfig.setProperty(WorkerInternalConfig.OP_NAME_INTERNAL, executionVertex.getExecutionJobVertexName());
        OperatorCheckpointInfo operatorCheckpointInfo = new OperatorCheckpointInfo();
        byte[] bArr = null;
        if (z) {
            String genOpCheckpointKey = genOpCheckpointKey(this.lastCheckpointId);
            LOG.info("Getting task checkpoints from state, cpKey={}, checkpointId={}.", genOpCheckpointKey, Long.valueOf(this.lastCheckpointId));
            bArr = CheckpointStateUtil.get(this.checkpointState, genOpCheckpointKey);
            if (bArr == null) {
                throw new RuntimeException(String.format("Task recover failed, checkpoint is null! cpKey=%s", genOpCheckpointKey));
            }
        }
        if (bArr != null) {
            operatorCheckpointInfo = (OperatorCheckpointInfo) Serializer.decode(bArr);
            this.processor.loadCheckpoint(operatorCheckpointInfo.processorCheckpoint);
            LOG.info("Stream task recover from checkpoint state, checkpoint bytes len={}, checkpointInfo={}.", Integer.valueOf(bArr.length), operatorCheckpointInfo);
        }
        if (!executionVertex.getOutputEdges().isEmpty()) {
            LOG.info("Register queue writer, channels={}, outputCheckpoints={}.", executionVertex.getOutputChannelIdList(), operatorCheckpointInfo.outputPoints);
            this.writer = new DataWriter(executionVertex.getOutputChannelIdList(), executionVertex.getOutputActorList(), operatorCheckpointInfo.outputPoints, this.jobWorker.getWorkerConfig());
        }
        if (!executionVertex.getInputEdges().isEmpty()) {
            LOG.info("Register queue reader, channels={}, inputCheckpoints={}.", executionVertex.getInputChannelIdList(), operatorCheckpointInfo.inputPoints);
            this.reader = new DataReader(executionVertex.getInputChannelIdList(), executionVertex.getInputActorList(), operatorCheckpointInfo.inputPoints, this.jobWorker.getWorkerConfig());
        }
        openProcessor();
        LOG.debug("Finished preparing stream task.");
    }

    private void openProcessor() {
        ExecutionVertex executionVertex = this.jobWorker.getExecutionVertex();
        List<ExecutionEdge> outputEdges = executionVertex.getOutputEdges();
        HashMap hashMap = new HashMap();
        HashMap hashMap2 = new HashMap();
        HashMap hashMap3 = new HashMap();
        for (int i = 0; i < outputEdges.size(); i++) {
            ExecutionEdge executionEdge = outputEdges.get(i);
            String targetExecutionJobVertexName = executionEdge.getTargetExecutionJobVertexName();
            if (!hashMap3.containsKey(targetExecutionJobVertexName)) {
                hashMap.put(targetExecutionJobVertexName, new ArrayList());
                hashMap2.put(targetExecutionJobVertexName, new ArrayList());
            }
            ((List) hashMap.get(targetExecutionJobVertexName)).add(executionVertex.getOutputChannelIdList().get(i));
            ((List) hashMap2.get(targetExecutionJobVertexName)).add(executionVertex.getOutputActorList().get(i));
            hashMap3.put(targetExecutionJobVertexName, executionEdge.getPartition());
        }
        hashMap3.keySet().forEach(str -> {
            this.collectors.add(new OutputCollector(this.writer, (Collection) hashMap.get(str), (Collection) hashMap2.get(str), (Partition) hashMap3.get(str)));
        });
        this.processor.open(this.collectors, new StreamingRuntimeContext(executionVertex, this.jobWorker.getWorkerConfig().configMap, executionVertex.getParallelism()));
    }

    protected abstract void init() throws Exception;

    public void close() {
        this.running = false;
        if (this.thread.isAlive() && !Ray.getRuntimeContext().isSingleProcess()) {
            Runtime.getRuntime().halt(0);
            LOG.warn("runtime halt 0");
            System.exit(0);
        }
        LOG.info("Stream task close success.");
    }

    public boolean triggerCheckpoint(Long l) {
        throw new UnsupportedOperationException("Only source operator supports trigger checkpoints.");
    }

    public void doCheckpoint(long j, Map<String, OffsetInfo> map) {
        Map<String, OffsetInfo> map2 = null;
        if (this.writer != null) {
            map2 = this.writer.getOutputCheckpoints();
            ByteBuffer wrap = ByteBuffer.wrap(RemoteCall.Barrier.newBuilder().setId(j).m43build().toByteArray());
            wrap.order(ByteOrder.nativeOrder());
            this.writer.broadcastBarrier(j, wrap);
        }
        LOG.info("Start do checkpoint, cp id={}, inputPoints={}, outputPoints={}.", new Object[]{Long.valueOf(j), map, map2});
        this.lastCheckpointId = j;
        try {
            saveCpStateAndReport(new OperatorCheckpointInfo(map, map2, this.processor.saveCheckpoint(), j), j);
        } catch (Exception e) {
            LOG.error("Processor or op checkpoint exception.", e);
        }
        LOG.info("Operator do checkpoint {} finish.", Long.valueOf(j));
    }

    private void saveCpStateAndReport(OperatorCheckpointInfo operatorCheckpointInfo, long j) {
        saveCp(operatorCheckpointInfo, j);
        reportCommit(j);
        LOG.info("Finish save cp state and report, checkpoint id is {}.", Long.valueOf(j));
    }

    /* JADX WARN: Multi-variable type inference failed */
    /* JADX WARN: Type inference failed for: r0v13 */
    /* JADX WARN: Type inference failed for: r0v6, types: [io.ray.streaming.runtime.context.ContextBackend] */
    /* JADX WARN: Type inference failed for: r0v7, types: [java.lang.Throwable] */
    private void saveCp(OperatorCheckpointInfo operatorCheckpointInfo, long j) {
        byte[] encode = Serializer.encode(operatorCheckpointInfo);
        String genOpCheckpointKey = genOpCheckpointKey(j);
        LOG.info("Saving task checkpoint, cpKey={}, byte len={}, checkpointInfo={}.", new Object[]{genOpCheckpointKey, Integer.valueOf(encode.length), operatorCheckpointInfo});
        ?? r0 = this.checkpointState;
        synchronized (r0) {
            if (this.outdatedCheckpoints.contains(Long.valueOf(j))) {
                LOG.info("Outdated checkpoint, skip save checkpoint.");
                this.outdatedCheckpoints.remove(Long.valueOf(j));
            } else {
                CheckpointStateUtil.put(this.checkpointState, genOpCheckpointKey, encode);
            }
            r0 = r0;
        }
    }

    private void reportCommit(long j) {
        JobWorkerContext workerContext = this.jobWorker.getWorkerContext();
        LOG.info("Report commit async, checkpoint id {}.", Long.valueOf(j));
        RemoteCallMaster.reportJobWorkerCommitAsync(workerContext.getMaster(), new WorkerCommitReport(workerContext.getWorkerActorId(), j));
    }

    /* JADX WARN: Multi-variable type inference failed */
    /* JADX WARN: Type inference failed for: r0v12 */
    /* JADX WARN: Type inference failed for: r0v4, types: [io.ray.streaming.runtime.context.ContextBackend] */
    /* JADX WARN: Type inference failed for: r0v5, types: [java.lang.Throwable] */
    public void notifyCheckpointTimeout(long j) {
        String genOpCheckpointKey = genOpCheckpointKey(j);
        try {
            ?? r0 = this.checkpointState;
            synchronized (r0) {
                if (this.checkpointState.exists(genOpCheckpointKey)) {
                    this.checkpointState.remove(genOpCheckpointKey);
                } else {
                    this.outdatedCheckpoints.add(Long.valueOf(j));
                }
                r0 = r0;
            }
        } catch (Exception e) {
            LOG.error("Notify checkpoint timeout failed, checkpointId is {}.", Long.valueOf(j), e);
        }
    }

    public void clearExpiredCpState(long j) {
        String genOpCheckpointKey = genOpCheckpointKey(j);
        try {
            this.checkpointState.remove(genOpCheckpointKey);
        } catch (Exception e) {
            LOG.error("Failed to remove key {} from state backend.", genOpCheckpointKey, e);
        }
    }

    public void clearExpiredQueueMsg(long j) {
        String genOpCheckpointKey = genOpCheckpointKey(j);
        try {
            byte[] bArr = this.checkpointState.get(genOpCheckpointKey);
            if (bArr != null) {
                long j2 = ((OperatorCheckpointInfo) Serializer.decode(bArr)).checkpointId;
                if (this.writer != null) {
                    this.writer.clearCheckpoint(j2);
                }
            }
        } catch (Exception e) {
            LOG.error("Failed to get key {} from state backend.", genOpCheckpointKey, e);
        }
    }

    public String genOpCheckpointKey(long j) {
        JobWorkerContext workerContext = this.jobWorker.getWorkerContext();
        return String.valueOf(this.jobWorker.getWorkerConfig().checkpointConfig.jobWorkerOpCpPrefixKey()) + workerContext.getJobName() + "_" + workerContext.getWorkerName() + "_" + j;
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public void requestRollback(String str) {
        this.jobWorker.requestRollback(str);
    }

    public boolean isAlive() {
        return this.thread.isAlive();
    }
}
