Open Data Integrator

TotalClaw 作者 datadrivenconstruction v2.1.0

集成开放的施工数据集。结合开放数据源以增强分析。

源码 ↗

安装 / 下载方式

TotalClaw CLI推荐
totalclaw install totalclaw:datadrivenconstruction~open-data-integrator
cURL直接下载,无需登录
curl -fsSL https://skills.taituai.com/api/skills/totalclaw%3Adatadrivenconstruction~open-data-integrator/file -o open-data-integrator.md
Git 仓库获取源码
git clone https://github.com/openclaw/skills.git && git -C skills checkout c2517b6456f40676d5eaa992552984b4237ef6cc
## 概述(中文)

集成开放的施工数据集。结合开放数据源以增强分析。

## 原文

# Open Data Integrator

## Overview

Based on DDC methodology (Chapter 2.2), this skill integrates open construction datasets from various sources like government databases, industry benchmarks, weather services, and geospatial data.

**Book Reference:** "Доминирование открытых данных" / "Open Data Dominance"

## Quick Start

```python
from dataclasses import dataclass, field
from enum import Enum
from typing import List, Dict, Optional, Any, Callable
from datetime import datetime, date
import json
import requests
from abc import ABC, abstractmethod

class DataSourceType(Enum):
    """Categories of open data sources.

    Each member's value is a short lowercase string. Some values are
    shorter than the member name (INDUSTRY_BENCHMARK is "benchmark",
    BUILDING_PERMITS is "permits"), so a lookup by value must use the
    string shown here rather than the lowercased member name.
    """
    GOVERNMENT = "government"           # Government statistics
    INDUSTRY_BENCHMARK = "benchmark"    # Industry benchmarks
    WEATHER = "weather"                 # Weather data
    GEOSPATIAL = "geospatial"           # Geographic data
    MATERIAL_PRICES = "material_prices" # Material cost indices
    LABOR_RATES = "labor_rates"         # Labor cost data
    BUILDING_PERMITS = "permits"        # Permit data
    ENERGY = "energy"                   # Energy prices/data
    ECONOMIC = "economic"               # Economic indicators

class UpdateFrequency(Enum):
    """How often a data source is updated.

    Members are declared from most frequent (REALTIME) to least frequent
    (ANNUAL); each value is the lowercase form of the member name.
    """
    REALTIME = "realtime"
    HOURLY = "hourly"
    DAILY = "daily"
    WEEKLY = "weekly"
    MONTHLY = "monthly"
    QUARTERLY = "quarterly"
    ANNUAL = "annual"

@dataclass
class OpenDataSource:
    """Definition of an open data source.

    Only ``id``, ``name``, ``source_type`` and ``url`` are required; every
    other field has a default.
    """
    id: str
    name: str
    # Category of the source, one of the DataSourceType members
    source_type: DataSourceType
    url: str
    # Sources are treated as keyless unless this is set to True
    api_key_required: bool = False
    # Daily is the default when no frequency is given
    update_frequency: UpdateFrequency = UpdateFrequency.DAILY
    format: str = "json"
    license: str = "open"
    description: Optional[str] = None
    # NOTE(review): presumably the names of the data fields the source
    # exposes - confirm against callers. default_factory gives every
    # instance its own list instead of one shared mutable default.
    fields: List[str] = field(default_factory=list)

@dataclass
class DataRecord:
    """A single data record from a source.

    The connectors in this file create these in ``fetch`` and set
    ``source_id`` to a fixed string naming the connector (for example
    "openweathermap" or "material_prices").
    """
    # Identifier of the source that produced the record
    source_id: str
    # The connectors in this file pass naive (timezone-less) datetimes here
    timestamp: datetime
    # Record payload keyed by field name
    data: Dict[str, Any]
    # Optional extra context (the weather connector stores lat/lon here);
    # default_factory gives every instance its own dict
    metadata: Dict[str, Any] = field(default_factory=dict)

@dataclass
class IntegrationResult:
    """Result of data integration.

    All fields are required; none has a default value.
    """
    # NOTE(review): presumably the id or name of the integrated source -
    # confirm against the code that builds this result
    source: str
    records_fetched: int
    records_processed: int
    # NOTE(review): presumably human-readable error messages - confirm
    errors: List[str]
    last_updated: datetime
    # NOTE(review): presumably a small preview of record payloads - confirm
    sample_data: List[Dict]

@dataclass
class EnrichedData:
    """Data enriched with open data.

    Keeps the untouched input alongside the values added to it. All fields
    are required; none has a default value.
    """
    # The input exactly as supplied, before enrichment
    original_data: Dict[str, Any]
    # Values added from open data sources
    enrichments: Dict[str, Any]
    # NOTE(review): presumably source ids or names - confirm against callers
    sources_used: List[str]
    # NOTE(review): scale is not shown in this file (0-1 presumed) - confirm
    confidence: float


class OpenDataConnector(ABC):
    """Abstract interface shared by all open data connectors.

    Both methods are abstract, so a subclass must override ``fetch`` and
    ``get_metadata`` before it can be instantiated.
    """

    @abstractmethod
    def fetch(self, params: Dict) -> List[DataRecord]:
        """Return the records selected by the connector-specific ``params``."""

    @abstractmethod
    def get_metadata(self) -> Dict:
        """Return a dictionary describing the connector's data source."""


class WeatherDataConnector(OpenDataConnector):
    """Connector for weather data (e.g., OpenWeatherMap).

    The connector currently returns simulated daily records: no request is
    sent to ``base_url`` and ``api_key`` is only stored.
    """

    # Maximum number of daily records returned by one fetch (demo limit).
    MAX_RECORDS = 30

    def __init__(self, api_key: Optional[str] = None):
        """Store the optional API key and the API base URL."""
        self.api_key = api_key
        self.base_url = "https://api.openweathermap.org/data/2.5"

    @staticmethod
    def _as_date(value: Any, name: str) -> date:
        """Normalize a ``date``/``datetime`` parameter to a plain ``date``."""
        # datetime is a subclass of date, so it must be checked first.
        if isinstance(value, datetime):
            return value.date()
        if isinstance(value, date):
            return value
        raise TypeError(
            f"'{name}' must be a date or datetime, got {type(value).__name__}"
        )

    def fetch(
        self,
        params: Dict
    ) -> List[DataRecord]:
        """Return one simulated weather record per day in a date range.

        Args:
            params: Mapping with ``start_date`` and ``end_date`` (``date`` or
                ``datetime`` objects, both inclusive) and optionally ``lat``
                and ``lon``, which are copied into each record's metadata.

        Returns:
            Records for consecutive calendar days starting at ``start_date``,
            capped at ``MAX_RECORDS`` entries. The list is empty when
            ``start_date`` is after ``end_date``.

        Raises:
            TypeError: If ``start_date`` or ``end_date`` is missing or is not
                a ``date``/``datetime``.
        """
        lat = params.get("lat")
        lon = params.get("lon")
        start_date = self._as_date(params.get("start_date"), "start_date")
        end_date = self._as_date(params.get("end_date"), "end_date")

        # Simulate API call (in production, use actual API)
        records = []
        midnight = datetime.min.time()

        # Step by proleptic ordinal so the calendar handles month lengths and
        # year rollover. Hand-rolled day/month arithmetic skipped days 29-31
        # and wrapped December back into the same year, which never
        # terminated. Capping the range here also avoids generating records
        # that would be discarded by the demo limit.
        first = start_date.toordinal()
        last = min(end_date.toordinal(), first + self.MAX_RECORDS - 1)
        for ordinal in range(first, last + 1):
            current = date.fromordinal(ordinal)
            records.append(DataRecord(
                source_id="openweathermap",
                timestamp=datetime.combine(current, midnight),
                data={
                    "date": current.isoformat(),
                    "temp_max": 25.0,
                    "temp_min": 15.0,
                    "precipitation": 0.0,
                    "wind_speed": 10.0,
                    "weather_code": "clear"
                },
                metadata={"lat": lat, "lon": lon}
            ))

        return records

    def get_metadata(self) -> Dict:
        """Return the static description of this weather source."""
        return {
            "source": "OpenWeatherMap",
            "type": DataSourceType.WEATHER.value,
            "frequency": UpdateFrequency.HOURLY.value,
            "fields": ["temp_max", "temp_min", "precipitation", "wind_speed"]
        }


class MaterialPriceConnector(OpenDataConnector):
    """Connector that serves material price indices from a built-in table."""

    def __init__(self, region: str = "US"):
        """Remember the region and build the price index table."""
        self.region = region
        self.price_indices = self._load_indices()

    def _load_indices(self) -> Dict[str, Dict]:
        """Return the index table keyed by material name.

        Every entry has a ``base`` of 100, a ``current`` index value and a
        ``trend`` label.
        """
        rows = (
            ("concrete", 125, "up"),
            ("steel", 145, "up"),
            ("lumber", 180, "stable"),
            ("copper", 135, "up"),
            ("asphalt", 115, "stable"),
            ("gypsum", 110, "stable"),
            ("glass", 105, "down"),
            ("cement", 120, "up"),
        )
        table = {}
        for material, current, trend in rows:
            table[material] = {"base": 100, "current": current, "trend": trend}
        return table

    def fetch(self, params: Dict) -> List[DataRecord]:
        """Return one record per requested material that has an index.

        ``params["materials"]`` selects the materials; when the key is
        absent every known material is returned. Unknown names are skipped.
        """
        requested = params.get("materials", list(self.price_indices.keys()))

        return [
            DataRecord(
                source_id="material_prices",
                timestamp=datetime.now(),
                data={
                    "material": name,
                    "region": self.region,
                    **self.price_indices[name]
                }
            )
            for name in requested
            if name in self.price_indices
        ]

    def get_metadata(self) -> Dict:
        """Return the static description of this source and its materials."""
        known_materials = list(self.price_indices)
        return {
            "source": "Material Price Index",
            "type": DataSourceType.MATERIAL_PRICES.value,
            "frequency": UpdateFrequency.MONTHLY.value,
            "materials": known_materials
        }


class LaborRateConnector(OpenDataConnector):
    """Connector for labor rate data"""

    def __init__(self, region: str = "US"):
        self.region = region
        self.labor_rates = self._load_rates()

    def _load_rates(self) -> Dict[str, Dict]:
        """Load labor rates by trade"""
        return {
            "carpenter": {"hourly": 45.00, "burden_rate": 1.35},
            "electrician": {"hourly": 55.00, "burden_rate": 1.40},
            "plumber": {"hourly": 52.00, "burden_rate": 1.38},
            "ironworker": {"hourly": 58.00, "burden_rate": 1.42},
            "laborer": {"hourly": 32.00, "burden_rate": 1.30},
            "operator": {"hourly": 48.00, "burden_rate": 1.35},
            "mason": {"hourly": 50.00, "burden_rate": 1.36},
            "painter": {"hourly": 38.00, "burden_rate": 1.32},
            "hvac_tech": {"hourly": 54.00, "burden_rate": 1.38},
            "welder": {"hourly": 52.00, "burden_rate": 1.40},
        }

    def fetch(self, params: Dict) -> List[DataRecord]:
        """Fetch labor rate data"""
        trades = params.get("trades", list(self.labor_rates.keys()))

        records = []
        for trade in trades:
            if trade in self.labor_rates:
                rate_data = self.labor_rates[trade]
                records.append(DataRecord(
                    source_id="labor_rates",
                    timestamp=datetime.now(),
                    data={
                        "trade": trade,
                        "region": self.region,
                        "hourly_rate": rate_data["hourly"],
                        "burden_rate": rate_data["burden_rate"],
                        "fully_loaded": rate_data["hourly"] * rate_data["burden_rate"]
                    }