Initial commit

This commit is contained in:
Zhongwei Li
2025-11-29 17:57:28 +08:00
commit e063391898
27 changed files with 3055 additions and 0 deletions

140
hooks/scripts/notification.py Executable file
View File

@@ -0,0 +1,140 @@
#!/usr/bin/env -S uv run --script
# /// script
# requires-python = ">=3.11"
# dependencies = [
# "python-dotenv",
# ]
# ///
import argparse
import json
import os
import random
import subprocess
import sys
from pathlib import Path
try:
from dotenv import load_dotenv
load_dotenv()
except ImportError:
pass # dotenv is optional
def get_tts_script_path():
    """Pick the best available TTS helper script.

    Preference order: ElevenLabs, then OpenAI, then offline pyttsx3. A
    candidate is selected only when its API key (if one is required) is
    set AND the helper file exists on disk.

    Returns:
        str | None: path to the chosen script, or None when no TTS helper
        is available.
    """
    tts_dir = Path(__file__).parent / "utils" / "tts"

    # (required env var, script filename) in priority order; pyttsx3 is the
    # keyless offline fallback.
    candidates = [
        ("ELEVENLABS_API_KEY", "elevenlabs_tts.py"),
        ("OPENAI_API_KEY", "openai_tts.py"),
        (None, "pyttsx3_tts.py"),
    ]
    for env_key, script_name in candidates:
        if env_key is not None and not os.getenv(env_key):
            continue
        script = tts_dir / script_name
        if script.exists():
            return str(script)
    return None
def announce_notification():
    """Speak a short "needs your input" message via the best TTS backend.

    Does nothing (silently) when no TTS helper script is available or the
    subprocess fails or times out.
    """
    try:
        script = get_tts_script_path()
        if script is None:
            return  # no TTS backend available

        # Personalize ~30% of the time when ENGINEER_NAME is configured.
        name = os.getenv("ENGINEER_NAME", "").strip()
        if name and random.random() < 0.3:
            message = f"{name}, your agent needs your input"
        else:
            message = "Your agent needs your input"

        # Fire the helper through uv; output is swallowed and the call is
        # bounded by a 10-second timeout so the hook can never hang.
        subprocess.run(
            ["uv", "run", script, message],
            capture_output=True,
            timeout=10,
        )
    except (subprocess.TimeoutExpired, subprocess.SubprocessError, FileNotFoundError):
        pass  # TTS problems must never break the hook
    except Exception:
        pass  # fail silently on anything else
def main():
    """Entry point for the Notification hook.

    Reads the hook payload (JSON) from stdin, appends it to
    logs/notification.json, and optionally announces via TTS when the
    --notify flag is set. Always exits 0 so the hook never blocks Claude.
    """
    try:
        # Parse command line arguments
        parser = argparse.ArgumentParser()
        parser.add_argument(
            "--notify", action="store_true", help="Enable TTS notifications"
        )
        args = parser.parse_args()

        # Read JSON input from stdin
        input_data = json.loads(sys.stdin.read())

        # Ensure log directory exists.
        # FIX: dropped the redundant local `import os` — os is already
        # imported at module level.
        log_dir = os.path.join(os.getcwd(), "logs")
        os.makedirs(log_dir, exist_ok=True)
        log_file = os.path.join(log_dir, "notification.json")

        # Read existing log data or initialize empty list
        if os.path.exists(log_file):
            with open(log_file) as f:
                try:
                    log_data = json.load(f)
                except (json.JSONDecodeError, ValueError):
                    log_data = []  # corrupt log: start fresh rather than crash
        else:
            log_data = []

        # Append new data and write back with formatting
        log_data.append(input_data)
        with open(log_file, "w") as f:
            json.dump(log_data, f, indent=2)

        # Announce via TTS only when requested; skip the generic
        # "waiting for your input" message to avoid constant chatter.
        if (
            args.notify
            and input_data.get("message") != "Claude is waiting for your input"
        ):
            announce_notification()

        sys.exit(0)
    except json.JSONDecodeError:
        sys.exit(0)  # malformed stdin: exit cleanly, never block the hook
    except Exception:
        sys.exit(0)  # any other failure: fail open


if __name__ == "__main__":
    main()

89
hooks/scripts/post_tool_use.py Executable file
View File

@@ -0,0 +1,89 @@
#!/usr/bin/env -S uv run --script
# /// script
# requires-python = ">=3.8"
# ///
import json
import subprocess
import sys
from datetime import datetime
from pathlib import Path
def check_and_fix_structure():
    """Run the project's JS structure enforcer (with --fix) if present.

    Invoked after file-writing tools; any auto-fix output is echoed to
    stderr. Failures are swallowed so the hook itself never errors out.
    """
    try:
        root = Path.cwd()
        enforcer = root / "src" / "commands" / "enforce-structure.js"
        if not enforcer.exists():
            return

        result = subprocess.run(
            ["node", str(enforcer), "--fix"],
            capture_output=True,
            text=True,
            cwd=root,
        )
        # Surface what was auto-fixed, but only on a successful run.
        if result.returncode == 0 and "Fixed" in result.stdout:
            print("🔧 Structure enforcement auto-fix applied:", file=sys.stderr)
            print(result.stdout, file=sys.stderr)
    except Exception:
        pass  # never fail the hook over structure enforcement
