package org.apache.james.events;

import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.MessageProperties;
import java.util.Collection;
import java.util.Collections;
import java.util.Set;
import org.apache.james.backends.rabbitmq.QueueArguments;
import org.apache.james.events.EventListener;
import org.apache.james.events.RoutingKeyConverter;
import org.apache.james.util.MDCBuilder;
import org.apache.james.util.MDCStructuredLogger;
import org.apache.james.util.StructuredLogger;
import org.reactivestreams.Publisher;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.publisher.MonoProcessor;
import reactor.core.scheduler.Schedulers;
import reactor.rabbitmq.BindingSpecification;
import reactor.rabbitmq.ExchangeSpecification;
import reactor.rabbitmq.OutboundMessage;
import reactor.rabbitmq.Sender;
import reactor.util.function.Tuples;

/* loaded from: input_file:org/apache/james/events/EventDispatcher.class */
public class EventDispatcher {
    private static final Logger LOGGER = LoggerFactory.getLogger(EventDispatcher.class);
    private final NamingStrategy namingStrategy;
    private final EventSerializer eventSerializer;
    private final Sender sender;
    private final LocalListenerRegistry localListenerRegistry;
    private final AMQP.BasicProperties basicProperties;
    private final ListenerExecutor listenerExecutor;
    private final EventDeadLetters deadLetters;

