package azkaban.flowtrigger;

import azkaban.flowtrigger.database.FlowTriggerInstanceLoader;
import azkaban.flowtrigger.plugin.FlowTriggerDependencyPluginException;
import azkaban.flowtrigger.plugin.FlowTriggerDependencyPluginManager;
import azkaban.project.FlowTrigger;
import azkaban.project.FlowTriggerDependency;
import azkaban.project.Project;
import com.google.common.collect.ImmutableMap;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import javax.inject.Inject;
import javax.inject.Singleton;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Creates, recovers, tracks and cancels flow trigger instances.
 *
 * <p>Trigger instances that are still in progress are kept in the in-memory
 * {@code runningTriggers} list. Every read and write of that list visible in this class happens
 * inside a task submitted to the single-threaded {@code flowTriggerExecutorService}, which is what
 * keeps the plain {@link ArrayList} safe to use here; new code touching the list should follow the
 * same rule.
 *
 * <p>Cancelling a dependency context is delegated to {@code cancelExecutorService}, and timeouts
 * are scheduled on {@code timeoutService}.
 */
@Singleton
public class FlowTriggerService {
    private static final Logger logger = LoggerFactory.getLogger(FlowTriggerService.class);

    /** Extra time granted on top of the remaining timeout when a running instance is recovered. */
    private static final Duration CANCELLING_GRACE_PERIOD_AFTER_RESTART = Duration.ofMinutes(1);
    /** Maximum number of entries requested by {@link #getRecentlyFinished()}. */
    private static final int RECENTLY_FINISHED_TRIGGER_LIMIT = 50;
    private static final int CANCEL_EXECUTOR_POOL_SIZE = 32;
    private static final int TIMEOUT_EXECUTOR_POOL_SIZE = 8;

    /** Single thread on which all trigger state transitions are serialized. */
    private final ExecutorService flowTriggerExecutorService = Executors.newSingleThreadExecutor(
            new ThreadFactoryBuilder().setNameFormat("FlowTrigger-service").build());
    /** Pool used to invoke {@code DependencyInstanceContext.cancel()} off the service thread. */
    private final ExecutorService cancelExecutorService = Executors.newFixedThreadPool(
            CANCEL_EXECUTOR_POOL_SIZE,
            new ThreadFactoryBuilder().setNameFormat("FlowTrigger-cancel").build());
    /** Scheduler that fires the delayed cancellation set up by {@code scheduleKill}. */
    private final ScheduledExecutorService timeoutService =
            Executors.newScheduledThreadPool(TIMEOUT_EXECUTOR_POOL_SIZE);
    /** In-memory list of trigger instances that have not reached a terminal state yet. */
    private final List<TriggerInstance> runningTriggers = new ArrayList<>();

    private final FlowTriggerDependencyPluginManager triggerPluginManager;
    private final TriggerInstanceProcessor triggerProcessor;
    private final FlowTriggerInstanceLoader flowTriggerInstanceLoader;
    private final DependencyInstanceProcessor dependencyProcessor;
    private final FlowTriggerExecutionCleaner cleaner;

    /**
     * Wires the collaborators; no work is started until {@link #start()} is called.
     */
    @Inject
    public FlowTriggerService(FlowTriggerDependencyPluginManager flowTriggerDependencyPluginManager, TriggerInstanceProcessor triggerInstanceProcessor, DependencyInstanceProcessor dependencyInstanceProcessor, FlowTriggerInstanceLoader flowTriggerInstanceLoader, FlowTriggerExecutionCleaner flowTriggerExecutionCleaner) {
        this.triggerPluginManager = flowTriggerDependencyPluginManager;
        this.triggerProcessor = triggerInstanceProcessor;
        this.dependencyProcessor = dependencyInstanceProcessor;
        this.flowTriggerInstanceLoader = flowTriggerInstanceLoader;
        this.cleaner = flowTriggerExecutionCleaner;
    }

