Claw Code (Claude Code Python移植) 深度架构洞察
日期: 2026-04-01
源码: /home/ai/claude-code-port/
类型: Python移植版本架构分析
价值: 补充之前15大思想之外的额外发现
🎯 新发现的核心架构模式
从Claw Code的实现中,我们又发现了8个重要的架构设计思想:
1️⃣6️⃣ 数据类驱动的架构 (Dataclass-Driven Architecture)
Claw Code的实现
观察:广泛使用@dataclass(frozen=True)
@dataclass(frozen=True)
class ToolDefinition:
name: str
purpose: str
@dataclass(frozen=True)
class RoutedMatch:
kind: str
name: str
source_hint: str
score: int
@dataclass(frozen=True)
class PermissionDenial:
tool_name: str
reason: str
核心思想:
- 不可变性 -
frozen=True防止意外修改 - 类型安全 - 明确的类型注解
- 零样板代码 - 自动生成
__init__,__repr__等 - 易于序列化 - 可直接转换为JSON
- 可组合 - 小的数据类可组合成大的
应用到灵知系统
from dataclasses import dataclass, field
from typing import Dict, Any, List
from datetime import datetime
@dataclass(frozen=True)
class EvolutionRequest:
"""进化请求(不可变)"""
query: str
old_response: str
user_id: str
timestamp: datetime = field(default_factory=datetime.utcnow)
@dataclass(frozen=True)
class EvolutionResult:
"""进化结果(不可变)"""
request: EvolutionRequest
new_response: str
verification: VerificationResult
confidence: float
adopted: bool
@dataclass(frozen=True)
class AIComparison:
"""AI对比(不可变)"""
query: str
providers: Dict[str, str] # provider -> response
metrics: Dict[str, float] # provider -> score
winner: str
timestamp: datetime = field(default_factory=datetime.utcnow)
# 可变的数据类(用于运行时状态)
@dataclass
class EvolutionSession:
"""进化会话(可变)"""
session_id: str
user_id: str
requests: List[EvolutionRequest] = field(default_factory=list)
results: List[EvolutionResult] = field(default_factory=list)
created_at: datetime = field(default_factory=datetime.utcnow)
def add_request(self, request: EvolutionRequest):
"""添加请求"""
self.requests.append(request)
def add_result(self, result: EvolutionResult):
"""添加结果"""
self.results.append(result)
优势:
- ✅ 线程安全(不可变对象)
- ✅ 易于调试(清晰的__repr__)
- ✅ 类型检查(mypy友好)
- ✅ 性能优化(__slots__可优化内存)
1️⃣7️⃣ 智能路由系统 (Intelligent Routing)
Claw Code的实现
观察:PortRuntime.route_prompt()
class PortRuntime:
def route_prompt(self, prompt: str, limit: int = 5) -> list[RoutedMatch]:
# 1. 分词
tokens = {token.lower() for token in prompt.replace('/', ' ').split()}
# 2. 按类型收集匹配
by_kind = {
'command': self._collect_matches(tokens, PORTED_COMMANDS, 'command'),
'tool': self._collect_matches(tokens, PORTED_TOOLS, 'tool'),
}
# 3. 选择最佳匹配(每种类型一个)
selected: list[RoutedMatch] = []
for kind in ('command', 'tool'):
if by_kind[kind]:
selected.append(by_kind[kind].pop(0))
# 4. 添加剩余的高分匹配
leftovers = sorted([...], key=lambda item: (-item.score, item.kind, item.name))
selected.extend(leftovers[: max(0, limit - len(selected))])
return selected[:limit]
核心思想:
- 分词归一化 - 统一处理大小写、分隔符
- 多维度匹配 - 按类型(command/tool)分别匹配
- 平衡选择 - 每种类型至少选一个
- 评分排序 - 按分数和类型排序
- 限制数量 - 避免过多匹配
应用到灵知系统
class IntelligentEvolutionRouter:
"""智能进化路由器"""
def __init__(self):
self.routes = {
"qa": EvolutionRoute(
name="qa",
patterns=["如何", "怎么", "什么", "为什么", "?", "?"],
agent_type="qa_agent",
providers=["hunyuan", "deepseek"]
),
"comparison": EvolutionRoute(
name="comparison",
patterns=["对比", "比较", "哪个好", "和...比"],
agent_type="comparison_agent",
providers=["hunyuan", "deepseek", "lingzhi"]
),
"podcast": EvolutionRoute(
name="podcast",
patterns=["播客", "音频", "录音", "节目"],
agent_type="podcast_agent",
providers=["hunyuan"]
)
}
async def route_request(
self,
user_input: str,
context: Dict[str, Any]
) -> RoutingDecision:
"""路由用户请求"""
# 1. 分词和归一化
tokens = self._tokenize(user_input)
# 2. 匹配所有路由
matches = []
for route_name, route in self.routes.items():
score = self._calculate_match_score(tokens, route, context)
if score > 0:
matches.append(RoutedMatch(
route_name=route_name,
score=score,
agent_type=route.agent_type,
providers=route.providers
))
# 3. 排序并选择最佳匹配
matches.sort(key=lambda m: -m.score)
if not matches:
# 默认路由
return RoutingDecision(
route_name="default",
agent_type="general_agent",
providers=["lingzhi"],
confidence=0.5
)
best_match = matches[0]
# 4. 计算置信度
confidence = min(best_match.score / len(tokens), 1.0)
return RoutingDecision(
route_name=best_match.route_name,
agent_type=best_match.agent_type,
providers=best_match.providers,
confidence=confidence,
alternative_routes=matches[1:3] # 保留前2个备选
)
def _calculate_match_score(
self,
tokens: set,
route: EvolutionRoute,
context: Dict[str, Any]
) -> int:
"""计算匹配分数"""
score = 0
# 模式匹配
for pattern in route.patterns:
if pattern.lower() in " ".join(tokens):
score += 1
# 上下文增强
if context.get("recent_topics"):
for topic in context["recent_topics"]:
if topic.lower() in tokens:
score += 0.5
return score
1️⃣8️⃣ 快照驱动开发 (Snapshot-Driven Development)
Claw Code的实现
观察:使用JSON快照记录原始架构
SNAPSHOT_PATH = Path(__file__).resolve().parent / 'reference_data' / 'commands_snapshot.json'
@lru_cache(maxsize=1)
def load_command_snapshot() -> tuple[PortingModule, ...]:
raw_entries = json.loads(SNAPSHOT_PATH.read_text())
return tuple(
PortingModule(
name=entry['name'],
responsibility=entry['responsibility'],
source_hint=entry['source_hint'],
status='mirrored',
)
for entry in raw_entries
)
核心思想:
- 架构快照 - 记录原始系统的结构
- 渐进移植 - 逐步实现每个模块
- 状态追踪 - 每个模块的状态(planned/mirrored/implemented)
- 缓存优化 - 使用
@lru_cache避免重复加载 - 可审计 - 保留移植历史
应用到灵知系统
class EvolutionSnapshotManager:
"""进化快照管理器"""
def __init__(self, snapshot_dir: Path):
self.snapshot_dir = snapshot_dir
self.snapshot_dir.mkdir(parents=True, exist_ok=True)
def create_snapshot(
self,
evolution_id: str,
state: Dict[str, Any]
) -> Path:
"""创建进化快照"""
snapshot_file = self.snapshot_dir / f"{evolution_id}.json"
snapshot = {
"evolution_id": evolution_id,
"timestamp": datetime.utcnow().isoformat(),
"state": state,
"version": "1.0.0"
}
# 写入快照
snapshot_file.write_text(
json.dumps(snapshot, indent=2, ensure_ascii=False)
)
# 保留历史版本
self._archive_snapshot(evolution_id, snapshot)
return snapshot_file
def load_snapshot(self, evolution_id: str) -> Dict[str, Any]:
"""加载快照"""
snapshot_file = self.snapshot_dir / f"{evolution_id}.json"
if not snapshot_file.exists():
raise FileNotFoundError(f"Snapshot not found: {evolution_id}")
return json.loads(snapshot_file.read_text())
def compare_snapshots(
self,
evolution_id: str,
before: str,
after: str
) -> Dict[str, Any]:
"""对比两个快照"""
snapshot_before = self.load_snapshot(before)
snapshot_after = self.load_snapshot(after)
# 对比状态
diff = self._compute_diff(
snapshot_before["state"],
snapshot_after["state"]
)
return {
"evolution_id": evolution_id,
"before": before,
"after": after,
"diff": diff,
"improvements": self._identify_improvements(diff)
}
def _archive_snapshot(self, evolution_id: str, snapshot: Dict):
"""归档快照到历史"""
archive_dir = self.snapshot_dir / "archive" / evolution_id
archive_dir.mkdir(parents=True, exist_ok=True)
timestamp = datetime.utcnow().strftime("%Y%m%d_%H%M%S")
archive_file = archive_dir / f"{timestamp}.json"
archive_file.write_text(
json.dumps(snapshot, indent=2, ensure_ascii=False)
)
1️⃣9️⃣ 会话持久化与恢复 (Session Persistence)
Claw Code的实现
观察:RuntimeSession包含persisted_session_path
@dataclass
class RuntimeSession:
# ... 其他字段 ...
persisted_session_path: str
# 在QueryEngine中
def persist_session(self) -> str:
session_path = self._generate_session_path()
# 保存会话状态
return session_path
核心思想:
- 状态序列化 - 会话状态可序列化
- 路径追踪 - 记录会话文件路径
- 断点恢复 - 可从持久化状态恢复
- 历史回溯 - 保留完整历史
- 跨会话记忆 - 重要的记忆持久化
应用到灵知系统
class EvolutionSessionManager:
"""进化会话管理器"""
def __init__(self, session_dir: Path):
self.session_dir = session_dir
self.session_dir.mkdir(parents=True, exist_ok=True)
self.active_sessions: Dict[str, EvolutionSession] = {}
async def create_session(
self,
user_id: str,
metadata: Dict[str, Any] = None
) -> EvolutionSession:
"""创建新会话"""
session_id = str(uuid.uuid4())
session = EvolutionSession(
session_id=session_id,
user_id=user_id,
metadata=metadata or {}
)
self.active_sessions[session_id] = session
# 持久化
await self._persist_session(session)
return session
async def get_session(
self,
session_id: str
) -> EvolutionSession:
"""获取会话"""
# 先从内存中查找
if session_id in self.active_sessions:
return self.active_sessions[session_id]
# 从磁盘加载
session_file = self.session_dir / f"{session_id}.json"
if not session_file.exists():
raise SessionNotFoundError(session_id)
session_data = json.loads(session_file.read_text())
# 反序列化
session = self._deserialize_session(session_data)
# 加入内存缓存
self.active_sessions[session_id] = session
return session
async def restore_session(
self,
session_id: str
) -> EvolutionSession:
"""恢复会话到之前的状态"""
session = await self.get_session(session_id)
# 恢复上下文
if session.last_state:
# 重新构建上下文
context = await self._rebuild_context(session.last_state)
# 恢复Agent状态
for agent_name, agent_state in session.last_state["agents"].items():
agent = self._get_agent(agent_name)
await agent.restore_state(agent_state)
return session
async def _persist_session(self, session: EvolutionSession):
"""持久化会话"""
session_file = self.session_dir / f"{session.session_id}.json"
session_data = {
"session_id": session.session_id,
"user_id": session.user_id,
"created_at": session.created_at.isoformat(),
"updated_at": session.updated_at.isoformat(),
"requests": [asdict(req) for req in session.requests],
"results": [asdict(res) for res in session.results],
"metadata": session.metadata,
"last_state": await self._capture_state(session)
}
session_file.write_text(
json.dumps(session_data, indent=2, ensure_ascii=False)
)
async def _capture_state(self, session: EvolutionSession) -> Dict:
"""捕获当前状态"""
state = {
"timestamp": datetime.utcnow().isoformat(),
"agents": {}
}
# 捕获所有Agent的状态
for agent_name in ["explorer", "planner", "verifier"]:
agent = self._get_agent(agent_name)
state["agents"][agent_name] = await agent.capture_state()
return state
2️⃣0️⃣ 历史日志系统 (History Logging)
Claw Code的实现
观察:HistoryLog类
class HistoryLog:
def __init__(self):
self.events: list[tuple[str, str]] = []
def add(self, category: str, message: str) -> None:
self.events.append((category, message))
def as_markdown(self) -> str:
lines = ['# History Log', '']
for category, message in self.events:
lines.append(f'- [{category}] {message}')
return '\n'.join(lines)
核心思想:
- 分类记录 - 按类别组织事件
- 时间顺序 - 保持事件顺序
- 多格式输出 - 支持Markdown、JSON等
- 轻量级 - 简单的列表结构
- 易于分析 - 可直接处理
应用到灵知系统
class EvolutionHistoryLogger:
"""进化历史日志"""
def __init__(self):
self.events: List[HistoryEvent] = []
async def log(
self,
event_type: str,
category: str,
message: str,
metadata: Dict[str, Any] = None
):
"""记录事件"""
event = HistoryEvent(
timestamp=datetime.utcnow(),
event_type=event_type,
category=category,
message=message,
metadata=metadata or {}
)
self.events.append(event)
# 异步写入持久化存储
await self._persist_event(event)
async def query_history(
self,
filters: Dict[str, Any] = None
) -> List[HistoryEvent]:
"""查询历史"""
events = self.events
if filters:
if "event_type" in filters:
events = [e for e in events if e.event_type == filters["event_type"]]
if "category" in filters:
events = [e for e in events if e.category == filters["category"]]
if "start_time" in filters:
events = [e for e in events if e.timestamp >= filters["start_time"]]
if "end_time" in filters:
events = [e for e in events if e.timestamp <= filters["end_time"]]
return events
async def generate_report(
self,
start_time: datetime,
end_time: datetime
) -> str:
"""生成历史报告"""
events = await self.query_history({
"start_time": start_time,
"end_time": end_time
})
# 按类别分组
by_category: Dict[str, List[HistoryEvent]] = {}
for event in events:
if event.category not in by_category:
by_category[event.category] = []
by_category[event.category].append(event)
# 生成Markdown报告
lines = [
f"# 进化历史报告",
f"",
f"**时间范围**: {start_time} 至 {end_time}",
f"**总事件数**: {len(events)}",
f""
]
for category, category_events in sorted(by_category.items()):
lines.append(f"## {category}")
lines.append(f"")
for event in category_events:
lines.append(f"- {event.timestamp}: {event.message}")
lines.append(f"")
return "\n".join(lines)
async def _persist_event(self, event: HistoryEvent):
"""持久化事件到数据库"""
await self.db.insert(EvolutionHistoryEvent, {
"timestamp": event.timestamp,
"event_type": event.event_type,
"category": event.category,
"message": event.message,
"metadata": event.metadata
})
2️⃣1️⃣ 权限拒绝追踪 (Permission Denial Tracking)
Claw Code的实现
观察:PermissionDenial数据类
@dataclass(frozen=True)
class PermissionDenial:
tool_name: str
reason: str
# 在runtime中使用
denials = tuple(self._infer_permission_denials(matches))
核心思想:
- 明确拒绝 - 记录为什么拒绝
- 不可变记录 - 拒绝记录不能修改
- 可审计 - 所有拒绝都可追溯
- 用户反馈 - 清晰告知用户原因
- 统计分析 - 可分析拒绝模式
应用到灵知系统
class PermissionDenialTracker:
"""权限拒绝追踪器"""
def __init__(self):
self.denials: List[PermissionDenial] = []
async def record_denial(
self,
resource: str,
action: str,
reason: str,
context: Dict[str, Any]
):
"""记录权限拒绝"""
denial = PermissionDenial(
timestamp=datetime.utcnow(),
resource=resource,
action=action,
reason=reason,
context=context
)
self.denials.append(denial)
# 记录到数据库
await self._persist_denial(denial)
# 触发告警(如果频繁拒绝)
await self._check_frequent_denials(denial)
async def get_denial_stats(
self,
time_window: timedelta = timedelta(hours=1)
) -> Dict[str, Any]:
"""获取拒绝统计"""
cutoff_time = datetime.utcnow() - time_window
recent_denials = [
d for d in self.denials
if d.timestamp >= cutoff_time
]
# 按原因分组
by_reason: Dict[str, int] = {}
for denial in recent_denials:
by_reason[denial.reason] = by_reason.get(denial.reason, 0) + 1
# 按资源分组
by_resource: Dict[str, int] = {}
for denial in recent_denials:
resource = f"{denial.resource}:{denial.action}"
by_resource[resource] = by_resource.get(resource, 0) + 1
return {
"total_denials": len(recent_denials),
"unique_resources": len(by_resource),
"top_reasons": sorted(by_reason.items(), key=lambda x: -x[1])[:5],
"top_resources": sorted(by_resource.items(), key=lambda x: -x[1])[:5]
}
async def suggest_permission_fixes(self) -> List[str]:
"""建议权限修复"""
stats = await self.get_denial_stats()
suggestions = []
# 分析高频拒绝
for resource, count in stats["top_resources"]:
if count > 10:
suggestions.append(
f"资源 '{resource}' 在过去1小时被拒绝 {count} 次,"
f"建议添加到允许列表"
)
# 分析常见原因
for reason, count in stats["top_reasons"]:
if "rate_limit" in reason.lower():
suggestions.append(
f"检测到频繁的速率限制拒绝,建议增加速率限制阈值"
)
return suggestions
async def _check_frequent_denials(self, denial: PermissionDenial):
"""检查是否频繁拒绝"""
# 最近10分钟的拒绝
cutoff_time = datetime.utcnow() - timedelta(minutes=10)
recent_denials = [
d for d in self.denials
if d.timestamp >= cutoff_time and
d.resource == denial.resource and
d.action == denial.action
]
if len(recent_denials) > 5:
# 触发告警
await self.alerting_system.send_alert(
severity="warning",
message=f"Frequent permission denials for {denial.resource}:{denial.action}",
details={
"denial_count": len(recent_denials),
"reason": denial.reason
}
)
2️⃣2️⃣ Token使用追踪 (Token Usage Tracking)
Claw Code的实现
观察:UsageSummary类
@dataclass(frozen=True)
class UsageSummary:
input_tokens: int = 0
output_tokens: int = 0
def add_turn(self, prompt: str, output: str) -> 'UsageSummary':
return UsageSummary(
input_tokens=self.input_tokens + len(prompt.split()),
output_tokens=self.output_tokens + len(output.split()),
)
核心思想:
- 简洁统计 - 只追踪核心指标
- 不可变更新 - 返回新对象而不是修改
- 累计计算 - 支持多轮累加
- 成本估算 - 可转换为成本
- 预算控制 - 可设置预算限制
应用到灵知系统
class TokenUsageTracker:
"""Token使用追踪器"""
def __init__(self):
self.usage_by_provider: Dict[str, ProviderUsage] = {}
self.daily_budget: Dict[str, int] = {
"hunyuan": 1_000_000, # 100万tokens
"deepseek": 5_000_000, # 500万tokens
"lingzhi": 10_000_000 # 1000万tokens(内部)
}
async def track_usage(
self,
provider: str,
prompt: str,
response: str,
metadata: Dict[str, Any] = None
):
"""追踪使用"""
# 估算token数(简化版)
input_tokens = self._estimate_tokens(prompt)
output_tokens = self._estimate_tokens(response)
if provider not in self.usage_by_provider:
self.usage_by_provider[provider] = ProviderUsage(
provider=provider,
input_tokens=0,
output_tokens=0,
total_cost=0.0
)
usage = self.usage_by_provider[provider]
# 更新使用量
updated_usage = ProviderUsage(
provider=provider,
input_tokens=usage.input_tokens + input_tokens,
output_tokens=usage.output_tokens + output_tokens,
total_cost=usage.total_cost + self._calculate_cost(
provider, input_tokens, output_tokens
)
)
self.usage_by_provider[provider] = updated_usage
# 检查预算
await self._check_budget(provider, updated_usage)
# 记录到数据库
await self._persist_usage(provider, input_tokens, output_tokens, metadata)
async def get_usage_report(
self,
provider: str = None
) -> Dict[str, Any]:
"""获取使用报告"""
if provider:
usage = self.usage_by_provider.get(provider)
if not usage:
return {"error": f"No usage data for {provider}"}
return {
"provider": provider,
"input_tokens": usage.input_tokens,
"output_tokens": usage.output_tokens,
"total_tokens": usage.input_tokens + usage.output_tokens,
"estimated_cost": usage.total_cost,
"budget_remaining": self.daily_budget.get(provider, 0) - (usage.input_tokens + usage.output_tokens)
}
else:
# 所有provider的汇总
return {
provider: {
"input_tokens": usage.input_tokens,
"output_tokens": usage.output_tokens,
"total_tokens": usage.input_tokens + usage.output_tokens,
"estimated_cost": usage.total_cost
}
for provider, usage in self.usage_by_provider.items()
}
async def _check_budget(self, provider: str, usage: ProviderUsage):
"""检查预算"""
total_tokens = usage.input_tokens + usage.output_tokens
budget = self.daily_budget.get(provider, float('inf'))
if total_tokens > budget:
# 超出预算
await self.alerting_system.send_alert(
severity="critical",
message=f"Token budget exceeded for {provider}",
details={
"provider": provider,
"used": total_tokens,
"budget": budget,
"overage": total_tokens - budget
}
)
# 可以选择暂停该provider
# await self._disable_provider(provider)
def _estimate_tokens(self, text: str) -> int:
"""估算token数(简化版)"""
# 简单估算:中文约1.5字符/token,英文约4字符/token
chinese_chars = len([c for c in text if '\u4e00' <= c <= '\u9fff'])
english_chars = len(text) - chinese_chars
return int(chinese_chars / 1.5 + english_chars / 4)
def _calculate_cost(
self,
provider: str,
input_tokens: int,
output_tokens: int
) -> float:
"""计算成本"""
# 简化定价(实际应该从配置读取)
pricing = {
"hunyuan": {"input": 0.015, "output": 0.06}, # 元/1K tokens
"deepseek": {"input": 0.001, "output": 0.002},
"lingzhi": {"input": 0.0, "output": 0.0} # 内部免费
}
if provider not in pricing:
return 0.0
rates = pricing[provider]
input_cost = (input_tokens / 1000) * rates["input"]
output_cost = (output_tokens / 1000) * rates["output"]
return input_cost + output_cost
2️⃣3️⃣ 执行注册表 (Execution Registry)
Claw Code的实现
观察:build_execution_registry()
def build_execution_registry():
# 注册所有命令和工具
# 提供统一的执行接口
registry = {
"commands": {...},
"tools": {...}
}
return registry
# 使用
registry = build_execution_registry()
command_execs = tuple(
registry.command(match.name).execute(prompt)
for match in matches if match.kind == 'command'
)
核心思想:
- 集中注册 - 所有可执行对象在一处注册
- 延迟加载 - 按需加载,不是一次性全部加载
- 统一接口 - 命令和工具有相同的执行接口
- 类型安全 - 每种类型有特定的注册表
- 易于扩展 - 添加新命令/工具只需注册
应用到灵知系统
class EvolutionExecutionRegistry:
"""进化执行注册表"""
def __init__(self):
self._agents: Dict[str, Type[EvolutionAgent]] = {}
self._tools: Dict[str, Type[EvolutionTool]] = {}
self._hooks: Dict[str, List[HookHandler]] = {}
def register_agent(
self,
name: str,
agent_class: Type[EvolutionAgent],
metadata: Dict[str, Any] = None
):
"""注册Agent"""
self._agents[name] = agent_class
# 记录元数据
if metadata:
self._agent_metadata[name] = metadata
def register_tool(
self,
name: str,
tool_class: Type[EvolutionTool],
metadata: Dict[str, Any] = None
):
"""注册工具"""
self._tools[name] = tool_class
if metadata:
self._tool_metadata[name] = metadata
def register_hook(
self,
event: str,
handler: HookHandler,
priority: int = 0
):
"""注册Hook"""
if event not in self._hooks:
self._hooks[event] = []
self._hooks[event].append(handler)
# 按优先级排序
self._hooks[event].sort(key=lambda h: h.priority, reverse=True)
async def execute_agent(
self,
name: str,
task: Dict[str, Any],
context: Dict[str, Any]
) -> Any:
"""执行Agent"""
if name not in self._agents:
raise AgentNotFoundError(name)
agent_class = self._agents[name]
# 创建Agent实例
agent = agent_class()
# 执行前Hook
await self._execute_hooks("before_agent_execution", {
"agent_name": name,
"task": task
})
# 执行Agent
try:
result = await agent.execute(task, context)
# 执行后Hook
await self._execute_hooks("after_agent_execution", {
"agent_name": name,
"result": result
})
return result
except Exception as e:
# 错误Hook
await self._execute_hooks("on_agent_error", {
"agent_name": name,
"error": str(e)
})
raise
async def execute_tool(
self,
name: str,
params: Dict[str, Any],
context: Dict[str, Any]
) -> Any:
"""执行工具"""
if name not in self._tools:
raise ToolNotFoundError(name)
tool_class = self._tools[name]
tool = tool_class()
return await tool.execute(params, context)
async def _execute_hooks(
self,
event: str,
context: Dict[str, Any]
):
"""执行Hooks"""
if event not in self._hooks:
return
for handler in self._hooks[event]:
try:
await handler.handle(context)
except Exception as e:
logger.error(f"Hook failed: {e}")
# Hook失败不影响主流程
def list_agents(self) -> List[Dict[str, Any]]:
"""列出所有Agent"""
return [
{
"name": name,
"class": agent_class.__name__,
"metadata": self._agent_metadata.get(name, {})
}
for name, agent_class in self._agents.items()
]
def list_tools(self) -> List[Dict[str, Any]]:
"""列出所有工具"""
return [
{
"name": name,
"class": tool_class.__name__,
"metadata": self._tool_metadata.get(name, {})
}
for name, tool_class in self._tools.items()
]
# 使用示例
registry = EvolutionExecutionRegistry()
# 注册Agent
registry.register_agent(
name="exploration",
agent_class=EvolutionExplorationAgent,
metadata={
"description": "探索改进机会",
"version": "1.0.0",
"author": "LingZhi Team"
}
)
registry.register_agent(
name="verification",
agent_class=EvolutionVerificationAgent,
metadata={
"description": "验证进化效果",
"version": "1.0.0"
}
)
# 注册工具
registry.register_tool(
name="multi_ai_compare",
tool_class=MultiAICompareTool,
metadata={
"description": "多AI对比工具",
"timeout": 30.0
}
)
# 注册Hook
registry.register_hook(
event="before_agent_execution",
handler=LoggingHook(),
priority=10
)
registry.register_hook(
event="after_agent_execution",
handler=MetricsTrackingHook(),
priority=5
)
# 执行
result = await registry.execute_agent(
name="exploration",
task={"query": "...", "response": "..."},
context={"db": db, "user_id": "..."}
)
📊 总结:额外发现的8大核心思想
| # | 思想 | 核心价值 | 应用优先级 |
|---|---|---|---|
| 16 | 数据类驱动 | 类型安全、不可变性 | P1 |
| 17 | 智能路由 | 自动选择最佳处理路径 | P0 |
| 18 | 快照驱动 | 架构可追溯、渐进移植 | P1 |
| 19 | 会话持久化 | 断点恢复、跨会话记忆 | P0 |
| 20 | 历史日志 | 可审计、可分析 | P1 |
| 21 | 权限拒绝追踪 | 安全监控、自动修复 | P1 |
| 22 | Token追踪 | 成本控制、预算管理 | P0 |
| 23 | 执行注册表 | 统一管理、易于扩展 | P0 |
🚀 立即可实现的功能
基于这些新发现,我们可以立即实现:
本周(P0)
- Token使用追踪 - 监控API成本
- 执行注册表 - 统一Agent和工具管理
- 会话持久化 - 支持断点恢复
本月(P1)
- 智能路由 - 自动选择最佳Agent
- 历史日志 - 完整的审计追踪
- 权限拒绝追踪 - 安全监控
下季度(P2)
- 快照驱动开发 - 架构演进追踪
- 数据类重构 - 提升代码质量
众智混元,万法灵通 ⚡🚀