package io.ray.streaming.runtime.master.coordinator;

import io.ray.api.BaseActorHandle;
import io.ray.api.id.ActorId;
import io.ray.streaming.runtime.core.graph.executiongraph.ExecutionVertex;
import io.ray.streaming.runtime.core.resource.Container;
import io.ray.streaming.runtime.master.JobMaster;
import io.ray.streaming.runtime.master.context.JobMasterRuntimeContext;
import io.ray.streaming.runtime.master.coordinator.command.BaseWorkerCmd;
import io.ray.streaming.runtime.master.coordinator.command.InterruptCheckpointRequest;
import io.ray.streaming.runtime.master.coordinator.command.WorkerRollbackRequest;
import io.ray.streaming.runtime.message.CallResult;
import io.ray.streaming.runtime.rpc.async.AsyncRemoteCaller;
import io.ray.streaming.runtime.transfer.channel.ChannelRecoverInfo;
import io.ray.streaming.runtime.util.ResourceUtil;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.commons.collections.map.DefaultedMap;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/ray/streaming/runtime/master/coordinator/FailoverCoordinator.class */
public class FailoverCoordinator extends BaseCoordinator {
    private static final Logger LOG = LoggerFactory.getLogger(FailoverCoordinator.class);
    private static final int ROLLBACK_RETRY_TIME_MS = 10000;
    private final Object cmdLock;
    private final AsyncRemoteCaller asyncRemoteCaller;
    private long currentCascadingGroupId;
    private final Map<ExecutionVertex, Boolean> isRollbacking;
    private static volatile /* synthetic */ int[] $SWITCH_TABLE$io$ray$streaming$runtime$message$CallResult$CallResultEnum;

    public FailoverCoordinator(JobMaster jobMaster, boolean z) {
        this(jobMaster, new AsyncRemoteCaller(), z);
    }

    public FailoverCoordinator(JobMaster jobMaster, AsyncRemoteCaller asyncRemoteCaller, boolean z) {
        super(jobMaster);
        this.cmdLock = new Object();
        this.currentCascadingGroupId = 0L;
        this.isRollbacking = DefaultedMap.decorate(new ConcurrentHashMap(), false);
        this.asyncRemoteCaller = asyncRemoteCaller;
        JobMasterRuntimeContext runtimeContext = jobMaster.getRuntimeContext();
        if (z) {
            runtimeContext.foCmds.addAll(runtimeContext.unfinishedFoCmds);
        }
        runtimeContext.unfinishedFoCmds.clear();
    }

    /* JADX WARN: Multi-variable type inference failed */
    /* JADX WARN: Type inference failed for: r0v13 */
    /* JADX WARN: Type inference failed for: r0v5, types: [java.lang.Object] */
    /* JADX WARN: Type inference failed for: r0v6, types: [java.lang.Throwable] */
    @Override // java.lang.Runnable
    public void run() {
        ?? r0;
        while (!this.closed) {
            try {
                r0 = this.cmdLock;
            } catch (Throwable th) {
                LOG.error("Fo coordinator occur err.", th);
            }
            synchronized (r0) {
                BaseWorkerCmd poll = this.jobMaster.getRuntimeContext().foCmds.poll(1L, TimeUnit.SECONDS);
                r0 = r0;
                if (poll != null && (poll instanceof WorkerRollbackRequest)) {
                    this.jobMaster.getRuntimeContext().unfinishedFoCmds.add(poll);
                    dealWithRollbackRequest((WorkerRollbackRequest) poll);
                }
            }
        }
        LOG.warn("Fo coordinator thread exit.");
    }

    private Boolean isDuplicateRequest(WorkerRollbackRequest workerRollbackRequest) {
        try {
            for (Object obj : this.runtimeContext.foCmds.toArray()) {
                if (workerRollbackRequest.fromActorId.equals(((BaseWorkerCmd) obj).fromActorId)) {
                    return true;
                }
            }
        } catch (Exception e) {
            LOG.warn("Check request is duplicated failed.", e);
        }
        return false;
    }

