15 Kafka CLI Commands For Everyday Programming

A walk-through of the most commonly used Kafka command-line interface (CLI) commands

Photo by Blake Connally on Unsplash

Apache Kafka is one of the most commonly used technologies for building event-streaming architectures. Although it is often seen as *just* a message broker (like RabbitMQ, for example), Kafka is much more than that.

Kafka started as an internal LinkedIn project that was open-sourced in 2011 and quickly evolved from a message broker into a complete platform that enables event-streaming in a highly scalable, fault-tolerant, distributed manner.

In today’s article, we will go through some of the most commonly used commands in Kafka’s command-line interface. Make sure to bookmark this tutorial, as chances are you will need to reference parts of it quite often when performing administration tasks on Kafka clusters.

If you want to list the topics included in a specific broker, the following command will do the trick:

$ kafka-topics \
  --bootstrap-server localhost:9092 \
  --list

Note that in older versions, you could also use the ZooKeeper endpoint, as demonstrated below:

$ kafka-topics \
  --zookeeper localhost:2181 \
  --list

Since recent Kafka releases are removing the ZooKeeper dependency (in favor of KRaft), I would advise using the first form.

Finally, if you want to list the topics across the whole cluster, then make sure to include all the brokers in --bootstrap-server:

$ kafka-topics \
  --bootstrap-server localhost:9092,localhost:9093,localhost:9094 \
  --list

In order to delete a topic from the cluster, you need to pass the --delete flag along with the broker list and the name of the topic to be deleted.

$ kafka-topics \
  --bootstrap-server localhost:9092,localhost:9093,localhost:9094 \
  --delete \
  --topic topic_for_deletion

When deleting topics from Kafka brokers, you can even specify regex-like expressions in order to delete multiple topics in one go. For example, let’s suppose we want to delete all topics starting with the prefix test-. The following command should do the trick:

$ kafka-topics \
  --bootstrap-server localhost:9092,localhost:9093,localhost:9094 \
  --delete \
  --topic 'test-.*'
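Since a regex delete is irreversible, it can be worth previewing which topics actually match the pattern first. A minimal sketch (assuming the same broker list; the grep pattern here is illustrative):

```shell
# Preview the topics matching the prefix before deleting them
kafka-topics \
  --bootstrap-server localhost:9092,localhost:9093,localhost:9094 \
  --list | grep '^test-'
```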

Now if you want to create a new topic, you can simply do so using the --create option of the kafka-topics runner.

$ kafka-topics \
  --bootstrap-server localhost:9092 \
  --create \
  --topic topic-name

Note that when creating a topic, you can provide some additional configuration options such as the number of partitions and/or the replication factor. For instance, in order to create a topic with three partitions and a replication factor of 2, the following command would do the trick:

$ kafka-topics \
  --bootstrap-server localhost:9092 \
  --create \
  --topic topic-name \
  --partitions 3 \
  --replication-factor 2

If you want to retrieve the configuration details of a specific topic, you can use the --describe option with the kafka-topics runner.

$ kafka-topics \
  --bootstrap-server localhost:9092 \
  --describe \
  --topic topic-name

The command’s output will give you information such as the partition count, replication factor, leader, replicas, and ISR, as well as additional configurations (such as the retention period, if the default one was changed).

Topic:topic-name  PartitionCount:1    ReplicationFactor:1 Configs:
Topic: topic-name Partition: 0 Leader: 0 Replicas: 0 Isr: 0

Now let’s move on to data generation. In order to produce some data on a specific topic, all you need is the kafka-console-producer runner:

$ kafka-console-producer \
  --bootstrap-server localhost:9092 \
  --topic topic-name
> One event
> Oh, another event!

If you also want to set a key for each message produced, then you can use some additional options (i.e., enable key parsing and specify the key separator character), as demonstrated below:

$ kafka-console-producer \
  --bootstrap-server localhost:9092 \
  --topic topic-name \
  --property "parse.key=true" \
  --property "key.separator=:"
> key1:One event
> key2:Oh, another event!

On the other hand, if you want to consume messages in the terminal from a particular Kafka topic then all you need is the kafka-console-consumer:

$ kafka-console-consumer \
  --bootstrap-server localhost:9092 \
  --topic topic-name \
  --from-beginning
One event
Oh, another event!
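If you only want to sample a few records rather than tail the topic indefinitely, the console consumer also supports a --max-messages option, which makes it exit after consuming the given number of records. A sketch:

```shell
# Read the first 5 records from the beginning of the topic, then exit
kafka-console-consumer \
  --bootstrap-server localhost:9092 \
  --topic topic-name \
  --from-beginning \
  --max-messages 5
```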

If you also want to print out the keys, then you may also want to specify the key separator character, the flags to enable printing for both the key and the value, as well as the key and value deserializers.

For example,

$ kafka-console-consumer \
  --bootstrap-server localhost:9092 \
  --topic topic-name \
  --from-beginning \
  --property key.separator="-" \
  --property print.key=true \
  --property print.value=true \
  --property key.deserializer=org.apache.kafka.common.serialization.StringDeserializer \
  --property value.deserializer=org.apache.kafka.common.serialization.LongDeserializer

Now, in order to purge a Kafka topic and delete all its messages, you can temporarily change the retention period to one second and then change it back again.

To change the retention to one second you can simply use the --add-config flag with the kafka-configs runner:

$ kafka-configs \
  --bootstrap-server localhost:9092 \
  --alter \
  --entity-type topics \
  --entity-name topic-name \
  --add-config retention.ms=1000
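To confirm that the override took effect before waiting for the purge, you can read the topic-level configs back with --describe (a sketch, assuming the same topic name):

```shell
# Describe the topic's config overrides; retention.ms=1000 should be listed
kafka-configs \
  --bootstrap-server localhost:9092 \
  --describe \
  --entity-type topics \
  --entity-name topic-name
```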

And to revert it back, simply delete this config (so that the default retention is then used):

$ kafka-configs \
  --bootstrap-server localhost:9092 \
  --alter \
  --entity-type topics \
  --entity-name topic-name \
  --delete-config retention.ms

Every Kafka topic may be consumed by numerous consumer groups. In order to list all the consumer groups across all topics in a Kafka cluster, you can simply use the --list flag with the kafka-consumer-groups runner.

$ kafka-consumer-groups \
  --bootstrap-server localhost:9092 \
  --list

The output will contain the names of all consumer groups:

test-consumer-group-1
test-consumer-group-2
test-consumer-group-3
...

Now, in order to get some more details about a specific consumer group, you can use the --describe flag along with the consumer group name (which can be found using the command demonstrated in the previous section).

$ kafka-consumer-groups \
  --bootstrap-server localhost:9092 \
  --describe \
  --group my-group

And the output will contain the following information:

TOPIC  PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG  CONSUMER-ID  HOST  CLIENT-ID

The --describe option can also be combined with additional options in order to get even more details about a specific consumer group.

If you want to list all active members in a specific consumer group, you can also specify the --members flag:

$ kafka-consumer-groups \
  --bootstrap-server localhost:9092 \
  --describe \
  --group my-group \
  --members

Here’s an example output from the above command:

CONSUMER-ID        HOST        CLIENT-ID  #PARTITIONS
consumer1-3fc8d..  /127.0.0.1  consumer1  2
consumer4-117fe..  /127.0.0.1  consumer4  1
consumer2-e76ea..  /127.0.0.1  consumer2  3
consumer3-ecea4..  /127.0.0.1  consumer3  0

To get even more details you can also use the --verbose option that will also report the partitions assigned to each member.

$ kafka-consumer-groups \
  --bootstrap-server localhost:9092 \
  --describe \
  --group my-group \
  --members \
  --verbose

And the output should look like the one shared below.

CONSUMER-ID  HOST        CLIENT-ID  #PARTITIONS  ASSIGNMENT
..           /127.0.0.1  consumer1  2            topic1(0), topic2(0)
..           /127.0.0.1  consumer4  1            topic3(2)
..           /127.0.0.1  consumer2  3            topic2(1), topic3(0,1)
..           /127.0.0.1  consumer3  0            -

If for any reason you need to increase the number of partitions for a specific topic, you can use the --alter flag to specify the new, increased partition count. Note that Kafka only allows increasing the number of partitions; it can never be decreased for an existing topic.

$ kafka-topics \
  --bootstrap-server localhost:9092 \
  --alter \
  --topic topic-name \
  --partitions 40

Another common task is adding further configuration options to a specific broker (or even to all brokers).

For instance, let’s suppose we want to specify the number of background threads to use for log cleaning. This behavior can be adjusted through the log.cleaner.threads configuration option.

To do so, you can run the kafka-configs runner along with the broker details and --add-config, as demonstrated below.

$ kafka-configs \
  --bootstrap-server localhost:9092 \
  --entity-type brokers \
  --entity-name 0 \
  --alter \
  --add-config log.cleaner.threads=2

If you want to add a configuration to all brokers in a cluster, then you can use the --entity-default flag (instead of targeting a specific broker through --entity-name broker-id), as demonstrated below.

$ kafka-configs \
  --bootstrap-server localhost:9092 \
  --entity-type brokers \
  --entity-default \
  --alter \
  --add-config log.cleaner.threads=2

It’s important to note that some configurations may require a broker restart to take effect. You can check the update mode of each individual broker configuration in the relevant section of the official documentation.

On the other hand, you may also want to remove some configurations from a specific broker. All you need is the broker id (--entity-name) and the --delete-config flag specifying the option you wish to remove.

$ kafka-configs \
  --bootstrap-server localhost:9092 \
  --entity-type brokers \
  --entity-name 0 \
  --alter \
  --delete-config log.cleaner.threads

Again, instead of specifying the broker id through --entity-name, you can provide the --entity-default flag if you wish the removal to take effect on every broker across the cluster.

Now, in order to get the current configuration of a specific broker, you can specify the broker id (i.e., --entity-name) along with the --describe flag when executing the kafka-configs runner.

$ kafka-configs \
  --bootstrap-server localhost:9092 \
  --entity-type brokers \
  --entity-name 0 \
  --describe
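Newer Kafka releases also support an --all option on --describe, which includes default values rather than only the explicit overrides (check your version's kafka-configs --help before relying on it). A sketch:

```shell
# Show every config for broker 0, defaults included (newer Kafka releases)
kafka-configs \
  --bootstrap-server localhost:9092 \
  --entity-type brokers \
  --entity-name 0 \
  --describe \
  --all
```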
