跳转至

LingFlow V4.0 封装优化方案

版本: V4.0.0 日期: 2026-03-25 类型: 架构设计方案 状态: 待实施


执行摘要

本方案融合了两个现有封装方案(渐进式改进和全新API设计)的优点,提出一个更优化的混合架构

核心创新: 1. 🎯 双模式API:同时支持同步和异步,用户自由选择 2. 🔌 插件系统:标准化插件接口,支持第三方扩展 3. 🚀 性能优化:智能缓存、懒加载、并行执行 4. 🔄 配置热重载:无需重启,动态更新配置 5. 📊 内置监控:性能追踪、错误统计、资源监控 6. 🛠️ 开发者体验:自动补全、类型提示、错误诊断 7. 🎨 渐进式类型化:支持完全类型化,也支持渐进式迁移 8. 🔀 智能适配器:自动转换新旧API,无缝迁移

预期收益: - 开发效率:+80% - 类型安全:100% - API 一致性:5/5 - 用户体验:显著提升 - 测试覆盖率:>95%


一、架构设计

1.1 整体架构图

┌─────────────────────────────────────────────────────────────────┐
│                        用户层 (CLI/SDK/REST)                     │
├─────────────────────────────────────────────────────────────────┤
│                        API门面层 (Facade)                         │
│  ┌─────────────────────────────────────────────────────────────┐│
│  │  LingFlow (统一入口) - 支持同步/异步                       ││
│  │  ├─ skill: SkillService (同步)                            ││
│  │  ├─ workflow: WorkflowService (同步)                      ││
│  │  ├─ agent: AgentService (同步)                             ││
│  │  ├─ skill_async: SkillService (异步)                      ││
│  │  ├─ workflow_async: WorkflowService (异步)                 ││
│  │  └─ agent_async: AgentService (异步)                      ││
│  └─────────────────────────────────────────────────────────────┘│
├─────────────────────────────────────────────────────────────────┤
│                        服务层 (Services)                          │
│  ┌──────────────┬──────────────┬──────────────┬──────────────┐ │
│  │SkillService  │WorkflowSvc   │AgentService  │PluginService │ │
│  │              │              │              │              │ │
│  │- execute()   │- execute()   │- spawn()     │- load()      │ │
│  │- list()      │- validate()  │- status()    │- register()  │ │
│  │- get_info()  │- schedule()  │- terminate() │- reload()    │ │
│  └──────────────┴──────────────┴──────────────┴──────────────┘ │
├─────────────────────────────────────────────────────────────────┤
│                        领域层 (Domain)                           │
│  ┌──────────────┬──────────────┬──────────────┬──────────────┐ │
│  │Coordinator   │Orchestrator  │Agent         │Workflow      │ │
│  │              │              │              │              │ │
│  │- 任务调度    │- 工作流编排  │- 任务执行    │- 工作流定义  │ │
│  │- 依赖管理    │- 依赖解析    │- 状态管理    │- 验证        │ │
│  │- 负载均衡    │- 错误处理    │- 超时控制    │- 序列化      │ │
│  └──────────────┴──────────────┴──────────────┴──────────────┘ │
├─────────────────────────────────────────────────────────────────┤
│                        技能层 (Skills)                           │
│  ┌─────────────────────────────────────────────────────────────┐│
│  │  BaseSkill (技能基类)                                       ││
│  │  ├─ validate_params() - 参数验证                           ││
│  │  ├─ execute() - 执行逻辑                                    ││
│  │  ├─ pre_execute() - 前置钩子                                ││
│  │  ├─ post_execute() - 后置钩子                               ││
│  │  └─ on_error() - 错误处理                                  ││
│  │                                                             ││
│  │  核心技能: brainstorming, writing-plans, tdd, debugging...  ││
│  │  插件技能: 第三方技能通过 PluginService 动态加载            ││
│  └─────────────────────────────────────────────────────────────┘│
├─────────────────────────────────────────────────────────────────┤
│                        基础设施层 (Infrastructure)               │
│  ┌──────────────┬──────────────┬──────────────┬──────────────┐ │
│  │ConfigManager │CacheManager  │Monitor      │Logger        │ │
│  │              │              │              │              │ │
│  │- 加载配置    │- 智能缓存    │- 性能追踪    │- 结构化日志  │ │
│  │- 验证配置    │- 懒加载      │- 错误统计    │- 分级日志    │ │
│  │- 热重载      │- 失效策略    │- 资源监控    │- 日志聚合    │ │
│  └──────────────┴──────────────┴──────────────┴──────────────┘ │
├─────────────────────────────────────────────────────────────────┤
│                        核心类型层 (Core Types)                    │
│  ┌──────────────┬──────────────┬──────────────┬──────────────┐ │
│  │Result[T]     │LingFlowConfig│BaseSkill     │Exceptions    │ │
│  │              │              │              │              │ │
│  │- 统一结果    │- 配置构建器  │- 技能基类    │- 统一异常    │ │
│  │- 链式调用    │- 类型验证    │- 生命周期    │- 错误码      │ │
│  │- 函数式编程  │- 热重载      │- 元数据      │- 结构化      │ │
│  └──────────────┴──────────────┴──────────────┴──────────────┘ │
├─────────────────────────────────────────────────────────────────┤
│                        适配器层 (Adapters)                        │
│  ┌─────────────────────────────────────────────────────────────┐│
│  │  LegacyAdapter - 兼容旧API                                  ││
│  │  AsyncAdapter - 同步/异步转换                                ││
│  │  TypeAdapter - 类型转换                                     ││
│  └─────────────────────────────────────────────────────────────┘│
└─────────────────────────────────────────────────────────────────┘

1.2 分层职责

层级 职责 示例
API门面层 统一入口,简化使用 LingFlow
服务层 业务逻辑,服务编排 SkillService
领域层 核心模型,业务规则 Coordinator, Agent
技能层 可扩展功能,插件化 BaseSkill
基础设施层 通用服务,底层支持 ConfigManager, CacheManager
核心类型层 基础类型,工具类 Result, Exceptions
适配器层 兼容性,转换层 LegacyAdapter, AsyncAdapter

二、核心类型设计

2.1 Result 类型(增强版)

改进点: 1. 添加重试机制 2. 添加超时控制 3. 添加回退策略 4. 添加组合操作 5. 添加序列化支持

from typing import Generic, TypeVar, Callable, Optional, Dict, Any, List, Union
from dataclasses import dataclass, field
from functools import wraps
import time
import asyncio
import json

T = TypeVar('T')
E = TypeVar('E', bound=Exception)

