package org.apache.wayang.core.platform;

import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Queue;
import java.util.Set;
import org.apache.wayang.core.api.Job;
import org.apache.wayang.core.optimizer.OptimizationContext;
import org.apache.wayang.core.plan.executionplan.Channel;
import org.apache.wayang.core.plan.executionplan.ExecutionStage;
import org.apache.wayang.core.plan.executionplan.ExecutionTask;
import org.apache.wayang.core.plan.wayangplan.ExecutionOperator;
import org.apache.wayang.core.plan.wayangplan.InputSlot;
import org.apache.wayang.core.plan.wayangplan.LoopHeadOperator;
import org.apache.wayang.core.util.OneTimeExecutable;
import org.apache.wayang.core.util.Tuple;
import org.apache.wayang.core.util.WayangCollections;

/* loaded from: input_file:org/apache/wayang/core/platform/PushExecutorTemplate.class */
public abstract class PushExecutorTemplate extends ExecutorTemplate {
    protected final Job job;
    static final /* synthetic */ boolean $assertionsDisabled;

    /* loaded from: input_file:org/apache/wayang/core/platform/PushExecutorTemplate$StageExecution.class */
    /**
     * Executes the {@link ExecutionTask}s of a single {@link ExecutionStage} in a push-based manner.
     * <p>The start tasks of the stage are queued as ready. Whenever a task has been executed, its output
     * {@link ChannelInstance}s are offered to the consuming tasks of the same stage (loop heads excluded);
     * a consumer is queued as soon as its {@link TaskActivator} reports that it is ready.</p>
     */
    protected class StageExecution extends OneTimeExecutable {

        /** Activators that have been created for a task but are not ready yet, keyed by their task. */
        private final Map<ExecutionTask, TaskActivator> stagedActivators = new HashMap<>();

        /** Activators whose task can be executed; processed in FIFO order by {@link #doExecute()}. */
        private final Queue<TaskActivator> readyActivators = new LinkedList<>();

        /** The terminal tasks of the executed stage. */
        private final Set<ExecutionTask> terminalTasks;

        /** All non-null {@link ChannelInstance}s produced while executing the stage. */
        private final Collection<ChannelInstance> allChannelInstances = new LinkedList<>();

        /** Supplies already existing {@link ChannelInstance}s and receives the execution results. */
        private final ExecutionState executionState;

        /** Used to look up the {@link OptimizationContext.OperatorContext} of each task's operator. */
        private final OptimizationContext optimizationContext;

        /**
         * Prepares the execution of the given stage by scheduling all of its start tasks.
         *
         * @param executionStage      the stage to execute; asserted to have at least one start task
         * @param optimizationContext provides the operator contexts for the tasks
         * @param executionState      provides input {@link ChannelInstance}s and collects results
         */
        private StageExecution(ExecutionStage executionStage, OptimizationContext optimizationContext, ExecutionState executionState) {
            this.executionState = executionState;
            this.optimizationContext = optimizationContext;

            assert !executionStage.getStartTasks().isEmpty() : String.format("No start tasks for %s.", executionStage);
            executionStage.getStartTasks().forEach(this::scheduleStartTask);

            // The typed cast pins the Collection-based overload of asSet without resorting to a raw type.
            this.terminalTasks = WayangCollections.asSet((Collection<ExecutionTask>) executionStage.getTerminalTasks());
        }

        /**
         * Creates the activator for a start task and queues it for execution.
         *
         * @param executionTask a start task of the stage; asserted to be ready immediately
         */
        private void scheduleStartTask(ExecutionTask executionTask) {
            final TaskActivator activator =
                    new TaskActivator(executionTask, fetchOperatorContext(executionTask), this.executionState);
            assert activator.isReady() : String.format("Stage starter %s is not immediately ready.", executionTask);
            this.readyActivators.add(activator);
        }

        /**
         * Looks up the operator context for the operator of the given task.
         *
         * @param executionTask the task whose operator context is requested
         * @return the operator context or {@code null} (after logging a warning) if there is none
         */
        private OptimizationContext.OperatorContext fetchOperatorContext(ExecutionTask executionTask) {
            final OptimizationContext.OperatorContext operatorContext =
                    this.optimizationContext.getOperatorContext(executionTask.getOperator());
            if (operatorContext == null) {
                PushExecutorTemplate.this.logger.warn("No OperatorContext for {} available.", executionTask);
            }
            return operatorContext;
        }

