LingFlow Phase 4: 参数优化架构设计
版本: v1.0 日期: 2026-03-31 状态: 架构设计
目录
架构概述
Phase 4参数优化架构旨在将LingFlow的自优化能力从简单的网格搜索升级到智能参数优化系统。通过引入贝叶斯优化、参数持久化和多目标优化策略,实现更高效、更智能的参数调优。
架构分层
┌─────────────────────────────────────────────────────────────┐
│ 应用层 (CLI / Hooks) │
├─────────────────────────────────────────────────────────────┤
│ 优化协调层 │
│ ┌──────────────┐ ┌──────────────┐ ┌──────────────┐ │
│ │ 优化调度器 │ │ 参数管理器 │ │ 报告生成器 │ │
│ └──────────────┘ └──────────────┘ └──────────────┘ │
├─────────────────────────────────────────────────────────────┤
│ 算法核心层 │
│ ┌──────────────┐ ┌──────────────┐ ┌──────────────┐ │
│ │ 贝叶斯优化器 │ │ 多目标优化器 │ │ 敏感性分析器 │ │
│ └──────────────┘ └──────────────┘ └──────────────┘ │
├─────────────────────────────────────────────────────────────┤
│ 评估层 │
│ ┌──────────────┐ ┌──────────────┐ ┌──────────────┐ │
│ │ 结构评估器 │ │ 性能评估器 │ │ 简洁评估器 │ │
│ └──────────────┘ └──────────────┘ └──────────────┘ │
├─────────────────────────────────────────────────────────────┤
│ 持久化层 │
│ ┌──────────────┐ ┌──────────────┐ ┌──────────────┐ │
│ │ 参数存储 │ │ 历史记录 │ │ 缓存管理 │ │
│ └──────────────┘ └──────────────┘ └──────────────┘ │
└─────────────────────────────────────────────────────────────┘
设计目标
主要目标
- 智能优化: 从网格搜索升级到贝叶斯优化,减少50%以上的评估次数
- 参数持久化: 实现参数版本管理和最佳参数缓存
- 多目标优化: 同时优化代码质量、性能、简洁性
- 知识迁移: 支持跨项目的参数知识迁移
- 实时监控: 提供优化进度可视化和收敛性检测
非目标
- 不改变现有的评估器接口
- 不修改现有的触发器系统
- 不影响核心LingFlow工作流
当前状态分析
现有系统架构
当前LingFlow自优化系统(v3.8.0)包含:
优点: - 已实现结构、性能、简洁性三个评估器 - 支持LingMinOpt集成(带降级) - 进程隔离的优化器设计 - 完善的配置管理系统
限制: - 使用简单网格搜索或随机采样 - 无参数持久化机制 - 无参数版本管理 - 无多目标优化支持 - 无参数敏感性分析
关键性能指标
| 指标 | 当前值 | 目标值 |
|---|---|---|
| 192类项目优化时间 | ~120秒 | <60秒 |
| 参数评估次数 | 20-50次 | <15次 |
| 内存占用 | ~150MB | <200MB |
| 参数命中率 | N/A | >60% |
核心架构
架构原则
- 向后兼容: 保持现有API不变
- 渐进增强: 可选启用新功能
- 模块化: 各组件独立可测试
- 可扩展: 支持新算法和评估器
- 可观测: 完整的日志和指标
核心组件关系图
┌─────────────────────────────────────────────────────────────┐
│ OptimizationEngine │
│ ┌─────────────────────────────────────────────────────┐ │
│ │ OptimizationCoordinator │ │
│ │ - 管理优化生命周期 │ │
│ │ - 协调各个组件 │ │
│ │ - 处理优化请求 │ │
│ └──────────┬──────────────────┬──────────────────┬─────┘ │
│ │ │ │ │
│ ┌──────────▼──────┐ ┌───────▼────────┐ ┌─────▼──────┐ │
│ │ BayesianOptimizer│ │ParameterManager│ │MultiObjective│ │
│ │ │ │ │ │ Optimizer │ │
│ └──────────────────┘ └─────────────────┘ └─────────────┘ │
│ │ │ │ │
│ ┌──────────▼──────────────────▼──────────────────▼──────┐ │
│ │ ParameterStore │ │
│ │ - 参数版本管理 │ │
│ │ - 最佳参数缓存 │ │
│ │ - 知识迁移 │ │
│ └──────────────────────────────────────────────────────┘ │
└─────────────────────────────────────────────────────────────┘
参数优化算法
贝叶斯优化器设计
贝叶斯优化通过构建代理模型(高斯过程或TPE)来建模目标函数,从而用更少的评估次数找到最优参数。
核心算法选择
推荐方案: Optuna (TPE算法)
选择理由: 1. 成熟稳定,广泛使用 2. 支持并行优化 3. 内置剪枝和早停 4. 轻量级依赖 5. 活跃社区支持
备选方案: - Scikit-Optimize: 基于高斯过程,适合小规模搜索空间 - BoTorch: 适合大规模并行优化,但依赖较重
算法实现框架
import time
from abc import ABC, abstractmethod
from dataclasses import dataclass
from typing import Dict, Any, List, Optional, Callable

import numpy as np
@dataclass
class OptimizationTrial:
    """A single evaluation of one parameter set during an optimization run."""

    # Unique identifier, e.g. "trial_0", assigned by the optimizer.
    trial_id: str
    # The parameter values that were evaluated.
    params: Dict[str, Any]
    # Scalar objective value (lower is better throughout this design).
    score: float
    # Optional per-metric breakdown (e.g. "obj_0", "obj_1" in multi-objective runs).
    metrics: Dict[str, float]
    # Unix timestamp (seconds) when the trial was recorded.
    timestamp: float
    # True once the optimizer considered the run converged at this trial.
    converged: bool = False
@dataclass
class OptimizationState:
    """Snapshot of an optimization run's progress."""

    # Index of the trial currently being evaluated.
    current_trial: int
    # Best trial observed so far (lowest score, since scores are minimized).
    best_trial: OptimizationTrial
    # Convergence progress in [0, 1]; 1.0 means scores have fully stabilized.
    convergence_rate: float
    # Set to True when the run should terminate early.
    should_stop: bool = False
