QA Engineer Skills 2026 — Event-Driven System Testing

Event-Driven System Testing

The Fundamental Challenge

Event-driven architectures (SQS, EventBridge, Kafka, SNS) are notoriously difficult to test because there is no synchronous request-response cycle to assert against. When you publish an event, you cannot immediately check the result. The consumer might process it in milliseconds, seconds, or minutes. It might fail silently. It might process it twice.

This asynchronous nature requires a fundamentally different testing approach: instead of "call and assert," you adopt "publish, wait, and verify state."


Testing EventBridge Rules

EventBridge is AWS's serverless event bus. Testing EventBridge means verifying that rules match the correct event patterns and route to the correct targets.

Pattern Matching Verification

# tests/integration/test_eventbridge.py
import boto3
import json
import pytest

def test_eventbridge_rule_matching():
    """Verify that EventBridge rules match expected event patterns."""
    eb = boto3.client("events", endpoint_url="http://localhost:4566")  # LocalStack

    # Publish a sample order event onto the custom bus
    entry = {
        "Source": "myapp.orders",
        "DetailType": "OrderCreated",
        "Detail": json.dumps({
            "orderId": "ORD-TEST",
            "amount": 150.00,
            "region": "us-east-1"
        }),
        "EventBusName": "orders-bus"
    }
    put_result = eb.put_events(Entries=[entry])

    # A non-zero count means the bus rejected the entry outright
    assert put_result["FailedEntryCount"] == 0

    # The rule itself must exist and carry the expected pattern
    rule_desc = eb.describe_rule(
        Name="high-value-orders",
        EventBusName="orders-bus"
    )
    rule_pattern = json.loads(rule_desc["EventPattern"])
    assert rule_pattern["source"] == ["myapp.orders"]
    assert rule_pattern["detail"]["amount"] == [{"numeric": [">=", 100]}]

def test_event_pattern_does_not_match_low_value_orders():
    """Verify that low-value orders are NOT matched by the high-value rule."""
    # Negative test: confirming what a rule ignores is as important as
    # confirming what it matches.
    high_value_pattern = json.dumps({
        "source": ["myapp.orders"],
        "detail": {
            "amount": [{"numeric": [">=", 100]}]
        }
    })

    low_value_event = json.dumps({
        "source": "myapp.orders",
        "detail": {
            "orderId": "ORD-SMALL",
            "amount": 25.00,
        }
    })

    # AWS provides a test-event-pattern API
    eb = boto3.client("events", endpoint_url="http://localhost:4566")
    outcome = eb.test_event_pattern(
        EventPattern=high_value_pattern,
        Event=low_value_event
    )
    assert outcome["Result"] is False

End-to-End Event Flow Testing

def test_order_event_triggers_notification(localstack_clients):
    """Test the full event flow: OrderCreated -> EventBridge -> SNS -> Email.

    Instead of a fixed sleep, the test polls the observer queue with a
    deadline: a fixed sleep is either too short (flaky) or too long (slow),
    while polling returns as soon as the notification arrives.
    """
    import time

    events = localstack_clients["events"]
    sns = localstack_clients["sns"]
    sqs = localstack_clients["sqs"]

    # Create an SQS queue subscribed to the SNS topic
    # (SQS acts as a test observer for the notification)
    queue = sqs.create_queue(QueueName="test-notifications")
    queue_url = queue["QueueUrl"]
    queue_arn = sqs.get_queue_attributes(
        QueueUrl=queue_url,
        AttributeNames=["QueueArn"]
    )["Attributes"]["QueueArn"]

    # Subscribe the SQS queue to the notification topic
    sns.subscribe(
        TopicArn="arn:aws:sns:us-east-1:000000000000:order-notifications",
        Protocol="sqs",
        Endpoint=queue_arn,
    )

    # Publish the event
    events.put_events(Entries=[{
        "Source": "myapp.orders",
        "DetailType": "OrderCreated",
        "Detail": json.dumps({
            "orderId": "ORD-789",
            "amount": 250.00,
            "customerEmail": "test@example.com"
        }),
        "EventBusName": "orders-bus"
    }])

    # Poll (with long-poll receives) until the notification arrives or
    # the overall deadline expires -- no blind sleep.
    notification = None
    deadline = time.time() + 30
    while time.time() < deadline:
        messages = sqs.receive_message(QueueUrl=queue_url, WaitTimeSeconds=5)
        if "Messages" in messages:
            notification = json.loads(messages["Messages"][0]["Body"])
            break

    assert notification is not None, "No notification received within 30s"
    assert "ORD-789" in notification["Message"]

