#!/usr/bin/env python3
"""
meta.command - Command Creator Meta-Agent

Generates command manifests from natural language descriptions.

Usage:
    python3 agents/meta.command/meta_command.py <description_file>

Examples:
    python3 agents/meta.command/meta_command.py examples/api_validate_command.md
    python3 agents/meta.command/meta_command.py examples/deploy_command.json
"""

import os
import sys
import json
import yaml
import re
from pathlib import Path
from typing import Dict, List, Any, Optional

# Add parent directory to path
sys.path.insert(0, os.path.abspath(os.path.join(os.path.dirname(__file__), "../..")))

from betty.config import (
    BASE_DIR,
    COMMANDS_REGISTRY_FILE,
)
from betty.enums import CommandExecutionType, CommandStatus
from betty.logging_utils import setup_logger
from betty.traceability import get_tracer, RequirementInfo

logger = setup_logger(__name__)

# Import artifact validation from artifact.define skill.
# This is deliberately best-effort: when the skill cannot be loaded the agent
# still works and validate_artifacts() reports that validation was skipped.
try:
    import importlib.util

    artifact_define_path = (
        Path(__file__).parent.parent.parent
        / "skills"
        / "artifact.define"
        / "artifact_define.py"
    )
    spec = importlib.util.spec_from_file_location("artifact_define", artifact_define_path)
    artifact_define_module = importlib.util.module_from_spec(spec)
    spec.loader.exec_module(artifact_define_module)
    validate_artifact_type = artifact_define_module.validate_artifact_type
    KNOWN_ARTIFACT_TYPES = artifact_define_module.KNOWN_ARTIFACT_TYPES
    ARTIFACT_VALIDATION_AVAILABLE = True
except Exception as e:
    ARTIFACT_VALIDATION_AVAILABLE = False
    # Keep the reason around for debugging instead of dropping it silently.
    logger.debug("Artifact validation unavailable: %s", e)


