package com.solace.spring.cloud.stream.binder.inbound;

import com.solace.spring.cloud.stream.binder.health.SolaceBinderHealthAccessor;
import com.solace.spring.cloud.stream.binder.inbound.acknowledge.JCSMPAcknowledgementCallbackFactory;
import com.solace.spring.cloud.stream.binder.meter.SolaceMeterAccessor;
import com.solace.spring.cloud.stream.binder.properties.SolaceConsumerProperties;
import com.solace.spring.cloud.stream.binder.provisioning.SolaceConsumerDestination;
import com.solace.spring.cloud.stream.binder.provisioning.SolaceProvisioningUtil;
import com.solace.spring.cloud.stream.binder.tracing.TracingProxy;
import com.solace.spring.cloud.stream.binder.util.ErrorQueueInfrastructure;
import com.solace.spring.cloud.stream.binder.util.FlowReceiverContainer;
import com.solace.spring.cloud.stream.binder.util.SolaceMessageConversionException;
import com.solacesystems.jcsmp.ConsumerFlowProperties;
import com.solacesystems.jcsmp.Endpoint;
import com.solacesystems.jcsmp.EndpointProperties;
import com.solacesystems.jcsmp.JCSMPException;
import com.solacesystems.jcsmp.JCSMPFactory;
import com.solacesystems.jcsmp.JCSMPSession;
import com.solacesystems.jcsmp.XMLMessage;
import com.solacesystems.jcsmp.impl.JCSMPBasicSession;
import com.solacesystems.jcsmp.transaction.RollbackException;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Optional;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Consumer;
import lombok.Generated;
import org.apache.commons.lang3.exception.ExceptionUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.cloud.stream.binder.ExtendedConsumerProperties;
import org.springframework.core.AttributeAccessor;
import org.springframework.integration.context.OrderlyShutdownCapable;
import org.springframework.integration.core.Pausable;
import org.springframework.integration.endpoint.MessageProducerSupport;
import org.springframework.lang.Nullable;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessagingException;
import org.springframework.retry.RecoveryCallback;
import org.springframework.retry.RetryCallback;
import org.springframework.retry.RetryContext;
import org.springframework.retry.RetryListener;
import org.springframework.retry.support.RetryTemplate;
import org.springframework.scheduling.concurrent.CustomizableThreadFactory;
import org.springframework.util.Assert;

/* loaded from: input_file:com/solace/spring/cloud/stream/binder/inbound/JCSMPInboundQueueMessageProducer.class */
public class JCSMPInboundQueueMessageProducer extends MessageProducerSupport implements OrderlyShutdownCapable, Pausable {
    // Destination whose name is used to create the queue the consumer flows bind to.
    private final SolaceConsumerDestination consumerDestination;
    // Session handed to every FlowReceiverContainer created by doStart().
    private final JCSMPSession jcsmpSession;
    // Binding-level consumer settings (concurrency, binding name, batch mode, ...).
    private final ExtendedConsumerProperties<SolaceConsumerProperties> consumerProperties;
    // Optional endpoint properties forwarded to the flow receiver containers; may be null.
    private final EndpointProperties endpointProperties;
    // Optional collaborators that are passed through to the message listeners.
    private final Optional<SolaceMeterAccessor> solaceMeterAccessor;
    private final Optional<TracingProxy> tracingProxy;
    // When present, flows are registered on start and removed again on stop.
    private final Optional<SolaceBinderHealthAccessor> solaceBinderHealthAccessor;
    // One container per consumer flow; filled up to the configured concurrency in doStart().
    private final List<FlowReceiverContainer> flowReceivers;
    // Stop flags of the listeners submitted in doStart(); set to true by stopAllConsumers().
    private final Set<AtomicBoolean> consumerStopFlags;
    // Optional hook invoked with the queue endpoint at the end of a successful doStart().
    private Consumer<Endpoint> postStart;
    // Runs the message listeners; rebuilt on every start.
    private ExecutorService executorService;
    // Flag handed to every listener built by buildListener().
    private AtomicBoolean remoteStopFlag;
    // When non-null, retryable listeners are built instead of basic ones.
    private RetryTemplate retryTemplate;
    // Passed to the retryable listeners together with the retry template.
    private RecoveryCallback<?> recoveryCallback;
    // Forwarded to the acknowledgement callback factory of each listener.
    private ErrorQueueInfrastructure errorQueueInfrastructure;

