package azkaban.flowtrigger;

import azkaban.executor.ExecutableFlow;
import azkaban.executor.ExecutorManagerAdapter;
import azkaban.flow.FlowUtils;
import azkaban.flowtrigger.database.FlowTriggerInstanceLoader;
import azkaban.project.Project;
import azkaban.utils.EmailMessage;
import azkaban.utils.Emailer;
import azkaban.utils.TimeUtils;
import com.google.common.base.Preconditions;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import javax.inject.Inject;
import javax.inject.Singleton;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Reacts to flow trigger instance lifecycle events.
 *
 * <ul>
 *   <li>new instance: handed to the {@link FlowTriggerInstanceLoader} on the caller's thread;</li>
 *   <li>success: the associated flow is submitted for execution and the resulting execution id is
 *       recorded, asynchronously on an internal thread pool;</li>
 *   <li>termination: a failure email is sent when the instance has failure recipients configured,
 *       asynchronously on the same pool.</li>
 * </ul>
 */
@Singleton
public class TriggerInstanceProcessor {
    private static final Logger logger = LoggerFactory.getLogger(TriggerInstanceProcessor.class);
    private static final String FAILURE_EMAIL_SUBJECT = "flow trigger for flow '%s', project '%s' has been cancelled on %s";
    private static final int THREAD_POOL_SIZE = 32;
    /** Flow execution id recorded on the trigger instance when submitting the flow fails. */
    private static final int FAILED_FLOW_EXEC_ID = -2;

    private final ExecutorManagerAdapter executorManager;
    private final FlowTriggerInstanceLoader flowTriggerInstanceLoader;
    private final Emailer emailer;
    private final ExecutorService executorService;

    /**
     * Creates the processor together with its fixed-size worker pool.
     *
     * @param executorManagerAdapter used to submit the flow of a succeeded trigger instance
     * @param flowTriggerInstanceLoader used to upload new instances and update flow exec ids
     * @param emailer used to build and send failure emails
     * @throws NullPointerException if any argument is {@code null}
     */
    @Inject
    public TriggerInstanceProcessor(ExecutorManagerAdapter executorManagerAdapter, FlowTriggerInstanceLoader flowTriggerInstanceLoader, Emailer emailer) {
        Preconditions.checkNotNull(executorManagerAdapter);
        Preconditions.checkNotNull(flowTriggerInstanceLoader);
        Preconditions.checkNotNull(emailer);
        this.emailer = emailer;
        this.executorManager = executorManagerAdapter;
        this.flowTriggerInstanceLoader = flowTriggerInstanceLoader;
        this.executorService = Executors.newFixedThreadPool(THREAD_POOL_SIZE);
    }

    /**
     * Submits the flow associated with the trigger instance and stores the resulting execution id
     * on the instance; on any failure {@link #FAILED_FLOW_EXEC_ID} is stored instead. In both cases
     * the loader is then asked to update the associated flow exec id.
     */
    private void executeFlowAndUpdateExecID(TriggerInstance triggerInstance) {
        try {
            final Project project = triggerInstance.getProject();
            final ExecutableFlow executableFlow = FlowUtils.createExecutableFlow(project, FlowUtils.getFlow(project, triggerInstance.getFlowId()));
            this.executorManager.submitExecutableFlow(executableFlow, triggerInstance.getSubmitUser());
            triggerInstance.setFlowExecId(executableFlow.getExecutionId());
        } catch (Exception e) {
            logger.error("exception when executing the associated flow and updating flow exec id for trigger instance[id: {}]", triggerInstance.getId(), e);
            triggerInstance.setFlowExecId(FAILED_FLOW_EXEC_ID);
        }
        this.flowTriggerInstanceLoader.updateAssociatedFlowExecId(triggerInstance);
    }

    /** Builds the failure email subject from the flow id, project name and Azkaban instance name. */
    private String generateFailureEmailSubject(TriggerInstance triggerInstance) {
        return String.format(FAILURE_EMAIL_SUBJECT, triggerInstance.getFlowId(), triggerInstance.getProjectName(), this.emailer.getAzkabanName());
    }

    /** Renders one well-formed two-column table row: label cell followed by value cell. */
    private static String tableRow(String label, Object value) {
        return "<tr><td>" + label + "</td><td>" + value + "</td></tr>";
    }

