Consul is a tool for discovering and configuring services in your infrastructure.

A Vert.x client allowing applications to interact with a Consul system via blocking and non-blocking HTTP API.

Using Vert.x Consul Client

To use this project, add the following dependency to the dependencies section of your build descriptor:

  • Maven (in your pom.xml):

<dependency>
  <groupId>io.vertx</groupId>
  <artifactId>vertx-consul-client</artifactId>
  <version>3.4.0.Beta1</version>
</dependency>
  • Gradle (in your build.gradle file):

compile 'io.vertx:vertx-consul-client:3.4.0.Beta1'

Creating a client

Just use factory method:

import io.vertx.ext.consul.ConsulClient

def client = ConsulClient.create(vertx)

Also the client can be configured with an options.

import io.vertx.ext.consul.ConsulClient

def options = [
  host:"consul.example.com"
]

def client = ConsulClient.create(vertx, options)

The following configuration is supported by the consul client:

host

Consul host. Defaults to localhost

port

Consul HTTP API port. Defaults to 8500

timeout

Sets the amount of time (in milliseconds) after which if the request does not return any data within the timeout period an failure will be passed to the handler and the request will be closed.

acl_token

The ACL token. When provided, the client will use this token when making requests to the Consul by providing the "?token" query parameter. When not provided, the empty token, which maps to the 'anonymous' ACL policy, is used.

dc

The datacenter name. When provided, the client will use it when making requests to the Consul by providing the "?dc" query parameter. When not provided, the datacenter of the consul agent is queried.

Using the API

The client API is represented by ConsulClient. The API is very similar to Consul’s HTTP API that described in Consul API docs

Blocking queries

Certain endpoints support a feature called a "blocking query." A blocking query is used to wait for a potential change using long polling. Any endpoint that supports blocking also provide a unique identifier (index) representing the current state of the requested resource. The following configuration is used to perform blocking queries:

index

value indicating that the client wishes to wait for any changes subsequent to that index.

wait

parameter specifying a maximum duration for the blocking request. This is limited to 10 minutes.

def opts = [
  index:lastIndex,
  wait:"1m"
]

Key/Value Store

// put Key/Value pair to Consul Store
consulClient.putValue("foo", "bar", { res ->

  if (res.succeeded()) {

    println("result of the operation: ${(res.result() ? "success" : "fail")}")

  } else {

    res.cause().printStackTrace()

  }

})

//  get Key/Value pair from Consul Store
consulClient.getValue("foo", { res ->

  if (res.succeeded()) {

    println("retrieved value: ${res.result().value}")

    println("modify index: ${res.result().modifyIndex}")

  } else {

    res.cause().printStackTrace()

  }

})

The modify index can be used for blocking requests:

def opts = [
  index:modifyIndex
]

consulClient.getValueWithOptions("foo", opts, { res ->

  if (res.succeeded()) {

    println("retrieved value: ${res.result().value}")

  } else {

    res.cause().printStackTrace()

  }

})

Health Checks

def alwaysGood = { h ->
  h.response().setStatusCode(200).end()
}

// create HTTP server to responce health check

vertx.createHttpServer().requestHandler(alwaysGood).listen(4848)

// check health via TCP port every 1 sec

def opts = [
  tcp:"localhost:4848",
  interval:"1s"
]

// register TCP check

consulClient.registerCheck(opts, { res ->

  if (res.succeeded()) {

    println("check successfully registered")

  } else {

    res.cause().printStackTrace()

  }

})

Services

def opts = [
  id:"serviceId",
  name:"serviceName",
  tags:["tag1", "tag2"],
  checkOptions:[
    ttl:"10s"
  ],
  address:"10.0.0.1",
  port:8080
]

// Service registration

consulClient.registerService(opts, { res ->

  if (res.succeeded()) {

    println("Service successfully registered")

  } else {

    res.cause().printStackTrace()

  }

})

// Discovery registered service

consulClient.catalogServiceNodes("serviceName", { res ->

  if (res.succeeded()) {

    println("found ${res.result().list.size()} services")

    println("consul state index: ${res.result().index}")

    res.result().list.each { service ->

      println("Service node: ${service.node}")

      println("Service address: ${service.address}")

      println("Service port: ${service.port}")

    }

  } else {

    res.cause().printStackTrace()

  }

})

// Blocking request for nodes that provide given service, sorted by distance from agent

def queryOpts = [
  near:"_agent",
  blockingOptions:[
    index:lastIndex
  ]
]

consulClient.catalogServiceNodesWithOptions("serviceName", queryOpts, { res ->

  if (res.succeeded()) {

    println("found ${res.result().list.size()} services")

  } else {

    res.cause().printStackTrace()

  }

})

// Service deregistration

consulClient.deregisterService("serviceId", { res ->

  if (res.succeeded()) {

    println("Service successfully deregistered")

  } else {

    res.cause().printStackTrace()

  }

})

Events

def opts = [
  tag:"tag",
  payload:"message"
]

// trigger a new user event

consulClient.fireEventWithOptions("eventName", opts, { res ->

  if (res.succeeded()) {

    println("Event sent")

    println("id: ${res.result().id}")

  } else {

    res.cause().printStackTrace()

  }

})

// most recent events known by the agent

consulClient.listEvents({ res ->

  if (res.succeeded()) {

    res.result().list.each { event ->

      println("Event id: ${event.id}")

      println("Event name: ${event.name}")

      println("Event payload: ${event.payload}")

    }

  } else {

    res.cause().printStackTrace()

  }

})

Sessions

import io.vertx.ext.consul.SessionBehavior

def opts = [
  node:"nodeId",
  behavior:"RELEASE"
]

// Create session

consulClient.createSessionWithOptions(opts, { res ->

  if (res.succeeded()) {

    println("Session successfully created")

    println("id: ${res.result()}")

  } else {

    res.cause().printStackTrace()

  }

})

// Lists sessions belonging to a node

consulClient.listNodeSessions("nodeId", { res ->

  if (res.succeeded()) {

    res.result().list.each { session ->

      println("Session id: ${session.id}")

      println("Session node: ${session.node}")

      println("Session create index: ${session.createIndex}")

    }

  } else {

    res.cause().printStackTrace()

  }

})

// Blocking query for all active sessions

def blockingOpts = [
  index:lastIndex
]

consulClient.listSessionsWithOptions(blockingOpts, { res ->

  if (res.succeeded()) {

    println("Found ${res.result().list.size()} sessions")

  } else {

    res.cause().printStackTrace()

  }

})

// Destroy session

consulClient.destroySession(sessionId, { res ->

  if (res.succeeded()) {

    println("Session successfully destroyed")

  } else {

    res.cause().printStackTrace()

  }

})

Nodes in cluster

consulClient.catalogNodes({ res ->

  if (res.succeeded()) {

    println("found ${res.result().list.size()} nodes")

    println("consul state index ${res.result().index}")

  } else {

    res.cause().printStackTrace()

  }

})

// blocking request to catalog for nodes, sorted by distance from agent

def opts = [
  near:"_agent",
  blockingOptions:[
    index:lastIndex
  ]
]

consulClient.catalogNodesWithOptions(opts, { res ->

  if (res.succeeded()) {

    println("found ${res.result().list.size()} nodes")

  } else {

    res.cause().printStackTrace()

  }

})