package org.springframework.integration.handler;

import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;
import org.reactivestreams.Publisher;
import org.springframework.integration.IntegrationMessageHeaderAccessor;
import org.springframework.integration.channel.ReactiveStreamsSubscribableChannel;
import org.springframework.integration.core.MessageProducer;
import org.springframework.integration.core.MessagingTemplate;
import org.springframework.integration.routingslip.RoutingSlipRouteStrategy;
import org.springframework.integration.support.AbstractIntegrationMessageBuilder;
import org.springframework.lang.Nullable;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.MessageHandlingException;
import org.springframework.messaging.MessageHeaders;
import org.springframework.messaging.MessagingException;
import org.springframework.messaging.core.DestinationResolutionException;
import org.springframework.messaging.support.ErrorMessage;
import org.springframework.util.Assert;
import org.springframework.util.ObjectUtils;
import org.springframework.util.StringUtils;
import org.springframework.util.concurrent.ListenableFuture;
import org.springframework.util.concurrent.ListenableFutureCallback;
import org.springframework.util.concurrent.SettableListenableFuture;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

/* loaded from: input_file:BOOT-INF/lib/spring-integration-core-5.1.6.RELEASE.jar:org/springframework/integration/handler/AbstractMessageProducingHandler.class */
public abstract class AbstractMessageProducingHandler extends AbstractMessageHandler implements MessageProducer, HeaderPropagationAware {
    protected final MessagingTemplate messagingTemplate = new MessagingTemplate();
    private boolean async;
    private String outputChannelName;
    private MessageChannel outputChannel;
    private String[] notPropagatedHeaders;
    private boolean selectiveHeaderPropagation;
    private boolean noHeadersPropagation;

    public void setSendTimeout(long j) {
        this.messagingTemplate.setSendTimeout(j);
    }

    @Override // org.springframework.integration.core.MessageProducer
    public void setOutputChannel(MessageChannel messageChannel) {
        this.outputChannel = messageChannel;
    }

    @Override // org.springframework.integration.core.MessageProducer
    public void setOutputChannelName(String str) {
        Assert.hasText(str, "'outputChannelName' must not be empty");
        this.outputChannelName = str;
    }

