package org.apache.wayang.core.optimizer.enumeration;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.commons.lang3.Validate;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.wayang.core.plan.executionplan.Channel;
import org.apache.wayang.core.plan.executionplan.ExecutionPlan;
import org.apache.wayang.core.plan.executionplan.ExecutionStage;
import org.apache.wayang.core.plan.executionplan.ExecutionStageLoop;
import org.apache.wayang.core.plan.executionplan.ExecutionTask;
import org.apache.wayang.core.plan.executionplan.PlatformExecution;
import org.apache.wayang.core.plan.wayangplan.InputSlot;
import org.apache.wayang.core.plan.wayangplan.LoopSubplan;
import org.apache.wayang.core.plan.wayangplan.OutputSlot;
import org.apache.wayang.core.platform.Platform;
import org.apache.wayang.core.util.Iterators;
import org.apache.wayang.core.util.OneTimeExecutable;
import org.apache.wayang.core.util.Tuple;

/* loaded from: input_file:org/apache/wayang/core/optimizer/enumeration/StageAssignmentTraversal.class */
public class StageAssignmentTraversal extends OneTimeExecutable {
    private static final Logger logger;
    private final ExecutionTaskFlow executionTaskFlow;
    private final Map<ExecutionTask, InterimStage> assignedInterimStages = new HashMap();
    private final Map<ExecutionTask, Set<InterimStage>> requiredStages = new HashMap();
    private final Collection<StageSplittingCriterion> splittingCriteria = new LinkedList();
    private final Collection<InterimStage> allStages = new LinkedList();
    private final Collection<InterimStage> newStages = new LinkedList();
    private Map<LoopSubplan, ExecutionStageLoop> stageLoops = new HashMap();
    private ExecutionPlan result;
    static final /* synthetic */ boolean $assertionsDisabled;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/wayang/core/optimizer/enumeration/StageAssignmentTraversal$InterimStage.class */
    public interface InterimStage {
        Set<ExecutionTask> getTasks();

        Platform getPlatform();

        void addTask(ExecutionTask executionTask);

        void setOutbound(ExecutionTask executionTask);

        ExecutionStage toExecutionStage();

        InterimStage separate(Set<ExecutionTask> set);

        boolean getAndResetSplitMark();

        void markDependenciesUpdated();

        Set<ExecutionTask> getOutboundTasks();

        Collection<ExecutionTask> getStartTasks();
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/wayang/core/optimizer/enumeration/StageAssignmentTraversal$InterimStageImpl.class */
    public class InterimStageImpl implements InterimStage {
        private final PlatformExecution platformExecution;
        private final Set<ExecutionTask> allTasks;
        private final Set<ExecutionTask> outboundTasks;
        private boolean isMarked;
        private final int sequenceNumber;
        static final /* synthetic */ boolean $assertionsDisabled;

        public InterimStageImpl(StageAssignmentTraversal stageAssignmentTraversal, PlatformExecution platformExecution) {
            this(platformExecution, 0);
        }

        private InterimStageImpl(PlatformExecution platformExecution, int i) {
            this.allTasks = new HashSet();
            this.outboundTasks = new HashSet();
            this.platformExecution = platformExecution;
            this.sequenceNumber = i;
        }

        @Override // org.apache.wayang.core.optimizer.enumeration.StageAssignmentTraversal.InterimStage
        public Platform getPlatform() {
            return this.platformExecution.getPlatform();
        }

        @Override // org.apache.wayang.core.optimizer.enumeration.StageAssignmentTraversal.InterimStage
        public void addTask(ExecutionTask executionTask) {
            this.allTasks.add(executionTask);
        }

        @Override // org.apache.wayang.core.optimizer.enumeration.StageAssignmentTraversal.InterimStage
        public void setOutbound(ExecutionTask executionTask) {
            Validate.isTrue(this.allTasks.contains(executionTask));
            this.outboundTasks.add(executionTask);
        }

        @Override // org.apache.wayang.core.optimizer.enumeration.StageAssignmentTraversal.InterimStage
        public Set<ExecutionTask> getOutboundTasks() {
            return this.outboundTasks;
        }

        @Override // org.apache.wayang.core.optimizer.enumeration.StageAssignmentTraversal.InterimStage
        public Collection<ExecutionTask> getStartTasks() {
            return (Collection) getTasks().stream().filter(this::checkIfStartTask).collect(Collectors.toList());
        }

