Event-Driven System Testing
The Fundamental Challenge
Event-driven architectures (SQS, EventBridge, Kafka, SNS) are notoriously difficult to test because there is no synchronous request-response cycle to assert against. When you publish an event, you cannot immediately check the result. The consumer might process it in milliseconds, seconds, or minutes. It might fail silently. It might process it twice.
This asynchronous nature requires a fundamentally different testing approach: instead of "call and assert," you adopt "publish, wait, and verify state."
Testing EventBridge Rules
EventBridge is AWS's serverless event bus. Testing EventBridge means verifying that rules match the correct event patterns and route to the correct targets.
Pattern Matching Verification
# tests/integration/test_eventbridge.py
import boto3
import json
import pytest
def test_eventbridge_rule_matching():
    """Verify that EventBridge rules match expected event patterns."""
    client = boto3.client("events", endpoint_url="http://localhost:4566")  # LocalStack

    # Publish a sample order event onto the bus.
    detail = {
        "orderId": "ORD-TEST",
        "amount": 150.00,
        "region": "us-east-1",
    }
    response = client.put_events(
        Entries=[{
            "Source": "myapp.orders",
            "DetailType": "OrderCreated",
            "Detail": json.dumps(detail),
            "EventBusName": "orders-bus",
        }]
    )
    assert response["FailedEntryCount"] == 0

    # Confirm the rule exists and carries the expected pattern.
    rule = client.describe_rule(
        Name="high-value-orders",
        EventBusName="orders-bus",
    )
    pattern = json.loads(rule["EventPattern"])
    assert pattern["source"] == ["myapp.orders"]
    assert pattern["detail"]["amount"] == [{"numeric": [">=", 100]}]
def test_event_pattern_does_not_match_low_value_orders():
    """Verify that low-value orders are NOT matched by the high-value rule."""
    # Negative test -- proving non-matches is equally important.
    pattern = {
        "source": ["myapp.orders"],
        "detail": {"amount": [{"numeric": [">=", 100]}]},
    }
    low_value_event = {
        "source": "myapp.orders",
        "detail": {
            "orderId": "ORD-SMALL",
            "amount": 25.00,
        },
    }

    # AWS exposes a test-event-pattern API for exactly this purpose.
    client = boto3.client("events", endpoint_url="http://localhost:4566")
    result = client.test_event_pattern(
        EventPattern=json.dumps(pattern),
        Event=json.dumps(low_value_event),
    )
    assert result["Result"] is False
End-to-End Event Flow Testing
def test_order_event_triggers_notification(localstack_clients):
    """Test the full event flow: OrderCreated -> EventBridge -> SNS -> Email.

    An SQS queue subscribed to the SNS topic acts as the test observer, so
    the notification can be asserted on without a real email backend.
    """
    events = localstack_clients["events"]
    sns = localstack_clients["sns"]
    sqs = localstack_clients["sqs"]

    # Create an SQS queue subscribed to the SNS topic
    # (SQS acts as a test observer for the notification).
    queue = sqs.create_queue(QueueName="test-notifications")
    queue_url = queue["QueueUrl"]
    queue_arn = sqs.get_queue_attributes(
        QueueUrl=queue_url,
        AttributeNames=["QueueArn"],
    )["Attributes"]["QueueArn"]

    # Subscribe the SQS queue to the notification topic.
    sns.subscribe(
        TopicArn="arn:aws:sns:us-east-1:000000000000:order-notifications",
        Protocol="sqs",
        Endpoint=queue_arn,
    )

    # Publish the event.
    events.put_events(Entries=[{
        "Source": "myapp.orders",
        "DetailType": "OrderCreated",
        "Detail": json.dumps({
            "orderId": "ORD-789",
            "amount": 250.00,
            "customerEmail": "test@example.com"
        }),
        "EventBusName": "orders-bus"
    }])

    # Poll for the notification instead of a fixed sleep: a hard-coded
    # time.sleep(5) is both slow when delivery is fast and flaky when it
    # is slow. Long-poll the queue until a message arrives or we time out.
    import time
    deadline = time.time() + 30
    messages = {}
    while time.time() < deadline:
        messages = sqs.receive_message(QueueUrl=queue_url, WaitTimeSeconds=5)
        if "Messages" in messages:
            break
    assert "Messages" in messages
    notification = json.loads(messages["Messages"][0]["Body"])
    assert "ORD-789" in notification["Message"]
