package com.solace.spring.cloud.stream.binder.inbound;

import com.solace.spring.cloud.stream.binder.health.SolaceBinderHealthAccessor;
import com.solace.spring.cloud.stream.binder.inbound.acknowledge.JCSMPAcknowledgementCallbackFactory;
import com.solace.spring.cloud.stream.binder.inbound.acknowledge.SolaceAckUtil;
import com.solace.spring.cloud.stream.binder.meter.SolaceMeterAccessor;
import com.solace.spring.cloud.stream.binder.properties.SolaceConsumerProperties;
import com.solace.spring.cloud.stream.binder.provisioning.EndpointProvider;
import com.solace.spring.cloud.stream.binder.provisioning.SolaceConsumerDestination;
import com.solace.spring.cloud.stream.binder.provisioning.SolaceProvisioningUtil;
import com.solace.spring.cloud.stream.binder.util.ClosedChannelBindingException;
import com.solace.spring.cloud.stream.binder.util.ErrorQueueInfrastructure;
import com.solace.spring.cloud.stream.binder.util.FlowReceiverContainer;
import com.solace.spring.cloud.stream.binder.util.MessageContainer;
import com.solace.spring.cloud.stream.binder.util.UnboundFlowReceiverContainerException;
import com.solace.spring.cloud.stream.binder.util.XMLMessageMapper;
import com.solacesystems.jcsmp.ClosedFacilityException;
import com.solacesystems.jcsmp.Endpoint;
import com.solacesystems.jcsmp.EndpointProperties;
import com.solacesystems.jcsmp.JCSMPException;
import com.solacesystems.jcsmp.JCSMPSession;
import com.solacesystems.jcsmp.JCSMPTransportException;
import com.solacesystems.jcsmp.XMLMessage;
import java.util.List;
import java.util.Optional;
import java.util.UUID;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.function.Consumer;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import lombok.Generated;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.cloud.stream.binder.ExtendedConsumerProperties;
import org.springframework.context.Lifecycle;
import org.springframework.integration.acks.AckUtils;
import org.springframework.integration.acks.AcknowledgmentCallback;
import org.springframework.integration.core.Pausable;
import org.springframework.integration.endpoint.AbstractMessageSource;
import org.springframework.lang.Nullable;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessagingException;

/* loaded from: input_file:com/solace/spring/cloud/stream/binder/inbound/JCSMPMessageSource.class */
public class JCSMPMessageSource extends AbstractMessageSource<Object> implements Lifecycle, Pausable {

    @Generated
    private static final Logger log = LoggerFactory.getLogger(JCSMPMessageSource.class);
    private final SolaceConsumerDestination consumerDestination;
    private final JCSMPSession jcsmpSession;
    private final BatchCollector batchCollector;
    private final EndpointProperties endpointProperties;

    @Nullable
    private final SolaceMeterAccessor solaceMeterAccessor;
    private final ExtendedConsumerProperties<SolaceConsumerProperties> consumerProperties;
    private FlowReceiverContainer flowReceiverContainer;
    private JCSMPAcknowledgementCallbackFactory ackCallbackFactory;
    private XMLMessageMapper xmlMessageMapper;

    @Nullable
    private SolaceBinderHealthAccessor solaceBinderHealthAccessor;
    private Supplier<Boolean> remoteStopFlag;
    private ErrorQueueInfrastructure errorQueueInfrastructure;
    private Consumer<Endpoint> postStart;
    private final String id = UUID.randomUUID().toString();
    private final ReadWriteLock readWriteLock = new ReentrantReadWriteLock();
    private volatile boolean isRunning = false;
    private volatile boolean paused = false;

    public JCSMPMessageSource(SolaceConsumerDestination solaceConsumerDestination, JCSMPSession jCSMPSession, @Nullable BatchCollector batchCollector, ExtendedConsumerProperties<SolaceConsumerProperties> extendedConsumerProperties, EndpointProperties endpointProperties, @Nullable SolaceMeterAccessor solaceMeterAccessor) {
        this.consumerDestination = solaceConsumerDestination;
        this.jcsmpSession = jCSMPSession;
        this.batchCollector = batchCollector;
        this.consumerProperties = extendedConsumerProperties;
        this.endpointProperties = endpointProperties;
        this.solaceMeterAccessor = solaceMeterAccessor;
    }

