package org.apache.tez.runtime;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletionService;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.util.StringUtils;
import org.apache.tez.dag.api.ProcessorDescriptor;
import org.apache.tez.dag.api.TezUncheckedException;
import org.apache.tez.dag.records.TezTaskAttemptID;
import org.apache.tez.runtime.RuntimeTask;
import org.apache.tez.runtime.api.Event;
import org.apache.tez.runtime.api.Input;
import org.apache.tez.runtime.api.LogicalIOProcessor;
import org.apache.tez.runtime.api.LogicalInput;
import org.apache.tez.runtime.api.LogicalOutput;
import org.apache.tez.runtime.api.Output;
import org.apache.tez.runtime.api.Processor;
import org.apache.tez.runtime.api.TezInputContext;
import org.apache.tez.runtime.api.TezOutputContext;
import org.apache.tez.runtime.api.TezProcessorContext;
import org.apache.tez.runtime.api.impl.EventMetaData;
import org.apache.tez.runtime.api.impl.InputSpec;
import org.apache.tez.runtime.api.impl.OutputSpec;
import org.apache.tez.runtime.api.impl.TaskSpec;
import org.apache.tez.runtime.api.impl.TezEvent;
import org.apache.tez.runtime.api.impl.TezInputContextImpl;
import org.apache.tez.runtime.api.impl.TezOutputContextImpl;
import org.apache.tez.runtime.api.impl.TezProcessorContextImpl;
import org.apache.tez.runtime.api.impl.TezUmbilical;
import org.apache.tez.runtime.internals.api.events.SystemEventProtos;

@InterfaceAudience.Private
/* loaded from: input_file:org/apache/tez/runtime/LogicalIOProcessorRuntimeTask.class */
public class LogicalIOProcessorRuntimeTask extends RuntimeTask {
    /** Logger shared by this task and its nested initializer callables. */
    private static final Log LOG = LogFactory.getLog(LogicalIOProcessorRuntimeTask.class);
    /** Input specs, taken from the task spec in the constructor. */
    private final List<InputSpec> inputSpecs;
    /** Created inputs keyed by source vertex name; filled by the input initializer callables. */
    private final ConcurrentHashMap<String, LogicalInput> inputsMap;
    /** Input contexts keyed by source vertex name; filled by the input initializer callables. */
    private final ConcurrentHashMap<String, TezInputContext> inputContextMap;
    /** Output specs, taken from the task spec in the constructor. */
    private final List<OutputSpec> outputSpecs;
    /** Created outputs keyed by destination vertex name; filled by the output initializer callables. */
    private final ConcurrentHashMap<String, LogicalOutput> outputsMap;
    /** Output contexts keyed by destination vertex name; filled by the output initializer callables. */
    private final ConcurrentHashMap<String, TezOutputContext> outputContextMap;
    /** Descriptor of the processor, taken from the task spec in the constructor. */
    private final ProcessorDescriptor processorDescriptor;
    /** Processor instance created from {@link #processorDescriptor} in the constructor. */
    private final LogicalIOProcessor processor;
    /** Context handed to the processor; assigned when the processor is initialized. */
    private TezProcessorContext processorContext;
    /** Inputs in {@link #inputSpecs} order, passed to the processor's {@code run}. */
    private final LinkedHashMap<String, LogicalInput> runInputMap;
    /** Outputs in {@link #outputSpecs} order, passed to the processor's {@code run}. */
    private final LinkedHashMap<String, LogicalOutput> runOutputMap;
    /** Metadata map received by the constructor and forwarded to every context created here. */
    private final Map<String, ByteBuffer> serviceConsumerMetadata;
    /** Daemon thread pool that runs the input and output initializer callables. */
    private final ExecutorService initializerExecutor;
    /** Completion service over {@link #initializerExecutor}, used to wait for the initializers. */
    private final CompletionService<Void> initializerCompletionService;
    /** Events queued by {@code handleEvents} and drained by the event router thread. */
    private LinkedBlockingQueue<TezEvent> eventsToBeProcessed;
    /** Thread that dispatches queued events; {@code null} until the router thread is started. */
    private Thread eventRouterThread;
    /** Application attempt number received by the constructor and forwarded to every context. */
    private final int appAttemptNumber;

