Initial commit

This commit is contained in:
Zhongwei Li
2025-11-30 08:49:53 +08:00
commit f5b0a7389f
32 changed files with 2764 additions and 0 deletions

View File

@@ -0,0 +1,95 @@
#!/usr/bin/env -S uv run --script
# /// script
# requires-python = ">=3.11"
# dependencies = [
# "typer",
# "httpx",
# ]
# ///
"""
Remove all agent orchestrator sessions.
"""
import sys
from pathlib import Path
sys.path.insert(0, str(Path(__file__).parent / "lib"))
import typer
from typing import Optional
import httpx
app = typer.Typer(add_completion=False)
def delete_from_observability(session_id: str, observability_url: str) -> bool:
"""
Delete session from observability backend.
Returns True if successful or session not found, False on error.
"""
try:
response = httpx.delete(
f"{observability_url}/sessions/{session_id}",
timeout=2.0
)
# 200 = deleted, 404 = not found (both OK)
return response.status_code in (200, 404)
except Exception:
# Silent failure - don't block cleanup
return False
@app.command()
def main(
project_dir: Optional[Path] = typer.Option(None, "--project-dir", help="Project directory"),
sessions_dir: Optional[Path] = typer.Option(None, "--sessions-dir", help="Sessions directory"),
):
"""
Remove all sessions.
Examples:
ao-clean
ao-clean --sessions-dir /custom/path
"""
from config import load_config
from session import list_all_sessions
import shutil
try:
# Load configuration with CLI overrides
config = load_config(
cli_project_dir=str(project_dir) if project_dir else None,
cli_sessions_dir=str(sessions_dir) if sessions_dir else None,
cli_agents_dir=None,
)
# Check if sessions directory exists
if config.sessions_dir.exists():
# Delete from observability if enabled
if config.observability_enabled:
try:
sessions = list_all_sessions(config.sessions_dir)
for session_name, session_id, project_dir_path in sessions:
if session_id and session_id != "unknown":
delete_from_observability(
session_id,
config.observability_url
)
except Exception:
# Ignore errors - sessions might not exist or be incomplete
pass
# Remove entire directory
shutil.rmtree(config.sessions_dir)
print("All sessions removed")
else:
print("No sessions to remove")
except Exception as e:
print(f"Error: {e}", file=sys.stderr)
raise typer.Exit(1)
if __name__ == "__main__":
app()

View File

@@ -0,0 +1,80 @@
#!/usr/bin/env -S uv run --script
# /// script
# requires-python = ">=3.11"
# dependencies = [
# "typer",
# ]
# ///
"""
Extract the result from a completed agent orchestrator session.
"""
import sys
from pathlib import Path
sys.path.insert(0, str(Path(__file__).parent / "lib"))
import typer
from typing import Optional
app = typer.Typer(add_completion=False)
@app.command()
def main(
session_name: str = typer.Argument(..., help="Name of the session"),
project_dir: Optional[Path] = typer.Option(None, "--project-dir", help="Project directory"),
sessions_dir: Optional[Path] = typer.Option(None, "--sessions-dir", help="Sessions directory"),
):
"""
Extract the result from a completed session.
Examples:
ao-get-result mysession
"""
from config import load_config
from session import validate_session_name, get_session_status, extract_result
try:
# Validate session name
validate_session_name(session_name)
# Load configuration with CLI overrides
config = load_config(
cli_project_dir=str(project_dir) if project_dir else None,
cli_sessions_dir=str(sessions_dir) if sessions_dir else None,
cli_agents_dir=None,
)
# Check session status
status = get_session_status(session_name, config.sessions_dir)
if status == "not_existent":
print(f"Error: Session '{session_name}' does not exist", file=sys.stderr)
raise typer.Exit(1)
if status == "running":
print(f"Error: Session '{session_name}' is still running. Wait for completion or check status with ao-status.", file=sys.stderr)
raise typer.Exit(1)
# Extract and print result
session_file = config.sessions_dir / f"{session_name}.jsonl"
result = extract_result(session_file)
print(result)
except ValueError as e:
# Session validation errors or result extraction errors
print(f"Error: {e}", file=sys.stderr)
raise typer.Exit(1)
except FileNotFoundError as e:
# Session file not found
print(f"Error: {e}", file=sys.stderr)
raise typer.Exit(1)
except Exception as e:
# Unexpected errors
print(f"Error: {e}", file=sys.stderr)
raise typer.Exit(1)
if __name__ == "__main__":
app()