    @Generated
    private static final Logger log = LoggerFactory.getLogger(JCSMPInboundQueueMessageProducer.class);
    // Thread-bound error-message attributes; consulted first by getErrorMessageAttributes().
    private static final ThreadLocal<AttributeAccessor> attributesHolder = new ThreadLocal<>();
    // Random identifier used in log and exception messages to tell adapters apart.
    private final String id = UUID.randomUUID().toString();
    // NOTE(review): presumably the grace period before workers are interrupted on shutdown;
    // stopAllConsumers() currently waits a literal 500 ms instead of reading this field - confirm.
    private final long shutdownInterruptThresholdInMillis = 500;
    // Remembers whether pause() was requested so newly created flows start paused.
    private final AtomicBoolean paused = new AtomicBoolean(false);

    /* loaded from: input_file:com/solace/spring/cloud/stream/binder/inbound/JCSMPInboundQueueMessageProducer$SolaceRetryListener.class */
    private static final class SolaceRetryListener implements RetryListener {
        private final String queueName;

        private SolaceRetryListener(String str) {
            this.queueName = str;
        }

        public <T, E extends Throwable> boolean open(RetryContext retryContext, RetryCallback<T, E> retryCallback) {
            return true;
        }

        public <T, E extends Throwable> void close(RetryContext retryContext, RetryCallback<T, E> retryCallback, Throwable th) {
        }

        public <T, E extends Throwable> void onError(RetryContext retryContext, RetryCallback<T, E> retryCallback, Throwable th) {
            JCSMPInboundQueueMessageProducer.log.warn(String.format("Failed to consume a message from destination %s - attempt %s", this.queueName, Integer.valueOf(retryContext.getRetryCount())));
            for (Throwable th2 : ExceptionUtils.getThrowableList(th)) {
                if ((th2 instanceof SolaceMessageConversionException) || (th2 instanceof RollbackException)) {
                    retryContext.setExhaustedOnly();
                    return;
                }
            }
        }
    }

    /**
     * Creates an inbound adapter; no flows are created or bound until the adapter is started.
     *
     * @param solaceConsumerDestination  destination whose name identifies the queue to consume from
     * @param jCSMPSession               session used to create the consumer flows
     * @param extendedConsumerProperties consumer properties; the concurrency is used to pre-size the
     *                                   internal flow and stop-flag collections
     * @param endpointProperties         endpoint properties handed to each flow receiver container, may be {@code null}
     * @param optional                   optional meter accessor passed to the message listeners
     * @param optional2                  optional tracing proxy passed to the message listeners
     * @param optional3                  optional health accessor that is told about added and removed flows
     */
    public JCSMPInboundQueueMessageProducer(SolaceConsumerDestination solaceConsumerDestination, JCSMPSession jCSMPSession, ExtendedConsumerProperties<SolaceConsumerProperties> extendedConsumerProperties, @Nullable EndpointProperties endpointProperties, Optional<SolaceMeterAccessor> optional, Optional<TracingProxy> optional2, Optional<SolaceBinderHealthAccessor> optional3) {
        this.consumerDestination = solaceConsumerDestination;
        this.jcsmpSession = jCSMPSession;
        this.consumerProperties = extendedConsumerProperties;
        this.endpointProperties = endpointProperties;
        this.solaceMeterAccessor = optional;
        this.tracingProxy = optional2;
        this.solaceBinderHealthAccessor = optional3;
        // Diamond operator instead of raw types: keeps the collections type-safe.
        this.flowReceivers = new ArrayList<>(extendedConsumerProperties.getConcurrency());
        this.consumerStopFlags = new HashSet<>(extendedConsumerProperties.getConcurrency());
    }

