package org.apache.tez.runtime;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
import com.google.common.collect.Multimap;
import com.google.common.collect.Sets;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import java.io.Closeable;
import java.io.IOException;
import java.lang.management.ManagementFactory;
import java.lang.management.ThreadInfo;
import java.lang.management.ThreadMXBean;
import java.nio.ByteBuffer;
import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.Calendar;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletionService;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import org.apache.commons.lang.exception.ExceptionUtils;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration;
import org.apache.tez.common.CallableWithNdc;
import org.apache.tez.common.ReflectionUtils;
import org.apache.tez.common.RunnableWithNdc;
import org.apache.tez.dag.api.InputDescriptor;
import org.apache.tez.dag.api.ProcessorDescriptor;
import org.apache.tez.dag.api.TezException;
import org.apache.tez.dag.api.TezUncheckedException;
import org.apache.tez.dag.records.TezTaskAttemptID;
import org.apache.tez.hadoop.shim.HadoopShim;
import org.apache.tez.runtime.RuntimeTask;
import org.apache.tez.runtime.api.AbstractLogicalIOProcessor;
import org.apache.tez.runtime.api.Event;
import org.apache.tez.runtime.api.ExecutionContext;
import org.apache.tez.runtime.api.Input;
import org.apache.tez.runtime.api.InputContext;
import org.apache.tez.runtime.api.InputFrameworkInterface;
import org.apache.tez.runtime.api.LogicalIOProcessor;
import org.apache.tez.runtime.api.LogicalInput;
import org.apache.tez.runtime.api.LogicalOutput;
import org.apache.tez.runtime.api.MergedInputContext;
import org.apache.tez.runtime.api.MergedLogicalInput;
import org.apache.tez.runtime.api.ObjectRegistry;
import org.apache.tez.runtime.api.Output;
import org.apache.tez.runtime.api.OutputContext;
import org.apache.tez.runtime.api.OutputFrameworkInterface;
import org.apache.tez.runtime.api.Processor;
import org.apache.tez.runtime.api.ProcessorContext;
import org.apache.tez.runtime.api.TaskContext;
import org.apache.tez.runtime.api.TaskFailureType;
import org.apache.tez.runtime.api.impl.EventMetaData;
import org.apache.tez.runtime.api.impl.GroupInputSpec;
import org.apache.tez.runtime.api.impl.InputSpec;
import org.apache.tez.runtime.api.impl.OutputSpec;
import org.apache.tez.runtime.api.impl.TaskSpec;
import org.apache.tez.runtime.api.impl.TezEvent;
import org.apache.tez.runtime.api.impl.TezInputContextImpl;
import org.apache.tez.runtime.api.impl.TezMergedInputContextImpl;
import org.apache.tez.runtime.api.impl.TezOutputContextImpl;
import org.apache.tez.runtime.api.impl.TezProcessorContextImpl;
import org.apache.tez.runtime.api.impl.TezUmbilical;
import org.apache.tez.runtime.common.resources.MemoryDistributor;
import org.apache.tez.runtime.internals.api.events.SystemEventProtos;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@InterfaceAudience.Private
/* loaded from: input_file:org/apache/tez/runtime/LogicalIOProcessorRuntimeTask.class */
public class LogicalIOProcessorRuntimeTask extends RuntimeTask {
    private static final Logger LOG = LoggerFactory.getLogger(LogicalIOProcessorRuntimeTask.class);

    @VisibleForTesting
    private final String[] localDirs;
    final List<InputSpec> inputSpecs;
    final ConcurrentMap<String, LogicalInput> inputsMap;
    final ConcurrentMap<String, InputContext> inputContextMap;
    final List<OutputSpec> outputSpecs;
    final ConcurrentMap<String, LogicalOutput> outputsMap;
    final ConcurrentMap<String, OutputContext> outputContextMap;
    final List<GroupInputSpec> groupInputSpecs;
    ConcurrentHashMap<String, MergedLogicalInput> groupInputsMap;
    final ConcurrentHashMap<String, LogicalInput> initializedInputs;
    final ConcurrentHashMap<String, LogicalOutput> initializedOutputs;
    private boolean processorClosed;
    final ProcessorDescriptor processorDescriptor;
    AbstractLogicalIOProcessor processor;
    ProcessorContext processorContext;
    private final MemoryDistributor initialMemoryDistributor;
    final LinkedHashMap<String, LogicalInput> runInputMap;
    final LinkedHashMap<String, LogicalOutput> runOutputMap;
    private final Map<String, ByteBuffer> serviceConsumerMetadata;
    private final Map<String, String> envMap;
    final ExecutorService initializerExecutor;
    private final CompletionService<Void> initializerCompletionService;
    private final Multimap<String, String> startedInputsMap;
    LinkedBlockingQueue<TezEvent> eventsToBeProcessed;
    Thread eventRouterThread;
    private final int appAttemptNumber;
    private volatile InputReadyTracker inputReadyTracker;
    private volatile ObjectRegistry objectRegistry;
    private final ExecutionContext ExecutionContext;
    private final long memAvailable;
    private final HadoopShim hadoopShim;
    private final int maxEventBacklog;

