The Reactive DB2 Client is a client for DB2 with a straightforward API focusing on scalability and low overhead.

The client is reactive and non-blocking, allowing it to handle many database connections with a single thread.

Features

  • Support for DB2 on Linux, Unix, and Windows

  • Limited support for DB2 on z/OS

  • Event driven

  • Lightweight

  • Built-in connection pooling

  • Prepared query caching

  • Batch and cursor

  • Row streaming

  • RxJava 1 and RxJava 2

  • Direct memory to object without unnecessary copies

  • Java 8 Date and Time

  • SSL/TLS

  • HTTP/1.x CONNECT, SOCKS4a or SOCKS5 proxy support

Current limitations

  • No stored procedures support

  • Some column types (e.g. BLOB and CLOB) are not supported

Usage

To use the Reactive DB2 Client add the following dependency to the dependencies section of your build descriptor:

  • Maven (in your pom.xml):

<dependency>
 <groupId>io.vertx</groupId>
 <artifactId>vertx-db2-client</artifactId>
 <version>4.0.0.Beta1</version>
</dependency>

  • Gradle (in your build.gradle file):

dependencies {
 compile 'io.vertx:vertx-db2-client:4.0.0.Beta1'
}

Getting started

Here is the simplest way to connect, query and disconnect:

// Connect options
def connectOptions = [
  port:50000,
  host:"the-host",
  database:"the-db",
  user:"user",
  password:"secret"
]

// Pool options
def poolOptions = [
  maxSize:5
]

// Create the client pool
def client = DB2Pool.pool(connectOptions, poolOptions)

// A simple query
client.query("SELECT * FROM users WHERE id='julien'").execute({ ar ->
  if (ar.succeeded()) {
    def result = ar.result()
    println("Got ${result.size()} rows ")
  } else {
    println("Failure: ${ar.cause().getMessage()}")
  }

  // Now close the pool
  client.close()
})

Connecting to DB2

Most of the time you will use a pool to connect to DB2:

// Connect options
def connectOptions = [
  port:50000,
  host:"the-host",
  database:"the-db",
  user:"user",
  password:"secret"
]

// Pool options
def poolOptions = [
  maxSize:5
]

// Create the pooled client
def client = DB2Pool.pool(connectOptions, poolOptions)

The pooled client uses a connection pool: each operation borrows a connection from the pool, executes on it, and releases it back to the pool.

If you are running with Vert.x you can pass it your Vertx instance:

// Connect options
def connectOptions = [
  port:50000,
  host:"the-host",
  database:"the-db",
  user:"user",
  password:"secret"
]

// Pool options
def poolOptions = [
  maxSize:5
]
// Create the pooled client
def client = DB2Pool.pool(vertx, connectOptions, poolOptions)

You need to release the pool when you don’t need it anymore:

// Close the pool and all the associated resources
client.close()

When you need to execute several operations on the same connection, you need to use a client connection.

You can easily get one from the pool:

// Connect options
def connectOptions = [
  port:50000,
  host:"the-host",
  database:"the-db",
  user:"user",
  password:"secret"
]

// Pool options
def poolOptions = [
  maxSize:5
]

// Create the pooled client
def client = DB2Pool.pool(vertx, connectOptions, poolOptions)

// Get a connection from the pool
client.getConnection().compose({ conn ->
  println("Got a connection from the pool")

  // All operations execute on the same connection
  return conn.query("SELECT * FROM users WHERE id='julien'").execute().compose({ res ->
    conn.query("SELECT * FROM users WHERE id='emad'").execute()
  }).onComplete({ ar ->
    // Release the connection to the pool
    conn.close()
  })
}).onComplete({ ar ->
  if (ar.succeeded()) {

    println("Done")
  } else {
    println("Something went wrong ${ar.cause().getMessage()}")
  }
})

Once you are done with the connection you must close it to release it to the pool, so it can be reused.

Configuration

There are several alternatives for you to configure the client.

Data object

A simple way to configure the client is to specify a DB2ConnectOptions data object.

// Data object
def connectOptions = [
  port:50000,
  host:"the-host",
  database:"the-db",
  user:"user",
  password:"secret"
]

// Pool Options
def poolOptions = [
  maxSize:5
]

// Create the pool from the data object
def pool = DB2Pool.pool(vertx, connectOptions, poolOptions)

pool.getConnection({ ar ->
  // Handling your connection
})

You can also configure the generic properties with the setProperties or addProperty methods. Note setProperties will override the default client properties.

Connection URI

Apart from configuring with a DB2ConnectOptions data object, you can also configure the client with a connection URI:

// Connection URI
def connectionUri = "db2://dbuser:secretpassword@database.server.com:50000/mydb"

// Create the pool from the connection URI
def pool = DB2Pool.pool(connectionUri)

// Create the connection from the connection URI
DB2Connection.connect(vertx, connectionUri, { res ->
  // Handling your connection
})

The URI format for a connection string is:

db2://<USERNAME>:<PASSWORD>@<HOSTNAME>:<PORT>/<DBNAME>

Currently the client supports the following parameter keywords in the connection URI:

  • host

  • port

  • user

  • password

  • dbname

Note: configuring properties in connection URI will override the default properties.
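As an illustration of how the parts of the URI correspond to the keywords above, the following Java sketch splits a connection URI using only java.net.URI. The Db2UriSketch class and its parse method are hypothetical names introduced here. This is not how the client itself parses the URI, and it assumes that every part of the URI is present.

```java
import java.net.URI;
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical helper, not part of the client API
class Db2UriSketch {

  // Splits db2://<USERNAME>:<PASSWORD>@<HOSTNAME>:<PORT>/<DBNAME> into its keywords.
  // Assumption: user, password, host, port and database name are all present.
  static Map<String, String> parse(String connectionUri) {
    URI uri = URI.create(connectionUri);
    if (!"db2".equals(uri.getScheme())) {
      throw new IllegalArgumentException("Expected a db2:// URI: " + connectionUri);
    }
    // The user info has the form <USERNAME>:<PASSWORD>
    String[] userInfo = uri.getUserInfo().split(":", 2);

    Map<String, String> parts = new LinkedHashMap<>();
    parts.put("host", uri.getHost());
    parts.put("port", String.valueOf(uri.getPort()));
    parts.put("user", userInfo[0]);
    parts.put("password", userInfo[1]);
    // The path is "/<DBNAME>", so drop the leading slash
    parts.put("dbname", uri.getPath().substring(1));
    return parts;
  }

  public static void main(String[] args) {
    System.out.println(parse("db2://dbuser:secretpassword@database.server.com:50000/mydb"));
  }
}
```

The sketch does no validation beyond the scheme check, so a URI without user info or a database name makes it fail with an exception.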

Running queries

When you don’t need a transaction or only run single queries, you can run queries directly on the pool; the pool will use one of its connections to run the query and return the result to you.

Here is how to run simple queries:

client.query("SELECT * FROM users WHERE id='andy'").execute({ ar ->
  if (ar.succeeded()) {
    def result = ar.result()
    println("Got ${result.size()} rows ")
  } else {
    println("Failure: ${ar.cause().getMessage()}")
  }
})

Prepared queries

You can do the same with prepared queries.

The SQL string can refer to parameters by position, using the database syntax `?`:

client.preparedQuery("SELECT * FROM users WHERE id=?").execute(Tuple.of("andy"), { ar ->
  if (ar.succeeded()) {
    def rows = ar.result()
    println("Got ${rows.size()} rows ")
  } else {
    println("Failure: ${ar.cause().getMessage()}")
  }
})

Query methods provide an asynchronous RowSet instance that works for SELECT queries:

client.preparedQuery("SELECT first_name, last_name FROM users").execute({ ar ->
  if (ar.succeeded()) {
    def rows = ar.result()
    rows.each { row ->
      println("User ${row.getString(0)} ${row.getString(1)}")
    }
  } else {
    println("Failure: ${ar.cause().getMessage()}")
  }
})

or UPDATE/INSERT queries:

client.preparedQuery("INSERT INTO users (first_name, last_name) VALUES (?, ?)").execute(Tuple.of("Andy", "Guibert"), { ar ->
  if (ar.succeeded()) {
    def rows = ar.result()
    println(rows.rowCount())
  } else {
    println("Failure: ${ar.cause().getMessage()}")
  }
})

The Row gives you access to your data by index:

println("User ${row.getString(0)} ${row.getString(1)}")

or by name:

println("User ${row.getString("first_name")} ${row.getString("last_name")}")

The client does no magic here: a column is identified by its name in the table, regardless of how your SQL text is written.

You can access a wide variety of types:

def firstName = row.getString("first_name")
def male = row.getBoolean("male")
def age = row.getInteger("age")

// ...

You can use cached prepared statements to execute one-shot prepared queries:

// Enable prepared statement caching
connectOptions.cachePreparedStatements = true
client.preparedQuery("SELECT * FROM users WHERE id = ?").execute(Tuple.of("julien"), { ar ->
  if (ar.succeeded()) {
    def rows = ar.result()
    println("Got ${rows.size()} rows ")
  } else {
    println("Failure: ${ar.cause().getMessage()}")
  }
})

You can create a PreparedStatement and manage the lifecycle by yourself.

sqlConnection.prepare("SELECT * FROM users WHERE id= ?", { ar ->
  if (ar.succeeded()) {
    def preparedStatement = ar.result()
    preparedStatement.query().execute(Tuple.of("julien"), { ar2 ->
      if (ar2.succeeded()) {
        def rows = ar2.result()
        println("Got ${rows.size()} rows ")
        preparedStatement.close()
      } else {
        println("Failure: ${ar2.cause().getMessage()}")
      }
    })
  } else {
    println("Failure: ${ar.cause().getMessage()}")
  }
})

Batches

You can execute a prepared batch:

// Add commands to the batch
def batch = []
batch.add(Tuple.of("julien", "Julien Viet"))
batch.add(Tuple.of("emad", "Emad Alblueshi"))
batch.add(Tuple.of("andy", "Andy Guibert"))

// Execute the prepared batch
client.preparedQuery("INSERT INTO USERS (id, name) VALUES (?, ?)").executeBatch(batch, { res ->
  if (res.succeeded()) {

    // Process rows
    def rows = res.result()
  } else {
    println("Batch failed ${res.cause()}")
  }
})

You can fetch generated keys by wrapping your query in SELECT <COLUMNS> FROM FINAL TABLE ( <SQL> ), for example:

client.preparedQuery("SELECT color_id FROM FINAL TABLE ( INSERT INTO color (color_name) VALUES (?), (?), (?) )").execute(Tuple.of("white", "red", "blue"), { ar ->
  if (ar.succeeded()) {
    def rows = ar.result()
    println("Inserted ${rows.rowCount()} new rows.")
    rows.each { row ->
      println("generated key: ${row.getInteger("color_id")}")
    }
  } else {
    println("Failure: ${ar.cause().getMessage()}")
  }
})

Using connections

When you need to execute sequential queries (without a transaction), you can create a new connection or borrow one from the pool:

Code not translatable

Prepared queries can be created:

connection.prepare("SELECT * FROM users WHERE first_name LIKE ?", { ar1 ->
  if (ar1.succeeded()) {
    def pq = ar1.result()
    pq.query().execute(Tuple.of("andy"), { ar2 ->
      if (ar2.succeeded()) {
        // All rows
        def rows = ar2.result()
      }
    })
  }
})

Using transactions

Transactions with connections

You can execute transactions using SQL BEGIN/COMMIT/ROLLBACK. If you do so, you must use a SqlConnection and manage it yourself.

Or you can use the transaction API of SqlConnection:

pool.getConnection({ res ->
  if (res.succeeded()) {

    // Transaction must use a connection
    def conn = res.result()

    // Begin the transaction
    conn.begin({ ar0 ->
      if (ar0.succeeded()) {
        def tx = ar0.result()
        // Various statements
        conn.query("INSERT INTO Users (first_name,last_name) VALUES ('Julien','Viet')").execute({ ar1 ->
          if (ar1.succeeded()) {
            conn.query("INSERT INTO Users (first_name,last_name) VALUES ('Emad','Alblueshi')").execute({ ar2 ->
              if (ar2.succeeded()) {
                // Commit the transaction
                tx.commit({ ar3 ->
                  if (ar3.succeeded()) {
                    println("Transaction succeeded")
                  } else {
                    println("Transaction failed ${ar3.cause().getMessage()}")
                  }
                  // Return the connection to the pool
                  conn.close()
                })
              } else {
                // Return the connection to the pool
                conn.close()
              }
            })
          } else {
            // Return the connection to the pool
            conn.close()
          }
        })
      } else {
        // Return the connection to the pool
        conn.close()
      }
    })
  }
})

When the database server reports that the current transaction has failed (e.g. the infamous "current transaction is aborted, commands ignored until end of transaction block"), the transaction is rolled back and the completion future fails with a TransactionRollbackException:

tx.completion().onFailure({ err ->
  println("Transaction failed => rolled back")
})

Simplified transaction API

When you use a pool, you can call withTransaction to pass it a function executed within a transaction.

It borrows a connection from the pool, begins the transaction and calls the function with a client executing all operations in the scope of this transaction.

The function must return a future of an arbitrary result:

  • when the future succeeds the client will commit the transaction

  • when the future fails the client will roll back the transaction

After the transaction completes, the connection is returned to the pool and the overall result is provided.

// Acquire a transaction and begin the transaction
pool.withTransaction({ client ->
  client.query("INSERT INTO Users (first_name,last_name) VALUES ('Julien','Viet')").execute().flatMap({ res ->
    client.query("INSERT INTO Users (first_name,last_name) VALUES ('Emad','Alblueshi')").execute().map("Users inserted")
  })
}).onComplete({ ar ->
  // The connection was automatically returned to the pool
  if (ar.succeeded()) {
    // Transaction was committed
    def message = ar.result()
    println("Transaction succeeded: ${message}")
  } else {
    // Transaction was rolled back
    println("Transaction failed ${ar.cause().getMessage()}")
  }
})

Cursors and streaming

By default, prepared query execution fetches all rows. You can use a Cursor to control the number of rows you want to read:

connection.prepare("SELECT * FROM users WHERE first_name LIKE ?", { ar0 ->
  if (ar0.succeeded()) {
    def pq = ar0.result()

    // Cursors require to run within a transaction
    connection.begin({ ar1 ->
      if (ar1.succeeded()) {
        def tx = ar1.result()

        // Create a cursor
        def cursor = pq.cursor(Tuple.of("julien"))

        // Read 50 rows
        cursor.read(50, { ar2 ->
          if (ar2.succeeded()) {
            def rows = ar2.result()

            // Check for more ?
            if (cursor.hasMore()) {
              // Repeat the process...
            } else {
              // No more rows - commit the transaction
              tx.commit()
            }
          }
        })
      }
    })
  }
})

Cursors should be closed when they are released prematurely:

cursor.read(50, { ar2 ->
  if (ar2.succeeded()) {
    // Close the cursor
    cursor.close()
  }
})

A stream API is also available for cursors, which can be more convenient, especially with the Rxified version.

connection.prepare("SELECT * FROM users WHERE first_name LIKE ?", { ar0 ->
  if (ar0.succeeded()) {
    def pq = ar0.result()

    // Streams require to run within a transaction
    connection.begin({ ar1 ->
      if (ar1.succeeded()) {
        def tx = ar1.result()

        // Fetch 50 rows at a time
        def stream = pq.createStream(50, Tuple.of("julien"))

        // Use the stream
        stream.exceptionHandler({ err ->
          println("Error: ${err.getMessage()}")
        })
        stream.endHandler({ v ->
          tx.commit()
          println("End of stream")
        })
        stream.handler({ row ->
          println("User: ${row.getString("last_name")}")
        })
      }
    })
  }
})

The stream reads the rows in batches of 50 and streams them. Once the rows have been passed to the handler, a new batch of 50 is read, and so on.

The stream can be paused or resumed. While it is paused, the loaded rows remain in memory until they are delivered and the cursor stops iterating.

DB2 type mapping

Currently the client supports the following DB2 types:

  • BOOLEAN (java.lang.Boolean) (DB2 LUW only)

  • SMALLINT (java.lang.Short)

  • INTEGER (java.lang.Integer)

  • BIGINT (java.lang.Long)

  • REAL (java.lang.Float)

  • DOUBLE (java.lang.Double)

  • DECIMAL (io.vertx.sqlclient.data.Numeric)

  • CHAR (java.lang.String)

  • VARCHAR (java.lang.String)

  • ENUM (java.lang.String)

  • DATE (java.time.LocalDate)

  • TIME (java.time.LocalTime)

  • TIMESTAMP (java.time.LocalDateTime)

  • BINARY (byte[])

  • VARBINARY (byte[])

  • ROWID (io.vertx.db2client.impl.drda.DB2RowId or java.sql.RowId) (DB2 z/OS only)

Some types that are currently NOT supported are:

  • XML

  • BLOB

  • CLOB

  • DBCLOB

  • GRAPHIC / VARGRAPHIC

For further documentation on DB2 data types, see the IBM documentation for your DB2 platform.

Tuple decoding uses the above types when storing values. It also performs on-the-fly conversion of the actual value when possible:

pool.query("SELECT an_int_column FROM exampleTable").execute({ ar ->
  def rowSet = ar.result()
  def row = rowSet.iterator().next()

  // Stored as INTEGER column type and represented as java.lang.Integer
  def value = row.getValue(0)

  // Convert to java.lang.Long
  def longValue = row.getLong(0)
})
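The following Java sketch illustrates the kind of on-the-fly conversion described above, using only the standard library. The ConversionSketch class and its toLong helper are hypothetical and are not the client's actual implementation. They only assume that a stored numeric value can be widened through java.lang.Number.

```java
// Hypothetical illustration, not the client's implementation
class ConversionSketch {

  // Converts a stored value to java.lang.Long when possible
  static Long toLong(Object value) {
    if (value == null) {
      return null;
    }
    if (value instanceof Long) {
      return (Long) value;
    }
    if (value instanceof Number) {
      // e.g. an INTEGER column stored as java.lang.Integer
      return ((Number) value).longValue();
    }
    throw new ClassCastException("Cannot convert " + value.getClass().getName() + " to java.lang.Long");
  }

  public static void main(String[] args) {
    Object stored = Integer.valueOf(42);
    Long converted = toLong(stored);
    System.out.println(stored.getClass().getName() + " -> " + converted.getClass().getName());
  }
}
```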

Using Java enum types

You can map Java enum types to these column types:

  • Strings (VARCHAR, TEXT)

  • Numbers (SMALLINT, INTEGER, BIGINT)

client.preparedQuery("SELECT day_name FROM FINAL TABLE ( INSERT INTO days (day_name) VALUES (?), (?), (?) )").execute(Tuple.of(Days.FRIDAY, Days.SATURDAY, Days.SUNDAY), { ar ->
  if (ar.succeeded()) {
    def rows = ar.result()
    println("Inserted ${rows.rowCount()} new rows")
    rows.each { row ->
      println("Day: ${row.get(Days.class, "day_name")}")
    }
  } else {
    println("Failure: ${ar.cause().getMessage()}")
  }
})

client.preparedQuery("SELECT day_num FROM FINAL TABLE ( INSERT INTO days (day_num) VALUES (?), (?), (?) )").execute(Tuple.of(Days.FRIDAY.ordinal(), Days.SATURDAY.ordinal(), Days.SUNDAY.ordinal()), { ar ->
  if (ar.succeeded()) {
    def rows = ar.result()
    println("Inserted ${rows.rowCount()} new rows")
    rows.each { row ->
      println("Day: ${row.get(Days.class, "day_num")}")
    }
  } else {
    println("Failure: ${ar.cause().getMessage()}")
  }
})

The String type is matched with the Java enum’s name returned by the name() method.

Number types are matched with the Java enum’s ordinal returned by the ordinal() method and the row.get() method returns the corresponding enum’s name() value at the ordinal position of the integer value retrieved.
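The two matching rules above can be illustrated with plain Java, independently of the client. In this sketch the Days enum is a hypothetical definition standing in for the type used in the examples, and EnumMappingSketch with its two helpers is introduced only for illustration.

```java
// Hypothetical enum standing in for the Days type used in the examples above
enum Days { MONDAY, TUESDAY, WEDNESDAY, THURSDAY, FRIDAY, SATURDAY, SUNDAY }

class EnumMappingSketch {

  // String columns are matched against the value returned by name()
  static Days fromString(String columnValue) {
    return Days.valueOf(columnValue);
  }

  // Number columns are matched against the value returned by ordinal()
  static Days fromNumber(int columnValue) {
    return Days.values()[columnValue];
  }

  public static void main(String[] args) {
    System.out.println(Days.FRIDAY.name() + " <-> " + fromString("FRIDAY"));
    System.out.println(Days.FRIDAY.ordinal() + " <-> " + fromNumber(Days.FRIDAY.ordinal()));
  }
}
```

Because ordinal() depends on the declaration order of the constants, reordering or inserting constants changes the meaning of numbers already stored in a numeric column. Storing the name in a string column avoids that.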

Collector queries

You can use Java collectors with the query API:

Code not translatable

The collector processing must not keep a reference to the Row, as a single row instance is used for processing the entire set.

Java Collectors provides many interesting predefined collectors. For example, you can easily create a string directly from the row set:

Code not translatable
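To give an idea of what such collectors look like, here is a Java sketch that uses only java.util.stream.Collectors. It does not use the client API: each row is represented by a plain Map from column name to value, and the CollectorSketch class, its newRow helper and the id and last_name columns are assumptions made for the example.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collector;
import java.util.stream.Collectors;

class CollectorSketch {

  // Hypothetical stand-in for a row: a map from column name to value
  static Map<String, Object> newRow(long id, String lastName) {
    Map<String, Object> row = new HashMap<>();
    row.put("id", id);
    row.put("last_name", lastName);
    return row;
  }

  // Copies the values it needs out of each row instead of keeping the row itself
  static final Collector<Map<String, Object>, ?, Map<Long, String>> LAST_NAME_BY_ID =
    Collectors.toMap(row -> (Long) row.get("id"), row -> (String) row.get("last_name"));

  // Joins one column of every row into a single string
  static final Collector<Map<String, Object>, ?, String> LAST_NAMES =
    Collectors.mapping(row -> (String) row.get("last_name"), Collectors.joining(",", "(", ")"));

  public static void main(String[] args) {
    List<Map<String, Object>> rows = Arrays.asList(newRow(1L, "Viet"), newRow(2L, "Alblueshi"));
    System.out.println(rows.stream().collect(LAST_NAME_BY_ID));
    System.out.println(rows.stream().collect(LAST_NAMES));
  }
}
```

Both collectors read the column values immediately and store only those values, which is in line with the rule that a collector must not keep a reference to the row.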

Using SSL/TLS

To configure the client to use an SSL connection, you can configure the DB2ConnectOptions like a Vert.x NetClient.

def options = [
  port:50001,
  host:"the-host",
  database:"the-db",
  user:"user",
  password:"secret",
  ssl:true,
  trustStoreOptions:[
    path:"/path/to/keystore.p12",
    password:"keystoreSecret"
  ]
]

DB2Connection.connect(vertx, options, { res ->
  if (res.succeeded()) {
    // Connected with SSL
  } else {
    println("Could not connect ${res.cause()}")
  }
})

More information can be found in the Vert.x documentation.

Using a proxy

You can also configure the client to use an HTTP/1.x CONNECT, SOCKS4a or SOCKS5 proxy.

More information can be found in the Vert.x documentation.