Vert.x API for RxJava
RxJava is a popular library for composing asynchronous and event based programs using observable sequences for the Java VM.
Vert.x integrates naturally with RxJava, allowing to use observable wherever you can use streams or asynchronous results.
There are two ways for using the RxJava API with Vert.x:
-
via the original Vert.x API with the
RxHelper
helper class that provides static methods for converting objects between Vert.x core API and RxJava API. -
via the Rxified Vert.x API enhancing the core Vert.x API.
Read stream support
RxJava observable is a perfect match for Vert.x ReadStream
class : both provides provides a flow of items.
The RxHelper.toObservable
static methods converts
a Vert.x read stream to an rx.Observable
:
FileSystem fileSystem = vertx.fileSystem();
fileSystem.open("/data.txt", new OpenOptions(), result -> {
AsyncFile file = result.result();
Observable<Buffer> observable = RxHelper.toObservable(file);
observable.forEach(data -> System.out.println("Read data: " + data.toString("UTF-8")));
});
The Rxified Vert.x API provides a toObservable
method on
ReadStream
:
FileSystem fs = vertx.fileSystem();
fs.open("/data.txt", new OpenOptions(), result -> {
AsyncFile file = result.result();
Observable<Buffer> observable = file.toObservable();
observable.forEach(data -> System.out.println("Read data: " + data.toString("UTF-8")));
});
Such observables are hot observables, i.e they will produce notifications regardless of subscriptions.
Handler support
The RxHelper
can create an ObservableHandler
: an Observable
with a
toHandler
method returning an Handler<T>
implementation:
ObservableHandler<Long> observable = RxHelper.observableHandler();
observable.subscribe(id -> {
// Fired
});
vertx.setTimer(1000, observable.toHandler());
The Rxified Vert.x API does not provide a specific API for handler.
Async result support
The Vert.x Handler<AsyncResult<T>>
construct occuring as last parameter of an asynchronous methods can
be mapped to an observable of a single element:
-
when the callback is a success, the observer
onNext
method is called with the item and theonComplete
method is immediatly invoked after -
when the callback is a failure, the observer
onError
method is called
The RxHelper.observableFuture
method creates an ObservableFuture
:
an Observable
with a toHandler
method returning a Handler<AsyncResult<T>>
implementation:
ObservableFuture<HttpServer> observable = RxHelper.observableFuture();
observable.subscribe(
server -> {
// Server is listening
},
failure -> {
// Server could not start
}
);
vertx.createHttpServer(new HttpServerOptions().
setPort(1234).
setHost("localhost")
).listen(observable.toHandler());
The ObservableFuture<Server>
will get a single HttpServer
object, if the listen
operation fails,
the subscriber will be notified with the failure.
The RxHelper.toHandler
method adapts an existing Observer
into an handler:
Observer<HttpServer> observer = new Observer<HttpServer>() {
@Override
public void onNext(HttpServer o) {
}
@Override
public void onError(Throwable e) {
}
@Override
public void onCompleted() {
}
};
Handler<AsyncResult<HttpServer>> handler = RxHelper.toFuture(observer);
It also works with just actions:
Action1<HttpServer> onNext = httpServer -> {};
Action1<Throwable> onError = httpServer -> {};
Action0 onComplete = () -> {};
Handler<AsyncResult<HttpServer>> handler1 = RxHelper.toFuture(onNext);
Handler<AsyncResult<HttpServer>> handler2 = RxHelper.toFuture(onNext, onError);
Handler<AsyncResult<HttpServer>> handler3 = RxHelper.toFuture(onNext, onError, onComplete);
The Rxified Vert.x API duplicates each such method with the Observable
suffix that returns an observable:
vertx.createHttpServer(
new HttpServerOptions().setPort(1234).setHost("localhost")
).listenObservable().
subscribe(
server -> {
// Server is listening
},
failure -> {
// Server could not start
}
);
Such observables are cold observables, i.e they will produce notifications on request.
Scheduler support
The reactive extension sometimes needs to schedule actions, for instance Observable#timer
creates and returns
a timer that emit periodic events. By default, scheduled actions are managed by RxJava, it means that the
timer thread are not Vert.x threads and therefore not executing in a Vert.x event loop.
When an RxJava method deals with a scheduler, it accepts an overloaded method accepting an extra rx.Scheduler
,
the RxHelper.scheduler
method will return a scheduler that can be used
in such places.
Scheduler scheduler = RxHelper.scheduler(vertx);
Observable<Long> timer = Observable.timer(100, 100, TimeUnit.MILLISECONDS, scheduler);
For blocking scheduled actions, a scheduler can be created with the RxHelper.blockingScheduler
method:
Scheduler scheduler = RxHelper.blockingScheduler(vertx);
Observable<Integer> obs = blockingObservable.observeOn(scheduler);
RxJava can also be reconfigured to use the Vert.x scheduler, thanks to the scheduler hook created with
RxHelper.schedulerHook
, the returned scheduler hook
uses a blocking scheduler for IO actions:
RxJavaSchedulersHook hook = RxHelper.schedulerHook(vertx);
rx.plugins.RxJavaPlugins.getInstance().registerSchedulersHook(hook);
The Rxified Vert.x API provides also similar method on the RxHelper
class:
Scheduler scheduler = io.vertx.rxjava.core.RxHelper.scheduler(vertx);
Observable<Long> timer = Observable.timer(100, 100, TimeUnit.MILLISECONDS, scheduler);
RxJavaSchedulersHook hook = io.vertx.rxjava.core.RxHelper.schedulerHook(vertx);
rx.plugins.RxJavaPlugins.getInstance().registerSchedulersHook(hook);
Json unmashalling
The RxHelper.unmarshaller
creates an rx.Observable.Operator
that
transforms an Observable<Buffer>
in json format into an object observable:
fileSystem.open("/data.txt", new OpenOptions(), result -> {
AsyncFile file = result.result();
Observable<Buffer> observable = RxHelper.toObservable(file);
observable.lift(RxHelper.unmarshaller(MyPojo.class)).subscribe(
mypojo -> {
// Process the object
}
);
});
The same can be done with the Rxified helper:
fileSystem.open("/data.txt", new OpenOptions(), result -> {
AsyncFile file = result.result();
Observable<Buffer> observable = file.toObservable();
observable.lift(io.vertx.rxjava.core.RxHelper.unmarshaller(MyPojo.class)).subscribe(
mypojo -> {
// Process the object
}
);
});
Rxified API
io.vertx.rxjava
prefix, for instance the io.vertx.core.Vertx
class is
translated to the Vertx
class.
Embedding Rxfified Vert.x
Just use the Vertx.vertx
methods:
Vertx vertx = io.vertx.rxjava.core.Vertx.vertx();
As a Verticle
Extend the AbstractVerticle
class, it will wrap it for you:
class MyVerticle extends io.vertx.rxjava.core.AbstractVerticle {
public void start() {
// Use Rxified Vertx here
}
}
Deploying an RxJava verticle is still performed by the Java deployer and does not need a specified deployer.
Api examples
Let’s study now a few examples of using Vert.x with RxJava.
EventBus message stream
The event bus MessageConsumer
provides naturally an Observable<Message<T>>
:
EventBus eb = vertx.eventBus();
MessageConsumer<String> consumer = eb.<String>consumer("the-address");
Observable<Message<String>> observable = consumer.toObservable();
Subscription sub = observable.subscribe(msg -> {
// Got message
});
// Unregisters the stream after 10 seconds
vertx.setTimer(10000, id -> {
sub.unsubscribe();
});
The MessageConsumer
provides a stream of Message
.
The body
gives access to a new stream of message bodies if needed:
EventBus eb = vertx.eventBus();
MessageConsumer<String> consumer = eb.<String>consumer("the-address");
Observable<String> observable = consumer.bodyStream().toObservable();
RxJava map/reduce composition style can be then be used:
Observable<Double> observable = vertx.eventBus().
<Double>consumer("heat-sensor").
bodyStream().
toObservable();
observable.
buffer(1, TimeUnit.SECONDS).
map(samples -> samples.
stream().
collect(Collectors.averagingDouble(d -> d))).
subscribe(heat -> {
vertx.eventBus().send("news-feed", "Current heat is " + heat);
});
Timers
Timer task can be created with timerStream
:
vertx.timerStream(1000).
toObservable().
subscribe(
id -> {
System.out.println("Callback after 1 second");
}
);
Periodic task can be created with periodicStream
:
vertx.periodicStream(1000).
toObservable().
subscribe(
id -> {
System.out.println("Callback every second");
}
);
The observable can be cancelled with an unsubscription:
vertx.periodicStream(1000).
toObservable().
subscribe(new Subscriber<Long>() {
public void onNext(Long aLong) {
// Callback
unsubscribe();
}
public void onError(Throwable e) {}
public void onCompleted() {}
});
Http client requests
toObservable
provides a one shot callback with the
HttpClientResponse
object. The observable reports a request failure.
HttpClient client = vertx.createHttpClient(new HttpClientOptions());
HttpClientRequest request = client.request(HttpMethod.GET, 8080, "localhost", "/the_uri");
request.toObservable().subscribe(
response -> {
// Process the response
},
error -> {
// Could not connect
}
);
request.end();
The response can be processed as an Observable<Buffer>
with the
toObservable
method:
request.toObservable().
subscribe(
response -> {
Observable<Buffer> observable = response.toObservable();
observable.forEach(
buffer -> {
// Process buffer
}
);
}
);
The same flow can be achieved with the flatMap
operation:
request.toObservable().
flatMap(HttpClientResponse::toObservable).
forEach(
buffer -> {
// Process buffer
}
);
We can also unmarshall the Observable<Buffer>
into an object using the RxHelper.unmarshaller
static method. This method creates an Rx.Observable.Operator
unmarshalling buffers to an object:
request.toObservable().
flatMap(HttpClientResponse::toObservable).
lift(io.vertx.rxjava.core.RxHelper.unmarshaller(MyPojo.class)).
forEach(
pojo -> {
// Process pojo
}
);
Http server requests
The requestStream
provides a callback for each incoming
request:
Observable<HttpServerRequest> requestObservable = server.requestStream().toObservable();
requestObservable.subscribe(request -> {
// Process request
});
The HttpServerRequest
can then be adapted to an Observable<Buffer>
:
Observable<HttpServerRequest> requestObservable = server.requestStream().toObservable(); requestObservable.subscribe(request -> { Observable<Buffer> observable = request.toObservable(); });
The RxHelper.unmarshaller
can be used to parse and map
a json request to an object:
Observable<HttpServerRequest> requestObservable = server.requestStream().toObservable(); requestObservable.subscribe(request -> { Observable<MyPojo> observable = request. toObservable(). lift(io.vertx.rxjava.core.RxHelper.unmarshaller(MyPojo.class)); });
Websocket client
The websocketStream
provides a single callback when the websocket
connects, otherwise a failure:
HttpClient client = vertx.createHttpClient(new HttpClientOptions());
WebSocketStream stream = client.websocketStream(8080, "localhost", "/the_uri");
stream.toObservable().subscribe(
ws -> {
// Use the websocket
},
error -> {
// Could not connect
}
);
The WebSocket
can then be turned into an Observable<Buffer>
easily
socketObservable.subscribe(
socket -> {
Observable<Buffer> dataObs = socket.toObservable();
dataObs.subscribe(buffer -> {
System.out.println("Got message " + buffer.toString("UTF-8"));
});
}
);
Websocket server
The websocketStream
provides a callback for each incoming
connection:
Observable<ServerWebSocket> socketObservable = server.websocketStream().toObservable();
socketObservable.subscribe(
socket -> System.out.println("Web socket connect"),
failure -> System.out.println("Should never be called"),
() -> {
System.out.println("Subscription ended or server closed");
}
);
The ServerWebSocket
can be turned into an Observable<Buffer>
easily:
socketObservable.subscribe(
socket -> {
Observable<Buffer> dataObs = socket.toObservable();
dataObs.subscribe(buffer -> {
System.out.println("Got message " + buffer.toString("UTF-8"));
});
}
);