Consumer Groups and Concurrency Models

Master Kafka consumers: consumer groups, partition assignment, scaling strategies, offset management, and multi-threaded consumption patterns.

The Consumer Group: Kafka's Killer Feature

This is what makes Kafka scale. Let me explain with an analogy:

Imagine a restaurant with 4 tables (partitions) serving pizza. You have a consumer group - your team of waiters.

Scenario 1: One waiter (consumer)

  • Serves all 4 tables alone
  • Overworked, slow service

Scenario 2: Four waiters

  • Each serves exactly one table
  • Perfect distribution
  • Maximum parallelism

Scenario 3: Eight waiters

  • Only 4 can work (one per table)
  • Other 4 sit idle
  • Waste of resources

🎯 The Golden Rule

Effective consumers = min(num_partitions, num_consumers_in_group)
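In code, scaling out just means starting more processes with the same group_id. A minimal kafka-python sketch (the topic, group, and broker address are placeholders):

from kafka import KafkaConsumer

# Run this same script as 1-4 separate processes.
# Because they share group_id, Kafka divides the 4 partitions among them.
consumer = KafkaConsumer(
    'orders',                            # hypothetical 4-partition topic
    bootstrap_servers='localhost:9092',  # placeholder broker
    group_id='waiters',                  # same group => partitions are split across instances
)

for message in consumer:
    print(f'partition={message.partition} offset={message.offset}')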

Multiple Consumer Groups

You can have multiple consumer groups reading the same topic independently:

Topic: user-events (4 partitions)

Consumer Group "analytics"
├── Consumer 1 → Partition 0, 1
└── Consumer 2 → Partition 2, 3

Consumer Group "fraud-detection"  
├── Consumer 1 → Partition 0
├── Consumer 2 → Partition 1
├── Consumer 3 → Partition 2
└── Consumer 4 → Partition 3

Each group maintains its own offset, so they don't interfere with each other.
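In code, the only thing that separates the two groups is group_id; because each group keeps its own offsets, both receive every message. A sketch (each consumer would normally run in its own service; the broker address is a placeholder):

from kafka import KafkaConsumer

# Shown together for brevity; in practice these are separate services.
analytics = KafkaConsumer(
    'user-events',
    bootstrap_servers='localhost:9092',
    group_id='analytics',        # group 1: its own offsets
)

fraud = KafkaConsumer(
    'user-events',
    bootstrap_servers='localhost:9092',
    group_id='fraud-detection',  # group 2: independent offsets, also sees every message
)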

Offset Management: The Bookmark System

Offset = Your reading position in a partition

Think of it like bookmarking in a book:

Partition 0:
[msg-0] [msg-1] [msg-2] [msg-3] [msg-4] [msg-5]
                         ↑
                    last read (offset=3)

Kafka stores these bookmarks in a special internal topic called __consumer_offsets.
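You can inspect the bookmark directly. With kafka-python, committed() returns the group's saved offset and position() returns where the next poll() will read from (broker and group below are placeholders):

from kafka import KafkaConsumer, TopicPartition

consumer = KafkaConsumer(
    bootstrap_servers='localhost:9092',
    group_id='analytics',
    enable_auto_commit=False,
)

tp = TopicPartition('user-events', 0)
consumer.assign([tp])  # attach to partition 0 directly

print('committed (the bookmark in __consumer_offsets):', consumer.committed(tp))
print('position (where the next poll() will read from):', consumer.position(tp))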

When Do You Move the Bookmark?

The critical decision: When do you move the bookmark?

Auto-commit (dangerous default)

from kafka import KafkaConsumer

consumer = KafkaConsumer(
    'user-events',
    bootstrap_servers='localhost:9092',
    group_id='analytics',
    enable_auto_commit=True,
    auto_commit_interval_ms=5000,  # commit in the background every 5 seconds
)

for message in consumer:
    process(message)  # What if this fails?

Problem: offsets are committed on a timer, whether or not your processing succeeded. If processing fails after a commit, those messages are effectively lost.

Manual commit (safer)

consumer = KafkaConsumer(
    'user-events',
    bootstrap_servers='localhost:9092',
    group_id='analytics',
    enable_auto_commit=False,
)

for message in consumer:
    try:
        process(message)
        consumer.commit()  # Commit only after successful processing
    except Exception:
        # Handle the error and don't commit; the offset stays put
        log_and_retry(message)

Async commit (fastest)

def on_commit(offsets, response):
    # kafka-python calls this with either a commit response struct or an
    # Exception; only an Exception means the commit actually failed
    if isinstance(response, Exception):
        log_error(response)

for message in consumer:
    process(message)
    consumer.commit_async(callback=on_commit)
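In practice the async pattern is usually paired with one final synchronous commit on shutdown, so the last position isn't lost if the final async request fails. A sketch, reusing the consumer and on_commit callback from above:

try:
    for message in consumer:
        process(message)
        consumer.commit_async(callback=on_commit)  # non-blocking in the hot path
finally:
    try:
        consumer.commit()   # one blocking commit to pin the final position
    finally:
        consumer.close()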

Delivery Semantics: Pick Your Poison

At-most-once (commit before processing)

consumer.commit()  # Commit first
process(message)   # Process second
# If processing fails → message lost

At-least-once (commit after processing)

process(message)   # Process first
consumer.commit()  # Commit second
# If commit fails → message reprocessed

Exactly-once (transactional)

# Kafka transactions are a *producer* feature, used for Kafka-to-Kafka
# read-process-write pipelines (method names follow the confluent-kafka / Java clients):
producer.begin_transaction()
result = process(message)
producer.produce('output-topic', result)
producer.send_offsets_to_transaction(offsets, consumer.consumer_group_metadata())
producer.commit_transaction()
# Atomic: output records and consumer offsets commit together, or not at all

Note that the transaction only spans Kafka topics. To get the same guarantee when writing to an external database, store the offsets in that database in the same DB transaction as the results.

For 99% of use cases, at-least-once + idempotent processing is the sweet spot.
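What idempotent processing looks like in practice: reprocessing a message must not produce a second effect. A minimal sketch, assuming a SQL table with a unique event_id column (the table, connection object, and helper name are hypothetical):

def process_idempotently(message, db):
    # A stable, message-derived key: a redelivered message produces the same
    # key, so the insert becomes a no-op instead of a duplicate row.
    event_id = f'{message.topic}-{message.partition}-{message.offset}'
    db.execute(
        'INSERT INTO events (event_id, payload) VALUES (?, ?) '
        'ON CONFLICT (event_id) DO NOTHING',
        (event_id, message.value),
    )
    db.commit()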

Rebalancing: The Silent Killer

This is where most production issues come from. Let me explain what happens:

Trigger events:

  • Consumer joins group
  • Consumer crashes/leaves
  • Consumer goes longer than max.poll.interval.ms without calling poll()
  • Partition count changes

The rebalance dance:

  1. Coordinator detects trigger
  2. ALL consumers stop consuming under the default eager protocol (🚨 this is the problem)
  3. Partition assignment recalculated
  4. Consumers resume with new assignments

During rebalance (can be 10-60 seconds), nothing processes. Your lag spikes, dashboards go red, PagerDuty fires.
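One practical mitigation: commit whatever you've already processed the moment your partitions are about to be revoked, so the next owner starts exactly where you stopped. A sketch using kafka-python's rebalance listener (topic, group, and broker are placeholders; process() stands in for your handler):

from kafka import KafkaConsumer, ConsumerRebalanceListener

class CommitOnRevoke(ConsumerRebalanceListener):
    def __init__(self, consumer):
        self.consumer = consumer

    def on_partitions_revoked(self, revoked):
        # Flush progress before ownership moves, so the new owner
        # resumes exactly where this consumer stopped.
        self.consumer.commit()

    def on_partitions_assigned(self, assigned):
        pass

consumer = KafkaConsumer(
    bootstrap_servers='localhost:9092',
    group_id='analytics',
    enable_auto_commit=False,
)
consumer.subscribe(['user-events'], listener=CommitOnRevoke(consumer))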

Cooperative Sticky Rebalancing

The modern solution to minimize rebalance impact:

Old way (Eager rebalancing):

Consumer 1: [P0, P1, P2] → [] → [P0, P1]
Consumer 2: [P3, P4, P5] → [] → [P2, P3, P4, P5]
                           ↑
                      everyone stops

New way (Cooperative rebalancing):

Consumer 1: [P0, P1, P2] → [P0, P1] (keeps most partitions)
Consumer 2: [P3, P4, P5] → [P2, P3, P4, P5] (picks up P2)
                           ↑
                    only P2 pauses briefly

Enable it:

partition.assignment.strategy=org.apache.kafka.clients.consumer.CooperativeStickyAssignor
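The line above is the Java client property. With the librdkafka-based confluent-kafka Python client, the equivalent setting is the string 'cooperative-sticky' (the cooperative protocol lives in the Java client and librdkafka-based clients; broker, group, and topic below are placeholders):

from confluent_kafka import Consumer

consumer = Consumer({
    'bootstrap.servers': 'localhost:9092',
    'group.id': 'analytics',
    'partition.assignment.strategy': 'cooperative-sticky',  # incremental rebalances
})
consumer.subscribe(['user-events'])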

Heartbeat and Session Management

The group coordinator only knows a consumer is alive if the consumer keeps telling it so:

session.timeout.ms = 45000           # "If I don't hear from you in 45s, I'll kick you out"
heartbeat.interval.ms = 3000         # "I'll send heartbeat every 3s"
max.poll.interval.ms = 300000        # "Max 5 mins between poll() calls"
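In kafka-python the same knobs are constructor arguments in snake_case (values mirror the properties above; broker and topic are placeholders):

from kafka import KafkaConsumer

consumer = KafkaConsumer(
    'user-events',
    bootstrap_servers='localhost:9092',
    group_id='analytics',
    session_timeout_ms=45000,     # evicted after 45s without a heartbeat
    heartbeat_interval_ms=3000,   # background thread heartbeats every 3s
    max_poll_interval_ms=300000,  # must call poll() at least every 5 minutes
)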

⚠️ Common Failure Mode

Your processing takes 6 minutes:

for message in consumer:
    slow_processing(message)  # Takes 6 minutes
    # Consumer kicked out after 5 minutes!

Fix:

max.poll.interval.ms = 600000  # 10 minutes

# OR better - keep each batch small so you get back to poll() quickly:
records = consumer.poll(timeout_ms=1000, max_records=100)  # dict: {TopicPartition: [messages]}
for tp, messages in records.items():
    for message in messages:
        process_quickly(message)  # back to poll() soon

Static Membership: For Stable Deployments

If you're running on Kubernetes and pods restart frequently, each restart triggers a rebalance. This is painful.

Solution: Static group membership

consumer = KafkaConsumer(
    'user-events',
    bootstrap_servers='localhost:9092',
    group_id='analytics',
    group_instance_id='pod-1-unique-id',  # maps to group.instance.id; keep it stable across restarts
)

Now when the pod restarts and rejoins within session.timeout.ms, Kafka recognizes it as the same member, hands back its old partitions, and skips the rebalance.
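With the confluent-kafka client the same idea is the group.instance.id property; it's usually paired with a session timeout long enough to cover a typical restart (values below are illustrative):

from confluent_kafka import Consumer

consumer = Consumer({
    'bootstrap.servers': 'localhost:9092',
    'group.id': 'analytics',
    'group.instance.id': 'pod-1-unique-id',  # reuse the same id after every restart
    'session.timeout.ms': 120000,            # must outlive the restart window
})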

Consumer Configuration Cheat Sheet

Group coordination

group.id = my-consumer-group
group.instance.id = unique-per-instance    # For static membership

Rebalancing

partition.assignment.strategy = org.apache.kafka.clients.consumer.CooperativeStickyAssignor
session.timeout.ms = 45000
heartbeat.interval.ms = 3000
max.poll.interval.ms = 300000

Offset management

enable.auto.commit = false                  # Manual commit preferred
auto.offset.reset = earliest               # Where to start when no committed offset exists ('latest' is the other option)

Performance tuning

fetch.min.bytes = 1024                     # Min data per fetch
fetch.max.wait.ms = 500                    # Max wait when fetch.min.bytes isn't met yet
max.poll.records = 500                     # Messages per poll()
max.partition.fetch.bytes = 1048576        # 1 MB per partition
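The same cheat sheet expressed as a kafka-python constructor (kwargs are snake_case; the cooperative assignor is a Java/librdkafka-level setting and is omitted here; broker and topic are placeholders):

from kafka import KafkaConsumer

consumer = KafkaConsumer(
    'user-events',
    bootstrap_servers='localhost:9092',
    group_id='my-consumer-group',
    enable_auto_commit=False,
    auto_offset_reset='earliest',
    session_timeout_ms=45000,
    heartbeat_interval_ms=3000,
    max_poll_interval_ms=300000,
    fetch_min_bytes=1024,
    fetch_max_wait_ms=500,
    max_poll_records=500,
    max_partition_fetch_bytes=1048576,
)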

Key Takeaways

  1. Consumer groups enable horizontal scaling - more consumers = more parallelism, up to the partition count
  2. Manual offset management is safer than auto-commit
  3. Use cooperative rebalancing to minimize downtime
  4. Static membership prevents rebalance storms in containerized environments
  5. At-least-once + idempotent processing is the sweet spot for most use cases

Next Steps

Ready to dive deeper into rebalancing? Check out our next lesson on Rebalancing Deep Dive and Optimization where we'll learn how to minimize rebalance impact and optimize consumer performance.