双/多工程流系统 - 快速开始指南
版本: v1.0 最后更新: 2026-03-31 状态: ✅ 可用
📚 文档导航
| 文档 | 说明 |
|---|---|
| MULTI_WORKFLOW_DESIGN.md | 完整设计文档 |
| multi_workflow.py | 核心实现 |
| 本文档 | 快速开始指南 |
🎯 5分钟上手
1. 基础概念
双工程流: 快速流(YOLO模式)+ 稳定流(生产就绪) 多工程流: 开发、测试、文档、优化、审查、部署等专业分工
2. 最简示例
# quick_start.py
import asyncio
from lingflow.workflow.multi_workflow import (
MultiWorkflowCoordinator,
FastTrackWorkflow,
StableTrackWorkflow,
ExecutionStrategy
)
async def main():
# 创建协调器
coordinator = MultiWorkflowCoordinator(max_parallel_workflows=2)
# 创建双工程流
fast = FastTrackWorkflow("fast_demo")
stable = StableTrackWorkflow("stable_demo")
# 注册
coordinator.register_workflow(fast)
coordinator.register_workflow(stable)
# 执行
results = await coordinator.execute_all(
strategy=ExecutionStrategy.PARALLEL
)
# 结果
for wf_id, result in results.items():
print(f"{wf_id}: {result}")
asyncio.run(main())
运行:
🚀 常见用法
用法1: YOLO模式(快速工程流)
from lingflow.workflow.multi_workflow import FastTrackWorkflow
from lingflow.common.models import Task, TaskPriority
# 创建快速流
fast_track = FastTrackWorkflow("yolo_feature")
# 添加任务
fast_track.add_task(Task(
task_id="quick_test",
name="Quick Test",
description="Skip full tests, just syntax check",
priority=TaskPriority.HIGH,
agent_type="testing"
))
# 执行
result = await fast_track.execute({})
# 提交(自动跳过hooks)
if result.success:
print("✅ YOLO commit complete!")
用法2: 生产发布(稳定工程流)
from lingflow.workflow.multi_workflow import StableTrackWorkflow
stable = StableTrackWorkflow("production_release")
# 添加完整测试任务
stable.add_task(Task(
task_id="full_suite",
name="Full Test Suite",
description="Complete testing with 70% coverage",
agent_type="testing"
))
stable.add_task(Task(
task_id="code_review",
name="Code Review",
description="Senior developer review required",
agent_type="review"
))
# 执行(包含所有质量检查)
result = await stable.execute({})
if result.success:
print("✅ Production ready!")
用法3: 多团队并行
from lingflow.workflow.multi_workflow import (
MultiWorkflowCoordinator,
DevWorkflow,
TestWorkflow,
DocWorkflow,
ExecutionStrategy
)
coordinator = MultiWorkflowCoordinator(max_parallel_workflows=3)
# 开发流
dev = DevWorkflow("feature_dev")
dev.add_task(Task(task_id="code", agent_type="implementation"))
# 测试流(依赖开发)
test = TestWorkflow("feature_test")
test.add_dependency("feature_dev")
test.add_task(Task(task_id="test", agent_type="testing"))
# 文档流(依赖开发)
doc = DocWorkflow("feature_doc")
doc.add_dependency("feature_dev")
doc.add_task(Task(task_id="doc", agent_type="documentation"))
# 注册并执行
for wf in [dev, test, doc]:
coordinator.register_workflow(wf)
# 并行执行测试和文档
results = await coordinator.execute_all(
strategy=ExecutionStrategy.HYBRID
)
用法4: 工程流提升
# 快速验证
fast = FastTrackWorkflow("prototype")
result = await fast.execute({})
# 提升到稳定流
if result.success and fast.can_promote_to(WorkflowType.STABLE):
stable = coordinator.promote_workflow(
from_workflow_id="prototype",
to_type=WorkflowType.STABLE,
new_workflow_id="production"
)
# 运行完整测试
result = await stable.execute({})
⚙️ 配置选项
WorkflowConfig 参数
from lingflow.workflow.multi_workflow import WorkflowConfig
config = WorkflowConfig(
# 跳过的步骤
skip_steps=["full_test_suite", "code_review"],
# 必需的步骤
required_steps=["syntax_check", "unit_test"],
# 质量阈值
quality_thresholds={
"test_coverage": 0.70,
"code_quality": 9.0
},
# 自动提交
auto_commit=False,
# 绕过hooks
bypass_hooks=False,
# 并行执行
parallel_execution=True
)
workflow = FastTrackWorkflow("my_wf", config=config)
执行策略对比
| 策略 | 适用场景 | 特点 |
|---|---|---|
| PARALLEL | 独立任务 | 最快,无依赖 |
| SEQUENTIAL | 依赖任务 | 按顺序执行 |
| HYBRID | 混合场景 | 关键路径优先 |
📊 工程流类型
快速工程流 (FastTrackWorkflow)
用途: YOLO模式、快速原型、紧急修复
稳定工程流 (StableTrackWorkflow)
用途: 生产发布、重要功能
开发工程流 (DevWorkflow)
用途: 功能开发
配置: 平衡质量和速度
测试工程流 (TestWorkflow)
用途: 全面测试
配置: 单元、集成、E2E、性能测试
文档工程流 (DocWorkflow)
用途: 文档生成
配置: API文档、用户指南、示例
优化工程流 (OptimizeWorkflow)
用途: 代码优化
配置: 性能、内存、复杂度优化
审查工程流 (ReviewWorkflow)
用途: 代码审查
配置: 安全、架构、最佳实践
部署工程流 (DeployWorkflow)
用途: 生产部署
配置: 蓝绿部署、回滚准备
🔧 高级用法
自定义工程流
from lingflow.workflow.multi_workflow import BaseWorkflow, WorkflowType
class CustomWorkflow(BaseWorkflow):
def __init__(self, workflow_id: str):
super().__init__(
workflow_id=workflow_id,
workflow_type=WorkflowType.DEV,
priority=WorkflowPriority.HIGH
)
async def execute(self, context: dict):
# 自定义逻辑
return await super().execute(context)
依赖管理
# A依赖B
workflow_b.add_dependency("workflow_a")
# 检查依赖是否满足
if workflow_b._check_dependencies(context):
await workflow_b.execute(context)
状态监控
status = coordinator.get_status()
print(f"总数: {status['total_workflows']}")
print(f"完成: {status['completed']}")
print(f"失败: {status['failed']}")
# 详细状态
for wf_id, info in status['workflows'].items():
print(f"{wf_id}: {info['status']}")
🎓 最佳实践
1. 选择合适的工程流类型
2. 合理设置依赖
# ✅ 好的依赖
test.add_dependency("dev") # 测试依赖开发
review.add_dependency("test") # 审查依赖测试
# ❌ 避免循环依赖
dev.add_dependency("review") # 不要!
3. 使用并行执行
4. 监控和日志
import logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger("lingflow.workflow")
# 执行时自动记录日志
results = await coordinator.execute_all()
📈 性能对比
单工程流 vs 双工程流
| 指标 | 单工程流 | 双工程流 | 提升 |
|---|---|---|---|
| 开发速度 | 1x | 1.5x | 50% |
| 发布频率 | 每天 | 每小时 | 24x |
| 实验验证 | 慢 | 快 | 10x |
多工程流效率
| 模式 | 并行度 | 时间节省 |
|---|---|---|
| 顺序 | 1x | 0% |
| 双工程流 | 2x | 50% |
| 多工程流(5) | 5x | 80% |
❓ 常见问题
Q1: 如何选择执行策略?
A:
- 无依赖: PARALLEL
- 有依赖: SEQUENTIAL 或 HYBRID
- 不确定: HYBRID(自动优化)
Q2: 快速流如何提升到稳定流?
A:
if fast_track.can_promote_to(WorkflowType.STABLE):
stable = coordinator.promote_workflow("fast_id", WorkflowType.STABLE)
Q3: 如何处理工程流失败?
A:
Q4: 能否混合不同类型的工程流?
A: 可以!这是多工程流系统的核心优势
coordinator.register_workflow(DevWorkflow("dev"))
coordinator.register_workflow(TestWorkflow("test"))
coordinator.register_workflow(DocWorkflow("doc"))
# 混合执行
🔗 相关链接
快速上手 ✅ 生产就绪 🚀
众智混元,万法灵通 ⚡