class BaseOptimizer(ABC):
    """Abstract interface shared by all parameter optimizers.

    Concrete subclasses implement the ask/tell loop: ``suggest`` proposes a
    parameter set, ``observe`` records the score obtained for it, and
    ``should_stop`` decides when the loop terminates.
    """

    def __init__(self, search_space: Dict[str, Any],
                 objective: Callable[[Dict[str, Any]], float],
                 config: Dict[str, Any]):
        # Search-space spec, objective callable and optimizer options are
        # stored as-is; the trial history starts out empty.
        self.search_space = search_space
        self.objective = objective
        self.config = config
        self.history: List[OptimizationTrial] = []

    @abstractmethod
    def suggest(self) -> Dict[str, Any]:
        """Propose the next parameter set to evaluate."""

    @abstractmethod
    def observe(self, params: Dict[str, Any], score: float) -> None:
        """Record the score obtained for a previously suggested parameter set."""

    @abstractmethod
    def should_stop(self) -> bool:
        """Return True when the optimization loop should terminate."""
class BayesianOptimizer(BaseOptimizer):
    """Bayesian optimizer backed by Optuna's TPE sampler.

    Implements an ask/tell loop: ``suggest`` asks the study for a new trial
    and ``observe`` reports the score back to that trial.  Scores are
    minimized.  Stops on convergence (insufficient recent improvement) or
    once ``n_trials`` evaluations have been made.
    """

    def __init__(
        self,
        search_space: Dict[str, Any],
        objective: Callable[[Dict[str, Any]], float],
        config: Dict[str, Any]
    ):
        super().__init__(search_space, objective, config)
        self.study = self._create_study()
        self.n_trials = config.get("n_trials", 50)
        self.timeout = config.get("timeout", 120)
        # Optuna trial handed out by the last suggest() call; observe() must
        # report the score against this exact trial object.
        self._pending_trial = None

    def _create_study(self):
        """Create the Optuna study (TPE sampler + median pruner)."""
        try:
            import optuna
            study = optuna.create_study(
                direction="minimize",
                sampler=optuna.samplers.TPESampler(
                    seed=self.config.get("seed", 42),
                    multivariate=True,
                    group=True
                ),
                pruner=optuna.pruners.MedianPruner(
                    n_startup_trials=5,
                    n_warmup_steps=10
                )
            )
            return study
        except ImportError:
            raise ImportError("Optuna未安装,请运行: pip install optuna")

    def suggest(self) -> Dict[str, Any]:
        """Ask the study for the next parameter set."""
        trial = self.study.ask()
        # Remember the trial so observe() can pair the score with it.
        self._pending_trial = trial
        params = {}
        for name, space in self.search_space.items():
            if space["type"] == "categorical":
                params[name] = trial.suggest_categorical(
                    name, space["choices"]
                )
            elif space["type"] == "int":
                params[name] = trial.suggest_int(
                    name, space["min"], space["max"]
                )
            elif space["type"] == "float":
                params[name] = trial.suggest_float(
                    name, space["min"], space["max"]
                )
        return params

    def observe(self, params: Dict[str, Any], score: float) -> None:
        """Report the score for the most recently suggested parameters."""
        # BUG FIX: Study.tell() requires the trial object returned by ask();
        # the original called self.study.tell(score), passing the score where
        # the trial is expected, which fails at runtime.
        if self._pending_trial is not None:
            self.study.tell(self._pending_trial, score)
            self._pending_trial = None
        # Keep a local record of the trial.
        trial = OptimizationTrial(
            trial_id=f"trial_{len(self.history)}",
            params=params,
            score=score,
            metrics={},
            timestamp=time.time()
        )
        self.history.append(trial)

    def should_stop(self) -> bool:
        """Stop on convergence or once the trial budget is exhausted."""
        # Require a minimum amount of history before judging convergence.
        if len(self.history) < 10:
            return False
        # Relative improvement across the last 10 trials.
        recent_scores = [t.score for t in self.history[-10:]]
        baseline = recent_scores[0]
        # BUG FIX: guard against division by zero when the baseline score is 0.
        if baseline != 0:
            improvement = (baseline - recent_scores[-1]) / baseline
            if improvement < self.config.get("min_improvement", 0.01):
                return True
        return len(self.history) >= self.n_trials

    def optimize(self) -> OptimizationTrial:
        """Run the full suggest/evaluate/observe loop; return the best trial."""
        while not self.should_stop():
            params = self.suggest()
            score = self.objective(params)
            self.observe(params, score)
        # Best = lowest score (idiomatic min() replaces the index dance).
        return min(self.history, key=lambda t: t.score)
多目标优化器
class MultiObjectiveOptimizer(BaseOptimizer):
    """Multi-objective optimizer that maintains a Pareto front.

    Each trial is scored on several objectives (all minimized); the scalar
    objective handed to the base class is their weighted sum.

    NOTE(review): suggest()/observe()/should_stop() are still abstract here,
    so this class cannot be instantiated as written — a concrete sampling
    strategy must be supplied; confirm the intended design.
    """

    def __init__(
        self,
        search_space: Dict[str, Any],
        objectives: List[Callable[[Dict[str, Any]], float]],
        weights: Optional[List[float]] = None,
        config: Dict[str, Any] = None
    ):
        # BUG FIX: the base-class objective used to be a constant
        # ``lambda x: 0`` so any code using self.objective always scored 0.
        # Pass the weighted-sum scorer instead (the bound method is only
        # invoked after the fields below have been assigned).
        super().__init__(search_space, self._weighted_score, config or {})
        self.objectives = objectives
        self.weights = weights or [1.0] * len(objectives)
        self.pareto_front: List[OptimizationTrial] = []

    def _weighted_score(self, params: Dict[str, Any]) -> float:
        """Scalarize all objectives into a single weighted sum."""
        scores = [obj(params) for obj in self.objectives]
        return sum(s * w for s, w in zip(scores, self.weights))

    def _update_pareto_front(self, trial: OptimizationTrial) -> None:
        """Insert a trial into the Pareto front unless it is dominated.

        Domination uses <= on every objective (metrics keys "obj_0",
        "obj_1", ...), so a trial whose metrics equal an existing entry's
        counts as dominated and is not added.
        """
        # Is the new trial dominated by any existing front member?
        dominated = False
        for existing in self.pareto_front:
            if all(
                existing.metrics.get(f"obj_{i}", float('inf')) <=
                trial.metrics.get(f"obj_{i}", float('inf'))
                for i in range(len(self.objectives))
            ):
                dominated = True
                break
        if not dominated:
            # Drop any existing entries that the new trial dominates.
            self.pareto_front = [
                t for t in self.pareto_front
                if not all(
                    trial.metrics.get(f"obj_{i}", float('inf')) <=
                    t.metrics.get(f"obj_{i}", float('inf'))
                    for i in range(len(self.objectives))
                )
            ]
            self.pareto_front.append(trial)

    def get_pareto_front(self) -> List[OptimizationTrial]:
        """Return the Pareto front sorted by scalar score (ascending)."""
        return sorted(self.pareto_front, key=lambda t: t.score)
