package io.micronaut.rabbitmq.intercept;

import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.AlreadyClosedException;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Delivery;
import com.rabbitmq.client.Envelope;
import com.rabbitmq.client.RecoverableChannel;
import com.rabbitmq.client.ShutdownSignalException;
import io.micronaut.context.BeanContext;
import io.micronaut.context.Qualifier;
import io.micronaut.context.processor.ExecutableMethodProcessor;
import io.micronaut.core.annotation.AnnotationValue;
import io.micronaut.core.bind.BoundExecutable;
import io.micronaut.core.bind.DefaultExecutableBinder;
import io.micronaut.core.convert.ConversionService;
import io.micronaut.core.util.StringUtils;
import io.micronaut.inject.BeanDefinition;
import io.micronaut.inject.ExecutableMethod;
import io.micronaut.inject.qualifiers.Qualifiers;
import io.micronaut.messaging.Acknowledgement;
import io.micronaut.messaging.exceptions.MessageAcknowledgementException;
import io.micronaut.messaging.exceptions.MessageListenerException;
import io.micronaut.rabbitmq.annotation.Queue;
import io.micronaut.rabbitmq.annotation.RabbitConnection;
import io.micronaut.rabbitmq.annotation.RabbitProperty;
import io.micronaut.rabbitmq.bind.AcknowledgmentAction;
import io.micronaut.rabbitmq.bind.RabbitBinderRegistry;
import io.micronaut.rabbitmq.bind.RabbitConsumerState;
import io.micronaut.rabbitmq.bind.RabbitMessageCloseable;
import io.micronaut.rabbitmq.connect.ChannelPool;
import io.micronaut.rabbitmq.exception.RabbitListenerException;
import io.micronaut.rabbitmq.exception.RabbitListenerExceptionHandler;
import io.micronaut.rabbitmq.serdes.RabbitMessageSerDes;
import io.micronaut.rabbitmq.serdes.RabbitMessageSerDesRegistry;
import jakarta.annotation.PreDestroy;
import jakarta.inject.Singleton;
import java.io.IOException;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.OptionalInt;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.atomic.AtomicInteger;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Singleton
/* loaded from: input_file:io/micronaut/rabbitmq/intercept/RabbitMQConsumerAdvice.class */
public class RabbitMQConsumerAdvice implements ExecutableMethodProcessor<Queue>, AutoCloseable {
    private static final Logger LOG = LoggerFactory.getLogger(RabbitMQConsumerAdvice.class);
    private final BeanContext beanContext;
    private final RabbitBinderRegistry binderRegistry;
    private final RabbitListenerExceptionHandler exceptionHandler;
    private final RabbitMessageSerDesRegistry serDesRegistry;
    private final ConversionService conversionService;
    private final Map<String, ChannelPool> channelPools;
    private final List<RecoverableConsumerWrapper> consumers = new CopyOnWriteArrayList();

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Callback invoked for each message delivered to a consumer registered by this class.
     * Implementations receive the channel the consumer is subscribed on together with the
     * delivered message.
     */
    @FunctionalInterface
    /* loaded from: input_file:io/micronaut/rabbitmq/intercept/RabbitMQConsumerAdvice$DeliverCallback.class */
    public interface DeliverCallback {
        /**
         * Handles a single delivery.
         *
         * @param channel  the channel the delivery arrived on
         * @param delivery the delivered message (envelope, properties and body)
         */
        void handle(Channel channel, Delivery delivery);
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:io/micronaut/rabbitmq/intercept/RabbitMQConsumerAdvice$RecoverableConsumerWrapper.class */
    public class RecoverableConsumerWrapper {
        final String consumerTag;
        private final ExecutorService executorService;
        private final String queue;
        private final boolean exclusive;
        private final Map<String, Object> arguments;
        private final ChannelPool channelPool;
        private final Integer prefetch;
        private final DeliverCallback deliverCallback;
        private final boolean autoAcknowledgment;
        private com.rabbitmq.client.DefaultConsumer consumer;
        private final AtomicInteger handlingDeliveryCount = new AtomicInteger();
        private boolean canceled = false;