    /* JADX INFO: Access modifiers changed from: package-private */
    /* renamed from: org.apache.tez.runtime.LogicalIOProcessorRuntimeTask$2, reason: invalid class name */
    /* loaded from: input_file:org/apache/tez/runtime/LogicalIOProcessorRuntimeTask$2.class */
    /**
     * Synthetic lookup table, as reconstructed by the decompiler, that maps the ordinal of each
     * {@link EventMetaData.EventProducerConsumerType} constant to a small case index:
     * INPUT=1, OUTPUT=2, PROCESSOR=3, SYSTEM=4. Slots that are never assigned keep the
     * array default of 0.
     */
    public static /* synthetic */ class AnonymousClass2 {
        // One slot per enum constant, indexed by ordinal().
        static final /* synthetic */ int[] $SwitchMap$org$apache$tez$runtime$api$impl$EventMetaData$EventProducerConsumerType = new int[EventMetaData.EventProducerConsumerType.values().length];

        // Each assignment is guarded separately and NoSuchFieldError is deliberately ignored;
        // presumably this tolerates an enum constant that is missing at runtime, leaving its
        // slot at 0 — NOTE(review): standard compiler output, do not hand-edit.
        static {
            try {
                $SwitchMap$org$apache$tez$runtime$api$impl$EventMetaData$EventProducerConsumerType[EventMetaData.EventProducerConsumerType.INPUT.ordinal()] = 1;
            } catch (NoSuchFieldError e) {
            }
            try {
                $SwitchMap$org$apache$tez$runtime$api$impl$EventMetaData$EventProducerConsumerType[EventMetaData.EventProducerConsumerType.OUTPUT.ordinal()] = 2;
            } catch (NoSuchFieldError e2) {
            }
            try {
                $SwitchMap$org$apache$tez$runtime$api$impl$EventMetaData$EventProducerConsumerType[EventMetaData.EventProducerConsumerType.PROCESSOR.ordinal()] = 3;
            } catch (NoSuchFieldError e3) {
            }
            try {
                $SwitchMap$org$apache$tez$runtime$api$impl$EventMetaData$EventProducerConsumerType[EventMetaData.EventProducerConsumerType.SYSTEM.ordinal()] = 4;
            } catch (NoSuchFieldError e4) {
            }
        }
    }

    /* loaded from: input_file:org/apache/tez/runtime/LogicalIOProcessorRuntimeTask$InitializeInputCallable.class */
    /**
     * Initializer task for one input: creates the input and its context, registers both
     * under the source vertex name, initializes the input and forwards any events that
     * initialization produced.
     */
    private class InitializeInputCallable implements Callable<Void> {
        private final InputSpec inputSpec;

        public InitializeInputCallable(InputSpec inputSpec) {
            this.inputSpec = inputSpec;
        }

        /**
         * Runs the initialization described on the class.
         *
         * @return always {@code null}
         * @throws Exception whatever input creation or initialization throws
         */
        @Override
        public Void call() throws Exception {
            LOG.info("Initializing Input using InputSpec: " + inputSpec);
            final String edgeName = inputSpec.getSourceVertexName();
            final LogicalInput input = createInput(inputSpec);
            final TezInputContext context = createInputContext(inputSpec);

            // Register before initialize(). ConcurrentHashMap rejects null values, so past
            // this point the input is known to be non-null and no further type check is needed.
            inputsMap.put(edgeName, input);
            inputContextMap.put(edgeName, context);
            input.setNumPhysicalInputs(inputSpec.getPhysicalEdgeCount());

            LOG.info("Initializing Input with src edge: " + edgeName);
            final List<Event> generated = input.initialize(context);
            sendTaskGeneratedEvents(
                    generated,
                    EventMetaData.EventProducerConsumerType.INPUT,
                    context.getTaskVertexName(),
                    context.getSourceVertexName(),
                    taskSpec.getTaskAttemptID());
            LOG.info("Initialized Input with src edge: " + edgeName);
            return null;
        }
    }

