Asynchronous Testing Patterns
The Pattern Matrix
Asynchronous testing requires different strategies depending on the system architecture. This reference covers the five core patterns for testing event-driven and async systems.
Pattern 1: Poll and Wait
The simplest async pattern: repeatedly check for the expected state until it appears or a timeout expires.
def wait_for_condition(check_fn, timeout=10, interval=0.5, message="Condition not met"):
    """Poll ``check_fn`` until it returns a truthy value or ``timeout`` elapses.

    Args:
        check_fn: Zero-argument callable; polling stops at its first truthy result.
        timeout: Maximum number of seconds to keep polling.
        interval: Seconds to sleep between polls.
        message: Prefix for the failure message.

    Returns:
        The first truthy value returned by ``check_fn``.

    Raises:
        pytest's failure outcome (via ``pytest.fail``) if the condition is
        still falsy when the timeout expires.
    """
    # Use a monotonic clock: time.time() can jump (NTP sync, manual changes),
    # which would silently stretch or shorten the timeout.
    deadline = time.monotonic() + timeout
    while True:
        # Always check at least once, even when timeout <= 0.
        last_result = check_fn()
        if last_result:
            return last_result
        remaining = deadline - time.monotonic()
        if remaining <= 0:
            break
        # Never sleep past the deadline; the loop then performs one final check
        # so a change that landed during the last sleep is not missed.
        time.sleep(min(interval, remaining))
    pytest.fail(f"{message} (timeout={timeout}s, last_result={last_result})")
# Usage
def test_order_processed(kafka_producer, db):
    """Publishing order.created should eventually persist the order as pending."""
    order_id = "ord-1"
    kafka_producer.send("order.created", {"order_id": order_id})

    def fetch_order():
        return db.get_order(order_id)

    # Block until the consumer has written the order (or fail after 10 s).
    processed = wait_for_condition(
        fetch_order, timeout=10, message="Order was not processed"
    )
    assert processed.status == "pending"
When to use: Simple event-to-state verification. One event, one expected state change.
Pitfalls:
- Setting timeout too low causes flaky tests
- Setting the interval too high can miss a transient state (the state changes and then changes again between two polls)
- On timeout, the basic version reports nothing about why the condition was not met
Enhanced Version with Diagnostics
def wait_for_condition(check_fn, timeout=10, interval=0.5, message="", diagnostics_fn=None):
    """Poll ``check_fn`` until truthy or timeout, reporting diagnostics on failure.

    Args:
        check_fn: Zero-argument callable; polling stops at its first truthy result.
        timeout: Maximum number of seconds to keep polling.
        interval: Seconds to sleep between polls.
        message: First line of the failure message.
        diagnostics_fn: Optional zero-argument callable invoked once on timeout;
            its result is appended to the failure message.

    Returns:
        The first truthy value returned by ``check_fn``.

    Raises:
        pytest's failure outcome (via ``pytest.fail``) on timeout, with the
        attempt count, last result and any diagnostics in the message.
    """
    # Monotonic clock so wall-clock adjustments cannot distort the timeout.
    deadline = time.monotonic() + timeout
    attempts = 0
    while True:
        # Always check at least once, even when timeout <= 0.
        attempts += 1
        result = check_fn()
        if result:
            return result
        remaining = deadline - time.monotonic()
        if remaining <= 0:
            break
        # Cap the sleep at the deadline; the loop then makes one final check.
        time.sleep(min(interval, remaining))

    # Gather diagnostics on failure
    diag = ""
    if diagnostics_fn:
        # A broken diagnostics hook must not mask the real failure (the timeout).
        try:
            diag = f"\nDiagnostics: {diagnostics_fn()}"
        except Exception as exc:
            diag = f"\nDiagnostics unavailable: {exc!r}"
    pytest.fail(
        f"{message}\n"
        f"Timeout: {timeout}s, Attempts: {attempts}, "
        f"Interval: {interval}s, Last result: {result!r}{diag}"
    )