    /* loaded from: input_file:org/apache/james/events/EventDispatcher$DispatchingFailureGroup.class */
    public static class DispatchingFailureGroup extends Group {
        public static final DispatchingFailureGroup INSTANCE = new DispatchingFailureGroup();
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public EventDispatcher(NamingStrategy namingStrategy, EventBusId eventBusId, EventSerializer eventSerializer, Sender sender, LocalListenerRegistry localListenerRegistry, ListenerExecutor listenerExecutor, EventDeadLetters eventDeadLetters) {
        this.namingStrategy = namingStrategy;
        this.eventSerializer = eventSerializer;
        this.sender = sender;
        this.localListenerRegistry = localListenerRegistry;
        this.basicProperties = new AMQP.BasicProperties.Builder().headers(ImmutableMap.of("eventBusId", eventBusId.asString())).deliveryMode(MessageProperties.PERSISTENT_TEXT_PLAIN.getDeliveryMode()).priority(MessageProperties.PERSISTENT_TEXT_PLAIN.getPriority()).contentType(MessageProperties.PERSISTENT_TEXT_PLAIN.getContentType()).build();
        this.listenerExecutor = listenerExecutor;
        this.deadLetters = eventDeadLetters;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public void start() {
        Flux.concat(new Publisher[]{this.sender.declareExchange(ExchangeSpecification.exchange(this.namingStrategy.exchange()).durable(true).type("direct")), this.sender.declareExchange(ExchangeSpecification.exchange(this.namingStrategy.deadLetterExchange()).durable(true).type("direct")), this.sender.declareQueue(this.namingStrategy.deadLetterQueue().durable(true).exclusive(false).autoDelete(false).arguments(QueueArguments.NO_ARGUMENTS)), this.sender.bind(BindingSpecification.binding().exchange(this.namingStrategy.deadLetterExchange()).queue(this.namingStrategy.deadLetterQueue().getName()).routingKey(""))}).then().block();
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public Mono<Void> dispatch(Event event, Set<RegistrationKey> set) {
        return Flux.concat(new Publisher[]{dispatchToLocalListeners(event, set), dispatchToRemoteListeners(event, set)}).doOnError(th -> {
            LOGGER.error("error while dispatching event", th);
        }).then().subscribeWith(MonoProcessor.create());
    }

    private Mono<Void> dispatchToLocalListeners(Event event, Set<RegistrationKey> set) {
        return Flux.fromIterable(set).flatMap(registrationKey -> {
            return this.localListenerRegistry.getLocalListeners(registrationKey).map(reactiveEventListener -> {
                return Tuples.of(registrationKey, reactiveEventListener);
            });
        }, 10).filter(tuple2 -> {
            return ((EventListener.ReactiveEventListener) tuple2.getT2()).getExecutionMode() == EventListener.ExecutionMode.SYNCHRONOUS;
        }).flatMap(tuple22 -> {
            return executeListener(event, (EventListener.ReactiveEventListener) tuple22.getT2(), (RegistrationKey) tuple22.getT1());
        }, 10).then();
    }

    private Mono<Void> executeListener(Event event, EventListener.ReactiveEventListener reactiveEventListener, RegistrationKey registrationKey) {
        return this.listenerExecutor.execute(reactiveEventListener, MDCBuilder.create().addToContext("registrationKey", registrationKey.asString()), event).onErrorResume(th -> {
            structuredLogger(event, ImmutableSet.of(registrationKey)).log(logger -> {
                logger.error("Exception happens when dispatching event", th);
            });
            return Mono.empty();
        });
    }

    private StructuredLogger structuredLogger(Event event, Set<RegistrationKey> set) {
        return MDCStructuredLogger.forLogger(LOGGER).field("eventId", event.getEventId().getId().toString()).field("eventClass", event.getClass().getCanonicalName()).field("user", event.getUsername().asString()).field("registrationKeys", set.toString());
    }

    private Mono<Void> dispatchToRemoteListeners(Event event, Set<RegistrationKey> set) {
        return Mono.fromCallable(() -> {
            return serializeEvent(event);
        }).flatMap(bArr -> {
            return Mono.zipDelayError(remoteGroupsDispatch(bArr, event), remoteKeysDispatch(bArr, set));
        }).then();
    }

    private Mono<Void> remoteGroupsDispatch(byte[] bArr, Event event) {
        return remoteDispatchWithAcks(bArr, Collections.singletonList(RoutingKeyConverter.RoutingKey.empty())).doOnError(th -> {
            LOGGER.error("cannot dispatch event of type '{}' belonging '{}' with id '{}' to remote groups, store it into dead letter", new Object[]{event.getClass().getSimpleName(), event.getUsername().asString(), event.getEventId().getId(), th});
        }).onErrorResume(th2 -> {
            return this.deadLetters.store(DispatchingFailureGroup.INSTANCE, event).then(Mono.error(th2));
        });
    }

    private Mono<Void> remoteKeysDispatch(byte[] bArr, Set<RegistrationKey> set) {
        return remoteDispatch(bArr, (Collection) set.stream().map(RoutingKeyConverter.RoutingKey::of).collect(ImmutableList.toImmutableList()));
    }

    private Mono<Void> remoteDispatch(byte[] bArr, Collection<RoutingKeyConverter.RoutingKey> collection) {
        return collection.isEmpty() ? Mono.empty() : this.sender.send(toMessages(bArr, collection));
    }

    private Mono<Void> remoteDispatchWithAcks(byte[] bArr, Collection<RoutingKeyConverter.RoutingKey> collection) {
        return collection.isEmpty() ? Mono.empty() : this.sender.sendWithPublishConfirms(toMessages(bArr, collection)).subscribeOn(Schedulers.elastic()).filter(outboundMessageResult -> {
            return !outboundMessageResult.isAck();
        }).next().handle((outboundMessageResult2, synchronousSink) -> {
            synchronousSink.error(new Exception("Publish was not acked"));
        });
    }

    private Flux<OutboundMessage> toMessages(byte[] bArr, Collection<RoutingKeyConverter.RoutingKey> collection) {
        return Flux.fromIterable(collection).map(routingKey -> {
            return new OutboundMessage(this.namingStrategy.exchange(), routingKey.asString(), this.basicProperties, bArr);
        });
    }

    private byte[] serializeEvent(Event event) {
        return this.eventSerializer.toJsonBytes(event);
    }
}