        @Override // org.apache.wayang.core.optimizer.enumeration.StageAssignmentTraversal.InterimStage
        public Set<ExecutionTask> getTasks() {
            return this.allTasks;
        }

        @Override // org.apache.wayang.core.optimizer.enumeration.StageAssignmentTraversal.InterimStage
        public InterimStage separate(Set<ExecutionTask> set) {
            InterimStage createSplit = createSplit();
            Iterator<ExecutionTask> it = this.allTasks.iterator();
            while (it.hasNext()) {
                ExecutionTask next = it.next();
                if (set.contains(next)) {
                    it.remove();
                    createSplit.addTask(next);
                    if (this.outboundTasks.remove(next)) {
                        createSplit.setOutbound(next);
                    }
                }
            }
            for (ExecutionTask executionTask : this.allTasks) {
                for (int i = 0; i < executionTask.getNumOuputChannels(); i++) {
                    if (executionTask.getOutputChannels()[i].getConsumers().stream().anyMatch(executionTask2 -> {
                        return !this.allTasks.contains(executionTask2);
                    })) {
                        this.outboundTasks.add(executionTask);
                    }
                }
            }
            return createSplit;
        }

        public InterimStage createSplit() {
            return new InterimStageImpl(this.platformExecution, this.sequenceNumber + 1);
        }

        @Override // org.apache.wayang.core.optimizer.enumeration.StageAssignmentTraversal.InterimStage
        public void markDependenciesUpdated() {
            this.isMarked = true;
        }

        @Override // org.apache.wayang.core.optimizer.enumeration.StageAssignmentTraversal.InterimStage
        public boolean getAndResetSplitMark() {
            boolean z = this.isMarked;
            this.isMarked = false;
            return z;
        }

        public String toString() {
            return String.format("InterimStage%s", getStartTasks());
        }

        @Override // org.apache.wayang.core.optimizer.enumeration.StageAssignmentTraversal.InterimStage
        public ExecutionStage toExecutionStage() {
            Iterator<ExecutionTask> it = this.allTasks.iterator();
            LoopSubplan innermostLoop = it.next().getOperator().getInnermostLoop();
            if (!$assertionsDisabled && !Iterators.allMatch(it, executionTask -> {
                return executionTask.getOperator().getInnermostLoop() == innermostLoop;
            }, true)) {
                throw new AssertionError(String.format("There are different loops in the stage with the tasks %s.", this.allTasks.stream().map(executionTask2 -> {
                    return new Tuple(executionTask2, executionTask2.getOperator().getInnermostLoop());
                }).collect(Collectors.toList())));
            }
            ExecutionStage createStage = this.platformExecution.createStage(innermostLoop != null ? (ExecutionStageLoop) StageAssignmentTraversal.this.stageLoops.computeIfAbsent(innermostLoop, ExecutionStageLoop::new) : null, this.sequenceNumber);
            for (ExecutionTask executionTask3 : this.allTasks) {
                createStage.addTask(executionTask3);
                if (checkIfStartTask(executionTask3)) {
                    createStage.markAsStartTask(executionTask3);
                }
                if (checkIfTerminalTask(executionTask3)) {
                    createStage.markAsTerminalTask(executionTask3);
                }
            }
            if ($assertionsDisabled || !createStage.getTerminalTasks().isEmpty()) {
                return createStage;
            }
            throw new AssertionError(String.format("No terminal tasks among %s.", this.allTasks));
        }

        private boolean checkIfStartTask(ExecutionTask executionTask) {
            for (Channel channel : executionTask.getInputChannels()) {
                if (!checkIfFeedbackChannel(executionTask, channel)) {
                    if (equals(StageAssignmentTraversal.this.assignedInterimStages.get(channel.getProducer()))) {
                        return false;
                    }
                }
            }
            return true;
        }

        private boolean checkIfFeedbackChannel(ExecutionTask executionTask, Channel channel) {
            InputSlot<?> inputSlotFor;
            return executionTask.getOperator().isLoopHead() && (inputSlotFor = executionTask.getInputSlotFor(channel)) != null && inputSlotFor.isFeedback();
        }

        private boolean checkIfTerminalTask(ExecutionTask executionTask) {
            for (Channel channel : executionTask.getOutputChannels()) {
                if (!checkIfFeedforwardChannel(executionTask, channel)) {
                    Iterator<ExecutionTask> it = channel.getConsumers().iterator();
                    while (it.hasNext()) {
                        if (equals(StageAssignmentTraversal.this.assignedInterimStages.get(it.next()))) {
                            return false;
                        }
                    }
                }
            }
            return true;
        }

