package org.apache.james.mailbox.events;

import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import com.rabbitmq.client.AMQP;
import java.nio.charset.StandardCharsets;
import java.util.Set;
import java.util.stream.Stream;
import org.apache.james.event.json.EventSerializer;
import org.apache.james.mailbox.events.MailboxListener;
import org.apache.james.mailbox.events.RoutingKeyConverter;
import org.apache.james.util.MDCBuilder;
import org.apache.james.util.MDCStructuredLogger;
import org.apache.james.util.StructuredLogger;
import org.reactivestreams.Publisher;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.publisher.MonoProcessor;
import reactor.core.scheduler.Schedulers;
import reactor.rabbitmq.ExchangeSpecification;
import reactor.rabbitmq.OutboundMessage;
import reactor.rabbitmq.Sender;
import reactor.util.function.Tuples;

/* loaded from: input_file:org/apache/james/mailbox/events/EventDispatcher.class */
/**
 * Dispatches an {@link Event} both to the listeners registered locally in the
 * {@link LocalListenerRegistry} and, through RabbitMQ, to the shared mailbox event exchange.
 */
class EventDispatcher {
    private static final Logger LOGGER = LoggerFactory.getLogger(EventDispatcher.class);

    /** Exchange declared by {@link #start()} and targeted by every outbound message. */
    private static final String MAILBOX_EVENT_EXCHANGE_NAME = "mailboxEvent-exchange";
    /** AMQP header carrying the identifier of the event bus that emitted the message. */
    private static final String EVENT_BUS_ID_HEADER = "eventBusId";
    /** MDC context key under which the registration key is exposed while a listener runs. */
    private static final String REGISTRATION_KEY_CONTEXT = "registrationKey";

    private final EventSerializer eventSerializer;
    private final Sender sender;
    private final LocalListenerRegistry localListenerRegistry;
    private final AMQP.BasicProperties basicProperties;
    private final MailboxListenerExecutor mailboxListenerExecutor;

    /**
     * Creates a dispatcher whose outbound messages all carry the given event bus id as a header.
     *
     * @param eventBusId identifier written in the {@code eventBusId} AMQP header of each message
     * @param eventSerializer serializer used to turn events into JSON
     * @param sender RabbitMQ sender used to declare the exchange and publish messages
     * @param localListenerRegistry registry queried for the listeners bound to each registration key
     * @param mailboxListenerExecutor executor used to run local listeners
     */
    public EventDispatcher(EventBusId eventBusId, EventSerializer eventSerializer, Sender sender, LocalListenerRegistry localListenerRegistry, MailboxListenerExecutor mailboxListenerExecutor) {
        this.eventSerializer = eventSerializer;
        this.sender = sender;
        this.localListenerRegistry = localListenerRegistry;
        this.basicProperties = new AMQP.BasicProperties.Builder()
            .headers(ImmutableMap.of(EVENT_BUS_ID_HEADER, eventBusId.asString()))
            .build();
        this.mailboxListenerExecutor = mailboxListenerExecutor;
    }

    /**
     * Declares the durable, direct mailbox event exchange and blocks until the declaration completes.
     */
    public void start() {
        this.sender.declareExchange(
                ExchangeSpecification.exchange(MAILBOX_EVENT_EXCHANGE_NAME)
                    .durable(true)
                    .type("direct"))
            .block();
    }

    /**
     * Dispatches the event to the local synchronous listeners, then publishes it to RabbitMQ.
     *
     * <p>The event is serialized immediately, on the calling thread. The pipeline is subscribed
     * right away (on the elastic scheduler) through a {@link MonoProcessor}, so the dispatch
     * starts even if the caller never subscribes to the returned {@link Mono}. Errors are logged
     * and still propagated to the returned {@link Mono}.
     *
     * @param event the event to dispatch
     * @param set registration keys the event is addressed to
     * @return a {@link Mono} completing once both local and remote dispatch are done
     */
    public Mono<Void> dispatch(Event event, Set<RegistrationKey> set) {
        Mono<Void> localDispatch = dispatchToLocalListeners(event, set);
        Mono<Void> remoteDispatch = dispatchToRemoteListeners(serializeEvent(event), set);

        // Typed varargs instead of a raw Publisher[] so the chain keeps its <Void> element type.
        return Flux.concat(localDispatch, remoteDispatch)
            .subscribeOn(Schedulers.elastic())
            .doOnError(throwable -> LOGGER.error("error while dispatching event", throwable))
            .then()
            .subscribeWith(MonoProcessor.create());
    }

    /**
     * Runs every local listener registered under one of the keys whose execution mode is
     * {@code SYNCHRONOUS}.
     */
    private Mono<Void> dispatchToLocalListeners(Event event, Set<RegistrationKey> keys) {
        return Flux.fromIterable(keys)
            .flatMap(key -> this.localListenerRegistry.getLocalMailboxListeners(key)
                .map(listener -> Tuples.of(key, listener)))
            .filter(keyAndListener -> keyAndListener.getT2().getExecutionMode().equals(MailboxListener.ExecutionMode.SYNCHRONOUS))
            .flatMap(keyAndListener -> executeListener(event, keyAndListener.getT2(), keyAndListener.getT1()))
            .then();
    }

    /**
     * Executes a single listener at subscription time. Any exception raised by the listener is
     * logged and swallowed, so the returned {@link Mono} always completes without error.
     */
    private Mono<Void> executeListener(Event event, MailboxListener mailboxListener, RegistrationKey registrationKey) {
        // Mono.fromRunnable emits a proper onSubscribe before onComplete, unlike a bare
        // Publisher lambda that only calls subscriber.onComplete().
        return Mono.fromRunnable(() -> {
            try {
                this.mailboxListenerExecutor.execute(
                    mailboxListener,
                    MDCBuilder.create().addContext(REGISTRATION_KEY_CONTEXT, registrationKey),
                    event);
            } catch (Exception e) {
                structuredLogger(event, ImmutableSet.of(registrationKey))
                    .log(logger -> logger.error("Exception happens when dispatching event", e));
            }
        });
    }

    /**
     * Builds a structured logger pre-filled with the event id, class, user and registration keys.
     */
    private StructuredLogger structuredLogger(Event event, Set<RegistrationKey> keys) {
        return MDCStructuredLogger.forLogger(LOGGER)
            .addField("eventId", event.getEventId())
            .addField("eventClass", event.getClass())
            .addField("user", event.getUser())
            .addField("registrationKeys", keys);
    }

    /**
     * Publishes the serialized event once with the empty routing key and once per registration key.
     */
    private Mono<Void> dispatchToRemoteListeners(byte[] serializedEvent, Set<RegistrationKey> keys) {
        Stream<RoutingKeyConverter.RoutingKey> routingKeys = Stream.concat(
            Stream.of(RoutingKeyConverter.RoutingKey.empty()),
            keys.stream().map(RoutingKeyConverter.RoutingKey::of));

        Stream<OutboundMessage> outboundMessages = routingKeys
            .map(routingKey -> new OutboundMessage(MAILBOX_EVENT_EXCHANGE_NAME, routingKey.asString(), this.basicProperties, serializedEvent));

        return this.sender.send(Flux.fromStream(outboundMessages));
    }

    /** Serializes the event to its JSON representation, encoded as UTF-8 bytes. */
    private byte[] serializeEvent(Event event) {
        return this.eventSerializer.toJson(event).getBytes(StandardCharsets.UTF_8);
    }
}