    /* loaded from: input_file:org/apache/tez/runtime/LogicalIOProcessorRuntimeTask$InitializeOutputCallable.class */
    /**
     * Initializer task for one output: creates the output and its context, registers both
     * under the destination vertex name, initializes the output and forwards any events
     * that initialization produced.
     */
    private class InitializeOutputCallable implements Callable<Void> {
        private final OutputSpec outputSpec;

        public InitializeOutputCallable(OutputSpec outputSpec) {
            this.outputSpec = outputSpec;
        }

        /**
         * Runs the initialization described on the class.
         *
         * @return always {@code null}
         * @throws Exception whatever output creation or initialization throws
         */
        @Override // java.util.concurrent.Callable
        public Void call() throws Exception {
            LogicalIOProcessorRuntimeTask.LOG.info("Initializing Output using OutputSpec: " + this.outputSpec);
            String destinationVertexName = this.outputSpec.getDestinationVertexName();
            LogicalOutput output = LogicalIOProcessorRuntimeTask.this.createOutput(this.outputSpec);
            TezOutputContext outputContext = LogicalIOProcessorRuntimeTask.this.createOutputContext(this.outputSpec);

            // Register before initialize(). ConcurrentHashMap rejects null values, so past
            // this point the output is known to be non-null and no further type check is needed.
            LogicalIOProcessorRuntimeTask.this.outputsMap.put(destinationVertexName, output);
            LogicalIOProcessorRuntimeTask.this.outputContextMap.put(destinationVertexName, outputContext);
            output.setNumPhysicalOutputs(this.outputSpec.getPhysicalEdgeCount());

            // The log text previously said "Input" here, a copy-paste slip from the input callable.
            LogicalIOProcessorRuntimeTask.LOG.info("Initializing Output with dest edge: " + destinationVertexName);
            LogicalIOProcessorRuntimeTask.this.sendTaskGeneratedEvents(
                    output.initialize(outputContext),
                    EventMetaData.EventProducerConsumerType.OUTPUT,
                    outputContext.getTaskVertexName(),
                    outputContext.getDestinationVertexName(),
                    LogicalIOProcessorRuntimeTask.this.taskSpec.getTaskAttemptID());
            LogicalIOProcessorRuntimeTask.LOG.info("Initialized Output with dest edge: " + destinationVertexName);
            return null;
        }
    }

    /**
     * Builds the task from its spec: records the input/output specs, instantiates the
     * processor, and prepares the executor used to initialize inputs and outputs.
     *
     * @param taskSpec spec supplying the inputs, outputs and processor descriptor
     * @param i application attempt number, forwarded to every context created by this task
     * @param configuration passed to the {@link RuntimeTask} constructor
     * @param tezUmbilical passed to the {@link RuntimeTask} constructor
     * @param map service consumer metadata, forwarded to every context created by this task
     * @throws IOException declared by the signature; not thrown directly by this body
     */
    public LogicalIOProcessorRuntimeTask(TaskSpec taskSpec, int i, Configuration configuration, TezUmbilical tezUmbilical, Map<String, ByteBuffer> map) throws IOException {
        super(taskSpec, configuration, tezUmbilical);
        this.eventRouterThread = null;
        LOG.info("Initializing LogicalIOProcessorRuntimeTask with TaskSpec: " + taskSpec);
        int numInputs = taskSpec.getInputs().size();
        int numOutputs = taskSpec.getOutputs().size();
        this.inputSpecs = taskSpec.getInputs();
        this.inputsMap = new ConcurrentHashMap<>(numInputs);
        this.inputContextMap = new ConcurrentHashMap<>(numInputs);
        this.outputSpecs = taskSpec.getOutputs();
        this.outputsMap = new ConcurrentHashMap<>(numOutputs);
        this.outputContextMap = new ConcurrentHashMap<>(numOutputs);
        this.runInputMap = new LinkedHashMap<>();
        this.runOutputMap = new LinkedHashMap<>();
        this.processorDescriptor = taskSpec.getProcessorDescriptor();
        this.processor = createProcessor(this.processorDescriptor);
        this.serviceConsumerMetadata = map;
        this.eventsToBeProcessed = new LinkedBlockingQueue<>();
        this.state = RuntimeTask.State.NEW;
        this.appAttemptNumber = i;
        // newFixedThreadPool rejects a pool size of 0, so a task with no inputs and no
        // outputs would fail here; always ask for at least one thread.
        int numInitializers = Math.max(1, numInputs + numOutputs);
        this.initializerExecutor = Executors.newFixedThreadPool(numInitializers, new ThreadFactoryBuilder().setDaemon(true).setNameFormat("Initializer %d").build());
        this.initializerCompletionService = new ExecutorCompletionService<>(this.initializerExecutor);
    }