    /* JADX INFO: Access modifiers changed from: package-private */
    /* renamed from: org.apache.tez.runtime.LogicalIOProcessorRuntimeTask$2, reason: invalid class name */
    /* loaded from: input_file:org/apache/tez/runtime/LogicalIOProcessorRuntimeTask$2.class */
    public static /* synthetic */ class AnonymousClass2 {
        static final /* synthetic */ int[] $SwitchMap$org$apache$tez$runtime$api$impl$EventMetaData$EventProducerConsumerType = new int[EventMetaData.EventProducerConsumerType.values().length];

        static {
            try {
                $SwitchMap$org$apache$tez$runtime$api$impl$EventMetaData$EventProducerConsumerType[EventMetaData.EventProducerConsumerType.INPUT.ordinal()] = 1;
            } catch (NoSuchFieldError e) {
            }
            try {
                $SwitchMap$org$apache$tez$runtime$api$impl$EventMetaData$EventProducerConsumerType[EventMetaData.EventProducerConsumerType.OUTPUT.ordinal()] = 2;
            } catch (NoSuchFieldError e2) {
            }
            try {
                $SwitchMap$org$apache$tez$runtime$api$impl$EventMetaData$EventProducerConsumerType[EventMetaData.EventProducerConsumerType.PROCESSOR.ordinal()] = 3;
            } catch (NoSuchFieldError e3) {
            }
            try {
                $SwitchMap$org$apache$tez$runtime$api$impl$EventMetaData$EventProducerConsumerType[EventMetaData.EventProducerConsumerType.SYSTEM.ordinal()] = 4;
            } catch (NoSuchFieldError e4) {
            }
        }
    }

    /* loaded from: input_file:org/apache/tez/runtime/LogicalIOProcessorRuntimeTask$InitializeInputCallable.class */
    private class InitializeInputCallable extends CallableWithNdc<Void> {
        private final InputSpec inputSpec;
        private final int inputIndex;

        public InitializeInputCallable(InputSpec inputSpec, int i) {
            this.inputSpec = inputSpec;
            this.inputIndex = i;
        }

        /* JADX INFO: Access modifiers changed from: protected */
        /* renamed from: callInternal, reason: merged with bridge method [inline-methods] */
        public Void m3callInternal() throws Exception {
            String name = Thread.currentThread().getName();
            try {
                Thread.currentThread().setName(name + " Initialize: {" + this.inputSpec.getSourceVertexName() + "}");
                Void _callInternal = _callInternal();
                Thread.currentThread().setName(name);
                return _callInternal;
            } catch (Throwable th) {
                Thread.currentThread().setName(name);
                throw th;
            }
        }

        protected Void _callInternal() throws Exception {
            if (LogicalIOProcessorRuntimeTask.LOG.isDebugEnabled()) {
                LogicalIOProcessorRuntimeTask.LOG.debug("Initializing Input using InputSpec: " + this.inputSpec);
            }
            String sourceVertexName = this.inputSpec.getSourceVertexName();
            InputContext createInputContext = LogicalIOProcessorRuntimeTask.this.createInputContext(LogicalIOProcessorRuntimeTask.this.inputsMap, this.inputSpec, this.inputIndex);
            LogicalInput createInput = LogicalIOProcessorRuntimeTask.this.createInput(this.inputSpec, createInputContext);
            LogicalIOProcessorRuntimeTask.this.inputsMap.put(sourceVertexName, createInput);
            LogicalIOProcessorRuntimeTask.this.inputContextMap.put(sourceVertexName, createInputContext);
            LogicalIOProcessorRuntimeTask.this.sendTaskGeneratedEvents(((InputFrameworkInterface) createInput).initialize(), EventMetaData.EventProducerConsumerType.INPUT, createInputContext.getTaskVertexName(), createInputContext.getSourceVertexName(), LogicalIOProcessorRuntimeTask.this.taskSpec.getTaskAttemptID());
            LogicalIOProcessorRuntimeTask.this.initializedInputs.put(sourceVertexName, createInput);
            if (LogicalIOProcessorRuntimeTask.LOG.isDebugEnabled()) {
                LogicalIOProcessorRuntimeTask.LOG.debug("Initialized Input with src edge: " + sourceVertexName);
            }
            LogicalIOProcessorRuntimeTask.this.initializedInputs.put(sourceVertexName, createInput);
            return null;
        }
    }

    /* loaded from: input_file:org/apache/tez/runtime/LogicalIOProcessorRuntimeTask$InitializeOutputCallable.class */
    private class InitializeOutputCallable extends CallableWithNdc<Void> {
        private final OutputSpec outputSpec;
        private final int outputIndex;

        public InitializeOutputCallable(OutputSpec outputSpec, int i) {
            this.outputSpec = outputSpec;
            this.outputIndex = i;
        }

        /* JADX INFO: Access modifiers changed from: protected */
        /* renamed from: callInternal, reason: merged with bridge method [inline-methods] */
        public Void m4callInternal() throws Exception {
            String name = Thread.currentThread().getName();
            try {
                Thread.currentThread().setName(name + " Initialize: {" + this.outputSpec.getDestinationVertexName() + "}");
                Void _callInternal = _callInternal();
                Thread.currentThread().setName(name);
                return _callInternal;
            } catch (Throwable th) {
                Thread.currentThread().setName(name);
                throw th;
            }
        }

