package org.apache.james.task.eventsourcing.distributed;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.ImmutableMap;
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Delivery;
import com.rabbitmq.client.MessageProperties;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.time.Duration;
import java.util.Objects;
import java.util.Optional;
import org.apache.james.backends.rabbitmq.QueueArguments;
import org.apache.james.backends.rabbitmq.RabbitMQConfiguration;
import org.apache.james.backends.rabbitmq.ReceiverProvider;
import org.apache.james.server.task.json.JsonTaskSerializer;
import org.apache.james.task.Task;
import org.apache.james.task.TaskId;
import org.apache.james.task.TaskManagerWorker;
import org.apache.james.task.TaskWithId;
import org.apache.james.task.WorkQueue;
import org.apache.james.util.ReactorUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.Disposable;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.publisher.Sinks;
import reactor.core.scheduler.Schedulers;
import reactor.rabbitmq.AcknowledgableDelivery;
import reactor.rabbitmq.BindingSpecification;
import reactor.rabbitmq.ConsumeOptions;
import reactor.rabbitmq.ExchangeSpecification;
import reactor.rabbitmq.OutboundMessage;
import reactor.rabbitmq.QueueSpecification;
import reactor.rabbitmq.Sender;
import reactor.util.retry.Retry;

/* loaded from: input_file:org/apache/james/task/eventsourcing/distributed/RabbitMQWorkQueue.class */
public class RabbitMQWorkQueue implements WorkQueue {
    static final String EXCHANGE_NAME = "taskManagerWorkQueueExchange";
    static final String QUEUE_NAME = "taskManagerWorkQueue";
    static final String ROUTING_KEY = "taskManagerWorkQueueRoutingKey";
    static final String CANCEL_REQUESTS_EXCHANGE_NAME = "taskManagerCancelRequestsExchange";
    static final String CANCEL_REQUESTS_ROUTING_KEY = "taskManagerCancelRequestsRoutingKey";
    public static final String TASK_ID = "taskId";
    public static final int NUM_RETRIES = 8;
    private final TaskManagerWorker worker;
    private final JsonTaskSerializer taskSerializer;
    private final RabbitMQWorkQueueConfiguration configuration;
    private final RabbitMQConfiguration rabbitMQConfiguration;
    private final Sender sender;
    private final ReceiverProvider receiverProvider;
    private final CancelRequestQueueName cancelRequestQueueName;
    private Sinks.Many<TaskId> sendCancelRequestsQueue;
    private Disposable sendCancelRequestsQueueHandle;
    private Disposable receiverHandle;
    private Disposable cancelRequestListenerHandle;
    private static final Logger LOGGER = LoggerFactory.getLogger(RabbitMQWorkQueue.class);
    public static final Duration FIRST_BACKOFF = Duration.ofMillis(100);

    public RabbitMQWorkQueue(TaskManagerWorker taskManagerWorker, Sender sender, ReceiverProvider receiverProvider, JsonTaskSerializer jsonTaskSerializer, RabbitMQWorkQueueConfiguration rabbitMQWorkQueueConfiguration, CancelRequestQueueName cancelRequestQueueName, RabbitMQConfiguration rabbitMQConfiguration) {
        this.cancelRequestQueueName = cancelRequestQueueName;
        this.worker = taskManagerWorker;
        this.receiverProvider = receiverProvider;
        this.sender = sender;
        this.taskSerializer = jsonTaskSerializer;
        this.configuration = rabbitMQWorkQueueConfiguration;
        this.rabbitMQConfiguration = rabbitMQConfiguration;
    }

    public void start() {
        startWorkqueue();
        listenToCancelRequests();
    }

    private void startWorkqueue() {
        declareQueue();
        if (this.configuration.enabled()) {
            consumeWorkqueue();
        }
    }

    @VisibleForTesting
    void declareQueue() {
        Mono retryWhen = this.sender.declareExchange(ExchangeSpecification.exchange(EXCHANGE_NAME)).retryWhen(Retry.backoff(8L, FIRST_BACKOFF));
        Mono retryWhen2 = this.sender.declare(QueueSpecification.queue(QUEUE_NAME).durable(true).arguments(this.rabbitMQConfiguration.workQueueArgumentsBuilder(true).singleActiveConsumer().build())).retryWhen(Retry.backoff(8L, FIRST_BACKOFF));
        retryWhen.then(retryWhen2).then(this.sender.bind(BindingSpecification.binding(EXCHANGE_NAME, ROUTING_KEY, QUEUE_NAME)).retryWhen(Retry.backoff(8L, FIRST_BACKOFF))).block();
    }

    public void restart() {
        Disposable disposable = this.receiverHandle;
        consumeWorkqueue();
        disposable.dispose();
        Disposable disposable2 = this.cancelRequestListenerHandle;
        registerCancelRequestsListener(this.cancelRequestQueueName.asString());
        disposable2.dispose();
    }

    private void consumeWorkqueue() {
        ReceiverProvider receiverProvider = this.receiverProvider;
        Objects.requireNonNull(receiverProvider);
        this.receiverHandle = Flux.using(receiverProvider::createReceiver, receiver -> {
            return receiver.consumeManualAck(QUEUE_NAME, new ConsumeOptions());
        }, (v0) -> {
            v0.close();
        }).subscribeOn(ReactorUtils.BLOCKING_CALL_WRAPPER).concatMap(this::executeTask).subscribe();
    }