def main():
    """Entry point for the PostToolUse hook.

    Appends every tool result (with a timestamp) to logs/post_tool_use.json
    and, for file-writing tools, triggers structure enforcement. Always
    exits 0 so the hook never blocks Claude.
    """
    try:
        input_data = json.load(sys.stdin)

        # Structure enforcement only makes sense after a write-style tool.
        if input_data.get("tool_name", "") in {"Write", "Edit", "MultiEdit"}:
            check_and_fix_structure()

        log_dir = Path.cwd() / "logs"
        log_dir.mkdir(parents=True, exist_ok=True)
        log_path = log_dir / "post_tool_use.json"

        # Load prior entries, tolerating a missing or corrupt log file.
        try:
            with open(log_path) as f:
                log_data = json.load(f)
        except (FileNotFoundError, json.JSONDecodeError, ValueError):
            log_data = []

        # Stamp and append this invocation's payload.
        input_data["timestamp"] = datetime.now().strftime("%b %d, %I:%M%p").lower()
        log_data.append(input_data)
        with open(log_path, "w") as f:
            json.dump(log_data, f, indent=2)

        sys.exit(0)
    except json.JSONDecodeError:
        sys.exit(0)  # malformed stdin: exit cleanly
    except Exception:
        sys.exit(0)  # fail open on anything else


if __name__ == "__main__":
    main()

575
hooks/scripts/pre_tool_use.py Executable file
View File

@@ -0,0 +1,575 @@
#!/usr/bin/env -S uv run --script
# /// script
# requires-python = ">=3.8"
# ///
import hashlib
import json
import os
import re
import shlex
import sys
import time
import shutil
from datetime import datetime
from pathlib import Path
def is_dangerous_deletion_command(command):
    """Decide whether a shell command is destructive and must be blocked.

    Tokenizes with shlex (falling back to whitespace split on parse errors)
    and inspects the first token against a denylist, with per-family
    refinements for package managers, archivers, and decompressors. Also
    flags single-'>' output redirection that would clobber a file (except
    redirection to /dev/null).

    Returns:
        bool: True when the command should be blocked.
    """
    if not command or not command.strip():
        return False

    lowered = command.lower()
    try:
        tokens = shlex.split(lowered)
    except ValueError:
        tokens = lowered.split()  # unbalanced quotes etc.: crude fallback
    if not tokens:
        return False

    head = tokens[0]

    package_managers = {'npm', 'yarn', 'pip', 'conda', 'apt', 'yum', 'brew'}
    archivers = {'tar', 'zip', '7z'}
    decompressors = {'gunzip', 'bunzip2', 'unxz'}
    denylist = package_managers | archivers | decompressors | {
        'rm', 'unlink', 'rmdir',                 # file deletion
        'dd', 'shred', 'wipe', 'srm', 'trash',   # low-level fs ops
        'truncate',                              # truncation
        'kill', 'killall', 'pkill', 'fuser',     # process/system ops
        'umount', 'swapoff', 'fdisk', 'mkfs', 'format',
        'unzip',                                 # archive extraction
        'mongo', 'psql', 'mysql',                # database shells
    }

    if head in denylist:
        if head in package_managers:
            # Only their uninstall/remove/purge subcommands are destructive.
            return any(v in tokens for v in ('uninstall', 'remove', 'rm', 'purge'))
        if head in archivers:
            # Archivers are safe unless asked to delete members.
            return any(flag in tokens for flag in ('--delete', '-d', 'd'))
        if head in decompressors:
            # These remove the source file unless told to keep it.
            return '--keep' not in tokens and '-k' not in tokens
        return True  # everything else on the denylist is always blocked

    # A single '>' clobbers a file; '>>' appends and /dev/null is harmless.
    if '>' in command and '>>' not in command and '/dev/null' not in command:
        return True
    return False
def is_env_file_access(tool_name, tool_input):
    """Return True when a tool call would WRITE to a real .env file.

    Reading .env is permitted; editing/writing it (directly or through a
    Bash command) is flagged. Template files (.env.sample, .env.example)
    are always allowed.
    """
    if tool_name not in ["Read", "Edit", "MultiEdit", "Write", "Bash"]:
        return False

    if tool_name in ["Edit", "MultiEdit", "Write"]:
        path = tool_input.get("file_path", "")
        is_template = path.endswith(".env.sample") or path.endswith(".env.example")
        return ".env" in path and not is_template

    if tool_name == "Bash":
        cmd = tool_input.get("command", "")
        # Shell idioms that create or modify a .env file (but not its
        # .sample/.example templates).
        suffix = r"\.env\b(?!\.sample|\.example)"
        write_patterns = [
            r"echo\s+.*>\s*" + suffix,
            r"touch\s+.*" + suffix,
            r"cp\s+.*" + suffix,
            r"mv\s+.*" + suffix,
            r">\s*" + suffix,
            r">>\s*" + suffix,
            r"vim\s+.*" + suffix,
            r"nano\s+.*" + suffix,
            r"emacs\s+.*" + suffix,
            r"sed\s+.*-i.*" + suffix,
        ]
        return any(re.search(p, cmd) for p in write_patterns)

    return False