        RecoverableConsumerWrapper(String str, String str2, ExecutorService executorService, boolean z, Map<String, Object> map, ChannelPool channelPool, Integer num, DeliverCallback deliverCallback, boolean z2) throws IOException {
            this.queue = str;
            this.consumerTag = str2;
            this.executorService = executorService;
            this.exclusive = z;
            this.arguments = map;
            this.channelPool = channelPool;
            this.prefetch = num;
            this.deliverCallback = deliverCallback;
            this.autoAcknowledgment = z2;
            Channel channel = null;
            try {
                channel = channelPool.getChannel();
                this.consumer = createConsumer(channel);
            } catch (IOException e) {
                if (channel != null) {
                    channelPool.returnChannel(channel);
                }
                throw e;
            }
        }

        public synchronized void cancel() {
            this.canceled = true;
            if (this.consumer == null) {
                return;
            }
            Channel channel = this.consumer.getChannel();
            try {
                channel.basicCancel(this.consumerTag);
            } catch (IOException | AlreadyClosedException e) {
            }
            while (this.handlingDeliveryCount.get() > 0) {
                try {
                    wait(500L);
                } catch (InterruptedException e2) {
                    Thread.currentThread().interrupt();
                    return;
                } finally {
                    this.consumer = null;
                    RabbitMQConsumerAdvice.this.consumers.remove(this);
                    this.channelPool.returnChannel(channel);
                }
            }
        }

        private void performConsumerRecovery() {
            com.rabbitmq.client.DefaultConsumer defaultConsumer = null;
            int i = 0;
            while (defaultConsumer == null) {
                try {
                    synchronized (this) {
                        if (this.canceled) {
                            return;
                        }
                        RabbitMQConsumerAdvice.LOG.debug("consumer [{}] recovery attempt: {}", this.consumerTag, Integer.valueOf(i + 1));
                        int i2 = i;
                        i++;
                        defaultConsumer = createConsumer(this.channelPool.getChannelWithRecoveringDelay(i2));
                        this.consumer = defaultConsumer;
                    }
                } catch (IOException e) {
                    if (0 != 0) {
                        this.channelPool.returnChannel(null);
                    }
                    RabbitMQConsumerAdvice.LOG.warn("Recovery attempt {} for consumer [{}] failed, will retry.", new Object[]{Integer.valueOf(i), this.consumerTag, e});
                } catch (InterruptedException e2) {
                    RabbitMQConsumerAdvice.LOG.warn("The consumer [{}] recovery was interrupted. The consumer will not recover.", this.consumerTag, e2);
                    Thread.currentThread().interrupt();
                    return;
                }
            }
            RabbitMQConsumerAdvice.LOG.info("consumer [{}] recovered", this.consumerTag);
        }