敏感性分析器
class SensitivityAnalyzer:
    """One-at-a-time (OAT) parameter sensitivity analysis.

    Each parameter is perturbed in isolation while the others stay at their
    base values; the mean absolute score change, normalized by the base
    score, is reported as that parameter's sensitivity.
    """

    def __init__(
        self,
        search_space: Dict[str, Any],
        objective: Callable[[Dict[str, Any]], float],
        base_params: Dict[str, Any]
    ):
        self.search_space = search_space
        self.objective = objective
        self.base_params = base_params

    def analyze(
        self,
        n_samples: int = 100,
        method: str = "sobol"
    ) -> Dict[str, float]:
        """Compute per-parameter sensitivity scores.

        Args:
            n_samples: Upper bound on samples per numeric parameter
                (capped at 10).
            method: Currently ignored — only OAT perturbation is implemented
                despite the "sobol" default; NOTE(review): confirm whether a
                Sobol implementation is planned.

        Returns:
            Mapping of parameter name -> sensitivity score in [0, 1].
        """
        sensitivities = {}
        base_score = self.objective(self.base_params)
        for param_name, space in self.search_space.items():
            # Perturb one parameter at a time.
            perturbations = []
            if space["type"] == "categorical":
                # Try every category (the base value contributes a 0 delta).
                for value in space["choices"]:
                    params = self.base_params.copy()
                    params[param_name] = value
                    score = self.objective(params)
                    perturbations.append(abs(score - base_score))
            elif space["type"] in ("int", "float"):
                # Evenly sample the numeric range.
                values = np.linspace(space["min"], space["max"], min(10, n_samples))
                for value in values:
                    params = self.base_params.copy()
                    params[param_name] = value
                    score = self.objective(params)
                    perturbations.append(abs(score - base_score))
            # BUG FIX: np.mean([]) yields nan for unknown space types — treat
            # a parameter that could not be perturbed as zero-sensitivity.
            # abs() on the base score keeps the result non-negative even when
            # the objective returns negative values.
            if perturbations:
                sensitivity = np.mean(perturbations) / (abs(base_score) + 1e-6)
                sensitivities[param_name] = min(1.0, sensitivity)
            else:
                sensitivities[param_name] = 0.0
        return sensitivities
参数管理系统
参数持久化架构
from pathlib import Path
from typing import Dict, Any, List, Optional
from datetime import datetime
import hashlib
import json
class ParameterVersion:
    """A snapshot of one parameter set plus bookkeeping metadata.

    The canonical (sorted-keys) JSON form of the parameters drives both the
    version id and the checksum, so identical parameter sets always map to
    the same version id regardless of key order.
    """

    def __init__(self, params: Dict[str, Any], metadata: Dict[str, Any],
                 parent_version: Optional[str] = None):
        self.version_id = self._generate_id(params)
        self.params = params
        self.metadata = metadata
        self.parent_version = parent_version
        self.created_at = datetime.now()
        self.checksum = self._calculate_checksum(params)

    def _generate_id(self, params: Dict[str, Any]) -> str:
        """Derive a short, stable id: first 16 hex chars of SHA-256."""
        canonical = json.dumps(params, sort_keys=True)
        return hashlib.sha256(canonical.encode()).hexdigest()[:16]

    def _calculate_checksum(self, params: Dict[str, Any]) -> str:
        """MD5 checksum of the canonical JSON, used for integrity checks."""
        canonical = json.dumps(params, sort_keys=True)
        return hashlib.md5(canonical.encode()).hexdigest()
class ParameterStore:
    """Storage-backend abstraction for parameter versions.

    Concrete backends (file system, database, ...) override every method;
    this base class only defines the contract and raises on direct use.
    """

    def save(self, version: ParameterVersion) -> bool:
        """Persist a version; return True on success."""
        raise NotImplementedError

    def load(self, version_id: str) -> Optional[ParameterVersion]:
        """Return the stored version, or None if it does not exist."""
        raise NotImplementedError

    def list_versions(self, filter: Dict[str, Any] = None) -> List[ParameterVersion]:
        """List stored versions, optionally narrowed by a filter mapping.

        NOTE(review): the parameter name shadows the builtin ``filter``;
        kept unchanged for interface compatibility.
        """
        raise NotImplementedError

    def delete(self, version_id: str) -> bool:
        """Remove a version; return True on success."""
        raise NotImplementedError