def is_command_file_access(tool_name, tool_input):
    """Return True when a write-style tool targets .claude/commands/.

    Used only for a non-blocking warning, to avoid workflow disruption.
    Handles POSIX and Windows separators and relative/absolute forms.

    FIX: removed a duplicated "/.claude/commands/" substring test that
    appeared twice in the condition.
    """
    if tool_name not in ["Write", "Edit", "MultiEdit"]:
        return False
    file_path = tool_input.get("file_path", "")
    if not file_path:
        return False
    normalized_path = os.path.normpath(file_path)
    return (
        "/.claude/commands/" in normalized_path
        or normalized_path.startswith(".claude/commands/")
        or normalized_path.startswith(".claude\\commands\\")
        or normalized_path.endswith("/.claude/commands")
        or normalized_path.endswith("\\.claude\\commands")
    )
def check_root_structure_violations(tool_name, tool_input):
    """Return True when a write-style tool would create a disallowed root file.

    Root policy: only a short allowlist of canonical .md docs and
    well-known manifest/lock files may live in the repository root;
    scripts never belong there.
    """
    if tool_name not in ["Write", "Edit", "MultiEdit"]:
        return False
    file_path = tool_input.get("file_path", "")
    if not file_path:
        return False

    parts = os.path.normpath(file_path).split(os.sep)
    in_root = len(parts) == 1 or (len(parts) == 2 and parts[0] == ".")
    if not in_root:
        return False

    filename = parts[-1]

    # Markdown: only a handful of canonical docs may live in root.
    if filename.endswith(".md"):
        return filename not in {
            "README.md",
            "CHANGELOG.md",
            "CLAUDE.md",
            "ROADMAP.md",
            "SECURITY.md",
        }

    # Config-like files: only well-known manifests/lockfiles are allowed.
    if filename.endswith((".json", ".yaml", ".yml", ".toml", ".ini", ".env")):
        return filename not in {
            "package.json",
            "package-lock.json",
            "yarn.lock",
            "pnpm-lock.yaml",
            "pyproject.toml",
            "requirements.txt",
            "Cargo.toml",
            "Cargo.lock",
            "go.mod",
            "go.sum",
        }

    # Scripts never belong in root.
    if filename.endswith((".sh", ".py", ".js", ".ts", ".rb", ".pl", ".php")):
        return True

    return False
def get_claude_session_id():
    """Return a persistent short session id, creating it on first use.

    The id is cached at ~/.cache/claude/session_id; if the cache cannot
    be read or written, a freshly generated id is still returned for this
    process.
    """
    session_file = Path.home() / ".cache" / "claude" / "session_id"
    session_file.parent.mkdir(parents=True, exist_ok=True)

    # Reuse a previously persisted id when possible.
    try:
        cached = session_file.read_text().strip()
        if cached:
            return cached
    except Exception:
        pass

    # Derive a short id from the current time (not security-sensitive).
    new_id = hashlib.md5(str(time.time()).encode()).hexdigest()[:8]
    try:
        session_file.write_text(new_id)
    except Exception:
        pass  # best effort: still usable in-memory
    return new_id
# -----------------------------
# SAFE TRASH (ultra-conservative)
# -----------------------------
# Repository root, resolved once at import time; every safe_trash target
# must resolve to a path inside this directory.
REPO_ROOT = Path.cwd().resolve()
# Refuse to trash any file larger than this cap.
MAX_TRASH_BYTES = 20 * 1024 * 1024  # 20MB cap
# Trashed files are moved under <repo>/.trash/<timestamp>/<relative-path>.
TRASH_DIR = REPO_ROOT / ".trash"
def _is_simple_relpath(p: str) -> bool:
# disallow globs and backrefs; must not be absolute
if not p or p.startswith("-"):
return False
bad_tokens = ["*", "?", "[", "]", ".."]
if any(b in p for b in bad_tokens):
return False
return not os.path.isabs(p)
def _resolve_inside_repo(raw_path: str) -> Path | None:
    """Resolve *raw_path* against the cwd; return it only if inside REPO_ROOT.

    Returns None on resolution failure or when the resolved path escapes the
    repository (resolve() follows symlinks, so symlink escapes are caught).

    NOTE(review): the "Path | None" union (PEP 604) needs Python 3.10+, but
    the script header declares requires-python >=3.8 — confirm the floor.
    """
    try:
        candidate = (Path.cwd() / raw_path).resolve()
    except Exception:
        return None
    try:
        resolved = str(candidate)
        root = str(REPO_ROOT)
        if resolved == root or resolved.startswith(root + os.sep):
            return candidate
        return None
    except Exception:
        return None
