phy-pipeline-contract-enforcer

GitHub author: PHY041

Data pipeline contract enforcer. Define the expected schema at each pipeline stage boundary (field names, types, nullability, value ranges, business invariants) and validate actual data samples against those contracts. Catches schema drift between pipeline stages before it reaches production. Supports dbt models, Spark DataFrames, Pandas DataFrames, Kafka topics, REST API payloads, and raw CSV/JSON files. It can auto-generate a contract from a sample, validate a sample against an existing contract, detect when upstream changes have broken a contract, and produce a migration plan to fix violations. No external APIs: all analysis runs locally on files and the CLI. Triggers on "pipeline contract", "schema drift", "validate pipeline output", "data contract", "pipeline schema mismatch", "contract enforcement", "/pipeline-contract".

Installation / Download

TotalClaw CLI (recommended)
totalclaw install github:LeoYeAI~openclaw-master-skills~phy-pipeline-contract-enforcer
cURL (direct download, no login required)
curl -fsSL https://skills.taituai.com/api/skills/github%3ALeoYeAI~openclaw-master-skills~phy-pipeline-contract-enforcer/file -o phy-pipeline-contract-enforcer.md

# Pipeline Contract Enforcer

Data pipelines fail silently. A column goes nullable in staging. An upstream team renames a field. A new data source adds an unexpected type. By the time it surfaces in a dashboard or an alert, the corrupt data is already in production.

Paste a data sample and a contract (or generate the contract from the sample), and get an instant audit: which fields violate the contract, what the violation is, and what to fix.

**Works with dbt, Spark, Pandas, Kafka, REST APIs, CSV/JSON. Zero external services. No Great Expectations config required.**

---

## Trigger Phrases

- "pipeline contract", "data contract", "schema enforcement"
- "validate my pipeline output", "schema drift", "pipeline schema changed"
- "my downstream job broke", "column type mismatch", "unexpected null"
- "generate contract from sample", "write a data contract"
- "dbt schema contract", "validate DataFrame schema"
- "/pipeline-contract"

---

## How to Provide Input

```bash
# Option 1: Generate a contract from a sample file
/pipeline-contract --generate sample.json
/pipeline-contract --generate output.csv

# Option 2: Validate a sample against an existing contract
/pipeline-contract --validate sample.json --contract contracts/orders.yaml

# Option 3: Check for drift between two samples (old vs new)
/pipeline-contract --diff old-output.json new-output.json

# Option 4: From dbt schema.yml
/pipeline-contract --from-dbt models/schema.yml --validate data/orders.csv

# Option 5: Validate a Pandas/Spark DataFrame (paste the schema)
/pipeline-contract --dataframe
[paste df.dtypes or df.schema output here]

# Option 6: Full pipeline audit (multiple stage files)
/pipeline-contract --audit pipeline/
# Reads: pipeline/stage-1.json, pipeline/stage-2.json, contracts/*.yaml
```
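For `--diff`, one way to detect drift is to infer a schema from each sample and compare the two field by field. The sketch below is a hypothetical illustration, not the skill's own implementation: `diff_schemas` and the drift labels (`FIELD_REMOVED`, `FIELD_ADDED`, `TYPE_CHANGED`, `BECAME_NULLABLE`) are assumed names, and each schema is assumed to be a plain `{field: {"type": ..., "nullable": ...}}` mapping.

```python
def diff_schemas(old: dict, new: dict) -> list[tuple[str, str, str]]:
    """Compare two {field: {"type": str, "nullable": bool}} schemas.

    Returns (field, drift_kind, detail) tuples ordered by field name.
    """
    drift = []
    for name in sorted(old.keys() | new.keys()):
        if name not in new:
            drift.append((name, "FIELD_REMOVED", f"was {old[name]['type']}"))
        elif name not in old:
            drift.append((name, "FIELD_ADDED", f"now {new[name]['type']}"))
        else:
            before, after = old[name], new[name]
            if before["type"] != after["type"]:
                drift.append((name, "TYPE_CHANGED", f"{before['type']} -> {after['type']}"))
            # Loosening a non-null field breaks downstream consumers;
            # tightening (nullable -> not null) does not, so it is not reported.
            if not before["nullable"] and after["nullable"]:
                drift.append((name, "BECAME_NULLABLE", "null now appears"))
    return drift


old = {"order_id": {"type": "string", "nullable": False},
       "order_total": {"type": "float", "nullable": False},
       "discount_code": {"type": "string", "nullable": True}}
new = {"order_id": {"type": "string", "nullable": False},
       "order_total": {"type": "string", "nullable": True},
       "coupon": {"type": "string", "nullable": True}}

for item in diff_schemas(old, new):
    print(item)
```

Here `order_total` is reported twice (type change and new nulls), `discount_code` as removed, and `coupon` as added, which covers the "upstream team renames a field" case from the introduction: a rename shows up as one removal plus one addition.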

---

## Step 1: Infer Pipeline Stage Type

Identify what kind of pipeline artifact is being validated:

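```bash
# Detect file type and format from the extension
file="$1"
file_ext="${file##*.}"
case "$file_ext" in
  json)     STAGE_TYPE="json_payload" ;;
  csv)      STAGE_TYPE="tabular_csv" ;;
  parquet)  STAGE_TYPE="parquet" ;;
  yaml|yml) STAGE_TYPE="contract_or_dbt" ;;
  *)        STAGE_TYPE="unknown" ;;
esac

# A YAML file with a top-level models:/version: key looks like a dbt schema.yml
if [ "$STAGE_TYPE" = "contract_or_dbt" ] && grep -qE "^(models|version):" "$file" 2>/dev/null; then
  STAGE_TYPE="dbt_schema"
fi

# Pasted schema text: inspect the file CONTENTS, not the argument string
if [ "$STAGE_TYPE" = "unknown" ] && [ -f "$file" ]; then
  if grep -qE "StructType|StructField|^root$" "$file"; then
    STAGE_TYPE="spark_schema"     # df.schema / df.printSchema() output
  elif grep -qwE "int64|float64|datetime64|object|bool" "$file"; then
    STAGE_TYPE="pandas_schema"    # df.dtypes output
  fi
fi
```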

### Supported Stage Types

| Stage Type | Detection | Examples |
|-----------|-----------|---------|
| JSON payload | `.json` extension or curly braces | REST API output, Kafka message |
| Tabular CSV | `.csv` extension | ETL output, export files |
| dbt schema | `models:` / `version:` key | dbt `schema.yml` |
| Pandas schema | `dtype`/`int64`/`float64` patterns | `df.dtypes` output |
| Spark schema | `StructType` / `StructField` | `df.printSchema()` output |
| Parquet | `.parquet` extension | Processed pipeline files |
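The detection rules in the table can also be expressed as a content-sniffing function. This is a minimal sketch under assumptions: `detect_stage_type` is a hypothetical helper that takes the file name plus its text content (pass an empty string for Parquet, which is binary), and the token lists are illustrative rather than exhaustive.

```python
from pathlib import PurePath


def detect_stage_type(filename: str, text: str = "") -> str:
    """Guess a stage type from the file name and, for text formats, its content."""
    ext = PurePath(filename).suffix.lower()
    if ext == ".parquet":
        return "parquet"  # binary format: decide on the extension alone
    if ext == ".csv":
        return "tabular_csv"
    stripped = text.lstrip()
    if ext == ".json" or stripped.startswith(("{", "[")):
        return "json_payload"
    if ext in (".yaml", ".yml"):
        # Collect top-level (unindented) YAML keys
        top_keys = {line.split(":", 1)[0] for line in text.splitlines()
                    if line and not line[0].isspace() and ":" in line}
        return "dbt_schema" if top_keys & {"models", "version"} else "contract_or_dbt"
    first_line = stripped.splitlines()[0].strip() if stripped else ""
    if "StructType" in text or "StructField" in text or first_line == "root":
        return "spark_schema"  # df.schema repr or df.printSchema() output
    tokens = text.split()
    if any(t in ("int64", "float64", "object", "bool") or t.startswith("datetime64")
           for t in tokens):
        return "pandas_schema"  # df.dtypes output, e.g. "created_at  datetime64[ns]"
    return "unknown"
```

The Spark check runs before the Pandas check because `printSchema()` output is more distinctive; the Pandas check then matches dtype tokens such as `datetime64[ns]` by prefix.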

---

## Step 2: Extract Schema from Sample

From any data sample, extract the implicit schema:

### JSON / REST Payload

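```python
# For each field in the JSON objects, infer:
# - field name
# - data type (string, integer, float, boolean, null, array, object)
# - nullable (does the field ever appear as null or absent?)
# - example values (first 3 distinct values)
# - cardinality hint (if < 20 distinct values across samples → likely enum)

# Map the Python types produced by json.loads to JSON type names
JSON_TYPES = {str: "string", int: "integer", float: "float",
              bool: "boolean", list: "array", dict: "object"}

def infer_json_schema(samples: list[dict]) -> dict:
    schema = {}
    for sample in samples:
        for key, value in sample.items():
            field = schema.setdefault(key, {
                "type": "null",
                "nullable": False,
                "present_in": 0,
                "examples": [],
                "values_seen": set(),
            })
            field["present_in"] += 1
            if value is None:
                field["nullable"] = True
            elif field["type"] == "null":
                # The first non-null value decides the type
                field["type"] = JSON_TYPES.get(type(value), type(value).__name__)
            seen = str(value)[:50]
            if seen not in field["values_seen"] and len(field["examples"]) < 3:
                field["examples"].append(value)
            field["values_seen"].add(seen)

    for field in schema.values():
        # A field absent from some samples counts as nullable
        if field.pop("present_in") < len(samples):
            field["nullable"] = True
        # Detect enums: < 20 distinct values
        values_seen = field.pop("values_seen")
        if len(values_seen) < 20:
            field["enum_hint"] = sorted(values_seen)
    return schema
```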
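For `--generate`, an inferred schema can be turned into a draft `fields:` block in the contract format from Step 3. A minimal sketch, assuming an input mapping shaped like the one `infer_json_schema` builds (`type`, `nullable`, optional `enum_hint`). The helper name `contract_fields_from_schema`, the `min_samples_for_enum` threshold, and the rule of proposing `enum` only for string fields are illustrative assumptions. Python type names such as `str` are mapped to contract names, so either naming works as input.

```python
# Map Python type names to contract type names; names that are already
# in contract form (e.g. "string") pass through unchanged.
CONTRACT_TYPES = {"str": "string", "int": "integer", "float": "float",
                  "bool": "boolean", "list": "array", "dict": "object",
                  "NoneType": "null"}


def contract_fields_from_schema(inferred: dict, sample_count: int,
                                min_samples_for_enum: int = 50) -> dict:
    """Draft a contract `fields:` mapping from an inferred schema.

    An enum is proposed only when the sample is large enough for a small
    set of distinct values to mean something (hypothetical threshold).
    """
    fields = {}
    for name, info in inferred.items():
        ftype = CONTRACT_TYPES.get(info["type"], info["type"])
        rule = {"type": ftype, "nullable": bool(info["nullable"])}
        enum = info.get("enum_hint")
        if ftype == "string" and enum and sample_count >= min_samples_for_enum:
            # enum_hint stores str(value), so a null shows up as "None"
            rule["enum"] = [v for v in enum if v != "None"]
        fields[name] = rule
    return fields
```

Treat the result as a draft to review: a free-text column such as `discount_code` can look like an enum when the sample happens to contain few distinct codes. If PyYAML is installed, `yaml.safe_dump({"fields": fields}, sort_keys=False)` renders the draft as YAML.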

### CSV / Tabular

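```bash
# Get column names, inferred types, null counts, and sample values.
# The path is passed as an argument instead of being spliced into the
# Python source, so file names containing quotes cannot break the script.
python3 - "$FILE" <<'EOF'
import csv, sys
from collections import defaultdict, Counter

def infer_type(val: str) -> str:
    # csv yields every cell as a string, so parse it to find the real type
    for cast, name in ((int, "integer"), (float, "float")):
        try:
            cast(val)
            return name
        except ValueError:
            pass
    return "boolean" if val.lower() in ("true", "false") else "string"

with open(sys.argv[1], newline="") as f:
    rows = list(csv.DictReader(f))

schema = defaultdict(lambda: {"types": Counter(), "null_count": 0, "examples": []})
for row in rows:
    for col, val in row.items():
        if col is None:
            continue  # extra cells beyond the header row
        if val is None or not val.strip():  # short rows yield None
            schema[col]["null_count"] += 1
            continue
        schema[col]["types"][infer_type(val.strip())] += 1
        if len(schema[col]["examples"]) < 3:
            schema[col]["examples"].append(val)

for col, info in schema.items():
    null_pct = round(100 * info["null_count"] / max(len(rows), 1), 1)
    print(f"{col}: {dict(info['types'])} | null={null_pct}% | examples={info['examples']}")
EOF
```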

### dbt schema.yml

```bash
# Extract column contracts already defined
grep -A 20 "columns:" "$FILE" | grep -E "name:|description:|tests:" | head -50
```
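For `--from-dbt`, the generic tests already declared in `schema.yml` map naturally onto contract rules: `not_null` becomes `nullable: false`, `unique` becomes `unique: true`, and `accepted_values` becomes `enum`. A sketch of that mapping, assuming the file has already been parsed into a dict (for example with PyYAML's `yaml.safe_load`) and that `accepted_values` uses the `values:` argument; `dbt_columns_to_contract_fields` is a hypothetical helper name. Columns without a `not_null` test default to nullable, since no test asserts otherwise.

```python
def dbt_columns_to_contract_fields(schema_yml: dict, model_name: str) -> dict:
    """Translate dbt generic tests on one model's columns into contract field rules."""
    models = schema_yml.get("models") or []
    model = next((m for m in models if m.get("name") == model_name), None)
    if model is None:
        raise KeyError(f"model {model_name!r} not found in schema.yml")
    fields = {}
    for column in model.get("columns") or []:
        rule = {"nullable": True}  # no not_null test means nulls are allowed
        # Newer dbt versions also accept `data_tests:` in place of `tests:`
        tests = column.get("data_tests") or column.get("tests") or []
        for test in tests:
            if test == "not_null":
                rule["nullable"] = False
            elif test == "unique":
                rule["unique"] = True
            elif isinstance(test, dict) and "accepted_values" in test:
                rule["enum"] = list(test["accepted_values"].get("values", []))
        fields[column["name"]] = rule
    return fields
```

Column types are not carried over here; dbt's `data_type` holds warehouse types (such as `varchar`) that would need their own mapping onto the contract's `string`/`integer`/`float` names.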

---

## Step 3: Contract Format

A Pipeline Contract is a YAML file that defines the expected schema at a specific stage boundary:

```yaml
# contracts/orders-output.yaml
contract:
  name: orders-output
  version: "1.0"
  description: "Expected schema at the output of the orders processing stage"
  stage: orders_processor
  produces: downstream-billing, downstream-reporting

  fields:
    order_id:
      type: string
      nullable: false
      pattern: "^ORD-[0-9]{8}$"        # Regex pattern check
      unique: true

    customer_id:
      type: string
      nullable: false

    order_total:
      type: float
      nullable: false
      min: 0.01
      max: 999999.99

    status:
      type: string
      nullable: false
      enum: [pending, confirmed, shipped, delivered, cancelled]

    created_at:
      type: datetime
      nullable: false
      format: "ISO 8601"               # 2026-03-18T10:42:00Z

    discount_code:
      type: string
      nullable: true                   # Explicitly allowed null

    metadata:
      type: object
      nullable: true
      required_keys: [source, version] # If not null, these keys must exist

  invariants:
    - name: "order_total matches items"
      description: "order_total must equal sum of line_items[*].price"
      severity: error
    - name: "shipped orders have tracking"
      description: "If status=shipped, tracking_number must not be null"
      severity: error
    - name: "discount positive"
      description: "discount_amount must be 0 or positive"
      severity: warning

  row_count:
    min: 1
    max: null        # No upper bound
```
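The value-level rules in this format (`pattern`, `min`/`max`, `enum`, `required_keys`, `unique`) and the `row_count` block could be checked along the following lines. This is a minimal sketch, not the skill's own validator: `check_field_rules` and `check_row_count` are hypothetical names, `pattern` is applied with `re.search` (so the contract's `^`/`$` anchors are what enforce a full match), `min`/`max` apply only to numeric values, and null values are skipped because nullability is a separate check. The `invariants` are not covered, since they are written as prose descriptions and would each need a hand-written predicate.

```python
import re


def check_field_rules(rows: list[dict], field: str, rules: dict) -> list[str]:
    """Check one contract field's value-level rules across all rows."""
    problems, seen = [], set()
    for i, row in enumerate(rows):
        value = row.get(field)
        if value is None:
            continue  # null/missing handling belongs to the nullable check
        if "pattern" in rules and not re.search(rules["pattern"], str(value)):
            problems.append(f"row {i}: {field}={value!r} does not match {rules['pattern']}")
        is_number = isinstance(value, (int, float)) and not isinstance(value, bool)
        if is_number and rules.get("min") is not None and value < rules["min"]:
            problems.append(f"row {i}: {field}={value!r} below min {rules['min']}")
        if is_number and rules.get("max") is not None and value > rules["max"]:
            problems.append(f"row {i}: {field}={value!r} above max {rules['max']}")
        if "enum" in rules and value not in rules["enum"]:
            problems.append(f"row {i}: {field}={value!r} not in enum {rules['enum']}")
        if "required_keys" in rules and isinstance(value, dict):
            missing = [k for k in rules["required_keys"] if k not in value]
            if missing:
                problems.append(f"row {i}: {field} missing keys {missing}")
        if rules.get("unique"):
            key = (type(value).__name__, str(value))  # keeps 1 and "1" distinct
            if key in seen:
                problems.append(f"row {i}: duplicate {field}={value!r}")
            seen.add(key)
    return problems


def check_row_count(rows: list, bounds: dict) -> list[str]:
    """Apply the contract's row_count block; a null max means no upper bound."""
    n = len(rows)
    if bounds.get("min") is not None and n < bounds["min"]:
        return [f"row_count {n} below min {bounds['min']}"]
    if bounds.get("max") is not None and n > bounds["max"]:
        return [f"row_count {n} above max {bounds['max']}"]
    return []
```

With the example contract, a row whose `order_id` is `"ORD-1"` fails the `^ORD-[0-9]{8}$` pattern, and an `order_total` of `0.0` falls below `min: 0.01`.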

---

## Step 4: Validate Sample Against Contract

For each field in the contract, check the sample data:

```python
def validate_against_contract(data: list[dict], contract: dict) -> list[dict]:
    violations = []

    for row_num, row in enumerate(data):
        for field_name, rules in contract["fields"].items():

            # CHECK: Required field present
            if field_name not in row and not rules.get("nullable"):
                violations.append({
                    "row": row_num,
                    "field": field_name,
                    "violation": "MISSING_REQUIRED_FIELD",
                    "expected": f"field '{field_name}' must be present",
                    "got": "field absent"
                })
                continue

            value = row.get(field_name)

            # CHECK: Null constraint
            if value is None and not rules.get("nullable"):
                violations.append({
                    "row": row_num,
                    "field": field_name,
                    "violation": "UNEXPECTED_NULL",
                    "expected": "not null",
                    "got": "null"