class CommandCreator:
    """Creates command manifests from descriptions"""

    VALID_EXECUTION_TYPES = ["agent", "skill", "workflow"]
    VALID_STATUSES = ["draft", "active", "deprecated", "archived"]
    VALID_PARAMETER_TYPES = ["string", "integer", "boolean", "enum", "array", "object"]

    # Keywords for complexity analysis
    AUTONOMY_KEYWORDS = [
        "analyze", "optimize", "decide", "evaluate", "assess",
        "complex", "multi-step", "autonomous", "intelligent",
        "adaptive", "sophisticated", "advanced", "comprehensive"
    ]

    REUSABILITY_KEYWORDS = [
        "reusable", "composable", "building block", "library",
        "utility", "helper", "shared", "common", "core"
    ]

    def __init__(self, base_dir: str = BASE_DIR):
        """Initialize command creator

        Args:
            base_dir: Root directory; manifests are written to
                ``<base_dir>/commands``.
        """
        self.base_dir = Path(base_dir)
        self.commands_dir = self.base_dir / "commands"

    def parse_description(self, description_path: str) -> Dict[str, Any]:
        """
        Parse command description from Markdown or JSON file

        Both formats go through the same validation and defaulting step, so
        the returned dict always has ``name`` (starting with ``/``),
        ``description``, ``execution_type`` (lower-case), ``target``,
        ``version``, ``status`` and ``parameters``.

        Args:
            description_path: Path to description file

        Returns:
            Dict with command configuration

        Raises:
            FileNotFoundError: If the description file does not exist
            ValueError: If the content is invalid JSON, required fields are
                missing, or the execution type is not supported
        """
        path = Path(description_path)
        if not path.exists():
            raise FileNotFoundError(f"Description file not found: {description_path}")

        # Read file
        content = path.read_text(encoding="utf-8")

        # Try JSON first
        if path.suffix == ".json":
            cmd_desc = json.loads(content)
        else:
            cmd_desc = self._parse_markdown_description(content)

        return self._normalize_description(cmd_desc)

    def _parse_markdown_description(self, content: str) -> Dict[str, Any]:
        """Extract the raw (unvalidated) fields from a Markdown description."""
        cmd_desc: Dict[str, Any] = {}

        # Extract fields using regex patterns
        patterns = {
            "name": r"#\s*Name:\s*(.+)",
            "version": r"#\s*Version:\s*(.+)",
            "description": r"#\s*Description:\s*(.+)",
            "execution_type": r"#\s*Execution\s*Type:\s*(.+)",
            "target": r"#\s*Target:\s*(.+)",
            "status": r"#\s*Status:\s*(.+)",
        }
        for field, pattern in patterns.items():
            match = re.search(pattern, content, re.IGNORECASE)
            if match:
                cmd_desc[field] = match.group(1).strip()

        # Parse parameters section
        params_section = re.search(
            r"#\s*Parameters:\s*\n(.*?)(?=\n#|\Z)",
            content,
            re.DOTALL | re.IGNORECASE
        )
        if params_section:
            cmd_desc["parameters"] = self._parse_parameters(params_section.group(1))

        # Parse tags
        tags_match = re.search(r"#\s*Tags:\s*(.+)", content, re.IGNORECASE)
        if tags_match:
            tags_str = tags_match.group(1).strip()
            # Parse comma-separated or bracket-enclosed tags
            if tags_str.startswith("[") and tags_str.endswith("]"):
                tags_str = tags_str[1:-1]
            # Drop empty entries produced by "[]" or a trailing comma.
            cmd_desc["tags"] = [t.strip() for t in tags_str.split(",") if t.strip()]

        # Parse execution context
        context_section = re.search(
            r"#\s*Execution\s*Context:\s*\n(.*?)(?=\n#|\Z)",
            content,
            re.DOTALL | re.IGNORECASE
        )
        if context_section:
            cmd_desc["execution_context"] = self._parse_context(context_section.group(1))

        # Parse artifact metadata sections
        produces_section = re.search(
            r"#\s*Produces\s*Artifacts:\s*\n(.*?)(?=\n#|\Z)",
            content,
            re.DOTALL | re.IGNORECASE
        )
        if produces_section:
            cmd_desc["artifact_produces"] = self._parse_artifact_list(produces_section.group(1))

        consumes_section = re.search(
            r"#\s*Consumes\s*Artifacts:\s*\n(.*?)(?=\n#|\Z)",
            content,
            re.DOTALL | re.IGNORECASE
        )
        if consumes_section:
            cmd_desc["artifact_consumes"] = self._parse_artifact_list(consumes_section.group(1))

        return cmd_desc

    def _normalize_description(self, cmd_desc: Any) -> Dict[str, Any]:
        """Validate required fields and apply defaults to a parsed description."""
        if not isinstance(cmd_desc, dict):
            raise ValueError("Command description must be a mapping of field names to values")

        # Validate required fields (an empty value counts as missing)
        required = ["name", "description", "execution_type", "target"]
        missing = [f for f in required if not cmd_desc.get(f)]
        if missing:
            raise ValueError(f"Missing required fields: {', '.join(missing)}")

        # Validate execution type
        execution_type = str(cmd_desc["execution_type"]).strip().lower()
        if execution_type not in self.VALID_EXECUTION_TYPES:
            raise ValueError(
                f"Invalid execution type: {cmd_desc['execution_type']}. "
                f"Must be one of: {', '.join(self.VALID_EXECUTION_TYPES)}"
            )
        # Store the normalised value so later comparisons and the manifest
        # agree with what was validated.
        cmd_desc["execution_type"] = execution_type

        # Ensure command name starts with /
        name = str(cmd_desc["name"]).strip()
        if not name.startswith("/"):
            name = "/" + name
        cmd_desc["name"] = name

        # Set defaults
        cmd_desc.setdefault("version", "0.1.0")
        cmd_desc.setdefault("status", "draft")
        cmd_desc.setdefault("parameters", [])

        return cmd_desc

    def _parse_parameters(self, params_text: str) -> List[Dict[str, Any]]:
        """Parse parameters from markdown text

        Each parameter is expected as::

            - name: type (required/optional, default=value, values=[a, b]) - description

        Args:
            params_text: Body of the ``# Parameters:`` section

        Returns:
            List of parameter dicts with ``name``, ``type``, ``description``
            and, when modifiers are given, ``required`` / ``default`` /
            ``values``

        Raises:
            ValueError: If an integer parameter has a non-numeric default
        """
        parameters = []

        # Match parameter blocks
        # Format: - name: type (required/optional) - description
        param_pattern = r"-\s+(\w+):\s+(\w+)(?:\s+\(([^)]+)\))?\s+-\s+(.+?)(?=\n-|\n#|\Z)"

        for match in re.finditer(param_pattern, params_text, re.DOTALL):
            name, param_type, modifiers, description = match.groups()
            param = {
                "name": name.strip(),
                "type": param_type.strip(),
                "description": description.strip()
            }

            # Parse modifiers (required, optional, default=value)
            if modifiers:
                # Only the keyword check is case-folded; default and enum
                # values keep the case the author wrote.
                param["required"] = "required" in modifiers.lower()

                # Extract default value
                default_match = re.search(
                    r"default[=:]\s*([^,\s]+)", modifiers, re.IGNORECASE
                )
                if default_match:
                    param["default"] = self._coerce_default(
                        name.strip(), param_type.strip().lower(), default_match.group(1)
                    )

                # Extract enum values
                values_match = re.search(
                    r"values[=:]\s*\[([^\]]+)\]", modifiers, re.IGNORECASE
                )
                if values_match:
                    param["values"] = [v.strip() for v in values_match.group(1).split(",")]

            parameters.append(param)

        return parameters

    @staticmethod
    def _coerce_default(name: str, param_type: str, raw_value: str) -> Any:
        """Convert a textual default to the Python type implied by param_type."""
        if param_type == "integer":
            try:
                return int(raw_value)
            except ValueError as err:
                raise ValueError(
                    f"Parameter '{name}': default '{raw_value}' is not a valid integer"
                ) from err
        if param_type == "boolean":
            return raw_value.lower() in ("true", "yes", "1")
        return raw_value

    def _parse_context(self, context_text: str) -> Dict[str, Any]:
        """Parse execution context from markdown text

        Reads ``- key: value`` lines. Values that are valid JSON are decoded;
        anything else is kept as a stripped string.
        """
        context = {}

        # Simple key: value parsing
        for raw_line in context_text.split("\n"):
            line = raw_line.strip()
            if not line or line.startswith("#"):
                continue

            match = re.match(r"-\s*(\w+):\s*(.+)", line)
            if not match:
                continue

            key, value = match.groups()
            # Try to parse as JSON for complex values
            try:
                context[key] = json.loads(value)
            except (json.JSONDecodeError, ValueError):
                context[key] = value.strip()

        return context

    def _parse_artifact_list(self, artifact_text: str) -> List[str]:
        """Parse artifact list from markdown text

        Reads bullet lines (``-`` or ``*``) and returns the artifact type
        named on each, with optional surrounding backticks removed.
        """
        artifacts = []

        for raw_line in artifact_text.split("\n"):
            line = raw_line.strip()
            if not line or line.startswith("#"):
                continue

            # Match lines starting with - or *
            match = re.match(r"[-*]\s*`?([a-z0-9-]+)`?", line)
            if match:
                artifacts.append(match.group(1))

        return artifacts

    def analyze_complexity(self, cmd_desc: Dict[str, Any], full_content: str = "") -> Dict[str, Any]:
        """
        Analyze command complexity and recommend pattern

        This is a keyword and line-counting heuristic: numbered items, bullet
        lines and "step N" mentions are counted as steps, and the keyword
        lists on the class drive the autonomy and reusability levels.

        Args:
            cmd_desc: Parsed command description
            full_content: Full description file content for analysis

        Returns:
            Dict with complexity analysis and pattern recommendation
            (``step_count``, ``complexity``, ``autonomy_level``,
            ``reusability``, ``recommended_pattern``, ``should_create_skill``,
            ``reasoning``)
        """
        analysis = {
            "step_count": 0,
            "complexity": "low",
            "autonomy_level": "none",
            "reusability": "low",
            "recommended_pattern": "COMMAND_ONLY",
            "should_create_skill": False,
            "reasoning": []
        }

        # Count steps from description
        # Look for numbered lists, bullet points, or explicit step mentions
        step_patterns = [
            r"^\s*\d+\.\s+",   # Numbered lists
            r"^\s*[-*]\s+",    # Bullet points
            r"\bstep\s+\d+\b", # Explicit "step N"
        ]

        lines = full_content.split("\n")
        # A line counts once even if it matches several patterns.
        step_count = sum(
            1
            for line in lines
            if any(re.search(pattern, line, re.IGNORECASE) for pattern in step_patterns)
        )
        analysis["step_count"] = step_count

        # Analyze content for keywords
        content_lower = full_content.lower()
        desc_lower = str(cmd_desc.get("description", "")).lower()
        combined = content_lower + " " + desc_lower

        # Check autonomy keywords
        autonomy_matches = [kw for kw in self.AUTONOMY_KEYWORDS if kw in combined]
        if len(autonomy_matches) >= 3:
            analysis["autonomy_level"] = "high"
        elif len(autonomy_matches) >= 1:
            analysis["autonomy_level"] = "medium"
        else:
            analysis["autonomy_level"] = "low"

        # Check reusability keywords
        reusability_matches = [kw for kw in self.REUSABILITY_KEYWORDS if kw in combined]
        if len(reusability_matches) >= 2:
            analysis["reusability"] = "high"
        elif len(reusability_matches) >= 1:
            analysis["reusability"] = "medium"

        # Determine complexity
        if step_count >= 10:
            analysis["complexity"] = "high"
        elif step_count >= 4:
            analysis["complexity"] = "medium"
        else:
            analysis["complexity"] = "low"

        # Estimate lines of logic (rough heuristic)
        instruction_lines = sum(
            1 for line in lines if line.strip() and not line.strip().startswith("#")
        )
        if instruction_lines > 50:
            analysis["complexity"] = "high"

        # Decide pattern based on decision tree
        if analysis["complexity"] == "high":
            analysis["recommended_pattern"] = "SKILL_AND_COMMAND"
            analysis["should_create_skill"] = True
            if step_count >= 10:
                analysis["reasoning"].append(f"High complexity: {step_count} steps detected")
            else:
                # High complexity came from the line-count rule, so say so
                # instead of blaming a small step count.
                analysis["reasoning"].append(
                    f"High complexity: {instruction_lines} instruction lines "
                    f"({step_count} steps detected)"
                )

        elif analysis["autonomy_level"] == "high":
            analysis["recommended_pattern"] = "SKILL_AND_COMMAND"
            analysis["should_create_skill"] = True
            analysis["reasoning"].append(f"High autonomy: matched keywords {autonomy_matches[:3]}")

        elif analysis["reusability"] == "high":
            analysis["should_create_skill"] = True
            if step_count <= 3:
                analysis["recommended_pattern"] = "SKILL_ONLY"
                analysis["reasoning"].append("High reusability but low complexity: create skill only")
            else:
                analysis["recommended_pattern"] = "SKILL_AND_COMMAND"
                analysis["reasoning"].append(f"High reusability with {step_count} steps: create both")

        elif 4 <= step_count <= 9:
            # Medium complexity - could go either way
            if analysis["autonomy_level"] == "medium":
                analysis["recommended_pattern"] = "SKILL_AND_COMMAND"
                analysis["should_create_skill"] = True
                analysis["reasoning"].append(f"Medium complexity ({step_count} steps) with some autonomy needs")
            else:
                analysis["recommended_pattern"] = "COMMAND_ONLY"
                analysis["reasoning"].append(f"Medium complexity ({step_count} steps) but simple logic: inline is fine")

        else:
            # Low complexity - command only
            analysis["recommended_pattern"] = "COMMAND_ONLY"
            analysis["reasoning"].append(f"Low complexity ({step_count} steps): inline orchestration is sufficient")

        # Check if execution type already specifies skill
        # (compared case-insensitively so hand-built descriptions behave the
        # same as parsed ones)
        if str(cmd_desc.get("execution_type", "")).lower() == "skill":
            analysis["recommended_pattern"] = "SKILL_AND_COMMAND"
            analysis["should_create_skill"] = True
            analysis["reasoning"].append("Execution type explicitly set to 'skill'")

        return analysis

    def generate_command_manifest(self, cmd_desc: Dict[str, Any]) -> str:
        """
        Generate command manifest YAML

        Args:
            cmd_desc: Parsed command description; must contain ``name``,
                ``version``, ``description``, ``execution_type`` and ``target``

        Returns:
            YAML string

        Raises:
            KeyError: If one of the mandatory keys is absent
        """
        manifest = {
            "name": cmd_desc["name"],
            "version": cmd_desc["version"],
            "description": cmd_desc["description"]
        }

        # Add parameters if present
        if cmd_desc.get("parameters"):
            manifest["parameters"] = cmd_desc["parameters"]

        # Add execution configuration
        execution = {
            "type": cmd_desc["execution_type"],
            "target": cmd_desc["target"]
        }
        if cmd_desc.get("execution_context"):
            execution["context"] = cmd_desc["execution_context"]
        manifest["execution"] = execution

        # Add status
        manifest["status"] = cmd_desc.get("status", "draft")

        # Add tags if present
        if cmd_desc.get("tags"):
            manifest["tags"] = cmd_desc["tags"]

        # Add artifact metadata if present
        produces = cmd_desc.get("artifact_produces")
        consumes = cmd_desc.get("artifact_consumes")
        if produces or consumes:
            artifact_metadata = {}
            if produces:
                artifact_metadata["produces"] = [
                    {"type": art_type} for art_type in produces
                ]
            if consumes:
                artifact_metadata["consumes"] = [
                    {"type": art_type, "required": True} for art_type in consumes
                ]
            manifest["artifact_metadata"] = artifact_metadata

        # sort_keys=False keeps the insertion order built above.
        return yaml.dump(manifest, default_flow_style=False, sort_keys=False)

    def validate_artifacts(self, cmd_desc: Dict[str, Any]) -> List[str]:
        """
        Validate that artifact types exist in the known registry.

        Args:
            cmd_desc: Parsed command description

        Returns:
            List of warning messages (a single "skipped" notice when the
            artifact.define skill could not be loaded)
        """
        warnings = []

        if not ARTIFACT_VALIDATION_AVAILABLE:
            warnings.append(
                "Artifact validation skipped: artifact.define skill not available"
            )
            return warnings

        # Validate produced, then consumed artifacts
        for label, key in (("Produces", "artifact_produces"), ("Consumes", "artifact_consumes")):
            for artifact_type in cmd_desc.get(key, []):
                is_valid, warning = validate_artifact_type(artifact_type)
                if not is_valid and warning:
                    warnings.append(f"{label}: {warning}")

        return warnings

    def validate_target(self, cmd_desc: Dict[str, Any]) -> List[str]:
        """
        Validate that the target skill or agent exists.

        Skills are looked up in ``registry/skills.json`` and under
        ``skills/``; agents under ``agents/``. Other execution types are not
        checked.

        Args:
            cmd_desc: Parsed command description

        Returns:
            List of warning messages
        """
        warnings = []
        execution_type = str(cmd_desc.get("execution_type", "")).lower()
        target = str(cmd_desc.get("target", ""))

        if execution_type == "skill":
            # Check if skill exists in registry or skills directory
            skill_registry = self.base_dir / "registry" / "skills.json"
            skills_root = self.base_dir / "skills"
            # Skills referenced elsewhere in this file live in dotted
            # directories (skills/artifact.define, skills/command.define);
            # the nested a/b layout is still accepted for compatibility.
            skill_dirs = [skills_root / target, skills_root / target.replace(".", "/")]

            skill_exists = False
            if skill_registry.exists():
                try:
                    with open(skill_registry, encoding="utf-8") as f:
                        registry = json.load(f)
                    skill_exists = self._registry_has_skill(registry, target)
                except (OSError, ValueError) as err:
                    # An unreadable registry must not block command creation;
                    # fall back to the directory check.
                    logger.warning("Could not read skill registry %s: %s", skill_registry, err)

            if not skill_exists and not any(d.exists() for d in skill_dirs):
                warnings.append(
                    f"Target skill '{target}' not found in registry or skills directory. "
                    f"You may need to create it using meta.skill first."
                )

        elif execution_type == "agent":
            # Check if agent exists in agents directory
            agent_dir = self.base_dir / "agents" / target
            if not agent_dir.exists():
                warnings.append(
                    f"Target agent '{target}' not found in agents directory. "
                    f"You may need to create it using meta.agent first."
                )

        return warnings

    @staticmethod
    def _registry_has_skill(registry: Any, target: str) -> bool:
        """Return True when target is listed under the registry's "skills" key."""
        if not isinstance(registry, dict):
            return False
        skills = registry.get("skills", {})
        if isinstance(skills, dict):
            return target in skills
        if isinstance(skills, list):
            # NOTE(review): assumes list entries are skill names or dicts with
            # a "name" key - confirm against the real registry schema.
            return any(
                entry == target
                or (isinstance(entry, dict) and entry.get("name") == target)
                for entry in skills
            )
        return False

    def create_command(
        self,
        description_path: str,
        requirement: Optional[RequirementInfo] = None
    ) -> Dict[str, Any]:
        """
        Create command manifest from description file

        Parses and validates the description, prints a complexity analysis,
        writes ``<commands_dir>/<name>.yaml`` and, when a requirement is
        given, records traceability entries. Errors are reported in the
        returned dict rather than raised.

        Args:
            description_path: Path to description file
            requirement: Optional requirement information for traceability

        Returns:
            Dict with creation results; ``ok`` is False and ``error`` is set
            when creation failed
        """
        try:
            print(f"🎯 meta.command - Creating command from {description_path}\n")

            # Read full content for analysis
            full_content = Path(description_path).read_text(encoding="utf-8")

            # Parse description
            cmd_desc = self.parse_description(description_path)

            # Validate artifacts
            artifact_warnings = self.validate_artifacts(cmd_desc)
            if artifact_warnings:
                print("\n⚠️ Artifact Validation Warnings:")
                for warning in artifact_warnings:
                    print(f" {warning}")
                print()

            # Validate target skill/agent
            target_warnings = self.validate_target(cmd_desc)
            if target_warnings:
                print("\n⚠️ Target Validation Warnings:")
                for warning in target_warnings:
                    print(f" {warning}")
                print()

            # Analyze complexity and recommend pattern
            analysis = self.analyze_complexity(cmd_desc, full_content)
            self._print_analysis(analysis)

            # Generate manifest YAML
            manifest_yaml = self.generate_command_manifest(cmd_desc)

            # Ensure commands directory exists
            self.commands_dir.mkdir(parents=True, exist_ok=True)

            # Determine output filename
            filename = self._manifest_filename(cmd_desc["name"])
            manifest_file = self.commands_dir / f"{filename}.yaml"

            # Write manifest file
            manifest_file.write_text(manifest_yaml, encoding="utf-8")

            print(f"✨ Command '{cmd_desc['name']}' created successfully!\n")
            print("📄 Created file:")
            print(f" - {manifest_file}\n")
            print("✅ Command manifest is ready for registration")
            print(f" Name: {cmd_desc['name']}")
            print(f" Execution: {cmd_desc['execution_type']} → {cmd_desc['target']}")
            print(f" Status: {cmd_desc.get('status', 'draft')}\n")

            self._print_next_steps(cmd_desc, analysis, manifest_file)

            result = {
                "ok": True,
                "status": "success",
                "command_name": cmd_desc["name"],
                "manifest_file": str(manifest_file),
                "complexity_analysis": analysis,
                "artifact_warnings": artifact_warnings,
                "target_warnings": target_warnings
            }

            # Log traceability if requirement provided
            if requirement:
                try:
                    result.update(
                        self._log_traceability(
                            cmd_desc, filename, manifest_file, description_path, requirement
                        )
                    )
                except Exception as e:
                    # Traceability is optional; the manifest is already written.
                    print(f"⚠️ Warning: Could not log traceability: {e}")

            return result

        except Exception as e:
            print(f"❌ Error creating command: {e}")
            logger.error(f"Error creating command: {e}", exc_info=True)
            return {
                "ok": False,
                "status": "failed",
                "error": str(e)
            }

    @staticmethod
    def _manifest_filename(command_name: str) -> str:
        """Derive the manifest file stem from a command name."""
        # Remove leading / and replace spaces/special chars with hyphens
        filename = command_name.lstrip("/").replace(" ", "-").lower()
        filename = re.sub(r"[^a-z0-9-]", "", filename)
        if not filename:
            # Without this guard the manifest would be written as ".yaml".
            raise ValueError(
                f"Cannot derive a manifest filename from command name: {command_name!r}"
            )
        return filename

    @staticmethod
    def _print_analysis(analysis: Dict[str, Any]) -> None:
        """Print the complexity analysis and the recommended pattern."""
        print("📊 Complexity Analysis:")
        print(f" Steps detected: {analysis['step_count']}")
        print(f" Complexity: {analysis['complexity']}")
        print(f" Autonomy level: {analysis['autonomy_level']}")
        print(f" Reusability: {analysis['reusability']}")
        print(f"\n💡 Recommended Pattern: {analysis['recommended_pattern']}")
        for reason in analysis['reasoning']:
            print(f" • {reason}")
        print()

    @staticmethod
    def _print_next_steps(
        cmd_desc: Dict[str, Any], analysis: Dict[str, Any], manifest_file: Path
    ) -> None:
        """Print follow-up instructions, including the skill recommendation."""
        target = str(cmd_desc['target'])

        # Display skill creation recommendation if needed
        if analysis['should_create_skill']:
            print("⚠️ RECOMMENDATION: Create the skill first!")
            print(f" Pattern: {analysis['recommended_pattern']}")
            print(f"\n This command delegates to a skill ({target}),")
            print(" but that skill may not exist yet.\n")
            print(" Suggested workflow:")
            print(" 1. Create skill: python3 agents/meta.skill/meta_skill.py <skill_description.md>")
            print(f" - Skill should implement: {target}")
            print(" - Include all complex logic from the command description")
            # Skill directories keep the dotted name; only the module file
            # uses underscores (cf. skills/command.define/command_define.py).
            print(f" 2. Test skill: python3 skills/{target}/{target.replace('.', '_')}.py")
            print(f" 3. Review this command manifest: cat {manifest_file}")
            print(f" 4. Register command: python3 skills/command.define/command_define.py {manifest_file}")
            print(" 5. Verify in registry: cat registry/commands.json")
            print("\n See docs/SKILL_COMMAND_DECISION_TREE.md for pattern details\n")
        else:
            print("📝 Next steps:")
            print(f" 1. Review the manifest: cat {manifest_file}")
            print(f" 2. Register command: python3 skills/command.define/command_define.py {manifest_file}")
            print(" 3. Verify in registry: cat registry/commands.json")

    @staticmethod
    def _log_traceability(
        cmd_desc: Dict[str, Any],
        filename: str,
        manifest_file: Path,
        description_path: str,
        requirement: RequirementInfo,
    ) -> Dict[str, Any]:
        """Record creation and validation entries; return trace and component ids."""
        tracer = get_tracer()

        # Create component ID from command name
        component_id = f"command.{filename.replace('-', '_')}"

        trace_id = tracer.log_creation(
            component_id=component_id,
            component_name=cmd_desc["name"],
            component_type="command",
            component_version=cmd_desc["version"],
            component_file_path=str(manifest_file),
            input_source_path=description_path,
            created_by_tool="meta.command",
            created_by_version="0.1.0",
            requirement=requirement,
            tags=["command", "auto-generated"] + list(cmd_desc.get("tags") or []),
            project="Betty Framework"
        )

        # Log validation check
        validation_details = {
            "checks_performed": [
                {"name": "command_structure", "status": "passed"},
                {"name": "execution_type_validation", "status": "passed",
                 "message": f"Valid execution type: {cmd_desc['execution_type']}"},
                {"name": "name_validation", "status": "passed",
                 "message": f"Command name follows convention: {cmd_desc['name']}"}
            ]
        }

        # Check parameters
        if cmd_desc.get("parameters"):
            validation_details["checks_performed"].append({
                "name": "parameters_validation",
                "status": "passed",
                "message": f"Validated {len(cmd_desc['parameters'])} parameters"
            })

        tracer.log_verification(
            component_id=component_id,
            check_type="validation",
            tool="meta.command",
            result="passed",
            details=validation_details
        )

        return {"trace_id": trace_id, "component_id": component_id}


