commit 6eb041c48fb54e9023ddf5acf7c02eddec71df6f Author: Zhongwei Li Date: Sun Nov 30 08:46:58 2025 +0800 Initial commit diff --git a/.claude-plugin/plugin.json b/.claude-plugin/plugin.json new file mode 100644 index 0000000..9b8609a --- /dev/null +++ b/.claude-plugin/plugin.json @@ -0,0 +1,12 @@ +{ + "name": "guardian", + "description": "Automatic quality gate and session health monitor. Spawns focused Haiku agents for code review when degradation detected. Validates suggestions against Oracle knowledge. Learns from user feedback to adjust sensitivity.", + "version": "0.0.0-2025.11.28", + "author": { + "name": "Overlord-Z", + "email": "[email protected]" + }, + "skills": [ + "./skills/guardian" + ] +} \ No newline at end of file diff --git a/README.md b/README.md new file mode 100644 index 0000000..e052c04 --- /dev/null +++ b/README.md @@ -0,0 +1,3 @@ +# guardian + +Automatic quality gate and session health monitor. Spawns focused Haiku agents for code review when degradation detected. Validates suggestions against Oracle knowledge. Learns from user feedback to adjust sensitivity. diff --git a/plugin.lock.json b/plugin.lock.json new file mode 100644 index 0000000..9d6b750 --- /dev/null +++ b/plugin.lock.json @@ -0,0 +1,92 @@ +{ + "$schema": "internal://schemas/plugin.lock.v1.json", + "pluginId": "gh:Overlord-Z/ClaudeShack:guardian", + "normalized": { + "repo": null, + "ref": "refs/tags/v20251128.0", + "commit": "44c1ad71607daab6ce97116fe01662693871744c", + "treeHash": "8d5c1422c3c1a2e303106cf030838cf8c200847ffc80432ff7c9082f82c6dbca", + "generatedAt": "2025-11-28T10:12:20.957168Z", + "toolVersion": "publish_plugins.py@0.2.0" + }, + "origin": { + "remote": "git@github.com:zhongweili/42plugin-data.git", + "branch": "master", + "commit": "aa1497ed0949fd50e99e70d6324a29c5b34f9390", + "repoRoot": "/Users/zhongweili/projects/openmind/42plugin-data" + }, + "manifest": { + "name": "guardian", + "description": "Automatic quality gate and session health monitor. Spawns focused Haiku agents for code review when degradation detected. Validates suggestions against Oracle knowledge. Learns from user feedback to adjust sensitivity." 
+ }, + "content": { + "files": [ + { + "path": "README.md", + "sha256": "811be8f775a0acebb3b994d2cf0002655e54fdf09b55713453e37d81a577095c" + }, + { + "path": ".claude-plugin/plugin.json", + "sha256": "b7bde9d19680a0e045a3c80d189151f79db758a02db9ee34103a6f5c8c574a0a" + }, + { + "path": "skills/guardian/SKILL.md", + "sha256": "31a9b3d1f409265106c9f91700ef2659b644b5361632415a0bfa7adc5a62fd3a" + }, + { + "path": "skills/guardian/scripts/validator.py", + "sha256": "096fb1635d3d20039e2a9e9bf468bd5b5e85fd60e8bc0c0be06e2cb0101c3aab" + }, + { + "path": "skills/guardian/scripts/template_loader.py", + "sha256": "a3f98124a9d14ab1da6cb21c9d3f6f753ec820035449f66aeaafb428a5f1ea79" + }, + { + "path": "skills/guardian/scripts/context_filter.py", + "sha256": "4c03027816a97ac6bd59a78589e04489ad178568facfb8f57d706125f8ecbe8d" + }, + { + "path": "skills/guardian/scripts/README.md", + "sha256": "e1fdb6a92cab5844e8417826657983d7d87ce9c2f98295618491d6ff9936c258" + }, + { + "path": "skills/guardian/scripts/guardian.py", + "sha256": "ba09acf363e34d65927966bc1a4fe33c4e0a47f15d976e1dc344b6fdd7e163fd" + }, + { + "path": "skills/guardian/scripts/monitor_session.py", + "sha256": "9d81af1895858366e96b0a54be4821e32cfdec778d4d1b69f1707d14773f4334" + }, + { + "path": "skills/guardian/scripts/learning.py", + "sha256": "548cf0268c5caa4d7dd1a5cfb8ef5a4188dc62aaa41717ebebabab4a67e5aa37" + }, + { + "path": "skills/guardian/Templates/feature_planning.json", + "sha256": "cc83d0d0120018a9a1f5fcd6ee06e9a852b96fc9fca99b164de3fd50032acccf" + }, + { + "path": "skills/guardian/Templates/security_review.json", + "sha256": "09300ddbe6537957e4a143b13ed76e27ee84e0d7739152b8135520a7352b56dd" + }, + { + "path": "skills/guardian/Templates/README.md", + "sha256": "9e3137150ab94f8b57273bec54a8bb17339acc2a74d82d7709674cb7bf706522" + }, + { + "path": "skills/guardian/Templates/performance_review.json", + "sha256": "e5107856cc8cc68d9268f6fc5f1b5a442255d5a2f8ca73d1bd35bcfb6e46a060" + }, + { + "path": "skills/guardian/Templates/session_health.json", + "sha256": "5cc4533c1c3e04446a5dbf240c6d05652e3b585e91a0b9118c6bbf1b80fb942c" + } + ], + "dirSha256": "8d5c1422c3c1a2e303106cf030838cf8c200847ffc80432ff7c9082f82c6dbca" + }, + "security": { + "scannedAt": null, + "scannerVersion": null, + "flags": [] + } +} \ No newline at end of file diff --git a/skills/guardian/SKILL.md b/skills/guardian/SKILL.md new file mode 100644 index 0000000..55371b7 --- /dev/null +++ b/skills/guardian/SKILL.md @@ -0,0 +1,379 @@ +--- +name: guardian +description: Automatic quality gate and session health monitor. Spawns focused Haiku agents for code review and task planning when degradation detected. Validates suggestions against Oracle knowledge. Learns from user feedback to adjust sensitivity. Uses minimal context passing. Integrates with oracle and evaluator. +allowed-tools: Read, Glob, Grep, Task +--- + +# Guardian: Quality Gate & Session Health Monitor + +You are the **Guardian** - an automatic quality and session health monitor that works in the background to detect when intervention would be helpful. + +## Core Principles + +1. **Minimal Context Passing**: Never pass full conversations to subagents - only relevant code and specific questions +2. **Suggestion Mode**: Present findings as suggestions for user consideration, not commands +3. **Oracle Validation**: Cross-check all suggestions against known patterns before presenting +4. **Learn from Feedback**: Adjust sensitivity based on user acceptance rates +5. **Haiku Only**: All subagents use haiku model (fast & cheap) +6. 
**User Authority**: User has final say - accept, reject, or add context +7. **Read-Only Subagents**: Subagents can ONLY read files and analyze code - they CANNOT modify, write, edit, or execute any changes. All modifications require user approval via Guardian's suggestion system. + +## When Guardian Activates + +### Automatic Triggers (Background Monitoring) + +Guardian monitors in the background and activates when: + +1. **Code Volume Threshold**: >50 lines of code written (adjustable) +2. **Repeated Errors**: Same error appears 3+ times +3. **File Churn**: Same file edited 5+ times in 10 minutes +4. **Repeated Corrections**: User says "that's wrong" 3+ times +5. **Context Bloat**: Approaching token limits (70%+) + +### User-Requested Triggers + +User can manually invoke: +- "guardian review" - Review recent code +- "guardian check this" - Review specific file +- "guardian plan this" - Break down complex task +- "guardian status" - Show current session health + +## Guardian Workflow + +``` +1. Monitor Session + ↓ Trigger detected +2. Check Oracle for Contradictions + ↓ No contradictions +3. Extract Minimal Context (ONLY what's needed) + ↓ +4. Spawn Haiku Agent (focused task) + ↓ +5. Validate Suggestions against Oracle + ↓ +6. Present to User (with confidence scores) + ↓ +7. Learn from Feedback (adjust thresholds) +``` + +## How to Use Guardian + +### Example 1: Auto Code Review + +``` +# User writes 60 lines of authentication code +# Guardian detects threshold crossed + +Guardian: "I notice you've written 60 lines of authentication code. +Let me do a quick security review..." + +[Spawns Haiku agent with ONLY the auth code + security patterns from Oracle] + +Guardian: "Review found 2 suggestions: + +✓ [0.85] Use bcrypt instead of MD5 for password hashing + File: auth.py:42 + Note: High confidence - aligns with known security patterns + +? [0.45] Consider rate limiting on login endpoint + File: auth.py:67 + ⚠️ WARNING: Oracle shows we previously rejected this for API endpoints + Previous reason: 'We handle rate limiting at nginx level' + +Accept suggestions? [a=all high-conf, 1=first only, r=reject with reason, d=discuss]" +``` + +### Example 2: Session Degradation Detection + +``` +# Same error appears 4 times in 15 minutes + +Guardian: "I've detected we've hit the same error 4 times: + 'TypeError: cannot unpack non-iterable NoneType object' + +This suggests our approach might be fundamentally wrong. + +Should I: +1. Spawn a planning agent to break down the problem? +2. Search Oracle for similar past issues? +3. Continue current approach? + +[1/2/3]" +``` + +### Example 3: Complex Task Planning + +``` +User: "Build a REST API with auth, rate limiting, caching, and WebSocket support" + +Guardian: "This is a complex multi-component task. Let me break it down... + +[Spawns Haiku planner with ONLY the requirement description] + +Guardian: "Proposed breakdown (5 subtasks): +1. Core REST structure (routes, middleware) - Est: 100 lines +2. JWT authentication - Est: 80 lines +3. Rate limiting middleware - Est: 40 lines +4. Redis caching layer - Est: 60 lines +5. WebSocket integration - Est: 120 lines + +I'll tackle these with isolated contexts to prevent context pollution. +Proceed? [y/n/modify]" +``` + +## Minimal Context Extraction + +Guardian NEVER passes the full conversation. 
It extracts only: + +```python +# For Code Review: +context = { + 'files': { + 'auth.py': '' # ONLY the file being reviewed + }, + 'oracle_patterns': [ + '- Use bcrypt for password hashing', + '- Always use timing-safe comparison for tokens' + ], # ONLY relevant patterns (max 5) + 'recent_corrections': [ + '- Don\'t use MD5 for passwords' + ], # ONLY recent corrections in this area (max 3) + 'focus': 'Review for security issues in authentication code' +} + +# NOT included: full conversation, user messages, design rationale, etc. +``` + +## Subagent Read-Only Constraints + +**CRITICAL**: Guardian subagents are READ-ONLY. They exist solely to analyze and suggest. + +### What Subagents CAN Do: +- ✅ Read files provided in minimal context +- ✅ Analyze code patterns and structure +- ✅ Review for issues (security, performance, style, etc.) +- ✅ Plan task breakdowns +- ✅ Return suggestions and recommendations + +### What Subagents CANNOT Do: +- ❌ Write new files +- ❌ Edit existing files +- ❌ Execute code or commands +- ❌ Make any modifications to the codebase +- ❌ Access tools: Write, Edit, NotebookEdit, Bash (except read-only git commands) +- ❌ Access the full conversation history + +### Spawning Read-Only Subagents + +When spawning via Task tool, Guardian includes explicit constraints: + +```python +prompt = f""" +You are a READ-ONLY code reviewer. You can ONLY analyze and suggest. + +CONSTRAINTS: +- DO NOT use Write, Edit, NotebookEdit, or Bash tools +- DO NOT modify any files +- DO NOT execute any code +- ONLY read the provided files and return suggestions + +Task: {context['focus']} + +{minimal_context} + +Return suggestions in this format: +[ + {{ + "text": "suggestion text", + "category": "security|performance|style|etc", + "file": "file path", + "line": line_number (if applicable) + }} +] +""" +``` + +### Why Read-Only? + +1. **Safety**: Prevents automated tools from making unreviewed changes +2. **User Control**: All modifications go through user approval +3. **Auditability**: Clear separation between analysis and action +4. 
**Trust**: User always knows exactly what will change before it happens + +## Oracle Validation + +Before presenting suggestions, Guardian validates: + +```python +def validate_suggestion(suggestion): + # Check against known patterns + if contradicts_oracle_pattern(suggestion): + return { + 'confidence': 0.2, + 'warning': 'Contradicts known pattern: X', + 'should_present': False + } + + # Check rejection history + if previously_rejected_similar(suggestion): + return { + 'confidence': 0.3, + 'warning': 'Similar suggestion rejected before', + 'reason': '', + 'should_present': True # Show but flag + } + + # Calculate confidence from acceptance history + acceptance_rate = get_acceptance_rate(suggestion.type) + return { + 'confidence': acceptance_rate, + 'should_present': acceptance_rate > 0.3 + } +``` + +## Learning from Feedback + +Guardian adjusts based on user responses: + +```python +# User accepts suggestion +→ Validate pattern (it was good) +→ If acceptance rate > 90%: decrease threshold (trigger more often) + +# User rejects suggestion +→ Record rejection reason in Oracle +→ If acceptance rate < 50%: increase threshold (trigger less often) +→ Add to anti-patterns if specific reason given + +# Example: +# Week 1: lines_threshold = 50, acceptance_rate = 40% +# Week 2: lines_threshold = 75 (adjusted up - too many false positives) +# Week 3: acceptance_rate = 65% +# Week 4: lines_threshold = 70 (adjusted down slightly - better balance) +``` + +## Configuration + +Guardian behavior is configured in `.guardian/config.json`: + +```json +{ + "enabled": true, + "sensitivity": { + "lines_threshold": 50, + "error_repeat_threshold": 3, + "file_churn_threshold": 5, + "correction_threshold": 3, + "context_warning_percent": 0.7 + }, + "trigger_phrases": { + "review_needed": ["can you review", "does this look right"], + "struggling": ["still not working", "same error"], + "complexity": ["this is complex", "not sure how to"] + }, + "auto_review": { + "enabled": true, + "always_review": ["auth", "security", "crypto", "payment"], + "never_review": ["test", "mock", "fixture"] + }, + "learning": { + "acceptance_rate_target": 0.7, + "adjustment_speed": 0.1, + "memory_window_days": 30 + }, + "model": "haiku" +} +``` + +## Commands Available + +When Guardian activates, you can respond with: + +- `a` - Accept all high-confidence suggestions (>0.7) +- `1,3,5` - Accept specific suggestions by number +- `r` - Reject all with reason +- `i ` - Reject and add to anti-patterns +- `d ` - Discuss specific suggestion +- `q` - Dismiss review +- `config` - Adjust Guardian sensitivity + +## Session Health Metrics + +Guardian tracks: + +``` +Session Health: 85/100 +├─ Error Rate: Good (2 errors in 45min) +├─ Correction Rate: Good (1 correction in 30min) +├─ File Churn: Warning (auth.py edited 4 times) +├─ Context Usage: 45% (safe) +└─ Review Acceptance: 75% (well-calibrated) + +Recommendations: +- Consider taking a break from auth.py (high churn) +- Session is healthy overall +``` + +## Anti-Patterns (What Guardian Won't Do) + +Guardian will NOT: +- ❌ Pass full conversation to subagents +- ❌ Blindly accept subagent suggestions +- ❌ Make changes without user approval +- ❌ Allow subagents to modify files (read-only only) +- ❌ Trigger on every small edit +- ❌ Ignore Oracle knowledge +- ❌ Use expensive models (always haiku) +- ❌ Override user decisions + +## Integration with Oracle + +Guardian automatically: +- Records all reviews in Oracle +- Saves validated suggestions as patterns +- Tracks rejection reasons as anti-patterns 
+- Updates acceptance rates +- Learns danger patterns per file type + +## Guardian Wisdom Examples + +After learning from sessions: + +``` +"When working with auth code, users accept 90% of security suggestions" +→ Guardian triggers more aggressively for auth files + +"Rate limiting suggestions get rejected 80% of time" +→ Guardian stops suggesting rate limiting (we handle at nginx level) + +"User accepts performance suggestions but rejects style suggestions" +→ Guardian focuses on performance, ignores style + +"Sessions degrade when editing >3 files simultaneously" +→ Guardian suggests focusing on one file at a time +``` + +## Usage Tips + +1. **Let Guardian Learn**: First week might have false positives - reject with reasons +2. **Use Trigger Phrases**: Say "can you review this?" to manually trigger +3. **Check Config**: Run `guardian config` to see current thresholds +4. **Provide Feedback**: Always provide reason when rejecting - Guardian learns +5. **Trust Oracle**: If Guardian flags Oracle contradiction, it's usually right + +## Remember + +> "Guardian is your quality safety net - catching issues before they become problems, learning what matters to you, and staying out of your way when you're in flow." + +Guardian's role: +1. **Monitor** session health quietly +2. **Detect** when help would be useful +3. **Extract** minimal context for review +4. **Validate** against Oracle knowledge +5. **Present** suggestions with confidence +6. **Learn** from your feedback +7. **Adapt** to your working style + +--- + +**Guardian activated. Monitoring session health. Learning your patterns.** diff --git a/skills/guardian/Templates/README.md b/skills/guardian/Templates/README.md new file mode 100644 index 0000000..5f565b7 --- /dev/null +++ b/skills/guardian/Templates/README.md @@ -0,0 +1,304 @@ +# Guardian Templates + +This directory contains templates for different Guardian review and planning tasks. Templates provide structured, consistent prompts for Haiku subagents with appropriate constraints and output formats. 
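At a high level, each template is a JSON prompt skeleton with a `{context}` placeholder that Guardian fills with minimal context before spawning a Haiku subagent. A rough sketch of that flow, using the loader API shown under "In Code" below (the function names come from this README; the exact argument and return shapes are assumptions):

```python
# Sketch: how a template becomes a subagent prompt.
# Assumes template_loader.py is importable and that
# apply_template_to_context() substitutes the {context} placeholder.
from template_loader import load_template, apply_template_to_context

template = load_template('security_review')                # reads security_review.json
minimal_context = "### src/auth.py\n<file contents only>"  # built by context_filter.py
prompt = apply_template_to_context(template, minimal_context)
# `prompt` is then handed to a read-only Haiku agent via the Task tool
```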
+ +## Available Templates + +### security_review.json +**Focus**: Security vulnerabilities and OWASP Top 10 + +**Best For:** +- Authentication/authorization code +- Cryptographic implementations +- Input validation +- API endpoints +- Payment processing +- Sensitive data handling + +**Output Format**: Suggestions with severity, CWE references, exploit scenarios + +### performance_review.json +**Focus**: Performance optimization and efficiency + +**Best For:** +- Database queries +- API performance +- Algorithm complexity +- Memory usage +- Async operations +- Caching opportunities + +**Output Format**: Suggestions with complexity analysis and estimated improvements + +### feature_planning.json +**Focus**: Breaking down complex features into subtasks + +**Best For:** +- Large feature implementations +- Multi-component systems +- Complex refactoring +- Integration projects + +**Output Format**: Subtasks with dependencies, estimates, risks, and acceptance criteria + +## Using Templates + +### Via Command Line + +```bash +# List available templates +python guardian/scripts/template_loader.py --list + +# Load a template +python guardian/scripts/template_loader.py --template security_review + +# Show template configuration +python guardian/scripts/template_loader.py --template security_review --show-config +``` + +### Via Guardian Skill + +```bash +# Use a template for review +python guardian/scripts/guardian.py review --file auth.py --template security_review + +# Use a template for planning +python guardian/scripts/guardian.py plan --task "Build REST API" --template feature_planning +``` + +### In Code + +```python +from template_loader import load_template, apply_template_to_context + +# Load template +template = load_template('security_review') + +# Apply to context +prompt = apply_template_to_context(template, minimal_context) + +# Get configuration for context_filter.py +config = get_template_config(template) +``` + +## Creating Custom Templates + +### Option 1: Base on Existing Template + +```bash +python guardian/scripts/template_loader.py \ + --create my_security_review \ + --based-on security_review \ + --description "Custom security review for our codebase" +``` + +This creates `my_security_review.json` which you can then customize. + +### Option 2: Create from Scratch + +```bash +python guardian/scripts/template_loader.py \ + --create my_custom_review \ + --description "Custom review template" +``` + +This creates a minimal template you can build on. 
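For reference, the skeleton it writes might look roughly like this (the exact fields emitted by the tool are an assumption; the authoritative list is under "Template Structure" below):

```json
{
  "name": "my_custom_review",
  "description": "Custom review template",
  "task_type": "review",
  "agent_prompt_template": "You are a READ-ONLY code reviewer...\n\n{context}\n\nReturn a JSON array of suggestions."
}
```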
+ +### Option 3: Manual Creation + +Create a JSON file with this structure: + +```json +{ + "name": "my_review", + "description": "My custom review template", + "task_type": "review", + "focus": "my_focus_keywords", + "agent_prompt_template": "You are a READ-ONLY code reviewer...\n\n{context}\n\nReturn JSON array...", + "oracle_categories": ["patterns", "gotchas"], + "oracle_tags_required": ["tag1", "tag2"], + "max_oracle_patterns": 5, + "max_oracle_gotchas": 5, + "always_include_files": [], + "validation_rules": { + "min_confidence": 0.5, + "block_contradictions": true + } +} +``` + +## Template Structure + +### Required Fields + +- `name`: Unique template identifier +- `description`: Human-readable description +- `task_type`: "review", "plan", or "debug" +- `agent_prompt_template`: Prompt template with `{context}` placeholder + +### Optional Fields + +- `focus`: Keywords for context filtering (e.g., "security performance") +- `oracle_categories`: Oracle knowledge categories to load ["patterns", "gotchas", "corrections", "solutions"] +- `oracle_tags_required`: Tags to filter Oracle knowledge by +- `max_oracle_patterns`: Maximum Oracle patterns to include (default: 5) +- `max_oracle_gotchas`: Maximum Oracle gotchas to include (default: 5) +- `always_include_files`: Additional files to always include (e.g., config files) +- `validation_rules`: Rules for validating subagent suggestions + +### Validation Rules + +```json +{ + "min_confidence": 0.5, // Minimum confidence score to present + "block_contradictions": true, // Block suggestions that contradict Oracle + "require_severity": false, // Require severity field in suggestions + "require_impact": false, // Require impact field in suggestions + "require_dependencies": false // Require dependencies field (for planning) +} +``` + +## Prompt Template Guidelines + +### Critical Constraints Section + +Always include: + +``` +CRITICAL CONSTRAINTS: +- DO NOT use Write, Edit, NotebookEdit, or Bash tools +- DO NOT modify any files +- DO NOT execute any code +- ONLY read the provided context and return suggestions +``` + +### Context Placeholder + +Include `{context}` where minimal context should be inserted: + +``` +Your task: Review this code for security issues. + +{context} + +Return your findings... +``` + +### Output Format + +Specify clear JSON output format: + +```json +[ + { + "text": "Clear description of the issue", + "category": "security|performance|style|bugs", + "file": "file path", + "line": line_number (if applicable, otherwise null) + } +] +``` + +### Reminder Section + +End with: + +``` +Remember: You are READ-ONLY. Only analyze and suggest, never modify. +``` + +## Best Practices + +### For Review Templates + +1. **Be Specific**: Focus on particular types of issues (security, performance, style) +2. **Provide Examples**: Show what good/bad code looks like +3. **Reference Standards**: OWASP, CWE, language style guides +4. **Request Details**: Ask for line numbers, severity, remediation steps + +### For Planning Templates + +1. **Encourage Decomposition**: Break large tasks into small, testable pieces +2. **Request Dependencies**: Ask for task ordering and prerequisites +3. **Ask for Risks**: Identify potential issues early +4. **Require Estimates**: Time and complexity estimates help prioritization + +### For All Templates + +1. **Minimal Context**: Only request what's needed for the task +2. **Read-Only Focus**: Emphasize analysis over action +3. **Structured Output**: Request JSON for easy parsing +4. 
**Oracle Integration**: Leverage Oracle knowledge for context + +## Template Versioning + +Templates follow semantic versioning via filename suffixes: + +- `security_review.json` - Latest version +- `security_review_v2.json` - Explicit version 2 +- `security_review_legacy.json` - Deprecated version + +When updating templates, create a new version and mark old ones as legacy. + +## Contributing Templates + +To contribute a new template: + +1. Create your template following the structure above +2. Test it with real code reviews +3. Document what it's best for +4. Submit a PR with: + - Template JSON file + - Example usage + - Test results + +Good templates help the entire community! + +## Examples + +### Example: Security Review + +```bash +# Run security review on auth file +python guardian/scripts/guardian.py review \ + --file src/auth.py \ + --template security_review + +# Guardian will: +# 1. Load security_review.json template +# 2. Extract minimal context (auth.py + security Oracle patterns) +# 3. Apply template to context +# 4. Spawn Haiku agent with structured prompt +# 5. Validate suggestions against Oracle +# 6. Present results with confidence scores +``` + +### Example: Performance Review + +```bash +# Run performance review on database queries +python guardian/scripts/guardian.py review \ + --file src/database/queries.py \ + --template performance_review + +# Focuses on: +# - Query complexity +# - N+1 problems +# - Missing indexes +# - Caching opportunities +``` + +### Example: Feature Planning + +```bash +# Plan a new REST API +python guardian/scripts/guardian.py plan \ + --task "Build REST API with auth and rate limiting" \ + --template feature_planning + +# Returns: +# - Subtask breakdown +# - Dependencies +# - Estimates +# - Risk assessment +``` diff --git a/skills/guardian/Templates/feature_planning.json b/skills/guardian/Templates/feature_planning.json new file mode 100644 index 0000000..c100d9b --- /dev/null +++ b/skills/guardian/Templates/feature_planning.json @@ -0,0 +1,17 @@ +{ + "name": "feature_planning", + "description": "Feature development task breakdown template", + "task_type": "plan", + "focus": "task_breakdown", + "agent_prompt_template": "You are a READ-ONLY task planner for Guardian. You can ONLY analyze and plan.\n\nCRITICAL CONSTRAINTS:\n- DO NOT use Write, Edit, NotebookEdit, or Bash tools\n- DO NOT modify any files\n- DO NOT execute any code\n- ONLY analyze the task and return a breakdown plan\n\nYour task: Break down this feature into implementable subtasks.\n\n{context}\n\n**Planning Methodology:**\n\n1. **Decomposition Strategy:**\n - Start with infrastructure/foundation tasks\n - Build incrementally (each task should be independently testable)\n - Identify critical path and parallel work opportunities\n - Consider deployment and rollback strategies\n\n2. **Task Characteristics:**\n - Each task should be completable in one focused session\n - Clear acceptance criteria\n - Minimal coupling with other tasks\n - Explicit dependencies\n\n3. **Risk Assessment:**\n - Identify high-risk/high-complexity tasks\n - Suggest proof-of-concept or spike tasks for unknowns\n - Consider edge cases and error scenarios\n\n4. 
**Integration Points:**\n - API contracts\n - Database schema changes\n - Configuration requirements\n - Third-party dependencies\n\nReturn your plan as a JSON array of subtasks with this format:\n[\n {{\n \"task_id\": \"unique_identifier\",\n \"task\": \"Clear, actionable description of the subtask\",\n \"estimated_lines\": approximate lines of code needed,\n \"estimated_time\": \"e.g., '1 hour', '2-3 hours', '1 day'\",\n \"dependencies\": [\"list\", \"of\", \"prerequisite\", \"task_ids\"],\n \"files_affected\": [\"list of files that will be created/modified\"],\n \"priority\": \"high|medium|low\",\n \"complexity\": \"high|medium|low\",\n \"risks\": [\"list of potential risks or unknowns\"],\n \"acceptance_criteria\": [\"specific criteria to consider task complete\"],\n \"testing_strategy\": \"How to test this subtask\"\n }}\n]\n\nAlso include a summary:\n{{\n \"total_tasks\": number,\n \"critical_path\": [\"task_ids in order\"],\n \"parallel_opportunities\": [[\"task_ids that can be done in parallel\"]],\n \"estimated_total_time\": \"e.g., '1-2 days'\",\n \"high_risk_tasks\": [\"task_ids\"],\n \"recommended_order\": [\"task_ids in recommended execution order\"]\n}}\n\nRemember: You are READ-ONLY. Only analyze and plan, never modify.", + "oracle_categories": ["patterns", "solutions"], + "oracle_tags_required": [], + "max_oracle_patterns": 5, + "max_oracle_gotchas": 3, + "always_include_files": [], + "validation_rules": { + "min_confidence": 0.6, + "block_contradictions": false, + "require_dependencies": true + } +} diff --git a/skills/guardian/Templates/performance_review.json b/skills/guardian/Templates/performance_review.json new file mode 100644 index 0000000..d006cc1 --- /dev/null +++ b/skills/guardian/Templates/performance_review.json @@ -0,0 +1,17 @@ +{ + "name": "performance_review", + "description": "Performance and optimization code review template", + "task_type": "review", + "focus": "performance", + "agent_prompt_template": "You are a READ-ONLY performance code reviewer for Guardian. You can ONLY analyze and suggest.\n\nCRITICAL CONSTRAINTS:\n- DO NOT use Write, Edit, NotebookEdit, or Bash tools\n- DO NOT modify any files\n- DO NOT execute any code\n- ONLY read the provided context and return suggestions\n\nYour task: Perform a thorough performance review focusing on:\n\n**Algorithmic Efficiency:**\n- Time complexity (O(n), O(n²), O(log n), etc.)\n- Space complexity and memory usage\n- Unnecessary iterations or nested loops\n- Inefficient data structures\n\n**Common Performance Issues:**\n- N+1 query problems\n- Premature optimization\n- Synchronous operations that should be async\n- Blocking I/O operations\n- Missing indexes on database queries\n- Unnecessary object creation/allocation\n- String concatenation in loops\n- Regex compilation in hot paths\n- Missing caching opportunities\n- Memory leaks\n\n**Platform-Specific:**\n- JavaScript: Unnecessary re-renders, large bundle sizes\n- Python: GIL contention, list comprehensions vs generators\n- Database: Missing indexes, inefficient joins, full table scans\n- API: Missing rate limiting, pagination, compression\n\n{context}\n\nReturn your findings as a JSON array of suggestions with this format:\n[\n {{\n \"text\": \"Clear description of the performance issue and recommended fix\",\n \"category\": \"performance\",\n \"impact\": \"critical|high|medium|low\",\n \"file\": \"file path\",\n \"line\": line_number (if applicable, otherwise null),\n \"current_complexity\": \"O(...) or description\",\n \"improved_complexity\": \"O(...) 
or description\",\n \"estimated_improvement\": \"e.g., '10x faster', '50% less memory'\"\n }}\n]\n\nIf you find no performance issues, return an empty array: []\n\nRemember: You are READ-ONLY. Only analyze and suggest, never modify.", + "oracle_categories": ["patterns", "gotchas"], + "oracle_tags_required": ["performance", "optimization", "async", "caching"], + "max_oracle_patterns": 8, + "max_oracle_gotchas": 5, + "always_include_files": [], + "validation_rules": { + "min_confidence": 0.5, + "block_contradictions": true, + "require_impact": true + } +} diff --git a/skills/guardian/Templates/security_review.json b/skills/guardian/Templates/security_review.json new file mode 100644 index 0000000..d6e21eb --- /dev/null +++ b/skills/guardian/Templates/security_review.json @@ -0,0 +1,17 @@ +{ + "name": "security_review", + "description": "Security-focused code review template", + "task_type": "review", + "focus": "security", + "agent_prompt_template": "You are a READ-ONLY security code reviewer for Guardian. You can ONLY analyze and suggest.\n\nCRITICAL CONSTRAINTS:\n- DO NOT use Write, Edit, NotebookEdit, or Bash tools\n- DO NOT modify any files\n- DO NOT execute any code\n- ONLY read the provided context and return suggestions\n\nYour task: Perform a thorough security review focusing on:\n\n**OWASP Top 10 (2025):**\n1. Injection vulnerabilities (SQL, Command, XSS, etc.)\n2. Broken authentication and session management\n3. Sensitive data exposure\n4. XML external entities (XXE)\n5. Broken access control\n6. Security misconfiguration\n7. Cross-site scripting (XSS)\n8. Insecure deserialization\n9. Using components with known vulnerabilities\n10. Insufficient logging and monitoring\n\n**Additional Security Checks:**\n- Cryptographic weaknesses (weak algorithms, hardcoded keys)\n- Race conditions and TOCTOU vulnerabilities\n- Input validation and sanitization\n- Output encoding\n- CSRF protection\n- Secure defaults\n- Principle of least privilege\n- Defense in depth\n\n{context}\n\nReturn your findings as a JSON array of suggestions with this format:\n[\n {{\n \"text\": \"Clear description of the security issue and recommended fix\",\n \"category\": \"security\",\n \"severity\": \"critical|high|medium|low\",\n \"cwe\": \"CWE-XXX (if applicable)\",\n \"file\": \"file path\",\n \"line\": line_number (if applicable, otherwise null),\n \"exploit_scenario\": \"Brief description of how this could be exploited\",\n \"remediation\": \"Specific fix recommendation\"\n }}\n]\n\nIf you find no security issues, return an empty array: []\n\nRemember: You are READ-ONLY. 
Only analyze and suggest, never modify.", + "oracle_categories": ["patterns", "gotchas", "corrections"], + "oracle_tags_required": ["security", "auth", "crypto", "injection", "xss"], + "max_oracle_patterns": 10, + "max_oracle_gotchas": 5, + "always_include_files": ["*.config", "*.env.example"], + "validation_rules": { + "min_confidence": 0.4, + "block_contradictions": true, + "require_severity": true + } +} diff --git a/skills/guardian/Templates/session_health.json b/skills/guardian/Templates/session_health.json new file mode 100644 index 0000000..767bb38 --- /dev/null +++ b/skills/guardian/Templates/session_health.json @@ -0,0 +1,131 @@ +{ + "name": "Session Health Monitor", + "description": "Tracks session degradation signals and recommends when to start fresh", + "version": "1.0.0", + "triggers": { + "automatic": true, + "interval_minutes": 10, + "on_error": true, + "on_correction": true + }, + "metrics": { + "context_usage": { + "check": "token_count / max_tokens", + "warning_threshold": 0.7, + "critical_threshold": 0.85, + "weight": 0.25 + }, + "error_frequency": { + "check": "errors_last_30min", + "warning_threshold": 3, + "critical_threshold": 5, + "weight": 0.2 + }, + "correction_rate": { + "check": "corrections_last_30min", + "warning_threshold": 3, + "critical_threshold": 5, + "weight": 0.2 + }, + "file_churn": { + "check": "same_file_edits_in_10min", + "warning_threshold": 5, + "critical_threshold": 8, + "weight": 0.15 + }, + "repeated_errors": { + "check": "same_error_count", + "warning_threshold": 2, + "critical_threshold": 3, + "weight": 0.2 + } + }, + "health_score_calculation": "weighted_average_of_metrics", + "recommendations": { + "90-100": { + "status": "✅ Excellent", + "color": "green", + "message": "Session is healthy. Continue working.", + "action": "none" + }, + "70-89": { + "status": "✓ Good", + "color": "blue", + "message": "Session is performing well.", + "action": "none" + }, + "50-69": { + "status": "⚠️ Fair", + "color": "yellow", + "message": "Session showing minor degradation. Consider taking a break or refocusing.", + "action": "suggest_break" + }, + "30-49": { + "status": "⚠️ Warning", + "color": "orange", + "message": "Session degrading. Recommend starting fresh session soon.", + "action": "suggest_handoff" + }, + "0-29": { + "status": "❌ Critical", + "color": "red", + "message": "Session severely degraded. 
Strongly recommend session handoff NOW.", + "action": "recommend_handoff_now" + } + }, + "handoff_triggers": { + "health_below": 40, + "context_above": 0.85, + "repeated_errors_above": 3, + "corrections_above": 5, + "session_duration_minutes": 180 + }, + "dashboard_display": { + "show_in_status": true, + "format": "Session Health: {score}/100 {status_icon}", + "details_on_request": true, + "details_format": { + "title": "Session Health Dashboard", + "sections": [ + { + "name": "Overall Health", + "display": "{score}/100 - {status}" + }, + { + "name": "Metrics", + "display": [ + "Context Usage: {context_usage}% ({status_icon})", + "Error Rate: {errors_last_30min} in 30min ({status_icon})", + "Correction Rate: {corrections_last_30min} in 30min ({status_icon})", + "File Churn: {max_file_edit_count} edits to same file ({status_icon})", + "Repeated Errors: {repeated_error_count} ({status_icon})" + ] + }, + { + "name": "Recommendation", + "display": "{recommendation_message}" + }, + { + "name": "Session Stats", + "display": [ + "Duration: {duration_minutes} minutes", + "Files Modified: {files_modified_count}", + "Commands Run: {commands_run_count}", + "Tokens Used: {tokens_used}/{max_tokens}" + ] + } + ] + } + }, + "oracle_integration": { + "record_health_scores": true, + "record_handoff_events": true, + "track_degradation_patterns": true, + "learn_from_successful_sessions": true + }, + "evaluator_integration": { + "track_health_metrics": true, + "track_handoff_frequency": true, + "track_post_handoff_success": true + } +} diff --git a/skills/guardian/scripts/README.md b/skills/guardian/scripts/README.md new file mode 100644 index 0000000..1126a27 --- /dev/null +++ b/skills/guardian/scripts/README.md @@ -0,0 +1,225 @@ +# Guardian Scripts + +This directory contains the implementation scripts for the Guardian skill. + +## Core Scripts + +### guardian.py +**Main orchestrator** - Coordinates all Guardian components. + +Usage: +```bash +# Review code +python guardian.py review --file auth.py --focus security + +# Plan complex task +python guardian.py plan --task "Build REST API" + +# Debug error +python guardian.py debug --file app.py --error "TypeError: cannot unpack" + +# Check triggers +python guardian.py check + +# Session status +python guardian.py status +``` + +### monitor_session.py +**Session health monitor** - Tracks metrics and detects when intervention is needed. + +Key Features: +- Tracks code volume, errors, file edits, corrections +- Detects threshold crossings +- Calculates session health scores +- Minimal storage (only metrics, not full conversation) + +Usage: +```bash +# Track events +python monitor_session.py --event code-written --file auth.py --lines 60 +python monitor_session.py --event error --message "TypeError: ..." +python monitor_session.py --event file-edit --file app.py +python monitor_session.py --event correction --message "that's wrong..." + +# Check health +python monitor_session.py --check-health +python monitor_session.py --check-triggers + +# Initialize/reset +python monitor_session.py --init +python monitor_session.py --reset +``` + +### context_filter.py +**Minimal context extractor** - Extracts only what's needed for subagent tasks. 
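The payload it emits is a small dictionary, never a transcript. A representative shape for a review task (keys mirror `build_minimal_context()` in the script; the values are illustrative placeholders):

```python
# Illustrative shape of context_filter.py output for:
#   python context_filter.py --task review --file auth.py --focus security
# Keys mirror build_minimal_context(); note the oracle_* lists hold full
# Oracle knowledge entries (dicts), not bare strings.
context = {
    'task': 'review',
    'files': {'auth.py': '<contents of auth.py only>'},  # the one file under review
    'oracle_patterns': [{'title': 'Use bcrypt for password hashing'}],  # max 5
    'oracle_gotchas': [],                                               # max 5
    'recent_corrections': [{'title': "Don't use MD5 for passwords"}],   # max 3
    'focus': 'Review auth.py for security issues',
}
```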
+ +Key Principle: "Caller should only pass exactly what is needed for the task so it can be laser focused" + +Features: +- Loads only relevant files +- Extracts relevant Oracle patterns (max 5) +- Finds recent corrections (max 3) +- NO full conversation passing + +Usage: +```bash +# Extract context for review +python context_filter.py --task review --file auth.py --focus security + +# Extract context for planning +python context_filter.py --task plan --description "Build REST API" + +# Extract context for debugging +python context_filter.py --task debug --file app.py --error "TypeError: ..." + +# Output as JSON +python context_filter.py --task review --file auth.py --format json +``` + +### validator.py +**Suggestion validator** - Cross-checks subagent suggestions against Oracle knowledge. + +Key Principle: "Subagent might be missing important codebase context - need validation layer" + +Features: +- Validates against Oracle patterns +- Detects contradictions with known practices +- Checks rejection history +- Calculates confidence scores +- Learns from acceptance rates + +Usage: +```bash +# Validate single suggestion +python validator.py --suggestion "Use MD5 for passwords" --category security + +# Validate from file +python validator.py --suggestions-file suggestions.json + +# Record rejection +python validator.py --record-rejection "Add rate limiting" --rejection-reason "We handle at nginx level" --category performance + +# Update stats +python validator.py --update-stats accept --category security +python validator.py --update-stats reject --category style + +# Check rejection history +python validator.py --check-rejection "Use rate limiting" +``` + +### learning.py +**Learning system** - Adjusts thresholds based on user feedback. + +Features: +- Tracks acceptance/rejection rates +- Adjusts thresholds to maintain target acceptance rate +- Learns anti-patterns from rejections +- Updates auto-review rules dynamically + +Usage: +```bash +# Apply adjustments +python learning.py --adjust + +# Show recommendations (dry run) +python learning.py --recommend + +# View statistics +python learning.py --stats + +# Configure +python learning.py --set-target 0.75 +python learning.py --set-speed 0.1 +``` + +## Architecture + +``` +guardian.py (orchestrator) + | + +-- monitor_session.py (check triggers & health) + | + +-- context_filter.py (extract minimal context) + | + +-- [Task tool with Haiku agent] (perform review/planning) + | + +-- validator.py (validate suggestions) + | + +-- [Present to user with confidence scores] + | + +-- learning.py (adjust based on feedback) +``` + +## Data Storage + +Guardian stores minimal data in `.guardian/`: + +``` +.guardian/ +├── config.json # Configuration and thresholds +├── session_state.json # Current session metrics (NOT full conversation) +├── rejection_history.json # Recently rejected suggestions +└── acceptance_stats.json # Acceptance rate statistics +``` + +## Key Design Principles + +1. **Minimal Context Passing** - Never pass full conversations to subagents +2. **Suggestion Mode** - Present findings as suggestions, not commands +3. **Oracle Validation** - Cross-check all suggestions against known patterns +4. **Learning from Feedback** - Adjust sensitivity based on user acceptance +5. **Haiku Only** - All subagents use haiku model (fast & cheap) +6. **User Authority** - User has final say on all suggestions +7. 
**Read-Only Subagents** - Subagents can ONLY read and analyze, NEVER modify files + +## Integration with Oracle + +Guardian automatically: +- Loads relevant Oracle patterns before review +- Validates suggestions against Oracle knowledge +- Records validated suggestions in Oracle +- Stores rejection reasons as anti-patterns in Oracle + +## Example Workflow + +1. **User writes 60 lines of auth code** +2. **monitor_session.py** detects threshold crossed +3. **guardian.py** extracts minimal context via **context_filter.py** +4. **Guardian spawns Haiku agent** with only: auth.py + security patterns +5. **Haiku agent** returns suggestions +6. **validator.py** checks suggestions against Oracle +7. **Guardian presents** filtered suggestions with confidence scores +8. **User accepts/rejects** +9. **learning.py** adjusts thresholds based on feedback + +## Testing + +```bash +# Initialize Guardian +cd /path/to/your/project +python guardian.py --init + +# Simulate code writing +python monitor_session.py --event code-written --file test.py --lines 60 + +# Check if should trigger +python monitor_session.py --check-triggers + +# Perform review +python guardian.py review --file test.py --focus security + +# View session health +python guardian.py status + +# View learning stats +python learning.py --stats +``` + +## Future Enhancements + +- Integration with Claude Code Task tool for spawning Haiku agents +- Real-time monitoring via background process +- Web dashboard for session health visualization +- Team-wide learning (shared Guardian config via git) +- Integration with CI/CD pipelines diff --git a/skills/guardian/scripts/context_filter.py b/skills/guardian/scripts/context_filter.py new file mode 100644 index 0000000..fa1caa0 --- /dev/null +++ b/skills/guardian/scripts/context_filter.py @@ -0,0 +1,530 @@ +#!/usr/bin/env python3 +""" +Guardian Context Filter + +Extracts MINIMAL context for subagent tasks - following the key principle: +"Caller should only pass exactly what is needed for the task so it can be laser focused." + +This script NEVER passes full conversation history. It extracts only: +- Specific files being reviewed +- Relevant Oracle patterns (max 5) +- Recent corrections in the same area (max 3) +- A focused task description + +Usage: + # Extract context for code review + python context_filter.py --task review --file auth.py --focus security + + # Extract context for planning + python context_filter.py --task plan --description "Build REST API with auth" + + # Extract context for debugging + python context_filter.py --task debug --file app.py --error "TypeError: cannot unpack" + +Environment Variables: + ORACLE_PATH: Path to Oracle directory [default: .oracle] +""" + +import os +import sys +import json +import argparse +from pathlib import Path +from typing import Dict, List, Optional, Any +import re + + +def find_oracle_root() -> Optional[Path]: + """Find the .oracle directory.""" + current = Path.cwd() + + while current != current.parent: + oracle_path = current / '.oracle' + if oracle_path.exists(): + return oracle_path + current = current.parent + + return None + + +def load_oracle_knowledge(oracle_path: Path, categories: Optional[List[str]] = None) -> List[Dict[str, Any]]: + """Load Oracle knowledge from specified categories. 
+ + Args: + oracle_path: Path to .oracle directory + categories: List of categories to load (defaults to all) + + Returns: + List of knowledge entries + """ + knowledge_dir = oracle_path / 'knowledge' + all_knowledge: List[Dict[str, Any]] = [] + + if categories is None: + categories = ['patterns', 'preferences', 'gotchas', 'solutions', 'corrections'] + + for category in categories: + file_path = knowledge_dir / f'{category}.json' + if file_path.exists(): + try: + with open(file_path, 'r', encoding='utf-8') as f: + entries = json.load(f) + for entry in entries: + if isinstance(entry, dict): + entry['_category'] = category + all_knowledge.append(entry) + except (json.JSONDecodeError, FileNotFoundError, OSError, IOError): + continue + + return all_knowledge + + +def extract_file_patterns(file_path: str) -> List[str]: + """Extract patterns from a file path for matching Oracle knowledge. + + Args: + file_path: Path to the file + + Returns: + List of patterns (extension, directory names, filename) + """ + patterns = [] + path = Path(file_path) + + # Add file extension + if path.suffix: + patterns.append(path.suffix[1:]) # Remove dot + + # Add filename without extension + if path.stem: + patterns.append(path.stem) + + # Add directory components + for part in path.parts[:-1]: + if part and part != '.' and part != '..': + patterns.append(part) + + return patterns + + +def find_relevant_patterns( + oracle_knowledge: List[Dict[str, Any]], + file_patterns: List[str], + focus_keywords: Optional[List[str]] = None, + max_patterns: int = 5 +) -> List[Dict[str, Any]]: + """Find relevant Oracle patterns for the context. + + Args: + oracle_knowledge: All Oracle knowledge + file_patterns: Patterns extracted from file path + focus_keywords: Optional focus keywords (e.g., "security", "performance") + max_patterns: Maximum number of patterns to return + + Returns: + List of relevant pattern entries + """ + # Filter to patterns category only + patterns = [k for k in oracle_knowledge if k.get('_category') == 'patterns'] + + scored_patterns = [] + + for pattern in patterns: + score = 0.0 + + # Priority scoring + priority = pattern.get('priority', 'medium') + if priority == 'critical': + score += 1.0 + elif priority == 'high': + score += 0.7 + elif priority == 'medium': + score += 0.4 + + # Tag matching + tags = pattern.get('tags', []) + if tags and file_patterns: + matches = sum(1 for fp in file_patterns + if any(re.search(r'\b' + re.escape(fp.lower()) + r'\b', tag.lower()) + for tag in tags)) + score += matches * 0.3 + + # Focus keyword matching + if focus_keywords: + content = f"{pattern.get('title', '')} {pattern.get('content', '')}".lower() + keyword_matches = sum(1 for keyword in focus_keywords + if re.search(r'\b' + re.escape(keyword.lower()) + r'\b', content)) + score += keyword_matches * 0.5 + + scored_patterns.append((pattern, score)) + + # Sort by score descending + scored_patterns.sort(key=lambda x: x[1], reverse=True) + + # Return top N + return [pattern for pattern, score in scored_patterns[:max_patterns]] + + +def find_relevant_gotchas( + oracle_knowledge: List[Dict[str, Any]], + file_patterns: List[str], + focus_keywords: Optional[List[str]] = None, + max_gotchas: int = 5 +) -> List[Dict[str, Any]]: + """Find relevant Oracle gotchas for the context.""" + gotchas = [k for k in oracle_knowledge if k.get('_category') == 'gotchas'] + + scored_gotchas = [] + + for gotcha in gotchas: + score = 0.0 + + # Priority scoring (gotchas are critical by nature) + priority = gotcha.get('priority', 'high') + if 
priority == 'critical': + score += 1.0 + elif priority == 'high': + score += 0.8 + + # Tag matching + tags = gotcha.get('tags', []) + if tags and file_patterns: + matches = sum(1 for fp in file_patterns + if any(re.search(r'\b' + re.escape(fp.lower()) + r'\b', tag.lower()) + for tag in tags)) + score += matches * 0.4 + + # Focus keyword matching + if focus_keywords: + content = f"{gotcha.get('title', '')} {gotcha.get('content', '')}".lower() + keyword_matches = sum(1 for keyword in focus_keywords + if re.search(r'\b' + re.escape(keyword.lower()) + r'\b', content)) + score += keyword_matches * 0.6 + + scored_gotchas.append((gotcha, score)) + + scored_gotchas.sort(key=lambda x: x[1], reverse=True) + + return [gotcha for gotcha, score in scored_gotchas[:max_gotchas]] + + +def find_recent_corrections( + oracle_knowledge: List[Dict[str, Any]], + file_patterns: List[str], + max_corrections: int = 3 +) -> List[Dict[str, Any]]: + """Find recent relevant corrections from Oracle. + + Args: + oracle_knowledge: All Oracle knowledge + file_patterns: Patterns from file path + max_corrections: Maximum corrections to return + + Returns: + List of recent relevant corrections + """ + corrections = [k for k in oracle_knowledge if k.get('_category') == 'corrections'] + + # Sort by creation date (most recent first) + sorted_corrections = sorted( + corrections, + key=lambda x: x.get('created', ''), + reverse=True + ) + + # Filter for relevance + relevant = [] + for correction in sorted_corrections: + tags = correction.get('tags', []) + content = f"{correction.get('title', '')} {correction.get('content', '')}".lower() + + # Check if relevant to current file patterns + is_relevant = False + + if tags and file_patterns: + if any(fp.lower() in tag.lower() for fp in file_patterns for tag in tags): + is_relevant = True + + if file_patterns: + if any(re.search(r'\b' + re.escape(fp.lower()) + r'\b', content) for fp in file_patterns): + is_relevant = True + + if is_relevant: + relevant.append(correction) + + if len(relevant) >= max_corrections: + break + + return relevant + + +def build_minimal_context( + task_type: str, + file_path: Optional[str] = None, + focus: Optional[str] = None, + description: Optional[str] = None, + error_message: Optional[str] = None, + oracle_path: Optional[Path] = None +) -> Dict[str, Any]: + """Build minimal context for subagent task. + + Args: + task_type: Type of task (review, plan, debug) + file_path: Optional file path to review + focus: Optional focus keywords (e.g., "security performance") + description: Optional task description + error_message: Optional error message for debugging + oracle_path: Optional path to Oracle directory + + Returns: + Minimal context dictionary + """ + context: Dict[str, Any] = { + 'task': task_type, + 'files': {}, + 'oracle_patterns': [], + 'oracle_gotchas': [], + 'recent_corrections': [], + 'focus': '' + } + + # Parse focus keywords + focus_keywords = focus.split() if focus else [] + + # Extract file patterns + file_patterns = extract_file_patterns(file_path) if file_path else [] + + # Load file content if provided (with size limit to avoid memory exhaustion) + MAX_FILE_SIZE = 1024 * 1024 # 1MB limit + if file_path: + try: + path = Path(file_path) + if path.exists() and path.is_file(): + file_size = path.stat().st_size + if file_size > MAX_FILE_SIZE: + context['files'][file_path] = f"[File too large: {file_size} bytes. 
Showing first 1MB only]\n" + with open(path, 'r', encoding='utf-8') as f: + # Read only first 1MB + context['files'][file_path] += f.read(MAX_FILE_SIZE) + else: + with open(path, 'r', encoding='utf-8') as f: + context['files'][file_path] = f.read() + except (OSError, IOError, UnicodeDecodeError) as e: + context['files'][file_path] = "[Error: Could not read file]" + + # Load Oracle knowledge if available + if oracle_path: + oracle_knowledge = load_oracle_knowledge(oracle_path) + + # Find relevant patterns + context['oracle_patterns'] = find_relevant_patterns( + oracle_knowledge, + file_patterns, + focus_keywords, + max_patterns=5 + ) + + # Find relevant gotchas + context['oracle_gotchas'] = find_relevant_gotchas( + oracle_knowledge, + file_patterns, + focus_keywords, + max_gotchas=5 + ) + + # Find recent corrections + context['recent_corrections'] = find_recent_corrections( + oracle_knowledge, + file_patterns, + max_corrections=3 + ) + + # Build focus description + if task_type == 'review': + if focus: + context['focus'] = f"Review {file_path or 'code'} for {focus} issues" + else: + context['focus'] = f"Review {file_path or 'code'} for potential issues" + + elif task_type == 'plan': + context['focus'] = f"Break down this task into subtasks: {description or 'Complex task'}" + + elif task_type == 'debug': + context['focus'] = f"Debug error in {file_path or 'code'}: {error_message or 'Unknown error'}" + if error_message: + context['error'] = error_message + + return context + + +def format_context_for_agent(context: Dict[str, Any], format_type: str = 'text') -> str: + """Format context for subagent consumption. + + Args: + context: Minimal context dictionary + format_type: Output format (text or json) + + Returns: + Formatted context string + """ + if format_type == 'json': + return json.dumps(context, indent=2) + + # Text format + lines = [] + + lines.append(f"# Task: {context['task'].capitalize()}") + lines.append("") + lines.append(f"**Focus**: {context['focus']}") + lines.append("") + + # Files + if context['files']: + lines.append("## Files to Review") + lines.append("") + for file_path, content in context['files'].items(): + lines.append(f"### {file_path}") + lines.append("") + lines.append("```") + lines.append(content[:5000]) # Limit file size + if len(content) > 5000: + lines.append("... 
[truncated]") + lines.append("```") + lines.append("") + + # Oracle patterns + if context['oracle_patterns']: + lines.append("## Relevant Patterns (from Oracle)") + lines.append("") + for pattern in context['oracle_patterns']: + title = pattern.get('title', 'Untitled') + content = pattern.get('content', '') + lines.append(f"- **{title}**") + if content: + lines.append(f" {content[:200]}") + lines.append("") + + # Oracle gotchas + if context['oracle_gotchas']: + lines.append("## Gotchas to Watch For (from Oracle)") + lines.append("") + for gotcha in context['oracle_gotchas']: + title = gotcha.get('title', 'Untitled') + content = gotcha.get('content', '') + priority = gotcha.get('priority', 'medium') + if priority == 'critical': + lines.append(f"- **[CRITICAL]** {title}") + else: + lines.append(f"- {title}") + if content: + lines.append(f" {content[:200]}") + lines.append("") + + # Recent corrections + if context['recent_corrections']: + lines.append("## Recent Corrections (from Oracle)") + lines.append("") + for correction in context['recent_corrections']: + content = correction.get('content', '') + title = correction.get('title', 'Correction') + + # Try to extract the "Right:" part + if 'Right:' in content: + try: + right_part = content.split('Right:', 1)[1].split('\n', 1)[0].strip() + if right_part: + lines.append(f"- {right_part}") + else: + lines.append(f"- {title}") + except (IndexError, ValueError, AttributeError): + lines.append(f"- {title}") + else: + lines.append(f"- {title}") + lines.append("") + + return "\n".join(lines) + + +def main(): + parser = argparse.ArgumentParser( + description='Extract minimal context for Guardian subagent tasks', + formatter_class=argparse.RawDescriptionHelpFormatter + ) + + parser.add_argument( + '--task', + required=True, + choices=['review', 'plan', 'debug'], + help='Type of task' + ) + + parser.add_argument( + '--file', + help='File path to review/debug' + ) + + parser.add_argument( + '--focus', + help='Focus keywords (e.g., "security performance")' + ) + + parser.add_argument( + '--description', + help='Task description (for planning tasks)' + ) + + parser.add_argument( + '--error', + help='Error message (for debugging tasks)' + ) + + parser.add_argument( + '--format', + choices=['text', 'json'], + default='text', + help='Output format' + ) + + parser.add_argument( + '--no-oracle', + action='store_true', + help='Skip Oracle knowledge loading' + ) + + args = parser.parse_args() + + # Validate arguments based on task type + if args.task == 'review' and not args.file: + print("Error: --file required for review tasks", file=sys.stderr) + sys.exit(1) + + if args.task == 'plan' and not args.description: + print("Error: --description required for planning tasks", file=sys.stderr) + sys.exit(1) + + if args.task == 'debug' and not args.file: + print("Error: --file required for debug tasks", file=sys.stderr) + sys.exit(1) + + # Find Oracle + oracle_path = None + if not args.no_oracle: + oracle_path = find_oracle_root() + + # Build minimal context + context = build_minimal_context( + task_type=args.task, + file_path=args.file, + focus=args.focus, + description=args.description, + error_message=args.error, + oracle_path=oracle_path + ) + + # Format and output + output = format_context_for_agent(context, args.format) + print(output) + + +if __name__ == '__main__': + main() diff --git a/skills/guardian/scripts/guardian.py b/skills/guardian/scripts/guardian.py new file mode 100644 index 0000000..a3fa2c8 --- /dev/null +++ b/skills/guardian/scripts/guardian.py @@ 
-0,0 +1,524 @@ +#!/usr/bin/env python3 +""" +Guardian - Main Orchestrator + +Coordinates Guardian components to provide automatic quality gates and session health monitoring. + +This is the main entry point for Guardian operations. It: +1. Checks session health metrics +2. Determines if intervention is needed +3. Extracts minimal context +4. Spawns Haiku subagent for focused review/planning +5. Validates suggestions against Oracle +6. Presents results to user with confidence scores +7. Learns from user feedback + +Usage: + # Manual code review + python guardian.py review --file auth.py --focus security + + # Check if Guardian should trigger + python guardian.py check + + # Plan a complex task + python guardian.py plan --task "Build REST API with auth and rate limiting" + + # Debug an error + python guardian.py debug --file app.py --error "TypeError: cannot unpack" + + # Get session health status + python guardian.py status + +Environment Variables: + GUARDIAN_MODEL: Model to use for subagents [default: haiku] +""" + +import os +import sys +import json +import argparse +from pathlib import Path +from typing import Dict, List, Optional, Any +import subprocess + + +def find_scripts_dir() -> Path: + """Find the Guardian Scripts directory.""" + return Path(__file__).parent + + +def run_script(script_name: str, args: List[str]) -> Dict[str, Any]: + """Run a Guardian script and return JSON output. + + Args: + script_name: Name of the script (without .py extension) + args: List of command-line arguments + + Returns: + Parsed JSON output from the script + """ + scripts_dir = find_scripts_dir() + script_path = scripts_dir / f"{script_name}.py" + + if not script_path.exists(): + raise FileNotFoundError(f"Script not found: {script_path}") + + try: + result = subprocess.run( + ['python', str(script_path)] + args, + capture_output=True, + text=True, + timeout=30 + ) + + if result.returncode != 0: + # Try to parse error as JSON + try: + return json.loads(result.stdout) + except json.JSONDecodeError: + return {'error': result.stderr or result.stdout} + + # Parse output as JSON + try: + return json.loads(result.stdout) + except json.JSONDecodeError: + return {'output': result.stdout} + + except subprocess.TimeoutExpired: + return {'error': 'Script timeout'} + except Exception as e: + return {'error': str(e)} + + +def check_triggers() -> Dict[str, Any]: + """Check if any Guardian triggers have fired.""" + return run_script('monitor_session', ['--check-triggers']) + + +def get_session_health() -> Dict[str, Any]: + """Get current session health metrics.""" + return run_script('monitor_session', ['--check-health']) + + +def extract_context( + task_type: str, + file_path: Optional[str] = None, + focus: Optional[str] = None, + description: Optional[str] = None, + error_message: Optional[str] = None +) -> str: + """Extract minimal context for subagent task.""" + args = ['--task', task_type, '--format', 'text'] + + if file_path: + args.extend(['--file', file_path]) + if focus: + args.extend(['--focus', focus]) + if description: + args.extend(['--description', description]) + if error_message: + args.extend(['--error', error_message]) + + result = run_script('context_filter', args) + + if 'output' in result: + return result['output'] + elif 'error' in result: + return f"Error extracting context: {result['error']}" + else: + return str(result) + + +def validate_suggestions(suggestions: List[Dict[str, str]]) -> List[Dict[str, Any]]: + """Validate suggestions against Oracle knowledge. 
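+
+    Example (illustrative sketch; the result shape comes from validator.py,
+    which also attaches 'warnings', 'should_present' and 'notes' fields):
+
+        >>> validate_suggestions([{'text': 'Use bcrypt for password hashing',
+        ...                        'category': 'security'}])
+        [{'suggestion': 'Use bcrypt for password hashing',
+          'category': 'security', 'confidence': 0.5, ...}]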
+ + Args: + suggestions: List of suggestion dictionaries with 'text' and 'category' keys + + Returns: + List of validated suggestions with confidence scores + """ + # Create temporary file with suggestions + import tempfile + suggestions_file = None + + try: + with tempfile.NamedTemporaryFile(mode='w', suffix='.json', delete=False) as f: + json.dump(suggestions, f) + suggestions_file = f.name + + result = run_script('validator', ['--suggestions-file', suggestions_file]) + return result if isinstance(result, list) else [] + finally: + # Clean up temp file + if suggestions_file: + try: + os.unlink(suggestions_file) + except OSError: + pass + + +def format_suggestions_for_user(validated_suggestions: List[Dict[str, Any]]) -> str: + """Format validated suggestions for user presentation. + + Args: + validated_suggestions: List of validated suggestions + + Returns: + Formatted string for user + """ + lines = [] + + # Filter to presentable suggestions + presentable = [s for s in validated_suggestions if s.get('should_present', True)] + + if not presentable: + return "Guardian: No suggestions to present (all filtered by validation)" + + lines.append(f"Guardian Review Found {len(presentable)} Suggestions:") + lines.append("") + + for i, suggestion in enumerate(presentable, 1): + confidence = suggestion.get('confidence', 0.0) + text = suggestion.get('suggestion', '') + category = suggestion.get('category', 'general') + warnings = suggestion.get('warnings', []) + notes = suggestion.get('notes', []) + + # Format confidence indicator + if confidence >= 0.7: + conf_indicator = f"[{confidence:.2f}]" + elif confidence >= 0.5: + conf_indicator = f"?[{confidence:.2f}]" + else: + conf_indicator = f"![{confidence:.2f}]" + + lines.append(f"{i}. {conf_indicator} {text}") + lines.append(f" Category: {category}") + + # Add warnings + for warning in warnings: + severity = warning.get('severity', 'low') + message = warning.get('message', '') + if severity == 'high': + lines.append(f" WARNING: {message}") + else: + lines.append(f" Note: {message}") + + # Add notes + for note in notes: + lines.append(f" {note}") + + lines.append("") + + # Add command options + lines.append("Options:") + lines.append(" a - Accept all high-confidence suggestions (>=0.7)") + lines.append(" 1,3,5 - Accept specific suggestions by number") + lines.append(" r - Reject all with reason") + lines.append(" i - Reject and add to anti-patterns") + lines.append(" d - Discuss specific suggestion") + lines.append(" q - Dismiss review") + + return "\n".join(lines) + + +def perform_review( + file_path: str, + focus: Optional[str] = None +) -> None: + """Perform code review using Guardian. + + Args: + file_path: Path to file to review + focus: Optional focus keywords (e.g., "security performance") + """ + print(f"Guardian: Reviewing {file_path}...") + print() + + # Extract minimal context + context_text = extract_context('review', file_path=file_path, focus=focus) + + # Build read-only prompt for Haiku agent + agent_prompt = f"""You are a READ-ONLY code reviewer for Guardian. You can ONLY analyze and suggest. + +CRITICAL CONSTRAINTS: +- DO NOT use Write, Edit, NotebookEdit, or Bash tools +- DO NOT modify any files +- DO NOT execute any code +- ONLY read the provided context and return suggestions + +Your task: Review the code for potential issues and return suggestions. 
+ +{context_text} + +Return your findings as a JSON array of suggestions with this format: +[ + {{ + "text": "Clear description of the issue and recommended fix", + "category": "security|performance|style|bugs|maintainability", + "file": "file path (if applicable)", + "line": line_number (if applicable, otherwise null) + }} +] + +If you find no issues, return an empty array: [] + +Remember: You are READ-ONLY. Only analyze and suggest, never modify.""" + + print("Spawning Haiku review agent with minimal context...") + print() + + # Note: This would be implemented when Guardian is used as a skill + # For standalone script usage, we output instructions instead + print("=" * 60) + print("READY TO SPAWN HAIKU AGENT") + print("=" * 60) + print("To complete this review, use the Task tool with:") + print(f" subagent_type: general-purpose") + print(f" model: haiku") + print(f" prompt: ") + print() + print("Agent Prompt:") + print("-" * 60) + print(agent_prompt) + print("=" * 60) + print() + print("Once you have the agent's response, the suggestions will be:") + print(" 1. Validated against Oracle knowledge") + print(" 2. Presented with confidence scores") + print(" 3. Offered for user acceptance/rejection") + print() + print("Example suggestion handling:") + mock_suggestions = [ + { + 'text': 'Consider using bcrypt for password hashing instead of MD5', + 'category': 'security', + 'file': file_path, + 'line': None + } + ] + validated = validate_suggestions(mock_suggestions) + presentation = format_suggestions_for_user(validated) + print(presentation) + + +def perform_planning(task_description: str) -> None: + """Break down a complex task using Guardian planning. + + Args: + task_description: Description of the task to break down + """ + print("Guardian: Breaking down complex task...") + print() + + # Extract minimal context + context_text = extract_context('plan', description=task_description) + + # Build read-only prompt for Haiku planner + agent_prompt = f"""You are a READ-ONLY task planner for Guardian. You can ONLY analyze and plan. + +CRITICAL CONSTRAINTS: +- DO NOT use Write, Edit, NotebookEdit, or Bash tools +- DO NOT modify any files +- DO NOT execute any code +- ONLY analyze the task and return a breakdown plan + +Your task: Break down this complex task into manageable subtasks. + +{context_text} + +Return your plan as a JSON array of subtasks with this format: +[ + {{ + "task": "Clear description of the subtask", + "estimated_lines": approximate lines of code needed, + "dependencies": ["list", "of", "prerequisite", "subtask", "numbers"], + "files_affected": ["list of files that will be created/modified"], + "priority": "high|medium|low" + }} +] + +Consider: +- Dependencies between tasks +- Logical ordering +- Potential complexity and risks +- Integration points + +Remember: You are READ-ONLY. Only analyze and plan, never modify.""" + + print("=" * 60) + print("READY TO SPAWN HAIKU PLANNER") + print("=" * 60) + print("To complete this planning task, use the Task tool with:") + print(f" subagent_type: Plan") + print(f" model: haiku") + print(f" prompt: ") + print() + print("Agent Prompt:") + print("-" * 60) + print(agent_prompt) + print("=" * 60) + + +def perform_debug(file_path: str, error_message: str) -> None: + """Debug an error using Guardian. 
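+
+    Example (illustrative; mirrors the CLI usage in the module docstring):
+
+        perform_debug('app.py', 'TypeError: cannot unpack non-iterable')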
+ + Args: + file_path: Path to file with error + error_message: Error message to debug + """ + print(f"Guardian: Debugging error in {file_path}...") + print() + + # Extract minimal context + context_text = extract_context('debug', file_path=file_path, error_message=error_message) + + # Build read-only prompt for Haiku debugger + agent_prompt = f"""You are a READ-ONLY error debugger for Guardian. You can ONLY analyze and suggest fixes. + +CRITICAL CONSTRAINTS: +- DO NOT use Write, Edit, NotebookEdit, or Bash tools +- DO NOT modify any files +- DO NOT execute any code +- ONLY analyze the error and return debugging suggestions + +Your task: Analyze this error and suggest potential fixes. + +{context_text} + +Return your analysis as a JSON object with this format: +{{ + "root_cause": "Most likely cause of the error", + "affected_code": {{ + "file": "file path", + "line": line_number (if known) + }}, + "suggestions": [ + {{ + "text": "Clear description of the fix", + "category": "bug", + "confidence": 0.0 to 1.0 + }} + ], + "similar_patterns": ["Any similar error patterns from Oracle knowledge"] +}} + +Consider: +- What the error message indicates +- Common causes of this error type +- Relevant Oracle patterns or gotchas +- Edge cases that might trigger this + +Remember: You are READ-ONLY. Only analyze and suggest, never modify.""" + + print("=" * 60) + print("READY TO SPAWN HAIKU DEBUGGER") + print("=" * 60) + print("To complete this debug task, use the Task tool with:") + print(f" subagent_type: general-purpose") + print(f" model: haiku") + print(f" prompt: ") + print() + print("Agent Prompt:") + print("-" * 60) + print(agent_prompt) + print("=" * 60) + + +def check_if_should_trigger() -> bool: + """Check if Guardian should automatically trigger. + + Returns: + True if Guardian should trigger, False otherwise + """ + triggers = check_triggers() + + if isinstance(triggers, list) and len(triggers) > 0: + print("Guardian: Detected triggers:") + print(json.dumps(triggers, indent=2)) + return True + + return False + + +def show_status() -> None: + """Show current session health status.""" + health = get_session_health() + + print("Guardian Session Health Status:") + print("=" * 60) + print(json.dumps(health, indent=2)) + print("=" * 60) + + # Check triggers + triggers = check_triggers() + if isinstance(triggers, list) and len(triggers) > 0: + print() + print("Active Triggers:") + for trigger in triggers: + trigger_type = trigger.get('trigger', 'unknown') + priority = trigger.get('priority', 'medium') + print(f" - [{priority.upper()}] {trigger_type}") + for key, value in trigger.items(): + if key not in ['trigger', 'priority']: + print(f" {key}: {value}") + + +def main(): + parser = argparse.ArgumentParser( + description='Guardian - Quality gate and session health monitor', + formatter_class=argparse.RawDescriptionHelpFormatter + ) + + subparsers = parser.add_subparsers(dest='command', help='Guardian commands') + + # Review command + review_parser = subparsers.add_parser('review', help='Review code for issues') + review_parser.add_argument('--file', required=True, help='File to review') + review_parser.add_argument('--focus', help='Focus keywords (e.g., "security performance")') + + # Plan command + plan_parser = subparsers.add_parser('plan', help='Break down complex task') + plan_parser.add_argument('--task', required=True, help='Task description') + + # Debug command + debug_parser = subparsers.add_parser('debug', help='Debug an error') + debug_parser.add_argument('--file', required=True, 
help='File with error') + debug_parser.add_argument('--error', required=True, help='Error message') + + # Check command + subparsers.add_parser('check', help='Check if Guardian should trigger') + + # Status command + subparsers.add_parser('status', help='Show session health status') + + args = parser.parse_args() + + if not args.command: + parser.print_help() + sys.exit(1) + + # Execute command + if args.command == 'review': + perform_review(args.file, args.focus) + + elif args.command == 'plan': + perform_planning(args.task) + + elif args.command == 'debug': + perform_debug(args.file, args.error) + + elif args.command == 'check': + if check_if_should_trigger(): + sys.exit(0) # Should trigger + else: + print("Guardian: No triggers detected") + sys.exit(1) # Should not trigger + + elif args.command == 'status': + show_status() + + +if __name__ == '__main__': + main() diff --git a/skills/guardian/scripts/learning.py b/skills/guardian/scripts/learning.py new file mode 100644 index 0000000..b2bccc9 --- /dev/null +++ b/skills/guardian/scripts/learning.py @@ -0,0 +1,512 @@ +#!/usr/bin/env python3 +""" +Guardian Learning System + +Adjusts Guardian sensitivity and thresholds based on user feedback. + +Key Principle: "Learn from feedback to adjust sensitivity" + +This script: +1. Tracks acceptance/rejection rates per category +2. Adjusts thresholds dynamically to maintain target acceptance rate +3. Learns which file types need more/less review +4. Stores anti-patterns based on rejection reasons +5. Adapts to user's working style over time + +Usage: + # Adjust thresholds based on recent feedback + python learning.py --adjust + + # Get current threshold recommendations + python learning.py --recommend + + # Manually set acceptance rate target + python learning.py --set-target 0.75 + + # View learning statistics + python learning.py --stats + +Environment Variables: + GUARDIAN_CONFIG_PATH: Path to Guardian config file [default: .guardian/config.json] +""" + +import os +import sys +import json +import argparse +from pathlib import Path +from typing import Dict, List, Optional, Any +from datetime import datetime, timedelta + + +def find_guardian_root() -> Optional[Path]: + """Find the .guardian directory.""" + current = Path.cwd() + + while current != current.parent: + guardian_path = current / '.guardian' + if guardian_path.exists(): + return guardian_path + current = current.parent + + return None + + +def load_config(guardian_path: Path) -> Dict[str, Any]: + """Load Guardian configuration.""" + config_path = guardian_path / 'config.json' + + if not config_path.exists(): + return {} + + try: + with open(config_path, 'r', encoding='utf-8') as f: + return json.load(f) + except (json.JSONDecodeError, FileNotFoundError, OSError, IOError): + return {} + + +def save_config(guardian_path: Path, config: Dict[str, Any]) -> None: + """Save Guardian configuration.""" + config_path = guardian_path / 'config.json' + + try: + with open(config_path, 'w', encoding='utf-8') as f: + json.dump(config, f, indent=2) + except (OSError, IOError) as e: + print(f"Error: Failed to save config: {e}", file=sys.stderr) + sys.exit(1) + + +def load_acceptance_stats(guardian_path: Path) -> Dict[str, Any]: + """Load acceptance rate statistics.""" + stats_file = guardian_path / 'acceptance_stats.json' + + if not stats_file.exists(): + return { + 'by_category': {}, + 'by_type': {}, + 'overall': { + 'accepted': 0, + 'rejected': 0, + 'rate': 0.0 + } + } + + try: + with open(stats_file, 'r', encoding='utf-8') as f: + return json.load(f) + 
except (json.JSONDecodeError, FileNotFoundError, OSError, IOError): + return {'by_category': {}, 'by_type': {}, 'overall': {'accepted': 0, 'rejected': 0, 'rate': 0.0}} + + +def load_rejection_history(guardian_path: Path, days: int = 30) -> List[Dict[str, Any]]: + """Load recent rejection history. + + Args: + guardian_path: Path to Guardian directory + days: Number of days to look back + + Returns: + List of recent rejections + """ + history_file = guardian_path / 'rejection_history.json' + + if not history_file.exists(): + return [] + + try: + with open(history_file, 'r', encoding='utf-8') as f: + all_history = json.load(f) + except (json.JSONDecodeError, FileNotFoundError, OSError, IOError): + return [] + + # Filter to recent rejections + cutoff = datetime.now() - timedelta(days=days) + recent = [] + + for rejection in all_history: + try: + ts = datetime.fromisoformat(rejection.get('timestamp', '')) + # Handle timezone-aware timestamps + if ts.tzinfo and not cutoff.tzinfo: + cutoff = cutoff.replace(tzinfo=ts.tzinfo) + if ts >= cutoff: + recent.append(rejection) + except (ValueError, TypeError): + continue + + return recent + + +def calculate_threshold_adjustments( + config: Dict[str, Any], + acceptance_stats: Dict[str, Any], + target_rate: float = 0.7, + adjustment_speed: float = 0.1 +) -> Dict[str, int]: + """Calculate new threshold values based on acceptance rates. + + Args: + config: Current configuration + acceptance_stats: Acceptance statistics + target_rate: Target acceptance rate (0.0 to 1.0) + adjustment_speed: How fast to adjust (0.0 to 1.0) + + Returns: + Dictionary of adjusted threshold values + """ + current_sensitivity = config.get('sensitivity', {}) + overall_rate = acceptance_stats.get('overall', {}).get('rate', 0.5) + + adjustments = {} + + # Lines threshold adjustment + current_lines = current_sensitivity.get('lines_threshold', 50) + + if overall_rate < target_rate - 0.1: + # Too many false positives - increase threshold (trigger less often) + adjustment = int(current_lines * adjustment_speed) + adjustments['lines_threshold'] = current_lines + max(5, adjustment) + elif overall_rate > target_rate + 0.1: + # Too many missed issues - decrease threshold (trigger more often) + adjustment = int(current_lines * adjustment_speed) + adjustments['lines_threshold'] = max(20, current_lines - max(5, adjustment)) + else: + # Well-calibrated + adjustments['lines_threshold'] = current_lines + + # Error repeat threshold adjustment + error_stats = acceptance_stats.get('by_category', {}).get('error_analysis', {}) + error_rate = error_stats.get('rate', 0.5) + current_error_threshold = current_sensitivity.get('error_repeat_threshold', 3) + + if error_rate < target_rate - 0.1: + adjustments['error_repeat_threshold'] = min(10, current_error_threshold + 1) + elif error_rate > target_rate + 0.1: + adjustments['error_repeat_threshold'] = max(2, current_error_threshold - 1) + else: + adjustments['error_repeat_threshold'] = current_error_threshold + + # File churn threshold adjustment + churn_stats = acceptance_stats.get('by_category', {}).get('file_churn', {}) + churn_rate = churn_stats.get('rate', 0.5) + current_churn_threshold = current_sensitivity.get('file_churn_threshold', 5) + + if churn_rate < target_rate - 0.1: + adjustments['file_churn_threshold'] = min(15, current_churn_threshold + 1) + elif churn_rate > target_rate + 0.1: + adjustments['file_churn_threshold'] = max(3, current_churn_threshold - 1) + else: + adjustments['file_churn_threshold'] = current_churn_threshold + + # Correction 
threshold adjustment + correction_stats = acceptance_stats.get('by_category', {}).get('corrections', {}) + correction_rate = correction_stats.get('rate', 0.5) + current_correction_threshold = current_sensitivity.get('correction_threshold', 3) + + if correction_rate < target_rate - 0.1: + adjustments['correction_threshold'] = min(10, current_correction_threshold + 1) + elif correction_rate > target_rate + 0.1: + adjustments['correction_threshold'] = max(2, current_correction_threshold - 1) + else: + adjustments['correction_threshold'] = current_correction_threshold + + return adjustments + + +def learn_from_rejections( + rejection_history: List[Dict[str, Any]] +) -> Dict[str, Any]: + """Analyze rejection patterns to learn anti-patterns. + + Args: + rejection_history: List of rejections + + Returns: + Dictionary of learned anti-patterns and insights + """ + insights = { + 'common_rejection_reasons': {}, + 'frequently_rejected_categories': {}, + 'anti_patterns': [] + } + + # Count rejection reasons + for rejection in rejection_history: + reason = rejection.get('reason', 'Unknown') + category = rejection.get('category', 'general') + + # Count by reason + if reason not in insights['common_rejection_reasons']: + insights['common_rejection_reasons'][reason] = 0 + insights['common_rejection_reasons'][reason] += 1 + + # Count by category + if category not in insights['frequently_rejected_categories']: + insights['frequently_rejected_categories'][category] = 0 + insights['frequently_rejected_categories'][category] += 1 + + # Identify anti-patterns (highly rejected categories) + for category, count in insights['frequently_rejected_categories'].items(): + if count >= 5: # Rejected 5+ times + rejection_rate = count / len(rejection_history) if len(rejection_history) > 0 else 0 + if rejection_rate > 0.8: # 80%+ rejection rate + insights['anti_patterns'].append({ + 'category': category, + 'rejection_count': count, + 'rejection_rate': rejection_rate, + 'recommendation': f"Stop suggesting {category} - rejection rate {rejection_rate:.0%}" + }) + + return insights + + +def update_auto_review_rules( + config: Dict[str, Any], + acceptance_stats: Dict[str, Any], + insights: Dict[str, Any] +) -> Dict[str, Any]: + """Update auto-review rules based on learning. 
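+
+    Illustrative behaviour (matching the rules applied below): a category
+    rejected in over 80% of cases becomes an anti-pattern and is added to
+    never_review, while a category with >90% acceptance across more than
+    10 samples is promoted to always_review.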
+ + Args: + config: Current configuration + acceptance_stats: Acceptance statistics + insights: Learned insights from rejections + + Returns: + Updated auto_review configuration + """ + auto_review = config.get('auto_review', { + 'enabled': True, + 'always_review': ['auth', 'security', 'crypto', 'payment'], + 'never_review': ['test', 'mock', 'fixture'] + }) + + # Add anti-patterns to never_review + for anti_pattern in insights.get('anti_patterns', []): + category = anti_pattern['category'] + if category not in auto_review['never_review']: + auto_review['never_review'].append(category) + + # Check which categories have high acceptance rates + by_category = acceptance_stats.get('by_category', {}) + + for category, stats in by_category.items(): + rate = stats.get('rate', 0.0) + total = stats.get('accepted', 0) + stats.get('rejected', 0) + + # If category has >90% acceptance and >10 samples, add to always_review + if rate > 0.9 and total > 10: + if category not in auto_review['always_review']: + auto_review['always_review'].append(category) + + # If category has <20% acceptance and >10 samples, add to never_review + if rate < 0.2 and total > 10: + if category not in auto_review['never_review']: + auto_review['never_review'].append(category) + + return auto_review + + +def apply_adjustments( + guardian_path: Path, + dry_run: bool = False +) -> Dict[str, Any]: + """Apply learned adjustments to configuration. + + Args: + guardian_path: Path to Guardian directory + dry_run: If True, don't save changes (just return recommendations) + + Returns: + Dictionary with adjustment details + """ + config = load_config(guardian_path) + acceptance_stats = load_acceptance_stats(guardian_path) + rejection_history = load_rejection_history(guardian_path) + + # Get learning parameters + learning_config = config.get('learning', {}) + target_rate = learning_config.get('acceptance_rate_target', 0.7) + adjustment_speed = learning_config.get('adjustment_speed', 0.1) + + # Calculate threshold adjustments + threshold_adjustments = calculate_threshold_adjustments( + config, + acceptance_stats, + target_rate, + adjustment_speed + ) + + # Learn from rejections + insights = learn_from_rejections(rejection_history) + + # Update auto-review rules + updated_auto_review = update_auto_review_rules(config, acceptance_stats, insights) + + # Prepare result + result = { + 'current_acceptance_rate': acceptance_stats.get('overall', {}).get('rate', 0.0), + 'target_acceptance_rate': target_rate, + 'threshold_adjustments': threshold_adjustments, + 'auto_review_updates': updated_auto_review, + 'insights': insights, + 'applied': not dry_run + } + + # Apply changes if not dry run + if not dry_run: + # Update sensitivity thresholds + if 'sensitivity' not in config: + config['sensitivity'] = {} + config['sensitivity'].update(threshold_adjustments) + + # Update auto_review + config['auto_review'] = updated_auto_review + + # Save updated config + save_config(guardian_path, config) + + return result + + +def get_statistics(guardian_path: Path) -> Dict[str, Any]: + """Get learning statistics. 
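+
+    Example (illustrative values; the keys mirror the dictionary built below):
+
+        >>> get_statistics(Path('.guardian'))['overall']
+        {'accepted': 12, 'rejected': 4, 'acceptance_rate': 0.75}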
+ + Args: + guardian_path: Path to Guardian directory + + Returns: + Statistics dictionary + """ + config = load_config(guardian_path) + acceptance_stats = load_acceptance_stats(guardian_path) + rejection_history = load_rejection_history(guardian_path, days=30) + + overall = acceptance_stats.get('overall', {}) + by_category = acceptance_stats.get('by_category', {}) + + stats = { + 'overall': { + 'accepted': overall.get('accepted', 0), + 'rejected': overall.get('rejected', 0), + 'acceptance_rate': overall.get('rate', 0.0) + }, + 'by_category': {}, + 'current_thresholds': config.get('sensitivity', {}), + 'target_acceptance_rate': config.get('learning', {}).get('acceptance_rate_target', 0.7), + 'recent_rejections_30d': len(rejection_history), + 'auto_review_rules': config.get('auto_review', {}) + } + + # Add category breakdown + for category, cat_stats in by_category.items(): + stats['by_category'][category] = { + 'accepted': cat_stats.get('accepted', 0), + 'rejected': cat_stats.get('rejected', 0), + 'rate': cat_stats.get('rate', 0.0) + } + + return stats + + +def main(): + parser = argparse.ArgumentParser( + description='Guardian learning system - adjust thresholds based on feedback', + formatter_class=argparse.RawDescriptionHelpFormatter + ) + + parser.add_argument( + '--adjust', + action='store_true', + help='Apply threshold adjustments based on recent feedback' + ) + + parser.add_argument( + '--recommend', + action='store_true', + help='Show recommended adjustments without applying (dry run)' + ) + + parser.add_argument( + '--stats', + action='store_true', + help='Show learning statistics' + ) + + parser.add_argument( + '--set-target', + type=float, + help='Set target acceptance rate (0.0 to 1.0)' + ) + + parser.add_argument( + '--set-speed', + type=float, + help='Set adjustment speed (0.0 to 1.0)' + ) + + args = parser.parse_args() + + # Find Guardian + guardian_path = find_guardian_root() + + if not guardian_path: + print("Error: Guardian not initialized (.guardian directory not found)", file=sys.stderr) + sys.exit(1) + + # Handle set target + if args.set_target is not None: + if not 0.0 <= args.set_target <= 1.0: + print("Error: Target acceptance rate must be between 0.0 and 1.0", file=sys.stderr) + sys.exit(1) + + config = load_config(guardian_path) + if 'learning' not in config: + config['learning'] = {} + config['learning']['acceptance_rate_target'] = args.set_target + save_config(guardian_path, config) + + print(json.dumps({'target_set': args.set_target})) + sys.exit(0) + + # Handle set speed + if args.set_speed is not None: + if not 0.0 <= args.set_speed <= 1.0: + print("Error: Adjustment speed must be between 0.0 and 1.0", file=sys.stderr) + sys.exit(1) + + config = load_config(guardian_path) + if 'learning' not in config: + config['learning'] = {} + config['learning']['adjustment_speed'] = args.set_speed + save_config(guardian_path, config) + + print(json.dumps({'speed_set': args.set_speed})) + sys.exit(0) + + # Handle stats + if args.stats: + stats = get_statistics(guardian_path) + print(json.dumps(stats, indent=2)) + sys.exit(0) + + # Handle recommend (dry run) + if args.recommend: + result = apply_adjustments(guardian_path, dry_run=True) + print(json.dumps(result, indent=2)) + sys.exit(0) + + # Handle adjust + if args.adjust: + result = apply_adjustments(guardian_path, dry_run=False) + print(json.dumps(result, indent=2)) + sys.exit(0) + + parser.print_help() + sys.exit(1) + + +if __name__ == '__main__': + main() diff --git a/skills/guardian/scripts/monitor_session.py 
b/skills/guardian/scripts/monitor_session.py new file mode 100644 index 0000000..e6c2d97 --- /dev/null +++ b/skills/guardian/scripts/monitor_session.py @@ -0,0 +1,610 @@ +#!/usr/bin/env python3 +""" +Guardian Session Monitor + +Tracks session health metrics and detects when intervention would be helpful. +This script monitors in the background and triggers Guardian when thresholds are crossed. + +Key Principle: MINIMAL STORAGE - only tracks metrics, NOT full conversation content. + +Usage: + # Track a code write event + python monitor_session.py --event code-written --file auth.py --lines 60 + + # Track an error + python monitor_session.py --event error --message "TypeError: cannot unpack non-iterable" + + # Track a user correction + python monitor_session.py --event correction --message "that's wrong, use bcrypt instead" + + # Check session health + python monitor_session.py --check-health + + # Reset session metrics + python monitor_session.py --reset + +Environment Variables: + GUARDIAN_CONFIG_PATH: Path to Guardian config file [default: .guardian/config.json] +""" + +import os +import sys +import json +import argparse +from datetime import datetime, timedelta +from pathlib import Path +from typing import Dict, List, Optional, Any +from collections import defaultdict + + +def parse_timestamp_with_tz(ts_str: str, reference_time: datetime) -> Optional[datetime]: + """Parse ISO timestamp and make it comparable with reference_time. + + Args: + ts_str: ISO format timestamp string + reference_time: Reference datetime to match timezone with + + Returns: + Parsed datetime that's comparable with reference_time, or None if parsing fails + """ + try: + ts = datetime.fromisoformat(ts_str) + # If timestamp has timezone but reference doesn't, make timestamp naive + if ts.tzinfo and not reference_time.tzinfo: + ts = ts.replace(tzinfo=None) + # If reference has timezone but timestamp doesn't, make timestamp aware + elif not ts.tzinfo and reference_time.tzinfo: + ts = ts.replace(tzinfo=reference_time.tzinfo) + return ts + except (ValueError, TypeError): + return None + + +def find_guardian_root() -> Optional[Path]: + """Find the .guardian directory.""" + current = Path.cwd() + + while current != current.parent: + guardian_path = current / '.guardian' + if guardian_path.exists(): + return guardian_path + current = current.parent + + return None + + +def init_guardian_if_needed() -> Path: + """Initialize .guardian directory if it doesn't exist.""" + guardian_path = Path.cwd() / '.guardian' + + if not guardian_path.exists(): + guardian_path.mkdir(parents=True, exist_ok=True) + + # Create default config + default_config = { + "enabled": True, + "sensitivity": { + "lines_threshold": 50, + "error_repeat_threshold": 3, + "file_churn_threshold": 5, + "correction_threshold": 3, + "context_warning_percent": 0.7 + }, + "trigger_phrases": { + "review_needed": ["can you review", "does this look right"], + "struggling": ["still not working", "same error"], + "complexity": ["this is complex", "not sure how to"] + }, + "auto_review": { + "enabled": True, + "always_review": ["auth", "security", "crypto", "payment"], + "never_review": ["test", "mock", "fixture"] + }, + "learning": { + "acceptance_rate_target": 0.7, + "adjustment_speed": 0.1, + "memory_window_days": 30 + }, + "model": "haiku" + } + + config_path = guardian_path / 'config.json' + with open(config_path, 'w', encoding='utf-8') as f: + json.dump(default_config, f, indent=2) + + # Create session state file + state_path = guardian_path / 'session_state.json' + 
with open(state_path, 'w', encoding='utf-8') as f: + json.dump({ + "session_start": datetime.now().isoformat(), + "metrics": {}, + "history": [] + }, f, indent=2) + + return guardian_path + + +def load_config(guardian_path: Path) -> Dict[str, Any]: + """Load Guardian configuration.""" + config_path = guardian_path / 'config.json' + + if not config_path.exists(): + # Return default config + return { + "enabled": True, + "sensitivity": { + "lines_threshold": 50, + "error_repeat_threshold": 3, + "file_churn_threshold": 5, + "correction_threshold": 3, + "context_warning_percent": 0.7 + } + } + + try: + with open(config_path, 'r', encoding='utf-8') as f: + return json.load(f) + except (json.JSONDecodeError, FileNotFoundError, OSError, IOError): + return {"enabled": False} + + +def load_session_state(guardian_path: Path) -> Dict[str, Any]: + """Load current session state.""" + state_path = guardian_path / 'session_state.json' + + if not state_path.exists(): + return { + "session_start": datetime.now().isoformat(), + "metrics": {}, + "history": [] + } + + try: + with open(state_path, 'r', encoding='utf-8') as f: + return json.load(f) + except (json.JSONDecodeError, FileNotFoundError, OSError, IOError): + return { + "session_start": datetime.now().isoformat(), + "metrics": {}, + "history": [] + } + + +def save_session_state(guardian_path: Path, state: Dict[str, Any]) -> None: + """Save session state.""" + state_path = guardian_path / 'session_state.json' + + try: + with open(state_path, 'w', encoding='utf-8') as f: + json.dump(state, f, indent=2) + except (OSError, IOError) as e: + print(f"Warning: Failed to save session state: {e}", file=sys.stderr) + + +def track_code_written(state: Dict[str, Any], file_path: str, lines: int) -> None: + """Track code writing event.""" + MAX_EVENTS = 100 # Limit to prevent unbounded memory growth + + if 'code_written' not in state['metrics']: + state['metrics']['code_written'] = {} + + if file_path not in state['metrics']['code_written']: + state['metrics']['code_written'][file_path] = { + 'total_lines': 0, + 'events': [] + } + + state['metrics']['code_written'][file_path]['total_lines'] += lines + state['metrics']['code_written'][file_path]['events'].append({ + 'timestamp': datetime.now().isoformat(), + 'lines': lines + }) + + # Keep only recent events + if len(state['metrics']['code_written'][file_path]['events']) > MAX_EVENTS: + state['metrics']['code_written'][file_path]['events'] = \ + state['metrics']['code_written'][file_path]['events'][-MAX_EVENTS:] + + +def track_error(state: Dict[str, Any], error_message: str) -> None: + """Track error occurrence.""" + MAX_OCCURRENCES = 50 # Limit to prevent unbounded memory growth + + if 'errors' not in state['metrics']: + state['metrics']['errors'] = {} + + # Normalize error message (first line only) + error_key = error_message.split('\n')[0].strip()[:200] + + if error_key not in state['metrics']['errors']: + state['metrics']['errors'][error_key] = { + 'count': 0, + 'first_seen': datetime.now().isoformat(), + 'last_seen': None, + 'occurrences': [] + } + + state['metrics']['errors'][error_key]['count'] += 1 + state['metrics']['errors'][error_key]['last_seen'] = datetime.now().isoformat() + state['metrics']['errors'][error_key]['occurrences'].append({ + 'timestamp': datetime.now().isoformat() + }) + + # Keep only recent occurrences + if len(state['metrics']['errors'][error_key]['occurrences']) > MAX_OCCURRENCES: + state['metrics']['errors'][error_key]['occurrences'] = \ + 
state['metrics']['errors'][error_key]['occurrences'][-MAX_OCCURRENCES:] + + +def track_file_edit(state: Dict[str, Any], file_path: str) -> None: + """Track file edit event.""" + MAX_TIMESTAMPS = 100 # Limit to prevent unbounded memory growth + + if 'file_edits' not in state['metrics']: + state['metrics']['file_edits'] = {} + + if file_path not in state['metrics']['file_edits']: + state['metrics']['file_edits'][file_path] = { + 'count': 0, + 'timestamps': [] + } + + state['metrics']['file_edits'][file_path]['count'] += 1 + state['metrics']['file_edits'][file_path]['timestamps'].append( + datetime.now().isoformat() + ) + + # Keep only recent timestamps + if len(state['metrics']['file_edits'][file_path]['timestamps']) > MAX_TIMESTAMPS: + state['metrics']['file_edits'][file_path]['timestamps'] = \ + state['metrics']['file_edits'][file_path]['timestamps'][-MAX_TIMESTAMPS:] + + +def track_correction(state: Dict[str, Any], message: str) -> None: + """Track user correction event.""" + MAX_CORRECTIONS = 100 # Limit to prevent unbounded memory growth + + if 'corrections' not in state['metrics']: + state['metrics']['corrections'] = [] + + state['metrics']['corrections'].append({ + 'timestamp': datetime.now().isoformat(), + 'message': message[:500] # Truncate to avoid storing too much + }) + + # Keep only recent corrections + if len(state['metrics']['corrections']) > MAX_CORRECTIONS: + state['metrics']['corrections'] = state['metrics']['corrections'][-MAX_CORRECTIONS:] + + +def check_code_volume_threshold(state: Dict[str, Any], config: Dict[str, Any]) -> Optional[Dict[str, Any]]: + """Check if code volume threshold is crossed.""" + threshold = config.get('sensitivity', {}).get('lines_threshold', 50) + + code_written = state['metrics'].get('code_written', {}) + + for file_path, data in code_written.items(): + if data['total_lines'] >= threshold: + # Check if this file is in auto_review categories + auto_review = config.get('auto_review', {}) + always_review = auto_review.get('always_review', []) + never_review = auto_review.get('never_review', []) + + # Check never_review first + if any(keyword in file_path.lower() for keyword in never_review): + continue + + # Check always_review or threshold + should_review = any(keyword in file_path.lower() for keyword in always_review) + + if should_review or data['total_lines'] >= threshold: + return { + 'trigger': 'code_volume', + 'file': file_path, + 'lines': data['total_lines'], + 'threshold': threshold, + 'priority': 'high' if should_review else 'medium' + } + + return None + + +def check_repeated_errors(state: Dict[str, Any], config: Dict[str, Any]) -> Optional[Dict[str, Any]]: + """Check if same error is repeated.""" + threshold = config.get('sensitivity', {}).get('error_repeat_threshold', 3) + + errors = state['metrics'].get('errors', {}) + + for error_key, data in errors.items(): + if data['count'] >= threshold: + return { + 'trigger': 'repeated_errors', + 'error': error_key, + 'count': data['count'], + 'threshold': threshold, + 'priority': 'critical' + } + + return None + + +def check_file_churn(state: Dict[str, Any], config: Dict[str, Any]) -> Optional[Dict[str, Any]]: + """Check if a file is being edited too frequently.""" + threshold = config.get('sensitivity', {}).get('file_churn_threshold', 5) + time_window_minutes = 10 + + file_edits = state['metrics'].get('file_edits', {}) + + for file_path, data in file_edits.items(): + timestamps = data['timestamps'] + + # Count edits in last 10 minutes + cutoff = datetime.now() - 
timedelta(minutes=time_window_minutes) + recent_edits = [] + + for ts_str in timestamps: + ts = parse_timestamp_with_tz(ts_str, cutoff) + if ts and ts >= cutoff: + recent_edits.append(ts_str) + + if len(recent_edits) >= threshold: + return { + 'trigger': 'file_churn', + 'file': file_path, + 'edits': len(recent_edits), + 'time_window_minutes': time_window_minutes, + 'threshold': threshold, + 'priority': 'high' + } + + return None + + +def check_repeated_corrections(state: Dict[str, Any], config: Dict[str, Any]) -> Optional[Dict[str, Any]]: + """Check if user is making repeated corrections.""" + threshold = config.get('sensitivity', {}).get('correction_threshold', 3) + time_window_minutes = 30 + + corrections = state['metrics'].get('corrections', []) + + # Count corrections in last 30 minutes + cutoff = datetime.now() - timedelta(minutes=time_window_minutes) + recent_corrections = [] + + for correction in corrections: + try: + ts = parse_timestamp_with_tz(correction['timestamp'], cutoff) + if ts and ts >= cutoff: + recent_corrections.append(correction) + except KeyError: + continue + + if len(recent_corrections) >= threshold: + return { + 'trigger': 'repeated_corrections', + 'count': len(recent_corrections), + 'time_window_minutes': time_window_minutes, + 'threshold': threshold, + 'priority': 'critical' + } + + return None + + +def calculate_session_health(state: Dict[str, Any], config: Dict[str, Any]) -> Dict[str, Any]: + """Calculate overall session health score.""" + health_score = 100 + recommendations = [] + + # Check error rate + errors = state['metrics'].get('errors', {}) + total_errors = sum(data['count'] for data in errors.values()) + + try: + session_start = datetime.fromisoformat(state['session_start']) + now = datetime.now(session_start.tzinfo) if session_start.tzinfo else datetime.now() + session_duration_minutes = max(1, (now - session_start).total_seconds() / 60) + error_rate = total_errors / session_duration_minutes + except (ValueError, TypeError): + error_rate = 0 + session_duration_minutes = 0 + + if error_rate > 0.5: # More than 1 error per 2 minutes + health_score -= 20 + recommendations.append("High error rate - consider taking a break or reassessing approach") + + # Check correction rate + corrections = state['metrics'].get('corrections', []) + correction_rate = len(corrections) / max(1, session_duration_minutes) + + if correction_rate > 0.1: # More than 1 correction per 10 minutes + health_score -= 15 + recommendations.append("Frequent corrections - session may be going off track") + + # Check file churn + file_edits = state['metrics'].get('file_edits', {}) + for file_path, data in file_edits.items(): + if data['count'] > 5: + health_score -= 10 + recommendations.append(f"High churn on {file_path} - consider taking a break from this file") + break + + # Check repeated errors + for error_key, data in errors.items(): + if data['count'] >= 3: + health_score -= 20 + recommendations.append(f"Repeated error: {error_key[:50]}... 
- approach may be fundamentally wrong") + break + + return { + 'score': max(0, health_score), + 'session_duration_minutes': int(session_duration_minutes), + 'total_errors': total_errors, + 'total_corrections': len(corrections), + 'recommendations': recommendations + } + + +def check_triggers(state: Dict[str, Any], config: Dict[str, Any]) -> List[Dict[str, Any]]: + """Check all triggers and return those that fired.""" + triggered = [] + + # Check each trigger + trigger = check_code_volume_threshold(state, config) + if trigger: + triggered.append(trigger) + + trigger = check_repeated_errors(state, config) + if trigger: + triggered.append(trigger) + + trigger = check_file_churn(state, config) + if trigger: + triggered.append(trigger) + + trigger = check_repeated_corrections(state, config) + if trigger: + triggered.append(trigger) + + return triggered + + +def main(): + parser = argparse.ArgumentParser( + description='Guardian session health monitor', + formatter_class=argparse.RawDescriptionHelpFormatter + ) + + parser.add_argument( + '--event', + choices=['code-written', 'error', 'file-edit', 'correction'], + help='Type of event to track' + ) + + parser.add_argument( + '--file', + help='File path (for code-written and file-edit events)' + ) + + parser.add_argument( + '--lines', + type=int, + help='Number of lines written (for code-written events)' + ) + + parser.add_argument( + '--message', + help='Error or correction message' + ) + + parser.add_argument( + '--check-health', + action='store_true', + help='Check current session health' + ) + + parser.add_argument( + '--check-triggers', + action='store_true', + help='Check if any triggers have fired' + ) + + parser.add_argument( + '--reset', + action='store_true', + help='Reset session metrics' + ) + + parser.add_argument( + '--init', + action='store_true', + help='Initialize Guardian for this project' + ) + + args = parser.parse_args() + + # Initialize Guardian if requested + if args.init: + guardian_path = init_guardian_if_needed() + print(f"Guardian initialized at {guardian_path}") + sys.exit(0) + + # Find or create Guardian directory + guardian_path = find_guardian_root() + if not guardian_path: + guardian_path = init_guardian_if_needed() + + # Load config and state + config = load_config(guardian_path) + + if not config.get('enabled', False): + print("Guardian is disabled in config", file=sys.stderr) + sys.exit(0) + + state = load_session_state(guardian_path) + + # Handle reset + if args.reset: + state = { + "session_start": datetime.now().isoformat(), + "metrics": {}, + "history": [] + } + save_session_state(guardian_path, state) + print("Session metrics reset") + sys.exit(0) + + # Handle health check + if args.check_health: + health = calculate_session_health(state, config) + print(json.dumps(health, indent=2)) + sys.exit(0) + + # Handle trigger check + if args.check_triggers: + triggers = check_triggers(state, config) + print(json.dumps(triggers, indent=2)) + sys.exit(0) + + # Handle event tracking + if args.event: + if args.event == 'code-written': + if not args.file or args.lines is None: + print("Error: --file and --lines required for code-written event", file=sys.stderr) + sys.exit(1) + track_code_written(state, args.file, args.lines) + + elif args.event == 'error': + if not args.message: + print("Error: --message required for error event", file=sys.stderr) + sys.exit(1) + track_error(state, args.message) + + elif args.event == 'file-edit': + if not args.file: + print("Error: --file required for file-edit event", file=sys.stderr) + 
sys.exit(1) + track_file_edit(state, args.file) + + elif args.event == 'correction': + if not args.message: + print("Error: --message required for correction event", file=sys.stderr) + sys.exit(1) + track_correction(state, args.message) + + # Save updated state + save_session_state(guardian_path, state) + + # Check if any triggers fired + triggers = check_triggers(state, config) + if triggers: + print(json.dumps({'event_recorded': True, 'triggers': triggers}, indent=2)) + else: + print(json.dumps({'event_recorded': True}, indent=2)) + else: + parser.print_help() + sys.exit(1) + + +if __name__ == '__main__': + main() diff --git a/skills/guardian/scripts/template_loader.py b/skills/guardian/scripts/template_loader.py new file mode 100644 index 0000000..bec9827 --- /dev/null +++ b/skills/guardian/scripts/template_loader.py @@ -0,0 +1,303 @@ +#!/usr/bin/env python3 +""" +Guardian Template Loader + +Loads and applies Guardian review/planning templates for consistent, +structured agent interactions. + +Usage: + # List available templates + python template_loader.py --list + + # Load a template + python template_loader.py --template security_review + + # Load template and apply to context + python template_loader.py --template security_review --file auth.py --output prompt.txt + + # Create custom template + python template_loader.py --create my_review --based-on security_review +""" + +import os +import sys +import json +import argparse +from pathlib import Path +from typing import Dict, List, Optional, Any + + +def find_templates_dir() -> Path: + """Find the Guardian Templates directory.""" + # First try relative to this script + script_dir = Path(__file__).parent + templates_dir = script_dir.parent / 'Templates' + + if templates_dir.exists(): + return templates_dir + + # Try from current directory + templates_dir = Path.cwd() / 'skills' / 'guardian' / 'Templates' + if templates_dir.exists(): + return templates_dir + + raise FileNotFoundError("Guardian Templates directory not found") + + +def list_templates() -> List[Dict[str, str]]: + """List all available Guardian templates. + + Returns: + List of template metadata dictionaries + """ + templates_dir = find_templates_dir() + templates = [] + + for template_file in templates_dir.glob('*.json'): + try: + with open(template_file, 'r', encoding='utf-8') as f: + template = json.load(f) + templates.append({ + 'name': template.get('name', template_file.stem), + 'description': template.get('description', 'No description'), + 'task_type': template.get('task_type', 'unknown'), + 'file': str(template_file) + }) + except (json.JSONDecodeError, OSError, IOError): + continue + + return templates + + +def load_template(template_name: str) -> Dict[str, Any]: + """Load a Guardian template by name. + + Args: + template_name: Name of the template (with or without .json extension) + + Returns: + Template configuration dictionary + """ + templates_dir = find_templates_dir() + + # Remove .json extension if provided + if template_name.endswith('.json'): + template_name = template_name[:-5] + + template_file = templates_dir / f"{template_name}.json" + + if not template_file.exists(): + raise FileNotFoundError(f"Template not found: {template_name}") + + try: + with open(template_file, 'r', encoding='utf-8') as f: + return json.load(f) + except json.JSONDecodeError as e: + raise ValueError(f"Invalid template JSON: {e}") + + +def apply_template_to_context( + template: Dict[str, Any], + context: str +) -> str: + """Apply a template to extracted context. 
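+
+    Example (illustrative; assumes the template's agent_prompt_template
+    contains the literal '{context}' placeholder):
+
+        >>> apply_template_to_context(
+        ...     {'agent_prompt_template': 'Review this:\n{context}'},
+        ...     'def f(): pass')
+        'Review this:\ndef f(): pass'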
+ + Args: + template: Template configuration + context: Extracted minimal context + + Returns: + Formatted agent prompt + """ + prompt_template = template.get('agent_prompt_template', '') + + # Replace context placeholder + prompt = prompt_template.replace('{context}', context) + + return prompt + + +def get_template_config(template: Dict[str, Any]) -> Dict[str, Any]: + """Extract configuration parameters from template. + + Args: + template: Template configuration + + Returns: + Configuration dictionary for context_filter.py + """ + return { + 'task_type': template.get('task_type', 'review'), + 'focus': template.get('focus', ''), + 'oracle_categories': template.get('oracle_categories', ['patterns', 'gotchas']), + 'oracle_tags': template.get('oracle_tags_required', []), + 'max_patterns': template.get('max_oracle_patterns', 5), + 'max_gotchas': template.get('max_oracle_gotchas', 5), + 'validation_rules': template.get('validation_rules', {}) + } + + +def create_custom_template( + name: str, + base_template: Optional[str] = None, + description: str = "", + task_type: str = "review" +) -> Path: + """Create a new custom template. + + Args: + name: Name for the new template + base_template: Optional template to base this on + description: Template description + task_type: Type of task (review, plan, debug) + + Returns: + Path to the created template file + """ + templates_dir = find_templates_dir() + + if base_template: + # Load base template + base = load_template(base_template) + new_template = base.copy() + new_template['name'] = name + if description: + new_template['description'] = description + else: + # Create minimal template + new_template = { + "name": name, + "description": description or f"Custom {task_type} template", + "task_type": task_type, + "focus": "", + "agent_prompt_template": "You are a READ-ONLY code reviewer for Guardian.\n\nCRITICAL CONSTRAINTS:\n- DO NOT modify any files\n- ONLY read and analyze\n\n{context}\n\nReturn suggestions as JSON array.", + "oracle_categories": ["patterns", "gotchas"], + "oracle_tags_required": [], + "max_oracle_patterns": 5, + "max_oracle_gotchas": 5, + "always_include_files": [], + "validation_rules": { + "min_confidence": 0.5, + "block_contradictions": True + } + } + + # Save template + template_file = templates_dir / f"{name}.json" + + with open(template_file, 'w', encoding='utf-8') as f: + json.dump(new_template, f, indent=2) + + return template_file + + +def main(): + parser = argparse.ArgumentParser( + description='Guardian template loader and manager', + formatter_class=argparse.RawDescriptionHelpFormatter + ) + + parser.add_argument( + '--list', + action='store_true', + help='List all available templates' + ) + + parser.add_argument( + '--template', + help='Template name to load' + ) + + parser.add_argument( + '--file', + help='File to apply template to (used with --output)' + ) + + parser.add_argument( + '--output', + help='Output file for generated prompt' + ) + + parser.add_argument( + '--create', + help='Create a new custom template with this name' + ) + + parser.add_argument( + '--based-on', + help='Base the new template on an existing one' + ) + + parser.add_argument( + '--description', + help='Description for new template' + ) + + parser.add_argument( + '--show-config', + action='store_true', + help='Show template configuration (for use with context_filter.py)' + ) + + args = parser.parse_args() + + # List templates + if args.list: + templates = list_templates() + + print("Available Guardian Templates:") + print("=" * 60) + + for 
template in templates: + print(f"\nName: {template['name']}") + print(f" Type: {template['task_type']}") + print(f" Description: {template['description']}") + print(f" File: {template['file']}") + + print("\n" + "=" * 60) + print(f"Total: {len(templates)} templates") + sys.exit(0) + + # Create template + if args.create: + template_file = create_custom_template( + args.create, + args.based_on, + args.description or "", + "review" + ) + + print(f"Created template: {template_file}") + print("Edit this file to customize your template") + sys.exit(0) + + # Load template + if args.template: + template = load_template(args.template) + + if args.show_config: + config = get_template_config(template) + print(json.dumps(config, indent=2)) + sys.exit(0) + + if args.output and args.file: + # Apply template to file + # This would integrate with context_filter.py + print(f"Applying template '{args.template}' to {args.file}...") + print(f"Output will be saved to: {args.output}") + print("\nTo apply this template:") + print(f" 1. Run context_filter.py with template config") + print(f" 2. Apply template to extracted context") + print(f" 3. Save to {args.output}") + else: + # Just show template + print(json.dumps(template, indent=2)) + + sys.exit(0) + + parser.print_help() + sys.exit(1) + + +if __name__ == '__main__': + main() diff --git a/skills/guardian/scripts/validator.py b/skills/guardian/scripts/validator.py new file mode 100644 index 0000000..80292eb --- /dev/null +++ b/skills/guardian/scripts/validator.py @@ -0,0 +1,586 @@ +#!/usr/bin/env python3 +""" +Guardian Suggestion Validator + +Cross-checks subagent suggestions against Oracle knowledge before presenting to user. + +Key Principle: "Subagent might be missing important codebase context - need validation layer" + +This script: +1. Validates suggestions against Oracle patterns +2. Detects contradictions with known good practices +3. Checks rejection history for similar suggestions +4. Calculates confidence scores +5. 
Flags suggestions that contradict Oracle knowledge + +Usage: + # Validate a single suggestion + python validator.py --suggestion "Use MD5 for password hashing" --category security + + # Validate multiple suggestions from JSON + python validator.py --suggestions-file suggestions.json + + # Check rejection history + python validator.py --check-rejection "Add rate limiting to endpoint" + +Environment Variables: + ORACLE_PATH: Path to Oracle directory [default: .oracle] + GUARDIAN_PATH: Path to Guardian directory [default: .guardian] +""" + +import os +import sys +import json +import argparse +from pathlib import Path +from typing import Dict, List, Optional, Any, Tuple +import re +from datetime import datetime, timedelta + + +def find_oracle_root() -> Optional[Path]: + """Find the .oracle directory.""" + current = Path.cwd() + + while current != current.parent: + oracle_path = current / '.oracle' + if oracle_path.exists(): + return oracle_path + current = current.parent + + return None + + +def find_guardian_root() -> Optional[Path]: + """Find the .guardian directory.""" + current = Path.cwd() + + while current != current.parent: + guardian_path = current / '.guardian' + if guardian_path.exists(): + return guardian_path + current = current.parent + + return None + + +def load_oracle_knowledge(oracle_path: Path) -> List[Dict[str, Any]]: + """Load all Oracle knowledge.""" + knowledge_dir = oracle_path / 'knowledge' + all_knowledge: List[Dict[str, Any]] = [] + + categories = ['patterns', 'preferences', 'gotchas', 'solutions', 'corrections'] + + for category in categories: + file_path = knowledge_dir / f'{category}.json' + if file_path.exists(): + try: + with open(file_path, 'r', encoding='utf-8') as f: + entries = json.load(f) + for entry in entries: + if isinstance(entry, dict): + entry['_category'] = category + all_knowledge.append(entry) + except (json.JSONDecodeError, FileNotFoundError, OSError, IOError): + continue + + return all_knowledge + + +def load_rejection_history(guardian_path: Path) -> List[Dict[str, Any]]: + """Load rejection history from Guardian.""" + history_file = guardian_path / 'rejection_history.json' + + if not history_file.exists(): + return [] + + try: + with open(history_file, 'r', encoding='utf-8') as f: + return json.load(f) + except (json.JSONDecodeError, FileNotFoundError, OSError, IOError): + return [] + + +def save_rejection_history(guardian_path: Path, history: List[Dict[str, Any]]) -> None: + """Save rejection history to Guardian.""" + history_file = guardian_path / 'rejection_history.json' + + try: + with open(history_file, 'w', encoding='utf-8') as f: + json.dump(history, f, indent=2) + except (OSError, IOError) as e: + print(f"Warning: Failed to save rejection history: {e}", file=sys.stderr) + + +def load_acceptance_stats(guardian_path: Path) -> Dict[str, Any]: + """Load acceptance rate statistics.""" + stats_file = guardian_path / 'acceptance_stats.json' + + if not stats_file.exists(): + return { + 'by_category': {}, + 'by_type': {}, + 'overall': { + 'accepted': 0, + 'rejected': 0, + 'rate': 0.0 + } + } + + try: + with open(stats_file, 'r', encoding='utf-8') as f: + return json.load(f) + except (json.JSONDecodeError, FileNotFoundError, OSError, IOError): + return {'by_category': {}, 'by_type': {}, 'overall': {'accepted': 0, 'rejected': 0, 'rate': 0.0}} + + +def check_contradiction_with_patterns( + suggestion: str, + oracle_knowledge: List[Dict[str, Any]] +) -> Optional[Dict[str, Any]]: + """Check if suggestion contradicts known Oracle patterns. 
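+
+    Illustrative case: the suggestion "use MD5 for password hashing" checked
+    against a critical pattern whose content reads "never use MD5 for
+    password hashing" returns a 'pattern_contradiction' with confidence 0.8,
+    because a word longer than three characters overlaps a 'never'/'avoid'
+    pattern.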
+ + Args: + suggestion: Suggestion text + oracle_knowledge: All Oracle knowledge + + Returns: + Contradiction details if found, None otherwise + """ + # Get patterns and gotchas + patterns = [k for k in oracle_knowledge if k.get('_category') == 'patterns'] + gotchas = [k for k in oracle_knowledge if k.get('_category') == 'gotchas'] + + suggestion_lower = suggestion.lower() + + # Check against patterns (high priority ones) + for pattern in patterns: + if pattern.get('priority') not in ['critical', 'high']: + continue + + title = pattern.get('title', '').lower() + content = pattern.get('content', '').lower() + + # Look for direct contradictions + # Example: suggestion says "use MD5" but pattern says "never use MD5" + if 'never' in content or 'don\'t' in content or 'avoid' in content: + # Extract what not to do + words = re.findall(r'\b\w+\b', suggestion_lower) + for word in words: + if len(word) > 3 and word in content: + # Potential contradiction + return { + 'type': 'pattern_contradiction', + 'pattern': pattern.get('title', 'Unknown pattern'), + 'pattern_content': pattern.get('content', ''), + 'priority': pattern.get('priority', 'medium'), + 'confidence': 0.8 + } + + # Check against gotchas (critical warnings) + for gotcha in gotchas: + title = gotcha.get('title', '').lower() + content = gotcha.get('content', '').lower() + + # Check if suggestion relates to a known gotcha + words = re.findall(r'\b\w+\b', suggestion_lower) + common_words = set(words) & set(re.findall(r'\b\w+\b', content)) + + if len(common_words) > 3: + return { + 'type': 'gotcha_warning', + 'gotcha': gotcha.get('title', 'Unknown gotcha'), + 'gotcha_content': gotcha.get('content', ''), + 'priority': gotcha.get('priority', 'high'), + 'confidence': 0.6 + } + + return None + + +def check_rejection_history( + suggestion: str, + rejection_history: List[Dict[str, Any]], + similarity_threshold: float = 0.6 +) -> Optional[Dict[str, Any]]: + """Check if similar suggestion was previously rejected. + + Args: + suggestion: Suggestion text + rejection_history: List of previously rejected suggestions + similarity_threshold: Minimum similarity to consider a match + + Returns: + Rejection details if found, None otherwise + """ + suggestion_words = set(re.findall(r'\b\w+\b', suggestion.lower())) + + for rejection in rejection_history: + rejected_text = rejection.get('suggestion', '').lower() + rejected_words = set(re.findall(r'\b\w+\b', rejected_text)) + + # Calculate Jaccard similarity + if len(suggestion_words) == 0 or len(rejected_words) == 0: + continue + + intersection = suggestion_words & rejected_words + union = suggestion_words | rejected_words + + similarity = len(intersection) / len(union) if len(union) > 0 else 0.0 + + if similarity >= similarity_threshold: + return { + 'type': 'previously_rejected', + 'rejected_suggestion': rejection.get('suggestion', ''), + 'rejection_reason': rejection.get('reason', 'No reason provided'), + 'rejected_date': rejection.get('timestamp', 'Unknown'), + 'similarity': similarity, + 'confidence': 0.3 # Low confidence due to previous rejection + } + + return None + + +def calculate_acceptance_rate( + category: str, + acceptance_stats: Dict[str, Any] +) -> float: + """Calculate acceptance rate for a category. 
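+
+    Example (illustrative doctest; the stats dict uses the same shape as
+    load_acceptance_stats):
+
+        >>> stats = {'by_category': {'security': {'accepted': 3, 'rejected': 1}}}
+        >>> calculate_acceptance_rate('security', stats)
+        0.75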
+ + Args: + category: Suggestion category + acceptance_stats: Acceptance statistics + + Returns: + Acceptance rate (0.0 to 1.0) + """ + by_category = acceptance_stats.get('by_category', {}) + + if category not in by_category: + # No history, return neutral rate + return 0.5 + + stats = by_category[category] + accepted = stats.get('accepted', 0) + rejected = stats.get('rejected', 0) + total = accepted + rejected + + if total == 0: + return 0.5 + + return accepted / total + + +def validate_suggestion( + suggestion: str, + category: str, + oracle_knowledge: List[Dict[str, Any]], + rejection_history: List[Dict[str, Any]], + acceptance_stats: Dict[str, Any] +) -> Dict[str, Any]: + """Validate a suggestion against Oracle knowledge. + + Args: + suggestion: Suggestion text + category: Suggestion category (security, performance, etc.) + oracle_knowledge: All Oracle knowledge + rejection_history: Previous rejections + acceptance_stats: Acceptance rate statistics + + Returns: + Validation result with confidence score and warnings + """ + result = { + 'suggestion': suggestion, + 'category': category, + 'confidence': 0.5, # Start neutral + 'warnings': [], + 'should_present': True, + 'notes': [] + } + + # Check for contradictions with Oracle patterns + contradiction = check_contradiction_with_patterns(suggestion, oracle_knowledge) + if contradiction: + result['confidence'] = contradiction['confidence'] + result['warnings'].append({ + 'severity': 'high', + 'type': contradiction['type'], + 'message': f"Contradicts Oracle {contradiction['type']}: {contradiction.get('pattern', contradiction.get('gotcha', 'Unknown'))}", + 'details': contradiction + }) + + if contradiction.get('priority') == 'critical': + result['should_present'] = False + result['notes'].append("BLOCKED: Contradicts critical Oracle pattern") + + # Check rejection history + previous_rejection = check_rejection_history(suggestion, rejection_history) + if previous_rejection: + result['confidence'] = min(result['confidence'], previous_rejection['confidence']) + result['warnings'].append({ + 'severity': 'medium', + 'type': 'previously_rejected', + 'message': f"Similar suggestion rejected before ({previous_rejection['similarity']:.0%} similar)", + 'details': previous_rejection + }) + result['notes'].append(f"Previous rejection reason: {previous_rejection['rejection_reason']}") + + # Calculate confidence from acceptance rate + acceptance_rate = calculate_acceptance_rate(category, acceptance_stats) + + # Adjust confidence based on historical acceptance + if not result['warnings']: + # No warnings, use acceptance rate + result['confidence'] = acceptance_rate + else: + # Has warnings, blend with acceptance rate (60% warning, 40% acceptance) + result['confidence'] = result['confidence'] * 0.6 + acceptance_rate * 0.4 + + # Add acceptance rate note + result['notes'].append(f"Historical acceptance rate for {category}: {acceptance_rate:.0%}") + + # Final decision on whether to present + if result['confidence'] < 0.3: + result['should_present'] = False + result['notes'].append("Confidence too low - suggestion blocked") + + return result + + +def validate_multiple_suggestions( + suggestions: List[Dict[str, str]], + oracle_path: Optional[Path] = None, + guardian_path: Optional[Path] = None +) -> List[Dict[str, Any]]: + """Validate multiple suggestions. 
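+
+    Example (illustrative doctest; with no Oracle/Guardian paths every check
+    falls back to a neutral 0.5 confidence):
+
+        >>> results = validate_multiple_suggestions(
+        ...     [{'text': 'Add input validation to the upload handler',
+        ...       'category': 'security'}])
+        >>> results[0]['should_present'], results[0]['confidence']
+        (True, 0.5)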
+ + Args: + suggestions: List of suggestion dictionaries with 'text' and 'category' keys + oracle_path: Path to Oracle directory + guardian_path: Path to Guardian directory + + Returns: + List of validation results + """ + # Load Oracle knowledge + oracle_knowledge = [] + if oracle_path: + oracle_knowledge = load_oracle_knowledge(oracle_path) + + # Load Guardian data + rejection_history = [] + acceptance_stats = {'by_category': {}, 'by_type': {}, 'overall': {'accepted': 0, 'rejected': 0, 'rate': 0.0}} + + if guardian_path: + rejection_history = load_rejection_history(guardian_path) + acceptance_stats = load_acceptance_stats(guardian_path) + + # Validate each suggestion + results = [] + for suggestion_dict in suggestions: + suggestion_text = suggestion_dict.get('text', '') + category = suggestion_dict.get('category', 'general') + + result = validate_suggestion( + suggestion_text, + category, + oracle_knowledge, + rejection_history, + acceptance_stats + ) + + results.append(result) + + return results + + +def record_rejection( + guardian_path: Path, + suggestion: str, + reason: str, + category: str +) -> None: + """Record a rejected suggestion for future reference. + + Args: + guardian_path: Path to Guardian directory + suggestion: Rejected suggestion text + reason: Reason for rejection + category: Suggestion category + """ + rejection_history = load_rejection_history(guardian_path) + + rejection_history.append({ + 'suggestion': suggestion, + 'reason': reason, + 'category': category, + 'timestamp': datetime.now().isoformat() + }) + + # Keep only last 100 rejections + if len(rejection_history) > 100: + rejection_history = rejection_history[-100:] + + save_rejection_history(guardian_path, rejection_history) + + +def update_acceptance_stats( + guardian_path: Path, + category: str, + accepted: bool +) -> None: + """Update acceptance rate statistics. 
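+
+    Example (illustrative; assumes an initialized .guardian directory):
+
+        update_acceptance_stats(Path('.guardian'), 'security', accepted=True)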
+ + Args: + guardian_path: Path to Guardian directory + category: Suggestion category + accepted: Whether the suggestion was accepted + """ + stats_file = guardian_path / 'acceptance_stats.json' + stats = load_acceptance_stats(guardian_path) + + # Update category stats + if category not in stats['by_category']: + stats['by_category'][category] = {'accepted': 0, 'rejected': 0} + + if accepted: + stats['by_category'][category]['accepted'] += 1 + stats['overall']['accepted'] += 1 + else: + stats['by_category'][category]['rejected'] += 1 + stats['overall']['rejected'] += 1 + + # Recalculate overall rate + total = stats['overall']['accepted'] + stats['overall']['rejected'] + stats['overall']['rate'] = stats['overall']['accepted'] / total if total > 0 else 0.0 + + # Recalculate category rates + for cat, cat_stats in stats['by_category'].items(): + cat_total = cat_stats['accepted'] + cat_stats['rejected'] + cat_stats['rate'] = cat_stats['accepted'] / cat_total if cat_total > 0 else 0.0 + + try: + with open(stats_file, 'w', encoding='utf-8') as f: + json.dump(stats, f, indent=2) + except (OSError, IOError) as e: + print(f"Warning: Failed to save acceptance stats: {e}", file=sys.stderr) + + +def main(): + parser = argparse.ArgumentParser( + description='Validate Guardian suggestions against Oracle knowledge', + formatter_class=argparse.RawDescriptionHelpFormatter + ) + + parser.add_argument( + '--suggestion', + help='Single suggestion to validate' + ) + + parser.add_argument( + '--category', + default='general', + help='Suggestion category (security, performance, style, etc.)' + ) + + parser.add_argument( + '--suggestions-file', + help='JSON file with multiple suggestions to validate' + ) + + parser.add_argument( + '--record-rejection', + help='Record a rejected suggestion' + ) + + parser.add_argument( + '--rejection-reason', + help='Reason for rejection (used with --record-rejection)' + ) + + parser.add_argument( + '--update-stats', + choices=['accept', 'reject'], + help='Update acceptance statistics' + ) + + parser.add_argument( + '--check-rejection', + help='Check if a suggestion was previously rejected' + ) + + args = parser.parse_args() + + # Find Oracle and Guardian + oracle_path = find_oracle_root() + guardian_path = find_guardian_root() + + if not oracle_path and not guardian_path: + print("Warning: Neither Oracle nor Guardian initialized", file=sys.stderr) + + # Handle check rejection + if args.check_rejection: + if not guardian_path: + print(json.dumps({'found': False, 'message': 'Guardian not initialized'})) + sys.exit(0) + + rejection_history = load_rejection_history(guardian_path) + result = check_rejection_history(args.check_rejection, rejection_history) + + if result: + print(json.dumps({'found': True, 'details': result}, indent=2)) + else: + print(json.dumps({'found': False})) + sys.exit(0) + + # Handle record rejection + if args.record_rejection: + if not guardian_path: + print("Error: Guardian not initialized", file=sys.stderr) + sys.exit(1) + + if not args.rejection_reason: + print("Error: --rejection-reason required", file=sys.stderr) + sys.exit(1) + + record_rejection(guardian_path, args.record_rejection, args.rejection_reason, args.category) + print(json.dumps({'recorded': True})) + sys.exit(0) + + # Handle update stats + if args.update_stats: + if not guardian_path: + print("Error: Guardian not initialized", file=sys.stderr) + sys.exit(1) + + update_acceptance_stats(guardian_path, args.category, args.update_stats == 'accept') + print(json.dumps({'updated': True})) + 
sys.exit(0) + + # Handle single suggestion validation + if args.suggestion: + suggestions = [{'text': args.suggestion, 'category': args.category}] + results = validate_multiple_suggestions(suggestions, oracle_path, guardian_path) + print(json.dumps(results[0], indent=2)) + sys.exit(0) + + # Handle suggestions file + if args.suggestions_file: + try: + with open(args.suggestions_file, 'r', encoding='utf-8') as f: + suggestions = json.load(f) + except (json.JSONDecodeError, FileNotFoundError, OSError, IOError) as e: + print(f"Error: Failed to load suggestions file: {e}", file=sys.stderr) + sys.exit(1) + + results = validate_multiple_suggestions(suggestions, oracle_path, guardian_path) + print(json.dumps(results, indent=2)) + sys.exit(0) + + parser.print_help() + sys.exit(1) + + +if __name__ == '__main__': + main()
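+
+
+# Example input for --suggestions-file (illustrative; the keys match what
+# validate_multiple_suggestions() reads from each entry):
+#
+# [
+#   {"text": "Add rate limiting to the login endpoint", "category": "security"},
+#   {"text": "Cache the compiled regex between calls", "category": "performance"}
+# ]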