package com.solace.spring.cloud.stream.binder.provisioning;

import com.solace.spring.cloud.stream.binder.properties.SolaceCommonProperties;
import com.solace.spring.cloud.stream.binder.properties.SolaceConsumerProperties;
import com.solace.spring.cloud.stream.binder.properties.SolaceProducerProperties;
import com.solace.spring.cloud.stream.binder.provisioning.SolaceProvisioningUtil;
import com.solace.spring.cloud.stream.binder.util.DestinationType;
import com.solace.spring.cloud.stream.binder.util.JCSMPSessionEventHandler;
import com.solacesystems.jcsmp.ConsumerFlowProperties;
import com.solacesystems.jcsmp.Endpoint;
import com.solacesystems.jcsmp.EndpointProperties;
import com.solacesystems.jcsmp.JCSMPErrorResponseException;
import com.solacesystems.jcsmp.JCSMPException;
import com.solacesystems.jcsmp.JCSMPFactory;
import com.solacesystems.jcsmp.JCSMPSession;
import com.solacesystems.jcsmp.Queue;
import com.solacesystems.jcsmp.Topic;
import com.solacesystems.jcsmp.XMLMessageListener;
import java.util.Arrays;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.springframework.cloud.stream.binder.ExtendedConsumerProperties;
import org.springframework.cloud.stream.binder.ExtendedProducerProperties;
import org.springframework.cloud.stream.provisioning.ConsumerDestination;
import org.springframework.cloud.stream.provisioning.ProducerDestination;
import org.springframework.cloud.stream.provisioning.ProvisioningException;
import org.springframework.cloud.stream.provisioning.ProvisioningProvider;
import org.springframework.util.StringUtils;

/* loaded from: input_file:com/solace/spring/cloud/stream/binder/provisioning/SolaceQueueProvisioner.class */
/**
 * {@link ProvisioningProvider} that prepares Solace queues and topic subscriptions for
 * Spring Cloud Stream producer and consumer bindings through a {@link JCSMPSession}.
 *
 * <p>Subscriptions that were added successfully are additionally registered with the
 * {@link JCSMPSessionEventHandler} so that they are re-applied after a reconnect.
 */
public class SolaceQueueProvisioner implements ProvisioningProvider<ExtendedConsumerProperties<SolaceConsumerProperties>, ExtendedProducerProperties<SolaceProducerProperties>> {
    private static final Log logger = LogFactory.getLog(SolaceQueueProvisioner.class);

    /** Warning emitted when a binding is configured as partitioned. */
    private static final String PARTITIONING_UNSUPPORTED_WARNING = "Partitioning is not supported with this version of Solace's cloud stream binder.\nProvisioning will continue under the assumption that it is disabled...";

    /** Label used in log messages for regular (non-error) durable queues. */
    private static final String DURABLE_QUEUE_LABEL = "Durable queue";

    /** Label used in log messages for error queues. */
    private static final String ERROR_QUEUE_LABEL = "Error Queue";

    // The numeric values below are kept exactly as found in the compiled class.
    // NOTE(review): presumably JCSMPSession.FLAG_IGNORE_ALREADY_EXISTS - confirm before replacing the literal.
    private static final long PROVISION_FLAGS = 1L;

    // NOTE(review): presumably JCSMPSession.WAIT_FOR_CONFIRM - confirm before replacing the literal.
    private static final int ADD_SUBSCRIPTION_FLAGS = 4;

    // Sub-code that the log messages below refer to as SUBSCRIPTION_ALREADY_PRESENT.
    private static final int SUBCODE_SUBSCRIPTION_ALREADY_PRESENT = 13;

    // Access type that the concurrency check below treats as "exclusive queue".
    // NOTE(review): presumably EndpointProperties.ACCESSTYPE_EXCLUSIVE - confirm.
    private static final int ACCESS_TYPE_EXCLUSIVE = 1;

    private final JCSMPSession jcsmpSession;
    private final JCSMPSessionEventHandler jcsmpSessionEventHandler;

    /**
     * Creates a provisioner bound to the given session.
     *
     * @param jCSMPSession session used for all provisioning and subscription calls
     * @param jCSMPSessionEventHandler handler with which after-reconnect tasks are registered
     */
    public SolaceQueueProvisioner(JCSMPSession jCSMPSession, JCSMPSessionEventHandler jCSMPSessionEventHandler) {
        this.jcsmpSession = jCSMPSession;
        this.jcsmpSessionEventHandler = jCSMPSessionEventHandler;
    }

