Learn All About Kafka Consumers

Learn about Kafka consumers and how to reliably consume data from Kafka topics

Ashutosh Narang · Feb 2022
[Figure: Kafka Producer → Broker → Consumer]
[Figure: Consume, Process, and Store]
[Figure: N Partitions, 1 Consumer]
[Figure: N Partitions, M Consumers (N > M > 1)]
[Figure: N Partitions, M Consumers (N == M)]
[Figure: N Partitions, M Consumers (N < M)]
[Figure: N Partitions, M Consumers (N < M), 2 Consumer Groups]
[Figure: Partition Rebalancing (animation)]
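
When consumers join or leave a group, the group's partitions are reassigned among the remaining members; this is the rebalance shown above. As a minimal sketch (not from the original article, and using the consumer and topic that are set up in the following sections), a ConsumerRebalanceListener passed to subscribe() lets the application react when partitions are taken away or handed over:

consumer.subscribe(Collections.singletonList("topic1"), new ConsumerRebalanceListener() {
    @Override
    public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
        // These partitions are about to move to another consumer: commit progress first
        consumer.commitSync();
    }

    @Override
    public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
        // These partitions were just assigned; consumption resumes from their committed offsets
    }
});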

Create a KafkaConsumer Instance

import java.util.Properties;
import org.apache.kafka.clients.consumer.KafkaConsumer;

Properties props = new Properties();
props.put("bootstrap.servers", "broker1:9092,broker2:9092");
props.put("group.id", "ConsumerGroup1"); // consumers sharing this group.id split the topic's partitions
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);

Subscribe to a Topic

consumer.subscribe(Collections.singletonList("topic1"));
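
subscribe() also accepts more than one topic, or a regular-expression pattern; the topic names below are only illustrative:

// Subscribe to several topics at once (each subscribe() call replaces the previous subscription)
consumer.subscribe(Arrays.asList("topic1", "topic2"));

// Or subscribe to every topic matching a pattern; newly created matching topics are picked up automatically
consumer.subscribe(Pattern.compile("topic.*"));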

Write the poll loop

try {
    while (true) {
        // Poll for up to 100 ms; returns a (possibly empty) batch of records
        ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
        for (ConsumerRecord<String, String> record : records) {
            System.out.println(record.toString());
        }
    }
} finally {
    consumer.close(); // always close the consumer so it leaves the group cleanly
}
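
The loop above runs forever. A common shutdown sketch (assuming the consumer is polled from the main thread and a shutdown hook is acceptable) is to call the thread-safe wakeup() method from another thread, which makes poll() throw a WakeupException that the loop treats as its signal to exit:

// wakeup() is the only KafkaConsumer method that is safe to call from another thread
// (a fuller version would also join the main thread so close() can finish before the JVM exits)
Runtime.getRuntime().addShutdownHook(new Thread(consumer::wakeup));

try {
    while (true) {
        ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
        for (ConsumerRecord<String, String> record : records) {
            System.out.println(record.toString());
        }
    }
} catch (WakeupException e) {
    // Expected during shutdown; nothing to do
} finally {
    consumer.close();
}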

Scenario-1: Happy State

Committed Offset == Offset of Last Message Processed by the Consumer. After a restart or rebalance, consumption resumes exactly where processing left off: nothing is missed and nothing is processed twice.

Scenario-2: Data Loss

Committed Offset > Offset of Last Message Processed by the Consumer. The consumer resumes from the committed offset, so the messages between the last processed offset and the committed offset are never processed.

Scenario-3: Double Processing

Committed Offset < Offset of Last Message Processed by the Consumer. The consumer resumes from the committed offset, so the messages between the committed offset and the last processed offset are processed again.
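
These scenarios boil down to the gap between the offset the group has committed and the position the consumer has actually reached. As an illustrative sketch (the topic and partition below are made up, and the partition must currently be assigned to this consumer), both values can be inspected at runtime:

TopicPartition tp = new TopicPartition("topic1", 0);

// Position: the offset of the next record poll() will return for this partition
long position = consumer.position(tp);

// Committed offset: where this consumer group would resume after a restart or rebalance
// (may be null if nothing has been committed yet)
OffsetAndMetadata committed = consumer.committed(Collections.singleton(tp)).get(tp);

// Roughly: committed in step with processing -> happy state;
// committed ahead of processing -> skipped records (data loss);
// committed behind processing -> re-read records (double processing)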

Automatic Commit
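
With automatic commits, the consumer itself periodically commits the latest offsets returned by poll(). A minimal configuration sketch (the values shown are the defaults, so they are illustrative rather than required):

props.put("enable.auto.commit", "true");       // the consumer commits offsets automatically
props.put("auto.commit.interval.ms", "5000");  // commit roughly every 5 seconds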

Commit Current Offset
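
To take control of when offsets are committed, the usual first step (a sketch, not spelled out in the article) is to turn auto-commit off so that nothing is committed behind the application's back:

props.put("enable.auto.commit", "false"); // offsets are now committed only when the application asks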

Synchronous Commit

while (true) {
    ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
    for (ConsumerRecord<String, String> record : records) {
        System.out.printf("value = %s%n", record.value());
    }
    try {
        // Blocks until the broker acknowledges the commit; retries on recoverable errors
        consumer.commitSync();
    } catch (CommitFailedException e) {
        log.error("commit failed", e);
    }
}
Asynchronous Commit

while (true) {
    ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
    for (ConsumerRecord<String, String> record : records) {
        System.out.printf("value = %s%n", record.value());
    }
    // Sends the commit request and returns immediately; it does not retry on failure
    consumer.commitAsync();
}
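
Because commitAsync() does not retry, it is common to pass a callback that at least logs failures; a hedged sketch using the client's OffsetCommitCallback interface:

consumer.commitAsync(new OffsetCommitCallback() {
    @Override
    public void onComplete(Map<TopicPartition, OffsetAndMetadata> offsets, Exception exception) {
        if (exception != null) {
            log.error("async commit failed for offsets {}", offsets, exception);
        }
    }
});

A common pattern combines the two: commitAsync() inside the loop for throughput, plus a final commitSync() before closing the consumer, so the last offsets are committed even if earlier asynchronous commits failed.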
