Kafka Consumer Error Handling and Retries - Part 2
Gautam Singh | Jan 26, 2026
In Part 1, we explored the theory behind error handling in Kafka consumers. We talked about Gir, our fictional sustainable e-commerce platform, and walked through all the things that can go wrong when processing order messages. We classified errors into recoverable and non-recoverable categories, and discussed strategies like retries, dead letter queues, and circuit breakers.
Theory is great. But as any developer knows, the gap between "I understand the concept" and "I can build it" is where the real learning happens.
In this part, we're going to bridge that gap. We'll take every principle from Part 1 and implement it in plain Java — no Spring, no frameworks, just the raw Kafka consumer library. We'll start simple and layer complexity one step at a time, exactly how you'd build this in a real project.
By the end of this article, you'll have a Kafka consumer that:
- Classifies every error into an actionable category
- Routes bad messages to a dead letter queue with full diagnostic headers
- Retries transient failures with exponential backoff and jitter
- Trips a circuit breaker when downstream systems are consistently failing
- Handles every error path without ever silently losing a message
Let's build.
A Quick Recap: The Three Error Categories
Before we write a single line of code, let's revisit the mental model from Part 1. When a Kafka consumer encounters an error, it falls into one of three buckets:
- Recoverable — Transient failures like network timeouts or a temporarily unavailable database. These resolve themselves if you wait and try again.
- Non-Recoverable Tolerable — Permanent failures for a specific message, but the consumer can keep going. Think: malformed JSON, or a business rule violation like a fraudulent order. The message itself is the problem, not the system.
- Non-Recoverable Intolerable — Something is fundamentally broken. OutOfMemoryError, StackOverflowError — things that mean the consumer process itself is unhealthy. Stop everything.
Each category demands a different response. Recoverable errors get retried. Tolerable errors get routed to a dead letter queue. Intolerable errors trigger a graceful shutdown. That's the entire decision tree — and every line of code we write serves this model.
Step 1: Defining the Error Categories
Let's start with the simplest possible thing: giving our three categories a name.
public enum ErrorCategory {
/** Transient failure; retry with backoff is appropriate. */
RECOVERABLE,
/** Permanent business or data failure; route to DLQ and continue consuming. */
NON_RECOVERABLE_TOLERABLE,
/** Fatal system failure; stop the consumer immediately. */
NON_RECOVERABLE_INTOLERABLE
}
Three values. That's it. Every error your consumer ever encounters will map to one of these. This enum becomes the single source of truth that the rest of your error handling pipeline routes on.
Step 2: Classifying Errors Automatically
Now, how does a caught exception become an ErrorCategory? You could write a big if-else chain, but that gets messy fast and couples your classification logic to specific exception types. Instead, let's make it configurable.
The idea: pass in a map of "exception class → category" at construction time. When an error is caught, walk up the exception's class hierarchy until you find a match.
public class ErrorClassifier {
private final Map<Class<? extends Throwable>, ErrorCategory> classificationMap;
private final ErrorCategory defaultCategory;
public ErrorClassifier(Map<Class<? extends Throwable>, ErrorCategory> classificationMap,
ErrorCategory defaultCategory) {
this.classificationMap = new HashMap<>(classificationMap);
this.defaultCategory = defaultCategory;
}
public ErrorCategory classify(Throwable throwable) {
Class<?> type = throwable.getClass();
while (type != null) {
ErrorCategory category = classificationMap.get(type);
if (category != null) {
return category;
}
type = type.getSuperclass();
}
return defaultCategory;
}
}
The hierarchy traversal is the key insight here. Say you map Error.class to NON_RECOVERABLE_INTOLERABLE. Now OutOfMemoryError, StackOverflowError, and every other Error subclass is automatically caught — you don't need to enumerate them all. The classifier walks from the specific exception class upward through its parents until it finds a registered mapping.
And because the map is injected at construction, you can change your classification rules without touching this class. Here's how our order consumer wires it up:
Map<Class<? extends Throwable>, ErrorCategory> mapping = new HashMap<>();
mapping.put(DeserializationException.class, ErrorCategory.NON_RECOVERABLE_TOLERABLE);
mapping.put(FraudDetectedException.class, ErrorCategory.NON_RECOVERABLE_TOLERABLE);
mapping.put(Error.class, ErrorCategory.NON_RECOVERABLE_INTOLERABLE);
ErrorClassifier classifier = new ErrorClassifier(mapping, ErrorCategory.RECOVERABLE);
Notice the default: RECOVERABLE. Any exception not explicitly mapped — SocketTimeoutException, ConnectException, anything unexpected — is treated as recoverable and gets retried. This is a deliberate choice. When in doubt, retry. If it's truly non-recoverable, the retries will exhaust and the message lands in the DLQ anyway.
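To see the hierarchy traversal do its job, here is a self-contained sanity check. It condenses the ErrorCategory enum and ErrorClassifier from above into one file (nothing new, just re-declared locally so it compiles on its own) and classifies two exceptions that have no explicit mapping:

```java
import java.net.SocketTimeoutException;
import java.util.HashMap;
import java.util.Map;

// Condensed copies of the ErrorCategory enum and ErrorClassifier from above,
// re-declared locally so this demo compiles on its own.
public class ClassifierDemo {
    enum ErrorCategory { RECOVERABLE, NON_RECOVERABLE_TOLERABLE, NON_RECOVERABLE_INTOLERABLE }

    static class ErrorClassifier {
        private final Map<Class<? extends Throwable>, ErrorCategory> classificationMap;
        private final ErrorCategory defaultCategory;

        ErrorClassifier(Map<Class<? extends Throwable>, ErrorCategory> map, ErrorCategory def) {
            this.classificationMap = new HashMap<>(map);
            this.defaultCategory = def;
        }

        ErrorCategory classify(Throwable t) {
            // Walk from the concrete class up through its superclasses
            for (Class<?> type = t.getClass(); type != null; type = type.getSuperclass()) {
                ErrorCategory c = classificationMap.get(type);
                if (c != null) return c;
            }
            return defaultCategory;
        }
    }

    public static void main(String[] args) {
        Map<Class<? extends Throwable>, ErrorCategory> mapping = new HashMap<>();
        mapping.put(Error.class, ErrorCategory.NON_RECOVERABLE_INTOLERABLE);
        ErrorClassifier classifier = new ErrorClassifier(mapping, ErrorCategory.RECOVERABLE);

        // OutOfMemoryError is not in the map, but its ancestor Error is
        System.out.println(classifier.classify(new OutOfMemoryError()));       // NON_RECOVERABLE_INTOLERABLE
        // SocketTimeoutException has no mapped ancestor, so the default applies
        System.out.println(classifier.classify(new SocketTimeoutException())); // RECOVERABLE
    }
}
```

The OutOfMemoryError match happens two levels up (OutOfMemoryError → VirtualMachineError → Error), which is exactly the enumeration work the traversal saves you.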
Step 3: The Dead Letter Queue
When a message can't be processed — either because it's inherently bad (non-recoverable tolerable) or because retries have been exhausted — it needs to go somewhere. Silently dropping it is not an option. That's where the dead letter queue comes in.
A DLQ is just another Kafka topic. But the value of a good DLQ isn't just "storing failed messages" — it's the diagnostic context you attach. When an ops engineer looks at a DLQ message at 3am, they need to know: Where did this come from? What went wrong? How many times did we try?
public class DeadLetterQueueProducer {
private final KafkaProducer<String, String> kafkaProducer; // injected at construction (constructor elided, as elsewhere)
public void send(String dlqTopic, ConsumerRecord<String, String> original,
Throwable cause, int retryCount) {
RecordHeaders headers = buildHeaders(original, cause, retryCount);
ProducerRecord<String, String> dlqRecord =
new ProducerRecord<>(dlqTopic, null, null,
original.key(), original.value(), headers);
try {
kafkaProducer.send(dlqRecord).get(); // synchronous — we need to know it landed
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new RuntimeException("DLQ send interrupted", e);
} catch (ExecutionException e) {
throw new RuntimeException("DLQ send failed: " + e.getCause().getMessage(), e);
}
}
private RecordHeaders buildHeaders(ConsumerRecord<String, String> record,
Throwable cause, int retryCount) {
RecordHeaders headers = new RecordHeaders();
headers.add("__dlq.original.topic",
record.topic().getBytes(StandardCharsets.UTF_8));
headers.add("__dlq.original.partition",
String.valueOf(record.partition()).getBytes(StandardCharsets.UTF_8));
headers.add("__dlq.original.offset",
String.valueOf(record.offset()).getBytes(StandardCharsets.UTF_8));
headers.add("__dlq.error.message",
nullSafeBytes(cause.getMessage()));
headers.add("__dlq.error.exception.class",
cause.getClass().getName().getBytes(StandardCharsets.UTF_8));
headers.add("__dlq.retry.count",
String.valueOf(retryCount).getBytes(StandardCharsets.UTF_8));
return headers;
}
}
Six headers that tell you everything:
| Header | What it tells you |
|---|---|
| __dlq.original.topic | Which topic the message came from |
| __dlq.original.partition | Which partition |
| __dlq.original.offset | The exact offset — you can find this message in the original topic |
| __dlq.error.message | The exception message |
| __dlq.error.exception.class | The full class name of the exception |
| __dlq.retry.count | How many retries were attempted before giving up |
The original message key and value are preserved untouched. This means you can reprocess DLQ messages after fixing the underlying issue — or route them to a manual review workflow.
One important design choice: the send() call is synchronous (.get() on the future). We need confirmation that the DLQ message actually landed before we commit the offset of the original message. If the DLQ send fails, we'd rather crash than silently lose the message.
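On the other side of the DLQ, an ops tool (or a reprocessing job) decodes those headers back into something readable. A minimal sketch of that decoding — using a plain Map<String, byte[]> as a stand-in for Kafka's Headers type so the example is self-contained; the header keys are the ones defined above:

```java
import java.nio.charset.StandardCharsets;
import java.util.LinkedHashMap;
import java.util.Map;

// Decodes the diagnostic DLQ headers into a one-line summary. A plain
// Map<String, byte[]> stands in for Kafka's Headers type here.
public class DlqHeaderReader {

    static String summarize(Map<String, byte[]> headers) {
        String topic     = utf8(headers.get("__dlq.original.topic"));
        String partition = utf8(headers.get("__dlq.original.partition"));
        String offset    = utf8(headers.get("__dlq.original.offset"));
        String exception = utf8(headers.get("__dlq.error.exception.class"));
        String retries   = utf8(headers.get("__dlq.retry.count"));
        return exception + " at " + topic + "-" + partition + "@" + offset
                + " after " + retries + " retries";
    }

    private static String utf8(byte[] bytes) {
        return bytes == null ? "?" : new String(bytes, StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        Map<String, byte[]> headers = new LinkedHashMap<>();
        headers.put("__dlq.original.topic", "orders".getBytes(StandardCharsets.UTF_8));
        headers.put("__dlq.original.partition", "2".getBytes(StandardCharsets.UTF_8));
        headers.put("__dlq.original.offset", "4711".getBytes(StandardCharsets.UTF_8));
        headers.put("__dlq.error.exception.class",
                "java.net.SocketTimeoutException".getBytes(StandardCharsets.UTF_8));
        headers.put("__dlq.retry.count", "3".getBytes(StandardCharsets.UTF_8));
        System.out.println(summarize(headers));
        // java.net.SocketTimeoutException at orders-2@4711 after 3 retries
    }
}
```

With a real ConsumerRecord you would iterate record.headers() and pull the same keys; the byte-to-string decoding is identical.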
Step 4: Retry with Exponential Backoff
Now for the interesting part. When we classify an error as RECOVERABLE, we don't just retry immediately — that would hammer an already struggling downstream service. We wait, and we wait progressively longer.
The Backoff Calculator
The formula is straightforward: delay = min(maxDelay, initialDelay * multiplier^attempt).
But there's a subtlety. If you have 50 consumers all hitting a timeout at the same moment, they'll all compute the exact same backoff delay and retry at the same time. This is the thundering herd problem, and it makes things worse instead of better.
The fix is full jitter: instead of sleeping for the computed delay, sleep for a random duration between 0 and the computed delay.
public class ExponentialBackoff {
private final long initialDelayMs;
private final double multiplier;
private final long maxDelayMs;
public long calculateDelay(int attempt) {
double exponentialDelay = initialDelayMs * Math.pow(multiplier, attempt);
long cappedDelay = (long) Math.min(maxDelayMs, exponentialDelay);
// Full jitter: uniform random in [0, cappedDelay]
return ThreadLocalRandom.current().nextLong(0, cappedDelay + 1);
}
}
With default configuration (1 second initial, 2x multiplier, 10 second cap), the backoff windows look like this:
| Retry | Max Delay | Actual Delay |
|---|---|---|
| 1st | 1,000ms | random(0 - 1,000ms) |
| 2nd | 2,000ms | random(0 - 2,000ms) |
| 3rd | 4,000ms | random(0 - 4,000ms) |
| 4th | 8,000ms | random(0 - 8,000ms) |
| 5th+ | 10,000ms (capped) | random(0 - 10,000ms) |
The randomness ensures that even if 50 consumers fail at the same instant, their retries are spread across the entire window instead of spiking at one point. This is the AWS-recommended approach for distributed systems.
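A quick way to convince yourself the windows hold is to sample the calculator many times and check the bounds. This condenses the ExponentialBackoff class from above (adding the constructor, which the listing elides) and drives it with the article's defaults:

```java
import java.util.concurrent.ThreadLocalRandom;

// Condensed copy of ExponentialBackoff with an assumed constructor, sampled
// repeatedly to confirm every jittered delay lands inside its window.
public class BackoffDemo {
    static class ExponentialBackoff {
        private final long initialDelayMs;
        private final double multiplier;
        private final long maxDelayMs;

        ExponentialBackoff(long initialDelayMs, double multiplier, long maxDelayMs) {
            this.initialDelayMs = initialDelayMs;
            this.multiplier = multiplier;
            this.maxDelayMs = maxDelayMs;
        }

        long calculateDelay(int attempt) {
            double exponentialDelay = initialDelayMs * Math.pow(multiplier, attempt);
            long cappedDelay = (long) Math.min(maxDelayMs, exponentialDelay);
            // Full jitter: uniform random in [0, cappedDelay]
            return ThreadLocalRandom.current().nextLong(0, cappedDelay + 1);
        }
    }

    public static void main(String[] args) {
        ExponentialBackoff backoff = new ExponentialBackoff(1000, 2.0, 10_000);
        // Window caps per attempt index: 1s, 2s, 4s, 8s, then capped at 10s
        long[] windows = {1000, 2000, 4000, 8000, 10_000, 10_000};
        for (int attempt = 0; attempt < windows.length; attempt++) {
            for (int sample = 0; sample < 1000; sample++) {
                long delay = backoff.calculateDelay(attempt);
                if (delay < 0 || delay > windows[attempt]) {
                    throw new AssertionError("attempt " + attempt + " out of window: " + delay);
                }
            }
        }
        System.out.println("all delays within their jitter windows");
    }
}
```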
The Retry Handler
The backoff calculator is stateless — it just computes delays. The RetryHandler is what actually manages the retry loop: catch, classify, decide, sleep, retry.
public class RetryHandler {
@FunctionalInterface
public interface RecordProcessor {
void process(ConsumerRecord<String, String> record) throws Throwable;
}
private final ExponentialBackoff backoff;
private final ErrorClassifier classifier;
private final DeadLetterQueueProducer dlqProducer;
private final int maxRetries;
public boolean processWithRetry(ConsumerRecord<String, String> record,
RecordProcessor processor,
String dlqTopic) throws Throwable {
int attempt = 0;
while (true) {
try {
processor.process(record);
return true; // success
} catch (Throwable t) {
ErrorCategory category = classifier.classify(t);
if (category != ErrorCategory.RECOVERABLE) {
throw t; // not our problem — rethrow immediately
}
attempt++;
if (attempt > maxRetries) {
int retryCount = attempt - 1;
dlqProducer.send(dlqTopic, record, t, retryCount);
return false; // exhausted — routed to DLQ
}
long delay = backoff.calculateDelay(attempt - 1);
Thread.sleep(delay);
}
}
}
}
There are a few design decisions here worth calling out:
The return type is boolean, not void. true means the message was processed successfully. false means retries were exhausted and the message was sent to the DLQ. Both are "handled" from the consumer loop's perspective — the offset should be committed either way. But the circuit breaker (coming next) needs to know the difference: true is a success, false is a failure.
Re-classification happens on every catch. This is important. Imagine your first attempt throws a SocketTimeoutException (recoverable), but the retry throws a FraudDetectedException (non-recoverable tolerable). The retry handler re-classifies on every catch and immediately rethrows if the new category isn't recoverable. It doesn't blindly retry just because the first error was transient.
Non-recoverable errors are rethrown, not handled. The retry handler's job is narrow: retry recoverable errors. Everything else is the consumer loop's responsibility. This keeps the retry handler focused and composable.
The RecordProcessor functional interface decouples the retry loop from any specific message handler. The consumer loop passes in a lambda, and the retry handler doesn't need to know what "processing" means.
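The control flow above can be exercised without any Kafka types at all. This sketch strips the handler down to its skeleton — a generic Callable stands in for RecordProcessor, "is it an IOException?" stands in for the classifier, and a fixed delay replaces the jittered backoff — so it shows the loop's shape, not the article's full RetryHandler:

```java
import java.io.IOException;
import java.util.concurrent.Callable;
import java.util.concurrent.atomic.AtomicInteger;

// The retry loop's skeleton: catch, decide recoverable-or-not, sleep, retry,
// give up after maxRetries. IOException stands in for "classified RECOVERABLE".
public class RetryLoopDemo {

    static <T> T retryRecoverable(Callable<T> task, int maxRetries, long delayMs) throws Exception {
        int attempt = 0;
        while (true) {
            try {
                return task.call();                      // success: done
            } catch (IOException recoverable) {          // stand-in for RECOVERABLE classification
                attempt++;
                if (attempt > maxRetries) {
                    throw recoverable;                   // exhausted (DLQ routing in the real handler)
                }
                Thread.sleep(delayMs);                   // fixed delay; jittered backoff in the real handler
            }
            // any other exception propagates untouched, like the real handler's rethrow
        }
    }

    public static void main(String[] args) throws Exception {
        AtomicInteger calls = new AtomicInteger();
        // Fails twice with a "recoverable" error, then succeeds on the third call
        String result = retryRecoverable(() -> {
            if (calls.incrementAndGet() < 3) throw new IOException("flaky downstream");
            return "processed";
        }, 3, 1);
        System.out.println(result + " after " + calls.get() + " attempts");
        // processed after 3 attempts
    }
}
```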
Step 5: The Circuit Breaker
Retries handle transient failures on individual messages. But what if the downstream system is completely down? Every single message will fail, retry three times, and land in the DLQ. You're burning resources retrying against a dead service, and you're flooding the DLQ with messages that would succeed if you just waited.
This is where the circuit breaker pattern comes in. It works like an electrical circuit breaker: when too many failures happen in a row, it "trips" and stops processing entirely for a cool-down period. Then it lets one message through as a probe — if it succeeds, normal processing resumes.
Three States
CLOSED ──(threshold failures)──> OPEN ──(cooldown expires)──> HALF_OPEN
  ^                                                               |
  └───────────────────(probe succeeds)────────────────────────────┘
- CLOSED: Normal operation. Every message is processed. Consecutive failures are counted.
- OPEN: Circuit tripped. No messages are processed. A timer is ticking down.
- HALF_OPEN: Cool-down expired. Exactly one message is allowed through as a probe. If it succeeds, back to CLOSED. If it fails, back to OPEN with a longer cool-down.
The Implementation
public class CircuitBreaker {
public enum State { CLOSED, OPEN, HALF_OPEN }
private final int failureThreshold;
private final long initialCooldownMs;
private final double cooldownMultiplier;
private final long maxCooldownMs;
private State state = State.CLOSED;
private int consecutiveFailures;
private long cooldownOpenedAt;
private long currentCooldownMs;
private int tripCount;
public boolean canProcess() {
switch (state) {
case CLOSED:
return true;
case HALF_OPEN:
return false; // probe already in flight
case OPEN:
if (System.currentTimeMillis() - cooldownOpenedAt >= currentCooldownMs) {
transitionTo(State.HALF_OPEN);
return true; // allow exactly one probe
}
return false;
default:
throw new IllegalStateException("Unknown state: " + state);
}
}
public void recordSuccess() {
switch (state) {
case HALF_OPEN:
consecutiveFailures = 0;
currentCooldownMs = initialCooldownMs;
tripCount = 0;
transitionTo(State.CLOSED);
break;
case CLOSED:
consecutiveFailures = 0;
break;
}
}
public void recordFailure() {
consecutiveFailures++;
if (state == State.HALF_OPEN || consecutiveFailures >= failureThreshold) {
openCircuit();
}
}
private void openCircuit() {
tripCount++;
double rawCooldown = initialCooldownMs * Math.pow(cooldownMultiplier, tripCount - 1);
currentCooldownMs = (long) Math.min(maxCooldownMs, rawCooldown);
cooldownOpenedAt = System.currentTimeMillis();
consecutiveFailures = 0;
transitionTo(State.OPEN);
}
}
Notice the exponential cool-down. The first time the circuit trips, it waits 30 seconds (configurable). If the probe fails and it trips again, it waits 60 seconds. Then 120. The system gets progressively more cautious the longer the downstream service remains unhealthy. This prevents the circuit breaker from repeatedly hammering a struggling service with probe requests.
With default settings:
| Trip # | Cool-down Duration |
|---|---|
| 1st | 30 seconds |
| 2nd | 60 seconds |
| 3rd | 120 seconds (max) |
| 4th+ | 120 seconds (capped) |
One more thing: when a HALF_OPEN probe succeeds, the tripCount resets to 0. The system fully trusts that recovery means recovery. No "probation period" — it goes straight back to normal processing.
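Here is the full lifecycle in miniature: a condensed copy of the CircuitBreaker above (constructor and getState() assumed, since the listing elides them) driven through trip, cool-down, probe, and recovery, with a tiny 20ms cool-down so the whole cycle fits in one run:

```java
// Condensed CircuitBreaker driven through one trip-and-recover cycle.
// Constructor and getState() are assumed; the state logic mirrors the listing above.
public class CircuitBreakerDemo {
    enum State { CLOSED, OPEN, HALF_OPEN }

    static class CircuitBreaker {
        private final int failureThreshold;
        private final long initialCooldownMs;
        private final double cooldownMultiplier;
        private final long maxCooldownMs;
        private State state = State.CLOSED;
        private int consecutiveFailures;
        private long cooldownOpenedAt;
        private long currentCooldownMs;
        private int tripCount;

        CircuitBreaker(int threshold, long cooldownMs, double multiplier, long maxCooldownMs) {
            this.failureThreshold = threshold;
            this.initialCooldownMs = cooldownMs;
            this.cooldownMultiplier = multiplier;
            this.maxCooldownMs = maxCooldownMs;
        }

        State getState() { return state; }

        boolean canProcess() {
            switch (state) {
                case CLOSED:    return true;
                case HALF_OPEN: return false;             // probe already in flight
                default:                                   // OPEN
                    if (System.currentTimeMillis() - cooldownOpenedAt >= currentCooldownMs) {
                        state = State.HALF_OPEN;
                        return true;                       // allow exactly one probe
                    }
                    return false;
            }
        }

        void recordSuccess() {
            if (state == State.HALF_OPEN) {                // probe succeeded: full reset
                tripCount = 0;
                currentCooldownMs = initialCooldownMs;
                state = State.CLOSED;
            }
            consecutiveFailures = 0;
        }

        void recordFailure() {
            consecutiveFailures++;
            if (state == State.HALF_OPEN || consecutiveFailures >= failureThreshold) {
                tripCount++;                               // exponential cool-down per trip
                currentCooldownMs = (long) Math.min(maxCooldownMs,
                        initialCooldownMs * Math.pow(cooldownMultiplier, tripCount - 1));
                cooldownOpenedAt = System.currentTimeMillis();
                consecutiveFailures = 0;
                state = State.OPEN;
            }
        }
    }

    public static void main(String[] args) throws InterruptedException {
        CircuitBreaker cb = new CircuitBreaker(3, 20, 2.0, 100);
        for (int i = 0; i < 3; i++) cb.recordFailure();    // threshold reached
        System.out.println(cb.getState());                  // OPEN
        System.out.println(cb.canProcess());                // false: still cooling down
        Thread.sleep(30);                                   // let the 20ms cool-down expire
        System.out.println(cb.canProcess());                // true: the HALF_OPEN probe
        cb.recordSuccess();
        System.out.println(cb.getState());                  // CLOSED
    }
}
```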
Step 6: Wiring It All Together — The Consumer Loop
We've built four components: the classifier, the DLQ producer, the retry handler, and the circuit breaker. Each is focused and testable on its own. Now we wire them into the Kafka consumer poll loop — where the real orchestration happens.
This is the code that makes "no message is ever silently lost" a reality, not just a design goal.
Setting Up
public class CreateOrderMessageConsumer {
private final ErrorClassifier classifier;
private final DeadLetterQueueProducer dlqProducer;
private final RetryHandler retryHandler;
private final CircuitBreaker circuitBreaker;
public CreateOrderMessageConsumer() {
// Wire the error classification map
Map<Class<? extends Throwable>, ErrorCategory> mapping = new HashMap<>();
mapping.put(DeserializationException.class, ErrorCategory.NON_RECOVERABLE_TOLERABLE);
mapping.put(FraudDetectedException.class, ErrorCategory.NON_RECOVERABLE_TOLERABLE);
mapping.put(Error.class, ErrorCategory.NON_RECOVERABLE_INTOLERABLE);
this.classifier = new ErrorClassifier(mapping, ErrorCategory.RECOVERABLE);
// DLQ producer for failed messages
this.dlqProducer = new DeadLetterQueueProducer();
// Retry handler with exponential backoff
RetryConfig retryConfig = KafkaConfig.getRetryConfig();
this.retryHandler = new RetryHandler(retryConfig, classifier, dlqProducer);
// Circuit breaker for systemic failures
CircuitBreakerConfig cbConfig = KafkaConfig.getCircuitBreakerConfig();
this.circuitBreaker = new CircuitBreaker(cbConfig, CircuitBreakerStateListener.noOp());
}
Everything is composed at construction. The classifier feeds into the retry handler and the consumer loop. The DLQ producer is shared between the retry handler (for exhausted retries) and the consumer loop (for non-recoverable tolerable errors caught outside the retry loop).
The Poll Loop
Here's the heart of it. I'll walk through each section:
public void consume(String topicName, KafkaMessageHandler callback) {
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(KafkaConfig.getConsumerConfig());
try {
consumer.subscribe(List.of(topicName));
while (!closed.get()) {
Standard Kafka consumer setup. The closed flag is an AtomicBoolean set by the shutdown hook.
Circuit Breaker Cool-Down
// If circuit is OPEN, sleep for remaining cool-down
if (circuitBreaker.getState() == CircuitBreaker.State.OPEN) {
long remaining = circuitBreaker.remainingCooldownMs();
if (remaining > 0) {
Thread.sleep(remaining);
}
// Fall through to poll() — do NOT continue here
}
ConsumerRecords<String, String> records =
consumer.poll(Duration.ofSeconds(10));
When the circuit is OPEN, we sleep for the remaining cool-down, then fall through to poll(). Why not continue back to the top of the loop? Because we need canProcess() to be called on the first record of the next batch — that's what triggers the OPEN to HALF_OPEN transition. If we used continue, we'd check the circuit state again, find it still OPEN (the transition hasn't happened yet), and sleep again in an infinite loop.
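The loop calls remainingCooldownMs(), which the CircuitBreaker listing earlier doesn't show. One plausible shape for it — the constructor here is just for the demo; in the real class the two fields are set by openCircuit():

```java
// A plausible remainingCooldownMs() accessor for the circuit breaker.
// Clamping at zero matters: the cool-down may already have expired by the
// time the poll loop checks, and sleeping a negative duration would throw.
public class RemainingCooldown {
    private final long cooldownOpenedAt;   // set by openCircuit() in the real class
    private final long currentCooldownMs;  // set by openCircuit() in the real class

    public RemainingCooldown(long cooldownOpenedAt, long currentCooldownMs) {
        this.cooldownOpenedAt = cooldownOpenedAt;
        this.currentCooldownMs = currentCooldownMs;
    }

    public long remainingCooldownMs() {
        long elapsed = System.currentTimeMillis() - cooldownOpenedAt;
        return Math.max(0, currentCooldownMs - elapsed);
    }

    public static void main(String[] args) {
        // Opened 5 seconds into a 30-second cool-down: roughly 25s remain
        RemainingCooldown open = new RemainingCooldown(System.currentTimeMillis() - 5_000, 30_000);
        System.out.println(open.remainingCooldownMs() > 0);  // true
        // Cool-down already expired: clamped to 0
        RemainingCooldown done = new RemainingCooldown(System.currentTimeMillis() - 5_000, 1_000);
        System.out.println(done.remainingCooldownMs());      // 0
    }
}
```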
Processing Each Record
for (ConsumerRecord<String, String> record : records) {
// Check circuit breaker before each record
if (!circuitBreaker.canProcess()) {
break; // skip rest of batch; no commit for unprocessed records
}
try {
boolean success = retryHandler.processWithRetry(record,
r -> callback.processMessage(
new CreateOrderMessage(r.key(), r.value())),
dlqTopic);
if (success) {
circuitBreaker.recordSuccess();
} else {
// Retry exhaustion → DLQ → still counts as failure
circuitBreaker.recordFailure();
}
// Commit offset whether success or DLQ exhaustion
consumer.commitSync(Map.of(
new TopicPartition(record.topic(), record.partition()),
new OffsetAndMetadata(record.offset() + 1)));
// If circuit just opened, skip rest of batch
if (circuitBreaker.getState() == CircuitBreaker.State.OPEN) {
break;
}
This is the happy path and the "exhausted retry" path. In both cases, we commit the offset. The circuit breaker records the outcome. If the circuit trips from the failure, we stop processing the current batch — unprocessed messages in this batch will be re-delivered on the next poll.
Handling Non-Recoverable Errors
The retry handler only retries RECOVERABLE errors. Everything else is rethrown and caught here:
} catch (Throwable t) {
// CRITICAL: Check for WakeupException first
if (t instanceof WakeupException) {
throw (WakeupException) t;
}
ErrorCategory category = classifier.classify(t);
if (category == ErrorCategory.NON_RECOVERABLE_TOLERABLE) {
// Bad message — route to DLQ, commit, continue
dlqProducer.send(dlqTopic, record, t, 0);
consumer.commitSync(Map.of(
new TopicPartition(record.topic(), record.partition()),
new OffsetAndMetadata(record.offset() + 1)));
circuitBreaker.recordFailure();
if (circuitBreaker.getState() == CircuitBreaker.State.OPEN) {
break;
}
} else {
// NON_RECOVERABLE_INTOLERABLE — graceful shutdown
consumer.commitSync(Map.of(
new TopicPartition(record.topic(), record.partition()),
new OffsetAndMetadata(record.offset() + 1)));
consumer.close();
return;
}
}
}
}
Two critical details here:
WakeupException must be checked first. Kafka uses WakeupException as a signal for graceful shutdown — your shutdown hook calls consumer.wakeup(), which causes the next blocking call to throw WakeupException. But our catch (Throwable t) catches it too. If we let it flow into the classifier, it maps to RECOVERABLE (it's a RuntimeException subclass with no explicit mapping) and gets retried. The consumer never shuts down cleanly. Always check for WakeupException before classifying.
Intolerable errors commit the offset before shutting down. This seems counterintuitive — why commit a message that crashed the consumer? Because without this commit, the message is re-delivered on restart. If the message itself triggers the intolerable error (a poisoned message), you get an infinite restart loop: start → process → crash → restart → process the same message → crash. By committing, you acknowledge "this message was seen and cannot be processed" and let the consumer move past it.
The Full Picture
Here's how a message flows through the entire error handling pipeline:
Message arrives from Kafka
|
v
Circuit breaker: canProcess()?
|
NO --+----------> Skip message, break batch
|
YES v
Process message (via RetryHandler)
|
SUCCESS ---------> recordSuccess() -> commit offset -> next message
|
FAIL v
Classify error
|
RECOVERABLE -----> Retry with backoff (up to 3 times)
| |
| EXHAUSTED --> Send to DLQ -> recordFailure() -> commit
|
TOLERABLE -------> Send to DLQ -> recordFailure() -> commit -> continue
|
INTOLERABLE -----> recordFailure() -> commit -> close consumer -> return
Every path either processes the message, routes it to the DLQ, or shuts down gracefully. No message is ever silently lost. That's the guarantee.
Configuration
All of this is tunable via properties. No code changes needed to adjust behavior:
# Retry configuration
retry.initial.delay.ms=1000
retry.multiplier=2.0
retry.max.delay.ms=10000
retry.max.retries=3
# Circuit breaker configuration
circuit.breaker.failure.threshold=5
circuit.breaker.cooldown.initial.ms=30000
circuit.breaker.cooldown.multiplier=2.0
circuit.breaker.cooldown.max.ms=120000
# Dead letter queue
dlq.topic=orders.dlq
Start with these defaults. Tune based on your downstream service characteristics — if your database recovers quickly, lower the retry delay. If it takes minutes, increase the circuit breaker cool-down. The code doesn't change; the behavior adapts.
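Loading these into typed config objects is straightforward with java.util.Properties. A sketch for the retry block — the RetryConfig record is a hypothetical holder (the article's RetryConfig class isn't shown); only the property keys and defaults come from the listing above:

```java
import java.io.StringReader;
import java.util.Properties;

// Parses the retry.* properties into a typed holder. RetryConfig is a
// hypothetical shape; the keys and default values match the listing above.
public class ConfigDemo {
    record RetryConfig(long initialDelayMs, double multiplier, long maxDelayMs, int maxRetries) {}

    static RetryConfig loadRetryConfig(Properties props) {
        return new RetryConfig(
                Long.parseLong(props.getProperty("retry.initial.delay.ms", "1000")),
                Double.parseDouble(props.getProperty("retry.multiplier", "2.0")),
                Long.parseLong(props.getProperty("retry.max.delay.ms", "10000")),
                Integer.parseInt(props.getProperty("retry.max.retries", "3")));
    }

    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        // Only override one key; the rest fall back to the documented defaults
        props.load(new StringReader("retry.max.retries=5\n"));
        RetryConfig config = loadRetryConfig(props);
        System.out.println(config.maxRetries());      // 5
        System.out.println(config.initialDelayMs());  // 1000
    }
}
```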
What We Built
Let's take a step back and look at what we've put together:
| Component | Responsibility | Lines of Code |
|---|---|---|
| ErrorCategory | Define the three error buckets | ~25 |
| ErrorClassifier | Map exceptions to categories via hierarchy traversal | ~65 |
| DeadLetterQueueProducer | Route failed messages with diagnostic headers | ~90 |
| ExponentialBackoff | Calculate jittered delays | ~75 |
| RetryHandler | Retry loop with classification and DLQ exhaustion | ~135 |
| CircuitBreaker | Three-state FSM with exponential cool-down | ~190 |
| Consumer loop integration | Wire it all together | ~215 |
None of these components are particularly large or complex on their own. The power is in the composition — each one handles a narrow concern, and together they cover every error scenario.
This is plain Java. No framework magic, no annotations that hide behavior, no auto-configuration you can't debug. When something goes wrong at 3am, you can trace the exact code path from the exception to the DLQ message to the offset commit. That's the value of understanding what's under the hood.
Wrapping Up
In Part 1, we asked "what could possibly go wrong?" and the answer was: a lot. In this part, we answered every one of those failure modes with code.
The key principles, translated to implementation:
- Classify, then route — Don't write ad-hoc error handling. Classify every error into a category, then let the category determine the response.
- Make it configurable — Exception mappings, retry counts, backoff parameters, circuit breaker thresholds — all injectable. The code is the strategy; the configuration is the policy.
- Never silently lose a message — Every error path commits an offset only after the message is either processed, sent to the DLQ, or acknowledged as a fatal failure.
- Protect downstream systems — Retries with jitter prevent thundering herds. Circuit breakers prevent hammering a dead service. Both get more cautious the longer the problem persists.
The full source code for this project is available on GitHub.
Happy coding.