    /**
     * Provisions the destination of a producer binding.
     *
     * <p>For {@link DestinationType#QUEUE} the queue named {@code name} is provisioned and
     * required groups are rejected. Otherwise one durable queue per required group is
     * provisioned and subscribed to the resolved topic plus that group's additional
     * subscriptions.
     *
     * @param name binding destination name
     * @param properties producer binding properties
     * @return the producer destination (queue name or resolved topic name)
     * @throws ProvisioningException if required groups are combined with a queue destination,
     *         or if provisioning or subscribing fails
     */
    @Override
    public ProducerDestination provisionProducerDestination(String name, ExtendedProducerProperties<SolaceProducerProperties> properties) throws ProvisioningException {
        if (properties.isPartitioned()) {
            logger.warn(PARTITIONING_UNSUPPORTED_WARNING);
        }

        SolaceProducerProperties producerProperties = properties.getExtension();
        // Typed as the common base so the utility/subscription calls resolve exactly as before.
        SolaceCommonProperties commonProperties = producerProperties;
        String[] requiredGroups = properties.getRequiredGroups();

        if (producerProperties.getDestinationType() == DestinationType.QUEUE) {
            if (requiredGroups != null && requiredGroups.length > 0) {
                throw new ProvisioningException(String.format("Producer requiredGroups are not supported when destinationType=%s", DestinationType.QUEUE));
            }
            provisionQueueIfRequired(name, properties);
            return new SolaceProducerDestination(name);
        }

        String topicName = SolaceProvisioningUtil.getTopicName(name, commonProperties);
        // A null requiredGroups array is treated as "no required groups", matching the queue branch above.
        Set<String> requiredGroupSet = requiredGroups == null
                ? new HashSet<>()
                : new HashSet<>(Arrays.asList(requiredGroups));
        Map<String, String[]> additionalSubscriptions = producerProperties.getQueueAdditionalSubscriptions();

        for (String requiredGroup : requiredGroupSet) {
            String queueName = SolaceProvisioningUtil.getQueueName(topicName, requiredGroup, properties);
            logger.info(String.format("Creating durable queue %s for required consumer group %s", queueName, requiredGroup));
            Queue queue = provisionQueueIfRequired(queueName, properties);
            addSubscriptionToQueue(queue, topicName, commonProperties, true);
            for (String subscription : additionalSubscriptions.getOrDefault(requiredGroup, new String[0])) {
                addSubscriptionToQueue(queue, subscription, commonProperties, false);
            }
        }

        Set<String> ignoredGroups = additionalSubscriptions.keySet().stream()
                .filter(candidate -> !requiredGroupSet.contains(candidate))
                .collect(Collectors.toSet());
        if (!ignoredGroups.isEmpty()) {
            logger.warn(String.format("Groups [%s] are not required groups. The additional subscriptions defined for them were ignored...", String.join(", ", ignoredGroups)));
        }
        return new SolaceProducerDestination(topicName);
    }

    /**
     * Provisions the queue (and optionally the error queue) of a consumer binding.
     *
     * @param name binding destination name
     * @param group consumer group, may be empty for anonymous consumers
     * @param properties consumer binding properties
     * @return the consumer destination describing the provisioned queue
     * @throws ProvisioningException if concurrency &gt; 1 is combined with an exclusive queue or
     *         an anonymous group, or if provisioning fails
     */
    @Override
    public ConsumerDestination provisionConsumerDestination(String name, String group, ExtendedConsumerProperties<SolaceConsumerProperties> properties) throws ProvisioningException {
        if (properties.isPartitioned()) {
            logger.warn(PARTITIONING_UNSUPPORTED_WARNING);
        }

        SolaceConsumerProperties consumerProperties = properties.getExtension();
        SolaceCommonProperties commonProperties = consumerProperties;

        boolean isAnonQueue = SolaceProvisioningUtil.isAnonQueue(group, consumerProperties.getQualityOfService());
        boolean isDurableQueue = SolaceProvisioningUtil.isDurableQueue(group, consumerProperties.getQualityOfService());
        SolaceProvisioningUtil.QueueNames queueNames = SolaceProvisioningUtil.getQueueNames(name, group, properties, isAnonQueue);
        String groupQueueName = queueNames.getConsumerGroupQueueName();
        EndpointProperties endpointProperties = SolaceProvisioningUtil.getEndpointProperties(commonProperties);
        boolean doDurableProvisioning = consumerProperties.isProvisionDurableQueue();

        if (properties.getConcurrency() > 1) {
            if (endpointProperties.getAccessType().equals(ACCESS_TYPE_EXCLUSIVE)) {
                String message = "Concurrency > 1 is not supported when using exclusive queues, either configure a concurrency of 1 or use a non-exclusive queue";
                logger.warn(message);
                throw new ProvisioningException(message);
            }
            if (!StringUtils.hasText(group)) {
                String message = "Concurrency > 1 is not supported when using anonymous consumer groups, either configure a concurrency of 1 or define a consumer group";
                logger.warn(message);
                throw new ProvisioningException(message);
            }
        }

        String creationMessage = isAnonQueue
                ? String.format("Creating anonymous (temporary) queue %s", groupQueueName)
                : String.format("Creating %s queue %s for consumer group %s", isDurableQueue ? "durable" : "temporary", groupQueueName, group);
        logger.info(creationMessage);

        Queue queue = provisionQueue(groupQueueName, isDurableQueue, endpointProperties, doDurableProvisioning, properties.isAutoStartup());
        Set<String> additionalSubscriptions = new HashSet<>(Arrays.asList(consumerProperties.getQueueAdditionalSubscriptions()));

        String errorQueueName = null;
        if (consumerProperties.isAutoBindErrorQueue()) {
            errorQueueName = provisionErrorQueue(queueNames.getErrorQueueName(), properties).getName();
        }

        return new SolaceConsumerDestination(queue.getName(), name, queueNames.getPhysicalGroupName(), !isDurableQueue, errorQueueName, additionalSubscriptions);
    }

