RxJava 2 API
SQL Transaction management with RxJava
Managing SQL transactions manually with the Rxified SQLConnection
requires quite a lot of boilerplate code.
Vert.x provides observable transformers that you can apply to your flows with compose
to make them transactional:
-
SQLClientHelper#txFlowableTransformer
-
SQLClientHelper#txObservableTransformer
-
SQLClientHelper#txSingleTransformer
-
SQLClientHelper#txMaybeTransformer
-
SQLClientHelper#txCompletableTransformer
These transformers wrap the corresponding source of events with SQL transaction management. Let’s take an example.
In your music library application, you need to insert a row in table albums
, then some rows in table tracks
.
These two steps shall be part of the same atomic transaction.
If it succeeds, the application must return results from a query involving both tables.
After your got an instance of io.vertx.reactivex.ext.sql.SQLConnection
, you can use it to perform the SQL operations:
conn.rxExecute("... insert into album ...")
.andThen(conn.rxExecute("... insert into tracks ..."))
.compose(SQLClientHelper.txCompletableTransformer(conn)) (1)
.andThen(conn.rxQuery("... select from album, tracks ...").map(ResultSet::getResults))
.subscribe(rows -> {
// send to client
}, throwable -> {
// handle error
});
-
Transaction management appplied to the
Completable
source
Source transformers provide maximum flexibility: you are still able to execute operations with the connection after the transaction completes.
But more often than not, you do not need the connection after the changes are commited or rollbacked.
In this case, you may simply create you source observable with one of the transactional helper methods in io.vertx.reactivex.ext.sql.SQLClientHelper
.
Let’s rewrite the previous example:
SQLClientHelper.inTransactionSingle(sqlClient, conn -> {
return conn.rxExecute("... insert into album ...")
.andThen(conn.rxExecute("... insert into tracks ..."))
.andThen(conn.rxQuery("... select from album, tracks ...").map(ResultSet::getResults)); (1)
}).subscribe(rows -> {
// send to client
}, throwable -> {
// handle error
});
-
the
SELECT
query is now part of the transaction