        protected Void _callInternal() throws Exception {
            if (LogicalIOProcessorRuntimeTask.LOG.isDebugEnabled()) {
                LogicalIOProcessorRuntimeTask.LOG.debug("Initializing Output using OutputSpec: " + this.outputSpec);
            }
            String destinationVertexName = this.outputSpec.getDestinationVertexName();
            OutputContext createOutputContext = LogicalIOProcessorRuntimeTask.this.createOutputContext(this.outputSpec, this.outputIndex);
            LogicalOutput createOutput = LogicalIOProcessorRuntimeTask.this.createOutput(this.outputSpec, createOutputContext);
            LogicalIOProcessorRuntimeTask.this.outputsMap.put(destinationVertexName, createOutput);
            LogicalIOProcessorRuntimeTask.this.outputContextMap.put(destinationVertexName, createOutputContext);
            LogicalIOProcessorRuntimeTask.this.sendTaskGeneratedEvents(((OutputFrameworkInterface) createOutput).initialize(), EventMetaData.EventProducerConsumerType.OUTPUT, createOutputContext.getTaskVertexName(), createOutputContext.getDestinationVertexName(), LogicalIOProcessorRuntimeTask.this.taskSpec.getTaskAttemptID());
            LogicalIOProcessorRuntimeTask.this.initializedOutputs.put(destinationVertexName, createOutput);
            if (LogicalIOProcessorRuntimeTask.LOG.isDebugEnabled()) {
                LogicalIOProcessorRuntimeTask.LOG.debug("Initialized Output with dest edge: " + destinationVertexName);
            }
            LogicalIOProcessorRuntimeTask.this.initializedOutputs.put(destinationVertexName, createOutput);
            return null;
        }
    }

    /* loaded from: input_file:org/apache/tez/runtime/LogicalIOProcessorRuntimeTask$StartInputCallable.class */
    private static class StartInputCallable extends CallableWithNdc<Void> {
        private final LogicalInput input;
        private final String srcVertexName;

        public StartInputCallable(LogicalInput logicalInput, String str) {
            this.input = logicalInput;
            this.srcVertexName = str;
        }

        /* JADX INFO: Access modifiers changed from: protected */
        /* renamed from: callInternal, reason: merged with bridge method [inline-methods] */
        public Void m5callInternal() throws Exception {
            String name = Thread.currentThread().getName();
            try {
                Thread.currentThread().setName(name + " Start: {" + this.srcVertexName + "}");
                Void _callInternal = _callInternal();
                Thread.currentThread().setName(name);
                return _callInternal;
            } catch (Throwable th) {
                Thread.currentThread().setName(name);
                throw th;
            }
        }

        protected Void _callInternal() throws Exception {
            if (LogicalIOProcessorRuntimeTask.LOG.isDebugEnabled()) {
                LogicalIOProcessorRuntimeTask.LOG.debug("Starting Input with src edge: " + this.srcVertexName);
            }
            this.input.start();
            LogicalIOProcessorRuntimeTask.LOG.info("Started Input with src edge: " + this.srcVertexName);
            return null;
        }
    }

    public LogicalIOProcessorRuntimeTask(TaskSpec taskSpec, int i, Configuration configuration, String[] strArr, TezUmbilical tezUmbilical, Map<String, ByteBuffer> map, Map<String, String> map2, Multimap<String, String> multimap, ObjectRegistry objectRegistry, String str, ExecutionContext executionContext, long j, boolean z, HadoopShim hadoopShim) throws IOException {
        super(taskSpec, configuration, tezUmbilical, str, z);
        this.processorClosed = false;
        this.eventRouterThread = null;
        LOG.info("Initializing LogicalIOProcessorRuntimeTask with TaskSpec: " + taskSpec);
        int size = taskSpec.getInputs().size();
        int size2 = taskSpec.getOutputs().size();
        this.localDirs = strArr;
        this.inputSpecs = taskSpec.getInputs();
        this.inputsMap = new ConcurrentHashMap(size);
        this.inputContextMap = new ConcurrentHashMap(size);
        this.outputSpecs = taskSpec.getOutputs();
        this.outputsMap = new ConcurrentHashMap(size2);
        this.outputContextMap = new ConcurrentHashMap(size2);
        this.runInputMap = new LinkedHashMap<>();
        this.runOutputMap = new LinkedHashMap<>();
        this.initializedInputs = new ConcurrentHashMap<>();
        this.initializedOutputs = new ConcurrentHashMap<>();
        this.processorDescriptor = taskSpec.getProcessorDescriptor();
        this.serviceConsumerMetadata = map;
        this.envMap = map2;
        this.eventsToBeProcessed = new LinkedBlockingQueue<>();
        this.state.set(RuntimeTask.State.NEW);
        this.appAttemptNumber = i;
        int i2 = size + size2;
        this.initializerExecutor = Executors.newFixedThreadPool(i2 == 0 ? 1 : i2, new ThreadFactoryBuilder().setDaemon(true).setNameFormat("I/O Setup %d").build());
        this.initializerCompletionService = new ExecutorCompletionService(this.initializerExecutor);
        this.groupInputSpecs = taskSpec.getGroupInputs();
        this.initialMemoryDistributor = new MemoryDistributor(size, size2, configuration);
        this.startedInputsMap = multimap;
        this.inputReadyTracker = new InputReadyTracker();
        this.objectRegistry = objectRegistry;
        this.ExecutionContext = executionContext;
        this.memAvailable = j;
        this.hadoopShim = hadoopShim;
        this.maxEventBacklog = configuration.getInt("tez.task.max-event-backlog", 10000);
    }

