package org.axonframework.amqp.eventhandling.spring;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.ShutdownSignalException;
import java.io.IOException;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeoutException;
import org.axonframework.amqp.eventhandling.AMQPMessage;
import org.axonframework.amqp.eventhandling.AMQPMessageConverter;
import org.axonframework.amqp.eventhandling.DefaultAMQPMessageConverter;
import org.axonframework.amqp.eventhandling.EventPublicationFailedException;
import org.axonframework.amqp.eventhandling.PackageRoutingKeyResolver;
import org.axonframework.amqp.eventhandling.RoutingKeyResolver;
import org.axonframework.common.Assert;
import org.axonframework.common.AxonConfigurationException;
import org.axonframework.common.Registration;
import org.axonframework.eventhandling.EventMessage;
import org.axonframework.messaging.SubscribableMessageSource;
import org.axonframework.messaging.unitofwork.CurrentUnitOfWork;
import org.axonframework.messaging.unitofwork.UnitOfWork;
import org.axonframework.serialization.Serializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.core.Exchange;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.beans.BeansException;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationContextAware;

/* loaded from: input_file:org/axonframework/amqp/eventhandling/spring/SpringAMQPPublisher.class */
public class SpringAMQPPublisher implements InitializingBean, ApplicationContextAware {
    private static final Logger logger = LoggerFactory.getLogger(SpringAMQPPublisher.class);
    private static final String DEFAULT_EXCHANGE_NAME = "Axon.EventBus";
    private final SubscribableMessageSource<EventMessage<?>> messageSource;
    private ConnectionFactory connectionFactory;
    private String exchangeName = DEFAULT_EXCHANGE_NAME;
    private boolean isTransactional = false;
    private boolean isDurable = true;
    private AMQPMessageConverter messageConverter;
    private ApplicationContext applicationContext;
    private Serializer serializer;
    private RoutingKeyResolver routingKeyResolver;
    private boolean waitForAck;
    private long publisherAckTimeout;
    private Registration eventBusRegistration;

    public SpringAMQPPublisher(SubscribableMessageSource<EventMessage<?>> subscribableMessageSource) {
        this.messageSource = subscribableMessageSource;
    }

    public void start() {
        this.eventBusRegistration = this.messageSource.subscribe(this::send);
    }

    public void shutDown() {
        if (this.eventBusRegistration != null) {
            this.eventBusRegistration.cancel();
            this.eventBusRegistration = null;
        }
    }

    protected void send(List<? extends EventMessage<?>> list) {
        Channel createChannel = this.connectionFactory.createConnection().createChannel(this.isTransactional);
        try {
            try {
                try {
                    if (this.isTransactional) {
                        createChannel.txSelect();
                    } else if (this.waitForAck) {
                        createChannel.confirmSelect();
                    }
                    Iterator<? extends EventMessage<?>> it = list.iterator();
                    while (it.hasNext()) {
                        doSendMessage(createChannel, this.messageConverter.createAMQPMessage(it.next()));
                    }
                    if (CurrentUnitOfWork.isStarted()) {
                        UnitOfWork unitOfWork = CurrentUnitOfWork.get();
                        unitOfWork.onCommit(unitOfWork2 -> {
                            if ((this.isTransactional || this.waitForAck) && !createChannel.isOpen()) {
                                throw new EventPublicationFailedException("Unable to Commit UnitOfWork changes to AMQP: Channel is closed.", createChannel.getCloseReason());
                            }
                        });
                        unitOfWork.afterCommit(unitOfWork3 -> {
                            try {
                                if (this.isTransactional) {
                                    createChannel.txCommit();
                                } else if (this.waitForAck) {
                                    try {
                                        createChannel.waitForConfirmsOrDie(this.publisherAckTimeout);
                                    } catch (IOException e) {
                                        throw new EventPublicationFailedException("Failed to receive acknowledgements for all events", e);
                                    } catch (TimeoutException e2) {
                                        throw new EventPublicationFailedException("Timeout while waiting for publisher acknowledgements", e2);
                                    }
                                }
                            } catch (IOException e3) {
                                logger.warn("Unable to commit transaction on channel.", e3);
                            } catch (InterruptedException e4) {
                                logger.warn("Interrupt received when waiting for message confirms.");
                                Thread.currentThread().interrupt();
                            }
                            tryClose(createChannel);
                        });
                        unitOfWork.onRollback(unitOfWork4 -> {
                            try {
                                if (this.isTransactional) {
                                    createChannel.txRollback();
                                }
                            } catch (IOException e) {
                                logger.warn("Unable to rollback transaction on channel.", e);
                            }
                            tryClose(createChannel);
                        });
                    } else if (this.isTransactional) {
                        createChannel.txCommit();
                    } else if (this.waitForAck) {
                        createChannel.waitForConfirmsOrDie();
                    }
                    if (CurrentUnitOfWork.isStarted()) {
                        return;
                    }
                    tryClose(createChannel);
                } catch (InterruptedException e) {
                    logger.warn("Interrupt received when waiting for message confirms.");
                    Thread.currentThread().interrupt();
                    if (CurrentUnitOfWork.isStarted()) {
                        return;
                    }
                    tryClose(createChannel);
                }
            } catch (ShutdownSignalException e2) {
                throw new EventPublicationFailedException("Failed to dispatch Events to the Message Broker.", e2);
            } catch (IOException e3) {
                if (this.isTransactional) {
                    tryRollback(createChannel);
                }
                throw new EventPublicationFailedException("Failed to dispatch Events to the Message Broker.", e3);
            }
        } catch (Throwable th) {
            if (!CurrentUnitOfWork.isStarted()) {
                tryClose(createChannel);
            }
            throw th;
        }
    }

