Consul is a tool for discovering and configuring services in your infrastructure.
A Vert.x client allowing applications to interact with a Consul system via blocking and non-blocking HTTP API.
Using Vert.x Consul Client
To use this project, add the following dependency to the dependencies section of your build descriptor:
-
Maven (in your
pom.xml
):
<dependency>
<groupId>io.vertx</groupId>
<artifactId>vertx-consul-client</artifactId>
<version>3.4.0.Beta1</version>
</dependency>
-
Gradle (in your
build.gradle
file):
compile 'io.vertx:vertx-consul-client:3.4.0.Beta1'
Creating a client
Just use factory method:
require 'vertx-consul/consul_client'
client = VertxConsul::ConsulClient.create(vertx)
Also the client can be configured with an options.
require 'vertx-consul/consul_client'
options = {
'host' => "consul.example.com"
}
client = VertxConsul::ConsulClient.create(vertx, options)
The following configuration is supported by the consul client:
host
-
Consul host. Defaults to
localhost
port
-
Consul HTTP API port. Defaults to
8500
timeout
-
Sets the amount of time (in milliseconds) after which if the request does not return any data within the timeout period an failure will be passed to the handler and the request will be closed.
acl_token
-
The ACL token. When provided, the client will use this token when making requests to the Consul by providing the "?token" query parameter. When not provided, the empty token, which maps to the 'anonymous' ACL policy, is used.
dc
-
The datacenter name. When provided, the client will use it when making requests to the Consul by providing the "?dc" query parameter. When not provided, the datacenter of the consul agent is queried.
Using the API
The client API is represented by ConsulClient
. The API is very similar to Consul’s
HTTP API that described in Consul API docs
Blocking queries
Certain endpoints support a feature called a "blocking query." A blocking query is used to wait for a potential change using long polling. Any endpoint that supports blocking also provide a unique identifier (index) representing the current state of the requested resource. The following configuration is used to perform blocking queries:
index
-
value indicating that the client wishes to wait for any changes subsequent to that index.
wait
-
parameter specifying a maximum duration for the blocking request. This is limited to 10 minutes.
opts = {
'index' => lastIndex,
'wait' => "1m"
}
Key/Value Store
# put Key/Value pair to Consul Store
consulClient.put_value("foo", "bar") { |res_err,res|
if (res_err == nil)
puts "result of the operation: #{(res ? "success" : "fail")}"
else
res_err.print_stack_trace()
end
}
# get Key/Value pair from Consul Store
consulClient.get_value("foo") { |res_err,res|
if (res_err == nil)
puts "retrieved value: #{res['value']}"
puts "modify index: #{res['modifyIndex']}"
else
res_err.print_stack_trace()
end
}
The modify index can be used for blocking requests:
opts = {
'index' => modifyIndex
}
consulClient.get_value_with_options("foo", opts) { |res_err,res|
if (res_err == nil)
puts "retrieved value: #{res['value']}"
else
res_err.print_stack_trace()
end
}
Health Checks
alwaysGood = lambda { |h|
h.response().set_status_code(200).end()
}
# create HTTP server to responce health check
vertx.create_http_server().request_handler(&alwaysGood).listen(4848)
# check health via TCP port every 1 sec
opts = {
'tcp' => "localhost:4848",
'interval' => "1s"
}
# register TCP check
consulClient.register_check(opts) { |res_err,res|
if (res_err == nil)
puts "check successfully registered"
else
res_err.print_stack_trace()
end
}
Services
opts = {
'id' => "serviceId",
'name' => "serviceName",
'tags' => ["tag1", "tag2"],
'checkOptions' => {
'ttl' => "10s"
},
'address' => "10.0.0.1",
'port' => 8080
}
# Service registration
consulClient.register_service(opts) { |res_err,res|
if (res_err == nil)
puts "Service successfully registered"
else
res_err.print_stack_trace()
end
}
# Discovery registered service
consulClient.catalog_service_nodes("serviceName") { |res_err,res|
if (res_err == nil)
puts "found #{res['list'].length} services"
puts "consul state index: #{res['index']}"
res['list'].each do |service|
puts "Service node: #{service['node']}"
puts "Service address: #{service['address']}"
puts "Service port: #{service['port']}"
end
else
res_err.print_stack_trace()
end
}
# Blocking request for nodes that provide given service, sorted by distance from agent
queryOpts = {
'near' => "_agent",
'blockingOptions' => {
'index' => lastIndex
}
}
consulClient.catalog_service_nodes_with_options("serviceName", queryOpts) { |res_err,res|
if (res_err == nil)
puts "found #{res['list'].length} services"
else
res_err.print_stack_trace()
end
}
# Service deregistration
consulClient.deregister_service("serviceId") { |res_err,res|
if (res_err == nil)
puts "Service successfully deregistered"
else
res_err.print_stack_trace()
end
}
Events
opts = {
'tag' => "tag",
'payload' => "message"
}
# trigger a new user event
consulClient.fire_event_with_options("eventName", opts) { |res_err,res|
if (res_err == nil)
puts "Event sent"
puts "id: #{res['id']}"
else
res_err.print_stack_trace()
end
}
# most recent events known by the agent
consulClient.list_events() { |res_err,res|
if (res_err == nil)
res['list'].each do |event|
puts "Event id: #{event['id']}"
puts "Event name: #{event['name']}"
puts "Event payload: #{event['payload']}"
end
else
res_err.print_stack_trace()
end
}
Sessions
opts = {
'node' => "nodeId",
'behavior' => "RELEASE"
}
# Create session
consulClient.create_session_with_options(opts) { |res_err,res|
if (res_err == nil)
puts "Session successfully created"
puts "id: #{res}"
else
res_err.print_stack_trace()
end
}
# Lists sessions belonging to a node
consulClient.list_node_sessions("nodeId") { |res_err,res|
if (res_err == nil)
res['list'].each do |session|
puts "Session id: #{session['id']}"
puts "Session node: #{session['node']}"
puts "Session create index: #{session['createIndex']}"
end
else
res_err.print_stack_trace()
end
}
# Blocking query for all active sessions
blockingOpts = {
'index' => lastIndex
}
consulClient.list_sessions_with_options(blockingOpts) { |res_err,res|
if (res_err == nil)
puts "Found #{res['list'].length} sessions"
else
res_err.print_stack_trace()
end
}
# Destroy session
consulClient.destroy_session(sessionId) { |res_err,res|
if (res_err == nil)
puts "Session successfully destroyed"
else
res_err.print_stack_trace()
end
}
Nodes in cluster
consulClient.catalog_nodes() { |res_err,res|
if (res_err == nil)
puts "found #{res['list'].length} nodes"
puts "consul state index #{res['index']}"
else
res_err.print_stack_trace()
end
}
# blocking request to catalog for nodes, sorted by distance from agent
opts = {
'near' => "_agent",
'blockingOptions' => {
'index' => lastIndex
}
}
consulClient.catalog_nodes_with_options(opts) { |res_err,res|
if (res_err == nil)
puts "found #{res['list'].length} nodes"
else
res_err.print_stack_trace()
end
}