package org.apache.james.mailbox.events;

import com.github.fge.lambdas.Throwing;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.Delivery;
import java.nio.charset.StandardCharsets;
import java.util.Objects;
import java.util.Optional;
import org.apache.james.backend.rabbitmq.Constants;
import org.apache.james.event.json.EventSerializer;
import org.apache.james.mailbox.events.LocalListenerRegistry;
import org.apache.james.mailbox.events.MailboxListener;
import org.apache.james.util.MDCBuilder;
import org.apache.james.util.MDCStructuredLogger;
import org.apache.james.util.StructuredLogger;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.Disposable;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Schedulers;
import reactor.rabbitmq.ConsumeOptions;
import reactor.rabbitmq.QueueSpecification;
import reactor.rabbitmq.RabbitFlux;
import reactor.rabbitmq.Receiver;
import reactor.rabbitmq.ReceiverOptions;
import reactor.rabbitmq.Sender;

/* loaded from: input_file:org/apache/james/mailbox/events/KeyRegistrationHandler.class */
/**
 * Manages key-based listener registrations backed by a RabbitMQ queue.
 *
 * <p>{@link #start()} declares a queue named after the event bus id and consumes it with
 * auto-acknowledgement. Each delivery is decoded into an {@link Event} and dispatched to the
 * local listeners registered for the {@link RegistrationKey} derived from the routing key.
 * {@link #register(MailboxListener, RegistrationKey)} keeps the broker-side bindings in sync
 * with the local listener registry.
 */
class KeyRegistrationHandler {
    private static final Logger LOGGER = LoggerFactory.getLogger(KeyRegistrationHandler.class);
    /** Name of the message header carrying the id of the event bus that emitted the event. */
    private static final String EVENT_BUS_ID_HEADER = "eventBusId";
    /** QoS (prefetch) value applied to the registration queue consumer. */
    private static final int CONSUMER_QOS = 10;

    private final EventBusId eventBusId;
    private final LocalListenerRegistry localListenerRegistry;
    private final EventSerializer eventSerializer;
    private final Sender sender;
    private final RoutingKeyConverter routingKeyConverter;
    private final Receiver receiver;
    private final RegistrationQueueName registrationQueue = new RegistrationQueueName();
    private final RegistrationBinder registrationBinder;
    private final MailboxListenerExecutor mailboxListenerExecutor;
    // Starts empty so that stop() is safe to call even when start() was never invoked.
    private Optional<Disposable> receiverSubscriber = Optional.empty();

    /**
     * Creates the handler; nothing is declared on the broker until {@link #start()} is called.
     *
     * @param eventBusId id of the local event bus, also used as the declared queue name
     * @param eventSerializer deserializer for the JSON payload of incoming deliveries
     * @param sender used to declare/delete the queue and to bind/unbind registration keys
     * @param mono connection supplier used to build the {@link Receiver}
     * @param routingKeyConverter converts a delivery routing key into a {@link RegistrationKey}
     * @param localListenerRegistry registry of the listeners registered on this node
     * @param mailboxListenerExecutor executes a listener against an event
     */
    public KeyRegistrationHandler(EventBusId eventBusId, EventSerializer eventSerializer, Sender sender, Mono<Connection> mono, RoutingKeyConverter routingKeyConverter, LocalListenerRegistry localListenerRegistry, MailboxListenerExecutor mailboxListenerExecutor) {
        this.eventBusId = eventBusId;
        this.eventSerializer = eventSerializer;
        this.sender = sender;
        this.routingKeyConverter = routingKeyConverter;
        this.localListenerRegistry = localListenerRegistry;
        this.receiver = RabbitFlux.createReceiver(new ReceiverOptions().connectionMono(mono));
        this.mailboxListenerExecutor = mailboxListenerExecutor;
        this.registrationBinder = new RegistrationBinder(sender, this.registrationQueue);
    }

    /**
     * Declares the registration queue (blocking until the broker answers), records its name,
     * then starts consuming it with auto-acknowledgement.
     */
    public void start() {
        this.sender.declareQueue(QueueSpecification.queue(this.eventBusId.asString())
                .durable(true)
                .exclusive(false)
                .autoDelete(false)
                .arguments(Constants.NO_ARGUMENTS))
            .map(declareOk -> declareOk.getQueue())
            .doOnSuccess(this.registrationQueue::initialize)
            .block();

        this.receiverSubscriber = Optional.of(
            this.receiver.consumeAutoAck(this.registrationQueue.asString(), new ConsumeOptions().qos(CONSUMER_QOS))
                .subscribeOn(Schedulers.parallel())
                .flatMap(this::handleDelivery)
                .subscribe());
    }

    /**
     * Disposes the consumer subscription if one is active, closes the receiver and deletes
     * the registration queue (blocking until the deletion completes).
     */
    public void stop() {
        this.receiverSubscriber
            .filter(subscriber -> !subscriber.isDisposed())
            .ifPresent(Disposable::dispose);
        this.receiver.close();
        this.sender.delete(QueueSpecification.queue(this.registrationQueue.asString())).block();
    }

