QA Engineer Skills 2026 > QA-2026 > Webhook and SQS/EventBridge Testing

Webhook and SQS/EventBridge Testing

Testing AWS Event-Driven Services

AWS EventBridge, SQS, and SNS are common building blocks for event-driven architectures. Testing them requires verifying event routing rules, queue delivery, and cross-service message propagation.


EventBridge: Event Routing Tests

import json
import boto3
import pytest

class TestEventBridgeIntegration:
    """Test AWS EventBridge event routing and processing.

    The tests publish order events through an EventBridge client and then
    read the target SQS queues with the module's ``poll_sqs`` /
    ``poll_sqs_all`` helpers. All clients are created for ``us-east-1``.
    """

    @pytest.fixture
    def eventbridge(self):
        """Return a boto3 EventBridge (``events``) client."""
        return boto3.client('events', region_name='us-east-1')

    @pytest.fixture
    def sqs_queues(self):
        """Return a mapping of logical queue name to its SQS queue URL."""
        sqs = boto3.client('sqs', region_name='us-east-1')
        return {
            "billing": sqs.get_queue_url(QueueName='billing-queue')['QueueUrl'],
            "shipping": sqs.get_queue_url(QueueName='shipping-queue')['QueueUrl'],
            "fraud-check": sqs.get_queue_url(QueueName='fraud-check-queue')['QueueUrl'],
        }

    @staticmethod
    def _put_events(eventbridge, entries):
        """Publish *entries* and fail fast if EventBridge rejected any of them.

        ``put_events`` does not raise for per-entry failures; it reports them
        through ``FailedEntryCount``. Checking it here keeps a publishing
        problem from surfacing later as a misleading "queue did not receive"
        assertion.
        """
        response = eventbridge.put_events(Entries=entries)
        assert response["FailedEntryCount"] == 0, (
            f"EventBridge rejected one or more entries: {response.get('Entries')}"
        )
        return response

    def test_order_event_routes_to_correct_targets(self, eventbridge, sqs_queues):
        """Verify that order events route to billing and shipping queues."""
        event = {
            "Source": "orders.service",
            "DetailType": "OrderPlaced",
            "Detail": json.dumps({
                "order_id": "ord-route-test",
                "total": 150.00,
                "shipping_method": "express"
            })
        }
        self._put_events(eventbridge, [event])

        # Check billing queue received the event
        billing_msg = poll_sqs(sqs_queues["billing"], timeout=10)
        assert billing_msg is not None, (
            "Billing queue did not receive OrderPlaced event"
        )
        assert json.loads(billing_msg["Body"])["detail"]["order_id"] == "ord-route-test"

        # Check shipping queue received the same event (not just *a* message)
        shipping_msg = poll_sqs(sqs_queues["shipping"], timeout=10)
        assert shipping_msg is not None, (
            "Shipping queue did not receive OrderPlaced event"
        )
        assert json.loads(shipping_msg["Body"])["detail"]["order_id"] == "ord-route-test"

    def test_event_filtering_rules(self, eventbridge, sqs_queues):
        """High-value orders should also route to the fraud-check queue."""
        high_value_event = {
            "Source": "orders.service",
            "DetailType": "OrderPlaced",
            "Detail": json.dumps({
                "order_id": "ord-hv",
                "total": 5000.00
            })
        }
        low_value_event = {
            "Source": "orders.service",
            "DetailType": "OrderPlaced",
            "Detail": json.dumps({
                "order_id": "ord-lv",
                "total": 25.00
            })
        }

        self._put_events(eventbridge, [high_value_event, low_value_event])

        # Fraud queue should only get the high-value order
        fraud_msgs = poll_sqs_all(sqs_queues["fraud-check"], timeout=10)
        fraud_order_ids = [
            json.loads(m["Body"])["detail"]["order_id"] for m in fraud_msgs
        ]
        assert "ord-hv" in fraud_order_ids, "High-value order missing from fraud queue"
        assert "ord-lv" not in fraud_order_ids, "Low-value order incorrectly in fraud queue"

    def test_event_schema_validation(self, eventbridge):
        """Events with missing required fields should be rejected or routed to DLQ."""
        invalid_event = {
            "Source": "orders.service",
            "DetailType": "OrderPlaced",
            "Detail": json.dumps({
                # Missing order_id and total
                "shipping_method": "standard"
            })
        }
        response = eventbridge.put_events(Entries=[invalid_event])
        # EventBridge accepts all events but routing rules may filter, so the
        # publish itself is expected to succeed even for this payload.
        assert response["FailedEntryCount"] == 0, (
            f"EventBridge rejected the entry at publish time: {response.get('Entries')}"
        )
        # TODO: assert that the event did not reach any processing queue
        # or was routed to a validation-error queue. That check depends on
        # routing rules and queues that are not defined in this module.

