Consume all the messages

Anyone that’s ever worked with Kafka must have had some Thoughts about the console consumer output. Usually, there are at least 2 questions that come up inevitably:
- How do I print X? — X can be “key”, “partition”, “offset”, “timestamp”, “header”.. all bits that could be quite useful but are not part of the default output. This was more of an issue in version 2.6 and older where some of the above properties were not available at all.
- How am I supposed to spot what I need in this mess of characters? — the joy of figuring out how to print whatever you need is short-lived once you see how it looks.
If you’ve already googled all these, you know you can use kafkacat
and that’s a totally viable option. However, I am offering you a more creative and flexible alternative solution.
As mentioned, in terms of completeness the latest versions of Kafka do offer ways to print more details for the records by passing each additional property separately.
kafka-console-consumer
--bootstrap-server localhost:9092
--topic my_topic
--property print.offset=true
--property print.partition=true
--property print.headers=true
--property print.timestamp=true
--property print.key=true
However, if you are stuck with legacy code or for some reason still work on an older version, your options are quite limited
kafka-console-consumer
--bootstrap-server localhost:9092
--topic my_topic
Either of those isn’t great if you need to consume more than 3–4 messages and that more than once. Trust me, your eyes will thank you if you switch to staring at something like this instead:
kafka-console-consumer
--bootstrap-server localhost:9092
--topic my_topic
--property print.offset=true
--property print.partition=true
--property print.headers=true
--property print.timestamp=true
--property print.key=true
--formatter my.custom.KafkaMetricsFormatter
I am not saying this design is what you should go for, what I am saying is that there is a way to format the output in any way you want — add color and spacing, why not go crazy and underline or make things bold.
What’s more, it’s super easy, it will take you 15mins and probably save you more in the long run…or at least make you feel better about using the consumer console, or is it just me?
Anyway, let’s see how to achieve this:
First, create a scala project and add Kafka as a dependency.
libraryDependencies += "org.apache.kafka" %% "kafka" % "3.1.0"
Second, create a new scala class extending the DefaultMessageFormatter
and override the writeTo
method to do the formatting any way you like.
The code for the above example looks like this:
import Console._override def writeTo(consumerRecord: ConsumerRecord[Array[Byte], Array[Byte]], output: PrintStream): Unit = {def deserialize(deserializer: Option[Deserializer[_]], sourceBytes: Array[Byte], topic: String) = {
val nonNullBytes = Option(sourceBytes).getOrElse(nullLiteral)
val convertedBytes = deserializer
.map(d => utfBytes(d.deserialize(topic, consumerRecord.headers, nonNullBytes).toString))
.getOrElse(nonNullBytes)
convertedBytes
}import consumerRecord._
if (printTimestamp && timestampType != TimestampType.NO_TIMESTAMP_TYPE) {
output.write(utfBytes(s"$CYAN└── $timestampType:$RESET $timestamp"))
output.write(lineSeparator)
}if (printPartition) {
output.write(utfBytes(s"$CYAN└── Partition:$RESET ${partition().toString}"))
output.write(lineSeparator)
}if (printOffset) {
output.write(utfBytes(s"$CYAN└── Offset:$RESET ${offset().toString}"))
output.write(lineSeparator)
}if (printHeaders) {
if (!headers().toArray.isEmpty) {
output.write(utfBytes(s"$CYAN└── Headers:$RESET "))
}
val headersIt = headers().iterator
while (headersIt.hasNext) {
val header = headersIt.next()
output.write(utfBytes(s"nt${header.key()}: "))
output.write(deserialize(headersDeserializer, header.value(), topic))
if (headersIt.hasNext) {
output.write(headersSeparator)
}
}
output.write(lineSeparator)
}if (printKey) {
output.write(utfBytes(s"$CYAN└── Key:$RESET "))
output.write(deserialize(keyDeserializer, key, topic))
output.write(lineSeparator)
}if (printValue) {
output.write(deserialize(valueDeserializer, value, topic))
output.write(lineSeparator)
output.write(utfBytes(s"---------------"))
output.write(lineSeparator)
}
}private def utfBytes(str: String) = str.getBytes(StandardCharsets.UTF_8)
Third, create the jar by running:
sbt assembly
Fourth, copy or move the resulting jar from <project>/target/jars/**
to the libs dir of your local Kafka install (should be something like **/usr/local/Cellar/kafka/3.0.1/libexec/libs/**
).
Finally, use and enjoy by just passing this to any Kafka-console-consumer command:
--formatter my.custom.KafkaMessageFormatter
You can see the full code for reference here or clone and use as-is or edit to match your preferences.
Now, with this cool console output.. consume all the messages !!