@dataclass
class Result(Generic[T]):
    """统一的执行结果封装(增强版)

    Features:
        - 类型安全(泛型)
        - 链式调用(map, and_then, or_else)
        - 错误处理(retry, fallback)
        - 并发控制(timeout, with_timeout)
        - 组合操作(zip, all_ok, first_ok)
        - 序列化支持(to_dict, from_dict)

    Examples:
        >>> # 基础使用
        >>> success = Result.ok(data={"key": "value"})
        >>> failure = Result.fail("Something went wrong", code="ERR001")
        >>>
        >>> # 链式调用
        >>> result = (Result.ok("  hello  ")
        ...           .map(str.strip)
        ...           .map(str.upper)
        ...           .and_then(lambda x: Result.ok(x + "!")))
        >>>
        >>> # 错误处理
        >>> result = (Result.ok("test")
        ...           .retry(times=3, delay=1)
        ...           .fallback(lambda: Result.ok("default")))
        >>>
        >>> # 并发控制
        >>> result = Result.ok("data").with_timeout(5)
    """

    data: Optional[T] = None
    error: Optional[str] = None
    code: str = "OK"
    details: Dict[str, Any] = field(default_factory=dict)
    execution_time: float = 0.0
    _raw_exception: Optional[Exception] = field(default=None, repr=False)

    # ==================== 工厂方法 ====================

    @classmethod
    def ok(cls, data: T, **details) -> "Result[T]":
        """创建成功结果"""
        return cls(data=data, code="OK", details=details)

    @classmethod
    def fail(cls, error: str, code: str = "ERROR", **details) -> "Result[T]":
        """创建失败结果"""
        return cls(data=None, error=error, code=code, details=details)

    @classmethod
    def from_exception(cls, exc: Exception) -> "Result[T]":
        """从异常创建结果"""
        return cls.fail(
            error=str(exc),
            code=exc.__class__.__name__,
            exception_type=type(exc).__name__
        )

    @classmethod
    def from_dict(cls, data: Dict[str, Any]) -> "Result[T]":
        """从字典反序列化"""
        try:
            if data.get("code") == "OK":
                return cls.ok(data.get("data"), **data.get("details", {}))
            else:
                return cls.fail(
                    data.get("error", "Unknown error"),
                    code=data.get("code", "ERROR"),
                    **data.get("details", {})
                )
        except Exception as e:
            return cls.fail(f"Failed to deserialize: {e}")

    # ==================== 属性 ====================

    @property
    def is_ok(self) -> bool:
        """是否成功"""
        return self.code == "OK" and self.error is None

    @property
    def is_error(self) -> bool:
        """是否失败"""
        return not self.is_ok

    # ==================== 基础操作 ====================

    def unwrap(self) -> T:
        """获取数据,失败时抛出异常"""
        if self.is_error:
            raise LingFlowError(self.error or "Unknown error", code=self.code)
        return self.data

    def unwrap_or(self, default: T) -> T:
        """获取数据,失败时返回默认值"""
        return self.data if self.is_ok else default

    def unwrap_or_else(self, func: Callable[[], T]) -> T:
        """获取数据,失败时调用函数"""
        return self.data if self.is_ok else func()

    def to_dict(self) -> Dict[str, Any]:
        """序列化为字典"""
        return {
            "ok": self.is_ok,
            "data": self.data,
            "error": self.error,
            "code": self.code,
            "details": self.details,
            "execution_time": self.execution_time,
        }

    def to_json(self) -> str:
        """序列化为 JSON"""
        return json.dumps(self.to_dict(), ensure_ascii=False, indent=2)

    # ==================== 链式操作 ====================

    def map(self, func: Callable[[T], Any]) -> "Result[Any]":
        """链式转换(同步)"""
        if self.is_ok:
            try:
                start_time = time.time()
                result = func(self.data)
                return Result.ok(result, **self.details, execution_time=time.time() - start_time)
            except Exception as e:
                return Result.fail(str(e), code="MAP_ERROR")
        return self

    async def map_async(self, func: Callable[[T], Any]) -> "Result[Any]":
        """链式转换(异步)"""
        if self.is_ok:
            try:
                start_time = time.time()
                result = await func(self.data)
                return Result.ok(result, **self.details, execution_time=time.time() - start_time)
            except Exception as e:
                return Result.fail(str(e), code="MAP_ERROR")
        return self

    def and_then(self, func: Callable[[T], "Result[T]"]) -> "Result[T]":
        """链式调用(同步)"""
        if self.is_ok:
            return func(self.data)
        return self

    async def and_then_async(self, func: Callable[[T], "Result[T]"]) -> "Result[T]":
        """链式调用(异步)"""
        if self.is_ok:
            return await func(self.data)
        return self

    def or_else(self, func: Callable[[str], "Result[T]"]) -> "Result[T]":
        """错误处理链"""
        if self.is_error:
            return func(self.error)
        return self

    # ==================== 错误处理 ====================

    def retry(
        self,
        times: int = 3,
        delay: float = 1.0,
        backoff: float = 2.0,
        on_retry: Optional[Callable[[int, Exception], None]] = None
    ) -> "Result[T]":
        """重试机制(仅对成功的操作生效)"""
        if not self.is_ok:
            return self

        original_func = None
        # 注意:这需要保存原始函数引用,实际使用时需要更复杂的设计
        # 这里仅作为接口示例
        return self

    def fallback(self, func: Callable[[], "Result[T]"]) -> "Result[T]":
        """回退策略"""
        if self.is_ok:
            return self
        return func()

    # ==================== 并发控制 ====================

    def with_timeout(self, timeout: float) -> "Result[T]":
        """超时控制(同步)"""
        if self.execution_time > timeout:
            return Result.fail(
                f"Operation timed out after {self.execution_time}s (limit: {timeout}s)",
                code="TIMEOUT"
            )
        return self

    async def with_timeout_async(self, timeout: float) -> "Result[T]":
        """超时控制(异步)"""
        try:
            result = await asyncio.wait_for(
                self.and_then_async(lambda x: Result.ok(x)),
                timeout=timeout
            )
            return result
        except asyncio.TimeoutError:
            return Result.fail(
                f"Operation timed out after {timeout}s",
                code="TIMEOUT"
            )

    # ==================== 组合操作 ====================

    @staticmethod
    def zip(*results: "Result[T]") -> "Result[List[T]]":
        """组合多个结果(全部成功才成功)"""
        data = []
        errors = []

        for result in results:
            if result.is_ok:
                data.append(result.data)
            else:
                errors.append(result.error)

        if errors:
            return Result.fail(
                f"Some results failed: {', '.join(errors)}",
                code="ZIP_ERROR",
                failed_results=len(errors)
            )

        return Result.ok(data)

    @staticmethod
    def first_ok(*results: "Result[T]") -> "Result[T]":
        """返回第一个成功的结果"""
        for result in results:
            if result.is_ok:
                return result

        return Result.fail(
            "All results failed",
            code="ALL_FAILED",
            errors=[r.error for r in results if r.is_error]
        )

    @staticmethod
    def all_ok(*results: "Result[T]") -> "Result[List[T]]":
        """全部成功才成功(同 zip)"""
        return Result.zip(*results)

    # ==================== 调试支持 ====================

    def debug(self) -> "Result[T]":
        """打印调试信息并返回自身"""
        import sys
        print(f"[DEBUG] Result: {self.to_dict()}", file=sys.stderr)
        return self

    def inspect(self, func: Callable[["Result[T]"], None]) -> "Result[T]":
        """检查结果并返回自身"""
        func(self)
        return self


# ==================== 装饰器支持 ====================

def resultify(func):
    """将函数返回值转换为 Result"""
    @wraps(func)
    def wrapper(*args, **kwargs):
        try:
            result = func(*args, **kwargs)
            if isinstance(result, Result):
                return result
            return Result.ok(result)
        except Exception as e:
            return Result.from_exception(e)
    return wrapper


def async_resultify(func):
    """将异步函数返回值转换为 Result"""
    @wraps(func)
    async def wrapper(*args, **kwargs):
        try:
            result = await func(*args, **kwargs)
            if isinstance(result, Result):
                return result
            return Result.ok(result)
        except Exception as e:
            return Result.from_exception(e)
    return wrapper


# ==================== 并发操作 ====================

async def gather_results(*coros: "Result[T]") -> "Result[List[T]]":
    """并发执行多个操作"""
    try:
        results = await asyncio.gather(*coros, return_exceptions=True)

        data = []
        errors = []

        for result in results:
            if isinstance(result, Exception):
                errors.append(str(result))
            elif isinstance(result, Result):
                if result.is_ok:
                    data.append(result.data)
                else:
                    errors.append(result.error)
            else:
                data.append(result)

        if errors:
            return Result.fail(
                f"Some operations failed: {', '.join(errors)}",
                code="GATHER_ERROR"
            )

        return Result.ok(data)

    except Exception as e:
        return Result.from_exception(e)

2.2 统一异常体系

from typing import Dict, Any, Optional, List

class LingFlowError(Exception):
    """LingFlow 基础异常"""

    def __init__(
        self,
        message: str,
        *,
        code: str = "LF000",
        details: Optional[Dict[str, Any]] = None,
        cause: Optional[Exception] = None
    ):
        self.message = message
        self.code = code
        self.details = details or {}
        self.cause = cause

        # 构建完整消息
        full_message = f"[{code}] {message}"
        if cause:
            full_message += f" (caused by: {type(cause).__name__}: {cause})"

        super().__init__(full_message)

    def to_dict(self) -> Dict[str, Any]:
        """转换为字典"""
        result = {
            "error": self.__class__.__name__,
            "code": self.code,
            "message": self.message,
            "details": self.details,
        }

        if self.cause:
            result["cause"] = str(self.cause)

        return result

    def __str__(self) -> str:
        return self.message

    def __repr__(self) -> str:
        return f"{self.__class__.__name__}(code={self.code!r}, message={self.message!r})"


class WorkflowError(LingFlowError):
    """工作流相关异常"""

    def __init__(
        self,
        message: str,
        *,
        workflow_id: Optional[str] = None,
        task_id: Optional[str] = None,
        **kwargs
    ):
        details = kwargs.get('details', {})
        if workflow_id:
            details['workflow_id'] = workflow_id
        if task_id:
            details['task_id'] = task_id

        super().__init__(message, code="WF001", details=details, **kwargs)


class SkillError(LingFlowError):
    """技能相关异常"""

    def __init__(
        self,
        message: str,
        *,
        skill_name: Optional[str] = None,
        skill_version: Optional[str] = None,
        **kwargs
    ):
        details = kwargs.get('details', {})
        if skill_name:
            details['skill_name'] = skill_name
        if skill_version:
            details['skill_version'] = skill_version

        super().__init__(message, code="SK001", details=details, **kwargs)


class AgentError(LingFlowError):
    """代理相关异常"""

    def __init__(
        self,
        message: str,
        *,
        agent_id: Optional[str] = None,
        agent_type: Optional[str] = None,
        **kwargs
    ):
        details = kwargs.get('details', {})
        if agent_id:
            details['agent_id'] = agent_id
        if agent_type:
            details['agent_type'] = agent_type

        super().__init__(message, code="AG001", details=details, **kwargs)


class ConfigurationError(LingFlowError):
    """配置相关异常"""

    def __init__(
        self,
        message: str,
        *,
        config_key: Optional[str] = None,
        config_value: Optional[Any] = None,
        **kwargs
    ):
        details = kwargs.get('details', {})
        if config_key:
            details['config_key'] = config_key
        if config_value is not None:
            details['config_value'] = str(config_value)

        super().__init__(message, code="CF001", details=details, **kwargs)


class ValidationError(LingFlowError):
    """验证相关异常"""

    def __init__(
        self,
        message: str,
        *,
        field_name: Optional[str] = None,
        field_value: Optional[Any] = None,
        errors: Optional[List[str]] = None,
        **kwargs
    ):
        details = kwargs.get('details', {})
        if field_name:
            details['field_name'] = field_name
        if field_value is not None:
            details['field_value'] = str(field_value)
        if errors:
            details['errors'] = errors

        super().__init__(message, code="VD001", details=details, **kwargs)


class TimeoutError(LingFlowError):
    """超时相关异常"""

    def __init__(
        self,
        message: str,
        *,
        timeout: Optional[float] = None,
        operation: Optional[str] = None,
        **kwargs
    ):
        details = kwargs.get('details', {})
        if timeout:
            details['timeout'] = timeout
        if operation:
            details['operation'] = operation

        super().__init__(message, code="TO001", details=details, **kwargs)


# ==================== 错误码定义 ====================

