RxJava is a popular library for composing asynchronous and event-based programs using observable sequences for the Java VM. RxGroovy is the Reactive Extensions adaptor for Groovy: it allows groovy.lang.Closure functions to be used, and RxJava will know how to invoke them.
Vert.x integrates naturally with RxGroovy, allowing you to use observables wherever you can use streams or asynchronous results.
To use the Vert.x API for RxGroovy, add the following dependency to the dependencies section of your build descriptor:
- Maven (in your pom.xml):
<dependency>
<groupId>{maven-groupId}</groupId>
<artifactId>{maven-artifactId}</artifactId>
<version>{maven-version}</version>
</dependency>
- Gradle (in your build.gradle file):
compile '{maven-groupId}:{maven-artifactId}:{maven-version}'
Read stream support
An RxJava observable is a perfect match for the Vert.x ReadStream class: both provide a flow of items.
The Vert.x API for Groovy provides io.vertx.groovy.core.stream.ReadStream objects, and RxGroovy provides a Groovy extension module that adds the toObservable method to the read stream class.
def fs = vertx.fileSystem()
fs.open("/data.txt", [:], { result ->
  def file = result.result()
  def observable = file.toObservable()
  observable.forEach({ data -> println("Read data: ${data.toString("UTF-8")}") })
})
Handler support
The RxJava io.vertx.ext.rx.java.RxHelper should be used to:
- create an io.vertx.ext.rx.java.ObservableHandler,
- transform actions to a handler
The RxGroovy extension module adds the toHandler method on the rx.Observer class:
Observer<Long> observer = Observers.create({ item -> println("Timer fired!") })
Handler<Long> handler = observer.toHandler()
vertx.setTimer(1000, handler)
Async result support
In Vert.x, future objects are modelled as async result handlers and occur as the last parameter of asynchronous methods.
The RxGroovy extension module adds the toFuture method on the rx.Observer class:
Observer<HttpServer> observer = Observers.create({ server -> println("Server started") })
Handler<AsyncResult<HttpServer>> handler = observer.toFuture()
vertx.createHttpServer([port:1234,host:"localhost"]).listen(handler)
Scheduler support
The reactive extension sometimes needs to schedule actions; for instance, Observable#timer creates and returns a timer that emits periodic events. By default, scheduled actions are managed by RxJava, which means that the timer threads are not Vert.x threads and therefore do not execute in a Vert.x event loop.
When an RxJava method deals with a scheduler, it has an overloaded variant accepting an extra rx.Scheduler. The RxGroovy extension module adds the scheduler() method to the Vertx class; it returns a scheduler that can be used in such places.
Observable<Long> timer = Observable.timer(100, 100, TimeUnit.MILLISECONDS, vertx.scheduler())
For blocking scheduled actions, a scheduler can be created with the blockingScheduler method:
Scheduler scheduler = vertx.blockingScheduler();
Observable<Integer> obs = blockingObservable.observeOn(scheduler);
RxJava can also be configured to use a scheduler by default; the returned scheduler hook uses a blocking scheduler for IO actions:
RxJavaSchedulersHook hook = RxHelper.schedulerHook(vertx)
RxJavaPlugins.getInstance().registerSchedulersHook(hook)
Json unmarshalling
The io.vertx.rx.groovy.RxHelper#unmarshaller(java.lang.Class) method creates an rx.Observable.Operator that transforms an Observable<Buffer> in JSON format into an object observable:
def fileSystem = vertx.fileSystem()
fileSystem.open("/data.txt", [:], { result ->
  AsyncFile file = result.result()
  Observable<Buffer> observable = file.toObservable()
  observable.lift(RxHelper.unmarshaller(MyPojo.class)).subscribe({ mypojo ->
    // Process the object
  })
})
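The example above relies on a MyPojo class that the source does not show. As a rough illustration of what mapping a JSON document onto such an object means, here is a minimal plain-Groovy sketch. The field names name and value are hypothetical, and the sketch uses groovy.json.JsonSlurper purely for illustration; it does not claim to reflect how RxHelper.unmarshaller is implemented.

```groovy
import groovy.json.JsonSlurper

// Hypothetical target type: the field names are illustrative assumptions.
class MyPojo {
  String name
  int value
}

// Plain-Groovy illustration of mapping one JSON document onto an object.
// This only mirrors the idea; it is not the RxHelper.unmarshaller implementation.
def json = '{"name":"sensor-1","value":42}'
def fields = new JsonSlurper().parseText(json)
def pojo = new MyPojo(name: fields.name, value: fields.value)

println("Unmarshalled ${pojo.name} with value ${pojo.value}")
```

With the operator, the same kind of object would arrive in the subscribe closure instead of being built by hand.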
Api examples
Let’s now study a few examples of using Vert.x with RxJava.
EventBus message stream
The event bus MessageConsumer naturally provides an Observable<Message<T>>:
EventBus eb = vertx.eventBus()
MessageConsumer<String> consumer = eb.<String>consumer("the-address")
Observable<Message<String>> observable = consumer.toObservable()
Subscription sub = observable.subscribe({ msg ->
  // Got message
});
// Unregisters the stream after 10 seconds
vertx.setTimer(10000, { id ->
  sub.unsubscribe()
});
The MessageConsumer provides a stream of Message.
The MessageConsumer#bodyStream() method gives access to a new stream of message bodies if needed:
EventBus eb = vertx.eventBus()
MessageConsumer<String> consumer = eb.<String>consumer("the-address")
Observable<String> observable = consumer.bodyStream().toObservable()
RxJava map/reduce composition style can then be used:
Observable<Double> observable = vertx.eventBus().
  <Double>consumer("heat-sensor").
  bodyStream().
  toObservable()
observable.
  buffer(1, TimeUnit.SECONDS).
  map({ samples -> samples.sum() }).
  subscribe({ heat ->
    vertx.eventBus().send("news-feed", "Current heat is " + heat)
  });
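One detail worth keeping in mind with the example above: a time-based buffer may emit an empty list when no sample arrives within a window, and in Groovy sum() on an empty list returns null. The following is a minimal sketch of a null-tolerant alternative, assuming an average is wanted rather than a sum. The closure name averageOf is hypothetical and not part of any Vert.x or RxJava API.

```groovy
// Hypothetical helper: averages one window of samples, tolerating an empty window.
def averageOf = { List<Double> samples ->
  // An empty list is falsy in Groovy, so empty windows yield null instead of failing
  samples ? samples.sum() / samples.size() : null
}

// Groovy returns null when summing an empty list
assert [].sum() == null
assert averageOf([20.0d, 22.0d]) == 21.0d
assert averageOf([]) == null
```

In the pipeline, such a closure could take the place of the map step, possibly preceded by a filter that skips empty windows.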
Timers
A timer task can be created with Vertx#timerStream(long):
vertx.timerStream(1000).
  toObservable().
  subscribe({ id ->
    println("Callback after 1 second")
  })
A periodic task can be created with Vertx#periodicStream(long):
vertx.periodicStream(1000).
  toObservable().
  subscribe({ id ->
    println("Callback every second")
  })
The observable can be cancelled with an unsubscription:
vertx.periodicStream(1000).
  toObservable().
  subscribe(new Subscriber<Long>() {
    public void onNext(Long aLong) {
      // Callback
      unsubscribe()
    }
    public void onError(Throwable e) {}
    public void onCompleted() {}
  })
Http client requests
HttpClientRequest#toObservable() provides a one-shot callback with the HttpClientResponse object. The observable also reports a request failure.
HttpClientRequest request = client.request(HttpMethod.GET, 8080, "localhost", "/the_uri")
request.toObservable().subscribe({ response ->
  // Process the response
}, { error ->
  // Could not connect
})
request.end()
The response can be processed as an `Observable<Buffer>` with the `HttpClientResponse#toObservable()` method:
HttpClientRequest request = client.request(HttpMethod.GET, 8080, "localhost", "/the_uri")
request.toObservable().
  subscribe(
    { response ->
      Observable<Buffer> observable = response.toObservable()
      observable.forEach(
        { buffer ->
          // Process buffer
        })
    })
The same flow can be achieved with the flatMap operation:
request.toObservable().
  flatMap({ resp -> resp.toObservable() }).
  forEach(
    { buffer ->
      // Process buffer
    })
We can also unmarshall the Observable<Buffer> into an object using the io.vertx.rx.groovy.RxHelper#unmarshaller(java.lang.Class) static method. This method creates an rx.Observable.Operator that unmarshalls buffers to an object:
request.toObservable().
  flatMap({ resp -> resp.toObservable() }).
  lift(RxHelper.unmarshaller(MyPojo.class)).
  forEach({ pojo ->
    // Process pojo
  })
Http server requests
The HttpServer#requestStream() provides a callback for each incoming request:
Observable<HttpServerRequest> requestObservable = server.requestStream().toObservable()
requestObservable.subscribe({ request ->
  // Process request
})
The HttpServerRequest can then be adapted to an Observable<Buffer>:
Observable<HttpServerRequest> requestObservable = server.requestStream().toObservable()
requestObservable.subscribe({ request ->
  Observable<Buffer> observable = request.toObservable()
});
The io.vertx.rx.groovy.RxHelper#unmarshaller(java.lang.Class) method can be used to parse and map a JSON request to an object:
Observable<HttpServerRequest> requestObservable = server.requestStream().toObservable()
requestObservable.subscribe({ request ->
  Observable<MyPojo> observable = request.
    toObservable().
    lift(RxHelper.unmarshaller(MyPojo.class))
})
Websocket client
The `HttpClient#websocketStream` provides a single callback when the websocket connects, otherwise a failure:
HttpClient client = vertx.createHttpClient()
WebSocketStream stream = client.websocketStream(8080, "localhost", "/the_uri")
Observable<WebSocket> socketObservable = stream.toObservable()
socketObservable.subscribe(
  { ws ->
    // Use the websocket
  }, { error ->
    // Could not connect
  })
The WebSocket can then be turned into an Observable<Buffer> easily:
socketObservable.subscribe(
  { socket ->
    Observable<Buffer> dataObs = socket.toObservable()
    dataObs.subscribe({ buffer ->
      println("Got message ${buffer.toString("UTF-8")}")
    })
  })
Websocket server
The HttpServer#websocketStream() provides a callback for each incoming connection:
Observable<ServerWebSocket> socketObservable = server.websocketStream().toObservable()
socketObservable.subscribe(
  { socket -> println("Web socket connect") },
  { failure -> println("Should never be called") },
  { println("Subscription ended or server closed") }
)
The ServerWebSocket can be turned into an Observable<Buffer> easily:
Observable<ServerWebSocket> socketObservable = server.websocketStream().toObservable()
socketObservable.subscribe({ socket ->
  Observable<Buffer> dataObs = socket.toObservable()
  dataObs.subscribe({ buffer ->
    println("Got message ${buffer.toString("UTF-8")}")
  })
})