    public void initialize() throws Exception {
        Preconditions.checkState(this.state.get() == RuntimeTask.State.NEW, "Already initialized");
        this.state.set(RuntimeTask.State.INITED);
        this.processorContext = createProcessorContext();
        this.processor = createProcessor(this.processorDescriptor.getClassName(), this.processorContext);
        int i = 0;
        int i2 = 0;
        Iterator<InputSpec> it = this.taskSpec.getInputs().iterator();
        while (it.hasNext()) {
            int i3 = i2;
            i2++;
            this.initializerCompletionService.submit(new InitializeInputCallable(it.next(), i3));
            i++;
        }
        int i4 = 0;
        Iterator<OutputSpec> it2 = this.taskSpec.getOutputs().iterator();
        while (it2.hasNext()) {
            int i5 = i4;
            i4++;
            this.initializerCompletionService.submit(new InitializeOutputCallable(it2.next(), i5));
            i++;
        }
        initializeLogicalIOProcessor();
        for (int i6 = 0; i6 < i; i6++) {
            LOG.info("Waiting for " + (i - i6) + " initializers to finish");
            try {
                this.initializerCompletionService.take().get();
            } catch (ExecutionException e) {
                if (!(e.getCause() instanceof Exception)) {
                    throw new Exception(e);
                }
                throw ((Exception) e.getCause());
            }
        }
        LOG.info("All initializers finished");
        initializeGroupInputs();
        this.inputReadyTracker.setGroupedInputs(this.groupInputsMap == null ? null : this.groupInputsMap.values());
        HashSet newHashSet = Sets.newHashSet();
        if (this.groupInputSpecs != null && !this.groupInputSpecs.isEmpty()) {
            for (GroupInputSpec groupInputSpec : this.groupInputSpecs) {
                this.runInputMap.put(groupInputSpec.getGroupName(), this.groupInputsMap.get(groupInputSpec.getGroupName()));
                newHashSet.addAll(groupInputSpec.getGroupVertices());
            }
        }
        this.initialMemoryDistributor.makeInitialAllocations();
        LOG.info("Starting Inputs/Outputs");
        int i7 = 0;
        for (InputSpec inputSpec : this.inputSpecs) {
            if (newHashSet.contains(inputSpec.getSourceVertexName())) {
                LOG.info("Ignoring " + inputSpec.getSourceVertexName() + " for start, since it will be controlled via it's Group");
            } else if (!inputAlreadyStarted(this.taskSpec.getVertexName(), inputSpec.getSourceVertexName())) {
                this.startedInputsMap.put(this.taskSpec.getVertexName(), inputSpec.getSourceVertexName());
                i7++;
                this.initializerCompletionService.submit(new StartInputCallable(this.inputsMap.get(inputSpec.getSourceVertexName()), inputSpec.getSourceVertexName()));
                LOG.info("Input: " + inputSpec.getSourceVertexName() + " being auto started by the framework. Subsequent instances will not be auto-started");
            }
        }
        if (this.groupInputSpecs != null) {
            for (GroupInputSpec groupInputSpec2 : this.groupInputSpecs) {
                if (!inputAlreadyStarted(this.taskSpec.getVertexName(), groupInputSpec2.getGroupName())) {
                    i7++;
                    this.initializerCompletionService.submit(new StartInputCallable(this.groupInputsMap.get(groupInputSpec2.getGroupName()), groupInputSpec2.getGroupName()));
                    LOG.info("InputGroup: " + groupInputSpec2.getGroupName() + " being auto started by the framework. Subsequent instance will not be auto-started");
                }
            }
        }
        this.initializerExecutor.shutdown();
        LOG.info("Num IOs determined for AutoStart: " + i7);
        for (int i8 = 0; i8 < i7; i8++) {
            LOG.info("Waiting for " + (i7 - i8) + " IOs to start");
            try {
                this.initializerCompletionService.take().get();
            } catch (ExecutionException e2) {
                if (!(e2.getCause() instanceof Exception)) {
                    throw new Exception(e2);
                }
                throw ((Exception) e2.getCause());
            }
        }
        LOG.info("AutoStartComplete");
        for (InputSpec inputSpec2 : this.inputSpecs) {
            if (!newHashSet.contains(inputSpec2.getSourceVertexName())) {
                this.runInputMap.put(inputSpec2.getSourceVertexName(), this.inputsMap.get(inputSpec2.getSourceVertexName()));
            }
        }
        for (OutputSpec outputSpec : this.outputSpecs) {
            this.runOutputMap.put(outputSpec.getDestinationVertexName(), this.outputsMap.get(outputSpec.getDestinationVertexName()));
        }
        startRouterThread();
    }

    public void run() throws Exception {
        Preconditions.checkState(this.state.get() == RuntimeTask.State.INITED, "Can only run while in INITED state. Current: " + this.state);
        this.state.set(RuntimeTask.State.RUNNING);
        this.processor.run(this.runInputMap, this.runOutputMap);
    }

