Testing Kafka Consumers
The Challenge of Event-Driven Testing
Testing event-driven systems is hard because:
- Events are asynchronous (fire and forget)
- Delivery semantics vary by configuration (at-least-once, at-most-once, exactly-once)
- Order is not guaranteed (events may arrive out of sequence)
- Side effects are distributed (one event triggers actions in multiple services)
Kafka consumer testing must verify that consumers correctly process events, handle duplicates (idempotency), survive malformed events, and maintain ordering guarantees.
Basic Consumer Test: Event Processing
import pytest
from confluent_kafka import Producer, Consumer
import json
import time
class TestOrderEventConsumer:
    """Integration tests for the Kafka consumer that handles order events."""

    @pytest.fixture
    def kafka_producer(self):
        """Provide a Kafka producer for publishing test events."""
        return Producer({
            'bootstrap.servers': 'localhost:9092',
            'client.id': 'test-producer',
        })

    @pytest.fixture
    def order_consumer(self):
        """Provide the consumer under test; always closed after the test."""
        consumer = OrderEventConsumer(
            bootstrap_servers='localhost:9092',
            group_id='test-group',
            topic='order-events',
        )
        yield consumer
        consumer.close()

    def test_order_created_event_processed(self, kafka_producer, order_consumer, db):
        """When an order.created event is published, the consumer stores it."""
        event = {
            "event_type": "order.created",
            "timestamp": "2026-02-09T10:00:00Z",
            "data": {
                "order_id": "ord-123",
                "customer_id": "cust-456",
                "total": 99.99,
                "items": [{"product_id": "prod-789", "quantity": 2}],
            },
        }
        kafka_producer.produce(
            'order-events',
            key="ord-123",
            value=json.dumps(event).encode('utf-8'),
        )
        kafka_producer.flush()

        # Poll until the order shows up in the database or we run out of time.
        order = None
        give_up_at = time.time() + 10
        while not order and time.time() < give_up_at:
            order_consumer.poll(timeout=1.0)
            order = db.get_order("ord-123")
        if not order:
            pytest.fail("Consumer did not process order.created within 10s")

        assert order.customer_id == "cust-456"
        assert order.total == 99.99
        assert len(order.items) == 1
Idempotency Testing
At-least-once delivery means consumers may receive the same event multiple times. Tests must verify idempotent processing.
def test_duplicate_event_idempotent(self, kafka_producer, order_consumer, db):
    """Processing the same event twice should not create duplicate records.

    At-least-once delivery means duplicates are expected; the consumer must
    deduplicate on the event's idempotency key.
    """
    event = {
        "event_type": "order.created",
        "event_id": "evt-unique-001",  # Idempotency key
        "data": {
            "order_id": "ord-dup",
            "customer_id": "cust-1",
            "total": 50.00,
            "items": [{"product_id": "prod-1", "quantity": 1}],
        },
    }
    # Send the same event twice
    for _ in range(2):
        kafka_producer.produce(
            'order-events',
            key="ord-dup",
            value=json.dumps(event).encode('utf-8'),
        )
    kafka_producer.flush()
    # The consumer only processes messages inside poll() -- sleeping does
    # nothing, and a single poll() may consume at most one of the two
    # duplicates.  Poll repeatedly for a bounded window so both copies get
    # a chance to be processed.
    deadline = time.time() + 5
    while time.time() < deadline:
        order_consumer.poll(timeout=1.0)
    orders = db.get_orders_by_id("ord-dup")
    assert len(orders) == 1, (
        f"Expected 1 order, got {len(orders)} -- idempotency failure"
    )
def test_duplicate_event_with_different_event_ids(self, kafka_producer, order_consumer, db):
    """Same order data but different event IDs should still be idempotent
    based on the business key (order_id), not just the event_id."""
    for event_id in ["evt-001", "evt-002"]:
        event = {
            "event_type": "order.created",
            "event_id": event_id,
            "data": {"order_id": "ord-biz-dup", "customer_id": "cust-1", "total": 75.00},
        }
        kafka_producer.produce(
            'order-events',
            key="ord-biz-dup",
            value=json.dumps(event).encode('utf-8'),
        )
    kafka_producer.flush()
    # Messages are only consumed inside poll(); a fixed sleep followed by a
    # single poll() could process at most one of the two events and let the
    # assertion pass vacuously.  Poll through a bounded window instead.
    deadline = time.time() + 5
    while time.time() < deadline:
        order_consumer.poll(timeout=1.0)
    orders = db.get_orders_by_id("ord-biz-dup")
    assert len(orders) == 1, "Business-key idempotency not enforced"
Malformed Event Handling
def test_malformed_event_does_not_crash(self, kafka_producer, order_consumer):
    """A payload that is not valid JSON must not kill the consumer."""
    # Publish a message body that cannot be parsed as JSON.
    kafka_producer.produce('order-events', key="bad", value=b"this is not json")
    kafka_producer.flush()
    # One poll cycle is enough for the consumer to hit the bad message.
    order_consumer.poll(timeout=5.0)
    assert order_consumer.is_alive(), "Consumer crashed on malformed event"
