A Vert.x client allowing applications to interact with a RabbitMQ broker (AMQP 0.9.1)
This service is experimental and the APIs are likely to change before settling down.
Getting Started
Maven
Add the following dependency to your Maven project:
<dependency>
<groupId>io.vertx</groupId>
<artifactId>vertx-rabbitmq-client</artifactId>
<version>4.0.0.Beta1</version>
</dependency>
Gradle
Add the following dependency to your Gradle project:
dependencies {
compile 'io.vertx:vertx-rabbitmq-client:4.0.0.Beta1'
}
Create a client
You can create a client instance as follows, using a full AMQP URI:
def config = [:]
// full amqp uri
config.uri = "amqp://xvjvsrrc:VbuL1atClKt7zVNQha0bnnScbNvGiqgb@moose.rmq.cloudamqp.com/xvjvsrrc"
def client = RabbitMQClient.create(vertx, config)
Alternatively, you can specify individual parameters:
def config = [:]
// Each parameter is optional
// The default value will be used if a parameter is not set
config.user = "user1"
config.password = "password1"
config.host = "localhost"
config.port = 5672
config.virtualHost = "vhost1"
config.connectionTimeout = 6000
config.requestedHeartbeat = 60
config.handshakeTimeout = 6000
config.requestedChannelMax = 5
config.networkRecoveryInterval = 500
config.automaticRecoveryEnabled = true
def client = RabbitMQClient.create(vertx, config)
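The two forms carry the same information. As a rough illustration of how the parts of an AMQP URI correspond to the individual parameters, the sketch below splits a URI with java.net.URI. The class name AmqpUriParts and the example URI are made up for this illustration, the sketch assumes every part of the URI is present, and it is not how the client itself parses the URI.

```java
import java.net.URI;

// Hypothetical helper, for illustration only: not part of the Vert.x RabbitMQ client.
class AmqpUriParts {
    final String user;
    final String password;
    final String host;
    final int port;
    final String virtualHost;

    AmqpUriParts(String uri) {
        URI parsed = URI.create(uri);
        // The user info has the form "user:password"
        String[] userInfo = parsed.getUserInfo().split(":", 2);
        user = userInfo[0];
        password = userInfo.length > 1 ? userInfo[1] : "";
        host = parsed.getHost();
        // URI.getPort() returns -1 when no port is given; 5672 is the standard AMQP port
        port = parsed.getPort() == -1 ? 5672 : parsed.getPort();
        // The path has the form "/<vhost>"; strip the leading slash
        virtualHost = parsed.getPath().substring(1);
    }

    public static void main(String[] args) {
        AmqpUriParts parts = new AmqpUriParts("amqp://user1:password1@localhost/vhost1");
        System.out.println(parts.user + "@" + parts.host + ":" + parts.port + "/" + parts.virtualHost);
    }
}
```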
You can set multiple addresses to connect to a cluster:
def config = [:]
config.user = "user1"
config.password = "password1"
config.virtualHost = "vhost1"
// parseAddresses returns an array, so convert it to a list rather than wrapping it in one
config.addresses = com.rabbitmq.client.Address.parseAddresses("firstHost,secondHost:5672").toList()
def client = RabbitMQClient.create(vertx, config)
Reconnections
The RabbitMQClient will, by default, retry connecting to the RabbitMQ server if it cannot connect. It will also attempt to reconnect whenever the connection to the RabbitMQ server becomes broken. The failure of a connection could be caused by a transient network failure (where the client would probably connect back to the same RabbitMQ server) or it could be caused by a failover scenario.
The reconnection policy can be configured by setting the connectionRetries and connectionRetryDelay properties in the configuration:
RabbitMQOptions options = new RabbitMQOptions();
options.setConnectionRetries(Integer.MAX_VALUE);
options.setConnectionRetryDelay(500);
As soon as the connection has been re-established, users of the client may attempt to publish (or consume) messages. However, if the connection is to a new RabbitMQ server, objects created through this RabbitMQClient may not exist there, e.g. when exchanges and queues are created through the RabbitMQClient on startup. To provide an opportunity to create these objects before the connection is considered ready, the RabbitMQClient provides the ConnectionEstablishedCallback. The ConnectionEstablishedCallback can be used to carry out any operations on the RabbitMQClient before other users (including the RabbitMQConsumer and RabbitMQPublisher) are able to access it.
def client = RabbitMQClient.create(vertx, config)
client.addConnectionEstablishedCallback({ promise ->
client.exchangeDeclare("exchange", "fanout", true, false).compose({ v ->
return client.queueDeclare("queue", false, true, true)
}).compose({ declareOk ->
return client.queueBind(declareOk.getQueue(), "exchange", "")
}).onComplete(promise)
})
// At this point the exchange, queue and binding will have been declared even if the client connects to a new server
client.basicConsumer("queue", { rabbitMQConsumerAsyncResult ->
})
If a RabbitMQConsumer is listening for messages on an auto-delete, server-named queue and the broker restarts, the queue will have been removed by the time the client reconnects. In this case it is necessary both to recreate the queue and to set the new queue name on the RabbitMQConsumer.
def client = RabbitMQClient.create(vertx, config)
def consumer = new java.util.concurrent.atomic.AtomicReference()
def queueName = new java.util.concurrent.atomic.AtomicReference()
client.addConnectionEstablishedCallback({ promise ->
client.exchangeDeclare("exchange", "fanout", true, false).compose({ v ->
client.queueDeclare("", false, true, true)
}).compose({ dok ->
queueName.set(dok.getQueue())
// The first time this runs there will be no existing consumer
// on subsequent connections the consumer needs to be updated with the new queue name
def currentConsumer = consumer.get()
if (currentConsumer != null) {
currentConsumer.setQueueName(queueName.get())
}
return client.queueBind(queueName.get(), "exchange", "")
}).onComplete(promise)
})
client.start().onSuccess({ v ->
// At this point the exchange, queue and binding will have been declared even if the client connects to a new server
client.basicConsumer(queueName.get(), { rabbitMQConsumerAsyncResult ->
if (rabbitMQConsumerAsyncResult.succeeded()) {
consumer.set(rabbitMQConsumerAsyncResult.result())
}
})
}).onFailure({ ex ->
println("It went wrong: ${ex.getMessage()}")
})
Declare exchange with additional config
You can pass additional config parameters to RabbitMQ’s exchangeDeclare method:
def config = [:]
// Keys containing hyphens must be quoted in Groovy
config."x-dead-letter-exchange" = "my.deadletter.exchange"
config."alternate-exchange" = "my.alternate.exchange"
// ...
client.exchangeDeclare("my.exchange", "fanout", true, false, config, { onResult ->
if (onResult.succeeded()) {
println("Exchange successfully declared with config")
} else {
onResult.cause().printStackTrace()
}
})
Declare queue with additional config
You can pass additional config parameters to RabbitMQ’s queueDeclare method:
def config = [:]
// Keys containing hyphens must be quoted in Groovy
config."x-message-ttl" = 10000L
client.queueDeclare("my-queue", true, false, true, config, { queueResult ->
if (queueResult.succeeded()) {
println("Queue declared!")
} else {
System.err.println("Queue failed to be declared!")
queueResult.cause().printStackTrace()
}
})
Operations
The following are some examples of the operations supported by the RabbitMQClient API. Consult the javadoc/documentation for detailed information on all API methods.
Publish
Publish a message to a queue:
// Buffer.buffer(String, String) would treat the second argument as a charset name
def message = Buffer.buffer("Hello RabbitMQ, from Vert.x !")
client.basicPublish("", "my.queue", message, { pubResult ->
if (pubResult.succeeded()) {
println("Message published !")
} else {
pubResult.cause().printStackTrace()
}
})
Publish with confirm
Publish a message to a queue and confirm the broker acknowledged it.
def message = Buffer.buffer("Hello RabbitMQ, from Vert.x !")
// Put the channel in confirm mode. This can be done once at init.
client.confirmSelect({ confirmResult ->
if (confirmResult.succeeded()) {
client.basicPublish("", "my.queue", message, { pubResult ->
if (pubResult.succeeded()) {
// Check the message got confirmed by the broker.
client.waitForConfirms({ waitResult ->
if (waitResult.succeeded()) {
println("Message published !")
} else {
waitResult.cause().printStackTrace()
}
})
} else {
pubResult.cause().printStackTrace()
}
})
} else {
confirmResult.cause().printStackTrace()
}
})
Reliable Message Publishing
In order to reliably publish messages to RabbitMQ it is necessary to handle confirmations that each message has been accepted by the server. The simplest approach to confirmations is the one shown under Publish with confirm, above, which confirms each message synchronously when it is sent, blocking the publishing channel until the confirmation is received.
In order to achieve greater throughput RabbitMQ provides asynchronous confirmations. The asynchronous confirmations can confirm multiple messages in one go, so it is necessary for the client to track all messages in the order that they were published. Also, until messages are confirmed by the server it may be necessary to resend them, so they must be retained by the client.
The RabbitMQPublisher class implements a standard approach to handling asynchronous confirmations, avoiding much of the boilerplate code that would otherwise be required.
The RabbitMQPublisher works by:
* Adding all sent messages to an internal queue.
* Sending messages from the queue when it is able, keeping track of these messages pending acknowledgement in a separate queue.
* Handling asynchronous confirmations from RabbitMQ, removing messages from the pendingAck queue once they are confirmed.
* Notifying the caller for each message that is confirmed (this is always a single message at a time, not the bulk confirmation used by RabbitMQ).
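The bookkeeping described above can be sketched in plain Java without any RabbitMQ dependency. This is only an illustration of the general technique (an ordered map of unconfirmed messages, with bulk confirmations expanded into per-message notifications), not the actual RabbitMQPublisher implementation; the class PendingConfirms and its method names are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ConcurrentNavigableMap;
import java.util.concurrent.ConcurrentSkipListMap;

// Hypothetical sketch: tracks messages that have been sent but not yet confirmed.
class PendingConfirms {
    // Unconfirmed messages, ordered by delivery tag (that is, by publish order)
    private final ConcurrentNavigableMap<Long, String> pendingAck = new ConcurrentSkipListMap<>();

    // Record a message once its delivery tag is known
    void sent(long deliveryTag, String messageId) {
        pendingAck.put(deliveryTag, messageId);
    }

    // Handle a confirmation from the broker. When 'multiple' is true, the broker
    // confirms every outstanding message with a tag up to and including deliveryTag.
    // Returns the confirmed message ids individually, in publish order.
    List<String> confirmed(long deliveryTag, boolean multiple) {
        List<String> result = new ArrayList<>();
        if (multiple) {
            ConcurrentNavigableMap<Long, String> upTo = pendingAck.headMap(deliveryTag, true);
            result.addAll(upTo.values());
            upTo.clear(); // also removes the entries from pendingAck
        } else {
            String id = pendingAck.remove(deliveryTag);
            if (id != null) {
                result.add(id);
            }
        }
        return result;
    }

    int pendingCount() {
        return pendingAck.size();
    }

    public static void main(String[] args) {
        PendingConfirms tracker = new PendingConfirms();
        tracker.sent(1, "a");
        tracker.sent(2, "b");
        tracker.sent(3, "c");
        // One bulk confirmation from the broker becomes two per-message notifications
        for (String id : tracker.confirmed(2, true)) {
            System.out.println("confirmed " + id);
        }
        System.out.println("still pending: " + tracker.pendingCount());
    }
}
```

Messages that remain in the map after a reconnection are the ones a publisher would need to resend.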
Delivery Tags
This section is an implementation detail that is useful for anyone that wants to implement their own alternative to RabbitMQPublisher.
For the RabbitMQPublisher to work it has to know the delivery tag that RabbitMQ will use for each message published. The confirmations from RabbitMQ can arrive at the client before the call to basicPublish has completed, so it is not possible to identify the delivery tag via anything returned by basicPublish if asynchronous confirmations are being used. For this reason the RabbitMQClient must tell the RabbitMQPublisher the delivery tag of each message via a separate callback that occurs in the call to RabbitMQClient::basicPublish before the message is actually sent on the network. It is also possible for the delivery tag of a single message to change: delivery tags are per-channel, so if the message is resent following a reconnection it will have a new delivery tag. This means that a Future cannot be used to inform the client of the delivery tag. If the deliveryTagHandler is called more than once for a given message it is always safe to ignore the previous value, because there can be only one valid delivery tag for a message at any time.
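The "latest value wins" rule can be illustrated with a small dependency-free sketch. Here java.util.function.Consumer<Long> stands in for the Handler<Long> delivery tag callback, and the class LatestDeliveryTag and its methods are hypothetical names used only for this example.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Consumer;

// Hypothetical sketch: remembers only the most recent delivery tag of each message.
class LatestDeliveryTag {
    private final Map<String, Long> tagByMessageId = new HashMap<>();

    // Returns a callback for one message. Unlike a Future, it may be invoked
    // several times; each invocation overwrites the previously stored tag.
    Consumer<Long> handlerFor(String messageId) {
        return tag -> tagByMessageId.put(messageId, tag);
    }

    Long currentTag(String messageId) {
        return tagByMessageId.get(messageId);
    }

    public static void main(String[] args) {
        LatestDeliveryTag tags = new LatestDeliveryTag();
        Consumer<Long> handler = tags.handlerFor("message-1");
        handler.accept(7L); // first publish
        handler.accept(1L); // resent on a new channel after a reconnection
        System.out.println("current tag: " + tags.currentTag("message-1"));
    }
}
```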
To capture the delivery tag, one of the RabbitMQClient::basicPublishWithDeliveryTag methods should be used.
void basicPublishWithDeliveryTag(String exchange, String routingKey, BasicProperties properties, Buffer body, Handler<Long> deliveryTagHandler, Handler<AsyncResult<Void>> resultHandler);
Future<Void> basicPublishWithDeliveryTag(String exchange, String routingKey, BasicProperties properties, Buffer body, @Nullable Handler<Long> deliveryTagHandler);
These methods
Consume
Consume messages from a queue.
// Create a stream of messages from a queue
client.basicConsumer("my.queue", { rabbitMQConsumerAsyncResult ->
if (rabbitMQConsumerAsyncResult.succeeded()) {
println("RabbitMQ consumer created !")
def mqConsumer = rabbitMQConsumerAsyncResult.result()
mqConsumer.handler({ message ->
println("Got message: ${message.body().toString()}")
})
} else {
rabbitMQConsumerAsyncResult.cause().printStackTrace()
}
})
You can pause or resume the stream at any time. While the stream is paused you won’t receive any messages.
consumer.pause()
consumer.resume()
Several options can be specified when creating a consumption stream.
The QueueOptions lets you specify:
* The size of the internal queue, with setMaxInternalQueueSize
* Whether the stream should keep the most recent messages when the queue size is exceeded, with setKeepMostRecent
def options = [
maxInternalQueueSize:1000,
keepMostRecent:true
]
client.basicConsumer("my.queue", options, { rabbitMQConsumerAsyncResult ->
if (rabbitMQConsumerAsyncResult.succeeded()) {
println("RabbitMQ consumer created !")
} else {
rabbitMQConsumerAsyncResult.cause().printStackTrace()
}
})
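As an illustration of what these two options control, here is a minimal sketch of a bounded buffer. It is not the client's implementation: BoundedMessageQueue is a hypothetical class, and the sketch assumes that when the buffer is full, keepMostRecent = true discards the oldest buffered message while keepMostRecent = false discards the incoming one.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

// Hypothetical sketch of a size-limited buffer with two overflow policies.
class BoundedMessageQueue {
    private final Deque<String> queue = new ArrayDeque<>();
    private final int maxSize;
    private final boolean keepMostRecent;

    BoundedMessageQueue(int maxSize, boolean keepMostRecent) {
        if (maxSize <= 0) {
            throw new IllegalArgumentException("maxSize must be positive");
        }
        this.maxSize = maxSize;
        this.keepMostRecent = keepMostRecent;
    }

    void offer(String message) {
        if (queue.size() < maxSize) {
            queue.addLast(message);
        } else if (keepMostRecent) {
            // Full: drop the oldest message to make room for the new one
            queue.removeFirst();
            queue.addLast(message);
        }
        // Full and keepMostRecent is false: the new message is dropped
    }

    List<String> contents() {
        return new ArrayList<>(queue);
    }

    public static void main(String[] args) {
        BoundedMessageQueue recent = new BoundedMessageQueue(2, true);
        BoundedMessageQueue oldest = new BoundedMessageQueue(2, false);
        for (String m : new String[] {"a", "b", "c"}) {
            recent.offer(m);
            oldest.offer(m);
        }
        System.out.println("keepMostRecent=true:  " + recent.contents());
        System.out.println("keepMostRecent=false: " + oldest.contents());
    }
}
```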
To stop consuming messages from a queue, cancel the consumer:
rabbitMQConsumer.cancel({ cancelResult ->
if (cancelResult.succeeded()) {
println("Consumption successfully stopped")
} else {
println("Failed to stop consumption")
cancelResult.cause().printStackTrace()
}
})
You can get notified by the end handler when the queue won’t process any more messages:
rabbitMQConsumer.endHandler({ v ->
println("It is the end of the stream")
})
You can set the exception handler to be notified of any error that may occur when a message is processed:
consumer.exceptionHandler({ e ->
println("An exception occurred in the process of message handling")
e.printStackTrace()
})
Finally, you may want to retrieve the consumer tag associated with the consumer:
def consumerTag = consumer.consumerTag()
println("Consumer tag is: ${consumerTag}")
Get
Get a single message from a queue:
client.basicGet("my.queue", true, { getResult ->
if (getResult.succeeded()) {
def msg = getResult.result()
println("Got message: ${msg.body()}")
} else {
getResult.cause().printStackTrace()
}
})
Consume messages without auto-ack
// Setup the rabbitmq consumer
client.basicConsumer("my.queue", [
autoAck:false
], { consumeResult ->
if (consumeResult.succeeded()) {
println("RabbitMQ consumer created !")
def consumer = consumeResult.result()
// Set the handler which messages will be sent to
consumer.handler({ msg ->
// body() returns a Buffer; the delivery tag is carried by the message envelope
println("Got message: ${msg.body().toString()}")
// ack
client.basicAck(msg.envelope().getDeliveryTag(), false, { asyncResult ->
})
})
} else {
consumeResult.cause().printStackTrace()
}
})
Running the tests
You will need to have RabbitMQ installed and running with default ports on localhost for this to work.