SQS Polling Helpers

import time
import boto3

def poll_sqs(queue_url: str, timeout: int = 10) -> dict | None:
    """Return the first message received from *queue_url*, or ``None``.

    Repeatedly issues 2-second long-poll receives until a message arrives or
    *timeout* seconds have elapsed. The returned message is deleted from the
    queue before this function returns.
    """
    client = boto3.client('sqs', region_name='us-east-1')
    stop_at = time.time() + timeout

    while time.time() < stop_at:
        batch = client.receive_message(
            QueueUrl=queue_url,
            MaxNumberOfMessages=1,
            WaitTimeSeconds=2,
        ).get("Messages", [])
        if not batch:
            continue

        first = batch[0]
        # Remove it from the queue so later tests do not see this message.
        client.delete_message(
            QueueUrl=queue_url,
            ReceiptHandle=first["ReceiptHandle"],
        )
        return first

    return None

def poll_sqs_all(queue_url: str, timeout: int = 10) -> list[dict]:
    """Collect and delete every message that arrives on *queue_url*.

    Polls with 1-second long-poll receives of up to 10 messages each.
    While nothing has been received yet, polling continues until *timeout*
    seconds have elapsed, so a slowly delivered event is not missed. Once at
    least one message has been collected, the first empty receive is treated
    as "queue drained" and polling stops early.

    Returns:
        The received messages in arrival order; an empty list if nothing
        arrived within *timeout* seconds.
    """
    sqs = boto3.client('sqs', region_name='us-east-1')
    all_messages = []
    # Monotonic clock: the deadline must not move if the wall clock is adjusted.
    deadline = time.monotonic() + timeout

    while time.monotonic() < deadline:
        response = sqs.receive_message(
            QueueUrl=queue_url,
            MaxNumberOfMessages=10,
            WaitTimeSeconds=1,
        )
        messages = response.get("Messages", [])
        if not messages:
            if all_messages:
                # Already got something and the queue is now empty: done.
                break
            # Nothing has arrived yet; keep waiting until the deadline.
            continue
        all_messages.extend(messages)
        for msg in messages:
            # Delete so the messages do not interfere with other tests.
            sqs.delete_message(
                QueueUrl=queue_url,
                ReceiptHandle=msg["ReceiptHandle"]
            )

    return all_messages

Webhook Testing

Webhooks require a different approach: you spin up a temporary HTTP server, register it as a webhook target, trigger the event, and verify the webhook was called.

import json
import threading
import time
import uuid
from http.server import HTTPServer, BaseHTTPRequestHandler

import pytest

class WebhookCapture:
    """Temporary HTTP server that captures webhook calls."""

    def __init__(self, port: int = 0):
        self.received = []
        self._server = None
        self._thread = None
        self.port = port

    def start(self):
        capture = self

        class Handler(BaseHTTPRequestHandler):
            def do_POST(self):
                content_length = int(self.headers.get('Content-Length', 0))
                body = self.rfile.read(content_length)
                capture.received.append({
                    "path": self.path,
                    "headers": dict(self.headers),
                    "body": json.loads(body) if body else None,
                    "timestamp": time.time(),
                })
                self.send_response(200)
                self.end_headers()

            def log_message(self, *args):
                pass  # Suppress request logging

        self._server = HTTPServer(("0.0.0.0", self.port), Handler)
        self.port = self._server.server_address[1]
        self._thread = threading.Thread(target=self._server.serve_forever, daemon=True)
        self._thread.start()
        return self

    def stop(self):
        if self._server:
            self._server.shutdown()

    def wait_for_call(self, timeout: int = 10) -> dict | None:
        deadline = time.time() + timeout
        while time.time() < deadline:
            if self.received:
                return self.received[-1]
            time.sleep(0.1)
        return None

    @property
    def url(self):
        return f"http://localhost:{self.port}"