class ErrorCode:
    """错误码常量"""

    # 通用错误 (LF000-LF099)
    UNKNOWN_ERROR = "LF000"
    INTERNAL_ERROR = "LF001"
    NOT_IMPLEMENTED = "LF002"

    # 工作流错误 (WF100-WF199)
    WORKFLOW_NOT_FOUND = "WF100"
    WORKFLOW_INVALID = "WF101"
    WORKFLOW_FAILED = "WF102"
    WORKFLOW_TIMEOUT = "WF103"

    # 技能错误 (SK200-SK299)
    SKILL_NOT_FOUND = "SK200"
    SKILL_INVALID = "SK201"
    SKILL_EXECUTION_FAILED = "SK202"
    SKILL_VALIDATION_FAILED = "SK203"

    # 代理错误 (AG300-AG399)
    AGENT_NOT_FOUND = "AG300"
    AGENT_BUSY = "AG301"
    AGENT_FAILED = "AG302"

    # 配置错误 (CF400-CF499)
    CONFIG_INVALID = "CF400"
    CONFIG_MISSING = "CF401"
    CONFIG_TYPE_ERROR = "CF402"

    # 验证错误 (VD500-VD599)
    VALIDATION_FAILED = "VD500"
    PARAMETER_MISSING = "VD501"
    PARAMETER_INVALID = "VD502"

    # 超时错误 (TO600-TO699)
    TIMEOUT = "TO600"
    OPERATION_TIMEOUT = "TO601"

2.3 配置系统

改进点: 1. 支持多种配置源(YAML, JSON, 环境变量, 命令行) 2. 配置热重载 3. 配置验证 4. 配置合并和继承 5. 配置加密支持

from typing import Dict, Any, Optional, Union, List, Callable
from dataclasses import dataclass, field, asdict
from pathlib import Path
import yaml
import json
import os
import hashlib
import time
from enum import Enum
import threading
import importlib


class ConfigSource(Enum):
    """配置来源"""
    FILE_YAML = "yaml"
    FILE_JSON = "json"
    ENV = "env"
    CLI = "cli"
    CODE = "code"


@dataclass
class LingFlowConfig:
    """LingFlow 配置类(支持热重载)"""

    # ========== 工作流配置 ==========
    max_parallel: int = 2
    max_iterations: int = 100
    scheduling_delay: float = 0.01
    workflow_timeout: float = 600.0

    # ========== 技能配置 ==========
    skills_path: str = "skills"
    skill_timeout: float = 30.0
    skill_cache_enabled: bool = True
    skill_cache_ttl: int = 3600  # seconds

    # ========== 代理配置 ==========
    agent_timeout: float = 300.0
    agent_context_limit: int = 8000
    agent_max_tasks: int = 3

    # ========== 压缩配置 ==========
    compression_enabled: bool = True
    compression_target_tokens: int = 4000
    compression_strategy: str = "priority"  # priority, random, none

    # ========== 安全配置 ==========
    allow_symlinks: bool = False
    validate_paths: bool = True
    max_file_size: int = 10 * 1024 * 1024  # 10MB
    allowed_extensions: List[str] = field(default_factory=lambda: [".py", ".yaml", ".json"])

    # ========== 日志配置 ==========
    log_level: str = "INFO"
    log_format: str = "%(asctime)s - %(name)s - %(levelname)s - %(message)s"
    log_file: Optional[str] = None
    log_rotation: bool = True
    log_max_bytes: int = 10 * 1024 * 1024  # 10MB
    log_backup_count: int = 5

    # ========== 监控配置 ==========
    monitoring_enabled: bool = True
    metrics_enabled: bool = True
    tracing_enabled: bool = False
    performance_threshold: float = 1.0  # seconds

    # ========== 插件配置 ==========
    plugins_enabled: bool = True
    plugins_path: str = "plugins"
    auto_load_plugins: bool = True

    # ========== 缓存配置 ==========
    cache_enabled: bool = True
    cache_ttl: int = 3600  # seconds
    cache_max_size: int = 1000

    # ========== 并发配置 ==========
    async_enabled: bool = True
    thread_pool_size: int = 4
    async_pool_size: int = 10

    @classmethod
    def builder(cls) -> "LingFlowConfigBuilder":
        """创建构建器"""
        return LingFlowConfigBuilder(cls())

    def validate(self) -> Result[None]:
        """验证配置"""
        errors = []

        # 验证工作流配置
        if self.max_parallel < 1:
            errors.append("max_parallel must be >= 1")
        if self.max_iterations < 1:
            errors.append("max_iterations must be >= 1")
        if self.workflow_timeout < 0:
            errors.append("workflow_timeout must be >= 0")

        # 验证技能配置
        if self.skill_timeout < 0:
            errors.append("skill_timeout must be >= 0")
        if not os.path.isabs(self.skills_path):
            if not os.path.exists(self.skills_path):
                errors.append(f"skills_path does not exist: {self.skills_path}")

        # 验证代理配置
        if self.agent_timeout < 0:
            errors.append("agent_timeout must be >= 0")
        if self.agent_context_limit < 1000:
            errors.append("agent_context_limit must be >= 1000")

        # 验证压缩配置
        if self.compression_target_tokens < 1000:
            errors.append("compression_target_tokens must be >= 1000")

        # 验证日志配置
        valid_levels = ["DEBUG", "INFO", "WARNING", "ERROR", "CRITICAL"]
        if self.log_level not in valid_levels:
            errors.append(f"log_level must be one of {valid_levels}")

        if errors:
            return Result.fail(
                f"Configuration validation failed: {'; '.join(errors)}",
                code="CONFIG_VALIDATION_FAILED",
                errors=errors
            )

        return Result.ok()

    def to_dict(self) -> Dict[str, Any]:
        """转换为字典"""
        return asdict(self)

    def to_yaml(self, file_path: Optional[Path] = None) -> str:
        """转换为 YAML"""
        yaml_str = yaml.dump(self.to_dict(), default_flow_style=False, allow_unicode=True)

        if file_path:
            file_path.write_text(yaml_str, encoding='utf-8')

        return yaml_str

    def to_json(self, file_path: Optional[Path] = None) -> str:
        """转换为 JSON"""
        json_str = json.dumps(self.to_dict(), ensure_ascii=False, indent=2)

        if file_path:
            file_path.write_text(json_str, encoding='utf-8')

        return json_str

    @classmethod
    def from_yaml(cls, file_path: Union[str, Path]) -> "LingFlowConfig":
        """从 YAML 文件加载"""
        file_path = Path(file_path)
        data = yaml.safe_load(file_path.read_text(encoding='utf-8'))
        return cls(**data)

    @classmethod
    def from_json(cls, file_path: Union[str, Path]) -> "LingFlowConfig":
        """从 JSON 文件加载"""
        file_path = Path(file_path)
        data = json.loads(file_path.read_text(encoding='utf-8'))
        return cls(**data)

    @classmethod
    def from_env(cls, prefix: str = "LINGFLOW_") -> "LingFlowConfig":
        """从环境变量加载"""
        data = {}

        for key, value in os.environ.items():
            if key.startswith(prefix):
                config_key = key[len(prefix):].lower()
                data[config_key] = value

        # 转换类型
        instance = cls()
        for key, value in data.items():
            if hasattr(instance, key):
                field_type = type(getattr(instance, key))
                if field_type == bool:
                    data[key] = value.lower() in ('true', '1', 'yes')
                elif field_type == int:
                    data[key] = int(value)
                elif field_type == float:
                    data[key] = float(value)

        return cls(**data)


class LingFlowConfigBuilder:
    """配置构建器(增强版)"""

    def __init__(self, config: LingFlowConfig):
        self._config = config

    # ========== 工作流配置 ==========
    def max_parallel(self, value: int) -> "LingFlowConfigBuilder":
        self._config.max_parallel = value
        return self

    def max_iterations(self, value: int) -> "LingFlowConfigBuilder":
        self._config.max_iterations = value
        return self

    def workflow_timeout(self, value: float) -> "LingFlowConfigBuilder":
        self._config.workflow_timeout = value
        return self

    # ========== 技能配置 ==========
    def skills_path(self, path: str) -> "LingFlowConfigBuilder":
        self._config.skills_path = path
        return self

    def skill_timeout(self, value: float) -> "LingFlowConfigBuilder":
        self._config.skill_timeout = value
        return self

    # ========== 代理配置 ==========
    def agent_timeout(self, value: float) -> "LingFlowConfigBuilder":
        self._config.agent_timeout = value
        return self

    def agent_context_limit(self, value: int) -> "LingFlowConfigBuilder":
        self._config.agent_context_limit = value
        return self

    # ========== 压缩配置 ==========
    def compression(
        self,
        enabled: bool,
        target_tokens: int = 4000
    ) -> "LingFlowConfigBuilder":
        self._config.compression_enabled = enabled
        self._config.compression_target_tokens = target_tokens
        return self

    # ========== 安全配置 ==========
    def security(
        self,
        allow_symlinks: bool = False,
        validate_paths: bool = True
    ) -> "LingFlowConfigBuilder":
        self._config.allow_symlinks = allow_symlinks
        self._config.validate_paths = validate_paths
        return self

    # ========== 日志配置 ==========
    def logging(
        self,
        level: str = "INFO",
        file_path: Optional[str] = None
    ) -> "LingFlowConfigBuilder":
        self._config.log_level = level
        self._config.log_file = file_path
        return self

    # ========== 监控配置 ==========
    def monitoring(
        self,
        enabled: bool = True,
        metrics: bool = True,
        tracing: bool = False
    ) -> "LingFlowConfigBuilder":
        self._config.monitoring_enabled = enabled
        self._config.metrics_enabled = metrics
        self._config.tracing_enabled = tracing
        return self

    # ========== 插件配置 ==========
    def plugins(
        self,
        enabled: bool = True,
        path: str = "plugins",
        auto_load: bool = True
    ) -> "LingFlowConfigBuilder":
        self._config.plugins_enabled = enabled
        self._config.plugins_path = path
        self._config.auto_load_plugins = auto_load
        return self

    # ========== 并发配置 ==========
    def async_mode(self, enabled: bool = True) -> "LingFlowConfigBuilder":
        self._config.async_enabled = enabled
        return self

    def thread_pool_size(self, size: int) -> "LingFlowConfigBuilder":
        self._config.thread_pool_size = size
        return self

    # ========== 便捷方法 ==========
    def timeout(self, value: float) -> "LingFlowConfigBuilder":
        """设置所有超时"""
        self._config.agent_timeout = value
        self._config.skill_timeout = value
        self._config.workflow_timeout = value
        return self

    def performance(self, mode: str = "balanced") -> "LingFlowConfigBuilder":
        """性能预设"""
        if mode == "fast":
            self._config.max_parallel = 8
            self._config.compression_enabled = False
        elif mode == "balanced":
            self._config.max_parallel = 2
            self._config.compression_enabled = True
        elif mode == "conservative":
            self._config.max_parallel = 1
            self._config.compression_enabled = True
        return self

    def development(self) -> "LingFlowConfigBuilder":
        """开发模式"""
        self._config.log_level = "DEBUG"
        self._config.monitoring_enabled = True
        self._config.performance("balanced")
        return self

    def production(self) -> "LingFlowConfigBuilder":
        """生产模式"""
        self._config.log_level = "WARNING"
        self._config.monitoring_enabled = True
        self._config.performance("balanced")
        return self

    def build(self) -> LingFlowConfig:
        """构建配置"""
        result = self._config.validate()
        if result.is_error:
            raise ConfigurationError(result.error, code=result.code)

        return self._config


