package org.springframework.amqp.rabbitmq.client.listener;

import com.rabbitmq.client.amqp.Connection;
import com.rabbitmq.client.amqp.Consumer;
import com.rabbitmq.client.amqp.Message;
import com.rabbitmq.client.amqp.Resource;
import java.time.Duration;
import java.time.Instant;
import java.time.temporal.TemporalAmount;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import org.aopalliance.aop.Advice;
import org.apache.commons.logging.LogFactory;
import org.springframework.amqp.core.AcknowledgeMode;
import org.springframework.amqp.core.MessageListener;
import org.springframework.amqp.core.MessagePostProcessor;
import org.springframework.amqp.rabbit.listener.ConditionalRejectingErrorHandler;
import org.springframework.amqp.rabbit.listener.MessageListenerContainer;
import org.springframework.amqp.rabbit.listener.support.ContainerUtils;
import org.springframework.amqp.rabbitmq.client.AmqpConnectionFactory;
import org.springframework.amqp.rabbitmq.client.RabbitAmqpUtils;
import org.springframework.amqp.support.postprocessor.MessagePostProcessorUtils;
import org.springframework.aop.framework.ProxyFactory;
import org.springframework.aop.support.DefaultPointcutAdvisor;
import org.springframework.beans.factory.BeanNameAware;
import org.springframework.beans.factory.DisposableBean;
import org.springframework.core.log.LogAccessor;
import org.springframework.scheduling.TaskScheduler;
import org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler;
import org.springframework.util.Assert;
import org.springframework.util.ErrorHandler;
import org.springframework.util.LinkedMultiValueMap;
import org.springframework.util.MultiValueMap;
import org.springframework.util.ObjectUtils;

/* loaded from: input_file:org/springframework/amqp/rabbitmq/client/listener/RabbitAmqpListenerContainer.class */
public class RabbitAmqpListenerContainer implements MessageListenerContainer, BeanNameAware, DisposableBean {
    private static final LogAccessor LOG = new LogAccessor(LogFactory.getLog(RabbitAmqpListenerContainer.class));
    private final AmqpConnectionFactory connectionFactory;
    private String[] queues;
    private Advice[] adviceChain;
    private int priority;
    private Resource.StateListener[] stateListeners;
    private MessageListener messageListener;
    private MessageListener proxy;
    private boolean asyncReplies;
    private Collection<MessagePostProcessor> afterReceivePostProcessors;
    private String listenerId;
    private int batchSize;
    private TaskScheduler taskScheduler;
    private final Lock lock = new ReentrantLock();
    private final MultiValueMap<String, Consumer> queueToConsumers = new LinkedMultiValueMap();
    private int initialCredits = 100;
    private boolean autoSettle = true;
    private boolean defaultRequeue = true;
    private int consumersPerQueue = 1;
    private ErrorHandler errorHandler = new ConditionalRejectingErrorHandler();
    private boolean autoStartup = true;
    private String beanName = "not.a.Spring.bean";
    private Duration gracefulShutdownPeriod = Duration.ofSeconds(30);
    private Duration batchReceiveDuration = Duration.ofSeconds(30);
    private boolean internalTaskScheduler = true;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/springframework/amqp/rabbitmq/client/listener/RabbitAmqpListenerContainer$ConsumerMessageHandler.class */
    public class ConsumerMessageHandler implements Consumer.MessageHandler {
        private volatile ConsumerBatch consumerBatch;

        /* loaded from: input_file:org/springframework/amqp/rabbitmq/client/listener/RabbitAmqpListenerContainer$ConsumerMessageHandler$ConsumerBatch.class */
        private class ConsumerBatch {
            private final List<Message> batch = new ArrayList();
            private final Consumer.BatchContext batchContext;
            private volatile ScheduledFuture<?> batchReleaseFuture;

            ConsumerBatch(Consumer.BatchContext batchContext) {
                this.batchContext = batchContext;
            }

