package org.springframework.cloud.stream.binding;

import java.lang.reflect.Field;
import java.util.Map;
import org.springframework.amqp.core.ExchangeTypes;
import org.springframework.beans.BeansException;
import org.springframework.beans.factory.BeanFactory;
import org.springframework.beans.factory.BeanFactoryAware;
import org.springframework.beans.factory.config.ConfigurableListableBeanFactory;
import org.springframework.cloud.function.context.FunctionCatalog;
import org.springframework.cloud.function.context.catalog.SimpleFunctionRegistry;
import org.springframework.cloud.stream.binder.BinderHeaders;
import org.springframework.cloud.stream.binder.ConsumerProperties;
import org.springframework.cloud.stream.binder.DefaultPollableMessageSource;
import org.springframework.cloud.stream.binder.JavaClassMimeTypeUtils;
import org.springframework.cloud.stream.binder.PartitionHandler;
import org.springframework.cloud.stream.binder.PollableMessageSource;
import org.springframework.cloud.stream.binder.ProducerProperties;
import org.springframework.cloud.stream.config.BindingProperties;
import org.springframework.cloud.stream.config.BindingServiceProperties;
import org.springframework.cloud.stream.converter.MessageConverterUtils;
import org.springframework.cloud.stream.function.StreamFunctionProperties;
import org.springframework.core.env.Environment;
import org.springframework.integration.channel.AbstractMessageChannel;
import org.springframework.integration.expression.ExpressionUtils;
import org.springframework.integration.support.MessageBuilderFactory;
import org.springframework.integration.support.MutableMessageBuilderFactory;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.MessageHeaders;
import org.springframework.messaging.converter.CompositeMessageConverter;
import org.springframework.messaging.converter.MessageConverter;
import org.springframework.messaging.support.ChannelInterceptor;
import org.springframework.messaging.support.ErrorMessage;
import org.springframework.util.Assert;
import org.springframework.util.MimeType;
import org.springframework.util.ObjectUtils;
import org.springframework.util.ReflectionUtils;
import org.springframework.util.StringUtils;

/* loaded from: input_file:BOOT-INF/lib/spring-cloud-stream-3.1.4.jar:org/springframework/cloud/stream/binding/MessageConverterConfigurer.class */
public class MessageConverterConfigurer implements MessageChannelAndSourceConfigurer, BeanFactoryAware {
    private final MessageBuilderFactory messageBuilderFactory;
    private final CompositeMessageConverter compositeMessageConverter;
    private final BindingServiceProperties bindingServiceProperties;
    private final Field headersField;
    private final StreamFunctionProperties streamFunctionProperties;
    private ConfigurableListableBeanFactory beanFactory;

    /* loaded from: input_file:BOOT-INF/lib/spring-cloud-stream-3.1.4.jar:org/springframework/cloud/stream/binding/MessageConverterConfigurer$AbstractContentTypeInterceptor.class */
    /**
     * Base class for the content-type interceptors of this configurer.
     *
     * <p>It resolves the configured content type once, at construction time, and
     * lets {@link ErrorMessage} instances pass through {@link #preSend} unchanged;
     * every other message is handed to {@link #doPreSend}.
     */
    private abstract class AbstractContentTypeInterceptor implements ChannelInterceptor {
        /** Mime type obtained from {@link MessageConverterUtils#getMimeType} for the constructor argument. */
        final MimeType mimeType;

        /**
         * @param contentType the content type string to resolve into {@link #mimeType}
         */
        private AbstractContentTypeInterceptor(String contentType) {
            this.mimeType = MessageConverterUtils.getMimeType(contentType);
        }

        /**
         * Returns error messages as-is and delegates everything else to {@link #doPreSend}.
         *
         * @param message the message about to be sent
         * @param messageChannel the channel the message is being sent to
         * @return the same instance for an {@link ErrorMessage}, otherwise the result of {@link #doPreSend}
         */
        @Override
        public Message<?> preSend(Message<?> message, MessageChannel messageChannel) {
            if (message instanceof ErrorMessage) {
                return message;
            }
            return doPreSend(message, messageChannel);
        }

        /**
         * Hook invoked for every message that is not an {@link ErrorMessage}.
         *
         * @param message the message about to be sent
         * @param messageChannel the channel the message is being sent to
         * @return the message to continue the send with
         */
        protected abstract Message<?> doPreSend(Message<?> message, MessageChannel messageChannel);
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:BOOT-INF/lib/spring-cloud-stream-3.1.4.jar:org/springframework/cloud/stream/binding/MessageConverterConfigurer$InboundContentTypeEnhancingInterceptor.class */
    /**
     * Inbound interceptor that makes sure the {@code contentType} header is present
     * and, when it was supplied as a {@code String}, stored as a {@link MimeType}.
     *
     * <p>The header map is obtained reflectively from the message's
     * {@link MessageHeaders} and modified in place, so the message instance that
     * was passed in is also the one returned.
     */
    public final class InboundContentTypeEnhancingInterceptor extends AbstractContentTypeInterceptor {
        /**
         * @param contentType the content type to apply when a message carries none
         */
        private InboundContentTypeEnhancingInterceptor(String contentType) {
            super(contentType);
        }

