Kotlin API

TODO== RxJava 2 API

The Rxified API supports RxJava 1 and RxJava 2, the following examples use RxJava 2.

Single<RowSet<Row>> single = pool.query("SELECT * FROM users WHERE id='julien'").rxExecute();

// Execute the query
single.subscribe(result -> {
  System.out.println("Got " + result.size() + " rows ");
}, err -> {
  System.out.println("Failure: " + err.getMessage());
});

Connection

The simplified connection API allows to easily use a connection, the withConnection method borrows a connection from the pool and return it for you:

Maybe<RowSet<Row>> maybe = pool.rxWithConnection((Function<SqlConnection, Maybe<RowSet<Row>>>) conn ->
  conn
    .query("INSERT INTO Users (first_name,last_name) VALUES ('Julien','Viet')")
    .rxExecute()
    .flatMap(result -> conn
      .query("SELECT * FROM Users")
      .rxExecute())
    .toMaybe());

maybe.subscribe(rows -> {
  // Success
}, err -> {
  // Failed
});

Transaction

The simplified transaction API allows to easily write transactional asynchronous flows, The withTransaction method start and commit a transaction for you:

Completable completable = pool.rxWithTransaction((Function<SqlConnection, Maybe<RowSet<Row>>>) conn ->
  conn
    .query("INSERT INTO Users (first_name,last_name) VALUES ('Julien','Viet')")
    .rxExecute()
    .flatMap(result -> conn
      .query("INSERT INTO Users (first_name,last_name) VALUES ('Emad','Alblueshi')")
      .rxExecute())
    .toMaybe())
  .ignoreElement();

completable.subscribe(() -> {
  // Transaction succeeded
}, err -> {
  // Transaction failed
});

Streaming

RxJava 2 supports Observable and Flowable types, these are exposed using the RowStream that you can get from a PreparedQuery:

Observable<Row> observable = pool.rxGetConnection().flatMapObservable(conn -> conn
  .rxBegin()
  .flatMapObservable(tx ->
    conn
      .rxPrepare("SELECT * FROM users WHERE first_name LIKE $1")
      .flatMapObservable(preparedQuery -> {
        // Fetch 50 rows at a time
        RowStream<Row> stream = preparedQuery.createStream(50, Tuple.of("julien"));
        return stream.toObservable();
      })
      .doAfterTerminate(tx::commit)));

// Then subscribe
observable.subscribe(row -> {
  System.out.println("User: " + row.getString("last_name"));
}, err -> {
  System.out.println("Error: " + err.getMessage());
}, () -> {
  System.out.println("End of stream");
});

The same example using Flowable:

Flowable<Row> flowable = pool.rxGetConnection().flatMapPublisher(conn -> conn
  .rxBegin()
  .flatMapPublisher(tx ->
    conn
      .rxPrepare("SELECT * FROM users WHERE first_name LIKE $1")
      .flatMapPublisher(preparedQuery -> {
        // Fetch 50 rows at a time
        RowStream<Row> stream = preparedQuery.createStream(50, Tuple.of("julien"));
        return stream.toFlowable();
      })
      .doAfterTerminate(tx::commit)));

// Then subscribe
flowable.subscribe(new Subscriber<Row>() {

  private Subscription sub;

  @Override
  public void onSubscribe(Subscription subscription) {
    sub = subscription;
    subscription.request(1);
  }

  @Override
  public void onNext(Row row) {
    sub.request(1);
    System.out.println("User: " + row.getString("last_name"));
  }

  @Override
  public void onError(Throwable err) {
    System.out.println("Error: " + err.getMessage());
  }

  @Override
  public void onComplete() {
    System.out.println("End of stream");
  }
});