package org.citrusframework.endpoint.direct;

import org.citrusframework.context.TestContext;
import org.citrusframework.message.Message;
import org.citrusframework.message.MessageQueue;
import org.citrusframework.message.correlation.CorrelationManager;
import org.citrusframework.message.correlation.PollingCorrelationManager;
import org.citrusframework.messaging.ReplyProducer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.util.Assert;
import org.springframework.util.StringUtils;

/* loaded from: input_file:org/citrusframework/endpoint/direct/DirectSyncConsumer.class */
/**
 * Synchronous direct consumer: receives a message through {@link DirectConsumer} and remembers the
 * reply queue announced in the {@link DirectMessageHeaders#REPLY_QUEUE} header, so that a later
 * {@link #send(Message, TestContext)} call can deliver the reply to that queue.
 *
 * <p>Reply queues are stored in and looked up from a {@link CorrelationManager}, keyed by the
 * correlation key that the endpoint configuration's correlator derives from the received message.
 */
public class DirectSyncConsumer extends DirectConsumer implements ReplyProducer {
    private static final Logger log = LoggerFactory.getLogger(DirectSyncConsumer.class);

    /** Maps correlation keys to the reply queue of the corresponding request; replaceable via setter. */
    private CorrelationManager<MessageQueue> correlationManager;

    /** Endpoint configuration supplying the correlator and the reply-queue lookup timeout. */
    private final DirectSyncEndpointConfiguration endpointConfiguration;

    /**
     * Creates the consumer and installs a {@link PollingCorrelationManager} as the default
     * correlation manager.
     *
     * @param str the consumer name, passed on to {@link DirectConsumer}
     * @param directSyncEndpointConfiguration the endpoint configuration to use
     */
    public DirectSyncConsumer(String str, DirectSyncEndpointConfiguration directSyncEndpointConfiguration) {
        super(str, directSyncEndpointConfiguration);
        this.endpointConfiguration = directSyncEndpointConfiguration;
        this.correlationManager = new PollingCorrelationManager<>(directSyncEndpointConfiguration, "Reply channel not set up yet");
    }

    /**
     * Receives a message via the parent consumer and records its reply queue for a later reply.
     *
     * @param str the selector handed to {@link DirectConsumer#receive}
     * @param testContext the current test context
     * @param j the timeout handed to {@link DirectConsumer#receive}
     * @return the message returned by the parent consumer
     */
    @Override // org.citrusframework.endpoint.direct.DirectConsumer
    public Message receive(String str, TestContext testContext, long j) {
        Message received = super.receive(str, testContext, j);
        saveReplyMessageQueue(received, testContext);
        return received;
    }

    /**
     * Sends the given message to the reply queue that was stored for the current correlation key.
     *
     * @param message the reply message; must not be {@code null}
     * @param testContext the current test context, used to resolve the correlation key
     * @throws IllegalArgumentException if {@code message} is {@code null} or no reply queue is
     *     found for the correlation key (raised by Spring's {@link Assert#notNull})
     */
    @Override
    public void send(Message message, TestContext testContext) {
        Assert.notNull(message, "Can not send empty message");

        String correlationKeyName = this.endpointConfiguration.getCorrelator().getCorrelationKeyName(getName());
        String correlationKey = this.correlationManager.getCorrelationKey(correlationKeyName, testContext);
        MessageQueue replyQueue = this.correlationManager.find(correlationKey, this.endpointConfiguration.getTimeout());
        Assert.notNull(replyQueue, "Failed to find reply channel for message correlation key: " + correlationKey);

        // Parameterized logging defers message formatting until the level is known to be enabled.
        log.debug("Sending message to reply channel: '{}'", replyQueue);
        log.debug("Message to send is:\n{}", message);

        replyQueue.send(message);
        log.info("Message was sent to reply channel: '{}'", replyQueue);
    }

    /**
     * Extracts the reply queue from the {@link DirectMessageHeaders#REPLY_QUEUE} header of the
     * given message and stores it in the correlation manager under the message's correlation key.
     *
     * <p>The header may hold a {@link MessageQueue} instance directly, or any other value whose
     * textual form is a queue name to be resolved via {@code resolveQueueName}. When no usable
     * header is present a warning is logged and nothing is stored.
     *
     * @param message the received message carrying the reply-queue header
     * @param testContext the current test context
     */
    public void saveReplyMessageQueue(Message message, TestContext testContext) {
        // Read the header once; it is inspected by type below.
        Object replyQueueHeader = message.getHeader(DirectMessageHeaders.REPLY_QUEUE);

        MessageQueue replyQueue = null;
        if (replyQueueHeader instanceof MessageQueue) {
            replyQueue = (MessageQueue) replyQueueHeader;
        } else if (replyQueueHeader != null) {
            // Use toString() instead of a blind String cast so that non-String header values
            // are treated as queue names rather than failing with a ClassCastException.
            String queueName = replyQueueHeader.toString();
            if (StringUtils.hasText(queueName)) {
                replyQueue = resolveQueueName(queueName, testContext);
            }
        }

        if (replyQueue == null) {
            log.warn("Unable to retrieve reply message channel for message \n{}\n - no reply channel found in message headers!", message);
            return;
        }

        String correlationKeyName = this.endpointConfiguration.getCorrelator().getCorrelationKeyName(getName());
        String correlationKey = this.endpointConfiguration.getCorrelator().getCorrelationKey(message);
        this.correlationManager.saveCorrelationKey(correlationKeyName, correlationKey, testContext);
        this.correlationManager.store(correlationKey, replyQueue);
    }

    /**
     * Returns the correlation manager currently used to store and find reply queues.
     *
     * @return the correlation manager
     */
    public CorrelationManager<MessageQueue> getCorrelationManager() {
        return this.correlationManager;
    }

    /**
     * Replaces the correlation manager used to store and find reply queues.
     *
     * @param correlationManager the correlation manager to use from now on
     */
    public void setCorrelationManager(CorrelationManager<MessageQueue> correlationManager) {
        this.correlationManager = correlationManager;
    }
}
