package com.solace.spring.cloud.stream.binder.inbound;

import com.solace.spring.cloud.stream.binder.inbound.acknowledge.JCSMPAcknowledgementCallbackFactory;
import com.solace.spring.cloud.stream.binder.inbound.acknowledge.SolaceAckUtil;
import com.solace.spring.cloud.stream.binder.meter.SolaceMeterAccessor;
import com.solace.spring.cloud.stream.binder.properties.SolaceConsumerProperties;
import com.solace.spring.cloud.stream.binder.util.FlowReceiverContainer;
import com.solace.spring.cloud.stream.binder.util.MessageContainer;
import com.solace.spring.cloud.stream.binder.util.SolaceAcknowledgmentException;
import com.solace.spring.cloud.stream.binder.util.SolaceMessageHeaderErrorMessageStrategy;
import com.solace.spring.cloud.stream.binder.util.UnboundFlowReceiverContainerException;
import com.solace.spring.cloud.stream.binder.util.XMLMessageMapper;
import com.solacesystems.jcsmp.BytesXMLMessage;
import com.solacesystems.jcsmp.ClosedFacilityException;
import com.solacesystems.jcsmp.JCSMPException;
import com.solacesystems.jcsmp.JCSMPTransportException;
import com.solacesystems.jcsmp.StaleSessionException;
import com.solacesystems.jcsmp.XMLMessage;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import lombok.Generated;
import org.apache.commons.lang3.exception.ExceptionUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.cloud.stream.binder.ExtendedConsumerProperties;
import org.springframework.cloud.stream.binder.RequeueCurrentMessageException;
import org.springframework.cloud.stream.provisioning.ConsumerDestination;
import org.springframework.core.AttributeAccessor;
import org.springframework.integration.StaticMessageHeaderAccessor;
import org.springframework.integration.acks.AckUtils;
import org.springframework.integration.acks.AcknowledgmentCallback;
import org.springframework.integration.support.ErrorMessageUtils;
import org.springframework.lang.Nullable;
import org.springframework.messaging.Message;

/* loaded from: input_file:com/solace/spring/cloud/stream/binder/inbound/InboundXMLMessageListener.class */
/**
 * Polling loop that pulls messages from a {@link FlowReceiverContainer} and hands them, one at a
 * time or in batches, to a downstream {@link Consumer}.
 *
 * <p>The loop in {@link #run()} keeps calling the flow receiver until either the local stop flag
 * (see {@link #getStopFlag()}) or the optional remote stop flag supplied at construction time
 * becomes {@code true}. When the loop ends for any reason the flow receiver is unbound exactly
 * once.
 *
 * <p>Subclasses decide how a message is actually delivered by implementing
 * {@link #handleMessage(Supplier, Consumer, AcknowledgmentCallback, boolean)}.
 */
abstract class InboundXMLMessageListener implements Runnable {

    @Generated
    private static final Logger log = LoggerFactory.getLogger(InboundXMLMessageListener.class);
    final FlowReceiverContainer flowReceiverContainer;
    final ConsumerDestination consumerDestination;
    private final ExtendedConsumerProperties<SolaceConsumerProperties> consumerProperties;
    final ThreadLocal<AttributeAccessor> attributesHolder;

    /** {@code null} when messages are delivered individually rather than in batches. */
    @Nullable
    private final BatchCollector batchCollector;
    private final XMLMessageMapper xmlMessageMapper;
    private final Consumer<Message<?>> messageConsumer;
    private final JCSMPAcknowledgementCallbackFactory ackCallbackFactory;

    @Nullable
    private final SolaceMeterAccessor solaceMeterAccessor;

    /** When set, a fresh {@link AttributeAccessor} is installed in {@link #attributesHolder} for each message. */
    private final boolean needHolder;

    /** When set, the accessor currently in {@link #attributesHolder} is populated with message details. */
    private final boolean needAttributes;
    private final AtomicBoolean stopFlag = new AtomicBoolean(false);
    private final Supplier<Boolean> remoteStopFlag;