    protected Object doReceive() {
        MessageContainer receive;
        Lock readLock = this.readWriteLock.readLock();
        readLock.lock();
        try {
            if (this.remoteStopFlag.get().booleanValue()) {
                if (log.isDebugEnabled()) {
                    log.debug(String.format("Message source %s is not running. Cannot receive message", this.id));
                }
                return null;
            }
            if (!isRunning()) {
                String format = String.format("Cannot receive message using message source %s", this.id);
                ClosedChannelBindingException closedChannelBindingException = new ClosedChannelBindingException(String.format("Message source %s is not running", this.id));
                log.warn("{}{}", closedChannelBindingException, format);
                throw new MessagingException(format, closedChannelBindingException);
            }
            readLock.unlock();
            try {
                if (this.batchCollector != null) {
                    int batchTimeout = ((SolaceConsumerProperties) this.consumerProperties.getExtension()).getBatchTimeout();
                    this.batchCollector.resetLastSentTimeIfEmpty();
                    do {
                        MessageContainer receive2 = batchTimeout > 0 ? this.flowReceiverContainer.receive(Integer.valueOf(batchTimeout)) : this.flowReceiverContainer.receive();
                        if (receive2 != null) {
                            if (this.solaceMeterAccessor != null) {
                                this.solaceMeterAccessor.recordMessage(this.consumerProperties.getBindingName(), receive2.getMessage());
                            }
                            this.batchCollector.addToBatch(receive2);
                        }
                    } while (!this.batchCollector.isBatchAvailable());
                    receive = null;
                } else {
                    receive = this.flowReceiverContainer.receive(Integer.valueOf(((SolaceConsumerProperties) this.consumerProperties.getExtension()).getPolledConsumerWaitTimeInMillis()));
                    if (this.solaceMeterAccessor != null && receive != null) {
                        this.solaceMeterAccessor.recordMessage(this.consumerProperties.getBindingName(), receive.getMessage());
                    }
                }
                if (this.batchCollector != null) {
                    return processBatchIfAvailable();
                }
                if (receive != null) {
                    return processMessage(receive);
                }
                return null;
            } catch (UnboundFlowReceiverContainerException e) {
                if (!log.isDebugEnabled()) {
                    return null;
                }
                log.debug(String.format("Unable to receive message from endpoint %s", this.consumerDestination.getName()), e);
                return null;
            } catch (JCSMPException e2) {
                if (isRunning() && !this.remoteStopFlag.get().booleanValue()) {
                    String format2 = String.format("Unable to consume message from endpoint %s", this.consumerDestination.getName());
                    log.warn(format2, e2);
                    throw new MessagingException(format2, e2);
                }
                String format3 = String.format("Exception received while consuming a message, but the consumer <message source ID: %s> is currently shutdown. Exception will be ignored", this.id);
                if ((e2 instanceof JCSMPTransportException) || (e2 instanceof ClosedFacilityException)) {
                    log.debug(format3, e2);
                    return null;
                }
                log.warn(format3, e2);
                return null;
            }
        } finally {
            readLock.unlock();
        }
    }

    private Message<?> processMessage(MessageContainer messageContainer) {
        AcknowledgmentCallback createCallback = this.ackCallbackFactory.createCallback(messageContainer);
        try {
            return this.xmlMessageMapper.map((XMLMessage) messageContainer.getMessage(), createCallback, true, (SolaceConsumerProperties) this.consumerProperties.getExtension());
        } catch (Exception e) {
            log.warn(String.format("XMLMessage %s cannot be consumed. It will be requeued", messageContainer.getMessage().getMessageId()), e);
            if (SolaceAckUtil.republishToErrorQueue(createCallback)) {
                return null;
            }
            AckUtils.requeue(createCallback);
            return null;
        }
    }

    private Message<List<?>> processBatchIfAvailable() {
        Optional<List<MessageContainer>> collectBatchIfAvailable = this.batchCollector.collectBatchIfAvailable();
        if (!collectBatchIfAvailable.isPresent()) {
            return null;
        }
        AcknowledgmentCallback createTransactedBatchCallback = ((SolaceConsumerProperties) this.consumerProperties.getExtension()).isTransacted() ? this.ackCallbackFactory.createTransactedBatchCallback(collectBatchIfAvailable.get(), this.flowReceiverContainer.getTransactedSession()) : this.ackCallbackFactory.createBatchCallback(collectBatchIfAvailable.get());
        try {
            try {
                Message<List<?>> mapBatchMessage = this.xmlMessageMapper.mapBatchMessage((List<? extends XMLMessage>) collectBatchIfAvailable.get().stream().map((v0) -> {
                    return v0.getMessage();
                }).collect(Collectors.toList()), createTransactedBatchCallback, true, (SolaceConsumerProperties) this.consumerProperties.getExtension());
                this.batchCollector.confirmDelivery();
                return mapBatchMessage;
            } catch (Exception e) {
                log.warn("Message batch cannot be consumed. It will be requeued", e);
                if (!SolaceAckUtil.republishToErrorQueue(createTransactedBatchCallback)) {
                    AckUtils.requeue(createTransactedBatchCallback);
                }
                this.batchCollector.confirmDelivery();
                return null;
            }
        } catch (Throwable th) {
            this.batchCollector.confirmDelivery();
            throw th;
        }
    }

    public String getComponentType() {
        return "jcsmp:message-source";
    }

