QA Engineer Skills 2026 — Testing Kafka Consumers

Testing Kafka Consumers

The Challenge of Event-Driven Testing

Testing event-driven systems is hard because:

  1. Events are asynchronous (fire and forget)
  2. Delivery semantics vary by configuration (at-least-once, at-most-once, exactly-once)
  3. Order is only guaranteed within a partition (events from different partitions may arrive out of sequence)
  4. Side effects are distributed (one event triggers actions in multiple services)

Kafka consumer testing must verify that consumers correctly process events, handle duplicates (idempotency), survive malformed events, and maintain ordering guarantees.


Basic Consumer Test: Event Processing

import pytest
from confluent_kafka import Producer, Consumer
import json
import time

class TestOrderEventConsumer:
    """Test the order event consumer that processes Kafka messages."""

    @pytest.fixture
    def kafka_producer(self):
        """Create a test Kafka producer pointed at the local broker."""
        return Producer({
            'bootstrap.servers': 'localhost:9092',
            'client.id': 'test-producer'
        })

    @pytest.fixture
    def order_consumer(self):
        """Create the order consumer under test; closed after each test."""
        consumer = OrderEventConsumer(
            bootstrap_servers='localhost:9092',
            group_id='test-group',
            topic='order-events'
        )
        yield consumer
        # Teardown runs even if the test fails, releasing the broker connection.
        consumer.close()

    def test_order_created_event_processed(self, kafka_producer, order_consumer, db):
        """When an order.created event is published, the consumer stores it."""
        event = {
            "event_type": "order.created",
            "timestamp": "2026-02-09T10:00:00Z",
            "data": {
                "order_id": "ord-123",
                "customer_id": "cust-456",
                "total": 99.99,
                "items": [{"product_id": "prod-789", "quantity": 2}]
            }
        }
        kafka_producer.produce(
            'order-events',
            key="ord-123",
            value=json.dumps(event).encode('utf-8')
        )
        kafka_producer.flush()

        # Drive the consumer until the order appears, bounded by a deadline.
        # time.monotonic() is immune to wall-clock adjustments (NTP, DST),
        # unlike time.time(), so the 10s budget cannot silently shrink or grow.
        order = None
        deadline = time.monotonic() + 10
        while time.monotonic() < deadline:
            order_consumer.poll(timeout=1.0)
            order = db.get_order("ord-123")
            if order:
                break
        else:
            pytest.fail("Consumer did not process order.created within 10s")

        assert order.customer_id == "cust-456"
        assert order.total == 99.99
        assert len(order.items) == 1

Idempotency Testing

At-least-once delivery means consumers may receive the same event multiple times. Tests must verify idempotent processing.

    def test_duplicate_event_idempotent(self, kafka_producer, order_consumer, db):
        """Processing the same event twice should not create duplicate records."""
        event = {
            "event_type": "order.created",
            "event_id": "evt-unique-001",  # Idempotency key
            "data": {
                "order_id": "ord-dup",
                "customer_id": "cust-1",
                "total": 50.00,
                "items": [{"product_id": "prod-1", "quantity": 1}]
            }
        }

        # Send the same event twice
        for _ in range(2):
            kafka_producer.produce(
                'order-events',
                key="ord-dup",
                value=json.dumps(event).encode('utf-8')
            )
        kafka_producer.flush()

        # Actively poll for the whole window instead of sleeping and polling
        # once: a single poll after a blind sleep may deliver only one of the
        # two copies, making the duplicate check pass vacuously.
        deadline = time.monotonic() + 5
        while time.monotonic() < deadline:
            order_consumer.poll(timeout=1.0)

        orders = db.get_orders_by_id("ord-dup")
        assert len(orders) == 1, (
            f"Expected 1 order, got {len(orders)} -- idempotency failure"
        )

    def test_duplicate_event_with_different_event_ids(self, kafka_producer, order_consumer, db):
        """Same order data but different event IDs should still be idempotent
        based on the business key (order_id), not just the event_id."""
        for event_id in ["evt-001", "evt-002"]:
            event = {
                "event_type": "order.created",
                "event_id": event_id,
                "data": {"order_id": "ord-biz-dup", "customer_id": "cust-1", "total": 75.00}
            }
            kafka_producer.produce(
                'order-events',
                key="ord-biz-dup",
                value=json.dumps(event).encode('utf-8')
            )
        kafka_producer.flush()

        # Poll continuously for the full window (not sleep + one poll) so
        # both deliveries are actually consumed before we count rows.
        deadline = time.monotonic() + 5
        while time.monotonic() < deadline:
            order_consumer.poll(timeout=1.0)

        orders = db.get_orders_by_id("ord-biz-dup")
        assert len(orders) == 1, "Business-key idempotency not enforced"

