STOMP is the Simple (or Streaming) Text Orientated Messaging Protocol.

STOMP provides an interoperable wire format so that STOMP clients can communicate with any STOMP message broker, enabling straightforward messaging interoperability across many languages, platforms and brokers. More details about STOMP are available at https://stomp.github.io/index.html.
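To make the wire format concrete, here is a minimal sketch of how a STOMP frame is laid out as text: a command line, one line per header, a blank line, the body, and a terminating NUL byte. The helper name buildFrame is hypothetical and is not part of vertx-stomp. Header value escaping and content-length handling are omitted.

```groovy
// Illustrative helper (not part of vertx-stomp): serializes a frame following
// the layout described in the STOMP specification.
String buildFrame(String command, Map<String, String> headers, String body = "") {
  def sb = new StringBuilder()
  sb.append(command).append('\n')                 // command line
  headers.each { k, v ->
    sb.append(k).append(':').append(v).append('\n')  // one header per line
  }
  sb.append('\n')                                 // blank line ends the headers
  sb.append(body)                                 // optional body
  sb.append((char) 0)                             // NUL byte terminates the frame
  return sb.toString()
}

def frame = buildFrame("SEND", [destination: "/queue/a", "content-type": "text/plain"], "hello")
```

A STOMP client or server exchanges frames of this shape over the connection, which is why any compliant client can talk to any compliant broker.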

Vertx-Stomp is an implementation of a STOMP server and client. You can use the STOMP server with other clients and use the STOMP client with other servers. The server and the client support versions 1.0, 1.1 and 1.2 of the STOMP protocol (see https://stomp.github.io/stomp-specification-1.2.html). The STOMP server can also be used as a bridge with the Vert.x event bus.

Using vertx-stomp

To use the Vert.x Stomp server and client, add the following dependency to the dependencies section of your build descriptor:

  • Maven (in your pom.xml):

<dependency>
  <groupId>io.vertx</groupId>
  <artifactId>vertx-stomp</artifactId>
  <version>3.1.0</version>
</dependency>

  • Gradle (in your build.gradle file):

compile 'io.vertx:vertx-stomp:3.1.0'

STOMP server

Creating a STOMP server

The simplest way to create a STOMP server, using all default options, is as follows:

import io.vertx.groovy.ext.stomp.StompServerHandler
import io.vertx.groovy.ext.stomp.StompServer
def server = StompServer.create(vertx).handler(StompServerHandler.create(vertx)).listen()

This creates a STOMP server listening on localhost:61613 that is compliant with the STOMP specification.

You can configure the port and host in the listen method:

import io.vertx.groovy.ext.stomp.StompServerHandler
import io.vertx.groovy.ext.stomp.StompServer
def server = StompServer.create(vertx).handler(StompServerHandler.create(vertx)).listen(1234, "0.0.0.0")

To be notified when the server is ready, use a handler as follows:

import io.vertx.groovy.ext.stomp.StompServerHandler
import io.vertx.groovy.ext.stomp.StompServer
def server = StompServer.create(vertx).handler(StompServerHandler.create(vertx)).listen({ ar ->
  if (ar.failed()) {
    println("Failed to start the STOMP server : ${ar.cause().getMessage()}")
  } else {
    println("Ready to receive STOMP frames")
  }
})

The handler receives a reference to the StompServer.

You can also configure the host and port in StompServerOptions:

import io.vertx.groovy.ext.stomp.StompServerHandler
import io.vertx.groovy.ext.stomp.StompServer
def server = StompServer.create(vertx, [
  port:1234,
  host:"0.0.0.0"
]).handler(StompServerHandler.create(vertx)).listen()

Closing a STOMP server

STOMP servers are closed as follows:

server.close({ ar ->
  if (ar.succeeded()) {
    println("The STOMP server has been closed")
  } else {
    println("The STOMP server failed to close : ${ar.cause().getMessage()}")
  }
})

Configuration

The StompServerOptions let you configure some aspects of the STOMP server.

First, the STOMP server is based on a NetServer, so you can configure the underlying NetServer from the StompServerOptions. Alternatively, you can pass the NetServer you want to use:

import io.vertx.groovy.ext.stomp.StompServerHandler
import io.vertx.groovy.ext.stomp.StompServer
def server = StompServer.create(vertx, netServer).handler(StompServerHandler.create(vertx)).listen()

The StompServerOptions let you configure:

  • the host and port of the STOMP server - defaults to 0.0.0.0:61613.

  • whether or not the STOMP server is secured - defaults to false

  • the maximum size of a STOMP frame body - defaults to 10 MB

  • the maximum number of headers accepted in a STOMP frame - defaults to 1000

  • the max length of a header line in a STOMP frame - defaults to 10240

  • the STOMP heartbeat time, in milliseconds - defaults to 1000, 1000

  • the supported STOMP protocol versions (1.0, 1.1 and 1.2 by default)

  • the maximum number of frames allowed in a transaction - defaults to 1000

  • the size of the transaction chunk - defaults to 1000 (see transactionChunkSize)

  • the maximum number of subscriptions a client can handle - defaults to 1000
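
As an illustration, several of these limits can be set together in the options map. This is a sketch that assumes the Groovy map keys mirror the StompServerOptions property names (maxBodyLength, maxHeaderLength, maxHeaders, maxFrameInTransaction, transactionChunkSize, maxSubscriptionsByClient). Check them against the StompServerOptions reference for your version. The values are arbitrary examples, not recommendations.

```groovy
import io.vertx.groovy.ext.stomp.StompServerHandler
import io.vertx.groovy.ext.stomp.StompServer

// Assumed option keys: verify them against StompServerOptions before relying on them.
def server = StompServer.create(vertx, [
  port:1234,
  host:"0.0.0.0",
  maxBodyLength:1024 * 1024,     // tighter than the 10 MB default
  maxHeaderLength:4096,          // tighter than the 10240 default
  maxHeaders:100,
  maxFrameInTransaction:500,
  transactionChunkSize:100,
  maxSubscriptionsByClient:50
]).handler(StompServerHandler.create(vertx)).listen()
```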

The STOMP heartbeat is configured using a JSON object as follows:

import io.vertx.groovy.ext.stomp.StompServerHandler
import io.vertx.groovy.ext.stomp.StompServer
def server = StompServer.create(vertx, [
  heartbeat:[
    x:1000,
    y:1000
  ]
]).handler(StompServerHandler.create(vertx)).listen()
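
The x and y values correspond to the two numbers of the STOMP heart-beat header: x is the smallest interval the sender can guarantee between heartbeats, and y is the interval it would like to receive them at. The STOMP 1.1 and 1.2 specifications define how the two sides agree on an effective interval. The following sketch expresses that rule as a plain function. The function name is hypothetical, and the sketch does not show how vertx-stomp implements the negotiation internally.

```groovy
// Negotiation rule from the STOMP specification, for one direction of traffic.
// senderX:   interval the sending side can guarantee (0 = cannot send heartbeats)
// receiverY: interval the receiving side would like (0 = does not want heartbeats)
// Returns the interval in milliseconds, or 0 when no heartbeats flow in that direction.
long negotiatedInterval(long senderX, long receiverY) {
  if (senderX == 0 || receiverY == 0) {
    return 0
  }
  return Math.max(senderX, receiverY)
}

// Server configured with x:1000, y:1000 and a client connecting with heart-beat:500,2000
def serverToClient = negotiatedInterval(1000, 2000)  // server x against client y
def clientToServer = negotiatedInterval(500, 1000)   // client x against server y
```

The rule is applied once per direction, so the two directions can end up with different intervals.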

Enabling security requires an additional AuthProvider handling the authentication requests:

import io.vertx.groovy.ext.stomp.StompServerHandler
import io.vertx.groovy.ext.stomp.StompServer
def server = StompServer.create(vertx, [
  secured:true
]).handler(StompServerHandler.create(vertx).authProvider(provider)).listen()

More information about AuthProvider is available in the Vert.x authentication and authorisation documentation.

If a frame exceeds one of the size limits, the frame is rejected and the client receives an ERROR frame. As the specification requires, the client connection is closed immediately after the error has been sent. The same behavior applies to the other thresholds.

Subscriptions

The default STOMP server handles subscription destinations as opaque strings. It does not impose any structure on them, and destinations are not hierarchical. By default, the STOMP server follows topic semantics (messages are dispatched to all subscribers).

Type of destinations

By default, the STOMP server manages destinations as topics, so messages are dispatched to all subscribers. You can configure the server to use queues, or to mix both types:

import io.vertx.groovy.ext.stomp.Destination
import io.vertx.groovy.ext.stomp.StompServerHandler
import io.vertx.groovy.ext.stomp.StompServer
def server = StompServer.create(vertx).handler(StompServerHandler.create(vertx).destinationFactory({ v, name ->
  if (name.startsWith("/queue")) {
    return Destination.queue(vertx, name)
  } else {
    return Destination.topic(vertx, name)
  }
})).listen()

In the last example, all destinations starting with /queue are queues while the others are topics. A destination is created when the first subscription to it is received.

A server can decide to reject the destination creation by returning null:

import io.vertx.groovy.ext.stomp.Destination
import io.vertx.groovy.ext.stomp.StompServerHandler
import io.vertx.groovy.ext.stomp.StompServer
def server = StompServer.create(vertx).handler(StompServerHandler.create(vertx).destinationFactory({ v, name ->
  if (name.startsWith("/forbidden")) {
    return null
  } else if (name.startsWith("/queue")) {
    return Destination.queue(vertx, name)
  } else {
    return Destination.topic(vertx, name)
  }
})).listen()

In this case, the subscriber receives an ERROR frame.

Queues dispatch messages using a round-robin strategy.
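
To illustrate the difference from a topic, which delivers every message to all subscribers, the following sketch models round-robin dispatch with a plain list: each message goes to exactly one subscriber, and subscribers are picked in turn. The class RoundRobinQueue is hypothetical and is not the vertx-stomp Destination implementation.

```groovy
// Illustration only: models round-robin dispatch over a list of subscriber ids.
class RoundRobinQueue {
  private final List<String> subscribers = []
  private int next = 0

  void subscribe(String id) {
    subscribers << id
  }

  // Picks exactly one subscriber per message, cycling through them in order.
  // Returns the chosen subscriber id, or null when nobody is subscribed.
  String dispatch(String message) {
    if (subscribers.isEmpty()) {
      return null
    }
    def target = subscribers[next]
    next = (next + 1) % subscribers.size()
    return target
  }
}

def queue = new RoundRobinQueue()
["a", "b", "c"].each { queue.subscribe(it) }
// Four messages are spread over three subscribers, wrapping around after the third.
def receivers = (1..4).collect { queue.dispatch("message-$it") }
```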

Providing your own type of destination

The STOMP server deliberately does not implement any advanced features. If you need a more advanced dispatching policy, you can implement your own type of destination by providing a DestinationFactory that returns your own Destination object.

Acknowledgment

By default, the STOMP server does nothing when a message is not acknowledged. You can customize this by providing your own Destination implementation.

The custom destination should call the onAck