package org.apache.james.queue.rabbitmq;

import com.github.fge.lambdas.Throwing;
import com.github.fge.lambdas.consumers.ThrowingConsumer;
import java.io.Closeable;
import java.util.Objects;
import java.util.function.Consumer;
import org.apache.james.backends.rabbitmq.ReceiverProvider;
import org.apache.james.blob.api.ObjectNotFoundException;
import org.apache.james.metrics.api.Metric;
import org.apache.james.metrics.api.MetricFactory;
import org.apache.james.queue.api.MailQueue;
import org.apache.james.queue.api.MailQueueFactory;
import org.apache.james.queue.rabbitmq.view.api.DeleteCondition;
import org.apache.james.queue.rabbitmq.view.api.MailQueueView;
import org.apache.james.queue.rabbitmq.view.cassandra.CassandraMailQueueBrowser;
import org.apache.mailet.Mail;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Schedulers;
import reactor.rabbitmq.AcknowledgableDelivery;
import reactor.rabbitmq.ConsumeOptions;
import reactor.rabbitmq.Receiver;

/* loaded from: input_file:org/apache/james/queue/rabbitmq/Dequeuer.class */
class Dequeuer implements Closeable {
    private static final Logger LOGGER = LoggerFactory.getLogger(Dequeuer.class);
    private static final boolean REQUEUE = true;
    private final MailLoader mailLoader;
    private final Metric dequeueMetric;
    private final MailReferenceSerializer mailReferenceSerializer;
    private final MailQueueView<CassandraMailQueueBrowser.CassandraMailQueueItemView> mailQueueView;
    private final Receiver receiver;
    private final Flux<AcknowledgableDelivery> flux;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/james/queue/rabbitmq/Dequeuer$RabbitMQMailQueueItem.class */
    public static class RabbitMQMailQueueItem implements MailQueue.MailQueueItem {
        private final Consumer<Boolean> ack;
        private final EnqueueId enqueueId;
        private final Mail mail;

        private RabbitMQMailQueueItem(Consumer<Boolean> consumer, MailWithEnqueueId mailWithEnqueueId) {
            this.ack = consumer;
            this.enqueueId = mailWithEnqueueId.getEnqueueId();
            this.mail = mailWithEnqueueId.getMail();
        }

        public Mail getMail() {
            return this.mail;
        }

        public EnqueueId getEnqueueId() {
            return this.enqueueId;
        }

        public void done(boolean z) {
            this.ack.accept(Boolean.valueOf(z));
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public Dequeuer(MailQueueName mailQueueName, ReceiverProvider receiverProvider, MailLoader mailLoader, MailReferenceSerializer mailReferenceSerializer, MetricFactory metricFactory, MailQueueView<CassandraMailQueueBrowser.CassandraMailQueueItemView> mailQueueView, MailQueueFactory.PrefetchCount prefetchCount) {
        this.mailLoader = mailLoader;
        this.mailReferenceSerializer = mailReferenceSerializer;
        this.mailQueueView = mailQueueView;
        this.dequeueMetric = metricFactory.generate("dequeuedMail:" + mailQueueName.asString());
        this.receiver = receiverProvider.createReceiver();
        this.flux = this.receiver.consumeManualAck(mailQueueName.toWorkQueueName().asString(), new ConsumeOptions().qos(prefetchCount.asInt())).filter(acknowledgableDelivery -> {
            return acknowledgableDelivery.getBody() != null;
        });
    }

    @Override // java.io.Closeable, java.lang.AutoCloseable
    public void close() {
        this.receiver.close();
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public Flux<? extends MailQueue.MailQueueItem> deQueue() {
        return this.flux.flatMapSequential(acknowledgableDelivery -> {
            return loadItem(acknowledgableDelivery).subscribeOn(Schedulers.elastic());
        }).concatMap(rabbitMQMailQueueItem -> {
            return filterIfDeleted(rabbitMQMailQueueItem).subscribeOn(Schedulers.elastic());
        });
    }

    private Mono<RabbitMQMailQueueItem> filterIfDeleted(RabbitMQMailQueueItem rabbitMQMailQueueItem) {
        return this.mailQueueView.isPresent(rabbitMQMailQueueItem.getEnqueueId()).flatMap(bool -> {
            return keepWhenPresent(rabbitMQMailQueueItem, bool);
        });
    }

    private Mono<? extends RabbitMQMailQueueItem> keepWhenPresent(RabbitMQMailQueueItem rabbitMQMailQueueItem, Boolean bool) {
        if (bool.booleanValue()) {
            return Mono.just(rabbitMQMailQueueItem);
        }
        rabbitMQMailQueueItem.done(true);
        return Mono.empty();
    }

    private Mono<RabbitMQMailQueueItem> loadItem(AcknowledgableDelivery acknowledgableDelivery) {
        return loadMail(acknowledgableDelivery).map(mailWithEnqueueId -> {
            return new RabbitMQMailQueueItem(ack(acknowledgableDelivery, mailWithEnqueueId), mailWithEnqueueId);
        });
    }

    private ThrowingConsumer<Boolean> ack(AcknowledgableDelivery acknowledgableDelivery, MailWithEnqueueId mailWithEnqueueId) {
        return bool -> {
            if (!bool.booleanValue()) {
                acknowledgableDelivery.nack(true);
                return;
            }
            this.dequeueMetric.increment();
            acknowledgableDelivery.ack();
            this.mailQueueView.delete(DeleteCondition.withEnqueueId(mailWithEnqueueId.getEnqueueId(), mailWithEnqueueId.getBlobIds()));
        };
    }

    private Mono<MailWithEnqueueId> loadMail(AcknowledgableDelivery acknowledgableDelivery) {
        return toMailReference(acknowledgableDelivery).flatMap(mailReferenceDTO -> {
            return this.mailLoader.load(mailReferenceDTO).onErrorResume(ObjectNotFoundException.class, objectNotFoundException -> {
                LOGGER.error("Fail to load mail {} with enqueueId {} as underlying blobs do not exist. Discarding this message to prevent an infinite loop.", new Object[]{mailReferenceDTO.getName(), mailReferenceDTO.getEnqueueId(), objectNotFoundException});
                acknowledgableDelivery.nack(false);
                return Mono.empty();
            }).onErrorResume(th -> {
                LOGGER.error("Fail to load mail {} with enqueueId {}", new Object[]{mailReferenceDTO.getName(), mailReferenceDTO.getEnqueueId(), th});
                acknowledgableDelivery.nack(true);
                return Mono.empty();
            });
        });
    }

    private Mono<MailReferenceDTO> toMailReference(AcknowledgableDelivery acknowledgableDelivery) {
        Objects.requireNonNull(acknowledgableDelivery);
        Mono fromCallable = Mono.fromCallable(acknowledgableDelivery::getBody);
        MailReferenceSerializer mailReferenceSerializer = this.mailReferenceSerializer;
        Objects.requireNonNull(mailReferenceSerializer);
        return fromCallable.map(Throwing.function(mailReferenceSerializer::read).sneakyThrow()).onErrorResume(th -> {
            LOGGER.error("Fail to deserialize MailReferenceDTO. Discarding this message to prevent an infinite loop.", th);
            acknowledgableDelivery.nack(false);
            return Mono.empty();
        });
    }
}
