Formatting The Apache Kafka Console Consumer Output | by Ivelina Yordanova | Feb, 2022

Consume all the messages

Ivelina Yordanova
Photo by Kevin Ku on Unsplash
kafka-console-consumer 
--bootstrap-server localhost:9092
--topic my_topic
--property print.offset=true
--property print.partition=true
--property print.headers=true
--property print.timestamp=true
--property print.key=true
kafka-console-consumer 
--bootstrap-server localhost:9092
--topic my_topic
kafka-console-consumer 
--bootstrap-server localhost:9092
--topic my_topic
--property print.offset=true
--property print.partition=true
--property print.headers=true
--property print.timestamp=true
--property print.key=true
--formatter my.custom.KafkaMetricsFormatter
libraryDependencies += "org.apache.kafka" %% "kafka" % "3.1.0"
import Console._override def writeTo(consumerRecord: ConsumerRecord[Array[Byte], Array[Byte]], output: PrintStream): Unit = {

def deserialize(deserializer: Option[Deserializer[_]], sourceBytes: Array[Byte], topic: String) = {
val nonNullBytes = Option(sourceBytes).getOrElse(nullLiteral)
val convertedBytes = deserializer
.map(d => utfBytes(d.deserialize(topic, consumerRecord.headers, nonNullBytes).toString))
.getOrElse(nonNullBytes)
convertedBytes
}

import consumerRecord._

if (printTimestamp && timestampType != TimestampType.NO_TIMESTAMP_TYPE) {
output.write(utfBytes(s"$CYAN└── $timestampType:$RESET $timestamp"))
output.write(lineSeparator)
}

if (printPartition) {
output.write(utfBytes(s"$CYAN└── Partition:$RESET ${partition().toString}"))
output.write(lineSeparator)
}

if (printOffset) {
output.write(utfBytes(s"$CYAN└── Offset:$RESET ${offset().toString}"))
output.write(lineSeparator)
}

if (printHeaders) {
if (!headers().toArray.isEmpty) {
output.write(utfBytes(s"$CYAN└── Headers:$RESET "))
}
val headersIt = headers().iterator
while (headersIt.hasNext) {
val header = headersIt.next()
output.write(utfBytes(s"nt${header.key()}: "))
output.write(deserialize(headersDeserializer, header.value(), topic))
if (headersIt.hasNext) {
output.write(headersSeparator)
}
}
output.write(lineSeparator)
}

if (printKey) {
output.write(utfBytes(s"$CYAN└── Key:$RESET "))
output.write(deserialize(keyDeserializer, key, topic))
output.write(lineSeparator)
}

if (printValue) {
output.write(deserialize(valueDeserializer, value, topic))
output.write(lineSeparator)
output.write(utfBytes(s"---------------"))
output.write(lineSeparator)
}
}

private def utfBytes(str: String) = str.getBytes(StandardCharsets.UTF_8)

sbt assembly
--formatter my.custom.KafkaMessageFormatter

Leave a Comment