This component provides a Vert.x wrapper around the most important functions of Kafka’s AdminUtils. AdminUtils is used to create, modify, and delete topics. Other functionality covered by AdminUtils but not by this wrapper includes partition management and broker configuration management.
Using the AdminUtils
Create a topic
Call createTopic to create a topic.
Its parameters are the topic name, the number of partitions, the number of replicas, and the usual callback to handle the result.
The call can fail, e.g. if the number of requested replicas is greater than the number of brokers.
import scala.util.{Failure, Success}

// onComplete needs an implicit ExecutionContext in scope
val adminUtils = AdminUtils.create(Vertx.vertx(), "localhost:2181", true)
// Create topic 'myNewTopic' with 2 partitions and 1 replica
adminUtils.createTopicFuture("myNewTopic", 2, 1).onComplete {
  case Success(_) =>
    println("Creation of topic myNewTopic successful!")
  case Failure(cause) =>
    println(s"$cause")
}
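The replica constraint mentioned above can also be checked before the call is made. The following is a minimal sketch, assuming the number of available brokers is already known to the caller. TopicRequestCheck, validateTopicRequest, and brokerCount are hypothetical names used for illustration and are not part of the wrapper.

```scala
// Hypothetical helper, not part of the AdminUtils wrapper.
object TopicRequestCheck {

  /** Returns Right(()) if the request looks satisfiable, otherwise Left(reason). */
  def validateTopicRequest(
      topic: String,
      partitions: Int,
      replicas: Int,
      brokerCount: Int // assumption: supplied by the caller
  ): Either[String, Unit] =
    if (topic.trim.isEmpty) Left("topic name must not be empty")
    else if (partitions < 1) Left(s"partitions must be >= 1, got $partitions")
    else if (replicas < 1) Left(s"replicas must be >= 1, got $replicas")
    else if (replicas > brokerCount)
      Left(s"replicas ($replicas) exceeds number of brokers ($brokerCount)")
    else Right(())
}
```

A check like this only catches obviously unsatisfiable requests early. The result of the actual call still has to be handled, since the set of brokers can change between the check and the call.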
Delete a topic
Call deleteTopic to delete a topic.
Its parameters are the topic name and the usual callback to handle the result.
The call can fail, e.g. if the topic does not exist.
import scala.util.{Failure, Success}

val adminUtils = AdminUtils.create(Vertx.vertx(), "localhost:2181", true)
// Delete topic 'myNewTopic'
adminUtils.deleteTopicFuture("myNewTopic").onComplete {
  case Success(_) =>
    println("Deletion of topic myNewTopic successful!")
  case Failure(cause) =>
    println(s"$cause")
}
Change a topic’s configuration
If you need to update the configuration of a topic, e.g. to change the retention policy, call changeTopicConfig.
Its parameters are the topic name, a Map (String → String) of the settings to change, and the usual callback to handle the result.
The call can fail, e.g. if the topic does not exist.
import scala.util.{Failure, Success}

val adminUtils = AdminUtils.create(Vertx.vertx(), "localhost:2181", true)
// Set delete.retention.ms to 1000 ms and the maximum size of a topic partition (retention.bytes) to 1 KiB
val properties = Map(
  "delete.retention.ms" -> "1000",
  "retention.bytes" -> "1024"
)
adminUtils.changeTopicConfigFuture("myNewTopic", properties).onComplete {
  case Success(_) =>
    println("Configuration change of topic myNewTopic successful!")
  case Failure(cause) =>
    println(s"$cause")
}
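When assembling the properties map, keep in mind that Scala's default Map is immutable: the + operator returns a new map and leaves the receiver unchanged, so its result has to be kept. The sketch below shows this with the standard library only. The object name TopicConfigExample and the intermediate value names are illustrative.

```scala
// Scala's default Map is immutable: `+` returns a new map and leaves the receiver untouched.
object TopicConfigExample {
  val empty: Map[String, String] = Map.empty

  // Keep the result of `+`; used as a bare statement, the new map would be discarded.
  val withDeleteRetention: Map[String, String] =
    empty + ("delete.retention.ms" -> "1000")

  val properties: Map[String, String] =
    withDeleteRetention + ("retention.bytes" -> "1024")
}
```

Building the map in a single Map(...) expression avoids the intermediate values. The step-by-step form is shown only to make the behaviour of + visible.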
Check if a topic exists
To check whether a topic exists, call topicExists.
Its parameters are the topic name and the usual callback to handle the result.
The result is true if the topic exists and false otherwise. The call might return an error if the check itself fails.
import scala.util.{Failure, Success}

val adminUtils = AdminUtils.create(Vertx.vertx(), "localhost:2181", true)
// Check whether topic 'myNewTopic' exists
adminUtils.topicExistsFuture("myNewTopic").onComplete {
  case Success(result) =>
    println(s"Topic myNewTopic exists: $result")
  case Failure(cause) =>
    println(s"$cause")
}
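Because the calls shown in this section return Scala Futures, they can be chained, for example to create a topic only when it does not exist yet. The following is a sketch under stated assumptions. TopicAdmin is a hypothetical trait that mirrors only the two Future-returning calls used here, with return types assumed to be Future[Boolean] and Future[Unit]. InMemoryTopicAdmin is an illustrative stub that keeps topics in memory, so the example runs without ZooKeeper or Kafka.

```scala
import scala.collection.concurrent.TrieMap
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration._

// Hypothetical stand-in exposing only the two calls used below (signatures are assumptions).
trait TopicAdmin {
  def topicExistsFuture(topic: String): Future[Boolean]
  def createTopicFuture(topic: String, partitions: Int, replicas: Int): Future[Unit]
}

// Illustrative in-memory stub; it does not talk to ZooKeeper or Kafka.
class InMemoryTopicAdmin extends TopicAdmin {
  private val topics = TrieMap.empty[String, (Int, Int)]

  def topicExistsFuture(topic: String): Future[Boolean] =
    Future.successful(topics.contains(topic))

  def createTopicFuture(topic: String, partitions: Int, replicas: Int): Future[Unit] =
    if (topics.putIfAbsent(topic, (partitions, replicas)).isEmpty) Future.successful(())
    else Future.failed(new IllegalStateException(s"Topic $topic already exists"))
}

object EnsureTopic {
  /** Creates the topic only if it is missing; completes with true if it was created. */
  def ensureTopic(admin: TopicAdmin, topic: String, partitions: Int, replicas: Int)(
      implicit ec: ExecutionContext): Future[Boolean] =
    admin.topicExistsFuture(topic).flatMap {
      case true  => Future.successful(false) // already there, nothing to do
      case false => admin.createTopicFuture(topic, partitions, replicas).map(_ => true)
    }
}
```

The existence check and the creation are two separate steps, so another client could create the topic in between. A failure from the create step should therefore still be handled by the caller.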