Consul is a tool for discovering and configuring services in your infrastructure.
A Vert.x client allowing applications to interact with a Consul system via blocking and non-blocking HTTP API.
Using Vert.x Consul Client
To use this project, add the following dependency to the dependencies section of your build descriptor:
-
Maven (in your
pom.xml
):
<dependency>
<groupId>io.vertx</groupId>
<artifactId>vertx-consul-client</artifactId>
<version>3.4.0.Beta1</version>
</dependency>
-
Gradle (in your
build.gradle
file):
compile 'io.vertx:vertx-consul-client:3.4.0.Beta1'
Creating a client
Just use factory method:
ConsulClient client = ConsulClient.create(vertx);
Also the client can be configured with an options.
ConsulClientOptions options = new ConsulClientOptions()
.setHost("consul.example.com");
ConsulClient client = ConsulClient.create(vertx, options);
The following configuration is supported by the consul client:
host
-
Consul host. Defaults to
localhost
port
-
Consul HTTP API port. Defaults to
8500
timeout
-
Sets the amount of time (in milliseconds) after which if the request does not return any data within the timeout period an failure will be passed to the handler and the request will be closed.
acl_token
-
The ACL token. When provided, the client will use this token when making requests to the Consul by providing the "?token" query parameter. When not provided, the empty token, which maps to the 'anonymous' ACL policy, is used.
dc
-
The datacenter name. When provided, the client will use it when making requests to the Consul by providing the "?dc" query parameter. When not provided, the datacenter of the consul agent is queried.
Using the API
The client API is represented by ConsulClient
. The API is very similar to Consul’s
HTTP API that described in Consul API docs
Blocking queries
Certain endpoints support a feature called a "blocking query." A blocking query is used to wait for a potential change using long polling. Any endpoint that supports blocking also provide a unique identifier (index) representing the current state of the requested resource. The following configuration is used to perform blocking queries:
index
-
value indicating that the client wishes to wait for any changes subsequent to that index.
wait
-
parameter specifying a maximum duration for the blocking request. This is limited to 10 minutes.
BlockingQueryOptions opts = new BlockingQueryOptions()
.setIndex(lastIndex)
.setWait("1m");
Key/Value Store
consulClient.putValue("foo", "bar", res -> {
if (res.succeeded()) {
System.out.println("result of the operation: " + (res.result() ? "success" : "fail"));
} else {
res.cause().printStackTrace();
}
});
// get Key/Value pair from Consul Store
consulClient.getValue("foo", res -> {
if (res.succeeded()) {
System.out.println("retrieved value: " + res.result().getValue());
System.out.println("modify index: " + res.result().getModifyIndex());
} else {
res.cause().printStackTrace();
}
});
The modify index can be used for blocking requests:
BlockingQueryOptions opts = new BlockingQueryOptions().setIndex(modifyIndex);
consulClient.getValueWithOptions("foo", opts, res -> {
if (res.succeeded()) {
System.out.println("retrieved value: " + res.result().getValue());
} else {
res.cause().printStackTrace();
}
});
Health Checks
Handler<HttpServerRequest> alwaysGood = h -> h.response()
.setStatusCode(200)
.end();
// create HTTP server to responce health check
vertx.createHttpServer()
.requestHandler(alwaysGood)
.listen(4848);
// check health via TCP port every 1 sec
CheckOptions opts = new CheckOptions().setTcp("localhost:4848").setInterval("1s");
// register TCP check
consulClient.registerCheck(opts, res -> {
if (res.succeeded()) {
System.out.println("check successfully registered");
} else {
res.cause().printStackTrace();
}
});
Services
ServiceOptions opts = new ServiceOptions()
.setId("serviceId")
.setName("serviceName")
.setTags(Arrays.asList("tag1", "tag2"))
.setCheckOptions(new CheckOptions().setTtl("10s"))
.setAddress("10.0.0.1")
.setPort(8080);
// Service registration
consulClient.registerService(opts, res -> {
if (res.succeeded()) {
System.out.println("Service successfully registered");
} else {
res.cause().printStackTrace();
}
});
// Discovery registered service
consulClient.catalogServiceNodes("serviceName", res -> {
if (res.succeeded()) {
System.out.println("found " + res.result().getList().size() + " services");
System.out.println("consul state index: " + res.result().getIndex());
for (Service service : res.result().getList()) {
System.out.println("Service node: " + service.getNode());
System.out.println("Service address: " + service.getAddress());
System.out.println("Service port: " + service.getPort());
}
} else {
res.cause().printStackTrace();
}
});
// Blocking request for nodes that provide given service, sorted by distance from agent
ServiceQueryOptions queryOpts = new ServiceQueryOptions()
.setNear("_agent")
.setBlockingOptions(new BlockingQueryOptions().setIndex(lastIndex));
consulClient.catalogServiceNodesWithOptions("serviceName", queryOpts, res -> {
if (res.succeeded()) {
System.out.println("found " + res.result().getList().size() + " services");
} else {
res.cause().printStackTrace();
}
});
// Service deregistration
consulClient.deregisterService("serviceId", res -> {
if (res.succeeded()) {
System.out.println("Service successfully deregistered");
} else {
res.cause().printStackTrace();
}
});
Events
EventOptions opts = new EventOptions()
.setTag("tag")
.setPayload("message");
// trigger a new user event
consulClient.fireEventWithOptions("eventName", opts, res -> {
if (res.succeeded()) {
System.out.println("Event sent");
System.out.println("id: " + res.result().getId());
} else {
res.cause().printStackTrace();
}
});
// most recent events known by the agent
consulClient.listEvents(res -> {
if (res.succeeded()) {
for(Event event: res.result().getList()) {
System.out.println("Event id: " + event.getId());
System.out.println("Event name: " + event.getName());
System.out.println("Event payload: " + event.getPayload());
}
} else {
res.cause().printStackTrace();
}
});
Sessions
SessionOptions opts = new SessionOptions()
.setNode("nodeId")
.setBehavior(SessionBehavior.RELEASE);
// Create session
consulClient.createSessionWithOptions(opts, res -> {
if (res.succeeded()) {
System.out.println("Session successfully created");
System.out.println("id: " + res.result());
} else {
res.cause().printStackTrace();
}
});
// Lists sessions belonging to a node
consulClient.listNodeSessions("nodeId", res -> {
if (res.succeeded()) {
for(Session session: res.result().getList()) {
System.out.println("Session id: " + session.getId());
System.out.println("Session node: " + session.getNode());
System.out.println("Session create index: " + session.getCreateIndex());
}
} else {
res.cause().printStackTrace();
}
});
// Blocking query for all active sessions
BlockingQueryOptions blockingOpts = new BlockingQueryOptions()
.setIndex(lastIndex);
consulClient.listSessionsWithOptions(blockingOpts, res -> {
if (res.succeeded()) {
System.out.println("Found " + res.result().getList().size() + " sessions");
} else {
res.cause().printStackTrace();
}
});
// Destroy session
consulClient.destroySession(sessionId, res -> {
if (res.succeeded()) {
System.out.println("Session successfully destroyed");
} else {
res.cause().printStackTrace();
}
});
Nodes in cluster
consulClient.catalogNodes(res -> {
if (res.succeeded()) {
System.out.println("found " + res.result().getList().size() + " nodes");
System.out.println("consul state index " + res.result().getIndex());
} else {
res.cause().printStackTrace();
}
});
// blocking request to catalog for nodes, sorted by distance from agent
NodeQueryOptions opts = new NodeQueryOptions()
.setNear("_agent")
.setBlockingOptions(new BlockingQueryOptions().setIndex(lastIndex));
consulClient.catalogNodesWithOptions(opts, res -> {
if (res.succeeded()) {
System.out.println("found " + res.result().getList().size() + " nodes");
} else {
res.cause().printStackTrace();
}
});