def _is_denied_path(p: Path) -> bool:
    """Return True for paths that must never be trashed.

    Denied: anything that does not resolve under REPO_ROOT, any .env file,
    and anything with a node_modules/venv/dist/build/.trash/logs component.
    """
    try:
        rel = p.resolve().relative_to(REPO_ROOT)
    except Exception:
        return True  # outside the repo (or unresolvable): deny

    rel_s = str(rel)
    if rel_s == ".env" or rel_s.endswith(os.sep + ".env"):
        return True

    # A protected directory anywhere in the path denies the target, which
    # also forbids nested files within those trees.
    protected = {"node_modules", "venv", "dist", "build", ".trash", "logs"}
    return bool(protected.intersection(rel_s.split(os.sep)))
def _is_regular_and_small(p: Path, max_bytes: int = MAX_TRASH_BYTES) -> bool:
    """True iff *p* is a regular, non-symlink file no larger than max_bytes."""
    try:
        info = p.stat()
        return p.is_file() and not p.is_symlink() and info.st_size <= max_bytes
    except Exception:
        return False
def _trash_destination_for(p: Path) -> Path:
    """Compute (and prepare) the .trash destination mirroring p's repo path.

    Files land under .trash/<timestamp>/<relative-path>; the destination's
    parent directory is created eagerly so the caller can move right away.
    """
    stamp = datetime.now().strftime("%Y-%m-%dT%H-%M-%S")
    dest = TRASH_DIR / stamp / p.resolve().relative_to(REPO_ROOT)
    dest.parent.mkdir(parents=True, exist_ok=True)
    return dest
def _append_trash_log(original: Path, moved_to: Path, session_id: str):
    """Best-effort append of a safe_trash record to logs/pre_tool_use.json.

    The record mirrors the shape produced by log_tool_call, plus explicit
    moved_from/moved_to fields. All failures are swallowed: logging must
    never break the hook.
    """
    try:
        log_dir = REPO_ROOT / "logs"
        log_dir.mkdir(parents=True, exist_ok=True)
        log_path = log_dir / "pre_tool_use.json"

        record = {
            "tool_name": "Bash",
            "tool_input": {"command": f"safe_trash {original}"},
            "session_id": session_id,
            "hook_event_name": "PreToolUse",
            "decision": "approved",
            "working_directory": str(Path.cwd()),
            "reason": "allowed_trash_command",
            "timestamp": datetime.now().strftime("%b %d, %I:%M%p").lower(),
            "moved_from": str(original),
            "moved_to": str(moved_to),
        }

        entries = []
        if log_path.exists():
            try:
                with open(log_path) as f:
                    entries = json.load(f)
            except Exception:
                entries = []
        entries.append(record)
        with open(log_path, "w") as f:
            json.dump(entries, f, indent=2)
    except Exception:
        pass
def is_allowed_trash_command(command: str) -> tuple[bool, str | None]:
    """Validate the single permitted pattern: `safe_trash <relative-file>`.

    Deliberately rejects multiple arguments, globs, directories, denied
    paths, symlinks, and files over the size cap.

    Returns:
        (True, absolute_path) when allowed, else (False, None).
    """
    if not command:
        return (False, None)

    # Collapse whitespace so the regex only has to handle single spaces.
    normalized = " ".join(command.strip().split())
    match = re.match(r"^safe_trash\s+([^\s]+)$", normalized)
    if not match:
        return (False, None)

    raw_path = match.group(1)
    if not _is_simple_relpath(raw_path):
        return (False, None)
    target = _resolve_inside_repo(raw_path)
    if target is None:
        return (False, None)
    if _is_denied_path(target) or not _is_regular_and_small(target):
        return (False, None)
    return (True, str(target))
def handle_safe_trash(command: str, session_id: str) -> bool:
    """Intercept a `safe_trash <file>` command and perform the move in-process.

    Returns True when the command was handled here (the caller should then
    block the external shell command); False when the command is not an
    allowed safe_trash invocation or the move failed.
    """
    allowed, target_s = is_allowed_trash_command(command)
    if not allowed:
        return False

    target = Path(target_s)
    dest = _trash_destination_for(target)
    try:
        dest.parent.mkdir(parents=True, exist_ok=True)
        shutil.move(str(target), str(dest))

        # Record the move in both the trash log and the main decision log.
        _append_trash_log(target, dest, session_id)
        log_tool_call(
            "Bash",
            {"command": command},
            "approved",
            "allowed_trash_command",
            f"target={target}",
        )

        print(
            f"✅ safe_trash moved file:\n from: {target}\n to: {dest}",
            file=sys.stderr,
        )
        print(
            " External command was intercepted by pre_tool_use hook (no shell execution).",
            file=sys.stderr,
        )
        return True
    except Exception as e:
        print(f"safe_trash error: {e}", file=sys.stderr)
        return False
