package org.apache.james.task;

import com.google.common.collect.Sets;
import java.io.IOException;
import java.time.Duration;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CancellationException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.james.task.Task;
import org.apache.james.task.TaskExecutionDetails;
import org.apache.james.task.TaskManagerWorker;
import org.apache.james.util.MDCBuilder;
import org.apache.james.util.concurrent.NamedThreadFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Schedulers;
import reactor.util.function.Tuple2;
import reactor.util.function.Tuples;

/* loaded from: input_file:org/apache/james/task/SerialTaskManagerWorker.class */
/**
 * {@link TaskManagerWorker} that runs submitted tasks on a dedicated single-thread executor and
 * reports their lifecycle (started, updated, completed, failed, cancelled) to a
 * {@link TaskManagerWorker.Listener}.
 *
 * <p>While the result of a task is awaited, the task's {@code details()} are polled every
 * {@code pollingInterval} and forwarded to the listener as updates.
 */
public class SerialTaskManagerWorker implements TaskManagerWorker {
    private static final Logger LOGGER = LoggerFactory.getLogger(SerialTaskManagerWorker.class);

    /** Receives every lifecycle notification emitted by this worker. */
    private final TaskManagerWorker.Listener listener;
    /** Delay between two successive polls of a task's details. */
    private final Duration pollingInterval;
    /** Single thread on which task bodies are run. */
    private final ExecutorService taskExecutor = Executors.newSingleThreadExecutor(NamedThreadFactory.withName("task executor"));
    /** Ids for which {@link #cancelTask(TaskId)} was called; consumed by {@link #executeTask(TaskWithId)}. */
    private final Set<TaskId> cancelledTasks = Sets.newConcurrentHashSet();
    /** Id and future of the most recently submitted task; {@code null} until a first task is submitted. */
    private final AtomicReference<Tuple2<TaskId, Future<?>>> runningTask = new AtomicReference<>();

    /**
     * @param listener receiver of the task lifecycle notifications
     * @param duration interval at which task details are polled while a task result is awaited
     */
    public SerialTaskManagerWorker(TaskManagerWorker.Listener listener, Duration duration) {
        this.pollingInterval = duration;
        this.listener = listener;
    }

    /**
     * Submits the task to the executor, unless its id was previously passed to
     * {@link #cancelTask(TaskId)}.
     *
     * <p>Note that the task is handed to the executor when this method is called, not when the
     * returned {@link Mono} is subscribed to.
     *
     * @param taskWithId the task to run together with its id
     * @return an empty {@link Mono} when the task had already been cancelled (the listener is
     *         notified of the cancellation); otherwise a {@link Mono} emitting the task result, or
     *         {@link Task.Result#PARTIAL} after notifying the listener when the future fails or is
     *         cancelled
     */
    @Override // org.apache.james.task.TaskManagerWorker
    public Mono<Task.Result> executeTask(TaskWithId taskWithId) {
        TaskId taskId = taskWithId.getId();
        if (this.cancelledTasks.remove(taskId)) {
            this.listener.cancelled(taskId, taskWithId.getTask().details());
            return Mono.empty();
        }
        CompletableFuture<Task.Result> futureResult =
            CompletableFuture.supplyAsync(() -> runWithMdc(taskWithId, this.listener), this.taskExecutor);
        this.runningTask.set(Tuples.of(taskId, futureResult));
        // The polling subscription is the "resource": it lives as long as the result is awaited
        // and is disposed once the result Mono terminates or is cancelled.
        return Mono.using(
            () -> pollAdditionalInformation(taskWithId).subscribe(),
            ignored -> Mono.fromFuture(futureResult)
                .doOnError(error -> handleExecutionError(taskWithId, this.listener, error))
                .onErrorReturn(Task.Result.PARTIAL),
            polling -> polling.dispose());
    }

    /**
     * Notifies the listener of an error surfaced by the task future: a
     * {@link CancellationException} is reported as a cancellation, anything else as a failure.
     */
    private void handleExecutionError(TaskWithId taskWithId, TaskManagerWorker.Listener listener, Throwable th) {
        if (th instanceof CancellationException) {
            listener.cancelled(taskWithId.getId(), taskWithId.getTask().details());
        } else {
            listener.failed(taskWithId.getId(), taskWithId.getTask().details(), th);
        }
    }

