A Vert.x client allowing applications to interact with a RabbitMQ broker (AMQP 0.9.1)
This service is experimental and the APIs are likely to change before settling down.
Getting Started
Maven
Add the following dependency to your Maven project:
<dependency>
<groupId>io.vertx</groupId>
<artifactId>vertx-rabbitmq-client-scala_2.12</artifactId>
<version>3.5.0.Beta1</version>
</dependency>
Gradle
Add the following dependency to your Gradle project:
dependencies {
compile 'io.vertx:vertx-rabbitmq-client-scala_2.12:3.5.0.Beta1'
}
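The source covers Maven and Gradle only. For an sbt build, an equivalent line might look like the sketch below. It assumes `scalaVersion` is set to a 2.12.x release, so that `%%` resolves to the `_2.12` artifact named above.

```scala
// build.sbt (assumed; sbt is not covered by the original instructions)
libraryDependencies += "io.vertx" %% "vertx-rabbitmq-client-scala" % "3.5.0.Beta1"
```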
Create a client
You can create a client instance from a full AMQP URI:
var config = RabbitMQOptions()
// Full AMQP URI
config.setUri("amqp://xvjvsrrc:VbuL1atClKt7zVNQha0bnnScbNvGiqgb@moose.rmq.cloudamqp.com/xvjvsrrc")
var client = RabbitMQClient.create(vertx, config)
Alternatively, you can specify individual parameters:
var config = RabbitMQOptions()
// Each parameter is optional
// The default value is used for any parameter that is not set
config.setUser("user1")
config.setPassword("password1")
config.setHost("localhost")
config.setPort(5672)
config.setVirtualHost("vhost1")
config.setConnectionTimeout(6000)
config.setRequestedHeartbeat(60)
config.setHandshakeTimeout(6000)
config.setRequestedChannelMax(5)
config.setNetworkRecoveryInterval(500)
config.setAutomaticRecoveryEnabled(true)
var client = RabbitMQClient.create(vertx, config)
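The two configuration styles carry the same information. As a rough illustration of how the parts of an AMQP URI line up with the individual setters, the sketch below splits a URI using only `java.net.URI`. `AmqpUriParts`, `Parts`, and `parse` are hypothetical names introduced here, not part of the client, and the sketch assumes the URI contains both a user and a password.

```scala
import java.net.URI

object AmqpUriParts {
  // Hypothetical holder for the fields that the individual setters
  // (setUser, setPassword, setHost, setPort, setVirtualHost) would receive.
  final case class Parts(user: String, password: String, host: String, port: Int, virtualHost: String)

  def parse(uri: String): Parts = {
    val u = new URI(uri)
    // User info has the form "user:password"; split on the first colon only.
    val Array(user, password) = u.getUserInfo.split(":", 2)
    // The path carries the virtual host, prefixed with "/".
    // getPort returns -1 when the URI does not specify a port.
    Parts(user, password, u.getHost, u.getPort, u.getPath.stripPrefix("/"))
  }

  def main(args: Array[String]): Unit = {
    println(parse("amqp://user1:password1@localhost:5672/vhost1"))
  }
}
```

This only shows the correspondence between the two forms; the client does its own URI handling when `setUri` is used.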
Declare exchange with additional config
You can pass additional config parameters to RabbitMQ’s exchangeDeclare method:
var config = Map[String, String]()
// Use += so that each entry is actually stored in config
config += ("x-dead-letter-exchange" -> "my.deadletter.exchange")
config += ("alternate-exchange" -> "my.alternate.exchange")
// ...
client.exchangeDeclareFuture("my.exchange", "fanout", true, false, config).onComplete{
case Success(result) => {
println("Exchange successfully declared with config")
}
case Failure(cause) => {
println(s"$cause")
}
}
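Scala's default `Map` is immutable, so `+` returns a new map and leaves the receiver unchanged. A minimal sketch of building the arguments in a single expression follows. `ExchangeArgs` and `exchangeArgs` are illustrative names, and whether `exchangeDeclareFuture` expects an immutable or a mutable map should be checked against the API documentation.

```scala
object ExchangeArgs {
  // Build the whole argument map up front so no update is silently discarded.
  val exchangeArgs: Map[String, String] = Map(
    "x-dead-letter-exchange" -> "my.deadletter.exchange",
    "alternate-exchange" -> "my.alternate.exchange"
  )

  def main(args: Array[String]): Unit = {
    val empty = Map.empty[String, String]
    // `+` leaves `empty` untouched and returns a new map with the entry added.
    val updated = empty + ("alternate-exchange" -> "my.alternate.exchange")
    println(s"empty=${empty.size}, updated=${updated.size}, exchangeArgs=${exchangeArgs.size}")
  }
}
```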
Operations
The following are some examples of the operations supported by the RabbitMQService API.
Consult the javadoc/documentation for detailed information on all API methods.
Publish
Publish a message to a queue
var message = new io.vertx.core.json.JsonObject().put("body", "Hello RabbitMQ, from Vert.x !")
client.basicPublishFuture("", "my.queue", message).onComplete{
case Success(result) => {
println("Message published !")
}
case Failure(cause) => {
println(s"$cause")
}
}
Publish with confirm
Publish a message to a queue and confirm the broker acknowledged it.
var message = new io.vertx.core.json.JsonObject().put("body", "Hello RabbitMQ, from Vert.x !")
// Put the channel in confirm mode. This can be done once at init.
client.confirmSelectFuture().onComplete{
case Success(result) => {
client.basicPublishFuture("", "my.queue", message).onComplete{
case Success(result) => {
// Check the message got confirmed by the broker.
client.waitForConfirmsFuture().onComplete{
case Success(result) => {
println("Message published !")
}
case Failure(cause) => {
println(s"$cause")
}
}
}
case Failure(cause) => {
println(s"$cause")
}
}
}
case Failure(cause) => {
println(s"$cause")
}
}
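Because each step returns a `scala.concurrent.Future`, the three nested callbacks can also be chained with a for-comprehension. The sketch below shows that pattern against a hypothetical stand-in trait, `PublisherLike`, which mirrors only the three method names used above and takes the message as a `String` so that the example runs without a broker or the Vert.x dependency. With the real client, an `ExecutionContext` suited to Vert.x would be needed in place of the global one.

```scala
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration._

object ConfirmedPublish {
  // Hypothetical stand-in exposing only the three calls used above.
  // It is not the real client type.
  trait PublisherLike {
    def confirmSelectFuture(): Future[Unit]
    def basicPublishFuture(exchange: String, routingKey: String, message: String): Future[Unit]
    def waitForConfirmsFuture(): Future[Unit]
  }

  // Each step starts only after the previous Future succeeds; the first
  // failure skips the remaining steps and fails the returned Future.
  def publishConfirmed(client: PublisherLike, queue: String, message: String)
                      (implicit ec: ExecutionContext): Future[Unit] =
    for {
      _ <- client.confirmSelectFuture()
      _ <- client.basicPublishFuture("", queue, message)
      _ <- client.waitForConfirmsFuture()
    } yield ()

  def main(args: Array[String]): Unit = {
    import ExecutionContext.Implicits.global
    // Stub that always succeeds, standing in for a connected client.
    val stub = new PublisherLike {
      def confirmSelectFuture(): Future[Unit] = Future.successful(())
      def basicPublishFuture(exchange: String, routingKey: String, message: String): Future[Unit] =
        Future.successful(())
      def waitForConfirmsFuture(): Future[Unit] = Future.successful(())
    }
    Await.result(publishConfirmed(stub, "my.queue", "Hello RabbitMQ, from Vert.x !"), 5.seconds)
    println("Message published !")
  }
}
```

A single `onComplete` on the returned Future then handles success and any failure from the three steps in one place.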
Consume
Consume messages from a queue
// Create the event bus handler which messages will be sent to
vertx.eventBus().consumer("my.address", (msg: io.vertx.scala.core.eventbus.Message[io.vertx.core.json.JsonObject]) => {
var json = msg.body()
println(s"Got message: ${json.getValue("body")}")
})
// Setup the link between rabbitmq consumer and event bus address
client.basicConsumeFuture("my.queue", "my.address").onComplete{
case Success(result) => {
println("RabbitMQ consumer created !")
}
case Failure(cause) => {
println(s"$cause")
}
}
Get
Get a single message from a queue
client.basicGetFuture("my.queue", true).onComplete{
case Success(result) => {
var msg = result
println(s"Got message: ${msg.getValue("body")}")
}
case Failure(cause) => {
println(s"$cause")
}
}
Consume messages without auto-ack
// Create the event bus handler which messages will be sent to
vertx.eventBus().consumer("my.address", (msg: io.vertx.scala.core.eventbus.Message[io.vertx.core.json.JsonObject]) => {
var json = msg.body()
println(s"Got message: ${json.getValue("body")}")
// Acknowledge this delivery only (multiple = false)
client.basicAckFuture(json.getLong("deliveryTag"), false).onComplete{
case Success(result) => println("Success")
case Failure(cause) => println("Failure")
}
})
// Setup the link between rabbitmq consumer and event bus address
client.basicConsumeFuture("my.queue", "my.address", false).onComplete{
case Success(result) => {
println("RabbitMQ consumer created !")
}
case Failure(cause) => {
println(s"$cause")
}
}
Running the tests
You will need to have RabbitMQ installed and running on localhost with the default ports for this to work.
Nick Scavelli (nscavell@redhat.com)