RxJava 2 API

SQL Transaction management with RxJava

Managing SQL transactions manually with the Rxified SQLConnection requires quite a lot of boilerplate code.

Vert.x provides observable transformers that you can apply to your flows with compose to make them transactional:

  • SQLClientHelper#txFlowableTransformer

  • SQLClientHelper#txObservableTransformer

  • SQLClientHelper#txSingleTransformer

  • SQLClientHelper#txMaybeTransformer

  • SQLClientHelper#txCompletableTransformer

These transformers wrap the corresponding source of events with SQL transaction management. Let’s take an example.

In your music library application, you need to insert a row in table albums, then some rows in table tracks. These two steps shall be part of the same atomic transaction. If it succeeds, the application must return results from a query involving both tables.

After your got an instance of io.vertx.reactivex.ext.sql.SQLConnection, you can use it to perform the SQL operations:

conn.rxExecute("... insert into album ...")
  .andThen(conn.rxExecute("... insert into tracks ..."))
  .compose(SQLClientHelper.txCompletableTransformer(conn)) (1)
  .andThen(conn.rxQuery("... select from album, tracks ...").map(ResultSet::getResults))
  .subscribe(rows -> {
    // send to client
  }, throwable -> {
    // handle error
  });
  1. Transaction management appplied to the Completable source

Source transformers provide maximum flexibility: you are still able to execute operations with the connection after the transaction completes.

But more often than not, you do not need the connection after the changes are commited or rollbacked. In this case, you may simply create you source observable with one of the transactional helper methods in io.vertx.reactivex.ext.sql.SQLClientHelper.

Let’s rewrite the previous example:

SQLClientHelper.inTransactionSingle(sqlClient, conn -> {
  return conn.rxExecute("... insert into album ...")
    .andThen(conn.rxExecute("... insert into tracks ..."))
    .andThen(conn.rxQuery("... select from album, tracks ...").map(ResultSet::getResults)); (1)
}).subscribe(rows -> {
  // send to client
}, throwable -> {
  // handle error
});
  1. the SELECT query is now part of the transaction