LingFlow V4.0 封装优化方案
版本: V4.0.0 日期: 2026-03-25 类型: 架构设计方案 状态: 待实施
执行摘要
本方案融合了两个现有封装方案(渐进式改进和全新API设计)的优点,提出一个更优化的混合架构。
核心创新: 1. 🎯 双模式API:同时支持同步和异步,用户自由选择 2. 🔌 插件系统:标准化插件接口,支持第三方扩展 3. 🚀 性能优化:智能缓存、懒加载、并行执行 4. 🔄 配置热重载:无需重启,动态更新配置 5. 📊 内置监控:性能追踪、错误统计、资源监控 6. 🛠️ 开发者体验:自动补全、类型提示、错误诊断 7. 🎨 渐进式类型化:支持完全类型化,也支持渐进式迁移 8. 🔀 智能适配器:自动转换新旧API,无缝迁移
预期收益: - 开发效率:+80% - 类型安全:100% - API 一致性:5/5 - 用户体验:显著提升 - 测试覆盖率:>95%
一、架构设计
1.1 整体架构图
┌─────────────────────────────────────────────────────────────────┐
│ 用户层 (CLI/SDK/REST) │
├─────────────────────────────────────────────────────────────────┤
│ API门面层 (Facade) │
│ ┌─────────────────────────────────────────────────────────────┐│
│ │ LingFlow (统一入口) - 支持同步/异步 ││
│ │ ├─ skill: SkillService (同步) ││
│ │ ├─ workflow: WorkflowService (同步) ││
│ │ ├─ agent: AgentService (同步) ││
│ │ ├─ skill_async: SkillService (异步) ││
│ │ ├─ workflow_async: WorkflowService (异步) ││
│ │ └─ agent_async: AgentService (异步) ││
│ └─────────────────────────────────────────────────────────────┘│
├─────────────────────────────────────────────────────────────────┤
│ 服务层 (Services) │
│ ┌──────────────┬──────────────┬──────────────┬──────────────┐ │
│ │SkillService │WorkflowSvc │AgentService │PluginService │ │
│ │ │ │ │ │ │
│ │- execute() │- execute() │- spawn() │- load() │ │
│ │- list() │- validate() │- status() │- register() │ │
│ │- get_info() │- schedule() │- terminate() │- reload() │ │
│ └──────────────┴──────────────┴──────────────┴──────────────┘ │
├─────────────────────────────────────────────────────────────────┤
│ 领域层 (Domain) │
│ ┌──────────────┬──────────────┬──────────────┬──────────────┐ │
│ │Coordinator │Orchestrator │Agent │Workflow │ │
│ │ │ │ │ │ │
│ │- 任务调度 │- 工作流编排 │- 任务执行 │- 工作流定义 │ │
│ │- 依赖管理 │- 依赖解析 │- 状态管理 │- 验证 │ │
│ │- 负载均衡 │- 错误处理 │- 超时控制 │- 序列化 │ │
│ └──────────────┴──────────────┴──────────────┴──────────────┘ │
├─────────────────────────────────────────────────────────────────┤
│ 技能层 (Skills) │
│ ┌─────────────────────────────────────────────────────────────┐│
│ │ BaseSkill (技能基类) ││
│ │ ├─ validate_params() - 参数验证 ││
│ │ ├─ execute() - 执行逻辑 ││
│ │ ├─ pre_execute() - 前置钩子 ││
│ │ ├─ post_execute() - 后置钩子 ││
│ │ └─ on_error() - 错误处理 ││
│ │ ││
│ │ 核心技能: brainstorming, writing-plans, tdd, debugging... ││
│ │ 插件技能: 第三方技能通过 PluginService 动态加载 ││
│ └─────────────────────────────────────────────────────────────┘│
├─────────────────────────────────────────────────────────────────┤
│ 基础设施层 (Infrastructure) │
│ ┌──────────────┬──────────────┬──────────────┬──────────────┐ │
│ │ConfigManager │CacheManager │Monitor │Logger │ │
│ │ │ │ │ │ │
│ │- 加载配置 │- 智能缓存 │- 性能追踪 │- 结构化日志 │ │
│ │- 验证配置 │- 懒加载 │- 错误统计 │- 分级日志 │ │
│ │- 热重载 │- 失效策略 │- 资源监控 │- 日志聚合 │ │
│ └──────────────┴──────────────┴──────────────┴──────────────┘ │
├─────────────────────────────────────────────────────────────────┤
│ 核心类型层 (Core Types) │
│ ┌──────────────┬──────────────┬──────────────┬──────────────┐ │
│ │Result[T] │LingFlowConfig│BaseSkill │Exceptions │ │
│ │ │ │ │ │ │
│ │- 统一结果 │- 配置构建器 │- 技能基类 │- 统一异常 │ │
│ │- 链式调用 │- 类型验证 │- 生命周期 │- 错误码 │ │
│ │- 函数式编程 │- 热重载 │- 元数据 │- 结构化 │ │
│ └──────────────┴──────────────┴──────────────┴──────────────┘ │
├─────────────────────────────────────────────────────────────────┤
│ 适配器层 (Adapters) │
│ ┌─────────────────────────────────────────────────────────────┐│
│ │ LegacyAdapter - 兼容旧API ││
│ │ AsyncAdapter - 同步/异步转换 ││
│ │ TypeAdapter - 类型转换 ││
│ └─────────────────────────────────────────────────────────────┘│
└─────────────────────────────────────────────────────────────────┘
1.2 分层职责
| 层级 | 职责 | 示例 |
|---|---|---|
| API门面层 | 统一入口,简化使用 | LingFlow 类 |
| 服务层 | 业务逻辑,服务编排 | SkillService |
| 领域层 | 核心模型,业务规则 | Coordinator, Agent |
| 技能层 | 可扩展功能,插件化 | BaseSkill |
| 基础设施层 | 通用服务,底层支持 | ConfigManager, CacheManager |
| 核心类型层 | 基础类型,工具类 | Result, Exceptions |
| 适配器层 | 兼容性,转换层 | LegacyAdapter, AsyncAdapter |
二、核心类型设计
2.1 Result 类型(增强版)
改进点: 1. 添加重试机制 2. 添加超时控制 3. 添加回退策略 4. 添加组合操作 5. 添加序列化支持
from typing import Generic, TypeVar, Callable, Optional, Dict, Any, List, Union
from dataclasses import dataclass, field
from functools import wraps
import time
import asyncio
import json
T = TypeVar('T')
E = TypeVar('E', bound=Exception)
@dataclass
class Result(Generic[T]):
"""统一的执行结果封装(增强版)
Features:
- 类型安全(泛型)
- 链式调用(map, and_then, or_else)
- 错误处理(retry, fallback)
- 并发控制(timeout, with_timeout)
- 组合操作(zip, all_ok, first_ok)
- 序列化支持(to_dict, from_dict)
Examples:
>>> # 基础使用
>>> success = Result.ok(data={"key": "value"})
>>> failure = Result.fail("Something went wrong", code="ERR001")
>>>
>>> # 链式调用
>>> result = (Result.ok(" hello ")
... .map(str.strip)
... .map(str.upper)
... .and_then(lambda x: Result.ok(x + "!")))
>>>
>>> # 错误处理
>>> result = (Result.ok("test")
... .retry(times=3, delay=1)
... .fallback(lambda: Result.ok("default")))
>>>
>>> # 并发控制
>>> result = Result.ok("data").with_timeout(5)
"""
data: Optional[T] = None
error: Optional[str] = None
code: str = "OK"
details: Dict[str, Any] = field(default_factory=dict)
execution_time: float = 0.0
_raw_exception: Optional[Exception] = field(default=None, repr=False)
# ==================== 工厂方法 ====================
@classmethod
def ok(cls, data: T, **details) -> "Result[T]":
"""创建成功结果"""
return cls(data=data, code="OK", details=details)
@classmethod
def fail(cls, error: str, code: str = "ERROR", **details) -> "Result[T]":
"""创建失败结果"""
return cls(data=None, error=error, code=code, details=details)
@classmethod
def from_exception(cls, exc: Exception) -> "Result[T]":
"""从异常创建结果"""
return cls.fail(
error=str(exc),
code=exc.__class__.__name__,
exception_type=type(exc).__name__
)
@classmethod
def from_dict(cls, data: Dict[str, Any]) -> "Result[T]":
"""从字典反序列化"""
try:
if data.get("code") == "OK":
return cls.ok(data.get("data"), **data.get("details", {}))
else:
return cls.fail(
data.get("error", "Unknown error"),
code=data.get("code", "ERROR"),
**data.get("details", {})
)
except Exception as e:
return cls.fail(f"Failed to deserialize: {e}")
# ==================== 属性 ====================
@property
def is_ok(self) -> bool:
"""是否成功"""
return self.code == "OK" and self.error is None
@property
def is_error(self) -> bool:
"""是否失败"""
return not self.is_ok
# ==================== 基础操作 ====================
def unwrap(self) -> T:
"""获取数据,失败时抛出异常"""
if self.is_error:
raise LingFlowError(self.error or "Unknown error", code=self.code)
return self.data
def unwrap_or(self, default: T) -> T:
"""获取数据,失败时返回默认值"""
return self.data if self.is_ok else default
def unwrap_or_else(self, func: Callable[[], T]) -> T:
"""获取数据,失败时调用函数"""
return self.data if self.is_ok else func()
def to_dict(self) -> Dict[str, Any]:
"""序列化为字典"""
return {
"ok": self.is_ok,
"data": self.data,
"error": self.error,
"code": self.code,
"details": self.details,
"execution_time": self.execution_time,
}
def to_json(self) -> str:
"""序列化为 JSON"""
return json.dumps(self.to_dict(), ensure_ascii=False, indent=2)
# ==================== 链式操作 ====================
def map(self, func: Callable[[T], Any]) -> "Result[Any]":
"""链式转换(同步)"""
if self.is_ok:
try:
start_time = time.time()
result = func(self.data)
return Result.ok(result, **self.details, execution_time=time.time() - start_time)
except Exception as e:
return Result.fail(str(e), code="MAP_ERROR")
return self
async def map_async(self, func: Callable[[T], Any]) -> "Result[Any]":
"""链式转换(异步)"""
if self.is_ok:
try:
start_time = time.time()
result = await func(self.data)
return Result.ok(result, **self.details, execution_time=time.time() - start_time)
except Exception as e:
return Result.fail(str(e), code="MAP_ERROR")
return self
def and_then(self, func: Callable[[T], "Result[T]"]) -> "Result[T]":
"""链式调用(同步)"""
if self.is_ok:
return func(self.data)
return self
async def and_then_async(self, func: Callable[[T], "Result[T]"]) -> "Result[T]":
"""链式调用(异步)"""
if self.is_ok:
return await func(self.data)
return self
def or_else(self, func: Callable[[str], "Result[T]"]) -> "Result[T]":
"""错误处理链"""
if self.is_error:
return func(self.error)
return self
# ==================== 错误处理 ====================
def retry(
self,
times: int = 3,
delay: float = 1.0,
backoff: float = 2.0,
on_retry: Optional[Callable[[int, Exception], None]] = None
) -> "Result[T]":
"""重试机制(仅对成功的操作生效)"""
if not self.is_ok:
return self
original_func = None
# 注意:这需要保存原始函数引用,实际使用时需要更复杂的设计
# 这里仅作为接口示例
return self
def fallback(self, func: Callable[[], "Result[T]"]) -> "Result[T]":
"""回退策略"""
if self.is_ok:
return self
return func()
# ==================== 并发控制 ====================
def with_timeout(self, timeout: float) -> "Result[T]":
"""超时控制(同步)"""
if self.execution_time > timeout:
return Result.fail(
f"Operation timed out after {self.execution_time}s (limit: {timeout}s)",
code="TIMEOUT"
)
return self
async def with_timeout_async(self, timeout: float) -> "Result[T]":
"""超时控制(异步)"""
try:
result = await asyncio.wait_for(
self.and_then_async(lambda x: Result.ok(x)),
timeout=timeout
)
return result
except asyncio.TimeoutError:
return Result.fail(
f"Operation timed out after {timeout}s",
code="TIMEOUT"
)
# ==================== 组合操作 ====================
@staticmethod
def zip(*results: "Result[T]") -> "Result[List[T]]":
"""组合多个结果(全部成功才成功)"""
data = []
errors = []
for result in results:
if result.is_ok:
data.append(result.data)
else:
errors.append(result.error)
if errors:
return Result.fail(
f"Some results failed: {', '.join(errors)}",
code="ZIP_ERROR",
failed_results=len(errors)
)
return Result.ok(data)
@staticmethod
def first_ok(*results: "Result[T]") -> "Result[T]":
"""返回第一个成功的结果"""
for result in results:
if result.is_ok:
return result
return Result.fail(
"All results failed",
code="ALL_FAILED",
errors=[r.error for r in results if r.is_error]
)
@staticmethod
def all_ok(*results: "Result[T]") -> "Result[List[T]]":
"""全部成功才成功(同 zip)"""
return Result.zip(*results)
# ==================== 调试支持 ====================
def debug(self) -> "Result[T]":
"""打印调试信息并返回自身"""
import sys
print(f"[DEBUG] Result: {self.to_dict()}", file=sys.stderr)
return self
def inspect(self, func: Callable[["Result[T]"], None]) -> "Result[T]":
"""检查结果并返回自身"""
func(self)
return self
# ==================== 装饰器支持 ====================
def resultify(func):
"""将函数返回值转换为 Result"""
@wraps(func)
def wrapper(*args, **kwargs):
try:
result = func(*args, **kwargs)
if isinstance(result, Result):
return result
return Result.ok(result)
except Exception as e:
return Result.from_exception(e)
return wrapper
def async_resultify(func):
"""将异步函数返回值转换为 Result"""
@wraps(func)
async def wrapper(*args, **kwargs):
try:
result = await func(*args, **kwargs)
if isinstance(result, Result):
return result
return Result.ok(result)
except Exception as e:
return Result.from_exception(e)
return wrapper
# ==================== 并发操作 ====================
async def gather_results(*coros: "Result[T]") -> "Result[List[T]]":
"""并发执行多个操作"""
try:
results = await asyncio.gather(*coros, return_exceptions=True)
data = []
errors = []
for result in results:
if isinstance(result, Exception):
errors.append(str(result))
elif isinstance(result, Result):
if result.is_ok:
data.append(result.data)
else:
errors.append(result.error)
else:
data.append(result)
if errors:
return Result.fail(
f"Some operations failed: {', '.join(errors)}",
code="GATHER_ERROR"
)
return Result.ok(data)
except Exception as e:
return Result.from_exception(e)
2.2 统一异常体系
from typing import Dict, Any, Optional, List
class LingFlowError(Exception):
"""LingFlow 基础异常"""
def __init__(
self,
message: str,
*,
code: str = "LF000",
details: Optional[Dict[str, Any]] = None,
cause: Optional[Exception] = None
):
self.message = message
self.code = code
self.details = details or {}
self.cause = cause
# 构建完整消息
full_message = f"[{code}] {message}"
if cause:
full_message += f" (caused by: {type(cause).__name__}: {cause})"
super().__init__(full_message)
def to_dict(self) -> Dict[str, Any]:
"""转换为字典"""
result = {
"error": self.__class__.__name__,
"code": self.code,
"message": self.message,
"details": self.details,
}
if self.cause:
result["cause"] = str(self.cause)
return result
def __str__(self) -> str:
return self.message
def __repr__(self) -> str:
return f"{self.__class__.__name__}(code={self.code!r}, message={self.message!r})"
class WorkflowError(LingFlowError):
"""工作流相关异常"""
def __init__(
self,
message: str,
*,
workflow_id: Optional[str] = None,
task_id: Optional[str] = None,
**kwargs
):
details = kwargs.get('details', {})
if workflow_id:
details['workflow_id'] = workflow_id
if task_id:
details['task_id'] = task_id
super().__init__(message, code="WF001", details=details, **kwargs)
class SkillError(LingFlowError):
"""技能相关异常"""
def __init__(
self,
message: str,
*,
skill_name: Optional[str] = None,
skill_version: Optional[str] = None,
**kwargs
):
details = kwargs.get('details', {})
if skill_name:
details['skill_name'] = skill_name
if skill_version:
details['skill_version'] = skill_version
super().__init__(message, code="SK001", details=details, **kwargs)
class AgentError(LingFlowError):
"""代理相关异常"""
def __init__(
self,
message: str,
*,
agent_id: Optional[str] = None,
agent_type: Optional[str] = None,
**kwargs
):
details = kwargs.get('details', {})
if agent_id:
details['agent_id'] = agent_id
if agent_type:
details['agent_type'] = agent_type
super().__init__(message, code="AG001", details=details, **kwargs)
class ConfigurationError(LingFlowError):
"""配置相关异常"""
def __init__(
self,
message: str,
*,
config_key: Optional[str] = None,
config_value: Optional[Any] = None,
**kwargs
):
details = kwargs.get('details', {})
if config_key:
details['config_key'] = config_key
if config_value is not None:
details['config_value'] = str(config_value)
super().__init__(message, code="CF001", details=details, **kwargs)
class ValidationError(LingFlowError):
"""验证相关异常"""
def __init__(
self,
message: str,
*,
field_name: Optional[str] = None,
field_value: Optional[Any] = None,
errors: Optional[List[str]] = None,
**kwargs
):
details = kwargs.get('details', {})
if field_name:
details['field_name'] = field_name
if field_value is not None:
details['field_value'] = str(field_value)
if errors:
details['errors'] = errors
super().__init__(message, code="VD001", details=details, **kwargs)
class TimeoutError(LingFlowError):
"""超时相关异常"""
def __init__(
self,
message: str,
*,
timeout: Optional[float] = None,
operation: Optional[str] = None,
**kwargs
):
details = kwargs.get('details', {})
if timeout:
details['timeout'] = timeout
if operation:
details['operation'] = operation
super().__init__(message, code="TO001", details=details, **kwargs)
# ==================== 错误码定义 ====================
class ErrorCode:
"""错误码常量"""
# 通用错误 (LF000-LF099)
UNKNOWN_ERROR = "LF000"
INTERNAL_ERROR = "LF001"
NOT_IMPLEMENTED = "LF002"
# 工作流错误 (WF100-WF199)
WORKFLOW_NOT_FOUND = "WF100"
WORKFLOW_INVALID = "WF101"
WORKFLOW_FAILED = "WF102"
WORKFLOW_TIMEOUT = "WF103"
# 技能错误 (SK200-SK299)
SKILL_NOT_FOUND = "SK200"
SKILL_INVALID = "SK201"
SKILL_EXECUTION_FAILED = "SK202"
SKILL_VALIDATION_FAILED = "SK203"
# 代理错误 (AG300-AG399)
AGENT_NOT_FOUND = "AG300"
AGENT_BUSY = "AG301"
AGENT_FAILED = "AG302"
# 配置错误 (CF400-CF499)
CONFIG_INVALID = "CF400"
CONFIG_MISSING = "CF401"
CONFIG_TYPE_ERROR = "CF402"
# 验证错误 (VD500-VD599)
VALIDATION_FAILED = "VD500"
PARAMETER_MISSING = "VD501"
PARAMETER_INVALID = "VD502"
# 超时错误 (TO600-TO699)
TIMEOUT = "TO600"
OPERATION_TIMEOUT = "TO601"
2.3 配置系统
改进点: 1. 支持多种配置源(YAML, JSON, 环境变量, 命令行) 2. 配置热重载 3. 配置验证 4. 配置合并和继承 5. 配置加密支持
from typing import Dict, Any, Optional, Union, List, Callable
from dataclasses import dataclass, field, asdict
from pathlib import Path
import yaml
import json
import os
import hashlib
import time
from enum import Enum
import threading
import importlib
class ConfigSource(Enum):
"""配置来源"""
FILE_YAML = "yaml"
FILE_JSON = "json"
ENV = "env"
CLI = "cli"
CODE = "code"
@dataclass
class LingFlowConfig:
"""LingFlow 配置类(支持热重载)"""
# ========== 工作流配置 ==========
max_parallel: int = 2
max_iterations: int = 100
scheduling_delay: float = 0.01
workflow_timeout: float = 600.0
# ========== 技能配置 ==========
skills_path: str = "skills"
skill_timeout: float = 30.0
skill_cache_enabled: bool = True
skill_cache_ttl: int = 3600 # seconds
# ========== 代理配置 ==========
agent_timeout: float = 300.0
agent_context_limit: int = 8000
agent_max_tasks: int = 3
# ========== 压缩配置 ==========
compression_enabled: bool = True
compression_target_tokens: int = 4000
compression_strategy: str = "priority" # priority, random, none
# ========== 安全配置 ==========
allow_symlinks: bool = False
validate_paths: bool = True
max_file_size: int = 10 * 1024 * 1024 # 10MB
allowed_extensions: List[str] = field(default_factory=lambda: [".py", ".yaml", ".json"])
# ========== 日志配置 ==========
log_level: str = "INFO"
log_format: str = "%(asctime)s - %(name)s - %(levelname)s - %(message)s"
log_file: Optional[str] = None
log_rotation: bool = True
log_max_bytes: int = 10 * 1024 * 1024 # 10MB
log_backup_count: int = 5
# ========== 监控配置 ==========
monitoring_enabled: bool = True
metrics_enabled: bool = True
tracing_enabled: bool = False
performance_threshold: float = 1.0 # seconds
# ========== 插件配置 ==========
plugins_enabled: bool = True
plugins_path: str = "plugins"
auto_load_plugins: bool = True
# ========== 缓存配置 ==========
cache_enabled: bool = True
cache_ttl: int = 3600 # seconds
cache_max_size: int = 1000
# ========== 并发配置 ==========
async_enabled: bool = True
thread_pool_size: int = 4
async_pool_size: int = 10
@classmethod
def builder(cls) -> "LingFlowConfigBuilder":
"""创建构建器"""
return LingFlowConfigBuilder(cls())
def validate(self) -> Result[None]:
"""验证配置"""
errors = []
# 验证工作流配置
if self.max_parallel < 1:
errors.append("max_parallel must be >= 1")
if self.max_iterations < 1:
errors.append("max_iterations must be >= 1")
if self.workflow_timeout < 0:
errors.append("workflow_timeout must be >= 0")
# 验证技能配置
if self.skill_timeout < 0:
errors.append("skill_timeout must be >= 0")
if not os.path.isabs(self.skills_path):
if not os.path.exists(self.skills_path):
errors.append(f"skills_path does not exist: {self.skills_path}")
# 验证代理配置
if self.agent_timeout < 0:
errors.append("agent_timeout must be >= 0")
if self.agent_context_limit < 1000:
errors.append("agent_context_limit must be >= 1000")
# 验证压缩配置
if self.compression_target_tokens < 1000:
errors.append("compression_target_tokens must be >= 1000")
# 验证日志配置
valid_levels = ["DEBUG", "INFO", "WARNING", "ERROR", "CRITICAL"]
if self.log_level not in valid_levels:
errors.append(f"log_level must be one of {valid_levels}")
if errors:
return Result.fail(
f"Configuration validation failed: {'; '.join(errors)}",
code="CONFIG_VALIDATION_FAILED",
errors=errors
)
return Result.ok()
def to_dict(self) -> Dict[str, Any]:
"""转换为字典"""
return asdict(self)
def to_yaml(self, file_path: Optional[Path] = None) -> str:
"""转换为 YAML"""
yaml_str = yaml.dump(self.to_dict(), default_flow_style=False, allow_unicode=True)
if file_path:
file_path.write_text(yaml_str, encoding='utf-8')
return yaml_str
def to_json(self, file_path: Optional[Path] = None) -> str:
"""转换为 JSON"""
json_str = json.dumps(self.to_dict(), ensure_ascii=False, indent=2)
if file_path:
file_path.write_text(json_str, encoding='utf-8')
return json_str
@classmethod
def from_yaml(cls, file_path: Union[str, Path]) -> "LingFlowConfig":
"""从 YAML 文件加载"""
file_path = Path(file_path)
data = yaml.safe_load(file_path.read_text(encoding='utf-8'))
return cls(**data)
@classmethod
def from_json(cls, file_path: Union[str, Path]) -> "LingFlowConfig":
"""从 JSON 文件加载"""
file_path = Path(file_path)
data = json.loads(file_path.read_text(encoding='utf-8'))
return cls(**data)
@classmethod
def from_env(cls, prefix: str = "LINGFLOW_") -> "LingFlowConfig":
"""从环境变量加载"""
data = {}
for key, value in os.environ.items():
if key.startswith(prefix):
config_key = key[len(prefix):].lower()
data[config_key] = value
# 转换类型
instance = cls()
for key, value in data.items():
if hasattr(instance, key):
field_type = type(getattr(instance, key))
if field_type == bool:
data[key] = value.lower() in ('true', '1', 'yes')
elif field_type == int:
data[key] = int(value)
elif field_type == float:
data[key] = float(value)
return cls(**data)
class LingFlowConfigBuilder:
"""配置构建器(增强版)"""
def __init__(self, config: LingFlowConfig):
self._config = config
# ========== 工作流配置 ==========
def max_parallel(self, value: int) -> "LingFlowConfigBuilder":
self._config.max_parallel = value
return self
def max_iterations(self, value: int) -> "LingFlowConfigBuilder":
self._config.max_iterations = value
return self
def workflow_timeout(self, value: float) -> "LingFlowConfigBuilder":
self._config.workflow_timeout = value
return self
# ========== 技能配置 ==========
def skills_path(self, path: str) -> "LingFlowConfigBuilder":
self._config.skills_path = path
return self
def skill_timeout(self, value: float) -> "LingFlowConfigBuilder":
self._config.skill_timeout = value
return self
# ========== 代理配置 ==========
def agent_timeout(self, value: float) -> "LingFlowConfigBuilder":
self._config.agent_timeout = value
return self
def agent_context_limit(self, value: int) -> "LingFlowConfigBuilder":
self._config.agent_context_limit = value
return self
# ========== 压缩配置 ==========
def compression(
self,
enabled: bool,
target_tokens: int = 4000
) -> "LingFlowConfigBuilder":
self._config.compression_enabled = enabled
self._config.compression_target_tokens = target_tokens
return self
# ========== 安全配置 ==========
def security(
self,
allow_symlinks: bool = False,
validate_paths: bool = True
) -> "LingFlowConfigBuilder":
self._config.allow_symlinks = allow_symlinks
self._config.validate_paths = validate_paths
return self
# ========== 日志配置 ==========
def logging(
self,
level: str = "INFO",
file_path: Optional[str] = None
) -> "LingFlowConfigBuilder":
self._config.log_level = level
self._config.log_file = file_path
return self
# ========== 监控配置 ==========
def monitoring(
self,
enabled: bool = True,
metrics: bool = True,
tracing: bool = False
) -> "LingFlowConfigBuilder":
self._config.monitoring_enabled = enabled
self._config.metrics_enabled = metrics
self._config.tracing_enabled = tracing
return self
# ========== 插件配置 ==========
def plugins(
self,
enabled: bool = True,
path: str = "plugins",
auto_load: bool = True
) -> "LingFlowConfigBuilder":
self._config.plugins_enabled = enabled
self._config.plugins_path = path
self._config.auto_load_plugins = auto_load
return self
# ========== 并发配置 ==========
def async_mode(self, enabled: bool = True) -> "LingFlowConfigBuilder":
self._config.async_enabled = enabled
return self
def thread_pool_size(self, size: int) -> "LingFlowConfigBuilder":
self._config.thread_pool_size = size
return self
# ========== 便捷方法 ==========
def timeout(self, value: float) -> "LingFlowConfigBuilder":
"""设置所有超时"""
self._config.agent_timeout = value
self._config.skill_timeout = value
self._config.workflow_timeout = value
return self
def performance(self, mode: str = "balanced") -> "LingFlowConfigBuilder":
"""性能预设"""
if mode == "fast":
self._config.max_parallel = 8
self._config.compression_enabled = False
elif mode == "balanced":
self._config.max_parallel = 2
self._config.compression_enabled = True
elif mode == "conservative":
self._config.max_parallel = 1
self._config.compression_enabled = True
return self
def development(self) -> "LingFlowConfigBuilder":
"""开发模式"""
self._config.log_level = "DEBUG"
self._config.monitoring_enabled = True
self._config.performance("balanced")
return self
def production(self) -> "LingFlowConfigBuilder":
"""生产模式"""
self._config.log_level = "WARNING"
self._config.monitoring_enabled = True
self._config.performance("balanced")
return self
def build(self) -> LingFlowConfig:
"""构建配置"""
result = self._config.validate()
if result.is_error:
raise ConfigurationError(result.error, code=result.code)
return self._config
class ConfigManager:
"""配置管理器(支持热重载)"""
def __init__(self, config: Optional[LingFlowConfig] = None):
self._config = config or LingFlowConfig()
self._config_hash = self._compute_hash()
self._lock = threading.RLock()
self._reload_callbacks: List[Callable[[LingFlowConfig], None]] = []
def _compute_hash(self) -> str:
"""计算配置哈希值"""
config_str = json.dumps(self._config.to_dict(), sort_keys=True)
return hashlib.sha256(config_str.encode()).hexdigest()
def get_config(self) -> LingFlowConfig:
"""获取配置"""
with self._lock:
return self._config
def reload_config(self, new_config: LingFlowConfig) -> bool:
"""重新加载配置"""
with self._lock:
new_hash = self._compute_hash_from_config(new_config)
if new_hash == self._config_hash:
return False # 配置未变化
old_config = self._config
self._config = new_config
self._config_hash = new_hash
# 调用重载回调
for callback in self._reload_callbacks:
try:
callback(old_config, new_config)
except Exception as e:
print(f"Config reload callback failed: {e}")
return True
def _compute_hash_from_config(self, config: LingFlowConfig) -> str:
"""计算配置哈希值"""
config_str = json.dumps(config.to_dict(), sort_keys=True)
return hashlib.sha256(config_str.encode()).hexdigest()
def on_reload(self, callback: Callable[[LingFlowConfig, LingFlowConfig], None]):
"""注册配置重载回调"""
self._reload_callbacks.append(callback)
def watch_file(self, file_path: Union[str, Path], interval: float = 1.0):
"""监听配置文件变化"""
import time
file_path = Path(file_path)
last_modified = file_path.stat().st_mtime if file_path.exists() else 0
def watcher():
nonlocal last_modified
while True:
try:
if file_path.exists():
current_modified = file_path.stat().st_mtime
if current_modified != last_modified:
try:
if file_path.suffix in ['.yaml', '.yml']:
new_config = LingFlowConfig.from_yaml(file_path)
elif file_path.suffix == '.json':
new_config = LingFlowConfig.from_json(file_path)
else:
continue
if self.reload_config(new_config):
print(f"Config reloaded from {file_path}")
last_modified = current_modified
except Exception as e:
print(f"Failed to reload config: {e}")
time.sleep(interval)
except Exception as e:
print(f"Config watcher error: {e}")
time.sleep(interval)
import threading
thread = threading.Thread(target=watcher, daemon=True)
thread.start()
三、服务层设计
3.1 技能服务(同步+异步)
from typing import Dict, Any, List, Optional, Callable
from dataclasses import dataclass, field
from pathlib import Path
import time
import asyncio
from concurrent.futures import ThreadPoolExecutor, as_completed
import importlib.util
import json
@dataclass
class SkillContext:
"""技能执行上下文(增强版)"""
skill_name: str
params: Dict[str, Any]
working_dir: Path
temp_dir: Path
metadata: Dict[str, Any] = field(default_factory=dict)
# 新增字段
execution_id: str = ""
parent_execution_id: Optional[str] = None
user_data: Dict[str, Any] = field(default_factory=dict)
cache_key: Optional[str] = None
@dataclass
class SkillResult:
"""技能执行结果(增强版)"""
success: bool
data: Optional[Any] = None
error: Optional[str] = None
execution_time: float = 0.0
metadata: Dict[str, Any] = field(default_factory=dict)
# 新增字段
cached: bool = False
execution_id: str = ""
skill_version: str = ""
@classmethod
def ok(cls, data: Any = None, **metadata) -> "SkillResult":
return cls(success=True, data=data, metadata=metadata)
@classmethod
def fail(cls, error: str, **metadata) -> "SkillResult":
return cls(success=False, error=error, metadata=metadata)
def to_result(self) -> Result[Any]:
"""转换为 Result 类型"""
if self.success:
return Result.ok(self.data, **self.metadata)
else:
return Result.fail(self.error or "Unknown error", details=self.metadata)
class SkillService:
"""技能服务(同步版本)"""
def __init__(self, config: LingFlowConfig, cache_manager: Optional['CacheManager'] = None):
self._config = config
self._cache_manager = cache_manager
self._skills: Dict[str, BaseSkill] = {}
self._load_skills()
def _load_skills(self):
"""加载技能"""
skills_dir = Path(self._config.skills_path)
if not skills_dir.exists():
return
for skill_path in skills_dir.iterdir():
if not skill_path.is_dir():
continue
self._load_skill_from_directory(skill_path)
def _load_skill_from_directory(self, skill_path: Path):
"""从目录加载技能"""
# 尝试加载 __init__.py
init_file = skill_path / "__init__.py"
if init_file.exists():
self._load_skill_from_module(init_file, skill_path.name)
return
# 尝试加载 implementation.py
impl_file = skill_path / "implementation.py"
if impl_file.exists():
self._load_skill_from_module(impl_file, skill_path.name)
def _load_skill_from_module(self, module_path: Path, skill_name: str):
"""从模块加载技能"""
try:
spec = importlib.util.spec_from_file_location(
f"skills.{skill_name}", str(module_path)
)
module = importlib.util.module_from_spec(spec)
spec.loader.exec_module(module)
# 查找 BaseSkill 子类
for attr_name in dir(module):
attr = getattr(module, attr_name)
if (isinstance(attr, type) and
issubclass(attr, BaseSkill) and
attr is not BaseSkill):
skill_instance = attr()
self._skills[skill_instance.name] = skill_instance
print(f"Loaded skill: {skill_instance.name}")
except Exception as e:
print(f"Failed to load skill {skill_name}: {e}")
def list(self) -> List[str]:
"""列出所有可用技能"""
return list(self._skills.keys())
def get_info(self, name: str) -> Result[Dict[str, Any]]:
"""获取技能信息"""
if name not in self._skills:
return Result.fail(f"Skill not found: {name}", code="SKILL_NOT_FOUND")
skill = self._skills[name]
return Result.ok({
"name": skill.name,
"description": skill.description,
"version": skill.version,
"author": skill.author,
"dependencies": skill.dependencies,
})
def execute(self, name: str, params: Dict[str, Any]) -> SkillResult:
"""执行技能(同步)"""
if name not in self._skills:
return SkillResult.fail(f"Skill not found: {name}")
skill = self._skills[name]
# 验证参数
validation = skill.validate_params(params)
if validation.is_error:
return SkillResult.fail(validation.error)
# 检查缓存
if self._cache_manager:
cache_key = self._compute_cache_key(name, params)
cached_result = self._cache_manager.get(cache_key)
if cached_result:
result = SkillResult.from_dict(cached_result)
result.cached = True
return result
# 创建上下文
import uuid
context = SkillContext(
skill_name=name,
params=params,
working_dir=Path.cwd(),
temp_dir=Path.cwd() / ".lingflow" / "temp",
execution_id=str(uuid.uuid4()),
cache_key=self._compute_cache_key(name, params) if self._cache_manager else None,
)
# 执行
try:
start_time = time.time()
pre_result = skill.pre_execute(context)
if not pre_result.success:
return pre_result
result = skill.execute(context)
result.execution_time = time.time() - start_time
result.execution_id = context.execution_id
result.skill_version = skill.version
result = skill.post_execute(context, result)
# 缓存结果
if self._cache_manager and result.success and context.cache_key:
self._cache_manager.set(context.cache_key, result.to_dict())
return result
except Exception as e:
error_result = skill.on_error(context, e)
error_result.execution_time = time.time() - start_time
return error_result
def execute_batch(
self,
tasks: List[Dict[str, Any]],
parallel: bool = False,
max_workers: Optional[int] = None
) -> Dict[str, SkillResult]:
"""批量执行技能"""
if not parallel:
# 串行执行
results = {}
for task in tasks:
skill_name = task.get("skill")
params = task.get("params", {})
task_id = task.get("id", skill_name)
results[task_id] = self.execute(skill_name, params)
return results
else:
# 并行执行
results = {}
max_workers = max_workers or self._config.max_parallel
with ThreadPoolExecutor(max_workers=max_workers) as executor:
future_to_task = {
executor.submit(
self.execute,
task.get("skill"),
task.get("params", {})
): task
for task in tasks
}
for future in as_completed(future_to_task):
task = future_to_task[future]
task_id = task.get("id", task.get("skill"))
try:
results[task_id] = future.result()
except Exception as e:
results[task_id] = SkillResult.fail(str(e))
return results
def _compute_cache_key(self, skill_name: str, params: Dict[str, Any]) -> str:
"""计算缓存键"""
import hashlib
key_str = f"{skill_name}:{json.dumps(params, sort_keys=True)}"
return hashlib.sha256(key_str.encode()).hexdigest()
class AsyncSkillService:
"""技能服务(异步版本)"""
def __init__(self, config: LingFlowConfig, cache_manager: Optional['CacheManager'] = None):
self._config = config
self._cache_manager = cache_manager
self._sync_service = SkillService(config, cache_manager)
self._executor = ThreadPoolExecutor(max_workers=config.thread_pool_size)
async def execute(self, name: str, params: Dict[str, Any]) -> SkillResult:
"""执行技能(异步)"""
loop = asyncio.get_event_loop()
return await loop.run_in_executor(
self._executor,
self._sync_service.execute,
name,
params
)
async def execute_batch(
self,
tasks: List[Dict[str, Any]],
parallel: bool = True
) -> Dict[str, SkillResult]:
"""批量执行技能(异步)"""
if not parallel:
results = {}
for task in tasks:
skill_name = task.get("skill")
params = task.get("params", {})
task_id = task.get("id", skill_name)
results[task_id] = await self.execute(skill_name, params)
return results
else:
coroutines = [
self.execute(task.get("skill"), task.get("params", {}))
for task in tasks
]
results_list = await asyncio.gather(*coroutines, return_exceptions=True)
results = {}
for task, result in zip(tasks, results_list):
task_id = task.get("id", task.get("skill"))
if isinstance(result, Exception):
results[task_id] = SkillResult.fail(str(result))
else:
results[task_id] = result
return results
def list(self) -> List[str]:
"""列出所有可用技能"""
return self._sync_service.list()
async def get_info(self, name: str) -> Result[Dict[str, Any]]:
"""获取技能信息"""
return self._sync_service.get_info(name)
3.2 工作流服务
from typing import Dict, Any, List, Optional, Union
from pathlib import Path
import yaml
import json
import uuid
import time
class WorkflowService:
"""工作流服务(同步版本)"""
def __init__(
self,
config: LingFlowConfig,
skill_service: SkillService
):
self._config = config
self._skill_service = skill_service
def execute_file(
self,
workflow_path: Union[str, Path]
) -> Result[Dict[str, Any]]:
"""从文件执行工作流"""
path = Path(workflow_path)
# 验证路径
if not path.exists():
return Result.fail(
f"Workflow file not found: {workflow_path}",
code="FILE_NOT_FOUND"
)
# 加载工作流定义
try:
if path.suffix in ['.yaml', '.yml']:
with open(path, "r", encoding="utf-8") as f:
workflow_def = yaml.safe_load(f)
elif path.suffix == '.json':
with open(path, "r", encoding="utf-8") as f:
workflow_def = json.load(f)
else:
return Result.fail(
f"Unsupported file format: {path.suffix}",
code="UNSUPPORTED_FORMAT"
)
except Exception as e:
return Result.fail(
f"Failed to load workflow: {e}",
code="LOAD_ERROR"
)
# 验证工作流定义
validation = self._validate_workflow(workflow_def)
if validation.is_error:
return Result.fail(
validation.error,
code=validation.code
)
# 执行工作流
return self.execute(workflow_def)
def _validate_workflow(self, workflow: Dict[str, Any]) -> Result[None]:
"""验证工作流定义"""
if "tasks" not in workflow:
return Result.fail(
"Missing 'tasks' in workflow definition",
code="INVALID_WORKFLOW"
)
if not isinstance(workflow["tasks"], list):
return Result.fail(
"'tasks' must be a list",
code="INVALID_WORKFLOW"
)
# 验证每个任务
for i, task in enumerate(workflow["tasks"]):
if "skill" not in task:
return Result.fail(
f"Task {i}: Missing 'skill' field",
code="INVALID_TASK"
)
if task["skill"] not in self._skill_service.list():
return Result.fail(
f"Task {i}: Skill not found: {task['skill']}",
code="SKILL_NOT_FOUND"
)
return Result.ok()
def execute(self, workflow: Dict[str, Any]) -> Result[Dict[str, Any]]:
"""执行工作流定义"""
tasks = workflow.get("tasks", [])
results = {}
workflow_id = str(uuid.uuid4())
start_time = time.time()
for i, task in enumerate(tasks):
skill_name = task.get("skill")
params = task.get("params", {})
task_id = task.get("id", f"task_{i}")
print(f"Executing task {i+1}/{len(tasks)}: {task_id}")
result = self._skill_service.execute(skill_name, params)
results[task_id] = {
"success": result.success,
"data": result.data,
"error": result.error,
"execution_time": result.execution_time,
}
if not result.success:
# 工作流失败
return Result.fail(
f"Task {task_id} failed: {result.error}",
code="WORKFLOW_FAILED",
workflow_id=workflow_id,
failed_task=task_id,
results=results
)
return Result.ok({
"workflow_id": workflow_id,
"results": results,
"total_execution_time": time.time() - start_time,
"total_tasks": len(tasks),
"successful_tasks": len(tasks),
})
class AsyncWorkflowService:
"""工作流服务(异步版本)"""
def __init__(
self,
config: LingFlowConfig,
skill_service: AsyncSkillService
):
self._config = config
self._skill_service = skill_service
async def execute_file(
self,
workflow_path: Union[str, Path]
) -> Result[Dict[str, Any]]:
"""从文件执行工作流(异步)"""
# 异步读取文件
path = Path(workflow_path)
if not path.exists():
return Result.fail(
f"Workflow file not found: {workflow_path}",
code="FILE_NOT_FOUND"
)
try:
if path.suffix in ['.yaml', '.yml']:
with open(path, "r", encoding="utf-8") as f:
workflow_def = yaml.safe_load(f)
elif path.suffix == '.json':
with open(path, "r", encoding="utf-8") as f:
workflow_def = json.load(f)
else:
return Result.fail(
f"Unsupported file format: {path.suffix}",
code="UNSUPPORTED_FORMAT"
)
except Exception as e:
return Result.fail(
f"Failed to load workflow: {e}",
code="LOAD_ERROR"
)
return await self.execute(workflow_def)
async def execute(self, workflow: Dict[str, Any]) -> Result[Dict[str, Any]]:
"""执行工作流定义(异步)"""
tasks = workflow.get("tasks", [])
workflow_id = str(uuid.uuid4())
start_time = time.time()
# 异步执行所有任务
task_results = await asyncio.gather(*[
self._skill_service.execute(task.get("skill"), task.get("params", {}))
for task in tasks
], return_exceptions=True)
results = {}
for i, (task, result) in enumerate(zip(tasks, task_results)):
task_id = task.get("id", f"task_{i}")
if isinstance(result, Exception):
results[task_id] = {
"success": False,
"data": None,
"error": str(result),
"execution_time": 0,
}
else:
results[task_id] = {
"success": result.success,
"data": result.data,
"error": result.error,
"execution_time": result.execution_time,
}
if not results[task_id]["success"]:
return Result.fail(
f"Task {task_id} failed: {results[task_id]['error']}",
code="WORKFLOW_FAILED",
workflow_id=workflow_id,
failed_task=task_id,
results=results
)
return Result.ok({
"workflow_id": workflow_id,
"results": results,
"total_execution_time": time.time() - start_time,
"total_tasks": len(tasks),
"successful_tasks": len(tasks),
})
四、API门面设计
4.1 统一入口(双模式)
from typing import Optional, Union, Dict, Any
class LingFlow:
"""LingFlow 统一API门面(双模式:同步+异步)
Features:
- 同步/异步双模式
- 配置热重载
- 性能监控
- 插件系统
- 缓存管理
Examples:
>>> # 同步模式
>>> lf = LingFlow()
>>> result = lf.skill.execute("code-analysis", {"target": "./"})
>>>
>>> # 异步模式
>>> lf = LingFlow(async_mode=True)
>>> result = await lf.skill_async.execute("code-analysis", {"target": "./"})
>>>
>>> # 自定义配置
>>> config = LingFlowConfig.builder().max_parallel(4).build()
>>> lf = LingFlow(config)
>>>
>>> # 会话管理
>>> with lingflow_session(config) as lf:
... lf.skill.execute("analysis", {...})
... lf.workflow.execute_file("workflow.yaml")
"""
def __init__(
self,
config: Optional[LingFlowConfig] = None,
async_mode: bool = False,
enable_cache: bool = True,
enable_monitoring: bool = True
):
"""初始化 LingFlow
Args:
config: 配置对象,默认使用 LingFlowConfig()
async_mode: 是否启用异步模式
enable_cache: 是否启用缓存
enable_monitoring: 是否启用监控
"""
self._config = config or LingFlowConfig()
self._async_mode = async_mode
# 初始化缓存管理器
self._cache_manager = CacheManager(self._config) if enable_cache else None
# 初始化监控管理器
self._monitor = Monitor(self._config) if enable_monitoring else None
# 初始化服务
if async_mode:
self.skill_async = AsyncSkillService(self._config, self._cache_manager)
self.workflow_async = AsyncWorkflowService(self._config, self.skill_async)
else:
self.skill = SkillService(self._config, self._cache_manager)
self.workflow = WorkflowService(self._config, self.skill)
# 初始化插件服务
self.plugins = PluginService(self._config)
@property
def config(self) -> LingFlowConfig:
"""获取当前配置(只读)"""
return self._config
def get_status(self) -> Dict[str, Any]:
"""获取系统状态"""
status = {
"skills": {
"available": len(self.skill.list() if not self._async_mode else self.skill_async.list()),
"list": self.skill.list() if not self._async_mode else self.skill_async.list(),
},
"config": self._config.to_dict(),
"async_mode": self._async_mode,
}
if self._cache_manager:
status["cache"] = self._cache_manager.get_stats()
if self._monitor:
status["monitor"] = self._monitor.get_stats()
return status
def reload_config(self, new_config: LingFlowConfig) -> bool:
"""重新加载配置"""
old_config = self._config
self._config = new_config
# 重新初始化服务
if self._cache_manager:
self._cache_manager.update_config(new_config)
if self._monitor:
self._monitor.update_config(new_config)
return True
def reload_skills(self) -> Result[None]:
"""重新加载技能"""
try:
if self._async_mode:
self.skill_async._sync_service._load_skills()
else:
self.skill._load_skills()
return Result.ok()
except Exception as e:
return Result.fail(str(e), code="RELOAD_ERROR")
# ========== 便捷方法 ==========
def run_skill(self, skill_name: str, params: Dict[str, Any]) -> Dict[str, Any]:
"""便捷方法:执行技能(兼容旧API)"""
result = self.skill.execute(skill_name, params)
return result.to_dict()
def run_workflow_file(self, filepath: str) -> Dict[str, Any]:
"""便捷方法:执行工作流文件(兼容旧API)"""
result = self.workflow.execute_file(filepath)
return result.unwrap_or({})
def run_workflow(self, workflow_def: Dict[str, Any]) -> Dict[str, Any]:
"""便捷方法:执行工作流定义(兼容旧API)"""
result = self.workflow.execute(workflow_def)
return result.unwrap_or({})
# ==================== 上下文管理器 ====================
class lingflow_session:
"""LingFlow 会话上下文管理器"""
def __init__(
self,
config: Optional[LingFlowConfig] = None,
async_mode: bool = False
):
self._config = config or LingFlowConfig()
self._async_mode = async_mode
self._instance: Optional[LingFlow] = None
def __enter__(self) -> LingFlow:
self._instance = LingFlow(self._config, async_mode=self._async_mode)
return self._instance
def __exit__(self, exc_type, exc_val, exc_tb):
# 清理资源
if self._instance:
# 可以在这里添加清理逻辑
pass
return False
# ==================== 装饰器 ====================
def handle_errors(func):
"""统一的错误处理装饰器"""
@wraps(func)
def wrapper(*args, **kwargs):
try:
return Result.ok(func(*args, **kwargs))
except LingFlowError as e:
return Result.fail(e.message, code=e.code, details=e.details)
except Exception as e:
return Result.fail(str(e), code="UNEXPECTED_ERROR")
return wrapper
def track_performance(func):
"""性能追踪装饰器"""
@wraps(func)
def wrapper(*args, **kwargs):
import time
start = time.time()
result = func(*args, **kwargs)
elapsed = time.time() - start
if isinstance(result, Result):
result.details["execution_time"] = elapsed
return result
return wrapper
def async_handle_errors(func):
"""统一的错误处理装饰器(异步)"""
@wraps(func)
async def wrapper(*args, **kwargs):
try:
return Result.ok(await func(*args, **kwargs))
except LingFlowError as e:
return Result.fail(e.message, code=e.code, details=e.details)
except Exception as e:
return Result.fail(str(e), code="UNEXPECTED_ERROR")
return wrapper
def async_track_performance(func):
"""性能追踪装饰器(异步)"""
@wraps(func)
async def wrapper(*args, **kwargs):
import time
start = time.time()
result = await func(*args, **kwargs)
elapsed = time.time() - start
if isinstance(result, Result):
result.details["execution_time"] = elapsed
return result
return wrapper
五、插件系统
5.1 插件基类
from typing import Dict, Any, List, Optional
class BasePlugin(ABC):
"""插件基类"""
# 插件元数据
name: str = "base-plugin"
version: str = "1.0.0"
author: str = ""
description: str = "Base plugin class"
# 依赖
dependencies: List[str] = []
def __init__(self, config: LingFlowConfig):
self._config = config
@classmethod
def validate_config(cls, config: LingFlowConfig) -> Result[None]:
"""验证配置"""
return Result.ok()
def initialize(self) -> Result[None]:
"""初始化插件"""
return Result.ok()
def shutdown(self) -> Result[None]:
"""关闭插件"""
return Result.ok()
class SkillPlugin(BasePlugin):
"""技能插件"""
@abstractmethod
def get_skill(self) -> BaseSkill:
"""返回技能实例"""
pass
@abstractmethod
def get_skill_names(self) -> List[str]:
"""返回技能名称列表"""
pass
class ServicePlugin(BasePlugin):
"""服务插件"""
@abstractmethod
def get_service(self) -> Any:
"""返回服务实例"""
pass
class MiddlewarePlugin(BasePlugin):
"""中间件插件"""
def pre_execute(self, context: SkillContext) -> SkillContext:
"""执行前钩子"""
return context
def post_execute(self, context: SkillContext, result: SkillResult) -> SkillResult:
"""执行后钩子"""
return result
def on_error(self, context: SkillContext, error: Exception) -> Exception:
"""错误处理钩子"""
return error
5.2 插件服务
class PluginService:
"""插件管理服务"""
def __init__(self, config: LingFlowConfig):
self._config = config
self._plugins: Dict[str, BasePlugin] = {}
self._skill_plugins: Dict[str, List[SkillPlugin]] = {}
self._middleware_plugins: List[MiddlewarePlugin] = []
if self._config.plugins_enabled and self._config.auto_load_plugins:
self._load_plugins()
def _load_plugins(self):
"""加载插件"""
plugins_dir = Path(self._config.plugins_path)
if not plugins_dir.exists():
return
for plugin_path in plugins_dir.iterdir():
if not plugin_path.is_dir():
continue
self._load_plugin_from_directory(plugin_path)
def _load_plugin_from_directory(self, plugin_path: Path):
"""从目录加载插件"""
try:
init_file = plugin_path / "__init__.py"
if not init_file.exists():
return
spec = importlib.util.spec_from_file_location(
f"plugins.{plugin_path.name}", str(init_file)
)
module = importlib.util.module_from_spec(spec)
spec.loader.exec_module(module)
# 查找插件类
for attr_name in dir(module):
attr = getattr(module, attr_name)
if (isinstance(attr, type) and
issubclass(attr, BasePlugin) and
attr is not BasePlugin):
plugin_instance = attr(self._config)
self._plugins[plugin_instance.name] = plugin_instance
# 初始化插件
init_result = plugin_instance.initialize()
if init_result.is_error:
print(f"Failed to initialize plugin {plugin_instance.name}: {init_result.error}")
continue
# 分类插件
if isinstance(plugin_instance, SkillPlugin):
for skill_name in plugin_instance.get_skill_names():
if skill_name not in self._skill_plugins:
self._skill_plugins[skill_name] = []
self._skill_plugins[skill_name].append(plugin_instance)
if isinstance(plugin_instance, MiddlewarePlugin):
self._middleware_plugins.append(plugin_instance)
print(f"Loaded plugin: {plugin_instance.name}")
except Exception as e:
print(f"Failed to load plugin {plugin_path.name}: {e}")
def register_plugin(self, plugin: BasePlugin) -> Result[None]:
"""注册插件"""
validation = plugin.validate_config(self._config)
if validation.is_error:
return validation
init_result = plugin.initialize()
if init_result.is_error:
return Result.fail(f"Plugin initialization failed: {init_result.error}")
self._plugins[plugin.name] = plugin
# 分类插件
if isinstance(plugin, SkillPlugin):
for skill_name in plugin.get_skill_names():
if skill_name not in self._skill_plugins:
self._skill_plugins[skill_name] = []
self._skill_plugins[skill_name].append(plugin)
if isinstance(plugin, MiddlewarePlugin):
self._middleware_plugins.append(plugin)
return Result.ok()
def get_plugin(self, name: str) -> Optional[BasePlugin]:
"""获取插件"""
return self._plugins.get(name)
def list_plugins(self) -> List[Dict[str, Any]]:
"""列出所有插件"""
return [
{
"name": plugin.name,
"version": plugin.version,
"author": plugin.author,
"description": plugin.description,
}
for plugin in self._plugins.values()
]
def unload_plugin(self, name: str) -> Result[None]:
"""卸载插件"""
if name not in self._plugins:
return Result.fail(f"Plugin not found: {name}", code="PLUGIN_NOT_FOUND")
plugin = self._plugins[name]
# 关闭插件
shutdown_result = plugin.shutdown()
if shutdown_result.is_error:
return Result.fail(f"Plugin shutdown failed: {shutdown_result.error}")
# 从分类中移除
if isinstance(plugin, SkillPlugin):
for skill_name in plugin.get_skill_names():
if skill_name in self._skill_plugins:
self._skill_plugins[skill_name].remove(plugin)
if not self._skill_plugins[skill_name]:
del self._skill_plugins[skill_name]
if isinstance(plugin, MiddlewarePlugin):
self._middleware_plugins.remove(plugin)
del self._plugins[name]
return Result.ok()
def reload(self) -> Result[None]:
"""重新加载所有插件"""
# 卸载所有插件
plugin_names = list(self._plugins.keys())
for name in plugin_names:
self.unload_plugin(name)
# 重新加载
self._load_plugins()
return Result.ok()
六、缓存系统
6.1 缓存管理器
from typing import Dict, Any, Optional, Callable
import time
import hashlib
import json
from collections import OrderedDict
class CacheManager:
"""缓存管理器(支持 TTL 和 LRU)"""
def __init__(self, config: LingFlowConfig):
self._config = config
self._cache: OrderedDict[str, Dict[str, Any]] = OrderedDict()
self._stats = {
"hits": 0,
"misses": 0,
"evictions": 0,
"total_size": 0,
}
self._lock = threading.RLock()
def get(self, key: str) -> Optional[Dict[str, Any]]:
"""获取缓存"""
with self._lock:
if key not in self._cache:
self._stats["misses"] += 1
return None
entry = self._cache[key]
# 检查 TTL
if time.time() - entry["timestamp"] > self._config.cache_ttl:
del self._cache[key]
self._stats["misses"] += 1
self._stats["evictions"] += 1
self._stats["total_size"] -= entry.get("size", 0)
return None
# 更新访问时间(LRU)
self._cache.move_to_end(key)
self._stats["hits"] += 1
return entry["data"]
def set(self, key: str, value: Any, ttl: Optional[int] = None) -> bool:
"""设置缓存"""
with self._lock:
# 计算大小
value_str = json.dumps(value, default=str)
size = len(value_str.encode('utf-8'))
# 检查缓存大小限制
if len(self._cache) >= self._config.cache_max_size:
self._evict()
entry = {
"data": value,
"timestamp": time.time(),
"ttl": ttl or self._config.cache_ttl,
"size": size,
}
self._cache[key] = entry
self._stats["total_size"] += size
return True
def delete(self, key: str) -> bool:
"""删除缓存"""
with self._lock:
if key in self._cache:
size = self._cache[key].get("size", 0)
del self._cache[key]
self._stats["total_size"] -= size
return True
return False
def clear(self):
"""清空缓存"""
with self._lock:
self._cache.clear()
self._stats["total_size"] = 0
def _evict(self):
"""淘汰最旧的缓存项"""
if self._cache:
key, entry = self._cache.popitem(last=False)
size = entry.get("size", 0)
self._stats["total_size"] -= size
self._stats["evictions"] += 1
def get_stats(self) -> Dict[str, Any]:
"""获取缓存统计"""
with self._lock:
hit_rate = (
self._stats["hits"] / (self._stats["hits"] + self._stats["misses"])
if (self._stats["hits"] + self._stats["misses"]) > 0
else 0.0
)
return {
"hits": self._stats["hits"],
"misses": self._stats["misses"],
"evictions": self._stats["evictions"],
"hit_rate": hit_rate,
"total_size": self._stats["total_size"],
"total_entries": len(self._cache),
}
def update_config(self, new_config: LingFlowConfig):
"""更新配置"""
self._config = new_config
# 如果缓存大小限制减小,淘汰多余的缓存
while len(self._cache) > self._config.cache_max_size:
self._evict()
七、监控系统
7.1 监控器
from typing import Dict, Any, List, Optional
import time
from collections import defaultdict
from dataclasses import dataclass, field
@dataclass
class Metric:
"""性能指标"""
name: str
value: float
timestamp: float = field(default_factory=time.time)
metadata: Dict[str, Any] = field(default_factory=dict)
class Monitor:
"""性能监控器"""
def __init__(self, config: LingFlowConfig):
self._config = config
self._metrics: Dict[str, List[Metric]] = defaultdict(list)
self._counters: Dict[str, int] = defaultdict(int)
self._timers: Dict[str, List[float]] = defaultdict(list)
self._lock = threading.RLock()
def record_metric(self, name: str, value: float, **metadata):
"""记录指标"""
if not self._config.metrics_enabled:
return
with self._lock:
metric = Metric(name=name, value=value, metadata=metadata)
self._metrics[name].append(metric)
# 保留最近的1000条记录
if len(self._metrics[name]) > 1000:
self._metrics[name] = self._metrics[name][-1000:]
def increment_counter(self, name: str, value: int = 1):
"""增加计数器"""
if not self._config.metrics_enabled:
return
with self._lock:
self._counters[name] += value
def record_time(self, name: str, duration: float):
"""记录时间"""
if not self._config.metrics_enabled:
return
with self._lock:
self._timers[name].append(duration)
# 保留最近的100条记录
if len(self._timers[name]) > 100:
self._timers[name] = self._timers[name][-100:]
# 检查性能阈值
if duration > self._config.performance_threshold:
self.record_metric(
f"{name}_slow",
duration,
warning=f"Duration exceeds threshold {self._config.performance_threshold}s"
)
def get_stats(self) -> Dict[str, Any]:
"""获取监控统计"""
with self._lock:
stats = {
"counters": dict(self._counters),
"timers": {},
"metrics": {},
}
# 计算计时器统计
for name, times in self._timers.items():
if times:
stats["timers"][name] = {
"count": len(times),
"min": min(times),
"max": max(times),
"avg": sum(times) / len(times),
"sum": sum(times),
}
# 计算指标统计
for name, metrics in self._metrics.items():
if metrics:
stats["metrics"][name] = {
"count": len(metrics),
"min": min(m.value for m in metrics),
"max": max(m.value for m in metrics),
"avg": sum(m.value for m in metrics) / len(metrics),
}
return stats
def clear_metrics(self):
"""清除所有指标"""
with self._lock:
self._metrics.clear()
self._counters.clear()
self._timers.clear()
def update_config(self, new_config: LingFlowConfig):
"""更新配置"""
self._config = new_config
# ==================== 装饰器 ====================
def monitor_performance(monitor: Monitor):
"""性能监控装饰器"""
def decorator(func):
@wraps(func)
def wrapper(*args, **kwargs):
import time
start_time = time.time()
monitor.increment_counter(f"{func.__name__}_calls")
try:
result = func(*args, **kwargs)
duration = time.time() - start_time
monitor.record_time(func.__name__, duration)
return result
except Exception as e:
monitor.increment_counter(f"{func.__name__}_errors")
raise
return wrapper
return decorator
def monitor_async_performance(monitor: Monitor):
"""性能监控装饰器(异步)"""
def decorator(func):
@wraps(func)
async def wrapper(*args, **kwargs):
import time
start_time = time.time()
monitor.increment_counter(f"{func.__name__}_calls")
try:
result = await func(*args, **kwargs)
duration = time.time() - start_time
monitor.record_time(func.__name__, duration)
return result
except Exception as e:
monitor.increment_counter(f"{func.__name__}_errors")
raise
return wrapper
return decorator
八、使用示例
8.1 基础使用
# ============ 同步模式 ============
from lingflow import LingFlow, LingFlowConfig
# 使用默认配置
lf = LingFlow()
# 执行技能
result = lf.skill.execute("code-analysis", {"target": "./lingflow/"})
if result.success:
print(f"Analysis complete: {result.data}")
else:
print(f"Analysis failed: {result.error}")
# 执行工作流
result = lf.workflow.execute_file("workflow.yaml")
if result.is_ok:
print("Workflow completed!")
print(result.data)
# ============ 异步模式 ============
import asyncio
lf = LingFlow(async_mode=True)
async def main():
result = await lf.skill_async.execute("code-analysis", {"target": "./"})
if result.success:
print(f"Analysis complete: {result.data}")
asyncio.run(main())
# ============ 自定义配置 ============
config = (LingFlowConfig.builder()
.max_parallel(4)
.timeout(600)
.compression(True, 8000)
.skills_path("./custom_skills")
.logging(level="DEBUG")
.monitoring(enabled=True)
.build())
lf = LingFlow(config)
# ============ 会话管理 ============
with lingflow_session(config) as lf:
lf.skill.execute("analysis", {"target": "./"})
lf.workflow.execute_file("workflow.yaml")
# 自动清理资源
8.2 Result 类型使用
from lingflow import Result
# 创建结果
success = Result.ok(data={"key": "value"})
failure = Result.fail("Something went wrong", code="ERR001")
# 链式调用
result = (Result.ok(" hello ")
.map(str.strip)
.map(str.upper)
.and_then(lambda x: Result.ok(x + "!"))
.inspect(lambda r: print(f"Current result: {r.data}")))
# 错误处理
result = Result.fail("Error occurred").or_else(
lambda error: Result.ok(f"Recovered from: {error}")
)
# 组合操作
r1 = Result.ok(1)
r2 = Result.ok(2)
r3 = Result.ok(3)
combined = Result.zip(r1, r2, r3)
# combined.data = [1, 2, 3]
first_ok = Result.first_ok(
Result.fail("error1"),
Result.ok(2),
Result.fail("error3")
)
# first_ok.data = 2
# 超时控制
result = Result.ok("data").with_timeout(5.0)
# 序列化
json_str = result.to_json()
dict_data = result.to_dict()
# 从字典恢复
result2 = Result.from_dict(dict_data)
8.3 自定义技能
from lingflow import BaseSkill, SkillContext, SkillResult, Result
class MyAnalysisSkill(BaseSkill):
"""自定义分析技能"""
name = "my-analysis"
description = "My custom analysis skill"
version = "1.0.0"
author = "My Name"
@classmethod
def validate_params(cls, params):
if "target" not in params:
return Result.fail("Missing 'target' parameter")
if not isinstance(params["target"], str):
return Result.fail("'target' must be a string")
return Result.ok()
def pre_execute(self, context: SkillContext) -> SkillResult:
"""执行前钩子"""
print(f"Starting analysis on: {context.params['target']}")
return SkillResult.ok()
def execute(self, context: SkillContext) -> SkillResult:
"""执行技能"""
target = context.params["target"]
# 执行分析逻辑
# ...
return SkillResult.ok(data={
"files_analyzed": 42,
"issues_found": 5,
})
def post_execute(self, context: SkillContext, result: SkillResult) -> SkillResult:
"""执行后钩子"""
print(f"Analysis complete: {result.execution_time:.2f}s")
return result
def on_error(self, context: SkillContext, error: Exception) -> SkillResult:
"""错误处理钩子"""
print(f"Analysis failed: {error}")
return SkillResult.fail(str(error))
# 注册技能
lf = LingFlow()
lf.skill._skills[MyAnalysisSkill.name] = MyAnalysisSkill()
# 使用技能
result = lf.skill.execute("my-analysis", {"target": "./"})
8.4 自定义插件
from lingflow import BasePlugin, SkillPlugin, SkillService, LingFlowConfig
class MySkillPlugin(SkillPlugin):
"""自定义技能插件"""
name = "my-skill-plugin"
version = "1.0.0"
description = "My custom skill plugin"
def __init__(self, config: LingFlowConfig):
super().__init__(config)
self._skill = MyAnalysisSkill()
def get_skill(self):
return self._skill
def get_skill_names(self):
return [self._skill.name]
# 注册插件
lf = LingFlow()
plugin = MySkillPlugin(lf.config)
lf.plugins.register_plugin(plugin)
# 现在可以使用插件提供的技能
result = lf.skill.execute("my-analysis", {"target": "./"})
九、实施路线图
阶段1:核心基础设施(2周)
Week 1:
- [ ] 创建 lingflow/core/types.py - Result 和异常体系
- [ ] 创建 lingflow/core/config.py - LingFlowConfig 和构建器
- [ ] 创建 lingflow/core/skill.py - BaseSkill 和相关类型
- [ ] 编写单元测试
Week 2:
- [ ] 创建 lingflow/core/cache.py - 缓存管理器
- [ ] 创建 lingflow/core/monitor.py - 监控器
- [ ] 创建 lingflow/adapter.py - 适配器层
- [ ] 编写单元测试
阶段2:服务层(3周)
Week 3:
- [ ] 实现 lingflow/services/skill.py - SkillService
- [ ] 实现 lingflow/services/workflow.py - WorkflowService
- [ ] 实现异步版本 AsyncSkillService 和 AsyncWorkflowService
- [ ] 编写集成测试
Week 4:
- [ ] 实现 lingflow/services/plugin.py - PluginService
- [ ] 实现插件基类 BasePlugin, SkillPlugin, ServicePlugin
- [ ] 编写插件示例
- [ ] 编写集成测试
Week 5:
- [ ] 重构 lingflow/__init__.py - 集成所有服务
- [ ] 实现双模式API(同步+异步)
- [ ] 实现上下文管理器
- [ ] 编写端到端测试
阶段3:技能迁移(3周)
Week 6-7: - [ ] 创建技能迁移工具 - [ ] 迁移 brainstorming 技能 - [ ] 迁移 writing-plans 技能 - [ ] 迁移 test-driven-development 技能 - [ ] 迁移 systematic-debugging 技能
Week 8: - [ ] 迁移其他6个技能 - [ ] 测试所有迁移的技能 - [ ] 更新技能文档 - [ ] 创建技能开发模板
阶段4:API清理和文档(1周)
Week 9: - [ ] 在旧API中添加 DeprecationWarning - [ ] 更新 README.md - [ ] 更新 AGENTS.md - [ ] 创建迁移指南 - [ ] 更新示例代码 - [ ] 创建 API 文档
阶段5:性能优化和测试(2周)
Week 10: - [ ] 性能测试和优化 - [ ] 压力测试 - [ ] 内存泄漏检查 - [ ] 缓存性能优化
Week 11: - [ ] 完整的集成测试 - [ ] 端到端测试 - [ ] 用户验收测试 - [ ] 性能基准测试
阶段6:发布准备(1周)
Week 12: - [ ] 代码审查 - [ ] 文档审查 - [ ] 发布说明 - [ ] 发布 LingFlow V4.0.0
十、成功指标
10.1 技术指标
| 指标 | 目标 | 当前 | 改进 |
|---|---|---|---|
| 类型覆盖率 | 100% | 60% | +40% |
| 测试覆盖率 | 95% | 78% | +17% |
| Cyclomatic复杂度 | < 10 | 15 | -33% |
| 代码重复率 | < 3% | 8% | -62.5% |
| 文档覆盖率 | 95% | 81% | +14% |
10.2 性能指标
| 指标 | 目标 | 当前 | 改进 |
|---|---|---|---|
| API 响应时间 | < 100ms | 200ms | -50% |
| 技能执行速度 | +30% | - | +30% |
| 内存占用 | < 50MB | 80MB | -37.5% |
| 缓存命中率 | > 80% | 0% | +80% |
| 并发能力 | 4x | 1x | +300% |
10.3 用户体验指标
| 指标 | 目标 | 当前 | 改进 |
|---|---|---|---|
| API 一致性 | 5/5 | 2/5 | +150% |
| 类型安全 | 5/5 | 2/5 | +150% |
| 错误处理 | 5/5 | 2/5 | +150% |
| 学习曲线 | 低 | 中 | - |
| 开发效率 | +80% | - | +80% |
十一、总结
核心优势
- 双模式API:同时支持同步和异步,用户自由选择
- 类型安全:100% 类型化,Result 泛型,完整类型提示
- 统一抽象:Result 类型、异常体系、配置构建器
- 插件系统:标准化插件接口,支持第三方扩展
- 性能优化:智能缓存、懒加载、并行执行
- 配置热重载:无需重启,动态更新配置
- 内置监控:性能追踪、错误统计、资源监控
- 开发者体验:自动补全、类型提示、错误诊断
与其他方案对比
| 特性 | 当前 | 方案A | 方案B | 本方案 |
|---|---|---|---|---|
| Result 类型 | ❌ | ❌ | ✅ | ✅ 增强 |
| 异步支持 | ⚠️ | ❌ | ❌ | ✅ 双模式 |
| 插件系统 | ❌ | ❌ | ❌ | ✅ |
| 配置热重载 | ❌ | ❌ | ❌ | ✅ |
| 缓存系统 | ❌ | ❌ | ❌ | ✅ |
| 监控系统 | ❌ | ❌ | ❌ | ✅ |
| 类型安全 | ⭐⭐ | ⭐⭐⭐ | ⭐⭐⭐⭐⭐ | ⭐⭐⭐⭐⭐ |
| 向后兼容 | N/A | ⭐⭐⭐⭐⭐ | ⭐ | ⭐⭐⭐⭐ |
| 实施难度 | N/A | ⭐⭐⭐⭐⭐ | ⭐⭐ | ⭐⭐⭐ |
立即行动
本周(Week 1):
1. ✅ 审查并批准 V4.0 方案
2. ✅ 创建实施分支 feature/v4.0-refactor
3. ✅ 开始阶段1:核心基础设施
下周(Week 2): 1. ✅ 完成 Result 类型实现 2. ✅ 完成 LingFlowConfig 实现 3. ✅ 完成 BaseSkill 实现 4. ✅ 编写单元测试
文档版本: V1.0 最后更新: 2026-03-25 下次评审: 2026-04-01