package org.axonframework.eventhandling;

import java.util.List;
import org.axonframework.common.Registration;
import org.axonframework.common.io.IOUtils;
import org.axonframework.messaging.SubscribableMessageSource;
import org.axonframework.messaging.unitofwork.RollbackConfiguration;
import org.axonframework.messaging.unitofwork.RollbackConfigurationType;
import org.axonframework.monitoring.MessageMonitor;
import org.axonframework.monitoring.NoOpMessageMonitor;

/* loaded from: input_file:org/axonframework/eventhandling/SubscribingEventProcessor.class */
public class SubscribingEventProcessor extends AbstractEventProcessor {
    private final SubscribableMessageSource<? extends EventMessage<?>> messageSource;
    private final EventProcessingStrategy processingStrategy;
    private volatile Registration eventBusRegistration;

    public SubscribingEventProcessor(String str, EventHandlerInvoker eventHandlerInvoker, SubscribableMessageSource<EventMessage<?>> subscribableMessageSource) {
        this(str, eventHandlerInvoker, subscribableMessageSource, DirectEventProcessingStrategy.INSTANCE, PropagatingErrorHandler.INSTANCE);
    }

    public SubscribingEventProcessor(String str, EventHandlerInvoker eventHandlerInvoker, SubscribableMessageSource<EventMessage<?>> subscribableMessageSource, EventProcessingStrategy eventProcessingStrategy, ErrorHandler errorHandler) {
        this(str, eventHandlerInvoker, subscribableMessageSource, eventProcessingStrategy, errorHandler, NoOpMessageMonitor.INSTANCE);
    }

    public SubscribingEventProcessor(String str, EventHandlerInvoker eventHandlerInvoker, SubscribableMessageSource<? extends EventMessage<?>> subscribableMessageSource, EventProcessingStrategy eventProcessingStrategy, ErrorHandler errorHandler, MessageMonitor<? super EventMessage<?>> messageMonitor) {
        this(str, eventHandlerInvoker, RollbackConfigurationType.ANY_THROWABLE, subscribableMessageSource, eventProcessingStrategy, errorHandler, messageMonitor);
    }

    public SubscribingEventProcessor(String str, EventHandlerInvoker eventHandlerInvoker, RollbackConfiguration rollbackConfiguration, SubscribableMessageSource<? extends EventMessage<?>> subscribableMessageSource, EventProcessingStrategy eventProcessingStrategy, ErrorHandler errorHandler, MessageMonitor<? super EventMessage<?>> messageMonitor) {
        super(str, eventHandlerInvoker, rollbackConfiguration, errorHandler, messageMonitor);
        this.messageSource = subscribableMessageSource;
        this.processingStrategy = eventProcessingStrategy;
    }

    @Override // org.axonframework.eventhandling.EventProcessor
    public void start() {
        if (this.eventBusRegistration == null) {
            this.eventBusRegistration = this.messageSource.subscribe(list -> {
                this.processingStrategy.handle(list, this::process);
            });
        }
    }

    /* JADX INFO: Access modifiers changed from: protected */
    @Override // org.axonframework.eventhandling.AbstractEventProcessor
    public void process(List<? extends EventMessage<?>> list) {
        try {
            super.process(list);
        } catch (RuntimeException e) {
            throw e;
        } catch (Exception e2) {
            throw new EventProcessingException("Exception occurred while processing events", e2);
        }
    }

    @Override // org.axonframework.eventhandling.EventProcessor
    public void shutDown() {
        IOUtils.closeQuietly(this.eventBusRegistration);
    }
}
