Initial commit

Zhongwei Li
2025-11-30 09:05:52 +08:00
commit db12a906d2
62 changed files with 27669 additions and 0 deletions

48
hooks/hooks.json Normal file

@@ -0,0 +1,48 @@
{
"hooks": {
"PostToolUse": [
{
"matcher": "",
"hooks": [
{
"type": "command",
"command": "python3 ${CLAUDE_PLUGIN_ROOT}/hooks/post_tool_use_elevenlabs.py"
}
]
}
],
"Stop": [
{
"matcher": "",
"hooks": [
{
"type": "command",
"command": "python3 ${CLAUDE_PLUGIN_ROOT}/hooks/stop.py --chat"
}
]
}
],
"SubagentStop": [
{
"matcher": "",
"hooks": [
{
"type": "command",
"command": "python3 ${CLAUDE_PLUGIN_ROOT}/hooks/subagent_stop.py"
}
]
}
],
"Notification": [
{
"matcher": "",
"hooks": [
{
"type": "command",
"command": "python3 ${CLAUDE_PLUGIN_ROOT}/hooks/notification.py"
}
]
}
]
}
}
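
A quick way to sanity-check this wiring is to load the file and expand the plugin-root placeholder yourself. A minimal sketch, assuming it is run from the plugin root (Claude Code normally substitutes `${CLAUDE_PLUGIN_ROOT}` itself at load time):

```python
# Sketch: list the commands registered for each hook event in hooks.json.
# Assumes the current directory is the plugin root; we substitute
# ${CLAUDE_PLUGIN_ROOT} manually for illustration.
import json
from pathlib import Path

plugin_root = Path.cwd()
config = json.loads((plugin_root / "hooks" / "hooks.json").read_text())

for event, entries in config["hooks"].items():
    for entry in entries:
        for hook in entry["hooks"]:
            command = hook["command"].replace("${CLAUDE_PLUGIN_ROOT}", str(plugin_root))
            print(f"{event}: {command}")
```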

267
hooks/mcp/tt-server.py Executable file

@@ -0,0 +1,267 @@
#!/usr/bin/env -S uv run --script
# /// script
# requires-python = ">=3.11"
# dependencies = [
# "mcp>=1.0.0",
# "python-dotenv",
# ]
# ///
"""
Titanium Toolkit MCP Server
Exposes utility scripts as MCP tools for Claude Code.
Available Tools:
- plan_parser: Parse requirements into implementation plan
- bmad_generator: Generate BMAD documents (brief, PRD, architecture, epic, index, research)
- bmad_validator: Validate BMAD documents
Usage:
This server is automatically registered when the titanium-toolkit plugin is installed.
Tools are accessible as: mcp__plugin_titanium-toolkit_tt__<tool_name>
"""
import asyncio
import subprocess
import sys
import json
from pathlib import Path
from typing import Any
from mcp.server import Server
from mcp.types import Tool, TextContent
# Initialize MCP server
server = Server("tt")
# Get the plugin root directory (3 levels up from this file)
PLUGIN_ROOT = Path(__file__).parent.parent.parent
UTILS_DIR = PLUGIN_ROOT / "hooks" / "utils"
@server.list_tools()
async def list_tools() -> list[Tool]:
"""List available Titanium Toolkit utility tools."""
return [
Tool(
name="plan_parser",
description="Parse requirements into structured implementation plan with epics, stories, tasks, and agent assignments",
inputSchema={
"type": "object",
"properties": {
"requirements_file": {
"type": "string",
"description": "Path to requirements file (e.g., '.titanium/requirements.md')"
},
"project_path": {
"type": "string",
"description": "Absolute path to project directory (e.g., '$(pwd)')"
}
},
"required": ["requirements_file", "project_path"]
}
),
Tool(
name="bmad_generator",
description="Generate BMAD documents (brief, prd, architecture, epic, index, research) using GPT-4",
inputSchema={
"type": "object",
"properties": {
"doc_type": {
"type": "string",
"enum": ["brief", "prd", "architecture", "epic", "index", "research"],
"description": "Type of BMAD document to generate"
},
"input_path": {
"type": "string",
"description": "Path to input file or directory (depends on doc_type)"
},
"project_path": {
"type": "string",
"description": "Absolute path to project directory"
}
},
"required": ["doc_type", "input_path", "project_path"]
}
),
Tool(
name="bmad_validator",
description="Validate BMAD documents for completeness and quality",
inputSchema={
"type": "object",
"properties": {
"doc_type": {
"type": "string",
"enum": ["brief", "prd", "architecture", "epic"],
"description": "Type of BMAD document to validate"
},
"document_path": {
"type": "string",
"description": "Path to BMAD document to validate"
}
},
"required": ["doc_type", "document_path"]
}
),
]
@server.call_tool()
async def call_tool(name: str, arguments: dict[str, Any]) -> list[TextContent]:
"""Execute a Titanium Toolkit utility tool."""
try:
if name == "plan_parser":
return await run_plan_parser(arguments)
elif name == "bmad_generator":
return await run_bmad_generator(arguments)
elif name == "bmad_validator":
return await run_bmad_validator(arguments)
else:
return [TextContent(
type="text",
text=f"Error: Unknown tool '{name}'"
)]
except Exception as e:
return [TextContent(
type="text",
text=f"Error executing {name}: {str(e)}"
)]
async def run_plan_parser(args: dict[str, Any]) -> list[TextContent]:
"""Run the plan_parser.py utility."""
requirements_file = args["requirements_file"]
project_path = args["project_path"]
script_path = UTILS_DIR / "workflow" / "plan_parser.py"
# Validate script exists
if not script_path.exists():
return [TextContent(
type="text",
text=f"Error: plan_parser.py not found at {script_path}"
)]
# Run the script
result = subprocess.run(
["uv", "run", str(script_path), requirements_file, project_path],
capture_output=True,
text=True,
cwd=project_path
)
if result.returncode != 0:
error_msg = f"Error running plan_parser:\n\nSTDOUT:\n{result.stdout}\n\nSTDERR:\n{result.stderr}"
return [TextContent(type="text", text=error_msg)]
# Return the plan JSON
return [TextContent(
type="text",
text=f"✅ Plan generated successfully!\n\nPlan saved to: {project_path}/.titanium/plan.json\n\n{result.stdout}"
)]
async def run_bmad_generator(args: dict[str, Any]) -> list[TextContent]:
"""Run the bmad_generator.py utility."""
doc_type = args["doc_type"]
input_path = args["input_path"]
project_path = args["project_path"]
script_path = UTILS_DIR / "bmad" / "bmad_generator.py"
# Validate script exists
if not script_path.exists():
return [TextContent(
type="text",
text=f"Error: bmad_generator.py not found at {script_path}"
)]
# For epic generation, input_path contains space-separated args: "prd_path arch_path epic_num"
# Split them and pass as separate arguments
if doc_type == "epic":
input_parts = input_path.split()
if len(input_parts) != 3:
return [TextContent(
type="text",
text=f"Error: Epic generation requires 3 inputs (prd_path arch_path epic_num), got {len(input_parts)}"
)]
# Pass all parts as separate arguments
cmd = ["uv", "run", str(script_path), doc_type] + input_parts + [project_path]
else:
# For other doc types, input_path is a single value
cmd = ["uv", "run", str(script_path), doc_type, input_path, project_path]
# Run the script
result = subprocess.run(
cmd,
capture_output=True,
text=True,
cwd=project_path
)
if result.returncode != 0:
error_msg = f"Error running bmad_generator:\n\nSTDOUT:\n{result.stdout}\n\nSTDERR:\n{result.stderr}"
return [TextContent(type="text", text=error_msg)]
# Return success message with output
return [TextContent(
type="text",
text=f"✅ BMAD {doc_type} generated successfully!\n\n{result.stdout}"
)]
async def run_bmad_validator(args: dict[str, Any]) -> list[TextContent]:
"""Run the bmad_validator.py utility."""
doc_type = args["doc_type"]
document_path = args["document_path"]
script_path = UTILS_DIR / "bmad" / "bmad_validator.py"
# Validate script exists
if not script_path.exists():
return [TextContent(
type="text",
text=f"Error: bmad_validator.py not found at {script_path}"
)]
# Get the document's parent directory as working directory
document_parent = Path(document_path).parent
# Run the script
result = subprocess.run(
["uv", "run", str(script_path), doc_type, document_path],
capture_output=True,
text=True,
cwd=str(document_parent)
)
# Validator returns non-zero for validation failures (expected behavior)
# Only treat it as an error if there's stderr output (actual script error)
if result.returncode != 0 and result.stderr and "Traceback" in result.stderr:
error_msg = f"Error running bmad_validator:\n\nSTDOUT:\n{result.stdout}\n\nSTDERR:\n{result.stderr}"
return [TextContent(type="text", text=error_msg)]
# Return validation results (includes both pass and fail cases)
return [TextContent(
type="text",
text=f"BMAD {doc_type} validation results:\n\n{result.stdout}"
)]
async def main():
"""Run the MCP server."""
from mcp.server.stdio import stdio_server
async with stdio_server() as (read_stream, write_stream):
await server.run(
read_stream,
write_stream,
server.create_initialization_options()
)
if __name__ == "__main__":
asyncio.run(main())
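
For local testing, the server can be driven over stdio with the `mcp` SDK's client helpers. A minimal sketch, assuming the SDK's standard `ClientSession`/`stdio_client` API and the repository layout above:

```python
# Sketch: start tt-server.py over stdio and list its three tools.
import asyncio

from mcp import ClientSession, StdioServerParameters
from mcp.client.stdio import stdio_client


async def main() -> None:
    params = StdioServerParameters(command="uv", args=["run", "hooks/mcp/tt-server.py"])
    async with stdio_client(params) as (read_stream, write_stream):
        async with ClientSession(read_stream, write_stream) as session:
            await session.initialize()
            result = await session.list_tools()
            for tool in result.tools:
                print(f"{tool.name}: {tool.description}")


asyncio.run(main())
```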

238
hooks/notification.py Executable file

@@ -0,0 +1,238 @@
#!/usr/bin/env -S uv run --script
# /// script
# requires-python = ">=3.11"
# dependencies = [
# "python-dotenv",
# "openai",
# ]
# ///
import argparse
import json
import os
import sys
import subprocess
from pathlib import Path
from datetime import datetime
try:
from dotenv import load_dotenv
load_dotenv()
except ImportError:
pass # dotenv is optional
def get_tts_script_path():
"""
Determine which TTS script to use based on available API keys.
Priority order: ElevenLabs > OpenAI > pyttsx3
"""
script_dir = Path(__file__).parent
tts_dir = script_dir / "utils" / "tts"
# Check for ElevenLabs (highest priority for quality)
if os.getenv('ELEVENLABS_API_KEY'):
elevenlabs_script = tts_dir / "elevenlabs_tts.py"
if elevenlabs_script.exists():
return str(elevenlabs_script)
# Check for OpenAI API key (second priority)
if os.getenv('OPENAI_API_KEY'):
openai_script = tts_dir / "openai_tts.py"
if openai_script.exists():
return str(openai_script)
# Fall back to pyttsx3 (no API key required)
pyttsx3_script = tts_dir / "local_tts.py"
if pyttsx3_script.exists():
return str(pyttsx3_script)
return None
def get_smart_notification(message, input_data):
"""
Use GPT-5 nano to generate a context-aware notification message.
Analyzes the recent transcript to understand what Claude needs.
"""
api_key = os.getenv("OPENAI_API_KEY")
if not api_key:
return None
try:
from openai import OpenAI
client = OpenAI(api_key=api_key)
# Extract any additional context
context = f"Notification: {message}\n"
# Add fields from input_data
for key in ['status', 'reason', 'permission_mode', 'cwd']:
if input_data.get(key):
context += f"{key}: {input_data[key]}\n"
# Try to get recent context from transcript if available
transcript_path = input_data.get('transcript_path')
if transcript_path and os.path.exists(transcript_path):
try:
# Read last few messages to understand context
with open(transcript_path, 'r') as f:
lines = f.readlines()
last_lines = lines[-10:] if len(lines) > 10 else lines
for line in reversed(last_lines):
try:
msg = json.loads(line.strip())
# Look for recent user message
if msg.get('role') == 'user':
user_msg = msg.get('content', '')
if isinstance(user_msg, str):
context += f"Last user request: {user_msg[:100]}\n"
break
except Exception:
pass
except Exception:
pass
prompt = f"""Create a brief 4-8 word voice notification that tells the user what Claude is waiting for.
Be specific about what action, permission, or input is needed.
{context}
Examples:
- "Waiting for edit approval"
- "Need permission for bash command"
- "Ready for your response"
- "Waiting to continue your task"
Notification:"""
response = client.chat.completions.create(
model="gpt-5-nano",
messages=[{"role": "user", "content": prompt}],
max_completion_tokens=20,
)
return response.choices[0].message.content.strip().strip('"').strip("'")
except Exception as e:
print(f"Smart notification error: {e}", file=sys.stderr)
return None
def get_notification_message(message, input_data=None):
"""
Convert notification message to a more natural spoken version.
"""
# Try smart notification first for "waiting" messages
if ("waiting" in message.lower() or "idle" in message.lower()) and input_data:
smart_msg = get_smart_notification(message, input_data)
if smart_msg:
return smart_msg
# Common notification transformations
if "permission" in message.lower():
# Extract tool name if present
if "to use" in message.lower():
parts = message.split("to use")
if len(parts) > 1:
tool_name = parts[1].strip().rstrip('.')
return f"Permission needed for {tool_name}"
return "Claude needs your permission"
elif "waiting for your input" in message.lower():
# More informative default if smart notification failed
return "Waiting for your response"
elif "idle" in message.lower():
return "Claude is ready"
# Default: use the message as-is but make it more concise
# Remove "Claude" from beginning if present
if message.startswith("Claude "):
message = message[7:]
# Truncate very long messages
if len(message) > 50:
message = message[:47] + "..."
return message
def main():
try:
# Read JSON input from stdin
input_data = json.load(sys.stdin)
# Extract notification message
message = input_data.get("message", "")
if not message:
sys.exit(0)
# Convert to natural speech with context
spoken_message = get_notification_message(message, input_data)
# Speak via the best available TTS engine, reusing the ElevenLabs > OpenAI >
# pyttsx3 fallback chain defined in get_tts_script_path() above
tts_script = get_tts_script_path()
try:
subprocess.run(["afplay", "/System/Library/Sounds/Tink.aiff"], timeout=1)
if tts_script:
subprocess.run(
["uv", "run", tts_script, spoken_message],
capture_output=True,
timeout=10
)
except Exception:
pass
# Optional: Also use system notification if available
try:
# Try notify-send on Linux
subprocess.run([
"notify-send", "-a", "Claude Code", "Claude Code", message
], capture_output=True, timeout=2)
except Exception:
try:
# Try osascript on macOS
subprocess.run([
"osascript", "-e",
f'display notification "{message}" with title "Claude Code"'
], capture_output=True, timeout=2)
except Exception:
pass # No system notification available
# Log for debugging (optional)
log_dir = os.path.join(os.getcwd(), "logs")
if os.path.exists(log_dir):
log_path = os.path.join(log_dir, "notifications.json")
try:
logs = []
if os.path.exists(log_path):
with open(log_path, 'r') as f:
logs = json.load(f)
logs.append({
"timestamp": datetime.now().isoformat(),
"message": message,
"spoken": spoken_message
})
# Keep last 50 entries
logs = logs[-50:]
with open(log_path, 'w') as f:
json.dump(logs, f, indent=2)
except Exception:
pass
sys.exit(0)
except Exception:
# Fail silently
sys.exit(0)
if __name__ == "__main__":
main()
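
The hook reads a single JSON event from stdin, so it can be exercised outside Claude Code by piping a payload in. A minimal sketch with invented values (assumes the script's dependencies are available to `python3`):

```python
# Sketch: simulate a Notification event. Field names mirror what the
# script reads; the message and cwd values are made up.
import json
import subprocess

payload = {
    "message": "Claude needs your permission to use Bash",
    "cwd": "/tmp/demo",
}
subprocess.run(
    ["python3", "hooks/notification.py"],
    input=json.dumps(payload),
    text=True,
)
```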

207
hooks/post_tool_use_elevenlabs.py Executable file

@@ -0,0 +1,207 @@
#!/usr/bin/env -S uv run --script
# /// script
# requires-python = ">=3.11"
# dependencies = [
# "python-dotenv",
# "openai",
# ]
# ///
import json
import sys
import subprocess
import os
from pathlib import Path
from datetime import datetime
try:
from dotenv import load_dotenv
load_dotenv()
except ImportError:
pass
def get_simple_summary(tool_name, tool_input, tool_response):
"""
Create a simple summary without LLM first
"""
if tool_name == "Task":
# Extract task description
task_desc = ""
if "prompt" in tool_input:
task_desc = tool_input['prompt']
elif "description" in tool_input:
task_desc = tool_input['description']
# Extract agent name if present
if ':' in task_desc:
parts = task_desc.split(':', 1)
agent_name = parts[0].strip()
task_detail = parts[1].strip() if len(parts) > 1 else ""
# Shorten task detail
if len(task_detail) > 30:
task_detail = task_detail[:30] + "..."
return f"{agent_name} completed {task_detail}"
return "Agent task completed"
elif tool_name == "Write":
file_path = tool_input.get("file_path", "")
if file_path:
file_name = Path(file_path).name
return f"Created {file_name}"
return "File created"
elif tool_name in ["Edit", "MultiEdit"]:
file_path = tool_input.get("file_path", "")
if file_path:
file_name = Path(file_path).name
return f"Updated {file_name}"
return "File updated"
return f"{tool_name} completed"
def get_ai_summary(tool_name, tool_input, tool_response):
"""
Use OpenAI to create a better summary
"""
api_key = os.getenv("OPENAI_API_KEY")
if not api_key:
return None
try:
from openai import OpenAI
client = OpenAI(api_key=api_key)
# Build context
context = f"Tool: {tool_name}\n"
if tool_name == "Task":
task_desc = tool_input.get("prompt", tool_input.get("description", ""))
context += f"Task: {task_desc}\n"
if tool_response and "output" in tool_response:
# Truncate output if too long
output = str(tool_response["output"])[:200]
context += f"Result: {output}\n"
elif tool_name == "Write":
file_path = tool_input.get("file_path", "")
context += f"File: {file_path}\n"
context += "Action: Created new file\n"
elif tool_name in ["Edit", "MultiEdit"]:
file_path = tool_input.get("file_path", "")
context += f"File: {file_path}\n"
context += "Action: Modified existing file\n"
prompt = f"""Create a 3-7 word summary of this tool completion for voice announcement.
Be specific about what was accomplished.
{context}
Examples of good summaries:
- "Created user authentication module"
- "Updated API endpoints"
- "Documentation generator built"
- "Fixed validation errors"
- "Database schema created"
Summary:"""
response = client.chat.completions.create(
model="gpt-5-nano",
messages=[{"role": "user", "content": prompt}],
max_completion_tokens=15,
)
summary = response.choices[0].message.content.strip()
# Remove quotes if present
summary = summary.strip('"').strip("'")
return summary
except Exception as e:
print(f"AI summary error: {e}", file=sys.stderr)
return None
def announce_with_tts(summary):
"""
Use ElevenLabs Sarah voice for all announcements (high quality, consistent)
Falls back to macOS say if ElevenLabs fails.
"""
script_dir = Path(__file__).parent
tts_dir = script_dir / "utils" / "tts"
elevenlabs_script = tts_dir / "elevenlabs_tts.py"
try:
result = subprocess.run(
["uv", "run", str(elevenlabs_script), summary],
capture_output=True,
timeout=10
)
if result.returncode == 0:
return "elevenlabs"
else:
# ElevenLabs failed, use macOS fallback
subprocess.run(["say", summary], timeout=5)
return "macos-fallback"
except Exception:
# Last resort fallback
try:
subprocess.run(["say", summary], timeout=5)
return "macos-fallback"
except Exception:
return "none"
def main():
try:
# Read input
input_data = json.load(sys.stdin)
tool_name = input_data.get("tool_name", "")
tool_input = input_data.get("tool_input", {})
tool_response = input_data.get("tool_response", {})
# Skip certain tools
if tool_name in ["TodoWrite", "Grep", "LS", "Bash", "Read", "Glob", "WebFetch", "WebSearch"]:
sys.exit(0)
# Try AI summary first, fall back to simple summary; keep the AI result
# so the logging below can record it without a second API call
ai_summary = get_ai_summary(tool_name, tool_input, tool_response)
summary = ai_summary or get_simple_summary(tool_name, tool_input, tool_response)
# Announce with TTS (ElevenLabs or local)
tts_method = announce_with_tts(summary)
# Log what we announced
log_dir = os.path.join(os.getcwd(), "logs")
if os.path.exists(log_dir):
log_path = os.path.join(log_dir, "voice_announcements.json")
logs = []
if os.path.exists(log_path):
try:
with open(log_path, 'r') as f:
logs = json.load(f)
except Exception:
logs = []
logs.append({
"timestamp": datetime.now().isoformat(),
"tool": tool_name,
"summary": summary,
"ai_generated": bool(get_ai_summary(tool_name, tool_input, tool_response)),
"tts_method": tts_method
})
# Keep last 50
logs = logs[-50:]
with open(log_path, 'w') as f:
json.dump(logs, f, indent=2)
print(f"Announced via {tts_method}: {summary}")
sys.exit(0)
except Exception as e:
print(f"Hook error: {e}", file=sys.stderr)
sys.exit(0)
if __name__ == "__main__":
main()
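
The same stdin-piping trick works here. The "agent: detail" prompt shape below exercises the Task branch of `get_simple_summary`; all values are invented:

```python
# Sketch: simulate a PostToolUse event for a completed Task tool call.
import json
import subprocess

payload = {
    "tool_name": "Task",
    "tool_input": {"prompt": "code-reviewer: audit the auth module"},
    "tool_response": {"output": "Review complete, two issues found."},
}
subprocess.run(
    ["python3", "hooks/post_tool_use_elevenlabs.py"],
    input=json.dumps(payload),
    text=True,
)
```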

310
hooks/stop.py Executable file

@@ -0,0 +1,310 @@
#!/usr/bin/env -S uv run --script
# /// script
# requires-python = ">=3.11"
# dependencies = [
# "python-dotenv",
# "openai",
# ]
# ///
import argparse
import json
import os
import sys
import random
import subprocess
from pathlib import Path
from datetime import datetime
try:
from dotenv import load_dotenv
load_dotenv()
except ImportError:
pass # dotenv is optional
def get_completion_messages():
"""Return list of friendly completion messages."""
return [
"Work complete!",
"All done!",
"Task finished!",
"Job complete!",
"Ready for next task!"
]
def get_tts_script_path():
"""
Determine which TTS script to use based on available API keys and MCP.
Priority order: ElevenLabs MCP > OpenAI > local
"""
# Get current script directory and construct utils/tts path
script_dir = Path(__file__).parent
tts_dir = script_dir / "utils" / "tts"
# Check for ElevenLabs MCP first (highest priority)
elevenlabs_mcp_script = tts_dir / "elevenlabs_mcp.py"
if elevenlabs_mcp_script.exists():
return str(elevenlabs_mcp_script)
# Check for OpenAI API key (second priority)
if os.getenv('OPENAI_API_KEY'):
openai_script = tts_dir / "openai_tts.py"
if openai_script.exists():
return str(openai_script)
# Fall back to local TTS (no API key required)
local_script = tts_dir / "local_tts.py"
if local_script.exists():
return str(local_script)
return None
def get_session_summary(transcript_path):
"""
Analyze the transcript and create a comprehensive summary
of what Claude accomplished in this session.
Uses GPT-5 mini for intelligent session summarization.
"""
api_key = os.getenv("OPENAI_API_KEY")
if not api_key or not transcript_path or not os.path.exists(transcript_path):
return None
try:
from openai import OpenAI
client = OpenAI(api_key=api_key)
# Read transcript and collect tool uses
tool_uses = []
user_requests = []
with open(transcript_path, 'r') as f:
for line in f:
try:
msg = json.loads(line.strip())
# Collect user messages
if msg.get('role') == 'user':
content = msg.get('content', '')
if isinstance(content, str) and content.strip():
user_requests.append(content[:100]) # First 100 chars
# Collect tool uses from content blocks
if msg.get('role') == 'assistant':
content = msg.get('content', [])
if isinstance(content, list):
for block in content:
if isinstance(block, dict) and block.get('type') == 'tool_use':
tool_uses.append({
'name': block.get('name'),
'input': block.get('input', {})
})
except Exception:
pass
if not tool_uses:
return None
# Build context from tools and user intent
context = f"Session completed with {len(tool_uses)} operations.\n"
if user_requests:
context += f"User requested: {user_requests[0]}\n\n"
context += "Key actions:\n"
# Summarize tool usage
tool_counts = {}
for tool in tool_uses:
name = tool['name']
tool_counts[name] = tool_counts.get(name, 0) + 1
for tool_name, count in list(tool_counts.items())[:10]:
context += f"- {tool_name}: {count}x\n"
prompt = f"""Summarize what Claude accomplished in this work session in 1-2 natural sentences for a voice announcement.
Focus on the end result and key accomplishments, not individual steps.
Be conversational and speak directly to the user in first person (I did...).
Keep it concise but informative.
{context}
Examples of good summaries:
- "I set up three MCP servers and configured voice announcements across all your projects"
- "I migrated your HOLACE configuration globally and everything is ready to use"
- "I fixed all the failing tests and updated the authentication module"
- "I created the payment integration with Stripe and added webhook handling"
Summary:"""
response = client.chat.completions.create(
model="gpt-5-mini",
messages=[{"role": "user", "content": prompt}],
max_completion_tokens=100,
)
return response.choices[0].message.content.strip()
except Exception as e:
print(f"Session summary error: {e}", file=sys.stderr)
return None
def get_llm_completion_message():
"""
Generate completion message using available LLM services.
Priority order: OpenAI > Anthropic > fallback to random message
Returns:
str: Generated or fallback completion message
"""
# Get current script directory and construct utils/llm path
script_dir = Path(__file__).parent
llm_dir = script_dir / "utils" / "llm"
# Try OpenAI first (highest priority)
if os.getenv('OPENAI_API_KEY'):
oai_script = llm_dir / "oai.py"
if oai_script.exists():
try:
result = subprocess.run([
"uv", "run", str(oai_script), "--completion"
],
capture_output=True,
text=True,
timeout=10
)
if result.returncode == 0 and result.stdout.strip():
return result.stdout.strip()
except (subprocess.TimeoutExpired, subprocess.SubprocessError):
pass
# Try Anthropic second
if os.getenv('ANTHROPIC_API_KEY'):
anth_script = llm_dir / "anth.py"
if anth_script.exists():
try:
result = subprocess.run([
"uv", "run", str(anth_script), "--completion"
],
capture_output=True,
text=True,
timeout=10
)
if result.returncode == 0 and result.stdout.strip():
return result.stdout.strip()
except (subprocess.TimeoutExpired, subprocess.SubprocessError):
pass
# Fallback to random predefined message
messages = get_completion_messages()
return random.choice(messages)
def announce_completion(input_data):
"""Announce completion with comprehensive session summary."""
try:
tts_script = get_tts_script_path()
if not tts_script:
return # No TTS scripts available
# Try to get comprehensive session summary from transcript
transcript_path = input_data.get('transcript_path')
completion_message = get_session_summary(transcript_path)
# Fallback to generic message if summary fails
if not completion_message:
completion_message = get_llm_completion_message()
# Call the TTS script with the completion message
subprocess.run([
"uv", "run", tts_script, completion_message
],
capture_output=True, # Suppress output
timeout=15 # Longer timeout for longer summaries
)
except (subprocess.TimeoutExpired, subprocess.SubprocessError, FileNotFoundError):
# Fail silently if TTS encounters issues
pass
except Exception:
# Fail silently for any other errors
pass
def main():
try:
# Parse command line arguments
parser = argparse.ArgumentParser()
parser.add_argument('--chat', action='store_true', help='Copy transcript to chat.json')
args = parser.parse_args()
# Read JSON input from stdin
input_data = json.load(sys.stdin)
# Extract required fields
session_id = input_data.get("session_id", "")
stop_hook_active = input_data.get("stop_hook_active", False)
# Ensure log directory exists
log_dir = os.path.join(os.getcwd(), "logs")
os.makedirs(log_dir, exist_ok=True)
log_path = os.path.join(log_dir, "stop.json")
# Read existing log data or initialize empty list
if os.path.exists(log_path):
with open(log_path, 'r') as f:
try:
log_data = json.load(f)
except (json.JSONDecodeError, ValueError):
log_data = []
else:
log_data = []
# Append new data
log_data.append(input_data)
# Write back to file with formatting
with open(log_path, 'w') as f:
json.dump(log_data, f, indent=2)
# Handle --chat switch
if args.chat and 'transcript_path' in input_data:
transcript_path = input_data['transcript_path']
if os.path.exists(transcript_path):
# Read .jsonl file and convert to JSON array
chat_data = []
try:
with open(transcript_path, 'r') as f:
for line in f:
line = line.strip()
if line:
try:
chat_data.append(json.loads(line))
except json.JSONDecodeError:
pass # Skip invalid lines
# Write to logs/chat.json
chat_file = os.path.join(log_dir, 'chat.json')
with open(chat_file, 'w') as f:
json.dump(chat_data, f, indent=2)
except Exception:
pass # Fail silently
# Announce completion via TTS
announce_completion(input_data)
sys.exit(0)
except json.JSONDecodeError:
# Handle JSON decode errors gracefully
sys.exit(0)
except Exception:
# Handle any other errors gracefully
sys.exit(0)
if __name__ == "__main__":
main()
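
`get_session_summary` expects the transcript to be a `.jsonl` file with one message object per line, matching the fields parsed above. A minimal sketch that fabricates a two-line transcript and runs the hook with `--chat` (all content invented):

```python
# Sketch: build a tiny transcript and feed it to stop.py. Field names
# follow the parsing code in get_session_summary.
import json
import subprocess
from pathlib import Path

transcript = Path("/tmp/demo-transcript.jsonl")
messages = [
    {"role": "user", "content": "Add a /health endpoint to the API"},
    {
        "role": "assistant",
        "content": [
            {"type": "tool_use", "name": "Edit", "input": {"file_path": "api/server.py"}},
        ],
    },
]
transcript.write_text("\n".join(json.dumps(m) for m in messages))

payload = {"session_id": "demo", "transcript_path": str(transcript)}
subprocess.run(
    ["python3", "hooks/stop.py", "--chat"],
    input=json.dumps(payload),
    text=True,
)
```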

151
hooks/subagent_stop.py Executable file

@@ -0,0 +1,151 @@
#!/usr/bin/env -S uv run --script
# /// script
# requires-python = ">=3.11"
# dependencies = [
# "python-dotenv",
# ]
# ///
import argparse
import json
import os
import sys
import subprocess
from pathlib import Path
from datetime import datetime
try:
from dotenv import load_dotenv
load_dotenv()
except ImportError:
pass # dotenv is optional
def get_tts_script_path():
"""
Determine which TTS script to use based on available API keys and MCP.
Priority order: ElevenLabs MCP > OpenAI > local
"""
# Get current script directory and construct utils/tts path
script_dir = Path(__file__).parent
tts_dir = script_dir / "utils" / "tts"
# Check for ElevenLabs MCP first (highest priority)
elevenlabs_mcp_script = tts_dir / "elevenlabs_mcp.py"
if elevenlabs_mcp_script.exists():
return str(elevenlabs_mcp_script)
# Check for OpenAI API key (second priority)
if os.getenv('OPENAI_API_KEY'):
openai_script = tts_dir / "openai_tts.py"
if openai_script.exists():
return str(openai_script)
# Fall back to local TTS (no API key required)
local_script = tts_dir / "local_tts.py"
if local_script.exists():
return str(local_script)
return None
def announce_subagent_completion():
"""Announce subagent completion using the best available TTS service."""
try:
tts_script = get_tts_script_path()
if not tts_script:
return # No TTS scripts available
# Use fixed message for subagent completion
completion_message = "Subagent Complete"
# Call the TTS script with the completion message
subprocess.run([
"uv", "run", tts_script, completion_message
],
capture_output=True, # Suppress output
timeout=10 # 10-second timeout
)
except (subprocess.TimeoutExpired, subprocess.SubprocessError, FileNotFoundError):
# Fail silently if TTS encounters issues
pass
except Exception:
# Fail silently for any other errors
pass
def main():
try:
# Parse command line arguments
parser = argparse.ArgumentParser()
parser.add_argument('--chat', action='store_true', help='Copy transcript to chat.json')
args = parser.parse_args()
# Read JSON input from stdin
input_data = json.load(sys.stdin)
# Extract required fields
session_id = input_data.get("session_id", "")
stop_hook_active = input_data.get("stop_hook_active", False)
# Ensure log directory exists
log_dir = os.path.join(os.getcwd(), "logs")
os.makedirs(log_dir, exist_ok=True)
log_path = os.path.join(log_dir, "subagent_stop.json")
# Read existing log data or initialize empty list
if os.path.exists(log_path):
with open(log_path, 'r') as f:
try:
log_data = json.load(f)
except (json.JSONDecodeError, ValueError):
log_data = []
else:
log_data = []
# Append new data
log_data.append(input_data)
# Write back to file with formatting
with open(log_path, 'w') as f:
json.dump(log_data, f, indent=2)
# Handle --chat switch (same as stop.py)
if args.chat and 'transcript_path' in input_data:
transcript_path = input_data['transcript_path']
if os.path.exists(transcript_path):
# Read .jsonl file and convert to JSON array
chat_data = []
try:
with open(transcript_path, 'r') as f:
for line in f:
line = line.strip()
if line:
try:
chat_data.append(json.loads(line))
except json.JSONDecodeError:
pass # Skip invalid lines
# Write to logs/chat.json
chat_file = os.path.join(log_dir, 'chat.json')
with open(chat_file, 'w') as f:
json.dump(chat_data, f, indent=2)
except Exception:
pass # Fail silently
# Announce subagent completion via TTS
announce_subagent_completion()
sys.exit(0)
except json.JSONDecodeError:
# Handle JSON decode errors gracefully
sys.exit(0)
except Exception:
# Handle any other errors gracefully
sys.exit(0)
if __name__ == "__main__":
main()

1494
hooks/utils/bmad/bmad_generator.py Executable file

File diff suppressed because it is too large

501
hooks/utils/bmad/bmad_validator.py Executable file

@@ -0,0 +1,501 @@
#!/usr/bin/env -S uv run --script
# /// script
# requires-python = ">=3.11"
# dependencies = [
# "python-dotenv",
# ]
# ///
"""
BMAD Document Validator Utility
Validates BMAD documents match required structure and completeness.
Commands:
brief <file_path> Validate product brief
prd <file_path> Validate PRD
architecture <file_path> Validate architecture
epic <file_path> Validate epic
all <bmad_dir> Validate all documents in backlog
Examples:
uv run bmad_validator.py prd bmad-backlog/prd/prd.md
uv run bmad_validator.py all bmad-backlog/
"""
import json
import sys
import re
from pathlib import Path
from typing import Dict, List
def validate_brief(file_path: str) -> Dict:
"""
Validate Product Brief has all required sections.
Args:
file_path: Path to product-brief.md
Returns:
Validation results dict
"""
try:
with open(file_path, 'r') as f:
content = f.read()
except Exception as e:
return {
"valid": False,
"errors": [f"Cannot read file: {e}"],
"warnings": [],
"missing_sections": []
}
required_sections = [
"Executive Summary",
"Problem Statement",
"Proposed Solution",
"Target Users",
"Goals & Success Metrics",
"MVP Scope",
"Post-MVP Vision",
"Technical Considerations",
"Constraints & Assumptions",
"Risks & Open Questions",
"Next Steps"
]
results = {
"valid": True,
"errors": [],
"warnings": [],
"missing_sections": []
}
# Check for required sections
for section in required_sections:
if section not in content:
results["valid"] = False
results["missing_sections"].append(section)
# Check for header
if not re.search(r'#\s+Product Brief:', content):
results["errors"].append("Missing main header: # Product Brief: {Name}")
# Check for version info
if "**Version:**" not in content:
results["warnings"].append("Missing version field")
if "**Date:**" not in content:
results["warnings"].append("Missing date field")
return results
def validate_prd(file_path: str) -> Dict:
"""
Validate PRD has all required sections.
Args:
file_path: Path to prd.md
Returns:
Validation results dict
"""
try:
with open(file_path, 'r', encoding='utf-8') as f:
content = f.read()
except Exception as e:
return {
"valid": False,
"errors": [f"Cannot read file: {e}"],
"warnings": [],
"missing_sections": []
}
required_sections = [
"Executive Summary",
"Product Overview",
"Success Metrics",
"Feature Requirements",
"User Stories",
"Technical Requirements",
"Data Requirements",
"AI/ML Requirements",
"Design Requirements",
"Go-to-Market Strategy",
"Risks & Mitigation",
"Open Questions",
"Appendix"
]
results = {
"valid": True,
"errors": [],
"warnings": [],
"missing_sections": []
}
# Check for required sections
for section in required_sections:
if section not in content:
results["valid"] = False
results["missing_sections"].append(section)
# Check for header
if not re.search(r'#\s+Product Requirements Document', content):
results["errors"].append("Missing main header")
# Check for metadata
if "**Document Version:**" not in content:
results["warnings"].append("Missing document version")
if "**Last Updated:**" not in content:
results["warnings"].append("Missing last updated date")
# Check for user stories format
if "User Stories" in content:
# Should have "As a" pattern
if "As a" not in content:
results["warnings"].append("User stories missing 'As a... I want... so that' format")
# Check for acceptance criteria
if "Feature Requirements" in content or "User Stories" in content:
if "Acceptance Criteria:" not in content and "- [ ]" not in content:
results["warnings"].append("Missing acceptance criteria checkboxes")
return results
def validate_architecture(file_path: str) -> Dict:
"""
Validate Architecture document completeness.
Args:
file_path: Path to architecture.md
Returns:
Validation results dict
"""
try:
with open(file_path, 'r', encoding='utf-8') as f:
content = f.read()
except Exception as e:
return {
"valid": False,
"errors": [f"Cannot read file: {e}"],
"warnings": [],
"missing_sections": []
}
required_sections = [
"System Overview",
"Architecture Principles",
"High-Level Architecture",
"Component Details",
"Data Architecture",
"Infrastructure",
"Security Architecture",
"Deployment Strategy",
"Monitoring & Observability",
"Appendix"
]
results = {
"valid": True,
"errors": [],
"warnings": [],
"missing_sections": []
}
# Check for required sections
for section in required_sections:
if section not in content:
results["valid"] = False
results["missing_sections"].append(section)
# Check for code examples
if "```sql" not in content and "```python" not in content and "```typescript" not in content:
results["warnings"].append("Missing code examples (SQL, Python, or TypeScript)")
# Check for cost estimates
if "Cost" not in content:
results["warnings"].append("Missing cost estimates")
# Check for technology decisions
if "Technology Decisions" not in content:
results["warnings"].append("Missing technology decisions table")
return results
def validate_epic(file_path: str) -> Dict:
"""
Validate Epic file structure.
Args:
file_path: Path to EPIC-*.md
Returns:
Validation results dict
"""
try:
with open(file_path, 'r', encoding='utf-8') as f:
content = f.read()
except Exception as e:
return {
"valid": False,
"errors": [f"Cannot read file: {e}"],
"warnings": [],
"missing_sections": []
}
required_fields = [
"**Epic Owner:**",
"**Priority:**",
"**Status:**",
"**Estimated Effort:**"
]
required_sections = [
"Epic Description",
"Business Value",
"Success Criteria",
"User Stories",
"Dependencies",
"Definition of Done"
]
results = {
"valid": True,
"errors": [],
"warnings": [],
"missing_sections": [],
"missing_fields": []
}
# Check for required fields
for field in required_fields:
if field not in content:
results["valid"] = False
results["missing_fields"].append(field)
# Check for required sections
for section in required_sections:
if section not in content:
results["valid"] = False
results["missing_sections"].append(section)
# Check for story format
story_matches = re.findall(r'### STORY-(\d+)-(\d+):', content)
if not story_matches:
results["errors"].append("No stories found (expecting STORY-XXX-YY format)")
# Check stories have acceptance criteria
if story_matches:
has_criteria = "Acceptance Criteria:" in content or "**Acceptance Criteria:**" in content
if not has_criteria:
results["warnings"].append("Stories missing acceptance criteria")
# Check for "As a... I want... so that" format
has_user_story_format = "As a" in content and "I want" in content and "so that" in content
if not has_user_story_format:
results["warnings"].append("Stories missing user story format (As a... I want... so that...)")
return results
def validate_all(bmad_dir: str) -> Dict:
"""
Validate all documents in BMAD backlog.
Args:
bmad_dir: Path to bmad-backlog directory
Returns:
Combined validation results
"""
bmad_path = Path(bmad_dir)
results = {
"brief": None,
"prd": None,
"architecture": None,
"epics": [],
"overall_valid": True
}
# Validate brief (optional)
brief_path = bmad_path / "product-brief.md"
if brief_path.exists():
results["brief"] = validate_brief(str(brief_path))
if not results["brief"]["valid"]:
results["overall_valid"] = False
# Validate PRD (required)
prd_path = bmad_path / "prd" / "prd.md"
if prd_path.exists():
results["prd"] = validate_prd(str(prd_path))
if not results["prd"]["valid"]:
results["overall_valid"] = False
else:
results["overall_valid"] = False
results["prd"] = {"valid": False, "errors": ["PRD not found"]}
# Validate architecture (required)
arch_path = bmad_path / "architecture" / "architecture.md"
if arch_path.exists():
results["architecture"] = validate_architecture(str(arch_path))
if not results["architecture"]["valid"]:
results["overall_valid"] = False
else:
results["overall_valid"] = False
results["architecture"] = {"valid": False, "errors": ["Architecture not found"]}
# Validate epics (required)
epics_dir = bmad_path / "epics"
if epics_dir.exists():
epic_files = sorted(epics_dir.glob("EPIC-*.md"))
for epic_file in epic_files:
epic_result = validate_epic(str(epic_file))
epic_result["file"] = epic_file.name
results["epics"].append(epic_result)
if not epic_result["valid"]:
results["overall_valid"] = False
else:
results["overall_valid"] = False
return results
def print_validation_results(results: Dict, document_type: str):
"""Print validation results in readable format."""
print(f"\n{'='*60}")
print(f"Validation Results: {document_type}")
print(f"{'='*60}\n")
if results["valid"]:
print("✅ VALID - All required sections present")
else:
print("❌ INVALID - Missing required content")
if results.get("missing_sections"):
print("\n❌ Missing Required Sections:")
for section in results["missing_sections"]:
print(f" - {section}")
if results.get("missing_fields"):
print("\n❌ Missing Required Fields:")
for field in results["missing_fields"]:
print(f" - {field}")
if results.get("errors"):
print("\n❌ Errors:")
for error in results["errors"]:
print(f" - {error}")
if results.get("warnings"):
print("\n⚠️ Warnings:")
for warning in results["warnings"]:
print(f" - {warning}")
print()
def main():
"""CLI interface for validation."""
if len(sys.argv) < 3:
print("Usage: bmad_validator.py <command> <file_path>", file=sys.stderr)
print("\nCommands:", file=sys.stderr)
print(" brief <file_path>", file=sys.stderr)
print(" prd <file_path>", file=sys.stderr)
print(" architecture <file_path>", file=sys.stderr)
print(" epic <file_path>", file=sys.stderr)
print(" all <bmad_dir>", file=sys.stderr)
sys.exit(1)
command = sys.argv[1]
path = sys.argv[2]
try:
if command == "brief":
results = validate_brief(path)
print_validation_results(results, "Product Brief")
sys.exit(0 if results["valid"] else 1)
elif command == "prd":
results = validate_prd(path)
print_validation_results(results, "PRD")
sys.exit(0 if results["valid"] else 1)
elif command == "architecture":
results = validate_architecture(path)
print_validation_results(results, "Architecture")
sys.exit(0 if results["valid"] else 1)
elif command == "epic":
results = validate_epic(path)
print_validation_results(results, f"Epic ({Path(path).name})")
sys.exit(0 if results["valid"] else 1)
elif command == "all":
results = validate_all(path)
print(f"\n{'='*60}")
print(f"Complete Backlog Validation: {path}")
print(f"{'='*60}\n")
if results["overall_valid"]:
print("✅ ALL DOCUMENTS VALID\n")
else:
print("❌ VALIDATION FAILED\n")
# Print individual results
if results["brief"]:
print("Product Brief:", "✅ Valid" if results["brief"]["valid"] else "❌ Invalid")
else:
print("Product Brief: (not found - optional)")
if results["prd"]:
print("PRD:", "✅ Valid" if results["prd"]["valid"] else "❌ Invalid")
else:
print("PRD: ❌ Not found (required)")
if results["architecture"]:
print("Architecture:", "✅ Valid" if results["architecture"]["valid"] else "❌ Invalid")
else:
print("Architecture: ❌ Not found (required)")
print(f"Epics: {len(results['epics'])} found")
for epic in results["epics"]:
status = "" if epic["valid"] else ""
print(f" {status} {epic['file']}")
print(f"\n{'='*60}\n")
# Print details if invalid
if not results["overall_valid"]:
if results["prd"] and not results["prd"]["valid"]:
print_validation_results(results["prd"], "PRD")
if results["architecture"] and not results["architecture"]["valid"]:
print_validation_results(results["architecture"], "Architecture")
for epic in results["epics"]:
if not epic["valid"]:
print_validation_results(epic, f"Epic {epic['file']}")
sys.exit(0 if results["overall_valid"] else 1)
else:
print(f"Error: Unknown command: {command}", file=sys.stderr)
sys.exit(1)
except Exception as e:
print(f"Error: {e!s}", file=sys.stderr)
sys.exit(1)
if __name__ == "__main__":
main()
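
Taken together, `validate_epic`'s checks define the minimum viable epic file. A sketch of the smallest skeleton that passes them (all content is placeholder):

```python
# Sketch: the smallest epic skeleton that satisfies validate_epic
# (required fields, required sections, a STORY-XXX-YY heading,
# acceptance criteria, and the "As a... I want... so that" format).
from pathlib import Path

epic = """# EPIC-001: Demo Epic

**Epic Owner:** PM
**Priority:** High
**Status:** Draft
**Estimated Effort:** 1 week

## Epic Description
Placeholder.

## Business Value
Placeholder.

## Success Criteria
Placeholder.

## User Stories

### STORY-001-01: Demo story
As a user, I want a demo, so that validation passes.

**Acceptance Criteria:**
- [ ] Placeholder

## Dependencies
None.

## Definition of Done
- [ ] Placeholder
"""

Path("/tmp/EPIC-001-demo.md").write_text(epic)
# then: uv run hooks/utils/bmad/bmad_validator.py epic /tmp/EPIC-001-demo.md
```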

682
hooks/utils/bmad/research_generator.py Executable file

@@ -0,0 +1,682 @@
#!/usr/bin/env -S uv run --script
# /// script
# requires-python = ">=3.11"
# dependencies = [
# "python-dotenv",
# ]
# ///
"""
BMAD Research Prompt Generator
Generates research prompts and findings templates for technical decisions.
No GPT-4 calls - just template generation (Cost: $0).
Commands:
prompt <topic> <project_path> [prd_path] Generate research prompt
template <topic> <project_path> Generate findings template
Examples:
uv run research_generator.py prompt "data vendors" "$(pwd)" "bmad-backlog/prd/prd.md"
uv run research_generator.py template "data vendors" "$(pwd)"
"""
import sys
import re
from pathlib import Path
from datetime import datetime
def generate_research_prompt(topic: str, project_path: str, prd_path: str = None) -> str:
"""
Generate research prompt for web AI (ChatGPT/Claude).
Args:
topic: Research topic
project_path: Project directory
prd_path: Optional path to PRD for context
Returns:
Research prompt content
"""
current_date = datetime.now().strftime("%B %d, %Y")
topic_slug = topic.lower().replace(' ', '-').replace('/', '-')
# Read PRD for context if provided
project_context = ""
project_name = "New Project"
requirements_context = ""
if prd_path and Path(prd_path).exists():
try:
with open(prd_path, 'r') as f:
prd_content = f.read()
# Extract project name
match = re.search(r'##\s+(.+?)(?:\s+-|$)', prd_content, re.MULTILINE)
if match:
project_name = match.group(1).strip()
# Extract relevant requirements
if "data" in topic.lower() or "api" in topic.lower():
data_section = extract_section(prd_content, "Data Requirements")
if data_section:
requirements_context = f"\n**Project Requirements**:\n{data_section[:500]}"
if "auth" in topic.lower():
security_section = extract_section(prd_content, "Security")
if security_section:
requirements_context = f"\n**Security Requirements**:\n{security_section[:500]}"
project_context = f"\n**Project**: {project_name}\n"
except Exception:
pass
prompt_content = f"""# Research Prompt: {topic}
**Date**: {current_date}
**For**: {project_name}
---
## Instructions
**COPY THIS ENTIRE PROMPT** and paste into:
- ChatGPT (https://chat.openai.com) with GPT-4
- Claude (https://claude.ai) web version
They have web search capabilities for current, accurate information.
---
## Research Request
{project_context}
**Research Topic**: {topic}
{requirements_context}
Please research and provide comprehensive analysis:
---
### 1. Overview
- What options exist for {topic}?
- What are the top 5-7 solutions/vendors/APIs?
- Current market leaders?
- Recent changes in this space? (2024-2025)
---
### 2. Detailed Comparison Table
Create a comprehensive comparison:
| Option | Pricing | Key Features | Pros | Cons | Best For |
|--------|---------|--------------|------|------|----------|
| Option 1: [Name] | [Tiers] | [Top 3-5 features] | [2-3 pros] | [2-3 cons] | [Use case] |
| Option 2: [Name] | | | | | |
| Option 3: [Name] | | | | | |
| Option 4: [Name] | | | | | |
| Option 5: [Name] | | | | | |
---
### 3. Technical Details
For EACH option, provide:
#### [Option Name]
**API Documentation**: [Link to official docs]
**Authentication**:
- Method: API Key | OAuth | JWT | Other
- Security: HTTPS required? Token rotation?
**Rate Limits**:
- Free tier: X requests per minute/hour/day
- Paid tiers: Rate limit increases
**Data Format**:
- Response format: JSON | XML | GraphQL | CSV
- Webhook support: Yes/No
- Streaming: Yes/No
**SDK Availability**:
- Python: [pip package name] - [GitHub link]
- Node.js: [npm package name] - [GitHub link]
- Other languages: [List]
**Code Example**:
```python
# Basic usage example (if available from docs)
```
**Community**:
- GitHub stars: X
- Last updated: Date
- Issues: Open/closed ratio
- Stack Overflow: Questions count
---
### 4. Integration Complexity
For each option, estimate:
**Setup Time**:
- Account creation: X minutes
- API key generation: X minutes
- SDK integration: X hours
- Testing: X hours
**Total**: X hours/days
**Dependencies**:
- Libraries required
- Platform requirements
- Other services needed
**Learning Curve**:
- Documentation quality: Excellent | Good | Fair | Poor
- Tutorials available: Yes/No
- Community support: Active | Moderate | Limited
---
### 5. Recommendations
Based on the project requirements, provide specific recommendations:
**For MVP** (budget-conscious, speed):
- **Recommended**: [Option]
- **Why**: [Rationale]
- **Tradeoffs**: [What you give up]
**For Production** (quality-focused, scalable):
- **Recommended**: [Option]
- **Why**: [Rationale]
- **Cost**: $X/month at scale
**For Enterprise** (feature-complete):
- **Recommended**: [Option]
- **Why**: [Rationale]
- **Cost**: $Y/month
---
### 6. Detailed Cost Analysis
For each option:
#### [Option Name]
**Free Tier**:
- What's included: [Limits]
- Restrictions: [What's missing]
- Good for MVP? Yes/No - [Why]
**Starter/Basic Tier**:
- Price: $X/month
- Includes: [Features and limits]
- Rate limits: X requests/min
**Professional Tier**:
- Price: $Y/month
- Includes: [Features and limits]
- Rate limits: Y requests/min
**Enterprise Tier**:
- Price: $Z/month or Custom
- Includes: [Features]
- SLA: X% uptime
**Estimated Monthly Cost**:
- MVP (low volume): $X-Y
- Production (medium volume): $X-Y
- Scale (high volume): $X-Y
**Hidden Costs**:
- [Overage charges, add-ons, etc.]
---
### 7. Risks & Considerations
For each option, analyze:
**Vendor Lock-in**:
- How easy to migrate away? (Easy/Medium/Hard)
- Data export capabilities
- API compatibility with alternatives
**Data Quality/Reliability**:
- Uptime history (if available)
- Published SLAs
- Known outages or issues
- Data accuracy/freshness
**Compliance & Security**:
- Data residency (US/EU/Global)
- Compliance certifications (SOC 2, GDPR, etc.)
- Security features (encryption, access controls)
- Privacy policy concerns
**Support & Maintenance**:
- Support channels (email, chat, phone)
- Response time SLAs
- Documentation updates
- Release cadence
- Deprecation policy
**Scalability**:
- Auto-scaling capabilities
- Performance at high volume
- Regional availability
- CDN/edge locations
---
### 8. Source Links
Provide current, working links to:
**Official Resources**:
- Homepage: [URL]
- Pricing page: [URL]
- API documentation: [URL]
- Getting started guide: [URL]
- Status page: [URL]
**Developer Resources**:
- GitHub repository: [URL]
- SDK documentation: [URL]
- API reference: [URL]
- Code examples: [URL]
**Community**:
- Community forum: [URL]
- Discord/Slack: [URL]
- Stack Overflow tag: [URL]
- Twitter/X: [Handle]
**Reviews & Comparisons**:
- G2/Capterra reviews: [URL]
- Comparison articles: [URL]
- User testimonials: [URL]
- Case studies: [URL]
---
## Deliverable
Please structure your response with clear sections matching the template above.
This research will inform our architecture decisions and be documented for future reference.
Thank you!
---
**After completing research**:
1. Copy findings into template: bmad-backlog/research/RESEARCH-{topic_slug}-findings.md
2. Return to Claude Code
3. Continue with /bmad:architecture (will use your research)
"""
# Save prompt
prompt_path = Path(project_path) / "bmad-backlog" / "research" / f"RESEARCH-{topic_slug}-prompt.md"
prompt_path.parent.mkdir(parents=True, exist_ok=True)
with open(prompt_path, 'w') as f:
f.write(prompt_content)
return prompt_content
def generate_findings_template(topic: str, project_path: str) -> str:
"""
Generate findings template for documenting research.
Args:
topic: Research topic
project_path: Project directory
Returns:
Template content
"""
current_date = datetime.now().strftime("%B %d, %Y")
topic_slug = topic.lower().replace(' ', '-').replace('/', '-')
template_content = f"""# Research Findings: {topic}
**Date**: {current_date}
**Researcher**: [Your Name]
**Status**: Draft
---
## Research Summary
**Question**: What {topic} should we use?
**Recommendation**: [Chosen option and brief rationale]
**Confidence**: High | Medium | Low
**Decision Date**: [When decision was made]
---
## Options Evaluated
### Option 1: [Name]
**Overview**:
[1-2 sentence description of what this is]
**Pricing**:
- Free tier: [Details or N/A]
- Starter tier: $X/month - [What's included]
- Pro tier: $Y/month - [What's included]
- Enterprise: $Z/month or Custom
- **Estimated cost for our MVP**: $X/month
**Key Features**:
- [Feature 1]
- [Feature 2]
- [Feature 3]
- [Feature 4]
**Pros**:
- [Pro 1]
- [Pro 2]
- [Pro 3]
**Cons**:
- [Con 1]
- [Con 2]
- [Con 3]
**Technical Details**:
- API Type: REST | GraphQL | WebSocket | Other
- Authentication: API Key | OAuth | JWT | Other
- Rate Limits: X requests per minute/hour
- Data Format: JSON | XML | CSV | Other
- SDKs: Python ([package]), Node.js ([package]), Other
- Latency: Typical response time
- Uptime SLA: X%
**Documentation**: [Link]
**Community**:
- GitHub Stars: X
- Last Update: [Date]
- Active Development: Yes/No
---
### Option 2: [Name]
[Same structure as Option 1]
---
### Option 3: [Name]
[Same structure as Option 1]
---
### Option 4: [Name]
[Same structure as Option 1 - if evaluated]
---
## Comparison Matrix
| Criteria | Option 1 | Option 2 | Option 3 | Winner |
|----------|----------|----------|----------|--------|
| **Cost (MVP)** | $X/mo | $Y/mo | $Z/mo | [Option] |
| **Cost (Production)** | $X/mo | $Y/mo | $Z/mo | [Option] |
| **Features** | X/10 | Y/10 | Z/10 | [Option] |
| **API Quality** | ⭐⭐⭐⭐⭐ | ⭐⭐⭐⭐ | ⭐⭐⭐ | [Option] |
| **Documentation** | Excellent | Good | Fair | [Option] |
| **Community** | Large | Medium | Small | [Option] |
| **Ease of Use** | Easy | Medium | Complex | [Option] |
| **Scalability** | High | Medium | High | [Option] |
| **Vendor Lock-in Risk** | Low | Medium | High | [Option] |
| **Overall Score** | X/10 | Y/10 | Z/10 | **[Winner]** |
---
## Final Recommendation
**Chosen**: [Option X]
**Rationale**:
1. [Primary reason - e.g., best balance of cost and features]
2. [Secondary reason - e.g., excellent documentation]
3. [Tertiary reason - e.g., active community]
**For MVP**:
- [Why this works for MVP]
- Cost: $X/month
- Timeline: [Can start immediately / Need 1 week setup]
**For Production**:
- [Scalability considerations]
- Cost at scale: $Y/month
- Migration path: [If we outgrow this]
**Implementation Priority**: MVP | Phase 2 | Future
---
## Implementation Plan
### Setup Steps
1. [Step 1 - e.g., Create account at vendor.com]
2. [Step 2 - e.g., Generate API key]
3. [Step 3 - e.g., Install SDK: pip install package]
4. [Step 4 - e.g., Test connection]
5. [Step 5 - e.g., Implement in production code]
**Estimated Setup Time**: X hours
### Configuration Required
**Environment Variables**:
```bash
# Add to .env.example
{{VENDOR}}_API_KEY=your_key_here
{{VENDOR}}_BASE_URL=https://api.vendor.com
```
**Code Configuration**:
```python
# Example configuration
from {{package}} import Client
client = Client(api_key=os.getenv('{{VENDOR}}_API_KEY'))
```
### Basic Usage Example
```python
# Example usage from documentation
{{code example if available}}
```
---
## Cost Projection
**Monthly Cost Breakdown**:
**MVP** (estimated volume):
- Base fee: $X
- Usage costs: $Y
- **Total**: $Z/month
**Production** (estimated volume):
- Base fee: $X
- Usage costs: $Y
- **Total**: $Z/month
**At Scale** (estimated volume):
- Base fee: $X
- Usage costs: $Y
- **Total**: $Z/month
**Cost Optimization**:
- [Strategy 1 to reduce costs]
- [Strategy 2]
---
## Risks & Mitigations
| Risk | Impact | Likelihood | Mitigation |
|------|--------|-----------|------------|
| Vendor increases pricing | Medium | Medium | [Monitor pricing, have backup option] |
| Service downtime | High | Low | [Implement fallback, cache data] |
| Rate limit hit | Medium | Medium | [Implement rate limiting, queue requests] |
| Data quality issues | High | Low | [Validation layer, monitoring] |
| Vendor shutdown | High | Low | [Data export plan, alternative ready] |
---
## Testing Checklist
- [ ] Create account and obtain credentials
- [ ] Test API in development
- [ ] Verify rate limits and error handling
- [ ] Test with production-like volume
- [ ] Set up monitoring and alerts
- [ ] Document API integration in code
- [ ] Add to .env.example
- [ ] Create fallback/error handling
- [ ] Test cost with real usage
- [ ] Review security and compliance
---
## References
**Official Documentation**:
- Website: [URL]
- Pricing: [URL]
- API Docs: [URL]
- Getting Started: [URL]
- Status Page: [URL]
**Community Resources**:
- GitHub: [URL]
- Discord/Slack: [URL]
- Stack Overflow: [URL with tag]
**Comparison Articles**:
- [Article 1 title]: [URL]
- [Article 2 title]: [URL]
**User Reviews**:
- G2: [URL]
- Reddit discussions: [URLs]
---
## Next Steps
1. ✅ Research complete
2. Review findings with team (if applicable)
3. Make final decision on [chosen option]
4. Update bmad-backlog/prd/prd.md Technical Assumptions
5. Reference in bmad-backlog/architecture/architecture.md
6. Add to implementation backlog
---
**Status**: ✅ Research Complete | ⏳ Awaiting Decision | ❌ Needs More Research
**Recommendation**: [Final recommendation]
---
*This document was generated from research conducted using web-based AI.*
*Fill in all sections with findings from your research.*
*Save this file when complete - it will be referenced during architecture generation.*
"""
# Save template
template_path = Path(project_path) / "bmad-backlog" / "research" / f"RESEARCH-{topic_slug}-findings.md"
template_path.parent.mkdir(parents=True, exist_ok=True)
with open(template_path, 'w') as f:
f.write(template_content)
return template_content
def extract_section(content: str, section_header: str) -> str:
"""Extract section from markdown document."""
lines = content.split('\n')
section_lines = []
in_section = False
for line in lines:
if section_header.lower() in line.lower() and line.startswith('#'):
in_section = True
continue
elif in_section and line.startswith('#') and len(line.split()) > 1:
# New section started
break
elif in_section:
section_lines.append(line)
return '\n'.join(section_lines).strip()
def main():
"""CLI interface for research prompt generation."""
if len(sys.argv) < 4:
print("Usage: research_generator.py <command> <topic> <project_path> [prd_path]", file=sys.stderr)
print("\nCommands:", file=sys.stderr)
print(" prompt <topic> <project_path> [prd_path] Generate research prompt", file=sys.stderr)
print(" template <topic> <project_path> Generate findings template", file=sys.stderr)
print("\nExamples:", file=sys.stderr)
print(' uv run research_generator.py prompt "data vendors" "$(pwd)" "bmad-backlog/prd/prd.md"', file=sys.stderr)
print(' uv run research_generator.py template "hosting platforms" "$(pwd)"', file=sys.stderr)
sys.exit(1)
command = sys.argv[1]
topic = sys.argv[2]
project_path = sys.argv[3]
prd_path = sys.argv[4] if len(sys.argv) > 4 else None
topic_slug = topic.lower().replace(' ', '-').replace('/', '-')
try:
if command == "prompt":
content = generate_research_prompt(topic, project_path, prd_path)
print(f"✅ Research prompt generated: bmad-backlog/research/RESEARCH-{topic_slug}-prompt.md")
elif command == "template":
content = generate_findings_template(topic, project_path)
print(f"✅ Findings template generated: bmad-backlog/research/RESEARCH-{topic_slug}-findings.md")
else:
print(f"Error: Unknown command: {command}", file=sys.stderr)
print("Valid commands: prompt, template", file=sys.stderr)
sys.exit(1)
except Exception as e:
print(f"Error: {str(e)}", file=sys.stderr)
sys.exit(1)
if __name__ == "__main__":
main()
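A quick illustration of extract_section's matching rules (a sketch, not part of the commit): it keeps every line between the matched header and the next header, and returns an empty string when the header never matches.

# Sketch: exercising extract_section on a toy document
doc = "# Brief\nIntro\n## Goals\nShip fast\nStay lean\n## Risks\nTBD"
print(extract_section(doc, "Goals"))    # -> "Ship fast\nStay lean"
print(extract_section(doc, "Missing"))  # -> "" (header never matched)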

114
hooks/utils/llm/anth.py Executable file
View File

@@ -0,0 +1,114 @@
#!/usr/bin/env -S uv run --script
# /// script
# requires-python = ">=3.8"
# dependencies = [
# "anthropic",
# "python-dotenv",
# ]
# ///
import os
import sys
from dotenv import load_dotenv
def prompt_llm(prompt_text):
"""
Base Anthropic LLM prompting method using fastest model.
Args:
prompt_text (str): The prompt to send to the model
Returns:
str: The model's response text, or None if error
"""
load_dotenv()
api_key = os.getenv("ANTHROPIC_API_KEY")
if not api_key:
return None
try:
import anthropic
client = anthropic.Anthropic(api_key=api_key)
message = client.messages.create(
model="claude-3-5-haiku-20241022", # Fastest Anthropic model
max_tokens=100,
temperature=0.7,
messages=[{"role": "user", "content": prompt_text}],
)
return message.content[0].text.strip()
except Exception:
return None
def generate_completion_message():
"""
Generate a completion message using Anthropic LLM.
Returns:
str: A natural language completion message, or None if error
"""
engineer_name = os.getenv("ENGINEER_NAME", "").strip()
if engineer_name:
name_instruction = f"Sometimes (about 30% of the time) include the engineer's name '{engineer_name}' in a natural way."
examples = f"""Examples of the style:
- Standard: "Work complete!", "All done!", "Task finished!", "Ready for your next move!"
- Personalized: "{engineer_name}, all set!", "Ready for you, {engineer_name}!", "Complete, {engineer_name}!", "{engineer_name}, we're done!" """
else:
name_instruction = ""
examples = """Examples of the style: "Work complete!", "All done!", "Task finished!", "Ready for your next move!" """
prompt = f"""Generate a short, friendly completion message for when an AI coding assistant finishes a task.
Requirements:
- Keep it under 10 words
- Make it positive and future focused
- Use natural, conversational language
- Focus on completion/readiness
- Do NOT include quotes, formatting, or explanations
- Return ONLY the completion message text
{name_instruction}
{examples}
Generate ONE completion message:"""
response = prompt_llm(prompt)
# Clean up response - remove quotes and extra formatting
if response:
response = response.strip().strip('"').strip("'").strip()
# Take first line if multiple lines
        response = response.split("\n")[0].strip()
return response
def main():
    """Command line interface for testing."""
    if len(sys.argv) > 1:
        if sys.argv[1] == "--completion":
            message = generate_completion_message()
            if message:
                print(message)
            else:
                print("Error generating completion message", file=sys.stderr)
                sys.exit(1)
        else:
            prompt_text = " ".join(sys.argv[1:])
            response = prompt_llm(prompt_text)
            if response:
                print(response)
            else:
                print("Error calling Anthropic API", file=sys.stderr)
                sys.exit(1)
    else:
        print("Usage: ./anth.py 'your prompt here' or ./anth.py --completion", file=sys.stderr)
        sys.exit(1)
if __name__ == "__main__":
main()

117
hooks/utils/llm/oai.py Executable file
View File

@@ -0,0 +1,117 @@
#!/usr/bin/env -S uv run --script
# /// script
# requires-python = ">=3.8"
# dependencies = [
# "openai",
# "python-dotenv",
# ]
# ///
import os
import sys
from dotenv import load_dotenv
def prompt_llm(prompt_text):
"""
Base OpenAI LLM prompting method using fastest model.
Args:
prompt_text (str): The prompt to send to the model
Returns:
str: The model's response text, or None if error
"""
load_dotenv()
api_key = os.getenv("OPENAI_API_KEY")
if not api_key:
return None
try:
from openai import OpenAI
client = OpenAI(api_key=api_key)
response = client.chat.completions.create(
model="gpt-4o-mini", # Fast OpenAI model
messages=[{"role": "user", "content": prompt_text}],
max_tokens=100,
temperature=0.7,
)
return response.choices[0].message.content.strip()
except Exception:
return None
def generate_completion_message():
"""
Generate a completion message using OpenAI LLM.
Returns:
str: A natural language completion message, or None if error
"""
engineer_name = os.getenv("ENGINEER_NAME", "").strip()
if engineer_name:
name_instruction = f"Sometimes (about 30% of the time) include the engineer's name '{engineer_name}' in a natural way."
examples = f"""Examples of the style:
- Standard: "Work complete!", "All done!", "Task finished!", "Ready for your next move!"
- Personalized: "{engineer_name}, all set!", "Ready for you, {engineer_name}!", "Complete, {engineer_name}!", "{engineer_name}, we're done!" """
else:
name_instruction = ""
examples = """Examples of the style: "Work complete!", "All done!", "Task finished!", "Ready for your next move!" """
prompt = f"""Generate a short, friendly completion message for when an AI coding assistant finishes a task.
Requirements:
- Keep it under 10 words
- Make it positive and future focused
- Use natural, conversational language
- Focus on completion/readiness
- Do NOT include quotes, formatting, or explanations
- Return ONLY the completion message text
{name_instruction}
{examples}
Generate ONE completion message:"""
response = prompt_llm(prompt)
# Clean up response - remove quotes and extra formatting
if response:
response = response.strip().strip('"').strip("'").strip()
# Take first line if multiple lines
        response = response.split("\n")[0].strip()
return response
def main():
"""Command line interface for testing."""
if len(sys.argv) > 1:
if sys.argv[1] == "--completion":
message = generate_completion_message()
if message:
print(message)
else:
print("Error generating completion message", file=sys.stderr)
sys.exit(1)
else:
prompt_text = " ".join(sys.argv[1:])
response = prompt_llm(prompt_text)
if response:
print(response)
else:
print("Error calling OpenAI API", file=sys.stderr)
sys.exit(1)
else:
print("Usage: ./oai.py 'your prompt here' or ./oai.py --completion", file=sys.stderr)
sys.exit(1)
if __name__ == "__main__":
main()
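Because anth.py and oai.py deliberately expose the same prompt_llm / generate_completion_message interface, a caller can chain them as fallbacks. A minimal sketch, assuming both modules are importable on the path (hypothetical caller, not part of the commit):

# Sketch: provider fallback over the shared interface
import anth
import oai

def completion_message() -> str:
    # Each generator returns None when its API key is missing or the call fails
    return (
        anth.generate_completion_message()
        or oai.generate_completion_message()
        or "Task complete!"  # offline default
    )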

114
hooks/utils/tts/elevenlabs_mcp.py Executable file
View File

@@ -0,0 +1,114 @@
#!/usr/bin/env -S uv run --script
# /// script
# requires-python = ">=3.8"
# dependencies = [
# "python-dotenv",
# ]
# ///
import os
import sys
import subprocess
from pathlib import Path
from dotenv import load_dotenv
def main():
"""
ElevenLabs MCP TTS Script
Uses ElevenLabs MCP server for high-quality text-to-speech via Claude Code.
Accepts optional text prompt as command-line argument.
Usage:
- ./elevenlabs_mcp.py # Uses default text
- ./elevenlabs_mcp.py "Your custom text" # Uses provided text
Features:
- Integration with Claude Code MCP
- Automatic voice selection
- High-quality voice synthesis via ElevenLabs API
- Optimized for hook usage (quick, reliable)
"""
# Load environment variables
load_dotenv()
try:
print("🎙️ ElevenLabs MCP TTS")
print("=" * 25)
# Get text from command line argument or use default
if len(sys.argv) > 1:
text = " ".join(sys.argv[1:]) # Join all arguments as text
else:
text = "Task completed successfully!"
print(f"🎯 Text: {text}")
print("🔊 Generating and playing via MCP...")
try:
# Use Claude Code CLI to invoke ElevenLabs MCP
# This assumes the ElevenLabs MCP server is configured in Claude Code
claude_cmd = [
"claude", "mcp", "call", "ElevenLabs", "text_to_speech",
"--text", text,
"--voice_name", "Adam", # Default voice
"--model_id", "eleven_turbo_v2_5", # Fast model
"--output_directory", str(Path.home() / "Desktop"),
"--speed", "1.0",
"--stability", "0.5",
"--similarity_boost", "0.75"
]
# Try to run the Claude MCP command
result = subprocess.run(
claude_cmd,
capture_output=True,
text=True,
timeout=15 # 15-second timeout for TTS generation
)
if result.returncode == 0:
print("✅ TTS generated and played via MCP!")
# Try to play the generated audio file
# Look for recently created audio files on Desktop
desktop = Path.home() / "Desktop"
audio_files = list(desktop.glob("*.mp3"))
if audio_files:
# Find the most recent audio file
latest_audio = max(audio_files, key=lambda f: f.stat().st_mtime)
# Try to play with system default audio player
if sys.platform == "darwin": # macOS
subprocess.run(["afplay", str(latest_audio)], capture_output=True)
                    elif sys.platform == "linux":  # Linux (aplay handles WAV only)
                        # mpg123 decodes the MP3 output; assumes it is installed
                        subprocess.run(["mpg123", "-q", str(latest_audio)], capture_output=True)
                    elif sys.platform == "win32":  # Windows
                        os.startfile(str(latest_audio))  # opens with the default player
print("🎵 Audio playback attempted")
else:
print("⚠️ Audio file not found on Desktop")
else:
print(f"❌ MCP Error: {result.stderr}")
# Fall back to simple notification
print("🔔 TTS via MCP failed - task completion noted")
except subprocess.TimeoutExpired:
print("⏰ MCP TTS timed out - continuing...")
except FileNotFoundError:
print("❌ Claude CLI not found - MCP TTS unavailable")
except Exception as e:
print(f"❌ MCP Error: {e}")
except Exception as e:
print(f"❌ Unexpected error: {e}")
sys.exit(1)
if __name__ == "__main__":
main()

83
hooks/utils/tts/elevenlabs_tts.py Executable file
View File

@@ -0,0 +1,83 @@
#!/usr/bin/env -S uv run --script
# /// script
# requires-python = ">=3.8"
# dependencies = [
# "elevenlabs",
# "python-dotenv",
# ]
# ///
import os
import sys
from dotenv import load_dotenv
def main():
"""
ElevenLabs Turbo v2.5 TTS Script
Uses ElevenLabs' Turbo v2.5 model for fast, high-quality text-to-speech.
Accepts optional text prompt as command-line argument.
Usage:
- ./elevenlabs_tts.py # Uses default text
- ./elevenlabs_tts.py "Your custom text" # Uses provided text
Features:
- Fast generation (optimized for real-time use)
- High-quality voice synthesis
- Stable production model
- Cost-effective for high-volume usage
"""
# Load environment variables
load_dotenv()
# Get API key from environment
api_key = os.getenv('ELEVENLABS_API_KEY')
if not api_key:
print("❌ Error: ELEVENLABS_API_KEY not found in environment variables", file=sys.stderr)
print("Please add your ElevenLabs API key to .env file:", file=sys.stderr)
print("ELEVENLABS_API_KEY=your_api_key_here", file=sys.stderr)
sys.exit(1)
try:
from elevenlabs.client import ElevenLabs
from elevenlabs.play import play
# Initialize client
elevenlabs = ElevenLabs(api_key=api_key)
# Get text from command line argument or use default
if len(sys.argv) > 1:
text = " ".join(sys.argv[1:]) # Join all arguments as text
else:
text = "Task completed successfully."
try:
# Generate and play audio directly
audio = elevenlabs.text_to_speech.convert(
text=text,
voice_id="EXAVITQu4vr4xnSDxMaL", # Sarah voice
model_id="eleven_turbo_v2_5",
output_format="mp3_44100_128",
)
play(audio)
except Exception as e:
print(f"❌ Error: {e}", file=sys.stderr)
sys.exit(1)
except ImportError:
print("❌ Error: elevenlabs package not installed", file=sys.stderr)
print("This script uses UV to auto-install dependencies.", file=sys.stderr)
print("Make sure UV is installed: https://docs.astral.sh/uv/", file=sys.stderr)
sys.exit(1)
except Exception as e:
print(f"❌ Unexpected error: {e}", file=sys.stderr)
sys.exit(1)
if __name__ == "__main__":
main()

93
hooks/utils/tts/local_tts.py Executable file
View File

@@ -0,0 +1,93 @@
#!/usr/bin/env -S uv run --script
# /// script
# requires-python = ">=3.8"
# dependencies = [
# "pyttsx3",
# ]
# ///
import sys
import random
import os
def main():
"""
Local TTS Script (pyttsx3)
Uses pyttsx3 for offline text-to-speech synthesis.
Accepts optional text prompt as command-line argument.
Usage:
- ./local_tts.py # Uses default text
- ./local_tts.py "Your custom text" # Uses provided text
Features:
- Offline TTS (no API key required)
- Cross-platform compatibility
- Configurable voice settings
- Immediate audio playback
- Engineer name personalization support
"""
try:
import pyttsx3
# Initialize TTS engine
engine = pyttsx3.init()
# Configure engine settings
engine.setProperty('rate', 180) # Speech rate (words per minute)
engine.setProperty('volume', 0.9) # Volume (0.0 to 1.0)
print("🎙️ Local TTS")
print("=" * 12)
# Get text from command line argument or use default
if len(sys.argv) > 1:
text = " ".join(sys.argv[1:]) # Join all arguments as text
else:
# Default completion messages with engineer name support
engineer_name = os.getenv("ENGINEER_NAME", "").strip()
if engineer_name and random.random() < 0.3: # 30% chance to use name
personalized_messages = [
f"{engineer_name}, all set!",
f"Ready for you, {engineer_name}!",
f"Complete, {engineer_name}!",
f"{engineer_name}, we're done!",
f"Task finished, {engineer_name}!"
]
text = random.choice(personalized_messages)
else:
completion_messages = [
"Work complete!",
"All done!",
"Task finished!",
"Job complete!",
"Ready for next task!",
"Ready for your next move!",
"All set!"
]
text = random.choice(completion_messages)
print(f"🎯 Text: {text}")
print("🔊 Speaking...")
# Speak the text
engine.say(text)
engine.runAndWait()
print("✅ Playback complete!")
except ImportError:
print("❌ Error: pyttsx3 package not installed")
print("This script uses UV to auto-install dependencies.")
sys.exit(1)
except Exception as e:
print(f"❌ Error: {e}")
sys.exit(1)
if __name__ == "__main__":
main()
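The docstring advertises configurable voice settings; with pyttsx3 that looks roughly like this (a sketch, since the available voice IDs differ per platform):

# Sketch: selecting a specific pyttsx3 voice
import pyttsx3

engine = pyttsx3.init()
for voice in engine.getProperty('voices'):
    print(voice.id, voice.name)  # inspect what this machine offers
engine.setProperty('voice', engine.getProperty('voices')[0].id)
engine.say("Voice check")
engine.runAndWait()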

109
hooks/utils/tts/openai_tts.py Executable file
View File

@@ -0,0 +1,109 @@
#!/usr/bin/env -S uv run --script
# /// script
# requires-python = ">=3.8"
# dependencies = [
# "openai",
# "python-dotenv",
# ]
# ///
import os
import sys
import asyncio
from pathlib import Path
from dotenv import load_dotenv
async def main():
"""
OpenAI TTS Script
Uses OpenAI's TTS model for high-quality text-to-speech.
Accepts optional text prompt as command-line argument.
Usage:
- ./openai_tts.py # Uses default text
- ./openai_tts.py "Your custom text" # Uses provided text
Features:
- OpenAI TTS-1 model (fast and reliable)
- Nova voice (engaging and warm)
- Direct audio streaming and playback
- Optimized for hook usage
"""
# Load environment variables
load_dotenv()
# Get API key from environment
api_key = os.getenv("OPENAI_API_KEY")
if not api_key:
print("❌ Error: OPENAI_API_KEY not found in environment variables")
sys.exit(1)
try:
from openai import AsyncOpenAI
# Initialize OpenAI client
openai = AsyncOpenAI(api_key=api_key)
print("🎙️ OpenAI TTS")
print("=" * 15)
# Get text from command line argument or use default
if len(sys.argv) > 1:
text = " ".join(sys.argv[1:]) # Join all arguments as text
else:
text = "Task completed successfully!"
print(f"🎯 Text: {text}")
print("🔊 Generating audio...")
        try:
            # Generate audio with OpenAI TTS and stream it straight to disk.
            # The async client's binary response is not an async iterator, so
            # use the SDK's streaming-response helper rather than `async for`.
            audio_file = Path.home() / "Desktop" / "tts_completion.mp3"
            async with openai.audio.speech.with_streaming_response.create(
                model="tts-1",
                voice="nova",
                input=text,
                response_format="mp3",
            ) as response:
                await response.stream_to_file(audio_file)
print("🎵 Playing audio...")
# Play the audio file
import subprocess
if sys.platform == "darwin": # macOS
subprocess.run(["afplay", str(audio_file)], capture_output=True)
            elif sys.platform == "linux":  # Linux (aplay handles WAV only)
                # mpg123 decodes the MP3 output; assumes it is installed
                subprocess.run(["mpg123", "-q", str(audio_file)], capture_output=True)
            elif sys.platform == "win32":  # Windows
                os.startfile(str(audio_file))  # opens with the default player
print("✅ Playback complete!")
# Clean up the temporary file
try:
audio_file.unlink()
            except OSError:
                pass
        except Exception as e:
            print(f"❌ Error: {e}", file=sys.stderr)
            sys.exit(1)
    except ImportError:
        print("❌ Error: Required package not installed", file=sys.stderr)
        print("This script uses UV to auto-install dependencies.", file=sys.stderr)
        sys.exit(1)
    except Exception as e:
        print(f"❌ Unexpected error: {e}", file=sys.stderr)
        sys.exit(1)
if __name__ == "__main__":
asyncio.run(main())

238
hooks/utils/plan_parser.py Executable file
View File

@@ -0,0 +1,238 @@
#!/usr/bin/env -S uv run --script
# /// script
# requires-python = ">=3.11"
# dependencies = [
# "python-dotenv",
# "anthropic",
# ]
# ///
"""
Plan Parser Utility
Uses Claude (Sonnet by default, configurable via ANTHROPIC_LARGE_MODEL) to break down requirements into structured implementation plans.
Creates .titanium/plan.json with epics, stories, tasks, and agent assignments.
Usage:
uv run plan_parser.py <requirements_file> <project_path>
Example:
uv run plan_parser.py .titanium/requirements.md "$(pwd)"
Output:
- Creates .titanium/plan.json with structured plan
- Prints JSON to stdout
"""
import json
import sys
import os
from pathlib import Path
from dotenv import load_dotenv
def get_claude_model(task_type: str = "default") -> str:
"""
Get Claude model based on task complexity.
Args:
task_type: "complex" for large model, "default" for small model
Returns:
Model name string
"""
load_dotenv()
if task_type == "complex":
# Use large model (Sonnet) for complex tasks
return os.getenv("ANTHROPIC_LARGE_MODEL", "claude-sonnet-4-5-20250929")
else:
# Use small model (Haiku) for faster tasks
return os.getenv("ANTHROPIC_SMALL_MODEL", "claude-haiku-4-5-20251001")
def parse_requirements_to_plan(requirements_text: str, project_path: str) -> dict:
"""
    Use Claude to break down requirements into a structured implementation plan.
Args:
requirements_text: Requirements document text
project_path: Absolute path to project directory
Returns:
Structured plan dictionary with epics, stories, tasks
"""
# Load environment variables
load_dotenv()
api_key = os.getenv("ANTHROPIC_API_KEY")
if not api_key:
print("Error: ANTHROPIC_API_KEY not found in environment variables", file=sys.stderr)
print("Please add your Anthropic API key to ~/.env file:", file=sys.stderr)
print("ANTHROPIC_API_KEY=sk-ant-your-key-here", file=sys.stderr)
sys.exit(1)
try:
from anthropic import Anthropic
client = Anthropic(api_key=api_key)
except ImportError:
print("Error: anthropic package not installed", file=sys.stderr)
print("This should be handled by uv automatically.", file=sys.stderr)
sys.exit(1)
# Build Claude prompt
prompt = f"""Analyze these requirements and create a structured implementation plan.
Requirements:
{requirements_text}
Create a JSON plan with this exact structure:
{{
"epics": [
{{
"name": "Epic name",
"description": "Epic description",
"stories": [
{{
"name": "Story name",
"description": "User story or technical description",
"tasks": [
{{
"name": "Task name",
"agent": "@agent-name",
"estimated_time": "30m",
"dependencies": []
}}
]
}}
]
}}
],
"agents_needed": ["@api-developer", "@frontend-developer"],
"estimated_total_time": "4h"
}}
Available agents to use:
- @product-manager: Requirements validation, clarification, acceptance criteria
- @api-developer: Backend APIs (REST/GraphQL), database, authentication
- @frontend-developer: UI/UX, React/Vue/etc, responsive design
- @devops-engineer: CI/CD, deployment, infrastructure, Docker/K8s
- @test-runner: Running tests, test execution, test reporting
- @tdd-specialist: Writing tests, test-driven development, test design
- @code-reviewer: Code review, best practices, code quality
- @security-scanner: Security vulnerabilities, security best practices
- @doc-writer: Technical documentation, API docs, README files
- @api-documenter: OpenAPI/Swagger specs, API documentation
- @debugger: Debugging, error analysis, troubleshooting
- @refactor: Code refactoring, code improvement, tech debt
- @project-planner: Project breakdown, task planning, estimation
- @shadcn-ui-builder: UI components using shadcn/ui library
- @meta-agent: Creating new custom agents
Guidelines:
1. Break down into logical epics (major features)
2. Each epic should have 1-5 stories
3. Each story should have 2-10 tasks
4. Assign the most appropriate agent to each task
5. Estimate time realistically (15m, 30m, 1h, 2h, etc.)
6. List dependencies between tasks (use task names)
7. Start with @product-manager for requirements validation
8. Always include @test-runner or @tdd-specialist for testing
9. Consider @security-scanner for auth/payment/sensitive features
10. End with @doc-writer for documentation
Return ONLY valid JSON, no markdown code blocks, no explanations."""
try:
# Get model (configurable via env var, defaults to Sonnet for complex epics)
model = get_claude_model("complex") # Use large model for complex epics
# Call Claude
response = client.messages.create(
model=model,
max_tokens=8192, # Increased for large epics with many stories
temperature=0.3, # Lower temperature for deterministic planning
messages=[{"role": "user", "content": prompt}]
)
plan_json = response.content[0].text.strip()
# Clean up markdown code blocks if present
if plan_json.startswith("```json"):
plan_json = plan_json[7:]
if plan_json.startswith("```"):
plan_json = plan_json[3:]
if plan_json.endswith("```"):
plan_json = plan_json[:-3]
plan_json = plan_json.strip()
# Parse and validate JSON
plan = json.loads(plan_json)
# Validate structure
if "epics" not in plan:
raise ValueError("Plan missing 'epics' field")
if "agents_needed" not in plan:
raise ValueError("Plan missing 'agents_needed' field")
if "estimated_total_time" not in plan:
raise ValueError("Plan missing 'estimated_total_time' field")
# Save plan to file
plan_path = Path(project_path) / ".titanium" / "plan.json"
plan_path.parent.mkdir(parents=True, exist_ok=True)
# Atomic write
temp_path = plan_path.with_suffix('.tmp')
with open(temp_path, 'w') as f:
json.dump(plan, f, indent=2)
temp_path.replace(plan_path)
return plan
except json.JSONDecodeError as e:
print(f"Error: Claude returned invalid JSON: {e}", file=sys.stderr)
print(f"Response was: {plan_json[:200]}...", file=sys.stderr)
sys.exit(1)
except Exception as e:
print(f"Error calling Claude API: {e}", file=sys.stderr)
sys.exit(1)
def main():
"""CLI interface for plan parsing."""
if len(sys.argv) < 3:
print("Usage: plan_parser.py <requirements_file> <project_path>", file=sys.stderr)
print("\nExample:", file=sys.stderr)
print(" uv run plan_parser.py .titanium/requirements.md \"$(pwd)\"", file=sys.stderr)
sys.exit(1)
requirements_file = sys.argv[1]
project_path = sys.argv[2]
# Validate requirements file exists
if not Path(requirements_file).exists():
print(f"Error: Requirements file not found: {requirements_file}", file=sys.stderr)
sys.exit(1)
# Read requirements
try:
with open(requirements_file, 'r') as f:
requirements_text = f.read()
except Exception as e:
print(f"Error reading requirements file: {e}", file=sys.stderr)
sys.exit(1)
if not requirements_text.strip():
print("Error: Requirements file is empty", file=sys.stderr)
sys.exit(1)
# Parse requirements to plan
plan = parse_requirements_to_plan(requirements_text, project_path)
# Output plan to stdout
print(json.dumps(plan, indent=2))
if __name__ == "__main__":
main()
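Downstream steps read the .titanium/plan.json this script writes; a minimal sketch of walking the epic, story, and task structure the prompt enforces (not part of the commit):

# Sketch: consuming the generated plan
import json
from pathlib import Path

plan = json.loads((Path(".titanium") / "plan.json").read_text())
print(f"Agents: {', '.join(plan['agents_needed'])} | ETA: {plan['estimated_total_time']}")
for epic in plan["epics"]:
    for story in epic["stories"]:
        for task in story["tasks"]:
            print(f"{epic['name']} / {story['name']}: {task['name']} -> {task['agent']}")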

253
hooks/utils/workflow_state.py Executable file
View File

@@ -0,0 +1,253 @@
#!/usr/bin/env -S uv run --script
# /// script
# requires-python = ">=3.11"
# dependencies = [
# "python-dotenv",
# ]
# ///
"""
Workflow State Management Utility
Manages workflow state via file-based JSON storage in .titanium/workflow-state.json
Commands:
init <project_path> <workflow_type> <goal> Initialize new workflow
update_phase <project_path> <phase> <status> Update current phase
get <project_path> Get current state
complete <project_path> Mark workflow complete
Examples:
uv run workflow_state.py init "$(pwd)" "development" "Implement user auth"
uv run workflow_state.py update_phase "$(pwd)" "implementation" "in_progress"
uv run workflow_state.py get "$(pwd)"
uv run workflow_state.py complete "$(pwd)"
"""
import json
import sys
from pathlib import Path
from datetime import datetime
# Constants
STATE_FILE = ".titanium/workflow-state.json"
def init_workflow(project_path: str, workflow_type: str, goal: str) -> dict:
"""
Initialize a new workflow state file.
Args:
project_path: Absolute path to project directory
workflow_type: Type of workflow (development, bug-fix, refactor, review)
goal: User's stated goal for this workflow
Returns:
Initial state dictionary
"""
state_path = Path(project_path) / STATE_FILE
state_path.parent.mkdir(parents=True, exist_ok=True)
state = {
"workflow_type": workflow_type,
"goal": goal,
"status": "planning",
"started_at": datetime.now().isoformat(),
"current_phase": "planning",
"phases": [],
"completed_tasks": [],
"pending_tasks": []
}
# Atomic write
temp_path = state_path.with_suffix('.tmp')
with open(temp_path, 'w') as f:
json.dump(state, f, indent=2)
temp_path.replace(state_path)
return state
def update_phase(project_path: str, phase_name: str, status: str = "in_progress") -> dict:
"""
Update current workflow phase.
Args:
project_path: Absolute path to project directory
phase_name: Name of phase (planning, implementation, review, completed)
status: Status of phase (in_progress, completed, failed)
Returns:
Updated state dictionary or None if state doesn't exist
"""
state_path = Path(project_path) / STATE_FILE
if not state_path.exists():
print(f"Error: No workflow state found at {state_path}", file=sys.stderr)
return None
# Read current state
with open(state_path, 'r') as f:
state = json.load(f)
# Update current phase and status
state["current_phase"] = phase_name
state["status"] = status
# Update or add phase
phase_exists = False
for i, p in enumerate(state["phases"]):
if p["name"] == phase_name:
# Preserve original started_at when updating existing phase
state["phases"][i]["status"] = status
# Only add completed_at if completing and doesn't already exist
if status == "completed" and "completed_at" not in state["phases"][i]:
state["phases"][i]["completed_at"] = datetime.now().isoformat()
phase_exists = True
break
if not phase_exists:
# Create new phase entry with current timestamp
phase_entry = {
"name": phase_name,
"status": status,
"started_at": datetime.now().isoformat()
}
if status == "completed":
phase_entry["completed_at"] = datetime.now().isoformat()
state["phases"].append(phase_entry)
# Atomic write
temp_path = state_path.with_suffix('.tmp')
with open(temp_path, 'w') as f:
json.dump(state, f, indent=2)
temp_path.replace(state_path)
return state
def get_state(project_path: str) -> dict:
"""
Get current workflow state.
Args:
project_path: Absolute path to project directory
Returns:
State dictionary or None if state doesn't exist
"""
state_path = Path(project_path) / STATE_FILE
if not state_path.exists():
return None
with open(state_path, 'r') as f:
return json.load(f)
def complete_workflow(project_path: str) -> dict:
"""
Mark workflow as complete.
Args:
project_path: Absolute path to project directory
Returns:
Updated state dictionary or None if state doesn't exist
"""
state_path = Path(project_path) / STATE_FILE
if not state_path.exists():
print(f"Error: No workflow state found at {state_path}", file=sys.stderr)
return None
# Read current state
with open(state_path, 'r') as f:
state = json.load(f)
# Update to completed
state["status"] = "completed"
state["current_phase"] = "completed"
state["completed_at"] = datetime.now().isoformat()
# Mark current phase as completed if it exists
if state["phases"]:
for phase in state["phases"]:
if phase["status"] == "in_progress":
phase["status"] = "completed"
phase["completed_at"] = datetime.now().isoformat()
# Atomic write
temp_path = state_path.with_suffix('.tmp')
with open(temp_path, 'w') as f:
json.dump(state, f, indent=2)
temp_path.replace(state_path)
return state
def main():
"""CLI interface for workflow state management."""
if len(sys.argv) < 3:
print("Usage: workflow_state.py <command> <project_path> [args...]", file=sys.stderr)
print("\nCommands:", file=sys.stderr)
print(" init <project_path> <workflow_type> <goal>", file=sys.stderr)
print(" update_phase <project_path> <phase> [status]", file=sys.stderr)
print(" get <project_path>", file=sys.stderr)
print(" complete <project_path>", file=sys.stderr)
sys.exit(1)
command = sys.argv[1]
project_path = sys.argv[2]
try:
if command == "init":
if len(sys.argv) < 5:
print("Error: init requires workflow_type and goal", file=sys.stderr)
sys.exit(1)
workflow_type = sys.argv[3]
goal = sys.argv[4]
state = init_workflow(project_path, workflow_type, goal)
print(json.dumps(state, indent=2))
elif command == "update_phase":
if len(sys.argv) < 4:
print("Error: update_phase requires phase_name", file=sys.stderr)
sys.exit(1)
phase_name = sys.argv[3]
status = sys.argv[4] if len(sys.argv) > 4 else "in_progress"
state = update_phase(project_path, phase_name, status)
if state:
print(json.dumps(state, indent=2))
else:
sys.exit(1)
elif command == "get":
state = get_state(project_path)
if state:
print(json.dumps(state, indent=2))
else:
print("No workflow found", file=sys.stderr)
sys.exit(1)
elif command == "complete":
state = complete_workflow(project_path)
if state:
print(json.dumps(state, indent=2))
else:
sys.exit(1)
else:
print(f"Error: Unknown command: {command}", file=sys.stderr)
print("\nValid commands: init, update_phase, get, complete", file=sys.stderr)
sys.exit(1)
except Exception as e:
print(f"Error: {str(e)}", file=sys.stderr)
sys.exit(1)
if __name__ == "__main__":
main()
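A typical lifecycle for the state file, driving it through its phases with the functions above (a sketch using a throwaway project path, not part of the commit):

# Sketch: full workflow lifecycle against .titanium/workflow-state.json
project = "/tmp/demo-project"
init_workflow(project, "development", "Implement user auth")
update_phase(project, "planning", "completed")
update_phase(project, "implementation")              # status defaults to in_progress
update_phase(project, "implementation", "completed")
state = complete_workflow(project)
print(state["status"], state["completed_at"])        # -> completed <ISO timestamp>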