    /**
     * Creates a listener bound to the given flow receiver.
     *
     * @param flowReceiverContainer source of inbound messages; also provides the {@link XMLMessageMapper}
     * @param consumerDestination destination being consumed (its name is used in log output)
     * @param extendedConsumerProperties binding and Solace-specific consumer settings
     * @param batchCollector collector used for batched delivery, or {@code null} for single-message delivery
     * @param consumer downstream receiver of the mapped Spring messages
     * @param jCSMPAcknowledgementCallbackFactory factory for per-message and per-batch acknowledgment callbacks
     * @param solaceMeterAccessor optional metrics recorder; may be {@code null}
     * @param atomicBoolean optional externally owned stop flag; polling stops once it reads {@code true}
     * @param threadLocal holder for the per-thread error attributes
     * @param z whether a new attribute accessor must be installed in {@code threadLocal} per message
     * @param z2 whether the attribute accessor must be populated with the message details
     */
    public InboundXMLMessageListener(FlowReceiverContainer flowReceiverContainer, ConsumerDestination consumerDestination, ExtendedConsumerProperties<SolaceConsumerProperties> extendedConsumerProperties, @Nullable BatchCollector batchCollector, Consumer<Message<?>> consumer, JCSMPAcknowledgementCallbackFactory jCSMPAcknowledgementCallbackFactory, @Nullable SolaceMeterAccessor solaceMeterAccessor, @Nullable AtomicBoolean atomicBoolean, ThreadLocal<AttributeAccessor> threadLocal, boolean z, boolean z2) {
        this.flowReceiverContainer = flowReceiverContainer;
        this.consumerDestination = consumerDestination;
        this.consumerProperties = extendedConsumerProperties;
        this.batchCollector = batchCollector;
        this.messageConsumer = consumer;
        this.ackCallbackFactory = jCSMPAcknowledgementCallbackFactory;
        this.solaceMeterAccessor = solaceMeterAccessor;
        // A missing remote flag simply never requests a stop.
        this.remoteStopFlag = () -> atomicBoolean != null && atomicBoolean.get();
        this.attributesHolder = threadLocal;
        this.needHolder = z;
        this.needAttributes = z2;
        this.xmlMessageMapper = flowReceiverContainer.getXMLMessageMapper();
    }

    /**
     * Delivers one message (or one batch) to the consumer.
     *
     * @param supplier lazily builds the Spring message to deliver
     * @param consumer sends the built message downstream
     * @param acknowledgmentCallback callback used to settle the underlying Solace message(s)
     * @param z {@code true} when invoked for a batch, {@code false} for a single message
     * @throws SolaceAcknowledgmentException if settling the message(s) fails
     */
    abstract void handleMessage(Supplier<Message<?>> supplier, Consumer<Message<?>> consumer, AcknowledgmentCallback acknowledgmentCallback, boolean z) throws SolaceAcknowledgmentException;

    /**
     * Polls the flow receiver until a stop is requested, then unbinds it.
     *
     * <p>Runtime failures and unbound-container failures of a single poll are logged and the loop
     * continues. A {@link StaleSessionException} ends the loop quietly; any other throwable is
     * logged and rethrown. The flow receiver is unbound exactly once in every case.
     */
    @Override // java.lang.Runnable
    public void run() {
        try {
            if (this.batchCollector != null) {
                this.batchCollector.resetLastSentTimeIfEmpty();
            }
            while (keepPolling()) {
                try {
                    receive();
                } catch (UnboundFlowReceiverContainerException | RuntimeException e) {
                    log.warn(String.format("Exception received while consuming messages from destination %s", this.consumerDestination.getName()), e);
                }
            }
        } catch (StaleSessionException e) {
            log.error("Session has lost connection", e);
        } catch (Throwable t) {
            log.error(String.format("Received unexpected error while consuming from destination %s", this.consumerDestination.getName()), t);
            throw t;
        } finally {
            // Single cleanup point: a failing unbind() must not trigger a second unbind().
            log.info(String.format("Closing flow receiver to destination %s", this.consumerDestination.getName()));
            this.flowReceiverContainer.unbind();
        }
    }

    /** Returns {@code true} while neither the local nor the remote stop flag has been raised. */
    private boolean keepPolling() {
        return !this.stopFlag.get() && !this.remoteStopFlag.get();
    }

    /**
     * Performs one poll of the flow receiver and dispatches whatever was received.
     *
     * <p>When batching is enabled with a positive batch timeout, the poll is bounded by that
     * timeout; otherwise the receiver's default {@code receive()} is used. Thread-local error
     * attributes are cleared after dispatch when this listener manages them.
     *
     * @throws UnboundFlowReceiverContainerException propagated from the flow receiver
     * @throws StaleSessionException propagated so that {@link #run()} can terminate the loop
     */
    private void receive() throws UnboundFlowReceiverContainerException, StaleSessionException {
        try {
            final MessageContainer messageContainer;
            if (this.batchCollector != null && this.consumerProperties.getExtension().getBatchTimeout() > 0) {
                // Explicit boxing keeps the call bound to the same receive(Integer) overload as before.
                messageContainer = this.flowReceiverContainer.receive(Integer.valueOf(this.consumerProperties.getExtension().getBatchTimeout()));
            } else {
                messageContainer = this.flowReceiverContainer.receive();
            }

            if (this.solaceMeterAccessor != null && messageContainer != null) {
                this.solaceMeterAccessor.recordMessage(this.consumerProperties.getBindingName(), messageContainer.getMessage());
            }

            try {
                if (this.batchCollector != null) {
                    if (messageContainer != null) {
                        this.batchCollector.addToBatch(messageContainer);
                    }
                    // Always checked, even without a new message, so a pending batch can still be flushed.
                    processBatchIfAvailable();
                } else if (messageContainer != null) {
                    processMessage(messageContainer);
                }
            } finally {
                if (this.needHolder || this.needAttributes) {
                    this.attributesHolder.remove();
                }
            }
        } catch (StaleSessionException e) {
            // Must be caught before the broader JCSMPException so it reaches run() instead of being logged away.
            throw e;
        } catch (JCSMPException e) {
            final String msg = String.format("Received error while trying to read message from endpoint %s", this.flowReceiverContainer.getEndpointName());
            final boolean transportOrClosed = (e instanceof JCSMPTransportException) || (e instanceof ClosedFacilityException);
            if (transportOrClosed && !keepPolling()) {
                // Such errors while a stop is already requested are only logged at debug level.
                log.debug(msg, e);
            } else {
                log.warn(msg, e);
            }
        }
    }