    /**
     * Loads all dependency plugins, recovers incomplete trigger instances and starts the cleaner,
     * in that order.
     *
     * @throws FlowTriggerDependencyPluginException propagated from plugin loading
     */
    public void start() throws FlowTriggerDependencyPluginException {
        this.triggerPluginManager.loadAllPlugins();
        recoverIncompleteTriggerInstances();
        this.cleaner.start();
    }

    /**
     * Runs the dependency check registered for the dependency's type and returns the resulting
     * context.
     *
     * @param flowTriggerDependency dependency definition supplying type, props and name
     * @param j start time passed to the plugin as the {@code startTime} runtime property
     * @param str trigger instance id passed as the {@code triggerInstanceId} runtime property
     * @return the context produced by the dependency check
     * @throws Exception whatever the plugin lookup or the dependency check throws
     */
    private DependencyInstanceContext createDepContext(FlowTriggerDependency flowTriggerDependency, long j, String str) throws Exception {
        DependencyCheck dependencyCheck = this.triggerPluginManager.getDependencyCheck(flowTriggerDependency.getType());
        DependencyInstanceCallbackImpl callback = new DependencyInstanceCallbackImpl(this);

        // Copy the configured props so that adding "name" does not mutate the dependency's own map.
        HashMap<String, String> depConfig = new HashMap<>();
        depConfig.putAll(flowTriggerDependency.getProps());
        depConfig.put("name", flowTriggerDependency.getName());

        return dependencyCheck.run(
                new DependencyInstanceConfigImpl(depConfig),
                new DependencyInstanceRuntimePropsImpl(
                        ImmutableMap.of("startTime", String.valueOf(j), "triggerInstanceId", str)),
                callback);
    }

    /**
     * Builds a new trigger instance with one dependency instance per declared dependency.
     *
     * <p>A dependency whose context cannot be created is recorded as {@code CANCELLED} with cause
     * {@code FAILURE} and an end time of "now"; otherwise it starts as {@code RUNNING} with cause
     * {@code NONE} and an end time of {@code 0}.
     */
    private TriggerInstance createTriggerInstance(FlowTrigger flowTrigger, String str, int i, String str2, Project project) {
        String triggerInstId = generateId();
        long startTime = System.currentTimeMillis();
        List<DependencyInstance> depInstances = new ArrayList<>();

        for (FlowTriggerDependency dep : flowTrigger.getDependencies()) {
            String depName = dep.getName();
            DependencyInstanceContext context = null;
            try {
                context = createDepContext(dep, startTime, triggerInstId);
            } catch (Exception e) {
                // Deliberately tolerated: the failure is reflected in the dependency's status below.
                logger.error("unable to create dependency context for trigger instance[id = {}]", triggerInstId, e);
            }

            boolean created = context != null;
            depInstances.add(new DependencyInstance(
                    depName,
                    startTime,
                    created ? 0L : System.currentTimeMillis(),
                    context,
                    created ? Status.RUNNING : Status.CANCELLED,
                    created ? CancellationCause.NONE : CancellationCause.FAILURE));
        }

        // -1 is the value isDoneButFlowNotExecuted() checks the flow exec id against.
        return new TriggerInstance(triggerInstId, flowTrigger, str, i, str2, depInstances, -1, project);
    }

    /** Returns a fresh random UUID string used as a trigger instance id. */
    private String generateId() {
        return UUID.randomUUID().toString();
    }

    /**
     * Schedules {@link #cancelTriggerInstance(TriggerInstance, CancellationCause)} to run after
     * the given delay.
     */
    private void scheduleKill(TriggerInstance triggerInstance, Duration duration, CancellationCause cancellationCause) {
        logger.debug("cancel trigger instance {} in {} secs", triggerInstance.getId(), duration.getSeconds());
        this.timeoutService.schedule(
                () -> cancelTriggerInstance(triggerInstance, cancellationCause),
                duration.toMillis(),
                TimeUnit.MILLISECONDS);
    }