def log_tool_call(tool_name, tool_input, decision, reason=None, block_message=None):
    """Append one PreToolUse decision record to logs/pre_tool_use.json.

    Args:
        tool_name: name of the tool being invoked.
        tool_input: the tool's input payload (logged verbatim).
        decision: "approved" or "blocked".
        reason: optional machine-readable reason tag.
        block_message: optional human-readable message shown on block.

    Logging failures are reported to stderr but never raised.
    """
    try:
        entry = {
            "tool_name": tool_name,
            "tool_input": tool_input,
            "session_id": get_claude_session_id(),
            "hook_event_name": "PreToolUse",
            "decision": decision,
            "working_directory": str(Path.cwd()),
        }
        if reason:
            entry["reason"] = reason
        if block_message:
            entry["block_message"] = block_message
        entry["timestamp"] = datetime.now().strftime("%b %d, %I:%M%p").lower()

        log_dir = Path.cwd() / "logs"
        log_dir.mkdir(parents=True, exist_ok=True)
        log_path = log_dir / "pre_tool_use.json"

        # Load prior entries, tolerating a missing or corrupt log file.
        records = []
        if log_path.exists():
            try:
                with open(log_path) as f:
                    records = json.load(f)
            except (json.JSONDecodeError, ValueError):
                records = []

        records.append(entry)
        with open(log_path, "w") as f:
            json.dump(records, f, indent=2)
    except Exception as e:
        print(f"Logging error: {e}", file=sys.stderr)
def main():
    """Entry point for the PreToolUse hook.

    Reads the tool invocation from stdin and enforces, in order:
      1. safe_trash interception (move the file into .trash in-process),
      2. .env write protection,
      3. destructive Bash command blocking,
      4. root-directory structure rules,
      5. a non-blocking warning for .claude/commands/ edits.

    Exit codes: 0 = allow, 1 = bad input, 2 = block the tool call.

    FIX: the root-structure and command-file messages previously
    hard-coded "(unknown)" even though `filename` was computed; they now
    report the actual file name.
    """
    try:
        input_data = json.load(sys.stdin)
        tool_name = input_data.get("tool_name", "")
        tool_input = input_data.get("tool_input", {})
        if not tool_name:
            print("Error: No tool_name provided in input", file=sys.stderr)
            sys.exit(1)
    except json.JSONDecodeError as e:
        print(f"Error: Invalid JSON input: {e}", file=sys.stderr)
        sys.exit(1)

    try:
        # Early-intercept: handle ultra-safe trash command inline to avoid
        # any shell-side surprises.
        if tool_name == "Bash":
            command = tool_input.get("command", "")
            if handle_safe_trash(command, get_claude_session_id()):
                sys.exit(2)  # handled in-process; block the external command

        # Check for .env file access violations
        if is_env_file_access(tool_name, tool_input):
            block_message = "Access to .env files containing sensitive data is prohibited"
            log_tool_call(
                tool_name, tool_input, "blocked", "env_file_access", block_message
            )
            print(
                "BLOCKED: Access to .env files containing sensitive data is prohibited",
                file=sys.stderr,
            )
            print("Use .env.sample for template files instead", file=sys.stderr)
            sys.exit(2)

        # Block ALL forms of deletion and destructive operations
        if tool_name == "Bash":
            command = tool_input.get("command", "")
            if is_dangerous_deletion_command(command):
                block_message = (
                    "Destructive command detected and blocked for data protection"
                )
                log_tool_call(
                    tool_name,
                    tool_input,
                    "blocked",
                    "dangerous_deletion_command",
                    block_message,
                )
                print(
                    "🚫 DELETION PROTECTION: ALL destructive operations are BLOCKED",
                    file=sys.stderr,
                )
                print("", file=sys.stderr)
                print("🛡️ PROTECTED OPERATIONS:", file=sys.stderr)
                print(" • File deletion (rm, unlink, rmdir)", file=sys.stderr)
                print(" • Directory removal (rm -r, rm -rf)", file=sys.stderr)
                print(" • File overwriting (>, echo >, cat >)", file=sys.stderr)
                print(" • Truncation (truncate, :>, /dev/null)", file=sys.stderr)
                print(" • Package removal (npm uninstall, pip uninstall)", file=sys.stderr)
                print(" • Database drops (DROP TABLE, DELETE FROM)", file=sys.stderr)
                print(" • System operations (kill -9, format, fdisk)", file=sys.stderr)
                print(" • Archive destructive ops (tar --delete)", file=sys.stderr)
                print(" • Dangerous paths (/, ~, *, .., system dirs)", file=sys.stderr)
                print("", file=sys.stderr)
                print("💡 SAFE ALTERNATIVES:", file=sys.stderr)
                print(" • Use 'mv' to relocate instead of delete", file=sys.stderr)
                print(" • Use 'cp' to backup before changes", file=sys.stderr)
                print(" • Use '>>' to append instead of overwrite", file=sys.stderr)
                print(" • Use specific file paths (no wildcards)", file=sys.stderr)
                print(
                    " • Request manual confirmation for destructive operations",
                    file=sys.stderr,
                )
                print("", file=sys.stderr)
                print("🔒 This protection ensures NO accidental data loss", file=sys.stderr)
                sys.exit(2)

        # Check for root directory structure violations
        if check_root_structure_violations(tool_name, tool_input):
            file_path = tool_input.get("file_path", "")
            filename = os.path.basename(file_path)
            block_message = (
                f"Root structure violation: unauthorized file {filename} in root directory"
            )
            log_tool_call(
                tool_name,
                tool_input,
                "blocked",
                "root_structure_violation",
                block_message,
            )
            print("🚫 ROOT STRUCTURE VIOLATION BLOCKED", file=sys.stderr)
            print(f" File: {filename}", file=sys.stderr)
            print(" Reason: Unauthorized file in root directory", file=sys.stderr)
            print("", file=sys.stderr)
            print("📋 Root directory rules:", file=sys.stderr)
            print(
                " • Only these .md files allowed: README.md, CHANGELOG.md, CLAUDE.md, ROADMAP.md, SECURITY.md",
                file=sys.stderr,
            )
            print(" • Config files belong in config/ directory", file=sys.stderr)
            print(" • Scripts belong in scripts/ directory", file=sys.stderr)
            print(" • Documentation belongs in docs/ directory", file=sys.stderr)
            print("", file=sys.stderr)
            print(
                "💡 Suggestion: Use /enforce-structure --fix to auto-organize files",
                file=sys.stderr,
            )
            sys.exit(2)

        # WARNING (not blocking) for command file access
        if is_command_file_access(tool_name, tool_input):
            file_path = tool_input.get("file_path", "")
            filename = os.path.basename(file_path)
            log_tool_call(
                tool_name,
                tool_input,
                "approved",
                "command_file_warning",
                f"Warning: modifying command file {filename}",
            )
            print(f"⚠️ COMMAND FILE MODIFICATION: {filename}", file=sys.stderr)
            print(" Location: .claude/commands/", file=sys.stderr)
            print(" Impact: May affect Claude's available commands", file=sys.stderr)
            print("", file=sys.stderr)
            print("💡 Best practices:", file=sys.stderr)
            print(" • Test command changes carefully", file=sys.stderr)
            print(" • Document any custom commands", file=sys.stderr)
            print(" • Consider using /create-command for new commands", file=sys.stderr)
            print("", file=sys.stderr)
    except Exception as e:
        # Fail open: a bug in this hook must not block the user's tool call.
        print(f"Pre-tool use hook error: {e}", file=sys.stderr)
        log_tool_call(
            tool_name, tool_input, "approved", "hook_error", f"Hook error occurred: {e}"
        )

    # If we get here, the tool call is allowed - log as approved
    log_tool_call(tool_name, tool_input, "approved")
    sys.exit(0)