    /** Provisions a durable queue for a producer binding using the producer's endpoint settings. */
    private Queue provisionQueueIfRequired(String name, ExtendedProducerProperties<SolaceProducerProperties> properties) {
        SolaceProducerProperties producerProperties = properties.getExtension();
        SolaceCommonProperties commonProperties = producerProperties;
        EndpointProperties endpointProperties = SolaceProvisioningUtil.getEndpointProperties(commonProperties);
        return provisionQueue(name, true, endpointProperties, producerProperties.isProvisionDurableQueue(), properties.isAutoStartup());
    }

    /** Same as the six-argument overload, labelling the queue as a regular durable queue in logs. */
    private Queue provisionQueue(String name, boolean isDurable, EndpointProperties endpointProperties, boolean doDurableProvisioning, boolean testFlowConnection) {
        return provisionQueue(name, isDurable, endpointProperties, doDurableProvisioning, testFlowConnection, DURABLE_QUEUE_LABEL);
    }

    /**
     * Creates the queue handle and, depending on the flags, provisions it and verifies that a
     * consumer flow can be bound to it.
     *
     * @param name queue name
     * @param isDurable {@code true} for a durable queue, {@code false} for a temporary queue
     * @param endpointProperties endpoint settings used for provisioning and for the test flow
     * @param doDurableProvisioning whether a durable queue is actually provisioned on the session
     * @param testFlowConnection whether a (never started) test flow is opened for durable queues
     * @param queueLabel label used in the "provisioning is disabled" log message
     * @return the created queue handle
     * @throws ProvisioningException if creating/provisioning the queue or the test flow fails
     */
    private Queue provisionQueue(String name, boolean isDurable, EndpointProperties endpointProperties, boolean doDurableProvisioning, boolean testFlowConnection, String queueLabel) throws ProvisioningException {
        Queue queue;
        // Only the create/provision calls are guarded here, so that a test-flow failure is not
        // re-wrapped and reported as a provisioning failure.
        try {
            if (isDurable) {
                queue = JCSMPFactory.onlyInstance().createQueue(name);
                if (doDurableProvisioning) {
                    this.jcsmpSession.provision(queue, endpointProperties, PROVISION_FLAGS);
                } else {
                    logger.info(String.format("%s provisioning is disabled, %s will not be provisioned nor will its configuration be validated", queueLabel, name));
                }
            } else {
                queue = this.jcsmpSession.createTemporaryQueue(name);
            }
        } catch (Exception e) {
            String message = String.format("Failed to %s queue %s", isDurable ? "provision durable" : "create temporary", name);
            logger.warn(message, e);
            throw new ProvisioningException(message, e);
        }

        if (isDurable && testFlowConnection) {
            testFlowConnection(queue, name, endpointProperties, doDurableProvisioning);
        } else {
            logger.trace(String.format("Skipping test consumer flow connection for queue %s", name));
        }
        return queue;
    }

    /** Opens and immediately closes a non-started consumer flow to check the queue is bindable. */
    private void testFlowConnection(Queue queue, String name, EndpointProperties endpointProperties, boolean wasProvisioned) {
        try {
            logger.info(String.format("Testing consumer flow connection to queue %s (will not start it)", name));
            ConsumerFlowProperties flowProperties = new ConsumerFlowProperties().setEndpoint(queue).setStartState(false);
            this.jcsmpSession.createFlow((XMLMessageListener) null, flowProperties, endpointProperties).close();
            logger.info(String.format("Connected test consumer flow to queue %s, closing it", name));
        } catch (Exception e) {
            // Broad catch on purpose: callers keep receiving a ProvisioningException for any failure here.
            String message = String.format("Failed to connect test consumer flow to queue %s", name);
            if (!wasProvisioned) {
                message = message + ". Provisioning is disabled, queue was not provisioned nor was its configuration validated.";
            }
            logger.warn(message, e);
            throw new ProvisioningException(message, e);
        }
    }

