package org.apache.tez.test;

import com.google.common.collect.Sets;
import java.nio.ByteBuffer;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.tez.common.TezUtils;
import org.apache.tez.dag.api.ProcessorDescriptor;
import org.apache.tez.dag.api.UserPayload;
import org.apache.tez.runtime.api.AbstractLogicalIOProcessor;
import org.apache.tez.runtime.api.Event;
import org.apache.tez.runtime.api.LogicalInput;
import org.apache.tez.runtime.api.LogicalOutput;
import org.apache.tez.runtime.api.ProcessorContext;

/* loaded from: input_file:org/apache/tez/test/TestProcessor.class */
public class TestProcessor extends AbstractLogicalIOProcessor {
    Configuration conf;
    boolean doFail;
    boolean doRandomFail;
    float randomFailProbability;
    long sleepMs;
    Set<Integer> failingTaskIndices;
    int failingTaskAttemptUpto;
    Integer failAll;
    int verifyValue;
    Set<Integer> verifyTaskIndices;
    private static final Log LOG = LogFactory.getLog(TestProcessor.class);
    public static String TEZ_FAILING_PROCESSOR_DO_FAIL = "tez.failing-processor.do-fail";
    public static String TEZ_FAILING_PROCESSOR_DO_RANDOM_FAIL = "tez.failing-processor.do-random-fail";
    public static String TEZ_FAILING_PROCESSOR_RANDOM_FAIL_PROBABILITY = "tez.failing-processor.random-fail-probability";
    public static String TEZ_FAILING_PROCESSOR_SLEEP_MS = "tez.failing-processor.sleep-ms";
    public static String TEZ_FAILING_PROCESSOR_FAILING_TASK_INDEX = "tez.failing-processor.failing-task-index";
    public static String TEZ_FAILING_PROCESSOR_FAILING_UPTO_TASK_ATTEMPT = "tez.failing-processor.failing-upto-task-attempt";
    public static String TEZ_FAILING_PROCESSOR_VERIFY_VALUE = "tez.failing-processor.verify-value";
    public static String TEZ_FAILING_PROCESSOR_VERIFY_TASK_INDEX = "tez.failing-processor.verify-task-index";

    public TestProcessor(ProcessorContext processorContext) {
        super(processorContext);
        this.doFail = false;
        this.doRandomFail = false;
        this.randomFailProbability = 0.0f;
        this.failingTaskIndices = Sets.newHashSet();
        this.failingTaskAttemptUpto = 0;
        this.failAll = new Integer(-1);
        this.verifyValue = -1;
        this.verifyTaskIndices = Sets.newHashSet();
    }

    public static ProcessorDescriptor getProcDesc(UserPayload userPayload) {
        return ProcessorDescriptor.create(TestProcessor.class.getName()).setUserPayload(userPayload == null ? UserPayload.create((ByteBuffer) null) : userPayload);
    }

    void throwException(String str) {
        RuntimeException runtimeException = new RuntimeException(str);
        getContext().fatalError(runtimeException, str);
        throw runtimeException;
    }

    public static String getVertexConfName(String str, String str2) {
        return str + "." + str2;
    }

    public static String getVertexConfName(String str, String str2, int i) {
        return str + "." + str2 + "." + String.valueOf(i);
    }

    public void initialize() throws Exception {
        if (getContext().getUserPayload() == null || !getContext().getUserPayload().hasPayload()) {
            return;
        }
        String taskVertexName = getContext().getTaskVertexName();
        this.conf = TezUtils.createConfFromUserPayload(getContext().getUserPayload());
        this.verifyValue = this.conf.getInt(getVertexConfName(TEZ_FAILING_PROCESSOR_VERIFY_VALUE, taskVertexName, getContext().getTaskIndex()), -1);
        if (this.verifyValue != -1) {
            LOG.info("Verify value: " + this.verifyValue);
            for (String str : this.conf.getTrimmedStringCollection(getVertexConfName(TEZ_FAILING_PROCESSOR_VERIFY_TASK_INDEX, taskVertexName))) {
                LOG.info("Adding verify task index: " + str);
                this.verifyTaskIndices.add(Integer.valueOf(str));
            }
        }
        this.doFail = this.conf.getBoolean(getVertexConfName(TEZ_FAILING_PROCESSOR_DO_FAIL, taskVertexName), false);
        this.sleepMs = this.conf.getLong(getVertexConfName(TEZ_FAILING_PROCESSOR_SLEEP_MS, taskVertexName), 0L);
        LOG.info("doFail: " + this.doFail);
        if (this.doFail) {
            for (String str2 : this.conf.getTrimmedStringCollection(getVertexConfName(TEZ_FAILING_PROCESSOR_FAILING_TASK_INDEX, taskVertexName))) {
                LOG.info("Adding failing task index: " + str2);
                this.failingTaskIndices.add(Integer.valueOf(str2));
            }
            this.failingTaskAttemptUpto = this.conf.getInt(getVertexConfName(TEZ_FAILING_PROCESSOR_FAILING_UPTO_TASK_ATTEMPT, taskVertexName), 0);
            LOG.info("Adding failing attempt : " + this.failingTaskAttemptUpto + " dag: " + getContext().getDAGName());
        }
        this.doRandomFail = this.conf.getBoolean(TEZ_FAILING_PROCESSOR_DO_RANDOM_FAIL, false);
        this.randomFailProbability = this.conf.getFloat(TEZ_FAILING_PROCESSOR_RANDOM_FAIL_PROBABILITY, 0.0f);
        LOG.info("doRandomFail: " + this.doRandomFail);
        LOG.info("randomFailProbability: " + this.randomFailProbability);
    }