class FileSystemParameterStore(ParameterStore):
    """Parameter store backed by JSON files under ``base_path``.

    Layout:
      base_path/index.json          -- lookup index (versions / by_project / by_goal / best)
      base_path/versions/<id>.json  -- one file per parameter version
    """

    def __init__(self, base_path: Path):
        self.base_path = base_path
        self.base_path.mkdir(parents=True, exist_ok=True)
        self.index_path = self.base_path / "index.json"
        self.versions_path = self.base_path / "versions"
        self.versions_path.mkdir(exist_ok=True)
        self._load_index()

    def _load_index(self):
        """Load the index from disk, or initialise an empty one."""
        if self.index_path.exists():
            with open(self.index_path, 'r') as f:
                self.index = json.load(f)
        else:
            self.index = {
                "versions": {},
                "by_project": {},
                "by_goal": {},
                "best": {}
            }

    def _save_index(self):
        """Write the index back to disk (default=str handles datetimes)."""
        with open(self.index_path, 'w') as f:
            json.dump(self.index, f, indent=2, default=str)

    def save(self, version: ParameterVersion) -> bool:
        """Persist one version file and update every index section."""
        # Write the version payload.
        version_file = self.versions_path / f"{version.version_id}.json"
        version_file.write_text(json.dumps({
            "params": version.params,
            "metadata": version.metadata,
            "parent_version": version.parent_version,
            "created_at": version.created_at.isoformat(),
            "checksum": version.checksum
        }, indent=2))
        # Update the index.
        project = version.metadata.get("project", "default")
        goal = version.metadata.get("goal", "unknown")
        self.index["versions"][version.version_id] = {
            "created_at": version.created_at.isoformat(),
            "checksum": version.checksum,
            "project": project,
            "goal": goal
        }
        # BUG FIX: saving the same version twice used to append duplicate ids
        # to the by_project / by_goal lists.
        project_ids = self.index["by_project"].setdefault(project, [])
        if version.version_id not in project_ids:
            project_ids.append(version.version_id)
        goal_ids = self.index["by_goal"].setdefault(goal, [])
        if version.version_id not in goal_ids:
            goal_ids.append(version.version_id)
        # Track the best (lowest-score) version per project/goal pair.
        best_key = f"{project}:{goal}"
        if best_key not in self.index["best"]:
            self.index["best"][best_key] = version.version_id
        else:
            # Compare scores; missing scores count as +inf (never win).
            existing_version = self.load(self.index["best"][best_key])
            if existing_version and version.metadata.get("score", float('inf')) < existing_version.metadata.get("score", float('inf')):
                self.index["best"][best_key] = version.version_id
        self._save_index()
        return True

    def load(self, version_id: str) -> Optional[ParameterVersion]:
        """Load a version file, or return None if it does not exist.

        NOTE(review): reconstructing via ParameterVersion.__init__ recomputes
        created_at (set to "now") and the checksum, so the creation time
        stored on disk is not restored — confirm whether that matters.
        """
        version_file = self.versions_path / f"{version_id}.json"
        if not version_file.exists():
            return None
        data = json.loads(version_file.read_text())
        return ParameterVersion(
            params=data["params"],
            metadata=data["metadata"],
            parent_version=data.get("parent_version")
        )

    def get_best_params(self, project: str, goal: str) -> Optional[Dict[str, Any]]:
        """Return the best known params for (project, goal), or None."""
        best_key = f"{project}:{goal}"
        version_id = self.index["best"].get(best_key)
        if version_id:
            version = self.load(version_id)
            return version.params if version else None
        return None
参数缓存管理
from functools import lru_cache
from typing import Tuple
class ParameterCache:
    """Bounded in-memory cache mapping (params, context) -> evaluation score.

    Entries are evicted least-recently-used once ``max_size`` is reached.
    """

    def __init__(self, max_size: int = 1000):
        self.max_size = max_size
        # A plain dict preserves insertion order (Python 3.7+); get() re-inserts
        # hit entries so iteration order doubles as LRU order.
        self._cache = {}

    def _make_key(self, params: Dict[str, Any], context: str) -> str:
        """Build a cache key.

        BUG FIX: the key now keeps the context as a plain prefix so that
        invalidate(context) can match it; previously the context was hashed
        into an opaque MD5 digest and startswith() could never match.
        """
        param_str = json.dumps(params, sort_keys=True)
        digest = hashlib.md5(f"{context}:{param_str}".encode()).hexdigest()
        return f"{context}:{digest}"

    def get(self, params: Dict[str, Any], context: str) -> Optional[float]:
        """Return the cached score, or None on a miss."""
        key = self._make_key(params, context)
        if key in self._cache:
            # BUG FIX: refresh recency on hit so eviction is LRU, not FIFO.
            self._cache[key] = self._cache.pop(key)
            return self._cache[key]
        return None

    def set(self, params: Dict[str, Any], context: str, result: float) -> None:
        """Store a score, evicting the least-recently-used entry if full."""
        key = self._make_key(params, context)
        if key in self._cache:
            # Overwrite: re-insert to refresh recency.
            del self._cache[key]
        elif len(self._cache) >= self.max_size:
            # Oldest entry = first in insertion/recency order.
            oldest = next(iter(self._cache))
            del self._cache[oldest]
        self._cache[key] = result

    def invalidate(self, context: str = None) -> None:
        """Drop all entries for ``context``, or everything when None."""
        if context:
            prefix = f"{context}:"
            keys_to_delete = [k for k in self._cache if k.startswith(prefix)]
            for key in keys_to_delete:
                del self._cache[key]
        else:
            self._cache.clear()
知识迁移
class KnowledgeTransfer:
    """Transfers tuned parameters from one project to a similar one."""

    def __init__(self, store: ParameterStore):
        self.store = store

    def transfer(
        self,
        source_project: str,
        target_project: str,
        goal: str,
        similarity_threshold: float = 0.7
    ) -> Optional[Dict[str, Any]]:
        """Copy the best params from a source project to a target project.

        Args:
            source_project: Name of the source project.
            target_project: Name of the target project.
            goal: Optimization goal the params were tuned for.
            similarity_threshold: Minimum similarity required for transfer.

        Returns:
            Adjusted parameters, or None when the source has no stored
            params or the projects are not similar enough.
        """
        # Guard clauses replace the original nested conditionals.
        source_params = self.store.get_best_params(source_project, goal)
        if not source_params:
            return None
        if self._calculate_similarity(source_project, target_project) < similarity_threshold:
            return None
        return self._adjust_parameters(source_params, source_project, target_project)

    def _calculate_similarity(self, project1: str, project2: str) -> float:
        """Similarity score between two projects.

        Placeholder: returns a fixed 0.8.  A real implementation could
        compare code structure, tech stack and project size.
        """
        return 0.8

    def _adjust_parameters(
        self,
        params: Dict[str, Any],
        source_project: str,
        target_project: str
    ) -> Dict[str, Any]:
        """Adapt source params for the target project.

        Placeholder: returns an unmodified copy; a real implementation would
        scale parameters by project size etc.
        """
        return params.copy()
评估框架
评估指标设计
from dataclasses import dataclass
from typing import Dict, Any, List
from enum import Enum
class MetricType(Enum):
    """Categories of evaluation metrics."""

    STRUCTURE = "structure"      # structural quality (class size, coupling, ...)
    PERFORMANCE = "performance"  # runtime / performance measurements
    SIMPLICITY = "simplicity"    # code simplicity / complexity scores
    QUALITY = "quality"          # overall quality (appears unused elsewhere in this document)
@dataclass
class Metric:
    """A single named evaluation measurement."""

    # Metric identifier, e.g. "structure_violations".
    name: str
    # Measured value; lower is better by default.
    value: float
    # Which evaluation dimension this metric belongs to.
    type: MetricType
    # Relative weight when aggregating into an overall score.
    weight: float = 1.0
    # False means lower values are better (the default across this design).
    higher_is_better: bool = False