            void add(Consumer.Context context, Message message) {
                this.batchContext.add(context);
                this.batch.add(message);
                if (this.batchReleaseFuture == null) {
                    this.batchReleaseFuture = ((TaskScheduler) Objects.requireNonNull(RabbitAmqpListenerContainer.this.taskScheduler)).schedule(this::releaseInternal, Instant.now().plus((TemporalAmount) RabbitAmqpListenerContainer.this.batchReceiveDuration));
                }
            }

            void release() {
                ScheduledFuture<?> scheduledFuture = this.batchReleaseFuture;
                if (scheduledFuture != null) {
                    scheduledFuture.cancel(true);
                    releaseInternal();
                }
            }

            private void releaseInternal() {
                if (this.batchReleaseFuture != null) {
                    this.batchReleaseFuture = null;
                    RabbitAmqpListenerContainer.this.invokeBatchListener(this.batchContext, this.batch);
                }
            }
        }

        ConsumerMessageHandler() {
        }

        public void handle(Consumer.Context context, Message message) {
            if (RabbitAmqpListenerContainer.this.batchSize <= 1) {
                RabbitAmqpListenerContainer.this.invokeListener(context, message);
                return;
            }
            ConsumerBatch consumerBatch = this.consumerBatch;
            if (consumerBatch == null || consumerBatch.batchReleaseFuture == null) {
                consumerBatch = new ConsumerBatch(context.batch(RabbitAmqpListenerContainer.this.batchSize));
                this.consumerBatch = consumerBatch;
            }
            consumerBatch.add(context, message);
            if (consumerBatch.batchContext.size() == RabbitAmqpListenerContainer.this.batchSize) {
                consumerBatch.release();
                this.consumerBatch = null;
            }
        }
    }

    public RabbitAmqpListenerContainer(AmqpConnectionFactory amqpConnectionFactory) {
        this.connectionFactory = amqpConnectionFactory;
    }

    public void setQueueNames(String... strArr) {
        this.queues = (String[]) Arrays.copyOf(strArr, strArr.length);
    }

    public void setInitialCredits(int i) {
        this.initialCredits = i;
    }

    public void setPriority(int i) {
        this.priority = i;
    }

    public void setStateListeners(Resource.StateListener... stateListenerArr) {
        this.stateListeners = (Resource.StateListener[]) Arrays.copyOf(stateListenerArr, stateListenerArr.length);
    }

    public void setAfterReceivePostProcessors(MessagePostProcessor... messagePostProcessorArr) {
        this.afterReceivePostProcessors = MessagePostProcessorUtils.sort(Arrays.asList(messagePostProcessorArr));
    }

    public void setBatchSize(int i) {
        Assert.isTrue(i > 1, "'batchSize' must be greater than 1");
        this.batchSize = i;
    }

    public void setBatchReceiveTimeout(long j) {
        this.batchReceiveDuration = Duration.ofMillis(j);
    }

    public void setTaskScheduler(TaskScheduler taskScheduler) {
        this.taskScheduler = taskScheduler;
        this.internalTaskScheduler = false;
    }

    public void setBeanName(String str) {
        this.beanName = str;
    }

    public void setAutoStartup(boolean z) {
        this.autoStartup = z;
    }

    public boolean isAutoStartup() {
        return this.autoStartup;
    }

    public void setAdviceChain(Advice... adviceArr) {
        Assert.notNull(adviceArr, "'advices' cannot be null");
        Assert.noNullElements(adviceArr, "'advices' cannot have null elements");
        this.adviceChain = (Advice[]) Arrays.copyOf(adviceArr, adviceArr.length);
    }

    public void setAutoSettle(boolean z) {
        this.autoSettle = z;
    }

    public void setDefaultRequeue(boolean z) {
        this.defaultRequeue = z;
    }

    public void setGracefulShutdownPeriod(Duration duration) {
        this.gracefulShutdownPeriod = duration;
    }

    public void setConsumersPerQueue(int i) {
        this.consumersPerQueue = i;
    }

