package io.ray.streaming.runtime.master.coordinator;

import com.google.common.base.Preconditions;
import io.ray.api.BaseActorHandle;
import io.ray.api.ObjectRef;
import io.ray.api.id.ActorId;
import io.ray.runtime.exception.RayException;
import io.ray.streaming.runtime.master.JobMaster;
import io.ray.streaming.runtime.master.coordinator.command.BaseWorkerCmd;
import io.ray.streaming.runtime.master.coordinator.command.WorkerCommitReport;
import io.ray.streaming.runtime.rpc.RemoteCallWorker;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/ray/streaming/runtime/master/coordinator/CheckpointCoordinator.class */
public class CheckpointCoordinator extends BaseCoordinator {
    private static final Logger LOG = LoggerFactory.getLogger(CheckpointCoordinator.class);
    private final Set<ActorId> pendingCheckpointActors;
    private final Set<Long> interruptedCheckpointSet;
    private final int cpIntervalSecs;
    private final int cpTimeoutSecs;

    public CheckpointCoordinator(JobMaster jobMaster) {
        super(jobMaster);
        this.pendingCheckpointActors = new HashSet();
        this.interruptedCheckpointSet = new HashSet();
        this.cpIntervalSecs = this.runtimeContext.getConf().masterConfig.checkpointConfig.cpIntervalSecs();
        this.cpTimeoutSecs = this.runtimeContext.getConf().masterConfig.checkpointConfig.cpTimeoutSecs();
        this.runtimeContext.lastCpTimestamp = System.currentTimeMillis();
    }

    @Override // java.lang.Runnable
    public void run() {
        while (!this.closed) {
            try {
                BaseWorkerCmd poll = this.runtimeContext.cpCmds.poll(1L, TimeUnit.SECONDS);
                if (poll != null) {
                    if (poll instanceof WorkerCommitReport) {
                        processCommitReport((WorkerCommitReport) poll);
                    } else {
                        interruptCheckpoint();
                    }
                }
                if (this.pendingCheckpointActors.isEmpty()) {
                    maybeTriggerCheckpoint();
                } else if (timeoutOnWaitCheckpoint()) {
                    LOG.warn("Waiting for checkpoint {} timeout, pending cp actors is {}.", Long.valueOf(this.runtimeContext.lastCheckpointId), this.graphManager.getExecutionGraph().getActorName(this.pendingCheckpointActors));
                    interruptCheckpoint();
                }
            } catch (Throwable th) {
                LOG.error("Checkpoint coordinator occur err.", th);
                try {
                    interruptCheckpoint();
                } catch (Throwable th2) {
                    LOG.error("Ignore interrupt checkpoint exception in catch block.");
                }
            }
        }
        LOG.warn("Checkpoint coordinator thread exit.");
    }

    public Boolean reportJobWorkerCommit(WorkerCommitReport workerCommitReport) {
        LOG.info("Report job worker commit {}.", workerCommitReport);
        Boolean valueOf = Boolean.valueOf(this.runtimeContext.cpCmds.offer(workerCommitReport));
        if (!valueOf.booleanValue()) {
            LOG.warn("Report job worker commit failed, because command queue is full.");
        }
        return valueOf;
    }

    private void processCommitReport(WorkerCommitReport workerCommitReport) {
        LOG.info("Start process commit report {}, from actor name={}.", workerCommitReport, this.graphManager.getExecutionGraph().getActorName(workerCommitReport.fromActorId));
        try {
            Preconditions.checkArgument(workerCommitReport.commitCheckpointId == this.runtimeContext.lastCheckpointId, "expect checkpointId %s, but got %s", this.runtimeContext.lastCheckpointId, workerCommitReport);
            if (!this.pendingCheckpointActors.contains(workerCommitReport.fromActorId)) {
                LOG.warn("Invalid commit report, skipped.");
                return;
            }
            this.pendingCheckpointActors.remove(workerCommitReport.fromActorId);
            LOG.info("Pending actors after this commit: {}.", this.graphManager.getExecutionGraph().getActorName(this.pendingCheckpointActors));
            if (this.pendingCheckpointActors.isEmpty()) {
                this.runtimeContext.checkpointIds.add(Long.valueOf(this.runtimeContext.lastCheckpointId));
                if (clearExpiredCpStateAndQueueMsg()) {
                    this.jobMaster.saveContext();
                    LOG.info("Finish checkpoint: {}.", Long.valueOf(this.runtimeContext.lastCheckpointId));
                } else {
                    LOG.warn("Fail to do checkpoint: {}.", Long.valueOf(this.runtimeContext.lastCheckpointId));
                }
            }
            LOG.info("Process commit report {} success.", workerCommitReport);
        } catch (Throwable th) {
            LOG.warn("Process commit report has exception.", th);
        }
    }

