package com.netflix.conductor.client.automator;

import com.google.common.base.Stopwatch;
import com.google.common.util.concurrent.Uninterruptibles;
import com.netflix.conductor.client.automator.filters.PollFilter;
import com.netflix.conductor.client.config.PropertyFactory;
import com.netflix.conductor.client.events.dispatcher.EventDispatcher;
import com.netflix.conductor.client.events.taskrunner.PollCompleted;
import com.netflix.conductor.client.events.taskrunner.PollFailure;
import com.netflix.conductor.client.events.taskrunner.PollStarted;
import com.netflix.conductor.client.events.taskrunner.TaskExecutionCompleted;
import com.netflix.conductor.client.events.taskrunner.TaskExecutionFailure;
import com.netflix.conductor.client.events.taskrunner.TaskExecutionStarted;
import com.netflix.conductor.client.events.taskrunner.TaskRunnerEvent;
import com.netflix.conductor.client.http.TaskClient;
import com.netflix.conductor.client.worker.Worker;
import com.netflix.conductor.common.metadata.tasks.Task;
import com.netflix.conductor.common.metadata.tasks.TaskResult;
import java.io.PrintWriter;
import java.io.StringWriter;
import java.lang.Thread;
import java.util.Collections;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;
import org.apache.commons.lang3.concurrent.BasicThreadFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/netflix/conductor/client/automator/TaskRunner.class */
/**
 * Polls the server for tasks of a single {@link Worker}'s task type and executes them on a
 * fixed-size thread pool.
 *
 * <p>A {@link Semaphore} created with the same size as the thread pool bounds the number of tasks
 * in flight: each poll drains the currently free permits, requests at most that many tasks,
 * returns the permits it could not use, and every processed task gives its permit back.
 */
class TaskRunner {
    private static final Logger LOGGER = LoggerFactory.getLogger(TaskRunner.class);

    /** The first polling errors up to this count are always logged; later ones are sampled. */
    private static final int ALWAYS_LOGGED_POLLING_ERRORS = 100;

    /** Sampling interval for polling-error logs when none is configured. */
    private static final int DEFAULT_ERROR_LOG_INTERVAL = 100;

    /** The polling error counter wraps back to zero once it exceeds this value. */
    private static final int POLLING_ERROR_COUNT_LIMIT = 10_000_000;

    /** Base unit of the pause between retries in {@link #retryOperation}. */
    private static final long RETRY_BASE_DELAY_MILLIS = 500L;

    private final TaskClient taskClient;
    private final int updateRetryCount;
    private final ExecutorService executorService;
    private final int taskPollTimeout;
    private final Semaphore permits;
    private final Worker worker;
    private final int pollingIntervalInMillis;
    private final String taskType;
    private final int errorAt;
    // NOTE(review): plain int, mutated only from pollTasksForWorker(); assumes a single polling
    // thread per runner -- confirm against the caller.
    private int pollingErrorCount;
    private final String domain;
    private final List<PollFilter> pollFilters;
    private final EventDispatcher<TaskRunnerEvent> eventDispatcher;
    private volatile boolean pollingAndExecuting = true;
    private final Thread.UncaughtExceptionHandler uncaughtExceptionHandler = (thread, error) ->
            LOGGER.error("Uncaught exception. Thread {} will exit now", thread, error);

