Webhook and SQS/EventBridge Testing
Testing AWS Event-Driven Services
AWS EventBridge, SQS, and SNS are common building blocks for event-driven architectures. Testing them requires verifying event routing rules, queue delivery, and cross-service message propagation.
EventBridge: Event Routing Tests
import json
import boto3
import pytest
class TestEventBridgeIntegration:
    """Test AWS EventBridge event routing and processing.

    These are integration tests: each one publishes events through a boto3
    EventBridge client and then reads the SQS queues that the routing rules
    are expected to target.
    """

    @pytest.fixture
    def eventbridge(self):
        """Return a boto3 EventBridge client for ``us-east-1``."""
        return boto3.client('events', region_name='us-east-1')

    @pytest.fixture
    def sqs_queues(self):
        """Return a mapping of logical queue name to SQS queue URL."""
        sqs = boto3.client('sqs', region_name='us-east-1')
        return {
            "billing": sqs.get_queue_url(QueueName='billing-queue')['QueueUrl'],
            "shipping": sqs.get_queue_url(QueueName='shipping-queue')['QueueUrl'],
            "fraud-check": sqs.get_queue_url(QueueName='fraud-check-queue')['QueueUrl'],
        }

    @staticmethod
    def _publish(eventbridge, entries):
        """Publish *entries* and fail the test if any entry was not accepted.

        ``put_events`` reports per-entry failures in its response rather
        than raising, so an unchecked call can hide a publish failure behind
        a confusing "queue did not receive" assertion later on.
        """
        response = eventbridge.put_events(Entries=entries)
        assert response.get("FailedEntryCount", 0) == 0, (
            f"EventBridge did not accept all entries: {response.get('Entries')}"
        )
        return response

    @staticmethod
    def _order_id(message):
        """Return the ``detail.order_id`` of an SQS message, or ``None``."""
        return json.loads(message["Body"]).get("detail", {}).get("order_id")

    @classmethod
    def _wait_for_order(cls, queue_url, order_id, timeout=10):
        """Poll *queue_url* until a message for *order_id* arrives.

        Messages for other orders (for example, ones left behind by another
        test that published to the same queue) are consumed and skipped
        instead of being mistaken for the expected event.

        Returns the matching message, or ``None`` if none arrived within
        *timeout* seconds.
        """
        import time

        deadline = time.monotonic() + timeout
        while time.monotonic() < deadline:
            message = poll_sqs(queue_url, timeout=2)
            if message is not None and cls._order_id(message) == order_id:
                return message
        return None

    def test_order_event_routes_to_correct_targets(self, eventbridge, sqs_queues):
        """Verify that order events route to billing and shipping queues."""
        event = {
            "Source": "orders.service",
            "DetailType": "OrderPlaced",
            "Detail": json.dumps({
                "order_id": "ord-route-test",
                "total": 150.00,
                "shipping_method": "express"
            })
        }
        self._publish(eventbridge, [event])

        # Check billing queue received the event (matched by order id)
        billing_msg = self._wait_for_order(
            sqs_queues["billing"], "ord-route-test", timeout=10
        )
        assert billing_msg is not None, (
            "Billing queue did not receive OrderPlaced event"
        )

        # Check shipping queue received the same event
        shipping_msg = self._wait_for_order(
            sqs_queues["shipping"], "ord-route-test", timeout=10
        )
        assert shipping_msg is not None, (
            "Shipping queue did not receive OrderPlaced event"
        )

    def test_event_filtering_rules(self, eventbridge, sqs_queues):
        """High-value orders should also route to the fraud-check queue."""
        high_value_event = {
            "Source": "orders.service",
            "DetailType": "OrderPlaced",
            "Detail": json.dumps({
                "order_id": "ord-hv",
                "total": 5000.00
            })
        }
        low_value_event = {
            "Source": "orders.service",
            "DetailType": "OrderPlaced",
            "Detail": json.dumps({
                "order_id": "ord-lv",
                "total": 25.00
            })
        }
        self._publish(eventbridge, [high_value_event, low_value_event])

        # Fraud queue should only get the high-value order
        fraud_msgs = poll_sqs_all(sqs_queues["fraud-check"], timeout=10)
        fraud_order_ids = [self._order_id(m) for m in fraud_msgs]
        assert "ord-hv" in fraud_order_ids, "High-value order missing from fraud queue"
        assert "ord-lv" not in fraud_order_ids, "Low-value order incorrectly in fraud queue"

    def test_event_schema_validation(self, eventbridge, sqs_queues):
        """Events with missing required fields should be rejected or routed to DLQ."""
        invalid_event = {
            "Source": "orders.service",
            "DetailType": "OrderPlaced",
            "Detail": json.dumps({
                # Missing order_id and total
                "shipping_method": "standard"
            })
        }
        eventbridge.put_events(Entries=[invalid_event])

        # EventBridge accepts all events but routing rules may filter.
        # No validation-error queue is named in this file, so only the
        # negative half is asserted: the event must not reach any
        # processing queue. The invalid event is recognised by its missing
        # order_id, so messages belonging to other tests are ignored.
        for name, queue_url in sqs_queues.items():
            delivered = poll_sqs_all(queue_url, timeout=5)
            leaked = [m for m in delivered if self._order_id(m) is None]
            assert not leaked, (
                f"Event without order_id was delivered to the {name} queue"
            )
SQS Polling Helpers
import time
import boto3
def poll_sqs(queue_url: str, timeout: int = 10) -> dict | None:
    """Fetch one message from an SQS queue, giving up after *timeout* seconds.

    The queue is long-polled in 2-second slices. The first message seen is
    deleted from the queue (so it cannot leak into other tests) and returned.
    Returns ``None`` when the deadline passes with nothing received.
    """
    client = boto3.client('sqs', region_name='us-east-1')
    give_up_at = time.time() + timeout

    while True:
        if time.time() >= give_up_at:
            return None

        batch = client.receive_message(
            QueueUrl=queue_url,
            MaxNumberOfMessages=1,
            WaitTimeSeconds=2,
        ).get("Messages", [])
        if not batch:
            continue

        first = batch[0]
        # Remove the message before handing it back to the caller.
        client.delete_message(
            QueueUrl=queue_url,
            ReceiptHandle=first["ReceiptHandle"],
        )
        return first
def poll_sqs_all(queue_url: str, timeout: int = 10) -> list[dict]:
    """Collect and delete the messages that arrive on an SQS queue.

    Waits up to *timeout* seconds for the first message to show up, then
    keeps draining until a receive comes back empty or the deadline passes.
    Every message returned has already been deleted from the queue.

    Args:
        queue_url: URL of the queue to drain.
        timeout: Overall time budget in seconds.

    Returns:
        The received messages in arrival order; empty if nothing arrived
        before the deadline.
    """
    sqs = boto3.client('sqs', region_name='us-east-1')
    all_messages = []
    deadline = time.monotonic() + timeout
    while time.monotonic() < deadline:
        response = sqs.receive_message(
            QueueUrl=queue_url,
            MaxNumberOfMessages=10,
            WaitTimeSeconds=1,
        )
        messages = response.get("Messages", [])
        if not messages:
            if all_messages:
                # The queue has been drained after delivering something.
                break
            # Nothing has arrived yet. Delivery is asynchronous, so keep
            # waiting for the rest of the timeout instead of giving up
            # after a single one-second poll.
            continue
        all_messages.extend(messages)
        for msg in messages:
            sqs.delete_message(
                QueueUrl=queue_url,
                ReceiptHandle=msg["ReceiptHandle"]
            )
    return all_messages
Webhook Testing
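Webhooks require a different approach: you spin up a temporary HTTP server, register it as a webhook target, trigger the event, and verify the webhook was called. The capture server must be reachable from the service under test at the URL you register, so a `localhost` URL only works when both run on the same host.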
from http.server import HTTPServer, BaseHTTPRequestHandler
import threading
import json
class WebhookCapture:
"""Temporary HTTP server that captures webhook calls."""
def __init__(self, port: int = 0):
self.received = []
self._server = None
self._thread = None
self.port = port
def start(self):
capture = self
class Handler(BaseHTTPRequestHandler):
def do_POST(self):
content_length = int(self.headers.get('Content-Length', 0))
body = self.rfile.read(content_length)
capture.received.append({
"path": self.path,
"headers": dict(self.headers),
"body": json.loads(body) if body else None,
"timestamp": time.time(),
})
self.send_response(200)
self.end_headers()
def log_message(self, *args):
pass # Suppress request logging
self._server = HTTPServer(("0.0.0.0", self.port), Handler)
self.port = self._server.server_address[1]
self._thread = threading.Thread(target=self._server.serve_forever, daemon=True)
self._thread.start()
return self
def stop(self):
if self._server:
self._server.shutdown()
def wait_for_call(self, timeout: int = 10) -> dict | None:
deadline = time.time() + timeout
while time.time() < deadline:
if self.received:
return self.received[-1]
time.sleep(0.1)
return None
@property
def url(self):
return f"http://localhost:{self.port}"
class TestWebhookDelivery:
    """Test webhook delivery for order events.

    Each test registers a webhook through the API, creates an order to
    trigger it, and then inspects what was delivered (or logged).
    """

    @pytest.fixture
    def webhook_server(self):
        """Yield a running ``WebhookCapture`` and stop it after the test."""
        server = WebhookCapture().start()
        yield server
        server.stop()

    @staticmethod
    def _auth(admin_token):
        """Return the bearer-token header sent with every API call here."""
        return {"Authorization": f"Bearer {admin_token}"}

    def test_order_webhook_delivered(self, api_client, webhook_server, admin_token):
        """Creating an order should trigger a webhook to registered URL."""
        # Register webhook
        api_client.post("/api/webhooks", json={
            "url": f"{webhook_server.url}/hooks/orders",
            "events": ["order.created"],
        }, headers=self._auth(admin_token))

        # Create an order (triggers the webhook)
        api_client.post("/api/orders", json={
            "items": [{"product_id": "prod-1", "quantity": 1}],
            "idempotency_key": "idem-1",
        }, headers=self._auth(admin_token))

        # Verify webhook was called
        call = webhook_server.wait_for_call(timeout=10)
        assert call is not None, "Webhook was not delivered within 10s"
        assert call["path"] == "/hooks/orders"
        assert call["body"]["event_type"] == "order.created"
        assert call["body"]["data"]["items"][0]["product_id"] == "prod-1"

    def test_webhook_retry_on_failure(self, api_client, admin_token):
        """Webhooks should be retried when delivery to the target fails."""
        # Register webhook pointing to a non-existent server, so every
        # delivery attempt fails at the connection stage.
        api_client.post("/api/webhooks", json={
            "url": "http://localhost:19999/will-fail",
            "events": ["order.created"],
        }, headers=self._auth(admin_token))

        # Create an order
        api_client.post("/api/orders", json={
            "items": [{"product_id": "prod-1", "quantity": 1}],
            "idempotency_key": "idem-retry",
        }, headers=self._auth(admin_token))

        # Poll the delivery log for up to 15s (the retry cycle) instead of
        # always sleeping the full 15s, so the test finishes as soon as the
        # retries have been recorded.
        # NOTE(review): the log is not filtered by webhook URL because its
        # entry schema is not visible here; failed entries left by other
        # tests would also be counted - confirm and filter if possible.
        deadline = time.time() + 15
        while True:
            log = api_client.get("/api/webhooks/delivery-log",
                                 headers=self._auth(admin_token))
            deliveries = log.json()["deliveries"]
            failed = [d for d in deliveries if d["status"] == "failed"]
            if len(failed) >= 2 or time.time() >= deadline:
                break
            time.sleep(0.5)
        assert len(failed) >= 2, "Expected at least 2 retry attempts"

    def test_webhook_signature_verification(self, webhook_server, api_client, admin_token):
        """Webhook payloads should include a signature header for verification."""
        api_client.post("/api/webhooks", json={
            "url": f"{webhook_server.url}/hooks/signed",
            "events": ["order.created"],
            "secret": "webhook-secret-123",
        }, headers=self._auth(admin_token))

        api_client.post("/api/orders", json={
            "items": [{"product_id": "prod-1", "quantity": 1}],
            "idempotency_key": "idem-signed",
        }, headers=self._auth(admin_token))

        call = webhook_server.wait_for_call(timeout=10)
        assert call is not None

        # HTTP header names are case-insensitive and the captured dict keeps
        # whatever casing the sender used, so compare in lower case.
        header_names = {name.lower() for name in call["headers"]}
        assert "x-webhook-signature" in header_names, (
            "Webhook signature header is missing"
        )
Key Takeaway
Webhook and SQS/EventBridge testing requires purpose-built infrastructure: temporary HTTP servers for webhook capture, SQS polling helpers for queue verification, and event routing assertions for EventBridge rules. The patterns are reusable across projects -- the WebhookCapture class and SQS polling functions can be extracted into a shared test library.