if __name__ == "__main__":
    main()

224
hooks/scripts/session_start.py Executable file
View File

@@ -0,0 +1,224 @@
#!/usr/bin/env -S uv run --script
# /// script
# requires-python = ">=3.11"
# dependencies = [
# "python-dotenv",
# ]
# ///
import argparse
import json
import subprocess
import sys
from datetime import datetime
from pathlib import Path
try:
from dotenv import load_dotenv
load_dotenv()
except ImportError:
pass # dotenv is optional
def log_session_start(input_data):
    """Append the SessionStart payload to logs/session_start.json.

    Creates the logs directory on demand and tolerates a missing or
    corrupt log file by starting a fresh list.
    """
    log_dir = Path("logs")
    log_dir.mkdir(parents=True, exist_ok=True)
    log_file = log_dir / "session_start.json"

    history = []
    if log_file.exists():
        with open(log_file) as f:
            try:
                history = json.load(f)
            except (json.JSONDecodeError, ValueError):
                history = []

    history.append(input_data)
    with open(log_file, "w") as f:
        json.dump(history, f, indent=2)
def get_git_status():
    """Return (current_branch, uncommitted_file_count) for the cwd repo.

    Returns (None, None) when git is unavailable or anything goes wrong;
    the branch falls back to "unknown" when rev-parse fails.
    """
    try:
        branch_proc = subprocess.run(
            ["git", "rev-parse", "--abbrev-ref", "HEAD"],
            capture_output=True,
            text=True,
            timeout=5,
        )
        branch = branch_proc.stdout.strip() if branch_proc.returncode == 0 else "unknown"

        status_proc = subprocess.run(
            ["git", "status", "--porcelain"], capture_output=True, text=True, timeout=5
        )
        dirty = 0
        if status_proc.returncode == 0:
            listing = status_proc.stdout.strip()
            dirty = len(listing.split("\n")) if listing else 0
        return branch, dirty
    except Exception:
        return None, None