    public Boolean requestJobWorkerRollback(WorkerRollbackRequest workerRollbackRequest) {
        LOG.info("Request job worker rollback {}.", workerRollbackRequest);
        if (isDuplicateRequest(workerRollbackRequest).booleanValue()) {
            LOG.warn("Skip duplicated worker rollback request, {}.", workerRollbackRequest.toString());
            return true;
        }
        boolean offer = this.runtimeContext.foCmds.offer(workerRollbackRequest);
        this.jobMaster.saveContext();
        if (!offer) {
            LOG.warn("Request job worker rollback failed, because command queue is full.");
        }
        return Boolean.valueOf(offer);
    }

    private void dealWithRollbackRequest(WorkerRollbackRequest workerRollbackRequest) {
        LOG.info("Start deal with rollback request {}.", workerRollbackRequest);
        ExecutionVertex exeVertexFromRequest = getExeVertexFromRequest(workerRollbackRequest);
        if (workerRollbackRequest.getPid() != null && !workerRollbackRequest.getPid().equals(WorkerRollbackRequest.DEFAULT_PID)) {
            exeVertexFromRequest.setPid(workerRollbackRequest.getPid());
        }
        if (this.isRollbacking.get(exeVertexFromRequest).booleanValue()) {
            LOG.info("Vertex {} is rollbacking, skip rollback again.", exeVertexFromRequest);
            return;
        }
        Optional<Container> containerById = ResourceUtil.getContainerById(this.jobMaster.getResourceManager().getRegisteredContainers(), exeVertexFromRequest.getContainerId());
        if (containerById.isPresent()) {
            containerById.get().getHostname();
        }
        if (workerRollbackRequest.isForcedRollback) {
            interruptCheckpointAndRollback(workerRollbackRequest);
        } else {
            this.asyncRemoteCaller.checkIfNeedRollbackAsync(exeVertexFromRequest.getWorkerActor(), bool -> {
                if (bool.booleanValue()) {
                    interruptCheckpointAndRollback(workerRollbackRequest);
                } else {
                    LOG.info("Vertex {} doesn't need to rollback, skip it.", exeVertexFromRequest);
                }
            }, th -> {
                LOG.error("Exception when calling checkIfNeedRollbackAsync, maybe vertex is dead, ignore this request, vertex={}.", exeVertexFromRequest, th);
            });
        }
        LOG.info("Deal with rollback request {} success.", workerRollbackRequest);
    }

    private void interruptCheckpointAndRollback(WorkerRollbackRequest workerRollbackRequest) {
        if (workerRollbackRequest.cascadingGroupId == null) {
            long j = this.currentCascadingGroupId;
            this.currentCascadingGroupId = j + 1;
            workerRollbackRequest.cascadingGroupId = Long.valueOf(j);
        }
        rollback(this.jobMaster.getRuntimeContext().getLastValidCheckpointId().longValue(), workerRollbackRequest, this.currentCascadingGroupId);
        this.jobMaster.getRuntimeContext().cpCmds.offer(new InterruptCheckpointRequest());
    }

    private void rollback(long j, WorkerRollbackRequest workerRollbackRequest, long j2) {
        ExecutionVertex exeVertexFromRequest = getExeVertexFromRequest(workerRollbackRequest);
        LOG.info("Call vertex {} to rollback, checkpoint id is {}, cascadingGroupId={}.", new Object[]{exeVertexFromRequest, Long.valueOf(j), Long.valueOf(j2)});
        this.isRollbacking.put(exeVertexFromRequest, true);
        this.asyncRemoteCaller.rollback(exeVertexFromRequest.getWorkerActor(), Long.valueOf(j), callResult -> {
            List<WorkerRollbackRequest> arrayList = new ArrayList();
            switch ($SWITCH_TABLE$io$ray$streaming$runtime$message$CallResult$CallResultEnum()[callResult.getResultEnum().ordinal()]) {
                case 1:
                    ChannelRecoverInfo channelRecoverInfo = (ChannelRecoverInfo) callResult.getResultObj();
                    LOG.info("Vertex {} rollback done, dataLostQueues={}, msg={}, cascadingGroupId={}.", new Object[]{exeVertexFromRequest, channelRecoverInfo.getDataLostQueues(), callResult.getResultMsg(), Long.valueOf(j2)});
                    arrayList = cascadeUpstreamActors(channelRecoverInfo.getDataLostQueues(), exeVertexFromRequest, j2);
                    break;
                case 2:
                default:
                    LOG.error("Rollback vertex {} failed, result={}, cascadingGroupId={}, rollback this worker again after {} ms.", new Object[]{exeVertexFromRequest, callResult, Long.valueOf(j2), Integer.valueOf(ROLLBACK_RETRY_TIME_MS)});
                    Thread.sleep(10000L);
                    LOG.info("Add rollback request for {} again, cascadingGroupId={}.", exeVertexFromRequest, Long.valueOf(j2));
                    arrayList.add(new WorkerRollbackRequest(exeVertexFromRequest, "", "Rollback failed, try again.", false));
                    break;
                case 3:
                    LOG.info("Vertex skip rollback, result = {}, cascadingGroupId={}.", callResult, Long.valueOf(j2));
                    break;
            }
            ?? r0 = this.cmdLock;
            synchronized (r0) {
                this.jobMaster.getRuntimeContext().foCmds.addAll(arrayList);
                this.jobMaster.getRuntimeContext().unfinishedFoCmds.remove(workerRollbackRequest);
                this.jobMaster.saveContext();
                r0 = r0;
                this.isRollbacking.put(exeVertexFromRequest, false);
            }
        }, th -> {
            LOG.error("Exception when calling vertex to rollback, vertex={}.", exeVertexFromRequest, th);
            this.isRollbacking.put(exeVertexFromRequest, false);
        });
        LOG.info("Finish rollback vertex {}, checkpoint id is {}.", exeVertexFromRequest, Long.valueOf(j));
    }

