Phy Ssrf Audit

ClawSkills · Author: PHY041 · v1.0.0

Server-Side Request Forgery (SSRF) vulnerability scanner (OWASP A10:2021). Detects URL-fetching sinks in Python/Java/Node.js/PHP/Go/Ruby that accept user-controlled URLs without validation. Flags cloud metadata endpoint access (AWS IMDS 169.254.169.254, GCP metadata.google.internal, Azure IMDS), DNS rebinding exposure, missing allowlist checks. Outputs CWE-918 findings with HTTP taint analysis and per-framework fix snippets. Zero competitors on ClawHub.


Installation / Download

TotalClaw CLI (recommended)
totalclaw install clawskills:phy041~phy-ssrf-audit
cURL (direct download, no login required)
curl -fsSL https://skills.taituai.com/api/skills/clawskills%3Aphy041~phy-ssrf-audit/file -o phy-ssrf-audit.md
Git (fetch the source from the repository)
git clone https://github.com/openclaw/skills/commit/d836ad013c84c5ec2c4f5b44d52b8867b997bbfd
# phy-ssrf-audit

Static scanner for **OWASP A10:2021 — Server-Side Request Forgery (SSRF)** vulnerabilities. Finds all URL-fetching sinks in your codebase, traces HTTP input to those sinks, and checks for missing allowlist/blocklist guards. Flags hardcoded cloud metadata endpoint access as CRITICAL. Zero external API calls, zero dependencies beyond Python 3 stdlib.

## Why SSRF Matters in 2026

SSRF lets attackers force your server to fetch internal URLs, bypassing firewalls and reaching:
- **AWS IMDS** (`169.254.169.254`) → steal IAM credentials, account ID, region
- **GCP metadata** (`metadata.google.internal`) → steal service account tokens
- **Azure IMDS** (`169.254.169.254/metadata/instance`) → steal managed identity tokens
- **Internal services** → Redis, Elasticsearch, Kubernetes API without auth
- **Private network scanning** → map internal topology via timing side-channels

Real-world examples: Capital One breach (2019), GitLab SSRF (CVE-2021-22214), Confluence SSRF (CVE-2022-26134 adjacent).

## What It Detects

### Python
| Sink | Severity | Notes |
|------|----------|-------|
| `requests.get/post/put/delete/head(user_url)` | HIGH | Most common SSRF vector |
| `urllib.request.urlopen(user_url)` | HIGH | stdlib fetch |
| `httpx.get/post(user_url)` | HIGH | HTTP client with sync and async APIs |
| `aiohttp.ClientSession().get(user_url)` | HIGH | async |
| `socket.create_connection((user_host, port))` | HIGH | raw socket SSRF |
| `subprocess.*("curl", user_url)` | CRITICAL | SSRF + command injection |
| `open(user_url)` where URL is http:// | HIGH | Only when `open` is aliased to a URL opener (e.g. Python 2 `urllib.urlopen`); the builtin `open()` does not fetch URLs |

### Java
| Sink | Severity | Notes |
|------|----------|-------|
| `new URL(userInput).openConnection()` | CRITICAL | Direct Java SSRF |
| `new URL(userInput).openStream()` | CRITICAL | Direct Java SSRF |
| `RestTemplate.getForObject(userInput, ...)` | HIGH | Spring REST |
| `RestTemplate.exchange(userInput, ...)` | HIGH | Spring REST |
| `WebClient.get().uri(userInput)` | HIGH | Spring WebFlux |
| `HttpClient.newHttpClient().send(HttpRequest.newBuilder(URI.create(userInput)))` | CRITICAL | Java 11+ HTTP client |
| `new OkHttpClient().newCall(new Request.Builder().url(userInput).build())` | HIGH | OkHttp |

### Node.js / TypeScript
| Sink | Severity | Notes |
|------|----------|-------|
| `http.get(req.*.url)` / `https.get(req.*.url)` | HIGH | Node.js stdlib |
| `fetch(req.*.url)` / `fetch(req.*.target)` | HIGH | Fetch API |
| `axios.get(req.*.url)` / `axios.post(req.*.url)` | HIGH | Axios |
| `got(req.*.url)` / `got.get(req.*.url)` | HIGH | got client |
| `node-fetch(req.*.url)` | HIGH | node-fetch |
| `superagent.get(req.*.url)` | HIGH | SuperAgent |
| `needle.get(req.*.url)` | HIGH | Needle |
| `request(req.*.url, ...)` | HIGH | request (deprecated) |

### PHP
| Sink | Severity | Notes |
|------|----------|-------|
| `curl_setopt($ch, CURLOPT_URL, $_GET[...])` | CRITICAL | Direct SSRF |
| `file_get_contents($_GET[...])` | CRITICAL | PHP wrapper SSRF + LFI |
| `file_get_contents($_POST[...])` | CRITICAL | |
| `curl_setopt($ch, CURLOPT_URL, $url)` where `$url` from request | HIGH | Indirect |
| `fopen($_GET[...], 'r')` | HIGH | Remote file open |

### Go
| Sink | Severity | Notes |
|------|----------|-------|
| `http.Get(r.FormValue("url"))` | CRITICAL | Direct SSRF |
| `http.Get(r.URL.Query().Get("url"))` | CRITICAL | Direct SSRF |
| `http.NewRequest("GET", userURL, nil)` | HIGH | |
| `http.Client{}.Do(req)` where URL is user-controlled | HIGH | |
| `http.Post(userURL, ...)` | HIGH | |

### Ruby
| Sink | Severity | Notes |
|------|----------|-------|
| `Net::HTTP.get(URI(params[:url]))` | CRITICAL | Direct SSRF |
| `URI.open(params[:url])` / `open(params[:url])` | CRITICAL | Also code execution risk |
| `HTTP.get(params[:url])` | HIGH | http gem |
| `Faraday.new(params[:url])` | HIGH | Faraday |
| `RestClient.get(params[:url])` | HIGH | rest-client |
| `HTTParty.get(params[:url])` | HIGH | HTTParty |
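
A sink in the tables above only becomes a finding when its URL argument comes from HTTP input. The sketch below pairs a sink regex with a same-line taint hint, as a rough illustration of that idea. `SINK_RE`, `TAINT_RE`, and `find_tainted_sinks` are hypothetical names chosen for this example, not the scanner's actual internals, and the request attribute names are assumed Flask/Django-style.

```python
import re

# Hypothetical patterns for illustration: two Python sinks from the table above
SINK_RE = re.compile(r'\b(?:requests|httpx)\.(?:get|post|put|delete|head)\s*\(')
# Assumed Flask/Django-style request accessors that carry user input
TAINT_RE = re.compile(r'\brequest\.(?:args|form|json|values|GET|POST)\b')

def find_tainted_sinks(source):
    """Return (line_number, sink_name) for sinks whose arguments mention request input."""
    findings = []
    for line_no, line in enumerate(source.splitlines(), start=1):
        sink = SINK_RE.search(line)
        # Only look for taint after the opening parenthesis of the sink call
        if sink and TAINT_RE.search(line, sink.end()):
            findings.append((line_no, sink.group(0).rstrip("( ")))
    return findings

sample = (
    'url = "https://example.com"\n'
    'requests.get(url)\n'
    'requests.get(request.args["url"])\n'
    'httpx.post(request.form.get("target"))\n'
)
print(find_tainted_sinks(sample))
```

A same-line check like this misses indirect flows, where the URL is read from the request on one line and fetched on another, so it should be treated as a lower bound on real exposure.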

## Cloud Metadata Endpoint Patterns (CRITICAL)

Regardless of HTTP input taint, flag any hardcoded or constructed URL containing:

| Endpoint | Cloud Provider | Risk |
|---------|----------------|------|
| `169.254.169.254` | AWS/GCP/Azure | IAM credentials, instance metadata |
| `metadata.google.internal` | GCP | Service account tokens |
| `169.254.170.2` | AWS ECS | Task metadata + credentials |
| `fd00:ec2::254` | AWS IPv6 IMDS | IAM credentials via the IPv6 IMDS endpoint |
| `100.100.100.200` | Alibaba Cloud IMDS | RAM credentials |
| `169.254.169.254/metadata/instance` | Azure IMDS | Managed identity tokens |
| `169.254.0.1` | Oracle Cloud IMDS | Instance credentials |
| `kubernetes.default.svc` | Kubernetes | API server auth bypass |
| `etcd.kube-system.svc` | Kubernetes | etcd direct access |

**Note:** Even legitimate server-side proxy features must validate against a metadata blocklist. No exceptions.
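
For a proxy feature, one way to apply such a blocklist is to compare the parsed host against the endpoints in the table. This is a minimal sketch under stated assumptions: `METADATA_HOSTNAMES`, `METADATA_IPS`, and `is_metadata_url` are hypothetical names, and the check only inspects the literal host in the URL.

```python
import ipaddress
from urllib.parse import urlparse

# Endpoints copied from the table above; the set names are hypothetical
METADATA_HOSTNAMES = {
    "metadata.google.internal",
    "kubernetes.default.svc",
    "etcd.kube-system.svc",
}
METADATA_IPS = {
    ipaddress.ip_address(a)
    for a in ("169.254.169.254", "169.254.170.2", "fd00:ec2::254",
              "100.100.100.200", "169.254.0.1")
}

def is_metadata_url(url):
    """True if the URL's literal host is a known metadata endpoint."""
    # urlparse lowercases the hostname and strips IPv6 brackets
    host = (urlparse(url).hostname or "").rstrip(".")
    if host in METADATA_HOSTNAMES:
        return True
    try:
        # Parsing normalizes equivalent spellings of the same IP literal
        return ipaddress.ip_address(host) in METADATA_IPS
    except ValueError:
        return False  # not an IP literal and not a listed hostname

print(is_metadata_url("http://169.254.169.254/latest/meta-data/"))
print(is_metadata_url("https://example.com/"))
```

Because only the literal host is checked, a hostname that resolves to a metadata address, or an alternate numeric encoding that an HTTP client accepts, would pass. A blocklist like this is best combined with a check on the resolved address.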

## Missing Guard Detection

After finding a sink, the scanner checks if a validation guard exists within ±50 lines:

**Python guards (safe if present):**
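```python
import ipaddress
import socket
from urllib.parse import urlparse

# Allowlist check
if urlparse(url).netloc not in ALLOWED_HOSTS:
    raise ValueError("blocked")

# Private IP check (requests-ssrf / ssrf-filter libraries)
import ssrf_filter
ssrf_filter.validate_url(url)

# ipaddress check: resolve the host, then reject private, loopback,
# and link-local addresses (is_private covers all three ranges)
host = urlparse(url).hostname
addr = socket.gethostbyname(host)
if ipaddress.ip_address(addr).is_private:
    raise ValueError("blocked")
# The HTTP client resolves the name again at fetch time, so connect to the
# checked address to avoid a DNS rebinding window
```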
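
The individual Python guards can be combined into one validation step. The sketch below is one possible shape, not the scanner's own fix snippet: `check_url` and `resolved_addresses` are hypothetical helpers, and the `resolver` parameter is an assumption added so the logic can be exercised without network access.

```python
import ipaddress
import socket
from urllib.parse import urlparse

def resolved_addresses(host, resolver=socket.getaddrinfo):
    """Return every IP address the resolver reports for host."""
    infos = resolver(host, None, proto=socket.IPPROTO_TCP)
    # sockaddr[0] is the address string; drop any IPv6 zone suffix
    return [ipaddress.ip_address(info[4][0].split("%")[0]) for info in infos]

def check_url(url, allowed_schemes=("http", "https"), resolver=socket.getaddrinfo):
    """Raise ValueError unless url uses an allowed scheme and resolves only to public IPs."""
    parts = urlparse(url)
    if parts.scheme not in allowed_schemes or not parts.hostname:
        raise ValueError("blocked: unsupported scheme or missing host")
    addrs = resolved_addresses(parts.hostname, resolver)
    for addr in addrs:
        # is_global is False for private, loopback, link-local, and shared ranges
        if not addr.is_global:
            raise ValueError(f"blocked: {parts.hostname} resolves to {addr}")
    return addrs  # callers can connect to these vetted addresses directly

try:
    check_url("http://169.254.169.254/latest/meta-data/")
except ValueError as exc:
    print(exc)
```

Using `is_global` rather than `is_private` is a deliberate choice here: `100.100.100.200` sits in the shared address space `100.64.0.0/10`, for which `is_private` is `False` but `is_global` is also `False`. Returning the vetted addresses lets the caller connect to them instead of resolving the name a second time.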

**Node.js guards (safe if present):**
```javascript
// URL allowlist
const { hostname } = new URL(url);
if (!ALLOWED_HOSTS.includes(hostname)) throw new Error("blocked");

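// ssrf-req-filter: returns an agent that rejects private and reserved addresses at connect time
import ssrfFilter from 'ssrf-req-filter';
await axios.get(url, { httpAgent: ssrfFilter(url), httpsAgent: ssrfFilter(url) });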
```

**Go guards (safe if present):**
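```go
// IP range check (host must be an IP literal; resolve hostnames first).
// ParseIP returns nil for non-IP input, so treat nil as blocked.
// IsPrivate does not cover 169.254.0.0/16, hence the link-local check.
ip := net.ParseIP(host)
if ip == nil || ip.IsLoopback() || ip.IsPrivate() || ip.IsLinkLocalUnicast() { ... }

// Allowlist (Hostname() excludes the port, unlike Host)
if !slices.Contains(allowedHosts, parsedURL.Hostname()) { ... }
```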
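
The ±50-line guard window described at the start of this section can be sketched as a proximity search. `GUARD_RE` and `has_guard_nearby` are hypothetical names, and the pattern list is an assumption drawn from the guard examples above, not the scanner's real registry.

```python
import re

# Hypothetical guard hints, based on the guard examples in this section
GUARD_RE = re.compile(
    r'allowed_?hosts|\.is_private\b|\.IsPrivate\(\)|ssrf[_-]?filter',
    re.IGNORECASE,
)

def has_guard_nearby(lines, sink_line, window=50):
    """True if any guard hint appears within `window` lines of the 1-based sink_line."""
    start = max(0, sink_line - 1 - window)
    end = min(len(lines), sink_line + window)
    return any(GUARD_RE.search(line) for line in lines[start:end])

source = ["pass"] * 200
source[10] = "if urlparse(url).netloc not in ALLOWED_HOSTS:"  # file line 11
print(has_guard_nearby(source, 61))  # line 11 is exactly 50 lines above
print(has_guard_nearby(source, 62))  # line 11 is 51 lines above
```

Proximity is only a heuristic: a guard within the window may validate a different variable than the one reaching the sink, so findings marked as guarded still deserve a manual look.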

## Implementation

```python
#!/usr/bin/env python3
"""
phy-ssrf-audit — OWASP A10:2021 SSRF scanner
Usage: python3 audit_ssrf.py [path] [--json] [--ci]
"""
import argparse
import json
import os
import re
import sys
from dataclasses import dataclass, field
from pathlib import Path
from typing import Optional

CRITICAL, HIGH, MEDIUM, INFO = "CRITICAL", "HIGH", "MEDIUM", "INFO"

CLOUD_METADATA_RE = re.compile(
    r'169\.254\.169\.254|'
    r'metadata\.google\.internal|'
    r'169\.254\.170\.2|'
    r'fd00:ec2::254|'
    r'100\.100\.100\.200|'
    r'kubernetes\.default\.svc|'
    r'etcd\.kube-system\.svc|'
    r'169\.254\.0\.1'
)

@dataclass
class Finding:
    file: str
    line: int
    pattern_name: str
    matched_text: str
    severity: str
    description: str
    fix: str
    has_http_taint: bool = False
    is_metadata_endpoint: bool = False
    guard_detected: bool = False

# ─── Pattern registry ────────────────────────────────────────────────────────
PATTERNS = {
    ".py": [
        ("REQUESTS_FETCH",
         re.compile(r'\brequests\.(get|post|put|delete|head|patch|request)\s*\('),
         HIGH,
         "requests.{method}() with user-controlled URL enables SSRF.",
         "Validate URL against an allowlist of permitted hosts before fetching. "
         "Use ssrf-filter or validate urlparse(url).netloc against ALLOWED_HOSTS."),

        ("URLLIB_URLOPEN",
         re.compile(r'\burllib\.request\.urlopen\s*\(|\burllib2\.urlopen\s*\('),
         HIGH,
         "urllib.request.urlopen() with user-controlled URL enables SSRF.",
         "Validate URL hostname and IP against an allowlist. Block private/loopback IPs."),

        ("HTTPX_FETCH",
         re.compile(r'\bhttpx\.(get|post|put|delete|request|stream)\s*\('),
         HIGH,
         "httpx.{method}() with user-controlled URL enables SSRF.",
         "Validate URL with httpx.URL(url).host against an allowlist before fetching."),

        ("AIOHTTP_FETCH",
         re.compile(r'aiohttp\.ClientSession\s*\(\s*\)'),
         HIGH,
         "aiohttp.ClientSession() — verify URL is validated before any .get()/.post() calls.",
         "Use a custom connector with allowed_hosts or validate