Kotlin API
TODO
RxJava 2 API
The Rxified API supports RxJava 1 and RxJava 2; the following examples use RxJava 2.
Single<RowSet<Row>> single = pool.query("SELECT * FROM users WHERE id='julien'").rxExecute();

// Execute the query
single.subscribe(result -> {
  System.out.println("Got " + result.size() + " rows ");
}, err -> {
  System.out.println("Failure: " + err.getMessage());
});
Streaming
RxJava 2 supports the Observable and Flowable types. These are exposed through the RowStream that you can get from a PreparedQuery:
Observable<Row> observable = pool.rxGetConnection().flatMapObservable(conn -> conn
  .rxBegin()
  .flatMapObservable(tx ->
    conn
      .rxPrepare("SELECT * FROM users WHERE first_name LIKE $1")
      .flatMapObservable(preparedQuery -> {
        // Fetch 50 rows at a time
        RowStream<Row> stream = preparedQuery.createStream(50, Tuple.of("julien"));
        return stream.toObservable();
      })
      .doAfterTerminate(tx::commit)));

// Then subscribe
observable.subscribe(row -> {
  System.out.println("User: " + row.getString("last_name"));
}, err -> {
  System.out.println("Error: " + err.getMessage());
}, () -> {
  System.out.println("End of stream");
});
The same example using Flowable:
Flowable<Row> flowable = pool.rxGetConnection().flatMapPublisher(conn -> conn
  .rxBegin()
  .flatMapPublisher(tx ->
    conn
      .rxPrepare("SELECT * FROM users WHERE first_name LIKE $1")
      .flatMapPublisher(preparedQuery -> {
        // Fetch 50 rows at a time
        RowStream<Row> stream = preparedQuery.createStream(50, Tuple.of("julien"));
        return stream.toFlowable();
      })
      .doAfterTerminate(tx::commit)));

// Then subscribe
flowable.subscribe(new Subscriber<Row>() {

  private Subscription sub;

  @Override
  public void onSubscribe(Subscription subscription) {
    sub = subscription;
    // Ask for the first row only
    subscription.request(1);
  }

  @Override
  public void onNext(Row row) {
    // Ask for the next row, then handle the current one
    sub.request(1);
    System.out.println("User: " + row.getString("last_name"));
  }

  @Override
  public void onError(Throwable err) {
    System.out.println("Error: " + err.getMessage());
  }

  @Override
  public void onComplete() {
    System.out.println("End of stream");
  }
});
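The subscriber above controls demand explicitly: it requests one row on subscription and one more each time a row arrives. As a rough illustration of that request(1) pattern without any database, the sketch below uses the JDK's java.util.concurrent.Flow interfaces, which mirror the Reactive Streams Subscriber and Subscription types that Flowable works with. The class name OneAtATimeDemo, the consume helper and the string "rows" are hypothetical stand-ins and are not part of the SQL client API.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class: JDK Flow types stand in for Flowable<Row>.
final class OneAtATimeDemo {

  /** Requests a single item up front, then one more for every item received. */
  static final class OneAtATimeSubscriber implements Flow.Subscriber<String> {
    final List<String> received = new ArrayList<>();
    final CountDownLatch done = new CountDownLatch(1);
    private Flow.Subscription sub;

    @Override
    public void onSubscribe(Flow.Subscription subscription) {
      sub = subscription;
      subscription.request(1); // initial demand: exactly one item
    }

    @Override
    public void onNext(String item) {
      received.add(item);
      sub.request(1); // ask for the next item
    }

    @Override
    public void onError(Throwable err) {
      done.countDown();
    }

    @Override
    public void onComplete() {
      done.countDown();
    }
  }

  /** Publishes the given items and returns them in the order the subscriber saw them. */
  static List<String> consume(List<String> rows) throws InterruptedException {
    OneAtATimeSubscriber subscriber = new OneAtATimeSubscriber();
    try (SubmissionPublisher<String> publisher = new SubmissionPublisher<>()) {
      publisher.subscribe(subscriber);
      for (String row : rows) {
        publisher.submit(row); // buffered until the subscriber requests it
      }
    } // close() signals onComplete once buffered items are delivered
    if (!subscriber.done.await(5, TimeUnit.SECONDS)) {
      throw new IllegalStateException("stream did not complete in time");
    }
    return subscriber.received;
  }

  public static void main(String[] args) throws InterruptedException {
    System.out.println(consume(List.of("Viet", "Alblueshi")));
  }
}
```

Because the subscriber never has more than one outstanding request, the publisher cannot push items faster than they are handled; a larger request(n) would trade that tight control for fewer round trips.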
Transaction
The simplified transaction API makes it easy to write transactional asynchronous flows:
Completable completable = pool.rxGetConnection()
  .flatMapCompletable(conn -> conn
    .rxBegin()
    .flatMapCompletable(tx ->
      conn
        .query("INSERT INTO Users (first_name,last_name) VALUES ('Julien','Viet')")
        .rxExecute()
        .flatMap(result -> conn
          .query("INSERT INTO Users (first_name,last_name) VALUES ('Emad','Alblueshi')")
          .rxExecute())
        .flatMapCompletable(res -> tx.rxCommit())
    ));

completable.subscribe(() -> {
  // Transaction succeeded
}, err -> {
  // Transaction failed
});
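The flow above is a sequential chain: the second insert starts only after the first succeeds, the commit runs only after both, and a failure at any step skips the rest and reaches the error callback. The sketch below reproduces just that chaining shape with the JDK's CompletableFuture, under the assumption that thenCompose is a fair analogue of flatMap here. The class TxFlowSketch and the step and run helpers are hypothetical and do not talk to a database or use the SQL client API.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;

// Hypothetical sketch of the chaining shape only; no real transaction is involved.
final class TxFlowSketch {

  /** Simulated async step: records its name in the log, or fails if it equals failOn. */
  static CompletableFuture<Void> step(String name, String failOn, List<String> log) {
    return CompletableFuture.runAsync(() -> {
      if (name.equals(failOn)) {
        throw new IllegalStateException(name + " failed");
      }
      log.add(name);
    });
  }

  /** Runs insert, insert, commit in sequence; pass null as failOn for the success path. */
  static List<String> run(String failOn) {
    List<String> log = Collections.synchronizedList(new ArrayList<>());
    CompletableFuture<Void> flow = step("insert Julien", failOn, log)
        .thenCompose(v -> step("insert Emad", failOn, log)) // skipped if the first step failed
        .thenCompose(v -> step("commit", failOn, log));     // skipped if any insert failed
    try {
      flow.join();
      log.add("succeeded");
    } catch (CompletionException e) {
      log.add("failed: " + e.getCause().getMessage());
    }
    return log;
  }

  public static void main(String[] args) {
    System.out.println(run(null));
    System.out.println(run("insert Emad"));
  }
}
```

In the failing run the log contains the first insert and the failure, but no commit entry, which is the property the chained flow relies on.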