    public void close() throws Exception {
        try {
            Preconditions.checkState(this.state.get() == RuntimeTask.State.RUNNING, "Can only run while in RUNNING state. Current: " + this.state);
            this.state.set(RuntimeTask.State.CLOSED);
            Iterator<InputSpec> it = this.inputSpecs.iterator();
            while (it.hasNext()) {
                String sourceVertexName = it.next().getSourceVertexName();
                this.initializedInputs.remove(sourceVertexName);
                sendTaskGeneratedEvents(this.inputsMap.get(sourceVertexName).close(), EventMetaData.EventProducerConsumerType.INPUT, this.taskSpec.getVertexName(), sourceVertexName, this.taskSpec.getTaskAttemptID());
            }
            Iterator<OutputSpec> it2 = this.outputSpecs.iterator();
            while (it2.hasNext()) {
                String destinationVertexName = it2.next().getDestinationVertexName();
                this.initializedOutputs.remove(destinationVertexName);
                sendTaskGeneratedEvents(this.outputsMap.get(destinationVertexName).close(), EventMetaData.EventProducerConsumerType.OUTPUT, this.taskSpec.getVertexName(), destinationVertexName, this.taskSpec.getTaskAttemptID());
            }
            this.processorClosed = true;
            this.processor.close();
            setTaskDone();
            Thread.interrupted();
            if (this.eventRouterThread != null) {
                this.eventRouterThread.interrupt();
                LOG.info("Joining on EventRouter");
                try {
                    this.eventRouterThread.join();
                } catch (InterruptedException e) {
                    LOG.info("Ignoring interrupt while waiting for the router thread to die");
                    Thread.currentThread().interrupt();
                }
                this.eventRouterThread = null;
            }
            String format = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(Calendar.getInstance().getTime());
            System.err.println(format + " Completed running task attempt: " + this.taskSpec.getTaskAttemptID().toString());
            System.out.println(format + " Completed running task attempt: " + this.taskSpec.getTaskAttemptID().toString());
        } catch (Throwable th) {
            setTaskDone();
            Thread.interrupted();
            if (this.eventRouterThread != null) {
                this.eventRouterThread.interrupt();
                LOG.info("Joining on EventRouter");
                try {
                    this.eventRouterThread.join();
                } catch (InterruptedException e2) {
                    LOG.info("Ignoring interrupt while waiting for the router thread to die");
                    Thread.currentThread().interrupt();
                }
                this.eventRouterThread = null;
            }
            String format2 = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(Calendar.getInstance().getTime());
            System.err.println(format2 + " Completed running task attempt: " + this.taskSpec.getTaskAttemptID().toString());
            System.out.println(format2 + " Completed running task attempt: " + this.taskSpec.getTaskAttemptID().toString());
            throw th;
        }
    }

    private boolean inputAlreadyStarted(String str, String str2) {
        return this.startedInputsMap.containsKey(str) && this.startedInputsMap.get(str).contains(str2);
    }

    private void initializeGroupInputs() throws TezException {
        if (this.groupInputSpecs == null || this.groupInputSpecs.isEmpty()) {
            return;
        }
        this.groupInputsMap = new ConcurrentHashMap<>(this.groupInputSpecs.size());
        for (GroupInputSpec groupInputSpec : this.groupInputSpecs) {
            if (LOG.isDebugEnabled()) {
                LOG.debug("Initializing GroupInput using GroupInputSpec: " + groupInputSpec);
            }
            MergedInputContext tezMergedInputContextImpl = new TezMergedInputContextImpl(groupInputSpec.getMergedInputDescriptor().getUserPayload(), groupInputSpec.getGroupName(), this.groupInputsMap, this.inputReadyTracker, this.localDirs, this);
            List<Input> newArrayListWithCapacity = Lists.newArrayListWithCapacity(groupInputSpec.getGroupVertices().size());
            Iterator<String> it = groupInputSpec.getGroupVertices().iterator();
            while (it.hasNext()) {
                newArrayListWithCapacity.add(this.inputsMap.get(it.next()));
            }
            this.groupInputsMap.put(groupInputSpec.getGroupName(), (MergedLogicalInput) createMergedInput(groupInputSpec.getMergedInputDescriptor(), tezMergedInputContextImpl, newArrayListWithCapacity));
        }
    }

    private void initializeLogicalIOProcessor() throws Exception {
        if (LOG.isDebugEnabled()) {
            LOG.debug("Initializing processor, processorClassName=" + this.processorDescriptor.getClassName());
        }
        this.processor.initialize();
        LOG.info("Initialized processor");
    }

    /* JADX INFO: Access modifiers changed from: private */
    public InputContext createInputContext(Map<String, LogicalInput> map, InputSpec inputSpec, int i) {
        return new TezInputContextImpl(this.tezConf, this.localDirs, this.appAttemptNumber, this.tezUmbilical, this.taskSpec.getDAGName(), this.taskSpec.getVertexName(), inputSpec.getSourceVertexName(), this.taskSpec.getVertexParallelism(), this.taskSpec.getTaskAttemptID(), i, inputSpec.getInputDescriptor().getUserPayload(), this, this.serviceConsumerMetadata, this.envMap, this.initialMemoryDistributor, inputSpec.getInputDescriptor(), map, this.inputReadyTracker, this.objectRegistry, this.ExecutionContext, this.memAvailable);
    }

    /* JADX INFO: Access modifiers changed from: private */
    public OutputContext createOutputContext(OutputSpec outputSpec, int i) {
        return new TezOutputContextImpl(this.tezConf, this.localDirs, this.appAttemptNumber, this.tezUmbilical, this.taskSpec.getDAGName(), this.taskSpec.getVertexName(), outputSpec.getDestinationVertexName(), this.taskSpec.getVertexParallelism(), this.taskSpec.getTaskAttemptID(), i, outputSpec.getOutputDescriptor().getUserPayload(), this, this.serviceConsumerMetadata, this.envMap, this.initialMemoryDistributor, outputSpec.getOutputDescriptor(), this.objectRegistry, this.ExecutionContext, this.memAvailable);
    }

    private ProcessorContext createProcessorContext() {
        return new TezProcessorContextImpl(this.tezConf, this.localDirs, this.appAttemptNumber, this.tezUmbilical, this.taskSpec.getDAGName(), this.taskSpec.getVertexName(), this.taskSpec.getVertexParallelism(), this.taskSpec.getTaskAttemptID(), this.processorDescriptor.getUserPayload(), this, this.serviceConsumerMetadata, this.envMap, this.initialMemoryDistributor, this.processorDescriptor, this.inputReadyTracker, this.objectRegistry, this.ExecutionContext, this.memAvailable);
    }

