Schema Drift Detection
The Problem
API documentation says one thing, the implementation does another. This is schema drift -- one of the most common sources of integration bugs. A field gets renamed, an enum gains a value, a nullable field becomes required, and downstream consumers break silently because the documentation (and therefore the consumer tests) are stale.
AI-Powered Drift Detection
from dataclasses import dataclass
@dataclass
class Drift:
type: str # UNDOCUMENTED_FIELD, MISSING_FIELD, TYPE_MISMATCH, etc.
field: str
severity: str # HIGH, MEDIUM, LOW
message: str
class SchemaDriftDetector:
"""Detect differences between documented API schema and actual behavior."""
def __init__(self, openapi_spec: dict, base_url: str):
self.spec = openapi_spec
self.base_url = base_url
def detect_response_drift(self, endpoint: str, method: str) -> list[Drift]:
"""Compare actual API response against documented schema."""
# Get documented schema
documented = self.spec["paths"][endpoint][method]["responses"]["200"]
documented_schema = documented["content"]["application/json"]["schema"]
# Get actual response
response = httpx.request(method.upper(), f"{self.base_url}{endpoint}")
actual_body = response.json()
drifts = []
# Check for undocumented fields
actual_fields = set(self._flatten_keys(actual_body))
documented_fields = set(self._flatten_keys(documented_schema.get("properties", {})))
undocumented = actual_fields - documented_fields
for field in undocumented:
drifts.append(Drift(
type="UNDOCUMENTED_FIELD",
field=field,
severity="LOW",
message=f"Field '{field}' present in response but not in schema"
))
# Check for missing documented fields
missing = documented_fields - actual_fields
for field in missing:
required = field in documented_schema.get("required", [])
drifts.append(Drift(
type="MISSING_FIELD",
field=field,
severity="HIGH" if required else "LOW",
message=f"Field '{field}' documented but missing from response"
))
# Check for type mismatches
for field in actual_fields & documented_fields:
actual_type = type(actual_body.get(field)).__name__
doc_type = documented_schema["properties"].get(field, {}).get("type")
if doc_type and not self._types_match(actual_type, doc_type):
drifts.append(Drift(
type="TYPE_MISMATCH",
field=field,
severity="HIGH",
message=f"Field '{field}': documented as '{doc_type}', "
f"actual is '{actual_type}'"
))
return drifts
def _flatten_keys(self, obj, prefix="") -> list[str]:
"""Flatten a nested dict/schema into dot-notation keys."""
keys = []
if isinstance(obj, dict):
for key in obj:
full_key = f"{prefix}.{key}" if prefix else key
keys.append(full_key)
return keys
def _types_match(self, python_type: str, openapi_type: str) -> bool:
"""Check if a Python type matches an OpenAPI type."""
mapping = {
"str": "string",
"int": "integer",
"float": "number",
"bool": "boolean",
"list": "array",
"dict": "object",
"NoneType": "null",
}
return mapping.get(python_type) == openapi_type
Drift Categories and Severity
| Drift Type | Severity | Example | Impact |
|---|---|---|---|
| Missing required field | HIGH | id documented as required but absent |
Consumer deserialization failure |
| Type mismatch | HIGH | price documented as number, actual is string |
Type errors in consumers |
| Undocumented field | LOW | _internal_id in response but not in schema |
May leak internal data |
| New enum value | MEDIUM | status returns "archived" but schema only lists "active|inactive" |
Consumer switch/case falls through |
| Changed field name | HIGH | user_name renamed to username |
All consumers break |
| Deprecated field removed | MEDIUM | legacy_id removed without notice |
Older consumers break |
| Nullable field undocumented | MEDIUM | description returns null but schema says required |
Null pointer in consumers |
Automated Drift Detection in CI
# .github/workflows/schema-drift.yml
name: Schema Drift Detection
on:
schedule:
- cron: '0 6 * * *' # Daily at 6 AM
push:
paths:
- 'docs/openapi.yaml'
jobs:
detect-drift:
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v4
- name: Start staging environment
run: docker-compose up -d
- name: Wait for API
run: |
for i in $(seq 1 30); do
curl -s http://localhost:8080/health && break
sleep 2
done
- name: Run drift detection
run: |
python -m schema_drift_detector \
--spec docs/openapi.yaml \
--base-url http://localhost:8080 \
--output drift-report.json
- name: Fail on high-severity drift
run: |
HIGH_COUNT=$(jq '[.drifts[] | select(.severity=="HIGH")] | length' drift-report.json)
if [ "$HIGH_COUNT" -gt 0 ]; then
echo "Found $HIGH_COUNT high-severity schema drifts:"
jq '.drifts[] | select(.severity=="HIGH")' drift-report.json
exit 1
fi
- name: Upload drift report
if: always()
uses: actions/upload-artifact@v4
with:
name: schema-drift-report
path: drift-report.json
Comprehensive Drift Scanner
class FullDriftScanner:
"""Scan all endpoints for all types of drift."""
def __init__(self, spec: dict, base_url: str, auth_token: str = None):
self.spec = spec
self.base_url = base_url
self.headers = {}
if auth_token:
self.headers["Authorization"] = f"Bearer {auth_token}"
def scan_all(self) -> dict:
"""Scan every endpoint in the spec."""
report = {
"timestamp": datetime.now().isoformat(),
"base_url": self.base_url,
"endpoints_scanned": 0,
"drifts": [],
}
for path, methods in self.spec["paths"].items():
for method, details in methods.items():
if method not in ("get", "post", "put", "patch", "delete"):
continue
report["endpoints_scanned"] += 1
try:
drifts = self.check_endpoint(path, method, details)
report["drifts"].extend(drifts)
except Exception as e:
report["drifts"].append(Drift(
type="SCAN_ERROR",
field=f"{method.upper()} {path}",
severity="MEDIUM",
message=f"Could not scan: {e}"
))
return report
def check_endpoint(self, path, method, details) -> list[Drift]:
"""Check a single endpoint for drift."""
drifts = []
# Check response schema drift
if "200" in details.get("responses", {}):
response_drifts = self.check_response_drift(path, method, details)
drifts.extend(response_drifts)
# Check if documented error codes are actually returned
for status_code in details.get("responses", {}):
if status_code.startswith("4") or status_code.startswith("5"):
# Attempt to trigger this error code
pass # Requires endpoint-specific logic
return drifts
Drift Prevention: Schema Validation Middleware
The best way to prevent drift is to validate responses against the schema at runtime:
# Express.js middleware example
from openapi_core import OpenAPI
from openapi_core.validation.response import V31ResponseValidator
class SchemaValidationMiddleware:
"""Validate every API response against the OpenAPI schema."""
def __init__(self, spec_path: str, enforce: bool = False):
self.openapi = OpenAPI.from_file_path(spec_path)
self.enforce = enforce # True = block invalid responses
def validate_response(self, request, response):
result = self.openapi.validate_response(request, response)
if result.errors:
for error in result.errors:
logger.warning(f"Schema drift detected: {error}")
if self.enforce:
return {"error": "Response does not match API schema"}, 500
return response
Run this middleware in staging (enforce=False, log only) to continuously detect drift without impacting production.
Key Takeaway
Schema drift is the silent killer of microservice integrations. AI-powered drift detection compares your OpenAPI documentation against actual API behavior, automatically categorizes findings by severity, and runs in CI to catch drift before it reaches production. The combination of daily automated scanning and schema validation middleware provides comprehensive protection against the "docs say X but API does Y" class of bugs.