        private boolean checkIfFeedforwardChannel(ExecutionTask executionTask, Channel channel) {
            OutputSlot<?> outputSlotFor;
            return executionTask.getOperator().isLoopHead() && (outputSlotFor = executionTask.getOutputSlotFor(channel)) != null && outputSlotFor.isFeedforward();
        }

        static {
            $assertionsDisabled = !StageAssignmentTraversal.class.desiredAssertionStatus();
        }
    }

    @FunctionalInterface
    /* loaded from: input_file:org/apache/wayang/core/optimizer/enumeration/StageAssignmentTraversal$StageSplittingCriterion.class */
    public interface StageSplittingCriterion {
        boolean shouldSplit(ExecutionTask executionTask, Channel channel, ExecutionTask executionTask2);
    }

    private StageAssignmentTraversal(ExecutionTaskFlow executionTaskFlow, StageSplittingCriterion... stageSplittingCriterionArr) {
        for (ExecutionTask executionTask : executionTaskFlow.collectAllTasks()) {
            for (int i = 0; i < executionTask.getInputChannels().length; i++) {
                if (executionTask.getInputChannels()[i] == null) {
                    logger.warn("{} does not have an input channel @{}.", executionTask, Integer.valueOf(i));
                }
            }
            for (int i2 = 0; i2 < executionTask.getOutputChannels().length; i2++) {
                if (executionTask.getOutputChannels()[i2] == null) {
                    logger.warn("{} does not have an output channel @{}.", executionTask, Integer.valueOf(i2));
                }
            }
        }
        if (!$assertionsDisabled && !executionTaskFlow.isComplete()) {
            throw new AssertionError();
        }
        this.executionTaskFlow = executionTaskFlow;
        this.splittingCriteria.add(StageAssignmentTraversal::isSuitableForBreakpoint);
        this.splittingCriteria.add(StageAssignmentTraversal::isLoopHeadInvolved);
        this.splittingCriteria.add(StageAssignmentTraversal::isLoopBoarder);
        this.splittingCriteria.addAll(Arrays.asList(stageSplittingCriterionArr));
    }

    public static ExecutionPlan assignStages(ExecutionTaskFlow executionTaskFlow, StageSplittingCriterion... stageSplittingCriterionArr) {
        return new StageAssignmentTraversal(executionTaskFlow, stageSplittingCriterionArr).buildExecutionPlan();
    }

    private static boolean isSuitableForBreakpoint(ExecutionTask executionTask, Channel channel, ExecutionTask executionTask2) {
        return channel.isSuitableForBreakpoint();
    }

    private static boolean isLoopBoarder(ExecutionTask executionTask, Channel channel, ExecutionTask executionTask2) {
        return !executionTask.getOperator().getLoopStack().equals(executionTask2.getOperator().getLoopStack());
    }

    private static boolean isLoopHeadInvolved(ExecutionTask executionTask, Channel channel, ExecutionTask executionTask2) {
        return executionTask.getOperator().isLoopHead() || executionTask2.getOperator().isLoopHead();
    }

    private ExecutionPlan buildExecutionPlan() {
        tryExecute();
        return this.result;
    }

    @Override // org.apache.wayang.core.util.OneTimeExecutable
    protected void doExecute() {
        discoverInitialStages();
        refineStages();
        if (logger.isDebugEnabled()) {
            for (InterimStage interimStage : this.allStages) {
                logger.debug("Final stage {}: {}", interimStage, interimStage.getTasks());
            }
        }
        this.result = assembleExecutionPlan();
    }

