Initial commit

Zhongwei Li
2025-11-30 08:46:58 +08:00
commit 6eb041c48f
16 changed files with 4262 additions and 0 deletions

View File

@@ -0,0 +1,225 @@
# Guardian Scripts
This directory contains the implementation scripts for the Guardian skill.
## Core Scripts
### guardian.py
**Main orchestrator** - Coordinates all Guardian components.
Usage:
```bash
# Review code
python guardian.py review --file auth.py --focus security
# Plan complex task
python guardian.py plan --task "Build REST API"
# Debug error
python guardian.py debug --file app.py --error "TypeError: cannot unpack"
# Check triggers
python guardian.py check
# Session status
python guardian.py status
```
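The `status` command prints the session health metrics computed by monitor_session.py as JSON. An illustrative output (field names follow the monitor; the numbers are made up):
```json
{
  "score": 80,
  "session_duration_minutes": 10,
  "total_errors": 6,
  "total_corrections": 0,
  "recommendations": [
    "High error rate - consider taking a break or reassessing approach"
  ]
}
```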
### monitor_session.py
**Session health monitor** - Tracks metrics and detects when intervention is needed.
Key Features:
- Tracks code volume, errors, file edits, corrections
- Detects threshold crossings
- Calculates session health scores
- Minimal storage (only metrics, not full conversation)
Usage:
```bash
# Track events
python monitor_session.py --event code-written --file auth.py --lines 60
python monitor_session.py --event error --message "TypeError: ..."
python monitor_session.py --event file-edit --file app.py
python monitor_session.py --event correction --message "that's wrong..."
# Check health
python monitor_session.py --check-health
python monitor_session.py --check-triggers
# Initialize/reset
python monitor_session.py --init
python monitor_session.py --reset
```
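When a threshold is crossed, `--check-triggers` reports the fired triggers as a JSON list. An illustrative entry for the code-volume trigger (fields match the monitor's output; the values are made up):
```json
[
  {
    "trigger": "code_volume",
    "file": "auth.py",
    "lines": 60,
    "threshold": 50,
    "priority": "high"
  }
]
```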
### context_filter.py
**Minimal context extractor** - Extracts only what's needed for subagent tasks.
Key Principle: "Caller should only pass exactly what is needed for the task so it can be laser focused"
Features:
- Loads only relevant files
- Extracts relevant Oracle patterns (max 5)
- Finds recent corrections (max 3)
- NO full conversation passing
Usage:
```bash
# Extract context for review
python context_filter.py --task review --file auth.py --focus security
# Extract context for planning
python context_filter.py --task plan --description "Build REST API"
# Extract context for debugging
python context_filter.py --task debug --file app.py --error "TypeError: ..."
# Output as JSON
python context_filter.py --task review --file auth.py --format json
```
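In text mode the extracted context is emitted as a short Markdown briefing for the subagent. A trimmed, illustrative example for a security review of `auth.py` (the file body and Oracle entries are placeholders):
```
# Task: Review

**Focus**: Review auth.py for security issues

## Files to Review

### auth.py
... (file contents, truncated at 5000 characters) ...

## Relevant Patterns (from Oracle)

- **Use bcrypt for password hashing**

## Gotchas to Watch For (from Oracle)

- **[CRITICAL]** Never log raw credentials
```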
### validator.py
**Suggestion validator** - Cross-checks subagent suggestions against Oracle knowledge.
Key Principle: "Subagent might be missing important codebase context - need validation layer"
Features:
- Validates against Oracle patterns
- Detects contradictions with known practices
- Checks rejection history
- Calculates confidence scores
- Learns from acceptance rates
Usage:
```bash
# Validate single suggestion
python validator.py --suggestion "Use MD5 for passwords" --category security
# Validate from file
python validator.py --suggestions-file suggestions.json
# Record rejection
python validator.py --record-rejection "Add rate limiting" --rejection-reason "We handle at nginx level" --category performance
# Update stats
python validator.py --update-stats accept --category security
python validator.py --update-stats reject --category style
# Check rejection history
python validator.py --check-rejection "Use rate limiting"
```
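`--suggestions-file` expects a JSON array of suggestion objects; guardian.py writes them with at least `text` and `category` keys. An illustrative file:
```json
[
  {
    "text": "Consider using bcrypt for password hashing instead of MD5",
    "category": "security",
    "file": "auth.py",
    "line": null
  }
]
```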
### learning.py
**Learning system** - Adjusts thresholds based on user feedback.
Features:
- Tracks acceptance/rejection rates
- Adjusts thresholds to maintain target acceptance rate
- Learns anti-patterns from rejections
- Updates auto-review rules dynamically
Usage:
```bash
# Apply adjustments
python learning.py --adjust
# Show recommendations (dry run)
python learning.py --recommend
# View statistics
python learning.py --stats
# Configure
python learning.py --set-target 0.75
python learning.py --set-speed 0.1
```
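Adjustment is proportional: with the default target of 0.7 and speed of 0.1, an overall acceptance rate more than 0.1 below target raises `lines_threshold` by roughly 10% (at least 5 lines) so Guardian triggers less often, while a rate more than 0.1 above target lowers it, never below 20. An abridged, illustrative `--recommend` output:
```json
{
  "current_acceptance_rate": 0.55,
  "target_acceptance_rate": 0.7,
  "threshold_adjustments": {
    "lines_threshold": 55,
    "error_repeat_threshold": 4,
    "file_churn_threshold": 6,
    "correction_threshold": 4
  },
  "applied": false
}
```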
## Architecture
```
guardian.py (orchestrator)
|
+-- monitor_session.py (check triggers & health)
|
+-- context_filter.py (extract minimal context)
|
+-- [Task tool with Haiku agent] (perform review/planning)
|
+-- validator.py (validate suggestions)
|
+-- [Present to user with confidence scores]
|
+-- learning.py (adjust based on feedback)
```
## Data Storage
Guardian stores minimal data in `.guardian/`:
```
.guardian/
├── config.json # Configuration and thresholds
├── session_state.json # Current session metrics (NOT full conversation)
├── rejection_history.json # Recently rejected suggestions
└── acceptance_stats.json # Acceptance rate statistics
```
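`config.json` holds the tunable thresholds. The defaults written on first initialization look like this (abridged; trigger phrases omitted):
```json
{
  "enabled": true,
  "sensitivity": {
    "lines_threshold": 50,
    "error_repeat_threshold": 3,
    "file_churn_threshold": 5,
    "correction_threshold": 3,
    "context_warning_percent": 0.7
  },
  "auto_review": {
    "enabled": true,
    "always_review": ["auth", "security", "crypto", "payment"],
    "never_review": ["test", "mock", "fixture"]
  },
  "learning": {
    "acceptance_rate_target": 0.7,
    "adjustment_speed": 0.1,
    "memory_window_days": 30
  },
  "model": "haiku"
}
```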
## Key Design Principles
1. **Minimal Context Passing** - Never pass full conversations to subagents
2. **Suggestion Mode** - Present findings as suggestions, not commands
3. **Oracle Validation** - Cross-check all suggestions against known patterns
4. **Learning from Feedback** - Adjust sensitivity based on user acceptance
5. **Haiku Only** - All subagents use the Haiku model (fast and cheap)
6. **User Authority** - User has final say on all suggestions
7. **Read-Only Subagents** - Subagents can ONLY read and analyze, NEVER modify files
## Integration with Oracle
Guardian automatically:
- Loads relevant Oracle patterns before review (entry format sketched below)
- Validates suggestions against Oracle knowledge
- Records validated suggestions in Oracle
- Stores rejection reasons as anti-patterns in Oracle
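Oracle entries live in `.oracle/knowledge/<category>.json` (categories: patterns, preferences, gotchas, solutions, corrections). The fields Guardian relies on are `title`, `content`, `priority`, `tags`, and `created`; an illustrative pattern entry:
```json
{
  "title": "Use bcrypt for password hashing",
  "content": "Wrong: MD5/SHA1 for passwords. Right: bcrypt with a per-user salt.",
  "priority": "critical",
  "tags": ["auth", "security", "python"],
  "created": "2025-11-30T08:46:58+08:00"
}
```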
## Example Workflow
1. **User writes 60 lines of auth code**
2. **monitor_session.py** detects threshold crossed
3. **guardian.py** extracts minimal context via **context_filter.py**
4. **Guardian spawns Haiku agent** with only: auth.py + security patterns
5. **Haiku agent** returns suggestions
6. **validator.py** checks suggestions against Oracle
7. **Guardian presents** filtered suggestions with confidence scores (see the sketch below)
8. **User accepts/rejects**
9. **learning.py** adjusts thresholds based on feedback
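Step 7 presents each suggestion together with the validator's verdict. An illustrative validated entry (field names follow what guardian.py reads; the values are made up):
```json
{
  "suggestion": "Consider using bcrypt for password hashing instead of MD5",
  "category": "security",
  "confidence": 0.85,
  "should_present": true,
  "warnings": [],
  "notes": ["Matches Oracle pattern: Use bcrypt for password hashing"]
}
```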
## Testing
```bash
# Initialize Guardian
cd /path/to/your/project
python guardian.py --init
# Simulate code writing
python monitor_session.py --event code-written --file test.py --lines 60
# Check if should trigger
python monitor_session.py --check-triggers
# Perform review
python guardian.py review --file test.py --focus security
# View session health
python guardian.py status
# View learning stats
python learning.py --stats
```
## Future Enhancements
- Integration with Claude Code Task tool for spawning Haiku agents
- Real-time monitoring via background process
- Web dashboard for session health visualization
- Team-wide learning (shared Guardian config via git)
- Integration with CI/CD pipelines

View File

@@ -0,0 +1,530 @@
#!/usr/bin/env python3
"""
Guardian Context Filter
Extracts MINIMAL context for subagent tasks - following the key principle:
"Caller should only pass exactly what is needed for the task so it can be laser focused."
This script NEVER passes full conversation history. It extracts only:
- Specific files being reviewed
- Relevant Oracle patterns (max 5)
- Recent corrections in the same area (max 3)
- A focused task description
Usage:
# Extract context for code review
python context_filter.py --task review --file auth.py --focus security
# Extract context for planning
python context_filter.py --task plan --description "Build REST API with auth"
# Extract context for debugging
python context_filter.py --task debug --file app.py --error "TypeError: cannot unpack"
Environment Variables:
ORACLE_PATH: Path to Oracle directory [default: .oracle]
"""
import os
import sys
import json
import argparse
from pathlib import Path
from typing import Dict, List, Optional, Any
import re
def find_oracle_root() -> Optional[Path]:
"""Find the .oracle directory."""
current = Path.cwd()
while current != current.parent:
oracle_path = current / '.oracle'
if oracle_path.exists():
return oracle_path
current = current.parent
return None
def load_oracle_knowledge(oracle_path: Path, categories: Optional[List[str]] = None) -> List[Dict[str, Any]]:
"""Load Oracle knowledge from specified categories.
Args:
oracle_path: Path to .oracle directory
categories: List of categories to load (defaults to all)
Returns:
List of knowledge entries
"""
knowledge_dir = oracle_path / 'knowledge'
all_knowledge: List[Dict[str, Any]] = []
if categories is None:
categories = ['patterns', 'preferences', 'gotchas', 'solutions', 'corrections']
for category in categories:
file_path = knowledge_dir / f'{category}.json'
if file_path.exists():
try:
with open(file_path, 'r', encoding='utf-8') as f:
entries = json.load(f)
for entry in entries:
if isinstance(entry, dict):
entry['_category'] = category
all_knowledge.append(entry)
except (json.JSONDecodeError, FileNotFoundError, OSError, IOError):
continue
return all_knowledge
def extract_file_patterns(file_path: str) -> List[str]:
"""Extract patterns from a file path for matching Oracle knowledge.
Args:
file_path: Path to the file
Returns:
List of patterns (extension, directory names, filename)
"""
patterns = []
path = Path(file_path)
# Add file extension
if path.suffix:
patterns.append(path.suffix[1:]) # Remove dot
# Add filename without extension
if path.stem:
patterns.append(path.stem)
# Add directory components
for part in path.parts[:-1]:
if part and part != '.' and part != '..':
patterns.append(part)
return patterns
def find_relevant_patterns(
oracle_knowledge: List[Dict[str, Any]],
file_patterns: List[str],
focus_keywords: Optional[List[str]] = None,
max_patterns: int = 5
) -> List[Dict[str, Any]]:
"""Find relevant Oracle patterns for the context.
Args:
oracle_knowledge: All Oracle knowledge
file_patterns: Patterns extracted from file path
focus_keywords: Optional focus keywords (e.g., "security", "performance")
max_patterns: Maximum number of patterns to return
Returns:
List of relevant pattern entries
"""
# Filter to patterns category only
patterns = [k for k in oracle_knowledge if k.get('_category') == 'patterns']
scored_patterns = []
for pattern in patterns:
score = 0.0
# Priority scoring
priority = pattern.get('priority', 'medium')
if priority == 'critical':
score += 1.0
elif priority == 'high':
score += 0.7
elif priority == 'medium':
score += 0.4
# Tag matching
tags = pattern.get('tags', [])
if tags and file_patterns:
matches = sum(1 for fp in file_patterns
if any(re.search(r'\b' + re.escape(fp.lower()) + r'\b', tag.lower())
for tag in tags))
score += matches * 0.3
# Focus keyword matching
if focus_keywords:
content = f"{pattern.get('title', '')} {pattern.get('content', '')}".lower()
keyword_matches = sum(1 for keyword in focus_keywords
if re.search(r'\b' + re.escape(keyword.lower()) + r'\b', content))
score += keyword_matches * 0.5
scored_patterns.append((pattern, score))
# Sort by score descending
scored_patterns.sort(key=lambda x: x[1], reverse=True)
# Return top N
return [pattern for pattern, score in scored_patterns[:max_patterns]]
def find_relevant_gotchas(
oracle_knowledge: List[Dict[str, Any]],
file_patterns: List[str],
focus_keywords: Optional[List[str]] = None,
max_gotchas: int = 5
) -> List[Dict[str, Any]]:
"""Find relevant Oracle gotchas for the context."""
gotchas = [k for k in oracle_knowledge if k.get('_category') == 'gotchas']
scored_gotchas = []
for gotcha in gotchas:
score = 0.0
# Priority scoring (gotchas are critical by nature)
priority = gotcha.get('priority', 'high')
if priority == 'critical':
score += 1.0
elif priority == 'high':
score += 0.8
# Tag matching
tags = gotcha.get('tags', [])
if tags and file_patterns:
matches = sum(1 for fp in file_patterns
if any(re.search(r'\b' + re.escape(fp.lower()) + r'\b', tag.lower())
for tag in tags))
score += matches * 0.4
# Focus keyword matching
if focus_keywords:
content = f"{gotcha.get('title', '')} {gotcha.get('content', '')}".lower()
keyword_matches = sum(1 for keyword in focus_keywords
if re.search(r'\b' + re.escape(keyword.lower()) + r'\b', content))
score += keyword_matches * 0.6
scored_gotchas.append((gotcha, score))
scored_gotchas.sort(key=lambda x: x[1], reverse=True)
return [gotcha for gotcha, score in scored_gotchas[:max_gotchas]]
def find_recent_corrections(
oracle_knowledge: List[Dict[str, Any]],
file_patterns: List[str],
max_corrections: int = 3
) -> List[Dict[str, Any]]:
"""Find recent relevant corrections from Oracle.
Args:
oracle_knowledge: All Oracle knowledge
file_patterns: Patterns from file path
max_corrections: Maximum corrections to return
Returns:
List of recent relevant corrections
"""
corrections = [k for k in oracle_knowledge if k.get('_category') == 'corrections']
# Sort by creation date (most recent first)
sorted_corrections = sorted(
corrections,
key=lambda x: x.get('created', ''),
reverse=True
)
# Filter for relevance
relevant = []
for correction in sorted_corrections:
tags = correction.get('tags', [])
content = f"{correction.get('title', '')} {correction.get('content', '')}".lower()
# Check if relevant to current file patterns
is_relevant = False
if tags and file_patterns:
if any(fp.lower() in tag.lower() for fp in file_patterns for tag in tags):
is_relevant = True
if file_patterns:
if any(re.search(r'\b' + re.escape(fp.lower()) + r'\b', content) for fp in file_patterns):
is_relevant = True
if is_relevant:
relevant.append(correction)
if len(relevant) >= max_corrections:
break
return relevant
def build_minimal_context(
task_type: str,
file_path: Optional[str] = None,
focus: Optional[str] = None,
description: Optional[str] = None,
error_message: Optional[str] = None,
oracle_path: Optional[Path] = None
) -> Dict[str, Any]:
"""Build minimal context for subagent task.
Args:
task_type: Type of task (review, plan, debug)
file_path: Optional file path to review
focus: Optional focus keywords (e.g., "security performance")
description: Optional task description
error_message: Optional error message for debugging
oracle_path: Optional path to Oracle directory
Returns:
Minimal context dictionary
"""
context: Dict[str, Any] = {
'task': task_type,
'files': {},
'oracle_patterns': [],
'oracle_gotchas': [],
'recent_corrections': [],
'focus': ''
}
# Parse focus keywords
focus_keywords = focus.split() if focus else []
# Extract file patterns
file_patterns = extract_file_patterns(file_path) if file_path else []
# Load file content if provided (with size limit to avoid memory exhaustion)
MAX_FILE_SIZE = 1024 * 1024 # 1MB limit
if file_path:
try:
path = Path(file_path)
if path.exists() and path.is_file():
file_size = path.stat().st_size
if file_size > MAX_FILE_SIZE:
context['files'][file_path] = f"[File too large: {file_size} bytes. Showing first 1MB only]\n"
with open(path, 'r', encoding='utf-8') as f:
# Read only first 1MB
context['files'][file_path] += f.read(MAX_FILE_SIZE)
else:
with open(path, 'r', encoding='utf-8') as f:
context['files'][file_path] = f.read()
except (OSError, IOError, UnicodeDecodeError) as e:
context['files'][file_path] = "[Error: Could not read file]"
# Load Oracle knowledge if available
if oracle_path:
oracle_knowledge = load_oracle_knowledge(oracle_path)
# Find relevant patterns
context['oracle_patterns'] = find_relevant_patterns(
oracle_knowledge,
file_patterns,
focus_keywords,
max_patterns=5
)
# Find relevant gotchas
context['oracle_gotchas'] = find_relevant_gotchas(
oracle_knowledge,
file_patterns,
focus_keywords,
max_gotchas=5
)
# Find recent corrections
context['recent_corrections'] = find_recent_corrections(
oracle_knowledge,
file_patterns,
max_corrections=3
)
# Build focus description
if task_type == 'review':
if focus:
context['focus'] = f"Review {file_path or 'code'} for {focus} issues"
else:
context['focus'] = f"Review {file_path or 'code'} for potential issues"
elif task_type == 'plan':
context['focus'] = f"Break down this task into subtasks: {description or 'Complex task'}"
elif task_type == 'debug':
context['focus'] = f"Debug error in {file_path or 'code'}: {error_message or 'Unknown error'}"
if error_message:
context['error'] = error_message
return context
def format_context_for_agent(context: Dict[str, Any], format_type: str = 'text') -> str:
"""Format context for subagent consumption.
Args:
context: Minimal context dictionary
format_type: Output format (text or json)
Returns:
Formatted context string
"""
if format_type == 'json':
return json.dumps(context, indent=2)
# Text format
lines = []
lines.append(f"# Task: {context['task'].capitalize()}")
lines.append("")
lines.append(f"**Focus**: {context['focus']}")
lines.append("")
# Files
if context['files']:
lines.append("## Files to Review")
lines.append("")
for file_path, content in context['files'].items():
lines.append(f"### {file_path}")
lines.append("")
lines.append("```")
lines.append(content[:5000]) # Limit file size
if len(content) > 5000:
lines.append("... [truncated]")
lines.append("```")
lines.append("")
# Oracle patterns
if context['oracle_patterns']:
lines.append("## Relevant Patterns (from Oracle)")
lines.append("")
for pattern in context['oracle_patterns']:
title = pattern.get('title', 'Untitled')
content = pattern.get('content', '')
lines.append(f"- **{title}**")
if content:
lines.append(f" {content[:200]}")
lines.append("")
# Oracle gotchas
if context['oracle_gotchas']:
lines.append("## Gotchas to Watch For (from Oracle)")
lines.append("")
for gotcha in context['oracle_gotchas']:
title = gotcha.get('title', 'Untitled')
content = gotcha.get('content', '')
priority = gotcha.get('priority', 'medium')
if priority == 'critical':
lines.append(f"- **[CRITICAL]** {title}")
else:
lines.append(f"- {title}")
if content:
lines.append(f" {content[:200]}")
lines.append("")
# Recent corrections
if context['recent_corrections']:
lines.append("## Recent Corrections (from Oracle)")
lines.append("")
for correction in context['recent_corrections']:
content = correction.get('content', '')
title = correction.get('title', 'Correction')
# Try to extract the "Right:" part
if 'Right:' in content:
try:
right_part = content.split('Right:', 1)[1].split('\n', 1)[0].strip()
if right_part:
lines.append(f"- {right_part}")
else:
lines.append(f"- {title}")
except (IndexError, ValueError, AttributeError):
lines.append(f"- {title}")
else:
lines.append(f"- {title}")
lines.append("")
return "\n".join(lines)
def main():
parser = argparse.ArgumentParser(
description='Extract minimal context for Guardian subagent tasks',
formatter_class=argparse.RawDescriptionHelpFormatter
)
parser.add_argument(
'--task',
required=True,
choices=['review', 'plan', 'debug'],
help='Type of task'
)
parser.add_argument(
'--file',
help='File path to review/debug'
)
parser.add_argument(
'--focus',
help='Focus keywords (e.g., "security performance")'
)
parser.add_argument(
'--description',
help='Task description (for planning tasks)'
)
parser.add_argument(
'--error',
help='Error message (for debugging tasks)'
)
parser.add_argument(
'--format',
choices=['text', 'json'],
default='text',
help='Output format'
)
parser.add_argument(
'--no-oracle',
action='store_true',
help='Skip Oracle knowledge loading'
)
args = parser.parse_args()
# Validate arguments based on task type
if args.task == 'review' and not args.file:
print("Error: --file required for review tasks", file=sys.stderr)
sys.exit(1)
if args.task == 'plan' and not args.description:
print("Error: --description required for planning tasks", file=sys.stderr)
sys.exit(1)
if args.task == 'debug' and not args.file:
print("Error: --file required for debug tasks", file=sys.stderr)
sys.exit(1)
# Find Oracle
oracle_path = None
if not args.no_oracle:
oracle_path = find_oracle_root()
# Build minimal context
context = build_minimal_context(
task_type=args.task,
file_path=args.file,
focus=args.focus,
description=args.description,
error_message=args.error,
oracle_path=oracle_path
)
# Format and output
output = format_context_for_agent(context, args.format)
print(output)
if __name__ == '__main__':
main()

View File

@@ -0,0 +1,524 @@
#!/usr/bin/env python3
"""
Guardian - Main Orchestrator
Coordinates Guardian components to provide automatic quality gates and session health monitoring.
This is the main entry point for Guardian operations. It:
1. Checks session health metrics
2. Determines if intervention is needed
3. Extracts minimal context
4. Spawns Haiku subagent for focused review/planning
5. Validates suggestions against Oracle
6. Presents results to user with confidence scores
7. Learns from user feedback
Usage:
# Manual code review
python guardian.py review --file auth.py --focus security
# Check if Guardian should trigger
python guardian.py check
# Plan a complex task
python guardian.py plan --task "Build REST API with auth and rate limiting"
# Debug an error
python guardian.py debug --file app.py --error "TypeError: cannot unpack"
# Get session health status
python guardian.py status
Environment Variables:
GUARDIAN_MODEL: Model to use for subagents [default: haiku]
"""
import os
import sys
import json
import argparse
from pathlib import Path
from typing import Dict, List, Optional, Any
import subprocess
def find_scripts_dir() -> Path:
"""Find the Guardian Scripts directory."""
return Path(__file__).parent
def run_script(script_name: str, args: List[str]) -> Dict[str, Any]:
"""Run a Guardian script and return JSON output.
Args:
script_name: Name of the script (without .py extension)
args: List of command-line arguments
Returns:
Parsed JSON output from the script
"""
scripts_dir = find_scripts_dir()
script_path = scripts_dir / f"{script_name}.py"
if not script_path.exists():
raise FileNotFoundError(f"Script not found: {script_path}")
try:
result = subprocess.run(
            [sys.executable, str(script_path)] + args,  # use the same interpreter that is running Guardian
capture_output=True,
text=True,
timeout=30
)
if result.returncode != 0:
# Try to parse error as JSON
try:
return json.loads(result.stdout)
except json.JSONDecodeError:
return {'error': result.stderr or result.stdout}
# Parse output as JSON
try:
return json.loads(result.stdout)
except json.JSONDecodeError:
return {'output': result.stdout}
except subprocess.TimeoutExpired:
return {'error': 'Script timeout'}
except Exception as e:
return {'error': str(e)}
def check_triggers() -> Dict[str, Any]:
"""Check if any Guardian triggers have fired."""
return run_script('monitor_session', ['--check-triggers'])
def get_session_health() -> Dict[str, Any]:
"""Get current session health metrics."""
return run_script('monitor_session', ['--check-health'])
def extract_context(
task_type: str,
file_path: Optional[str] = None,
focus: Optional[str] = None,
description: Optional[str] = None,
error_message: Optional[str] = None
) -> str:
"""Extract minimal context for subagent task."""
args = ['--task', task_type, '--format', 'text']
if file_path:
args.extend(['--file', file_path])
if focus:
args.extend(['--focus', focus])
if description:
args.extend(['--description', description])
if error_message:
args.extend(['--error', error_message])
result = run_script('context_filter', args)
if 'output' in result:
return result['output']
elif 'error' in result:
return f"Error extracting context: {result['error']}"
else:
return str(result)
def validate_suggestions(suggestions: List[Dict[str, str]]) -> List[Dict[str, Any]]:
"""Validate suggestions against Oracle knowledge.
Args:
suggestions: List of suggestion dictionaries with 'text' and 'category' keys
Returns:
List of validated suggestions with confidence scores
"""
# Create temporary file with suggestions
import tempfile
suggestions_file = None
try:
with tempfile.NamedTemporaryFile(mode='w', suffix='.json', delete=False) as f:
json.dump(suggestions, f)
suggestions_file = f.name
result = run_script('validator', ['--suggestions-file', suggestions_file])
return result if isinstance(result, list) else []
finally:
# Clean up temp file
if suggestions_file:
try:
os.unlink(suggestions_file)
except OSError:
pass
def format_suggestions_for_user(validated_suggestions: List[Dict[str, Any]]) -> str:
"""Format validated suggestions for user presentation.
Args:
validated_suggestions: List of validated suggestions
Returns:
Formatted string for user
"""
lines = []
# Filter to presentable suggestions
presentable = [s for s in validated_suggestions if s.get('should_present', True)]
if not presentable:
return "Guardian: No suggestions to present (all filtered by validation)"
lines.append(f"Guardian Review Found {len(presentable)} Suggestions:")
lines.append("")
for i, suggestion in enumerate(presentable, 1):
confidence = suggestion.get('confidence', 0.0)
text = suggestion.get('suggestion', '')
category = suggestion.get('category', 'general')
warnings = suggestion.get('warnings', [])
notes = suggestion.get('notes', [])
# Format confidence indicator
if confidence >= 0.7:
conf_indicator = f"[{confidence:.2f}]"
elif confidence >= 0.5:
conf_indicator = f"?[{confidence:.2f}]"
else:
conf_indicator = f"![{confidence:.2f}]"
lines.append(f"{i}. {conf_indicator} {text}")
lines.append(f" Category: {category}")
# Add warnings
for warning in warnings:
severity = warning.get('severity', 'low')
message = warning.get('message', '')
if severity == 'high':
lines.append(f" WARNING: {message}")
else:
lines.append(f" Note: {message}")
# Add notes
for note in notes:
lines.append(f" {note}")
lines.append("")
# Add command options
lines.append("Options:")
lines.append(" a - Accept all high-confidence suggestions (>=0.7)")
lines.append(" 1,3,5 - Accept specific suggestions by number")
lines.append(" r - Reject all with reason")
lines.append(" i <reason> - Reject and add to anti-patterns")
lines.append(" d <num> - Discuss specific suggestion")
lines.append(" q - Dismiss review")
return "\n".join(lines)
def perform_review(
file_path: str,
focus: Optional[str] = None
) -> None:
"""Perform code review using Guardian.
Args:
file_path: Path to file to review
focus: Optional focus keywords (e.g., "security performance")
"""
print(f"Guardian: Reviewing {file_path}...")
print()
# Extract minimal context
context_text = extract_context('review', file_path=file_path, focus=focus)
# Build read-only prompt for Haiku agent
agent_prompt = f"""You are a READ-ONLY code reviewer for Guardian. You can ONLY analyze and suggest.
CRITICAL CONSTRAINTS:
- DO NOT use Write, Edit, NotebookEdit, or Bash tools
- DO NOT modify any files
- DO NOT execute any code
- ONLY read the provided context and return suggestions
Your task: Review the code for potential issues and return suggestions.
{context_text}
Return your findings as a JSON array of suggestions with this format:
[
{{
"text": "Clear description of the issue and recommended fix",
"category": "security|performance|style|bugs|maintainability",
"file": "file path (if applicable)",
"line": line_number (if applicable, otherwise null)
}}
]
If you find no issues, return an empty array: []
Remember: You are READ-ONLY. Only analyze and suggest, never modify."""
print("Spawning Haiku review agent with minimal context...")
print()
# Note: This would be implemented when Guardian is used as a skill
# For standalone script usage, we output instructions instead
print("=" * 60)
print("READY TO SPAWN HAIKU AGENT")
print("=" * 60)
print("To complete this review, use the Task tool with:")
print(f" subagent_type: general-purpose")
print(f" model: haiku")
print(f" prompt: <see agent_prompt below>")
print()
print("Agent Prompt:")
print("-" * 60)
print(agent_prompt)
print("=" * 60)
print()
print("Once you have the agent's response, the suggestions will be:")
print(" 1. Validated against Oracle knowledge")
print(" 2. Presented with confidence scores")
print(" 3. Offered for user acceptance/rejection")
print()
print("Example suggestion handling:")
mock_suggestions = [
{
'text': 'Consider using bcrypt for password hashing instead of MD5',
'category': 'security',
'file': file_path,
'line': None
}
]
validated = validate_suggestions(mock_suggestions)
presentation = format_suggestions_for_user(validated)
print(presentation)
def perform_planning(task_description: str) -> None:
"""Break down a complex task using Guardian planning.
Args:
task_description: Description of the task to break down
"""
print("Guardian: Breaking down complex task...")
print()
# Extract minimal context
context_text = extract_context('plan', description=task_description)
# Build read-only prompt for Haiku planner
agent_prompt = f"""You are a READ-ONLY task planner for Guardian. You can ONLY analyze and plan.
CRITICAL CONSTRAINTS:
- DO NOT use Write, Edit, NotebookEdit, or Bash tools
- DO NOT modify any files
- DO NOT execute any code
- ONLY analyze the task and return a breakdown plan
Your task: Break down this complex task into manageable subtasks.
{context_text}
Return your plan as a JSON array of subtasks with this format:
[
{{
"task": "Clear description of the subtask",
"estimated_lines": approximate lines of code needed,
"dependencies": ["list", "of", "prerequisite", "subtask", "numbers"],
"files_affected": ["list of files that will be created/modified"],
"priority": "high|medium|low"
}}
]
Consider:
- Dependencies between tasks
- Logical ordering
- Potential complexity and risks
- Integration points
Remember: You are READ-ONLY. Only analyze and plan, never modify."""
print("=" * 60)
print("READY TO SPAWN HAIKU PLANNER")
print("=" * 60)
print("To complete this planning task, use the Task tool with:")
print(f" subagent_type: Plan")
print(f" model: haiku")
print(f" prompt: <see agent_prompt below>")
print()
print("Agent Prompt:")
print("-" * 60)
print(agent_prompt)
print("=" * 60)
def perform_debug(file_path: str, error_message: str) -> None:
"""Debug an error using Guardian.
Args:
file_path: Path to file with error
error_message: Error message to debug
"""
print(f"Guardian: Debugging error in {file_path}...")
print()
# Extract minimal context
context_text = extract_context('debug', file_path=file_path, error_message=error_message)
# Build read-only prompt for Haiku debugger
agent_prompt = f"""You are a READ-ONLY error debugger for Guardian. You can ONLY analyze and suggest fixes.
CRITICAL CONSTRAINTS:
- DO NOT use Write, Edit, NotebookEdit, or Bash tools
- DO NOT modify any files
- DO NOT execute any code
- ONLY analyze the error and return debugging suggestions
Your task: Analyze this error and suggest potential fixes.
{context_text}
Return your analysis as a JSON object with this format:
{{
"root_cause": "Most likely cause of the error",
"affected_code": {{
"file": "file path",
"line": line_number (if known)
}},
"suggestions": [
{{
"text": "Clear description of the fix",
"category": "bug",
"confidence": 0.0 to 1.0
}}
],
"similar_patterns": ["Any similar error patterns from Oracle knowledge"]
}}
Consider:
- What the error message indicates
- Common causes of this error type
- Relevant Oracle patterns or gotchas
- Edge cases that might trigger this
Remember: You are READ-ONLY. Only analyze and suggest, never modify."""
print("=" * 60)
print("READY TO SPAWN HAIKU DEBUGGER")
print("=" * 60)
print("To complete this debug task, use the Task tool with:")
print(f" subagent_type: general-purpose")
print(f" model: haiku")
print(f" prompt: <see agent_prompt below>")
print()
print("Agent Prompt:")
print("-" * 60)
print(agent_prompt)
print("=" * 60)
def check_if_should_trigger() -> bool:
"""Check if Guardian should automatically trigger.
Returns:
True if Guardian should trigger, False otherwise
"""
triggers = check_triggers()
if isinstance(triggers, list) and len(triggers) > 0:
print("Guardian: Detected triggers:")
print(json.dumps(triggers, indent=2))
return True
return False
def show_status() -> None:
"""Show current session health status."""
health = get_session_health()
print("Guardian Session Health Status:")
print("=" * 60)
print(json.dumps(health, indent=2))
print("=" * 60)
# Check triggers
triggers = check_triggers()
if isinstance(triggers, list) and len(triggers) > 0:
print()
print("Active Triggers:")
for trigger in triggers:
trigger_type = trigger.get('trigger', 'unknown')
priority = trigger.get('priority', 'medium')
print(f" - [{priority.upper()}] {trigger_type}")
for key, value in trigger.items():
if key not in ['trigger', 'priority']:
print(f" {key}: {value}")
def main():
parser = argparse.ArgumentParser(
description='Guardian - Quality gate and session health monitor',
formatter_class=argparse.RawDescriptionHelpFormatter
)
subparsers = parser.add_subparsers(dest='command', help='Guardian commands')
# Review command
review_parser = subparsers.add_parser('review', help='Review code for issues')
review_parser.add_argument('--file', required=True, help='File to review')
review_parser.add_argument('--focus', help='Focus keywords (e.g., "security performance")')
# Plan command
plan_parser = subparsers.add_parser('plan', help='Break down complex task')
plan_parser.add_argument('--task', required=True, help='Task description')
# Debug command
debug_parser = subparsers.add_parser('debug', help='Debug an error')
debug_parser.add_argument('--file', required=True, help='File with error')
debug_parser.add_argument('--error', required=True, help='Error message')
# Check command
subparsers.add_parser('check', help='Check if Guardian should trigger')
# Status command
subparsers.add_parser('status', help='Show session health status')
args = parser.parse_args()
if not args.command:
parser.print_help()
sys.exit(1)
# Execute command
if args.command == 'review':
perform_review(args.file, args.focus)
elif args.command == 'plan':
perform_planning(args.task)
elif args.command == 'debug':
perform_debug(args.file, args.error)
elif args.command == 'check':
if check_if_should_trigger():
sys.exit(0) # Should trigger
else:
print("Guardian: No triggers detected")
sys.exit(1) # Should not trigger
elif args.command == 'status':
show_status()
if __name__ == '__main__':
main()

View File

@@ -0,0 +1,512 @@
#!/usr/bin/env python3
"""
Guardian Learning System
Adjusts Guardian sensitivity and thresholds based on user feedback.
Key Principle: "Learn from feedback to adjust sensitivity"
This script:
1. Tracks acceptance/rejection rates per category
2. Adjusts thresholds dynamically to maintain target acceptance rate
3. Learns which file types need more/less review
4. Stores anti-patterns based on rejection reasons
5. Adapts to user's working style over time
Usage:
# Adjust thresholds based on recent feedback
python learning.py --adjust
# Get current threshold recommendations
python learning.py --recommend
# Manually set acceptance rate target
python learning.py --set-target 0.75
# View learning statistics
python learning.py --stats
Environment Variables:
GUARDIAN_CONFIG_PATH: Path to Guardian config file [default: .guardian/config.json]
"""
import os
import sys
import json
import argparse
from pathlib import Path
from typing import Dict, List, Optional, Any
from datetime import datetime, timedelta
def find_guardian_root() -> Optional[Path]:
"""Find the .guardian directory."""
current = Path.cwd()
while current != current.parent:
guardian_path = current / '.guardian'
if guardian_path.exists():
return guardian_path
current = current.parent
return None
def load_config(guardian_path: Path) -> Dict[str, Any]:
"""Load Guardian configuration."""
config_path = guardian_path / 'config.json'
if not config_path.exists():
return {}
try:
with open(config_path, 'r', encoding='utf-8') as f:
return json.load(f)
except (json.JSONDecodeError, FileNotFoundError, OSError, IOError):
return {}
def save_config(guardian_path: Path, config: Dict[str, Any]) -> None:
"""Save Guardian configuration."""
config_path = guardian_path / 'config.json'
try:
with open(config_path, 'w', encoding='utf-8') as f:
json.dump(config, f, indent=2)
except (OSError, IOError) as e:
print(f"Error: Failed to save config: {e}", file=sys.stderr)
sys.exit(1)
def load_acceptance_stats(guardian_path: Path) -> Dict[str, Any]:
"""Load acceptance rate statistics."""
stats_file = guardian_path / 'acceptance_stats.json'
if not stats_file.exists():
return {
'by_category': {},
'by_type': {},
'overall': {
'accepted': 0,
'rejected': 0,
'rate': 0.0
}
}
try:
with open(stats_file, 'r', encoding='utf-8') as f:
return json.load(f)
except (json.JSONDecodeError, FileNotFoundError, OSError, IOError):
return {'by_category': {}, 'by_type': {}, 'overall': {'accepted': 0, 'rejected': 0, 'rate': 0.0}}
def load_rejection_history(guardian_path: Path, days: int = 30) -> List[Dict[str, Any]]:
"""Load recent rejection history.
Args:
guardian_path: Path to Guardian directory
days: Number of days to look back
Returns:
List of recent rejections
"""
history_file = guardian_path / 'rejection_history.json'
if not history_file.exists():
return []
try:
with open(history_file, 'r', encoding='utf-8') as f:
all_history = json.load(f)
except (json.JSONDecodeError, FileNotFoundError, OSError, IOError):
return []
# Filter to recent rejections
cutoff = datetime.now() - timedelta(days=days)
recent = []
for rejection in all_history:
try:
ts = datetime.fromisoformat(rejection.get('timestamp', ''))
# Handle timezone-aware timestamps
if ts.tzinfo and not cutoff.tzinfo:
cutoff = cutoff.replace(tzinfo=ts.tzinfo)
if ts >= cutoff:
recent.append(rejection)
except (ValueError, TypeError):
continue
return recent
def calculate_threshold_adjustments(
config: Dict[str, Any],
acceptance_stats: Dict[str, Any],
target_rate: float = 0.7,
adjustment_speed: float = 0.1
) -> Dict[str, int]:
"""Calculate new threshold values based on acceptance rates.
Args:
config: Current configuration
acceptance_stats: Acceptance statistics
target_rate: Target acceptance rate (0.0 to 1.0)
adjustment_speed: How fast to adjust (0.0 to 1.0)
Returns:
Dictionary of adjusted threshold values
"""
current_sensitivity = config.get('sensitivity', {})
overall_rate = acceptance_stats.get('overall', {}).get('rate', 0.5)
adjustments = {}
# Lines threshold adjustment
current_lines = current_sensitivity.get('lines_threshold', 50)
if overall_rate < target_rate - 0.1:
# Too many false positives - increase threshold (trigger less often)
adjustment = int(current_lines * adjustment_speed)
adjustments['lines_threshold'] = current_lines + max(5, adjustment)
elif overall_rate > target_rate + 0.1:
# Too many missed issues - decrease threshold (trigger more often)
adjustment = int(current_lines * adjustment_speed)
adjustments['lines_threshold'] = max(20, current_lines - max(5, adjustment))
else:
# Well-calibrated
adjustments['lines_threshold'] = current_lines
# Error repeat threshold adjustment
error_stats = acceptance_stats.get('by_category', {}).get('error_analysis', {})
error_rate = error_stats.get('rate', 0.5)
current_error_threshold = current_sensitivity.get('error_repeat_threshold', 3)
if error_rate < target_rate - 0.1:
adjustments['error_repeat_threshold'] = min(10, current_error_threshold + 1)
elif error_rate > target_rate + 0.1:
adjustments['error_repeat_threshold'] = max(2, current_error_threshold - 1)
else:
adjustments['error_repeat_threshold'] = current_error_threshold
# File churn threshold adjustment
churn_stats = acceptance_stats.get('by_category', {}).get('file_churn', {})
churn_rate = churn_stats.get('rate', 0.5)
current_churn_threshold = current_sensitivity.get('file_churn_threshold', 5)
if churn_rate < target_rate - 0.1:
adjustments['file_churn_threshold'] = min(15, current_churn_threshold + 1)
elif churn_rate > target_rate + 0.1:
adjustments['file_churn_threshold'] = max(3, current_churn_threshold - 1)
else:
adjustments['file_churn_threshold'] = current_churn_threshold
# Correction threshold adjustment
correction_stats = acceptance_stats.get('by_category', {}).get('corrections', {})
correction_rate = correction_stats.get('rate', 0.5)
current_correction_threshold = current_sensitivity.get('correction_threshold', 3)
if correction_rate < target_rate - 0.1:
adjustments['correction_threshold'] = min(10, current_correction_threshold + 1)
elif correction_rate > target_rate + 0.1:
adjustments['correction_threshold'] = max(2, current_correction_threshold - 1)
else:
adjustments['correction_threshold'] = current_correction_threshold
return adjustments
def learn_from_rejections(
rejection_history: List[Dict[str, Any]]
) -> Dict[str, Any]:
"""Analyze rejection patterns to learn anti-patterns.
Args:
rejection_history: List of rejections
Returns:
Dictionary of learned anti-patterns and insights
"""
insights = {
'common_rejection_reasons': {},
'frequently_rejected_categories': {},
'anti_patterns': []
}
# Count rejection reasons
for rejection in rejection_history:
reason = rejection.get('reason', 'Unknown')
category = rejection.get('category', 'general')
# Count by reason
if reason not in insights['common_rejection_reasons']:
insights['common_rejection_reasons'][reason] = 0
insights['common_rejection_reasons'][reason] += 1
# Count by category
if category not in insights['frequently_rejected_categories']:
insights['frequently_rejected_categories'][category] = 0
insights['frequently_rejected_categories'][category] += 1
# Identify anti-patterns (highly rejected categories)
for category, count in insights['frequently_rejected_categories'].items():
if count >= 5: # Rejected 5+ times
rejection_rate = count / len(rejection_history) if len(rejection_history) > 0 else 0
if rejection_rate > 0.8: # 80%+ rejection rate
insights['anti_patterns'].append({
'category': category,
'rejection_count': count,
'rejection_rate': rejection_rate,
'recommendation': f"Stop suggesting {category} - rejection rate {rejection_rate:.0%}"
})
return insights
def update_auto_review_rules(
config: Dict[str, Any],
acceptance_stats: Dict[str, Any],
insights: Dict[str, Any]
) -> Dict[str, Any]:
"""Update auto-review rules based on learning.
Args:
config: Current configuration
acceptance_stats: Acceptance statistics
insights: Learned insights from rejections
Returns:
Updated auto_review configuration
"""
auto_review = config.get('auto_review', {
'enabled': True,
'always_review': ['auth', 'security', 'crypto', 'payment'],
'never_review': ['test', 'mock', 'fixture']
})
# Add anti-patterns to never_review
for anti_pattern in insights.get('anti_patterns', []):
category = anti_pattern['category']
if category not in auto_review['never_review']:
auto_review['never_review'].append(category)
# Check which categories have high acceptance rates
by_category = acceptance_stats.get('by_category', {})
for category, stats in by_category.items():
rate = stats.get('rate', 0.0)
total = stats.get('accepted', 0) + stats.get('rejected', 0)
# If category has >90% acceptance and >10 samples, add to always_review
if rate > 0.9 and total > 10:
if category not in auto_review['always_review']:
auto_review['always_review'].append(category)
# If category has <20% acceptance and >10 samples, add to never_review
if rate < 0.2 and total > 10:
if category not in auto_review['never_review']:
auto_review['never_review'].append(category)
return auto_review
def apply_adjustments(
guardian_path: Path,
dry_run: bool = False
) -> Dict[str, Any]:
"""Apply learned adjustments to configuration.
Args:
guardian_path: Path to Guardian directory
dry_run: If True, don't save changes (just return recommendations)
Returns:
Dictionary with adjustment details
"""
config = load_config(guardian_path)
acceptance_stats = load_acceptance_stats(guardian_path)
rejection_history = load_rejection_history(guardian_path)
# Get learning parameters
learning_config = config.get('learning', {})
target_rate = learning_config.get('acceptance_rate_target', 0.7)
adjustment_speed = learning_config.get('adjustment_speed', 0.1)
# Calculate threshold adjustments
threshold_adjustments = calculate_threshold_adjustments(
config,
acceptance_stats,
target_rate,
adjustment_speed
)
# Learn from rejections
insights = learn_from_rejections(rejection_history)
# Update auto-review rules
updated_auto_review = update_auto_review_rules(config, acceptance_stats, insights)
# Prepare result
result = {
'current_acceptance_rate': acceptance_stats.get('overall', {}).get('rate', 0.0),
'target_acceptance_rate': target_rate,
'threshold_adjustments': threshold_adjustments,
'auto_review_updates': updated_auto_review,
'insights': insights,
'applied': not dry_run
}
# Apply changes if not dry run
if not dry_run:
# Update sensitivity thresholds
if 'sensitivity' not in config:
config['sensitivity'] = {}
config['sensitivity'].update(threshold_adjustments)
# Update auto_review
config['auto_review'] = updated_auto_review
# Save updated config
save_config(guardian_path, config)
return result
def get_statistics(guardian_path: Path) -> Dict[str, Any]:
"""Get learning statistics.
Args:
guardian_path: Path to Guardian directory
Returns:
Statistics dictionary
"""
config = load_config(guardian_path)
acceptance_stats = load_acceptance_stats(guardian_path)
rejection_history = load_rejection_history(guardian_path, days=30)
overall = acceptance_stats.get('overall', {})
by_category = acceptance_stats.get('by_category', {})
stats = {
'overall': {
'accepted': overall.get('accepted', 0),
'rejected': overall.get('rejected', 0),
'acceptance_rate': overall.get('rate', 0.0)
},
'by_category': {},
'current_thresholds': config.get('sensitivity', {}),
'target_acceptance_rate': config.get('learning', {}).get('acceptance_rate_target', 0.7),
'recent_rejections_30d': len(rejection_history),
'auto_review_rules': config.get('auto_review', {})
}
# Add category breakdown
for category, cat_stats in by_category.items():
stats['by_category'][category] = {
'accepted': cat_stats.get('accepted', 0),
'rejected': cat_stats.get('rejected', 0),
'rate': cat_stats.get('rate', 0.0)
}
return stats
def main():
parser = argparse.ArgumentParser(
description='Guardian learning system - adjust thresholds based on feedback',
formatter_class=argparse.RawDescriptionHelpFormatter
)
parser.add_argument(
'--adjust',
action='store_true',
help='Apply threshold adjustments based on recent feedback'
)
parser.add_argument(
'--recommend',
action='store_true',
help='Show recommended adjustments without applying (dry run)'
)
parser.add_argument(
'--stats',
action='store_true',
help='Show learning statistics'
)
parser.add_argument(
'--set-target',
type=float,
help='Set target acceptance rate (0.0 to 1.0)'
)
parser.add_argument(
'--set-speed',
type=float,
help='Set adjustment speed (0.0 to 1.0)'
)
args = parser.parse_args()
# Find Guardian
guardian_path = find_guardian_root()
if not guardian_path:
print("Error: Guardian not initialized (.guardian directory not found)", file=sys.stderr)
sys.exit(1)
# Handle set target
if args.set_target is not None:
if not 0.0 <= args.set_target <= 1.0:
print("Error: Target acceptance rate must be between 0.0 and 1.0", file=sys.stderr)
sys.exit(1)
config = load_config(guardian_path)
if 'learning' not in config:
config['learning'] = {}
config['learning']['acceptance_rate_target'] = args.set_target
save_config(guardian_path, config)
print(json.dumps({'target_set': args.set_target}))
sys.exit(0)
# Handle set speed
if args.set_speed is not None:
if not 0.0 <= args.set_speed <= 1.0:
print("Error: Adjustment speed must be between 0.0 and 1.0", file=sys.stderr)
sys.exit(1)
config = load_config(guardian_path)
if 'learning' not in config:
config['learning'] = {}
config['learning']['adjustment_speed'] = args.set_speed
save_config(guardian_path, config)
print(json.dumps({'speed_set': args.set_speed}))
sys.exit(0)
# Handle stats
if args.stats:
stats = get_statistics(guardian_path)
print(json.dumps(stats, indent=2))
sys.exit(0)
# Handle recommend (dry run)
if args.recommend:
result = apply_adjustments(guardian_path, dry_run=True)
print(json.dumps(result, indent=2))
sys.exit(0)
# Handle adjust
if args.adjust:
result = apply_adjustments(guardian_path, dry_run=False)
print(json.dumps(result, indent=2))
sys.exit(0)
parser.print_help()
sys.exit(1)
if __name__ == '__main__':
main()

View File

@@ -0,0 +1,610 @@
#!/usr/bin/env python3
"""
Guardian Session Monitor
Tracks session health metrics and detects when intervention would be helpful.
This script monitors in the background and triggers Guardian when thresholds are crossed.
Key Principle: MINIMAL STORAGE - only tracks metrics, NOT full conversation content.
Usage:
# Track a code write event
python monitor_session.py --event code-written --file auth.py --lines 60
# Track an error
python monitor_session.py --event error --message "TypeError: cannot unpack non-iterable"
# Track a user correction
python monitor_session.py --event correction --message "that's wrong, use bcrypt instead"
# Check session health
python monitor_session.py --check-health
# Reset session metrics
python monitor_session.py --reset
Environment Variables:
GUARDIAN_CONFIG_PATH: Path to Guardian config file [default: .guardian/config.json]
"""
import os
import sys
import json
import argparse
from datetime import datetime, timedelta
from pathlib import Path
from typing import Dict, List, Optional, Any
from collections import defaultdict
def parse_timestamp_with_tz(ts_str: str, reference_time: datetime) -> Optional[datetime]:
"""Parse ISO timestamp and make it comparable with reference_time.
Args:
ts_str: ISO format timestamp string
reference_time: Reference datetime to match timezone with
Returns:
Parsed datetime that's comparable with reference_time, or None if parsing fails
"""
try:
ts = datetime.fromisoformat(ts_str)
# If timestamp has timezone but reference doesn't, make timestamp naive
if ts.tzinfo and not reference_time.tzinfo:
ts = ts.replace(tzinfo=None)
# If reference has timezone but timestamp doesn't, make timestamp aware
elif not ts.tzinfo and reference_time.tzinfo:
ts = ts.replace(tzinfo=reference_time.tzinfo)
return ts
except (ValueError, TypeError):
return None
def find_guardian_root() -> Optional[Path]:
"""Find the .guardian directory."""
current = Path.cwd()
while current != current.parent:
guardian_path = current / '.guardian'
if guardian_path.exists():
return guardian_path
current = current.parent
return None
def init_guardian_if_needed() -> Path:
"""Initialize .guardian directory if it doesn't exist."""
guardian_path = Path.cwd() / '.guardian'
if not guardian_path.exists():
guardian_path.mkdir(parents=True, exist_ok=True)
# Create default config
default_config = {
"enabled": True,
"sensitivity": {
"lines_threshold": 50,
"error_repeat_threshold": 3,
"file_churn_threshold": 5,
"correction_threshold": 3,
"context_warning_percent": 0.7
},
"trigger_phrases": {
"review_needed": ["can you review", "does this look right"],
"struggling": ["still not working", "same error"],
"complexity": ["this is complex", "not sure how to"]
},
"auto_review": {
"enabled": True,
"always_review": ["auth", "security", "crypto", "payment"],
"never_review": ["test", "mock", "fixture"]
},
"learning": {
"acceptance_rate_target": 0.7,
"adjustment_speed": 0.1,
"memory_window_days": 30
},
"model": "haiku"
}
config_path = guardian_path / 'config.json'
with open(config_path, 'w', encoding='utf-8') as f:
json.dump(default_config, f, indent=2)
# Create session state file
state_path = guardian_path / 'session_state.json'
with open(state_path, 'w', encoding='utf-8') as f:
json.dump({
"session_start": datetime.now().isoformat(),
"metrics": {},
"history": []
}, f, indent=2)
return guardian_path
def load_config(guardian_path: Path) -> Dict[str, Any]:
"""Load Guardian configuration."""
config_path = guardian_path / 'config.json'
if not config_path.exists():
# Return default config
return {
"enabled": True,
"sensitivity": {
"lines_threshold": 50,
"error_repeat_threshold": 3,
"file_churn_threshold": 5,
"correction_threshold": 3,
"context_warning_percent": 0.7
}
}
try:
with open(config_path, 'r', encoding='utf-8') as f:
return json.load(f)
except (json.JSONDecodeError, FileNotFoundError, OSError, IOError):
return {"enabled": False}
def load_session_state(guardian_path: Path) -> Dict[str, Any]:
"""Load current session state."""
state_path = guardian_path / 'session_state.json'
if not state_path.exists():
return {
"session_start": datetime.now().isoformat(),
"metrics": {},
"history": []
}
try:
with open(state_path, 'r', encoding='utf-8') as f:
return json.load(f)
except (json.JSONDecodeError, FileNotFoundError, OSError, IOError):
return {
"session_start": datetime.now().isoformat(),
"metrics": {},
"history": []
}
def save_session_state(guardian_path: Path, state: Dict[str, Any]) -> None:
"""Save session state."""
state_path = guardian_path / 'session_state.json'
try:
with open(state_path, 'w', encoding='utf-8') as f:
json.dump(state, f, indent=2)
except (OSError, IOError) as e:
print(f"Warning: Failed to save session state: {e}", file=sys.stderr)
def track_code_written(state: Dict[str, Any], file_path: str, lines: int) -> None:
"""Track code writing event."""
MAX_EVENTS = 100 # Limit to prevent unbounded memory growth
if 'code_written' not in state['metrics']:
state['metrics']['code_written'] = {}
if file_path not in state['metrics']['code_written']:
state['metrics']['code_written'][file_path] = {
'total_lines': 0,
'events': []
}
state['metrics']['code_written'][file_path]['total_lines'] += lines
state['metrics']['code_written'][file_path]['events'].append({
'timestamp': datetime.now().isoformat(),
'lines': lines
})
# Keep only recent events
if len(state['metrics']['code_written'][file_path]['events']) > MAX_EVENTS:
state['metrics']['code_written'][file_path]['events'] = \
state['metrics']['code_written'][file_path]['events'][-MAX_EVENTS:]
def track_error(state: Dict[str, Any], error_message: str) -> None:
"""Track error occurrence."""
MAX_OCCURRENCES = 50 # Limit to prevent unbounded memory growth
if 'errors' not in state['metrics']:
state['metrics']['errors'] = {}
# Normalize error message (first line only)
error_key = error_message.split('\n')[0].strip()[:200]
if error_key not in state['metrics']['errors']:
state['metrics']['errors'][error_key] = {
'count': 0,
'first_seen': datetime.now().isoformat(),
'last_seen': None,
'occurrences': []
}
state['metrics']['errors'][error_key]['count'] += 1
state['metrics']['errors'][error_key]['last_seen'] = datetime.now().isoformat()
state['metrics']['errors'][error_key]['occurrences'].append({
'timestamp': datetime.now().isoformat()
})
# Keep only recent occurrences
if len(state['metrics']['errors'][error_key]['occurrences']) > MAX_OCCURRENCES:
state['metrics']['errors'][error_key]['occurrences'] = \
state['metrics']['errors'][error_key]['occurrences'][-MAX_OCCURRENCES:]
def track_file_edit(state: Dict[str, Any], file_path: str) -> None:
"""Track file edit event."""
MAX_TIMESTAMPS = 100 # Limit to prevent unbounded memory growth
if 'file_edits' not in state['metrics']:
state['metrics']['file_edits'] = {}
if file_path not in state['metrics']['file_edits']:
state['metrics']['file_edits'][file_path] = {
'count': 0,
'timestamps': []
}
state['metrics']['file_edits'][file_path]['count'] += 1
state['metrics']['file_edits'][file_path]['timestamps'].append(
datetime.now().isoformat()
)
# Keep only recent timestamps
if len(state['metrics']['file_edits'][file_path]['timestamps']) > MAX_TIMESTAMPS:
state['metrics']['file_edits'][file_path]['timestamps'] = \
state['metrics']['file_edits'][file_path]['timestamps'][-MAX_TIMESTAMPS:]
def track_correction(state: Dict[str, Any], message: str) -> None:
"""Track user correction event."""
MAX_CORRECTIONS = 100 # Limit to prevent unbounded memory growth
if 'corrections' not in state['metrics']:
state['metrics']['corrections'] = []
state['metrics']['corrections'].append({
'timestamp': datetime.now().isoformat(),
'message': message[:500] # Truncate to avoid storing too much
})
# Keep only recent corrections
if len(state['metrics']['corrections']) > MAX_CORRECTIONS:
state['metrics']['corrections'] = state['metrics']['corrections'][-MAX_CORRECTIONS:]
def check_code_volume_threshold(state: Dict[str, Any], config: Dict[str, Any]) -> Optional[Dict[str, Any]]:
"""Check if code volume threshold is crossed."""
threshold = config.get('sensitivity', {}).get('lines_threshold', 50)
code_written = state['metrics'].get('code_written', {})
for file_path, data in code_written.items():
if data['total_lines'] >= threshold:
# Check if this file is in auto_review categories
auto_review = config.get('auto_review', {})
always_review = auto_review.get('always_review', [])
never_review = auto_review.get('never_review', [])
# Check never_review first
if any(keyword in file_path.lower() for keyword in never_review):
continue
# Check always_review or threshold
should_review = any(keyword in file_path.lower() for keyword in always_review)
if should_review or data['total_lines'] >= threshold:
return {
'trigger': 'code_volume',
'file': file_path,
'lines': data['total_lines'],
'threshold': threshold,
'priority': 'high' if should_review else 'medium'
}
return None
def check_repeated_errors(state: Dict[str, Any], config: Dict[str, Any]) -> Optional[Dict[str, Any]]:
"""Check if same error is repeated."""
threshold = config.get('sensitivity', {}).get('error_repeat_threshold', 3)
errors = state['metrics'].get('errors', {})
for error_key, data in errors.items():
if data['count'] >= threshold:
return {
'trigger': 'repeated_errors',
'error': error_key,
'count': data['count'],
'threshold': threshold,
'priority': 'critical'
}
return None
def check_file_churn(state: Dict[str, Any], config: Dict[str, Any]) -> Optional[Dict[str, Any]]:
"""Check if a file is being edited too frequently."""
threshold = config.get('sensitivity', {}).get('file_churn_threshold', 5)
time_window_minutes = 10
file_edits = state['metrics'].get('file_edits', {})
for file_path, data in file_edits.items():
timestamps = data['timestamps']
# Count edits in last 10 minutes
cutoff = datetime.now() - timedelta(minutes=time_window_minutes)
recent_edits = []
for ts_str in timestamps:
ts = parse_timestamp_with_tz(ts_str, cutoff)
if ts and ts >= cutoff:
recent_edits.append(ts_str)
if len(recent_edits) >= threshold:
return {
'trigger': 'file_churn',
'file': file_path,
'edits': len(recent_edits),
'time_window_minutes': time_window_minutes,
'threshold': threshold,
'priority': 'high'
}
return None
def check_repeated_corrections(state: Dict[str, Any], config: Dict[str, Any]) -> Optional[Dict[str, Any]]:
"""Check if user is making repeated corrections."""
threshold = config.get('sensitivity', {}).get('correction_threshold', 3)
time_window_minutes = 30
corrections = state['metrics'].get('corrections', [])
# Count corrections in last 30 minutes
cutoff = datetime.now() - timedelta(minutes=time_window_minutes)
recent_corrections = []
for correction in corrections:
try:
ts = parse_timestamp_with_tz(correction['timestamp'], cutoff)
if ts and ts >= cutoff:
recent_corrections.append(correction)
except KeyError:
continue
if len(recent_corrections) >= threshold:
return {
'trigger': 'repeated_corrections',
'count': len(recent_corrections),
'time_window_minutes': time_window_minutes,
'threshold': threshold,
'priority': 'critical'
}
return None
def calculate_session_health(state: Dict[str, Any], config: Dict[str, Any]) -> Dict[str, Any]:
"""Calculate overall session health score."""
health_score = 100
recommendations = []
# Check error rate
errors = state['metrics'].get('errors', {})
total_errors = sum(data['count'] for data in errors.values())
try:
session_start = datetime.fromisoformat(state['session_start'])
now = datetime.now(session_start.tzinfo) if session_start.tzinfo else datetime.now()
session_duration_minutes = max(1, (now - session_start).total_seconds() / 60)
error_rate = total_errors / session_duration_minutes
except (ValueError, TypeError):
error_rate = 0
session_duration_minutes = 0
if error_rate > 0.5: # More than 1 error per 2 minutes
health_score -= 20
recommendations.append("High error rate - consider taking a break or reassessing approach")
# Check correction rate
corrections = state['metrics'].get('corrections', [])
correction_rate = len(corrections) / max(1, session_duration_minutes)
if correction_rate > 0.1: # More than 1 correction per 10 minutes
health_score -= 15
recommendations.append("Frequent corrections - session may be going off track")
# Check file churn
file_edits = state['metrics'].get('file_edits', {})
for file_path, data in file_edits.items():
if data['count'] > 5:
health_score -= 10
recommendations.append(f"High churn on {file_path} - consider taking a break from this file")
break
# Check repeated errors
for error_key, data in errors.items():
if data['count'] >= 3:
health_score -= 20
recommendations.append(f"Repeated error: {error_key[:50]}... - approach may be fundamentally wrong")
break
return {
'score': max(0, health_score),
'session_duration_minutes': int(session_duration_minutes),
'total_errors': total_errors,
'total_corrections': len(corrections),
'recommendations': recommendations
}
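# Worked example (illustrative numbers): in a 30-minute session with 18 errors
# (0.6 errors/minute > 0.5 -> -20), 4 corrections (0.13/minute > 0.1 -> -15),
# one file edited 7 times (-10) and one error repeated 3 times (-20), the
# score is 100 - 20 - 15 - 10 - 20 = 35, with one recommendation per penalty.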
def check_triggers(state: Dict[str, Any], config: Dict[str, Any]) -> List[Dict[str, Any]]:
"""Check all triggers and return those that fired."""
triggered = []
# Check each trigger
trigger = check_code_volume_threshold(state, config)
if trigger:
triggered.append(trigger)
trigger = check_repeated_errors(state, config)
if trigger:
triggered.append(trigger)
trigger = check_file_churn(state, config)
if trigger:
triggered.append(trigger)
trigger = check_repeated_corrections(state, config)
if trigger:
triggered.append(trigger)
return triggered
def main():
parser = argparse.ArgumentParser(
description='Guardian session health monitor',
formatter_class=argparse.RawDescriptionHelpFormatter
)
parser.add_argument(
'--event',
choices=['code-written', 'error', 'file-edit', 'correction'],
help='Type of event to track'
)
parser.add_argument(
'--file',
help='File path (for code-written and file-edit events)'
)
parser.add_argument(
'--lines',
type=int,
help='Number of lines written (for code-written events)'
)
parser.add_argument(
'--message',
help='Error or correction message'
)
parser.add_argument(
'--check-health',
action='store_true',
help='Check current session health'
)
parser.add_argument(
'--check-triggers',
action='store_true',
help='Check if any triggers have fired'
)
parser.add_argument(
'--reset',
action='store_true',
help='Reset session metrics'
)
parser.add_argument(
'--init',
action='store_true',
help='Initialize Guardian for this project'
)
args = parser.parse_args()
# Initialize Guardian if requested
if args.init:
guardian_path = init_guardian_if_needed()
print(f"Guardian initialized at {guardian_path}")
sys.exit(0)
# Find or create Guardian directory
guardian_path = find_guardian_root()
if not guardian_path:
guardian_path = init_guardian_if_needed()
# Load config and state
config = load_config(guardian_path)
if not config.get('enabled', False):
print("Guardian is disabled in config", file=sys.stderr)
sys.exit(0)
state = load_session_state(guardian_path)
# Handle reset
if args.reset:
state = {
"session_start": datetime.now().isoformat(),
"metrics": {},
"history": []
}
save_session_state(guardian_path, state)
print("Session metrics reset")
sys.exit(0)
# Handle health check
if args.check_health:
health = calculate_session_health(state, config)
print(json.dumps(health, indent=2))
sys.exit(0)
# Handle trigger check
if args.check_triggers:
triggers = check_triggers(state, config)
print(json.dumps(triggers, indent=2))
sys.exit(0)
# Handle event tracking
if args.event:
if args.event == 'code-written':
if not args.file or args.lines is None:
print("Error: --file and --lines required for code-written event", file=sys.stderr)
sys.exit(1)
track_code_written(state, args.file, args.lines)
elif args.event == 'error':
if not args.message:
print("Error: --message required for error event", file=sys.stderr)
sys.exit(1)
track_error(state, args.message)
elif args.event == 'file-edit':
if not args.file:
print("Error: --file required for file-edit event", file=sys.stderr)
sys.exit(1)
track_file_edit(state, args.file)
elif args.event == 'correction':
if not args.message:
print("Error: --message required for correction event", file=sys.stderr)
sys.exit(1)
track_correction(state, args.message)
# Save updated state
save_session_state(guardian_path, state)
# Check if any triggers fired
triggers = check_triggers(state, config)
if triggers:
print(json.dumps({'event_recorded': True, 'triggers': triggers}, indent=2))
else:
print(json.dumps({'event_recorded': True}, indent=2))
else:
parser.print_help()
sys.exit(1)
if __name__ == '__main__':
main()

View File

@@ -0,0 +1,303 @@
#!/usr/bin/env python3
"""
Guardian Template Loader
Loads and applies Guardian review/planning templates for consistent,
structured agent interactions.
Usage:
# List available templates
python template_loader.py --list
# Load a template
python template_loader.py --template security_review
# Load template and apply to context
python template_loader.py --template security_review --file auth.py --output prompt.txt
# Create custom template
python template_loader.py --create my_review --based-on security_review
"""
import os
import sys
import json
import argparse
from pathlib import Path
from typing import Dict, List, Optional, Any
def find_templates_dir() -> Path:
"""Find the Guardian Templates directory."""
# First try relative to this script
script_dir = Path(__file__).parent
templates_dir = script_dir.parent / 'Templates'
if templates_dir.exists():
return templates_dir
# Try from current directory
templates_dir = Path.cwd() / 'skills' / 'guardian' / 'Templates'
if templates_dir.exists():
return templates_dir
raise FileNotFoundError("Guardian Templates directory not found")
def list_templates() -> List[Dict[str, str]]:
"""List all available Guardian templates.
Returns:
List of template metadata dictionaries
"""
templates_dir = find_templates_dir()
templates = []
for template_file in templates_dir.glob('*.json'):
try:
with open(template_file, 'r', encoding='utf-8') as f:
template = json.load(f)
templates.append({
'name': template.get('name', template_file.stem),
'description': template.get('description', 'No description'),
'task_type': template.get('task_type', 'unknown'),
'file': str(template_file)
})
except (json.JSONDecodeError, OSError, IOError):
continue
return templates
def load_template(template_name: str) -> Dict[str, Any]:
"""Load a Guardian template by name.
Args:
template_name: Name of the template (with or without .json extension)
Returns:
Template configuration dictionary
"""
templates_dir = find_templates_dir()
# Remove .json extension if provided
if template_name.endswith('.json'):
template_name = template_name[:-5]
template_file = templates_dir / f"{template_name}.json"
if not template_file.exists():
raise FileNotFoundError(f"Template not found: {template_name}")
try:
with open(template_file, 'r', encoding='utf-8') as f:
return json.load(f)
except json.JSONDecodeError as e:
raise ValueError(f"Invalid template JSON: {e}")
def apply_template_to_context(
template: Dict[str, Any],
context: str
) -> str:
"""Apply a template to extracted context.
Args:
template: Template configuration
context: Extracted minimal context
Returns:
Formatted agent prompt
"""
prompt_template = template.get('agent_prompt_template', '')
# Replace context placeholder
prompt = prompt_template.replace('{context}', context)
return prompt
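# Illustrative example (hypothetical template text): a template whose
# agent_prompt_template is "Review the code below.\n\n{context}\n\nReturn JSON."
# combined with the context "File: auth.py (excerpt)..." yields the same text
# with {context} substituted; any other placeholders are left untouched.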
def get_template_config(template: Dict[str, Any]) -> Dict[str, Any]:
"""Extract configuration parameters from template.
Args:
template: Template configuration
Returns:
Configuration dictionary for context_filter.py
"""
return {
'task_type': template.get('task_type', 'review'),
'focus': template.get('focus', ''),
'oracle_categories': template.get('oracle_categories', ['patterns', 'gotchas']),
'oracle_tags': template.get('oracle_tags_required', []),
'max_patterns': template.get('max_oracle_patterns', 5),
'max_gotchas': template.get('max_oracle_gotchas', 5),
'validation_rules': template.get('validation_rules', {})
}
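# Illustrative mapping (hypothetical template values): a template containing
# {"task_type": "review", "focus": "security", "max_oracle_patterns": 3}
# is normalised to {'task_type': 'review', 'focus': 'security',
# 'oracle_categories': ['patterns', 'gotchas'], 'oracle_tags': [],
# 'max_patterns': 3, 'max_gotchas': 5, 'validation_rules': {}},
# i.e. the keys context_filter.py expects, with defaults filled in.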
def create_custom_template(
name: str,
base_template: Optional[str] = None,
description: str = "",
task_type: str = "review"
) -> Path:
"""Create a new custom template.
Args:
name: Name for the new template
base_template: Optional template to base this on
description: Template description
task_type: Type of task (review, plan, debug)
Returns:
Path to the created template file
"""
templates_dir = find_templates_dir()
if base_template:
# Load base template
base = load_template(base_template)
new_template = base.copy()
new_template['name'] = name
if description:
new_template['description'] = description
else:
# Create minimal template
new_template = {
"name": name,
"description": description or f"Custom {task_type} template",
"task_type": task_type,
"focus": "",
"agent_prompt_template": "You are a READ-ONLY code reviewer for Guardian.\n\nCRITICAL CONSTRAINTS:\n- DO NOT modify any files\n- ONLY read and analyze\n\n{context}\n\nReturn suggestions as JSON array.",
"oracle_categories": ["patterns", "gotchas"],
"oracle_tags_required": [],
"max_oracle_patterns": 5,
"max_oracle_gotchas": 5,
"always_include_files": [],
"validation_rules": {
"min_confidence": 0.5,
"block_contradictions": true
}
}
# Save template
template_file = templates_dir / f"{name}.json"
with open(template_file, 'w', encoding='utf-8') as f:
json.dump(new_template, f, indent=2)
return template_file
def main():
parser = argparse.ArgumentParser(
description='Guardian template loader and manager',
formatter_class=argparse.RawDescriptionHelpFormatter
)
parser.add_argument(
'--list',
action='store_true',
help='List all available templates'
)
parser.add_argument(
'--template',
help='Template name to load'
)
parser.add_argument(
'--file',
help='File to apply template to (used with --output)'
)
parser.add_argument(
'--output',
help='Output file for generated prompt'
)
parser.add_argument(
'--create',
help='Create a new custom template with this name'
)
parser.add_argument(
'--based-on',
help='Base the new template on an existing one'
)
parser.add_argument(
'--description',
help='Description for new template'
)
parser.add_argument(
'--show-config',
action='store_true',
help='Show template configuration (for use with context_filter.py)'
)
args = parser.parse_args()
# List templates
if args.list:
templates = list_templates()
print("Available Guardian Templates:")
print("=" * 60)
for template in templates:
print(f"\nName: {template['name']}")
print(f" Type: {template['task_type']}")
print(f" Description: {template['description']}")
print(f" File: {template['file']}")
print("\n" + "=" * 60)
print(f"Total: {len(templates)} templates")
sys.exit(0)
# Create template
if args.create:
template_file = create_custom_template(
args.create,
args.based_on,
args.description or "",
"review"
)
print(f"Created template: {template_file}")
print("Edit this file to customize your template")
sys.exit(0)
# Load template
if args.template:
template = load_template(args.template)
if args.show_config:
config = get_template_config(template)
print(json.dumps(config, indent=2))
sys.exit(0)
if args.output and args.file:
# Apply template to file
# This would integrate with context_filter.py
print(f"Applying template '{args.template}' to {args.file}...")
print(f"Output will be saved to: {args.output}")
print("\nTo apply this template:")
print(f" 1. Run context_filter.py with template config")
print(f" 2. Apply template to extracted context")
print(f" 3. Save to {args.output}")
else:
# Just show template
print(json.dumps(template, indent=2))
sys.exit(0)
parser.print_help()
sys.exit(1)
if __name__ == '__main__':
main()

View File

@@ -0,0 +1,586 @@
#!/usr/bin/env python3
"""
Guardian Suggestion Validator
Cross-checks subagent suggestions against Oracle knowledge before presenting to user.
Key Principle: "Subagent might be missing important codebase context - need validation layer"
This script:
1. Validates suggestions against Oracle patterns
2. Detects contradictions with known good practices
3. Checks rejection history for similar suggestions
4. Calculates confidence scores
5. Flags suggestions that contradict Oracle knowledge
Usage:
# Validate a single suggestion
python validator.py --suggestion "Use MD5 for password hashing" --category security
# Validate multiple suggestions from JSON
python validator.py --suggestions-file suggestions.json
# Check rejection history
python validator.py --check-rejection "Add rate limiting to endpoint"
Environment Variables:
ORACLE_PATH: Path to Oracle directory [default: .oracle]
GUARDIAN_PATH: Path to Guardian directory [default: .guardian]
"""
import os
import sys
import json
import argparse
from pathlib import Path
from typing import Dict, List, Optional, Any, Tuple
import re
from datetime import datetime, timedelta
def find_oracle_root() -> Optional[Path]:
"""Find the .oracle directory."""
current = Path.cwd()
while current != current.parent:
oracle_path = current / '.oracle'
if oracle_path.exists():
return oracle_path
current = current.parent
return None
def find_guardian_root() -> Optional[Path]:
"""Find the .guardian directory."""
current = Path.cwd()
while current != current.parent:
guardian_path = current / '.guardian'
if guardian_path.exists():
return guardian_path
current = current.parent
return None
def load_oracle_knowledge(oracle_path: Path) -> List[Dict[str, Any]]:
"""Load all Oracle knowledge."""
knowledge_dir = oracle_path / 'knowledge'
all_knowledge: List[Dict[str, Any]] = []
categories = ['patterns', 'preferences', 'gotchas', 'solutions', 'corrections']
for category in categories:
file_path = knowledge_dir / f'{category}.json'
if file_path.exists():
try:
with open(file_path, 'r', encoding='utf-8') as f:
entries = json.load(f)
for entry in entries:
if isinstance(entry, dict):
entry['_category'] = category
all_knowledge.append(entry)
except (json.JSONDecodeError, FileNotFoundError, OSError, IOError):
continue
return all_knowledge
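# Expected layout (assumed from how entries are read later in this script):
# each category file, e.g. .oracle/knowledge/patterns.json, holds a JSON list
# of objects such as
#   [{"title": "Password hashing", "content": "Never use MD5...", "priority": "critical"}]
# and every loaded entry is tagged with its source category under '_category'.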
def load_rejection_history(guardian_path: Path) -> List[Dict[str, Any]]:
"""Load rejection history from Guardian."""
history_file = guardian_path / 'rejection_history.json'
if not history_file.exists():
return []
try:
with open(history_file, 'r', encoding='utf-8') as f:
return json.load(f)
except (json.JSONDecodeError, FileNotFoundError, OSError, IOError):
return []
def save_rejection_history(guardian_path: Path, history: List[Dict[str, Any]]) -> None:
"""Save rejection history to Guardian."""
history_file = guardian_path / 'rejection_history.json'
try:
with open(history_file, 'w', encoding='utf-8') as f:
json.dump(history, f, indent=2)
except (OSError, IOError) as e:
print(f"Warning: Failed to save rejection history: {e}", file=sys.stderr)
def load_acceptance_stats(guardian_path: Path) -> Dict[str, Any]:
"""Load acceptance rate statistics."""
stats_file = guardian_path / 'acceptance_stats.json'
if not stats_file.exists():
return {
'by_category': {},
'by_type': {},
'overall': {
'accepted': 0,
'rejected': 0,
'rate': 0.0
}
}
try:
with open(stats_file, 'r', encoding='utf-8') as f:
return json.load(f)
except (json.JSONDecodeError, FileNotFoundError, OSError, IOError):
return {'by_category': {}, 'by_type': {}, 'overall': {'accepted': 0, 'rejected': 0, 'rate': 0.0}}
def check_contradiction_with_patterns(
suggestion: str,
oracle_knowledge: List[Dict[str, Any]]
) -> Optional[Dict[str, Any]]:
"""Check if suggestion contradicts known Oracle patterns.
Args:
suggestion: Suggestion text
oracle_knowledge: All Oracle knowledge
Returns:
Contradiction details if found, None otherwise
"""
# Get patterns and gotchas
patterns = [k for k in oracle_knowledge if k.get('_category') == 'patterns']
gotchas = [k for k in oracle_knowledge if k.get('_category') == 'gotchas']
suggestion_lower = suggestion.lower()
# Check against patterns (high priority ones)
for pattern in patterns:
if pattern.get('priority') not in ['critical', 'high']:
continue
title = pattern.get('title', '').lower()
content = pattern.get('content', '').lower()
# Look for direct contradictions
# Example: suggestion says "use MD5" but pattern says "never use MD5"
if 'never' in content or 'don\'t' in content or 'avoid' in content:
# Extract what not to do
words = re.findall(r'\b\w+\b', suggestion_lower)
for word in words:
if len(word) > 3 and word in content:
# Potential contradiction
return {
'type': 'pattern_contradiction',
'pattern': pattern.get('title', 'Unknown pattern'),
'pattern_content': pattern.get('content', ''),
'priority': pattern.get('priority', 'medium'),
'confidence': 0.8
}
# Check against gotchas (critical warnings)
for gotcha in gotchas:
title = gotcha.get('title', '').lower()
content = gotcha.get('content', '').lower()
# Check if suggestion relates to a known gotcha
words = re.findall(r'\b\w+\b', suggestion_lower)
common_words = set(words) & set(re.findall(r'\b\w+\b', content))
if len(common_words) > 3:
return {
'type': 'gotcha_warning',
'gotcha': gotcha.get('title', 'Unknown gotcha'),
'gotcha_content': gotcha.get('content', ''),
'priority': gotcha.get('priority', 'high'),
'confidence': 0.6
}
return None
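# Illustrative match (hypothetical pattern text): for the suggestion
# "Use MD5 for password hashing" and a high-priority pattern whose content
# reads "never use MD5 for passwords", the content contains "never" and the
# suggestion word "password" appears in it, so a 'pattern_contradiction' with
# confidence 0.8 is returned. The word match is a substring check, and short
# tokens (len <= 3, e.g. "md5") are skipped.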
def check_rejection_history(
suggestion: str,
rejection_history: List[Dict[str, Any]],
similarity_threshold: float = 0.6
) -> Optional[Dict[str, Any]]:
"""Check if similar suggestion was previously rejected.
Args:
suggestion: Suggestion text
rejection_history: List of previously rejected suggestions
similarity_threshold: Minimum similarity to consider a match
Returns:
Rejection details if found, None otherwise
"""
suggestion_words = set(re.findall(r'\b\w+\b', suggestion.lower()))
for rejection in rejection_history:
rejected_text = rejection.get('suggestion', '').lower()
rejected_words = set(re.findall(r'\b\w+\b', rejected_text))
# Calculate Jaccard similarity
if len(suggestion_words) == 0 or len(rejected_words) == 0:
continue
intersection = suggestion_words & rejected_words
union = suggestion_words | rejected_words
similarity = len(intersection) / len(union) if len(union) > 0 else 0.0
if similarity >= similarity_threshold:
return {
'type': 'previously_rejected',
'rejected_suggestion': rejection.get('suggestion', ''),
'rejection_reason': rejection.get('reason', 'No reason provided'),
'rejected_date': rejection.get('timestamp', 'Unknown'),
'similarity': similarity,
'confidence': 0.3 # Low confidence due to previous rejection
}
return None
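# Worked Jaccard example (illustrative): "Add rate limiting to the endpoint"
# versus a previously rejected "Add rate limiting to endpoint" share 5 of 6
# distinct words, so similarity = 5/6 ~= 0.83 >= 0.6 and the earlier rejection
# is surfaced along with its recorded reason.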
def calculate_acceptance_rate(
category: str,
acceptance_stats: Dict[str, Any]
) -> float:
"""Calculate acceptance rate for a category.
Args:
category: Suggestion category
acceptance_stats: Acceptance statistics
Returns:
Acceptance rate (0.0 to 1.0)
"""
by_category = acceptance_stats.get('by_category', {})
if category not in by_category:
# No history, return neutral rate
return 0.5
stats = by_category[category]
accepted = stats.get('accepted', 0)
rejected = stats.get('rejected', 0)
total = accepted + rejected
if total == 0:
return 0.5
return accepted / total
def validate_suggestion(
suggestion: str,
category: str,
oracle_knowledge: List[Dict[str, Any]],
rejection_history: List[Dict[str, Any]],
acceptance_stats: Dict[str, Any]
) -> Dict[str, Any]:
"""Validate a suggestion against Oracle knowledge.
Args:
suggestion: Suggestion text
category: Suggestion category (security, performance, etc.)
oracle_knowledge: All Oracle knowledge
rejection_history: Previous rejections
acceptance_stats: Acceptance rate statistics
Returns:
Validation result with confidence score and warnings
"""
result = {
'suggestion': suggestion,
'category': category,
'confidence': 0.5, # Start neutral
'warnings': [],
'should_present': True,
'notes': []
}
# Check for contradictions with Oracle patterns
contradiction = check_contradiction_with_patterns(suggestion, oracle_knowledge)
if contradiction:
result['confidence'] = contradiction['confidence']
result['warnings'].append({
'severity': 'high',
'type': contradiction['type'],
'message': f"Contradicts Oracle {contradiction['type']}: {contradiction.get('pattern', contradiction.get('gotcha', 'Unknown'))}",
'details': contradiction
})
if contradiction.get('priority') == 'critical':
result['should_present'] = False
result['notes'].append("BLOCKED: Contradicts critical Oracle pattern")
# Check rejection history
previous_rejection = check_rejection_history(suggestion, rejection_history)
if previous_rejection:
result['confidence'] = min(result['confidence'], previous_rejection['confidence'])
result['warnings'].append({
'severity': 'medium',
'type': 'previously_rejected',
'message': f"Similar suggestion rejected before ({previous_rejection['similarity']:.0%} similar)",
'details': previous_rejection
})
result['notes'].append(f"Previous rejection reason: {previous_rejection['rejection_reason']}")
# Calculate confidence from acceptance rate
acceptance_rate = calculate_acceptance_rate(category, acceptance_stats)
# Adjust confidence based on historical acceptance
if not result['warnings']:
# No warnings, use acceptance rate
result['confidence'] = acceptance_rate
else:
# Has warnings, blend with acceptance rate (60% warning, 40% acceptance)
result['confidence'] = result['confidence'] * 0.6 + acceptance_rate * 0.4
# Add acceptance rate note
result['notes'].append(f"Historical acceptance rate for {category}: {acceptance_rate:.0%}")
# Final decision on whether to present
if result['confidence'] < 0.3:
result['should_present'] = False
result['notes'].append("Confidence too low - suggestion blocked")
return result
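# Illustrative scoring (hypothetical numbers): with no warnings the confidence
# is simply the category's historical acceptance rate. With a previously
# rejected match (confidence capped at 0.3) and an acceptance rate of 0.5,
# the blend is 0.3 * 0.6 + 0.5 * 0.4 = 0.38, so the suggestion is still
# presented but carries a warning; anything that ends up below 0.3 is blocked.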
def validate_multiple_suggestions(
suggestions: List[Dict[str, str]],
oracle_path: Optional[Path] = None,
guardian_path: Optional[Path] = None
) -> List[Dict[str, Any]]:
"""Validate multiple suggestions.
Args:
suggestions: List of suggestion dictionaries with 'text' and 'category' keys
oracle_path: Path to Oracle directory
guardian_path: Path to Guardian directory
Returns:
List of validation results
"""
# Load Oracle knowledge
oracle_knowledge = []
if oracle_path:
oracle_knowledge = load_oracle_knowledge(oracle_path)
# Load Guardian data
rejection_history = []
acceptance_stats = {'by_category': {}, 'by_type': {}, 'overall': {'accepted': 0, 'rejected': 0, 'rate': 0.0}}
if guardian_path:
rejection_history = load_rejection_history(guardian_path)
acceptance_stats = load_acceptance_stats(guardian_path)
# Validate each suggestion
results = []
for suggestion_dict in suggestions:
suggestion_text = suggestion_dict.get('text', '')
category = suggestion_dict.get('category', 'general')
result = validate_suggestion(
suggestion_text,
category,
oracle_knowledge,
rejection_history,
acceptance_stats
)
results.append(result)
return results
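# Expected --suggestions-file format (assumed from the keys read above): a JSON
# list such as
#   [{"text": "Use MD5 for password hashing", "category": "security"},
#    {"text": "Add an index on user_id", "category": "performance"}]
# Entries without a "category" fall back to "general".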
def record_rejection(
guardian_path: Path,
suggestion: str,
reason: str,
category: str
) -> None:
"""Record a rejected suggestion for future reference.
Args:
guardian_path: Path to Guardian directory
suggestion: Rejected suggestion text
reason: Reason for rejection
category: Suggestion category
"""
rejection_history = load_rejection_history(guardian_path)
rejection_history.append({
'suggestion': suggestion,
'reason': reason,
'category': category,
'timestamp': datetime.now().isoformat()
})
# Keep only last 100 rejections
if len(rejection_history) > 100:
rejection_history = rejection_history[-100:]
save_rejection_history(guardian_path, rejection_history)
def update_acceptance_stats(
guardian_path: Path,
category: str,
accepted: bool
) -> None:
"""Update acceptance rate statistics.
Args:
guardian_path: Path to Guardian directory
category: Suggestion category
accepted: Whether the suggestion was accepted
"""
stats_file = guardian_path / 'acceptance_stats.json'
stats = load_acceptance_stats(guardian_path)
# Update category stats
if category not in stats['by_category']:
stats['by_category'][category] = {'accepted': 0, 'rejected': 0}
if accepted:
stats['by_category'][category]['accepted'] += 1
stats['overall']['accepted'] += 1
else:
stats['by_category'][category]['rejected'] += 1
stats['overall']['rejected'] += 1
# Recalculate overall rate
total = stats['overall']['accepted'] + stats['overall']['rejected']
stats['overall']['rate'] = stats['overall']['accepted'] / total if total > 0 else 0.0
# Recalculate category rates
for cat, cat_stats in stats['by_category'].items():
cat_total = cat_stats['accepted'] + cat_stats['rejected']
cat_stats['rate'] = cat_stats['accepted'] / cat_total if cat_total > 0 else 0.0
try:
with open(stats_file, 'w', encoding='utf-8') as f:
json.dump(stats, f, indent=2)
except (OSError, IOError) as e:
print(f"Warning: Failed to save acceptance stats: {e}", file=sys.stderr)
def main():
parser = argparse.ArgumentParser(
description='Validate Guardian suggestions against Oracle knowledge',
formatter_class=argparse.RawDescriptionHelpFormatter
)
parser.add_argument(
'--suggestion',
help='Single suggestion to validate'
)
parser.add_argument(
'--category',
default='general',
help='Suggestion category (security, performance, style, etc.)'
)
parser.add_argument(
'--suggestions-file',
help='JSON file with multiple suggestions to validate'
)
parser.add_argument(
'--record-rejection',
help='Record a rejected suggestion'
)
parser.add_argument(
'--rejection-reason',
help='Reason for rejection (used with --record-rejection)'
)
parser.add_argument(
'--update-stats',
choices=['accept', 'reject'],
help='Update acceptance statistics'
)
parser.add_argument(
'--check-rejection',
help='Check if a suggestion was previously rejected'
)
args = parser.parse_args()
# Find Oracle and Guardian
oracle_path = find_oracle_root()
guardian_path = find_guardian_root()
if not oracle_path and not guardian_path:
print("Warning: Neither Oracle nor Guardian initialized", file=sys.stderr)
# Handle check rejection
if args.check_rejection:
if not guardian_path:
print(json.dumps({'found': False, 'message': 'Guardian not initialized'}))
sys.exit(0)
rejection_history = load_rejection_history(guardian_path)
result = check_rejection_history(args.check_rejection, rejection_history)
if result:
print(json.dumps({'found': True, 'details': result}, indent=2))
else:
print(json.dumps({'found': False}))
sys.exit(0)
# Handle record rejection
if args.record_rejection:
if not guardian_path:
print("Error: Guardian not initialized", file=sys.stderr)
sys.exit(1)
if not args.rejection_reason:
print("Error: --rejection-reason required", file=sys.stderr)
sys.exit(1)
record_rejection(guardian_path, args.record_rejection, args.rejection_reason, args.category)
print(json.dumps({'recorded': True}))
sys.exit(0)
# Handle update stats
if args.update_stats:
if not guardian_path:
print("Error: Guardian not initialized", file=sys.stderr)
sys.exit(1)
update_acceptance_stats(guardian_path, args.category, args.update_stats == 'accept')
print(json.dumps({'updated': True}))
sys.exit(0)
# Handle single suggestion validation
if args.suggestion:
suggestions = [{'text': args.suggestion, 'category': args.category}]
results = validate_multiple_suggestions(suggestions, oracle_path, guardian_path)
print(json.dumps(results[0], indent=2))
sys.exit(0)
# Handle suggestions file
if args.suggestions_file:
try:
with open(args.suggestions_file, 'r', encoding='utf-8') as f:
suggestions = json.load(f)
except (json.JSONDecodeError, FileNotFoundError, OSError, IOError) as e:
print(f"Error: Failed to load suggestions file: {e}", file=sys.stderr)
sys.exit(1)
results = validate_multiple_suggestions(suggestions, oracle_path, guardian_path)
print(json.dumps(results, indent=2))
sys.exit(0)
parser.print_help()
sys.exit(1)
if __name__ == '__main__':
main()