    private void triggerCheckpoint() {
        this.interruptedCheckpointSet.clear();
        if (LOG.isInfoEnabled()) {
            LOG.info("Start trigger checkpoint {}.", Long.valueOf(this.runtimeContext.lastCheckpointId + 1));
        }
        this.pendingCheckpointActors.addAll(this.graphManager.getExecutionGraph().getAllActorsId());
        this.runtimeContext.lastCheckpointId++;
        ArrayList<ObjectRef> arrayList = new ArrayList();
        this.graphManager.getExecutionGraph().getSourceActors().forEach(baseActorHandle -> {
            arrayList.add(RemoteCallWorker.triggerCheckpoint(baseActorHandle, Long.valueOf(this.runtimeContext.lastCheckpointId)));
        });
        for (ObjectRef objectRef : arrayList) {
            if (objectRef.get() instanceof RayException) {
                LOG.warn("Trigger checkpoint has exception.", (RayException) objectRef.get());
                throw ((RayException) objectRef.get());
            }
        }
        this.runtimeContext.lastCpTimestamp = System.currentTimeMillis();
        LOG.info("Trigger checkpoint success.");
    }

    private void interruptCheckpoint() {
        if (this.interruptedCheckpointSet.contains(Long.valueOf(this.runtimeContext.lastCheckpointId))) {
            LOG.warn("Skip interrupt duplicated checkpoint id : {}.", Long.valueOf(this.runtimeContext.lastCheckpointId));
            return;
        }
        this.interruptedCheckpointSet.add(Long.valueOf(this.runtimeContext.lastCheckpointId));
        LOG.warn("Interrupt checkpoint, checkpoint id : {}.", Long.valueOf(this.runtimeContext.lastCheckpointId));
        List<BaseActorHandle> allActors = this.graphManager.getExecutionGraph().getAllActors();
        if (this.runtimeContext.lastCheckpointId > this.runtimeContext.getLastValidCheckpointId().longValue()) {
            RemoteCallWorker.notifyCheckpointTimeoutParallel(allActors, Long.valueOf(this.runtimeContext.lastCheckpointId));
        }
        if (!this.pendingCheckpointActors.isEmpty()) {
            this.pendingCheckpointActors.clear();
        }
        maybeTriggerCheckpoint();
    }

    private void maybeTriggerCheckpoint() {
        if (readyToTrigger()) {
            triggerCheckpoint();
        }
    }

    private boolean clearExpiredCpStateAndQueueMsg() {
        List<BaseActorHandle> allActors = this.graphManager.getExecutionGraph().getAllActors();
        if (1 == this.runtimeContext.checkpointIds.size()) {
            RemoteCallWorker.clearExpiredCheckpointParallel(allActors, 0L, this.runtimeContext.checkpointIds.get(0));
        }
        if (this.runtimeContext.checkpointIds.size() <= 1) {
            return true;
        }
        RemoteCallWorker.clearExpiredCheckpointParallel(allActors, this.runtimeContext.checkpointIds.remove(0), this.runtimeContext.checkpointIds.get(0));
        return true;
    }

    private boolean readyToTrigger() {
        return System.currentTimeMillis() - this.runtimeContext.lastCpTimestamp >= ((long) (this.cpIntervalSecs * 1000));
    }

    private boolean timeoutOnWaitCheckpoint() {
        return System.currentTimeMillis() - this.runtimeContext.lastCpTimestamp >= ((long) (this.cpTimeoutSecs * 1000));
    }
}