    private void discoverInitialStages() {
        HashSet hashSet = new HashSet();
        LinkedList linkedList = new LinkedList(this.executionTaskFlow.getSinkTasks());
        while (!linkedList.isEmpty()) {
            ExecutionTask executionTask = (ExecutionTask) linkedList.poll();
            if (hashSet.add(executionTask)) {
                for (Channel channel : executionTask.getInputChannels()) {
                    if (!shouldVisitProducerOf(channel)) {
                        ExecutionTask producer = channel.getProducer();
                        if (checkIfShouldReusePlatformExecution(producer, channel, executionTask)) {
                            createStageFor(executionTask, producer.getStage().getPlatformExecution());
                        }
                    } else {
                        if (!$assertionsDisabled && channel.getProducer() == null) {
                            throw new AssertionError();
                        }
                        linkedList.add(channel.getProducer());
                    }
                }
            }
        }
        LinkedList linkedList2 = new LinkedList(hashSet);
        while (!linkedList2.isEmpty()) {
            createStageFor((ExecutionTask) linkedList2.poll(), null);
        }
        if (logger.isDebugEnabled()) {
            this.assignedInterimStages.values().stream().distinct().forEach(interimStage -> {
                logger.debug("Established initial stage with {}.", interimStage.getTasks());
            });
        }
        if ($assertionsDisabled) {
            return;
        }
        Stream stream = hashSet.stream();
        Map<ExecutionTask, InterimStage> map = this.assignedInterimStages;
        map.getClass();
        if (!stream.allMatch((v1) -> {
            return r1.containsKey(v1);
        })) {
            throw new AssertionError();
        }
    }

    private boolean checkIfShouldReusePlatformExecution(ExecutionTask executionTask, Channel channel, ExecutionTask executionTask2) {
        Platform platform = executionTask.getPlatform();
        return platform.equals(executionTask2.getPlatform()) && platform.isSinglePlatformExecutionPossible(executionTask, channel, executionTask2);
    }

    private void createStageFor(ExecutionTask executionTask, PlatformExecution platformExecution) {
        if (!$assertionsDisabled && executionTask.getStage() != null) {
            throw new AssertionError(String.format("%s has already stage %s.", executionTask, executionTask.getStage()));
        }
        if (this.assignedInterimStages.containsKey(executionTask)) {
            return;
        }
        if (platformExecution == null) {
            platformExecution = new PlatformExecution(executionTask.getOperator().getPlatform());
        }
        InterimStageImpl interimStageImpl = new InterimStageImpl(this, platformExecution);
        addStage(interimStageImpl);
        assignTaskAndExpand(executionTask, interimStageImpl);
    }

    private void addStage(InterimStage interimStage) {
        this.newStages.add(interimStage);
        this.allStages.add(interimStage);
    }

    private void assignTaskAndExpand(ExecutionTask executionTask, InterimStage interimStage) {
        assign(executionTask, interimStage);
        expandDownstream(executionTask, interimStage);
        expandUpstream(executionTask, interimStage);
    }

    private void assign(ExecutionTask executionTask, InterimStage interimStage) {
        if (!$assertionsDisabled && !executionTask.getOperator().getPlatform().equals(interimStage.getPlatform())) {
            throw new AssertionError();
        }
        interimStage.addTask(executionTask);
        logger.trace("Reassigned {} from {} to {}.", executionTask, this.assignedInterimStages.put(executionTask, interimStage), interimStage);
    }

    private void expandDownstream(ExecutionTask executionTask, InterimStage interimStage) {
        for (Channel channel : executionTask.getOutputChannels()) {
            if (!$assertionsDisabled && channel == null) {
                throw new AssertionError(String.format("%s has null output channels.", executionTask));
            }
            if (channel.isExecutionBreaker()) {
                interimStage.setOutbound(executionTask);
            }
            for (ExecutionTask executionTask2 : channel.getConsumers()) {
                if (this.assignedInterimStages.get(executionTask2) == null) {
                    handleTaskWithoutPlatformExecution(executionTask2, interimStage);
                }
            }
        }
    }

    private void expandUpstream(ExecutionTask executionTask, InterimStage interimStage) {
        for (Channel channel : executionTask.getInputChannels()) {
            if (shouldVisitProducerOf(channel)) {
                ExecutionTask producer = channel.getProducer();
                if (!$assertionsDisabled && producer == null) {
                    throw new AssertionError();
                }
                if (this.assignedInterimStages.get(producer) == null) {
                    handleTaskWithoutPlatformExecution(producer, interimStage);
                }
            }
        }
    }

    private void handleTaskWithoutPlatformExecution(ExecutionTask executionTask, InterimStage interimStage) {
        Platform platform = executionTask.getOperator().getPlatform();
        if (interimStage == null || !platform.equals(interimStage.getPlatform())) {
            return;
        }
        assignTaskAndExpand(executionTask, interimStage);
    }

    private void refineStages() {
        new ArrayList(this.newStages).forEach(this::applySplittingCriteria);
        splitStagesByPrecedence();
    }

