package org.citrusframework.vertx.endpoint;

import io.vertx.core.Handler;
import io.vertx.core.Vertx;
import io.vertx.core.eventbus.Message;
import io.vertx.core.eventbus.MessageConsumer;
import org.citrusframework.context.TestContext;
import org.citrusframework.exceptions.MessageTimeoutException;
import org.citrusframework.messaging.AbstractMessageConsumer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/citrusframework/vertx/endpoint/VertxConsumer.class */
public class VertxConsumer extends AbstractMessageConsumer {
    private final Vertx vertx;
    private final VertxEndpointConfiguration endpointConfiguration;
    private static final Logger logger = LoggerFactory.getLogger(VertxConsumer.class);
    private static final Logger RETRY_LOG = LoggerFactory.getLogger("org.citrusframework.RetryLogger");

    /* loaded from: input_file:org/citrusframework/vertx/endpoint/VertxConsumer$VertxSingleMessageHandler.class */
    private class VertxSingleMessageHandler implements Handler<Message<Object>> {
        private Message message;

        private VertxSingleMessageHandler() {
        }

        public void handle(Message message) {
            if (this.message == null) {
                this.message = message;
            } else {
                VertxConsumer.logger.warn("Vert.x message handler ignored message on event bus address '" + VertxConsumer.this.endpointConfiguration.getAddress() + "'");
                VertxConsumer.logger.debug("Vert.x message ignored is " + message);
            }
        }

        public Message getMessage() {
            return this.message;
        }
    }

    public VertxConsumer(String str, Vertx vertx, VertxEndpointConfiguration vertxEndpointConfiguration) {
        super(str, vertxEndpointConfiguration);
        this.vertx = vertx;
        this.endpointConfiguration = vertxEndpointConfiguration;
    }

    public org.citrusframework.message.Message receive(TestContext testContext, long j) {
        long pollingInterval;
        if (logger.isDebugEnabled()) {
            logger.debug("Receiving message on Vert.x event bus address: '" + this.endpointConfiguration.getAddress() + "'");
        }
        VertxSingleMessageHandler vertxSingleMessageHandler = new VertxSingleMessageHandler();
        MessageConsumer consumer = this.vertx.eventBus().consumer(this.endpointConfiguration.getAddress(), vertxSingleMessageHandler);
        try {
            long j2 = j;
            org.citrusframework.message.Message convertInbound = this.endpointConfiguration.getMessageConverter().convertInbound(vertxSingleMessageHandler.getMessage(), this.endpointConfiguration, testContext);
            while (convertInbound == null && j2 > 0) {
                j2 -= this.endpointConfiguration.getPollingInterval();
                if (RETRY_LOG.isDebugEnabled()) {
                    Logger logger2 = RETRY_LOG;
                    Object[] objArr = new Object[2];
                    objArr[0] = this.endpointConfiguration.getAddress();
                    objArr[1] = Long.valueOf(j2 > 0 ? this.endpointConfiguration.getPollingInterval() : this.endpointConfiguration.getPollingInterval() + j2);
                    logger2.debug(String.format("Waiting for message on Vert.x event bus address '%s' - retrying in %s ms", objArr));
                }
                if (j2 > 0) {
                    try {
                        pollingInterval = this.endpointConfiguration.getPollingInterval();
                    } catch (InterruptedException e) {
                        RETRY_LOG.warn("Thread interrupted while waiting for message on Vert.x event bus", e);
                    }
                } else {
                    pollingInterval = this.endpointConfiguration.getPollingInterval() + j2;
                }
                Thread.sleep(pollingInterval);
                convertInbound = this.endpointConfiguration.getMessageConverter().convertInbound(vertxSingleMessageHandler.getMessage(), this.endpointConfiguration, testContext);
            }
            if (convertInbound == null) {
                throw new MessageTimeoutException(j, this.endpointConfiguration.getAddress());
            }
            logger.info("Received message on Vert.x event bus address: '" + this.endpointConfiguration.getAddress() + "'");
            testContext.onInboundMessage(convertInbound);
            org.citrusframework.message.Message message = convertInbound;
            consumer.unregister();
            return message;
        } catch (Throwable th) {
            consumer.unregister();
            throw th;
        }
    }
}