        private com.rabbitmq.client.DefaultConsumer createConsumer(Channel channel) throws IOException {
            RabbitMQConsumerAdvice.setChannelPrefetch(this.prefetch, channel);
            com.rabbitmq.client.DefaultConsumer defaultConsumer = new com.rabbitmq.client.DefaultConsumer(channel) { // from class: io.micronaut.rabbitmq.intercept.RabbitMQConsumerAdvice.RecoverableConsumerWrapper.1
                public void handleCancel(String str) throws IOException {
                    synchronized (RecoverableConsumerWrapper.this) {
                        RecoverableConsumerWrapper.this.consumer = null;
                        RecoverableConsumerWrapper.this.channelPool.returnChannel(getChannel());
                    }
                    if (RecoverableConsumerWrapper.this.channelPool.isTopologyRecoveryEnabled() && (getChannel() instanceof RecoverableChannel)) {
                        RabbitMQConsumerAdvice.LOG.warn("The consumer [{}] subscription was canceled, a recovery will be tried.", str);
                        RecoverableConsumerWrapper.this.performConsumerRecovery();
                    } else {
                        RabbitMQConsumerAdvice.LOG.warn("The RabbitMQ consumer [{}] was canceled. Recovery is not enabled. It will no longer receive messages", str);
                        RecoverableConsumerWrapper.this.cancel();
                    }
                }

                public void handleShutdownSignal(String str, ShutdownSignalException shutdownSignalException) {
                    if ((getChannel() instanceof RecoverableChannel) && shutdownSignalException.isHardError()) {
                        RabbitMQConsumerAdvice.LOG.info("The underlying connection was terminated. Automatic recovery attempt is underway for consumer [{}]", str);
                        return;
                    }
                    if (!RecoverableConsumerWrapper.this.channelPool.isTopologyRecoveryEnabled() || !(getChannel() instanceof RecoverableChannel)) {
                        RabbitMQConsumerAdvice.LOG.error("The channel was closed. Recovery is not enabled. The consumer [{}] will no longer receive messages", str, shutdownSignalException);
                        RecoverableConsumerWrapper.this.cancel();
                        return;
                    }
                    RabbitMQConsumerAdvice.LOG.info("The channel of this consumer was terminated. Automatic recovery attempt is underway for consumer [{}]", str, shutdownSignalException);
                    synchronized (RecoverableConsumerWrapper.this) {
                        RecoverableConsumerWrapper.this.consumer = null;
                        RecoverableConsumerWrapper.this.channelPool.returnChannel(getChannel());
                    }
                    RecoverableConsumerWrapper.this.performConsumerRecovery();
                }

                public void handleDelivery(String str, Envelope envelope, AMQP.BasicProperties basicProperties, byte[] bArr) throws IOException {
                    if (RecoverableConsumerWrapper.this.canceled || !getChannel().isOpen()) {
                        return;
                    }
                    RecoverableConsumerWrapper.this.handlingDeliveryCount.incrementAndGet();
                    if (RecoverableConsumerWrapper.this.executorService != null) {
                        RecoverableConsumerWrapper.this.executorService.submit(() -> {
                            callbackHandle(envelope, basicProperties, bArr);
                        });
                    } else {
                        callbackHandle(envelope, basicProperties, bArr);
                    }
                }

                private void callbackHandle(Envelope envelope, AMQP.BasicProperties basicProperties, byte[] bArr) {
                    try {
                        RecoverableConsumerWrapper.this.deliverCallback.handle(getChannel(), new Delivery(envelope, basicProperties, bArr));
                        RecoverableConsumerWrapper.this.handlingDeliveryCount.decrementAndGet();
                    } catch (Throwable th) {
                        RecoverableConsumerWrapper.this.handlingDeliveryCount.decrementAndGet();
                        throw th;
                    }
                }
            };
            channel.basicConsume(this.queue, this.autoAcknowledgment, this.consumerTag, false, this.exclusive, this.arguments, defaultConsumer);
            return defaultConsumer;
        }
    }

    public RabbitMQConsumerAdvice(BeanContext beanContext, RabbitBinderRegistry rabbitBinderRegistry, RabbitListenerExceptionHandler rabbitListenerExceptionHandler, RabbitMessageSerDesRegistry rabbitMessageSerDesRegistry, ConversionService conversionService, List<ChannelPool> list) {
        this.beanContext = beanContext;
        this.binderRegistry = rabbitBinderRegistry;
        this.exceptionHandler = rabbitListenerExceptionHandler;
        this.serDesRegistry = rabbitMessageSerDesRegistry;
        this.conversionService = conversionService;
        this.channelPools = new HashMap(list.size());
        for (ChannelPool channelPool : list) {
            this.channelPools.put(channelPool.getName(), channelPool);
        }
    }