    public void handleEvents(List<Event> list) {
    }

    public void close() throws Exception {
    }

    public void run(Map<String, LogicalInput> map, Map<String, LogicalOutput> map2) throws Exception {
        LOG.info("Sleeping ms: " + this.sleepMs);
        Iterator<LogicalInput> it = map.values().iterator();
        while (it.hasNext()) {
            it.next().start();
        }
        Iterator<LogicalOutput> it2 = map2.values().iterator();
        while (it2.hasNext()) {
            it2.next().start();
        }
        Thread.sleep(this.sleepMs);
        if (this.doRandomFail) {
            int taskAttemptNumber = getContext().getTaskAttemptNumber();
            int i = this.conf.getInt("tez.am.task.max.failed.attempts", 4);
            if (taskAttemptNumber < i - 1) {
                float random = (float) Math.random();
                LOG.info("FailingProcessor random fail turned on. Do a roll: " + getContext().getUniqueIdentifier() + " dag: " + getContext().getDAGName() + " taskIndex: " + getContext().getTaskIndex() + " taskAttempt: " + taskAttemptNumber + " maxFailedAttempt: " + i + " rollNumber: " + random + " randomFailProbability " + this.randomFailProbability);
                if (random < this.randomFailProbability) {
                    LOG.info("FailingProcessor: rollNumber < randomFailProbability. Do fail.");
                    throwException("FailingProcessor: rollNumber < randomFailProbability. Do fail.");
                }
            }
        } else if (this.doFail && ((this.failingTaskIndices.contains(this.failAll) || this.failingTaskIndices.contains(Integer.valueOf(getContext().getTaskIndex()))) && (this.failingTaskAttemptUpto == this.failAll.intValue() || this.failingTaskAttemptUpto >= getContext().getTaskAttemptNumber()))) {
            String str = "FailingProcessor: " + getContext().getUniqueIdentifier() + " dag: " + getContext().getDAGName() + " taskIndex: " + getContext().getTaskIndex() + " taskAttempt: " + getContext().getTaskAttemptNumber();
            LOG.info(str);
            throwException(str);
        }
        if (map.entrySet().size() > 0) {
            LOG.info("Reading input of current FailingProcessor: " + getContext().getUniqueIdentifier() + " dag: " + getContext().getDAGName() + " vertex: " + getContext().getTaskVertexName() + " taskIndex: " + getContext().getTaskIndex() + " taskAttempt: " + getContext().getTaskAttemptNumber());
        }
        int taskAttemptNumber2 = getContext().getTaskAttemptNumber() + 1;
        LOG.info("initializing vertex= " + getContext().getTaskVertexName() + " taskIndex: " + getContext().getTaskIndex() + " taskAttempt: " + getContext().getTaskAttemptNumber() + " sum= " + taskAttemptNumber2);
        for (Map.Entry<String, LogicalInput> entry : map.entrySet()) {
            if (entry.getValue() instanceof TestInput) {
                int doRead = entry.getValue().doRead();
                LOG.info("Reading input: " + entry.getKey() + " inputValue= " + doRead);
                taskAttemptNumber2 += doRead;
            } else {
                LOG.info("Ignoring non TestInput: " + entry.getKey() + " inputClass= " + entry.getValue().getClass().getSimpleName());
            }
        }
        if (map2.entrySet().size() > 0) {
            LOG.info("Writing output of current FailingProcessor: " + getContext().getUniqueIdentifier() + " dag: " + getContext().getDAGName() + " vertex: " + getContext().getTaskVertexName() + " taskIndex: " + getContext().getTaskIndex() + " taskAttempt: " + getContext().getTaskAttemptNumber());
        }
        for (Map.Entry<String, LogicalOutput> entry2 : map2.entrySet()) {
            if (entry2.getValue() instanceof TestOutput) {
                LOG.info("Writing output: " + entry2.getKey() + " sum= " + taskAttemptNumber2);
                entry2.getValue().write(taskAttemptNumber2);
            } else {
                LOG.info("Ignoring non TestOutput: " + entry2.getKey() + " outputClass= " + entry2.getValue().getClass().getSimpleName());
            }
        }
        LOG.info("Output for DAG: " + getContext().getDAGName() + " vertex: " + getContext().getTaskVertexName() + " task: " + getContext().getTaskIndex() + " attempt: " + getContext().getTaskAttemptNumber() + " is: " + taskAttemptNumber2);
        if (this.verifyTaskIndices.contains(new Integer(getContext().getTaskIndex()))) {
            if (this.verifyValue == -1 || this.verifyValue == taskAttemptNumber2) {
                LOG.info("Verified output for DAG: " + getContext().getDAGName() + " vertex: " + getContext().getTaskVertexName() + " task: " + getContext().getTaskIndex() + " attempt: " + getContext().getTaskAttemptNumber() + " is: " + taskAttemptNumber2);
            } else {
                throwException(("Expected output mismatch of current FailingProcessor: " + getContext().getUniqueIdentifier() + " dag: " + getContext().getDAGName() + " vertex: " + getContext().getTaskVertexName() + " taskIndex: " + getContext().getTaskIndex() + " taskAttempt: " + getContext().getTaskAttemptNumber()) + "\nExpected output: " + this.verifyValue + " got: " + taskAttemptNumber2);
            }
        }
    }
}