    /** Provisions the durable error queue of a consumer binding using the error-queue endpoint settings. */
    private Queue provisionErrorQueue(String name, ExtendedConsumerProperties<SolaceConsumerProperties> properties) {
        logger.info(String.format("Provisioning error queue %s", name));
        SolaceConsumerProperties consumerProperties = properties.getExtension();
        EndpointProperties endpointProperties = SolaceProvisioningUtil.getErrorQueueEndpointProperties(consumerProperties);
        return provisionQueue(name, true, endpointProperties, consumerProperties.isProvisionErrorQueue(), properties.isAutoStartup(), ERROR_QUEUE_LABEL);
    }

    /**
     * Subscribes {@code queue} to {@code topicName} unless the properties disable it.
     *
     * <p>A "subscription already present" error response is logged and ignored. After a
     * successful add, a task is registered that re-adds the subscription after a reconnect.
     *
     * @param queue queue to subscribe
     * @param topicName topic to subscribe the queue to
     * @param properties common binder properties consulted for the opt-out flag
     * @param isDestinationSubscription {@code true} when the topic is the binding destination
     *        itself, {@code false} for an additional subscription
     * @throws ProvisioningException if adding the subscription fails for any other reason
     */
    public void addSubscriptionToQueue(Queue queue, String topicName, SolaceCommonProperties properties, boolean isDestinationSubscription) {
        if (!isDestinationSubscription && queue.isDurable() && !properties.isAddDestinationAsSubscriptionToQueue()) {
            logger.debug(String.format("Provision subscriptions to durable queues was disabled, queue %s will not be subscribed to topic %s", queue.getName(), topicName));
            return;
        }
        if (isDestinationSubscription && !properties.isAddDestinationAsSubscriptionToQueue()) {
            logger.debug(String.format("Adding destination as subscription was disabled, queue %s will not be subscribed to topic %s", queue.getName(), topicName));
            return;
        }

        logger.info(String.format("Subscribing queue %s to topic %s", queue.getName(), topicName));
        try {
            Topic topic = JCSMPFactory.onlyInstance().createTopic(topicName);
            try {
                this.jcsmpSession.addSubscription(queue, topic, ADD_SUBSCRIPTION_FLAGS);
                registerTaskToReapplySubscriptionAfterReconnect(queue, topicName, topic);
            } catch (JCSMPErrorResponseException e) {
                if (e.getSubcodeEx() != SUBCODE_SUBSCRIPTION_ALREADY_PRESENT) {
                    // Any other error response is handled as a regular failure below.
                    throw e;
                }
                logger.info(String.format("Queue %s is already subscribed to topic %s, SUBSCRIPTION_ALREADY_PRESENT error will be ignored...", queue.getName(), topicName));
            }
        } catch (JCSMPException e) {
            String message = String.format("Failed to add subscription of %s to queue %s", topicName, queue.getName());
            logger.warn(message, e);
            throw new ProvisioningException(message, e);
        }
    }

    /**
     * Registers an after-reconnect task that re-adds the subscription. The task removes itself
     * from the event handler when re-adding fails for a reason other than the subscription
     * already being present.
     */
    private void registerTaskToReapplySubscriptionAfterReconnect(Queue queue, String topicName, Topic topic) {
        // The holder lets the task refer to itself so it can unregister on failure.
        AtomicReference<Runnable> taskHolder = new AtomicReference<>();
        taskHolder.set(() -> {
            try {
                logger.info(String.format("Subscribing after reconnect: queue %s and topic  %s.", queue.getName(), topicName));
                this.jcsmpSession.addSubscription(queue, topic, ADD_SUBSCRIPTION_FLAGS);
                logger.warn(String.format("The subscription has been re-established. !You might have lost messages! queue %s and topic  %s.", queue.getName(), topicName));
            } catch (JCSMPErrorResponseException e) {
                // Must be caught before the generic handler, otherwise this branch is unreachable.
                if (e.getSubcodeEx() == SUBCODE_SUBSCRIPTION_ALREADY_PRESENT) {
                    logger.info(String.format("Queue %s is already subscribed to topic %s, SUBSCRIPTION_ALREADY_PRESENT error will be ignored...", queue.getName(), topicName));
                } else {
                    logger.error(String.format("Could not add subscription after reconnect %s %s.", queue.getName(), topicName), e);
                    this.jcsmpSessionEventHandler.removeAfterReconnectTask(taskHolder.get());
                }
            } catch (Exception e) {
                logger.error(String.format("Could not add subscription after reconnect %s %s.", queue.getName(), topicName), e);
                this.jcsmpSessionEventHandler.removeAfterReconnectTask(taskHolder.get());
            }
        });
        this.jcsmpSessionEventHandler.addAfterReconnectTask(taskHolder.get());
    }
}
