package org.apache.tez.dag.history.logging.ats;

import com.google.common.annotations.VisibleForTesting;
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.yarn.api.records.timeline.TimelineEntity;
import org.apache.hadoop.yarn.api.records.timeline.TimelinePutResponse;
import org.apache.hadoop.yarn.client.api.TimelineClient;
import org.apache.tez.common.ReflectionUtils;
import org.apache.tez.common.security.HistoryACLPolicyManager;
import org.apache.tez.dag.api.TezUncheckedException;
import org.apache.tez.dag.history.DAGHistoryEvent;
import org.apache.tez.dag.history.HistoryEventType;
import org.apache.tez.dag.history.events.DAGSubmittedEvent;
import org.apache.tez.dag.history.logging.HistoryLoggingService;
import org.apache.tez.dag.records.TezDAGID;

/* loaded from: input_file:org/apache/tez/dag/history/logging/ats/ATSHistoryLoggingService.class */
public class ATSHistoryLoggingService extends HistoryLoggingService {
    private static final Log LOG = LogFactory.getLog(ATSHistoryLoggingService.class);
    private LinkedBlockingQueue<DAGHistoryEvent> eventQueue;
    private Thread eventHandlingThread;
    private AtomicBoolean stopped;
    private int eventCounter;
    private int eventsProcessed;
    private final Object lock;

    @VisibleForTesting
    TimelineClient timelineClient;
    private HashSet<TezDAGID> skippedDAGs;
    private Map<TezDAGID, String> dagDomainIdMap;
    private long maxTimeToWaitOnShutdown;
    private boolean waitForeverOnShutdown;
    private int maxEventsPerBatch;
    private long maxPollingTimeMillis;
    private String sessionDomainId;
    private static final String atsHistoryACLManagerClassName = "org.apache.tez.dag.history.ats.acls.ATSHistoryACLPolicyManager";
    private HistoryACLPolicyManager historyACLPolicyManager;

    public ATSHistoryLoggingService() {
        super(ATSHistoryLoggingService.class.getName());
        this.eventQueue = new LinkedBlockingQueue<>();
        this.stopped = new AtomicBoolean(false);
        this.eventCounter = 0;
        this.eventsProcessed = 0;
        this.lock = new Object();
        this.skippedDAGs = new HashSet<>();
        this.dagDomainIdMap = new HashMap();
        this.waitForeverOnShutdown = false;
    }

    public void serviceInit(Configuration configuration) throws Exception {
        LOG.info("Initializing ATSService");
        this.timelineClient = TimelineClient.createTimelineClient();
        this.timelineClient.init(configuration);
        this.maxTimeToWaitOnShutdown = configuration.getLong("tez.yarn.ats.event.flush.timeout.millis", -1L);
        this.maxEventsPerBatch = configuration.getInt("tez.yarn.ats.max.events.per.batch", 5);
        this.maxPollingTimeMillis = configuration.getInt("tez.yarn.ats.max.polling.time.per.event.millis", 10);
        if (this.maxTimeToWaitOnShutdown < 0) {
            this.waitForeverOnShutdown = true;
        }
        this.sessionDomainId = configuration.get("tez.yarn.ats.acl.session.domain.id");
        LOG.info("Using org.apache.tez.dag.history.ats.acls.ATSHistoryACLPolicyManager to manage Timeline ACLs");
        try {
            this.historyACLPolicyManager = (HistoryACLPolicyManager) ReflectionUtils.createClazzInstance(atsHistoryACLManagerClassName);
            this.historyACLPolicyManager.setConf(configuration);
        } catch (TezUncheckedException e) {
            LOG.warn("Could not instantiate object for org.apache.tez.dag.history.ats.acls.ATSHistoryACLPolicyManager. ACLs cannot be enforced correctly for history data in Timeline", e);
            if (!configuration.getBoolean("tez.allow.disabled.timeline-domains", false)) {
                throw e;
            }
            this.historyACLPolicyManager = null;
        }
    }