View File

@@ -0,0 +1,70 @@
#!/usr/bin/env -S uv run --script
# /// script
# requires-python = ">=3.11"
# dependencies = [
# "typer",
# ]
# ///
"""
List all available agent definitions.
"""
import sys
from pathlib import Path
sys.path.insert(0, str(Path(__file__).parent / "lib"))
import typer
from typing import Optional
from config import load_config
from agent import list_all_agents
app = typer.Typer(add_completion=False)
@app.command()
def main(
project_dir: Optional[Path] = typer.Option(None, "--project-dir", help="Project directory"),
agents_dir: Optional[Path] = typer.Option(None, "--agents-dir", help="Agents directory"),
):
"""
List all available agent definitions.
Displays: agent name, description
Examples:
ao-list-agents
ao-list-agents --agents-dir /path/to/agents
ao-list-agents --project-dir /my/project
"""
# 1. Load configuration
config = load_config(
cli_project_dir=str(project_dir) if project_dir else None,
cli_agents_dir=str(agents_dir) if agents_dir else None,
)
# 2. Get list of agents (list_all_agents handles missing directory gracefully)
agents = list_all_agents(config.agents_dir)
# 3. Handle empty case
if not agents:
print("No agent definitions found")
return
# 4. Display agents with bash-compatible formatting
first = True
for name, description in agents:
if first:
first = False
else:
# Add separator before subsequent agents
print("---")
print()
print(f"{name}:")
print(description)
print()
if __name__ == "__main__":
app()

View File

@@ -0,0 +1,74 @@
#!/usr/bin/env -S uv run --script
# /// script
# requires-python = ">=3.11"
# dependencies = [
# "typer",
# ]
# ///
"""
List all agent orchestrator sessions with metadata.
"""
import sys
from pathlib import Path
sys.path.insert(0, str(Path(__file__).parent / "lib"))
import typer
from typing import Optional
app = typer.Typer(add_completion=False)
@app.command()
def main(
project_dir: Optional[Path] = typer.Option(None, "--project-dir", help="Project directory"),
sessions_dir: Optional[Path] = typer.Option(None, "--sessions-dir", help="Sessions directory"),
):
"""
List all sessions with metadata.
Displays: session name (session-id: session_id, project-dir: project_dir)
Examples:
ao-list-sessions
ao-list-sessions --sessions-dir /custom/path
"""
from config import load_config
from session import list_all_sessions
from utils import debug_log
# DEBUG LOGGING - Command entry
debug_log("COMMAND - ao-list-sessions", {
"cwd": str(Path.cwd()),
"argv": sys.argv,
"project_dir": str(project_dir) if project_dir else "None",
"sessions_dir": str(sessions_dir) if sessions_dir else "None",
})
try:
# Load configuration with CLI overrides
config = load_config(
cli_project_dir=str(project_dir) if project_dir else None,
cli_sessions_dir=str(sessions_dir) if sessions_dir else None,
cli_agents_dir=None,
)
# Get all sessions (list_all_sessions handles missing directory gracefully)
sessions = list_all_sessions(config.sessions_dir)
# Display results
if not sessions:
print("No sessions found")
else:
for session_name, session_id, project_dir_path in sessions:
print(f"{session_name} (session-id: {session_id}, project-dir: {project_dir_path})")
except Exception as e:
# Handle errors
print(f"Error: {e}", file=sys.stderr)
raise typer.Exit(1)
if __name__ == "__main__":
app()

View File