SQS Testing Patterns
Dead Letter Queue Testing
One of the most critical but often untested patterns is the dead letter queue (DLQ). When a message fails processing multiple times, it should land in the DLQ -- not disappear silently.
# tests/integration/test_sqs_processing.py
import boto3
import pytest
import json
from testcontainers.localstack import LocalStackContainer
@pytest.fixture(scope="module")
def localstack():
    """Start one LocalStack container shared by every test in this module."""
    container = LocalStackContainer("localstack/localstack:3.0")
    with container as running:
        yield running
@pytest.fixture
def sqs_client(localstack):
    """boto3 SQS client pointed at the LocalStack container endpoint."""
    # Dummy credentials: LocalStack accepts any access key / secret.
    connection_kwargs = dict(
        endpoint_url=localstack.get_url(),
        region_name="us-east-1",
        aws_access_key_id="test",
        aws_secret_access_key="test",
    )
    return boto3.client("sqs", **connection_kwargs)
def test_dead_letter_queue_receives_failed_messages(sqs_client):
    """Messages that fail processing N times must land in the DLQ."""
    # Create the DLQ first so its ARN can go into the redrive policy.
    dlq = sqs_client.create_queue(QueueName="orders-dlq")
    dlq_arn = sqs_client.get_queue_attributes(
        QueueUrl=dlq["QueueUrl"],
        AttributeNames=["QueueArn"],
    )["Attributes"]["QueueArn"]

    # Main queue with a redrive policy: after 2 failed receives, move to DLQ.
    # VisibilityTimeout must be 0 here: with the default 30s timeout, a
    # received-but-not-deleted message stays invisible, so the second
    # receive below would return nothing and the receive count would never
    # reach maxReceiveCount.
    main_queue = sqs_client.create_queue(
        QueueName="orders",
        Attributes={
            "VisibilityTimeout": "0",
            "RedrivePolicy": json.dumps({
                "deadLetterTargetArn": dlq_arn,
                "maxReceiveCount": "2"
            })
        }
    )

    # Send a message that will fail processing.
    sqs_client.send_message(
        QueueUrl=main_queue["QueueUrl"],
        MessageBody='{"orderId": "INVALID"}'
    )

    # Simulate 2 failed receive attempts (receive but don't delete).
    for _ in range(2):
        sqs_client.receive_message(QueueUrl=main_queue["QueueUrl"])
        # Not deleting = simulating processing failure

    # Poll until the message shows up in the DLQ. The redrive is triggered
    # by subsequent receive attempts on the main queue (the move happens
    # once the receive count exceeds maxReceiveCount), so keep polling
    # both queues instead of relying on a fixed sleep.
    import time
    deadline = time.time() + 10
    dlq_msgs = {}
    while time.time() < deadline:
        sqs_client.receive_message(QueueUrl=main_queue["QueueUrl"])
        dlq_msgs = sqs_client.receive_message(QueueUrl=dlq["QueueUrl"])
        if dlq_msgs.get("Messages"):
            break
    assert len(dlq_msgs.get("Messages", [])) == 1
    failed_body = json.loads(dlq_msgs["Messages"][0]["Body"])
    assert failed_body["orderId"] == "INVALID"
Message Ordering and Deduplication (FIFO Queues)
def test_fifo_queue_preserves_message_order(sqs_client):
    """FIFO queues must deliver messages in the order they were sent."""
    queue = sqs_client.create_queue(
        QueueName="orders.fifo",
        Attributes={
            "FifoQueue": "true",
            "ContentBasedDeduplication": "true"
        }
    )

    # Send messages in a specific order within a single message group
    # (FIFO ordering is only guaranteed per MessageGroupId).
    for i in range(5):
        sqs_client.send_message(
            QueueUrl=queue["QueueUrl"],
            MessageBody=json.dumps({"sequence": i}),
            MessageGroupId="order-group-1",
        )

    # Receive until all 5 arrive or we time out. A fixed count of 5
    # receive calls (as is often written) silently loses an attempt every
    # time a poll comes back empty; a deadline loop with long polling is
    # reliable.
    import time
    received_sequences = []
    deadline = time.time() + 15
    while len(received_sequences) < 5 and time.time() < deadline:
        msgs = sqs_client.receive_message(
            QueueUrl=queue["QueueUrl"],
            WaitTimeSeconds=2,
        )
        for msg in msgs.get("Messages", []):
            received_sequences.append(json.loads(msg["Body"])["sequence"])
            # Delete so the next message in the group becomes deliverable.
            sqs_client.delete_message(
                QueueUrl=queue["QueueUrl"],
                ReceiptHandle=msg["ReceiptHandle"]
            )
    assert received_sequences == [0, 1, 2, 3, 4]