def get_recent_issues():
    """Return up to five open GitHub issues via the gh CLI, or None.

    Returns None when gh is not installed, errors, times out, or lists no
    open issues.
    """
    import shutil  # local import: keeps the module's dependency list unchanged

    try:
        # FIX: use shutil.which instead of spawning `which` — portable
        # (works on Windows), faster, and cannot hang without a timeout.
        if shutil.which("gh") is None:
            return None

        # Get recent open issues
        result = subprocess.run(
            ["gh", "issue", "list", "--limit", "5", "--state", "open"],
            capture_output=True,
            text=True,
            timeout=10,
        )
        if result.returncode == 0 and result.stdout.strip():
            return result.stdout.strip()
    except Exception:
        pass
    return None
def load_development_context(source):
    """Assemble a context summary string for session start.

    Includes a timestamp, the session source, git state, up to 1000 chars
    from each known project context file, and recent GitHub issues when
    available.
    """
    parts = [
        f"Session started at: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}",
        f"Session source: {source}",
    ]

    branch, changes = get_git_status()
    if branch:
        parts.append(f"Git branch: {branch}")
        if changes > 0:
            parts.append(f"Uncommitted changes: {changes} files")

    # Project-specific context files, read best-effort and truncated.
    for candidate in (
        ".claude/CONTEXT.md",
        ".claude/TODO.md",
        "TODO.md",
        ".github/ISSUE_TEMPLATE.md",
    ):
        path = Path(candidate)
        if not path.exists():
            continue
        try:
            text = path.read_text().strip()
        except Exception:
            continue
        if text:
            parts.append(f"\n--- Content from {candidate} ---")
            parts.append(text[:1000])  # cap each file's contribution

    issues = get_recent_issues()
    if issues:
        parts.append("\n--- Recent GitHub Issues ---")
        parts.append(issues)

    return "\n".join(parts)
def main():
    """Entry point for the SessionStart hook.

    Logs the event, then optionally emits additional context for Claude
    (--load-context) or announces the session via TTS (--announce).
    Always exits 0 so the hook never blocks Claude.

    FIX: removed the unused local `session_id` extraction.
    """
    try:
        parser = argparse.ArgumentParser()
        parser.add_argument(
            "--load-context",
            action="store_true",
            help="Load development context at session start",
        )
        parser.add_argument(
            "--announce", action="store_true", help="Announce session start via TTS"
        )
        args = parser.parse_args()

        # Read JSON input from stdin
        input_data = json.loads(sys.stdin.read())

        # "startup", "resume", or "clear". (The payload also carries
        # session_id, which this hook does not use.)
        source = input_data.get("source", "unknown")

        # Log the session start event
        log_session_start(input_data)

        # Emit extra context via the hook JSON protocol; note this path
        # exits before any TTS announcement can run.
        if args.load_context:
            context = load_development_context(source)
            if context:
                output = {
                    "hookSpecificOutput": {
                        "hookEventName": "SessionStart",
                        "additionalContext": context,
                    }
                }
                print(json.dumps(output))
                sys.exit(0)

        # Announce session start if requested (best-effort, offline TTS only).
        if args.announce:
            try:
                tts_script = Path(__file__).parent / "utils" / "tts" / "pyttsx3_tts.py"
                if tts_script.exists():
                    messages = {
                        "startup": "Claude Code session started",
                        "resume": "Resuming previous session",
                        "clear": "Starting fresh session",
                    }
                    message = messages.get(source, "Session started")
                    subprocess.run(
                        ["uv", "run", str(tts_script), message],
                        capture_output=True,
                        timeout=5,
                    )
            except Exception:
                pass

        sys.exit(0)
    except json.JSONDecodeError:
        sys.exit(0)  # malformed stdin: exit cleanly
    except Exception:
        sys.exit(0)  # fail open on anything else


if __name__ == "__main__":
    main()

214
hooks/scripts/stop.py Executable file
View File

@@ -0,0 +1,214 @@
#!/usr/bin/env -S uv run --script
# /// script
# requires-python = ">=3.11"
# dependencies = [
# "python-dotenv",
# ]
# ///
import argparse
import json
import os
import random
import subprocess
import sys
from pathlib import Path
try:
from dotenv import load_dotenv
load_dotenv()
except ImportError:
pass # dotenv is optional
def get_completion_messages():
    """Return the pool of friendly canned completion messages.

    One of these is chosen at random when no LLM-generated message is
    available (see get_llm_completion_message).
    """
    return [
        "Work complete!",
        "All done!",
        "Task finished!",
        "Job complete!",
        "Ready for next task!",
    ]