    /**
     * Moves the task from NEW to INITED, initializes every input and output on the
     * initializer executor while the processor is initialized on the calling thread,
     * waits for all initializers, builds the ordered run maps and starts the event
     * router thread.
     *
     * @throws IllegalStateException if the task is not in the NEW state
     * @throws Exception the cause of the first failed initializer that is observed when it is
     *         an {@code Exception} (otherwise an {@code Exception} wrapping the
     *         {@code ExecutionException}), or whatever processor initialization throws
     */
    public void initialize() throws Exception {
        LOG.info("Initializing LogicalProcessorIORuntimeTask");
        Preconditions.checkState(this.state == RuntimeTask.State.NEW, "Already initialized");
        this.state = RuntimeTask.State.INITED;

        // Submit one initializer per input and per output, counting how many to wait for.
        int pending = 0;
        for (InputSpec spec : this.taskSpec.getInputs()) {
            this.initializerCompletionService.submit(new InitializeInputCallable(spec));
            pending++;
        }
        for (OutputSpec spec : this.taskSpec.getOutputs()) {
            this.initializerCompletionService.submit(new InitializeOutputCallable(spec));
            pending++;
        }
        // No more work will be submitted; already-submitted initializers keep running.
        this.initializerExecutor.shutdown();

        // The processor is initialized here, on the caller's thread, while the pool works.
        initializeLogicalIOProcessor();

        for (int remaining = pending; remaining > 0; remaining--) {
            LOG.info("Waiting for " + remaining + " initializers to finish");
            try {
                this.initializerCompletionService.take().get();
            } catch (ExecutionException failure) {
                final Throwable cause = failure.getCause();
                if (cause instanceof Exception) {
                    throw (Exception) cause;
                }
                throw new Exception(failure);
            }
        }
        LOG.info("All initializers finished");

        // Rebuild the maps in spec order; the concurrent maps carry no ordering.
        for (InputSpec spec : this.inputSpecs) {
            final String sourceVertex = spec.getSourceVertexName();
            this.runInputMap.put(sourceVertex, this.inputsMap.get(sourceVertex));
        }
        for (OutputSpec spec : this.outputSpecs) {
            final String destinationVertex = spec.getDestinationVertexName();
            this.runOutputMap.put(destinationVertex, this.outputsMap.get(destinationVertex));
        }
        startRouterThread();
    }

    /**
     * Moves the task from INITED to RUNNING and runs the processor with the ordered
     * input and output maps built during initialization.
     *
     * @throws IllegalStateException if the task is not in the INITED state
     * @throws Exception whatever the processor's {@code run} throws
     */
    public void run() throws Exception {
        // Lock on the task itself. The previous lock object was the current value of the
        // state field: an enum constant that is reassigned inside the block and is shared by
        // every holder of that constant, so it did not reliably guard this transition.
        synchronized (this) {
            Preconditions.checkState(this.state == RuntimeTask.State.INITED, "Can only run while in INITED state. Current: " + this.state);
            this.state = RuntimeTask.State.RUNNING;
        }
        this.processor.run(this.runInputMap, this.runOutputMap);
    }