def test_event_with_missing_required_fields(self, kafka_producer, order_consumer, db):
    """Consumer should reject events missing required fields."""
    incomplete_event = {
        "event_type": "order.created",
        "data": {
            "order_id": "ord-incomplete"
            # Missing: customer_id, total, items
        },
    }
    kafka_producer.produce(
        'order-events',
        key="ord-incomplete",
        value=json.dumps(incomplete_event).encode('utf-8'),
    )
    kafka_producer.flush()
    # The consumer processes messages only inside poll(); sleeping first
    # accomplishes nothing.  Poll through a bounded window so the event is
    # actually consumed (and rejected) before we assert its absence.
    deadline = time.time() + 5
    while time.time() < deadline:
        order_consumer.poll(timeout=1.0)
    order = db.get_order("ord-incomplete")
    assert order is None, "Incomplete event should not create an order"
def test_event_with_unknown_type(self, kafka_producer, order_consumer):
    """Consumer should skip events with unrecognized event_type."""
    unknown_event = {
        "event_type": "order.teleported",  # Not a real event type
        "data": {"order_id": "ord-unknown"},
    }
    payload = json.dumps(unknown_event).encode('utf-8')
    kafka_producer.produce('order-events', key="ord-unknown", value=payload)
    kafka_producer.flush()
    # A single poll cycle lets the consumer encounter the unknown event.
    order_consumer.poll(timeout=5.0)
    assert order_consumer.is_alive(), "Consumer crashed on unknown event type"
Ordering Tests
def test_event_ordering_within_partition(self, kafka_producer, order_consumer, db):
    """Events for the same key arrive in order within a partition."""
    order_id = "ord-sequence"
    # Send events in specific order
    events = [
        {"event_type": "order.created", "sequence": 1, "data": {"order_id": order_id, "status": "pending"}},
        {"event_type": "order.paid", "sequence": 2, "data": {"order_id": order_id, "status": "paid"}},
        {"event_type": "order.shipped", "sequence": 3, "data": {"order_id": order_id, "status": "shipped"}},
    ]
    for event in events:
        kafka_producer.produce(
            'order-events',
            key=order_id,  # Same key = same partition = ordered
            value=json.dumps(event).encode('utf-8'),
        )
    kafka_producer.flush()
    # Three events were published, but the consumer only processes messages
    # inside poll() -- a sleep plus one poll() may handle just the first
    # event.  Poll until the final state is reached or we time out.
    order = None
    deadline = time.time() + 10
    while time.time() < deadline:
        order_consumer.poll(timeout=1.0)
        order = db.get_order(order_id)
        if order is not None and order.status == "shipped":
            break
    assert order is not None, "Consumer never stored the order"
    assert order.status == "shipped", (
        f"Expected final status 'shipped', got '{order.status}'. "
        f"Events may have been processed out of order."
    )
Dead Letter Queue Testing
def test_failed_events_go_to_dlq(self, kafka_producer, order_consumer, dlq_consumer):
    """Events that fail processing should be sent to the dead letter queue."""
    # Send an event that will cause a processing error
    poison_event = {
        "event_type": "order.created",
        "data": {
            "order_id": "ord-poison",
            "customer_id": "nonexistent-customer",  # Causes FK violation
            "total": 100.00,
        },
    }
    kafka_producer.produce(
        'order-events',
        key="ord-poison",
        value=json.dumps(poison_event).encode('utf-8'),
    )
    kafka_producer.flush()
    # The consumer under test processes (and dead-letters) messages inside
    # poll(); a fixed sleep plus a single poll() may never consume the
    # poison event.  Interleave consumer polls with DLQ polls until the
    # failed event appears or the deadline passes.
    dlq_message = None
    deadline = time.time() + 10
    while dlq_message is None and time.time() < deadline:
        order_consumer.poll(timeout=1.0)
        dlq_message = dlq_consumer.poll(timeout=1.0)
    assert dlq_message is not None, "Failed event did not appear in DLQ"
    dlq_data = json.loads(dlq_message.value())
    assert dlq_data["original_event"]["data"]["order_id"] == "ord-poison"
    assert "error" in dlq_data  # Should include the failure reason
Test Infrastructure: Testcontainers
For isolated Kafka testing, use Testcontainers:
import pytest
from testcontainers.kafka import KafkaContainer
@pytest.fixture(scope="session")
def kafka_container():
    """Run a disposable Kafka broker for the duration of the test session."""
    with KafkaContainer("confluentinc/cp-kafka:7.5.0") as container:
        yield container
@pytest.fixture
def kafka_bootstrap(kafka_container):
    """Return the bootstrap server address of the session-scoped Kafka container."""
    return kafka_container.get_bootstrap_server()
Key Takeaway
Kafka consumer testing must cover five areas: basic event processing (does the consumer store data correctly?), idempotency (does processing the same event twice produce the same result?), malformed event handling (does the consumer survive garbage?), ordering (are events processed in the correct sequence?), and dead letter queues (do failed events get properly routed for later investigation?). AI can generate these test categories from your event schema definitions.