38 KiB
38 KiB
description
| description |
|---|
| 迭代执行与反馈:接收任务树,批量执行,动态适应 |
/runtime.iterate - 迭代执行与反馈循环
核心目标
我们不是一次性完成者,我们是持续交付者。
当我们通过/runtime.plan生成了任务树后,我们通过迭代循环:
- 批量执行 - 一次执行一批可并行任务
- 收集反馈 - 每个任务成功/失败/产生新发现
- 动态适应 - 根据反馈调整计划
- 循环直至完成 - 持续迭代直到所有任务完成
任务树(来自plan)
↓
[执行批次1] → 收集反馈 → 调整计划
↓
[执行批次2] → 收集反馈 → 调整计划
↓
[执行批次3] ...
↓
✅ 完成(所有任务满足DoD)
何时使用 /runtime.iterate
必须使用场景
- ✅ 已完成规划阶段 - 已通过
/runtime.plan生成任务树 - ✅ 需要批量执行 - 任务间有依赖,需要分批次
- ✅ 需要持续反馈 - 想在执行中学习、调整计划
- ✅ 处理不确定性 - 预期会有失败、变更、新发现
- ✅ 长期项目 - 需要持续数天/数周的实施
使用流程
# 第一步: 生成任务树
/runtime.plan "实现用户认证系统"
↓
生成: cognition/plans/plan-2025xx.json
# 第二步: 迭代执行
/runtime.iterate --plan=plan-2025xx.json
↓ 自动执行...
Iteration 1: 执行基础设施任务(User表、Token表、JWT配置)
Iteration 2: 执行服务层任务(user.service, token.service)
Iteration 3: 执行API层任务(Register、Login API)
Iteration 4: 执行安全层任务(Password Hashing、Auth Middleware)
Iteration 5: 执行测试任务(单元测试、集成测试)
✅ 所有任务完成!
迭代循环详解
阶段1: 初始化(Iteration Setup)
1.1 加载任务树
def load_plan(plan_file: str) -> Plan:
"""
加载由plan生成的任务树
"""
with open(plan_file) as f:
plan_data = json.load(f)
# 验证文件格式
if "tasks" not in plan_data:
raise ValueError("无效的计划文件:缺少'tasks'字段")
if "critical_path" not in plan_data:
raise ValueError("无效的计划文件:缺少'critical_path'字段")
return Plan(
tasks=[Task.from_dict(t) for t in plan_data["tasks"]],
critical_path=plan_data["critical_path"],
total_effort=plan_data["total_effort"]
)
1.2 初始化迭代器
class IterativeExecutor:
def __init__(self, plan: Plan, strategy="breadth"):
self.plan = plan
self.iteration_count = 0
self.max_iterations = 20 # 防止无限循环
self.completed_tasks = []
self.failed_tasks = []
self.skipped_tasks = []
self.strategy = strategy
print(f"🚀 迭代执行器已初始化")
print(f" 总任务数: {len(plan.tasks)}")
print(f" 预计工时: {plan.total_effort}")
print(f" 关键路径: {' → '.join(plan.critical_path)}")
print(f" 策略: {strategy}")
阶段2: 迭代循环(Iteration Loop)
2.1 主循环逻辑
def run_iteration_loop(self) -> IterationResult:
"""
运行迭代执行循环
"""
print("\n" + "=" * 50)
print("开始迭代执行循环")
print("=" * 50)
while self.should_continue():
self.iteration_count += 1
print(f"\n📌 Iteration #{self.iteration_count}")
print(f" 时间: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}")
print("-" * 50)
# Step 1: 选择可执行任务
ready_tasks = self.get_ready_tasks()
print(f" 可执行任务: {len(ready_tasks)}个")
if not ready_tasks:
if self.get_remaining_tasks():
print(" ⚠️ 有未完成任务但依赖未满足")
print(f" 剩余: {len(self.get_remaining_tasks())}个")
break
else:
print(" ✅ 所有任务已完成!")
break
# Step 2: 批量执行
results = self.execute_batch(ready_tasks)
# Step 3: 收集反馈
feedback = self.collect_feedback(results)
# Step 4: 适应与调整
self.plan = self.adapt_plan(self.plan, feedback)
# Step 5: 检查完成状态
if self.is_all_completed():
print("\n" + "=" * 50)
print("✅ 所有任务完成!迭代结束")
print("=" * 50)
break
# Step 6: 休息与反思
if self.should_reflect():
self.reflect()
return self.generate_result()
2.2 选择可执行任务
def get_ready_tasks(self) -> List[Task]:
"""
选择满足以下条件的任务:
1. 未完成
2. 所有依赖已完成
3. 未被阻塞
4. 在当前策略优先级中
"""
ready = []
for task in self.plan.tasks:
# 已完成,跳过
if task.status == "completed":
continue
# 被阻塞,跳过
if task.status == "blocked":
continue
# 有失败依赖,阻塞
if any(dep in [t.id for t in self.failed_tasks] for dep in task.dependencies):
task.status = "blocked"
task.block_reason = f"依赖任务失败: {[t.name for t in self.failed_tasks]}"
self.skipped_tasks.append(task)
continue
# 依赖未完成,等待
if not all(dep in [t.id for t in self.completed_tasks] for dep in task.dependencies):
continue
# 满足所有条件,可执行
ready.append(task)
# 按策略排序
return self.sort_by_strategy(ready)
2.3 执行策略
def sort_by_strategy(self, tasks: List[Task]) -> List[Task]:
"""
根据执行策略排序任务
策略1: breadth-first (广度优先)
- 执行所有基础任务再执行上层任务
- 特点: 减少返工风险
策略2: depth-first (深度优先)
- 优先执行关键路径上的任务
- 特点: 快速验证核心链路
策略3: risk-driven (风险驱动)
- 优先执行高风险任务
- 特点: 尽早暴露问题
策略4: value-driven (价值驱动)
- 优先交付用户价值最大的任务
- 特点: 快速交付MVP
"""
if self.strategy == "breadth":
# 按层级排序(基础设施 → 服务 → API → 安全 → 测试)
layer_order = {
"基础设施": 1,
"服务层": 2,
"API层": 3,
"安全层": 4,
"测试": 5
}
return sorted(tasks, key=lambda t: layer_order.get(t.layer, 99))
elif self.strategy == "depth":
# 按是否关键路径排序
return sorted(tasks, key=lambda t: t.id in self.plan.critical_path, reverse=True)
elif self.strategy == "risk":
# 按风险等级排序
risk_order = {"high": 1, "medium": 2, "low": 3}
return sorted(tasks, key=lambda t: risk_order.get(t.risk_level, 3))
elif self.strategy == "value":
# 按价值排序(需要手动标注或从需求提取)
return sorted(tasks, key=lambda t: t.priority, reverse=True)
else:
return tasks
阶段3: 执行批次(Batch Execution)
3.1 批量执行
def execute_batch(self, tasks: List[Task]) -> List[ExecutionResult]:
"""
批量执行一批任务
"""
print(f"\n🚀 执行批次: {len(tasks)}个任务")
print("-" * 50)
results = []
for i, task in enumerate(tasks, 1):
print(f"\n [{i}/{len(tasks)}] {task.id}: {task.name}")
print(f" 预计工时: {task.effort}h")
print(f" 置信度: {task.confidence:.2f}")
try:
# 预检查
if not self.pre_check(task):
print(f" ⚠️ 预检查失败,跳过")
result = ExecutionResult(
task=task,
status="skipped",
reason="预检查失败"
)
results.append(result)
self.skipped_tasks.append(task)
continue
# 执行
print(f" ⏳ 执行中...")
execution_result = task.execute()
# 验证(Definition of Done)
validation = self.validate_task(task, execution_result)
if validation.passed:
print(f" ✅ 完成!耗时: {execution_result.duration:.1f}h")
result = ExecutionResult(
task=task,
status="success",
result=execution_result,
validation=validation
)
self.completed_tasks.append(task)
else:
print(f" ❌ 验证失败:")
for error in validation.errors:
print(f" - {error}")
result = ExecutionResult(
task=task,
status="failed",
result=execution_result,
validation=validation
)
self.failed_tasks.append(task)
results.append(result)
# 学习这次执行
self.learn_from_execution(result)
except Exception as e:
print(f" 💥 执行错误: {e}")
import traceback
traceback.print_exc()
result = ExecutionResult(
task=task,
status="error",
error=e
)
results.append(result)
self.failed_tasks.append(task)
print("\n" + "-" * 50)
print(f"批次完成: {len(tasks)}个任务")
print(f" ✅ 成功: {len([r for r in results if r.status == 'success'])}")
print(f" ❌ 失败: {len([r for r in results if r.status == 'failed'])}")
print(f" ⚠️ 跳过: {len([r for r in results if r.status == 'skipped'])}")
print(f" 💥 错误: {len([r for r in results if r.status == 'error'])}")
return results
3.2 预检查(Pre-Check)
def pre_check(self, task: Task) -> bool:
"""
执行前检查
"""
print(" 预检查:")
# 检查1: 依赖是否完成
for dep_id in task.dependencies:
dep = self.plan.get_task(dep_id)
if not dep or dep.status != "completed":
print(f" ⚠️ 依赖未完成: {dep_id}")
return False
print(f" ✅ 所有依赖已完成")
# 检查2: 必需资源是否可用
if task.required_resources:
for resource in task.required_resources:
if not self.check_resource_available(resource):
print(f" ⚠️ 资源不可用: {resource}")
return False
print(f" ✅ 资源可用")
# 检查3: 是否有已知风险
if task.risk_level == "high" and task.confidence < 0.6:
# 高风险且信心不足,建议先Spike
if not self.ask_confirmation("高风险任务,确认执行?"):
print(f" ⚠️ 用户取消执行(建议先调研)")
return False
print(f" ⚠️ 高风险但用户确认执行")
print(f" ✅ 预检查通过")
return True
3.3 任务执行(伪代码)
class Task:
def execute(self) -> ExecutionResult:
"""
执行单个任务
"""
start_time = datetime.now()
# 不同类型的任务,使用不同工具
if self.type == "database":
# 数据库任务: 执行SQL
result = bash(f"psql -f {self.sql_file}")
elif self.type == "api":
# API任务: 创建Controller + Route
# 1. 读取模板
template = read_file("templates/api-controller.template")
# 2. 填充模板
code = template.format(
controller_name=self.controller_name,
functions=self.functions
)
# 3. 写入文件
write_file(self.output_path, code)
# 4. 运行测试
bash(f"npm test {self.test_file}")
elif self.type == "service":
# 服务任务: 实现业务逻辑
# ...
duration = datetime.now() - start_time
return ExecutionResult(
task_id=self.id,
status="success",
duration=duration.total_seconds() / 3600,
artifacts=[self.output_path]
)
阶段4: 验证(Validation)
4.1 Definition of Done
def validate_task(self, task: Task, execution_result: ExecutionResult) -> ValidationResult:
"""
验证任务是否真正完成
"""
passed_checks = []
failed_checks = []
# 检查1: 代码存在且可访问
if task.output_path:
if Path(task.output_path).exists():
passed_checks.append("代码文件存在")
else:
failed_checks.append("代码文件不存在")
# 检查2: 单元测试通过
if task.requires_unit_tests:
test_result = bash(f"npm test {task.test_path}")
if test_result.exit_code == 0:
coverage = extract_coverage(test_result.output)
if coverage >= 0.8:
passed_checks.append(f"单元测试覆盖({coverage:.0%})")
else:
failed_checks.append(f"覆盖率不足: {coverage:.0%}")
else:
failed_checks.append("单元测试失败")
# 检查3: 手动测试通过
if task.acceptance_criteria:
print(" 验收标准检查:")
for criteria in task.acceptance_criteria:
if self.check_criteria(criteria):
print(f" ✅ {criteria}")
passed_checks.append(criteria)
else:
print(f" ❌ {criteria}")
failed_checks.append(criteria)
# 检查4: 无回归错误(如果配置了集成测试)
if task.requires_integration_test:
# 运行集成测试
pass
# 总结
all_passed = len(failed_checks) == 0
if all_passed:
print(" ✅ 所有验收标准通过")
else:
print(f" ❌ 未通过 {len(failed_checks)}项检查")
return ValidationResult(
passed=len(passed_checks),
failed=len(failed_checks),
all_passed=all_passed,
errors=failed_checks
)
4.2 渐进式DoD(根据优先级)
def get_definition_of_done(self, task: Task) -> List[str]:
"""
根据任务优先级返回DoD检查清单
"""
if task.priority == "P0":
# 关键任务: 必须全部满足
return [
"✓ 代码实现完成",
"✓ 单元测试覆盖率>80%",
"✓ 手动测试通过",
"✓ API文档更新",
"✓ CI/CD通过",
"✓ 代码审查通过"
]
elif task.priority == "P1":
# 重要任务: 可以稍微放宽
return [
"✓ 代码实现完成",
"✓ 单元测试覆盖率>70%",
"✓ 手动测试通过",
"✓ CI/CD通过"
# 文档可以后续补充
]
else:
# P2任务: 最小要求
return [
"✓ 代码实现完成",
"✓ 基本测试通过",
"✓ 无严重bug"
]
阶段5: 反馈循环与适应
5.1 收集反馈
def collect_feedback(self, results: List[ExecutionResult]) -> Feedback:
"""
从执行结果收集反馈
"""
feedback = Feedback(
iteration=self.iteration_count,
timestamp=datetime.now(),
results=results,
metrics={
"success_rate": len([r for r in results if r.status == "success"]) / len(results),
"avg_duration": sum(r.duration for r in results if r.duration) / len(results),
"failed_count": len([r for r in results if r.status == "failed"]),
"new_discoveries": []
}
)
# 检查是否有新发现
for result in results:
if result.discovery:
feedback.metrics["new_discoveries"].append(result.discovery)
print(f"\n✨ 新发现: {result.discovery}")
return feedback
5.2 动态适应
def adapt_plan(self, plan: Plan, feedback: Feedback) -> Plan:
"""
根据反馈动态调整计划
三种适应模式:
1. 失败处理: 重试/分解/重新设计
2. 新发现: 添加新任务
3. 性能调整: 调整后续任务估算
"""
print("\n🔄 适应调整:")
print("-" * 50)
adjusted = False
# 模式1: 处理失败
for result in feedback.results:
if result.status == "failed":
task = result.task
# 分析失败原因
failure_reason = self.analyze_failure(result)
print(f"\n 分析失败原因 ({task.id}):")
print(f" → {failure_reason}")
# 三种处理方式
# 方式1A: 临时错误 → 重试
if self.is_transient_error(failure_reason):
print(" → 临时错误,重试任务")
task.retries += 1
if task.retries < 3:
# 暂时不改计划,下次迭代重试
adjusted = True
else:
print(" → 重试3次仍失败,升级为错误")
task.status = "error"
# 方式1B: 任务过大 → 分解
elif self.is_too_complex(failure_reason):
print(" → 任务复杂度过高,分解为子任务")
sub_tasks = self.decompose_task(task)
self.plan.replace(task, sub_tasks)
adjusted = True
# 方式1C: 设计问题 → 重新设计
elif self.is_design_issue(failure_reason):
print(" → 设计问题,需要重新设计")
# 启动短暂学习
learn_result = self.learn_from_failure(task, failure_reason)
new_design = self.redesign(task, learn_result)
self.plan.replace(task, new_design)
adjusted = True
# 方式1D: 需求理解错误 → 请求澄清
else:
print(" → 需求理解可能有问题,请求用户澄清")
self.request_user_clarification(task, failure_reason)
adjusted = True
# 模式2: 处理新发现
if feedback.metrics["new_discoveries"]:
print(f"\n ✨ 发现 {len(feedback.metrics['new_discoveries'])} 个新信息")
for discovery in feedback.metrics["new_discoveries"]:
print(f" → {discovery}")
# 基于新信息生成后续任务
new_tasks = self.create_follow_up_tasks(discovery)
if new_tasks:
print(f" → 新增 {len(new_tasks)} 个任务")
self.plan.add_tasks(new_tasks)
adjusted = True
# 模式3: 性能调整(如果实际耗时与预计差异大)
if feedback.metrics["avg_duration"]:
avg_actual = feedback.metrics["avg_duration"]
avg_estimated = sum(t.effort for t in self.completed_tasks) / len(self.completed_tasks)
ratio = avg_actual / avg_estimated
if ratio > 1.5:
print(f"\n ⚠️ 实际耗时比预计高{ratio:.1f}倍")
print(" → 调整后续任务估算")
for task in self.get_remaining_tasks():
task.effort *= ratio
adjusted = True
if not adjusted:
print(" 无需调整,继续执行")
return plan
5.3 失败分析
def analyze_failure(self, result: ExecutionResult) -> FailureAnalysis:
"""
分析失败原因
失败类型:
- TYPE_UNKNOWN: 未知错误
- TYPE_TRANSIENT: 临时错误(重试可解决)
- TYPE_COMPLEXITY: 任务太复杂(需要分解)
- TYPE_DESIGN: 设计问题(需要重新设计)
- TYPE_REQUIREMENT: 需求不清(需要澄清)
- TYPE_RESOURCE: 资源不足(需要配置)
"""
if result.error:
error_msg = str(result.error).lower()
# 临时错误
if any(word in error_msg for word in [
"timeout", "connection", "network",
"EBUSY", "EAGAIN"
]):
return Failure.TYPE_TRANSIENT
# 设计问题
if any(word in error_msg for word in [
"circular dependency", "deadlock",
"stack overflow"
]):
return Failure.TYPE_DESIGN
# 资源问题
if any(word in error_msg for word in [
"out of memory", "disk full",
"quota exceeded"
]):
return Failure.TYPE_RESOURCE
# 验证错误(检查清单未通过)
if result.validation and not result.validation.all_passed:
if len(result.validation.errors) > 5:
# 错误太多,可能是需求理解问题
return Failure.TYPE_REQUIREMENT
else:
# 具体检查项失败,可能是设计问题
return Failure.TYPE_DESIGN
return Failure.TYPE_UNKNOWN
阶段6: 终止条件
6.1 终止条件判断
def should_continue(self) -> bool:
"""
判断是否继续迭代
"""
# 条件1: 达到最大迭代次数
if self.iteration_count >= self.max_iterations:
print("\n⚠️ 达到最大迭代次数,停止执行")
print(f" 已完成: {len(self.completed_tasks)}个任务")
print(f" 未完成: {len(self.get_remaining_tasks())}个任务")
return False
# 条件2: 所有任务完成
if self.is_all_completed():
print("\n✅ 所有任务完成!")
return False
# 条件3: 连续3次无进展(无法解决的阻塞)
recent_iterations = self.get_recent_iterations(3)
if all(len(r.completed_tasks) == 0 for r in recent_iterations):
print("\n⚠️ 连续3次无进展,存在无法解决的任务")
print(" 建议方案:")
print(" 1. 手动介入未完成任務")
print(" 2. 重新规划剩余任务")
print(" 3. 调整需求范围")
return False
# 条件4: 用户手动停止
if self.should_stop_requested:
print("\n⏹️ 用户手动停止执行")
return False
return True
6.2 完成状态判断
def is_all_completed(self) -> bool:
"""
检查所有任务是否已完成
"""
remaining = self.get_remaining_tasks()
if not remaining:
return True
# 检查是否有任务被永久阻塞
truly_blocked = [
task for task in remaining
if task.status == "blocked"
]
if truly_blocked:
print(f"\n⚠️ {len(truly_blocked)}个任务永久阻塞,无法完成")
return False
return False
阶段7: 生成结果报告
7.1 执行结果统计
def generate_result(self) -> IterationResult:
"""
生成执行结果报告
"""
result = IterationResult(
plan_file=self.plan.file_path,
total_iterations=self.iteration_count,
started_at=self.start_time,
ended_at=datetime.now(),
completed_tasks=self.completed_tasks,
failed_tasks=self.failed_tasks,
skipped_tasks=self.skipped_tasks,
metrics=self.calculate_metrics()
)
print("\n" + "=" * 70)
print("📊 执行结果统计")
print("=" * 70)
print(f"总迭代次数: {self.iteration_count}")
print(f"总任务数: {len(self.plan.tasks)}")
print(f"✅ 已完成: {len(self.completed_tasks)}")
print(f"❌ 失败: {len(self.failed_tasks)}")
print(f"⚠️ 跳过: {len(self.skipped_tasks)}")
print(f"⏱️ 总耗时: {result.total_duration:.1f}小时")
print(f"完成率: {result.completion_rate:.1%}")
if self.failed_tasks:
print("\n❌ 失败任务:")
for task in self.failed_tasks[:5]:
print(f" - {task.id}: {task.name}")
if result.metrics["efficiency"]:
print(f"\n🎯 效率指标:")
print(f" 估算准确率: {result.metrics['efficiency']['accuracy']:.1%}")
print(f" 平均偏差: {result.metrics['efficiency']['bias']:.1f}x")
return result
7.2 保存到记忆
def save_to_memory(self, result: IterationResult):
"""
将执行结果保存到长期记忆
"""
# 固化成功经验
if result.completion_rate == 1.0:
memory_content = f"""
## 项目成功交付 - {self.plan.project_name}
**时间**: {datetime.now().strftime('%Y-%m-%d')}
**项目**: {self.plan.project_name}
**总任务**: {len(self.plan.tasks)}
**总工时**: {result.total_duration:.1f}h
**完成率**: 100%
**关键经验**:
{"\n".join(["- " + m for m in result.metrics["learnings"]])}
**可用于未来项目**: 是
"""
self.remember(memory_content, category="project-success")
# 固化失败教训
if self.failed_tasks:
for task in self.failed_tasks:
failure_memory = f"""
## 失败任务 - {task.id}
**任务**: {task.name}
**原因**: {getattr(task, 'failure_reason', '未知')}
**教训**: {task.lesson_learned}
**防御机制**: 下次遇到类似任务,先{task.defense_action}
"""
self.remember(failure_memory, category="failure-pattern")
执行策略详解
策略1: Breadth-First (广度优先)
特点:
- 先完成所有基础任务
- 再执行上层任务
- 最后执行测试和优化
执行顺序:
批1: User表、Token表、JWT配置 [所有基础设施]
批2: user.service、token.service [所有服务]
批3: Register API、Login API [所有API]
批4: Password Hashing、Auth中间件 [所有安全]
批5: 单元测试、集成测试 [所有测试]
优势:
✓ 减少返工风险(基础不稳定不会上层浪费)
✓ 可以并行开发(每批任务并行)
✓ 适合团队协作(分层对接)
劣势:
⚠ 价值交付慢(用户要等所有层完成才能用)
策略2: Depth-First (深度优先)
特点:
- 优先完成关键路径上的任务
- 尽快验证核心链路
- 次要路径推迟
执行顺序:
批1: User表 → Token表 → Token服务 → Login API → 集成测试 [关键路径]
批2: Register API → 单元测试 [关键路径]
批3: Password重置API [非关键]
批4: 文档、代码质量优化 [非关键]
优势:
✓ 快速验证核心功能(用户可早期体验)
✓ 风险发现早(核心链路问题尽早暴露)
✓ 适合快速演示
劣势:
⚠ 需要频繁重构(后续任务可能要求前面修改)
策略3: Risk-Driven (风险驱动)
特点:
- 优先执行高风险任务
- 尽早暴露潜在问题
- 低风险任务推迟
执行顺序:
高风险: 密码重置流程(涉及多步骤安全)→ 先做Spike
高风险: Token刷新机制(不确定设计) → 查阅资料+原型
中风险: 认证中间件(有现成模式) → 稍后实现
低风险: 单元测试(成熟技术) → 最后补充
优势:
✓ 不确定性尽快消除(避免大坑)
✓ 适合技术选型期
✓ 风险前置管理
劣势:
⚠ 可能执行顺序不合理(不是最优价值)
策略4: Value-Driven (价值驱动)
特点:
- 优先交付用户价值最大的功能
- MVP -> 迭代增强
- 砍掉非核心价值
执行顺序:
MVP: User表 + Register API + Login API [可演示的核心]
迭代1: Password重置 [增强可用性]
迭代2: Rate limiting [提升质量]
迭代3: 完整测试覆盖 [质量保证]
优势:
✓ 用户价值最大化(可以早期使用)
✓ 反馈收集早(用户参与迭代)
✓ 适合创业/探索期
劣势:
⚠ 技术债务积累(前期快速可能牺牲质量)
命令定义
语法
/runtime.iterate \
--plan-file=plan-xxxx.json \
--strategy=[breadth|depth|risk|value] \
--max-iterations=20 \
--parallel=[1|2|3|...] \
--auto-adapt=true \
--reflect-interval=5
参数说明
| 参数 | 说明 | 默认值 |
|---|---|---|
plan-file |
计划文件路径(来自/runtime.plan) | 必需 |
strategy |
执行策略 | breadth |
max-iterations |
最大迭代次数(防止无限循环) | 20 |
parallel |
每批并行任务数 | 3 |
auto-adapt |
失败时自动适应 | true |
reflect-interval |
每N次迭代后强制反思 | 5 |
使用示例
示例1: 基础使用
# 第一步:规划
/runtime.plan "实现用户认证系统"
↓
生成: cognition/plans/plan-xxx.json
# 第二步:迭代执行
/runtime.iterate --plan-file=cognition/plans/plan-xxx.json
输出:
🚀 迭代执行器已初始化
总任务数: 17
预计工时: 17.75h
关键路径: ① → ② → ⑤ → ⑧ → ⑮ → ⑯
策略: breadth
════════════════════════════════════════════════════════════
开始迭代执行循环
════════════════════════════════════════════════════════════
📌 Iteration #1
时间: 2025-11-14 11:00:00
--------------------------------------------------
可执行任务: 3个
[1/3] ①: 创建User表
预计工时: 0.5h
置信度: 0.90
预检查:
✅ 所有依赖已完成
✅ 资源可用
⏳ 执行中...
Command: psql -f migrations/001-create-user.sql
Result: CREATE TABLE 成功
✅ 完成!耗时: 0.4h
[2/3] ②: 创建Token表
预计工时: 0.5h
置信度: 0.85
依赖: [①]
预检查:
✅ 所有依赖已完成
✅ 资源可用
⏳ 执行中...
Command: psql -f migrations/002-create-token.sql
Result: CREATE TABLE 成功
✅ 完成!耗时: 0.3h
[3/3] ③: 配置JWT
预计工时: 0.25h
置信度: 0.95
预检查:
✅ 所有依赖已完成
⏳ 执行中...
Command: node scripts/generate-jwt-keys.js
Result: 密钥生成成功
✅ 完成!耗时: 0.2h
--------------------------------------------------
批次完成: 3个任务
✅ 成功: 3
❌ 失败: 0
⚠️ 跳过: 0
💥 错误: 0
📌 Iteration #2
时间: 2025-11-14 11:30:00
--------------------------------------------------
可执行任务: 3个
[1/3] ④: user.service.js
...
════════════════════════════════════════════════════════════
📊 执行结果统计
════════════════════════════════════════════════════════════
总迭代次数: 6
总任务数: 17
✅ 已完成: 17
❌ 失败: 0
⏱️ 总耗时: 16.2小时
完成率: 100%
✅ 所有验收标准满足
✅ API文档已更新
✅ Changelog已更新
✅ CI/CD通过
项目完成!
示例2: 使用不同策略
# 深度优先(快速验证核心)
/runtime.iterate --plan-file=plan-xxx.json --strategy=depth
# 风险驱动(优先高风险任务)
/runtime.iterate --plan-file=plan-xxx.json --strategy=risk
# 价值驱动(MVP模式)
/runtime.iterate --plan-file=plan-xxx.json --strategy=value
示例3: 并行执行
# 一次并行执行5个任务(适合多核CPU)
/runtime.iterate --plan-file=plan-xxx.json --parallel=5
示例4: 失败处理
# 配置失败处理策略
/runtime.iterate \
--plan-file=plan-xxx.json \
--on-failure= [retry|decompose|learn|stop]
# retry: 重试3次
# decompose: 分解为子任务
# learn: 启动学习循环
# stop: 停止等待人工
与 /runtime.plan 的区别
| 维度 | /runtime.plan |
/runtime.iterate |
关系 |
|---|---|---|---|
| 输入 | 需求文本 | 任务树(JSON) | plan的输出 → iterate的输入 |
| 核心 | 拆解任务 | 执行 + 反馈 | 阶段2(实施) |
| 输出 | 任务树(静态) | 执行报告(动态) | 后续: |
| 函数 | 生成计划 | 执行计划 | 先后关系 |
| 循环 | 无(一次生成) | 有(多次迭代) | 迭代: iterate |
工作流:
/runtime.plan "实现功能X" → 生成任务树
↓
/runtime.iterate --plan=... → 执行任务树
↓
/runtime.reflect → 回顾整个过程
与 /runtime.learn 的区别
| 维度 | /runtime.learn |
/runtime.iterate |
为什么分开? |
|---|---|---|---|
| 范围 | 学习 + 规划 + 执行 | 仅执行 | 职责单一 |
| 自治度 | 完全自主(从问题到方案) | 半自主(需plan提供任务树) | 区分认知层次 |
| 输入 | 问题/需求 | 结构化任务 | 抽象层次不同 |
| 复杂度 | 高(需要智能决策) | 中(主要是执行控制) | 便于调试优化 |
| 典型场景 | 探索未知问题 | 执行已知计划 | 解耦关注点 |
类比:
- Learn = 资深架构师(知道如何学习、规划、实施)
- Plan = 项目经理(知道如何拆解任务)
- Iterate = 技术主管(知道如何带领团队执行)
工具与脚本
辅助脚本: task-executor.py
#!/usr/bin/env python3
"""
任务执行器 - 执行单个任务并验证
"""
import subprocess
import time
from pathlib import Path
class TaskExecutor:
def __init__(self, workspace="."):
self.workspace = Path(workspace)
def execute(self, task):
"""
执行任务
Returns:
{
"status": "success|failed|error",
"duration": seconds,
"output": str,
"error": str (if failed),
"artifacts": [files created/modified]
}
"""
start_time = time.time()
result = {
"status": "unknown",
"duration": 0,
"output": "",
"error": None,
"artifacts": []
}
try:
# 根据任务类型选择执行方式
if task["type"] == "database":
exec_result = self._execute_sql(task["sql_file"])
elif task["type"] == "file_create":
exec_result = self._create_file(
task["file_path"],
task["content"]
)
elif task["type"] == "command":
exec_result = self._run_command(task["command"])
elif task["type"] == "test":
exec_result = self._run_tests(task["test_files"])
else:
exec_result = {
"status": "error",
"error": f"未知任务类型: {task['type']}"
}
# 记录执行结果
result.update(exec_result)
result["duration"] = time.time() - start_time
except Exception as e:
result["status"] = "error"
result["error"] = str(e)
return result
def _execute_sql(self, sql_file):
"""执行SQL文件"""
cmd = f"psql -f {sql_file}"
return self._run_command(cmd)
def _create_file(self, file_path, content):
"""创建文件"""
path = Path(file_path)
path.write_text(content)
return {
"status": "success",
"artifacts": [str(path)]
}
def _run_command(self, command):
"""运行Shell命令"""
process = subprocess.run(
command,
shell=True,
capture_output=True,
text=True
)
if process.returncode == 0:
return {
"status": "success",
"output": process.stdout
}
else:
return {
"status": "failed",
"output": process.stdout,
"error": process.stderr
}
def _run_tests(self, test_files):
"""运行测试"""
cmd = f"npm test {' '.join(test_files)}"
return self._run_command(cmd)
# 使用示例
if __name__ == "__main__":
executor = TaskExecutor(workspace=".")
# 执行任务: 创建User表
task = {
"type": "database",
"sql_file": "migrations/001-create-user.sql"
}
result = executor.execute(task)
print(json.dumps(result, indent=2))
最佳实践
实践1: 先规划,再执行
# 正确流程
✅ /runtime.plan "需求" → 生成计划
✅ /runtime.iterate --plan=xxx.json → 执行计划
✅ /runtime.reflect → 回顾
# 错误
❌ /runtime.iterate # 没有提供计划文件
实践2: 选择合适的策略
# 不同类型的项目用不同策略
✅ 新功能开发(基础重要): --strategy=breadth
✅ PoC演示(快速验证): --strategy=depth
✅ 技术调研(风险消除): --strategy=risk
✅ MVP产品(价值优先): --strategy=value
实践3: 定期检查(Reflect)
# 每5次迭代后强制检查
/runtime.iterate --plan=xxx.json --reflect-interval=5
# 或在执行后手动
/runtime.reflect
"""
上次5次迭代的模式:
- 估算准确率: 85%
- 常见失败类型: 依赖配置
- 发现的新任务: 平均每次迭代1.2个
改进建议:
- 预检查阶段增强配置验证
- 为配置任务添加专门检查清单
"""
实践4: 优雅失败与恢复
# 不是暴力失败
try:
result = execute(task)
except Exception as e:
# 记录
log_failure(task, e)
# 分析
failure_type = analyze_failure(e)
# 适应(不是panic)
if failure_type == FAIL_TRANSIENT:
retry(task)
elif failure_type == FAIL_COMPLEX:
decompose(task)
elif failure_type == FAIL_DESIGN:
learn_and_redesign(task)
elif failure_type == FAIL_REQUIREMENT:
ask_clarification()
# 继续(不是所有都停止)
continue_execution()
宪法遵循
遵循原则:
- ✓ 2.2 渐进式实施: 持续交付价值
- ✓ 4.4 规划透明: 执行过程可见
- ✓ 1.3 谦逊与不确定: 失败时承认并学习
- ✓ 4.1 从经验学习: 每次迭代都更新认知
- ✓ 4.3 自我反思: 定期评估执行效果
命令定义: .ai-runtime/commands/runtime.iterate.md
脚本: .ai-runtime/scripts/task-executor.py
输出: cognition/execution-reports/*.json
版本: 1.0.0