    /**
     * Moves the task from RUNNING to CLOSED, then closes the inputs (in spec order), the
     * processor and the outputs (in spec order), forwarding the events each input/output
     * returns from {@code close()}. Whether or not closing succeeds, the task is marked
     * done and the event router thread, if started, is interrupted.
     *
     * @throws IllegalStateException if the task is not in the RUNNING state
     * @throws Exception whatever an input, the processor or an output throws while closing;
     *         components after the failing one are not closed
     */
    public void close() throws Exception {
        try {
            // The message previously said "Can only run", copied from run().
            Preconditions.checkState(this.state == RuntimeTask.State.RUNNING, "Can only close while in RUNNING state. Current: " + this.state);
            this.state = RuntimeTask.State.CLOSED;
            for (InputSpec inputSpec : this.inputSpecs) {
                String sourceVertexName = inputSpec.getSourceVertexName();
                sendTaskGeneratedEvents(this.inputsMap.get(sourceVertexName).close(), EventMetaData.EventProducerConsumerType.INPUT, this.taskSpec.getVertexName(), sourceVertexName, this.taskSpec.getTaskAttemptID());
            }
            this.processor.close();
            for (OutputSpec outputSpec : this.outputSpecs) {
                String destinationVertexName = outputSpec.getDestinationVertexName();
                sendTaskGeneratedEvents(this.outputsMap.get(destinationVertexName).close(), EventMetaData.EventProducerConsumerType.OUTPUT, this.taskSpec.getVertexName(), destinationVertexName, this.taskSpec.getTaskAttemptID());
            }
        } finally {
            setTaskDone();
            if (this.eventRouterThread != null) {
                this.eventRouterThread.interrupt();
            }
        }
    }

