A Vert.x client allowing applications to interact with a RabbitMQ broker (AMQP 0.9.1)
This service is experimental and the APIs are likely to change before settling down.
Getting Started
Maven
Add the following dependency to your maven project
<dependency>
<groupId>io.vertx</groupId>
<artifactId>vertx-rabbitmq-client</artifactId>
<version>3.5.0.Beta1</version>
</dependency>
Gradle
Add the following dependency to your gradle project
dependencies {
compile 'io.vertx:vertx-rabbitmq-client:3.5.0.Beta1'
}
Create a client
You can create a client instance as follows using a full amqp uri:
require 'vertx-rabbitmq/rabbit_mq_client'
config = {
}
# full amqp uri
config['uri'] = "amqp://xvjvsrrc:VbuL1atClKt7zVNQha0bnnScbNvGiqgb@moose.rmq.cloudamqp.com/xvjvsrrc"
client = VertxRabbitmq::RabbitMQClient.create(vertx, config)
Or you can also specify individual parameters manually:
require 'vertx-rabbitmq/rabbit_mq_client'
config = {
}
# Each parameter is optional
# The default parameter with be used if the parameter is not set
config['user'] = "user1"
config['password'] = "password1"
config['host'] = "localhost"
config['port'] = 5672
config['virtualHost'] = "vhost1"
config['connectionTimeout'] = 6000
config['requestedHeartbeat'] = 60
config['handshakeTimeout'] = 6000
config['requestedChannelMax'] = 5
config['networkRecoveryInterval'] = 500
config['automaticRecoveryEnabled'] = true
client = VertxRabbitmq::RabbitMQClient.create(vertx, config)
Declare exchange with additional config
You can pass additional config parameters to RabbitMQ’s exchangeDeclare method
config = Hash.new()
config["x-dead-letter-exchange"] = "my.deadletter.exchange"
config["alternate-exchange"] = "my.alternate.exchange"
# ...
client.exchange_declare("my.exchange", "fanout", true, false, config) { |onResult_err,onResult|
if (onResult_err == nil)
puts "Exchange successfully declared with config"
else
onResult_err.print_stack_trace()
end
}
Operations
The following are some examples of the operations supported by the RabbitMQService API.
Consult the javadoc/documentation for detailed information on all API methods.
Publish
Publish a message to a queue
message = {
'body' => "Hello RabbitMQ, from Vert.x !"
}
client.basic_publish("", "my.queue", message) { |pubResult_err,pubResult|
if (pubResult_err == nil)
puts "Message published !"
else
pubResult_err.print_stack_trace()
end
}
Publish with confirm
Publish a message to a queue and confirm the broker acknowledged it.
message = {
'body' => "Hello RabbitMQ, from Vert.x !"
}
# Put the channel in confirm mode. This can be done once at init.
client.confirm_select() { |confirmResult_err,confirmResult|
if (confirmResult_err == nil)
client.basic_publish("", "my.queue", message) { |pubResult_err,pubResult|
if (pubResult_err == nil)
# Check the message got confirmed by the broker.
client.wait_for_confirms() { |waitResult_err,waitResult|
if (waitResult_err == nil)
puts "Message published !"else
waitResult_err.print_stack_trace()end
}
else
pubResult_err.print_stack_trace()
end
}
else
confirmResult_err.print_stack_trace()
end
}
Consume
Consume messages from a queue
// Create the event bus handler which messages will be sent to
# Create the event bus handler which messages will be sent to
vertx.event_bus().consumer("my.address") { |msg|
json = msg.body()
puts "Got message: #{json['body']}"
}
# Setup the link between rabbitmq consumer and event bus address
client.basic_consume("my.queue", "my.address") { |consumeResult_err,consumeResult|
if (consumeResult_err == nil)
puts "RabbitMQ consumer created !"
else
consumeResult_err.print_stack_trace()
end
}
Get
Will get a message from a queue
client.basic_get("my.queue", true) { |getResult_err,getResult|
if (getResult_err == nil)
msg = getResult
puts "Got message: #{msg['body']}"
else
getResult_err.print_stack_trace()
end
}
Consume messages without auto-ack
# Create the event bus handler which messages will be sent to
vertx.event_bus().consumer("my.address") { |msg|
json = msg.body()
puts "Got message: #{json['body']}"
# ack
client.basic_ack(json['deliveryTag'], false) { |asyncResult_err,asyncResult|
}
}
# Setup the link between rabbitmq consumer and event bus address
client.basic_consume("my.queue", "my.address", false) { |consumeResult_err,consumeResult|
if (consumeResult_err == nil)
puts "RabbitMQ consumer created !"
else
consumeResult_err.print_stack_trace()
end
}
Running the tests
You will need to have RabbitMQ installed and running with default ports on localhost for this to work. <a href="mailto:nscavell@redhat.com">Nick Scavelli</a>