    /**
     * Builds the HTML failure email for a trigger instance: a summary table (start, end, duration,
     * status), a link to the trigger instance page, and one small table per cancelled dependency.
     *
     * <p>NOTE(review): values are concatenated into the HTML without escaping — confirm that flow
     * ids, dependency names and cancellation causes cannot contain markup.
     */
    private EmailMessage createFlowTriggerFailureEmailMessage(TriggerInstance triggerInstance) {
        final EmailMessage message = this.emailer.createEmailMessage(generateFailureEmailSubject(triggerInstance), "text/html", triggerInstance.getFailureEmails());
        // SimpleDateFormat is not thread-safe, so a fresh instance is created per message.
        final SimpleDateFormat dateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
        final String triggerInstanceLink = this.emailer.getAzkabanURL() + "/executor?triggerinstanceid=" + triggerInstance.getId();

        message.addAllToAddress(triggerInstance.getFailureEmails());
        message.setMimeType("text/html");

        message.println("<table>");
        // Each row is emitted fully closed, with the label and its value in the same row.
        message.println(tableRow("Start Time", dateFormat.format(new Date(triggerInstance.getStartTime()))));
        message.println(tableRow("End Time", dateFormat.format(new Date(triggerInstance.getEndTime()))));
        message.println(tableRow("Duration", TimeUtils.formatDuration(triggerInstance.getStartTime(), triggerInstance.getEndTime())));
        message.println(tableRow("Status", triggerInstance.getStatus()));
        message.println("</table>");
        message.println("");
        message.println("<a href=\"" + triggerInstanceLink + "\">" + triggerInstance.getFlowId() + " Flow Trigger Instance Link</a>");
        message.println("");
        message.println("<h3>Cancelled Dependencies</h3>");
        for (DependencyInstance dependencyInstance : triggerInstance.getDepInstances()) {
            if (dependencyInstance.getStatus() != Status.CANCELLED) {
                continue;
            }
            message.println("<table>");
            message.println("<tr><td>Dependency Name: " + dependencyInstance.getDepName() + "</td></tr>");
            message.println("<tr><td>Cancellation Cause: " + dependencyInstance.getCancellationCause() + "</td></tr>");
            message.println("</table>");
        }
        return message;
    }

    /** Sends the failure email unless the trigger instance has no failure recipients. */
    private void sendFailureEmailIfConfigured(TriggerInstance triggerInstance) {
        if (triggerInstance.getFailureEmails().isEmpty()) {
            return;
        }
        this.emailer.sendEmail(createFlowTriggerFailureEmailMessage(triggerInstance), true, "email message failure email for flow trigger " + triggerInstance.getId());
    }

    /**
     * Runs {@code task} on the worker pool and logs any runtime exception it throws.
     *
     * <p>The {@code Future} returned by {@code submit} is discarded, so without the catch here a
     * failing task would leave no trace at all.
     */
    private void submitLogged(String action, TriggerInstance triggerInstance, Runnable task) {
        this.executorService.submit(() -> {
            try {
                task.run();
            } catch (RuntimeException e) {
                logger.error("unexpected exception when {} for trigger instance[id: {}]", action, triggerInstance.getId(), e);
            }
        });
    }

    /**
     * Asynchronously submits the flow associated with a succeeded trigger instance and records
     * the resulting flow execution id.
     *
     * @param triggerInstance the trigger instance whose dependencies all succeeded
     */
    public void processSucceed(TriggerInstance triggerInstance) {
        submitLogged("executing the associated flow", triggerInstance, () -> executeFlowAndUpdateExecID(triggerInstance));
    }

    /**
     * Asynchronously sends the failure email for a terminated trigger instance, if failure
     * recipients are configured.
     *
     * @param triggerInstance the terminated trigger instance
     */
    public void processTermination(TriggerInstance triggerInstance) {
        submitLogged("sending the failure email", triggerInstance, () -> sendFailureEmailIfConfigured(triggerInstance));
    }

    /**
     * Passes a newly created trigger instance to the loader's {@code uploadTriggerInstance},
     * synchronously on the calling thread.
     *
     * @param triggerInstance the new trigger instance
     */
    public void processNewInstance(TriggerInstance triggerInstance) {
        this.flowTriggerInstanceLoader.uploadTriggerInstance(triggerInstance);
    }

    /**
     * Stops the worker pool.
     *
     * <p>{@code shutdownNow()} is called right after {@code shutdown()}, so tasks still waiting in
     * the queue are not run and running tasks are asked to stop via interruption.
     * NOTE(review): if in-flight flow submissions or emails are expected to complete, a bounded
     * {@code awaitTermination} between the two calls would be needed — confirm the intent.
     */
    public void shutdown() {
        this.executorService.shutdown();
        this.executorService.shutdownNow();
    }
}