@dataclass
class EvaluationResult:
    """Aggregated outcome of evaluating one parameter set."""

    # Metric name -> Metric instance.
    metrics: Dict[str, Metric]
    # Weighted average of the individual metric scores (lower is better).
    overall_score: float
    # Whether the evaluation is considered converged.
    converged: bool
    # Extra context, e.g. the evaluated params.
    metadata: Dict[str, Any]
class EvaluationFramework:
    """Combines individual evaluators into one weighted overall score."""

    # (evaluator key, metric name, metric type) for each supported dimension;
    # the evaluator key doubles as the goal name and the weight-config key.
    _DIMENSIONS = (
        ("structure", "structure_violations", MetricType.STRUCTURE),
        ("performance", "execution_time", MetricType.PERFORMANCE),
        ("simplicity", "complexity_score", MetricType.SIMPLICITY),
    )

    def __init__(self, config: Dict[str, Any]):
        self.config = config
        self.metrics_weights = config.get("metrics_weights", {})
        self.convergence_threshold = config.get("convergence_threshold", 0.01)

    def evaluate(
        self,
        params: Dict[str, Any],
        evaluators: Dict[str, Any],
        goal: str
    ) -> EvaluationResult:
        """Run every evaluator relevant to ``goal`` and aggregate the scores.

        Args:
            params: Parameter configuration to evaluate.
            evaluators: Mapping of dimension key -> evaluator object with an
                ``evaluate(params)`` method returning a float.
            goal: One of "structure", "performance", "simplicity" or "all".

        Returns:
            An EvaluationResult whose overall_score is the weighted mean of
            the contributing metrics (0 when nothing was evaluated).
        """
        metrics = {}
        weighted_scores = []
        # REFACTOR: the three copy-pasted evaluator branches collapsed into
        # one data-driven loop; behavior is unchanged.
        for key, metric_name, metric_type in self._DIMENSIONS:
            if key in evaluators and goal in (key, "all"):
                value = evaluators[key].evaluate(params)
                metric = Metric(
                    name=metric_name,
                    value=value,
                    type=metric_type,
                    weight=self.metrics_weights.get(key, 1.0)
                )
                metrics[metric_name] = metric
                weighted_scores.append(metric.value * metric.weight)
        # Weighted mean (0 if no evaluator contributed).
        overall_score = sum(weighted_scores) / len(weighted_scores) if weighted_scores else 0
        converged = self._check_convergence(metrics)
        return EvaluationResult(
            metrics=metrics,
            overall_score=overall_score,
            converged=converged,
            metadata={"params": params}
        )

    def _check_convergence(self, metrics: Dict[str, Metric]) -> bool:
        """Convergence check placeholder.

        Always False for now; a real implementation needs score history.
        """
        return False
A/B测试框架
class ABTestFramework:
    """Statistical A/B comparison between two parameter sets.

    Each side is evaluated ``min_samples`` times and the score samples are
    compared with an independent two-sample t-test.
    """

    def __init__(
        self,
        evaluator: EvaluationFramework,
        min_samples: int = 5,
        confidence_level: float = 0.95
    ):
        self.evaluator = evaluator
        self.min_samples = min_samples
        self.confidence_level = confidence_level

    def compare(
        self,
        params_a: Dict[str, Any],
        params_b: Dict[str, Any],
        evaluators: Dict[str, Any],
        goal: str
    ) -> Dict[str, Any]:
        """Compare two parameter sets.

        Returns:
            A dict with both means, B's relative improvement over A, the
            t-test p-value, whether the difference is significant at the
            configured confidence level, and the winner ("a"/"b"; lower mean
            wins since scores are minimized).
        """
        # Evaluate A repeatedly.
        scores_a = []
        for _ in range(self.min_samples):
            result = self.evaluator.evaluate(params_a, evaluators, goal)
            scores_a.append(result.overall_score)
        # Evaluate B repeatedly.
        scores_b = []
        for _ in range(self.min_samples):
            result = self.evaluator.evaluate(params_b, evaluators, goal)
            scores_b.append(result.overall_score)
        # Two-sample t-test (local import keeps scipy an optional dependency).
        from scipy import stats
        t_stat, p_value = stats.ttest_ind(scores_a, scores_b)
        mean_a = np.mean(scores_a)
        mean_b = np.mean(scores_b)
        # BUG FIX: guard the relative improvement against a zero baseline,
        # which previously produced inf/nan.
        improvement = (mean_a - mean_b) / mean_a if mean_a else 0.0
        return {
            "params_a": params_a,
            "params_b": params_b,
            "mean_a": mean_a,
            "mean_b": mean_b,
            "improvement": improvement,
            "p_value": p_value,
            "significant": p_value < (1 - self.confidence_level),
            "winner": "a" if mean_a < mean_b else "b"
        }
收敛性检测
class ConvergenceDetector:
    """Detects when an optimization run has stopped improving.

    A run counts as converged once the relative spread (std / mean) of the
    last ``window_size`` scores drops below ``threshold``, but never before
    ``min_trials`` scores have been observed.
    """

    def __init__(self, window_size: int = 10, threshold: float = 0.01,
                 min_trials: int = 20):
        self.window_size = window_size
        self.threshold = threshold
        self.min_trials = min_trials
        self.history: List[float] = []

    def update(self, score: float) -> bool:
        """Record a new score and return True once the run has converged."""
        self.history.append(score)
        if len(self.history) < self.min_trials:
            return False
        window = self.history[-self.window_size:]
        spread = np.std(window)
        center = np.mean(window)
        # The 1e-6 keeps the ratio finite for an all-zero window.
        return spread / (center + 1e-6) < self.threshold

    def get_convergence_rate(self) -> float:
        """Return convergence progress in [0, 1]; 1.0 means fully stable."""
        if len(self.history) < self.window_size:
            return 0.0
        window = self.history[-self.window_size:]
        ratio = np.std(window) / (np.mean(window) + 1e-6)
        return 1.0 - min(1.0, ratio)