        /**
         * Adds or normalizes the {@code contentType} header of {@code message}.
         *
         * @param message the inbound message
         * @param messageChannel the channel the message is being sent to (not used here)
         * @return the same {@code message} instance
         */
        @Override
        public Message<?> doPreSend(Message<?> message, MessageChannel messageChannel) {
            MessageHeaders headers = message.getHeaders();
            // The backing map is read through reflection so that it can be written to directly.
            @SuppressWarnings("unchecked")
            Map<String, Object> backingMap = (Map<String, Object>) ReflectionUtils.getField(MessageConverterConfigurer.this.headersField, headers);

            if (!headers.containsKey("contentType")) {
                // No content type on the message: fall back to the configured one.
                backingMap.put("contentType", this.mimeType);
                return message;
            }

            Object declared = headers.get("contentType");
            if (declared instanceof String) {
                // Replace the textual form with its parsed MimeType.
                backingMap.put("contentType", MimeType.valueOf((String) declared));
            }
            return message;
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:BOOT-INF/lib/spring-cloud-stream-3.1.4.jar:org/springframework/cloud/stream/binding/MessageConverterConfigurer$OutboundContentTypeConvertingInterceptor.class */
    /**
     * Outbound interceptor that converts the payload with the supplied
     * {@link MessageConverter} and keeps the {@code contentType} header consistent.
     *
     * <p>Messages whose payload is already a {@code byte[]} and that carry a
     * {@code contentType} header are passed through untouched.
     */
    public final class OutboundContentTypeConvertingInterceptor extends AbstractContentTypeInterceptor {
        private final MessageConverter messageConverter;

        /**
         * @param str the content type to apply when a message carries none
         * @param compositeMessageConverter the converter used to produce the outbound message
         */
        private OutboundContentTypeConvertingInterceptor(String str, CompositeMessageConverter compositeMessageConverter) {
            super(str);
            this.messageConverter = compositeMessageConverter;
        }

        /**
         * Converts {@code message} for sending.
         *
         * @param message the outbound message
         * @param messageChannel the channel the message is being sent to (not used here)
         * @return {@code message} itself when its payload is a {@code byte[]}, otherwise the converter's result
         * @throws IllegalStateException if the converter returns {@code null}
         */
        @Override
        public Message<?> doPreSend(Message<?> message, MessageChannel messageChannel) {
            boolean hasContentType = message.getHeaders().containsKey("contentType");
            boolean binaryPayload = message.getPayload() instanceof byte[];
            if (binaryPayload && hasContentType) {
                return message;
            }

            // Both values are Strings. Typing the first one as Object makes the conditional below
            // ill-typed for a String target and breaks the String comparison further down.
            String originalContentType = hasContentType ? message.getHeaders().get("contentType").toString() : null;
            String resolvedContentType = message.getPayload() instanceof String
                    ? JavaClassMimeTypeUtils.mimeTypeFromObject(message.getPayload(), ObjectUtils.nullSafeToString(originalContentType)).toString()
                    : originalContentType;

            if (!hasContentType) {
                // No content type on the message: write the configured one straight into the header map
                // so that the converter below sees it.
                headerMapOf(message).put("contentType", this.mimeType);
            }

            Message<?> outbound = binaryPayload ? message : this.messageConverter.toMessage(message.getPayload(), message.getHeaders());
            if (outbound == null) {
                throw new IllegalStateException("Failed to convert message: '" + message + "' to outbound message.");
            }

            // Only override the header when the message originally declared a content type and the
            // one resolved for the payload differs from it.
            if (resolvedContentType != null && !resolvedContentType.equals(originalContentType) && originalContentType != null) {
                headerMapOf(outbound).put("contentType", MimeType.valueOf(resolvedContentType));
            }
            return outbound;
        }

        /** Reads the header map of {@code message} through the enclosing configurer's reflective field handle. */
        @SuppressWarnings("unchecked")
        private Map<String, Object> headerMapOf(Message<?> message) {
            return (Map<String, Object>) ReflectionUtils.getField(MessageConverterConfigurer.this.headersField, message.getHeaders());
        }
    }

    /* loaded from: input_file:BOOT-INF/lib/spring-cloud-stream-3.1.4.jar:org/springframework/cloud/stream/binding/MessageConverterConfigurer$PartitioningInterceptor.class */
    /**
     * Interceptor that stamps every message with a {@link BinderHeaders#PARTITION_HEADER}.
     *
     * <p>The value is taken from {@link BinderHeaders#PARTITION_OVERRIDE} when that
     * header is present (the override header is then removed); otherwise it is
     * computed by a {@link PartitionHandler}.
     */
    public final class PartitioningInterceptor implements ChannelInterceptor {
        private final BindingProperties bindingProperties;
        private final PartitionHandler partitionHandler;

        /**
         * @param bindingProperties the binding whose producer settings are passed to the {@link PartitionHandler}
         */
        PartitioningInterceptor(BindingProperties bindingProperties) {
            this.bindingProperties = bindingProperties;
            ConfigurableListableBeanFactory factory = MessageConverterConfigurer.this.beanFactory;
            this.partitionHandler = new PartitionHandler(
                    ExpressionUtils.createStandardEvaluationContext(factory),
                    this.bindingProperties.getProducer(),
                    factory);
        }

        /**
         * Forwards the partition count to the underlying {@link PartitionHandler}.
         *
         * @param i the partition count
         */
        public void setPartitionCount(int i) {
            this.partitionHandler.setPartitionCount(i);
        }

        /**
         * Builds a copy of {@code message} that carries the partition header.
         *
         * @param message the message about to be sent
         * @param messageChannel the channel the message is being sent to (not used here)
         * @return the message built by the configurer's message builder factory
         */
        @Override
        public Message<?> preSend(Message<?> message, MessageChannel messageChannel) {
            MessageBuilderFactory builders = MessageConverterConfigurer.this.messageBuilderFactory;
            boolean overridden = message.getHeaders().containsKey(BinderHeaders.PARTITION_OVERRIDE);
            if (!overridden) {
                return builders.fromMessage(message)
                        .setHeader(BinderHeaders.PARTITION_HEADER, Integer.valueOf(this.partitionHandler.determinePartition(message)))
                        .build();
            }
            // An explicit override wins; it is moved into the partition header.
            return builders.fromMessage(message)
                    .setHeader(BinderHeaders.PARTITION_HEADER, message.getHeaders().get(BinderHeaders.PARTITION_OVERRIDE))
                    .removeHeader(BinderHeaders.PARTITION_OVERRIDE)
                    .build();
        }
    }

    public MessageConverterConfigurer(BindingServiceProperties bindingServiceProperties, CompositeMessageConverter compositeMessageConverter, StreamFunctionProperties streamFunctionProperties) {
        this.messageBuilderFactory = new MutableMessageBuilderFactory();
        Assert.notNull(compositeMessageConverter, "The message converter factory cannot be null");
        this.bindingServiceProperties = bindingServiceProperties;
        this.compositeMessageConverter = compositeMessageConverter;
        this.headersField = ReflectionUtils.findField(MessageHeaders.class, ExchangeTypes.HEADERS);
        this.headersField.setAccessible(true);
        this.streamFunctionProperties = streamFunctionProperties;
    }

    /**
     * Creates a configurer without function properties; equivalent to calling the
     * three-argument constructor with {@code null} as the last argument.
     *
     * @param bindingServiceProperties source of the per-binding properties
     * @param compositeMessageConverter converter handed to the outbound interceptor
     */
    public MessageConverterConfigurer(
            BindingServiceProperties bindingServiceProperties,
            CompositeMessageConverter compositeMessageConverter) {
        this(bindingServiceProperties, compositeMessageConverter, null);
    }

    /**
     * Stores the owning bean factory.
     *
     * @param beanFactory the bean factory; it is cast to {@link ConfigurableListableBeanFactory}
     * @throws BeansException declared by {@link BeanFactoryAware}
     */
    @Override
    public void setBeanFactory(BeanFactory beanFactory) throws BeansException {
        ConfigurableListableBeanFactory listableFactory = (ConfigurableListableBeanFactory) beanFactory;
        this.beanFactory = listableFactory;
    }

    /**
     * Configures {@code messageChannel} as an inbound channel.
     *
     * @param messageChannel the channel to configure
     * @param str the binding name used to look up the binding properties
     */
    @Override
    public void configureInputChannel(MessageChannel messageChannel, String str) {
        final boolean inbound = true;
        configureMessageChannel(messageChannel, str, inbound);
    }

    /**
     * Configures {@code messageChannel} as an outbound channel.
     *
     * @param messageChannel the channel to configure
     * @param str the binding name used to look up the binding properties
     */
    @Override
    public void configureOutputChannel(MessageChannel messageChannel, String str) {
        final boolean inbound = false;
        configureMessageChannel(messageChannel, str, inbound);
    }

    /**
     * Adds an {@link InboundContentTypeEnhancingInterceptor} to a polled source.
     *
     * <p>Nothing is added when the binding's consumer uses native decoding, or when
     * the source is not a {@link DefaultPollableMessageSource}.
     *
     * @param pollableMessageSource the source to configure
     * @param str the binding name used to look up the binding properties
     */
    @Override
    public void configurePolledMessageSource(PollableMessageSource pollableMessageSource, String str) {
        BindingProperties binding = this.bindingServiceProperties.getBindingProperties(str);
        String contentType = binding.getContentType();
        ConsumerProperties consumer = binding.getConsumer();

        boolean nativeDecoding = consumer != null && consumer.isUseNativeDecoding();
        if (nativeDecoding || !(pollableMessageSource instanceof DefaultPollableMessageSource)) {
            return;
        }
        DefaultPollableMessageSource defaultSource = (DefaultPollableMessageSource) pollableMessageSource;
        defaultSource.addInterceptor(new InboundContentTypeEnhancingInterceptor(contentType));
    }

    /**
     * Installs the partitioning and content-type interceptors on a channel.
     *
     * @param messageChannel the channel to configure; must be an {@link AbstractMessageChannel}
     * @param channelName the binding name used to look up the binding properties
     * @param inbound {@code true} for an input channel, {@code false} for an output channel
     */
    private void configureMessageChannel(MessageChannel messageChannel, String channelName, boolean inbound) {
        Assert.isAssignable(AbstractMessageChannel.class, messageChannel.getClass());
        AbstractMessageChannel channel = (AbstractMessageChannel) messageChannel;
        BindingProperties binding = this.bindingServiceProperties.getBindingProperties(channelName);
        String contentType = binding.getContentType();
        ProducerProperties producer = binding.getProducer();

        boolean partitioned = !inbound && producer != null && producer.isPartitioned();
        boolean functional = false;
        if (this.streamFunctionProperties != null) {
            functional = StringUtils.hasText(this.streamFunctionProperties.getDefinition())
                    || StringUtils.hasText(this.bindingServiceProperties.getSource());
        }

        // "partitioned" already implies an outbound channel, so only the functional flag is left to check.
        if (partitioned && !functional) {
            channel.addInterceptor(new PartitioningInterceptor(binding));
        }

        Environment environment = null;
        if (this.beanFactory != null) {
            environment = this.beanFactory.getBean(Environment.class);
        }

        if (!isNativeEncodingNotSet(producer, binding.getConsumer(), inbound)) {
            // Native encoding/decoding is enabled for this direction: no content-type interceptor.
            return;
        }
        if (inbound) {
            channel.addInterceptor(new InboundContentTypeEnhancingInterceptor(contentType));
            return;
        }

        String routingKeyProperty = "spring.cloud.stream.rabbit.bindings." + channelName + ".producer.routing-key-expression";
        if (environment != null && environment.containsProperty(routingKeyProperty)) {
            // With a routing-key expression configured, the function is told to skip output conversion
            // and the channel is treated as non-functional, so the outbound interceptor is installed.
            setSkipOutputConversionIfNecessary();
            functional = false;
        }
        if (!functional) {
            channel.addInterceptor(new OutboundContentTypeConvertingInterceptor(contentType, this.compositeMessageConverter));
        }
    }

    /**
     * Marks the configured function, if one can be resolved, to skip output conversion.
     *
     * <p>This is a best-effort operation: it does nothing when there is no bean
     * factory, no function properties (for example when the two-argument
     * constructor was used), no {@link FunctionCatalog} bean, or when the catalog
     * lookup returns {@code null}.
     */
    private void setSkipOutputConversionIfNecessary() {
        if (this.beanFactory == null || this.streamFunctionProperties == null) {
            return;
        }
        // getBean(Class) throws when the bean is missing and never returns null, so the provider is
        // used to make the "catalog not available" case an actual no-op.
        FunctionCatalog functionCatalog = this.beanFactory.getBeanProvider(FunctionCatalog.class).getIfAvailable();
        if (functionCatalog == null) {
            return;
        }
        SimpleFunctionRegistry.FunctionInvocationWrapper function =
                (SimpleFunctionRegistry.FunctionInvocationWrapper) functionCatalog.lookup(this.streamFunctionProperties.getDefinition());
        if (function != null) {
            function.setSkipOutputConversion(true);
        }
    }

    /**
     * Tells whether native decoding (inbound) or native encoding (outbound) is not in effect.
     *
     * @param producerProperties producer settings, consulted for outbound channels; may be {@code null}
     * @param consumerProperties consumer settings, consulted for inbound channels; may be {@code null}
     * @param inbound {@code true} to check the consumer side, {@code false} to check the producer side
     * @return {@code true} when the relevant properties are {@code null} or do not enable the native mode
     */
    private boolean isNativeEncodingNotSet(ProducerProperties producerProperties, ConsumerProperties consumerProperties, boolean inbound) {
        if (inbound) {
            return consumerProperties == null || !consumerProperties.isUseNativeDecoding();
        }
        return producerProperties == null || !producerProperties.isUseNativeEncoding();
    }
}