    /**
     * Registers consumers for a method annotated with {@link Queue}.
     *
     * <p>Reads the queue name, requeue/exclusive flags, prefetch and number of consumers from the
     * annotation, builds a delivery callback that binds each message to the method and invokes it,
     * and then subscribes the requested number of consumers. Methods without the annotation are
     * ignored. A failure while subscribing is passed to the exception handler and stops further
     * registrations for this method.
     *
     * @param beanDefinition   the definition of the bean declaring the method
     * @param executableMethod the listener method
     */
    @Override
    @SuppressWarnings({"rawtypes", "unchecked"})
    public void process(BeanDefinition<?> beanDefinition, ExecutableMethod<?, ?> executableMethod) {
        AnnotationValue<Queue> queueAnnotation = executableMethod.getAnnotation(Queue.class);
        if (queueAnnotation == null) {
            return;
        }
        String queue = queueAnnotation.getRequiredValue(String.class);
        String clientTag = executableMethod.getDeclaringType().getSimpleName() + "#" + executableMethod;
        boolean reQueue = queueAnnotation.getRequiredValue("reQueue", Boolean.TYPE);
        boolean exclusive = queueAnnotation.getRequiredValue("exclusive", Boolean.TYPE);
        boolean hasAckArgument = Arrays.stream(executableMethod.getArguments())
                .anyMatch(argument -> Acknowledgement.class.isAssignableFrom(argument.getType()));
        // A method that takes an Acknowledgement argument acknowledges by itself, so broker auto-ack is off.
        boolean autoAcknowledgment = !hasAckArgument && queueAnnotation.getRequiredValue("autoAcknowledgment", Boolean.TYPE);
        // True when this class must not ack/nack the message after the method returns.
        boolean skipAcknowledgment = autoAcknowledgment || hasAckArgument;
        OptionalInt prefetchValue = queueAnnotation.intValue("prefetch");
        Integer prefetch = prefetchValue.isPresent() ? Integer.valueOf(prefetchValue.getAsInt()) : null;
        int numberOfConsumers = queueAnnotation.intValue("numberOfConsumers").orElse(1);
        ChannelPool channelPool = getChannelPool(executableMethod);
        ExecutorService executorService = getExecutorService(executableMethod);
        Map<String, Object> arguments = retrieveArguments(executableMethod);
        Object bean = getExecutableMethodBean(beanDefinition, executableMethod);
        boolean isVoid = executableMethod.getReturnType().isVoid();
        DefaultExecutableBinder<RabbitConsumerState> binder = new DefaultExecutableBinder<>();

        DeliverCallback deliverCallback = (channel, delivery) -> {
            RabbitConsumerState state = new RabbitConsumerState(delivery.getEnvelope(), delivery.getProperties(), delivery.getBody(), channel);
            BoundExecutable bound = null;
            try {
                bound = binder.bind(executableMethod, this.binderRegistry, state);
            } catch (Throwable t) {
                handleException(new RabbitListenerException("An error occurred binding the message to the method", t, bean, state));
            }
            try {
                if (bound == null) {
                    // Binding failed: reject the message.
                    new RabbitMessageCloseable(state, false, reQueue).withAcknowledgmentAction(AcknowledgmentAction.NACK).close();
                    return;
                }
                // The closeable performs the configured acknowledgment action when it is closed;
                // it starts as NACK so that a failing listener rejects the message.
                try (RabbitMessageCloseable closeable = new RabbitMessageCloseable(state, false, reQueue)
                        .withAcknowledgmentAction(skipAcknowledgment ? AcknowledgmentAction.NONE : AcknowledgmentAction.NACK)) {
                    Object result = bound.invoke(bean);
                    String replyTo = delivery.getProperties().getReplyTo();
                    if (!isVoid && StringUtils.isNotEmpty(replyTo)) {
                        publishReply(channel, delivery, replyTo, executableMethod, result, bean, state);
                    }
                    if (!skipAcknowledgment) {
                        closeable.withAcknowledgmentAction(AcknowledgmentAction.ACK);
                    }
                } catch (MessageAcknowledgementException e) {
                    // Let the outer handler report acknowledgement failures with their own message.
                    throw e;
                } catch (RabbitListenerException e) {
                    handleException(e);
                } catch (Throwable t) {
                    handleException(new RabbitListenerException("An error occurred executing the listener", t, bean, state));
                }
            } catch (MessageAcknowledgementException e) {
                handleException(new RabbitListenerException(e.getMessage(), e, bean, state));
            }
        };

        for (int i = 0; i < numberOfConsumers; i++) {
            try {
                String consumerTag = clientTag + "[" + i + "]";
                LOG.debug("Registering a consumer to queue [{}] with client tag [{}]", queue, consumerTag);
                this.consumers.add(new RecoverableConsumerWrapper(queue, consumerTag, executorService, exclusive, arguments, channelPool, prefetch, deliverCallback, autoAcknowledgment));
            } catch (Throwable t) {
                handleException(new RabbitListenerException("An error occurred subscribing to a queue", t, bean, null));
                return;
            }
        }
    }

