package org.apache.james.events;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableSet;
import java.util.Collection;
import java.util.Set;
import javax.annotation.PreDestroy;
import javax.inject.Inject;
import org.apache.james.backends.rabbitmq.RabbitMQConfiguration;
import org.apache.james.backends.rabbitmq.ReactorRabbitMQChannelPool;
import org.apache.james.backends.rabbitmq.ReceiverProvider;
import org.apache.james.events.EventListener;
import org.apache.james.lifecycle.api.Startable;
import org.apache.james.metrics.api.MetricFactory;
import reactor.core.publisher.Mono;
import reactor.rabbitmq.Sender;

/* loaded from: input_file:org/apache/james/events/RabbitMQEventBus.class */
public class RabbitMQEventBus implements EventBus, Startable {
    private static final Set<RegistrationKey> NO_KEY = ImmutableSet.of();
    private static final String NOT_RUNNING_ERROR_MESSAGE = "Event Bus is not running";
    static final String EVENT_BUS_ID = "eventBusId";
    private final NamingStrategy namingStrategy;
    private final EventSerializer eventSerializer;
    private final RoutingKeyConverter routingKeyConverter;
    private final RetryBackoffConfiguration retryBackoff;
    private final EventBusId eventBusId;
    private final EventDeadLetters eventDeadLetters;
    private final ListenerExecutor listenerExecutor;
    private final Sender sender;
    private final ReceiverProvider receiverProvider;
    private final ReactorRabbitMQChannelPool channelPool;
    private final RabbitMQConfiguration configuration;
    private final MetricFactory metricFactory;
    private volatile boolean isRunning = false;
    private volatile boolean isStopping = false;
    private GroupRegistrationHandler groupRegistrationHandler;
    private KeyRegistrationHandler keyRegistrationHandler;
    private EventDispatcher eventDispatcher;

    @Inject
    public RabbitMQEventBus(NamingStrategy namingStrategy, Sender sender, ReceiverProvider receiverProvider, EventSerializer eventSerializer, RetryBackoffConfiguration retryBackoffConfiguration, RoutingKeyConverter routingKeyConverter, EventDeadLetters eventDeadLetters, MetricFactory metricFactory, ReactorRabbitMQChannelPool reactorRabbitMQChannelPool, EventBusId eventBusId, RabbitMQConfiguration rabbitMQConfiguration) {
        this.namingStrategy = namingStrategy;
        this.sender = sender;
        this.receiverProvider = receiverProvider;
        this.listenerExecutor = new ListenerExecutor(metricFactory);
        this.channelPool = reactorRabbitMQChannelPool;
        this.eventBusId = eventBusId;
        this.eventSerializer = eventSerializer;
        this.routingKeyConverter = routingKeyConverter;
        this.retryBackoff = retryBackoffConfiguration;
        this.eventDeadLetters = eventDeadLetters;
        this.configuration = rabbitMQConfiguration;
        this.metricFactory = metricFactory;
    }

    public void start() {
        if (this.isRunning || this.isStopping) {
            return;
        }
        LocalListenerRegistry localListenerRegistry = new LocalListenerRegistry();
        this.keyRegistrationHandler = new KeyRegistrationHandler(this.namingStrategy, this.eventBusId, this.eventSerializer, this.sender, this.receiverProvider, this.routingKeyConverter, localListenerRegistry, this.listenerExecutor, this.retryBackoff, this.configuration, this.metricFactory);
        this.groupRegistrationHandler = new GroupRegistrationHandler(this.namingStrategy, this.eventSerializer, this.channelPool, this.sender, this.receiverProvider, this.retryBackoff, this.eventDeadLetters, this.listenerExecutor, this.eventBusId, this.configuration);
        this.eventDispatcher = new EventDispatcher(this.namingStrategy, this.eventBusId, this.eventSerializer, this.sender, localListenerRegistry, this.listenerExecutor, this.eventDeadLetters, this.configuration);
        this.eventDispatcher.start();
        this.keyRegistrationHandler.start();
        this.isRunning = true;
    }

    public void restart() {
        this.keyRegistrationHandler.restart();
        this.groupRegistrationHandler.restart();
    }

    @VisibleForTesting
    void startWithoutStartingKeyRegistrationHandler() {
        if (this.isRunning || this.isStopping) {
            return;
        }
        LocalListenerRegistry localListenerRegistry = new LocalListenerRegistry();
        this.keyRegistrationHandler = new KeyRegistrationHandler(this.namingStrategy, this.eventBusId, this.eventSerializer, this.sender, this.receiverProvider, this.routingKeyConverter, localListenerRegistry, this.listenerExecutor, this.retryBackoff, this.configuration, this.metricFactory);
        this.groupRegistrationHandler = new GroupRegistrationHandler(this.namingStrategy, this.eventSerializer, this.channelPool, this.sender, this.receiverProvider, this.retryBackoff, this.eventDeadLetters, this.listenerExecutor, this.eventBusId, this.configuration);
        this.eventDispatcher = new EventDispatcher(this.namingStrategy, this.eventBusId, this.eventSerializer, this.sender, localListenerRegistry, this.listenerExecutor, this.eventDeadLetters, this.configuration);
        this.keyRegistrationHandler.declareQueue();
        this.eventDispatcher.start();
        this.isRunning = true;
    }

    @VisibleForTesting
    void startKeyRegistrationHandler() {
        this.keyRegistrationHandler.start();
    }

    @PreDestroy
    public void stop() {
        if (!this.isRunning || this.isStopping) {
            return;
        }
        this.isStopping = true;
        this.isRunning = false;
        this.groupRegistrationHandler.stop();
        this.keyRegistrationHandler.stop();
    }

    /* renamed from: register, reason: merged with bridge method [inline-methods] */
    public Mono<Registration> m10register(EventListener.ReactiveEventListener reactiveEventListener, RegistrationKey registrationKey) {
        Preconditions.checkState(this.isRunning, NOT_RUNNING_ERROR_MESSAGE);
        return Mono.from(this.metricFactory.decoratePublisherWithTimerMetric("rabbit-register", this.keyRegistrationHandler.register(reactiveEventListener, registrationKey)));
    }

    public Registration register(EventListener.ReactiveEventListener reactiveEventListener, Group group) {
        Preconditions.checkState(this.isRunning, NOT_RUNNING_ERROR_MESSAGE);
        return this.groupRegistrationHandler.register(reactiveEventListener, group);
    }

    public Mono<Void> dispatch(Event event, Set<RegistrationKey> set) {
        Preconditions.checkState(this.isRunning, NOT_RUNNING_ERROR_MESSAGE);
        return !event.isNoop() ? Mono.from(this.metricFactory.decoratePublisherWithTimerMetric("rabbit-dispatch", this.eventDispatcher.dispatch(event, set))) : Mono.empty();
    }

    public Mono<Void> reDeliver(Group group, Event event) {
        Preconditions.checkState(this.isRunning, NOT_RUNNING_ERROR_MESSAGE);
        return !event.isNoop() ? group instanceof DispatchingFailureGroup ? this.eventDispatcher.dispatch(event, NO_KEY) : this.groupRegistrationHandler.retrieveGroupRegistration(group).reDeliver(event) : Mono.empty();
    }

    public EventBusName eventBusName() {
        return this.namingStrategy.getEventBusName();
    }

    public Collection<Group> listRegisteredGroups() {
        return this.groupRegistrationHandler.registeredGroups();
    }
}