class ConfigManager:
    """配置管理器(支持热重载)"""

    def __init__(self, config: Optional[LingFlowConfig] = None):
        self._config = config or LingFlowConfig()
        self._config_hash = self._compute_hash()
        self._lock = threading.RLock()
        self._reload_callbacks: List[Callable[[LingFlowConfig], None]] = []

    def _compute_hash(self) -> str:
        """计算配置哈希值"""
        config_str = json.dumps(self._config.to_dict(), sort_keys=True)
        return hashlib.sha256(config_str.encode()).hexdigest()

    def get_config(self) -> LingFlowConfig:
        """获取配置"""
        with self._lock:
            return self._config

    def reload_config(self, new_config: LingFlowConfig) -> bool:
        """重新加载配置"""
        with self._lock:
            new_hash = self._compute_hash_from_config(new_config)

            if new_hash == self._config_hash:
                return False  # 配置未变化

            old_config = self._config
            self._config = new_config
            self._config_hash = new_hash

            # 调用重载回调
            for callback in self._reload_callbacks:
                try:
                    callback(old_config, new_config)
                except Exception as e:
                    print(f"Config reload callback failed: {e}")

            return True

    def _compute_hash_from_config(self, config: LingFlowConfig) -> str:
        """计算配置哈希值"""
        config_str = json.dumps(config.to_dict(), sort_keys=True)
        return hashlib.sha256(config_str.encode()).hexdigest()

    def on_reload(self, callback: Callable[[LingFlowConfig, LingFlowConfig], None]):
        """注册配置重载回调"""
        self._reload_callbacks.append(callback)

    def watch_file(self, file_path: Union[str, Path], interval: float = 1.0):
        """监听配置文件变化"""
        import time
        file_path = Path(file_path)

        last_modified = file_path.stat().st_mtime if file_path.exists() else 0

        def watcher():
            nonlocal last_modified
            while True:
                try:
                    if file_path.exists():
                        current_modified = file_path.stat().st_mtime
                        if current_modified != last_modified:
                            try:
                                if file_path.suffix in ['.yaml', '.yml']:
                                    new_config = LingFlowConfig.from_yaml(file_path)
                                elif file_path.suffix == '.json':
                                    new_config = LingFlowConfig.from_json(file_path)
                                else:
                                    continue

                                if self.reload_config(new_config):
                                    print(f"Config reloaded from {file_path}")
                                    last_modified = current_modified
                            except Exception as e:
                                print(f"Failed to reload config: {e}")

                    time.sleep(interval)
                except Exception as e:
                    print(f"Config watcher error: {e}")
                    time.sleep(interval)

        import threading
        thread = threading.Thread(target=watcher, daemon=True)
        thread.start()

三、服务层设计

3.1 技能服务(同步+异步)

from typing import Dict, Any, List, Optional, Callable
from dataclasses import dataclass, field
from pathlib import Path
import time
import asyncio
from concurrent.futures import ThreadPoolExecutor, as_completed
import importlib.util
import json


@dataclass
class SkillContext:
    """技能执行上下文(增强版)"""
    skill_name: str
    params: Dict[str, Any]
    working_dir: Path
    temp_dir: Path
    metadata: Dict[str, Any] = field(default_factory=dict)

    # 新增字段
    execution_id: str = ""
    parent_execution_id: Optional[str] = None
    user_data: Dict[str, Any] = field(default_factory=dict)
    cache_key: Optional[str] = None


@dataclass
class SkillResult:
    """技能执行结果(增强版)"""
    success: bool
    data: Optional[Any] = None
    error: Optional[str] = None
    execution_time: float = 0.0
    metadata: Dict[str, Any] = field(default_factory=dict)

    # 新增字段
    cached: bool = False
    execution_id: str = ""
    skill_version: str = ""

    @classmethod
    def ok(cls, data: Any = None, **metadata) -> "SkillResult":
        return cls(success=True, data=data, metadata=metadata)

    @classmethod
    def fail(cls, error: str, **metadata) -> "SkillResult":
        return cls(success=False, error=error, metadata=metadata)

    def to_result(self) -> Result[Any]:
        """转换为 Result 类型"""
        if self.success:
            return Result.ok(self.data, **self.metadata)
        else:
            return Result.fail(self.error or "Unknown error", details=self.metadata)


class SkillService:
    """技能服务(同步版本)"""

    def __init__(self, config: LingFlowConfig, cache_manager: Optional['CacheManager'] = None):
        self._config = config
        self._cache_manager = cache_manager
        self._skills: Dict[str, BaseSkill] = {}
        self._load_skills()

    def _load_skills(self):
        """加载技能"""
        skills_dir = Path(self._config.skills_path)
        if not skills_dir.exists():
            return

        for skill_path in skills_dir.iterdir():
            if not skill_path.is_dir():
                continue

            self._load_skill_from_directory(skill_path)

    def _load_skill_from_directory(self, skill_path: Path):
        """从目录加载技能"""
        # 尝试加载 __init__.py
        init_file = skill_path / "__init__.py"
        if init_file.exists():
            self._load_skill_from_module(init_file, skill_path.name)
            return

        # 尝试加载 implementation.py
        impl_file = skill_path / "implementation.py"
        if impl_file.exists():
            self._load_skill_from_module(impl_file, skill_path.name)

    def _load_skill_from_module(self, module_path: Path, skill_name: str):
        """从模块加载技能"""
        try:
            spec = importlib.util.spec_from_file_location(
                f"skills.{skill_name}", str(module_path)
            )
            module = importlib.util.module_from_spec(spec)
            spec.loader.exec_module(module)

            # 查找 BaseSkill 子类
            for attr_name in dir(module):
                attr = getattr(module, attr_name)
                if (isinstance(attr, type) and
                    issubclass(attr, BaseSkill) and
                    attr is not BaseSkill):

                    skill_instance = attr()
                    self._skills[skill_instance.name] = skill_instance
                    print(f"Loaded skill: {skill_instance.name}")

        except Exception as e:
            print(f"Failed to load skill {skill_name}: {e}")

    def list(self) -> List[str]:
        """列出所有可用技能"""
        return list(self._skills.keys())

    def get_info(self, name: str) -> Result[Dict[str, Any]]:
        """获取技能信息"""
        if name not in self._skills:
            return Result.fail(f"Skill not found: {name}", code="SKILL_NOT_FOUND")

        skill = self._skills[name]
        return Result.ok({
            "name": skill.name,
            "description": skill.description,
            "version": skill.version,
            "author": skill.author,
            "dependencies": skill.dependencies,
        })

    def execute(self, name: str, params: Dict[str, Any]) -> SkillResult:
        """执行技能(同步)"""
        if name not in self._skills:
            return SkillResult.fail(f"Skill not found: {name}")

        skill = self._skills[name]

        # 验证参数
        validation = skill.validate_params(params)
        if validation.is_error:
            return SkillResult.fail(validation.error)

        # 检查缓存
        if self._cache_manager:
            cache_key = self._compute_cache_key(name, params)
            cached_result = self._cache_manager.get(cache_key)
            if cached_result:
                result = SkillResult.from_dict(cached_result)
                result.cached = True
                return result

        # 创建上下文
        import uuid
        context = SkillContext(
            skill_name=name,
            params=params,
            working_dir=Path.cwd(),
            temp_dir=Path.cwd() / ".lingflow" / "temp",
            execution_id=str(uuid.uuid4()),
            cache_key=self._compute_cache_key(name, params) if self._cache_manager else None,
        )

        # 执行
        try:
            start_time = time.time()

            pre_result = skill.pre_execute(context)
            if not pre_result.success:
                return pre_result

            result = skill.execute(context)
            result.execution_time = time.time() - start_time
            result.execution_id = context.execution_id
            result.skill_version = skill.version

            result = skill.post_execute(context, result)

            # 缓存结果
            if self._cache_manager and result.success and context.cache_key:
                self._cache_manager.set(context.cache_key, result.to_dict())

            return result

        except Exception as e:
            error_result = skill.on_error(context, e)
            error_result.execution_time = time.time() - start_time
            return error_result

    def execute_batch(
        self,
        tasks: List[Dict[str, Any]],
        parallel: bool = False,
        max_workers: Optional[int] = None
    ) -> Dict[str, SkillResult]:
        """批量执行技能"""
        if not parallel:
            # 串行执行
            results = {}
            for task in tasks:
                skill_name = task.get("skill")
                params = task.get("params", {})
                task_id = task.get("id", skill_name)
                results[task_id] = self.execute(skill_name, params)
            return results
        else:
            # 并行执行
            results = {}
            max_workers = max_workers or self._config.max_parallel

            with ThreadPoolExecutor(max_workers=max_workers) as executor:
                future_to_task = {
                    executor.submit(
                        self.execute,
                        task.get("skill"),
                        task.get("params", {})
                    ): task
                    for task in tasks
                }

                for future in as_completed(future_to_task):
                    task = future_to_task[future]
                    task_id = task.get("id", task.get("skill"))
                    try:
                        results[task_id] = future.result()
                    except Exception as e:
                        results[task_id] = SkillResult.fail(str(e))

            return results

    def _compute_cache_key(self, skill_name: str, params: Dict[str, Any]) -> str:
        """计算缓存键"""
        import hashlib
        key_str = f"{skill_name}:{json.dumps(params, sort_keys=True)}"
        return hashlib.sha256(key_str.encode()).hexdigest()


