Phase 4-5 集成接口定义
版本: v1.0 日期: 2026-03-31
目录
核心接口
1. 优化器协议
from typing import Protocol, Dict, Any, Optional, List
from dataclasses import dataclass
from enum import Enum
class OptimizationGoal(Enum):
"""优化目标"""
STRUCTURE = "structure"
PERFORMANCE = "performance"
SIMPLICITY = "simplicity"
ALL = "all"
@dataclass
class OptimizationRequest:
"""优化请求"""
target: str
goal: OptimizationGoal
search_space: Dict[str, Any]
max_time: float = 120.0
max_trials: int = 50
enable_cache: bool = True
enable_transfer: bool = True
project_name: str = "default"
@dataclass
class OptimizationResult:
"""优化结果"""
success: bool
best_params: Dict[str, Any]
best_score: float
n_trials: int
duration: float
converged: bool
error: Optional[str] = None
# Phase 4扩展
sensitivities: Optional[Dict[str, float]] = None
pareto_front: Optional[List[Dict[str, Any]]] = None
convergence_rate: Optional[float] = None
class OptimizerProtocol(Protocol):
"""优化器协议"""
def optimize(
self,
request: OptimizationRequest
) -> OptimizationResult:
"""执行优化"""
...
def get_progress(self) -> Dict[str, Any]:
"""获取优化进度"""
...
def cancel(self) -> bool:
"""取消优化"""
...
2. 学习器协议
from enum import Enum
class FeedbackSource(Enum):
"""反馈源"""
SEMGREP = "semgrep"
RUFF = "ruff"
PYLINT = "pylint"
CUSTOM = "custom"
@dataclass
class LearningRequest:
"""学习请求"""
target: str
tools: List[FeedbackSource]
auto_apply: bool = False
min_confidence: float = 0.8
save_rules: bool = True
@dataclass
class LearningResult:
"""学习结果"""
success: bool
feedback_count: int
rules_learned: int
rules: List[Any]
duration: float
error: Optional[str] = None
class LearningProtocol(Protocol):
"""学习器协议"""
def learn(
self,
request: LearningRequest
) -> LearningResult:
"""执行学习"""
...
def validate_rules(
self,
rules: List[Any]
) -> Dict[str, bool]:
"""验证规则"""
...
def apply_rules(
self,
rules: List[Any],
auto_apply: bool = False
) -> bool:
"""应用规则"""
...
集成接口
1. 智能优化器路由
class SmartOptimizerRouter:
"""智能优化器路由
根据项目特征和配置自动选择最合适的优化器
"""
def __init__(self, config: Optional[Dict[str, Any]] = None):
"""初始化路由器
Args:
config: 配置字典
"""
self.config = config or {}
self.phase4_enabled = self.config.get("phase4.enabled", False)
self.project_size_threshold = self.config.get(
"phase4.project_size_threshold",
50
)
def get_optimizer(
self,
project_context: Dict[str, Any]
) -> OptimizerProtocol:
"""获取优化器实例
Args:
project_context: 项目上下文信息
Returns:
优化器实例
"""
# 检查是否应该使用Phase 4
if self._should_use_phase4(project_context):
return self._create_phase4_optimizer()
else:
return self._create_legacy_optimizer()
def _should_use_phase4(
self,
context: Dict[str, Any]
) -> bool:
"""判断是否使用Phase 4优化器
Args:
context: 项目上下文
Returns:
是否使用Phase 4
"""
# 显式启用
if self.phase4_enabled:
return True
# 大型项目
class_count = context.get("class_count", 0)
if class_count > self.project_size_threshold:
return True
# 长时间优化需求
max_time = context.get("max_time", 0)
if max_time > 300: # 5分钟以上
return True
return False
def _create_phase4_optimizer(self) -> OptimizerProtocol:
"""创建Phase 4优化器"""
from lingflow.self_optimizer.phase4 import OptimizationEngine
return OptimizationEngine(config=self.config)
def _create_legacy_optimizer(self) -> OptimizerProtocol:
"""创建传统优化器"""
from lingflow.self_optimizer import SynchronousOptimizer
return SynchronousOptimizer()
2. 工作流增强器
from lingflow.common.models import Task
class WorkflowEnhancer:
"""工作流增强器
自动增强工作流中的相关任务
"""
def __init__(
self,
orchestrator,
config: Optional[Dict[str, Any]] = None
):
"""初始化增强器
Args:
orchestrator: 工作流编排器
config: 配置字典
"""
self.orchestrator = orchestrator
self.config = config or {}
def enhance_workflow(
self,
tasks: List[Task]
) -> List[Task]:
"""增强工作流任务
Args:
tasks: 原始任务列表
Returns:
增强后的任务列表
"""
enhanced_tasks = []
for task in tasks:
enhanced_task = self._enhance_task(task)
enhanced_tasks.append(enhanced_task)
return enhanced_tasks
def _enhance_task(self, task: Task) -> Task:
"""增强单个任务
Args:
task: 原始任务
Returns:
增强后的任务
"""
# 根据任务类型增强
if task.name == "code-review":
return self._enhance_code_review(task)
elif task.name == "optimize":
return self._enhance_optimize(task)
else:
return task
def _enhance_code_review(self, task: Task) -> Task:
"""增强代码审查任务
Args:
task: 代码审查任务
Returns:
增强后的任务
"""
phase5_enabled = self.config.get("phase5.enabled", False)
if not phase5_enabled:
return task
# 添加Phase 5参数
params = task.context.copy()
params.setdefault("use_phase5", True)
params.setdefault(
"ai_tools",
self.config.get("phase5.default_tools", ["semgrep", "ruff"])
)
return Task(
task_id=task.task_id,
name=task.name,
description=task.description,
agent_type=task.agent_type,
context=params,
priority=task.priority,
dependencies=task.dependencies
)
def _enhance_optimize(self, task: Task) -> Task:
"""增强优化任务
Args:
task: 优化任务
Returns:
增强后的任务
"""
phase4_enabled = self.config.get("phase4.enabled", False)
if not phase4_enabled:
return task
params = task.context.copy()
params.setdefault("use_phase4", True)
params.setdefault(
"optimization_method",
self.config.get("phase4.optimizer.algorithm", "bayesian")
)
return Task(
task_id=task.task_id,
name=task.name,
description=task.description,
agent_type=task.agent_type,
context=params,
priority=task.priority,
dependencies=task.dependencies
)
3. 代码审查集成
class CodeReviewIntegration:
"""代码审查与Phase 5集成
将AI工具学习到的规则集成到代码审查流程
"""
def __init__(
self,
code_reviewer,
phase5_system,
config: Optional[Dict[str, Any]] = None
):
"""初始化集成
Args:
code_reviewer: 代码审查器
phase5_system: Phase 5学习系统
config: 配置字典
"""
self.code_reviewer = code_reviewer
self.phase5_system = phase5_system
self.config = config or {}
def enhance_with_ai_tools(
self,
target_path: str,
tools: Optional[List[FeedbackSource]] = None
) -> List[Any]:
"""使用AI工具增强代码审查
Args:
target_path: 目标路径
tools: AI工具列表
Returns:
学习到的规则列表
"""
# 1. 确定工具列表
if tools is None:
tools = self._get_default_tools()
# 2. 收集反馈
feedback_list = self._collect_feedback(target_path, tools)
# 3. 提取规则
rules = self._extract_rules(feedback_list)
# 4. 验证规则
validated_rules = self._validate_rules(rules, target_path)
# 5. 注册规则
self._register_rules(validated_rules)
return validated_rules
def _get_default_tools(self) -> List[FeedbackSource]:
"""获取默认工具列表"""
default_tools = self.config.get(
"phase5.default_tools",
["semgrep", "ruff"]
)
tool_map = {
"semgrep": FeedbackSource.SEMGREP,
"ruff": FeedbackSource.RUFF,
"pylint": FeedbackSource.PYLINT
}
return [
tool_map[tool]
for tool in default_tools
if tool in tool_map
]
def _collect_feedback(
self,
target_path: str,
tools: List[FeedbackSource]
) -> List[Any]:
"""收集AI工具反馈"""
from lingflow.self_optimizer.phase5 import get_available_adapters
all_feedback = []
# 获取可用适配器
adapter_configs = self._get_adapter_configs(tools)
adapters = get_available_adapters(adapter_configs)
# 运行扫描
for adapter in adapters:
try:
feedback = adapter.run_scan(target_path)
all_feedback.extend(feedback)
except Exception as e:
# 记录错误但继续
self._log_error(adapter.name, e)
return all_feedback
def _extract_rules(self, feedback_list: List[Any]) -> List[Any]:
"""从反馈中提取规则"""
from lingflow.self_optimizer.phase5 import RuleExtractor
extractor = RuleExtractor(config=self.config)
rules = extractor.extract_patterns(feedback_list)
# 去重
deduplicator = RuleDeduplicator()
rules = deduplicator.deduplicate(rules)
return rules
def _validate_rules(
self,
rules: List[Any],
target_path: str
) -> List[Any]:
"""验证规则"""
from lingflow.self_optimizer.phase5 import RuleValidator
validator = RuleValidator(config=self.config)
validated_rules = []
for rule in rules:
try:
validation = validator.validate_rule(rule, target_path)
if validation.is_safe:
validated_rules.append(rule)
except Exception as e:
# 记录错误但继续
self._log_error(f"Validation for {rule.id}", e)
return validated_rules
def _register_rules(self, rules: List[Any]) -> None:
"""注册规则到代码审查器"""
for rule in rules:
try:
self.code_reviewer.rule_engine.add_rule(
rule_id=rule.id,
pattern=rule.pattern,
severity=rule.severity,
category=rule.category
)
except Exception as e:
# 记录错误但继续
self._log_error(f"Registering rule {rule.id}", e)
def _get_adapter_configs(
self,
tools: List[FeedbackSource]
) -> Dict[FeedbackSource, Dict[str, Any]]:
"""获取适配器配置"""
configs = {}
for tool in tools:
# 从配置中读取工具特定配置
tool_config = self.config.get(f"phase5.tools.{tool.value}", {})
configs[tool] = tool_config
return configs
def _log_error(self, context: str, error: Exception) -> None:
"""记录错误"""
import logging
logger = logging.getLogger(__name__)
logger.warning(f"Error in {context}: {error}")
数据模型
1. 统一结果模型
from typing import Generic, TypeVar, Optional, Dict, Any, List
from dataclasses import dataclass, field
from datetime import datetime
T = TypeVar('T')
@dataclass
class IntegrationResult(Generic[T]):
"""集成结果(统一格式)"""
success: bool
data: Optional[T] = None
error: Optional[str] = None
warnings: List[str] = field(default_factory=list)
metadata: Dict[str, Any] = field(default_factory=dict)
timestamp: datetime = field(default_factory=datetime.now)
def is_error(self) -> bool:
"""是否错误"""
return not self.success or self.error is not None
def has_warnings(self) -> bool:
"""是否有警告"""
return len(self.warnings) > 0
2. 集成配置模型
from dataclasses import dataclass, field
from typing import Dict, Any, Optional
@dataclass
class IntegrationConfig:
"""集成配置"""
# Phase 4配置
phase4_enabled: bool = False
phase4_auto_enable: bool = False
phase4_project_size_threshold: int = 50
# Phase 5配置
phase5_enabled: bool = False
phase5_auto_collect: bool = False
phase5_default_tools: List[str] = field(default_factory=lambda: ["semgrep", "ruff"])
phase5_min_confidence: float = 0.8
phase5_auto_apply_threshold: float = 0.9
# 工作流配置
workflow_auto_enhance: bool = True
workflow_dry_run: bool = False
# 路由配置
smart_optimizer_routing: bool = True
fallback_on_error: bool = True
@classmethod
def from_dict(cls, config: Dict[str, Any]) -> 'IntegrationConfig':
"""从字典创建配置"""
return cls(
phase4_enabled=config.get("phase4.enabled", False),
phase4_auto_enable=config.get("phase4.auto_enable", False),
phase4_project_size_threshold=config.get(
"phase4.project_size_threshold",
50
),
phase5_enabled=config.get("phase5.enabled", False),
phase5_auto_collect=config.get("phase5.auto_collect", False),
phase5_default_tools=config.get(
"phase5.default_tools",
["semgrep", "ruff"]
),
phase5_min_confidence=config.get("phase5.min_confidence", 0.8),
phase5_auto_apply_threshold=config.get(
"phase5.auto_apply_threshold",
0.9
),
workflow_auto_enhance=config.get("integration.workflow_auto_enhance", True),
workflow_dry_run=config.get("integration.workflow_dry_run", False),
smart_optimizer_routing=config.get(
"integration.smart_optimizer_routing",
True
),
fallback_on_error=config.get("integration.fallback_on_error", True),
)
def to_dict(self) -> Dict[str, Any]:
"""转换为字典"""
return {
"phase4.enabled": self.phase4_enabled,
"phase4.auto_enable": self.phase4_auto_enable,
"phase4.project_size_threshold": self.phase4_project_size_threshold,
"phase5.enabled": self.phase5_enabled,
"phase5.auto_collect": self.phase5_auto_collect,
"phase5.default_tools": self.phase5_default_tools,
"phase5.min_confidence": self.phase5_min_confidence,
"phase5.auto_apply_threshold": self.phase5_auto_apply_threshold,
"integration.workflow_auto_enhance": self.workflow_auto_enhance,
"integration.workflow_dry_run": self.workflow_dry_run,
"integration.smart_optimizer_routing": self.smart_optimizer_routing,
"integration.fallback_on_error": self.fallback_on_error,
}
错误处理
1. 集成异常
class IntegrationError(Exception):
"""集成错误基类"""
pass
class ConfigurationError(IntegrationError):
"""配置错误"""
pass
class OptimizerNotFoundError(IntegrationError):
"""优化器未找到错误"""
pass
class LearningError(IntegrationError):
"""学习错误"""
pass
class RuleValidationError(IntegrationError):
"""规则验证错误"""
pass
class ToolNotAvailableError(IntegrationError):
"""工具不可用错误"""
pass
2. 错误处理器
class IntegrationErrorHandler:
"""集成错误处理器"""
def __init__(self, logger=None):
"""初始化错误处理器"""
self.logger = logger or logging.getLogger(__name__)
self.error_counts = {}
self.error_callbacks = {}
def handle_error(
self,
error: Exception,
context: Dict[str, Any]
) -> None:
"""处理错误
Args:
error: 异常对象
context: 错误上下文
"""
# 记录错误
error_type = type(error).__name__
self.error_counts[error_type] = self.error_counts.get(error_type, 0) + 1
# 记录日志
self.logger.error(
f"Integration error: {error}",
extra={"context": context, "error_type": error_type}
)
# 调用回调
callback = self.error_callbacks.get(error_type)
if callback:
callback(error, context)
def register_callback(
self,
error_type: str,
callback: callable
) -> None:
"""注册错误回调
Args:
error_type: 错误类型
callback: 回调函数
"""
self.error_callbacks[error_type] = callback
def get_error_stats(self) -> Dict[str, int]:
"""获取错误统计"""
return self.error_counts.copy()
事件系统
1. 事件定义
from enum import Enum
from dataclasses import dataclass
from typing import Any, Dict, Callable, List
class IntegrationEventType(Enum):
"""集成事件类型"""
OPTIMIZATION_STARTED = "optimization_started"
OPTIMIZATION_COMPLETED = "optimization_completed"
OPTIMIZATION_FAILED = "optimization_failed"
LEARNING_STARTED = "learning_started"
LEARNING_COMPLETED = "learning_completed"
LEARNING_FAILED = "learning_failed"
RULE_LEARNED = "rule_learned"
RULE_VALIDATED = "rule_validated"
RULE_APPLIED = "rule_applied"
WORKFLOW_ENHANCED = "workflow_enhanced"
@dataclass
class IntegrationEvent:
"""集成事件"""
type: IntegrationEventType
data: Dict[str, Any]
timestamp: datetime = None
def __post_init__(self):
if self.timestamp is None:
self.timestamp = datetime.now()
2. 事件总线
class IntegrationEventBus:
"""集成事件总线"""
def __init__(self):
"""初始化事件总线"""
self._listeners: Dict[IntegrationEventType, List[Callable]] = {}
def subscribe(
self,
event_type: IntegrationEventType,
callback: Callable[[IntegrationEvent], None]
) -> None:
"""订阅事件
Args:
event_type: 事件类型
callback: 回调函数
"""
if event_type not in self._listeners:
self._listeners[event_type] = []
self._listeners[event_type].append(callback)
def publish(self, event: IntegrationEvent) -> None:
"""发布事件
Args:
event: 事件对象
"""
listeners = self._listeners.get(event.type, [])
for callback in listeners:
try:
callback(event)
except Exception as e:
# 记录错误但继续执行其他回调
import logging
logger = logging.getLogger(__name__)
logger.error(
f"Error in event callback for {event.type}: {e}"
)
def unsubscribe(
self,
event_type: IntegrationEventType,
callback: Callable[[IntegrationEvent], None]
) -> None:
"""取消订阅
Args:
event_type: 事件类型
callback: 回调函数
"""
if event_type in self._listeners:
try:
self._listeners[event_type].remove(callback)
except ValueError:
pass
# 全局事件总线实例
_global_event_bus = None
def get_global_event_bus() -> IntegrationEventBus:
"""获取全局事件总线"""
global _global_event_bus
if _global_event_bus is None:
_global_event_bus = IntegrationEventBus()
return _global_event_bus
3. 事件使用示例
# 订阅事件
event_bus = get_global_event_bus()
def on_optimization_completed(event: IntegrationEvent):
"""优化完成事件处理"""
result = event.data.get("result")
print(f"Optimization completed: {result.best_score}")
event_bus.subscribe(
IntegrationEventType.OPTIMIZATION_COMPLETED,
on_optimization_completed
)
# 发布事件
event = IntegrationEvent(
type=IntegrationEventType.OPTIMIZATION_COMPLETED,
data={"result": optimization_result}
)
event_bus.publish(event)
接口版本: v1.0 最后更新: 2026-03-31