    /**
     * Creates the missing flow receiver containers, binds all of them and submits one
     * message listener per container to a freshly built thread pool.
     *
     * <p>Does nothing (apart from logging) when the adapter is already running.
     *
     * @throws MessagingException if the configured concurrency is below 1, if the session reports
     *                            that the required settlement outcomes are not supported, or if
     *                            binding a flow fails
     */
    @Override
    protected void doStart() {
        final String queueName = this.consumerDestination.getName();
        final int concurrency = this.consumerProperties.getConcurrency();
        final String bindingName = this.consumerProperties.getBindingName();
        log.info("Creating {} consumer flows for {} <inbound adapter {}>", concurrency, queueName, this.id);

        if (isRunning()) {
            log.warn("Nothing to do. Inbound message channel adapter {} is already running", this.id);
            return;
        }
        if (concurrency < 1) {
            String msg = String.format("Concurrency must be greater than 0, was %s <inbound adapter %s>", concurrency, this.id);
            log.warn(msg);
            throw new MessagingException(msg);
        }

        // The capability check only exists on the concrete session implementation, so the
        // interface-typed field must be type-checked and cast explicitly.
        if (this.jcsmpSession instanceof JCSMPBasicSession) {
            JCSMPBasicSession basicSession = (JCSMPBasicSession) this.jcsmpSession;
            if (!basicSession.isRequiredSettlementCapable(Set.of(XMLMessage.Outcome.ACCEPTED, XMLMessage.Outcome.FAILED, XMLMessage.Outcome.REJECTED))) {
                throw new MessagingException(String.format("The Solace PubSub+ Broker doesn't support message NACK capability, <inbound adapter %s>", this.id));
            }
        }

        if (this.executorService != null && !this.executorService.isTerminated()) {
            log.warn("Unexpectedly found running executor service while starting inbound adapter {}, closing it...", this.id);
            stopAllConsumers();
        }

        final Endpoint queue = JCSMPFactory.onlyInstance().createQueue(queueName);
        final ConsumerFlowProperties flowProperties = SolaceProvisioningUtil.getConsumerFlowProperties(this.consumerDestination.getBindingDestinationName(), this.consumerProperties);

        // Containers from a previous start are reused; only the missing ones are created.
        final int missingReceivers = concurrency - this.flowReceivers.size();
        for (int i = 0; i < missingReceivers; i++) {
            log.info("Creating consumer {} of {} for inbound adapter {}", i + 1, concurrency, this.id);
            FlowReceiverContainer receiver = new FlowReceiverContainer(this.jcsmpSession, queue, this.endpointProperties, flowProperties);
            if (this.paused.get()) {
                log.info("Inbound adapter {} is paused, pausing newly created flow receiver container {}", this.id, receiver.getId());
                receiver.pause();
            }
            this.flowReceivers.add(receiver);
        }

        this.solaceBinderHealthAccessor.ifPresent(healthAccessor -> {
            for (int flowIndex = 0; flowIndex < this.flowReceivers.size(); flowIndex++) {
                healthAccessor.addFlow(bindingName, flowIndex, this.flowReceivers.get(flowIndex));
            }
        });

        try {
            for (FlowReceiverContainer receiver : this.flowReceivers) {
                receiver.bind();
            }

            // NOTE(review): a listener is registered on every successful start, so restarting the
            // adapter with the same RetryTemplate registers another one - confirm this is intended.
            if (this.retryTemplate != null) {
                this.retryTemplate.registerListener(new SolaceRetryListener(queueName));
            }

            this.executorService = buildThreadPool(concurrency, bindingName);
            for (FlowReceiverContainer receiver : this.flowReceivers) {
                InboundXMLMessageListener listener = buildListener(receiver);
                this.consumerStopFlags.add(listener.getStopFlag());
                this.executorService.submit(listener);
            }
            // No further tasks are accepted; the pool terminates once all listeners have finished.
            this.executorService.shutdown();

            if (this.postStart != null) {
                this.postStart.accept(queue);
            }
        } catch (JCSMPException e) {
            String msg = String.format("Failed to get message consumer for inbound adapter %s", this.id);
            log.warn(msg, e);
            this.flowReceivers.forEach(FlowReceiverContainer::unbind);
            throw new MessagingException(msg, e);
        }
    }

    /**
     * Builds the fixed-size pool that runs the message listeners.
     *
     * @param poolSize    number of worker threads
     * @param bindingName appended to {@code "solace-scst-consumer-"} to form the thread name prefix
     * @return a new fixed thread pool
     */
    private ExecutorService buildThreadPool(int poolSize, String bindingName) {
        final String threadNamePrefix = "solace-scst-consumer-" + bindingName;
        final CustomizableThreadFactory threadFactory = new CustomizableThreadFactory(threadNamePrefix);
        return Executors.newFixedThreadPool(poolSize, threadFactory);
    }

    /**
     * Stops all consumers, but only while the adapter reports itself as running.
     */
    @Override
    protected void doStop() {
        if (!isRunning()) {
            return;
        }
        stopAllConsumers();
    }

