package org.apache.james.mailbox.events;

import com.google.common.base.Preconditions;
import com.google.common.base.Strings;
import com.google.common.collect.ImmutableMap;
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.MessageProperties;
import java.nio.charset.StandardCharsets;
import org.apache.james.event.json.EventSerializer;
import org.apache.james.mailbox.events.GroupRegistration;
import org.apache.james.util.MDCStructuredLogger;
import org.apache.james.util.StructuredLogger;
import org.reactivestreams.Publisher;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.rabbitmq.BindingSpecification;
import reactor.rabbitmq.ExchangeSpecification;
import reactor.rabbitmq.OutboundMessage;
import reactor.rabbitmq.Sender;

/* loaded from: input_file:org/apache/james/mailbox/events/GroupConsumerRetry.class */
class GroupConsumerRetry {
    private static final Logger LOGGER = LoggerFactory.getLogger(GroupConsumerRetry.class);
    private final Sender sender;
    private final RetryExchangeName retryExchangeName;
    private final RetryBackoffConfiguration retryBackoff;
    private final EventDeadLetters eventDeadLetters;
    private final Group group;
    private final EventSerializer eventSerializer;

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:org/apache/james/mailbox/events/GroupConsumerRetry$RetryExchangeName.class */
    public static class RetryExchangeName {
        static final String MAILBOX_EVENT_RETRY_EXCHANGE_PREFIX = "mailboxEvent-retryExchange-";
        private final String name;

        static RetryExchangeName of(Group group) {
            return new RetryExchangeName(group.asString());
        }

        private RetryExchangeName(String str) {
            Preconditions.checkArgument(!Strings.isNullOrEmpty(str), "Exchange name must be specified");
            this.name = str;
        }

        String asString() {
            return MAILBOX_EVENT_RETRY_EXCHANGE_PREFIX + this.name;
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public GroupConsumerRetry(Sender sender, Group group, RetryBackoffConfiguration retryBackoffConfiguration, EventDeadLetters eventDeadLetters, EventSerializer eventSerializer) {
        this.sender = sender;
        this.retryExchangeName = RetryExchangeName.of(group);
        this.retryBackoff = retryBackoffConfiguration;
        this.eventDeadLetters = eventDeadLetters;
        this.group = group;
        this.eventSerializer = eventSerializer;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public Mono<Void> createRetryExchange(GroupRegistration.WorkQueueName workQueueName) {
        return Flux.concat(new Publisher[]{this.sender.declareExchange(ExchangeSpecification.exchange(this.retryExchangeName.asString()).durable(true).type("direct")), this.sender.bind(BindingSpecification.binding().exchange(this.retryExchangeName.asString()).queue(workQueueName.asString()).routingKey(""))}).then();
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public Mono<Void> handleRetry(Event event, int i, Throwable th) {
        createStructuredLogger(event).log(logger -> {
            logger.error("Exception happens when handling event after {} retries", Integer.valueOf(i), th);
        });
        return retryOrStoreToDeadLetter(event, i);
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public Mono<Void> retryOrStoreToDeadLetter(Event event, int i) {
        return i >= this.retryBackoff.getMaxRetries() ? this.eventDeadLetters.store(this.group, event).then() : sendRetryMessage(event, i);
    }

    private Mono<Void> sendRetryMessage(Event event, int i) {
        return this.sender.send(Mono.just(new OutboundMessage(this.retryExchangeName.asString(), "", new AMQP.BasicProperties.Builder().headers(ImmutableMap.of("retry-count", Integer.valueOf(i + 1))).deliveryMode(MessageProperties.PERSISTENT_TEXT_PLAIN.getDeliveryMode()).priority(MessageProperties.PERSISTENT_TEXT_PLAIN.getPriority()).contentType(MessageProperties.PERSISTENT_TEXT_PLAIN.getContentType()).build(), this.eventSerializer.toJson(event).getBytes(StandardCharsets.UTF_8)))).doOnError(th -> {
            createStructuredLogger(event).log(logger -> {
                logger.error("Exception happens when publishing event to retry exchange, this event will be stored in deadLetter", th);
            });
        }).onErrorResume(th2 -> {
            return this.eventDeadLetters.store(this.group, event).then();
        });
    }

    private StructuredLogger createStructuredLogger(Event event) {
        return MDCStructuredLogger.forLogger(LOGGER).addField("eventId", event.getEventId()).addField("eventClass", event.getClass()).addField("user", event.getUsername()).addField("group", this.group.asString());
    }
}
