package org.apache.james.events.delivery;

import com.google.common.annotations.VisibleForTesting;
import javax.inject.Inject;
import org.apache.james.events.Event;
import org.apache.james.events.EventBus;
import org.apache.james.events.EventListener;
import org.apache.james.events.delivery.EventDelivery;
import org.apache.james.metrics.api.MetricFactory;
import org.apache.james.util.MDCBuilder;
import org.apache.james.util.MDCStructuredLogger;
import org.apache.james.util.ReactorUtils;
import org.apache.james.util.StructuredLogger;
import org.reactivestreams.Publisher;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

/* loaded from: input_file:org/apache/james/events/delivery/InVmEventDelivery.class */
public class InVmEventDelivery implements EventDelivery {
    private static final Logger LOGGER = LoggerFactory.getLogger(InVmEventDelivery.class);
    private final MetricFactory metricFactory;

    @Inject
    @VisibleForTesting
    public InVmEventDelivery(MetricFactory metricFactory) {
        this.metricFactory = metricFactory;
    }

    @Override // org.apache.james.events.delivery.EventDelivery
    public Mono<Void> deliver(EventListener.ReactiveEventListener reactiveEventListener, Event event, EventDelivery.DeliveryOption deliveryOption) {
        return waitForResultIfNeeded(reactiveEventListener.getExecutionMode(), deliverByOption(reactiveEventListener, event, deliveryOption));
    }

    /* JADX WARN: Multi-variable type inference failed */
    private Mono<Void> waitForResultIfNeeded(EventListener.ExecutionMode executionMode, Mono<Void> mono) {
        return executionMode.equals(EventListener.ExecutionMode.SYNCHRONOUS) ? mono : Flux.merge(new Publisher[]{mono, Mono.empty()}).publishNext().onErrorResume(th -> {
            return Mono.empty();
        });
    }

    private Mono<Void> deliverByOption(EventListener.ReactiveEventListener reactiveEventListener, Event event, EventDelivery.DeliveryOption deliveryOption) {
        return deliveryOption.getRetrier().doRetry(doDeliverToListener(reactiveEventListener, event).doOnError(th -> {
            structuredLogger(event, reactiveEventListener).log(logger -> {
                logger.error("Error while processing listener", th);
            });
        }).then(), event).onErrorResume(th2 -> {
            return deliveryOption.getPermanentFailureHandler().handle(event);
        }).then();
    }

    private Mono<Void> doDeliverToListener(EventListener.ReactiveEventListener reactiveEventListener, Event event) {
        return reactiveEventListener.isHandling(event) ? Mono.defer(() -> {
            return Mono.from(this.metricFactory.decoratePublisherWithTimerMetric(EventBus.Metrics.timerName(reactiveEventListener), reactiveEventListener.reactiveEvent(event)));
        }).subscriberContext(ReactorUtils.context("deliver", buildMDC(reactiveEventListener, event))) : Mono.empty();
    }

    private MDCBuilder buildMDC(EventListener eventListener, Event event) {
        return MDCBuilder.create().addToContext("eventId", event.getEventId().toString()).addToContext("eventClass", event.getClass().getCanonicalName()).addToContext("user", event.getUsername().asString()).addToContext("listenerClass", eventListener.getClass().getCanonicalName());
    }

    private StructuredLogger structuredLogger(Event event, EventListener eventListener) {
        return MDCStructuredLogger.forLogger(LOGGER).field("eventId", event.getEventId().getId().toString()).field("eventClass", event.getClass().getCanonicalName()).field("user", event.getUsername().asString()).field("listenerClass", eventListener.getClass().getCanonicalName());
    }
}
