Kotlin API

TODO

RxJava 2 API

The Rxified API supports both RxJava 1 and RxJava 2. The following examples use RxJava 2.

Single<RowSet<Row>> single = pool.query("SELECT * FROM users WHERE id='julien'").rxExecute();

// Execute the query
single.subscribe(result -> {
  System.out.println("Got " + result.size() + " rows ");
}, err -> {
  System.out.println("Failure: " + err.getMessage());
});

Streaming

RxJava 2 supports the Observable and Flowable types. Both are exposed through the RowStream that you can obtain from a PreparedQuery:

Observable<Row> observable = pool.rxGetConnection().flatMapObservable(conn -> conn
  .rxBegin()
  .flatMapObservable(tx ->
    conn
      .rxPrepare("SELECT * FROM users WHERE first_name LIKE $1")
      .flatMapObservable(preparedQuery -> {
        // Fetch 50 rows at a time
        RowStream<Row> stream = preparedQuery.createStream(50, Tuple.of("julien"));
        return stream.toObservable();
      })
      .doAfterTerminate(tx::commit)));

// Then subscribe
observable.subscribe(row -> {
  System.out.println("User: " + row.getString("last_name"));
}, err -> {
  System.out.println("Error: " + err.getMessage());
}, () -> {
  System.out.println("End of stream");
});

The same example using Flowable:

Flowable<Row> flowable = pool.rxGetConnection().flatMapPublisher(conn -> conn
  .rxBegin()
  .flatMapPublisher(tx ->
    conn
      .rxPrepare("SELECT * FROM users WHERE first_name LIKE $1")
      .flatMapPublisher(preparedQuery -> {
        // Fetch 50 rows at a time
        RowStream<Row> stream = preparedQuery.createStream(50, Tuple.of("julien"));
        return stream.toFlowable();
      })
      .doAfterTerminate(tx::commit)));

// Then subscribe
flowable.subscribe(new Subscriber<Row>() {

  private Subscription sub;

  @Override
  public void onSubscribe(Subscription subscription) {
    sub = subscription;
    subscription.request(1);
  }

  @Override
  public void onNext(Row row) {
    sub.request(1);
    System.out.println("User: " + row.getString("last_name"));
  }

  @Override
  public void onError(Throwable err) {
    System.out.println("Error: " + err.getMessage());
  }

  @Override
  public void onComplete() {
    System.out.println("End of stream");
  }
});
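The subscriber above controls the flow of rows by requesting one item in onSubscribe and one more after each onNext. As a rough, dependency-free sketch of that demand protocol, the following uses the JDK's java.util.concurrent.Flow interfaces (JDK 9 or later), which mirror the Reactive Streams Subscriber and Subscription types. OneAtATimeDemo and ListPublisher are hypothetical names invented for this illustration and are not part of Vert.x or RxJava; the publisher is deliberately simplified (single-threaded, no validation of the requested amount) and stands in for a row source.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.Flow;

// Illustration only: OneAtATimeDemo and ListPublisher are hypothetical names,
// not part of Vert.x or RxJava. Requires JDK 9+ for java.util.concurrent.Flow.
final class OneAtATimeDemo {

  /** Emits list items synchronously, never more than the subscriber has requested. */
  static final class ListPublisher implements Flow.Publisher<String> {
    private final List<String> items;

    ListPublisher(List<String> items) {
      this.items = items;
    }

    @Override
    public void subscribe(Flow.Subscriber<? super String> subscriber) {
      Iterator<String> it = items.iterator();
      subscriber.onSubscribe(new Flow.Subscription() {
        private long demand;      // items requested but not yet delivered
        private boolean draining; // true while the delivery loop is running
        private boolean done;     // true after completion or cancellation

        @Override
        public void request(long n) {
          demand += n;
          if (draining) {
            return; // re-entrant call from onNext: the running loop sees the new demand
          }
          draining = true;
          while (!done && demand > 0 && it.hasNext()) {
            demand--;
            subscriber.onNext(it.next());
          }
          draining = false;
          if (!done && !it.hasNext()) {
            done = true;
            subscriber.onComplete();
          }
        }

        @Override
        public void cancel() {
          done = true;
        }
      });
    }
  }

  /** Subscribes with the request(1) pattern and returns the signals in arrival order. */
  static List<String> run(List<String> items) {
    List<String> events = new ArrayList<>();
    new ListPublisher(items).subscribe(new Flow.Subscriber<String>() {
      private Flow.Subscription sub;

      @Override
      public void onSubscribe(Flow.Subscription subscription) {
        sub = subscription;
        subscription.request(1); // ask for the first item only
      }

      @Override
      public void onNext(String item) {
        events.add("next:" + item);
        sub.request(1); // ask for exactly one more
      }

      @Override
      public void onError(Throwable err) {
        events.add("error:" + err.getMessage());
      }

      @Override
      public void onComplete() {
        events.add("complete");
      }
    });
    return events;
  }

  public static void main(String[] args) {
    System.out.println(run(List.of("julien", "emad")));
  }
}
```

Because the publisher delivers an item only when outstanding demand exists, the subscriber never holds more than one undelivered item at a time. Requesting a larger batch in onSubscribe would trade that tight control for fewer round trips.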

Transaction

The simplified transaction API makes it easy to write transactional asynchronous flows:

Completable completable = pool.rxGetConnection()
  .flatMapCompletable(conn -> conn
    .rxBegin()
    .flatMapCompletable(tx -> conn
      // First insert
      .query("INSERT INTO Users (first_name,last_name) VALUES ('Julien','Viet')")
      .rxExecute()
      // Second insert, run only after the first one succeeds
      .flatMap(result -> conn
        .query("INSERT INTO Users (first_name,last_name) VALUES ('Emad','Alblueshi')")
        .rxExecute())
      // Commit once both inserts have completed
      .flatMapCompletable(res -> tx.rxCommit())));

completable.subscribe(() -> {
  // Transaction succeeded
}, err -> {
  // Transaction failed
});