package org.apache.hadoop.yarn.client.api.async.impl;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.EnumSet;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
import org.apache.hadoop.yarn.api.records.ContainerStatus;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.Token;
import org.apache.hadoop.yarn.client.api.NMClient;
import org.apache.hadoop.yarn.client.api.async.NMClientAsync;
import org.apache.hadoop.yarn.client.api.impl.NMClientImpl;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.event.AbstractEvent;
import org.apache.hadoop.yarn.event.EventHandler;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.ipc.RPCUtil;
import org.apache.hadoop.yarn.state.InvalidStateTransitonException;
import org.apache.hadoop.yarn.state.MultipleArcTransition;
import org.apache.hadoop.yarn.state.SingleArcTransition;
import org.apache.hadoop.yarn.state.StateMachine;
import org.apache.hadoop.yarn.state.StateMachineFactory;

@InterfaceAudience.Private
@InterfaceStability.Unstable
/* loaded from: input_file:lib/hadoop-yarn-client-2.7.6.jar:org/apache/hadoop/yarn/client/api/async/impl/NMClientAsyncImpl.class */
public class NMClientAsyncImpl extends NMClientAsync {
    private static final Log LOG = LogFactory.getLog(NMClientAsyncImpl.class);
    protected static final int INITIAL_THREAD_POOL_SIZE = 10;
    protected ThreadPoolExecutor threadPool;
    protected int maxThreadPoolSize;
    protected Thread eventDispatcherThread;
    protected AtomicBoolean stopped;
    protected BlockingQueue<ContainerEvent> events;
    protected ConcurrentMap<ContainerId, StatefulContainer> containers;

    /* JADX INFO: Access modifiers changed from: protected */
    /* loaded from: input_file:lib/hadoop-yarn-client-2.7.6.jar:org/apache/hadoop/yarn/client/api/async/impl/NMClientAsyncImpl$ContainerEvent.class */
    /**
     * Event describing one operation requested for a single container.
     * Carries the container id, the id of the node to contact, and the
     * container token (which may be {@code null}; the stop and query paths in
     * this file pass {@code null}).
     */
    public static class ContainerEvent extends AbstractEvent<ContainerEventType> {
        private final ContainerId containerId;
        private final NodeId nodeId;
        private final Token containerToken;

        /**
         * @param containerId id of the container the event refers to
         * @param nodeId id of the node associated with the container
         * @param token container token, possibly {@code null}
         * @param containerEventType kind of operation this event requests
         */
        public ContainerEvent(ContainerId containerId, NodeId nodeId, Token token, ContainerEventType containerEventType) {
            super(containerEventType);
            this.containerToken = token;
            this.nodeId = nodeId;
            this.containerId = containerId;
        }

        /** @return the id of the container this event refers to */
        public ContainerId getContainerId() {
            return containerId;
        }

        /** @return the id of the node associated with the container */
        public NodeId getNodeId() {
            return nodeId;
        }

        /** @return the container token supplied at construction, possibly {@code null} */
        public Token getContainerToken() {
            return containerToken;
        }
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /* loaded from: input_file:lib/hadoop-yarn-client-2.7.6.jar:org/apache/hadoop/yarn/client/api/async/impl/NMClientAsyncImpl$ContainerEventProcessor.class */
    /**
     * Task that processes a single {@link ContainerEvent}.
     *
     * <p>{@code QUERY_CONTAINER} events are answered directly by asking the
     * underlying client for the container status. Every other event is handed
     * to the {@link StatefulContainer} registered for the container id, and the
     * container is dropped from the registry once it reaches a terminal state.
     */
    public class ContainerEventProcessor implements Runnable {
        protected ContainerEvent event;

        /**
         * @param containerEvent the event this task will process when run
         */
        public ContainerEventProcessor(ContainerEvent containerEvent) {
            this.event = containerEvent;
        }

        @Override // java.lang.Runnable
        public void run() {
            ContainerId containerId = this.event.getContainerId();
            NMClientAsyncImpl.LOG.info("Processing Event " + this.event + " for Container " + containerId);
            if (this.event.getType() == ContainerEventType.QUERY_CONTAINER) {
                queryContainerStatus(containerId);
                return;
            }
            StatefulContainer statefulContainer = NMClientAsyncImpl.this.containers.get(containerId);
            if (statefulContainer == null) {
                NMClientAsyncImpl.LOG.info("Container " + containerId + " is already stopped or failed");
                return;
            }
            statefulContainer.handle(this.event);
            if (NMClientAsyncImpl.this.isCompletelyDone(statefulContainer)) {
                NMClientAsyncImpl.this.containers.remove(containerId);
            }
        }