    public void setErrorHandler(ErrorHandler errorHandler) {
        this.errorHandler = errorHandler;
    }

    public void setListenerId(String str) {
        this.listenerId = str;
    }

    public String getListenerId() {
        return this.listenerId != null ? this.listenerId : this.beanName;
    }

    public void setupMessageListener(MessageListener messageListener) {
        this.messageListener = messageListener;
        this.asyncReplies = messageListener.isAsyncReplies();
        MessageListener messageListener2 = this.messageListener;
        if (messageListener2 instanceof RabbitAmqpMessageListenerAdapter) {
            ((RabbitAmqpMessageListenerAdapter) messageListener2).setConnectionFactory(this.connectionFactory);
        }
        this.proxy = this.messageListener;
        if (ObjectUtils.isEmpty(this.adviceChain)) {
            return;
        }
        ProxyFactory proxyFactory = new ProxyFactory(messageListener);
        for (Advice advice : this.adviceChain) {
            proxyFactory.addAdvisor(new DefaultPointcutAdvisor(advice));
        }
        proxyFactory.setInterfaces(messageListener.getClass().getInterfaces());
        this.proxy = (MessageListener) proxyFactory.getProxy(getClass().getClassLoader());
    }

    public Object getMessageListener() {
        return this.proxy;
    }

    public void afterPropertiesSet() {
        Assert.state(this.queues != null, "At least one queue has to be provided for consuming.");
        Assert.state(this.messageListener != null, "The 'messageListener' must be provided.");
        if (this.asyncReplies && this.autoSettle) {
            LOG.info("Enforce MANUAL settlement for async replies.");
            this.autoSettle = false;
        }
        this.messageListener.containerAckMode(this.autoSettle ? AcknowledgeMode.AUTO : AcknowledgeMode.MANUAL);
        MessageListener messageListener = this.messageListener;
        if (messageListener instanceof RabbitAmqpMessageListenerAdapter) {
            RabbitAmqpMessageListenerAdapter rabbitAmqpMessageListenerAdapter = (RabbitAmqpMessageListenerAdapter) messageListener;
            if (this.afterReceivePostProcessors != null) {
                rabbitAmqpMessageListenerAdapter.setAfterReceivePostProcessors(this.afterReceivePostProcessors);
            }
        }
        if (this.batchSize <= 1 || !this.internalTaskScheduler) {
            return;
        }
        ThreadPoolTaskScheduler threadPoolTaskScheduler = new ThreadPoolTaskScheduler();
        threadPoolTaskScheduler.setThreadNamePrefix(getListenerId() + "-consumerMonitor-");
        threadPoolTaskScheduler.afterPropertiesSet();
        this.taskScheduler = threadPoolTaskScheduler;
    }

    public boolean isRunning() {
        this.lock.lock();
        try {
            return !this.queueToConsumers.isEmpty();
        } finally {
            this.lock.unlock();
        }
    }

    public void start() {
        this.lock.lock();
        try {
            if (this.queueToConsumers.isEmpty()) {
                Connection connection = this.connectionFactory.getConnection();
                for (String str : this.queues) {
                    for (int i = 0; i < this.consumersPerQueue; i++) {
                        this.queueToConsumers.add(str, connection.consumerBuilder().queue(str).priority(this.priority).initialCredits(this.initialCredits).listeners(this.stateListeners).messageHandler(new ConsumerMessageHandler()).build());
                    }
                }
            }
        } finally {
            this.lock.unlock();
        }
    }

    private void invokeListener(Consumer.Context context, Message message) {
        try {
            doInvokeListener(context, message);
            if (this.autoSettle) {
                context.accept();
            }
        } catch (Exception e) {
            handleListenerError(e, context, message);
        }
    }