    /* JADX WARN: Multi-variable type inference failed */
    /* JADX WARN: Type inference failed for: r0v21, types: [com.solacesystems.jcsmp.Endpoint, java.lang.Object] */
    public void start() {
        Lock writeLock = this.readWriteLock.writeLock();
        writeLock.lock();
        try {
            log.info(String.format("Creating consumer to %s %s <message source ID: %s>", ((SolaceConsumerProperties) this.consumerProperties.getExtension()).getEndpointType(), this.consumerDestination.getName(), this.id));
            if (isRunning()) {
                log.warn(String.format("Nothing to do, message source %s is already running", this.id));
                writeLock.unlock();
                return;
            }
            try {
                EndpointProvider<?> from = EndpointProvider.from(((SolaceConsumerProperties) this.consumerProperties.getExtension()).getEndpointType());
                if (from == null) {
                    String format = String.format("Consumer not supported for destination type %s <inbound adapter %s>", ((SolaceConsumerProperties) this.consumerProperties.getExtension()).getEndpointType(), this.id);
                    log.warn(format);
                    throw new IllegalArgumentException(format);
                }
                ?? createInstance = from.createInstance(this.consumerDestination.getName());
                if (this.flowReceiverContainer == null) {
                    this.flowReceiverContainer = new FlowReceiverContainer(this.jcsmpSession, createInstance, ((SolaceConsumerProperties) this.consumerProperties.getExtension()).isTransacted(), this.endpointProperties, SolaceProvisioningUtil.getConsumerFlowProperties(this.consumerDestination.getBindingDestinationName(), this.consumerProperties));
                    this.xmlMessageMapper = this.flowReceiverContainer.getXMLMessageMapper();
                    if (this.paused) {
                        log.info(String.format("Message source %s is paused, pausing newly created flow receiver container %s", this.id, this.flowReceiverContainer.getId()));
                        this.flowReceiverContainer.pause();
                    }
                }
                if (this.solaceBinderHealthAccessor != null) {
                    this.solaceBinderHealthAccessor.addFlow(this.consumerProperties.getBindingName(), 0, this.flowReceiverContainer);
                }
                this.flowReceiverContainer.bind();
                if (this.postStart != null) {
                    this.postStart.accept(createInstance);
                }
                this.ackCallbackFactory = new JCSMPAcknowledgementCallbackFactory(this.flowReceiverContainer);
                this.ackCallbackFactory.setErrorQueueInfrastructure(this.errorQueueInfrastructure);
                this.isRunning = true;
                writeLock.unlock();
            } catch (JCSMPException e) {
                String format2 = String.format("Unable to get a message consumer for session %s", this.jcsmpSession.getSessionName());
                log.warn(format2, e);
                throw new RuntimeException(format2, e);
            }
        } catch (Throwable th) {
            writeLock.unlock();
            throw th;
        }
    }

    public void stop() {
        Lock writeLock = this.readWriteLock.writeLock();
        writeLock.lock();
        try {
            if (isRunning()) {
                log.info(String.format("Stopping consumer to endpoint %s <message source ID: %s>", this.consumerDestination.getName(), this.id));
                this.flowReceiverContainer.unbind();
                if (this.solaceBinderHealthAccessor != null) {
                    this.solaceBinderHealthAccessor.removeFlow(this.consumerProperties.getBindingName(), 0);
                }
                this.isRunning = false;
            }
        } finally {
            writeLock.unlock();
        }
    }

    public boolean isRunning() {
        return this.isRunning;
    }

    public void setErrorQueueInfrastructure(ErrorQueueInfrastructure errorQueueInfrastructure) {
        this.errorQueueInfrastructure = errorQueueInfrastructure;
    }

    public void setPostStart(Consumer<Endpoint> consumer) {
        this.postStart = consumer;
    }

    public void setRemoteStopFlag(Supplier<Boolean> supplier) {
        this.remoteStopFlag = supplier;
    }

    public void pause() {
        Lock writeLock = this.readWriteLock.writeLock();
        writeLock.lock();
        try {
            log.info(String.format("Pausing message source %s", this.id));
            if (this.flowReceiverContainer != null) {
                this.flowReceiverContainer.pause();
            }
            this.paused = true;
        } finally {
            writeLock.unlock();
        }
    }

    public void resume() {
        Lock writeLock = this.readWriteLock.writeLock();
        writeLock.lock();
        try {
            log.info(String.format("Resuming message source %s", this.id));
            if (this.flowReceiverContainer != null) {
                try {
                    this.flowReceiverContainer.resume();
                } catch (JCSMPException e) {
                    throw new RuntimeException(String.format("Failed to resume message source %s", this.id), e);
                }
            }
            this.paused = false;
        } finally {
            writeLock.unlock();
        }
    }

    public boolean isPaused() {
        if (!this.paused) {
            return false;
        }
        if (this.flowReceiverContainer.isPaused()) {
            return true;
        }
        log.warn(String.format("Flow receiver container %s is unexpectedly running for message source %s", this.flowReceiverContainer.getId(), this.id));
        return false;
    }

    public void setSolaceBinderHealthAccessor(@Nullable SolaceBinderHealthAccessor solaceBinderHealthAccessor) {
        this.solaceBinderHealthAccessor = solaceBinderHealthAccessor;
    }
}