    /**
     * Returns the running trigger instances as reported by the instance loader (not the in-memory
     * list).
     */
    public Collection<TriggerInstance> getRunningTriggers() {
        return this.flowTriggerInstanceLoader.getRunning();
    }

    /** Returns up to {@code RECENTLY_FINISHED_TRIGGER_LIMIT} recently finished trigger instances. */
    public Collection<TriggerInstance> getRecentlyFinished() {
        return this.flowTriggerInstanceLoader.getRecentlyFinished(RECENTLY_FINISHED_TRIGGER_LIMIT);
    }

    /** Looks up a trigger instance by its id through the instance loader. */
    public TriggerInstance findTriggerInstanceById(String str) {
        return this.flowTriggerInstanceLoader.getTriggerInstanceById(str);
    }

    /** Looks up a trigger instance by its associated flow execution id through the loader. */
    public TriggerInstance findTriggerInstanceByExecId(int i) {
        return this.flowTriggerInstanceLoader.getTriggerInstanceByFlowExecId(i);
    }

    /** True when the instance has {@code SUCCEEDED} but its flow exec id is still {@code -1}. */
    private boolean isDoneButFlowNotExecuted(TriggerInstance triggerInstance) {
        return triggerInstance.getStatus() == Status.SUCCEEDED && triggerInstance.getFlowExecId() == -1;
    }

    /**
     * Recreates contexts for dependencies that are still {@code RUNNING} or {@code CANCELLING} and
     * then either resumes cancellation or re-arms the timeout for the trigger instance.
     */
    private void recoverRunningOrCancelling(TriggerInstance triggerInstance) {
        FlowTrigger flowTrigger = triggerInstance.getFlowTrigger();
        for (DependencyInstance depInst : triggerInstance.getDepInstances()) {
            Status depStatus = depInst.getStatus();
            if (depStatus != Status.RUNNING && depStatus != Status.CANCELLING) {
                continue;
            }

            DependencyInstanceContext context = null;
            try {
                context = createDepContext(
                        flowTrigger.getDependencyByName(depInst.getDepName()),
                        depInst.getStartTime(),
                        depInst.getTriggerInstance().getId());
            } catch (Exception e) {
                logger.error("unable to create dependency context for trigger instance[id = {}]", triggerInstance.getId(), e);
            }

            depInst.setDependencyInstanceContext(context);
            if (context == null) {
                // Without a context the dependency can never complete; mark it failed.
                depInst.setStatus(Status.CANCELLED);
                depInst.setCancellationCause(CancellationCause.FAILURE);
            }
        }

        // Read the trigger status only after the dependency statuses above may have changed.
        if (triggerInstance.getStatus() == Status.CANCELLING) {
            addToRunningListAndCancel(triggerInstance);
        } else if (triggerInstance.getStatus() == Status.RUNNING) {
            Duration killDelay = Duration.ofMillis(remainingTimeBeforeTimeout(triggerInstance))
                    .plus(CANCELLING_GRACE_PERIOD_AFTER_RESTART);
            addToRunningListAndScheduleKill(triggerInstance, killDelay, CancellationCause.TIMEOUT);
        }
    }

    /** Queues recovery of the instance on the service thread. */
    private void recoverTriggerInstance(TriggerInstance triggerInstance) {
        this.flowTriggerExecutorService.submit(() -> recover(triggerInstance));
    }

    /**
     * Recovers one instance: finished-but-unlaunched instances are handed to
     * {@code processSucceed}; everything else goes through running/cancelling recovery.
     */
    private void recover(TriggerInstance triggerInstance) {
        logger.info("recovering pending trigger instance {}", triggerInstance.getId());
        if (isDoneButFlowNotExecuted(triggerInstance)) {
            this.triggerProcessor.processSucceed(triggerInstance);
        } else {
            recoverRunningOrCancelling(triggerInstance);
        }
    }