class TestWebhookDelivery:
    """Test webhook delivery for order events.

    Relies on the ``api_client`` and ``admin_token`` fixtures, which are
    provided outside this class.
    """

    @pytest.fixture
    def webhook_server(self):
        """Yield a running ``WebhookCapture`` and stop it after the test."""
        server = WebhookCapture().start()
        yield server
        server.stop()

    @staticmethod
    def _unique_key(prefix):
        """Return an idempotency key that is unique per test run.

        A fixed key would let a persistent environment treat the order from
        a second run as a replay of the first, so no new event would fire.
        """
        return f"{prefix}-{uuid.uuid4().hex}"

    def test_order_webhook_delivered(self, api_client, webhook_server, admin_token):
        """Creating an order should trigger a webhook to registered URL."""
        # Register webhook
        api_client.post("/api/webhooks", json={
            "url": f"{webhook_server.url}/hooks/orders",
            "events": ["order.created"],
        }, headers={"Authorization": f"Bearer {admin_token}"})

        # Create an order (triggers the webhook)
        api_client.post("/api/orders", json={
            "items": [{"product_id": "prod-1", "quantity": 1}],
            "idempotency_key": self._unique_key("idem-1"),
        }, headers={"Authorization": f"Bearer {admin_token}"})

        # Verify webhook was called
        call = webhook_server.wait_for_call(timeout=10)
        assert call is not None, "Webhook was not delivered within 10s"
        assert call["path"] == "/hooks/orders"
        assert call["body"]["event_type"] == "order.created"
        assert call["body"]["data"]["items"][0]["product_id"] == "prod-1"

    def test_webhook_retry_on_failure(self, api_client, admin_token):
        """Webhooks should be retried when delivery to the target fails.

        The failure is produced by registering a URL on a local port where
        no server is expected to be listening, i.e. an unreachable target
        rather than one that answers with a 5xx status.
        """
        # Register webhook pointing to a non-existent server
        api_client.post("/api/webhooks", json={
            "url": "http://localhost:19999/will-fail",
            "events": ["order.created"],
        }, headers={"Authorization": f"Bearer {admin_token}"})

        # Create an order
        api_client.post("/api/orders", json={
            "items": [{"product_id": "prod-1", "quantity": 1}],
            "idempotency_key": self._unique_key("idem-retry"),
        }, headers={"Authorization": f"Bearer {admin_token}"})

        # Poll the delivery log for up to 15s instead of always sleeping the
        # full retry cycle; the test finishes as soon as retries show up.
        # NOTE(review): the log is not filtered by webhook URL, so failed
        # deliveries from other registrations may be counted - confirm the
        # log schema and narrow this if it exposes the target URL.
        deadline = time.monotonic() + 15
        while True:
            log = api_client.get("/api/webhooks/delivery-log",
                               headers={"Authorization": f"Bearer {admin_token}"})
            deliveries = log.json()["deliveries"]
            failed = [d for d in deliveries if d["status"] == "failed"]
            if len(failed) >= 2 or time.monotonic() >= deadline:
                break
            time.sleep(0.5)
        assert len(failed) >= 2, "Expected at least 2 retry attempts"

    def test_webhook_signature_verification(self, webhook_server, api_client, admin_token):
        """Webhook payloads should include a signature header for verification."""
        api_client.post("/api/webhooks", json={
            "url": f"{webhook_server.url}/hooks/signed",
            "events": ["order.created"],
            "secret": "webhook-secret-123",
        }, headers={"Authorization": f"Bearer {admin_token}"})

        api_client.post("/api/orders", json={
            "items": [{"product_id": "prod-1", "quantity": 1}],
            "idempotency_key": self._unique_key("idem-signed"),
        }, headers={"Authorization": f"Bearer {admin_token}"})

        call = webhook_server.wait_for_call(timeout=10)
        assert call is not None
        # Only the presence of the header is checked; the signature value
        # itself is not validated against the secret here.
        assert "X-Webhook-Signature" in call["headers"], (
            "Webhook signature header is missing"
        )

Key Takeaway

Webhook and SQS/EventBridge testing requires purpose-built infrastructure: temporary HTTP servers for webhook capture, SQS polling helpers for queue verification, and event routing assertions for EventBridge rules. The patterns are reusable across projects -- the WebhookCapture class and SQS polling functions can be extracted into a shared test library.