package org.apache.james.queue.rabbitmq;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.github.fge.lambdas.Throwing;
import com.google.common.collect.ImmutableMap;
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.MessageProperties;
import java.time.Clock;
import java.util.Objects;
import javax.mail.MessagingException;
import javax.mail.internet.MimeMessage;
import org.apache.james.blob.api.Store;
import org.apache.james.blob.mail.MimeMessagePartsId;
import org.apache.james.metrics.api.Metric;
import org.apache.james.metrics.api.MetricFactory;
import org.apache.james.queue.api.MailQueue;
import org.apache.james.queue.rabbitmq.view.api.MailQueueView;
import org.apache.james.queue.rabbitmq.view.cassandra.CassandraMailQueueBrowser;
import org.apache.mailet.Mail;
import org.reactivestreams.Publisher;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.rabbitmq.OutboundMessage;
import reactor.rabbitmq.Sender;

/* loaded from: input_file:org/apache/james/queue/rabbitmq/Enqueuer.class */
class Enqueuer {
    private final MailQueueName name;
    private final Sender sender;
    private final Store<MimeMessage, MimeMessagePartsId> mimeMessageStore;
    private final MailReferenceSerializer mailReferenceSerializer;
    private final Metric enqueueMetric;
    private final MailQueueView mailQueueView;
    private final Clock clock;

    /* JADX INFO: Access modifiers changed from: package-private */
    public Enqueuer(MailQueueName mailQueueName, Sender sender, Store<MimeMessage, MimeMessagePartsId> store, MailReferenceSerializer mailReferenceSerializer, MetricFactory metricFactory, MailQueueView mailQueueView, Clock clock) {
        this.name = mailQueueName;
        this.sender = sender;
        this.mimeMessageStore = store;
        this.mailReferenceSerializer = mailReferenceSerializer;
        this.mailQueueView = mailQueueView;
        this.clock = clock;
        this.enqueueMetric = metricFactory.generate("enqueuedMail:" + mailQueueName.asString());
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public void enQueue(Mail mail) throws MailQueue.MailQueueException {
        EnqueueId generate = EnqueueId.generate();
        Mono flatMap = saveMail(mail).map(mimeMessagePartsId -> {
            return new MailReference(generate, mail, mimeMessagePartsId);
        }).flatMap(Throwing.function(mailReference -> {
            return Flux.mergeDelayError(2, new Publisher[]{this.mailQueueView.storeMail(toEnqueuedItems(mailReference)), publishReferenceToRabbit(mailReference)}).then();
        }).sneakyThrow());
        Metric metric = this.enqueueMetric;
        Objects.requireNonNull(metric);
        flatMap.thenEmpty(Mono.fromRunnable(metric::increment)).block();
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public Mono<Void> reQueue(CassandraMailQueueBrowser.CassandraMailQueueItemView cassandraMailQueueItemView) {
        Mail mail = cassandraMailQueueItemView.getMail();
        return Mono.fromCallable(() -> {
            return new MailReference(cassandraMailQueueItemView.getEnqueuedId(), mail, cassandraMailQueueItemView.getEnqueuedPartsId());
        }).flatMap(Throwing.function(this::publishReferenceToRabbit).sneakyThrow()).then();
    }

    private Mono<MimeMessagePartsId> saveMail(Mail mail) throws MailQueue.MailQueueException {
        try {
            return this.mimeMessageStore.save(mail.getMessage());
        } catch (MessagingException e) {
            throw new MailQueue.MailQueueException("Error while saving blob", e);
        }
    }

    private Mono<Void> publishReferenceToRabbit(MailReference mailReference) throws MailQueue.MailQueueException {
        return this.sender.send(Mono.just(new OutboundMessage(this.name.toRabbitExchangeName().asString(), "", new AMQP.BasicProperties.Builder().deliveryMode(MessageProperties.PERSISTENT_TEXT_PLAIN.getDeliveryMode()).priority(MessageProperties.PERSISTENT_TEXT_PLAIN.getPriority()).contentType(MessageProperties.PERSISTENT_TEXT_PLAIN.getContentType()).headers(ImmutableMap.of("x-dead-letter-routing-key", "")).build(), getMailReferenceBytes(mailReference))));
    }

    private EnqueuedItem toEnqueuedItems(MailReference mailReference) {
        return EnqueuedItem.builder().enqueueId(mailReference.getEnqueueId()).mailQueueName(this.name).mail(mailReference.getMail()).enqueuedTime(this.clock.instant()).mimeMessagePartsId(mailReference.getPartsId()).build();
    }

    private byte[] getMailReferenceBytes(MailReference mailReference) throws MailQueue.MailQueueException {
        try {
            return this.mailReferenceSerializer.write(MailReferenceDTO.fromMailReference(mailReference));
        } catch (JsonProcessingException e) {
            throw new MailQueue.MailQueueException("Unable to serialize message", e);
        }
    }
}
