Consul is a tool for discovering and configuring services in your infrastructure.

A Vert.x client that allows applications to interact with a Consul system via its blocking and non-blocking HTTP API.

Using Vert.x Consul Client

To use this project, add the following dependency to the dependencies section of your build descriptor:

  • Maven (in your pom.xml):

<dependency>
  <groupId>io.vertx</groupId>
  <artifactId>vertx-consul-client</artifactId>
  <version>3.4.0.Beta1</version>
</dependency>

  • Gradle (in your build.gradle file):

compile 'io.vertx:vertx-consul-client:3.4.0.Beta1'

Creating a client

Use the factory method:

require 'vertx-consul/consul_client'

client = VertxConsul::ConsulClient.create(vertx)

The client can also be configured with options:

require 'vertx-consul/consul_client'

options = {
  'host' => "consul.example.com"
}

client = VertxConsul::ConsulClient.create(vertx, options)

The following configuration is supported by the consul client:

host

Consul host. Defaults to localhost.

port

Consul HTTP API port. Defaults to 8500.

timeout

The amount of time (in milliseconds) to wait for a response. If the request does not return any data within this period, a failure is passed to the handler and the request is closed.

acl_token

The ACL token. When provided, the client sends this token to Consul in the "?token" query parameter of each request. When not provided, the empty token is used, which maps to the 'anonymous' ACL policy.

dc

The datacenter name. When provided, the client sends it to Consul in the "?dc" query parameter of each request. When not provided, the datacenter of the Consul agent is queried.
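To make the effect of acl_token and dc more concrete, here is a minimal sketch of how these options could map onto a request URL. The helper `consul_request_uri` is hypothetical and is not part of the client; the token and datacenter values are placeholders. It only illustrates the query parameters described above, not how the client is implemented internally.

```ruby
require 'uri'

# Hypothetical helper (not part of the client API): shows which query
# parameters the documented 'acl_token' and 'dc' options translate to.
def consul_request_uri(path, options)
  params = {}
  params['token'] = options['acl_token'] if options['acl_token']
  params['dc'] = options['dc'] if options['dc']

  # Documented defaults: localhost and port 8500.
  host = options.fetch('host', 'localhost')
  port = options.fetch('port', 8500)

  query = params.empty? ? '' : "?#{URI.encode_www_form(params)}"
  "http://#{host}:#{port}#{path}#{query}"
end

options = {
  'host' => 'consul.example.com',
  'acl_token' => 'my-token', # placeholder value
  'dc' => 'dc1'              # placeholder value
}

puts consul_request_uri('/v1/kv/foo', options)
```

With an empty options hash the helper falls back to the documented defaults and adds no query parameters.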

Using the API

The client API is represented by ConsulClient. The API closely follows Consul’s HTTP API, which is described in the Consul API docs.

Blocking queries

Certain endpoints support a feature called a "blocking query." A blocking query is used to wait for a potential change using long polling. Any endpoint that supports blocking also provides a unique identifier (index) representing the current state of the requested resource. The following configuration is used to perform blocking queries:

index

A value indicating that the client wishes to wait for any changes subsequent to that index.

wait

A parameter specifying the maximum duration of the blocking request. This is limited to 10 minutes.

# lastIndex is the index returned by a previous query
opts = {
  'index' => lastIndex,
  'wait' => "1m"
}
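As a rough illustration of how the index is carried from one blocking query to the next, the sketch below keeps that bookkeeping in two small pure-Ruby helpers. The names `next_blocking_opts` and `changed?` are hypothetical and not part of the client. Resetting the index to 0 when it goes backwards follows the guidance in Consul's own blocking query documentation, and treating an unchanged index as "the wait elapsed" is a simplifying assumption.

```ruby
# Hypothetical helper (not part of the client API): builds the options for the
# next blocking query from the index sent last time and the index returned.
def next_blocking_opts(sent_index, returned_index, wait = '1m')
  # If the index went backwards, start over from 0 instead of waiting on a stale value.
  index = returned_index < sent_index ? 0 : returned_index
  { 'index' => index, 'wait' => wait }
end

# Hypothetical helper: an unchanged index suggests the 'wait' period simply elapsed.
def changed?(sent_index, returned_index)
  returned_index != sent_index
end

opts = next_blocking_opts(0, 42)
puts "next index: #{opts['index']}, wait: #{opts['wait']}"
puts "changed: #{changed?(42, 42)}"
```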

Key/Value Store

# Put a key/value pair into the Consul store
consulClient.put_value("foo", "bar") { |res_err, res|
  if res_err.nil?
    puts "result of the operation: #{(res ? "success" : "fail")}"
  else
    res_err.print_stack_trace()
  end
}

# Get a key/value pair from the Consul store
consulClient.get_value("foo") { |res_err, res|
  if res_err.nil?
    puts "retrieved value: #{res['value']}"
    puts "modify index: #{res['modifyIndex']}"
  else
    res_err.print_stack_trace()
  end
}

The modify index can be used for blocking requests:

# modifyIndex is the 'modifyIndex' value returned by a previous get_value call
opts = {
  'index' => modifyIndex
}

consulClient.get_value_with_options("foo", opts) { |res_err, res|
  if res_err.nil?
    puts "retrieved value: #{res['value']}"
  else
    res_err.print_stack_trace()
  end
}
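The two calls can be chained so that the modify index from a plain read feeds the blocking read. The following is a minimal sketch under the assumption that `client` responds to get_value and get_value_with_options in the way the snippets above use them; the method name `watch_once` is hypothetical.

```ruby
# Sketch: read a key once, then issue a blocking read that waits for its next change.
# 'client' is any object that behaves like the consulClient used above.
def watch_once(client, key, &on_change)
  client.get_value(key) { |res_err, res|
    if res_err.nil?
      # Feed the modify index of the current value into the blocking query.
      opts = { 'index' => res['modifyIndex'] }
      client.get_value_with_options(key, opts) { |err, updated|
        on_change.call(err, updated)
      }
    else
      # Report the failure of the initial read to the caller.
      on_change.call(res_err, nil)
    end
  }
end
```

It could be called as `watch_once(consulClient, "foo") { |err, res| ... }`, where the block receives either an error or the updated key/value pair.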

Health Checks

alwaysGood = lambda { |h|
  h.response().set_status_code(200).end()
}

# Create an HTTP server that responds to health checks
vertx.create_http_server().request_handler(&alwaysGood).listen(4848)

# Check health via the TCP port every second
opts = {
  'tcp' => "localhost:4848",
  'interval' => "1s"
}

# Register the TCP check
consulClient.register_check(opts) { |res_err, res|
  if res_err.nil?
    puts "check successfully registered"
  else
    res_err.print_stack_trace()
  end
}

Services

opts = {
  'id' => "serviceId",
  'name' => "serviceName",
  'tags' => ["tag1", "tag2"],
  'checkOptions' => {
    'ttl' => "10s"
  },
  'address' => "10.0.0.1",
  'port' => 8080
}

# Service registration
consulClient.register_service(opts) { |res_err, res|
  if res_err.nil?
    puts "Service successfully registered"
  else
    res_err.print_stack_trace()
  end
}

# Discover the registered service
consulClient.catalog_service_nodes("serviceName") { |res_err, res|
  if res_err.nil?
    puts "found #{res['list'].length} services"
    puts "consul state index: #{res['index']}"
    res['list'].each do |service|
      puts "Service node: #{service['node']}"
      puts "Service address: #{service['address']}"
      puts "Service port: #{service['port']}"
    end
  else
    res_err.print_stack_trace()
  end
}

# Blocking request for nodes that provide the given service, sorted by distance from the agent
queryOpts = {
  'near' => "_agent",
  'blockingOptions' => {
    'index' => lastIndex
  }
}

consulClient.catalog_service_nodes_with_options("serviceName", queryOpts) { |res_err, res|
  if res_err.nil?
    puts "found #{res['list'].length} services"
  else
    res_err.print_stack_trace()
  end
}

# Service deregistration
consulClient.deregister_service("serviceId") { |res_err, res|
  if res_err.nil?
    puts "Service successfully deregistered"
  else
    res_err.print_stack_trace()
  end
}
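Once the catalog returns a list of nodes, a caller usually needs the address and port of each one. The sketch below shows one way to extract them, assuming the result has the shape used in the discovery snippet above ('list' entries with 'node', 'address' and 'port'). The helper `service_endpoints` and the sample data are made up for illustration.

```ruby
# Hypothetical helper: turns a catalog_service_nodes result into "address:port" strings.
def service_endpoints(res)
  res['list'].map { |service| "#{service['address']}:#{service['port']}" }
end

# Sample data shaped like the result used above; the values are invented.
sample = {
  'index' => 12,
  'list' => [
    { 'node' => 'node1', 'address' => '10.0.0.1', 'port' => 8080 },
    { 'node' => 'node2', 'address' => '10.0.0.2', 'port' => 8080 }
  ]
}

service_endpoints(sample).each { |endpoint| puts endpoint }
```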

Events

opts = {
  'tag' => "tag",
  'payload' => "message"
}

# Trigger a new user event
consulClient.fire_event_with_options("eventName", opts) { |res_err, res|
  if res_err.nil?
    puts "Event sent"
    puts "id: #{res['id']}"
  else
    res_err.print_stack_trace()
  end
}

# List the most recent events known by the agent
consulClient.list_events() { |res_err, res|
  if res_err.nil?
    res['list'].each do |event|
      puts "Event id: #{event['id']}"
      puts "Event name: #{event['name']}"
      puts "Event payload: #{event['payload']}"
    end
  else
    res_err.print_stack_trace()
  end
}

Sessions

opts = {
  'node' => "nodeId",
  'behavior' => "RELEASE"
}

# Create a session
consulClient.create_session_with_options(opts) { |res_err, res|
  if res_err.nil?
    puts "Session successfully created"
    puts "id: #{res}"
  else
    res_err.print_stack_trace()
  end
}

# List sessions belonging to a node
consulClient.list_node_sessions("nodeId") { |res_err, res|
  if res_err.nil?
    res['list'].each do |session|
      puts "Session id: #{session['id']}"
      puts "Session node: #{session['node']}"
      puts "Session create index: #{session['createIndex']}"
    end
  else
    res_err.print_stack_trace()
  end
}

# Blocking query for all active sessions
blockingOpts = {
  'index' => lastIndex
}

consulClient.list_sessions_with_options(blockingOpts) { |res_err, res|
  if res_err.nil?
    puts "Found #{res['list'].length} sessions"
  else
    res_err.print_stack_trace()
  end
}

# Destroy a session (sessionId is the id returned when the session was created)
consulClient.destroy_session(sessionId) { |res_err, res|
  if res_err.nil?
    puts "Session successfully destroyed"
  else
    res_err.print_stack_trace()
  end
}
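The session id passed to destroy_session is the value returned when the session is created. As a minimal sketch of that lifecycle, the hypothetical method `with_session` below creates a session, hands the id to a piece of work, and then destroys the session. It assumes `client` behaves like the consulClient in the snippets above and that the work is synchronous; asynchronous work would need to trigger the destroy call from its own callback.

```ruby
# Sketch: create a session, run 'work' with its id, then destroy the session.
# 'client' is any object that behaves like the consulClient used above.
def with_session(client, opts, work, &done)
  client.create_session_with_options(opts) { |res_err, session_id|
    if res_err.nil?
      # The result of session creation is the session id.
      work.call(session_id)
      client.destroy_session(session_id) { |err, _res| done.call(err) }
    else
      # Session creation failed, so there is nothing to destroy.
      done.call(res_err)
    end
  }
end
```

A call might look like `with_session(consulClient, opts, lambda { |id| puts "id: #{id}" }) { |err| ... }`.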

Nodes in cluster

consulClient.catalog_nodes() { |res_err, res|
  if res_err.nil?
    puts "found #{res['list'].length} nodes"
    puts "consul state index #{res['index']}"
  else
    res_err.print_stack_trace()
  end
}

# Blocking request to the catalog for nodes, sorted by distance from the agent
opts = {
  'near' => "_agent",
  'blockingOptions' => {
    'index' => lastIndex
  }
}

consulClient.catalog_nodes_with_options(opts) { |res_err, res|
  if res_err.nil?
    puts "found #{res['list'].length} nodes"
  else
    res_err.print_stack_trace()
  end
}