    /**
     * Recovers every incomplete trigger instance returned by the loader.
     *
     * <p>Instances whose flow trigger is {@code null} cannot be recovered: a succeeded instance
     * gets flow exec id {@code -2} persisted, and any other instance has its unfinished
     * dependencies cancelled with cause {@code FAILURE}.
     */
    public void recoverIncompleteTriggerInstances() {
        for (TriggerInstance triggerInstance : this.flowTriggerInstanceLoader.getIncompleteTriggerInstances()) {
            if (triggerInstance.getFlowTrigger() != null) {
                recoverTriggerInstance(triggerInstance);
                continue;
            }

            logger.error("cannot recover the trigger instance {}, flow trigger is null, cancelling it ", triggerInstance.getId());
            if (isDoneButFlowNotExecuted(triggerInstance)) {
                triggerInstance.setFlowExecId(-2);
                this.flowTriggerInstanceLoader.updateAssociatedFlowExecId(triggerInstance);
                continue;
            }

            for (DependencyInstance depInst : triggerInstance.getDepInstances()) {
                if (!Status.isDone(depInst.getStatus())) {
                    processStatusAndCancelCauseUpdate(depInst, Status.CANCELLED, CancellationCause.FAILURE);
                    // NOTE(review): processTermination runs once per unfinished dependency rather
                    // than once per trigger instance - confirm this is intended.
                    this.triggerProcessor.processTermination(depInst.getTriggerInstance());
                }
            }
        }
    }

    /**
     * Adds the instance to the running list and schedules its cancellation, unless it is already
     * done.
     */
    private void addToRunningListAndScheduleKill(TriggerInstance triggerInstance, Duration duration, CancellationCause cancellationCause) {
        if (Status.isDone(triggerInstance.getStatus())) {
            return;
        }
        this.runningTriggers.add(triggerInstance);
        scheduleKill(triggerInstance, duration, cancellationCause);
    }

    /**
     * Derives the cause to propagate to the remaining dependencies from the causes already
     * recorded: {@code FAILURE} or {@code CASCADING} yields {@code CASCADING}, then
     * {@code TIMEOUT}, then {@code MANUAL}, otherwise {@code NONE}.
     */
    private CancellationCause getCancellationCause(TriggerInstance triggerInstance) {
        Set<CancellationCause> causes = triggerInstance.getDepInstances().stream()
                .map(DependencyInstance::getCancellationCause)
                .collect(Collectors.toSet());

        if (causes.contains(CancellationCause.FAILURE) || causes.contains(CancellationCause.CASCADING)) {
            return CancellationCause.CASCADING;
        }
        if (causes.contains(CancellationCause.TIMEOUT)) {
            return CancellationCause.TIMEOUT;
        }
        if (causes.contains(CancellationCause.MANUAL)) {
            return CancellationCause.MANUAL;
        }
        return CancellationCause.NONE;
    }

    /**
     * Cancels every dependency that is {@code CANCELLING} or {@code RUNNING}; running ones are
     * first moved to {@code CANCELLING} with the cause derived from the instance's dependencies.
     */
    private void cancelTriggerInstance(TriggerInstance triggerInstance) {
        logger.debug("cancelling trigger instance of exec id{}", triggerInstance.getId());
        // Computed once up front, before any status below is modified.
        CancellationCause cause = getCancellationCause(triggerInstance);
        for (DependencyInstance depInst : triggerInstance.getDepInstances()) {
            if (depInst.getStatus() == Status.CANCELLING) {
                cancelContextAsync(depInst.getContext());
            } else if (depInst.getStatus() == Status.RUNNING) {
                processStatusAndCancelCauseUpdate(depInst, Status.CANCELLING, cause);
                cancelContextAsync(depInst.getContext());
            }
        }
    }