    /* JADX INFO: Access modifiers changed from: private */
    public LogicalInput createInput(InputSpec inputSpec, InputContext inputContext) throws TezException {
        InputDescriptor inputDescriptor = inputSpec.getInputDescriptor();
        LogicalInput logicalInput = (Input) ReflectionUtils.createClazzInstance(inputDescriptor.getClassName(), new Class[]{InputContext.class, Integer.TYPE}, new Object[]{inputContext, Integer.valueOf(inputSpec.getPhysicalEdgeCount())});
        if (logicalInput instanceof LogicalInput) {
            return logicalInput;
        }
        throw new TezUncheckedException(inputDescriptor.getClass().getName() + " is not a sub-type of LogicalInput. Only LogicalInput sub-types supported by LogicalIOProcessor.");
    }

    private LogicalInput createMergedInput(InputDescriptor inputDescriptor, MergedInputContext mergedInputContext, List<Input> list) throws TezException {
        return (LogicalInput) ReflectionUtils.createClazzInstance(inputDescriptor.getClassName(), new Class[]{MergedInputContext.class, List.class}, new Object[]{mergedInputContext, list});
    }

    /* JADX INFO: Access modifiers changed from: private */
    public LogicalOutput createOutput(OutputSpec outputSpec, OutputContext outputContext) throws TezException {
        LogicalOutput logicalOutput = (Output) ReflectionUtils.createClazzInstance(outputSpec.getOutputDescriptor().getClassName(), new Class[]{OutputContext.class, Integer.TYPE}, new Object[]{outputContext, Integer.valueOf(outputSpec.getPhysicalEdgeCount())});
        if (logicalOutput instanceof LogicalOutput) {
            return logicalOutput;
        }
        throw new TezUncheckedException(logicalOutput.getClass().getName() + " is not a sub-type of LogicalOutput. Only LogicalOutput sub-types supported by LogicalIOProcessor.");
    }

    private AbstractLogicalIOProcessor createProcessor(String str, ProcessorContext processorContext) throws TezException {
        AbstractLogicalIOProcessor abstractLogicalIOProcessor = (Processor) ReflectionUtils.createClazzInstance(str, new Class[]{ProcessorContext.class}, new Object[]{processorContext});
        if (abstractLogicalIOProcessor instanceof AbstractLogicalIOProcessor) {
            return abstractLogicalIOProcessor;
        }
        throw new TezUncheckedException(abstractLogicalIOProcessor.getClass().getName() + " is not a sub-type of AbstractLogicalIOProcessor. Only AbstractLogicalIOProcessor sub-types supported by LogicalIOProcessorRuntimeTask.");
    }

    /* JADX INFO: Access modifiers changed from: private */
    public void sendTaskGeneratedEvents(List<Event> list, EventMetaData.EventProducerConsumerType eventProducerConsumerType, String str, String str2, TezTaskAttemptID tezTaskAttemptID) {
        if (list == null || list.isEmpty()) {
            return;
        }
        EventMetaData eventMetaData = new EventMetaData(eventProducerConsumerType, str, str2, tezTaskAttemptID);
        ArrayList arrayList = new ArrayList(list.size());
        Iterator<Event> it = list.iterator();
        while (it.hasNext()) {
            arrayList.add(new TezEvent(it.next(), eventMetaData));
        }
        if (LOG.isDebugEnabled()) {
            Iterator it2 = arrayList.iterator();
            while (it2.hasNext()) {
                LOG.debug("Generated event info, eventMetaData=" + eventMetaData.toString() + ", eventType=" + ((TezEvent) it2.next()).getEventType());
            }
        }
        this.tezUmbilical.addEvents(arrayList);
    }

    /* JADX INFO: Access modifiers changed from: private */
    public boolean handleEvent(TezEvent tezEvent) {
        if (LOG.isDebugEnabled()) {
            LOG.debug("Handling TezEvent in task, taskAttemptId=" + this.taskSpec.getTaskAttemptID() + ", eventType=" + tezEvent.getEventType() + ", eventSourceInfo=" + tezEvent.getSourceInfo() + ", eventDestinationInfo=" + tezEvent.getDestinationInfo());
        }
        try {
            switch (AnonymousClass2.$SwitchMap$org$apache$tez$runtime$api$impl$EventMetaData$EventProducerConsumerType[tezEvent.getDestinationInfo().getEventGenerator().ordinal()]) {
                case 1:
                    InputFrameworkInterface inputFrameworkInterface = (LogicalInput) this.inputsMap.get(tezEvent.getDestinationInfo().getEdgeVertexName());
                    if (inputFrameworkInterface == null) {
                        throw new TezUncheckedException("Unhandled event for invalid target: " + tezEvent);
                    }
                    inputFrameworkInterface.handleEvents(Collections.singletonList(tezEvent.getEvent()));
                    break;
                case SystemEventProtos.TaskAttemptFailedEventProto.TASK_FAILURE_TYPE_FIELD_NUMBER /* 2 */:
                    OutputFrameworkInterface outputFrameworkInterface = (LogicalOutput) this.outputsMap.get(tezEvent.getDestinationInfo().getEdgeVertexName());
                    if (outputFrameworkInterface == null) {
                        throw new TezUncheckedException("Unhandled event for invalid target: " + tezEvent);
                    }
                    outputFrameworkInterface.handleEvents(Collections.singletonList(tezEvent.getEvent()));
                    break;
                case 3:
                    this.processor.handleEvents(Collections.singletonList(tezEvent.getEvent()));
                    break;
                case 4:
                    LOG.warn("Trying to send a System event in a Task: " + tezEvent);
                    break;
            }
            return true;
        } catch (Throwable th) {
            LOG.warn("Failed to handle event", th);
            registerError();
            EventMetaData eventMetaData = new EventMetaData(tezEvent.getDestinationInfo().getEventGenerator(), this.taskSpec.getVertexName(), tezEvent.getDestinationInfo().getEdgeVertexName(), getTaskAttemptID());
            setFrameworkCounters();
            this.tezUmbilical.signalFailure(getTaskAttemptID(), TaskFailureType.NON_FATAL, th, ExceptionUtils.getStackTrace(th), eventMetaData);
            return false;
        }
    }

