#!/usr/bin/env python3
"""
epic.decompose - Take an Epic (as Markdown) and decompose it into user stories. Analyzes the Epic document, identifies major deliverables, and groups them by persona or capability.
Generated by meta.skill with Betty Framework certification
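
Example CLI usage (illustrative; flags as defined in main() below):

    python epic_decompose.py --epic-file ./epic.md --max-stories 5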
"""
import os
import sys
import json
import yaml
import re
from pathlib import Path
from typing import Dict, List, Any, Optional
from betty.config import BASE_DIR
from betty.logging_utils import setup_logger
from betty.certification import certified_skill
logger = setup_logger(__name__)
class EpicDecompose:
"""
    Take an Epic (as Markdown) and decompose it into user stories. Analyzes the Epic document, identifies major deliverables, and groups them by persona or capability.
"""
def __init__(self, base_dir: str = BASE_DIR):
"""Initialize skill"""
self.base_dir = Path(base_dir)
@certified_skill("epic.decompose")
def execute(self, epic_file: Optional[str] = None, max_stories: Optional[int] = None,
output_path: Optional[str] = None) -> Dict[str, Any]:
"""
Execute the skill
Args:
epic_file: Path to the epic.md file to decompose
max_stories: Maximum number of stories to generate (default: 5)
output_path: Where to save the stories.json file (default: ./stories.json)
Returns:
Dict with execution results
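
        Example (illustrative):
            result = EpicDecompose().execute(epic_file="epic.md")
            # On success, result["ok"] is True and result["story_count"]
            # gives the number of stories written to the output file.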
"""
try:
logger.info("Executing epic.decompose...")
# Validate required inputs
if not epic_file:
raise ValueError("epic_file is required")
# Set defaults
if max_stories is None:
max_stories = 5
if not output_path:
output_path = "./stories.json"
# Read Epic file
epic_path = Path(epic_file)
if not epic_path.exists():
raise FileNotFoundError(f"Epic file not found: {epic_file}")
            epic_content = epic_path.read_text(encoding="utf-8")
# Parse Epic and extract information
epic_data = self._parse_epic(epic_content)
# Generate user stories
stories = self._generate_stories(epic_data, max_stories)
# Write stories to JSON file
output_file = Path(output_path)
output_file.parent.mkdir(parents=True, exist_ok=True)
            with open(output_file, "w", encoding="utf-8") as f:
json.dump(stories, f, indent=2)
logger.info(f"Generated {len(stories)} stories and saved to {output_path}")
result = {
"ok": True,
"status": "success",
"message": f"Decomposed Epic into {len(stories)} user stories",
"output_file": str(output_path),
"story_count": len(stories),
"artifact_type": "user-stories-list",
"next_step": "Use story.write to generate formatted story documents"
}
logger.info("Skill completed successfully")
return result
except Exception as e:
logger.error(f"Error executing skill: {e}")
return {
"ok": False,
"status": "failed",
"error": str(e)
}
def _parse_epic(self, content: str) -> Dict[str, Any]:
"""
Parse Epic markdown to extract components
Args:
content: Epic markdown content
Returns:
Dict with parsed Epic data
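
        Expected Epic layout (a sketch, inferred from the regexes below):
            # Epic: <title>
            ## Summary
            <summary text>
            ## Background
            <background text>
            ## Acceptance Criteria
            - [ ] <criterion>
            ## Stakeholders
            - **<stakeholder name>**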
"""
epic_data = {
"title": "",
"summary": "",
"background": "",
"acceptance_criteria": [],
"stakeholders": []
}
# Extract title
title_match = re.search(r'^# Epic: (.+)$', content, re.MULTILINE)
if title_match:
epic_data["title"] = title_match.group(1).strip()
# Extract summary
summary_match = re.search(r'## Summary\s+(.+?)(?=##|$)', content, re.DOTALL)
if summary_match:
epic_data["summary"] = summary_match.group(1).strip()
# Extract background
background_match = re.search(r'## Background\s+(.+?)(?=##|$)', content, re.DOTALL)
if background_match:
epic_data["background"] = background_match.group(1).strip()
# Extract acceptance criteria
ac_match = re.search(r'## Acceptance Criteria\s+(.+?)(?=##|$)', content, re.DOTALL)
if ac_match:
ac_text = ac_match.group(1).strip()
            # Extract checkbox items (checked or unchecked)
            criteria = re.findall(r'- \[[ xX]\] (.+)', ac_text)
epic_data["acceptance_criteria"] = criteria
# Extract stakeholders
stakeholders_match = re.search(r'## Stakeholders\s+(.+?)(?=##|$)', content, re.DOTALL)
if stakeholders_match:
sh_text = stakeholders_match.group(1).strip()
# Extract bullet points
stakeholders = re.findall(r'- \*\*(.+?)\*\*', sh_text)
epic_data["stakeholders"] = stakeholders
return epic_data
def _generate_stories(self, epic_data: Dict[str, Any], max_stories: int) -> List[Dict[str, Any]]:
"""
Generate user stories from Epic data
Args:
epic_data: Parsed Epic data
max_stories: Maximum number of stories to generate
Returns:
List of user story objects
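
        Each story is a dict of this shape (illustrative values):
            {
                "title": "User can reset their password",
                "persona": "As a User",
                "goal": "<criterion text>",
                "benefit": "So that <summary-derived phrase>",
                "acceptance_criteria": ["<criterion>", "..."]
            }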
"""
stories = []
# Generate stories based on acceptance criteria and epic content
        # This is a simplified approach; a real system might use an NLP model or LLM
# Default personas based on stakeholders
personas = self._extract_personas(epic_data)
# Generate stories from acceptance criteria
for i, criterion in enumerate(epic_data.get("acceptance_criteria", [])[:max_stories]):
persona = personas[i % len(personas)] if personas else "User"
story = {
"title": f"{persona} can {self._generate_story_title(criterion)}",
"persona": f"As a {persona}",
"goal": self._extract_goal(criterion),
"benefit": self._generate_benefit(criterion, epic_data),
"acceptance_criteria": [
criterion,
f"The implementation is tested and verified",
f"Documentation is updated to reflect changes"
]
}
stories.append(story)
# If we don't have enough stories from AC, generate from epic content
if len(stories) < max_stories:
additional_stories = self._generate_additional_stories(
epic_data,
max_stories - len(stories),
personas
)
stories.extend(additional_stories)
return stories[:max_stories]
def _extract_personas(self, epic_data: Dict[str, Any]) -> List[str]:
"""Extract or infer personas from Epic data"""
personas = []
# Use stakeholders as personas
stakeholders = epic_data.get("stakeholders", [])
for sh in stakeholders:
# Extract persona from stakeholder name
# e.g., "Product Team" -> "Product Manager"
if "team" in sh.lower():
personas.append(sh.replace(" Team", " Member").replace(" team", " member"))
else:
personas.append(sh)
# Default personas if none found
if not personas:
personas = ["User", "Administrator", "Developer"]
return personas
def _generate_story_title(self, criterion: str) -> str:
"""Generate a story title from acceptance criterion"""
# Remove common prefixes and convert to action
title = criterion.lower()
title = re.sub(r'^(all|the|a|an)\s+', '', title)
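        # e.g., "All users can export reports" -> "users can export reports" (illustrative)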
# Limit length
words = title.split()[:8]
return ' '.join(words)
def _extract_goal(self, criterion: str) -> str:
"""Extract the goal from acceptance criterion"""
        # Simple extraction; in a real system, use NLP
return criterion
def _generate_benefit(self, criterion: str, epic_data: Dict[str, Any]) -> str:
"""Generate benefit statement from criterion and epic context"""
# Use summary as context for benefit
summary = epic_data.get("summary", "")
if summary:
# Extract first key phrase
benefit_phrases = summary.split('.')
if benefit_phrases:
return f"So that {benefit_phrases[0].strip().lower()}"
return "So that the business objectives are met"
def _generate_additional_stories(self, epic_data: Dict[str, Any],
count: int, personas: List[str]) -> List[Dict[str, Any]]:
"""
Generate additional stories if we don't have enough from acceptance criteria
Args:
epic_data: Parsed Epic data
count: Number of additional stories needed
personas: Available personas
Returns:
List of additional user stories
"""
stories = []
# Generic story templates based on epic title and background
        # use `or` so the "feature" fallback also covers an empty parsed title
        title = epic_data.get("title") or "feature"
background = epic_data.get("background", "")
# Extract key capabilities from background
capabilities = self._extract_capabilities(background)
for i in range(min(count, len(capabilities))):
capability = capabilities[i]
persona = personas[i % len(personas)] if personas else "User"
story = {
"title": f"{persona} can {capability}",
"persona": f"As a {persona}",
"goal": capability,
"benefit": f"So that {title.lower()} is achieved",
"acceptance_criteria": [
f"{capability} functionality is implemented",
f"User can successfully {capability.lower()}",
f"Changes are tested and documented"
]
}
stories.append(story)
return stories
def _extract_capabilities(self, text: str) -> List[str]:
"""Extract key capabilities from text"""
# Simple heuristic: look for verb phrases
capabilities = []
# Common capability patterns
patterns = [
r'enable.+?to (\w+\s+\w+)',
r'allow.+?to (\w+\s+\w+)',
r'provide (\w+\s+\w+)',
r'implement (\w+\s+\w+)',
]
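        # e.g., "allow users to manage accounts" captures "manage accounts" (illustrative)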
for pattern in patterns:
matches = re.findall(pattern, text, re.IGNORECASE)
capabilities.extend(matches)
# Default capabilities if none found
if not capabilities:
capabilities = [
"access the system",
"view their data",
"manage their account",
"receive notifications",
"generate reports"
]
return capabilities
def main():
"""CLI entry point"""
import argparse
parser = argparse.ArgumentParser(
description="Take an Epic (as Markdown) and decompose it into user stories. Analyzes Epic document and identifies major deliverables, grouping them by persona or capability."
)
parser.add_argument(
"--epic-file",
required=True,
help="Path to the epic.md file to decompose"
)
parser.add_argument(
"--max-stories",
type=int,
default=5,
help="Maximum number of stories to generate (default: 5)"
)
parser.add_argument(
"--output-path",
default="./stories.json",
help="Where to save the stories.json file (default: ./stories.json)"
)
parser.add_argument(
"--output-format",
choices=["json", "yaml"],
default="json",
help="Output format"
)
args = parser.parse_args()
# Create skill instance
skill = EpicDecompose()
# Execute skill
result = skill.execute(
epic_file=args.epic_file,
max_stories=args.max_stories,
output_path=args.output_path,
)
# Output result
if args.output_format == "json":
print(json.dumps(result, indent=2))
else:
print(yaml.dump(result, default_flow_style=False))
# Exit with appropriate code
sys.exit(0 if result.get("ok") else 1)
if __name__ == "__main__":
main()