def test_fifo_deduplication(sqs_client):
    """FIFO queues must deduplicate messages with the same dedup ID."""
    fifo_queue = sqs_client.create_queue(
        QueueName="dedup-test.fifo",
        Attributes={"FifoQueue": "true"},
    )

    # Publish the identical message twice under one deduplication ID.
    duplicate_send = dict(
        QueueUrl=fifo_queue["QueueUrl"],
        MessageBody="duplicate message",
        MessageGroupId="group-1",
        MessageDeduplicationId="same-dedup-id",
    )
    sqs_client.send_message(**duplicate_send)
    sqs_client.send_message(**duplicate_send)

    # Only a single copy should ever be deliverable.
    import time
    time.sleep(1)
    delivered = sqs_client.receive_message(
        QueueUrl=fifo_queue["QueueUrl"],
        MaxNumberOfMessages=10,
    )
    assert len(delivered.get("Messages", [])) == 1
Testing Asynchronous Patterns
The Polling Pattern
When testing async systems, use polling with a timeout instead of a fixed time.sleep(): polling returns as soon as the condition holds, so tests stay fast when the system is quick and reliable when it is slow:
import time
def wait_for_condition(check_fn, timeout=30, interval=1):
    """Poll *check_fn* until it returns a truthy value or *timeout* elapses.

    Exceptions raised by *check_fn* are treated as "not yet" and retried;
    the most recent one is included in the TimeoutError for diagnosis.
    """
    stop_at = time.time() + timeout
    failure = None
    while time.time() < stop_at:
        try:
            outcome = check_fn()
        except Exception as exc:
            failure = exc
        else:
            if outcome:
                return outcome
        time.sleep(interval)
    raise TimeoutError(
        f"Condition not met within {timeout}s. Last error: {failure}"
    )
# Usage:
def test_async_processing():
    """Publish an order event, then poll until the consumer has stored it."""
    publish_event({"orderId": "ORD-123"})

    # Poll DynamoDB until the order appears with status "processed".
    def order_is_processed():
        response = dynamodb.get_item(
            TableName="orders", Key={"orderId": {"S": "ORD-123"}}
        )
        status = response.get("Item", {}).get("status", {})
        return status.get("S") == "processed"

    wait_for_condition(order_is_processed, timeout=30)
The Observer Pattern
For complex event flows, attach a test observer that captures events for later assertion:
class EventObserver:
    """Captures events for testing by subscribing to the event stream.

    An SQS queue is created on construction; ``drain`` pulls everything
    that arrives within a window, and ``assert_received`` checks counts
    by event type.
    """

    def __init__(self, sqs_client, queue_name="test-observer"):
        self.sqs = sqs_client
        self.queue = sqs_client.create_queue(QueueName=queue_name)
        self.queue_url = self.queue["QueueUrl"]
        self.captured = []

    def drain(self, timeout=10):
        """Collect all messages received within the timeout period."""
        stop_at = time.time() + timeout
        while time.time() < stop_at:
            # Long-poll, but never longer than the time we have left.
            batch = self.sqs.receive_message(
                QueueUrl=self.queue_url,
                WaitTimeSeconds=min(5, int(stop_at - time.time())),
                MaxNumberOfMessages=10,
            )
            for message in batch.get("Messages", []):
                self.captured.append(json.loads(message["Body"]))
                self.sqs.delete_message(
                    QueueUrl=self.queue_url,
                    ReceiptHandle=message["ReceiptHandle"],
                )
        return self.captured

    def assert_received(self, event_type, count=1):
        """Assert that a specific event type was received."""
        matching = [e for e in self.captured if e.get("eventType") == event_type]
        assert len(matching) == count, \
            f"Expected {count} '{event_type}' events, found {len(matching)}"
These patterns work across event systems -- SQS, EventBridge, Kafka, RabbitMQ. The specific API calls change, but the testing strategy remains the same: publish, observe, and verify state.