494 lines
15 KiB
Python
Executable File
494 lines
15 KiB
Python
Executable File
#!/usr/bin/env python3
|
||
"""
|
||
command_define.py – Implementation of the command.define Skill
|
||
Validates command manifests and registers them in the Command Registry.
|
||
"""
|
||
|
||
import os
|
||
import sys
|
||
import json
|
||
import yaml
|
||
from typing import Dict, Any, List, Optional
|
||
from datetime import datetime, timezone
|
||
from pydantic import ValidationError as PydanticValidationError
|
||
|
||
|
||
from betty.config import (
|
||
BASE_DIR,
|
||
REQUIRED_COMMAND_FIELDS,
|
||
COMMANDS_REGISTRY_FILE,
|
||
REGISTRY_FILE,
|
||
AGENTS_REGISTRY_FILE,
|
||
)
|
||
from betty.enums import CommandExecutionType, CommandStatus
|
||
from betty.validation import (
|
||
validate_path,
|
||
validate_manifest_fields,
|
||
validate_command_name,
|
||
validate_version,
|
||
validate_command_execution_type
|
||
)
|
||
from betty.logging_utils import setup_logger
|
||
from betty.errors import format_error_response
|
||
from betty.models import CommandManifest
|
||
from betty.file_utils import atomic_write_json
|
||
|
||
logger = setup_logger(__name__)
|
||
|
||
|
||
class CommandValidationError(Exception):
|
||
"""Raised when command validation fails."""
|
||
pass
|
||
|
||
|
||
class CommandRegistryError(Exception):
|
||
"""Raised when command registry operations fail."""
|
||
pass
|
||
|
||
|
||
def build_response(ok: bool, path: str, errors: Optional[List[str]] = None, details: Optional[Dict[str, Any]] = None) -> Dict[str, Any]:
|
||
"""
|
||
Build standardized response dictionary.
|
||
|
||
Args:
|
||
ok: Whether operation succeeded
|
||
path: Path to command manifest
|
||
errors: List of error messages
|
||
details: Additional details
|
||
|
||
Returns:
|
||
Response dictionary
|
||
"""
|
||
response: Dict[str, Any] = {
|
||
"ok": ok,
|
||
"status": "success" if ok else "failed",
|
||
"errors": errors or [],
|
||
"path": path,
|
||
}
|
||
|
||
if details is not None:
|
||
response["details"] = details
|
||
|
||
return response
|
||
|
||
|
||
def load_command_manifest(path: str) -> Dict[str, Any]:
|
||
"""
|
||
Load and parse a command manifest from YAML file.
|
||
|
||
Args:
|
||
path: Path to command manifest file
|
||
|
||
Returns:
|
||
Parsed manifest dictionary
|
||
|
||
Raises:
|
||
CommandValidationError: If manifest cannot be loaded or parsed
|
||
"""
|
||
try:
|
||
with open(path) as f:
|
||
manifest = yaml.safe_load(f)
|
||
return manifest
|
||
except FileNotFoundError:
|
||
raise CommandValidationError(f"Manifest file not found: {path}")
|
||
except yaml.YAMLError as e:
|
||
raise CommandValidationError(f"Failed to parse YAML: {e}")
|
||
|
||
|
||
def load_skill_registry() -> Dict[str, Any]:
|
||
"""
|
||
Load skill registry for validation.
|
||
|
||
Returns:
|
||
Skill registry dictionary
|
||
|
||
Raises:
|
||
CommandValidationError: If registry cannot be loaded
|
||
"""
|
||
try:
|
||
with open(REGISTRY_FILE) as f:
|
||
return json.load(f)
|
||
except FileNotFoundError:
|
||
raise CommandValidationError(f"Skill registry not found: {REGISTRY_FILE}")
|
||
except json.JSONDecodeError as e:
|
||
raise CommandValidationError(f"Failed to parse skill registry: {e}")
|
||
|
||
|
||
def load_agent_registry() -> Dict[str, Any]:
|
||
"""
|
||
Load agent registry for validation.
|
||
|
||
Returns:
|
||
Agent registry dictionary
|
||
|
||
Raises:
|
||
CommandValidationError: If registry cannot be loaded
|
||
"""
|
||
try:
|
||
with open(AGENTS_REGISTRY_FILE) as f:
|
||
return json.load(f)
|
||
except FileNotFoundError:
|
||
raise CommandValidationError(f"Agent registry not found: {AGENTS_REGISTRY_FILE}")
|
||
except json.JSONDecodeError as e:
|
||
raise CommandValidationError(f"Failed to parse agent registry: {e}")
|
||
|
||
|
||
def validate_command_schema(manifest: Dict[str, Any]) -> List[str]:
|
||
"""
|
||
Validate command manifest using Pydantic schema.
|
||
|
||
Args:
|
||
manifest: Command manifest dictionary
|
||
|
||
Returns:
|
||
List of validation errors (empty if valid)
|
||
"""
|
||
errors: List[str] = []
|
||
|
||
try:
|
||
CommandManifest.model_validate(manifest)
|
||
logger.info("Pydantic schema validation passed for command manifest")
|
||
except PydanticValidationError as exc:
|
||
logger.warning("Pydantic schema validation failed for command manifest")
|
||
for error in exc.errors():
|
||
field = ".".join(str(loc) for loc in error["loc"])
|
||
message = error["msg"]
|
||
error_type = error["type"]
|
||
errors.append(f"Schema validation error at '{field}': {message} (type: {error_type})")
|
||
|
||
return errors
|
||
|
||
|
||
def validate_execution_target(execution: Dict[str, Any]) -> List[str]:
|
||
"""
|
||
Validate that the execution target exists in the appropriate registry.
|
||
|
||
Args:
|
||
execution: Execution configuration from manifest
|
||
|
||
Returns:
|
||
List of validation errors (empty if valid)
|
||
"""
|
||
errors = []
|
||
exec_type = execution.get("type")
|
||
target = execution.get("target")
|
||
|
||
if not target:
|
||
errors.append("execution.target is required")
|
||
return errors
|
||
|
||
try:
|
||
if exec_type == "skill":
|
||
# Validate skill exists
|
||
skill_registry = load_skill_registry()
|
||
registered_skills = {skill["name"] for skill in skill_registry.get("skills", [])}
|
||
if target not in registered_skills:
|
||
errors.append(f"Skill '{target}' not found in skill registry")
|
||
|
||
elif exec_type == "agent":
|
||
# Validate agent exists
|
||
agent_registry = load_agent_registry()
|
||
registered_agents = {agent["name"] for agent in agent_registry.get("agents", [])}
|
||
if target not in registered_agents:
|
||
errors.append(f"Agent '{target}' not found in agent registry")
|
||
|
||
elif exec_type == "workflow":
|
||
# Validate workflow file exists
|
||
workflow_path = os.path.join(BASE_DIR, "workflows", f"{target}.yaml")
|
||
if not os.path.exists(workflow_path):
|
||
errors.append(f"Workflow file not found: {workflow_path}")
|
||
|
||
except CommandValidationError as e:
|
||
errors.append(f"Could not validate target: {str(e)}")
|
||
|
||
return errors
|
||
|
||
|
||
def validate_manifest(path: str) -> Dict[str, Any]:
|
||
"""
|
||
Validate that a command manifest meets all requirements.
|
||
|
||
Validation checks:
|
||
1. Required fields are present
|
||
2. Name format is valid
|
||
3. Version format is valid
|
||
4. Execution type is valid
|
||
5. Execution target exists in appropriate registry
|
||
6. Parameters are properly formatted (if present)
|
||
|
||
Args:
|
||
path: Path to command manifest file
|
||
|
||
Returns:
|
||
Dictionary with validation results:
|
||
- valid: Boolean indicating if manifest is valid
|
||
- errors: List of validation errors (if any)
|
||
- manifest: The parsed manifest (if valid)
|
||
- path: Path to the manifest file
|
||
"""
|
||
validate_path(path, must_exist=True)
|
||
|
||
logger.info(f"Validating command manifest: {path}")
|
||
|
||
errors = []
|
||
|
||
# Load manifest
|
||
try:
|
||
manifest = load_command_manifest(path)
|
||
except CommandValidationError as e:
|
||
return {
|
||
"valid": False,
|
||
"errors": [str(e)],
|
||
"path": path
|
||
}
|
||
|
||
# Check required fields first so the message appears before schema errors
|
||
missing = validate_manifest_fields(manifest, REQUIRED_COMMAND_FIELDS)
|
||
if missing:
|
||
missing_message = f"Missing required fields: {', '.join(missing)}"
|
||
errors.append(missing_message)
|
||
logger.warning(f"Missing required fields: {missing}")
|
||
|
||
# Validate with Pydantic schema (keep going to surface custom errors too)
|
||
schema_errors = validate_command_schema(manifest)
|
||
errors.extend(schema_errors)
|
||
|
||
name = manifest.get("name")
|
||
if name is not None:
|
||
try:
|
||
validate_command_name(name)
|
||
except Exception as e:
|
||
errors.append(f"Invalid name: {str(e)}")
|
||
logger.warning(f"Invalid name: {e}")
|
||
|
||
version = manifest.get("version")
|
||
if version is not None:
|
||
try:
|
||
validate_version(version)
|
||
except Exception as e:
|
||
errors.append(f"Invalid version: {str(e)}")
|
||
logger.warning(f"Invalid version: {e}")
|
||
|
||
execution = manifest.get("execution")
|
||
if execution is None:
|
||
if "execution" not in missing:
|
||
errors.append("execution must be provided")
|
||
logger.warning("Execution configuration missing")
|
||
elif not isinstance(execution, dict):
|
||
errors.append("execution must be an object")
|
||
logger.warning("Execution configuration is not a dictionary")
|
||
else:
|
||
exec_type = execution.get("type")
|
||
if not exec_type:
|
||
errors.append("execution.type is required")
|
||
else:
|
||
try:
|
||
validate_command_execution_type(exec_type)
|
||
except Exception as e:
|
||
errors.append(f"Invalid execution.type: {str(e)}")
|
||
logger.warning(f"Invalid execution type: {e}")
|
||
|
||
if exec_type:
|
||
target_errors = validate_execution_target(execution)
|
||
errors.extend(target_errors)
|
||
|
||
# Validate status if present
|
||
if "status" in manifest:
|
||
valid_statuses = [s.value for s in CommandStatus]
|
||
if manifest["status"] not in valid_statuses:
|
||
errors.append(f"Invalid status: '{manifest['status']}'. Must be one of: {', '.join(valid_statuses)}")
|
||
logger.warning(f"Invalid status: {manifest['status']}")
|
||
|
||
# Validate parameters if present
|
||
if "parameters" in manifest:
|
||
params = manifest["parameters"]
|
||
if not isinstance(params, list):
|
||
errors.append("parameters must be an array")
|
||
else:
|
||
for i, param in enumerate(params):
|
||
if not isinstance(param, dict):
|
||
errors.append(f"parameters[{i}] must be an object")
|
||
continue
|
||
if "name" not in param:
|
||
errors.append(f"parameters[{i}] missing required field: name")
|
||
if "type" not in param:
|
||
errors.append(f"parameters[{i}] missing required field: type")
|
||
|
||
if errors:
|
||
logger.warning(f"Validation failed with {len(errors)} error(s)")
|
||
return {
|
||
"valid": False,
|
||
"errors": errors,
|
||
"path": path
|
||
}
|
||
|
||
logger.info("✅ Command manifest validation passed")
|
||
return {
|
||
"valid": True,
|
||
"errors": [],
|
||
"path": path,
|
||
"manifest": manifest
|
||
}
|
||
|
||
|
||
def load_command_registry() -> Dict[str, Any]:
|
||
"""
|
||
Load existing command registry.
|
||
|
||
Returns:
|
||
Command registry dictionary, or new empty registry if file doesn't exist
|
||
"""
|
||
if not os.path.exists(COMMANDS_REGISTRY_FILE):
|
||
logger.info("Command registry not found, creating new registry")
|
||
return {
|
||
"registry_version": "1.0.0",
|
||
"generated_at": datetime.now(timezone.utc).isoformat(),
|
||
"commands": []
|
||
}
|
||
|
||
try:
|
||
with open(COMMANDS_REGISTRY_FILE) as f:
|
||
registry = json.load(f)
|
||
logger.info(f"Loaded command registry with {len(registry.get('commands', []))} command(s)")
|
||
return registry
|
||
except json.JSONDecodeError as e:
|
||
raise CommandRegistryError(f"Failed to parse command registry: {e}")
|
||
|
||
|
||
def update_command_registry(manifest: Dict[str, Any]) -> bool:
|
||
"""
|
||
Add or update command in the command registry.
|
||
|
||
Args:
|
||
manifest: Validated command manifest
|
||
|
||
Returns:
|
||
True if registry was updated successfully
|
||
|
||
Raises:
|
||
CommandRegistryError: If registry update fails
|
||
"""
|
||
logger.info(f"Updating command registry for: {manifest['name']}")
|
||
|
||
# Load existing registry
|
||
registry = load_command_registry()
|
||
|
||
# Create registry entry
|
||
entry = {
|
||
"name": manifest["name"],
|
||
"version": manifest["version"],
|
||
"description": manifest["description"],
|
||
"execution": manifest["execution"],
|
||
"parameters": manifest.get("parameters", []),
|
||
"status": manifest.get("status", "draft"),
|
||
"tags": manifest.get("tags", [])
|
||
}
|
||
|
||
# Check if command already exists
|
||
commands = registry.get("commands", [])
|
||
existing_index = None
|
||
for i, command in enumerate(commands):
|
||
if command["name"] == manifest["name"]:
|
||
existing_index = i
|
||
break
|
||
|
||
if existing_index is not None:
|
||
# Update existing command
|
||
commands[existing_index] = entry
|
||
logger.info(f"Updated existing command: {manifest['name']}")
|
||
else:
|
||
# Add new command
|
||
commands.append(entry)
|
||
logger.info(f"Added new command: {manifest['name']}")
|
||
|
||
registry["commands"] = commands
|
||
registry["generated_at"] = datetime.now(timezone.utc).isoformat()
|
||
|
||
# Write registry back to disk atomically
|
||
try:
|
||
atomic_write_json(COMMANDS_REGISTRY_FILE, registry)
|
||
logger.info(f"Command registry updated successfully")
|
||
return True
|
||
except Exception as e:
|
||
raise CommandRegistryError(f"Failed to write command registry: {e}")
|
||
|
||
|
||
def main():
|
||
"""Main CLI entry point."""
|
||
if len(sys.argv) < 2:
|
||
message = "Usage: command_define.py <path_to_command.yaml>"
|
||
response = build_response(
|
||
False,
|
||
path="",
|
||
errors=[message],
|
||
details={"error": {"error": "UsageError", "message": message, "details": {}}},
|
||
)
|
||
print(json.dumps(response, indent=2))
|
||
sys.exit(1)
|
||
|
||
path = sys.argv[1]
|
||
|
||
try:
|
||
# Validate manifest
|
||
validation = validate_manifest(path)
|
||
details = dict(validation)
|
||
|
||
if validation.get("valid"):
|
||
# Update registry
|
||
try:
|
||
registry_updated = update_command_registry(validation["manifest"])
|
||
details["status"] = "registered"
|
||
details["registry_updated"] = registry_updated
|
||
except CommandRegistryError as e:
|
||
logger.error(f"Registry update failed: {e}")
|
||
details["status"] = "validated"
|
||
details["registry_updated"] = False
|
||
details["registry_error"] = str(e)
|
||
else:
|
||
# Check if there are schema validation errors
|
||
has_schema_errors = any("Schema validation error" in err for err in validation.get("errors", []))
|
||
if has_schema_errors:
|
||
details["error"] = {
|
||
"type": "SchemaError",
|
||
"error": "SchemaError",
|
||
"message": "Command manifest schema validation failed",
|
||
"details": {"errors": validation.get("errors", [])}
|
||
}
|
||
|
||
# Build response
|
||
response = build_response(
|
||
bool(validation.get("valid")),
|
||
path=path,
|
||
errors=validation.get("errors", []),
|
||
details=details,
|
||
)
|
||
print(json.dumps(response, indent=2))
|
||
sys.exit(0 if response["ok"] else 1)
|
||
|
||
except CommandValidationError as e:
|
||
logger.error(str(e))
|
||
error_info = format_error_response(e)
|
||
response = build_response(
|
||
False,
|
||
path=path,
|
||
errors=[error_info.get("message", str(e))],
|
||
details={"error": error_info},
|
||
)
|
||
print(json.dumps(response, indent=2))
|
||
sys.exit(1)
|
||
except Exception as e:
|
||
logger.error(f"Unexpected error: {e}")
|
||
error_info = format_error_response(e, include_traceback=True)
|
||
response = build_response(
|
||
False,
|
||
path=path,
|
||
errors=[error_info.get("message", str(e))],
|
||
details={"error": error_info},
|
||
)
|
||
print(json.dumps(response, indent=2))
|
||
sys.exit(1)
|
||
|
||
|
||
if __name__ == "__main__":
|
||
main()
|