    public void serviceStart() {
        LOG.info("Starting ATSService");
        this.timelineClient.start();
        this.eventHandlingThread = new Thread(new Runnable() { // from class: org.apache.tez.dag.history.logging.ats.ATSHistoryLoggingService.1
            @Override // java.lang.Runnable
            public void run() {
                LinkedList linkedList = new LinkedList();
                boolean z = false;
                while (!ATSHistoryLoggingService.this.stopped.get() && !Thread.currentThread().isInterrupted() && !z) {
                    if (ATSHistoryLoggingService.this.eventCounter == 0 || ATSHistoryLoggingService.this.eventCounter % 1000 != 0) {
                        ATSHistoryLoggingService.access$104(ATSHistoryLoggingService.this);
                    } else {
                        if (ATSHistoryLoggingService.this.eventsProcessed != 0 && !linkedList.isEmpty()) {
                            ATSHistoryLoggingService.LOG.info("Event queue stats, eventsProcessedSinceLastUpdate=" + ATSHistoryLoggingService.this.eventsProcessed + ", eventQueueSize=" + ATSHistoryLoggingService.this.eventQueue.size());
                        }
                        ATSHistoryLoggingService.this.eventCounter = 0;
                        ATSHistoryLoggingService.this.eventsProcessed = 0;
                    }
                    synchronized (ATSHistoryLoggingService.this.lock) {
                        try {
                            ATSHistoryLoggingService.this.getEventBatch(linkedList);
                        } catch (InterruptedException e) {
                            z = true;
                        }
                        if (!linkedList.isEmpty()) {
                            ATSHistoryLoggingService.access$212(ATSHistoryLoggingService.this, linkedList.size());
                            try {
                                ATSHistoryLoggingService.this.handleEvents(linkedList);
                            } catch (Exception e2) {
                                ATSHistoryLoggingService.LOG.warn("Error handling events", e2);
                            }
                        }
                    }
                }
            }
        }, "HistoryEventHandlingThread");
        this.eventHandlingThread.start();
    }

    public void serviceStop() {
        LOG.info("Stopping ATSService, eventQueueBacklog=" + this.eventQueue.size());
        this.stopped.set(true);
        if (this.eventHandlingThread != null) {
            this.eventHandlingThread.interrupt();
        }
        synchronized (this.lock) {
            if (!this.eventQueue.isEmpty()) {
                LOG.warn("ATSService being stopped, eventQueueBacklog=" + this.eventQueue.size() + ", maxTimeLeftToFlush=" + this.maxTimeToWaitOnShutdown + ", waitForever=" + this.waitForeverOnShutdown);
                long time = this.appContext.getClock().getTime() + this.maxTimeToWaitOnShutdown;
                LinkedList linkedList = new LinkedList();
                while (true) {
                    if (!this.waitForeverOnShutdown && time < this.appContext.getClock().getTime()) {
                        break;
                    }
                    try {
                        getEventBatch(linkedList);
                    } catch (InterruptedException e) {
                        LOG.info("ATSService interrupted while shutting down. Exiting. EventQueueBacklog=" + this.eventQueue.size());
                    }
                    if (linkedList.isEmpty()) {
                        LOG.info("Event queue empty, stopping ATS Service");
                        break;
                    } else {
                        try {
                            handleEvents(linkedList);
                        } catch (Exception e2) {
                            LOG.warn("Error handling event", e2);
                        }
                    }
                }
            }
        }
        if (!this.eventQueue.isEmpty()) {
            LOG.warn("Did not finish flushing eventQueue before stopping ATSService, eventQueueBacklog=" + this.eventQueue.size());
        }
        this.timelineClient.stop();
    }

    /* JADX INFO: Access modifiers changed from: private */
    public void getEventBatch(List<DAGHistoryEvent> list) throws InterruptedException {
        DAGHistoryEvent poll;
        list.clear();
        int i = 0;
        while (i < this.maxEventsPerBatch && (poll = this.eventQueue.poll(this.maxPollingTimeMillis, TimeUnit.MILLISECONDS)) != null) {
            if (isValidEvent(poll)) {
                i++;
                list.add(poll);
                if (poll.getHistoryEvent().getEventType().equals(HistoryEventType.DAG_SUBMITTED)) {
                    return;
                }
            }
        }
    }