    /**
     * Signals every listener to stop and waits for the executor service to terminate.
     *
     * <p>Workers get {@code shutdownInterruptThresholdInMillis} milliseconds to finish on their
     * own; after that they are interrupted and given one more minute. Once the pool has
     * terminated, the flows are removed from the health accessor (if present) and the stop
     * flags are discarded.
     *
     * @throws MessagingException if the executor does not terminate in time or if the waiting
     *                            thread is interrupted (the interrupt status is restored)
     */
    private void stopAllConsumers() {
        log.info(String.format("Stopping all %s consumer flows to queue %s <inbound adapter ID: %s>", this.consumerProperties.getConcurrency(), this.consumerDestination.getName(), this.id));

        // Ask every listener to finish its loop.
        this.consumerStopFlags.forEach(stopFlag -> stopFlag.set(true));

        try {
            if (!this.executorService.awaitTermination(this.shutdownInterruptThresholdInMillis, TimeUnit.MILLISECONDS)) {
                log.info(String.format("Interrupting all workers for inbound adapter %s", this.id));
                this.executorService.shutdownNow();
                if (!this.executorService.awaitTermination(1L, TimeUnit.MINUTES)) {
                    String msg = String.format("executor service shutdown for inbound adapter %s timed out", this.id);
                    log.warn(msg);
                    throw new MessagingException(msg);
                }
            }

            this.solaceBinderHealthAccessor.ifPresent(healthAccessor -> {
                String bindingName = this.consumerProperties.getBindingName();
                for (int flowIndex = 0; flowIndex < this.flowReceivers.size(); flowIndex++) {
                    healthAccessor.removeFlow(bindingName, flowIndex);
                }
            });

            this.consumerStopFlags.clear();
        } catch (InterruptedException e) {
            // Restore the interrupt status so callers further up the stack can still observe it.
            Thread.currentThread().interrupt();
            String msg = String.format("executor service shutdown for inbound adapter %s was interrupted", this.id);
            log.warn(msg);
            throw new MessagingException(msg, e);
        }
    }

    /**
     * Stops the adapter as the first step of an orderly shutdown.
     *
     * @return always {@code 0}
     */
    @Override
    public int beforeShutdown() {
        this.stop();
        return 0;
    }

    /**
     * Performs no work after shutdown.
     *
     * @return always {@code 0}
     */
    @Override
    public int afterShutdown() {
        return 0;
    }

    /**
     * Returns the attributes bound to the current thread through {@code attributesHolder},
     * falling back to the superclass implementation when none are bound.
     *
     * @param message the message the error attributes are requested for
     * @return the thread-bound attributes, or the superclass result if there are none
     */
    @Override
    protected AttributeAccessor getErrorMessageAttributes(Message<?> message) {
        final AttributeAccessor threadBound = attributesHolder.get();
        if (threadBound != null) {
            return threadBound;
        }
        return super.getErrorMessageAttributes(message);
    }

    /**
     * Creates the message listener for one flow receiver container.
     *
     * <p>A retryable listener is built when a retry template has been set, otherwise a basic
     * listener that forwards failures through {@code sendErrorMessageIfNecessary}.
     *
     * @param flowReceiverContainer the container the listener receives messages from
     * @return the listener to submit to the executor service
     * @throws IllegalStateException if a retry template is set while an error channel is configured
     */
    private InboundXMLMessageListener buildListener(FlowReceiverContainer flowReceiverContainer) {
        final JCSMPAcknowledgementCallbackFactory ackCallbackFactory = new JCSMPAcknowledgementCallbackFactory(flowReceiverContainer);
        ackCallbackFactory.setErrorQueueInfrastructure(this.errorQueueInfrastructure);

        if (this.consumerProperties.isBatchMode()) {
            log.error("BatchMode is deprecated and should not be used.");
        }

        if (this.retryTemplate == null) {
            return new BasicInboundXMLMessageListener(
                    flowReceiverContainer,
                    this.consumerDestination,
                    this.consumerProperties,
                    this::sendMessage,
                    ackCallbackFactory,
                    (failedMessage, cause) -> sendErrorMessageIfNecessary(failedMessage, cause),
                    this.solaceMeterAccessor,
                    this.tracingProxy,
                    this.remoteStopFlag,
                    attributesHolder,
                    getErrorChannel() != null);
        }

        Assert.state(getErrorChannel() == null, "Cannot have an 'errorChannel' property when a 'RetryTemplate' is provided; use an 'ErrorMessageSendingRecoverer' in the 'recoveryCallback' property to send an error message when retries are exhausted");
        return new RetryableInboundXMLMessageListener(
                flowReceiverContainer,
                this.consumerDestination,
                this.consumerProperties,
                this::sendMessage,
                ackCallbackFactory,
                this.retryTemplate,
                this.recoveryCallback,
                this.solaceMeterAccessor,
                this.tracingProxy,
                this.remoteStopFlag,
                attributesHolder);
    }