    private void applySplittingCriteria(InterimStage interimStage) {
        Set<ExecutionTask> hashSet = new HashSet<>();
        HashSet hashSet2 = new HashSet();
        LinkedList linkedList = new LinkedList(interimStage.getStartTasks());
        while (!linkedList.isEmpty()) {
            ExecutionTask executionTask = (ExecutionTask) linkedList.poll();
            if (hashSet2.add(executionTask)) {
                boolean contains = hashSet.contains(executionTask);
                for (Channel channel : executionTask.getOutputChannels()) {
                    for (ExecutionTask executionTask2 : channel.getConsumers()) {
                        if (this.assignedInterimStages.get(executionTask2) == interimStage) {
                            if (contains || this.splittingCriteria.stream().anyMatch(stageSplittingCriterion -> {
                                return stageSplittingCriterion.shouldSplit(executionTask, channel, executionTask2);
                            })) {
                                if (!executionTask2.isFeedbackInput(channel)) {
                                    hashSet.add(executionTask2);
                                }
                            }
                            linkedList.add(executionTask2);
                        }
                    }
                }
            }
        }
        if (hashSet.isEmpty()) {
            return;
        }
        if (!$assertionsDisabled && hashSet.size() >= interimStage.getTasks().size()) {
            throw new AssertionError(String.format("Cannot separate all tasks from stage with tasks %s.", hashSet));
        }
        HashSet hashSet3 = new HashSet(interimStage.getTasks());
        hashSet3.removeAll(hashSet);
        do {
            applySplittingCriteria(splitStage(interimStage, separateConnectedComponent(hashSet)));
        } while (!hashSet.isEmpty());
        while (true) {
            Set<ExecutionTask> separateConnectedComponent = separateConnectedComponent(hashSet3);
            if (hashSet3.isEmpty()) {
                return;
            } else {
                applySplittingCriteria(splitStage(interimStage, separateConnectedComponent));
            }
        }
    }

    private Set<ExecutionTask> separateConnectedComponent(Set<ExecutionTask> set) {
        if (!$assertionsDisabled && set.isEmpty()) {
            throw new AssertionError();
        }
        LinkedList linkedList = new LinkedList();
        HashSet hashSet = new HashSet(set.size());
        Iterator<ExecutionTask> it = set.iterator();
        linkedList.add(it.next());
        it.remove();
        while (true) {
            ExecutionTask executionTask = (ExecutionTask) linkedList.poll();
            if (executionTask == null) {
                return hashSet;
            }
            hashSet.add(executionTask);
            for (Channel channel : executionTask.getInputChannels()) {
                if (!executionTask.isFeedbackInput(channel)) {
                    ExecutionTask producer = channel.getProducer();
                    if (set.remove(producer)) {
                        linkedList.add(producer);
                    }
                }
            }
            for (Channel channel2 : executionTask.getOutputChannels()) {
                for (ExecutionTask executionTask2 : channel2.getConsumers()) {
                    if (!executionTask2.isFeedbackInput(channel2) && set.remove(executionTask2)) {
                        linkedList.add(executionTask2);
                    }
                }
            }
        }
    }

    private void splitStagesByPrecedence() {
        for (InterimStage interimStage : this.newStages) {
            Iterator<ExecutionTask> it = interimStage.getTasks().iterator();
            while (it.hasNext()) {
                this.requiredStages.computeIfAbsent(it.next(), executionTask -> {
                    return new HashSet(4);
                }).add(interimStage);
            }
        }
        while (!this.newStages.isEmpty()) {
            Iterator<InterimStage> it2 = this.newStages.iterator();
            while (it2.hasNext()) {
                for (ExecutionTask executionTask2 : it2.next().getOutboundTasks()) {
                    HashSet hashSet = new HashSet(this.requiredStages.get(executionTask2));
                    for (Channel channel : executionTask2.getOutputChannels()) {
                        for (ExecutionTask executionTask3 : channel.getConsumers()) {
                            if (!executionTask3.isFeedbackInput(channel)) {
                                updateRequiredStages(executionTask3, hashSet);
                            }
                        }
                    }
                }
            }
            this.newStages.clear();
            new ArrayList(this.allStages).forEach(this::partitionStage);
        }
    }

    private void updateRequiredStages(ExecutionTask executionTask, Set<InterimStage> set) {
        InterimStage interimStage = this.assignedInterimStages.get(executionTask);
        boolean add = set.add(interimStage);
        Set<InterimStage> set2 = this.requiredStages.get(executionTask);
        if (set2.addAll(set)) {
            logger.debug("Updated required stages of {} to {}.", executionTask, set2);
            interimStage.markDependenciesUpdated();
            for (Channel channel : executionTask.getOutputChannels()) {
                for (ExecutionTask executionTask2 : channel.getConsumers()) {
                    if (!executionTask2.isFeedbackInput(channel)) {
                        updateRequiredStages(executionTask2, set);
                    }
                }
            }
        }
        if (add) {
            set.remove(interimStage);
        }
    }

