package org.springframework.integration.endpoint;

import java.util.ArrayDeque;
import java.util.Deque;
import java.util.Iterator;
import java.util.List;
import org.reactivestreams.Subscriber;
import org.springframework.context.Lifecycle;
import org.springframework.integration.channel.ExecutorChannelInterceptorAware;
import org.springframework.integration.channel.NullChannel;
import org.springframework.integration.channel.ReactiveStreamsSubscribableChannel;
import org.springframework.integration.core.MessageProducer;
import org.springframework.integration.router.MessageRouter;
import org.springframework.integration.support.utils.IntegrationUtils;
import org.springframework.integration.transaction.IntegrationResourceHolder;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.MessageDeliveryException;
import org.springframework.messaging.MessageHandler;
import org.springframework.messaging.PollableChannel;
import org.springframework.messaging.support.ChannelInterceptor;
import org.springframework.messaging.support.ExecutorChannelInterceptor;
import org.springframework.util.Assert;
import org.springframework.util.CollectionUtils;

/* loaded from: input_file:BOOT-INF/lib/spring-integration-core-5.5.18.jar:org/springframework/integration/endpoint/PollingConsumer.class */
public class PollingConsumer extends AbstractPollingEndpoint implements IntegrationConsumer {
    public static final long DEFAULT_RECEIVE_TIMEOUT = 1000;
    private final MessageHandler handler;
    private final List<ChannelInterceptor> channelInterceptors;
    private PollableChannel inputChannel;
    private long receiveTimeout = 1000;

    public PollingConsumer(PollableChannel pollableChannel, MessageHandler messageHandler) {
        Assert.notNull(pollableChannel, "inputChannel must not be null");
        Assert.notNull(messageHandler, "handler must not be null");
        if (pollableChannel instanceof NullChannel) {
            this.logger.warn("The polling from the NullChannel does not have any effects: it doesn't forward messages sent to it. A NullChannel is the end of the flow.");
        }
        this.inputChannel = pollableChannel;
        this.handler = messageHandler;
        if (this.inputChannel instanceof ExecutorChannelInterceptorAware) {
            this.channelInterceptors = ((ExecutorChannelInterceptorAware) this.inputChannel).getInterceptors();
        } else {
            this.channelInterceptors = null;
        }
    }

    public void setReceiveTimeout(long j) {
        this.receiveTimeout = j;
    }

    @Override // org.springframework.integration.endpoint.IntegrationConsumer
    public MessageChannel getInputChannel() {
        return this.inputChannel;
    }

    @Override // org.springframework.integration.endpoint.IntegrationConsumer
    public MessageChannel getOutputChannel() {
        if (this.handler instanceof MessageProducer) {
            return ((MessageProducer) this.handler).getOutputChannel();
        }
        if (this.handler instanceof MessageRouter) {
            return ((MessageRouter) this.handler).getDefaultOutputChannel();
        }
        return null;
    }

    @Override // org.springframework.integration.endpoint.IntegrationConsumer
    public MessageHandler getHandler() {
        return this.handler;
    }

    @Override // org.springframework.integration.endpoint.AbstractPollingEndpoint
    protected Object getReceiveMessageSource() {
        return this.inputChannel;
    }

    @Override // org.springframework.integration.endpoint.AbstractPollingEndpoint
    protected void setReceiveMessageSource(Object obj) {
        this.inputChannel = (PollableChannel) obj;
    }

    @Override // org.springframework.integration.endpoint.AbstractPollingEndpoint
    protected boolean isReactive() {
        return (getOutputChannel() instanceof ReactiveStreamsSubscribableChannel) && (this.handler instanceof Subscriber);
    }

    /* JADX INFO: Access modifiers changed from: protected */
    @Override // org.springframework.integration.endpoint.AbstractPollingEndpoint, org.springframework.integration.endpoint.AbstractEndpoint
    public void doStart() {
        if (this.handler instanceof Lifecycle) {
            ((Lifecycle) this.handler).start();
        }
        super.doStart();
    }