    /**
     * Creates a runner for the given worker and starts its thread pool.
     *
     * <p>The domain is resolved in this order: the task type's {@code Worker.PROP_DOMAIN}
     * property, the same property for {@code Worker.PROP_ALL_WORKERS}, then
     * {@code taskToDomain}. The polling-error log interval is resolved the same way from
     * {@code Worker.PROP_LOG_INTERVAL} and defaults to 100.
     *
     * @param worker the worker whose task type is polled and whose {@code execute} runs the tasks
     * @param taskClient client used to poll for tasks and to report task results
     * @param updateRetryCount number of attempts made when uploading/updating a task result
     * @param taskToDomain fallback domain per task type; may be {@code null}
     * @param threadNamePattern naming pattern handed to the thread factory of the pool
     * @param threadCount size of the thread pool and number of semaphore permits
     * @param taskPollTimeout timeout passed through to the batch poll call
     * @param pollFilters filters consulted before each poll; any {@code false} skips the poll
     * @param eventDispatcher sink for poll and task-execution events
     */
    public TaskRunner(Worker worker, TaskClient taskClient, int updateRetryCount, Map<String, String> taskToDomain, String threadNamePattern, int threadCount, int taskPollTimeout, List<PollFilter> pollFilters, EventDispatcher<TaskRunnerEvent> eventDispatcher) {
        this.worker = worker;
        this.taskClient = taskClient;
        this.updateRetryCount = updateRetryCount;
        this.taskPollTimeout = taskPollTimeout;
        this.pollingIntervalInMillis = worker.getPollingInterval();
        this.taskType = worker.getTaskDefName();
        this.permits = new Semaphore(threadCount);
        this.pollFilters = pollFilters;
        this.eventDispatcher = eventDispatcher;

        String resolvedDomain = PropertyFactory.getString(this.taskType, Worker.PROP_DOMAIN, null);
        if (resolvedDomain == null) {
            resolvedDomain = PropertyFactory.getString(Worker.PROP_ALL_WORKERS, Worker.PROP_DOMAIN, null);
        }
        if (resolvedDomain == null && taskToDomain != null) {
            resolvedDomain = taskToDomain.get(this.taskType);
        }
        this.domain = resolvedDomain;

        int logInterval = PropertyFactory.getInteger(this.taskType, Worker.PROP_LOG_INTERVAL, 0);
        if (logInterval == 0) {
            logInterval = PropertyFactory.getInteger(Worker.PROP_ALL_WORKERS, Worker.PROP_LOG_INTERVAL, 0);
        }
        this.errorAt = logInterval == 0 ? DEFAULT_ERROR_LOG_INTERVAL : logInterval;
        LOGGER.info("Polling errors will be sampled at every {} error (after the first 100 errors) for taskType {}", this.errorAt, this.taskType);

        this.executorService = Executors.newFixedThreadPool(
                threadCount,
                new BasicThreadFactory.Builder()
                        .namingPattern(threadNamePattern)
                        .uncaughtExceptionHandler(this.uncaughtExceptionHandler)
                        .build());
        LOGGER.info("Starting Worker for taskType '{}' with {} threads, {} ms polling interval and domain {}", this.taskType, threadCount, this.pollingIntervalInMillis, this.domain);
        LOGGER.info("Polling errors for taskType {} will be printed at every {} occurrence.", this.taskType, this.errorAt);
    }

    /**
     * Runs the poll loop on the calling thread until {@link #shutdown(int)} is called or the
     * thread is interrupted.
     *
     * <p>Each iteration polls once. An empty poll sleeps for the worker's polling interval; a
     * non-empty poll submits every task to the thread pool. Any throwable escaping an iteration is
     * logged and the loop continues after one polling interval.
     */
    public void pollAndExecute() {
        Stopwatch idleStopwatch = null;
        while (this.pollingAndExecuting && !Thread.currentThread().isInterrupted()) {
            try {
                List<Task> tasks = pollTasksForWorker();
                if (tasks.isEmpty()) {
                    if (idleStopwatch == null) {
                        idleStopwatch = Stopwatch.createStarted();
                    }
                    Uninterruptibles.sleepUninterruptibly(this.pollingIntervalInMillis, TimeUnit.MILLISECONDS);
                    continue;
                }
                if (idleStopwatch != null) {
                    idleStopwatch.stop();
                    LOGGER.trace("Poller for task {} waited for {} ms before getting {} tasks to execute", this.taskType, idleStopwatch.elapsed(TimeUnit.MILLISECONDS), tasks.size());
                    idleStopwatch = null;
                }
                for (Task task : tasks) {
                    this.executorService.submit(() -> {
                        processTask(task);
                    });
                }
            } catch (Throwable th) {
                LOGGER.error(th.getMessage(), th);
                // Back off before retrying so a persistent failure does not become a hot loop
                // that floods the log.
                Uninterruptibles.sleepUninterruptibly(this.pollingIntervalInMillis, TimeUnit.MILLISECONDS);
            }
        }
    }

