This component provides a Vert.x wrapper around the Kafka Admin Client API. The Kafka Admin Client is used to create, modify, and delete topics. It also provides methods for handling ACLs (Access Control Lists), consumer groups and many more.

Creating the Kafka Admin Client

Creating the admin client is quite similar on how it works using the native Kafka client library.

It needs to be configured with a bunch of properties as described in the official Apache Kafka documentation, for the admin.

To achieve that, a map can be configured with such properties passing it to one of the static creation methods exposed by KafkaAdminClient.

Properties config = new Properties();
config.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");

KafkaAdminClient adminClient = KafkaAdminClient.create(vertx, config);

Using the Kafka Admin Client

Listing topics

You can call the listTopics for listing the topics in the cluster. The only parameter is the usual callback to handle the result, which provides the topics list.

adminClient.listTopics().onSuccess(topics ->
    System.out.println("Topics= " + topics)
);

Describe topics

You can call describeTopics to describe topics in the cluster. Describing a topic means getting all related metadata like number of partitions, replicas, leader, in-sync replicas and so on. The needed parameters are the list of topics names to describe, and the usual callback to handle the result providing a map with topic names and related TopicDescription.

adminClient.describeTopics(Collections.singletonList("my-topic")).onSuccess(topics -> {
  TopicDescription topicDescription = topics.get("first-topic");

  System.out.println("Topic name=" + topicDescription.getName() +
      " isInternal= " + topicDescription.isInternal() +
      " partitions= " + topicDescription.getPartitions().size());

  for (TopicPartitionInfo topicPartitionInfo : topicDescription.getPartitions()) {
    System.out.println("Partition id= " + topicPartitionInfo.getPartition() +
      " leaderId= " + topicPartitionInfo.getLeader().getId() +
      " replicas= " + topicPartitionInfo.getReplicas() +
      " isr= " + topicPartitionInfo.getIsr());
  }
});

Create topic

You can call createTopics to create topics in the cluster. The needed parameters are the list of the topics to create, and the usual callback to handle the result. The topics to create are defined via the NewTopic class specifying the name, the number of partitions and the replication factor. It is also possible to describe the replicas assignment, mapping each replica to the broker id, instead of specifying the number of partitions and the replication factor (which in this case has to be set to -1).

adminClient.createTopics(Collections.singletonList(new NewTopic("testCreateTopic", 1, (short)1)))
  .onSuccess(v -> {
    // topics created successfully
  })
  .onFailure(cause -> {
    // something went wrong when creating the topics
  });

Delete topic

You can call deleteTopics to delete topics in the cluster. The needed parameters are the list of the topics to delete, and the usual callback to handle the result.

adminClient.deleteTopics(Collections.singletonList("topicToDelete"))
  .onSuccess(v -> {
    // topics deleted successfully
  })
  .onFailure(cause -> {
    // something went wrong when removing the topics
  });

Describe configuration

You can call describeConfigs to describe resources configuration. Describing resources configuration means getting all configuration information for cluster resources like topics or brokers. The needed parameters are the list of the resources for which you want the configuration, and the usual callback to handle the result. The resources are described by a collection of ConfigResource while the result maps each resource with a corresponding Config which as more ConfigEntry for each configuration parameter.

adminClient.describeConfigs(Collections.singletonList(
  new ConfigResource(org.apache.kafka.common.config.ConfigResource.Type.TOPIC, "my-topic"))).onSuccess(configs -> {
  // check the configurations
});

Alter configuration

You can call alterConfigs to alter resources configuration. Altering resources configuration means updating configuration information for cluster resources like topics or brokers. The needed parameters are the list of the resources with the related configurations to updated, and the usual callback to handle the result. It is possible to alter configurations for different resources with just one call. The input parameter maps each ConfigResource with the corresponding Config you want to apply.

ConfigResource resource = new ConfigResource(org.apache.kafka.common.config.ConfigResource.Type.TOPIC, "my-topic");
// create a entry for updating the retention.ms value on the topic
ConfigEntry retentionEntry = new ConfigEntry(TopicConfig.RETENTION_MS_CONFIG, "51000");
Map<ConfigResource, Config> updateConfig = new HashMap<>();
updateConfig.put(resource, new Config(Collections.singletonList(retentionEntry)));
adminClient.alterConfigs(updateConfig)
  .onSuccess(v -> {
    // configuration altered successfully
  })
  .onFailure(cause -> {
    // something went wrong when altering configs
  });

List consumer groups

You can call the listConsumerGroups for listing the consumer groups in the cluster. The only parameter is the usual callback to handle the result, which provides the consumer groups list.

adminClient.listConsumerGroups().onSuccess(consumerGroups ->
  System.out.println("ConsumerGroups= " + consumerGroups)
);

Describe consumer groups

You can call describeConsumerGroups to describe consumer groups in the cluster. Describing a consumer group means getting all related information like members, related ids, topics subscribed, partitions assignment and so on. The needed parameters are the list of consumer groups names to describe, and the usual callback to handle the result providing a map with consumer group names and related MemberDescription.

adminClient.describeTopics(Collections.singletonList("my-topic")).onSuccess(topics -> {
  TopicDescription topicDescription = topics.get("first-topic");

  System.out.println("Topic name=" + topicDescription.getName() +
      " isInternal= " + topicDescription.isInternal() +
      " partitions= " + topicDescription.getPartitions().size());

  for (TopicPartitionInfo topicPartitionInfo : topicDescription.getPartitions()) {
    System.out.println("Partition id= " + topicPartitionInfo.getPartition() +
      " leaderId= " + topicPartitionInfo.getLeader().getId() +
      " replicas= " + topicPartitionInfo.getReplicas() +
      " isr= " + topicPartitionInfo.getIsr());
  }
});