    /**
     * Publishes the listener's return value to the reply-to queue of the incoming message,
     * copying the correlation id. A {@code null} result is published with a {@code null} body.
     *
     * @throws RabbitListenerException if no serializer is registered for the method's return type
     * @throws IOException             if publishing fails
     */
    @SuppressWarnings({"rawtypes", "unchecked"})
    private void publishReply(Channel channel, Delivery delivery, String replyTo, ExecutableMethod<?, ?> executableMethod, Object result, Object bean, RabbitConsumerState state) throws IOException {
        MutableBasicProperties replyProperties = new MutableBasicProperties();
        replyProperties.setCorrelationId(delivery.getProperties().getCorrelationId());
        byte[] body = null;
        if (result != null) {
            RabbitMessageSerDes serDes = this.serDesRegistry
                    .findSerdes(executableMethod.getReturnType().asArgument())
                    .map(RabbitMessageSerDes.class::cast)
                    .orElseThrow(() -> new RabbitListenerException(String.format("Could not find a serializer for the body argument of type [%s]", result.getClass().getName()), bean, state));
            body = serDes.serialize(result, replyProperties);
        }
        channel.basicPublish("", replyTo, replyProperties.toBasicProperties(), body);
    }

    /**
     * Resolves the channel pool for a listener method from the {@code connection} member of
     * {@link RabbitConnection}, falling back to the pool named {@code "default"}.
     *
     * @param executableMethod the listener method
     * @return the matching channel pool
     * @throws MessageListenerException if no pool is registered under the resolved name
     */
    private ChannelPool getChannelPool(ExecutableMethod<?, ?> executableMethod) {
        String poolName = executableMethod.stringValue(RabbitConnection.class, "connection").orElse("default");
        ChannelPool pool = this.channelPools.get(poolName);
        if (pool == null) {
            throw new MessageListenerException(String.format("Failed to find a channel pool named [%s] to register a listener", poolName));
        }
        return pool;
    }

    /**
     * Sets the prefetch count on the channel when one is configured.
     *
     * @param num     the prefetch count, or {@code null} to leave the channel untouched
     * @param channel the channel to configure
     * @throws MessageListenerException if the channel rejects the setting
     */
    private static void setChannelPrefetch(Integer num, Channel channel) {
        if (num == null) {
            return;
        }
        try {
            channel.basicQos(num.intValue());
        } catch (IOException cause) {
            String message = String.format("Failed to set a prefetch count of [%s] on the channel", num);
            throw new MessageListenerException(message, cause);
        }
    }

    /**
     * Collects the consumer arguments declared through {@link RabbitProperty} annotations.
     *
     * <p>The annotation values are visited in reverse order, so that when the same name occurs
     * more than once the value that comes first in the list is the one that is kept. Entries with
     * an empty name or value are skipped. When a {@code type} other than {@code Void} is declared
     * the value is converted to that type.
     *
     * @param executableMethod the listener method
     * @return a new mutable map of argument name to value
     * @throws MessageListenerException if a value cannot be converted to its declared type
     */
    private Map<String, Object> retrieveArguments(ExecutableMethod<?, ?> executableMethod) {
        Map<String, Object> arguments = new HashMap<>();
        List<AnnotationValue<RabbitProperty>> properties = executableMethod.getAnnotationValuesByType(RabbitProperty.class);
        // Walk the list backwards instead of reversing it in place: the list belongs to the
        // annotation metadata and must not be modified by this method.
        for (int i = properties.size() - 1; i >= 0; i--) {
            AnnotationValue<RabbitProperty> property = properties.get(i);
            String name = property.getRequiredValue("name", String.class);
            String value = property.stringValue().orElse(null);
            Class<?> type = property.get("type", Class.class).orElse(null);
            if (!StringUtils.isNotEmpty(name) || !StringUtils.isNotEmpty(value)) {
                continue;
            }
            if (type == null || type == Void.class) {
                arguments.put(name, value);
                continue;
            }
            Optional<?> converted = this.conversionService.convert(value, type);
            if (!converted.isPresent()) {
                throw new MessageListenerException(String.format("Could not convert the argument [%s] to the required type [%s]", name, type));
            }
            arguments.put(name, converted.get());
        }
        return arguments;
    }

