Consul is a tool for discovering and configuring services in your infrastructure.
The Vert.x Consul Client lets applications interact with a Consul system through its blocking and non-blocking HTTP API.
Using Vert.x Consul Client
To use this project, add the following dependency to the dependencies section of your build descriptor:
- Maven (in your pom.xml):
<dependency>
<groupId>io.vertx</groupId>
<artifactId>vertx-consul-client</artifactId>
<version>3.4.0.Beta1</version>
</dependency>
- Gradle (in your build.gradle file):
compile 'io.vertx:vertx-consul-client:3.4.0.Beta1'
Creating a client
Use the factory method:
import io.vertx.ext.consul.ConsulClient
var client = ConsulClient.create(vertx)
The client can also be configured with options:
import io.vertx.ext.consul.ConsulClient
import io.vertx.ext.consul.ConsulClientOptions
import io.vertx.kotlin.ext.consul.*
var options = ConsulClientOptions(
host = "consul.example.com")
var client = ConsulClient.create(vertx, options)
The following configuration is supported by the consul client:
host - Consul host. Defaults to localhost.
port - Consul HTTP API port. Defaults to 8500.
timeout - The time (in milliseconds) to wait for a response. If the request returns no data within this period, a failure is passed to the handler and the request is closed.
acl_token - The ACL token. When provided, the client uses this token when making requests to Consul by adding the "?token" query parameter. When not provided, the empty token, which maps to the 'anonymous' ACL policy, is used.
dc - The datacenter name. When provided, the client uses it when making requests to Consul by adding the "?dc" query parameter. When not provided, the datacenter of the Consul agent is queried.
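As a rough illustration of how the acl_token and dc options reach Consul, the sketch below appends the "?token" and "?dc" query parameters to a request URL. `ClientSettings` and `buildUrl` are hypothetical names invented for this example; they are not part of the Vert.x Consul client, and the real client may assemble its requests differently.

```kotlin
import java.net.URLEncoder

// Hypothetical stand-in for the client options; NOT the library's ConsulClientOptions.
data class ClientSettings(
  val host: String = "localhost",  // documented default
  val port: Int = 8500,            // documented default
  val aclToken: String? = null,    // null: send no token (anonymous ACL policy)
  val dc: String? = null           // null: the agent's own datacenter is queried
)

// Builds the URL for a request path, appending only the parameters that are set.
fun buildUrl(settings: ClientSettings, path: String): String {
  val params = listOfNotNull(
    settings.aclToken?.let { "token=" + URLEncoder.encode(it, "UTF-8") },
    settings.dc?.let { "dc=" + URLEncoder.encode(it, "UTF-8") }
  )
  val query = if (params.isEmpty()) "" else params.joinToString("&", prefix = "?")
  return "http://${settings.host}:${settings.port}$path$query"
}

fun main() {
  // Defaults only: no query string is added.
  println(buildUrl(ClientSettings(), "/v1/kv/foo"))
  // Token and datacenter set: both appear as query parameters.
  println(buildUrl(ClientSettings(host = "consul.example.com", aclToken = "secret", dc = "dc1"), "/v1/kv/foo"))
}
```

Modelling the unset options as `null` keeps the "not provided" behaviour explicit: nothing is added to the URL, so Consul falls back to the anonymous policy and the agent's datacenter.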
Using the API
The client API is represented by ConsulClient. It closely mirrors Consul's HTTP API, which is described in the Consul API docs.
Blocking queries
Certain endpoints support a feature called a "blocking query". A blocking query waits for a potential change using long polling. Any endpoint that supports blocking also provides a unique identifier (index) representing the current state of the requested resource. The following configuration is used to perform blocking queries:
index - A value indicating that the client wishes to wait for any changes subsequent to that index.
wait - A parameter specifying the maximum duration of the blocking request. This is limited to 10 minutes.
import io.vertx.ext.consul.BlockingQueryOptions
import io.vertx.kotlin.ext.consul.*
var opts = BlockingQueryOptions(
index = lastIndex,
wait = "1m")
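The long-polling pattern behind blocking queries can be sketched without a running Consul agent. In the simulation below, `Snapshot`, `FakeStore`, and `watch` are hypothetical names, not part of the client. `query` returns the first state whose index exceeds the one the caller has already seen, which is the point at which a real blocking query would return early. A real query also returns once the wait period elapses, which this sketch does not model.

```kotlin
// Hypothetical snapshot of a resource: its value plus the index identifying this state.
data class Snapshot(val index: Long, val value: String)

// In-memory stand-in for an endpoint that supports blocking queries.
class FakeStore(private val history: List<Snapshot>) {
  // Returns the first state newer than `index`, or null if nothing has changed
  // (a real blocking query would instead wait until the `wait` duration expires).
  fun query(index: Long): Snapshot? = history.firstOrNull { it.index > index }
}

// Repeatedly queries with the last seen index and collects every observed change.
fun watch(store: FakeStore, startIndex: Long = 0): List<String> {
  val seen = mutableListOf<String>()
  var lastIndex = startIndex
  while (true) {
    val snapshot = store.query(lastIndex) ?: break  // no further changes: stop
    seen += snapshot.value
    lastIndex = snapshot.index                      // next query waits for changes after this state
  }
  return seen
}

fun main() {
  val store = FakeStore(listOf(Snapshot(5, "a"), Snapshot(9, "b"), Snapshot(12, "c")))
  println(watch(store))     // every change from the beginning
  println(watch(store, 9))  // only changes after index 9
}
```

The essential step is feeding the index from each response back into the next request, so that no change between two queries is missed.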
Key/Value Store
// put Key/Value pair to Consul Store
consulClient.putValue("foo", "bar", { res ->
if (res.succeeded()) {
println("result of the operation: ${if (res.result()) "success" else "fail"}")
} else {
res.cause().printStackTrace()
}
})
// get Key/Value pair from Consul Store
consulClient.getValue("foo", { res ->
if (res.succeeded()) {
println("retrieved value: ${res.result().value}")
println("modify index: ${res.result().modifyIndex}")
} else {
res.cause().printStackTrace()
}
})
The modify index can be used for blocking requests:
import io.vertx.ext.consul.BlockingQueryOptions
import io.vertx.kotlin.ext.consul.*
var opts = BlockingQueryOptions(
index = modifyIndex)
consulClient.getValueWithOptions("foo", opts, { res ->
if (res.succeeded()) {
println("retrieved value: ${res.result().value}")
} else {
res.cause().printStackTrace()
}
})
Health Checks
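import io.vertx.core.Handler
import io.vertx.core.http.HttpServerRequest
import io.vertx.ext.consul.CheckOptions
import io.vertx.kotlin.ext.consul.*
// handler that answers every request with 200 OK
val alwaysGood = Handler<HttpServerRequest> { h ->
h.response().setStatusCode(200).end()
}
// create an HTTP server to respond to the health check
vertx.createHttpServer().requestHandler(alwaysGood).listen(4848)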
// check health via TCP port every 1 sec
var opts = CheckOptions(
tcp = "localhost:4848",
interval = "1s")
// register TCP check
consulClient.registerCheck(opts, { res ->
if (res.succeeded()) {
println("check successfully registered")
} else {
res.cause().printStackTrace()
}
})
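A TCP check only needs the target port to accept connections, which is why an HTTP server can back the check above. The sketch below shows that idea with plain sockets. `tcpProbe` is a hypothetical helper written for this example; it is not part of the client and is not how Consul itself is implemented.

```kotlin
import java.io.IOException
import java.net.InetSocketAddress
import java.net.ServerSocket
import java.net.Socket

// Returns true if a TCP connection to host:port can be opened within timeoutMs.
fun tcpProbe(host: String, port: Int, timeoutMs: Int = 1000): Boolean =
  try {
    Socket().use { it.connect(InetSocketAddress(host, port), timeoutMs) }
    true
  } catch (e: IOException) {
    false  // refused, unreachable, or timed out
  }

fun main() {
  // Port 0 asks the OS for any free port, so the example does not depend on 4848 being free.
  val server = ServerSocket(0)
  val port = server.localPort
  println(tcpProbe("localhost", port))  // probe while the listener is up
  server.close()
  println(tcpProbe("localhost", port))  // probe after the listener is closed
}
```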
Services
import io.vertx.ext.consul.BlockingQueryOptions
import io.vertx.ext.consul.CheckOptions
import io.vertx.ext.consul.ServiceOptions
import io.vertx.ext.consul.ServiceQueryOptions
import io.vertx.kotlin.ext.consul.*
var opts = ServiceOptions(
id = "serviceId",
name = "serviceName",
tags = listOf("tag1", "tag2"),
checkOptions = CheckOptions(
ttl = "10s"),
address = "10.0.0.1",
port = 8080)
// Service registration
consulClient.registerService(opts, { res ->
if (res.succeeded()) {
println("Service successfully registered")
} else {
res.cause().printStackTrace()
}
})
// Discover the registered service
consulClient.catalogServiceNodes("serviceName", { res ->
if (res.succeeded()) {
println("found ${res.result().list.size} services")
println("consul state index: ${res.result().index}")
for (service in res.result().list) {
println("Service node: ${service.node}")
println("Service address: ${service.address}")
println("Service port: ${service.port}")
}
} else {
res.cause().printStackTrace()
}
})
// Blocking request for nodes that provide given service, sorted by distance from agent
var queryOpts = ServiceQueryOptions(
near = "_agent",
blockingOptions = BlockingQueryOptions(
index = lastIndex))
consulClient.catalogServiceNodesWithOptions("serviceName", queryOpts, { res ->
if (res.succeeded()) {
println("found ${res.result().list.size} services")
} else {
res.cause().printStackTrace()
}
})
// Service deregistration
consulClient.deregisterService("serviceId", { res ->
if (res.succeeded()) {
println("Service successfully deregistered")
} else {
res.cause().printStackTrace()
}
})
Events
import io.vertx.ext.consul.EventOptions
import io.vertx.kotlin.ext.consul.*
var opts = EventOptions(
tag = "tag",
payload = "message")
// trigger a new user event
consulClient.fireEventWithOptions("eventName", opts, { res ->
if (res.succeeded()) {
println("Event sent")
println("id: ${res.result().id}")
} else {
res.cause().printStackTrace()
}
})
// most recent events known by the agent
consulClient.listEvents({ res ->
if (res.succeeded()) {
for (event in res.result().list) {
println("Event id: ${event.id}")
println("Event name: ${event.name}")
println("Event payload: ${event.payload}")
}
} else {
res.cause().printStackTrace()
}
})
Sessions
import io.vertx.ext.consul.BlockingQueryOptions
import io.vertx.ext.consul.SessionBehavior
import io.vertx.ext.consul.SessionOptions
import io.vertx.kotlin.ext.consul.*
var opts = SessionOptions(
node = "nodeId",
behavior = SessionBehavior.RELEASE)
// Create session
consulClient.createSessionWithOptions(opts, { res ->
if (res.succeeded()) {
println("Session successfully created")
println("id: ${res.result()}")
} else {
res.cause().printStackTrace()
}
})
// Lists sessions belonging to a node
consulClient.listNodeSessions("nodeId", { res ->
if (res.succeeded()) {
for (session in res.result().list) {
println("Session id: ${session.id}")
println("Session node: ${session.node}")
println("Session create index: ${session.createIndex}")
}
} else {
res.cause().printStackTrace()
}
})
// Blocking query for all active sessions
var blockingOpts = BlockingQueryOptions(
index = lastIndex)
consulClient.listSessionsWithOptions(blockingOpts, { res ->
if (res.succeeded()) {
println("Found ${res.result().list.size} sessions")
} else {
res.cause().printStackTrace()
}
})
// Destroy session
consulClient.destroySession(sessionId, { res ->
if (res.succeeded()) {
println("Session successfully destroyed")
} else {
res.cause().printStackTrace()
}
})
Nodes in cluster
import io.vertx.ext.consul.BlockingQueryOptions
import io.vertx.ext.consul.NodeQueryOptions
import io.vertx.kotlin.ext.consul.*
consulClient.catalogNodes({ res ->
if (res.succeeded()) {
println("found ${res.result().list.size} nodes")
println("consul state index ${res.result().index}")
} else {
res.cause().printStackTrace()
}
})
// blocking request to catalog for nodes, sorted by distance from agent
var opts = NodeQueryOptions(
near = "_agent",
blockingOptions = BlockingQueryOptions(
index = lastIndex))
consulClient.catalogNodesWithOptions(opts, { res ->
if (res.succeeded()) {
println("found ${res.result().list.size} nodes")
} else {
res.cause().printStackTrace()
}
})