技术栈选型
核心依赖
| 库名 | 版本 | 用途 | 优先级 |
|---|---|---|---|
| optuna | >=3.0 | 贝叶斯优化 (TPE) | 必须 |
| scikit-optimize | >=0.9 | 备选优化算法 | 可选 |
| numpy | >=1.20 | 数值计算 | 必须 |
| scipy | >=1.7 | 统计检验 | 推荐 |
| pyyaml | >=5.4 | 配置文件 | 必须 |
可选依赖
| 库名 | 版本 | 用途 | 优先级 |
|---|---|---|---|
| plotly | >=5.0 | 可视化 | 推荐 |
| rich | >=12.0 | 终端输出 | 可选 |
| sqlalchemy | >=1.4 | 数据库存储 | 可选 |
依赖安装
# 核心依赖
pip install optuna numpy scipy pyyaml
# 可选依赖
pip install scikit-optimize plotly rich
# 开发依赖
pip install pytest pytest-cov mypy black
接口设计
优化引擎主接口
from typing import Dict, Any, List, Optional, Callable
from dataclasses import dataclass
from pathlib import Path
@dataclass
class OptimizationRequest:
    """Describes one optimization job handed to the OptimizationEngine."""

    # --- Target ---
    # Path of the code base to optimize.
    target_path: str
    # Optimization goal: "structure", "performance", "simplicity" or "all".
    goal: str
    # --- Search space ---
    # Parameter name -> space spec ({"type": ..., "min"/"max" or "choices"}).
    search_space: Dict[str, Any]
    # --- Constraints ---
    constraints: Optional[Dict[str, Any]] = None
    # --- Budget / feature switches ---
    max_time: float = 120  # wall-clock budget in seconds
    max_trials: int = 50
    early_stopping: bool = True
    enable_cache: bool = True
    enable_transfer: bool = True
    # --- Metadata ---
    project_name: str = "default"
    experiment_name: Optional[str] = None
@dataclass
class OptimizationResult:
    """Outcome of one optimization run."""

    # --- Best parameters found ---
    best_params: Dict[str, Any]
    best_score: float
    # --- Statistics ---
    n_trials: int
    total_time: float  # seconds
    # --- Trial history (one dict per trial: params, score, timestamp) ---
    trials: List[Dict[str, Any]]
    # --- Convergence info ---
    converged: bool
    convergence_rate: float
    # --- Sensitivity analysis (param name -> sensitivity in [0, 1]) ---
    sensitivities: Optional[Dict[str, float]] = None
    # --- Pareto front (multi-objective runs only) ---
    pareto_front: Optional[List[Dict[str, Any]]] = None
    # --- Error message, None on success ---
    error: Optional[str] = None
class OptimizationEngine:
    """Main façade wiring together optimizer, cache, store and evaluators.

    NOTE(review): this class is a design sketch — several calls below use
    ``...`` placeholders or reference helpers that are not defined in this
    document; see inline notes.
    """

    def __init__(
        self,
        config: Dict[str, Any] = None,
        store: ParameterStore = None
    ):
        self.config = config or {}
        # Default store lives under ~/.lingflow/parameters.
        self.store = store or FileSystemParameterStore(
            Path.home() / ".lingflow" / "parameters"
        )
        # Initialize the collaborating components.
        self.cache = ParameterCache()
        self.evaluator = EvaluationFramework(self.config)
        self.convergence_detector = ConvergenceDetector()
        self.transfer = KnowledgeTransfer(self.store)

    def optimize(self, request: OptimizationRequest) -> OptimizationResult:
        """Run a full optimization job.

        Args:
            request: The optimization request.

        Returns:
            The optimization result.
        """
        # 1. Try transferring parameters from a similar project.
        if request.enable_transfer:
            transferred_params = self.transfer.transfer(
                # NOTE(review): placeholder — transfer() expects a source
                # project name; "auto-select a similar project" is not
                # implemented yet.
                source_project=None,
                target_project=request.project_name,
                goal=request.goal
            )
            if transferred_params:
                # Use the transferred params as a warm start (not yet implemented).
                pass
        # 2. Select an optimizer.
        # NOTE(review): constructor arguments are elided in this sketch.
        if request.goal == "all":
            optimizer = MultiObjectiveOptimizer(...)
        else:
            optimizer = BayesianOptimizer(...)
        # 3. Run the suggest/evaluate/observe loop.
        trials = []
        start_time = time.time()
        while not self._should_stop(request, trials, start_time):
            # Ask the optimizer for the next parameter set.
            params = optimizer.suggest()
            # Consult the evaluation cache first.
            # NOTE(review): _make_cache_key is not defined in this document.
            cache_key = self._make_cache_key(params, request)
            if request.enable_cache:
                cached_score = self.cache.get(params, cache_key)
                if cached_score is not None:
                    score = cached_score
                else:
                    score = self._evaluate(params, request)
                    self.cache.set(params, cache_key, score)
            else:
                score = self._evaluate(params, request)
            # Feed the observation back to the optimizer.
            optimizer.observe(params, score)
            # Record the trial.
            trials.append({
                "params": params,
                "score": score,
                "timestamp": time.time() - start_time
            })
            # Stop early once scores have stabilized.
            if self.convergence_detector.update(score):
                break
        # 4. Sensitivity analysis around the best parameters.
        sensitivities = self._analyze_sensitivity(
            optimizer, request
        )
        # 5. Persist the outcome.
        # NOTE(review): _save_result is not defined in this document.
        self._save_result(request, trials)
        # NOTE(review): best_params/best_score attributes are not defined on
        # the optimizer classes sketched above — confirm before implementing.
        return OptimizationResult(
            best_params=optimizer.best_params,
            best_score=optimizer.best_score,
            n_trials=len(trials),
            total_time=time.time() - start_time,
            trials=trials,
            converged=self.convergence_detector.get_convergence_rate() > 0.9,
            convergence_rate=self.convergence_detector.get_convergence_rate(),
            sensitivities=sensitivities
        )

    def _evaluate(
        self,
        params: Dict[str, Any],
        request: OptimizationRequest
    ) -> float:
        """Score one parameter set via the evaluation framework."""
        # Pick the evaluators relevant to the request.
        # NOTE(review): _get_evaluators is not defined in this document.
        evaluators = self._get_evaluators(request)
        # Aggregate into a single score.
        result = self.evaluator.evaluate(
            params, evaluators, request.goal
        )
        return result.overall_score

    def _should_stop(
        self,
        request: OptimizationRequest,
        trials: List[Dict],
        start_time: float
    ) -> bool:
        """Stop when either the time or the trial budget is exhausted."""
        # Wall-clock limit.
        if time.time() - start_time >= request.max_time:
            return True
        # Trial-count limit.
        if len(trials) >= request.max_trials:
            return True
        return False

    def _analyze_sensitivity(
        self,
        optimizer: BaseOptimizer,
        request: OptimizationRequest
    ) -> Dict[str, float]:
        """Estimate per-parameter sensitivity around the best params found."""
        analyzer = SensitivityAnalyzer(
            search_space=request.search_space,
            objective=lambda p: self._evaluate(p, request),
            base_params=optimizer.best_params
        )
        return analyzer.analyze(n_samples=50)
