package org.citrusframework.websocket.endpoint;

import org.citrusframework.context.TestContext;
import org.citrusframework.exceptions.MessageTimeoutException;
import org.citrusframework.message.Message;
import org.citrusframework.messaging.AbstractSelectiveMessageConsumer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.web.socket.WebSocketMessage;

/* loaded from: input_file:org/citrusframework/websocket/endpoint/WebSocketConsumer.class */
public class WebSocketConsumer extends AbstractSelectiveMessageConsumer {
    private static final Logger LOG = LoggerFactory.getLogger(WebSocketConsumer.class);
    private final WebSocketEndpointConfiguration endpointConfiguration;

    public WebSocketConsumer(String str, WebSocketEndpointConfiguration webSocketEndpointConfiguration) {
        super(str, webSocketEndpointConfiguration);
        this.endpointConfiguration = webSocketEndpointConfiguration;
    }

    public Message receive(String str, TestContext testContext, long j) {
        LOG.info(String.format("Waiting %s ms for Web Socket message ...", Long.valueOf(j)));
        Message convertInbound = this.endpointConfiguration.getMessageConverter().convertInbound(receive(this.endpointConfiguration, j), this.endpointConfiguration, testContext);
        LOG.info("Received Web Socket message");
        testContext.onInboundMessage(convertInbound);
        return convertInbound;
    }

    private WebSocketMessage<?> receive(WebSocketEndpointConfiguration webSocketEndpointConfiguration, long j) {
        long j2 = j;
        WebSocketMessage<?> message = webSocketEndpointConfiguration.getHandler().getMessage();
        String endpointUri = this.endpointConfiguration.getEndpointUri();
        while (message == null && j2 > 0) {
            j2 -= this.endpointConfiguration.getPollingInterval();
            long pollingInterval = j2 > 0 ? this.endpointConfiguration.getPollingInterval() : this.endpointConfiguration.getPollingInterval() + j2;
            if (LOG.isDebugEnabled()) {
                LOG.debug(String.format("Waiting for message on '%s' - retrying in %s ms", endpointUri, Long.valueOf(pollingInterval)));
            }
            try {
                Thread.sleep(pollingInterval);
            } catch (InterruptedException e) {
                LOG.warn(String.format("Thread interrupted while waiting for message on '%s'", endpointUri), e);
            }
            message = webSocketEndpointConfiguration.getHandler().getMessage();
        }
        if (message == null) {
            throw new MessageTimeoutException(j, endpointUri);
        }
        return message;
    }
}