    private void doInvokeListener(Consumer.Context context, Message message) {
        Consumer.Context context2 = this.autoSettle ? null : context;
        MessageListener messageListener = this.proxy;
        if (messageListener instanceof RabbitAmqpMessageListener) {
            ((RabbitAmqpMessageListener) messageListener).onAmqpMessage(message, context2);
        } else {
            this.proxy.onMessage(RabbitAmqpUtils.fromAmqpMessage(message, context2));
        }
    }

    private void invokeBatchListener(Consumer.Context context, List<Message> list) {
        Consumer.Context context2 = this.autoSettle ? null : context;
        try {
            doInvokeBatchListener(list.stream().map(message -> {
                return RabbitAmqpUtils.fromAmqpMessage(message, context2);
            }).toList());
            if (this.autoSettle) {
                context.accept();
            }
        } catch (Exception e) {
            handleListenerError(e, context, list);
        }
    }

    private void doInvokeBatchListener(List<org.springframework.amqp.core.Message> list) {
        this.proxy.onMessageBatch(list);
    }

    private void handleListenerError(Exception exc, Consumer.Context context, Object obj) {
        try {
            this.errorHandler.handleError(exc);
            if (!handleSpecialErrors(exc, context)) {
                context.accept();
            }
        } catch (Exception e) {
            if (handleSpecialErrors(e, context)) {
                return;
            }
            if (this.defaultRequeue) {
                context.requeue();
            } else {
                context.discard();
            }
            LOG.error(e, () -> {
                return "The 'errorHandler' has thrown an exception. The '" + String.valueOf(obj) + "' is " + (this.defaultRequeue ? "re-queued." : "discarded.");
            });
        }
    }

    private boolean handleSpecialErrors(Exception exc, Consumer.Context context) {
        if (ContainerUtils.shouldRequeue(this.defaultRequeue, exc, LOG.getLog())) {
            context.requeue();
            return true;
        }
        if (ContainerUtils.isAmqpReject(exc)) {
            context.discard();
            return true;
        }
        if (!ContainerUtils.isImmediateAcknowledge(exc)) {
            return false;
        }
        context.accept();
        return true;
    }

    public void stop() {
        stop(() -> {
        });
    }

    public void stop(Runnable runnable) {
        this.lock.lock();
        try {
            CompletableFuture.allOf((CompletableFuture[]) this.queueToConsumers.values().stream().flatMap((v0) -> {
                return v0.stream();
            }).map(consumer -> {
                return CompletableFuture.supplyAsync(() -> {
                    consumer.pause();
                    while (consumer.unsettledMessageCount() > 0) {
                        try {
                            try {
                                Thread.sleep(100L);
                            } finally {
                            }
                        } catch (InterruptedException e) {
                            Thread.currentThread().interrupt();
                            throw new RuntimeException(e);
                        }
                    }
                    if (consumer != null) {
                        consumer.close();
                    }
                    return null;
                });
            }).toArray(i -> {
                return new CompletableFuture[i];
            })).orTimeout(this.gracefulShutdownPeriod.toMillis(), TimeUnit.MILLISECONDS).whenComplete((r4, th) -> {
                this.queueToConsumers.clear();
                runnable.run();
            });
        } finally {
            this.lock.unlock();
        }
    }

    public void pause() {
        this.queueToConsumers.values().stream().flatMap((v0) -> {
            return v0.stream();
        }).forEach((v0) -> {
            v0.pause();
        });
    }

    public void resume() {
        this.queueToConsumers.values().stream().flatMap((v0) -> {
            return v0.stream();
        }).forEach((v0) -> {
            v0.unpause();
        });
    }

    public void pause(String str) {
        List list = (List) this.queueToConsumers.get(str);
        if (list != null) {
            list.forEach((v0) -> {
                v0.pause();
            });
        }
    }

    public void resume(String str) {
        List list = (List) this.queueToConsumers.get(str);
        if (list != null) {
            list.forEach((v0) -> {
                v0.unpause();
            });
        }
    }

    public void destroy() {
        if (!this.internalTaskScheduler || this.taskScheduler == null) {
            return;
        }
        this.taskScheduler.shutdown();
    }
}