    /**
     * Delivers a single received message and settles it on failure.
     *
     * <p>Acknowledgment failures raised by {@link #handleMessage} are rethrown untouched. For any
     * other exception the message is requeued when a {@link RequeueCurrentMessageException} is in
     * the cause chain; otherwise it is republished to the error queue, falling back to a requeue
     * if republishing is not performed.
     */
    private void processMessage(MessageContainer messageContainer) {
        final BytesXMLMessage xmlMessage = messageContainer.getMessage();
        final AcknowledgmentCallback ackCallback = this.ackCallbackFactory.createCallback(messageContainer);
        try {
            handleMessage(
                    () -> createOneMessage(xmlMessage, ackCallback),
                    springMessage -> sendOneToConsumer(springMessage, xmlMessage),
                    ackCallback,
                    false);
        } catch (SolaceAcknowledgmentException e) {
            throw e;
        } catch (Exception e) {
            try {
                log.warn(String.format("Exception thrown while processing XMLMessage %s. Message will be requeued.", xmlMessage.getMessageId()), e);
                // Short-circuit: republishing is only attempted when no explicit requeue was requested.
                if (isRequeueRequested(e) || !SolaceAckUtil.republishToErrorQueue(ackCallback)) {
                    AckUtils.requeue(ackCallback);
                }
            } catch (SolaceAcknowledgmentException ackFailure) {
                ackFailure.addSuppressed(e);
                log.warn(String.format("Exception thrown while re-queuing XMLMessage %s.", xmlMessage.getMessageId()), ackFailure);
                throw ackFailure;
            }
        }
    }

    /**
     * Delivers the batch currently offered by the batch collector, if there is one.
     *
     * <p>On failure the batch is requeued or republished to the error queue, following the same
     * rules as single-message processing. The collector's {@code confirmDelivery()} is invoked
     * exactly once after a batch was taken, regardless of the outcome.
     */
    private void processBatchIfAvailable() {
        final Optional<List<MessageContainer>> availableBatch = this.batchCollector.collectBatchIfAvailable();
        if (availableBatch.isEmpty()) {
            return;
        }
        final List<MessageContainer> batch = availableBatch.get();
        final boolean transacted = this.consumerProperties.getExtension().isTransacted();
        final AcknowledgmentCallback ackCallback = transacted
                ? this.ackCallbackFactory.createTransactedBatchCallback(batch, this.flowReceiverContainer.getTransactedSession())
                : this.ackCallbackFactory.createBatchCallback(batch);
        try {
            final List<BytesXMLMessage> xmlMessages = batch.stream()
                    .map(MessageContainer::getMessage)
                    .collect(Collectors.toList());
            handleMessage(
                    // For transacted consumers the mapped message is built without an acknowledgment callback.
                    () -> createBatchMessage(xmlMessages, transacted ? null : ackCallback),
                    springMessage -> sendBatchToConsumer(springMessage, xmlMessages, ackCallback),
                    ackCallback,
                    true);
        } catch (Exception e) {
            try {
                if (isRequeueRequested(e)) {
                    log.warn("Exception thrown while processing batch. Batch's message will be requeued.", e);
                    AckUtils.requeue(ackCallback);
                } else {
                    log.warn("Exception thrown while processing batch. Batch's messages will be requeued.", e);
                    if (!SolaceAckUtil.republishToErrorQueue(ackCallback)) {
                        AckUtils.requeue(ackCallback);
                    }
                }
            } catch (SolaceAcknowledgmentException ackFailure) {
                ackFailure.addSuppressed(e);
                log.warn("Exception thrown while re-queuing batch.", ackFailure);
                throw ackFailure;
            }
        } finally {
            // Kept out of the guarded region so a confirmDelivery() failure is never mistaken
            // for a processing failure of an already delivered batch.
            this.batchCollector.confirmDelivery();
        }
    }

