package org.springframework.amqp.rabbitmq.client.listener;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.amqp.Consumer;
import com.rabbitmq.client.amqp.Message;
import java.lang.reflect.Method;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import org.springframework.amqp.core.Address;
import org.springframework.amqp.core.AmqpAcknowledgment;
import org.springframework.amqp.core.MessagePostProcessor;
import org.springframework.amqp.rabbit.listener.adapter.InvocationResult;
import org.springframework.amqp.rabbit.listener.adapter.MessagingMessageListenerAdapter;
import org.springframework.amqp.rabbit.listener.api.RabbitListenerErrorHandler;
import org.springframework.amqp.rabbit.listener.support.ContainerUtils;
import org.springframework.amqp.rabbit.support.ListenerExecutionFailedException;
import org.springframework.amqp.rabbitmq.client.AmqpConnectionFactory;
import org.springframework.amqp.rabbitmq.client.RabbitAmqpTemplate;
import org.springframework.amqp.rabbitmq.client.RabbitAmqpUtils;
import org.springframework.messaging.support.GenericMessage;
import org.springframework.util.Assert;
import org.springframework.util.StringUtils;

/* loaded from: input_file:org/springframework/amqp/rabbitmq/client/listener/RabbitAmqpMessageListenerAdapter.class */
/**
 * A {@link MessagingMessageListenerAdapter} that also implements
 * {@link RabbitAmqpMessageListener}, so the same handler method can be invoked
 * for messages delivered by the RabbitMQ AMQP 1.0 client.
 *
 * <p>Acknowledgment is performed through the {@link AmqpAcknowledgment} carried in
 * the message properties; the {@link Channel} arguments of the protected callbacks
 * are not used by this class. Replies are sent through a {@link RabbitAmqpTemplate},
 * which is only available after {@link #setConnectionFactory(AmqpConnectionFactory)}
 * has been called.
 */
public class RabbitAmqpMessageListenerAdapter extends MessagingMessageListenerAdapter implements RabbitAmqpMessageListener {

    /** Prefix removed from a reply routing key when the reply address has no exchange. */
    private static final String QUEUES_PREFIX = "queues/";

    /** Post-processors applied to every received message before the handler is invoked; may be {@code null}. */
    private Collection<MessagePostProcessor> afterReceivePostProcessors;

    /** Template used to send replies; {@code null} until a connection factory is supplied. */
    private RabbitAmqpTemplate rabbitAmqpTemplate;

    /**
     * Creates the adapter by forwarding every argument, unchanged, to the superclass constructor.
     *
     * @param obj the first superclass constructor argument (presumably the listener bean; confirm against the superclass)
     * @param method the second superclass constructor argument (presumably the handler method; confirm against the superclass)
     * @param z boolean flag forwarded to the superclass; its meaning is defined there
     * @param rabbitListenerErrorHandler the error handler forwarded to the superclass
     * @param z2 boolean flag forwarded to the superclass; its meaning is defined there
     */
    public RabbitAmqpMessageListenerAdapter(Object obj, Method method, boolean z, RabbitListenerErrorHandler rabbitListenerErrorHandler, boolean z2) {
        super(obj, method, z, rabbitListenerErrorHandler, z2);
    }

    /**
     * Sets the post-processors applied, in iteration order, to each message received by
     * {@link #onAmqpMessage(Message, Consumer.Context)}. A copy of the collection is stored.
     *
     * @param collection the post-processors; must not be {@code null}
     */
    public void setAfterReceivePostProcessors(Collection<MessagePostProcessor> collection) {
        this.afterReceivePostProcessors = new ArrayList<>(collection);
    }

    /**
     * Creates the {@link RabbitAmqpTemplate} used for sending replies from the given connection factory.
     *
     * @param amqpConnectionFactory the connection factory passed to the template constructor
     */
    public void setConnectionFactory(AmqpConnectionFactory amqpConnectionFactory) {
        this.rabbitAmqpTemplate = new RabbitAmqpTemplate(amqpConnectionFactory);
    }