    /** Adds the instance to the running list and immediately starts cancelling it. */
    private void addToRunningListAndCancel(TriggerInstance triggerInstance) {
        this.runningTriggers.add(triggerInstance);
        cancelTriggerInstance(triggerInstance);
    }

    /** Sets the new status and stamps the end time when that status is terminal. */
    private void updateDepInstStatus(DependencyInstance dependencyInstance, Status status) {
        dependencyInstance.setStatus(status);
        if (Status.isDone(dependencyInstance.getStatus())) {
            dependencyInstance.setEndTime(System.currentTimeMillis());
        }
    }

    /** Applies a status change and forwards the dependency to the dependency processor. */
    private void processStatusUpdate(DependencyInstance dependencyInstance, Status status) {
        logger.debug("process status update for {}", dependencyInstance);
        updateDepInstStatus(dependencyInstance, status);
        this.dependencyProcessor.processStatusUpdate(dependencyInstance);
    }

    /**
     * Applies a cancellation cause and a status change, then forwards the dependency to the
     * dependency processor.
     */
    private void processStatusAndCancelCauseUpdate(DependencyInstance dependencyInstance, Status status, CancellationCause cancellationCause) {
        dependencyInstance.setCancellationCause(cancellationCause);
        updateDepInstStatus(dependencyInstance, status);
        this.dependencyProcessor.processStatusUpdate(dependencyInstance);
    }

    /**
     * Milliseconds left until the trigger's max wait duration elapses, never negative.
     * Calls {@code get()} on the max wait duration, so the value must be present.
     */
    private long remainingTimeBeforeTimeout(TriggerInstance triggerInstance) {
        long maxWaitMillis = triggerInstance.getFlowTrigger().getMaxWaitDuration().get().toMillis();
        long elapsedMillis = System.currentTimeMillis() - triggerInstance.getStartTime();
        return Math.max(0L, maxWaitMillis - elapsedMillis);
    }

    /**
     * Creates a trigger instance on the calling thread and queues its start on the service
     * thread.
     */
    public void startTrigger(FlowTrigger flowTrigger, String str, int i, String str2, Project project) {
        TriggerInstance triggerInstance = createTriggerInstance(flowTrigger, str, i, str2, project);
        this.flowTriggerExecutorService.submit(() -> {
            logger.info("Starting the flow trigger [trigger instance id: {}] by {}", triggerInstance.getId(), str2);
            start(triggerInstance);
        });
    }

    /**
     * Registers the new instance with the trigger processor and routes it according to its
     * initial status.
     */
    private void start(TriggerInstance triggerInstance) {
        this.triggerProcessor.processNewInstance(triggerInstance);
        if (triggerInstance.getStatus() == Status.CANCELLED) {
            logger.info("Trigger instance[id: {}] is cancelled since all dependency instances fail to be created", triggerInstance.getId());
            this.triggerProcessor.processTermination(triggerInstance);
        } else if (triggerInstance.getStatus() == Status.CANCELLING) {
            logger.info("Trigger instance[id: {}] is being cancelled since some dependency instances fail to be created", triggerInstance.getId());
            addToRunningListAndCancel(triggerInstance);
        } else if (triggerInstance.getStatus() == Status.SUCCEEDED) {
            this.triggerProcessor.processSucceed(triggerInstance);
        } else {
            addToRunningListAndScheduleKill(
                    triggerInstance,
                    triggerInstance.getFlowTrigger().getMaxWaitDuration().get(),
                    CancellationCause.TIMEOUT);
        }
    }

    /**
     * Finds an in-memory running trigger instance by id, blocking until the lookup has run on the
     * service thread.
     *
     * @return the matching instance, or {@code null} when none matches or the lookup fails
     */
    public TriggerInstance findRunningTriggerInstById(String str) {
        try {
            return this.flowTriggerExecutorService.submit(() -> {
                return getTriggerInstanceById(str);
            }).get();
        } catch (InterruptedException e) {
            // Restore the interrupt flag so callers further up can still observe the interruption.
            Thread.currentThread().interrupt();
            logger.error("exception when finding trigger instance by id{}", str, e);
            return null;
        } catch (Exception e) {
            logger.error("exception when finding trigger instance by id{}", str, e);
            return null;
        }
    }