SQS Testing Patterns

Dead Letter Queue Testing

One of the most critical but often untested patterns is the dead letter queue (DLQ). When a message fails processing multiple times, it should land in the DLQ -- not disappear silently.

# tests/integration/test_sqs_processing.py
import boto3
import pytest
import json
from testcontainers.localstack import LocalStackContainer

@pytest.fixture(scope="module")
def localstack():
    """Start one LocalStack container for the whole test module.

    module scope: container startup is expensive, so it is shared by
    every test in this file and torn down when the module finishes.
    """
    with LocalStackContainer("localstack/localstack:3.0") as ls:
        yield ls

@pytest.fixture
def sqs_client(localstack):
    """boto3 SQS client pointed at the LocalStack endpoint.

    The dummy credentials are deliberate -- LocalStack accepts any
    access key / secret, but boto3 requires them to be set.
    """
    return boto3.client(
        "sqs",
        endpoint_url=localstack.get_url(),
        region_name="us-east-1",
        aws_access_key_id="test",
        aws_secret_access_key="test",
    )

def test_dead_letter_queue_receives_failed_messages(sqs_client):
    """Messages that fail processing N times must land in the DLQ.

    The main queue is created with VisibilityTimeout=0: after a receive the
    message becomes visible again immediately. With the default 30s
    visibility timeout, the back-to-back receives below would find nothing
    after the first one (the message is still in flight), the receive count
    would never exceed maxReceiveCount, and the message would never be
    redriven to the DLQ.
    """
    # Create DLQ
    dlq = sqs_client.create_queue(QueueName="orders-dlq")
    dlq_arn = sqs_client.get_queue_attributes(
        QueueUrl=dlq["QueueUrl"],
        AttributeNames=["QueueArn"]
    )["Attributes"]["QueueArn"]

    # Create main queue with DLQ redrive policy
    main_queue = sqs_client.create_queue(
        QueueName="orders",
        Attributes={
            "RedrivePolicy": json.dumps({
                "deadLetterTargetArn": dlq_arn,
                "maxReceiveCount": "2"
            }),
            # Make a received-but-not-deleted message visible again at once
            # so repeated receive attempts actually accumulate.
            "VisibilityTimeout": "0",
        }
    )

    # Send a message that will fail processing
    sqs_client.send_message(
        QueueUrl=main_queue["QueueUrl"],
        MessageBody='{"orderId": "INVALID"}'
    )

    # Simulate failed processing: receive but never delete. The redrive to
    # the DLQ happens when the receive count EXCEEDS maxReceiveCount, i.e.
    # on the (maxReceiveCount + 1)-th receive attempt -- hence 3 attempts.
    for _ in range(3):
        sqs_client.receive_message(QueueUrl=main_queue["QueueUrl"])

    # Verify the message lands in the DLQ; long-poll instead of sleeping
    dlq_msgs = sqs_client.receive_message(
        QueueUrl=dlq["QueueUrl"],
        WaitTimeSeconds=10,
    )
    assert len(dlq_msgs.get("Messages", [])) == 1
    failed_body = json.loads(dlq_msgs["Messages"][0]["Body"])
    assert failed_body["orderId"] == "INVALID"

Message Ordering and Deduplication (FIFO Queues)

def test_fifo_queue_preserves_message_order(sqs_client):
    """FIFO queues must deliver messages in the order they were sent.

    Each receive long-polls (WaitTimeSeconds=5): a short-poll receive can
    legitimately return an empty response even when messages exist, which
    would silently drop a sequence number and fail the ordering assertion
    for the wrong reason.
    """
    queue = sqs_client.create_queue(
        QueueName="orders.fifo",
        Attributes={
            "FifoQueue": "true",
            "ContentBasedDeduplication": "true"
        }
    )

    # Send messages in a specific order; ordering is only guaranteed
    # within a single MessageGroupId.
    for i in range(5):
        sqs_client.send_message(
            QueueUrl=queue["QueueUrl"],
            MessageBody=json.dumps({"sequence": i}),
            MessageGroupId="order-group-1",
        )

    # Receive and verify order
    received_sequences = []
    for _ in range(5):
        msgs = sqs_client.receive_message(
            QueueUrl=queue["QueueUrl"],
            WaitTimeSeconds=5,
        )
        if "Messages" in msgs:
            body = json.loads(msgs["Messages"][0]["Body"])
            received_sequences.append(body["sequence"])
            # Delete so the group unblocks and the next message is delivered
            sqs_client.delete_message(
                QueueUrl=queue["QueueUrl"],
                ReceiptHandle=msgs["Messages"][0]["ReceiptHandle"]
            )

    assert received_sequences == [0, 1, 2, 3, 4]

