Fuzzing Implementation

Building a Complete Semantic Fuzzer

This file provides the full implementation of an AI-powered semantic fuzzer that you can adapt for your API testing needs.


The SemanticFuzzer Class

import json
import httpx
import time
from dataclasses import dataclass, field
from typing import Optional

@dataclass
class FuzzPayload:
    payload: dict
    category: str
    expected_vulnerability: str

@dataclass
class FuzzResult:
    payload: FuzzPayload
    response_status: int
    response_body: str
    response_time_ms: float
    anomaly: Optional[str] = None
    severity: Optional[str] = None

@dataclass
class CampaignReport:
    campaign_id: str
    endpoint: str
    method: str
    total_payloads: int
    anomalies_found: int
    findings: list[FuzzResult]
    duration_seconds: float
    token_cost: float = 0.0


class SemanticFuzzer:
    def __init__(self, llm, schema: dict, base_url: str, auth_headers: Optional[dict] = None):
        self.llm = llm
        self.schema = schema
        self.base_url = base_url
        self.auth_headers = auth_headers or {}

    def generate_fuzz_payloads(self, endpoint: str, count: int = 50) -> list[FuzzPayload]:
        """Generate semantically meaningful fuzz payloads."""
        endpoint_schema = self.schema["paths"].get(endpoint, {})

        prompt = f"""
        Given this API endpoint schema:
        {json.dumps(endpoint_schema, indent=2)}

        Generate {count} test payloads designed to find bugs. The per-category
        counts below assume a total of 50; scale them proportionally for any
        other total. Categories:

        1. SQL INJECTION (5 payloads)
           - In string fields, include SQL fragments
        2. XSS (5 payloads)
           - In string fields, include HTML/JS
        3. BOUNDARY VALUES (10 payloads)
           - Min/max for numbers, empty/max-length strings
        4. TYPE CONFUSION (10 payloads)
           - String where number expected, array where object expected
        5. UNICODE EDGE CASES (5 payloads)
           - RTL characters, zero-width joiners, emoji, null bytes
        6. BUSINESS LOGIC (10 payloads)
           - Valid structure but nonsensical values (negative quantity,
             price of $0.001, dates in the past)
        7. AUTHORIZATION BYPASS (5 payloads)
           - Payloads that try to escalate privileges or access
             other users' data (IDOR patterns)

        Output as a JSON array of objects. Each object has:
        - "payload": the request body
        - "category": one of SQL_INJECTION, XSS, BOUNDARY_VALUES, TYPE_CONFUSION,
          UNICODE_EDGE_CASES, BUSINESS_LOGIC, AUTHORIZATION_BYPASS
        - "expected_vulnerability": what bug this might find
        """
        raw = self.llm.generate_json(prompt)
        return [FuzzPayload(**item) for item in raw]

    def execute_fuzz_campaign(
        self, endpoint: str, method: str = "POST", timeout: float = 10.0
    ) -> CampaignReport:
        """Run a complete fuzz campaign against an endpoint."""
        start_time = time.time()
        payloads = self.generate_fuzz_payloads(endpoint)
        results = []

        for payload_info in payloads:
            try:
                req_start = time.time()
                response = httpx.request(
                    method,
                    f"{self.base_url}{endpoint}",
                    json=payload_info.payload,
                    headers=self.auth_headers,
                    timeout=timeout,
                )
                response_time = (time.time() - req_start) * 1000

                anomaly = self.detect_anomaly(response, payload_info)
                severity = self.classify_severity(anomaly, payload_info) if anomaly else None

                result = FuzzResult(
                    payload=payload_info,
                    response_status=response.status_code,
                    response_body=response.text[:500],
                    response_time_ms=response_time,
                    anomaly=anomaly,
                    severity=severity,
                )
                results.append(result)

            except httpx.TimeoutException:
                results.append(FuzzResult(
                    payload=payload_info,
                    response_status=0,
                    response_body="TIMEOUT",
                    response_time_ms=timeout * 1000,
                    anomaly=f"Request timeout on {payload_info.category} payload",
                    severity="MEDIUM",
                ))
            except Exception as e:
                results.append(FuzzResult(
                    payload=payload_info,
                    response_status=0,
                    response_body=str(e),
                    response_time_ms=0,
                    anomaly=f"Request exception: {type(e).__name__}: {e}",
                    severity="HIGH",
                ))

        anomaly_results = [r for r in results if r.anomaly]

        return CampaignReport(
            campaign_id=f"fuzz-{int(time.time())}",
            endpoint=endpoint,
            method=method,
            total_payloads=len(payloads),
            anomalies_found=len(anomaly_results),
            findings=anomaly_results,
            duration_seconds=time.time() - start_time,
        )

    def detect_anomaly(self, response, payload_info: FuzzPayload) -> Optional[str]:
        """Detect if the response indicates a potential vulnerability."""
        # 500 errors on any input = server-side failure
        if response.status_code >= 500:
            return f"Server error ({response.status_code}) on {payload_info.category} payload"

        # Match case-insensitively so "<SCRIPT>" and "SQLSTATE" are both caught
        body_lower = response.text.lower()

        # XSS reflection
        if payload_info.category.upper() == "XSS":
            dangerous_patterns = ["<script>", "onerror=", "javascript:", "onclick="]
            if any(p in body_lower for p in dangerous_patterns):
                return "XSS reflection detected in response"

        # SQL error disclosure. Indicators are lowercase because they are
        # compared against the lowercased body.
        sql_indicators = ["syntax error", "mysql", "postgresql", "sqlite",
                          "ora-", "sqlstate", "unterminated"]
        if any(ind in body_lower for ind in sql_indicators):
            return "SQL error message disclosed in response"

        # Stack trace leak
        if "Traceback" in response.text or "at Object." in response.text:
            return "Stack trace leaked in response body"

        # Slow (but completed) response on a specific payload (potential DoS)
        if hasattr(response, 'elapsed') and response.elapsed.total_seconds() > 5:
            return f"Slow response ({response.elapsed.total_seconds():.1f}s) on {payload_info.category} payload"

        return None

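    def classify_severity(self, anomaly: str, payload: FuzzPayload) -> str:
        """Classify the severity of a detected anomaly."""
        if "XSS reflection" in anomaly:
            return "HIGH"
        if "SQL error" in anomaly:
            return "HIGH"
        if "Stack trace" in anomaly:
            return "MEDIUM"
        if "Server error" in anomaly:
            # The model may echo the prompt's spaced labels ("SQL INJECTION"),
            # so normalize before comparing against the underscore form.
            category = payload.category.strip().upper().replace(" ", "_")
            if category in ["SQL_INJECTION", "AUTHORIZATION_BYPASS"]:
                return "HIGH"
            return "MEDIUM"
        if "Slow response" in anomaly:
            return "LOW"
        # Timeout anomalies are worded "Request timeout ...", so match in lowercase
        if "timeout" in anomaly.lower():
            return "MEDIUM"
        return "LOW"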
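
One fragile spot in the listing above is `FuzzPayload(**item)`: it raises `TypeError` as soon as the model returns an extra or missing key. A minimal sketch of a more forgiving parser follows. The helper name `parse_payloads` is hypothetical and not part of the original class, and the `FuzzPayload` dataclass is repeated only so the snippet runs on its own.

```python
from dataclasses import dataclass, fields


@dataclass
class FuzzPayload:  # mirrors the dataclass defined earlier
    payload: dict
    category: str
    expected_vulnerability: str


def parse_payloads(raw: list) -> list[FuzzPayload]:
    """Keep well-formed items and skip anything the model got wrong.

    Hypothetical helper: a drop-in for `[FuzzPayload(**item) for item in raw]`.
    """
    allowed = {f.name for f in fields(FuzzPayload)}
    parsed = []
    for item in raw:
        # Skip non-objects and objects missing a required key
        if not isinstance(item, dict) or not allowed.issubset(item):
            continue
        # Drop any extra keys the model added
        clean = {name: item[name] for name in allowed}
        # Normalize "sql injection" / "SQL-Injection" to "SQL_INJECTION"
        clean["category"] = (
            str(clean["category"]).strip().upper().replace(" ", "_").replace("-", "_")
        )
        parsed.append(FuzzPayload(**clean))
    return parsed
```

Silently skipping malformed items keeps a campaign running, but it may be worth logging how many were dropped so that a badly behaved model does not go unnoticed.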

Running Fuzzing in CI

# .github/workflows/api-fuzz.yml
name: API Fuzzing

on:
  schedule:
    - cron: '0 2 * * *'  # Nightly at 2 AM

jobs:
  fuzz:
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v4

      - name: Start staging environment
        run: docker compose up -d

      - name: Wait for API readiness
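        run: |
          # Poll for up to 60 seconds; -f makes curl fail on HTTP error statuses
          for i in $(seq 1 30); do
            curl -sf http://localhost:8080/health && exit 0
            sleep 2
          done
          echo "::error::API did not become ready within 60 seconds"
          exit 1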

      - name: Run fuzz campaign
        run: |
          python -m api_fuzzer \
            --schema docs/openapi.yaml \
            --base-url http://localhost:8080 \
            --output fuzz-report.json \
            --endpoints "/api/v2/products,/api/v2/orders"

      - name: Check for critical findings
        run: |
          CRITICAL=$(jq '[.findings[] | select(.severity=="HIGH" or .severity=="CRITICAL")] | length' fuzz-report.json)
          if [ "$CRITICAL" -gt 0 ]; then
            echo "::error::Found $CRITICAL high/critical findings"
            jq '.findings[] | select(.severity=="HIGH" or .severity=="CRITICAL")' fuzz-report.json
            exit 1
          fi

      - name: Upload fuzz report
        if: always()
        uses: actions/upload-artifact@v4
        with:
          name: fuzz-report
          path: fuzz-report.json
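
The workflow reads `.findings[]` and `.severity` from `fuzz-report.json`, but the `api_fuzzer` command-line module itself is not shown here. As a rough sketch of the report shape that the `jq` filter expects, the following assumes per-endpoint findings are merged into one top-level `findings` array. `Finding`, `build_report`, and `write_report` are hypothetical names, and `Finding` is a trimmed stand-in for `FuzzResult`.

```python
import json
from dataclasses import asdict, dataclass


@dataclass
class Finding:  # hypothetical, trimmed stand-in for FuzzResult
    endpoint: str
    category: str
    anomaly: str
    severity: str


def build_report(findings_by_endpoint: dict[str, list[Finding]]) -> dict:
    """Merge per-endpoint findings into one JSON-serializable report."""
    findings = [
        asdict(finding)
        for endpoint_findings in findings_by_endpoint.values()
        for finding in endpoint_findings
    ]
    return {
        "endpoints": sorted(findings_by_endpoint),
        "anomalies_found": len(findings),
        "findings": findings,  # the array the jq filter iterates over
    }


def write_report(report: dict, path: str) -> None:
    """Write the report where the CI step expects it."""
    with open(path, "w", encoding="utf-8") as fh:
        json.dump(report, fh, indent=2)
```

Keeping `severity` as a plain string on each finding is what lets the `select(.severity=="HIGH" ...)` filter work without any further transformation.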

Fuzz Campaign Scheduling

| Frequency | Scope | Budget | Purpose |
|-----------|-------|--------|---------|
| Per PR | Changed endpoints only | 10 payloads | Quick sanity check |
| Nightly | All endpoints | 50 payloads each | Comprehensive coverage |
| Weekly | All endpoints + new categories | 100 payloads | Deep exploration |
| Quarterly | Full security audit scope | 500+ payloads | Compliance/audit |

Interpreting Results

False Positive Reduction

Not every anomaly is a real vulnerability. Common false positives:

| Anomaly | Often a False Positive When | Action |
|---------|-----------------------------|--------|
| 500 error on type confusion | API returns 500 instead of 400 for bad types | File as code quality issue, not security |
| Slow response on large payload | API legitimately processes large data | Increase timeout threshold |
| XSS "reflection" in error message | Error message includes the input value | Check if HTML is escaped in the response |
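
The escaping check for reflected XSS can be partly automated. A minimal sketch, assuming the server escapes the same way as Python's `html.escape`; other frameworks encode quotes differently (for example `&#39;` instead of `&#x27;`), so an `absent` result still deserves a manual look. The function name `classify_reflection` is hypothetical.

```python
import html


def classify_reflection(sent: str, response_text: str) -> str:
    """Report how a payload string shows up in a response body.

    Returns "raw" (reflected unescaped, worth investigating),
    "escaped" (reflected but HTML-escaped, likely a false positive),
    or "absent" (not found in either form).
    """
    if sent in response_text:
        # A payload with no HTML special characters also lands here,
        # because its escaped form equals its raw form.
        return "raw"
    if html.escape(sent) in response_text:
        return "escaped"
    return "absent"
```

Only the `raw` case matches what the XSS check in `detect_anomaly` is looking for. An `escaped` result suggests the finding can be downgraded after a quick manual confirmation.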

Triage Workflow

1. Sort findings by severity (CRITICAL → HIGH → MEDIUM → LOW)
2. For each HIGH/CRITICAL:
   - Reproduce manually with curl
   - Verify it is not a false positive
   - File a security bug if confirmed
3. For each MEDIUM:
   - Reproduce, assess business impact
   - File as bug or tech debt based on impact
4. For each LOW:
   - Log for awareness
   - Fix opportunistically (do not block releases)
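
The first two steps of this workflow can be scripted. The sketch below sorts findings by severity and builds a `curl` command for manual reproduction. It assumes findings are dictionaries with a `severity` key, as in the JSON report; the helper names are hypothetical, and authentication headers are left out and would need to be added by hand.

```python
import json
import shlex

SEVERITY_ORDER = {"CRITICAL": 0, "HIGH": 1, "MEDIUM": 2, "LOW": 3}


def sort_findings(findings: list[dict]) -> list[dict]:
    """Order findings CRITICAL first; unknown severities go last."""
    return sorted(
        findings,
        key=lambda f: SEVERITY_ORDER.get(f.get("severity"), len(SEVERITY_ORDER)),
    )


def curl_command(base_url: str, endpoint: str, method: str, payload: dict) -> str:
    """Build a shell-safe curl command that replays one fuzz payload."""
    parts = [
        "curl", "-i",
        "-X", method,
        "-H", "Content-Type: application/json",
        "--data", json.dumps(payload),
        f"{base_url}{endpoint}",
    ]
    # shlex.quote protects payloads containing quotes or shell metacharacters
    return " ".join(shlex.quote(part) for part in parts)
```

Quoting matters here because fuzz payloads are, by design, full of characters that a shell would otherwise interpret.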

Key Takeaway

A complete semantic fuzzer combines AI-generated payloads with automated anomaly detection and severity classification. Run it nightly in CI against staging, triage findings by severity, and use the results to catch vulnerabilities that fixed, hand-written test cases tend to miss. The implementation itself is straightforward; the value comes from the model's ability to generate attack payloads that fit the endpoint's schema.