This component provides a Vert.x wrapper around the Kafka Admin Client API. The Kafka Admin Client is used to create, modify, and delete topics. It also provides methods for handling ACLs (Access Control Lists), consumer groups and many more.
Creating the Kafka Admin Client
Creating the admin client is quite similar on how it works using the native Kafka client library.
It needs to be configured with a bunch of properties as described in the official Apache Kafka documentation, for the admin.
To achieve that, a map can be configured with such properties passing it to one of the
static creation methods exposed by KafkaAdminClient
.
Properties config = new Properties();
config.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
KafkaAdminClient adminClient = KafkaAdminClient.create(vertx, config);
Using the Kafka Admin Client
Listing topics
You can call the listTopics
for listing the topics in the cluster.
The only parameter is the usual callback to handle the result, which provides the topics list.
adminClient.listTopics().onSuccess(topics ->
System.out.println("Topics= " + topics)
);
Describe topics
You can call describeTopics
to describe topics in the cluster.
Describing a topic means getting all related metadata like number of partitions, replicas, leader, in-sync replicas and so on.
The needed parameters are the list of topics names to describe, and the usual callback to handle the result providing
a map with topic names and related TopicDescription
.
adminClient.describeTopics(Collections.singletonList("my-topic")).onSuccess(topics -> {
TopicDescription topicDescription = topics.get("first-topic");
System.out.println("Topic name=" + topicDescription.getName() +
" isInternal= " + topicDescription.isInternal() +
" partitions= " + topicDescription.getPartitions().size());
for (TopicPartitionInfo topicPartitionInfo : topicDescription.getPartitions()) {
System.out.println("Partition id= " + topicPartitionInfo.getPartition() +
" leaderId= " + topicPartitionInfo.getLeader().getId() +
" replicas= " + topicPartitionInfo.getReplicas() +
" isr= " + topicPartitionInfo.getIsr());
}
});
Create topic
You can call createTopics
to create topics in the cluster.
The needed parameters are the list of the topics to create, and the usual callback to handle the result.
The topics to create are defined via the NewTopic
class specifying the name, the number of
partitions and the replication factor.
It is also possible to describe the replicas assignment, mapping each replica to the broker id, instead of specifying the
number of partitions and the replication factor (which in this case has to be set to -1).
adminClient.createTopics(Collections.singletonList(new NewTopic("testCreateTopic", 1, (short)1)))
.onSuccess(v -> {
// topics created successfully
})
.onFailure(cause -> {
// something went wrong when creating the topics
});
Delete topic
You can call deleteTopics
to delete topics in the cluster.
The needed parameters are the list of the topics to delete, and the usual callback to handle the result.
adminClient.deleteTopics(Collections.singletonList("topicToDelete"))
.onSuccess(v -> {
// topics deleted successfully
})
.onFailure(cause -> {
// something went wrong when removing the topics
});
Describe configuration
You can call describeConfigs
to describe resources configuration.
Describing resources configuration means getting all configuration information for cluster resources like topics or brokers.
The needed parameters are the list of the resources for which you want the configuration, and the usual callback to handle the result.
The resources are described by a collection of ConfigResource
while the result maps
each resource with a corresponding Config
which as more ConfigEntry
for
each configuration parameter.
adminClient.describeConfigs(Collections.singletonList(
new ConfigResource(org.apache.kafka.common.config.ConfigResource.Type.TOPIC, "my-topic"))).onSuccess(configs -> {
// check the configurations
});
Alter configuration
You can call alterConfigs
to alter resources configuration.
Altering resources configuration means updating configuration information for cluster resources like topics or brokers.
The needed parameters are the list of the resources with the related configurations to updated, and the usual callback to handle the result.
It is possible to alter configurations for different resources with just one call. The input parameter maps each
ConfigResource
with the corresponding Config
you want to apply.
ConfigResource resource = new ConfigResource(org.apache.kafka.common.config.ConfigResource.Type.TOPIC, "my-topic");
// create a entry for updating the retention.ms value on the topic
ConfigEntry retentionEntry = new ConfigEntry(TopicConfig.RETENTION_MS_CONFIG, "51000");
Map<ConfigResource, Config> updateConfig = new HashMap<>();
updateConfig.put(resource, new Config(Collections.singletonList(retentionEntry)));
adminClient.alterConfigs(updateConfig)
.onSuccess(v -> {
// configuration altered successfully
})
.onFailure(cause -> {
// something went wrong when altering configs
});
List consumer groups
You can call the listConsumerGroups
for listing the consumer groups in the cluster.
The only parameter is the usual callback to handle the result, which provides the consumer groups list.
adminClient.listConsumerGroups().onSuccess(consumerGroups ->
System.out.println("ConsumerGroups= " + consumerGroups)
);
Describe consumer groups
You can call describeConsumerGroups
to describe consumer groups in the cluster.
Describing a consumer group means getting all related information like members, related ids, topics subscribed, partitions assignment and so on.
The needed parameters are the list of consumer groups names to describe, and the usual callback to handle the result providing
a map with consumer group names and related MemberDescription
.
adminClient.describeTopics(Collections.singletonList("my-topic")).onSuccess(topics -> {
TopicDescription topicDescription = topics.get("first-topic");
System.out.println("Topic name=" + topicDescription.getName() +
" isInternal= " + topicDescription.isInternal() +
" partitions= " + topicDescription.getPartitions().size());
for (TopicPartitionInfo topicPartitionInfo : topicDescription.getPartitions()) {
System.out.println("Partition id= " + topicPartitionInfo.getPartition() +
" leaderId= " + topicPartitionInfo.getLeader().getId() +
" replicas= " + topicPartitionInfo.getReplicas() +
" isr= " + topicPartitionInfo.getIsr());
}
});