    /* JADX INFO: Access modifiers changed from: protected */
    @Override // org.springframework.integration.endpoint.AbstractPollingEndpoint, org.springframework.integration.endpoint.AbstractEndpoint
    public void doStop() {
        if (this.handler instanceof Lifecycle) {
            ((Lifecycle) this.handler).stop();
        }
        super.doStop();
    }

    @Override // org.springframework.integration.endpoint.AbstractPollingEndpoint
    protected void handleMessage(Message<?> message) {
        Message<?> message2 = message;
        ArrayDeque arrayDeque = null;
        try {
            if (this.channelInterceptors != null && ((ExecutorChannelInterceptorAware) this.inputChannel).hasExecutorInterceptors()) {
                arrayDeque = new ArrayDeque();
                message2 = applyBeforeHandle(message2, arrayDeque);
                if (message2 == null) {
                    return;
                }
            }
            this.handler.handleMessage(message2);
            if (!CollectionUtils.isEmpty(arrayDeque)) {
                triggerAfterMessageHandled(message2, null, arrayDeque);
            }
        } catch (Error e) {
            if (!CollectionUtils.isEmpty(arrayDeque)) {
                triggerAfterMessageHandled(message2, new MessageDeliveryException(message2, "Failed to handle message to " + this + " in " + this.handler, e), arrayDeque);
            }
            throw e;
        } catch (Exception e2) {
            if (!CollectionUtils.isEmpty(arrayDeque)) {
                triggerAfterMessageHandled(message2, e2, arrayDeque);
            }
            throw IntegrationUtils.wrapInDeliveryExceptionIfNecessary(message2, () -> {
                return "Failed to handle message to " + this + " in " + this.handler;
            }, e2);
        }
    }

    private Message<?> applyBeforeHandle(Message<?> message, Deque<ExecutorChannelInterceptor> deque) {
        Message<?> message2 = message;
        for (ChannelInterceptor channelInterceptor : this.channelInterceptors) {
            if (channelInterceptor instanceof ExecutorChannelInterceptor) {
                ExecutorChannelInterceptor executorChannelInterceptor = (ExecutorChannelInterceptor) channelInterceptor;
                message2 = executorChannelInterceptor.beforeHandle(message2, this.inputChannel, this.handler);
                if (message2 == null) {
                    this.logger.debug(() -> {
                        return executorChannelInterceptor.getClass().getSimpleName() + " returned null from beforeHandle, i.e. precluding the send.";
                    });
                    triggerAfterMessageHandled(null, null, deque);
                    return null;
                }
                deque.add(executorChannelInterceptor);
            }
        }
        return message2;
    }

    private void triggerAfterMessageHandled(Message<?> message, Exception exc, Deque<ExecutorChannelInterceptor> deque) {
        Iterator<ExecutorChannelInterceptor> descendingIterator = deque.descendingIterator();
        while (descendingIterator.hasNext()) {
            ExecutorChannelInterceptor next = descendingIterator.next();
            try {
                next.afterMessageHandled(message, this.inputChannel, this.handler, exc);
            } catch (Throwable th) {
                this.logger.error(th, () -> {
                    return "Exception from afterMessageHandled in " + next;
                });
            }
        }
    }

    @Override // org.springframework.integration.endpoint.AbstractPollingEndpoint
    protected Message<?> receiveMessage() {
        return this.receiveTimeout >= 0 ? this.inputChannel.receive(this.receiveTimeout) : this.inputChannel.receive();
    }

    @Override // org.springframework.integration.endpoint.AbstractPollingEndpoint
    protected Object getResourceToBind() {
        return this.inputChannel;
    }

    @Override // org.springframework.integration.endpoint.AbstractPollingEndpoint
    protected String getResourceKey() {
        return IntegrationResourceHolder.INPUT_CHANNEL;
    }
}