    @Override // org.apache.tez.runtime.RuntimeTask
    public int getMaxEventsToHandle() {
        return Math.max(0, this.maxEventBacklog - this.eventsToBeProcessed.size());
    }

    @Override // org.apache.tez.runtime.RuntimeTask
    public synchronized void handleEvents(Collection<TezEvent> collection) {
        if (collection == null || collection.isEmpty()) {
            return;
        }
        this.eventCounter.addAndGet(collection.size());
        if (LOG.isDebugEnabled()) {
            LOG.debug("Received events to be processed by task, taskAttemptId=" + this.taskSpec.getTaskAttemptID() + ", eventCount=" + collection.size() + ", newEventCounter=" + this.eventCounter.get());
        }
        this.eventsToBeProcessed.addAll(collection);
    }

    @Override // org.apache.tez.runtime.RuntimeTask
    public synchronized void abortTask() {
        if (this.processor != null) {
            this.processor.abort();
        }
    }

    private void startRouterThread() {
        this.eventRouterThread = new Thread((Runnable) new RunnableWithNdc() { // from class: org.apache.tez.runtime.LogicalIOProcessorRuntimeTask.1
            public void runInternal() {
                while (!LogicalIOProcessorRuntimeTask.this.isTaskDone() && !Thread.currentThread().isInterrupted()) {
                    try {
                        TezEvent take = LogicalIOProcessorRuntimeTask.this.eventsToBeProcessed.take();
                        if (take != null && !LogicalIOProcessorRuntimeTask.this.handleEvent(take)) {
                            LogicalIOProcessorRuntimeTask.LOG.warn("Stopping Event Router thread as failed to handle event: " + take);
                            return;
                        }
                    } catch (InterruptedException e) {
                        if (!LogicalIOProcessorRuntimeTask.this.isTaskDone()) {
                            LogicalIOProcessorRuntimeTask.LOG.warn("Event Router thread interrupted. Returning.");
                        }
                        Thread.currentThread().interrupt();
                        return;
                    }
                }
            }
        });
        this.eventRouterThread.setName("TezTaskEventRouter{" + this.taskSpec.getTaskAttemptID().toString() + "}");
        this.eventRouterThread.start();
    }

    private void maybeResetInterruptStatus() {
        if (Thread.currentThread().isInterrupted()) {
            return;
        }
        Thread.currentThread().interrupt();
    }

    private void closeContexts() throws IOException {
        closeContext(this.inputContextMap);
        closeContext(this.outputContextMap);
        closeContext((TaskContext) this.processorContext);
    }

    private void closeContext(Map<String, ? extends TaskContext> map) throws IOException {
        if (map == null) {
            return;
        }
        Iterator<? extends TaskContext> it = map.values().iterator();
        while (it.hasNext()) {
            closeContext(it.next());
        }
        map.clear();
    }

    private void closeContext(TaskContext taskContext) throws IOException {
        if (taskContext == null || !(taskContext instanceof Closeable)) {
            return;
        }
        ((Closeable) taskContext).close();
    }