        /**
         * Runs the stage via the inherited {@code execute()} and afterwards publishes the relevant
         * {@link ChannelInstance}s to the {@link ExecutionState}.
         */
        void executeStage() {
            execute();
            updateExecutionState();
        }

        /**
         * Executes ready tasks until no more tasks are ready.
         */
        @Override // org.apache.wayang.core.util.OneTimeExecutable
        protected void doExecute() {
            TaskActivator readyActivator;
            while ((readyActivator = this.readyActivators.poll()) != null) {
                // Execute the task and release the references its activator held on the inputs.
                final ExecutionTask task = readyActivator.getTask();
                final Tuple<List<ChannelInstance>, PartialExecution> executionResult = execute(readyActivator, task);
                readyActivator.dispose();

                // Keep track of the produced outputs; the list may contain null entries.
                final List<ChannelInstance> outputChannelInstances = executionResult.getField0();
                outputChannelInstances.stream().filter(Objects::nonNull).forEach(this::store);

                // Record the partial execution, if the executor reported one.
                final PartialExecution partialExecution = executionResult.getField1();
                if (partialExecution != null) {
                    this.executionState.add(partialExecution);
                }

                // Hand the outputs to the successor tasks, then let unused outputs be disposed.
                activateSuccessorTasks(task, outputChannelInstances);
                outputChannelInstances.stream().filter(Objects::nonNull).forEach(ChannelInstance::disposeIfUnreferenced);
            }
        }

        /**
         * Delegates the execution of a task to the enclosing executor.
         *
         * @param taskActivator the ready activator of the task
         * @param executionTask the task to execute; used to tell whether it is a terminal task of the stage
         * @return the output {@link ChannelInstance}s and an optional {@link PartialExecution}
         */
        private Tuple<List<ChannelInstance>, PartialExecution> execute(TaskActivator taskActivator, ExecutionTask executionTask) {
            final boolean isTerminalTask = this.terminalTasks.contains(executionTask);
            return executor().execute(taskActivator, isTerminalTask);
        }

        /**
         * Remembers a produced {@link ChannelInstance} and notes a reference on it.
         *
         * @param channelInstance the non-null instance to remember
         */
        private void store(ChannelInstance channelInstance) {
            this.allChannelInstances.add(channelInstance);
            channelInstance.noteObtainedReference();
        }

        /**
         * Offers the outputs of an executed task to their consumers within the same stage.
         *
         * @param executionTask the task that has just been executed
         * @param collection    the outputs of that task; {@code null} entries are skipped
         */
        private void activateSuccessorTasks(ExecutionTask executionTask, Collection<ChannelInstance> collection) {
            for (ChannelInstance outputChannelInstance : collection) {
                if (outputChannelInstance == null) {
                    continue;
                }
                for (ExecutionTask consumer : outputChannelInstance.getChannel().getConsumers()) {
                    // Only consumers in the same stage are activated here; loop heads are left out.
                    if (consumer.getStage() != executionTask.getStage() || consumer.getOperator().isLoopHead()) {
                        continue;
                    }

                    // Fetch the pending activator of the consumer or create one on first contact.
                    final TaskActivator activator = this.stagedActivators.computeIfAbsent(
                            consumer,
                            task -> new TaskActivator(task, fetchOperatorContext(task), this.executionState)
                    );
                    activator.accept(outputChannelInstance);

                    // Move the activator to the ready queue once all required inputs are present.
                    if (activator.isReady()) {
                        this.stagedActivators.remove(consumer);
                        this.readyActivators.add(activator);
                    }
                }
            }
        }

        /**
         * @return the enclosing {@link PushExecutorTemplate}
         */
        private PushExecutorTemplate executor() {
            return PushExecutorTemplate.this;
        }