class AsyncSkillService:
    """技能服务(异步版本)"""

    def __init__(self, config: LingFlowConfig, cache_manager: Optional['CacheManager'] = None):
        self._config = config
        self._cache_manager = cache_manager
        self._sync_service = SkillService(config, cache_manager)
        self._executor = ThreadPoolExecutor(max_workers=config.thread_pool_size)

    async def execute(self, name: str, params: Dict[str, Any]) -> SkillResult:
        """执行技能(异步)"""
        loop = asyncio.get_event_loop()
        return await loop.run_in_executor(
            self._executor,
            self._sync_service.execute,
            name,
            params
        )

    async def execute_batch(
        self,
        tasks: List[Dict[str, Any]],
        parallel: bool = True
    ) -> Dict[str, SkillResult]:
        """批量执行技能(异步)"""
        if not parallel:
            results = {}
            for task in tasks:
                skill_name = task.get("skill")
                params = task.get("params", {})
                task_id = task.get("id", skill_name)
                results[task_id] = await self.execute(skill_name, params)
            return results
        else:
            coroutines = [
                self.execute(task.get("skill"), task.get("params", {}))
                for task in tasks
            ]

            results_list = await asyncio.gather(*coroutines, return_exceptions=True)

            results = {}
            for task, result in zip(tasks, results_list):
                task_id = task.get("id", task.get("skill"))
                if isinstance(result, Exception):
                    results[task_id] = SkillResult.fail(str(result))
                else:
                    results[task_id] = result

            return results

    def list(self) -> List[str]:
        """列出所有可用技能"""
        return self._sync_service.list()

    async def get_info(self, name: str) -> Result[Dict[str, Any]]:
        """获取技能信息"""
        return self._sync_service.get_info(name)

3.2 工作流服务

from typing import Dict, Any, List, Optional, Union
from pathlib import Path
import yaml
import json
import uuid
import time


class WorkflowService:
    """工作流服务(同步版本)"""

    def __init__(
        self,
        config: LingFlowConfig,
        skill_service: SkillService
    ):
        self._config = config
        self._skill_service = skill_service

    def execute_file(
        self,
        workflow_path: Union[str, Path]
    ) -> Result[Dict[str, Any]]:
        """从文件执行工作流"""
        path = Path(workflow_path)

        # 验证路径
        if not path.exists():
            return Result.fail(
                f"Workflow file not found: {workflow_path}",
                code="FILE_NOT_FOUND"
            )

        # 加载工作流定义
        try:
            if path.suffix in ['.yaml', '.yml']:
                with open(path, "r", encoding="utf-8") as f:
                    workflow_def = yaml.safe_load(f)
            elif path.suffix == '.json':
                with open(path, "r", encoding="utf-8") as f:
                    workflow_def = json.load(f)
            else:
                return Result.fail(
                    f"Unsupported file format: {path.suffix}",
                    code="UNSUPPORTED_FORMAT"
                )
        except Exception as e:
            return Result.fail(
                f"Failed to load workflow: {e}",
                code="LOAD_ERROR"
            )

        # 验证工作流定义
        validation = self._validate_workflow(workflow_def)
        if validation.is_error:
            return Result.fail(
                validation.error,
                code=validation.code
            )

        # 执行工作流
        return self.execute(workflow_def)

    def _validate_workflow(self, workflow: Dict[str, Any]) -> Result[None]:
        """验证工作流定义"""
        if "tasks" not in workflow:
            return Result.fail(
                "Missing 'tasks' in workflow definition",
                code="INVALID_WORKFLOW"
            )

        if not isinstance(workflow["tasks"], list):
            return Result.fail(
                "'tasks' must be a list",
                code="INVALID_WORKFLOW"
            )

        # 验证每个任务
        for i, task in enumerate(workflow["tasks"]):
            if "skill" not in task:
                return Result.fail(
                    f"Task {i}: Missing 'skill' field",
                    code="INVALID_TASK"
                )

            if task["skill"] not in self._skill_service.list():
                return Result.fail(
                    f"Task {i}: Skill not found: {task['skill']}",
                    code="SKILL_NOT_FOUND"
                )

        return Result.ok()

    def execute(self, workflow: Dict[str, Any]) -> Result[Dict[str, Any]]:
        """执行工作流定义"""
        tasks = workflow.get("tasks", [])
        results = {}
        workflow_id = str(uuid.uuid4())

        start_time = time.time()

        for i, task in enumerate(tasks):
            skill_name = task.get("skill")
            params = task.get("params", {})
            task_id = task.get("id", f"task_{i}")

            print(f"Executing task {i+1}/{len(tasks)}: {task_id}")

            result = self._skill_service.execute(skill_name, params)
            results[task_id] = {
                "success": result.success,
                "data": result.data,
                "error": result.error,
                "execution_time": result.execution_time,
            }

            if not result.success:
                # 工作流失败
                return Result.fail(
                    f"Task {task_id} failed: {result.error}",
                    code="WORKFLOW_FAILED",
                    workflow_id=workflow_id,
                    failed_task=task_id,
                    results=results
                )

        return Result.ok({
            "workflow_id": workflow_id,
            "results": results,
            "total_execution_time": time.time() - start_time,
            "total_tasks": len(tasks),
            "successful_tasks": len(tasks),
        })


class AsyncWorkflowService:
    """工作流服务(异步版本)"""

    def __init__(
        self,
        config: LingFlowConfig,
        skill_service: AsyncSkillService
    ):
        self._config = config
        self._skill_service = skill_service

    async def execute_file(
        self,
        workflow_path: Union[str, Path]
    ) -> Result[Dict[str, Any]]:
        """从文件执行工作流(异步)"""
        # 异步读取文件
        path = Path(workflow_path)

        if not path.exists():
            return Result.fail(
                f"Workflow file not found: {workflow_path}",
                code="FILE_NOT_FOUND"
            )

        try:
            if path.suffix in ['.yaml', '.yml']:
                with open(path, "r", encoding="utf-8") as f:
                    workflow_def = yaml.safe_load(f)
            elif path.suffix == '.json':
                with open(path, "r", encoding="utf-8") as f:
                    workflow_def = json.load(f)
            else:
                return Result.fail(
                    f"Unsupported file format: {path.suffix}",
                    code="UNSUPPORTED_FORMAT"
                )
        except Exception as e:
            return Result.fail(
                f"Failed to load workflow: {e}",
                code="LOAD_ERROR"
            )

        return await self.execute(workflow_def)

    async def execute(self, workflow: Dict[str, Any]) -> Result[Dict[str, Any]]:
        """执行工作流定义(异步)"""
        tasks = workflow.get("tasks", [])
        workflow_id = str(uuid.uuid4())

        start_time = time.time()

        # 异步执行所有任务
        task_results = await asyncio.gather(*[
            self._skill_service.execute(task.get("skill"), task.get("params", {}))
            for task in tasks
        ], return_exceptions=True)

        results = {}
        for i, (task, result) in enumerate(zip(tasks, task_results)):
            task_id = task.get("id", f"task_{i}")

            if isinstance(result, Exception):
                results[task_id] = {
                    "success": False,
                    "data": None,
                    "error": str(result),
                    "execution_time": 0,
                }
            else:
                results[task_id] = {
                    "success": result.success,
                    "data": result.data,
                    "error": result.error,
                    "execution_time": result.execution_time,
                }

            if not results[task_id]["success"]:
                return Result.fail(
                    f"Task {task_id} failed: {results[task_id]['error']}",
                    code="WORKFLOW_FAILED",
                    workflow_id=workflow_id,
                    failed_task=task_id,
                    results=results
                )

        return Result.ok({
            "workflow_id": workflow_id,
            "results": results,
            "total_execution_time": time.time() - start_time,
            "total_tasks": len(tasks),
            "successful_tasks": len(tasks),
        })

四、API门面设计

4.1 统一入口(双模式)

from typing import Optional, Union, Dict, Any