    /**
     * Stops polling and shuts the thread pool down, waiting for running tasks to finish.
     *
     * @param timeout seconds to wait for termination before forcing {@code shutdownNow()}
     */
    public void shutdown(int timeout) {
        this.pollingAndExecuting = false;
        try {
            this.executorService.shutdown();
            if (this.executorService.awaitTermination(timeout, TimeUnit.SECONDS)) {
                LOGGER.debug("tasks completed, shutting down");
            } else {
                LOGGER.warn("forcing shutdown after waiting for {} second", timeout);
                this.executorService.shutdownNow();
            }
        } catch (InterruptedException e) {
            LOGGER.warn("shutdown interrupted, invoking shutdownNow");
            this.executorService.shutdownNow();
            // Preserve the interrupt for the caller.
            Thread.currentThread().interrupt();
        }
    }

    /**
     * Performs one poll cycle.
     *
     * <p>Publishes {@link PollStarted}, then returns an empty list without polling when the worker
     * is paused, a poll filter rejects the poll, or no permit is free. Otherwise all free permits
     * are taken, that many tasks are requested, and permits not matched by a returned task are
     * released again. Poll failures are logged (sampled after the first 100), published as
     * {@link PollFailure} and never propagate out of the poll call itself.
     *
     * @return the polled tasks, each holding one permit; never {@code null}
     */
    private List<Task> pollTasksForWorker() {
        this.eventDispatcher.publish(new PollStarted(this.taskType));
        if (this.worker.paused()) {
            LOGGER.trace("Worker {} has been paused. Not polling anymore!", this.worker.getClass());
            return List.of();
        }
        for (PollFilter pollFilter : this.pollFilters) {
            if (!pollFilter.filter(this.taskType, this.domain)) {
                LOGGER.trace("Filter returned false, not polling.");
                return List.of();
            }
        }

        int acquired = this.permits.drainPermits();
        if (acquired == 0) {
            return List.of();
        }

        List<Task> tasks = List.of();
        // Tracks whether the unused permits were already handed back, so that a failure later in
        // the try block (e.g. while publishing the event) cannot release them a second time.
        boolean permitsSettled = false;
        Stopwatch stopwatch = Stopwatch.createStarted();
        try {
            LOGGER.trace("Polling task of type: {} in domain: '{}' with size {}", this.taskType, this.domain, acquired);
            List<Task> polled = pollTask(this.domain, acquired);
            if (polled != null) {
                tasks = polled;
            }
            // Clamp at zero: Semaphore.release rejects negative counts, which would otherwise
            // leak every acquired permit if more tasks than requested came back.
            this.permits.release(Math.max(0, acquired - tasks.size()));
            permitsSettled = true;
            stopwatch.stop();
            long elapsed = stopwatch.elapsed(TimeUnit.MILLISECONDS);
            LOGGER.debug("Time taken to poll {} task with a batch size of {} is {} ms", this.taskType, tasks.size(), elapsed);
            this.eventDispatcher.publish(new PollCompleted(this.taskType, elapsed));
        } catch (Throwable th) {
            if (!permitsSettled) {
                // The poll itself failed, so no task holds a permit: give all of them back.
                this.permits.release(acquired);
            }
            boolean shouldLog = this.pollingErrorCount < ALWAYS_LOGGED_POLLING_ERRORS
                    || this.pollingErrorCount % this.errorAt == 0;
            this.pollingErrorCount++;
            if (this.pollingErrorCount > POLLING_ERROR_COUNT_LIMIT) {
                this.pollingErrorCount = 0;
            }
            if (shouldLog) {
                LOGGER.error("Error polling for taskType: {}, error = {}", this.taskType, th.getMessage(), th);
            }
            if (stopwatch.isRunning()) {
                stopwatch.stop();
            }
            this.eventDispatcher.publish(new PollFailure(this.taskType, stopwatch.elapsed(TimeUnit.MILLISECONDS), th));
        }
        return tasks;
    }

    /**
     * Requests up to {@code count} tasks of this runner's task type from the server.
     *
     * @param pollDomain domain to poll in; may be {@code null}
     * @param count maximum number of tasks to request
     * @return the server's response, or an empty list when {@code count} is below one
     */
    private List<Task> pollTask(String pollDomain, int count) {
        if (count < 1) {
            return Collections.emptyList();
        }
        String workerId = this.worker.getIdentity();
        LOGGER.debug("poll {} in the domain {} with batch size {}", this.taskType, pollDomain, count);
        return this.taskClient.batchPollTasksInDomain(this.taskType, pollDomain, workerId, count, this.taskPollTimeout);
    }