    private void tryClose(Channel channel) {
        try {
            channel.close();
        } catch (IOException | TimeoutException e) {
            logger.info("Unable to close channel. It might already be closed.", e);
        }
    }

    protected void doSendMessage(Channel channel, AMQPMessage aMQPMessage) throws IOException {
        channel.basicPublish(this.exchangeName, aMQPMessage.getRoutingKey(), aMQPMessage.isMandatory(), aMQPMessage.isImmediate(), aMQPMessage.getProperties(), aMQPMessage.getBody());
    }

    private void tryRollback(Channel channel) {
        try {
            channel.txRollback();
        } catch (IOException e) {
            logger.debug("Unable to rollback. The underlying channel might already be closed.", e);
        }
    }

    public void afterPropertiesSet() {
        if (this.connectionFactory == null) {
            this.connectionFactory = (ConnectionFactory) this.applicationContext.getBean(ConnectionFactory.class);
        }
        if (this.messageConverter == null) {
            if (this.serializer == null) {
                this.serializer = (Serializer) this.applicationContext.getBean(Serializer.class);
            }
            if (this.routingKeyResolver == null) {
                Map beansOfType = this.applicationContext.getBeansOfType(RoutingKeyResolver.class);
                if (beansOfType.size() > 1) {
                    throw new AxonConfigurationException("No MessageConverter was configured, but none can be created using autowired properties, as more than 1 RoutingKeyResolver is present in the ApplicationContent");
                }
                if (beansOfType.size() == 1) {
                    this.routingKeyResolver = (RoutingKeyResolver) beansOfType.values().iterator().next();
                } else {
                    this.routingKeyResolver = new PackageRoutingKeyResolver();
                }
            }
            this.messageConverter = new DefaultAMQPMessageConverter(this.serializer, this.routingKeyResolver, this.isDurable);
        }
    }

    public void setTransactional(boolean z) {
        Assert.isTrue((this.waitForAck && z) ? false : true, () -> {
            return "Cannot set transactional behavior when 'waitForServerAck' is enabled.";
        });
        this.isTransactional = z;
    }

    public void setWaitForPublisherAck(boolean z) {
        Assert.isTrue((z && this.isTransactional) ? false : true, () -> {
            return "Cannot set 'waitForPublisherAck' when using transactions.";
        });
        this.waitForAck = z;
    }

    public void setPublisherAckTimeout(long j) {
        this.publisherAckTimeout = j;
    }

    public void setConnectionFactory(ConnectionFactory connectionFactory) {
        this.connectionFactory = connectionFactory;
    }

    public void setMessageConverter(AMQPMessageConverter aMQPMessageConverter) {
        this.messageConverter = aMQPMessageConverter;
    }

    public void setDurable(boolean z) {
        this.isDurable = z;
    }

    public void setSerializer(Serializer serializer) {
        this.serializer = serializer;
    }

    public void setRoutingKeyResolver(RoutingKeyResolver routingKeyResolver) {
        this.routingKeyResolver = routingKeyResolver;
    }

    public void setExchangeName(String str) {
        this.exchangeName = str;
    }

    public void setExchange(Exchange exchange) {
        this.exchangeName = exchange.getName();
    }

    public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
        this.applicationContext = applicationContext;
    }
}