    /**
     * Handles a single native AMQP 1.0 message: converts it with
     * {@link RabbitAmqpUtils#fromAmqpMessage}, applies the configured post-processors,
     * invokes the handler and, when the handler returns a non-null value, hands the
     * result to {@code handleResult} so a reply can be produced.
     *
     * @param message the native message delivered by the RabbitMQ AMQP 1.0 client
     * @param context the consumer context that accompanies the delivery
     * @throws ListenerExecutionFailedException wrapping any exception raised while converting
     * to a messaging message, invoking the handler, or handling its result
     */
    @Override
    public void onAmqpMessage(Message message, Consumer.Context context) {
        org.springframework.amqp.core.Message request = RabbitAmqpUtils.fromAmqpMessage(message, context);
        // Read the field once so the null check and the iteration see the same collection.
        Collection<MessagePostProcessor> postProcessors = this.afterReceivePostProcessors;
        if (postProcessors != null) {
            for (MessagePostProcessor postProcessor : postProcessors) {
                request = postProcessor.postProcessMessage(request);
            }
        }
        try {
            org.springframework.messaging.Message<?> messagingMessage = toMessagingMessage(request);
            // The extra arguments are offered to the handler adapter alongside the converted message.
            InvocationResult result = getHandlerAdapter().invoke(messagingMessage,
                    new Object[]{request, request.getMessageProperties().getAmqpAcknowledgment(), message, context});
            if (result.getReturnValue() != null) {
                Assert.notNull(this.rabbitAmqpTemplate, "The 'connectionFactory' must be provided for handling replies.");
                // No Channel exists on this code path, hence the null argument.
                handleResult(result, request, null, messagingMessage);
            }
        } catch (Exception e) {
            throw new ListenerExecutionFailedException("Failed to invoke listener", e, new org.springframework.amqp.core.Message[]{request});
        }
    }

    /**
     * Reacts to an asynchronous handler result that completed exceptionally.
     *
     * <p>First delegates to {@code handleException}. Only if that itself throws, the original
     * failure is logged and the request is settled through its {@link AmqpAcknowledgment}:
     * {@code REQUEUE} when {@link ContainerUtils#shouldRequeue} says so, {@code REJECT} otherwise.
     *
     * @param message the request message whose processing failed
     * @param channel passed through to {@code handleException}; not otherwise used here
     * @param th the failure the asynchronous result completed with
     * @param obj the source object, cast to a messaging message for {@code handleException}
     * @throws IllegalArgumentException if the fallback path is taken and the request carries no acknowledgment
     */
    protected void asyncFailure(org.springframework.amqp.core.Message message, Channel channel, Throwable th, Object obj) {
        try {
            handleException(message, channel, (org.springframework.messaging.Message<?>) obj,
                    new ListenerExecutionFailedException("Async Fail", th, new org.springframework.amqp.core.Message[]{message}));
            return;
        } catch (Exception ignored) {
            // Deliberately ignored: the request is settled below according to the original failure 'th'.
        }
        this.logger.error("Future, Mono, or suspend function was completed with an exception for " + message, th);
        AmqpAcknowledgment acknowledgment = requireAcknowledgment(message);
        AmqpAcknowledgment.Status status = ContainerUtils.shouldRequeue(isDefaultRequeueRejected(), th, this.logger)
                ? AmqpAcknowledgment.Status.REQUEUE
                : AmqpAcknowledgment.Status.REJECT;
        acknowledgment.acknowledge(status);
    }