class LingFlow:
    """LingFlow 统一API门面(双模式:同步+异步)

    Features:
        - 同步/异步双模式
        - 配置热重载
        - 性能监控
        - 插件系统
        - 缓存管理

    Examples:
        >>> # 同步模式
        >>> lf = LingFlow()
        >>> result = lf.skill.execute("code-analysis", {"target": "./"})
        >>>
        >>> # 异步模式
        >>> lf = LingFlow(async_mode=True)
        >>> result = await lf.skill_async.execute("code-analysis", {"target": "./"})
        >>>
        >>> # 自定义配置
        >>> config = LingFlowConfig.builder().max_parallel(4).build()
        >>> lf = LingFlow(config)
        >>>
        >>> # 会话管理
        >>> with lingflow_session(config) as lf:
        ...     lf.skill.execute("analysis", {...})
        ...     lf.workflow.execute_file("workflow.yaml")
    """

    def __init__(
        self,
        config: Optional[LingFlowConfig] = None,
        async_mode: bool = False,
        enable_cache: bool = True,
        enable_monitoring: bool = True
    ):
        """初始化 LingFlow

        Args:
            config: 配置对象,默认使用 LingFlowConfig()
            async_mode: 是否启用异步模式
            enable_cache: 是否启用缓存
            enable_monitoring: 是否启用监控
        """
        self._config = config or LingFlowConfig()
        self._async_mode = async_mode

        # 初始化缓存管理器
        self._cache_manager = CacheManager(self._config) if enable_cache else None

        # 初始化监控管理器
        self._monitor = Monitor(self._config) if enable_monitoring else None

        # 初始化服务
        if async_mode:
            self.skill_async = AsyncSkillService(self._config, self._cache_manager)
            self.workflow_async = AsyncWorkflowService(self._config, self.skill_async)
        else:
            self.skill = SkillService(self._config, self._cache_manager)
            self.workflow = WorkflowService(self._config, self.skill)

        # 初始化插件服务
        self.plugins = PluginService(self._config)

    @property
    def config(self) -> LingFlowConfig:
        """获取当前配置(只读)"""
        return self._config

    def get_status(self) -> Dict[str, Any]:
        """获取系统状态"""
        status = {
            "skills": {
                "available": len(self.skill.list() if not self._async_mode else self.skill_async.list()),
                "list": self.skill.list() if not self._async_mode else self.skill_async.list(),
            },
            "config": self._config.to_dict(),
            "async_mode": self._async_mode,
        }

        if self._cache_manager:
            status["cache"] = self._cache_manager.get_stats()

        if self._monitor:
            status["monitor"] = self._monitor.get_stats()

        return status

    def reload_config(self, new_config: LingFlowConfig) -> bool:
        """重新加载配置"""
        old_config = self._config
        self._config = new_config

        # 重新初始化服务
        if self._cache_manager:
            self._cache_manager.update_config(new_config)

        if self._monitor:
            self._monitor.update_config(new_config)

        return True

    def reload_skills(self) -> Result[None]:
        """重新加载技能"""
        try:
            if self._async_mode:
                self.skill_async._sync_service._load_skills()
            else:
                self.skill._load_skills()
            return Result.ok()
        except Exception as e:
            return Result.fail(str(e), code="RELOAD_ERROR")

    # ========== 便捷方法 ==========

    def run_skill(self, skill_name: str, params: Dict[str, Any]) -> Dict[str, Any]:
        """便捷方法:执行技能(兼容旧API)"""
        result = self.skill.execute(skill_name, params)
        return result.to_dict()

    def run_workflow_file(self, filepath: str) -> Dict[str, Any]:
        """便捷方法:执行工作流文件(兼容旧API)"""
        result = self.workflow.execute_file(filepath)
        return result.unwrap_or({})

    def run_workflow(self, workflow_def: Dict[str, Any]) -> Dict[str, Any]:
        """便捷方法:执行工作流定义(兼容旧API)"""
        result = self.workflow.execute(workflow_def)
        return result.unwrap_or({})


# ==================== 上下文管理器 ====================

class lingflow_session:
    """LingFlow 会话上下文管理器"""

    def __init__(
        self,
        config: Optional[LingFlowConfig] = None,
        async_mode: bool = False
    ):
        self._config = config or LingFlowConfig()
        self._async_mode = async_mode
        self._instance: Optional[LingFlow] = None

    def __enter__(self) -> LingFlow:
        self._instance = LingFlow(self._config, async_mode=self._async_mode)
        return self._instance

    def __exit__(self, exc_type, exc_val, exc_tb):
        # 清理资源
        if self._instance:
            # 可以在这里添加清理逻辑
            pass
        return False


# ==================== 装饰器 ====================

def handle_errors(func):
    """统一的错误处理装饰器"""
    @wraps(func)
    def wrapper(*args, **kwargs):
        try:
            return Result.ok(func(*args, **kwargs))
        except LingFlowError as e:
            return Result.fail(e.message, code=e.code, details=e.details)
        except Exception as e:
            return Result.fail(str(e), code="UNEXPECTED_ERROR")
    return wrapper


def track_performance(func):
    """性能追踪装饰器"""
    @wraps(func)
    def wrapper(*args, **kwargs):
        import time
        start = time.time()
        result = func(*args, **kwargs)
        elapsed = time.time() - start

        if isinstance(result, Result):
            result.details["execution_time"] = elapsed

        return result
    return wrapper


def async_handle_errors(func):
    """统一的错误处理装饰器(异步)"""
    @wraps(func)
    async def wrapper(*args, **kwargs):
        try:
            return Result.ok(await func(*args, **kwargs))
        except LingFlowError as e:
            return Result.fail(e.message, code=e.code, details=e.details)
        except Exception as e:
            return Result.fail(str(e), code="UNEXPECTED_ERROR")
    return wrapper


def async_track_performance(func):
    """性能追踪装饰器(异步)"""
    @wraps(func)
    async def wrapper(*args, **kwargs):
        import time
        start = time.time()
        result = await func(*args, **kwargs)
        elapsed = time.time() - start

        if isinstance(result, Result):
            result.details["execution_time"] = elapsed

        return result
    return wrapper

五、插件系统

5.1 插件基类

from typing import Dict, Any, List, Optional


class BasePlugin(ABC):
    """插件基类"""

    # 插件元数据
    name: str = "base-plugin"
    version: str = "1.0.0"
    author: str = ""
    description: str = "Base plugin class"

    # 依赖
    dependencies: List[str] = []

    def __init__(self, config: LingFlowConfig):
        self._config = config

    @classmethod
    def validate_config(cls, config: LingFlowConfig) -> Result[None]:
        """验证配置"""
        return Result.ok()

    def initialize(self) -> Result[None]:
        """初始化插件"""
        return Result.ok()

    def shutdown(self) -> Result[None]:
        """关闭插件"""
        return Result.ok()


class SkillPlugin(BasePlugin):
    """技能插件"""

    @abstractmethod
    def get_skill(self) -> BaseSkill:
        """返回技能实例"""
        pass

    @abstractmethod
    def get_skill_names(self) -> List[str]:
        """返回技能名称列表"""
        pass


class ServicePlugin(BasePlugin):
    """服务插件"""

    @abstractmethod
    def get_service(self) -> Any:
        """返回服务实例"""
        pass


class MiddlewarePlugin(BasePlugin):
    """中间件插件"""

    def pre_execute(self, context: SkillContext) -> SkillContext:
        """执行前钩子"""
        return context

    def post_execute(self, context: SkillContext, result: SkillResult) -> SkillResult:
        """执行后钩子"""
        return result

    def on_error(self, context: SkillContext, error: Exception) -> Exception:
        """错误处理钩子"""
        return error

5.2 插件服务

