Open Data Integrator
集成开放的施工数据集。结合开放数据源以增强分析
安装 / 下载方式
TotalClaw CLI推荐
totalclaw install totalclaw:datadrivenconstruction~open-data-integrator
cURL 直接下载,无需登录
curl -fsSL https://skills.taituai.com/api/skills/totalclaw%3Adatadrivenconstruction~open-data-integrator/file -o open-data-integrator.md
Git 仓库获取源码
git clone https://github.com/openclaw/skills/commit/c2517b6456f40676d5eaa992552984b4237ef6cc

## 概述(中文)
集成开放的施工数据集。结合开放数据源以增强分析
## 原文
# Open Data Integrator
## Overview
Based on DDC methodology (Chapter 2.2), this skill integrates open construction datasets from various sources like government databases, industry benchmarks, weather services, and geospatial data.
**Book Reference:** "Доминирование открытых данных" / "Open Data Dominance"
## Quick Start
```python
from dataclasses import dataclass, field
from enum import Enum
from typing import List, Dict, Optional, Any, Callable
from datetime import datetime, date
import json
import requests
from abc import ABC, abstractmethod
class DataSourceType(Enum):
    """Categories of open data sources.

    Each member's value is the short string identifier for that category.
    """

    GOVERNMENT = "government"  # Government statistics
    INDUSTRY_BENCHMARK = "benchmark"  # Industry benchmarks
    WEATHER = "weather"  # Weather data
    GEOSPATIAL = "geospatial"  # Geographic data
    MATERIAL_PRICES = "material_prices"  # Material cost indices
    LABOR_RATES = "labor_rates"  # Labor cost data
    BUILDING_PERMITS = "permits"  # Permit data
    ENERGY = "energy"  # Energy prices/data
    ECONOMIC = "economic"  # Economic indicators
class UpdateFrequency(Enum):
    """How often a data source is updated.

    Members are declared from the most frequent (REALTIME) to the least
    frequent (ANNUAL); each value is the lowercase form of its name.
    """

    REALTIME = "realtime"
    HOURLY = "hourly"
    DAILY = "daily"
    WEEKLY = "weekly"
    MONTHLY = "monthly"
    QUARTERLY = "quarterly"
    ANNUAL = "annual"
@dataclass
class OpenDataSource:
    """Definition of an open data source.

    Only ``id``, ``name``, ``source_type`` and ``url`` are required; every
    other attribute has a default.
    """

    id: str  # Identifier of the source
    name: str  # Display name
    source_type: DataSourceType  # Category of the source
    url: str  # Location of the source
    api_key_required: bool = False  # Defaults to no key needed
    update_frequency: UpdateFrequency = UpdateFrequency.DAILY
    format: str = "json"  # Data format label
    license: str = "open"  # License label
    description: Optional[str] = None
    # default_factory gives every instance its own list instead of sharing one.
    fields: List[str] = field(default_factory=list)
@dataclass
class DataRecord:
    """A single data record obtained from a source."""

    source_id: str  # Identifier of the source that produced the record
    timestamp: datetime  # Timestamp attached to the record
    data: Dict[str, Any]  # Record payload
    # default_factory gives every record its own dict instead of sharing one.
    metadata: Dict[str, Any] = field(default_factory=dict)
@dataclass
class IntegrationResult:
    """Result of a data integration run.

    All attributes are required; none has a default.
    """

    source: str  # Source the result refers to
    records_fetched: int  # Count of records fetched
    records_processed: int  # Count of records processed
    errors: List[str]  # Error messages collected during the run
    last_updated: datetime
    sample_data: List[Dict]  # Sample of the integrated data
@dataclass
class EnrichedData:
    """Data enriched with open data.

    All attributes are required; none has a default.
    """

    original_data: Dict[str, Any]  # The data before enrichment
    enrichments: Dict[str, Any]  # Values added from open data
    sources_used: List[str]  # Sources that contributed enrichments
    confidence: float
class OpenDataConnector(ABC):
    """Base class for open data connectors.

    Subclasses must implement both ``fetch`` and ``get_metadata``; the class
    cannot be instantiated until they do.
    """

    @abstractmethod
    def fetch(self, params: Dict) -> List[DataRecord]:
        """Return the records selected by ``params``.

        Args:
            params: Connector-specific query parameters.

        Returns:
            The fetched records.
        """

    @abstractmethod
    def get_metadata(self) -> Dict:
        """Return a dictionary describing this connector's data source."""
class WeatherDataConnector(OpenDataConnector):
    """Connector for weather data (e.g., OpenWeatherMap).

    ``fetch`` currently simulates the service: it generates one fixed sample
    record per day and makes no network call. ``api_key`` and ``base_url``
    are stored on the instance but are not used by the simulated fetch.
    """

    # Upper bound on the number of records one fetch() call returns (demo limit).
    MAX_RECORDS = 30

    def __init__(self, api_key: Optional[str] = None):
        """Store the optional API key and the service base URL."""
        self.api_key = api_key
        self.base_url = "https://api.openweathermap.org/data/2.5"

    def fetch(
        self,
        params: Dict
    ) -> List[DataRecord]:
        """Fetch weather data for a location, one record per day.

        Args:
            params: Query parameters. ``start_date`` and ``end_date`` are
                required and bound the range inclusively. ``lat`` and
                ``lon`` are optional and are copied into each record's
                metadata.

        Returns:
            One record per calendar day from ``start_date`` to ``end_date``,
            capped at ``MAX_RECORDS``. Empty when ``start_date`` is after
            ``end_date``.

        Raises:
            ValueError: If ``start_date`` or ``end_date`` is missing.
        """
        # Local import: the file's top-level imports do not include timedelta.
        from datetime import timedelta

        lat = params.get("lat")
        lon = params.get("lon")
        start_date = params.get("start_date")
        end_date = params.get("end_date")
        if start_date is None or end_date is None:
            raise ValueError(
                "params must include both 'start_date' and 'end_date'"
            )

        # Simulate API call (in production, use actual API)
        records: List[DataRecord] = []
        one_day = timedelta(days=1)
        current = start_date
        # timedelta arithmetic handles month lengths, leap years and year
        # rollover. Enforcing the cap inside the loop keeps a very long range
        # from generating records that would only be discarded.
        while current <= end_date and len(records) < self.MAX_RECORDS:
            records.append(DataRecord(
                source_id="openweathermap",
                timestamp=datetime.combine(current, datetime.min.time()),
                data={
                    "date": current.isoformat(),
                    "temp_max": 25.0,
                    "temp_min": 15.0,
                    "precipitation": 0.0,
                    "wind_speed": 10.0,
                    "weather_code": "clear"
                },
                metadata={"lat": lat, "lon": lon}
            ))
            current += one_day
        return records

    def get_metadata(self) -> Dict:
        """Return the source name, type, update frequency and field names."""
        return {
            "source": "OpenWeatherMap",
            "type": DataSourceType.WEATHER.value,
            "frequency": UpdateFrequency.HOURLY.value,
            "fields": ["temp_max", "temp_min", "precipitation", "wind_speed"]
        }
class MaterialPriceConnector(OpenDataConnector):
    """Connector for material price indices.

    The indices are a built-in table loaded at construction time; no
    external service is contacted.
    """

    def __init__(self, region: str = "US"):
        """Store the region label and load the price index table."""
        self.region = region
        self.price_indices = self._load_indices()

    def _load_indices(self) -> Dict[str, Dict]:
        """Load material price indices.

        Returns:
            A mapping from material name to a dict with ``base``,
            ``current`` and ``trend`` entries.
        """
        return {
            "concrete": {"base": 100, "current": 125, "trend": "up"},
            "steel": {"base": 100, "current": 145, "trend": "up"},
            "lumber": {"base": 100, "current": 180, "trend": "stable"},
            "copper": {"base": 100, "current": 135, "trend": "up"},
            "asphalt": {"base": 100, "current": 115, "trend": "stable"},
            "gypsum": {"base": 100, "current": 110, "trend": "stable"},
            "glass": {"base": 100, "current": 105, "trend": "down"},
            "cement": {"base": 100, "current": 120, "trend": "up"},
        }

    def fetch(self, params: Dict) -> List[DataRecord]:
        """Fetch material price data.

        Args:
            params: Query parameters. ``materials`` optionally lists the
                material names to return; when it is absent or ``None``,
                every known material is returned. Names that are not in the
                index table are skipped.

        Returns:
            One record per requested known material, in request order.
        """
        materials = params.get("materials")
        if materials is None:
            # An explicit None is treated like a missing key rather than
            # failing on iteration.
            materials = list(self.price_indices.keys())

        # One timestamp per call so every record of a fetch carries the same time.
        fetched_at = datetime.now()
        return [
            DataRecord(
                source_id="material_prices",
                timestamp=fetched_at,
                data={
                    "material": material,
                    "region": self.region,
                    **self.price_indices[material]
                }
            )
            for material in materials
            if material in self.price_indices
        ]

    def get_metadata(self) -> Dict:
        """Return the source name, type, update frequency and material names."""
        return {
            "source": "Material Price Index",
            "type": DataSourceType.MATERIAL_PRICES.value,
            "frequency": UpdateFrequency.MONTHLY.value,
            "materials": list(self.price_indices.keys())
        }
class LaborRateConnector(OpenDataConnector):
"""Connector for labor rate data"""
def __init__(self, region: str = "US"):
self.region = region
self.labor_rates = self._load_rates()
def _load_rates(self) -> Dict[str, Dict]:
"""Load labor rates by trade"""
return {
"carpenter": {"hourly": 45.00, "burden_rate": 1.35},
"electrician": {"hourly": 55.00, "burden_rate": 1.40},
"plumber": {"hourly": 52.00, "burden_rate": 1.38},
"ironworker": {"hourly": 58.00, "burden_rate": 1.42},
"laborer": {"hourly": 32.00, "burden_rate": 1.30},
"operator": {"hourly": 48.00, "burden_rate": 1.35},
"mason": {"hourly": 50.00, "burden_rate": 1.36},
"painter": {"hourly": 38.00, "burden_rate": 1.32},
"hvac_tech": {"hourly": 54.00, "burden_rate": 1.38},
"welder": {"hourly": 52.00, "burden_rate": 1.40},
}
def fetch(self, params: Dict) -> List[DataRecord]:
"""Fetch labor rate data"""
trades = params.get("trades", list(self.labor_rates.keys()))
records = []
for trade in trades:
if trade in self.labor_rates:
rate_data = self.labor_rates[trade]
records.append(DataRecord(
source_id="labor_rates",
timestamp=datetime.now(),
data={
"trade": trade,
"region": self.region,
"hourly_rate": rate_data["hourly"],
"burden_rate": rate_data["burden_rate"],
"fully_loaded": rate_data["hourly"] * rate_data["burden_rate"]
}