    /**
     * Pauses every flow receiver container and then records the adapter as paused.
     */
    @Override
    public void pause() {
        log.info(String.format("Pausing inbound adapter %s", this.id));
        for (FlowReceiverContainer receiver : this.flowReceivers) {
            receiver.pause();
        }
        this.paused.set(true);
    }

    /**
     * Resumes every flow receiver container and then clears the paused state.
     *
     * <p>If a container fails to resume while the adapter is still marked as paused, the
     * containers are paused again before the failure is propagated; an exception raised by
     * that re-pause is attached to the propagated exception as suppressed.
     *
     * @throws RuntimeException wrapping the original failure when resuming does not succeed
     */
    @Override
    public void resume() {
        log.info(String.format("Resuming inbound adapter %s", this.id));
        try {
            for (FlowReceiverContainer receiver : this.flowReceivers) {
                receiver.resume();
            }
            this.paused.set(false);
        } catch (Exception cause) {
            final RuntimeException failure = new RuntimeException(String.format("Failed to resume inbound adapter %s", this.id), cause);
            if (!this.paused.get()) {
                throw failure;
            }
            log.error(String.format("Inbound adapter %s failed to be resumed. Resumed flow receiver containers will be re-paused", this.id), cause);
            try {
                pause();
            } catch (Exception pauseFailure) {
                failure.addSuppressed(pauseFailure);
            }
            throw failure;
        }
    }

    /**
     * Reports whether the adapter is paused.
     *
     * @return {@code true} only if the adapter is marked as paused and every flow receiver
     *         container reports being paused; a container that is not paused is logged as a
     *         warning and makes the result {@code false}
     */
    @Override
    public boolean isPaused() {
        if (this.paused.get()) {
            for (FlowReceiverContainer receiver : this.flowReceivers) {
                if (receiver.isPaused()) {
                    continue;
                }
                log.warn(String.format("Flow receiver container %s is unexpectedly running for inbound adapter %s", receiver.getId(), this.id));
                return false;
            }
            return true;
        }
        return false;
    }

    /**
     * Sets the hook that doStart() invokes with the queue endpoint after the listeners
     * have been submitted; the hook is skipped when it is {@code null}.
     *
     * @param consumer the post-start hook
     */
    @Generated
    public void setPostStart(Consumer<Endpoint> consumer) {
        this.postStart = consumer;
    }

    /**
     * Sets the flag that is handed to every message listener built by this adapter.
     *
     * @param atomicBoolean the remote stop flag
     */
    @Generated
    public void setRemoteStopFlag(AtomicBoolean atomicBoolean) {
        this.remoteStopFlag = atomicBoolean;
    }

    /**
     * Sets the retry template. When it is non-null, doStart() registers a retry listener on it
     * and buildListener() creates retryable listeners (which requires that no error channel is set).
     *
     * @param retryTemplate the retry template, or {@code null} for basic listeners
     */
    @Generated
    public void setRetryTemplate(RetryTemplate retryTemplate) {
        this.retryTemplate = retryTemplate;
    }

    /**
     * Sets the recovery callback that is passed to the retryable listeners together with
     * the retry template.
     *
     * @param recoveryCallback the recovery callback
     */
    @Generated
    public void setRecoveryCallback(RecoveryCallback<?> recoveryCallback) {
        this.recoveryCallback = recoveryCallback;
    }

    /**
     * Sets the error queue infrastructure that buildListener() forwards to the
     * acknowledgement callback factory of each listener.
     *
     * @param errorQueueInfrastructure the error queue infrastructure
     */
    @Generated
    public void setErrorQueueInfrastructure(ErrorQueueInfrastructure errorQueueInfrastructure) {
        this.errorQueueInfrastructure = errorQueueInfrastructure;
    }
}