class PluginService:
    """插件管理服务"""

    def __init__(self, config: LingFlowConfig):
        self._config = config
        self._plugins: Dict[str, BasePlugin] = {}
        self._skill_plugins: Dict[str, List[SkillPlugin]] = {}
        self._middleware_plugins: List[MiddlewarePlugin] = []

        if self._config.plugins_enabled and self._config.auto_load_plugins:
            self._load_plugins()

    def _load_plugins(self):
        """加载插件"""
        plugins_dir = Path(self._config.plugins_path)
        if not plugins_dir.exists():
            return

        for plugin_path in plugins_dir.iterdir():
            if not plugin_path.is_dir():
                continue

            self._load_plugin_from_directory(plugin_path)

    def _load_plugin_from_directory(self, plugin_path: Path):
        """从目录加载插件"""
        try:
            init_file = plugin_path / "__init__.py"
            if not init_file.exists():
                return

            spec = importlib.util.spec_from_file_location(
                f"plugins.{plugin_path.name}", str(init_file)
            )
            module = importlib.util.module_from_spec(spec)
            spec.loader.exec_module(module)

            # 查找插件类
            for attr_name in dir(module):
                attr = getattr(module, attr_name)
                if (isinstance(attr, type) and
                    issubclass(attr, BasePlugin) and
                    attr is not BasePlugin):

                    plugin_instance = attr(self._config)
                    self._plugins[plugin_instance.name] = plugin_instance

                    # 初始化插件
                    init_result = plugin_instance.initialize()
                    if init_result.is_error:
                        print(f"Failed to initialize plugin {plugin_instance.name}: {init_result.error}")
                        continue

                    # 分类插件
                    if isinstance(plugin_instance, SkillPlugin):
                        for skill_name in plugin_instance.get_skill_names():
                            if skill_name not in self._skill_plugins:
                                self._skill_plugins[skill_name] = []
                            self._skill_plugins[skill_name].append(plugin_instance)

                    if isinstance(plugin_instance, MiddlewarePlugin):
                        self._middleware_plugins.append(plugin_instance)

                    print(f"Loaded plugin: {plugin_instance.name}")

        except Exception as e:
            print(f"Failed to load plugin {plugin_path.name}: {e}")

    def register_plugin(self, plugin: BasePlugin) -> Result[None]:
        """注册插件"""
        validation = plugin.validate_config(self._config)
        if validation.is_error:
            return validation

        init_result = plugin.initialize()
        if init_result.is_error:
            return Result.fail(f"Plugin initialization failed: {init_result.error}")

        self._plugins[plugin.name] = plugin

        # 分类插件
        if isinstance(plugin, SkillPlugin):
            for skill_name in plugin.get_skill_names():
                if skill_name not in self._skill_plugins:
                    self._skill_plugins[skill_name] = []
                self._skill_plugins[skill_name].append(plugin)

        if isinstance(plugin, MiddlewarePlugin):
            self._middleware_plugins.append(plugin)

        return Result.ok()

    def get_plugin(self, name: str) -> Optional[BasePlugin]:
        """获取插件"""
        return self._plugins.get(name)

    def list_plugins(self) -> List[Dict[str, Any]]:
        """列出所有插件"""
        return [
            {
                "name": plugin.name,
                "version": plugin.version,
                "author": plugin.author,
                "description": plugin.description,
            }
            for plugin in self._plugins.values()
        ]

    def unload_plugin(self, name: str) -> Result[None]:
        """卸载插件"""
        if name not in self._plugins:
            return Result.fail(f"Plugin not found: {name}", code="PLUGIN_NOT_FOUND")

        plugin = self._plugins[name]

        # 关闭插件
        shutdown_result = plugin.shutdown()
        if shutdown_result.is_error:
            return Result.fail(f"Plugin shutdown failed: {shutdown_result.error}")

        # 从分类中移除
        if isinstance(plugin, SkillPlugin):
            for skill_name in plugin.get_skill_names():
                if skill_name in self._skill_plugins:
                    self._skill_plugins[skill_name].remove(plugin)
                    if not self._skill_plugins[skill_name]:
                        del self._skill_plugins[skill_name]

        if isinstance(plugin, MiddlewarePlugin):
            self._middleware_plugins.remove(plugin)

        del self._plugins[name]

        return Result.ok()

    def reload(self) -> Result[None]:
        """重新加载所有插件"""
        # 卸载所有插件
        plugin_names = list(self._plugins.keys())
        for name in plugin_names:
            self.unload_plugin(name)

        # 重新加载
        self._load_plugins()

        return Result.ok()

六、缓存系统

6.1 缓存管理器

from typing import Dict, Any, Optional, Callable
import time
import hashlib
import json
from collections import OrderedDict


class CacheManager:
    """缓存管理器(支持 TTL 和 LRU)"""

    def __init__(self, config: LingFlowConfig):
        self._config = config
        self._cache: OrderedDict[str, Dict[str, Any]] = OrderedDict()
        self._stats = {
            "hits": 0,
            "misses": 0,
            "evictions": 0,
            "total_size": 0,
        }
        self._lock = threading.RLock()

    def get(self, key: str) -> Optional[Dict[str, Any]]:
        """获取缓存"""
        with self._lock:
            if key not in self._cache:
                self._stats["misses"] += 1
                return None

            entry = self._cache[key]

            # 检查 TTL
            if time.time() - entry["timestamp"] > self._config.cache_ttl:
                del self._cache[key]
                self._stats["misses"] += 1
                self._stats["evictions"] += 1
                self._stats["total_size"] -= entry.get("size", 0)
                return None

            # 更新访问时间(LRU)
            self._cache.move_to_end(key)

            self._stats["hits"] += 1
            return entry["data"]

    def set(self, key: str, value: Any, ttl: Optional[int] = None) -> bool:
        """设置缓存"""
        with self._lock:
            # 计算大小
            value_str = json.dumps(value, default=str)
            size = len(value_str.encode('utf-8'))

            # 检查缓存大小限制
            if len(self._cache) >= self._config.cache_max_size:
                self._evict()

            entry = {
                "data": value,
                "timestamp": time.time(),
                "ttl": ttl or self._config.cache_ttl,
                "size": size,
            }

            self._cache[key] = entry
            self._stats["total_size"] += size

            return True

    def delete(self, key: str) -> bool:
        """删除缓存"""
        with self._lock:
            if key in self._cache:
                size = self._cache[key].get("size", 0)
                del self._cache[key]
                self._stats["total_size"] -= size
                return True
            return False

    def clear(self):
        """清空缓存"""
        with self._lock:
            self._cache.clear()
            self._stats["total_size"] = 0

    def _evict(self):
        """淘汰最旧的缓存项"""
        if self._cache:
            key, entry = self._cache.popitem(last=False)
            size = entry.get("size", 0)
            self._stats["total_size"] -= size
            self._stats["evictions"] += 1

    def get_stats(self) -> Dict[str, Any]:
        """获取缓存统计"""
        with self._lock:
            hit_rate = (
                self._stats["hits"] / (self._stats["hits"] + self._stats["misses"])
                if (self._stats["hits"] + self._stats["misses"]) > 0
                else 0.0
            )

            return {
                "hits": self._stats["hits"],
                "misses": self._stats["misses"],
                "evictions": self._stats["evictions"],
                "hit_rate": hit_rate,
                "total_size": self._stats["total_size"],
                "total_entries": len(self._cache),
            }

    def update_config(self, new_config: LingFlowConfig):
        """更新配置"""
        self._config = new_config

        # 如果缓存大小限制减小,淘汰多余的缓存
        while len(self._cache) > self._config.cache_max_size:
            self._evict()

七、监控系统

7.1 监控器

from typing import Dict, Any, List, Optional
import time
from collections import defaultdict
from dataclasses import dataclass, field


@dataclass
class Metric:
    """性能指标"""
    name: str
    value: float
    timestamp: float = field(default_factory=time.time)
    metadata: Dict[str, Any] = field(default_factory=dict)


class Monitor:
    """性能监控器"""

    def __init__(self, config: LingFlowConfig):
        self._config = config
        self._metrics: Dict[str, List[Metric]] = defaultdict(list)
        self._counters: Dict[str, int] = defaultdict(int)
        self._timers: Dict[str, List[float]] = defaultdict(list)
        self._lock = threading.RLock()

    def record_metric(self, name: str, value: float, **metadata):
        """记录指标"""
        if not self._config.metrics_enabled:
            return

        with self._lock:
            metric = Metric(name=name, value=value, metadata=metadata)
            self._metrics[name].append(metric)

            # 保留最近的1000条记录
            if len(self._metrics[name]) > 1000:
                self._metrics[name] = self._metrics[name][-1000:]

    def increment_counter(self, name: str, value: int = 1):
        """增加计数器"""
        if not self._config.metrics_enabled:
            return

        with self._lock:
            self._counters[name] += value

    def record_time(self, name: str, duration: float):
        """记录时间"""
        if not self._config.metrics_enabled:
            return

        with self._lock:
            self._timers[name].append(duration)

            # 保留最近的100条记录
            if len(self._timers[name]) > 100:
                self._timers[name] = self._timers[name][-100:]

            # 检查性能阈值
            if duration > self._config.performance_threshold:
                self.record_metric(
                    f"{name}_slow",
                    duration,
                    warning=f"Duration exceeds threshold {self._config.performance_threshold}s"
                )

    def get_stats(self) -> Dict[str, Any]:
        """获取监控统计"""
        with self._lock:
            stats = {
                "counters": dict(self._counters),
                "timers": {},
                "metrics": {},
            }

            # 计算计时器统计
            for name, times in self._timers.items():
                if times:
                    stats["timers"][name] = {
                        "count": len(times),
                        "min": min(times),
                        "max": max(times),
                        "avg": sum(times) / len(times),
                        "sum": sum(times),
                    }

            # 计算指标统计
            for name, metrics in self._metrics.items():
                if metrics:
                    stats["metrics"][name] = {
                        "count": len(metrics),
                        "min": min(m.value for m in metrics),
                        "max": max(m.value for m in metrics),
                        "avg": sum(m.value for m in metrics) / len(metrics),
                    }

            return stats

    def clear_metrics(self):
        """清除所有指标"""
        with self._lock:
            self._metrics.clear()
            self._counters.clear()
            self._timers.clear()

    def update_config(self, new_config: LingFlowConfig):
        """更新配置"""
        self._config = new_config


# ==================== 装饰器 ====================

def monitor_performance(monitor: Monitor):
    """性能监控装饰器"""
    def decorator(func):
        @wraps(func)
        def wrapper(*args, **kwargs):
            import time
            start_time = time.time()
            monitor.increment_counter(f"{func.__name__}_calls")

            try:
                result = func(*args, **kwargs)
                duration = time.time() - start_time
                monitor.record_time(func.__name__, duration)
                return result
            except Exception as e:
                monitor.increment_counter(f"{func.__name__}_errors")
                raise

        return wrapper
    return decorator


def monitor_async_performance(monitor: Monitor):
    """性能监控装饰器(异步)"""
    def decorator(func):
        @wraps(func)
        async def wrapper(*args, **kwargs):
            import time
            start_time = time.time()
            monitor.increment_counter(f"{func.__name__}_calls")

            try:
                result = await func(*args, **kwargs)
                duration = time.time() - start_time
                monitor.record_time(func.__name__, duration)
                return result
            except Exception as e:
                monitor.increment_counter(f"{func.__name__}_errors")
                raise

        return wrapper
    return decorator

八、使用示例