        /**
         * Registers those produced {@link ChannelInstance}s with the {@link ExecutionState} whose channel is
         * between stages or feeds a loop head, and gives up the references obtained in
         * {@link #store(ChannelInstance)}.
         */
        private void updateExecutionState() {
            for (ChannelInstance channelInstance : this.allChannelInstances) {
                final boolean isRelevantForOtherExecutions = channelInstance.getChannel().isBetweenStages()
                        || channelInstance.getChannel().getConsumers().stream()
                        .anyMatch(consumer -> consumer.getOperator().isLoopHead());
                if (isRelevantForOtherExecutions) {
                    this.executionState.register(channelInstance);
                }
                // NOTE(review): the flag presumably allows disposal once unreferenced - confirm in ChannelInstance.
                channelInstance.noteDiscardedReference(true);
            }
            this.allChannelInstances.clear();
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/wayang/core/platform/PushExecutorTemplate$TaskActivator.class */
    /**
     * Collects the input {@link ChannelInstance}s of an {@link ExecutionTask} and tells when the task has
     * received enough of them to be executed.
     */
    public class TaskActivator {

        /** The task that is to be activated. */
        private final ExecutionTask task;

        /** The collected inputs, indexed by input slot; {@code null} marks an input that is still missing. */
        private final List<ChannelInstance> inputChannelInstances;

        /** The operator context of the task's operator; may be {@code null}. */
        private final OptimizationContext.OperatorContext operatorContext;

        /**
         * Creates a new activator and immediately picks up all inputs already available in the given
         * {@link ExecutionState}.
         *
         * @param executionTask   the task to be activated
         * @param operatorContext the context of the task's operator or {@code null}; if given, it is asserted
         *                        to belong to the very operator of the task
         * @param executionState  queried for already existing input {@link ChannelInstance}s
         */
        TaskActivator(ExecutionTask executionTask, OptimizationContext.OperatorContext operatorContext, ExecutionState executionState) {
            assert operatorContext == null || executionTask.getOperator() == operatorContext.getOperator()
                    : String.format("Mismatch between %s and %s.", executionTask, operatorContext);
            this.task = executionTask;
            this.operatorContext = operatorContext;
            this.inputChannelInstances = WayangCollections.createNullFilledArrayList(getOperator().getNumInputs());
            acceptFrom(executionState);
        }

        /**
         * Accepts every input {@link ChannelInstance} of the task that the given state can already provide.
         *
         * @param executionState the state to fetch the instances from
         */
        private void acceptFrom(ExecutionState executionState) {
            final int numInputChannels = this.task.getNumInputChannels();
            for (int inputIndex = 0; inputIndex < numInputChannels; inputIndex++) {
                final ChannelInstance available = executionState.getChannelInstance(this.task.getInputChannel(inputIndex));
                if (available != null) {
                    accept(available);
                }
            }
        }

        /**
         * Registers the given instance as an input of the task and notes a reference on it.
         *
         * @param channelInstance the input to register; asserted not to replace a previously accepted input
         */
        public void accept(ChannelInstance channelInstance) {
            final Channel channel = channelInstance.getChannel();
            final int inputIndex;
            if (this.task.getOperator().getNumInputs() == 0) {
                // An operator without input slots still keeps the instance: a slot at index 0 is added for it.
                assert this.inputChannelInstances.isEmpty();
                this.inputChannelInstances.add(null);
                inputIndex = 0;
            } else {
                final InputSlot<?> inputSlot = this.task.getInputSlotFor(channel);
                assert inputSlot != null
                        : String.format("Could not identify an InputSlot in %s for %s.", this.task, channelInstance);
                inputIndex = inputSlot.getIndex();
            }

            assert this.inputChannelInstances.get(inputIndex) == null
                    : String.format("Tried to replace %s with %s as %dth input of %s.",
                    this.inputChannelInstances.get(inputIndex), channelInstance, inputIndex, this.task);
            this.inputChannelInstances.set(inputIndex, channelInstance);
            channelInstance.noteObtainedReference();
        }

        /**
         * @return the task this activator has been created for
         */
        public ExecutionTask getTask() {
            return this.task;
        }

        /**
         * @return the operator of the task
         */
        protected ExecutionOperator getOperator() {
            return this.task.getOperator();
        }

        /**
         * @return the operator context handed in at construction time; may be {@code null}
         */
        public OptimizationContext.OperatorContext getOperatorContext() {
            return this.operatorContext;
        }

        /**
         * @return the inputs collected so far (the internal list, not a copy); missing inputs are {@code null}
         */
        protected List<ChannelInstance> getInputChannelInstances() {
            return this.inputChannelInstances;
        }

        /**
         * Gives up the references that {@link #accept(ChannelInstance)} noted on the collected inputs.
         */
        protected void dispose() {
            for (ChannelInstance inputChannelInstance : this.inputChannelInstances) {
                if (inputChannelInstance != null) {
                    inputChannelInstance.noteDiscardedReference(true);
                }
            }
        }

        /**
         * @return whether the task can be executed with the inputs collected so far
         */
        protected boolean isReady() {
            return isLoopInitializationReady() || isLoopIterationReady() || isPlainReady();
        }

        /**
         * @return whether every input slot of the operator has been served
         */
        protected boolean isPlainReady() {
            for (InputSlot<?> inputSlot : getOperator().getAllInputs()) {
                if (this.inputChannelInstances.get(inputSlot.getIndex()) == null) {
                    return false;
                }
            }
            return true;
        }

        /**
         * @return whether the operator is a loop head in state {@link LoopHeadOperator.State#NOT_STARTED}
         * and all of its loop initialization inputs have been served
         */
        protected boolean isLoopInitializationReady() {
            if (!getOperator().isLoopHead()) {
                return false;
            }
            final LoopHeadOperator loopHead = (LoopHeadOperator) getOperator();
            if (loopHead.getState() != LoopHeadOperator.State.NOT_STARTED) {
                return false;
            }
            for (InputSlot<?> inputSlot : loopHead.getLoopInitializationInputs()) {
                if (this.inputChannelInstances.get(inputSlot.getIndex()) == null) {
                    return false;
                }
            }
            return true;
        }

        /**
         * @return whether the operator is a loop head in state {@link LoopHeadOperator.State#RUNNING}
         * and all of its loop body inputs have been served
         */
        protected boolean isLoopIterationReady() {
            if (!getOperator().isLoopHead()) {
                return false;
            }
            final LoopHeadOperator loopHead = (LoopHeadOperator) getOperator();
            if (loopHead.getState() != LoopHeadOperator.State.RUNNING) {
                return false;
            }
            for (InputSlot<?> inputSlot : loopHead.getLoopBodyInputs()) {
                if (this.inputChannelInstances.get(inputSlot.getIndex()) == null) {
                    return false;
                }
            }
            return true;
        }
    }

    /**
     * Creates a new executor for the given job.
     *
     * @param job the job to work for; if it is {@code null}, {@code null} is passed to the superclass
     *            constructor in place of the job's cross-platform executor
     */
    public PushExecutorTemplate(Job job) {
        super(job != null ? job.getCrossPlatformExecutor() : null);
        this.job = job;
    }

    /**
     * Executes the given stage by means of a fresh {@link StageExecution}.
     *
     * @param executionStage      the stage to execute
     * @param optimizationContext provides the operator contexts for the stage's tasks
     * @param executionState      provides existing channel instances and receives the results
     */
    @Override // org.apache.wayang.core.platform.Executor
    public void execute(ExecutionStage executionStage, OptimizationContext optimizationContext, ExecutionState executionState) {
        // Assertion-style sanity check: only evaluated when assertions are enabled for this class.
        final boolean assertionsEnabled = !$assertionsDisabled;
        if (assertionsEnabled && isDisposed()) {
            throw new AssertionError(String.format("%s has been disposed.", this));
        }

        final StageExecution stageExecution = new StageExecution(executionStage, optimizationContext, executionState);
        stageExecution.executeStage();
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Executes the task of the given activator with the inputs and operator context it has collected.
     *
     * @param taskActivator supplies the task, its input channel instances, and its operator context
     * @param z             forwarded unchanged to the abstract {@code execute} method
     * @return whatever the abstract {@code execute} method returns for the task
     */
    public Tuple<List<ChannelInstance>, PartialExecution> execute(TaskActivator taskActivator, boolean z) {
        final ExecutionTask task = taskActivator.getTask();
        final List<ChannelInstance> inputChannelInstances = taskActivator.getInputChannelInstances();
        final OptimizationContext.OperatorContext operatorContext = taskActivator.getOperatorContext();
        return execute(task, inputChannelInstances, operatorContext, z);
    }

    /**
     * Executes a single task; to be implemented by the concrete executor.
     *
     * @param executionTask   the task to execute
     * @param list            the input channel instances collected for the task, indexed by input slot;
     *                        entries may be {@code null} for inputs that have not been provided
     * @param operatorContext the operator context of the task's operator; may be {@code null}
     * @param z               whether the task is one of the terminal tasks of the stage being executed
     * @return a tuple whose first field holds the output channel instances (callers in this class skip
     * {@code null} entries) and whose second field holds a {@link PartialExecution} or {@code null}
     */
    protected abstract Tuple<List<ChannelInstance>, PartialExecution> execute(ExecutionTask executionTask, List<ChannelInstance> list, OptimizationContext.OperatorContext operatorContext, boolean z);

    /**
     * Provides the job that was handed to this executor on construction.
     *
     * @return the job; {@code null} if the executor was created without one
     */
    public Job getJob() {
        return job;
    }

    static {
        $assertionsDisabled = !PushExecutorTemplate.class.desiredAssertionStatus();
    }
}