    public void cleanup() throws InterruptedException {
        LOG.info("Final Counters for " + this.taskSpec.getTaskAttemptID() + ": " + getCounters().toShortString());
        setTaskDone();
        if (this.eventRouterThread != null) {
            this.eventRouterThread.interrupt();
            LOG.info("Joining on EventRouter");
            try {
                this.eventRouterThread.join();
            } catch (InterruptedException e) {
                LOG.info("Ignoring interrupt while waiting for the router thread to die");
                Thread.currentThread().interrupt();
            }
            this.eventRouterThread = null;
        }
        if (LOG.isDebugEnabled()) {
            LOG.debug("Processor closed={}", Boolean.valueOf(this.processorClosed));
            LOG.debug("Num of inputs to be closed={}", Integer.valueOf(this.initializedInputs.size()));
            LOG.debug("Num of outputs to be closed={}", Integer.valueOf(this.initializedOutputs.size()));
        }
        Iterator<Map.Entry<String, LogicalInput>> it = this.initializedInputs.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<String, LogicalInput> next = it.next();
            String key = next.getKey();
            it.remove();
            try {
                try {
                    next.getValue().close();
                    maybeResetInterruptStatus();
                    LOG.info("Closed input for vertex={}, sourceVertex={}, interruptedStatus={}", new Object[]{this.processor.getContext().getTaskVertexName(), key, Boolean.valueOf(Thread.currentThread().isInterrupted())});
                } finally {
                    LOG.info("Closed input for vertex={}, sourceVertex={}, interruptedStatus={}", new Object[]{this.processor.getContext().getTaskVertexName(), key, Boolean.valueOf(Thread.currentThread().isInterrupted())});
                }
            } catch (InterruptedException e2) {
                LOG.info("Resetting interrupt status for input with srcVertexName={}", key);
                Thread.currentThread().interrupt();
                LOG.info("Closed input for vertex={}, sourceVertex={}, interruptedStatus={}", new Object[]{this.processor.getContext().getTaskVertexName(), key, Boolean.valueOf(Thread.currentThread().isInterrupted())});
            } catch (Throwable th) {
                LOG.warn("Ignoring exception when closing input {}(cleanup). Exception class={}, message={}", new Object[]{key, th.getClass().getName(), th.getMessage()});
                LOG.info("Closed input for vertex={}, sourceVertex={}, interruptedStatus={}", new Object[]{this.processor.getContext().getTaskVertexName(), key, Boolean.valueOf(Thread.currentThread().isInterrupted())});
            }
        }
        Iterator<Map.Entry<String, LogicalOutput>> it2 = this.initializedOutputs.entrySet().iterator();
        while (it2.hasNext()) {
            Map.Entry<String, LogicalOutput> next2 = it2.next();
            String key2 = next2.getKey();
            it2.remove();
            try {
                try {
                    next2.getValue().close();
                    maybeResetInterruptStatus();
                    LOG.info("Closed input for vertex={}, sourceVertex={}, interruptedStatus={}", new Object[]{this.processor.getContext().getTaskVertexName(), key2, Boolean.valueOf(Thread.currentThread().isInterrupted())});
                } finally {
                    LOG.info("Closed input for vertex={}, sourceVertex={}, interruptedStatus={}", new Object[]{this.processor.getContext().getTaskVertexName(), key2, Boolean.valueOf(Thread.currentThread().isInterrupted())});
                }
            } catch (InterruptedException e3) {
                LOG.info("Resetting interrupt status for output with destVertexName={}", key2);
                Thread.currentThread().interrupt();
                LOG.info("Closed input for vertex={}, sourceVertex={}, interruptedStatus={}", new Object[]{this.processor.getContext().getTaskVertexName(), key2, Boolean.valueOf(Thread.currentThread().isInterrupted())});
            } catch (Throwable th2) {
                LOG.warn("Ignoring exception when closing output {}(cleanup). Exception class={}, message={}", new Object[]{key2, th2.getClass().getName(), th2.getMessage()});
                LOG.info("Closed input for vertex={}, sourceVertex={}, interruptedStatus={}", new Object[]{this.processor.getContext().getTaskVertexName(), key2, Boolean.valueOf(Thread.currentThread().isInterrupted())});
            }
        }
        if (LOG.isDebugEnabled()) {
            printThreads();
        }
        if (!this.processorClosed && this.processor != null) {
            try {
                this.processorClosed = true;
                this.processor.close();
                LOG.info("Closed processor for vertex={}, index={}, interruptedStatus={}", new Object[]{this.processor.getContext().getTaskVertexName(), Integer.valueOf(this.processor.getContext().getTaskVertexIndex()), Boolean.valueOf(Thread.currentThread().isInterrupted())});
                maybeResetInterruptStatus();
            } catch (InterruptedException e4) {
                LOG.info("Resetting interrupt for processor");
                Thread.currentThread().interrupt();
            } catch (Throwable th3) {
                LOG.warn("Ignoring Exception when closing processor(cleanup). Exception class={}, message={}" + th3.getClass().getName(), th3.getMessage());
            }
        }
        try {
            closeContexts();
            cleanupStructures();
        } catch (IOException e5) {
            LOG.info("Error while cleaning up contexts ", e5);
        }
    }

    private void cleanupStructures() {
        if (this.initializerExecutor != null && !this.initializerExecutor.isShutdown()) {
            this.initializerExecutor.shutdownNow();
        }
        this.inputsMap.clear();
        this.outputsMap.clear();
        this.initializedInputs.clear();
        this.initializedOutputs.clear();
        this.inputContextMap.clear();
        this.outputContextMap.clear();
        if (!this.tezConf.getBoolean("tez.local.mode", false)) {
            this.inputSpecs.clear();
            this.outputSpecs.clear();
            if (this.groupInputSpecs != null) {
                this.groupInputSpecs.clear();
            }
        }
        if (this.groupInputsMap != null) {
            this.groupInputsMap.clear();
            this.groupInputsMap = null;
        }
        this.processor = null;
        this.processorContext = null;
        this.runInputMap.clear();
        this.runOutputMap.clear();
        this.eventsToBeProcessed.clear();
        this.inputReadyTracker = null;
        this.objectRegistry = null;
    }

    void printThreads() {
        ThreadMXBean threadMXBean = ManagementFactory.getThreadMXBean();
        for (long j : threadMXBean.getAllThreadIds()) {
            Long valueOf = Long.valueOf(j);
            ThreadInfo threadInfo = threadMXBean.getThreadInfo(valueOf.longValue());
            if (threadInfo != null && LOG.isDebugEnabled()) {
                LOG.debug("ThreadId : " + valueOf + ", name=" + threadInfo.getThreadName());
            }
        }
    }

    @InterfaceAudience.Private
    @VisibleForTesting
    public Collection<InputContext> getInputContexts() {
        return this.inputContextMap.values();
    }

    @InterfaceAudience.Private
    @VisibleForTesting
    public Collection<OutputContext> getOutputContexts() {
        return this.outputContextMap.values();
    }

    @InterfaceAudience.Private
    @VisibleForTesting
    public ProcessorContext getProcessorContext() {
        return this.processorContext;
    }

    @InterfaceAudience.Private
    @VisibleForTesting
    public LogicalIOProcessor getProcessor() {
        return this.processor;
    }

    @InterfaceAudience.Private
    @VisibleForTesting
    public Map<String, LogicalInput> getInputs() {
        return this.inputsMap;
    }

    @InterfaceAudience.Private
    @VisibleForTesting
    public Map<String, LogicalOutput> getOutputs() {
        return this.outputsMap;
    }

    @InterfaceAudience.Private
    public HadoopShim getHadoopShim() {
        return this.hadoopShim;
    }
}