    /**
     * Creates the processor context, stores it in {@code processorContext} and initializes
     * the processor with it, logging before and after.
     *
     * @throws Exception whatever the processor's {@code initialize} throws
     */
    private void initializeLogicalIOProcessor() throws Exception {
        LOG.info("Initializing processor, processorClassName=" + this.processorDescriptor.getClassName());
        final TezProcessorContext context = createProcessorContext();
        // Publish the context before initialize() so it is set even if initialization fails.
        this.processorContext = context;
        this.processor.initialize(context);
        LOG.info("Initialized processor, processorClassName=" + this.processorDescriptor.getClassName());
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Builds the context handed to the input described by {@code inputSpec}. The user
     * payload is the input descriptor's payload, or the processor descriptor's payload
     * when the input descriptor has none.
     *
     * @param inputSpec spec of the input the context is for
     * @return a new input context
     */
    public TezInputContext createInputContext(InputSpec inputSpec) {
        final String taskVertexName = this.taskSpec.getVertexName();
        final String sourceVertexName = inputSpec.getSourceVertexName();
        final TezTaskAttemptID attemptId = this.taskSpec.getTaskAttemptID();
        return new TezInputContextImpl(
                this.tezConf,
                this.appAttemptNumber,
                this.tezUmbilical,
                taskVertexName,
                sourceVertexName,
                attemptId,
                this.tezCounters,
                inputSpec.getInputDescriptor().getUserPayload() != null
                        ? inputSpec.getInputDescriptor().getUserPayload()
                        : this.taskSpec.getProcessorDescriptor().getUserPayload(),
                this,
                this.serviceConsumerMetadata,
                System.getenv());
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Builds the context handed to the output described by {@code outputSpec}. The user
     * payload is the output descriptor's payload, or the processor descriptor's payload
     * when the output descriptor has none.
     *
     * @param outputSpec spec of the output the context is for
     * @return a new output context
     */
    public TezOutputContext createOutputContext(OutputSpec outputSpec) {
        final String taskVertexName = this.taskSpec.getVertexName();
        final String destinationVertexName = outputSpec.getDestinationVertexName();
        final TezTaskAttemptID attemptId = this.taskSpec.getTaskAttemptID();
        return new TezOutputContextImpl(
                this.tezConf,
                this.appAttemptNumber,
                this.tezUmbilical,
                taskVertexName,
                destinationVertexName,
                attemptId,
                this.tezCounters,
                outputSpec.getOutputDescriptor().getUserPayload() != null
                        ? outputSpec.getOutputDescriptor().getUserPayload()
                        : this.taskSpec.getProcessorDescriptor().getUserPayload(),
                this,
                this.serviceConsumerMetadata,
                System.getenv());
    }

    /**
     * Builds the context handed to the processor, using the processor descriptor's
     * user payload.
     *
     * @return a new processor context
     */
    private TezProcessorContext createProcessorContext() {
        final String taskVertexName = this.taskSpec.getVertexName();
        final TezTaskAttemptID attemptId = this.taskSpec.getTaskAttemptID();
        return new TezProcessorContextImpl(
                this.tezConf,
                this.appAttemptNumber,
                this.tezUmbilical,
                taskVertexName,
                attemptId,
                this.tezCounters,
                this.processorDescriptor.getUserPayload(),
                this,
                this.serviceConsumerMetadata,
                System.getenv());
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Instantiates the input class named by the spec's input descriptor.
     *
     * @param inputSpec spec whose input descriptor names the class to instantiate
     * @return the new instance, as a {@link LogicalInput}
     * @throws TezUncheckedException if the instance is not a {@link LogicalInput}
     */
    public LogicalInput createInput(InputSpec inputSpec) {
        LOG.info("Creating Input");
        // Hold the instance in the wide type first: assigning an Input to a LogicalInput
        // variable without a cast does not compile, and the check below needs the wide type.
        Input input = (Input) RuntimeUtils.createClazzInstance(inputSpec.getInputDescriptor().getClassName());
        if (!(input instanceof LogicalInput)) {
            throw new TezUncheckedException(input.getClass().getName() + " is not a sub-type of LogicalInput. Only LogicalInput sub-types supported by LogicalIOProcessor.");
        }
        return (LogicalInput) input;
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Instantiates the output class named by the spec's output descriptor.
     *
     * @param outputSpec spec whose output descriptor names the class to instantiate
     * @return the new instance, as a {@link LogicalOutput}
     * @throws TezUncheckedException if the instance is not a {@link LogicalOutput}
     */
    public LogicalOutput createOutput(OutputSpec outputSpec) {
        LOG.info("Creating Output");
        // Hold the instance in the wide type first: assigning an Output to a LogicalOutput
        // variable without a cast does not compile, and the check below needs the wide type.
        Output output = (Output) RuntimeUtils.createClazzInstance(outputSpec.getOutputDescriptor().getClassName());
        if (!(output instanceof LogicalOutput)) {
            throw new TezUncheckedException(output.getClass().getName() + " is not a sub-type of LogicalOutput. Only LogicalOutput sub-types supported by LogicalIOProcessor.");
        }
        return (LogicalOutput) output;
    }

    /**
     * Instantiates the processor class named by the descriptor.
     *
     * @param processorDescriptor descriptor naming the class to instantiate
     * @return the new instance, as a {@link LogicalIOProcessor}
     * @throws TezUncheckedException if the instance is not a {@link LogicalIOProcessor}
     */
    private LogicalIOProcessor createProcessor(ProcessorDescriptor processorDescriptor) {
        // Hold the instance in the wide type first: assigning a Processor to a
        // LogicalIOProcessor variable without a cast does not compile.
        Processor candidate = (Processor) RuntimeUtils.createClazzInstance(processorDescriptor.getClassName());
        if (!(candidate instanceof LogicalIOProcessor)) {
            throw new TezUncheckedException(candidate.getClass().getName() + " is not a sub-type of LogicalIOProcessor. Only LogicalIOProcessor sub-types supported by LogicalIOProcessorRuntimeTask.");
        }
        return (LogicalIOProcessor) candidate;
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Wraps each event with shared source metadata and hands the batch to the umbilical.
     * Does nothing when {@code list} is {@code null} or empty.
     *
     * @param list events produced by an input or output; may be {@code null}
     * @param eventProducerConsumerType kind of component that produced the events
     * @param str task vertex name recorded in the event metadata
     * @param str2 edge vertex name recorded in the event metadata
     * @param tezTaskAttemptID task attempt recorded in the event metadata
     */
    public void sendTaskGeneratedEvents(List<Event> list, EventMetaData.EventProducerConsumerType eventProducerConsumerType, String str, String str2, TezTaskAttemptID tezTaskAttemptID) {
        if (list == null || list.isEmpty()) {
            return;
        }
        // A single metadata instance is shared by every wrapped event in the batch.
        EventMetaData eventMetaData = new EventMetaData(eventProducerConsumerType, str, str2, tezTaskAttemptID);
        // Typed list instead of a raw ArrayList: no casts and no unchecked hand-off.
        List<TezEvent> tezEvents = new ArrayList<>(list.size());
        for (Event event : list) {
            tezEvents.add(new TezEvent(event, eventMetaData));
        }
        if (LOG.isDebugEnabled()) {
            for (TezEvent tezEvent : tezEvents) {
                LOG.debug("Generated event info, eventMetaData=" + eventMetaData.toString() + ", eventType=" + tezEvent.getEventType());
            }
        }
        this.tezUmbilical.addEvents(tezEvents);
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Dispatches one event to the component named by its destination info: the input or
     * output registered under the destination's edge vertex name, or the processor. SYSTEM
     * events are only logged.
     *
     * @param tezEvent event to dispatch
     * @return {@code true} if the event was dispatched (or only logged); {@code false} if
     *         handling threw, in which case the failure is recorded with
     *         {@code setFatalError} and reported through the umbilical
     */
    public boolean handleEvent(TezEvent tezEvent) {
        if (LOG.isDebugEnabled()) {
            LOG.debug("Handling TezEvent in task, taskAttemptId=" + this.taskSpec.getTaskAttemptID() + ", eventType=" + tezEvent.getEventType() + ", eventSourceInfo=" + tezEvent.getSourceInfo() + ", eventDestinationInfo=" + tezEvent.getDestinationInfo());
        }
        try {
            // Switch on the enum itself, not on a synthetic ordinal table whose first case
            // label had been replaced by an unrelated protobuf constant that happens to equal 1.
            switch (tezEvent.getDestinationInfo().getEventGenerator()) {
                case INPUT:
                    LogicalInput logicalInput = this.inputsMap.get(tezEvent.getDestinationInfo().getEdgeVertexName());
                    if (logicalInput == null) {
                        throw new TezUncheckedException("Unhandled event for invalid target: " + tezEvent);
                    }
                    logicalInput.handleEvents(Collections.singletonList(tezEvent.getEvent()));
                    break;
                case OUTPUT:
                    LogicalOutput logicalOutput = this.outputsMap.get(tezEvent.getDestinationInfo().getEdgeVertexName());
                    if (logicalOutput == null) {
                        throw new TezUncheckedException("Unhandled event for invalid target: " + tezEvent);
                    }
                    logicalOutput.handleEvents(Collections.singletonList(tezEvent.getEvent()));
                    break;
                case PROCESSOR:
                    this.processor.handleEvents(Collections.singletonList(tezEvent.getEvent()));
                    break;
                case SYSTEM:
                    LOG.warn("Trying to send a System event in a Task: " + tezEvent);
                    break;
            }
            return true;
        } catch (Throwable th) {
            // Any failure, including an unknown target above, is fatal for the task.
            LOG.warn("Failed to handle event", th);
            setFatalError(th, "Failed to handle event");
            this.tezUmbilical.signalFatalError(getTaskAttemptID(), StringUtils.stringifyException(th), new EventMetaData(tezEvent.getDestinationInfo().getEventGenerator(), this.taskSpec.getVertexName(), tezEvent.getDestinationInfo().getEdgeVertexName(), getTaskAttemptID()));
            return false;
        }
    }

    /**
     * Queues the given events for the event router thread and adds their count to the
     * event counter. A {@code null} or empty collection is ignored.
     *
     * @param collection events to queue; may be {@code null}
     */
    @Override
    public synchronized void handleEvents(Collection<TezEvent> collection) {
        final boolean hasEvents = collection != null && !collection.isEmpty();
        if (hasEvents) {
            this.eventCounter.addAndGet(collection.size());
            if (LOG.isDebugEnabled()) {
                LOG.debug("Received events to be processed by task, taskAttemptId="
                        + this.taskSpec.getTaskAttemptID()
                        + ", eventCount=" + collection.size()
                        + ", newEventCounter=" + this.eventCounter.get());
            }
            this.eventsToBeProcessed.addAll(collection);
        }
    }

    /**
     * Creates, names and starts the event router thread. The thread takes queued events
     * one at a time and dispatches them with {@code handleEvent} until the task is done,
     * the thread is interrupted, or an event fails to be handled.
     */
    private void startRouterThread() {
        final Runnable routerLoop = new Runnable() {
            @Override
            public void run() {
                while (true) {
                    if (isTaskDone() || Thread.currentThread().isInterrupted()) {
                        return;
                    }
                    try {
                        final TezEvent next = eventsToBeProcessed.take();
                        if (next == null) {
                            continue;
                        }
                        if (!handleEvent(next)) {
                            LOG.warn("Stopping Event Router thread as failed to handle event: " + next);
                            return;
                        }
                    } catch (InterruptedException e) {
                        // An interrupt after the task is done is the normal shutdown path
                        // and is not logged.
                        if (!isTaskDone()) {
                            LOG.warn("Event Router thread interrupted. Returning.");
                        }
                        return;
                    }
                }
            }
        };
        final Thread router = new Thread(routerLoop);
        this.eventRouterThread = router;
        router.setName("TezTaskEventRouter[" + this.taskSpec.getTaskAttemptID().toString() + "]");
        router.start();
    }

    /**
     * Marks the task done and interrupts the event router thread if it has been started.
     */
    public synchronized void cleanup() {
        setTaskDone();
        final Thread router = this.eventRouterThread;
        if (router == null) {
            return;
        }
        router.interrupt();
    }

    /**
     * Returns the values view of the input context map (one context per registered input).
     *
     * @return the input contexts registered so far
     */
    @InterfaceAudience.Private
    @VisibleForTesting
    public Collection<TezInputContext> getInputContexts() {
        final Collection<TezInputContext> contexts = inputContextMap.values();
        return contexts;
    }

    /**
     * Returns the values view of the output context map (one context per registered output).
     *
     * @return the output contexts registered so far
     */
    @InterfaceAudience.Private
    @VisibleForTesting
    public Collection<TezOutputContext> getOutputContexts() {
        final Collection<TezOutputContext> contexts = outputContextMap.values();
        return contexts;
    }

    /**
     * Returns the processor context.
     *
     * @return the context assigned during processor initialization, or {@code null} if
     *         the processor has not been initialized yet
     */
    @InterfaceAudience.Private
    @VisibleForTesting
    public TezProcessorContext getProcessorContext() {
        final TezProcessorContext context = processorContext;
        return context;
    }

    /**
     * Returns the processor created in the constructor.
     *
     * @return the processor instance
     */
    @InterfaceAudience.Private
    @VisibleForTesting
    public LogicalIOProcessor getProcessor() {
        final LogicalIOProcessor current = processor;
        return current;
    }
}