    /**
     * Sends a reply through the {@link RabbitAmqpTemplate} and blocks until the send future completes.
     *
     * <p>The reply is first passed through the before-send-reply post-processors, if any. When the
     * address has an exchange name, the reply goes to that exchange with the address routing key.
     * Otherwise the routing key is used as the queue name, with a leading {@code "queues/"} removed.
     *
     * @param channel not used by this implementation
     * @param address the reply address
     * @param message the reply message
     * @throws IllegalArgumentException if the address has neither an exchange name nor a routing key
     */
    protected void sendResponse(Channel channel, Address address, org.springframework.amqp.core.Message message) {
        org.springframework.amqp.core.Message reply = message;
        MessagePostProcessor[] beforeSendReplyPostProcessors = getBeforeSendReplyPostProcessors();
        if (beforeSendReplyPostProcessors != null) {
            for (MessagePostProcessor postProcessor : beforeSendReplyPostProcessors) {
                reply = postProcessor.postProcessMessage(reply);
            }
        }
        String exchangeName = address.getExchangeName();
        String routingKey = address.getRoutingKey();
        CompletableFuture<Boolean> sendFuture;
        if (StringUtils.hasText(exchangeName)) {
            sendFuture = this.rabbitAmqpTemplate.send(exchangeName, routingKey, reply);
        } else {
            Assert.hasText(routingKey, "The 'replyTo' must be provided, in request message or in @SendTo.");
            // Strip the prefix only when it really is a prefix; a plain-text (not regex) match
            // anchored at the start keeps queue names that merely contain "queues/" intact.
            String queue = routingKey.startsWith(QUEUES_PREFIX)
                    ? routingKey.substring(QUEUES_PREFIX.length())
                    : routingKey;
            sendFuture = this.rabbitAmqpTemplate.send(queue, reply);
        }
        sendFuture.join();
    }

    /**
     * Acknowledges the request through the {@link AmqpAcknowledgment} stored in its message properties.
     *
     * @param message the request message to acknowledge
     * @param channel not used by this implementation
     * @throws IllegalArgumentException if the request carries no acknowledgment
     */
    protected void basicAck(org.springframework.amqp.core.Message message, Channel channel) {
        requireAcknowledgment(message).acknowledge();
    }

    /**
     * Invokes the handler once for a whole batch of messages.
     *
     * <p>The payload handed to the handler depends on the message converter: the list of AMQP
     * messages as-is, the list of converted messaging messages, or the list of their payloads.
     * The acknowledgment of one message of the batch ({@code null} for an empty batch) is passed
     * as an extra handler argument. A non-null handler return value is only logged as a warning.
     *
     * @param list the batch of received messages
     * @throws ListenerExecutionFailedException wrapping any exception raised by the handler invocation
     */
    public void onMessageBatch(List<org.springframework.amqp.core.Message> list) {
        AmqpAcknowledgment acknowledgment = list.stream()
                .findAny()
                .map(message -> message.getMessageProperties().getAmqpAcknowledgment())
                .orElse(null);

        org.springframework.messaging.Message<?> converted;
        if (this.messagingMessageConverter.isAmqpMessageList()) {
            converted = new GenericMessage<>(list);
        } else {
            List<? extends org.springframework.messaging.Message<?>> messagingMessages = list.stream()
                    .map(this::toMessagingMessage)
                    .toList();
            if (this.messagingMessageConverter.isMessageList()) {
                converted = new GenericMessage<>(messagingMessages);
            } else {
                List<Object> payloads = new ArrayList<>(messagingMessages.size());
                for (org.springframework.messaging.Message<?> messagingMessage : messagingMessages) {
                    payloads.add(messagingMessage.getPayload());
                }
                converted = new GenericMessage<>(payloads);
            }
        }
        try {
            InvocationResult result = getHandlerAdapter().invoke(converted, new Object[]{acknowledgment});
            if (result.getReturnValue() != null) {
                this.logger.warn("Replies for batches are not currently supported with RabbitMQ AMQP 1.0 listeners");
            }
        } catch (Exception e) {
            throw new ListenerExecutionFailedException("Failed to invoke listener", e, list.toArray(new org.springframework.amqp.core.Message[0]));
        }
    }

    /** Returns the acknowledgment carried by the message properties, failing fast when it is absent. */
    private static AmqpAcknowledgment requireAcknowledgment(org.springframework.amqp.core.Message message) {
        AmqpAcknowledgment acknowledgment = message.getMessageProperties().getAmqpAcknowledgment();
        Assert.notNull(acknowledgment, "'amqpAcknowledgment' must be provided into request message.");
        return acknowledgment;
    }
}