CLI接口
import click

@click.group()
def optimize():
    """Parameter optimization commands."""
    pass

@optimize.command()
@click.option("--target", "-t", default=".", help="目标路径")
@click.option("--goal", "-g", type=click.Choice(["structure", "performance", "simplicity", "all"]),
              default="structure", help="优化目标")
@click.option("--max-time", type=float, default=120, help="最大时间(秒)")
@click.option("--max-trials", type=int, default=50, help="最大试验次数")
@click.option("--output", "-o", help="输出报告路径")
@click.option("--cache/--no-cache", default=True, help="启用缓存")
@click.option("--transfer/--no-transfer", default=True, help="启用知识迁移")
@click.option("--verbose", "-v", is_flag=True, help="详细输出")
def run(target, goal, max_time, max_trials, output, cache, transfer, verbose):
    """Run a parameter optimization job."""
    from lingflow.self_optimizer.phase4 import OptimizationEngine, OptimizationRequest
    # Build the request from the CLI options.
    # NOTE(review): _get_default_search_space, _print_result and _save_report
    # are not defined in this document — confirm where they live.
    request = OptimizationRequest(
        target_path=target,
        goal=goal,
        search_space=_get_default_search_space(goal),
        max_time=max_time,
        max_trials=max_trials,
        enable_cache=cache,
        enable_transfer=transfer
    )
    # Create the engine with default config and store.
    engine = OptimizationEngine()
    # Run the optimization behind a progress bar.
    with click.progressbar(length=100, label="优化进度") as bar:
        result = engine.optimize(request)
        bar.update(100)
    # Print the result summary.
    _print_result(result, verbose)
    # Optionally save a report file.
    if output:
        _save_report(result, output)

@optimize.command()
@click.option("--project", "-p", help="项目名称")
@click.option("--goal", "-g", help="优化目标")
def best(project, goal):
    """Show the best stored parameters for a project/goal."""
    from lingflow.self_optimizer.phase4 import FileSystemParameterStore
    # NOTE(review): Path is not imported in this snippet — confirm the import.
    store = FileSystemParameterStore(Path.home() / ".lingflow" / "parameters")
    params = store.get_best_params(project or "default", goal or "structure")
    if params:
        click.echo("最佳参数:")
        for key, value in sorted(params.items()):
            click.echo(f"  {key}: {value}")
    else:
        click.echo("未找到保存的参数")

@optimize.command()
@click.option("--format", type=click.Choice(["json", "yaml", "table"]), default="table")
def history(format):
    """Show optimization history (not yet implemented)."""
    pass

@optimize.command()
@click.option("--goal", "-g", required=True, help="优化目标")
@click.option("--output", "-o", required=True, help="输出路径")
def export(goal, output):
    """Export the best parameter configuration (not yet implemented)."""
    pass
集成策略
与现有系统集成
1. 保持向后兼容
# lingflow/self_optimizer/__init__.py
from lingflow.self_optimizer.optimizer import (
ProcessIsolatedOptimizer,
SynchronousOptimizer,
OptimizationRequest as LegacyRequest,
OptimizationResult as LegacyResult
)
# 新的Phase 4接口
from lingflow.self_optimizer.phase4 import (
OptimizationEngine,
OptimizationRequest,
OptimizationResult
)
# Adapter: converts the legacy optimizer interface to the Phase 4 interface.
class LegacyOptimizerAdapter:
    """Exposes the legacy optimize() API on top of the Phase 4 engine.

    With use_phase4=True requests are translated to the new
    OptimizationRequest and results mapped back to LegacyResult; otherwise
    the old SynchronousOptimizer is used unchanged.
    """

    def __init__(self, use_phase4: bool = True):
        self.use_phase4 = use_phase4
        if use_phase4:
            self.engine = OptimizationEngine()
        else:
            self.optimizer = SynchronousOptimizer()

    def optimize(self, request: LegacyRequest) -> LegacyResult:
        """Adapt the legacy optimize() call."""
        if self.use_phase4:
            # Translate the legacy request into a Phase 4 request.
            # NOTE(review): OptimizationRequest as defined above has no
            # ``config`` field, and _convert_search_space is not defined in
            # this document — confirm both before implementation.
            new_request = OptimizationRequest(
                target_path=request.target,
                goal=request.goal,
                search_space=_convert_search_space(request.goal),
                config=request.config
            )
            # Run the new optimizer.
            new_result = self.engine.optimize(new_request)
            # Map the Phase 4 result back onto the legacy result shape.
            return LegacyResult(
                success=new_result.error is None,
                best_params=new_result.best_params,
                best_score=new_result.best_score,
                experiments=new_result.n_trials,
                duration=new_result.total_time,
                history=new_result.trials
            )
        else:
            # Fall back to the legacy optimizer.
            return self.optimizer.optimize(request)
2. 配置文件扩展
# ~/.lingflow/config.yaml
# 现有配置保持不变...
# Phase 4 新增配置
phase4:
enabled: true
optimizer:
algorithm: "bayesian" # bayesian, random, grid
backend: "optuna" # optuna, skopt
search_spaces:
structure:
max_class_size:
type: "int"
min: 100
max: 500
step: 50
max_method_count:
type: "categorical"
choices: [10, 15, 20, 25]
cache:
enabled: true
max_size: 1000
ttl: 86400 # 24小时
transfer:
enabled: true
similarity_threshold: 0.7
convergence:
window_size: 10
threshold: 0.01
min_trials: 20
3. Hook集成
# Integrate Phase 4 optimization into the existing hook system.
def post_review_hook(context):
    """Post-code-review hook: runs Phase 4 optimization when enabled.

    Falls back to the legacy quick_optimize path when phase4.enabled is
    False or missing in the global config.
    """
    # Check whether Phase 4 is enabled in the global configuration.
    config = get_global_config()
    if config.get("phase4.enabled", False):
        from lingflow.self_optimizer.phase4 import OptimizationEngine
        engine = OptimizationEngine()
        # ... run the optimization (elided in this design sketch)
    else:
        # Use the legacy optimizer.
        from lingflow.self_optimizer import quick_optimize
        quick_optimize(...)