    /* JADX WARN: Multi-variable type inference failed */
    /* JADX WARN: Type inference failed for: r0v12, types: [java.util.HashSet, java.util.Set] */
    /* JADX WARN: Type inference failed for: r0v4, types: [java.util.HashSet, java.util.Collection, java.util.Set] */
    /* JADX WARN: Type inference failed for: r4v0, types: [org.apache.wayang.core.optimizer.enumeration.StageAssignmentTraversal] */
    private boolean partitionStage(InterimStage interimStage) {
        if (!interimStage.getAndResetSplitMark()) {
            return false;
        }
        int i = -1;
        LinkedList linkedList = new LinkedList();
        ?? hashSet = new HashSet();
        for (ExecutionTask executionTask : interimStage.getTasks()) {
            Set<InterimStage> set = this.requiredStages.get(executionTask);
            if (i == -1 || set.size() < i) {
                hashSet.addAll(linkedList);
                linkedList.clear();
                i = set.size();
            }
            (i == set.size() ? linkedList : hashSet).add(executionTask);
        }
        if (hashSet.isEmpty()) {
            logger.debug("No separable tasks found in marked stage {}.", interimStage);
            return false;
        }
        ?? hashSet2 = new HashSet(interimStage.getTasks());
        hashSet2.removeAll(hashSet);
        do {
            splitStage(interimStage, separateConnectedComponent(hashSet));
        } while (!hashSet.isEmpty());
        while (true) {
            Set<ExecutionTask> separateConnectedComponent = separateConnectedComponent(hashSet2);
            if (hashSet2.isEmpty()) {
                return true;
            }
            splitStage(interimStage, separateConnectedComponent);
        }
    }

    private InterimStage splitStage(InterimStage interimStage, Set<ExecutionTask> set) {
        if (logger.isDebugEnabled()) {
            HashSet hashSet = new HashSet(interimStage.getTasks());
            hashSet.removeAll(set);
            logger.debug("Separating " + set + " from " + hashSet + "...");
        }
        InterimStage separate = interimStage.separate(set);
        addStage(separate);
        Iterator<ExecutionTask> it = separate.getTasks().iterator();
        while (it.hasNext()) {
            assign(it.next(), separate);
        }
        return separate;
    }

    private ExecutionPlan assembleExecutionPlan() {
        HashMap hashMap = new HashMap(this.allStages.size());
        Iterator<ExecutionTask> it = this.executionTaskFlow.getSinkTasks().iterator();
        while (it.hasNext()) {
            assembleExecutionPlan(hashMap, null, it.next(), new HashSet<>());
        }
        ExecutionPlan executionPlan = new ExecutionPlan();
        Stream<ExecutionStage> filter = hashMap.values().stream().filter((v0) -> {
            return v0.isStartingStage();
        });
        executionPlan.getClass();
        filter.forEach(executionPlan::addStartingStage);
        return executionPlan;
    }

    private void assembleExecutionPlan(Map<InterimStage, ExecutionStage> map, ExecutionStage executionStage, ExecutionTask executionTask, HashSet<Object> hashSet) {
        ExecutionStage computeIfAbsent = map.computeIfAbsent(this.assignedInterimStages.get(executionTask), (v0) -> {
            return v0.toExecutionStage();
        });
        if (executionStage != null && !computeIfAbsent.equals(executionStage) && !computeIfAbsent.getSuccessors().contains(executionStage)) {
            computeIfAbsent.addSuccessor(executionStage);
        }
        if (hashSet.add(executionTask)) {
            for (Channel channel : executionTask.getInputChannels()) {
                if (shouldVisitProducerOf(channel)) {
                    assembleExecutionPlan(map, computeIfAbsent, channel.getProducer(), hashSet);
                }
            }
        }
    }

    private boolean shouldVisitProducerOf(Channel channel) {
        return !channel.isCopy();
    }

    static {
        $assertionsDisabled = !StageAssignmentTraversal.class.desiredAssertionStatus();
        logger = LogManager.getLogger(StageAssignmentTraversal.class);
    }
}