def test_fifo_deduplication(sqs_client):
    """FIFO queues must deduplicate messages with the same dedup ID."""
    fifo_queue = sqs_client.create_queue(
        QueueName="dedup-test.fifo",
        Attributes={"FifoQueue": "true"}
    )
    queue_url = fifo_queue["QueueUrl"]

    # Publish an identical payload twice; the second send should be
    # swallowed because both carry the same deduplication ID.
    for _ in range(2):
        sqs_client.send_message(
            QueueUrl=queue_url,
            MessageBody="duplicate message",
            MessageGroupId="group-1",
            MessageDeduplicationId="same-dedup-id",
        )

    # Only a single copy should ever be delivered
    import time
    time.sleep(1)
    delivered = sqs_client.receive_message(
        QueueUrl=queue_url,
        MaxNumberOfMessages=10,
    )
    assert len(delivered.get("Messages", [])) == 1

Testing Asynchronous Patterns

The Polling Pattern

When testing async systems, use polling with a timeout instead of time.sleep():

import time

def wait_for_condition(check_fn, timeout=30, interval=1):
    """Poll ``check_fn`` until it returns a truthy value or ``timeout`` elapses.

    Args:
        check_fn: zero-argument callable; its truthy return value is passed
            through to the caller. Exceptions it raises are treated as
            "condition not yet met" and remembered for the error message.
        timeout: overall deadline in seconds.
        interval: sleep between attempts, in seconds.

    Returns:
        The (truthy) result of ``check_fn``.

    Raises:
        TimeoutError: if no attempt succeeded before the deadline, including
            the last observed exception (if any) for diagnosis.

    Improvements over a naive loop: the condition is always checked at least
    once (even with timeout <= 0), and the final sleep is clamped so the
    function never oversleeps past the deadline by a full interval.
    """
    deadline = time.time() + timeout
    last_error = None
    while True:
        try:
            result = check_fn()
            if result:
                return result
        except Exception as e:  # deliberately broad: condition may fail transiently
            last_error = e
        remaining = deadline - time.time()
        if remaining <= 0:
            break
        # Clamp so we wake up at (not past) the deadline
        time.sleep(min(interval, remaining))
    raise TimeoutError(
        f"Condition not met within {timeout}s. Last error: {last_error}"
    )

# Usage:
def test_async_processing():
    """Publish an order event, then poll until the consumer has processed it."""
    publish_event({"orderId": "ORD-123"})

    # Poll DynamoDB until the order appears
    def _order_is_processed():
        response = dynamodb.get_item(
            TableName="orders", Key={"orderId": {"S": "ORD-123"}}
        )
        status = response.get("Item", {}).get("status", {})
        return status.get("S") == "processed"

    wait_for_condition(_order_is_processed, timeout=30)

The Observer Pattern

For complex event flows, attach a test observer that captures events for later assertion:

class EventObserver:
    """Captures events for testing by subscribing to the event stream.

    Creates its own SQS queue as an observation point; ``drain`` collects
    whatever arrives there and ``assert_received`` checks the capture.
    """

    def __init__(self, sqs_client, queue_name="test-observer"):
        self.sqs = sqs_client
        self.queue = sqs_client.create_queue(QueueName=queue_name)
        self.queue_url = self.queue["QueueUrl"]
        self.captured = []

    def drain(self, timeout=10):
        """Collect all messages received within the timeout period."""
        deadline = time.time() + timeout
        while time.time() < deadline:
            # Long-poll, but never longer than the time we have left
            wait = min(5, int(deadline - time.time()))
            batch = self.sqs.receive_message(
                QueueUrl=self.queue_url,
                WaitTimeSeconds=wait,
                MaxNumberOfMessages=10,
            )
            for message in batch.get("Messages", []):
                self.captured.append(json.loads(message["Body"]))
                self.sqs.delete_message(
                    QueueUrl=self.queue_url,
                    ReceiptHandle=message["ReceiptHandle"],
                )
        return self.captured

    def assert_received(self, event_type, count=1):
        """Assert that a specific event type was received."""
        matching = [e for e in self.captured if e.get("eventType") == event_type]
        assert len(matching) == count, \
            f"Expected {count} '{event_type}' events, found {len(matching)}"

These patterns work across event systems -- SQS, EventBridge, Kafka, RabbitMQ. The specific API calls change, but the testing strategy remains the same: publish, observe, and verify state.