实施路线图
阶段1: 基础架构 (Week 1-2)
目标: 建立核心架构和基础组件
任务:
1. 创建新模块结构 lingflow/self_optimizer/phase4/
2. 实现参数存储后端(文件系统)
3. 实现参数缓存机制
4. 编写单元测试
交付物:
- lingflow/self_optimizer/phase4/__init__.py
- lingflow/self_optimizer/phase4/storage.py
- lingflow/self_optimizer/phase4/cache.py
- 单元测试覆盖率 >80%
验收标准: - 可以保存和加载参数版本 - 缓存命中率 >50% - 所有测试通过
阶段2: 贝叶斯优化器 (Week 3-4)
目标: 集成Optuna,实现智能优化
任务: 1. 集成Optuna库 2. 实现BayesianOptimizer类 3. 实现搜索空间定义 4. 实现收敛性检测
交付物:
- lingflow/self_optimizer/phase4/optimizer.py
- lingflow/self_optimizer/phase4/search_space.py
- 集成测试
验收标准: - 优化时间减少 >50% - 参数质量提升 >20% - 与现有评估器兼容
阶段3: 多目标与敏感性分析 (Week 5-6)
目标: 实现高级优化功能
任务: 1. 实现MultiObjectiveOptimizer 2. 实现SensitivityAnalyzer 3. 实现KnowledgeTransfer 4. 实现ABTestFramework
交付物:
- lingflow/self_optimizer/phase4/multi_objective.py
- lingflow/self_optimizer/phase4/sensitivity.py
- lingflow/self_optimizer/phase4/transfer.py
- lingflow/self_optimizer/phase4/ab_test.py
验收标准: - Pareto前沿正确计算 - 敏感性分析准确 - 跨项目迁移可用
阶段4: 集成与CLI (Week 7-8)
目标: 完成集成和用户界面
任务: 1. 实现OptimizationEngine主类 2. 实现CLI命令 3. 实现报告生成 4. 编写文档
交付物:
- lingflow/self_optimizer/phase4/engine.py
- lingflow/cli.py (更新)
- docs/phase4-user-guide.md
- 集成测试
验收标准: - CLI命令完整可用 - 向后兼容性100% - 文档完整
阶段5: 优化与部署 (Week 9-10)
目标: 性能优化和生产部署
任务: 1. 性能优化 2. 内存优化 3. 生产环境测试 4. 发布准备
交付物: - 性能测试报告 - 部署指南 - v3.9.0发布(当前系统已是v3.8.0,Phase 4应随下一版本号发布)
验收标准: - 192类项目优化 <60秒 - 内存占用 <200MB - 零重大bug
性能约束
时间约束
| 项目规模 | 类数量 | 目标时间 | 当前时间 | 改进 |
|---|---|---|---|---|
| 小型 | <50 | <20秒 | ~30秒 | 33% |
| 中型 | 50-150 | <45秒 | ~90秒 | 50% |
| 大型 | 150-200 | <60秒 | ~120秒 | 50% |
内存约束
- 峰值内存: <200MB
- 缓存大小: <50MB
- 参数历史: <10MB
质量约束
- 参数命中率: >60%
- 收敛准确率: >85%
- 跨项目迁移成功率: >50%
附录
A. 搜索空间示例
# Default search spaces per optimization goal.  Each entry maps a parameter
# name to a space spec: {"type": "int"/"float", "min", "max"[, "step"]} or
# {"type": "categorical", "choices": [...]}.
# NOTE(review): "step" is not consumed by BayesianOptimizer.suggest() as
# sketched above — confirm whether it should be forwarded to Optuna.
DEFAULT_SEARCH_SPACES = {
    "structure": {
        "max_class_size": {
            "type": "int",
            "min": 100,
            "max": 500,
            "step": 50
        },
        "max_method_count": {
            "type": "categorical",
            "choices": [10, 15, 20, 25]
        },
        "max_complexity": {
            "type": "int",
            "min": 5,
            "max": 20,
            "step": 5
        },
        "coupling_limit": {
            "type": "float",
            "min": 5.0,
            "max": 15.0,
            "step": 1.0
        }
    },
    "performance": {
        "cache_size": {
            "type": "categorical",
            "choices": [10, 50, 100, 500]
        },
        "parallelism": {
            "type": "int",
            "min": 1,
            "max": 4,
            "step": 1
        },
        "timeout": {
            "type": "categorical",
            "choices": [5, 10, 30, 60]
        }
    },
    "simplicity": {
        "complexity_threshold": {
            "type": "int",
            "min": 5,
            "max": 15,
            "step": 5
        },
        "duplication_penalty": {
            "type": "float",
            "min": 0.5,
            "max": 2.0,
            "step": 0.25
        },
        "max_line_length": {
            "type": "categorical",
            "choices": [80, 100, 120]
        }
    }
}
B. 配置文件示例
# ~/.lingflow/phase4-config.yaml
# 优化器配置
optimizer:
# 算法选择: bayesian, random, grid
algorithm: bayesian
# 后端选择: optuna, skopt
backend: optuna
# 并行配置
n_jobs: 1
timeout: 120
n_trials: 50
# 早停配置
early_stopping: true
patience: 10
min_improvement: 0.01
# 搜索空间配置
search_spaces:
structure:
max_class_size: {min: 100, max: 500, step: 50}
max_method_count: {choices: [10, 15, 20, 25]}
max_complexity: {min: 5, max: 20, step: 5}
# 缓存配置
cache:
enabled: true
max_size: 1000
ttl: 86400 # 24小时
# 知识迁移配置
transfer:
enabled: true
similarity_threshold: 0.7
max_sources: 5
# 收敛检测配置
convergence:
window_size: 10
threshold: 0.01
min_trials: 20
# 日志配置
logging:
level: INFO
file: ~/.lingflow/phase4.log
C. API参考
# Quick start example
from lingflow.self_optimizer.phase4 import OptimizationEngine, OptimizationRequest

# Build an engine with default storage and configuration.
engine = OptimizationEngine()
# Describe the job: optimize the project's structure within 60 seconds.
request = OptimizationRequest(
    target_path="./my_project",
    goal="structure",
    search_space={"max_class_size": {"type": "int", "min": 100, "max": 500}},
    max_time=60
)
result = engine.optimize(request)
print(f"最佳参数: {result.best_params}")
print(f"最佳分数: {result.best_score}")
文档版本: v1.0 最后更新: 2026-03-31 维护者: LingFlow团队