    /** Scans the running list for the given id; intended to run on the service thread. */
    private TriggerInstance getTriggerInstanceById(String str) {
        return this.runningTriggers.stream()
                .filter(candidate -> candidate.getId().equals(str))
                .findFirst()
                .orElse(null);
    }

    /**
     * Invokes {@code cancel()} on the context from the cancel pool. Failures are logged here
     * because the returned future is discarded and would otherwise hide them.
     */
    private void cancelContextAsync(DependencyInstanceContext dependencyInstanceContext) {
        this.cancelExecutorService.submit(() -> {
            try {
                dependencyInstanceContext.cancel();
            } catch (RuntimeException e) {
                logger.error("failed to cancel dependency instance context {}", dependencyInstanceContext, e);
            }
        });
    }

    /**
     * Queues cancellation of a trigger instance with the given cause. Does nothing unless the
     * instance is {@code RUNNING} at the time of the call.
     */
    public void cancelTriggerInstance(TriggerInstance triggerInstance, CancellationCause cancellationCause) {
        if (triggerInstance.getStatus() == Status.RUNNING) {
            this.flowTriggerExecutorService.submit(() -> cancel(triggerInstance, cancellationCause));
        }
    }

    /**
     * Moves every {@code RUNNING} dependency of the instance to {@code CANCELLING} with the given
     * cause and asks its context to cancel. A {@code null} instance is logged and ignored.
     */
    private void cancel(TriggerInstance triggerInstance, CancellationCause cancellationCause) {
        // Guard first: logging the id of a null instance would itself throw.
        if (triggerInstance == null) {
            logger.debug("unable to cancel a null trigger instance");
            return;
        }
        logger.info("cancelling trigger instance with id {}", triggerInstance.getId());
        for (DependencyInstance depInst : triggerInstance.getDepInstances()) {
            if (depInst.getStatus() == Status.RUNNING) {
                processStatusAndCancelCauseUpdate(depInst, Status.CANCELLING, cancellationCause);
                cancelContextAsync(depInst.getContext());
            }
        }
    }

    /**
     * Finds the dependency instance in the running list that holds exactly this context object
     * (identity comparison), or {@code null} if none does.
     */
    private DependencyInstance findDependencyInstanceByContext(DependencyInstanceContext dependencyInstanceContext) {
        return this.runningTriggers.stream()
                .flatMap(running -> running.getDepInstances().stream())
                .filter(depInst -> depInst.getContext() != null
                        && depInst.getContext() == dependencyInstanceContext)
                .findFirst()
                .orElse(null);
    }

    /** Queues handling of a success notification for the dependency owning this context. */
    public void markDependencySuccess(DependencyInstanceContext dependencyInstanceContext) {
        this.flowTriggerExecutorService.submit(() -> markSuccess(dependencyInstanceContext));
    }

    /**
     * Marks the dependency owning the context as {@code SUCCEEDED}; when that makes the whole
     * trigger instance succeed, hands it to {@code processSucceed} and drops it from the running
     * list. Notifications for unknown or already finished dependencies are ignored.
     */
    private void markSuccess(DependencyInstanceContext dependencyInstanceContext) {
        DependencyInstance depInst = findDependencyInstanceByContext(dependencyInstanceContext);
        if (depInst == null) {
            logger.debug("unable to find trigger instance with context {} when marking it success", dependencyInstanceContext);
            return;
        }
        if (Status.isDone(depInst.getStatus())) {
            logger.warn("OnSuccess of dependency instance[id: {}, name: {}] is ignored", depInst.getTriggerInstance().getId(), depInst.getDepName());
            return;
        }

        processStatusAndCancelCauseUpdate(depInst, Status.SUCCEEDED, CancellationCause.NONE);
        if (depInst.getTriggerInstance().getStatus() == Status.SUCCEEDED) {
            logger.info("trigger instance[id: {}] succeeded", depInst.getTriggerInstance().getId());
            this.triggerProcessor.processSucceed(depInst.getTriggerInstance());
            this.runningTriggers.remove(depInst.getTriggerInstance());
        }
    }