Malformed Event Handling

    def test_malformed_event_does_not_crash(self, kafka_producer, order_consumer):
        """A payload that is not valid JSON must not bring the consumer down."""
        garbage = b"this is not json"  # deliberately unparseable payload
        kafka_producer.produce('order-events', key="bad", value=garbage)
        kafka_producer.flush()

        # Give the consumer one poll cycle to hit the bad message.
        order_consumer.poll(timeout=5.0)
        assert order_consumer.is_alive(), "Consumer crashed on malformed event"

    def test_event_with_missing_required_fields(self, kafka_producer, order_consumer, db):
        """Consumer should reject events missing required fields."""
        incomplete_event = {
            "event_type": "order.created",
            "data": {
                "order_id": "ord-incomplete"
                # Missing: customer_id, total, items
            }
        }
        kafka_producer.produce(
            'order-events',
            key="ord-incomplete",
            value=json.dumps(incomplete_event).encode('utf-8')
        )
        kafka_producer.flush()

        # Poll for the full window rather than sleeping then polling once,
        # so the event is guaranteed to have been consumed (and rejected)
        # before the negative assertion below.
        deadline = time.monotonic() + 5
        while time.monotonic() < deadline:
            order_consumer.poll(timeout=1.0)

        order = db.get_order("ord-incomplete")
        assert order is None, "Incomplete event should not create an order"

    def test_event_with_unknown_type(self, kafka_producer, order_consumer):
        """Events whose event_type is unrecognized are skipped, not fatal."""
        payload = json.dumps({
            "event_type": "order.teleported",  # Not a real event type
            "data": {"order_id": "ord-unknown"}
        }).encode('utf-8')
        kafka_producer.produce('order-events', key="ord-unknown", value=payload)
        kafka_producer.flush()

        # One poll cycle is enough to reach the unknown-type branch.
        order_consumer.poll(timeout=5.0)
        assert order_consumer.is_alive(), "Consumer crashed on unknown event type"

Ordering Tests

    def test_event_ordering_within_partition(self, kafka_producer, order_consumer, db):
        """Events for the same key arrive in order within a partition."""
        order_id = "ord-sequence"

        # Send events in specific order
        events = [
            {"event_type": "order.created", "sequence": 1, "data": {"order_id": order_id, "status": "pending"}},
            {"event_type": "order.paid", "sequence": 2, "data": {"order_id": order_id, "status": "paid"}},
            {"event_type": "order.shipped", "sequence": 3, "data": {"order_id": order_id, "status": "shipped"}},
        ]

        for event in events:
            kafka_producer.produce(
                'order-events',
                key=order_id,  # Same key = same partition = ordered
                value=json.dumps(event).encode('utf-8')
            )
        kafka_producer.flush()

        # Poll continuously for the window (monotonic deadline) so all three
        # events are consumed; sleep followed by one poll may process only
        # a prefix of the sequence and fail spuriously.
        deadline = time.monotonic() + 5
        while time.monotonic() < deadline:
            order_consumer.poll(timeout=1.0)

        order = db.get_order(order_id)
        assert order.status == "shipped", (
            f"Expected final status 'shipped', got '{order.status}'. "
            f"Events may have been processed out of order."
        )

Dead Letter Queue Testing

    def test_failed_events_go_to_dlq(self, kafka_producer, order_consumer, dlq_consumer):
        """Events that fail processing should be sent to the dead letter queue."""
        # Send an event that will cause a processing error
        poison_event = {
            "event_type": "order.created",
            "data": {
                "order_id": "ord-poison",
                "customer_id": "nonexistent-customer",  # Causes FK violation
                "total": 100.00
            }
        }
        kafka_producer.produce(
            'order-events',
            key="ord-poison",
            value=json.dumps(poison_event).encode('utf-8')
        )
        kafka_producer.flush()

        # Drive the consumer for the full window instead of a 10s blind
        # sleep plus one poll: the poison event must actually be consumed
        # and fail before anything can be routed to the DLQ.
        deadline = time.monotonic() + 10
        while time.monotonic() < deadline:
            order_consumer.poll(timeout=1.0)

        # Check that the event landed in the DLQ
        dlq_message = dlq_consumer.poll(timeout=10.0)
        assert dlq_message is not None, "Failed event did not appear in DLQ"

        dlq_data = json.loads(dlq_message.value())
        assert dlq_data["original_event"]["data"]["order_id"] == "ord-poison"
        assert "error" in dlq_data  # Should include the failure reason

Test Infrastructure: Testcontainers

For isolated Kafka testing, use Testcontainers:

import pytest
from testcontainers.kafka import KafkaContainer

@pytest.fixture(scope="session")
def kafka_container():
    """Session-scoped Kafka broker running in a throwaway container."""
    container = KafkaContainer("confluentinc/cp-kafka:7.5.0")
    # Context manager starts the container on entry, stops it on exit.
    with container as running:
        yield running

@pytest.fixture
def kafka_bootstrap(kafka_container):
    """Bootstrap server address of the session-wide Kafka container."""
    bootstrap = kafka_container.get_bootstrap_server()
    return bootstrap

Key Takeaway

Kafka consumer testing must cover five areas: basic event processing (does the consumer store data correctly?), idempotency (does processing the same event twice produce the same result?), malformed event handling (does the consumer survive garbage?), ordering (are events processed in the correct sequence?), and dead letter queues (do failed events get properly routed for later investigation?). AI can generate these test categories from your event schema definitions.