    private List<WorkerRollbackRequest> cascadeUpstreamActors(Set<String> set, ExecutionVertex executionVertex, long j) {
        ArrayList arrayList = new ArrayList();
        set.forEach(str -> {
            ExecutionVertex executionVertex2 = getExecutionVertex(this.graphManager.getExecutionGraph().getPeerActor(executionVertex.getWorkerActor(), str));
            if (this.isRollbacking.get(executionVertex2).booleanValue()) {
                return;
            }
            LOG.info("Call upstream vertex {} of vertex {} to rollback, cascadingGroupId={}.", new Object[]{executionVertex2, executionVertex, Long.valueOf(j)});
            Optional<Container> containerById = ResourceUtil.getContainerById(this.jobMaster.getResourceManager().getRegisteredContainers(), executionVertex2.getContainerId());
            WorkerRollbackRequest workerRollbackRequest = new WorkerRollbackRequest(executionVertex2, containerById.isPresent() ? containerById.get().getHostname() : "", String.format("Cascading rollback from %s", executionVertex), true);
            workerRollbackRequest.cascadingGroupId = Long.valueOf(j);
            arrayList.add(workerRollbackRequest);
        });
        return arrayList;
    }

    private ExecutionVertex getExeVertexFromRequest(WorkerRollbackRequest workerRollbackRequest) {
        ActorId actorId = workerRollbackRequest.fromActorId;
        if (this.graphManager.getExecutionGraph().getActorById(actorId).isPresent()) {
            return getExecutionVertex(workerRollbackRequest.fromActorId);
        }
        throw new RuntimeException("Can not find ray actor of ID " + actorId);
    }

    private ExecutionVertex getExecutionVertex(BaseActorHandle baseActorHandle) {
        return this.graphManager.getExecutionGraph().getExecutionVertexByActorId(baseActorHandle.getId());
    }

    private ExecutionVertex getExecutionVertex(ActorId actorId) {
        return this.graphManager.getExecutionGraph().getExecutionVertexByActorId(actorId);
    }

    static /* synthetic */ int[] $SWITCH_TABLE$io$ray$streaming$runtime$message$CallResult$CallResultEnum() {
        int[] iArr = $SWITCH_TABLE$io$ray$streaming$runtime$message$CallResult$CallResultEnum;
        if (iArr != null) {
            return iArr;
        }
        int[] iArr2 = new int[CallResult.CallResultEnum.valuesCustom().length];
        try {
            iArr2[CallResult.CallResultEnum.FAILED.ordinal()] = 2;
        } catch (NoSuchFieldError unused) {
        }
        try {
            iArr2[CallResult.CallResultEnum.SKIPPED.ordinal()] = 3;
        } catch (NoSuchFieldError unused2) {
        }
        try {
            iArr2[CallResult.CallResultEnum.SUCCESS.ordinal()] = 1;
        } catch (NoSuchFieldError unused3) {
        }
        $SWITCH_TABLE$io$ray$streaming$runtime$message$CallResult$CallResultEnum = iArr2;
        return iArr2;
    }
}