def main():
    """CLI entry point

    Parses the command line, creates the command manifest and exits with
    status 0 on success or 1 on failure.
    """
    import argparse

    parser = argparse.ArgumentParser(
        description="meta.command - Create command manifests from descriptions"
    )
    parser.add_argument(
        "description",
        help="Path to command description file (.md or .json)"
    )

    # Traceability arguments
    parser.add_argument(
        "--requirement-id",
        help="Requirement identifier (e.g., REQ-2025-001)"
    )
    parser.add_argument(
        "--requirement-description",
        help="What this command accomplishes"
    )
    parser.add_argument(
        "--requirement-source",
        help="Source document"
    )
    parser.add_argument(
        "--issue-id",
        help="Issue tracking ID (e.g., JIRA-123)"
    )
    parser.add_argument(
        "--requested-by",
        help="Who requested this"
    )
    parser.add_argument(
        "--rationale",
        help="Why this is needed"
    )

    args = parser.parse_args()

    # Create requirement info if provided
    # (both the id and the description are needed to build one)
    requirement = None
    if args.requirement_id and args.requirement_description:
        requirement = RequirementInfo(
            id=args.requirement_id,
            description=args.requirement_description,
            source=args.requirement_source,
            issue_id=args.issue_id,
            requested_by=args.requested_by,
            rationale=args.rationale
        )

    creator = CommandCreator()
    result = creator.create_command(args.description, requirement=requirement)

    # Display traceability info if available
    if result.get("trace_id"):
        print(f"\n📍 Traceability: {result['trace_id']}")
        print(f" View trace: python3 betty/trace_cli.py show {result['component_id']}")

    sys.exit(0 if result.get("ok") else 1)


if __name__ == "__main__":
    main()