@@ -0,0 +1,186 @@
#!/usr/bin/env -S uv run --script
# /// script
# requires-python = ">=3.11"
# dependencies = [
# "claude-agent-sdk",
# "typer",
# "httpx",
# ]
# ///
"""
Create a new agent orchestrator session.
This command creates a new Claude AI session, optionally with an agent configuration.
"""
import sys
from pathlib import Path
# Add lib to path for shared modules
sys.path.insert(0, str(Path(__file__).parent / "lib"))
import typer
from typing import Optional
app = typer.Typer(add_completion=False)
@app.command()
def main(
session_name: str = typer.Argument(..., help="Name of the session"),
prompt: Optional[str] = typer.Option(None, "-p", "--prompt", help="Session prompt"),
agent: Optional[str] = typer.Option(None, "--agent", help="Agent to use"),
project_dir: Optional[Path] = typer.Option(None, "--project-dir", help="Project directory"),
sessions_dir: Optional[Path] = typer.Option(None, "--sessions-dir", help="Sessions directory"),
agents_dir: Optional[Path] = typer.Option(None, "--agents-dir", help="Agents directory"),
):
"""
Create a new agent orchestrator session.
Examples:
ao-new mysession -p "Write hello world"
ao-new research --agent web-researcher -p "Research AI"
cat prompt.md | ao-new mysession
"""
from config import load_config
from session import (
validate_session_name,
get_session_status,
save_session_metadata,
)
from agent import load_agent_config, build_mcp_servers_dict
from claude_client import run_session_sync
from utils import (
get_prompt_from_args_and_stdin,
ensure_directory_exists,
log_command,
log_result,
error_exit,
debug_log,
)
# DEBUG LOGGING - Command entry
debug_log("COMMAND - ao-new", {
"cwd": str(Path.cwd()),
"argv": sys.argv,
"session_name": session_name,
"prompt": prompt or "None (will read from stdin)",
"agent": agent or "None",
"project_dir": str(project_dir) if project_dir else "None",
"sessions_dir": str(sessions_dir) if sessions_dir else "None",
"agents_dir": str(agents_dir) if agents_dir else "None",
})
try:
# 1. Validate session name
validate_session_name(session_name)
# 2. Load configuration
config = load_config(
cli_project_dir=str(project_dir) if project_dir else None,
cli_sessions_dir=str(sessions_dir) if sessions_dir else None,
cli_agents_dir=str(agents_dir) if agents_dir else None,
)
# 3. Check session doesn't already exist
status = get_session_status(session_name, config.sessions_dir)
if status != "not_existent":
error_exit(
f"Session '{session_name}' already exists. "
"Use 'ao-resume' command to continue or choose a different name"
)
# 4. Get prompt (from -p and/or stdin)
user_prompt = get_prompt_from_args_and_stdin(prompt)
# 5. Load agent if specified
final_prompt = user_prompt
mcp_servers = None
agent_name = None
if agent:
agent_config = load_agent_config(agent, config.agents_dir)
agent_name = agent_config.name
# Prepend system prompt if available
if agent_config.system_prompt:
final_prompt = f"{agent_config.system_prompt}\n\n---\n\n{user_prompt}"
# Build MCP servers dict
mcp_servers = build_mcp_servers_dict(agent_config.mcp_config)
# 6. Ensure directories exist
ensure_directory_exists(config.sessions_dir)
# 7. STAGE 1: Save initial session metadata WITHOUT session_id
# This allows users to see the session was started
save_session_metadata(
session_name=session_name,
agent=agent_name,
project_dir=config.project_dir,
agents_dir=config.agents_dir,
sessions_dir=config.sessions_dir,
session_id=None, # Will be added in Stage 2 during streaming
)
# 8. Log command (if logging enabled)
if config.enable_logging:
# Build command string for logging (similar to bash)
mcp_info = f"with MCP servers: {list(mcp_servers.keys())}" if mcp_servers else "no MCP"
full_command = (
f"cd {config.project_dir} && "
f"query(prompt=<prompt>, cwd={config.project_dir}, "
f"permission_mode=bypassPermissions, {mcp_info})"
)
log_command(
session_name=session_name,
command_type="new",
agent_name=agent_name,
mcp_config_path=str(agent_config.mcp_config) if agent and agent_config.mcp_config else None,
full_command=full_command,
prompt=final_prompt,
sessions_dir=config.sessions_dir,
project_dir=config.project_dir,
agents_dir=config.agents_dir,
enable_logging=config.enable_logging,
)
# 9. Run Claude session
# STAGE 2 happens automatically during streaming when session_id is received
session_file = config.sessions_dir / f"{session_name}.jsonl"
session_id, result = run_session_sync(
prompt=final_prompt,
session_file=session_file,
project_dir=config.project_dir,
session_name=session_name, # For Stage 2 metadata update
sessions_dir=config.sessions_dir, # For Stage 2 metadata update
mcp_servers=mcp_servers,
observability_enabled=config.observability_enabled,
observability_url=config.observability_url,
agent_name=agent_name,
)
# 10. Log result (if logging enabled)
if config.enable_logging:
log_result(session_name, result, config.sessions_dir, config.enable_logging)
# 11. Print result to stdout
print(result)
except ValueError as e:
# Session validation errors, prompt errors, etc.
error_exit(str(e))
except FileNotFoundError as e:
# Agent not found, config file issues, etc.
error_exit(str(e))
except ImportError as e:
# SDK not installed
error_exit(str(e))
except Exception as e:
# Unexpected errors (SDK errors, etc.)
error_exit(f"Unexpected error: {e}")
if __name__ == "__main__":
app()