    public void handle(DAGHistoryEvent dAGHistoryEvent) {
        this.eventQueue.add(dAGHistoryEvent);
    }

    private boolean isValidEvent(DAGHistoryEvent dAGHistoryEvent) {
        String str;
        HistoryEventType eventType = dAGHistoryEvent.getHistoryEvent().getEventType();
        TezDAGID dagID = dAGHistoryEvent.getDagID();
        if (eventType.equals(HistoryEventType.DAG_SUBMITTED)) {
            DAGSubmittedEvent historyEvent = dAGHistoryEvent.getHistoryEvent();
            String dAGName = historyEvent.getDAGName();
            if (dAGName != null && dAGName.startsWith("TezPreWarmDAG")) {
                this.skippedDAGs.add(dagID);
                return false;
            }
            if (this.historyACLPolicyManager != null && (str = historyEvent.getConf().get("tez.yarn.ats.acl.dag.domain.id")) != null) {
                this.dagDomainIdMap.put(dagID, str);
            }
        }
        if (eventType.equals(HistoryEventType.DAG_FINISHED) && this.skippedDAGs.remove(dagID)) {
            return false;
        }
        return dagID == null || !this.skippedDAGs.contains(dagID);
    }

    /* JADX INFO: Access modifiers changed from: private */
    public void handleEvents(List<DAGHistoryEvent> list) {
        TimelineEntity[] timelineEntityArr = new TimelineEntity[list.size()];
        for (int i = 0; i < list.size(); i++) {
            DAGHistoryEvent dAGHistoryEvent = list.get(i);
            String str = this.sessionDomainId;
            TezDAGID dagID = dAGHistoryEvent.getDagID();
            if (this.historyACLPolicyManager != null && dagID != null && this.dagDomainIdMap.containsKey(dagID)) {
                str = this.dagDomainIdMap.get(dagID);
            }
            timelineEntityArr[i] = HistoryEventTimelineConversion.convertToTimelineEntity(dAGHistoryEvent.getHistoryEvent());
            if (this.historyACLPolicyManager != null && str != null && !str.isEmpty()) {
                this.historyACLPolicyManager.updateTimelineEntityDomain(timelineEntityArr[i], str);
            }
        }
        if (LOG.isDebugEnabled()) {
            LOG.debug("Sending event batch to Timeline, batchSize=" + list.size());
        }
        try {
            TimelinePutResponse putEntities = this.timelineClient.putEntities(timelineEntityArr);
            if (putEntities != null && !putEntities.getErrors().isEmpty()) {
                int size = putEntities.getErrors().size();
                for (int i2 = 0; i2 < size; i2++) {
                    TimelinePutResponse.TimelinePutError timelinePutError = (TimelinePutResponse.TimelinePutError) putEntities.getErrors().get(i2);
                    if (timelinePutError.getErrorCode() != 0) {
                        LOG.warn("Could not post history event to ATS, atsPutError=" + timelinePutError.getErrorCode() + ", entityId=" + timelineEntityArr[i2].getEntityId() + ", eventType=" + list.get(i2).getHistoryEvent().getEventType());
                    }
                }
            }
        } catch (Exception e) {
            LOG.warn("Could not handle history events", e);
        }
    }

    static /* synthetic */ int access$104(ATSHistoryLoggingService aTSHistoryLoggingService) {
        int i = aTSHistoryLoggingService.eventCounter + 1;
        aTSHistoryLoggingService.eventCounter = i;
        return i;
    }

    static /* synthetic */ int access$212(ATSHistoryLoggingService aTSHistoryLoggingService, int i) {
        int i2 = aTSHistoryLoggingService.eventsProcessed + i;
        aTSHistoryLoggingService.eventsProcessed = i2;
        return i2;
    }
}