    /** Returns {@code true} if a {@link RequeueCurrentMessageException} appears in the throwable's cause chain. */
    private static boolean isRequeueRequested(Throwable throwable) {
        return ExceptionUtils.indexOfType(throwable, RequeueCurrentMessageException.class) > -1;
    }

    /**
     * Maps a single Solace message to a Spring message, recording error attributes first when
     * this listener is configured to do so.
     */
    Message<?> createOneMessage(BytesXMLMessage bytesXMLMessage, AcknowledgmentCallback acknowledgmentCallback) {
        setAttributesIfNecessary(bytesXMLMessage, acknowledgmentCallback);
        return this.xmlMessageMapper.map((XMLMessage) bytesXMLMessage, acknowledgmentCallback, this.consumerProperties.getExtension());
    }

    /**
     * Maps a list of Solace messages to one batched Spring message, recording error attributes
     * first when this listener is configured to do so.
     */
    Message<?> createBatchMessage(List<BytesXMLMessage> list, AcknowledgmentCallback acknowledgmentCallback) {
        setBatchAttributesIfNecessary(list, null, acknowledgmentCallback);
        return this.xmlMessageMapper.mapBatchMessage(list, acknowledgmentCallback, this.consumerProperties.getExtension());
    }

    /** Records error attributes for the message (if configured) and forwards it to the consumer. */
    void sendOneToConsumer(Message<?> message, BytesXMLMessage bytesXMLMessage) throws RuntimeException {
        setAttributesIfNecessary(bytesXMLMessage, message);
        sendToConsumer(message);
    }

    /** Records error attributes for the batch (if configured) and forwards it to the consumer. */
    private void sendBatchToConsumer(Message<?> message, List<BytesXMLMessage> list, AcknowledgmentCallback acknowledgmentCallback) throws RuntimeException {
        setBatchAttributesIfNecessary(list, message, acknowledgmentCallback);
        sendToConsumer(message);
    }

    /** Increments the message's delivery-attempt counter, when present, and invokes the consumer. */
    private void sendToConsumer(Message<?> message) throws RuntimeException {
        final AtomicInteger deliveryAttempt = StaticMessageHeaderAccessor.getDeliveryAttempt(message);
        if (deliveryAttempt != null) {
            deliveryAttempt.incrementAndGet();
        }
        this.messageConsumer.accept(message);
    }

    /** Records the raw message and its acknowledgment callback; the input message attribute is set to {@code null}. */
    void setAttributesIfNecessary(XMLMessage xMLMessage, AcknowledgmentCallback acknowledgmentCallback) {
        setAttributesIfNecessary(xMLMessage, null, acknowledgmentCallback);
    }

    /** Records the raw message and the mapped input message; the acknowledgment callback attribute is set to {@code null}. */
    void setAttributesIfNecessary(XMLMessage xMLMessage, Message<?> message) {
        setAttributesIfNecessary(xMLMessage, message, null);
    }

    /**
     * Records error attributes for a batch. The acknowledgment callback is stored only when the
     * mapped message is absent or does not itself carry an acknowledgment callback.
     */
    void setBatchAttributesIfNecessary(List<? extends XMLMessage> list, @Nullable Message<?> message, AcknowledgmentCallback acknowledgmentCallback) {
        final boolean messageCarriesCallback = message != null && StaticMessageHeaderAccessor.getAcknowledgmentCallback(message) != null;
        setAttributesIfNecessary(list, message, messageCarriesCallback ? null : acknowledgmentCallback);
    }

    /**
     * Installs and/or populates the thread-local error attributes according to the
     * {@code needHolder} and {@code needAttributes} settings.
     *
     * @param obj raw Solace message, or list of messages for a batch
     * @param message mapped Spring message, possibly {@code null}
     * @param acknowledgmentCallback callback to expose to error handling, possibly {@code null}
     */
    private void setAttributesIfNecessary(Object obj, Message<?> message, AcknowledgmentCallback acknowledgmentCallback) {
        if (this.needHolder) {
            this.attributesHolder.set(ErrorMessageUtils.getAttributeAccessor((Message<?>) null, (Message<?>) null));
        }
        if (!this.needAttributes) {
            return;
        }
        final AttributeAccessor attributes = this.attributesHolder.get();
        if (attributes == null) {
            return;
        }
        attributes.setAttribute("inputMessage", message);
        attributes.setAttribute(SolaceMessageHeaderErrorMessageStrategy.ATTR_SOLACE_RAW_MESSAGE, obj);
        attributes.setAttribute(SolaceMessageHeaderErrorMessageStrategy.ATTR_SOLACE_ACKNOWLEDGMENT_CALLBACK, acknowledgmentCallback);
    }

    /** Returns the local stop flag; setting it to {@code true} ends the polling loop after the current iteration. */
    public AtomicBoolean getStopFlag() {
        return this.stopFlag;
    }
}