Pattern 2: Callback Capture
Spin up a temporary HTTP server, register it as a webhook target, and capture incoming calls.
# See webhook-and-sqs-testing.md for the WebhookCapture implementation
def test_payment_webhook(webhook_server, payment_service):
    """A completed payment must produce a call to the registered webhook."""
    target = f"{webhook_server.url}/payment-complete"
    payment_service.register_webhook(url=target, events=["payment.completed"])

    payment_service.process_payment(order_id="ord-1", amount=99.99)

    # Block for up to 15 s waiting for the service to call us back.
    received = webhook_server.wait_for_call(timeout=15)
    assert received is not None, "Payment webhook not received"

    payload = received["body"]
    assert payload["order_id"] == "ord-1"
    assert payload["amount"] == 99.99
    assert payload["status"] == "completed"
When to use: Webhook testing. Verify that an external system calls your endpoint.
Pattern 3: Queue Drain
Read from a message queue until all expected events have arrived or timeout.
def drain_queue(queue_url, expected_count, timeout=30, poll_timeout=2):
    """Collect messages from a queue until ``expected_count`` arrive or ``timeout`` elapses.

    Args:
        queue_url: Queue to read from; passed straight to ``poll_sqs``.
        expected_count: Stop polling once at least this many messages were collected.
        timeout: Overall time budget in seconds.
        poll_timeout: Per-call wait passed to ``poll_sqs`` as its ``timeout``.

    Returns:
        A flat list of the received messages. It may be shorter than
        ``expected_count`` on timeout, or longer if the last poll returned
        more messages than were still needed. Extra messages are kept so
        callers can detect duplicates.
    """
    messages = []
    deadline = time.monotonic() + timeout
    while len(messages) < expected_count and time.monotonic() < deadline:
        batch = poll_sqs(queue_url, timeout=poll_timeout)
        if not batch:
            continue
        # Presumably poll_sqs returns a list of messages (SQS ReceiveMessage can
        # return up to 10 per call). Flatten it so len(messages) counts messages,
        # not polls; a single non-list message is still appended as before.
        if isinstance(batch, (list, tuple)):
            messages.extend(batch)
        else:
            messages.append(batch)
    return messages
# Usage
def test_order_generates_three_events(api_client, sqs_queues):
    """One order creation must fan out an event to billing, shipping and inventory."""
    api_client.post("/api/orders", json=ORDER_PAYLOAD)

    downstream = ("billing", "shipping", "inventory")

    # Drain each downstream queue in turn (up to 10 s apiece).
    received = {
        name: drain_queue(sqs_queues[name], expected_count=1, timeout=10)
        for name in downstream
    }

    for name in downstream:
        assert len(received[name]) == 1, f"Missing {name} event"
When to use: Event-driven systems where one action produces events to multiple queues.
Pattern 4: Trace Correlation
Use a correlation ID to trace an event chain across multiple services.
import uuid
def test_order_flow_end_to_end(api_client, trace_store):
    """Trace an order through the entire event pipeline via its correlation ID.

    Polls the trace store until every expected event type for this correlation
    ID has been recorded, instead of sleeping a fixed 15 s. It then checks the
    relative order of the key events.
    """
    correlation_id = str(uuid.uuid4())
    expected_events = {
        "order.created",
        "payment.initiated",
        "payment.completed",
        "inventory.reserved",
        "shipping.label_created",
        "notification.sent",
    }

    # Initiate the flow
    response = api_client.post("/api/orders", json={
        **ORDER_PAYLOAD,
        "correlation_id": correlation_id,
    })
    assert response.status_code == 201

    def chain_complete():
        # Return the recorded event types once the chain is complete;
        # returning None keeps wait_for_condition polling.
        types = [t["event_type"] for t in trace_store.get_traces(correlation_id)]
        return types if expected_events.issubset(types) else None

    # Poll instead of a hard-coded sleep: the test finishes as soon as the
    # chain completes and still gives up after the same 15-second budget.
    event_types = wait_for_condition(
        chain_complete,
        timeout=15,
        message=(
            f"Event chain incomplete for correlation_id={correlation_id}; "
            f"expected {sorted(expected_events)}"
        ),
    )

    # Verify ordering. NOTE(review): assumes get_traces returns events in the
    # order they occurred; confirm against the trace store implementation.
    order_idx = event_types.index("order.created")
    payment_idx = event_types.index("payment.completed")
    shipping_idx = event_types.index("shipping.label_created")
    assert order_idx < payment_idx < shipping_idx, (
        f"Events out of order: order@{order_idx}, "
        f"payment@{payment_idx}, shipping@{shipping_idx}"
    )