    /**
     * Executes one polled task on a pool thread and always returns its permit.
     *
     * <p>Anything thrown by {@link #executeTask} marks the task {@code FAILED} and is reported
     * through {@link #handleException}.
     *
     * @param task the task to run
     */
    private void processTask(Task task) {
        // Everything sits under the finally: the permit was taken at poll time, so it must be
        // returned even if publishing the start event or the trace logging fails.
        try {
            this.eventDispatcher.publish(new TaskExecutionStarted(this.taskType, task.getTaskId(), this.worker.getIdentity()));
            LOGGER.trace("Executing task: {} of type: {} in worker: {} at {}", task.getTaskId(), this.taskType, this.worker.getClass().getSimpleName(), this.worker.getIdentity());
            LOGGER.trace("task {} is getting executed after {} ms of getting polled", task.getTaskId(), System.currentTimeMillis() - task.getStartTime());
            Stopwatch stopwatch = Stopwatch.createStarted();
            try {
                executeTask(this.worker, task);
                stopwatch.stop();
                LOGGER.trace("Took {} ms to execute and update task with id {}", stopwatch.elapsed(TimeUnit.MILLISECONDS), task.getTaskId());
            } catch (Throwable th) {
                task.setStatus(Task.Status.FAILED);
                handleException(th, new TaskResult(task), this.worker, task);
            }
        } finally {
            this.permits.release();
        }
    }

    /**
     * Runs the worker on the task and reports the outcome to the server exactly once.
     *
     * <p>On success the result is stamped with the workflow instance id, task id and worker id and
     * then sent. On an {@link Exception} a {@link TaskExecutionFailure} event is published and the
     * failure is reported through {@link #handleException}.
     *
     * @param worker the worker that executes the task
     * @param task the task to execute; a {@code null} task or empty task definition name is
     *     logged and skipped
     */
    private void executeTask(Worker worker, Task task) {
        if (task == null || task.getTaskDefName().isEmpty()) {
            LOGGER.warn("Empty task {}", worker.getTaskDefName());
            return;
        }
        Stopwatch executionStopwatch = Stopwatch.createStarted();
        TaskResult result = null;
        try {
            LOGGER.trace("Executing task: {} in worker: {} at {}", task.getTaskId(), worker.getClass().getSimpleName(), worker.getIdentity());
            result = worker.execute(task);
            executionStopwatch.stop();
            this.eventDispatcher.publish(new TaskExecutionCompleted(this.taskType, task.getTaskId(), worker.getIdentity(), executionStopwatch.elapsed(TimeUnit.MILLISECONDS)));
            result.setWorkflowInstanceId(task.getWorkflowInstanceId());
            result.setTaskId(task.getTaskId());
            result.setWorkerId(worker.getIdentity());
        } catch (Exception e) {
            if (executionStopwatch.isRunning()) {
                executionStopwatch.stop();
            }
            this.eventDispatcher.publish(new TaskExecutionFailure(this.taskType, task.getTaskId(), worker.getIdentity(), e, executionStopwatch.elapsed(TimeUnit.MILLISECONDS)));
            LOGGER.error("Unable to execute task: {} of type: {}", task.getTaskId(), task.getTaskDefName(), e);
            if (result == null) {
                task.setStatus(Task.Status.FAILED);
                result = new TaskResult(task);
            }
            handleException(e, result, worker, task);
            // handleException has already sent the failed result to the server; falling through
            // would push the very same result a second time.
            return;
        }
        LOGGER.trace("Task: {} executed by worker: {} at {} with status: {}", task.getTaskId(), worker.getClass().getSimpleName(), worker.getIdentity(), result.getStatus());
        Stopwatch updateStopwatch = Stopwatch.createStarted();
        updateTaskResult(this.updateRetryCount, task, result, worker);
        updateStopwatch.stop();
        LOGGER.trace("Time taken to update the {} {} ms", task.getTaskType(), updateStopwatch.elapsed(TimeUnit.MILLISECONDS));
    }

