phy-pipeline-contract-enforcer
Data pipeline contract enforcer. Define the expected schema at each pipeline stage boundary — field names, types, nullability, value ranges, business invariants — and validate actual data samples against those contracts. Catches schema drift between pipeline stages before it reaches production. Supports dbt models, Spark DataFrames, Pandas DataFrames, Kafka topics, REST API payloads, and raw CSV/JSON files. Can auto-generate a contract from a sample, validate a sample against an existing contract, detect when a contract has been broken by upstream changes, and produce a migration plan to fix violations. Zero external API — pure local file and CLI analysis. Triggers on "pipeline contract", "schema drift", "validate pipeline output", "data contract", "pipeline schema mismatch", "contract enforcement", "/pipeline-contract".
Installation / Download
`totalclaw install github:LeoYeAI~openclaw-master-skills~phy-pipeline-contract-enforcer`

`curl -fsSL https://skills.taituai.com/api/skills/github%3ALeoYeAI~openclaw-master-skills~phy-pipeline-contract-enforcer/file -o phy-pipeline-contract-enforcer.md`

# Pipeline Contract Enforcer
Data pipelines fail silently. A column goes nullable in staging. An upstream team renames a field. A new data source adds an unexpected type. By the time it surfaces in a dashboard or an alert, the corrupt data is already in production.
Paste a data sample and a contract (or generate the contract from the sample), and get an instant audit: which fields violate the contract, what the violation is, and what to fix.
**Works with dbt, Spark, Pandas, Kafka, REST APIs, CSV/JSON. Zero external services. No Great Expectations config required.**
---
## Trigger Phrases
- "pipeline contract", "data contract", "schema enforcement"
- "validate my pipeline output", "schema drift", "pipeline schema changed"
- "my downstream job broke", "column type mismatch", "unexpected null"
- "generate contract from sample", "write a data contract"
- "dbt schema contract", "validate DataFrame schema"
- "/pipeline-contract"
---
## How to Provide Input
```bash
# Option 1: Generate a contract from a sample file
/pipeline-contract --generate sample.json
/pipeline-contract --generate output.csv
# Option 2: Validate a sample against an existing contract
/pipeline-contract --validate sample.json --contract contracts/orders.yaml
# Option 3: Check for drift between two samples (old vs new)
/pipeline-contract --diff old-output.json new-output.json
# Option 4: From dbt schema.yml
/pipeline-contract --from-dbt models/schema.yml --validate data/orders.csv
# Option 5: Validate a Pandas/Spark DataFrame (paste the schema)
/pipeline-contract --dataframe
[paste df.dtypes or df.schema output here]
# Option 6: Full pipeline audit (multiple stage files)
/pipeline-contract --audit pipeline/
# Reads: pipeline/stage-1.json, pipeline/stage-2.json, contracts/*.yaml
```
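The drift check in Option 3 can be illustrated with a small sketch. This is not the skill's actual implementation: `field_types` and `diff_samples` are hypothetical helper names, and the samples are in-memory lists rather than files loaded from disk.

```python
def field_types(samples):
    """Map each field name to the set of Python type names seen across samples."""
    seen = {}
    for row in samples:
        for key, value in row.items():
            seen.setdefault(key, set()).add(type(value).__name__)
    return seen


def diff_samples(old_samples, new_samples):
    """Report fields added, removed, or whose observed types changed."""
    old, new = field_types(old_samples), field_types(new_samples)
    return {
        "added": sorted(new.keys() - old.keys()),
        "removed": sorted(old.keys() - new.keys()),
        "type_changed": {
            name: {"old": sorted(old[name]), "new": sorted(new[name])}
            for name in sorted(old.keys() & new.keys())
            if old[name] != new[name]
        },
    }


# Illustrative data only: a renamed field and a float that became a string.
old = [{"order_id": "ORD-00000001", "order_total": 19.99, "status": "pending"}]
new = [{"order_id": "ORD-00000001", "order_total": "19.99", "state": "pending"}]
drift = diff_samples(old, new)
print(drift)
```

A rename shows up as one removed field plus one added field, so a reviewer still has to decide whether `status` and `state` are the same column.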
---
## Step 1: Infer Pipeline Stage Type
Identify what kind of pipeline artifact is being validated:
```bash
# Detect file type and format from the extension
file_ext="${1##*.}"
case "$file_ext" in
  json)     STAGE_TYPE="json_payload" ;;
  csv)      STAGE_TYPE="tabular_csv" ;;
  parquet)  STAGE_TYPE="parquet" ;;
  yaml|yml) STAGE_TYPE="contract_or_dbt" ;;
  *)        STAGE_TYPE="unknown" ;;
esac

# Check if a YAML file looks like a dbt schema.yml (top-level models:/version: key)
if [ "$STAGE_TYPE" = "contract_or_dbt" ] && grep -qE "^(models|version):" "$1" 2>/dev/null; then
  STAGE_TYPE="dbt_schema"
fi

# Check if the argument is pasted df.dtypes output (DataFrame schema) rather than a file path.
# Matching the pattern against a real path would misfire on names such as object.json.
if [ ! -f "$1" ] && echo "$1" | grep -qE "dtype|object|int64|float64|datetime64"; then
  STAGE_TYPE="pandas_schema"
fi
```
### Supported Stage Types
| Stage Type | Detection | Examples |
|-----------|-----------|---------|
| JSON payload | `.json` extension or curly braces | REST API output, Kafka message |
| Tabular CSV | `.csv` extension | ETL output, export files |
| dbt schema | `models:` / `version:` key | dbt `schema.yml` |
| Pandas schema | `dtype`/`int64`/`float64` patterns | `df.dtypes` output |
| Spark schema | `StructType` / `StructField` | `df.schema` output |
| Parquet | `.parquet` extension | Processed pipeline files |
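The detection rules in the table, including the content-based ones for Spark schemas and bare JSON, could be combined roughly as follows. This is a sketch under assumptions: `detect_stage_type` and the `spark_schema` label are hypothetical names, and the order in which the content checks run is a design choice rather than documented behavior.

```python
import re
from pathlib import Path

# Extension-based detection, mirroring the table above.
EXTENSION_TYPES = {
    ".json": "json_payload",
    ".csv": "tabular_csv",
    ".parquet": "parquet",
}


def detect_stage_type(name: str, text: str = "") -> str:
    """Classify an artifact from its file name and, if needed, its content."""
    suffix = Path(name).suffix.lower()
    if suffix in EXTENSION_TYPES:
        return EXTENSION_TYPES[suffix]
    if suffix in (".yaml", ".yml"):
        # A top-level models: or version: key suggests a dbt schema.yml
        if re.search(r"^(models|version):", text, flags=re.MULTILINE):
            return "dbt_schema"
        return "contract_or_dbt"
    # No usable extension: fall back to content patterns (pasted schemas).
    if "StructType" in text or "StructField" in text:
        return "spark_schema"
    if re.search(r"\b(dtype|int64|float64|datetime64)\b", text):
        return "pandas_schema"
    if text.lstrip().startswith("{"):
        return "json_payload"
    return "unknown"


print(detect_stage_type("orders.csv"))
print(detect_stage_type("", "order_id    object\ntotal    float64\ndtype: object"))
```

Checking the extension first keeps a JSON payload that happens to contain the string `int64` from being classified as a Pandas schema.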
---
## Step 2: Extract Schema from Sample
From any data sample, extract the implicit schema:
### JSON / REST Payload
```python
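# For each field in the JSON object, infer:
# - field name
# - data type (string, integer, float, boolean, null, array, object)
# - nullable (does the field ever appear as null or absent?)
# - example values (first 3 distinct values)
# - cardinality hint (if < 20 distinct values across samples → likely enum)
JSON_TYPE_NAMES = {str: "string", int: "integer", float: "float",
                   bool: "boolean", list: "array", dict: "object"}

def infer_json_schema(samples: list[dict]) -> dict:
    schema = {}
    for sample in samples:
        for key, value in sample.items():
            if key not in schema:
                schema[key] = {
                    "type": "null",  # replaced by the first non-null value's type
                    "nullable": False,
                    "examples": [],
                    "values_seen": set(),
                    "present": 0,
                }
            field = schema[key]
            field["present"] += 1
            if value is None:
                field["nullable"] = True
                continue
            if field["type"] == "null":
                field["type"] = JSON_TYPE_NAMES.get(type(value), type(value).__name__)
            field["values_seen"].add(str(value)[:50])
            if value not in field["examples"] and len(field["examples"]) < 3:
                field["examples"].append(value)
    for field in schema.values():
        # A field absent from at least one sample is also nullable
        if field.pop("present") < len(samples):
            field["nullable"] = True
        # Detect enums: < 20 distinct values (the working set is dropped
        # so the result stays JSON-serializable)
        values_seen = field.pop("values_seen")
        if len(values_seen) < 20:
            field["enum_hint"] = sorted(values_seen)
    return schema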
```
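An inferred schema can then be turned into a draft contract for review. The sketch below is illustrative only: `contract_from_schema` and the `max_enum_size` cutoff are assumptions, not part of the skill, and the input is a hand-written dict shaped like the inference output (`type`, `nullable`, optional `enum_hint`). The type check accepts both Python's `str` and the contract-style `string`.

```python
import json


def contract_from_schema(name: str, schema: dict, max_enum_size: int = 5) -> dict:
    """Build a draft contract dict from an inferred schema (hypothetical helper)."""
    fields = {}
    for field_name, info in schema.items():
        rules = {"type": info["type"], "nullable": info["nullable"]}
        # Only propose an enum for string fields with a handful of distinct values;
        # max_enum_size is an assumed cutoff that a reviewer should tune.
        hint = info.get("enum_hint")
        if hint and info["type"] in ("str", "string") and len(hint) <= max_enum_size:
            rules["enum"] = list(hint)
        fields[field_name] = rules
    return {"contract": {"name": name, "version": "1.0"}, "fields": fields}


# Illustrative inferred schema, not real pipeline output.
inferred = {
    "status": {"type": "string", "nullable": False, "enum_hint": ["confirmed", "pending"]},
    "order_total": {"type": "float", "nullable": False, "enum_hint": ["19.99", "5.0"]},
    "discount_code": {"type": "string", "nullable": True},
}
draft = contract_from_schema("orders-output", inferred)
print(json.dumps(draft, indent=2))
```

The draft is only a starting point: ranges, patterns, and invariants cannot be inferred reliably from a small sample and still need to be written by hand.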
### CSV / Tabular
```bash
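# Get column names, types, null counts, sample values
python3 -c "
import csv, sys
from collections import defaultdict, Counter

def infer_type(val):
    # csv yields every value as a string, so infer the type by trying to parse it
    for name, cast in (('integer', int), ('float', float)):
        try:
            cast(val)
            return name
        except ValueError:
            pass
    return 'string'

with open(sys.argv[1], newline='') as f:
    rows = list(csv.DictReader(f))
if not rows:
    sys.exit('no data rows found')

schema = defaultdict(lambda: {'types': Counter(), 'null_count': 0, 'examples': []})
for row in rows:
    for col, val in row.items():
        # Short rows yield None; empty or whitespace-only cells count as null
        if not isinstance(val, str) or not val.strip():
            schema[col]['null_count'] += 1
            continue
        schema[col]['types'][infer_type(val.strip())] += 1
        if len(schema[col]['examples']) < 3:
            schema[col]['examples'].append(val)

for col, info in schema.items():
    null_pct = round(100 * info['null_count'] / len(rows), 1)
    print(f'{col}: {dict(info[\"types\"])} | null={null_pct}% | examples={info[\"examples\"]}')
" "$FILE"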
```
### dbt schema.yml
```bash
# Extract column contracts already defined
grep -A 20 "columns:" "$FILE" | grep -E "name:|description:|tests:" | head -50
```
---
## Step 3: Contract Format
A Pipeline Contract is a YAML file that defines the expected schema at a specific stage boundary:
```yaml
# contracts/orders-output.yaml
contract:
  name: orders-output
  version: "1.0"
  description: "Expected schema at the output of the orders processing stage"
  stage: orders_processor
  produces: downstream-billing, downstream-reporting
fields:
  order_id:
    type: string
    nullable: false
    pattern: "^ORD-[0-9]{8}$" # Regex pattern check
    unique: true
  customer_id:
    type: string
    nullable: false
  order_total:
    type: float
    nullable: false
    min: 0.01
    max: 999999.99
  status:
    type: string
    nullable: false
    enum: [pending, confirmed, shipped, delivered, cancelled]
  created_at:
    type: datetime
    nullable: false
    format: "ISO 8601" # 2026-03-18T10:42:00Z
  discount_code:
    type: string
    nullable: true # Explicitly allowed null
  metadata:
    type: object
    nullable: true
    required_keys: [source, version] # If not null, these keys must exist
invariants:
  - name: "order_total matches items"
    description: "order_total must equal sum of line_items[*].price"
    severity: error
  - name: "shipped orders have tracking"
    description: "If status=shipped, tracking_number must not be null"
    severity: error
  - name: "discount positive"
    description: "discount_amount must be 0 or positive"
    severity: warning
row_count:
  min: 1
  max: null # No upper bound
```
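The `invariants` entries are described in prose, so a validator needs some way to turn each one into an executable check. One possible approach, sketched here under assumptions, is a registry that maps invariant names to row-level predicates. `INVARIANT_CHECKS` and `check_invariants` are hypothetical names, and the half-cent tolerance on `order_total` is an assumption to absorb float rounding.

```python
from typing import Callable

# Hypothetical registry: invariant name (as written in the contract) -> predicate over one row.
INVARIANT_CHECKS: dict[str, Callable[[dict], bool]] = {
    "order_total matches items": lambda row: abs(
        row.get("order_total", 0)
        - sum(item["price"] for item in row.get("line_items", []))
    ) < 0.005,
    "shipped orders have tracking": lambda row: (
        row.get("status") != "shipped" or row.get("tracking_number") is not None
    ),
    "discount positive": lambda row: (row.get("discount_amount") or 0) >= 0,
}


def check_invariants(rows: list[dict], invariants: list[dict]) -> list[dict]:
    """Return one failure record per (row, invariant) pair that does not hold."""
    failures = []
    for row_num, row in enumerate(rows):
        for inv in invariants:
            check = INVARIANT_CHECKS.get(inv["name"])
            # Invariants without a registered predicate are skipped, not failed.
            if check is not None and not check(row):
                failures.append(
                    {"row": row_num, "invariant": inv["name"], "severity": inv["severity"]}
                )
    return failures


# Illustrative rows only.
invariants = [
    {"name": "shipped orders have tracking", "severity": "error"},
    {"name": "discount positive", "severity": "warning"},
]
rows = [
    {"status": "shipped", "tracking_number": None, "discount_amount": 0},
    {"status": "pending", "tracking_number": None, "discount_amount": -5},
]
failures = check_invariants(rows, invariants)
print(failures)
```

Carrying `severity` through to the failure record lets a caller block on `error` while only reporting `warning`.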
---
## Step 4: Validate Sample Against Contract
For each field in the contract, check the sample data:
```python
def validate_against_contract(data: list[dict], contract: dict) -> list[dict]:
    violations = []
    for row_num, row in enumerate(data):
        for field_name, rules in contract["fields"].items():
            # CHECK: Required field present
            if field_name not in row and not rules.get("nullable"):
                violations.append({
                    "row": row_num,
                    "field": field_name,
                    "violation": "MISSING_REQUIRED_FIELD",
                    "expected": f"field '{field_name}' must be present",
                    "got": "field absent"
                })
                continue
            value = row.get(field_name)
            # CHECK: Null constraint
            if value is None and not rules.get("nullable"):
                violations.append({
                    "row": row_num,
                    "field": field_name,
                    "violation": "UNEXPECTED_NULL",
                    "expected": "not null",
                    "got": "null"