View File

@@ -0,0 +1,196 @@
#!/usr/bin/env -S uv run --script
# /// script
# requires-python = ">=3.11"
# dependencies = [
# "claude-agent-sdk",
# "typer",
# "httpx",
# ]
# ///
"""
Resume an existing agent orchestrator session.
This command continues a previous session with a new prompt.
"""
import sys
from pathlib import Path
sys.path.insert(0, str(Path(__file__).parent / "lib"))
import typer
from typing import Optional
app = typer.Typer(add_completion=False)
@app.command()
def main(
session_name: str = typer.Argument(..., help="Name of the session"),
prompt: Optional[str] = typer.Option(None, "-p", "--prompt", help="Continuation prompt"),
project_dir: Optional[Path] = typer.Option(None, "--project-dir", help="Project directory"),
sessions_dir: Optional[Path] = typer.Option(None, "--sessions-dir", help="Sessions directory"),
agents_dir: Optional[Path] = typer.Option(None, "--agents-dir", help="Agents directory"),
):
"""
Resume an existing agent orchestrator session.
Examples:
ao-resume mysession -p "Add error handling"
cat additional-requirements.md | ao-resume mysession
"""
from config import load_config
from session import (
validate_session_name,
get_session_status,
load_session_metadata,
update_session_metadata,
)
from agent import load_agent_config, build_mcp_servers_dict
from claude_client import run_session_sync
from utils import (
get_prompt_from_args_and_stdin,
log_command,
log_result,
error_exit,
debug_log,
)
# DEBUG LOGGING - Command entry
debug_log("COMMAND - ao-resume", {
"cwd": str(Path.cwd()),
"argv": sys.argv,
"session_name": session_name,
"prompt": prompt or "None (will read from stdin)",
"project_dir": str(project_dir) if project_dir else "None",
"sessions_dir": str(sessions_dir) if sessions_dir else "None",
"agents_dir": str(agents_dir) if agents_dir else "None",
})
try:
# 1. Validate session name
validate_session_name(session_name)
# 2. Load configuration
config = load_config(
cli_project_dir=str(project_dir) if project_dir else None,
cli_sessions_dir=str(sessions_dir) if sessions_dir else None,
cli_agents_dir=str(agents_dir) if agents_dir else None,
)
# 3. Check session exists
status = get_session_status(session_name, config.sessions_dir)
if status == "not_existent":
error_exit(
f"Session '{session_name}' does not exist. "
"Use 'ao-new' command to create it"
)
# 4. Load session metadata (contains session_id, agent, paths)
metadata = load_session_metadata(session_name, config.sessions_dir)
# 5. Context consistency validation
# Warn if CLI overrides differ from metadata (but don't fail)
if project_dir and Path(project_dir).resolve() != metadata.project_dir:
print(
f"Warning: --project-dir override ignored. "
f"Using session metadata value: {metadata.project_dir}",
file=sys.stderr,
)
if agents_dir and Path(agents_dir).resolve() != metadata.agents_dir:
print(
f"Warning: --agents-dir override ignored. "
f"Using session metadata value: {metadata.agents_dir}",
file=sys.stderr,
)
# 6. Extract session_id from metadata
resume_session_id = metadata.session_id
if not resume_session_id:
error_exit(
f"Session '{session_name}' has no session_id in metadata. "
"Cannot resume incomplete session"
)
# 7. Get prompt (from -p and/or stdin)
user_prompt = get_prompt_from_args_and_stdin(prompt)
# 8. Load agent configuration if session has agent
# NOTE: Do NOT prepend system prompt for resume (only for new sessions)
mcp_servers = None
agent_config = None
if metadata.agent:
agent_config = load_agent_config(metadata.agent, metadata.agents_dir)
# Build MCP servers dict for SDK
mcp_servers = build_mcp_servers_dict(agent_config.mcp_config)
# 9. Log command (if logging enabled)
if config.enable_logging:
# Build command string for logging (similar to bash)
mcp_info = f"with MCP servers: {list(mcp_servers.keys())}" if mcp_servers else "no MCP"
full_command = (
f"cd {metadata.project_dir} && "
f"query(prompt=<prompt>, cwd={metadata.project_dir}, "
f"permission_mode=bypassPermissions, resume={resume_session_id}, {mcp_info})"
)
log_command(
session_name=session_name,
command_type="resume",
agent_name=metadata.agent,
mcp_config_path=str(agent_config.mcp_config) if agent_config and agent_config.mcp_config else None,
full_command=full_command,
prompt=user_prompt,
sessions_dir=config.sessions_dir,
project_dir=config.project_dir,
agents_dir=config.agents_dir,
enable_logging=config.enable_logging,
)
# 10. Run Claude session with resume
# Use metadata paths (not CLI overrides) to ensure context consistency
session_file = config.sessions_dir / f"{session_name}.jsonl"
session_id, result = run_session_sync(
prompt=user_prompt, # No system prompt prepended!
session_file=session_file,
project_dir=metadata.project_dir, # From metadata
session_name=session_name,
sessions_dir=config.sessions_dir,
mcp_servers=mcp_servers,
resume_session_id=resume_session_id, # KEY: Enable resume
observability_enabled=config.observability_enabled,
observability_url=config.observability_url,
agent_name=metadata.agent,
)
# 11. Update session metadata timestamp
update_session_metadata(session_name, config.sessions_dir)
# 12. Log result (if logging enabled)
if config.enable_logging:
log_result(session_name, result, config.sessions_dir, config.enable_logging)
# 13. Print result to stdout
print(result)
except ValueError as e:
# Session validation errors, prompt errors, etc.
error_exit(str(e))
except FileNotFoundError as e:
# Session not found, agent not found, etc.
error_exit(str(e))
except ImportError as e:
# SDK not installed
error_exit(
f"Claude SDK not installed: {e}\n"
"Install with: uv pip install claude-agent-sdk"
)
except Exception as e:
# Unexpected errors (SDK errors, etc.)
error_exit(f"Unexpected error: {e}")
if __name__ == "__main__":
app()

