Phy SSRF Audit
Server-Side Request Forgery (SSRF) vulnerability scanner (OWASP A10:2021). Detects URL-fetching sinks in Python, Java, Node.js, PHP, Go, and Ruby that accept user-controlled URLs without validation. Flags cloud metadata endpoint access (AWS IMDS 169.254.169.254, GCP metadata.google.internal, Azure IMDS), DNS rebinding exposure, and missing allowlist checks. Outputs CWE-918 findings with HTTP taint analysis and per-framework fix snippets. Zero competitors on ClawHub.
Installation / Download

TotalClaw CLI (recommended):

`totalclaw install clawskills:phy041~phy-ssrf-audit`

Direct download with cURL, no login required:

`curl -fsSL https://skills.taituai.com/api/skills/clawskills%3Aphy041~phy-ssrf-audit/file -o phy-ssrf-audit.md`

Get the source from the Git repository:

`git clone https://github.com/openclaw/skills/commit/d836ad013c84c5ec2c4f5b44d52b8867b997bbfd`

# phy-ssrf-audit
Static scanner for **OWASP A10:2021 — Server-Side Request Forgery (SSRF)** vulnerabilities. Finds all URL-fetching sinks in your codebase, traces HTTP input to those sinks, and checks for missing allowlist/blocklist guards. Flags hardcoded cloud metadata endpoint access as CRITICAL. Zero external API calls, zero dependencies beyond Python 3 stdlib.
## Why SSRF Matters in 2026
SSRF lets attackers force your server to fetch internal URLs, bypassing firewalls and reaching:
- **AWS IMDS** (`169.254.169.254`) → steal IAM credentials, account ID, region
- **GCP metadata** (`metadata.google.internal`) → steal service account tokens
- **Azure IMDS** (`169.254.169.254/metadata/instance`) → steal managed identity tokens
- **Internal services** → Redis, Elasticsearch, Kubernetes API without auth
- **Private network scanning** → map internal topology via timing side-channels
Real-world examples: the Capital One breach (2019) and GitLab SSRF (CVE-2021-22214). Confluence CVE-2022-26134 is a related case, though it is an OGNL injection flaw rather than SSRF proper.
## What It Detects
### Python
| Sink | Severity | Notes |
|------|----------|-------|
| `requests.get/post/put/delete/head(user_url)` | HIGH | Most common SSRF vector |
| `urllib.request.urlopen(user_url)` | HIGH | stdlib fetch |
| `httpx.get/post(user_url)` | HIGH | async-first HTTP client |
| `aiohttp.ClientSession().get(user_url)` | HIGH | async |
| `socket.create_connection((user_host, port))` | HIGH | raw socket SSRF |
| `subprocess.*("curl", user_url)` | CRITICAL | SSRF + command injection |
| `open(user_url)` where URL is http:// | HIGH | Python 2 urllib alias |
### Java
| Sink | Severity | Notes |
|------|----------|-------|
| `new URL(userInput).openConnection()` | CRITICAL | Direct Java SSRF |
| `new URL(userInput).openStream()` | CRITICAL | Direct Java SSRF |
| `RestTemplate.getForObject(userInput, ...)` | HIGH | Spring REST |
| `RestTemplate.exchange(userInput, ...)` | HIGH | Spring REST |
| `WebClient.get().uri(userInput)` | HIGH | Spring WebFlux |
| `HttpClient.newHttpClient().send(HttpRequest.newBuilder(URI.create(userInput)))` | CRITICAL | Java 11+ HTTP client |
| `OkHttpClient().newCall(Request.Builder().url(userInput))` | HIGH | OkHttp |
### Node.js / TypeScript
| Sink | Severity | Notes |
|------|----------|-------|
| `http.get(req.*.url)` / `https.get(req.*.url)` | HIGH | Node.js stdlib |
| `fetch(req.*.url)` / `fetch(req.*.target)` | HIGH | Fetch API |
| `axios.get(req.*.url)` / `axios.post(req.*.url)` | HIGH | Axios |
| `got(req.*.url)` / `got.get(req.*.url)` | HIGH | got client |
| `node-fetch(req.*.url)` | HIGH | node-fetch |
| `superagent.get(req.*.url)` | HIGH | SuperAgent |
| `needle.get(req.*.url)` | HIGH | Needle |
| `request(req.*.url, ...)` | HIGH | request (deprecated) |
### PHP
| Sink | Severity | Notes |
|------|----------|-------|
| `curl_setopt($ch, CURLOPT_URL, $_GET[...])` | CRITICAL | Direct SSRF |
| `file_get_contents($_GET[...])` | CRITICAL | PHP wrapper SSRF + LFI |
| `file_get_contents($_POST[...])` | CRITICAL | |
| `curl_setopt($ch, CURLOPT_URL, $url)` where `$url` from request | HIGH | Indirect |
| `fopen($_GET[...], 'r')` | HIGH | Remote file open |
### Go
| Sink | Severity | Notes |
|------|----------|-------|
| `http.Get(r.FormValue("url"))` | CRITICAL | Direct SSRF |
| `http.Get(r.URL.Query().Get("url"))` | CRITICAL | Direct SSRF |
| `http.NewRequest("GET", userURL, nil)` | HIGH | |
| `http.Client{}.Do(req)` where URL is user-controlled | HIGH | |
| `http.Post(userURL, ...)` | HIGH | |
### Ruby
| Sink | Severity | Notes |
|------|----------|-------|
| `Net::HTTP.get(URI(params[:url]))` | CRITICAL | Direct SSRF |
| `URI.open(params[:url])` / `open(params[:url])` | CRITICAL | Also code execution risk |
| `HTTP.get(params[:url])` | HIGH | http gem |
| `Faraday.new(params[:url])` | HIGH | Faraday |
| `RestClient.get(params[:url])` | HIGH | rest-client |
| `HTTParty.get(params[:url])` | HIGH | HTTParty |
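
The tables above pair a fetch sink with a user-controlled argument. As a rough illustration of that pairing for a single Python line, the sketch below matches a sink and then looks for request-derived input on the same line. `SINK_RE`, `TAINT_RE`, `classify_line`, and the `INFO` fallback for untainted sinks are hypothetical names and choices made for this example; they are not taken from the scanner's own code.

```python
import re
from typing import Optional

# Hypothetical patterns for illustration; the real scanner's registry may differ.
SINK_RE = re.compile(r'\brequests\.(get|post|put|delete|head)\s*\(')
TAINT_RE = re.compile(r'\brequest\.(args|form|json|values)\b|\buser_url\b')


def classify_line(line: str) -> Optional[str]:
    """Return a severity label for one source line, or None if it has no sink."""
    if not SINK_RE.search(line):
        return None
    # A sink fed directly from request data is the strongest signal.
    # Assumption: sinks with no visible taint are still reported, at INFO.
    return "HIGH" if TAINT_RE.search(line) else "INFO"


if __name__ == "__main__":
    print(classify_line('r = requests.get(request.args["url"])'))
    print(classify_line('r = requests.get("https://example.com")'))
```

Single-line matching misses taint that flows through intermediate variables, so a real analysis would need to follow assignments across lines.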
## Cloud Metadata Endpoint Patterns (CRITICAL)
Regardless of HTTP input taint, flag any hardcoded or constructed URL containing:
| Endpoint | Cloud Provider | Risk |
|---------|----------------|------|
| `169.254.169.254` | AWS/GCP/Azure | IAM credentials, instance metadata |
| `metadata.google.internal` | GCP | Service account tokens |
| `169.254.170.2` | AWS ECS | Task metadata + credentials |
| `fd00:ec2::254` | AWS IPv6 IMDS | IPv6 IMDSv2 |
| `100.100.100.200` | Alibaba Cloud IMDS | RAM credentials |
| `169.254.169.254/metadata/instance` | Azure IMDS | Managed identity tokens |
| `169.254.0.1` | Oracle Cloud IMDS | Instance credentials |
| `kubernetes.default.svc` | Kubernetes | API server auth bypass |
| `etcd.kube-system.svc` | Kubernetes | etcd direct access |
**Note:** Even legitimate server-side proxy features must validate against a metadata blocklist. No exceptions.
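
A taint-independent check like this can be sketched as a plain line-by-line regex scan. The example below uses a subset of the endpoints from the table; `METADATA_RE` and `find_metadata_refs` are illustrative names, and the sample input is made up.

```python
import re
from typing import List, Tuple

# Subset of the endpoints listed in the table above (illustrative, not exhaustive).
METADATA_RE = re.compile(
    r'169\.254\.169\.254|'
    r'metadata\.google\.internal|'
    r'169\.254\.170\.2|'
    r'fd00:ec2::254|'
    r'100\.100\.100\.200'
)


def find_metadata_refs(source: str) -> List[Tuple[int, str]]:
    """Return (line_number, matched_endpoint) for every line that mentions one."""
    hits = []
    for lineno, line in enumerate(source.splitlines(), start=1):
        match = METADATA_RE.search(line)
        if match:
            hits.append((lineno, match.group(0)))
    return hits


if __name__ == "__main__":
    sample = 'x = 1\nurl = "http://169.254.169.254/latest/meta-data/"\n'
    for lineno, endpoint in find_metadata_refs(sample):
        print(f"line {lineno}: {endpoint}")
```

Because the match is purely textual, it also fires on comments and documentation strings, which is consistent with flagging these endpoints regardless of taint.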
## Missing Guard Detection
After finding a sink, the scanner checks if a validation guard exists within ±50 lines:
**Python guards (safe if present):**
```python
import ipaddress
import socket
from urllib.parse import urlparse

# Allowlist check
if urlparse(url).netloc not in ALLOWED_HOSTS:
    raise ValueError("blocked")

# Private IP check (requests-ssrf / ssrf-filter libraries)
import ssrf_filter
ssrf_filter.validate_url(url)

# ipaddress check: resolve the host, then reject private ranges
addr = socket.gethostbyname(host)
if ipaddress.ip_address(addr).is_private:
    raise ValueError("blocked")
```
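
The fragments above can be combined into one helper. The following is a minimal sketch using only the standard library, assuming a policy of "http/https only, no non-public addresses". `is_url_safe` and `ALLOWED_SCHEMES` are hypothetical names for this example, not part of the scanner. It checks every address the host resolves to rather than only the first.

```python
import ipaddress
import socket
from urllib.parse import urlparse

ALLOWED_SCHEMES = {"http", "https"}  # assumption: only web schemes are permitted


def is_url_safe(url: str) -> bool:
    """Return True only if every address the host resolves to is public."""
    parsed = urlparse(url)
    if parsed.scheme not in ALLOWED_SCHEMES or not parsed.hostname:
        return False
    try:
        port = parsed.port or 80  # .port raises ValueError on a malformed port
        infos = socket.getaddrinfo(parsed.hostname, port, proto=socket.IPPROTO_TCP)
    except (ValueError, socket.gaierror):
        return False
    for info in infos:
        # Drop any IPv6 scope id ("fe80::1%eth0") before parsing.
        addr = ipaddress.ip_address(info[4][0].split("%")[0])
        if (addr.is_private or addr.is_loopback or addr.is_link_local
                or addr.is_reserved or addr.is_multicast or addr.is_unspecified):
            return False
    return True


if __name__ == "__main__":
    for candidate in ("http://127.0.0.1/", "http://169.254.169.254/latest/meta-data/"):
        print(candidate, is_url_safe(candidate))
```

A check like this resolves the hostname at validation time. If the HTTP client resolves it again when fetching, a DNS rebinding attack can still swap in an internal address, so the fetch should ideally connect to the address that was validated.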
**Node.js guards (safe if present):**
```javascript
// URL allowlist
const { hostname } = new URL(url);
if (!ALLOWED_HOSTS.includes(hostname)) throw new Error("blocked");
// ssrf-req-filter / ssrf-check libraries
import ssrfCheck from 'ssrf-req-filter';
await ssrfCheck(url);
```
**Go guards (safe if present):**
```go
// IP range check
ip := net.ParseIP(host)
if ip.IsLoopback() || ip.IsPrivate() { ... }
// Allowlist
if !slices.Contains(allowedHosts, parsedURL.Host) { ... }
```
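
The ±50-line guard check described at the start of this section can be approximated with a windowed regex search. This is a sketch under assumptions: `GUARD_RE` is a hypothetical pattern assembled from the guard snippets above, and `has_guard_nearby` is an illustrative name rather than the scanner's actual function.

```python
import re
from typing import List

# Hypothetical guard markers drawn from the snippets in this section.
GUARD_RE = re.compile(
    r'ALLOWED_HOSTS|allowedHosts|is_private|IsPrivate\(\)|ssrf_filter|ssrf-req-filter'
)
GUARD_WINDOW = 50  # lines to inspect on each side of the sink


def has_guard_nearby(lines: List[str], sink_index: int,
                     window: int = GUARD_WINDOW) -> bool:
    """Return True if any line within +/- window of the sink looks like a guard."""
    start = max(0, sink_index - window)
    end = min(len(lines), sink_index + window + 1)
    return any(GUARD_RE.search(line) for line in lines[start:end])


if __name__ == "__main__":
    code = ["x = 1"] * 120
    code[10] = "if urlparse(url).netloc not in ALLOWED_HOSTS:"
    code[40] = "requests.get(url)"
    print(has_guard_nearby(code, 40))
```

A proximity heuristic like this cannot tell whether the guard actually applies to the URL reaching the sink, so a detected guard lowers confidence in a finding rather than proving the call is safe.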
## Implementation
```python
#!/usr/bin/env python3
"""
phy-ssrf-audit — OWASP A10:2021 SSRF scanner
Usage: python3 audit_ssrf.py [path] [--json] [--ci]
"""
import argparse
import json
import os
import re
import sys
from dataclasses import dataclass, field
from pathlib import Path
from typing import Optional

CRITICAL, HIGH, MEDIUM, INFO = "CRITICAL", "HIGH", "MEDIUM", "INFO"

CLOUD_METADATA_RE = re.compile(
    r'169\.254\.169\.254|'
    r'metadata\.google\.internal|'
    r'169\.254\.170\.2|'
    r'fd00:ec2::254|'
    r'100\.100\.100\.200|'
    r'kubernetes\.default\.svc|'
    r'etcd\.kube-system\.svc|'
    r'169\.254\.0\.1'
)


@dataclass
class Finding:
    file: str
    line: int
    pattern_name: str
    matched_text: str
    severity: str
    description: str
    fix: str
    has_http_taint: bool = False
    is_metadata_endpoint: bool = False
    guard_detected: bool = False


# ─── Pattern registry ────────────────────────────────────────────────────────
PATTERNS = {
    ".py": [
        ("REQUESTS_FETCH",
         re.compile(r'\brequests\.(get|post|put|delete|head|patch|request)\s*\('),
         HIGH,
         "requests.{method}() with user-controlled URL enables SSRF.",
         "Validate URL against an allowlist of permitted hosts before fetching. "
         "Use ssrf-filter or validate urlparse(url).netloc against ALLOWED_HOSTS."),
        ("URLLIB_URLOPEN",
         re.compile(r'\burllib\.request\.urlopen\s*\(|\burllib2\.urlopen\s*\('),
         HIGH,
         "urllib.request.urlopen() with user-controlled URL enables SSRF.",
         "Validate URL hostname and IP against an allowlist. Block private/loopback IPs."),
        ("HTTPX_FETCH",
         re.compile(r'\bhttpx\.(get|post|put|delete|request|stream)\s*\('),
         HIGH,
         "httpx.{method}() with user-controlled URL enables SSRF.",
         "Validate URL with httpx.URL(url).host against an allowlist before fetching."),
        ("AIOHTTP_FETCH",
         re.compile(r'aiohttp\.ClientSession\s*\(\s*\)'),
         HIGH,
         "aiohttp.ClientSession() — verify URL is validated before any .get()/.post() calls.",
         "Use a custom connector with allowed_hosts or validate