    /**
     * Sends a task result to the server, retrying each step up to {@code count} times.
     *
     * <p>The output is first passed to the large-payload upload; when that yields an external
     * storage path, the path is recorded on the result and the inline output is cleared. If
     * either step exhausts its retries, {@code worker.onErrorUpdate(task)} is invoked and the
     * failure is logged; nothing is rethrown.
     *
     * @param count number of attempts per step
     * @param task the task the result belongs to
     * @param result the result to send; may be modified as described above
     * @param worker the worker to notify when the update fails
     */
    private void updateTaskResult(int count, Task task, TaskResult result, Worker worker) {
        try {
            Optional<String> externalPath = retryOperation(
                    (TaskResult pending) -> upload(pending, task.getTaskType()),
                    count,
                    result,
                    "evaluateAndUploadLargePayload");
            if (externalPath.isPresent()) {
                result.setExternalOutputPayloadStoragePath(externalPath.get());
                result.setOutputData(null);
            }
            retryOperation(
                    (TaskResult pending) -> {
                        this.taskClient.updateTask(pending);
                        return null;
                    },
                    count,
                    result,
                    "updateTask");
        } catch (Exception e) {
            worker.onErrorUpdate(task);
            LOGGER.error(String.format("Failed to update result: %s for task: %s in worker: %s", result.toString(), task.getTaskDefName(), worker.getIdentity()), e);
        }
    }

    /**
     * Hands the result's output data to the client's large-payload evaluation/upload.
     *
     * <p>An {@link IllegalArgumentException} from the client is not retried: the result is turned
     * into {@code FAILED_WITH_TERMINAL_ERROR} with the exception message as reason and its output
     * cleared.
     *
     * @param result the result whose output data is evaluated
     * @param uploadTaskType task type passed to the client
     * @return whatever the client returned, or empty after an {@link IllegalArgumentException}
     */
    private Optional<String> upload(TaskResult result, String uploadTaskType) {
        try {
            return this.taskClient.evaluateAndUploadLargePayload(result.getOutputData(), uploadTaskType);
        } catch (IllegalArgumentException e) {
            result.setReasonForIncompletion(e.getMessage());
            result.setOutputData(null);
            result.setStatus(TaskResult.Status.FAILED_WITH_TERMINAL_ERROR);
            return Optional.empty();
        }
    }

    /**
     * Applies {@code operation} to {@code input}, retrying on any {@link Exception}.
     *
     * <p>Between attempts the thread sleeps {@code 500 * (count + 1)} ms; there is no sleep after
     * the final failed attempt. With {@code count <= 0} the operation is never attempted.
     *
     * @param operation the operation to run
     * @param count maximum number of attempts
     * @param input argument passed to every attempt
     * @param opName operation name used in log and exception messages
     * @return the result of the first successful attempt
     * @throws RuntimeException when all attempts failed; its cause is the last failure
     */
    private <T, R> R retryOperation(Function<T, R> operation, int count, T input, String opName) {
        Exception lastFailure = null;
        for (int attempt = 1; attempt <= count; attempt++) {
            try {
                return operation.apply(input);
            } catch (Exception e) {
                lastFailure = e;
                LOGGER.error("Error executing {}", opName, e);
                if (attempt < count) {
                    Uninterruptibles.sleepUninterruptibly(RETRY_BASE_DELAY_MILLIS * (count + 1), TimeUnit.MILLISECONDS);
                }
            }
        }
        throw new RuntimeException("Exhausted retries performing " + opName, lastFailure);
    }

    /**
     * Marks the result {@code FAILED} with the throwable as reason, attaches its stack trace as a
     * task log entry and sends the result to the server.
     *
     * @param th the failure to report
     * @param result the result to mark as failed and send
     * @param worker the worker that was executing the task
     * @param task the task that failed
     */
    private void handleException(Throwable th, TaskResult result, Worker worker, Task task) {
        LOGGER.error(String.format("Error while executing task %s", task.toString()), th);
        result.setStatus(TaskResult.Status.FAILED);
        result.setReasonForIncompletion("Error while executing the task: " + th);
        StringWriter stackTrace = new StringWriter();
        th.printStackTrace(new PrintWriter(stackTrace));
        result.log(stackTrace.toString());
        updateTaskResult(this.updateRetryCount, task, result, worker);
    }
}