View File

@@ -0,0 +1,92 @@
#!/usr/bin/env -S uv run --script
# /// script
# requires-python = ">=3.11"
# dependencies = [
# "typer",
# ]
# ///
"""
Display session configuration and context.
"""
import sys
import json
from pathlib import Path
sys.path.insert(0, str(Path(__file__).parent / "lib"))
import typer
from typing import Optional
app = typer.Typer(add_completion=False)
@app.command()
def main(
session_name: str = typer.Argument(..., help="Name of the session"),
project_dir: Optional[Path] = typer.Option(None, "--project-dir", help="Project directory"),
sessions_dir: Optional[Path] = typer.Option(None, "--sessions-dir", help="Sessions directory"),
):
"""
Display session configuration and context.
Shows: agent used, directories, session metadata
Examples:
ao-show-config mysession
ao-show-config mysession --project-dir /my/project
"""
from config import load_config
from session import validate_session_name, load_session_metadata
try:
# Validate session name
validate_session_name(session_name)
# Load configuration with CLI overrides
config = load_config(
cli_project_dir=str(project_dir) if project_dir else None,
cli_sessions_dir=str(sessions_dir) if sessions_dir else None,
cli_agents_dir=None,
)
# Check if session exists
meta_file = config.sessions_dir / f"{session_name}.meta.json"
if not meta_file.exists():
print(f"Error: Session '{session_name}' does not exist", file=sys.stderr)
raise typer.Exit(1)
# Load session metadata
metadata = load_session_metadata(session_name, config.sessions_dir)
# Display configuration matching bash format
print(f"Configuration for session '{session_name}':")
print(f" Session file: {config.sessions_dir}/{session_name}.jsonl")
print(f" Project dir: {metadata.project_dir} (from meta.json)")
print(f" Agents dir: {metadata.agents_dir} (from meta.json)")
print(f" Sessions dir: {config.sessions_dir} (current)")
print(f" Agent: {metadata.agent if metadata.agent else 'none'}")
print(f" Created: {metadata.created_at.isoformat()}Z")
print(f" Last resumed: {metadata.last_resumed_at.isoformat()}Z")
print(f" Schema version: {metadata.schema_version}")
except ValueError as e:
# Session validation errors
print(f"Error: {e}", file=sys.stderr)
raise typer.Exit(1)
except FileNotFoundError as e:
# Session metadata not found
print(f"Error: {e}", file=sys.stderr)
raise typer.Exit(1)
except (json.JSONDecodeError, KeyError) as e:
# JSON parsing errors
print(f"Error: Invalid session metadata: {e}", file=sys.stderr)
raise typer.Exit(1)
except Exception as e:
# Unexpected errors
print(f"Error: {e}", file=sys.stderr)
raise typer.Exit(1)
if __name__ == "__main__":
app()