    /**
     * Registers a listener for a key. The key is bound on the broker (blocking) only when this
     * is the first local listener for it.
     *
     * @param mailboxListener listener to notify
     * @param registrationKey key the listener is interested in
     * @return a registration whose unregistration removes the listener locally and unbinds the
     *         key when the last local listener for it has been removed
     */
    public Registration register(MailboxListener mailboxListener, RegistrationKey registrationKey) {
        LocalListenerRegistry.LocalRegistration localRegistration = this.localListenerRegistry.addListener(registrationKey, mailboxListener);
        if (localRegistration.isFirstListener()) {
            this.registrationBinder.bind(registrationKey).block();
        }
        return new KeyRegistration(() -> {
            if (localRegistration.unregister().lastListenerRemoved()) {
                this.registrationBinder.unbind(registrationKey).block();
            }
        });
    }

    /**
     * Decodes a delivery and dispatches the resulting event to the matching local listeners.
     *
     * <p>Deliveries without a body are ignored. Deliveries that cannot be decoded (missing
     * event bus id header, unparsable routing key or payload) are logged and dropped: letting
     * the exception escape from the {@code flatMap} mapper would error the consuming Flux and
     * stop the processing of every subsequent delivery.
     *
     * @param delivery message received from the registration queue
     * @return a Mono completing once every matching listener has been executed
     */
    private Mono<Void> handleDelivery(Delivery delivery) {
        if (delivery.getBody() == null) {
            return Mono.empty();
        }

        // Properties, headers or the header value itself may be absent on a foreign message.
        Optional<String> serializedEventBusId = Optional.ofNullable(delivery.getProperties())
            .map(properties -> properties.getHeaders())
            .map(headers -> headers.get(EVENT_BUS_ID_HEADER))
            .map(Object::toString);
        if (!serializedEventBusId.isPresent()) {
            LOGGER.error("Ignoring delivery without '{}' header", EVENT_BUS_ID_HEADER);
            return Mono.empty();
        }

        EventBusId sourceEventBusId;
        RegistrationKey registrationKey;
        Event event;
        try {
            sourceEventBusId = EventBusId.of(serializedEventBusId.get());
            registrationKey = this.routingKeyConverter.toRegistrationKey(delivery.getEnvelope().getRoutingKey());
            event = toEvent(delivery);
        } catch (RuntimeException e) {
            LOGGER.error("Ignoring delivery that could not be decoded", e);
            return Mono.empty();
        }
        return dispatch(sourceEventBusId, registrationKey, event);
    }

    /**
     * Executes every local listener registered for the key, except the synchronous ones when
     * the event originates from this very event bus. A failing listener is logged and does
     * not prevent the other listeners from running.
     */
    private Mono<Void> dispatch(EventBusId sourceEventBusId, RegistrationKey registrationKey, Event event) {
        return this.localListenerRegistry.getLocalMailboxListeners(registrationKey)
            .filter(listener -> !isLocalSynchronousListeners(sourceEventBusId, listener))
            .flatMap(listener -> Mono.fromRunnable(Throwing.runnable(() -> executeListener(listener, event, registrationKey)))
                .doOnError(e -> structuredLogger(event, registrationKey)
                    .log(logger -> logger.error("Exception happens when handling event", e)))
                .onErrorResume(e -> Mono.empty())
                .then())
            .subscribeOn(Schedulers.elastic())
            .then();
    }

    /** Runs one listener against the event, with the registration key added to the MDC context. */
    private void executeListener(MailboxListener mailboxListener, Event event, RegistrationKey registrationKey) throws Exception {
        this.mailboxListenerExecutor.execute(mailboxListener, MDCBuilder.create().addContext("registrationKey", registrationKey), event);
    }

    /**
     * Tells whether the listener is a synchronous one and the event was emitted by this event bus.
     *
     * @param eventBusId id of the event bus that emitted the event
     * @param mailboxListener candidate listener
     */
    private boolean isLocalSynchronousListeners(EventBusId eventBusId, MailboxListener mailboxListener) {
        return eventBusId.equals(this.eventBusId) && mailboxListener.getExecutionMode().equals(MailboxListener.ExecutionMode.SYNCHRONOUS);
    }

    /** Deserializes the UTF-8 JSON body of the delivery into an event. */
    private Event toEvent(Delivery delivery) {
        return (Event) this.eventSerializer.fromJson(new String(delivery.getBody(), StandardCharsets.UTF_8)).get();
    }

    /** Builds a structured logger carrying the event id, class, user and the registration key. */
    private StructuredLogger structuredLogger(Event event, RegistrationKey registrationKey) {
        return MDCStructuredLogger.forLogger(LOGGER)
            .addField("eventId", event.getEventId())
            .addField("eventClass", event.getClass())
            .addField("user", event.getUser())
            .addField("registrationKey", registrationKey);
    }
}
