This component provides a vert.x wrapper around the most important functions of Kafka’s AdminUtils. AdminUtils are used to create, modify, and delete topics. Other functionality covered by AdminUtils, but not this wrapper, includes Partition Management, Broker Configuration management, etc.

Using the AdminUtils

Create a topic

You can call createTopic to create a topic. Parameters are: topic name, number of partitions, number of replicas, and the usual callback to handle the result. It might return an error, e.g. if the number of requested replicas is greater than the number of brokers.

require 'vertx/vertx'
require 'vertx-kafka-client/admin_utils'
adminUtils = VertxKafkaClient::AdminUtils.create(Vertx::Vertx.vertx(), "localhost:2181", true)
# Create topic 'myNewTopic' with 2 partition and 1 replicas
adminUtils.create_topic("myNewTopic", 2, 1) { |result_err,result|
  if (result_err == nil)
    puts "Creation of topic myNewTopic successful!"else
    puts "Creation of topic myNewTopic failed: #{result_err.get_localized_message()}"end
}

Delete a topic

You can call deleteTopic to delete a topic. Parameters are: topic name, and the usual callback to handle the result. It might return an error, e.g. if the topic does not exist.

require 'vertx/vertx'
require 'vertx-kafka-client/admin_utils'
adminUtils = VertxKafkaClient::AdminUtils.create(Vertx::Vertx.vertx(), "localhost:2181", true)
# Delete topic 'myNewTopic'
adminUtils.delete_topic("myNewTopic") { |result_err,result|
  if (result_err == nil)
    puts "Deletion of topic myNewTopic successful!"else
    puts "Deletion of topic myNewTopic failed: #{result_err.get_localized_message()}"end
}

Change a topic’s configuration

If you need to update the configuration of a topic, e.g., you want to update the retention policy, you can call changeTopicConfig to update a topic. Parameters are: topic name, a Map (String → String) with parameters to be changed, and the usual callback to handle the result. It might return an error, e.g. if the topic does not exist.

require 'vertx/vertx'
require 'vertx-kafka-client/admin_utils'
adminUtils = VertxKafkaClient::AdminUtils.create(Vertx::Vertx.vertx(), "localhost:2181", true)
# Set retention to 1000 ms and max size of the topic partition to 1 kiByte
properties = Hash.new()
properties["delete.retention.ms"] = "1000"
properties["retention.bytes"] = "1024"
adminUtils.change_topic_config("myNewTopic", properties) { |result_err,result|
  if (result_err == nil)
    puts "Configuration change of topic myNewTopic successful!"else
    puts "Configuration change of topic myNewTopic failed: #{result_err.get_localized_message()}"end
}
}

Check if a topic exists

If you want to check if a topic exists, you can call topicExists. Parameters are: topic name, and the usual callback to handle the result. It might return an error, e.g. if the topic does not exist.

require 'vertx/vertx'
require 'vertx-kafka-client/admin_utils'
adminUtils = VertxKafkaClient::AdminUtils.create(Vertx::Vertx.vertx(), "localhost:2181", true)
adminUtils.topic_exists("myNewTopic") { |result_err,result|
  if (result_err == nil)
    puts "Topic myNewTopic exists: #{result}"
  else
    puts "Failed to check if topic myNewTopic exists: #{result_err.get_localized_message()}"end
}