View File

@@ -0,0 +1,68 @@
#!/usr/bin/env -S uv run --script
# /// script
# requires-python = ">=3.11"
# dependencies = [
# "typer",
# ]
# ///
"""
Check the status of an agent orchestrator session.
Returns: running, finished, or not_existent
"""
import sys
from pathlib import Path
sys.path.insert(0, str(Path(__file__).parent / "lib"))
import typer
from typing import Optional
app = typer.Typer(add_completion=False)
@app.command()
def main(
session_name: str = typer.Argument(..., help="Name of the session"),
project_dir: Optional[Path] = typer.Option(None, "--project-dir", help="Project directory"),
sessions_dir: Optional[Path] = typer.Option(None, "--sessions-dir", help="Sessions directory"),
):
"""
Check the status of a session.
Outputs one of: running, finished, not_existent
Examples:
ao-status mysession
"""
from config import load_config
from session import validate_session_name, get_session_status
try:
# Validate session name
validate_session_name(session_name)
# Load configuration with CLI overrides
config = load_config(
cli_project_dir=str(project_dir) if project_dir else None,
cli_sessions_dir=str(sessions_dir) if sessions_dir else None,
cli_agents_dir=None,
)
# Get and print session status
status = get_session_status(session_name, config.sessions_dir)
print(status)
except ValueError as e:
# Session validation errors
print(f"Error: {e}", file=sys.stderr)
raise typer.Exit(1)
except Exception as e:
# Unexpected errors
print(f"Error: {e}", file=sys.stderr)
raise typer.Exit(1)
if __name__ == "__main__":
app()