    public final void setAsync(boolean z) {
        this.async = z;
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public boolean isAsync() {
        return this.async;
    }

    @Override // org.springframework.integration.handler.HeaderPropagationAware
    public void setNotPropagatedHeaders(String... strArr) {
        updateNotPropagatedHeaders(strArr, false);
    }

    protected final void updateNotPropagatedHeaders(String[] strArr, boolean z) {
        HashSet hashSet = new HashSet();
        if (z && this.notPropagatedHeaders != null) {
            hashSet.addAll(Arrays.asList(this.notPropagatedHeaders));
        }
        if (!ObjectUtils.isEmpty((Object[]) strArr)) {
            Assert.noNullElements(strArr, "null elements are not allowed in 'headers'");
            hashSet.addAll(Arrays.asList(strArr));
            this.notPropagatedHeaders = (String[]) hashSet.toArray(new String[0]);
        }
        if (hashSet.contains("*")) {
            this.notPropagatedHeaders = new String[]{"*"};
            this.noHeadersPropagation = true;
        }
        this.selectiveHeaderPropagation = !ObjectUtils.isEmpty((Object[]) this.notPropagatedHeaders);
    }

    @Override // org.springframework.integration.handler.HeaderPropagationAware
    public Collection<String> getNotPropagatedHeaders() {
        return this.notPropagatedHeaders != null ? Collections.unmodifiableSet(new HashSet(Arrays.asList(this.notPropagatedHeaders))) : Collections.emptyList();
    }

    @Override // org.springframework.integration.handler.HeaderPropagationAware
    public void addNotPropagatedHeaders(String... strArr) {
        updateNotPropagatedHeaders(strArr, true);
    }

    /* JADX INFO: Access modifiers changed from: protected */
    @Override // org.springframework.integration.handler.AbstractMessageHandler, org.springframework.integration.context.IntegrationObjectSupport
    public void onInit() {
        super.onInit();
        Assert.state(this.outputChannelName == null || this.outputChannel == null, "'outputChannelName' and 'outputChannel' are mutually exclusive.");
        if (getBeanFactory() != null) {
            this.messagingTemplate.setBeanFactory(getBeanFactory());
        }
        this.messagingTemplate.setDestinationResolver(getChannelResolver());
    }

    @Override // org.springframework.integration.core.MessageProducer
    public MessageChannel getOutputChannel() {
        if (this.outputChannelName != null) {
            this.outputChannel = getChannelResolver().resolveDestination(this.outputChannelName);
            this.outputChannelName = null;
        }
        return this.outputChannel;
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public void sendOutputs(Object obj, Message<?> message) {
        if ((obj instanceof Iterable) && shouldSplitOutput((Iterable) obj)) {
            Iterator it = ((Iterable) obj).iterator();
            while (it.hasNext()) {
                produceOutput(it.next(), message);
            }
        } else if (obj != null) {
            produceOutput(obj, message);
        }
    }

    protected boolean shouldSplitOutput(Iterable<?> iterable) {
        for (Object obj : iterable) {
            if ((obj instanceof Message) || (obj instanceof AbstractIntegrationMessageBuilder)) {
                return true;
            }
        }
        return false;
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public void produceOutput(Object obj, Message<?> message) {
        MessageHeaders headers = message.getHeaders();
        Object obj2 = obj;
        Object obj3 = null;
        if (getOutputChannel() == null) {
            Map map = (Map) headers.get(IntegrationMessageHeaderAccessor.ROUTING_SLIP, Map.class);
            if (map != null) {
                Assert.isTrue(map.size() == 1, "The RoutingSlip header value must be a SingletonMap");
                Object next = map.keySet().iterator().next();
                Object next2 = map.values().iterator().next();
                Assert.isInstanceOf((Class<?>) List.class, next, "The RoutingSlip key must be List");
                Assert.isInstanceOf((Class<?>) Integer.class, next2, "The RoutingSlip value must be Integer");
                List<?> list = (List) next;
                AtomicInteger atomicInteger = new AtomicInteger(((Integer) next2).intValue());
                obj3 = getOutputChannelFromRoutingSlip(obj2, message, list, atomicInteger);
                if (obj3 != null) {
                    obj2 = addRoutingSlipHeader(obj2, list, atomicInteger);
                }
            }
            if (obj3 == null) {
                obj3 = headers.getReplyChannel();
                if (obj3 == null && (obj2 instanceof Message)) {
                    obj3 = ((Message) obj2).getHeaders().getReplyChannel();
                }
            }
        }
        doProduceOutput(message, headers, obj2, obj3);
    }

    private void doProduceOutput(Message<?> message, MessageHeaders messageHeaders, Object obj, Object obj2) {
        if (!this.async || (!(obj instanceof ListenableFuture) && !(obj instanceof Publisher))) {
            sendOutput(createOutputMessage(obj, messageHeaders), obj2, false);
        } else if ((obj instanceof ListenableFuture) || !(getOutputChannel() instanceof ReactiveStreamsSubscribableChannel)) {
            asyncNonReactiveReply(message, messageHeaders, obj, obj2);
        } else {
            ((ReactiveStreamsSubscribableChannel) getOutputChannel()).subscribeTo(Flux.from((Publisher) obj).map(obj3 -> {
                return createOutputMessage(obj3, messageHeaders);
            }));
        }
    }

    private AbstractIntegrationMessageBuilder<?> addRoutingSlipHeader(Object obj, List<?> list, AtomicInteger atomicInteger) {
        AbstractIntegrationMessageBuilder<?> fromMessage = obj instanceof Message ? getMessageBuilderFactory().fromMessage((Message) obj) : obj instanceof AbstractIntegrationMessageBuilder ? (AbstractIntegrationMessageBuilder) obj : getMessageBuilderFactory().withPayload(obj);
        fromMessage.setHeader(IntegrationMessageHeaderAccessor.ROUTING_SLIP, Collections.singletonMap(list, Integer.valueOf(atomicInteger.get())));
        return fromMessage;
    }

    /* JADX WARN: Multi-variable type inference failed */
    /* JADX WARN: Type inference failed for: r0v11, types: [org.springframework.util.concurrent.ListenableFuture] */
    private void asyncNonReactiveReply(final Message<?> message, final MessageHeaders messageHeaders, Object obj, final Object obj2) {
        SettableListenableFuture settableListenableFuture;
        if (obj instanceof ListenableFuture) {
            settableListenableFuture = (ListenableFuture) obj;
        } else {
            SettableListenableFuture settableListenableFuture2 = new SettableListenableFuture();
            Mono from = Mono.from((Publisher) obj);
            settableListenableFuture2.getClass();
            Consumer consumer = settableListenableFuture2::set;
            settableListenableFuture2.getClass();
            from.subscribe(consumer, settableListenableFuture2::setException);
            settableListenableFuture = settableListenableFuture2;
        }
        settableListenableFuture.addCallback(new ListenableFutureCallback<Object>() { // from class: org.springframework.integration.handler.AbstractMessageProducingHandler.1
            @Override // org.springframework.util.concurrent.SuccessCallback
            public void onSuccess(Object obj3) {
                Message<?> message2 = null;
                try {
                    message2 = AbstractMessageProducingHandler.this.createOutputMessage(obj3, messageHeaders);
                    AbstractMessageProducingHandler.this.sendOutput(message2, obj2, false);
                } catch (Exception e) {
                    Exception exc = e;
                    if (!(e instanceof MessagingException)) {
                        exc = new MessageHandlingException((Message<?>) message, e);
                        if (message2 != null) {
                            exc = new MessagingException(message2, exc);
                        }
                    }
                    AbstractMessageProducingHandler.this.logger.error("Failed to send async reply: " + obj3.toString(), exc);
                    onFailure(exc);
                }
            }

            @Override // org.springframework.util.concurrent.FailureCallback
            public void onFailure(Throwable th) {
                AbstractMessageProducingHandler.this.sendErrorMessage(message, th);
            }
        });
    }

    private Object getOutputChannelFromRoutingSlip(Object obj, Message<?> message, List<?> list, AtomicInteger atomicInteger) {
        Object obj2;
        if (atomicInteger.get() >= list.size()) {
            return null;
        }
        Object obj3 = list.get(atomicInteger.get());
        if (obj3 instanceof String) {
            obj2 = getBeanFactory().getBean((String) obj3);
        } else {
            if (!(obj3 instanceof RoutingSlipRouteStrategy)) {
                throw new IllegalArgumentException("The RoutingSlip 'path' can be of String or RoutingSlipRouteStrategy type, but got: " + obj3.getClass());
            }
            obj2 = obj3;
        }
        if (obj2 instanceof MessageChannel) {
            atomicInteger.incrementAndGet();
            return obj2;
        }
        Object nextPath = ((RoutingSlipRouteStrategy) obj2).getNextPath(message, obj);
        if (nextPath != null && (!(nextPath instanceof String) || StringUtils.hasText((String) nextPath))) {
            return nextPath;
        }
        atomicInteger.incrementAndGet();
        return getOutputChannelFromRoutingSlip(obj, message, list, atomicInteger);
    }

    protected Message<?> createOutputMessage(Object obj, MessageHeaders messageHeaders) {
        AbstractIntegrationMessageBuilder withPayload;
        if (!(obj instanceof Message)) {
            withPayload = obj instanceof AbstractIntegrationMessageBuilder ? (AbstractIntegrationMessageBuilder) obj : getMessageBuilderFactory().withPayload(obj);
        } else {
            if (this.noHeadersPropagation || !shouldCopyRequestHeaders()) {
                return (Message) obj;
            }
            withPayload = getMessageBuilderFactory().fromMessage((Message) obj);
        }
        if (!this.noHeadersPropagation && shouldCopyRequestHeaders()) {
            withPayload.filterAndCopyHeadersIfAbsent(messageHeaders, this.selectiveHeaderPropagation ? this.notPropagatedHeaders : null);
        }
        return withPayload.build();
    }

    protected void sendOutput(Object obj, @Nullable Object obj2, boolean z) {
        Object obj3 = obj2;
        MessageChannel outputChannel = getOutputChannel();
        if (!z && outputChannel != null) {
            obj3 = outputChannel;
        }
        if (obj3 == null) {
            throw new DestinationResolutionException("no output-channel or replyChannel header available");
        }
        if (obj3 instanceof MessageChannel) {
            if (obj instanceof Message) {
                this.messagingTemplate.send((MessagingTemplate) obj3, (Message<?>) obj);
                return;
            } else {
                this.messagingTemplate.convertAndSend((MessagingTemplate) obj3, obj);
                return;
            }
        }
        if (!(obj3 instanceof String)) {
            throw new MessagingException("replyChannel must be a MessageChannel or String");
        }
        if (obj instanceof Message) {
            this.messagingTemplate.send((String) obj3, (Message<?>) obj);
        } else {
            this.messagingTemplate.convertAndSend((String) obj3, (String) obj);
        }
    }

    protected boolean shouldCopyRequestHeaders() {
        return true;
    }

    protected void sendErrorMessage(Message<?> message, Throwable th) {
        Object resolveErrorChannel = resolveErrorChannel(message.getHeaders());
        Throwable th2 = th;
        if (!(th instanceof MessagingException)) {
            th2 = new MessageHandlingException(message, th);
        }
        if (resolveErrorChannel == null) {
            this.logger.error("Async exception received and no 'errorChannel' header exists and no default 'errorChannel' found", th2);
            return;
        }
        try {
            sendOutput(new ErrorMessage(th2), resolveErrorChannel, true);
        } catch (Exception e) {
            Exception exc = e;
            if (!(e instanceof MessagingException)) {
                exc = new MessageHandlingException(message, e);
            }
            this.logger.error("Failed to send async reply", exc);
        }
    }

    protected Object resolveErrorChannel(MessageHeaders messageHeaders) {
        Object errorChannel = messageHeaders.getErrorChannel();
        if (errorChannel == null) {
            try {
                errorChannel = getChannelResolver().resolveDestination("errorChannel");
            } catch (DestinationResolutionException e) {
            }
        }
        return errorChannel;
    }
}