When to use: Complex flows spanning multiple services. Verifying the entire chain completed correctly.
Pattern 5: Snapshot Comparison
Compare database state before and after an event to verify the event's side effects.
def test_inventory_updated_after_order(db, kafka_producer):
    """Order event should decrement inventory by the ordered quantity.

    Snapshots the product's stock, publishes an order event, waits for the
    stock to change, then compares the new value against the snapshot.
    """
    product_id = "prod-789"
    ordered_qty = 3

    # Snapshot before. Copy the plain value so later reads cannot alter the
    # baseline. The expected result is derived from this snapshot rather than
    # a hard-coded seed value, so the test survives fixture changes.
    before = db.get_inventory(product_id)
    baseline = before.quantity
    assert baseline >= ordered_qty, (
        f"Not enough seeded stock for {product_id}: {baseline}"
    )
    expected = baseline - ordered_qty

    # Send order event
    kafka_producer.send("order.created", {
        "order_id": "ord-snap",
        "items": [{"product_id": product_id, "quantity": ordered_qty}],
    })

    # Wait for any change first, then assert the exact value below, so that a
    # wrong decrement reports the actual quantity instead of a bare timeout.
    wait_for_condition(
        lambda: db.get_inventory(product_id).quantity != baseline,
        timeout=10,
        message="Inventory was not updated",
    )

    # Snapshot after
    after = db.get_inventory(product_id)
    assert after.quantity == expected, (
        f"Expected {expected} ({baseline} - {ordered_qty}), got {after.quantity}"
    )
When to use: State verification. Confirming that events correctly modify persistent state.
Pattern Selection Guide
| Pattern | When to Use | Complexity | Reliability |
|---|---|---|---|
| Poll and wait | Simple state check after event | Low | High |
| Callback capture | Webhook verification | Medium | High |
| Queue drain | Multi-queue event verification | Medium | Medium |
| Trace correlation | Cross-service event chains | High | Medium |
| Snapshot comparison | Database state verification | Low | High |
Common Pitfalls in Async Testing
Pitfall 1: Hardcoded sleep()
# BAD: hardcoded sleep
# Too short and the test flakes under load; too long and every run pays the
# full delay even when processing finished in milliseconds.
# `event` stands for whatever payload the surrounding test builds.
kafka_producer.send("order.created", event)
time.sleep(5)  # Hope it processed in 5 seconds
order = db.get_order("ord-1")
assert order is not None
# GOOD: poll with timeout
# Returns as soon as get_order yields a truthy value; fails only after 10 s.
kafka_producer.send("order.created", event)
order = wait_for_condition(
    lambda: db.get_order("ord-1"),
    timeout=10
)
# Redundant with wait_for_condition (it fails on timeout), but documents intent.
assert order is not None
Pitfall 2: Not Cleaning Up Queues Between Tests
@pytest.fixture(autouse=True)
def clean_queues(sqs_queues):
    """Empty every test queue up front so leftover messages cannot leak between tests.

    Each queue is drained for at most 2 seconds, or until 100 messages are read.
    """
    leftover_queues = sqs_queues.values()
    for url in leftover_queues:
        drain_queue(url, expected_count=100, timeout=2)
    yield
Pitfall 3: Ignoring Message Ordering
When testing ordering, verify with indexed assertions, not just presence checks. Index comparisons are only meaningful if the event source returns events in the order they occurred:
# BAD: only checks presence
# Passes even if payment.completed was recorded before order.created.
assert "order.created" in events
assert "payment.completed" in events
# GOOD: checks order
# Assuming `events` is a list, .index raises ValueError when an event is
# missing, so this line also covers the presence check.
assert events.index("order.created") < events.index("payment.completed")
Key Takeaway
Asynchronous testing requires explicit patterns for waiting, capturing, and verifying. The five patterns (poll-and-wait, callback capture, queue drain, trace correlation, snapshot comparison) cover the most common async testing scenarios. Choose based on system architecture, and verify not just that events arrive, but that they arrive in the correct order with the correct content.