def get_tts_script_path():
    """
    Determine which TTS helper script to use based on available API keys.

    Priority order: ElevenLabs > OpenAI > pyttsx3 (keyless offline fallback).

    Returns:
        str | None: path of the first usable TTS helper, else None.
    """
    tts_dir = Path(__file__).parent / "utils" / "tts"

    def usable(script_name, env_key=None):
        # A helper is usable when its key (if required) is set and the
        # script file actually exists on disk.
        if env_key and not os.getenv(env_key):
            return None
        script = tts_dir / script_name
        return str(script) if script.exists() else None

    return (
        usable("elevenlabs_tts.py", "ELEVENLABS_API_KEY")
        or usable("openai_tts.py", "OPENAI_API_KEY")
        or usable("pyttsx3_tts.py")
    )
def get_llm_completion_message():
    """
    Generate a completion message using available LLM services.

    Priority order: OpenAI > Anthropic > fallback to a random canned
    message from get_completion_messages().

    Returns:
        str: Generated or fallback completion message.

    FIX: the OpenAI and Anthropic branches were near-identical copies;
    they now share the _run_llm_script helper.
    """
    llm_dir = Path(__file__).parent / "utils" / "llm"

    # (env var gating the service, helper script name), in priority order.
    services = [
        ("OPENAI_API_KEY", "oai.py"),
        ("ANTHROPIC_API_KEY", "anth.py"),
    ]
    for env_key, script_name in services:
        if not os.getenv(env_key):
            continue
        message = _run_llm_script(llm_dir / script_name)
        if message:
            return message

    # No LLM available/working: fall back to a canned message.
    return random.choice(get_completion_messages())


def _run_llm_script(script_path):
    """Run one LLM helper script; return its trimmed stdout or None.

    Returns None when the script is missing, fails, times out, or prints
    nothing.
    """
    if not script_path.exists():
        return None
    try:
        result = subprocess.run(
            ["uv", "run", str(script_path), "--completion"],
            capture_output=True,
            text=True,
            timeout=10,
        )
    except (subprocess.TimeoutExpired, subprocess.SubprocessError):
        return None
    if result.returncode == 0 and result.stdout.strip():
        return result.stdout.strip()
    return None
def announce_completion():
    """Speak a completion message via the best available TTS backend.

    All failures are swallowed: audio is a nicety, never a requirement.
    """
    try:
        script = get_tts_script_path()
        if script is None:
            return  # no TTS backend available

        # LLM-generated message, or a canned fallback.
        message = get_llm_completion_message()

        # Bounded, silent invocation through uv.
        subprocess.run(
            ["uv", "run", script, message],
            capture_output=True,
            timeout=10,
        )
    except (subprocess.TimeoutExpired, subprocess.SubprocessError, FileNotFoundError):
        pass  # TTS problems must never break the hook
    except Exception:
        pass  # fail silently on anything else
def main():
    """Entry point for the Stop hook.

    Appends the stop payload to logs/stop.json, optionally exports the
    .jsonl transcript to logs/chat.json (--chat), and announces completion
    via TTS. Always exits 0 so the hook never blocks Claude.

    FIX: removed the unused local extractions of session_id and
    stop_hook_active.
    """
    try:
        parser = argparse.ArgumentParser()
        parser.add_argument(
            "--chat", action="store_true", help="Copy transcript to chat.json"
        )
        args = parser.parse_args()

        # Read JSON input from stdin. (The payload also carries session_id
        # and stop_hook_active; this hook currently has no use for them.)
        input_data = json.load(sys.stdin)

        # Ensure log directory exists
        log_dir = os.path.join(os.getcwd(), "logs")
        os.makedirs(log_dir, exist_ok=True)
        log_path = os.path.join(log_dir, "stop.json")

        # Read existing log data or initialize empty list
        if os.path.exists(log_path):
            with open(log_path) as f:
                try:
                    log_data = json.load(f)
                except (json.JSONDecodeError, ValueError):
                    log_data = []
        else:
            log_data = []

        # Append new data and write back with formatting
        log_data.append(input_data)
        with open(log_path, "w") as f:
            json.dump(log_data, f, indent=2)

        # --chat: flatten the .jsonl transcript into logs/chat.json.
        if args.chat and "transcript_path" in input_data:
            transcript_path = input_data["transcript_path"]
            if os.path.exists(transcript_path):
                chat_data = []
                try:
                    with open(transcript_path) as f:
                        for line in f:
                            line = line.strip()
                            if line:
                                try:
                                    chat_data.append(json.loads(line))
                                except json.JSONDecodeError:
                                    pass  # skip invalid lines
                    chat_file = os.path.join(log_dir, "chat.json")
                    with open(chat_file, "w") as f:
                        json.dump(chat_data, f, indent=2)
                except Exception:
                    pass  # transcript export is best-effort

        # Announce completion via TTS
        announce_completion()

        sys.exit(0)
    except json.JSONDecodeError:
        sys.exit(0)  # malformed stdin: exit cleanly
    except Exception:
        sys.exit(0)  # fail open on anything else


if __name__ == "__main__":
    main()