8.1 基础使用

# ============ 同步模式 ============
from lingflow import LingFlow, LingFlowConfig

# 使用默认配置
lf = LingFlow()

# 执行技能
result = lf.skill.execute("code-analysis", {"target": "./lingflow/"})
if result.success:
    print(f"Analysis complete: {result.data}")
else:
    print(f"Analysis failed: {result.error}")

# 执行工作流
result = lf.workflow.execute_file("workflow.yaml")
if result.is_ok:
    print("Workflow completed!")
    print(result.data)


# ============ 异步模式 ============
import asyncio

lf = LingFlow(async_mode=True)

async def main():
    result = await lf.skill_async.execute("code-analysis", {"target": "./"})
    if result.success:
        print(f"Analysis complete: {result.data}")

asyncio.run(main())


# ============ 自定义配置 ============
config = (LingFlowConfig.builder()
          .max_parallel(4)
          .timeout(600)
          .compression(True, 8000)
          .skills_path("./custom_skills")
          .logging(level="DEBUG")
          .monitoring(enabled=True)
          .build())

lf = LingFlow(config)


# ============ 会话管理 ============
with lingflow_session(config) as lf:
    lf.skill.execute("analysis", {"target": "./"})
    lf.workflow.execute_file("workflow.yaml")
    # 自动清理资源

8.2 Result 类型使用

from lingflow import Result

# 创建结果
success = Result.ok(data={"key": "value"})
failure = Result.fail("Something went wrong", code="ERR001")

# 链式调用
result = (Result.ok("  hello  ")
          .map(str.strip)
          .map(str.upper)
          .and_then(lambda x: Result.ok(x + "!"))
          .inspect(lambda r: print(f"Current result: {r.data}")))

# 错误处理
result = Result.fail("Error occurred").or_else(
    lambda error: Result.ok(f"Recovered from: {error}")
)

# 组合操作
r1 = Result.ok(1)
r2 = Result.ok(2)
r3 = Result.ok(3)

combined = Result.zip(r1, r2, r3)
# combined.data = [1, 2, 3]

first_ok = Result.first_ok(
    Result.fail("error1"),
    Result.ok(2),
    Result.fail("error3")
)
# first_ok.data = 2

# 超时控制
result = Result.ok("data").with_timeout(5.0)

# 序列化
json_str = result.to_json()
dict_data = result.to_dict()

# 从字典恢复
result2 = Result.from_dict(dict_data)

8.3 自定义技能

from lingflow import BaseSkill, SkillContext, SkillResult, Result

class MyAnalysisSkill(BaseSkill):
    """自定义分析技能"""

    name = "my-analysis"
    description = "My custom analysis skill"
    version = "1.0.0"
    author = "My Name"

    @classmethod
    def validate_params(cls, params):
        if "target" not in params:
            return Result.fail("Missing 'target' parameter")
        if not isinstance(params["target"], str):
            return Result.fail("'target' must be a string")
        return Result.ok()

    def pre_execute(self, context: SkillContext) -> SkillResult:
        """执行前钩子"""
        print(f"Starting analysis on: {context.params['target']}")
        return SkillResult.ok()

    def execute(self, context: SkillContext) -> SkillResult:
        """执行技能"""
        target = context.params["target"]

        # 执行分析逻辑
        # ...

        return SkillResult.ok(data={
            "files_analyzed": 42,
            "issues_found": 5,
        })

    def post_execute(self, context: SkillContext, result: SkillResult) -> SkillResult:
        """执行后钩子"""
        print(f"Analysis complete: {result.execution_time:.2f}s")
        return result

    def on_error(self, context: SkillContext, error: Exception) -> SkillResult:
        """错误处理钩子"""
        print(f"Analysis failed: {error}")
        return SkillResult.fail(str(error))


# 注册技能
lf = LingFlow()
lf.skill._skills[MyAnalysisSkill.name] = MyAnalysisSkill()

# 使用技能
result = lf.skill.execute("my-analysis", {"target": "./"})

8.4 自定义插件

from lingflow import BasePlugin, SkillPlugin, SkillService, LingFlowConfig

class MySkillPlugin(SkillPlugin):
    """自定义技能插件"""

    name = "my-skill-plugin"
    version = "1.0.0"
    description = "My custom skill plugin"

    def __init__(self, config: LingFlowConfig):
        super().__init__(config)
        self._skill = MyAnalysisSkill()

    def get_skill(self):
        return self._skill

    def get_skill_names(self):
        return [self._skill.name]


# 注册插件
lf = LingFlow()
plugin = MySkillPlugin(lf.config)
lf.plugins.register_plugin(plugin)

# 现在可以使用插件提供的技能
result = lf.skill.execute("my-analysis", {"target": "./"})

九、实施路线图

阶段1:核心基础设施(2周)

Week 1: - [ ] 创建 lingflow/core/types.py - Result 和异常体系 - [ ] 创建 lingflow/core/config.py - LingFlowConfig 和构建器 - [ ] 创建 lingflow/core/skill.py - BaseSkill 和相关类型 - [ ] 编写单元测试

Week 2: - [ ] 创建 lingflow/core/cache.py - 缓存管理器 - [ ] 创建 lingflow/core/monitor.py - 监控器 - [ ] 创建 lingflow/adapter.py - 适配器层 - [ ] 编写单元测试

阶段2:服务层(3周)

Week 3: - [ ] 实现 lingflow/services/skill.py - SkillService - [ ] 实现 lingflow/services/workflow.py - WorkflowService - [ ] 实现异步版本 AsyncSkillService 和 AsyncWorkflowService - [ ] 编写集成测试

Week 4: - [ ] 实现 lingflow/services/plugin.py - PluginService - [ ] 实现插件基类 BasePlugin, SkillPlugin, ServicePlugin - [ ] 编写插件示例 - [ ] 编写集成测试

Week 5: - [ ] 重构 lingflow/__init__.py - 集成所有服务 - [ ] 实现双模式API(同步+异步) - [ ] 实现上下文管理器 - [ ] 编写端到端测试

阶段3:技能迁移(3周)

Week 6-7: - [ ] 创建技能迁移工具 - [ ] 迁移 brainstorming 技能 - [ ] 迁移 writing-plans 技能 - [ ] 迁移 test-driven-development 技能 - [ ] 迁移 systematic-debugging 技能

Week 8: - [ ] 迁移其他6个技能 - [ ] 测试所有迁移的技能 - [ ] 更新技能文档 - [ ] 创建技能开发模板

阶段4:API清理和文档(1周)

Week 9: - [ ] 在旧API中添加 DeprecationWarning - [ ] 更新 README.md - [ ] 更新 AGENTS.md - [ ] 创建迁移指南 - [ ] 更新示例代码 - [ ] 创建 API 文档

阶段5:性能优化和测试(2周)

Week 10: - [ ] 性能测试和优化 - [ ] 压力测试 - [ ] 内存泄漏检查 - [ ] 缓存性能优化

Week 11: - [ ] 完整的集成测试 - [ ] 端到端测试 - [ ] 用户验收测试 - [ ] 性能基准测试

阶段6:发布准备(1周)

Week 12: - [ ] 代码审查 - [ ] 文档审查 - [ ] 发布说明 - [ ] 发布 LingFlow V4.0.0


十、成功指标

10.1 技术指标

指标 目标 当前 改进
类型覆盖率 100% 60% +40%
测试覆盖率 95% 78% +17%
Cyclomatic复杂度 < 10 15 -33%
代码重复率 < 3% 8% -62.5%
文档覆盖率 95% 81% +14%

10.2 性能指标

指标 目标 当前 改进
API 响应时间 < 100ms 200ms -50%
技能执行速度 +30% - +30%
内存占用 < 50MB 80MB -37.5%
缓存命中率 > 80% 0% +80%
并发能力 4x 1x +300%

10.3 用户体验指标

指标 目标 当前 改进
API 一致性 5/5 2/5 +150%
类型安全 5/5 2/5 +150%
错误处理 5/5 2/5 +150%
学习曲线 -
开发效率 +80% - +80%

十一、总结

核心优势

  1. 双模式API:同时支持同步和异步,用户自由选择
  2. 类型安全:100% 类型化,Result 泛型,完整类型提示
  3. 统一抽象:Result 类型、异常体系、配置构建器
  4. 插件系统:标准化插件接口,支持第三方扩展
  5. 性能优化:智能缓存、懒加载、并行执行
  6. 配置热重载:无需重启,动态更新配置
  7. 内置监控:性能追踪、错误统计、资源监控
  8. 开发者体验:自动补全、类型提示、错误诊断

与其他方案对比

特性 当前 方案A 方案B 本方案
Result 类型 ✅ 增强
异步支持 ⚠️ ✅ 双模式
插件系统
配置热重载
缓存系统
监控系统
类型安全 ⭐⭐ ⭐⭐⭐ ⭐⭐⭐⭐⭐ ⭐⭐⭐⭐⭐
向后兼容 N/A ⭐⭐⭐⭐⭐ ⭐⭐⭐⭐
实施难度 N/A ⭐⭐⭐⭐⭐ ⭐⭐ ⭐⭐⭐

立即行动

本周(Week 1): 1. ✅ 审查并批准 V4.0 方案 2. ✅ 创建实施分支 feature/v4.0-refactor 3. ✅ 开始阶段1:核心基础设施

下周(Week 2): 1. ✅ 完成 Result 类型实现 2. ✅ 完成 LingFlowConfig 实现 3. ✅ 完成 BaseSkill 实现 4. ✅ 编写单元测试


文档版本: V1.0 最后更新: 2026-03-25 下次评审: 2026-04-01