    /** A dependency already in {@code CANCELLING} was put there by this service. */
    private boolean cancelledByAzkaban(DependencyInstance dependencyInstance) {
        return dependencyInstance.getStatus() == Status.CANCELLING;
    }

    /** A cancel notification for a still {@code RUNNING} dependency originates from the plugin. */
    private boolean cancelledByDependencyPlugin(DependencyInstance dependencyInstance) {
        return dependencyInstance.getStatus() == Status.RUNNING;
    }

    /** Queues handling of a cancel notification for the dependency owning this context. */
    public void markDependencyCancelled(DependencyInstanceContext dependencyInstanceContext) {
        this.flowTriggerExecutorService.submit(() -> markCancelled(dependencyInstanceContext));
    }

    /**
     * Marks the dependency owning the context as {@code CANCELLED}.
     *
     * <p>If the dependency was still {@code RUNNING}, it is recorded with cause {@code FAILURE}
     * and the rest of the trigger instance is cancelled. If it was {@code CANCELLING}, only its
     * status is finalized. Any other state is ignored. Once the whole trigger instance is
     * {@code CANCELLED} it is terminated and removed from the running list.
     */
    private void markCancelled(DependencyInstanceContext dependencyInstanceContext) {
        DependencyInstance depInst = findDependencyInstanceByContext(dependencyInstanceContext);
        if (depInst == null) {
            logger.warn("unable to find trigger instance with context {} when marking it cancelled", dependencyInstanceContext);
            return;
        }

        if (cancelledByDependencyPlugin(depInst)) {
            processStatusAndCancelCauseUpdate(depInst, Status.CANCELLED, CancellationCause.FAILURE);
            cancelTriggerInstance(depInst.getTriggerInstance());
        } else if (cancelledByAzkaban(depInst)) {
            processStatusUpdate(depInst, Status.CANCELLED);
        } else {
            logger.warn("OnCancel of dependency instance[id: {}, name: {}] is ignored", depInst.getTriggerInstance().getId(), depInst.getDepName());
            return;
        }

        if (depInst.getTriggerInstance().getStatus() == Status.CANCELLED) {
            logger.info("trigger instance with execId {} is cancelled", depInst.getTriggerInstance().getId());
            this.triggerProcessor.processTermination(depInst.getTriggerInstance());
            this.runningTriggers.remove(depInst.getTriggerInstance());
        }
    }

    /**
     * Shuts down the three executors ({@code shutdown()} followed immediately by
     * {@code shutdownNow()}, without waiting for termination), then the trigger processor, the
     * plugin manager and the cleaner.
     */
    public void shutdown() {
        this.flowTriggerExecutorService.shutdown();
        this.cancelExecutorService.shutdown();
        this.timeoutService.shutdown();

        this.flowTriggerExecutorService.shutdownNow();
        this.cancelExecutorService.shutdownNow();
        this.timeoutService.shutdownNow();

        this.triggerProcessor.shutdown();
        this.triggerPluginManager.shutdown();
        this.cleaner.shutdown();
    }

    /** Delegates to the instance loader's {@code getTriggerInstances} with the same arguments. */
    public Collection<TriggerInstance> getTriggerInstances(int i, String str, int i2, int i3) {
        return this.flowTriggerInstanceLoader.getTriggerInstances(i, str, i2, i3);
    }
}
