# Structured Logging Best Practices

## Why Structured Logging Matters for QA
Unstructured logs are human-readable but machine-hostile. When a production incident occurs at 3 AM, you need to query logs programmatically -- filter by service, time range, error type, and user ID. Structured logging outputs events in a parseable format (JSON), enabling automated analysis, filtering, and correlation that free-text logs cannot reliably support.
## Unstructured vs. Structured Logging
```
# BAD: Unstructured -- human-readable, machine-hostile
2026-01-15 14:23:45 ERROR Failed to process order for user john@example.com: timeout after 30s

# GOOD: Structured JSON -- machine-parseable, human-readable with tooling
{
  "timestamp": "2026-01-15T14:23:45.123Z",
  "level": "error",
  "service": "order-service",
  "event": "order_processing_failed",
  "user_id": "usr_42",
  "order_id": "ord_789",
  "error_type": "timeout",
  "timeout_seconds": 30,
  "downstream_service": "payment-service",
  "trace_id": "abc123def456",
  "span_id": "span_789",
  "environment": "production",
  "version": "2.4.1"
}
```
The structured version enables queries like:
- "Show all timeout errors in the last hour for the payment-service dependency"
- "Count errors by type for user usr_42"
- "Correlate this error with the trace abc123def456 to see the full request flow"
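Each of these queries can be expressed against JSON log lines with nothing more than the standard library. A minimal sketch, using inline sample data and the field names from the example above:

```python
import json
from collections import Counter

# Sample log lines as they would appear on stdout (one JSON object per line).
raw_logs = """
{"level": "error", "event": "order_processing_failed", "error_type": "timeout", "downstream_service": "payment-service", "user_id": "usr_42", "trace_id": "abc123def456"}
{"level": "info", "event": "order_processing_started", "user_id": "usr_42", "trace_id": "abc123def456"}
{"level": "error", "event": "order_processing_failed", "error_type": "validation", "user_id": "usr_99", "trace_id": "xyz"}
""".strip()

logs = [json.loads(line) for line in raw_logs.splitlines()]

# "Show all timeout errors for the payment-service dependency"
timeouts = [l for l in logs
            if l.get("error_type") == "timeout"
            and l.get("downstream_service") == "payment-service"]

# "Count errors by type for user usr_42"
by_type = Counter(l["error_type"] for l in logs
                  if l["level"] == "error" and l.get("user_id") == "usr_42")

# "Correlate with trace abc123def456 to see the full request flow"
trace = [l for l in logs if l.get("trace_id") == "abc123def456"]
```

In production you would run the same filters in your log backend's query language, but the point stands: every filter keys off a named field, not a regex over free text.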
## Implementing Structured Logging (Python)
```python
# structured_logging_setup.py
import logging

import structlog

# Configure structlog for JSON output
structlog.configure(
    processors=[
        structlog.contextvars.merge_contextvars,      # thread-safe context
        structlog.processors.add_log_level,           # add "level" field
        structlog.processors.StackInfoRenderer(),     # include stack traces
        structlog.dev.set_exc_info,                   # attach exception info
        structlog.processors.TimeStamper(fmt="iso"),  # ISO 8601 timestamps
        structlog.processors.JSONRenderer(),          # output as JSON
    ],
    wrapper_class=structlog.make_filtering_bound_logger(logging.INFO),
    context_class=dict,
    logger_factory=structlog.PrintLoggerFactory(),
)

log = structlog.get_logger()

# calculate_total, charge_payment, reserve_inventory, elapsed, and the
# exception types below are stand-ins for your application code.
def process_order(user_id: str, order_id: str, items: list):
    # Bind context that will appear in ALL subsequent log entries
    log_ctx = log.bind(user_id=user_id, order_id=order_id, item_count=len(items))
    log_ctx.info("order_processing_started")
    try:
        total = calculate_total(items)
        log_ctx.info("order_total_calculated", total_cents=total)
        payment_result = charge_payment(user_id, total)
        log_ctx.info("payment_processed",
                     payment_id=payment_result.id,
                     payment_method=payment_result.method)
        reserve_inventory(items)
        log_ctx.info("inventory_reserved")
        log_ctx.info("order_processing_completed", duration_ms=elapsed())
    except PaymentTimeoutError as e:
        log_ctx.error("payment_timeout",
                      downstream_service="payment-service",
                      timeout_seconds=e.timeout,
                      retry_count=e.retries)
        raise
    except InsufficientInventoryError as e:
        log_ctx.warning("inventory_insufficient",
                        missing_items=e.missing_items,
                        available=e.available)
        raise
```
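The `merge_contextvars` processor in the configuration above rides on Python's stdlib `contextvars` module. A stripped-down illustration of the mechanism (not structlog's actual API): fields bound once for a request get merged into every entry emitted afterwards, without passing a logger through every call.

```python
import contextvars
import json

# One ContextVar holds the per-request fields; structlog's
# bind_contextvars/merge_contextvars do essentially this under the hood.
_log_context: contextvars.ContextVar[dict] = contextvars.ContextVar(
    "log_context", default={})

def bind(**fields):
    """Merge fields into the current context (copy-on-write, never mutate)."""
    _log_context.set({**_log_context.get(), **fields})

def log(event: str, **fields) -> str:
    """Render one JSON log line: bound context first, then call-site fields."""
    entry = {**_log_context.get(), "event": event, **fields}
    return json.dumps(entry)

# Bind once at request entry; every later log call carries the context.
bind(request_id="req_1", user_id="usr_1")
line = log("request_received")
```

Because `contextvars` is task-local, this pattern is safe under both threads and asyncio, which is why it is preferred over binding to a single logger instance in concurrent services.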
## Node.js/TypeScript Example
```typescript
// structured-logger.ts
import pino from 'pino';

const logger = pino({
  level: process.env.LOG_LEVEL || 'info',
  formatters: {
    level: (label) => ({ level: label }),
  },
  timestamp: pino.stdTimeFunctions.isoTime,
  base: {
    service: 'order-service',
    version: process.env.APP_VERSION,
    environment: process.env.NODE_ENV,
  },
});

export function processOrder(userId: string, orderId: string, items: Item[]) {
  const orderLog = logger.child({ userId, orderId, itemCount: items.length });
  orderLog.info('order_processing_started');
  try {
    const total = calculateTotal(items);
    orderLog.info({ totalCents: total }, 'order_total_calculated');
    const payment = chargePayment(userId, total);
    orderLog.info({ paymentId: payment.id, method: payment.method }, 'payment_processed');
    orderLog.info('order_processing_completed');
  } catch (err) {
    orderLog.error({ err, downstreamService: 'payment-service' }, 'order_processing_failed');
    throw err;
  }
}
```
## Essential Fields for Every Log Entry
| Field | Required? | Purpose |
|---|---|---|
| `timestamp` | Yes | When the event occurred (ISO 8601) |
| `level` | Yes | Severity (debug, info, warn, error) |
| `service` | Yes | Which service emitted the log |
| `event` | Yes | Machine-readable event name (snake_case) |
| `trace_id` | Yes | Correlation with distributed traces |
| `span_id` | Recommended | Correlation with a specific trace span |
| `environment` | Recommended | production, staging, development |
| `version` | Recommended | Application version for deployment correlation |
| `user_id` | When available | User attribution (hashed for PII compliance) |
| `request_id` | Recommended | Correlation within a single request |
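A field contract like the table above is only useful if it is enforced. A sketch of a validator that a CI job or log-pipeline check might run against sampled entries (field and level names follow the table; the function name is illustrative):

```python
REQUIRED_FIELDS = {"timestamp", "level", "service", "event", "trace_id"}
VALID_LEVELS = {"debug", "info", "warn", "error"}

def validate_log_entry(entry: dict) -> list[str]:
    """Return a list of problems; an empty list means the entry passes."""
    problems = [f"missing required field: {f}"
                for f in sorted(REQUIRED_FIELDS - entry.keys())]
    if entry.get("level") not in VALID_LEVELS:
        problems.append(f"unknown level: {entry.get('level')}")
    return problems

# A complete entry passes; one missing service/trace_id with a bad level fails.
good = {"timestamp": "2026-01-15T14:23:45.123Z", "level": "error",
        "service": "order-service", "event": "order_processing_failed",
        "trace_id": "abc123def456"}
bad = {"timestamp": "2026-01-15T14:23:45.123Z", "level": "fatal",
       "event": "oops"}
```

Running such a check against a sample of production logs per deploy catches schema drift before dashboards and alerts silently stop matching.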
## Logging Dos and Don'ts for Testability
| Do | Don't |
|---|---|
| Log at every significant state transition | Log raw request/response bodies (PII risk) |
| Include correlation IDs (trace_id, request_id) | Use string interpolation for log messages |
| Use consistent event names across services | Log at DEBUG level in production |
| Include timing data for operations | Log secrets, tokens, or passwords |
| Separate business events from technical events | Create logs only useful during development |
| Use structured fields for every variable value | Embed values in the message string |
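The "include timing data" guideline is easy to standardize with a small helper. A sketch using `time.monotonic` (the `duration_ms` field name matches the examples in this section; here the timing lands in a plain dict rather than a logger call, to keep the example self-contained):

```python
import time
from contextlib import contextmanager

@contextmanager
def timed(log_fields: dict):
    """Measure the wrapped block and record elapsed time as duration_ms."""
    start = time.monotonic()
    try:
        yield
    finally:
        log_fields["duration_ms"] = round((time.monotonic() - start) * 1000, 2)

fields = {"event": "order_processing_completed"}
with timed(fields):
    time.sleep(0.01)  # stand-in for real work
# fields now carries duration_ms alongside the event name
```

Using `time.monotonic` rather than `time.time` matters here: wall-clock time can jump backwards under NTP adjustment, producing negative or nonsensical durations.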
## Anti-Pattern: Values in Message Strings
```python
# BAD -- values embedded in the string, impossible to query
log.info(f"User {user_id} placed order {order_id} for ${total}")

# GOOD -- values as structured fields, queryable
log.info("order_placed", user_id=user_id, order_id=order_id, total_cents=total)
```
## Log-Based Testing

Structured logs enable a powerful testing pattern: asserting on log output to verify system behavior.
```python
# test_order_logging.py
import json

def test_order_flow_produces_expected_logs(capsys, order_service):
    """Verify the order flow logs all expected events."""
    order_service.process_order("user_1", "order_1", [{"sku": "A", "qty": 1}])

    captured = capsys.readouterr()
    log_lines = [json.loads(line) for line in captured.out.strip().split('\n')]

    # Verify expected events are logged, and in order
    # (list.index also fails the test if an event is missing entirely)
    events = [log["event"] for log in log_lines]
    expected = [
        "order_processing_started",
        "order_total_calculated",
        "payment_processed",
        "inventory_reserved",
        "order_processing_completed",
    ]
    positions = [events.index(e) for e in expected]
    assert positions == sorted(positions)

    # Verify context is propagated to every entry
    for log in log_lines:
        assert log["user_id"] == "user_1"
        assert log["order_id"] == "order_1"

    # Verify timing data is present
    completed = next(l for l in log_lines if l["event"] == "order_processing_completed")
    assert "duration_ms" in completed
    assert completed["duration_ms"] > 0
```
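The same pattern works in reverse: asserting that a happy-path run emits no error-level events at all. A stdlib-only sketch over captured JSON lines (the helper name is illustrative):

```python
import json

def assert_no_errors(captured_stdout: str):
    """Fail if any captured log line is at error level."""
    entries = [json.loads(line)
               for line in captured_stdout.strip().splitlines() if line]
    errors = [e for e in entries if e.get("level") == "error"]
    assert not errors, f"unexpected error events: {[e['event'] for e in errors]}"

# Passes: both entries are info-level.
sample = '\n'.join([
    '{"level": "info", "event": "order_processing_started"}',
    '{"level": "info", "event": "order_processing_completed"}',
])
assert_no_errors(sample)
```

This catches a surprisingly common regression class: code paths that still succeed functionally but begin logging errors (retries, fallbacks) that nobody notices until an on-call alert fires.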
## Log Aggregation Architecture
```
[Service A]    [Service B]    [Service C]
   stdout         stdout         stdout
     |              |              |
     v              v              v
  [Log Collector: Fluentd / Fluent Bit / Vector]
                    |
                    v
  [Log Storage: Elasticsearch / Loki / CloudWatch Logs]
                    |
                    v
  [Query UI: Kibana / Grafana / CloudWatch Insights]
                    |
                    v
  [Alerting: based on log patterns and frequency]
```
## Tool Recommendations
| Component | Recommended | Alternative |
|---|---|---|
| Collection | Fluent Bit (low resource) | Fluentd, Vector |
| Storage | Grafana Loki (cost-effective) | Elasticsearch, CloudWatch |
| Query | Grafana (LogQL) | Kibana (KQL), CloudWatch Insights |
| Alerting | Grafana Alerts | ElastAlert, CloudWatch Alarms |
## PII in Logs: A Testing Concern
QA architects must verify that production logs do not contain personally identifiable information (PII). Add automated PII detection to your log pipeline:
```python
# pii_log_scanner.py
import json
import re

PII_PATTERNS = {
    "email": re.compile(r"[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}"),
    "phone": re.compile(r"\b\d{3}[-.]?\d{3}[-.]?\d{4}\b"),
    "ssn": re.compile(r"\b\d{3}-\d{2}-\d{4}\b"),
    "credit_card": re.compile(r"\b(?:\d{4}[-\s]?){3}\d{4}\b"),
}

def scan_log_entry(log_json: dict) -> list:
    """Scan a structured log entry for PII."""
    findings = []
    text = json.dumps(log_json)
    for pii_type, pattern in PII_PATTERNS.items():
        matches = pattern.findall(text)
        # Ignore obvious test fixtures (e.g. example.com addresses)
        real_matches = [m for m in matches if "example" not in m and "test" not in m]
        if real_matches:
            findings.append({"type": pii_type, "count": len(real_matches)})
    return findings
```
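Scanning is a detective control; a preventive complement is to redact known patterns before a line is ever emitted. A minimal stdlib-only sketch for the email case (the function name and placeholder are illustrative; the regex mirrors `PII_PATTERNS` above):

```python
import re

EMAIL_RE = re.compile(r"[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}")

def redact_entry(entry: dict) -> dict:
    """Return a copy of the entry with emails in string values replaced."""
    clean = {}
    for key, value in entry.items():
        if isinstance(value, str):
            value = EMAIL_RE.sub("[REDACTED_EMAIL]", value)
        clean[key] = value
    return clean

entry = {"event": "signup_failed", "detail": "no account for jane@corp.com"}
redacted = redact_entry(entry)
```

In structlog this logic slots in as a processor before `JSONRenderer`; pino offers a built-in `redact` option for known field paths. Defense in depth means running both: redact at the source, scan at the pipeline.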
Structured logging is not just a developer convenience -- it is a prerequisite for every other observability practice in this chapter. Invest in it early.