    private Mono<Task.Result> executeTask(AcknowledgableDelivery acknowledgableDelivery) {
        return Mono.fromCallable(() -> {
            return acknowledgableDelivery.getProperties().getHeaders();
        }).map(map -> {
            return map.get(TASK_ID);
        }).map(obj -> {
            return TaskId.fromString(obj.toString());
        }).flatMap(taskId -> {
            return Mono.fromCallable(() -> {
                return new String(acknowledgableDelivery.getBody(), StandardCharsets.UTF_8);
            }).flatMap(str -> {
                return deserialize(str, taskId);
            }).doOnNext(task -> {
                acknowledgableDelivery.ack();
            }).flatMap(task2 -> {
                return executeOnWorker(taskId, task2);
            });
        }).onErrorResume(th -> {
            LOGGER.error("Unable to process {} {}", new Object[]{TASK_ID, Optional.ofNullable(acknowledgableDelivery.getProperties()).flatMap(basicProperties -> {
                return Optional.ofNullable(basicProperties.getHeaders());
            }).flatMap(map2 -> {
                return Optional.ofNullable(map2.get(TASK_ID));
            }), th});
            acknowledgableDelivery.nack(false);
            return Mono.empty();
        });
    }

    private Mono<Task> deserialize(String str, TaskId taskId) {
        return Mono.fromCallable(() -> {
            return this.taskSerializer.deserialize(str);
        }).onErrorResume(th -> {
            String format = String.format("Unable to deserialize submitted Task %s", taskId.asString());
            LOGGER.error(format, th);
            return Mono.from(this.worker.fail(taskId, Mono.empty(), format, th)).then(Mono.empty());
        });
    }

    private Mono<Task.Result> executeOnWorker(TaskId taskId, Task task) {
        return this.worker.executeTask(new TaskWithId(taskId, task)).onErrorResume(th -> {
            String format = String.format("Unable to run submitted Task %s", taskId.asString());
            LOGGER.warn(format, th);
            return Mono.from(this.worker.fail(taskId, task.detailsReactive(), format, th)).then(Mono.empty());
        });
    }

    private void listenToCancelRequests() {
        this.sender.declareExchange(ExchangeSpecification.exchange(CANCEL_REQUESTS_EXCHANGE_NAME)).block();
        QueueArguments.Builder builder = QueueArguments.builder();
        Optional queueTTL = this.rabbitMQConfiguration.getQueueTTL();
        Objects.requireNonNull(builder);
        queueTTL.ifPresent((v1) -> {
            r1.queueTTL(v1);
        });
        this.sender.declare(QueueSpecification.queue(this.cancelRequestQueueName.asString()).durable(false).autoDelete(true).arguments(builder.build())).block();
        this.sender.bind(BindingSpecification.binding(CANCEL_REQUESTS_EXCHANGE_NAME, CANCEL_REQUESTS_ROUTING_KEY, this.cancelRequestQueueName.asString())).block();
        registerCancelRequestsListener(this.cancelRequestQueueName.asString());
        this.sendCancelRequestsQueue = Sinks.many().multicast().onBackpressureBuffer();
        this.sendCancelRequestsQueueHandle = this.sender.send(this.sendCancelRequestsQueue.asFlux().map(this::makeCancelRequestMessage)).subscribeOn(Schedulers.boundedElastic()).subscribe();
    }

    private void registerCancelRequestsListener(String str) {
        ReceiverProvider receiverProvider = this.receiverProvider;
        Objects.requireNonNull(receiverProvider);
        Flux map = Flux.using(receiverProvider::createReceiver, receiver -> {
            return receiver.consumeAutoAck(str);
        }, (v0) -> {
            v0.close();
        }).subscribeOn(Schedulers.boundedElastic()).map(this::readCancelRequestMessage);
        TaskManagerWorker taskManagerWorker = this.worker;
        Objects.requireNonNull(taskManagerWorker);
        this.cancelRequestListenerHandle = map.doOnNext(taskManagerWorker::cancelTask).subscribe();
    }

    private TaskId readCancelRequestMessage(Delivery delivery) {
        return TaskId.fromString(new String(delivery.getBody(), StandardCharsets.UTF_8));
    }

    private OutboundMessage makeCancelRequestMessage(TaskId taskId) {
        return new OutboundMessage(CANCEL_REQUESTS_EXCHANGE_NAME, CANCEL_REQUESTS_ROUTING_KEY, new AMQP.BasicProperties.Builder().build(), taskId.asString().getBytes(StandardCharsets.UTF_8));
    }

    public void submit(TaskWithId taskWithId) {
        try {
            this.sender.send(Mono.just(new OutboundMessage(EXCHANGE_NAME, ROUTING_KEY, new AMQP.BasicProperties.Builder().deliveryMode(MessageProperties.PERSISTENT_TEXT_PLAIN.getDeliveryMode()).priority(MessageProperties.PERSISTENT_TEXT_PLAIN.getPriority()).contentType(MessageProperties.PERSISTENT_TEXT_PLAIN.getContentType()).headers(ImmutableMap.of(TASK_ID, taskWithId.getId().asString())).build(), this.taskSerializer.serialize(taskWithId.getTask()).getBytes(StandardCharsets.UTF_8)))).block();
        } catch (JsonProcessingException e) {
            throw new RuntimeException((Throwable) e);
        }
    }

    public void cancel(TaskId taskId) {
        this.sendCancelRequestsQueue.emitNext(taskId, Sinks.EmitFailureHandler.FAIL_FAST);
    }

    public void close() {
        try {
            this.worker.close();
            Optional.ofNullable(this.receiverHandle).ifPresent((v0) -> {
                v0.dispose();
            });
            Optional.ofNullable(this.sendCancelRequestsQueueHandle).ifPresent((v0) -> {
                v0.dispose();
            });
            Optional.ofNullable(this.cancelRequestListenerHandle).ifPresent((v0) -> {
                v0.dispose();
            });
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
    }
}
