Fuzzing Implementation
Building a Complete Semantic Fuzzer
This file provides the full implementation of an AI-powered semantic fuzzer that you can adapt for your API testing needs.
The SemanticFuzzer Class
import json
import httpx
import time
from dataclasses import dataclass, field
from typing import Optional
@dataclass
class FuzzPayload:
    """One generated test input together with the reason it was generated."""

    # Request body; the fuzzer sends it as the JSON body of the request.
    payload: dict
    # Label of the prompt category this payload belongs to (e.g. "XSS").
    category: str
    # Free-text description of the bug this payload might expose.
    expected_vulnerability: str
@dataclass
class FuzzResult:
    """Outcome of sending a single fuzz payload to the target endpoint."""

    # The payload that produced this result.
    payload: FuzzPayload
    # HTTP status code; the fuzzer records 0 when no response was received
    # (timeout or request exception).
    response_status: int
    # Response text (the fuzzer keeps only the first 500 characters), or
    # "TIMEOUT" / the exception text when the request failed.
    response_body: str
    # Request duration in milliseconds.
    response_time_ms: float
    # Human-readable description of the detected anomaly, or None if clean.
    anomaly: Optional[str] = None
    # Severity label assigned to the anomaly ("HIGH", "MEDIUM", "LOW"), or None.
    severity: Optional[str] = None
@dataclass
class CampaignReport:
    """Summary of one fuzz campaign against a single endpoint."""

    # Identifier of the run; the fuzzer uses "fuzz-<unix timestamp>".
    campaign_id: str
    # Endpoint path that was fuzzed.
    endpoint: str
    # HTTP method used for every request in the campaign.
    method: str
    # Number of payloads that were generated and sent.
    total_payloads: int
    # Number of results that carried an anomaly.
    anomalies_found: int
    # The anomalous results only; clean responses are not included.
    findings: list[FuzzResult]
    # Duration of the whole campaign, in seconds.
    duration_seconds: float
    # NOTE(review): presumably the LLM token cost of the run; the fuzzer shown
    # in this file never sets it, so it stays at the default.
    token_cost: float = 0.0
class SemanticFuzzer:
    """LLM-driven API fuzzer: generates payloads, sends them, flags anomalies.

    The ``llm`` object must provide ``generate_json(prompt)`` returning the
    parsed JSON answer; ``schema`` is an OpenAPI-style dict whose ``"paths"``
    mapping is keyed by endpoint path.

    Annotations that name the result dataclasses are written as strings
    (forward references) so the class does not depend on definition order.
    """

    # Responses slower than this many seconds are reported as "Slow response".
    SLOW_RESPONSE_SECONDS = 5.0

    # (category label, payloads per 50 requested, hint lines) for the prompt.
    _CATEGORY_PLAN = (
        ("SQL INJECTION", 5, ("- In string fields, include SQL fragments",)),
        ("XSS", 5, ("- In string fields, include HTML/JS",)),
        ("BOUNDARY VALUES", 10,
         ("- Min/max for numbers, empty/max-length strings",)),
        ("TYPE CONFUSION", 10,
         ("- String where number expected, array where object expected",)),
        ("UNICODE EDGE CASES", 5,
         ("- RTL characters, zero-width joiners, emoji, null bytes",)),
        ("BUSINESS LOGIC", 10,
         ("- Valid structure but nonsensical values (negative quantity,",
          "price of $0.001, dates in the past)")),
        ("AUTHORIZATION BYPASS", 5,
         ("- Payloads that try to escalate privileges or access",
          "other users' data (IDOR patterns)")),
    )

    # All markers are lower-case because they are matched against the
    # lower-cased response body.
    _XSS_MARKERS = ("<script>", "onerror=", "javascript:", "onclick=")
    _SQL_ERROR_MARKERS = (
        "syntax error", "mysql", "postgresql", "sqlite",
        "ora-", "sqlstate", "unterminated",
    )

    # Normalized categories for which a 5xx response is rated HIGH.
    _HIGH_RISK_CATEGORIES = frozenset({"SQL_INJECTION", "AUTHORIZATION_BYPASS"})

    def __init__(self, llm, schema: dict, base_url: str,
                 auth_headers: Optional[dict] = None):
        """Store the LLM client, API schema, target base URL and auth headers."""
        self.llm = llm
        self.schema = schema
        self.base_url = base_url
        self.auth_headers = auth_headers or {}

    @staticmethod
    def _normalize_category(category) -> str:
        """Return *category* upper-cased with spaces/hyphens as underscores.

        The prompt names categories with spaces ("SQL INJECTION") while code
        compares against identifiers ("SQL_INJECTION"); normalizing both sides
        makes the comparison independent of how the LLM spells the label.
        """
        text = str(category).upper().replace("-", " ").replace("_", " ")
        return "_".join(text.split())

    @classmethod
    def _allocate_counts(cls, count: int) -> list[int]:
        """Split *count* across the categories in proportion to their weights.

        Uses the largest-remainder method so the quotas always sum to *count*;
        for ``count=50`` this yields exactly the weights in ``_CATEGORY_PLAN``.
        """
        weights = [weight for _, weight, _ in cls._CATEGORY_PLAN]
        total = sum(weights)
        quotas = [count * weight // total for weight in weights]
        remainders = [count * weight % total for weight in weights]
        leftover = count - sum(quotas)
        # sorted() is stable, so ties go to the category listed first.
        by_remainder = sorted(range(len(weights)), key=lambda i: -remainders[i])
        for index in by_remainder[:leftover]:
            quotas[index] += 1
        return quotas

    def generate_fuzz_payloads(self, endpoint: str,
                               count: int = 50) -> "list[FuzzPayload]":
        """Ask the LLM for *count* semantically meaningful fuzz payloads.

        Args:
            endpoint: Path key to look up under ``schema["paths"]``.
            count: Total number of payloads to request; the per-category
                quotas in the prompt are scaled to add up to this number.

        Returns:
            One ``FuzzPayload`` per item in the LLM answer. An empty list is
            returned, without calling the LLM, when ``count`` is not positive.

        Raises:
            TypeError: If the LLM answer is not a JSON array of objects, or an
                object lacks one of the required keys.
        """
        if count <= 0:
            return []

        endpoint_schema = self.schema.get("paths", {}).get(endpoint, {})
        lines = [
            "",
            "Given this API endpoint schema:",
            json.dumps(endpoint_schema, indent=2),
            f"Generate {count} test payloads designed to find bugs. Categories:",
        ]

        # Categories whose scaled quota is zero are left out of the prompt.
        planned = [
            (name, quota, hints)
            for (name, _, hints), quota
            in zip(self._CATEGORY_PLAN, self._allocate_counts(count))
            if quota > 0
        ]
        for number, (name, quota, hints) in enumerate(planned, start=1):
            noun = "payload" if quota == 1 else "payloads"
            lines.append(f"{number}. {name} ({quota} {noun})")
            lines.extend(hints)

        lines += [
            "Output as a JSON array of objects. Each object has:",
            '- "payload": the request body',
            '- "category": which category above',
            '- "expected_vulnerability": what bug this might find',
            "",
        ]
        prompt = "\n".join(lines)

        raw = self.llm.generate_json(prompt)
        if not isinstance(raw, (list, tuple)):
            raise TypeError(
                f"LLM answer must be a JSON array, got {type(raw).__name__}"
            )

        known_keys = ("payload", "category", "expected_vulnerability")
        payloads = []
        for item in raw:
            if not isinstance(item, dict):
                raise TypeError(
                    "LLM payload entry must be a JSON object, "
                    f"got {type(item).__name__}"
                )
            # Drop extra keys the model may add; missing required keys still
            # raise TypeError from the dataclass constructor.
            payloads.append(
                FuzzPayload(**{key: item[key] for key in known_keys if key in item})
            )
        return payloads

    def execute_fuzz_campaign(
        self, endpoint: str, method: str = "POST", timeout: float = 10.0,
        count: int = 50,
    ) -> "CampaignReport":
        """Run a complete fuzz campaign against an endpoint.

        Args:
            endpoint: Path appended to ``base_url`` and looked up in the schema.
            method: HTTP method used for every request.
            timeout: Per-request timeout in seconds.
            count: Number of payloads to generate for the campaign.

        Returns:
            A ``CampaignReport`` whose ``findings`` holds only the results
            that carry an anomaly.
        """
        campaign_start = time.perf_counter()
        payloads = self.generate_fuzz_payloads(endpoint, count=count)
        # Avoid "//" when base_url is given with a trailing slash.
        url = f"{self.base_url.rstrip('/')}{endpoint}"

        results = []
        for payload_info in payloads:
            request_start = time.perf_counter()
            # Only the network call is guarded, so a bug in anomaly detection
            # is not misreported as a request failure.
            try:
                response = httpx.request(
                    method,
                    url,
                    json=payload_info.payload,
                    headers=self.auth_headers,
                    timeout=timeout,
                )
            except httpx.TimeoutException:
                results.append(FuzzResult(
                    payload=payload_info,
                    response_status=0,
                    response_body="TIMEOUT",
                    response_time_ms=timeout * 1000,
                    anomaly=f"Request timeout on {payload_info.category} payload",
                    severity="MEDIUM",
                ))
                continue
            except Exception as e:
                # Deliberately broad: one failing request must not abort the
                # campaign; the failure itself is recorded as a finding.
                results.append(FuzzResult(
                    payload=payload_info,
                    response_status=0,
                    response_body=str(e),
                    response_time_ms=0,
                    anomaly=f"Request exception: {type(e).__name__}: {e}",
                    severity="HIGH",
                ))
                continue

            response_time = (time.perf_counter() - request_start) * 1000
            anomaly = self.detect_anomaly(response, payload_info)
            severity = (
                self.classify_severity(anomaly, payload_info) if anomaly else None
            )
            results.append(FuzzResult(
                payload=payload_info,
                response_status=response.status_code,
                response_body=response.text[:500],
                response_time_ms=response_time,
                anomaly=anomaly,
                severity=severity,
            ))

        anomaly_results = [r for r in results if r.anomaly]
        return CampaignReport(
            campaign_id=f"fuzz-{int(time.time())}",
            endpoint=endpoint,
            method=method,
            total_payloads=len(payloads),
            anomalies_found=len(anomaly_results),
            findings=anomaly_results,
            duration_seconds=time.perf_counter() - campaign_start,
        )

    def detect_anomaly(self, response, payload_info: "FuzzPayload") -> Optional[str]:
        """Return a description of the first anomaly found, or None.

        Checks run in this order: 5xx status, reflected XSS markers (XSS
        payloads only), SQL error text, stack-trace text, slow response.
        ``response`` needs ``status_code`` and ``text``; ``elapsed`` is
        optional.
        """
        # 500 errors on any input = server-side failure
        if response.status_code >= 500:
            return (
                f"Server error ({response.status_code}) "
                f"on {payload_info.category} payload"
            )

        body = response.text
        body_lower = body.lower()

        # XSS reflection; matched case-insensitively because HTML tag and
        # attribute names are case-insensitive.
        if self._normalize_category(payload_info.category) == "XSS":
            if any(marker in body_lower for marker in self._XSS_MARKERS):
                return "XSS reflection detected in response"

        # SQL error disclosure
        if any(marker in body_lower for marker in self._SQL_ERROR_MARKERS):
            return "SQL error message disclosed in response"

        # Stack trace leak (Python "Traceback", Node.js "at Object.")
        if "Traceback" in body or "at Object." in body:
            return "Stack trace leaked in response body"

        # Slow response on a specific payload (potential DoS)
        elapsed = getattr(response, "elapsed", None)
        if elapsed is not None:
            seconds = elapsed.total_seconds()
            if seconds > self.SLOW_RESPONSE_SECONDS:
                return (
                    f"Slow response ({seconds:.1f}s) "
                    f"on {payload_info.category} payload"
                )
        return None

    def classify_severity(self, anomaly: str, payload: "FuzzPayload") -> str:
        """Map an anomaly description to "HIGH", "MEDIUM" or "LOW".

        Server errors are HIGH for SQL-injection and authorization-bypass
        payloads and MEDIUM otherwise; unrecognized anomalies are LOW.
        """
        if "XSS reflection" in anomaly or "SQL error" in anomaly:
            return "HIGH"
        if "Stack trace" in anomaly:
            return "MEDIUM"
        if "Server error" in anomaly:
            category = self._normalize_category(payload.category)
            return "HIGH" if category in self._HIGH_RISK_CATEGORIES else "MEDIUM"
        if "Slow response" in anomaly:
            return "LOW"
        # Case-insensitive so it matches the "Request timeout on ..." message
        # produced by execute_fuzz_campaign.
        if "timeout" in anomaly.lower():
            return "MEDIUM"
        return "LOW"
Running Fuzzing in CI
# .github/workflows/api-fuzz.yml
# Nightly job: start the staging stack, fuzz the listed endpoints, fail the
# run on HIGH/CRITICAL findings, and always keep the report as an artifact.
name: API Fuzzing

on:
  schedule:
    - cron: '0 2 * * *'  # Nightly at 02:00 (GitHub schedules run in UTC)

jobs:
  fuzz:
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v4

      - name: Start staging environment
        # Compose V2 syntax; the standalone docker-compose (V1) binary is no
        # longer shipped on current ubuntu-latest runner images.
        run: docker compose up -d

      - name: Wait for API readiness
        run: |
          # -f makes curl fail on HTTP errors, so a 5xx is not "ready".
          for i in $(seq 1 30); do
            if curl -sf http://localhost:8080/health > /dev/null; then
              echo "API is ready"
              exit 0
            fi
            sleep 2
          done
          # Fail here rather than fuzzing a service that never came up.
          echo "::error::API did not become healthy within 60 seconds"
          exit 1

      - name: Run fuzz campaign
        run: |
          python -m api_fuzzer \
            --schema docs/openapi.yaml \
            --base-url http://localhost:8080 \
            --output fuzz-report.json \
            --endpoints "/api/v2/products,/api/v2/orders"

      - name: Check for critical findings
        run: |
          CRITICAL=$(jq '[.findings[] | select(.severity=="HIGH" or .severity=="CRITICAL")] | length' fuzz-report.json)
          if [ "$CRITICAL" -gt 0 ]; then
            echo "::error::Found $CRITICAL high/critical findings"
            jq '.findings[] | select(.severity=="HIGH" or .severity=="CRITICAL")' fuzz-report.json
            exit 1
          fi

      - name: Upload fuzz report
        if: always()
        uses: actions/upload-artifact@v4
        with:
          name: fuzz-report
          path: fuzz-report.json
Fuzz Campaign Scheduling
| Frequency | Scope | Budget | Purpose |
|---|---|---|---|
| Per PR | Changed endpoints only | 10 payloads | Quick sanity check |
| Nightly | All endpoints | 50 payloads each | Comprehensive coverage |
| Weekly | All endpoints + new categories | 100 payloads | Deep exploration |
| Quarterly | Full security audit scope | 500+ payloads | Compliance/audit |
Interpreting Results
False Positive Reduction
Not every anomaly is a real vulnerability. Common false positives:
| Anomaly | Often a False Positive When | Action |
|---|---|---|
| 500 error on type confusion | API returns 500 instead of 400 for bad types | File as code quality issue, not security |
| Slow response on large payload | API legitimately processes large data | Raise the slow-response threshold (5 seconds in `detect_anomaly`) |
| XSS "reflection" in error message | Error message includes the input value | Check if HTML is escaped in the response |
Triage Workflow
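1. Sort findings by severity (CRITICAL → HIGH → MEDIUM → LOW). Note that `classify_severity` as shown assigns only HIGH, MEDIUM, or LOW, so CRITICAL appears only if you extend it.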
2. For each HIGH/CRITICAL:
- Reproduce manually with curl
- Verify it is not a false positive
- File a security bug if confirmed
3. For each MEDIUM:
- Reproduce, assess business impact
- File as bug or tech debt based on impact
4. For each LOW:
- Log for awareness
- Fix opportunistically (do not block releases)
Key Takeaway
A complete semantic fuzzer combines AI-generated payloads with automated anomaly detection and severity classification. Run it nightly in CI against staging, triage findings by severity, and use the results to identify security vulnerabilities that traditional testing misses. The implementation is straightforward -- the value comes from the AI's ability to generate contextually relevant attack payloads.