    /**
     * Builds an endless flux that reads the task details every {@code pollingInterval}, drops
     * empty readings and forwards each present one to the listener as an update.
     */
    private Flux<TaskExecutionDetails.AdditionalInformation> pollAdditionalInformation(TaskWithId taskWithId) {
        return Mono.fromCallable(() -> taskWithId.getTask().details())
            .delayElement(this.pollingInterval, Schedulers.elastic())
            .repeat()
            // Explicit type witness: without it the element type downstream of handle() is Object.
            .<TaskExecutionDetails.AdditionalInformation>handle((maybeDetails, sink) -> maybeDetails.ifPresent(sink::next))
            .doOnNext(information -> this.listener.updated(taskWithId.getId(), information));
    }

    /** Runs the task with its id, type and details registered in the logging MDC. */
    private Task.Result runWithMdc(TaskWithId taskWithId, TaskManagerWorker.Listener listener) {
        MDCBuilder mdc = MDCBuilder.create()
            .addContext("taskId", taskWithId.getId())
            .addContext("taskType", taskWithId.getTask().type())
            .addContext("taskDetails", taskWithId.getTask().details());
        return (Task.Result) MDCBuilder.withMdc(mdc, () -> run(taskWithId, listener));
    }

    /**
     * Runs the task body and notifies the listener of the outcome.
     *
     * @return the result returned by the task, or {@link Task.Result#PARTIAL} when the task was
     *         interrupted or threw an exception
     */
    private Task.Result run(TaskWithId taskWithId, TaskManagerWorker.Listener listener) {
        listener.started(taskWithId.getId());
        try {
            return taskWithId.getTask().run().onComplete(new Task.CompletionOperation[]{result -> {
                listener.completed(taskWithId.getId(), result, taskWithId.getTask().details());
            }}).onFailure(new Task.Operation[]{() -> {
                LOGGER.error("Task was partially performed. Check logs for more details. Taskid : {}", taskWithId.getId());
                listener.failed(taskWithId.getId(), taskWithId.getTask().details());
            }});
        } catch (InterruptedException e) {
            listener.cancelled(taskWithId.getId(), taskWithId.getTask().details());
            // Restore the interrupt flag only after the listener has been notified, so the
            // notification itself does not run on an already-interrupted thread.
            Thread.currentThread().interrupt();
            return Task.Result.PARTIAL;
        } catch (Exception e) {
            LOGGER.error("Error while running task {}", taskWithId.getId(), e);
            listener.failed(taskWithId.getId(), taskWithId.getTask().details(), e);
            return Task.Result.PARTIAL;
        }
    }

    /**
     * Records the id as cancelled so that a later {@link #executeTask(TaskWithId)} for it is
     * skipped, and cancels the future of the most recently submitted task when it has this id.
     *
     * <p>NOTE(review): the tracked future is a {@link CompletableFuture}, whose {@code cancel}
     * documents that {@code mayInterruptIfRunning} has no effect; a task body that is already
     * running is therefore presumably not interrupted by this call.
     *
     * @param taskId id of the task to cancel
     */
    @Override // org.apache.james.task.TaskManagerWorker
    public void cancelTask(TaskId taskId) {
        this.cancelledTasks.add(taskId);
        Optional.ofNullable(this.runningTask.get())
            .filter(running -> running.getT1().equals(taskId))
            .ifPresent(running -> running.getT2().cancel(true));
    }

    /** Forwards the failure to the listener without running anything. */
    @Override // org.apache.james.task.TaskManagerWorker
    public void fail(TaskId taskId, Optional<TaskExecutionDetails.AdditionalInformation> optional, String str, Throwable th) {
        this.listener.failed(taskId, optional, str, th);
    }

    /** Shuts the task executor down immediately via {@link ExecutorService#shutdownNow()}. */
    @Override // java.io.Closeable, java.lang.AutoCloseable
    public void close() throws IOException {
        this.taskExecutor.shutdownNow();
    }
}