    /**
     * Looks up the bean instance the listener method is invoked on, using the bean definition's
     * qualifier annotation when it declares one.
     *
     * @param beanDefinition   the definition of the bean declaring the method
     * @param executableMethod the listener method, used only for the error message
     * @return the bean instance
     * @throws MessageListenerException if the bean cannot be found
     */
    @SuppressWarnings("unchecked")
    private Object getExecutableMethodBean(BeanDefinition<?> beanDefinition, ExecutableMethod<?, ?> executableMethod) {
        Optional<String> qualifierName = beanDefinition.getAnnotationNameByStereotype("javax.inject.Qualifier");
        Qualifier<Object> qualifier = null;
        if (qualifierName.isPresent()) {
            qualifier = Qualifiers.byAnnotation(beanDefinition, qualifierName.get());
        }
        Class<Object> beanType = (Class<Object>) beanDefinition.getBeanType();
        Optional<Object> bean = this.beanContext.findBean(beanType, qualifier);
        return bean.orElseThrow(() -> new MessageListenerException("Could not find the bean to execute the method " + executableMethod));
    }

    /**
     * Resolves the executor service named by the {@code executor} member of
     * {@link RabbitConnection}.
     *
     * @param executableMethod the listener method
     * @return the named executor service, or {@code null} when no executor name is configured
     * @throws MessageListenerException if an executor is named but no such bean exists
     */
    private ExecutorService getExecutorService(ExecutableMethod<?, ?> executableMethod) {
        Optional<String> configuredName = executableMethod.stringValue(RabbitConnection.class, "executor");
        if (!configuredName.isPresent()) {
            return null;
        }
        String executorName = configuredName.get();
        return this.beanContext
                .findBean(ExecutorService.class, Qualifiers.byName(executorName))
                .orElseThrow(() -> new MessageListenerException(String.format("Could not find the executor service [%s] specified for the method [%s]", executorName, executableMethod)));
    }

    /**
     * Cancels every consumer registered by this advice.
     */
    @Override // java.lang.AutoCloseable
    @PreDestroy
    public void close() throws Exception {
        for (RecoverableConsumerWrapper wrapper : this.consumers) {
            wrapper.cancel();
        }
    }

    /* JADX WARN: Multi-variable type inference failed */
    /**
     * Dispatches a listener exception to the listener itself when it implements
     * {@link RabbitListenerExceptionHandler}, otherwise to the handler injected into this advice.
     *
     * @param rabbitListenerException the exception to report
     */
    private void handleException(RabbitListenerException rabbitListenerException) {
        Object listener = rabbitListenerException.getListener();
        RabbitListenerExceptionHandler handler = listener instanceof RabbitListenerExceptionHandler
                ? (RabbitListenerExceptionHandler) listener
                : this.exceptionHandler;
        handler.handle(rabbitListenerException);
    }

    /**
     * Erased-signature overload that casts its second argument to {@link ExecutableMethod} and
     * forwards to {@code process(BeanDefinition<?>, ExecutableMethod<?, ?>)}.
     *
     * <p>The inline markers label this method as a bridge/synthetic member.
     * NOTE(review): presumably the compiler generates this bridge on its own when the class is
     * rebuilt from source, in which case this explicit copy can be dropped — confirm.
     */
    public /* bridge */ /* synthetic */ void process(BeanDefinition beanDefinition, Object obj) {
        process((BeanDefinition<?>) beanDefinition, (ExecutableMethod<?, ?>) obj);
    }
}