        /**
         * Fetches the container status and reports it to the callback handler.
         *
         * <p>The status request is deliberately kept outside the guard around
         * the success callback: a failure of the request itself must be
         * reported through {@code onGetContainerStatusError}, whereas an
         * exception thrown by the user's success callback is only logged.
         */
        private void queryContainerStatus(ContainerId containerId) {
            ContainerStatus containerStatus;
            try {
                containerStatus = NMClientAsyncImpl.this.client.getContainerStatus(containerId, this.event.getNodeId());
            } catch (Throwable th) {
                // Covers IOException, YarnException and anything unchecked.
                onExceptionRaised(containerId, th);
                return;
            }
            try {
                NMClientAsyncImpl.this.callbackHandler.onContainerStatusReceived(containerId, containerStatus);
            } catch (Throwable th) {
                // Exceptions raised by user callback code are not processed further.
                NMClientAsyncImpl.LOG.info("Unchecked exception is thrown from onContainerStatusReceived for Container " + this.event.getContainerId(), th);
            }
        }

        /**
         * Reports a failed status query to the callback handler; anything the
         * callback itself throws is logged and otherwise ignored.
         */
        private void onExceptionRaised(ContainerId containerId, Throwable th) {
            try {
                NMClientAsyncImpl.this.callbackHandler.onGetContainerStatusError(containerId, th);
            } catch (Throwable th2) {
                NMClientAsyncImpl.LOG.info("Unchecked exception is thrown from onGetContainerStatusError for Container " + containerId, th2);
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /* loaded from: input_file:lib/hadoop-yarn-client-2.7.6.jar:org/apache/hadoop/yarn/client/api/async/impl/NMClientAsyncImpl$ContainerEventType.class */
    /** Kinds of operation a {@link ContainerEvent} can request. */
    public enum ContainerEventType {
        /** Event type used by {@code StartContainerEvent}. */
        START_CONTAINER,
        /** Event type queued by {@code stopContainerAsync}. */
        STOP_CONTAINER,
        /** Event type queued by {@code getContainerStatusAsync}; handled without the container state machine. */
        QUERY_CONTAINER
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /* loaded from: input_file:lib/hadoop-yarn-client-2.7.6.jar:org/apache/hadoop/yarn/client/api/async/impl/NMClientAsyncImpl$ContainerState.class */
    /** States tracked by the per-container state machine in {@code StatefulContainer}. */
    public enum ContainerState {
        /** Initial state of the state machine. */
        PREP,
        /** Entered when a start or stop transition goes through its error handler; treated as terminal by {@code isCompletelyDone}. */
        FAILED,
        /** Entered from PREP by the start transition. */
        RUNNING,
        /** Entered after a stop transition, or when a stop arrives while still in PREP; treated as terminal by {@code isCompletelyDone}. */
        DONE
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /* loaded from: input_file:lib/hadoop-yarn-client-2.7.6.jar:org/apache/hadoop/yarn/client/api/async/impl/NMClientAsyncImpl$StartContainerEvent.class */
    /**
     * {@link ContainerEvent} of type {@code START_CONTAINER} that additionally
     * carries the container to launch and its launch context.
     */
    public static class StartContainerEvent extends ContainerEvent {
        private final Container container;
        private final ContainerLaunchContext containerLaunchContext;

        /**
         * Builds the event from the container's own id, node id and token.
         *
         * @param container the container to start
         * @param containerLaunchContext the launch context to start it with
         */
        public StartContainerEvent(Container container, ContainerLaunchContext containerLaunchContext) {
            super(container.getId(), container.getNodeId(), container.getContainerToken(), ContainerEventType.START_CONTAINER);
            this.containerLaunchContext = containerLaunchContext;
            this.container = container;
        }

        /** @return the container to start */
        public Container getContainer() {
            return container;
        }

        /** @return the launch context supplied for the container */
        public ContainerLaunchContext getContainerLaunchContext() {
            return containerLaunchContext;
        }
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /* loaded from: input_file:lib/hadoop-yarn-client-2.7.6.jar:org/apache/hadoop/yarn/client/api/async/impl/NMClientAsyncImpl$StatefulContainer.class */
    public static class StatefulContainer implements EventHandler<ContainerEvent> {
        /**
         * Shared transition table for every {@code StatefulContainer}.
         *
         * <ul>
         *   <li>PREP + START_CONTAINER -> RUNNING or FAILED (decided by {@code StartContainerTransition})</li>
         *   <li>PREP + STOP_CONTAINER -> DONE (stop arrived before start; {@code OutOfOrderTransition})</li>
         *   <li>RUNNING + STOP_CONTAINER -> DONE or FAILED (decided by {@code StopContainerTransition})</li>
         *   <li>DONE and FAILED absorb further START_CONTAINER / STOP_CONTAINER events</li>
         * </ul>
         * No arc is registered for RUNNING + START_CONTAINER.
         */
        protected static final StateMachineFactory<StatefulContainer, ContainerState, ContainerEventType, ContainerEvent> stateMachineFactory =
                new StateMachineFactory<StatefulContainer, ContainerState, ContainerEventType, ContainerEvent>(ContainerState.PREP)
                        // Transitions from PREP
                        .addTransition(ContainerState.PREP,
                                EnumSet.of(ContainerState.RUNNING, ContainerState.FAILED),
                                ContainerEventType.START_CONTAINER,
                                new StartContainerTransition())
                        .addTransition(ContainerState.PREP, ContainerState.DONE,
                                ContainerEventType.STOP_CONTAINER,
                                new OutOfOrderTransition())
                        // Transitions from RUNNING
                        .addTransition(ContainerState.RUNNING,
                                EnumSet.of(ContainerState.DONE, ContainerState.FAILED),
                                ContainerEventType.STOP_CONTAINER,
                                new StopContainerTransition())
                        // Transitions from DONE
                        .addTransition(ContainerState.DONE, ContainerState.DONE,
                                EnumSet.of(ContainerEventType.START_CONTAINER, ContainerEventType.STOP_CONTAINER))
                        // Transitions from FAILED
                        .addTransition(ContainerState.FAILED, ContainerState.FAILED,
                                EnumSet.of(ContainerEventType.START_CONTAINER, ContainerEventType.STOP_CONTAINER));
        private final NMClientAsync nmClientAsync;
        private final ContainerId containerId;
        private final StateMachine<ContainerState, ContainerEventType, ContainerEvent> stateMachine = stateMachineFactory.make(this);
        private final ReentrantReadWriteLock.ReadLock readLock;
        private final ReentrantReadWriteLock.WriteLock writeLock;

        /* loaded from: input_file:lib/hadoop-yarn-client-2.7.6.jar:org/apache/hadoop/yarn/client/api/async/impl/NMClientAsyncImpl$StatefulContainer$OutOfOrderTransition.class */
        protected static class OutOfOrderTransition implements SingleArcTransition<StatefulContainer, ContainerEvent> {
            protected static final String STOP_BEFORE_START_ERROR_MSG = "Container was killed before it was launched";

            protected OutOfOrderTransition() {
            }

            @Override // org.apache.hadoop.yarn.state.SingleArcTransition
            public void transition(StatefulContainer statefulContainer, ContainerEvent containerEvent) {
                try {
                    statefulContainer.nmClientAsync.getCallbackHandler().onStartContainerError(containerEvent.getContainerId(), RPCUtil.getRemoteException(STOP_BEFORE_START_ERROR_MSG));
                } catch (Throwable th) {
                    NMClientAsyncImpl.LOG.info("Unchecked exception is thrown from onStartContainerError for Container " + containerEvent.getContainerId(), th);
                }
            }
        }

        /* loaded from: input_file:lib/hadoop-yarn-client-2.7.6.jar:org/apache/hadoop/yarn/client/api/async/impl/NMClientAsyncImpl$StatefulContainer$StartContainerTransition.class */
        /**
         * Transition that launches the container through the underlying client
         * and moves the state machine to RUNNING on success or FAILED on error.
         */
        protected static class StartContainerTransition implements MultipleArcTransition<StatefulContainer, ContainerEvent, ContainerState> {
            protected StartContainerTransition() {
            }

            /**
             * Starts the container described by the event.
             *
             * <p>The launch call is kept outside the guard around the success
             * callback: a failed launch must reach {@code onStartContainerError}
             * and yield FAILED, whereas an exception thrown by the user's
             * {@code onContainerStarted} callback is only logged and the
             * container is still considered RUNNING.
             *
             * @param statefulContainer the container whose state machine is transitioning
             * @param containerEvent the triggering event; expected to be a {@code StartContainerEvent}
             * @return {@code RUNNING} if the launch call returned normally, {@code FAILED} otherwise
             */
            @Override // org.apache.hadoop.yarn.state.MultipleArcTransition
            public ContainerState transition(StatefulContainer statefulContainer, ContainerEvent containerEvent) {
                ContainerId containerId = containerEvent.getContainerId();
                try {
                    StartContainerEvent startContainerEvent = null;
                    if (containerEvent instanceof StartContainerEvent) {
                        startContainerEvent = (StartContainerEvent) containerEvent;
                    }
                    // With assertions disabled a wrong event type surfaces as a
                    // NullPointerException below; either way it ends in FAILED.
                    assert startContainerEvent != null;
                    Map<String, ByteBuffer> allServiceResponse = statefulContainer.nmClientAsync.getClient().startContainer(startContainerEvent.getContainer(), startContainerEvent.getContainerLaunchContext());
                    try {
                        statefulContainer.nmClientAsync.getCallbackHandler().onContainerStarted(containerId, allServiceResponse);
                    } catch (Throwable th) {
                        // Exceptions raised by user callback code are not processed further.
                        NMClientAsyncImpl.LOG.info("Unchecked exception is thrown from onContainerStarted for Container " + containerId, th);
                    }
                    return ContainerState.RUNNING;
                } catch (Throwable th) {
                    // Covers IOException, YarnException, AssertionError and anything unchecked.
                    return onExceptionRaised(statefulContainer, containerEvent, th);
                }
            }

            /**
             * Reports a launch failure to {@code onStartContainerError}; anything
             * that callback throws is logged and ignored.
             *
             * @return always {@code FAILED}
             */
            private ContainerState onExceptionRaised(StatefulContainer statefulContainer, ContainerEvent containerEvent, Throwable th) {
                try {
                    statefulContainer.nmClientAsync.getCallbackHandler().onStartContainerError(containerEvent.getContainerId(), th);
                } catch (Throwable th2) {
                    NMClientAsyncImpl.LOG.info("Unchecked exception is thrown from onStartContainerError for Container " + containerEvent.getContainerId(), th2);
                }
                return ContainerState.FAILED;
            }
        }

        /* loaded from: input_file:lib/hadoop-yarn-client-2.7.6.jar:org/apache/hadoop/yarn/client/api/async/impl/NMClientAsyncImpl$StatefulContainer$StopContainerTransition.class */
        /**
         * Transition that stops the container through the underlying client and
         * moves the state machine to DONE on success or FAILED on error.
         */
        protected static class StopContainerTransition implements MultipleArcTransition<StatefulContainer, ContainerEvent, ContainerState> {
            protected StopContainerTransition() {
            }

            /**
             * Issues the stop request and then notifies {@code onContainerStopped}.
             *
             * @return {@code DONE} if the stop request returned normally,
             *         {@code FAILED} if it threw anything
             */
            @Override
            public ContainerState transition(StatefulContainer statefulContainer, ContainerEvent containerEvent) {
                try {
                    statefulContainer.nmClientAsync.getClient().stopContainer(containerEvent.getContainerId(), containerEvent.getNodeId());
                    notifyStopped(statefulContainer, containerEvent);
                    return ContainerState.DONE;
                } catch (Throwable stopFailure) {
                    // One handler for IOException, YarnException and everything else.
                    return onExceptionRaised(statefulContainer, containerEvent, stopFailure);
                }
            }

            /** Invokes the success callback, logging (not propagating) whatever it throws. */
            private void notifyStopped(StatefulContainer statefulContainer, ContainerEvent containerEvent) {
                try {
                    statefulContainer.nmClientAsync.getCallbackHandler().onContainerStopped(containerEvent.getContainerId());
                } catch (Throwable callbackFailure) {
                    NMClientAsyncImpl.LOG.info("Unchecked exception is thrown from onContainerStopped for Container " + containerEvent.getContainerId(), callbackFailure);
                }
            }

            /**
             * Reports a stop failure to {@code onStopContainerError}, logging
             * whatever that callback throws.
             *
             * @return always {@code FAILED}
             */
            private ContainerState onExceptionRaised(StatefulContainer statefulContainer, ContainerEvent containerEvent, Throwable th) {
                try {
                    statefulContainer.nmClientAsync.getCallbackHandler().onStopContainerError(containerEvent.getContainerId(), th);
                } catch (Throwable callbackFailure) {
                    NMClientAsyncImpl.LOG.info("Unchecked exception is thrown from onStopContainerError for Container " + containerEvent.getContainerId(), callbackFailure);
                }
                return ContainerState.FAILED;
            }
        }

        /**
         * Creates the per-container state holder and the read/write lock pair
         * guarding its state machine.
         *
         * @param client the async client whose underlying client and callback handler the transitions use
         * @param containerId id of the container being tracked
         */
        public StatefulContainer(NMClientAsync client, ContainerId containerId) {
            ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
            this.writeLock = lock.writeLock();
            this.readLock = lock.readLock();
            this.containerId = containerId;
            this.nmClientAsync = client;
        }

        /**
         * Feeds the event to the state machine while holding the write lock.
         * An event that is invalid for the current state is logged as an error
         * and otherwise ignored.
         *
         * @param containerEvent the event to apply
         */
        @Override
        public void handle(ContainerEvent containerEvent) {
            writeLock.lock();
            try {
                stateMachine.doTransition(containerEvent.getType(), containerEvent);
            } catch (InvalidStateTransitonException invalidTransition) {
                NMClientAsyncImpl.LOG.error("Can't handle this event at current state", invalidTransition);
            } finally {
                writeLock.unlock();
            }
        }

        /** @return the id of the container tracked by this instance */
        public ContainerId getContainerId() {
            return containerId;
        }

        /**
         * Reads the state machine's current state under the read lock.
         *
         * @return the current {@link ContainerState}
         */
        public ContainerState getState() {
            readLock.lock();
            try {
                ContainerState current = stateMachine.getCurrentState();
                return current;
            } finally {
                readLock.unlock();
            }
        }
    }

    /**
     * Creates a client, passing the fully qualified class name of
     * {@link NMClientAsync} as the string argument of the two-argument
     * constructor.
     *
     * @param callbackHandler handler that receives the results of the asynchronous operations
     */
    public NMClientAsyncImpl(NMClientAsync.CallbackHandler callbackHandler) {
        this(NMClientAsync.class.getName(), callbackHandler);
    }

    /**
     * Creates a client backed by a freshly constructed {@link NMClientImpl}.
     *
     * @param name string forwarded to the three-argument constructor (and from there to the superclass)
     * @param callbackHandler handler that receives the results of the asynchronous operations
     */
    public NMClientAsyncImpl(String name, NMClientAsync.CallbackHandler callbackHandler) {
        this(name, new NMClientImpl(), callbackHandler);
    }

    /**
     * Creates a client on top of the supplied {@link NMClient}, with an empty
     * event queue, an empty container registry and the stopped flag cleared.
     *
     * @param name string forwarded to the superclass constructor
     * @param client underlying client used to talk to node managers
     * @param callbackHandler handler that receives the results of the asynchronous operations
     */
    @InterfaceAudience.Private
    @VisibleForTesting
    protected NMClientAsyncImpl(String name, NMClient client, NMClientAsync.CallbackHandler callbackHandler) {
        super(name, client, callbackHandler);
        this.client = client;
        this.callbackHandler = callbackHandler;
        this.stopped = new AtomicBoolean(false);
        this.events = new LinkedBlockingQueue<ContainerEvent>();
        this.containers = new ConcurrentHashMap<ContainerId, StatefulContainer>();
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /**
     * Reads the thread pool upper bound from the configuration (key
     * {@code YarnConfiguration.NM_CLIENT_ASYNC_THREAD_POOL_MAX_SIZE}, default
     * 500), initializes the underlying client, then delegates to the superclass.
     *
     * @param configuration configuration to read settings from
     * @throws Exception propagated from the underlying client or the superclass
     */
    @Override
    public void serviceInit(Configuration configuration) throws Exception {
        int configuredMax = configuration.getInt(YarnConfiguration.NM_CLIENT_ASYNC_THREAD_POOL_MAX_SIZE, 500);
        this.maxThreadPoolSize = configuredMax;
        LOG.info("Upper bound of the thread pool size is " + this.maxThreadPoolSize);
        this.client.init(configuration);
        super.serviceInit(configuration);
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /**
     * Starts the underlying client, creates the worker thread pool and launches
     * the event dispatcher thread, then delegates to the superclass.
     *
     * <p>The pool's core size starts at the smaller of
     * {@link #INITIAL_THREAD_POOL_SIZE} and {@code maxThreadPoolSize}. The
     * dispatcher takes events off the queue, remembers every distinct node id
     * it has seen, raises the core size (never above {@code maxThreadPoolSize})
     * when the number of nodes exceeds it, and submits a processor for each
     * event to the pool.
     *
     * @throws Exception propagated from the underlying client or the superclass
     */
    @Override
    public void serviceStart() throws Exception {
        this.client.start();

        int initialCoreSize = Math.min(INITIAL_THREAD_POOL_SIZE, this.maxThreadPoolSize);
        BlockingQueue<Runnable> workQueue = new LinkedBlockingQueue<Runnable>();
        this.threadPool = new ThreadPoolExecutor(
                initialCoreSize,
                Integer.MAX_VALUE,
                1L,
                TimeUnit.HOURS,
                workQueue,
                new ThreadFactoryBuilder().setNameFormat(getClass().getName() + " #%d").setDaemon(true).build());

        this.eventDispatcherThread = new Thread() {
            @Override
            public void run() {
                Set<String> knownNodes = new HashSet<String>();
                while (!NMClientAsyncImpl.this.stopped.get() && !Thread.currentThread().isInterrupted()) {
                    ContainerEvent nextEvent;
                    try {
                        nextEvent = NMClientAsyncImpl.this.events.take();
                    } catch (InterruptedException interrupted) {
                        // An interrupt during shutdown is expected; otherwise report it.
                        if (!NMClientAsyncImpl.this.stopped.get()) {
                            NMClientAsyncImpl.LOG.error("Returning, thread interrupted", interrupted);
                        }
                        return;
                    }

                    knownNodes.add(nextEvent.getNodeId().toString());

                    int currentCoreSize = NMClientAsyncImpl.this.threadPool.getCorePoolSize();
                    if (currentCoreSize != NMClientAsyncImpl.this.maxThreadPoolSize) {
                        int nodeCount = knownNodes.size();
                        int idealCoreSize = Math.min(NMClientAsyncImpl.this.maxThreadPoolSize, nodeCount);
                        if (currentCoreSize < idealCoreSize) {
                            // Grow with some headroom so the pool is not resized for every new node.
                            int newCoreSize = Math.min(NMClientAsyncImpl.this.maxThreadPoolSize, idealCoreSize + INITIAL_THREAD_POOL_SIZE);
                            NMClientAsyncImpl.LOG.info("Set NMClientAsync thread pool size to " + newCoreSize + " as the number of nodes to talk to is " + nodeCount);
                            NMClientAsyncImpl.this.threadPool.setCorePoolSize(newCoreSize);
                        }
                    }

                    NMClientAsyncImpl.this.threadPool.execute(NMClientAsyncImpl.this.getContainerEventProcessor(nextEvent));
                }
            }
        };
        this.eventDispatcherThread.setName("Container  Event Dispatcher");
        this.eventDispatcherThread.setDaemon(false);
        this.eventDispatcherThread.start();

        super.serviceStart();
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /**
     * Stops the service; only the first invocation does any work.
     *
     * <p>Interrupts and joins the dispatcher thread, shuts the worker pool down
     * immediately, clears the container registry (unless the underlying client
     * is an {@link NMClientImpl} whose cleanup-running-containers flag is
     * false), stops the underlying client and finally delegates to the
     * superclass.
     *
     * @throws Exception propagated from the superclass
     */
    @Override
    public void serviceStop() throws Exception {
        boolean alreadyStopped = this.stopped.getAndSet(true);
        if (alreadyStopped) {
            return;
        }

        if (this.eventDispatcherThread != null) {
            this.eventDispatcherThread.interrupt();
            try {
                this.eventDispatcherThread.join();
            } catch (InterruptedException interrupted) {
                LOG.error("The thread of " + this.eventDispatcherThread.getName() + " didn't finish normally.", interrupted);
            }
        }

        if (this.threadPool != null) {
            this.threadPool.shutdownNow();
        }

        if (this.client != null) {
            // When an NMClientImpl is told not to clean up running containers,
            // the tracked states are left untouched.
            boolean clearStates = !(this.client instanceof NMClientImpl)
                    || ((NMClientImpl) this.client).getCleanupRunningContainers().get();
            if (clearStates) {
                if (this.containers != null) {
                    this.containers.clear();
                }
            }
            this.client.stop();
        }

        super.serviceStop();
    }

    /**
     * Registers the container and queues a start event for it.
     *
     * <p>If a container with the same id is already registered,
     * {@code onStartContainerError} is invoked; the start event is still
     * queued afterwards. If the calling thread is interrupted while queuing,
     * the error is reported through {@code onStartContainerError} and the
     * thread's interrupt status is restored so callers can still observe it.
     *
     * @param container the container to start
     * @param containerLaunchContext the launch context to start it with
     */
    @Override // org.apache.hadoop.yarn.client.api.async.NMClientAsync
    public void startContainerAsync(Container container, ContainerLaunchContext containerLaunchContext) {
        if (this.containers.putIfAbsent(container.getId(), new StatefulContainer(this, container.getId())) != null) {
            this.callbackHandler.onStartContainerError(container.getId(), RPCUtil.getRemoteException("Container " + container.getId() + " is already started or scheduled to start"));
        }
        try {
            this.events.put(new StartContainerEvent(container, containerLaunchContext));
        } catch (InterruptedException e) {
            LOG.warn("Exception when scheduling the event of starting Container " + container.getId());
            try {
                this.callbackHandler.onStartContainerError(container.getId(), e);
            } finally {
                // put() cleared the flag when it threw; re-assert it even if the callback throws.
                Thread.currentThread().interrupt();
            }
        }
    }

    /**
     * Queues a stop event for the container.
     *
     * <p>If no container with this id is registered,
     * {@code onStopContainerError} is invoked; the stop event is still queued
     * afterwards. If the calling thread is interrupted while queuing, the
     * error is reported through {@code onStopContainerError} and the thread's
     * interrupt status is restored so callers can still observe it.
     *
     * @param containerId id of the container to stop
     * @param nodeId id of the node associated with the container
     */
    @Override // org.apache.hadoop.yarn.client.api.async.NMClientAsync
    public void stopContainerAsync(ContainerId containerId, NodeId nodeId) {
        if (this.containers.get(containerId) == null) {
            this.callbackHandler.onStopContainerError(containerId, RPCUtil.getRemoteException("Container " + containerId + " is neither started nor scheduled to start"));
        }
        try {
            this.events.put(new ContainerEvent(containerId, nodeId, null, ContainerEventType.STOP_CONTAINER));
        } catch (InterruptedException e) {
            LOG.warn("Exception when scheduling the event of stopping Container " + containerId);
            try {
                this.callbackHandler.onStopContainerError(containerId, e);
            } finally {
                // put() cleared the flag when it threw; re-assert it even if the callback throws.
                Thread.currentThread().interrupt();
            }
        }
    }

    /**
     * Queues a status query event for the container.
     *
     * <p>If the calling thread is interrupted while queuing, the error is
     * reported through {@code onGetContainerStatusError} and the thread's
     * interrupt status is restored so callers can still observe it.
     *
     * @param containerId id of the container to query
     * @param nodeId id of the node associated with the container
     */
    @Override // org.apache.hadoop.yarn.client.api.async.NMClientAsync
    public void getContainerStatusAsync(ContainerId containerId, NodeId nodeId) {
        try {
            this.events.put(new ContainerEvent(containerId, nodeId, null, ContainerEventType.QUERY_CONTAINER));
        } catch (InterruptedException e) {
            LOG.warn("Exception when scheduling the event of querying the status of Container " + containerId);
            try {
                this.callbackHandler.onGetContainerStatusError(containerId, e);
            } finally {
                // put() cleared the flag when it threw; re-assert it even if the callback throws.
                Thread.currentThread().interrupt();
            }
        }
    }

    /**
     * Tells whether the container has reached a terminal state.
     *
     * @param statefulContainer the container to inspect
     * @return {@code true} if its state is {@code DONE} or {@code FAILED}
     */
    protected boolean isCompletelyDone(StatefulContainer statefulContainer) {
        if (statefulContainer.getState() == ContainerState.DONE) {
            return true;
        }
        return statefulContainer.getState() == ContainerState.FAILED;
    }

    /**
     * Factory for the task that will process the given event.
     *
     * @param containerEvent the event to be processed
     * @return a new {@link ContainerEventProcessor} bound to the event
     */
    protected ContainerEventProcessor getContainerEventProcessor(ContainerEvent containerEvent) {
        ContainerEventProcessor processor = new ContainerEventProcessor(containerEvent);
        return processor;
    }
}
