Initial commit
This commit is contained in:
302
skills/meta-automation-architect/scripts/agent_reuse.py
Normal file
@@ -0,0 +1,302 @@
#!/usr/bin/env python3
"""
Agent Reuse Manager

Avoids regenerating automation for similar projects by reusing
previously successful configurations.
"""

import json
from datetime import datetime
from pathlib import Path
from typing import Dict, List, Optional


class AgentReuseManager:
    """Manages reuse of automation configurations"""

    def __init__(self, storage_path: str = ".claude/meta-automation/configurations"):
        self.storage_dir = Path(storage_path)
        self.storage_dir.mkdir(parents=True, exist_ok=True)
        self.index_path = self.storage_dir / "index.json"
        self.index = self._load_index()

    def _load_index(self) -> Dict:
        """Load the configuration index, falling back to an empty one."""
        if self.index_path.exists():
            try:
                with open(self.index_path, 'r') as f:
                    return json.load(f)
            except (OSError, json.JSONDecodeError):
                return {'configurations': []}
        return {'configurations': []}

    def _save_index(self):
        """Save the configuration index"""
        with open(self.index_path, 'w') as f:
            json.dump(self.index, f, indent=2)

    def save_configuration(self, config: Dict) -> str:
        """
        Save a successful automation configuration.

        Args:
            config: {
                'project_type': str,
                'project_name': str,
                'tech_stack': List[str],
                'agents_used': List[str],
                'skills_generated': List[str],
                'commands_generated': List[str],
                'hooks_generated': List[str],
                'success_metrics': Dict,
                'user_satisfaction': int  # 1-5
            }

        Returns:
            Configuration ID
        """
        config_id = datetime.now().strftime("%Y%m%d_%H%M%S")

        # Add metadata
        config_with_meta = {
            **config,
            'config_id': config_id,
            'created_at': datetime.now().isoformat(),
            'reuse_count': 0
        }

        # Save full configuration
        config_path = self.storage_dir / f"{config_id}.json"
        with open(config_path, 'w') as f:
            json.dump(config_with_meta, f, indent=2)

        # Update index
        self.index['configurations'].append({
            'config_id': config_id,
            'project_type': config['project_type'],
            'project_name': config.get('project_name', 'unknown'),
            'tech_stack': config.get('tech_stack', []),
            'created_at': config_with_meta['created_at'],
            'reuse_count': 0
        })
        self._save_index()

        return config_id

    def find_similar_configurations(self, project_info: Dict, min_similarity: float = 0.7) -> List[Dict]:
        """
        Find similar configurations that could be reused.

        Args:
            project_info: {
                'project_type': str,
                'tech_stack': List[str],
                'existing_tools': List[str]
            }
            min_similarity: Minimum similarity score (0-1)

        Returns:
            List of similar configurations sorted by similarity
        """
        similar = []

        for config_ref in self.index['configurations']:
            config = self._load_configuration(config_ref['config_id'])
            if not config:
                continue

            similarity = self._calculate_similarity(project_info, config)

            if similarity >= min_similarity:
                similar.append({
                    **config_ref,
                    'similarity': round(similarity, 2),
                    'full_config': config
                })

        # Sort by similarity (descending)
        similar.sort(key=lambda x: x['similarity'], reverse=True)

        return similar

    def _calculate_similarity(self, project_info: Dict, config: Dict) -> float:
        """
        Calculate similarity between a project and a stored configuration.

        Returns:
            Similarity score 0-1
        """
        score = 0.0
        weights = {
            'project_type': 0.4,
            'tech_stack': 0.4,
            'size': 0.2  # reserved: project size is not yet factored in, so scores top out at 0.8
        }

        # Project type match
        if project_info.get('project_type') == config.get('project_type'):
            score += weights['project_type']

        # Tech stack similarity (Jaccard index)
        project_stack = set(project_info.get('tech_stack', []))
        config_stack = set(config.get('tech_stack', []))

        if project_stack and config_stack:
            intersection = len(project_stack & config_stack)
            union = len(project_stack | config_stack)
            tech_similarity = intersection / union if union > 0 else 0
            score += weights['tech_stack'] * tech_similarity

        return min(score, 1.0)
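
    # Worked example: {'TypeScript', 'React', 'Next.js'} vs {'TypeScript',
    # 'React', 'Vite'} share 2 of 4 unique items, so the Jaccard term is
    # 2/4 = 0.5; with a matching project_type the score is
    # 0.4 + 0.4 * 0.5 = 0.6, below the default 0.7 reuse threshold.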

    def reuse_configuration(self, config_id: str) -> Optional[Dict]:
        """
        Reuse a configuration.

        Args:
            config_id: ID of configuration to reuse

        Returns:
            Configuration to apply, or None if it does not exist
        """
        config = self._load_configuration(config_id)
        if not config:
            return None

        # Increment reuse count
        config['reuse_count'] += 1
        self._save_configuration(config_id, config)

        # Update index
        for cfg in self.index['configurations']:
            if cfg['config_id'] == config_id:
                cfg['reuse_count'] += 1
        self._save_index()

        return config

    def get_reuse_recommendation(self, project_info: Dict) -> Optional[Dict]:
        """
        Get a recommendation for reusing a configuration.

        Args:
            project_info: Information about the current project

        Returns:
            Recommendation or None if no good match
        """
        similar = self.find_similar_configurations(project_info, min_similarity=0.75)

        if not similar:
            return None

        best_match = similar[0]

        return {
            'recommended': True,
            'config_id': best_match['config_id'],
            'similarity': best_match['similarity'],
            'project_name': best_match['project_name'],
            'created_at': best_match['created_at'],
            'reuse_count': best_match['reuse_count'],
            'time_saved': '5-10 minutes (no need to regenerate)',
            'agents': best_match['full_config']['agents_used'],
            'skills': best_match['full_config']['skills_generated'],
            'reason': f"This configuration was successful for a similar {best_match['project_type']} project"
        }

    def _load_configuration(self, config_id: str) -> Optional[Dict]:
        """Load a configuration file"""
        config_path = self.storage_dir / f"{config_id}.json"
        if not config_path.exists():
            return None

        try:
            with open(config_path, 'r') as f:
                return json.load(f)
        except (OSError, json.JSONDecodeError):
            return None

    def _save_configuration(self, config_id: str, config: Dict):
        """Save a configuration file"""
        config_path = self.storage_dir / f"{config_id}.json"
        with open(config_path, 'w') as f:
            json.dump(config, f, indent=2)

    def get_statistics(self) -> Dict:
        """Get reuse statistics"""
        total_configs = len(self.index['configurations'])
        total_reuses = sum(cfg['reuse_count'] for cfg in self.index['configurations'])

        project_types = {}
        for cfg in self.index['configurations']:
            ptype = cfg['project_type']
            if ptype not in project_types:
                project_types[ptype] = 0
            project_types[ptype] += 1

        return {
            'total_configurations': total_configs,
            'total_reuses': total_reuses,
            'average_reuses': round(total_reuses / total_configs, 1) if total_configs > 0 else 0,
            'project_types': project_types,
            'most_reused': sorted(
                self.index['configurations'],
                key=lambda x: x['reuse_count'],
                reverse=True
            )[:3]
        }


# Example usage
if __name__ == '__main__':
    manager = AgentReuseManager()

    # Save a configuration
    print("Saving successful configuration...")
    config_id = manager.save_configuration({
        'project_type': 'programming',
        'project_name': 'my-web-app',
        'tech_stack': ['TypeScript', 'React', 'Next.js'],
        'agents_used': ['project-analyzer', 'security-analyzer', 'test-coverage-analyzer'],
        'skills_generated': ['security-scanner', 'test-generator'],
        'commands_generated': ['/security-check', '/generate-tests'],
        'hooks_generated': ['pre-commit-security'],
        'success_metrics': {
            'time_saved': 50,
            'issues_prevented': 3
        },
        'user_satisfaction': 5
    })

    print(f"Saved configuration: {config_id}\n")

    # Find similar configurations
    print("Finding similar configurations for new project...")
    similar = manager.find_similar_configurations({
        'project_type': 'programming',
        'tech_stack': ['TypeScript', 'React', 'Vite'],  # similar but not exact
        'existing_tools': ['ESLint']
    })

    print(f"Found {len(similar)} similar configurations\n")

    if similar:
        print("Best match:")
        print(json.dumps({
            'config_id': similar[0]['config_id'],
            'similarity': similar[0]['similarity'],
            'project_name': similar[0]['project_name']
        }, indent=2))

    # Get recommendation
    print("\nRecommendation:")
    rec = manager.get_reuse_recommendation({
        'project_type': 'programming',
        'tech_stack': ['TypeScript', 'React', 'Vite']
    })
    print(json.dumps(rec, indent=2))

    # Statistics
    print("\nReuse Statistics:")
    stats = manager.get_statistics()
    print(json.dumps(stats, indent=2))
@@ -0,0 +1,198 @@
#!/usr/bin/env python3
"""
Simple Project Metrics Collector

Collects basic data only; it makes no decisions and does no pattern
matching. The project-analyzer agent performs the intelligent analysis.
"""

import json
from pathlib import Path
from collections import defaultdict
from typing import Dict


class ProjectMetricsCollector:
    """Collects basic project metrics for agent analysis"""

    def __init__(self, project_root: str = "."):
        self.root = Path(project_root).resolve()

    def collect_metrics(self) -> Dict:
        """Collect basic project metrics"""
        return {
            'file_analysis': self._analyze_files(),
            'directory_structure': self._get_directory_structure(),
            'key_files': self._find_key_files(),
            'project_stats': self._get_basic_stats()
        }

    def _analyze_files(self) -> Dict:
        """Count files by category"""
        # Categories are checked in order, so extensions that appear in more
        # than one set (.svg, .xml) are counted under the first match.
        type_categories = {
            'code': {'.py', '.js', '.ts', '.jsx', '.tsx', '.java', '.go', '.rs', '.c', '.cpp', '.php', '.rb'},
            'markup': {'.html', '.xml', '.svg'},
            'stylesheet': {'.css', '.scss', '.sass', '.less'},
            'document': {'.md', '.txt', '.pdf', '.doc', '.docx', '.odt'},
            'latex': {'.tex', '.bib', '.cls', '.sty'},
            'spreadsheet': {'.xlsx', '.xls', '.ods', '.csv'},
            'image': {'.jpg', '.jpeg', '.png', '.gif', '.svg', '.webp'},
            'video': {'.mp4', '.avi', '.mov', '.mkv'},
            'data': {'.json', '.yaml', '.yml', '.toml', '.xml'},
            'notebook': {'.ipynb'},
        }

        counts = defaultdict(int)
        files_by_type = defaultdict(list)

        for item in self.root.rglob('*'):
            if item.is_file() and not self._is_ignored(item):
                suffix = item.suffix.lower()
                categorized = False

                for category, extensions in type_categories.items():
                    if suffix in extensions:
                        counts[category] += 1
                        files_by_type[category].append(str(item.relative_to(self.root)))
                        categorized = True
                        break

                if not categorized:
                    counts['other'] += 1

        total = sum(counts.values())

        return {
            'counts': dict(counts),
            'percentages': {k: round((v / total) * 100, 1) for k, v in counts.items()} if total else {},
            'total_files': total,
            'sample_files': {k: v[:5] for k, v in files_by_type.items()}  # first 5 of each type
        }
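
    # Worked example (hypothetical counts): {'code': 30, 'document': 10} gives
    # total 40, so 'percentages' is {'code': 75.0, 'document': 25.0} and
    # 'sample_files' holds up to five relative paths per category.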

    def _get_directory_structure(self) -> Dict:
        """Get top-level directory structure"""
        dirs = []
        for item in self.root.iterdir():
            if item.is_dir() and not self._is_ignored(item):
                file_count = sum(1 for p in item.rglob('*') if p.is_file())
                dirs.append({
                    'name': item.name,
                    'file_count': file_count
                })

        return {
            'top_level_directories': sorted(dirs, key=lambda x: x['file_count'], reverse=True),
            'total_directories': len(dirs)
        }

    def _find_key_files(self) -> Dict:
        """Find common configuration and important files"""
        key_patterns = {
            # Programming
            'package.json': 'Node.js project',
            'requirements.txt': 'Python project',
            'Cargo.toml': 'Rust project',
            'go.mod': 'Go project',
            'pom.xml': 'Java Maven project',
            'build.gradle': 'Java Gradle project',

            # Configuration
            '.eslintrc*': 'ESLint config',
            'tsconfig.json': 'TypeScript config',
            'jest.config.js': 'Jest testing',
            'pytest.ini': 'Pytest config',

            # CI/CD
            '.github/workflows': 'GitHub Actions',
            '.gitlab-ci.yml': 'GitLab CI',
            'Jenkinsfile': 'Jenkins',

            # Hooks
            '.pre-commit-config.yaml': 'Pre-commit hooks',
            '.husky': 'Husky hooks',

            # Documentation
            'README.md': 'README',
            'CONTRIBUTING.md': 'Contribution guide',
            'LICENSE': 'License file',

            # LaTeX
            'main.tex': 'LaTeX main',
            '*.bib': 'Bibliography',

            # Build tools
            'Makefile': 'Makefile',
            'CMakeLists.txt': 'CMake',
            'docker-compose.yml': 'Docker Compose',
            'Dockerfile': 'Docker',
        }

        found = {}
        for pattern, description in key_patterns.items():
            matches = list(self.root.glob(pattern))
            if matches:
                found[pattern] = {
                    'description': description,
                    'count': len(matches),
                    'paths': [str(m.relative_to(self.root)) for m in matches[:3]]
                }

        return found

    def _get_basic_stats(self) -> Dict:
        """Get basic project statistics"""
        total_files = 0
        total_dirs = 0
        total_size = 0
        max_depth = 0

        for item in self.root.rglob('*'):
            if self._is_ignored(item):
                continue

            if item.is_file():
                total_files += 1
                try:
                    total_size += item.stat().st_size
                except OSError:
                    pass

                depth = len(item.relative_to(self.root).parts)
                max_depth = max(max_depth, depth)
            elif item.is_dir():
                total_dirs += 1

        return {
            'total_files': total_files,
            'total_directories': total_dirs,
            'total_size_bytes': total_size,
            'total_size_mb': round(total_size / (1024 * 1024), 2),
            'deepest_nesting': max_depth
        }

    def _is_ignored(self, path: Path) -> bool:
        """Check if path should be ignored"""
        ignore_patterns = {
            'node_modules', '.git', '__pycache__', '.venv', 'venv',
            'dist', 'build', '.cache', '.pytest_cache', 'coverage',
            '.next', '.nuxt', 'out', 'target'
        }

        parts = path.parts
        return any(pattern in parts for pattern in ignore_patterns)
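
    # Example: 'src/node_modules/pkg/index.js' is ignored because
    # 'node_modules' is one of its path components; matching is on whole
    # components rather than substrings.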

    def generate_report(self) -> Dict:
        """Generate complete metrics report"""
        metrics = self.collect_metrics()

        return {
            'project_root': str(self.root),
            'scan_purpose': 'Basic metrics collection for intelligent agent analysis',
            'metrics': metrics,
            'note': 'This is raw data. The project-analyzer agent will interpret it intelligently.'
        }


if __name__ == '__main__':
    import sys

    path = sys.argv[1] if len(sys.argv) > 1 else '.'
    collector = ProjectMetricsCollector(path)
    report = collector.generate_report()
    print(json.dumps(report, indent=2))
234
skills/meta-automation-architect/scripts/cost_estimator.py
Normal file
@@ -0,0 +1,234 @@
#!/usr/bin/env python3
"""
Cost & Performance Estimator

Provides transparent estimates for automation operations.
"""

import json
from dataclasses import dataclass, asdict
from typing import Dict, List, Optional


@dataclass
class AgentEstimate:
    """Estimate for a single agent"""
    agent_name: str
    description: str
    estimated_tokens: int
    estimated_minutes: int
    priority: str  # high, medium, low
    purpose: str


@dataclass
class AutomationEstimate:
    """Complete automation estimate"""
    mode: str  # quick, focused, comprehensive
    total_agents: int
    agents: List[AgentEstimate]
    total_tokens_min: int
    total_tokens_max: int
    total_minutes_min: int
    total_minutes_max: int
    total_cost_min: float
    total_cost_max: float
    recommendations: List[str]


class CostEstimator:
    """Estimates cost and time for automation"""

    # Token costs (as of Jan 2025, Claude Sonnet)
    TOKEN_COST_INPUT = 0.000003   # $3 per 1M input tokens
    TOKEN_COST_OUTPUT = 0.000015  # $15 per 1M output tokens

    # Approximate tokens per agent type
    AGENT_TOKEN_ESTIMATES = {
        'project-analyzer': {'input': 2000, 'output': 1500, 'minutes': 3},
        'structure-analyzer': {'input': 1000, 'output': 800, 'minutes': 2},
        'security-analyzer': {'input': 1500, 'output': 1000, 'minutes': 3},
        'performance-analyzer': {'input': 1500, 'output': 1000, 'minutes': 4},
        'test-coverage-analyzer': {'input': 1200, 'output': 800, 'minutes': 3},
        'latex-structure-analyzer': {'input': 1000, 'output': 800, 'minutes': 3},
        'citation-analyzer': {'input': 800, 'output': 600, 'minutes': 2},
        'link-validator': {'input': 1000, 'output': 700, 'minutes': 2},
    }

    # Default estimate for unknown agents
    DEFAULT_ESTIMATE = {'input': 1000, 'output': 800, 'minutes': 3}

    def estimate_agent(self, agent_name: str, priority: str = 'medium', purpose: str = '') -> AgentEstimate:
        """
        Estimate cost/time for a single agent.

        Args:
            agent_name: Name of the agent
            priority: high, medium, or low
            purpose: What this agent does

        Returns:
            AgentEstimate object
        """
        estimate = self.AGENT_TOKEN_ESTIMATES.get(agent_name, self.DEFAULT_ESTIMATE)

        total_tokens = estimate['input'] + estimate['output']
        minutes = estimate['minutes']

        return AgentEstimate(
            agent_name=agent_name,
            description=purpose or f"Analyzes {agent_name.replace('-', ' ')}",
            estimated_tokens=total_tokens,
            estimated_minutes=minutes,
            priority=priority,
            purpose=purpose
        )

    def estimate_quick_mode(self) -> AutomationEstimate:
        """Estimate for quick analysis mode"""
        agents = [
            self.estimate_agent('project-analyzer', 'high', 'Intelligent project analysis'),
        ]

        return self._calculate_total_estimate('quick', agents, [
            'Fastest way to understand your project',
            'Low cost, high value',
            'Can expand to full automation afterwards'
        ])

    def estimate_focused_mode(self, focus_areas: List[str]) -> AutomationEstimate:
        """Estimate for focused automation mode"""
        # Map focus areas to agents
        area_to_agents = {
            'security': ['security-analyzer'],
            'testing': ['test-coverage-analyzer'],
            'performance': ['performance-analyzer'],
            'structure': ['structure-analyzer'],
            'latex': ['latex-structure-analyzer', 'citation-analyzer'],
            'links': ['link-validator'],
        }

        agents = [self.estimate_agent('project-analyzer', 'high', 'Initial analysis')]

        for area in focus_areas:
            for agent_name in area_to_agents.get(area, []):
                agents.append(self.estimate_agent(agent_name, 'high', f'Analyze {area}'))

        return self._calculate_total_estimate('focused', agents, [
            'Targeted automation for your specific needs',
            'Medium cost, high relevance',
            'Focuses on what matters most to you'
        ])

    def estimate_comprehensive_mode(self, project_type: str) -> AutomationEstimate:
        """Estimate for comprehensive automation mode"""
        agents = [
            self.estimate_agent('project-analyzer', 'high', 'Project analysis'),
            self.estimate_agent('structure-analyzer', 'high', 'Structure analysis'),
        ]

        # Add type-specific agents
        if project_type in ['programming', 'web_app', 'cli']:
            agents.extend([
                self.estimate_agent('security-analyzer', 'high', 'Security audit'),
                self.estimate_agent('performance-analyzer', 'medium', 'Performance check'),
                self.estimate_agent('test-coverage-analyzer', 'high', 'Test coverage'),
            ])
        elif project_type in ['academic_writing', 'research']:
            agents.extend([
                self.estimate_agent('latex-structure-analyzer', 'high', 'LaTeX structure'),
                self.estimate_agent('citation-analyzer', 'high', 'Citations & bibliography'),
                self.estimate_agent('link-validator', 'medium', 'Link validation'),
            ])

        return self._calculate_total_estimate('comprehensive', agents, [
            'Complete automation system',
            'Highest cost, most comprehensive',
            'Full agent suite, skills, commands, hooks'
        ])

    def _calculate_total_estimate(self, mode: str, agents: List[AgentEstimate], recommendations: List[str]) -> AutomationEstimate:
        """Calculate total estimates from the agent list."""
        total_tokens = sum(a.estimated_tokens for a in agents)
        total_minutes = max(5, sum(a.estimated_minutes for a in agents) // 2)  # agents run in parallel

        # Add an uncertainty buffer (+50% tokens, +30% minutes)
        tokens_min = total_tokens
        tokens_max = int(total_tokens * 1.5)
        minutes_min = total_minutes
        minutes_max = int(total_minutes * 1.3)

        # Calculate costs (rough approximation: 60% input tokens, 40% output)
        cost_min = (tokens_min * 0.6 * self.TOKEN_COST_INPUT) + (tokens_min * 0.4 * self.TOKEN_COST_OUTPUT)
        cost_max = (tokens_max * 0.6 * self.TOKEN_COST_INPUT) + (tokens_max * 0.4 * self.TOKEN_COST_OUTPUT)
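
        # For example, quick mode's single project-analyzer totals 3,500 tokens,
        # so cost_min = 3500*0.6*0.000003 + 3500*0.4*0.000015
        #             = 0.0063 + 0.021 ≈ $0.027.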

        return AutomationEstimate(
            mode=mode,
            total_agents=len(agents),
            agents=agents,
            total_tokens_min=tokens_min,
            total_tokens_max=tokens_max,
            total_minutes_min=minutes_min,
            total_minutes_max=minutes_max,
            total_cost_min=round(cost_min, 3),
            total_cost_max=round(cost_max, 3),
            recommendations=recommendations
        )

    def format_estimate(self, estimate: AutomationEstimate) -> str:
        """Format estimate for display"""
        lines = []

        lines.append("╔══════════════════════════════════════════════════╗")
        lines.append(f"║ Automation Estimate - {estimate.mode.upper()} Mode")
        lines.append("╚══════════════════════════════════════════════════╝")
        lines.append("")

        # Agent list
        for agent in estimate.agents:
            priority_icon = "⭐" if agent.priority == "high" else "•"
            lines.append(f"{priority_icon} {agent.agent_name}")
            lines.append(f"   {agent.description}")
            lines.append(f"   ⏱️ ~{agent.estimated_minutes} min | 💰 ~{agent.estimated_tokens} tokens")
            lines.append("")

        lines.append("────────────────────────────────────────────────────")

        # Totals
        lines.append(f"Total Agents: {estimate.total_agents}")
        lines.append(f"Estimated Time: {estimate.total_minutes_min}-{estimate.total_minutes_max} minutes")
        lines.append(f"Estimated Tokens: {estimate.total_tokens_min:,}-{estimate.total_tokens_max:,}")
        lines.append(f"Estimated Cost: ${estimate.total_cost_min:.3f}-${estimate.total_cost_max:.3f}")

        lines.append("")
        lines.append("💡 Notes:")
        for rec in estimate.recommendations:
            lines.append(f"  • {rec}")

        return "\n".join(lines)

    def export_estimate(self, estimate: AutomationEstimate, output_path: Optional[str] = None) -> Dict:
        """Export estimate as JSON"""
        data = asdict(estimate)

        if output_path:
            with open(output_path, 'w') as f:
                json.dump(data, f, indent=2)

        return data


# Example usage
if __name__ == '__main__':
    estimator = CostEstimator()

    print("1. QUICK MODE ESTIMATE")
    print("=" * 60)
    quick = estimator.estimate_quick_mode()
    print(estimator.format_estimate(quick))

    print("\n\n2. FOCUSED MODE ESTIMATE (Security + Testing)")
    print("=" * 60)
    focused = estimator.estimate_focused_mode(['security', 'testing'])
    print(estimator.format_estimate(focused))

    print("\n\n3. COMPREHENSIVE MODE ESTIMATE (Programming Project)")
    print("=" * 60)
    comprehensive = estimator.estimate_comprehensive_mode('programming')
    print(estimator.format_estimate(comprehensive))
@@ -0,0 +1,342 @@
#!/usr/bin/env python3
"""
Existing Tool Discovery

Checks what automation tools are already in place, preventing
duplication and suggesting integration points.
"""

import json
from pathlib import Path
from typing import Dict, List


class ExistingToolDiscovery:
    """Discovers existing automation tools in a project"""

    def __init__(self, project_root: str = "."):
        self.root = Path(project_root).resolve()

    def discover_all(self) -> Dict:
        """Discover all existing automation tools"""
        return {
            'linting': self._discover_linting(),
            'testing': self._discover_testing(),
            'ci_cd': self._discover_ci_cd(),
            'git_hooks': self._discover_git_hooks(),
            'formatting': self._discover_formatting(),
            'security': self._discover_security(),
            'documentation': self._discover_documentation(),
            'summary': self._generate_summary()
        }

    def _discover_linting(self) -> Dict:
        """Find linting tools"""
        tools = {}

        linting_patterns = {
            '.eslintrc*': {'tool': 'ESLint', 'language': 'JavaScript/TypeScript', 'purpose': 'Code linting'},
            '.pylintrc': {'tool': 'Pylint', 'language': 'Python', 'purpose': 'Code linting'},
            'pylint.rc': {'tool': 'Pylint', 'language': 'Python', 'purpose': 'Code linting'},
            '.flake8': {'tool': 'Flake8', 'language': 'Python', 'purpose': 'Code linting'},
            'tslint.json': {'tool': 'TSLint', 'language': 'TypeScript', 'purpose': 'Code linting'},
            '.rubocop.yml': {'tool': 'RuboCop', 'language': 'Ruby', 'purpose': 'Code linting'},
            'phpcs.xml': {'tool': 'PHP_CodeSniffer', 'language': 'PHP', 'purpose': 'Code linting'},
        }

        for pattern, info in linting_patterns.items():
            matches = list(self.root.glob(pattern))
            if matches:
                tools[info['tool']] = {
                    **info,
                    'config_file': str(matches[0].relative_to(self.root)),
                    'found': True
                }

        return {
            'tools_found': tools,
            'count': len(tools),
            'recommendation': self._linting_recommendation(tools)
        }

    def _discover_testing(self) -> Dict:
        """Find testing frameworks"""
        tools = {}

        testing_patterns = {
            'jest.config.js': {'tool': 'Jest', 'language': 'JavaScript', 'purpose': 'Unit testing'},
            'jest.config.ts': {'tool': 'Jest', 'language': 'TypeScript', 'purpose': 'Unit testing'},
            'pytest.ini': {'tool': 'Pytest', 'language': 'Python', 'purpose': 'Unit testing'},
            'phpunit.xml': {'tool': 'PHPUnit', 'language': 'PHP', 'purpose': 'Unit testing'},
            'karma.conf.js': {'tool': 'Karma', 'language': 'JavaScript', 'purpose': 'Test runner'},
            '.rspec': {'tool': 'RSpec', 'language': 'Ruby', 'purpose': 'Testing'},
            # Heuristic: a go.mod implies the built-in `go test` tooling
            'go.mod': {'tool': 'Go test', 'language': 'Go', 'purpose': 'Testing'},
        }

        for pattern, info in testing_patterns.items():
            matches = list(self.root.glob(pattern))
            if matches:
                tools[info['tool']] = {
                    **info,
                    'config_file': str(matches[0].relative_to(self.root)),
                    'found': True
                }

        # Check for test directories
        test_dirs = []
        for pattern in ['tests/', 'test/', '__tests__/', 'spec/']:
            if (self.root / pattern).exists():
                test_dirs.append(pattern)

        return {
            'tools_found': tools,
            'test_directories': test_dirs,
            'count': len(tools),
            'recommendation': self._testing_recommendation(tools, test_dirs)
        }

    def _discover_ci_cd(self) -> Dict:
        """Find CI/CD configurations"""
        tools = {}

        ci_patterns = {
            '.github/workflows': {'tool': 'GitHub Actions', 'platform': 'GitHub', 'purpose': 'CI/CD'},
            '.gitlab-ci.yml': {'tool': 'GitLab CI', 'platform': 'GitLab', 'purpose': 'CI/CD'},
            '.circleci/config.yml': {'tool': 'CircleCI', 'platform': 'CircleCI', 'purpose': 'CI/CD'},
            'Jenkinsfile': {'tool': 'Jenkins', 'platform': 'Jenkins', 'purpose': 'CI/CD'},
            '.travis.yml': {'tool': 'Travis CI', 'platform': 'Travis', 'purpose': 'CI/CD'},
            'azure-pipelines.yml': {'tool': 'Azure Pipelines', 'platform': 'Azure', 'purpose': 'CI/CD'},
            '.drone.yml': {'tool': 'Drone CI', 'platform': 'Drone', 'purpose': 'CI/CD'},
        }

        for pattern, info in ci_patterns.items():
            path = self.root / pattern
            if path.exists():
                tools[info['tool']] = {
                    **info,
                    'config': str(Path(pattern)),
                    'found': True
                }

        return {
            'tools_found': tools,
            'count': len(tools),
            'recommendation': self._ci_cd_recommendation(tools)
        }

    def _discover_git_hooks(self) -> Dict:
        """Find git hooks configuration"""
        tools = {}

        hook_patterns = {
            '.pre-commit-config.yaml': {'tool': 'pre-commit', 'purpose': 'Pre-commit hooks'},
            '.husky': {'tool': 'Husky', 'purpose': 'Git hooks (Node.js)'},
            '.git/hooks': {'tool': 'Native Git hooks', 'purpose': 'Git hooks'},
            'lefthook.yml': {'tool': 'Lefthook', 'purpose': 'Git hooks'},
        }

        for pattern, info in hook_patterns.items():
            path = self.root / pattern
            if path.exists():
                tools[info['tool']] = {
                    **info,
                    'location': str(Path(pattern)),
                    'found': True
                }

        return {
            'tools_found': tools,
            'count': len(tools),
            'recommendation': self._git_hooks_recommendation(tools)
        }

    def _discover_formatting(self) -> Dict:
        """Find code formatting tools"""
        tools = {}

        formatting_patterns = {
            '.prettierrc*': {'tool': 'Prettier', 'language': 'JavaScript/TypeScript', 'purpose': 'Code formatting'},
            '.editorconfig': {'tool': 'EditorConfig', 'language': 'Universal', 'purpose': 'Editor settings'},
            'pyproject.toml': {'tool': 'Black (if configured)', 'language': 'Python', 'purpose': 'Code formatting'},
            '.php-cs-fixer.php': {'tool': 'PHP-CS-Fixer', 'language': 'PHP', 'purpose': 'Code formatting'},
        }

        for pattern, info in formatting_patterns.items():
            matches = list(self.root.glob(pattern))
            if matches:
                tools[info['tool']] = {
                    **info,
                    'config_file': str(matches[0].relative_to(self.root)),
                    'found': True
                }

        return {
            'tools_found': tools,
            'count': len(tools),
            'recommendation': self._formatting_recommendation(tools)
        }

    def _discover_security(self) -> Dict:
        """Find security scanning tools"""
        tools = {}

        # Check for dependency scanning
        if (self.root / 'package.json').exists():
            tools['npm audit'] = {
                'tool': 'npm audit',
                'platform': 'Node.js',
                'purpose': 'Dependency scanning',
                'found': True
            }

        if (self.root / 'Pipfile').exists():
            tools['pipenv check'] = {
                'tool': 'pipenv check',
                'platform': 'Python',
                'purpose': 'Dependency scanning',
                'found': True
            }

        # Check for security configs
        security_patterns = {
            '.snyk': {'tool': 'Snyk', 'purpose': 'Security scanning'},
            'sonar-project.properties': {'tool': 'SonarQube', 'purpose': 'Code quality & security'},
        }

        for pattern, info in security_patterns.items():
            if (self.root / pattern).exists():
                tools[info['tool']] = {
                    **info,
                    'config': pattern,
                    'found': True
                }

        return {
            'tools_found': tools,
            'count': len(tools),
            'recommendation': self._security_recommendation(tools)
        }

    def _discover_documentation(self) -> Dict:
        """Find documentation tools"""
        tools = {}

        doc_patterns = {
            'mkdocs.yml': {'tool': 'MkDocs', 'purpose': 'Documentation site'},
            'docusaurus.config.js': {'tool': 'Docusaurus', 'purpose': 'Documentation site'},
            'conf.py': {'tool': 'Sphinx', 'purpose': 'Documentation (Python)'},
            'jsdoc.json': {'tool': 'JSDoc', 'purpose': 'JavaScript documentation'},
            '.readthedocs.yml': {'tool': 'ReadTheDocs', 'purpose': 'Documentation hosting'},
        }

        for pattern, info in doc_patterns.items():
            if (self.root / pattern).exists():
                tools[info['tool']] = {
                    **info,
                    'config': pattern,
                    'found': True
                }

        return {
            'tools_found': tools,
            'count': len(tools),
            'recommendation': self._documentation_recommendation(tools)
        }

    def _generate_summary(self) -> Dict:
        """Generate overall summary"""
        # Note: this re-runs every discovery, so discover_all() effectively
        # scans twice; acceptable for small projects, worth caching otherwise.
        all_discoveries = [
            self._discover_linting(),
            self._discover_testing(),
            self._discover_ci_cd(),
            self._discover_git_hooks(),
            self._discover_formatting(),
            self._discover_security(),
            self._discover_documentation(),
        ]

        total_tools = sum(d['count'] for d in all_discoveries)

        maturity_level = "minimal"
        if total_tools >= 10:
            maturity_level = "comprehensive"
        elif total_tools >= 5:
            maturity_level = "moderate"

        return {
            'total_tools_found': total_tools,
            'maturity_level': maturity_level,
            'gaps': self._identify_gaps(all_discoveries)
        }
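
    # Example: a project with ESLint, Jest, GitHub Actions, Husky, Prettier,
    # and npm audit (6 tools) rates as "moderate"; 10 or more tools rates as
    # "comprehensive", fewer than 5 as "minimal".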

    def _identify_gaps(self, discoveries: List[Dict]) -> List[str]:
        """Identify missing automation"""
        gaps = []

        # Positional indices follow the list order built in _generate_summary
        linting = discoveries[0]
        testing = discoveries[1]
        ci_cd = discoveries[2]
        security = discoveries[5]

        if linting['count'] == 0:
            gaps.append('No linting tools configured')

        if testing['count'] == 0:
            gaps.append('No testing framework configured')

        if ci_cd['count'] == 0:
            gaps.append('No CI/CD pipeline configured')

        if security['count'] == 0:
            gaps.append('No security scanning tools')

        return gaps

    # Recommendation methods
    def _linting_recommendation(self, tools: Dict) -> str:
        if not tools:
            return "ADD: Set up linting (ESLint for JS/TS, Pylint for Python)"
        return "ENHANCE: Extend existing linting rules"

    def _testing_recommendation(self, tools: Dict, test_dirs: List) -> str:
        if not tools and not test_dirs:
            return "ADD: Set up testing framework (Jest, Pytest, etc.)"
        if tools and not test_dirs:
            return "ADD: Create test directories and write tests"
        return "ENHANCE: Increase test coverage"

    def _ci_cd_recommendation(self, tools: Dict) -> str:
        if not tools:
            return "ADD: Set up CI/CD (GitHub Actions, GitLab CI, etc.)"
        return "ENHANCE: Add more checks to existing CI/CD"

    def _git_hooks_recommendation(self, tools: Dict) -> str:
        if not tools:
            return "ADD: Set up pre-commit hooks for quality checks"
        return "ENHANCE: Add more hooks (pre-push, commit-msg, etc.)"

    def _formatting_recommendation(self, tools: Dict) -> str:
        if not tools:
            return "ADD: Set up code formatting (Prettier, Black, etc.)"
        return "OK: Formatting tools in place"

    def _security_recommendation(self, tools: Dict) -> str:
        if not tools:
            return "ADD: Set up security scanning (critical gap!)"
        return "ENHANCE: Add more security tools (SAST, dependency scanning)"

    def _documentation_recommendation(self, tools: Dict) -> str:
        if not tools:
            return "ADD: Set up documentation generation"
        return "OK: Documentation tools in place"

    def generate_report(self) -> Dict:
        """Generate complete discovery report"""
        return self.discover_all()


if __name__ == '__main__':
    import sys

    path = sys.argv[1] if len(sys.argv) > 1 else '.'
    discoverer = ExistingToolDiscovery(path)
    report = discoverer.generate_report()
    print(json.dumps(report, indent=2))
1054
skills/meta-automation-architect/scripts/generate_agents.py
Executable file
File diff suppressed because it is too large
451
skills/meta-automation-architect/scripts/generate_coordinator.py
Executable file
@@ -0,0 +1,451 @@
#!/usr/bin/env python3
"""
Coordinator Generator Script

Creates the orchestrator agent that manages multi-agent workflows.
"""

import argparse
from pathlib import Path


def generate_coordinator(session_id: str, agents: list, output_path: str) -> None:
    """Generate the coordinator agent definition file."""

    agent_list = ', '.join(agents)

    content = f'''---
name: automation-coordinator
description: Orchestrates multi-agent automation workflow. Manages agent execution, synthesizes findings, and generates final automation system.
tools: Task, Read, Write, Bash, Grep, Glob
color: White
model: sonnet
---

# Automation Coordinator

You are the Automation Coordinator, responsible for orchestrating a multi-agent workflow to create a comprehensive automation system.

## Your Role

As coordinator, you:
1. Launch specialized agents in the correct order
2. Monitor their progress
3. Read and synthesize their reports
4. Make final decisions on what to generate
5. Create the automation artifacts
6. Validate everything works
7. Document the system

## Communication Setup

**Session ID**: `{session_id}`
**Context Directory**: `.claude/agents/context/{session_id}/`
**Your Agents**: {agent_list}

## Execution Workflow

### Phase 1: Launch Analysis Agents (Parallel)

Launch these agents **in parallel** using the Task tool:

{chr(10).join([f'- {agent}' for agent in agents if 'analyzer' in agent])}

```bash
# Example of parallel launch
"Launch the following agents in parallel:
- security-analyzer
- performance-analyzer
- code-quality-analyzer
- dependency-analyzer
- documentation-analyzer

Use the Task tool to run each agent concurrently."
```

### Phase 2: Monitor Progress

While agents work, monitor their status:

```bash
# Watch coordination file
watch -n 2 'cat .claude/agents/context/{session_id}/coordination.json | jq ".agents"'

# Or check manually
cat .claude/agents/context/{session_id}/coordination.json | jq '.agents | to_entries | map({{name: .key, status: .value.status}})'

# Follow message log for real-time updates
tail -f .claude/agents/context/{session_id}/messages.jsonl
```

### Phase 3: Synthesize Findings

Once all analysis agents complete, read their reports:

```bash
# Read all reports
for report in .claude/agents/context/{session_id}/reports/*-analyzer.json; do
  echo "=== $(basename $report) ==="
  cat "$report" | jq '.summary, .findings | length'
done

# Aggregate key metrics
cat .claude/agents/context/{session_id}/reports/*.json | jq -s '
{{
  total_findings: map(.findings | length) | add,
  high_severity: map(.findings[] | select(.severity == "high")) | length,
  automation_opportunities: map(.recommendations_for_automation) | flatten | length
}}
'
```

### Phase 4: Make Decisions

Based on synthesis, decide what to generate:

**Decision Framework:**

1. **Skills**: Generate if multiple findings suggest a reusable pattern
   - Example: If security-analyzer finds repeated auth issues → generate "secure-auth-checker" skill

2. **Commands**: Generate for frequent manual tasks
   - Example: If testing issues detected → generate "/test-fix" command

3. **Hooks**: Generate for workflow automation points
   - Example: If formatting inconsistencies → generate PostToolUse format hook

4. **MCP Integrations**: Configure for external services needed
   - Example: If GitHub integration would help → configure github MCP

### Phase 5: Launch Implementation Agents (Parallel)

Based on decisions, launch implementation agents:

```bash
# Launch generators in parallel
"Launch the following implementation agents in parallel:
- skill-generator (to create custom skills)
- command-generator (to create slash commands)
- hook-generator (to create automation hooks)
- mcp-configurator (to set up external integrations)

Each should read the analysis reports and my decision notes."
```

### Phase 6: Monitor Implementation

```bash
# Check implementation progress
cat .claude/agents/context/{session_id}/coordination.json | \\
  jq '.agents | to_entries | map(select(.key | endswith("generator") or . == "mcp-configurator"))'
```

### Phase 7: Launch Validation Agents (Sequential)

After implementation, validate:

```bash
# Launch validation sequentially
"Launch integration-tester agent to validate all automation components"

# Wait for completion, then
"Launch documentation-validator agent to ensure everything is documented"
```

### Phase 8: Aggregate & Report

Create final deliverables:

1. **Automation Summary**

```bash
# Note: the heredoc delimiter is unquoted so the $(...) substitutions expand
cat > .claude/AUTOMATION_README.md << EOF
# Automation System for [Project Name]

## Generated On
$(date)

## Session ID
{session_id}

## What Was Created

### Analysis Phase
$(cat .claude/agents/context/{session_id}/reports/*-analyzer.json | jq -r '.agent_name + ": " + .summary')

### Generated Artifacts

#### Custom Agents (X)
- **agent-name**: Description and usage

#### Skills (X)
- **skill-name**: What it does and when to use

#### Commands (X)
- **/command**: Purpose and syntax

#### Hooks (X)
- **HookType**: What triggers it

#### MCP Servers (X)
- **server-name**: External service integrated

## Quick Start

1. Test an agent:
   ```bash
   "Use the security-analyzer agent on src/"
   ```

2. Try a skill:
   ```bash
   "Check code quality using the quality-checker skill"
   ```

3. Execute a command:
   ```bash
   /test-fix
   ```

## Full Documentation

See individual agent/skill/command files for details.

## Customization

All generated automation can be customized:
- Edit agents in `.claude/agents/`
- Modify skills in `.claude/skills/`
- Update commands in `.claude/commands/`
- Adjust hooks in `.claude/hooks/`

## Communication Protocol

This automation system uses the Agent Communication Protocol (ACP).
See `.claude/agents/context/{session_id}/` for:
- `coordination.json`: Agent status tracking
- `messages.jsonl`: Event log
- `reports/`: Individual agent reports
- `data/`: Shared data artifacts

## Support

For issues or questions:
1. Review agent reports in `reports/`
2. Check message log in `messages.jsonl`
3. Consult individual documentation

---
*Generated by Meta-Automation Architect*
*Session: {session_id}*
EOF
```

2. **Quick Reference Card**

```bash
cat > .claude/QUICK_REFERENCE.md << EOF
# Quick Reference

## Available Agents
$(ls .claude/agents/*.md | xargs -I {{}} basename {{}} .md | sed 's/^/- /')

## Available Commands
$(ls .claude/commands/*.md | xargs -I {{}} basename {{}} .md | sed 's/^/\\//')

## Available Skills
$(ls .claude/skills/*/SKILL.md | xargs -I {{}} dirname {{}} | xargs basename | sed 's/^/- /')

## Hooks Configured
$(cat .claude/settings.json | jq -r '.hooks | keys[]')

## MCP Servers
$(cat .claude/settings.json | jq -r '.mcpServers | keys[]')

## Usage Examples

### Use an agent:
"Use the [agent-name] agent to [task]"

### Invoke a skill:
"[Natural description that matches skill's description]"

### Execute command:
/[command-name] [args]

### Check hooks:
cat .claude/settings.json | jq '.hooks'

## Session Data

All agent communication is logged in:
`.claude/agents/context/{session_id}/`

Review this directory to understand what happened during generation.
EOF
```

### Phase 9: Final Validation

```bash
# Verify all components exist
echo "Validating generated automation..."

# Check agents
echo "Agents: $(ls .claude/agents/*.md 2>/dev/null | wc -l) files"

# Check skills
echo "Skills: $(find .claude/skills -name 'SKILL.md' 2>/dev/null | wc -l) files"

# Check commands
echo "Commands: $(ls .claude/commands/*.md 2>/dev/null | wc -l) files"

# Check hooks
echo "Hooks: $(ls .claude/hooks/*.py 2>/dev/null | wc -l) files"

# Check settings
echo "Settings updated: $(test -f .claude/settings.json && echo 'YES' || echo 'NO')"

# Test agent communication
echo "Testing agent communication protocol..."
if [ -d ".claude/agents/context/{session_id}" ]; then
  echo "✅ Context directory exists"
  echo "✅ Reports: $(ls .claude/agents/context/{session_id}/reports/*.json 2>/dev/null | wc -l)"
  echo "✅ Messages: $(wc -l < .claude/agents/context/{session_id}/messages.jsonl) events"
fi
```

## Coordination Protocol

### Checking Agent Status

```bash
# Get status of all agents
jq '.agents' .claude/agents/context/{session_id}/coordination.json

# Check specific agent
jq '.agents["security-analyzer"]' .claude/agents/context/{session_id}/coordination.json

# List completed agents
jq '.agents | to_entries | map(select(.value.status == "completed")) | map(.key)' \\
  .claude/agents/context/{session_id}/coordination.json
```

### Reading Reports

```bash
# Read a specific report
cat .claude/agents/context/{session_id}/reports/security-analyzer.json | jq

# Get all summaries
jq -r '.summary' .claude/agents/context/{session_id}/reports/*.json

# Find high-severity findings across all reports
jq -s 'map(.findings[]) | map(select(.severity == "high"))' \\
  .claude/agents/context/{session_id}/reports/*.json
```

### Monitoring Message Bus

```bash
# Watch live events
tail -f .claude/agents/context/{session_id}/messages.jsonl | jq

# Get events from specific agent
jq 'select(.from == "security-analyzer")' .claude/agents/context/{session_id}/messages.jsonl

# Count events by type
jq -s 'group_by(.type) | map({{type: .[0].type, count: length}})' \\
  .claude/agents/context/{session_id}/messages.jsonl
```

## Error Handling

If any agent fails:

1. Check its status in coordination.json
2. Review messages.jsonl for error events
3. Look for a partial report in reports/
4. Decide whether to:
   - Retry the agent
   - Continue without it
   - Intervene manually

```bash
# Check for failed agents
jq '.agents | to_entries | map(select(.value.status == "failed"))' \\
  .claude/agents/context/{session_id}/coordination.json

# If an agent failed, check its last error message
jq 'select(.from == "failed-agent-name") | select(.type == "error")' \\
  .claude/agents/context/{session_id}/messages.jsonl | tail -1
```

## Success Criteria

Your coordination is successful when:

✅ All analysis agents completed
✅ Findings were synthesized
✅ Implementation agents generated artifacts
✅ Validation agents confirmed everything works
✅ Documentation is comprehensive
✅ User can immediately use the automation

## Final Report to User

After everything is complete, report to the user:

```markdown
## Automation System Complete! 🎉

### What Was Created

**Analysis Phase:**
- Analyzed security, performance, code quality, dependencies, and documentation
- Found [X] high-priority issues and [Y] optimization opportunities

**Generated Automation:**
- **[N] Custom Agents**: Specialized for your project needs
- **[N] Skills**: Auto-invoked for common patterns
- **[N] Commands**: Quick shortcuts for frequent tasks
- **[N] Hooks**: Workflow automation at key points
- **[N] MCP Integrations**: Connected to external services

### How to Use

1. **Try an agent**: "Use the security-analyzer agent on src/"
2. **Test a command**: /test-fix
3. **Invoke a skill**: Describe a task matching a skill's purpose

### Documentation

- **Main Guide**: `.claude/AUTOMATION_README.md`
- **Quick Reference**: `.claude/QUICK_REFERENCE.md`
- **Session Details**: `.claude/agents/context/{session_id}/`

### Next Steps

1. Review generated automation
2. Customize for your specific needs
3. Run validation tests
4. Start using it in your workflow!

All agents communicated successfully through the ACP protocol. Check the session directory for full details on what happened.
```

Remember: You're orchestrating a symphony of specialized agents. Your job is to ensure they work together harmoniously through the communication protocol!
'''

    Path(output_path).parent.mkdir(parents=True, exist_ok=True)
    Path(output_path).write_text(content)
    print(f"Generated coordinator agent at {output_path}")
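
# Example invocation (hypothetical session id and agent names):
#   python generate_coordinator.py \
#       --session-id 20250101_120000 \
#       --agents security-analyzer,performance-analyzer \
#       --output .claude/agents/automation-coordinator.md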

if __name__ == '__main__':
    parser = argparse.ArgumentParser(description='Generate coordinator agent')
    parser.add_argument('--session-id', required=True, help='Session ID')
    parser.add_argument('--agents', required=True, help='Comma-separated list of agent names')
    parser.add_argument('--output', required=True, help='Output file path')

    args = parser.parse_args()
    agents = [a.strip() for a in args.agents.split(',')]

    generate_coordinator(args.session_id, agents, args.output)
388
skills/meta-automation-architect/scripts/metrics_tracker.py
Normal file
@@ -0,0 +1,388 @@
#!/usr/bin/env python3
"""
Metrics Tracker

Tracks actual time saved versus estimates to measure the real
impact of automation.
"""

import json
from pathlib import Path
from datetime import datetime
from typing import Dict, List, Optional


class MetricsTracker:
    """Tracks automation effectiveness metrics"""

    def __init__(self, session_id: str, storage_path: Optional[str] = None):
        self.session_id = session_id
        if storage_path:
            self.storage_path = Path(storage_path)
        else:
            self.storage_path = Path(f".claude/meta-automation/metrics/{session_id}.json")

        self.storage_path.parent.mkdir(parents=True, exist_ok=True)
        self.metrics = self._load_or_create()

    def _load_or_create(self) -> Dict:
        """Load existing metrics or create new"""
        if self.storage_path.exists():
            try:
                with open(self.storage_path, 'r') as f:
                    return json.load(f)
            except (OSError, json.JSONDecodeError):
                return self._create_new()
        return self._create_new()

    def _create_new(self) -> Dict:
        """Create new metrics structure"""
        return {
            'session_id': self.session_id,
            'created_at': datetime.now().isoformat(),
            'project_info': {},
            'automation_generated': {
                'agents': [],
                'skills': [],
                'commands': [],
                'hooks': []
            },
            'time_tracking': {
                'setup_time_minutes': 0,
                'estimated_time_saved_hours': 0,
                'actual_time_saved_hours': 0,
                'accuracy': 0
            },
            'usage_metrics': {
                'skills_run_count': {},
                'commands_run_count': {},
                'automation_frequency': []
            },
            'value_metrics': {
                'issues_prevented': 0,
                'quality_improvements': [],
                'deployment_count': 0,
                'test_failures_caught': 0
            },
            'cost_metrics': {
                'setup_cost': 0,
                'ongoing_cost': 0,
                'total_cost': 0
            },
            'user_feedback': {
                'satisfaction_ratings': [],
                'comments': [],
                'pain_points_resolved': []
            }
        }

    def _save(self):
        """Save metrics to disk"""
        with open(self.storage_path, 'w') as f:
            json.dump(self.metrics, f, indent=2)

    def set_project_info(self, info: Dict):
        """Set project information"""
        self.metrics['project_info'] = {
            **info,
            'recorded_at': datetime.now().isoformat()
        }
        self._save()

    def record_automation_generated(self, category: str, items: List[str]):
        """
        Record what automation was generated.

        Args:
            category: 'agents', 'skills', 'commands', 'hooks'
            items: List of generated items
        """
        if category in self.metrics['automation_generated']:
            self.metrics['automation_generated'][category].extend(items)
            self._save()

    def record_setup_time(self, minutes: int):
        """Record time spent setting up automation"""
        self.metrics['time_tracking']['setup_time_minutes'] = minutes
        self._save()

    def record_estimated_time_saved(self, hours: float):
        """Record estimated time savings"""
        self.metrics['time_tracking']['estimated_time_saved_hours'] = hours
        self._save()

def record_actual_time_saved(self, hours: float, description: str):
|
||||
"""
|
||||
Record actual time saved from automation
|
||||
|
||||
Args:
|
||||
hours: Hours actually saved
|
||||
description: What was automated
|
||||
"""
|
||||
current = self.metrics['time_tracking']['actual_time_saved_hours']
|
||||
self.metrics['time_tracking']['actual_time_saved_hours'] = current + hours
|
||||
|
||||
# Calculate accuracy
|
||||
estimated = self.metrics['time_tracking']['estimated_time_saved_hours']
|
||||
if estimated > 0:
|
||||
actual = self.metrics['time_tracking']['actual_time_saved_hours']
|
||||
self.metrics['time_tracking']['accuracy'] = round((actual / estimated) * 100, 1)
|
||||
|
||||
# Track individual savings
|
||||
if 'time_savings_breakdown' not in self.metrics:
|
||||
self.metrics['time_savings_breakdown'] = []
|
||||
|
||||
self.metrics['time_savings_breakdown'].append({
|
||||
'hours_saved': hours,
|
||||
'description': description,
|
||||
'recorded_at': datetime.now().isoformat()
|
||||
})
|
||||
|
||||
self._save()
|
||||
|
||||
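    # Worked example (a sketch, using the numbers from the demo below): with an
    # estimate of 50 hours and recorded savings of 5 + 8 hours,
    # actual_time_saved_hours is 13 and accuracy = round((13 / 50) * 100, 1) == 26.0.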
    def record_skill_usage(self, skill_name: str):
        """Record that a skill was used"""
        if skill_name not in self.metrics['usage_metrics']['skills_run_count']:
            self.metrics['usage_metrics']['skills_run_count'][skill_name] = 0

        self.metrics['usage_metrics']['skills_run_count'][skill_name] += 1
        self._save()

    def record_command_usage(self, command_name: str):
        """Record that a command was used"""
        if command_name not in self.metrics['usage_metrics']['commands_run_count']:
            self.metrics['usage_metrics']['commands_run_count'][command_name] = 0

        self.metrics['usage_metrics']['commands_run_count'][command_name] += 1
        self._save()

    def record_issue_prevented(self, issue_type: str, description: str):
        """Record that automation prevented an issue"""
        self.metrics['value_metrics']['issues_prevented'] += 1

        if 'prevented_issues' not in self.metrics['value_metrics']:
            self.metrics['value_metrics']['prevented_issues'] = []

        self.metrics['value_metrics']['prevented_issues'].append({
            'type': issue_type,
            'description': description,
            'prevented_at': datetime.now().isoformat()
        })

        self._save()

    def record_quality_improvement(self, metric: str, before: float, after: float):
        """
        Record quality improvement

        Args:
            metric: What improved (e.g., 'test_coverage', 'build_success_rate')
            before: Value before automation
            after: Value after automation
        """
        improvement = {
            'metric': metric,
            'before': before,
            'after': after,
            'improvement_percent': round(((after - before) / before) * 100, 1) if before > 0 else 0,
            'recorded_at': datetime.now().isoformat()
        }

        self.metrics['value_metrics']['quality_improvements'].append(improvement)
        self._save()
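    # Worked example (a sketch): before=42 and after=75 gives
    # improvement_percent = round(((75 - 42) / 42) * 100, 1) == 78.6.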
    def record_user_feedback(self, rating: int, comment: Optional[str] = None):
        """
        Record user satisfaction

        Args:
            rating: 1-5 rating
            comment: Optional comment
        """
        self.metrics['user_feedback']['satisfaction_ratings'].append({
            'rating': rating,
            'comment': comment,
            'recorded_at': datetime.now().isoformat()
        })

        self._save()

    def get_roi(self) -> Dict:
        """Calculate return on investment"""
        setup_time = self.metrics['time_tracking']['setup_time_minutes'] / 60  # hours
        actual_saved = self.metrics['time_tracking']['actual_time_saved_hours']

        if setup_time == 0:
            # Return the full key set so report formatting never hits a KeyError
            return {
                'roi': 0,
                'setup_hours': 0,
                'saved_hours': round(actual_saved, 1),
                'net_gain_hours': round(actual_saved, 1),
                'break_even_reached': False,
                'message': 'No setup time recorded'
            }

        roi = actual_saved / setup_time

        return {
            'roi': round(roi, 1),
            'setup_hours': round(setup_time, 1),
            'saved_hours': round(actual_saved, 1),
            'net_gain_hours': round(actual_saved - setup_time, 1),
            'break_even_reached': actual_saved > setup_time
        }
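    # Worked ROI example (a sketch): 30 minutes of setup (0.5 h) and 13 h of
    # recorded savings give roi = 13 / 0.5 == 26.0, net_gain_hours == 12.5,
    # and break_even_reached == True.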
    def get_effectiveness(self) -> Dict:
        """Calculate automation effectiveness"""
        generated = self.metrics['automation_generated']
        usage = self.metrics['usage_metrics']

        total_generated = sum(len(items) for items in generated.values())
        total_used = (
            len(usage['skills_run_count']) +
            len(usage['commands_run_count'])
        )

        if total_generated == 0:
            # Keyed consistently with the non-empty branch below
            return {
                'effectiveness_percent': 0,
                'total_generated': 0,
                'total_used': total_used,
                'unused': 0,
                'message': 'No automation generated yet'
            }

        effectiveness = (total_used / total_generated) * 100

        return {
            'effectiveness_percent': round(effectiveness, 1),
            'total_generated': total_generated,
            'total_used': total_used,
            'unused': total_generated - total_used
        }

    def get_summary(self) -> Dict:
        """Get comprehensive metrics summary"""
        roi = self.get_roi()
        effectiveness = self.get_effectiveness()

        avg_satisfaction = 0
        if self.metrics['user_feedback']['satisfaction_ratings']:
            ratings = [r['rating'] for r in self.metrics['user_feedback']['satisfaction_ratings']]
            avg_satisfaction = round(sum(ratings) / len(ratings), 1)

        return {
            'session_id': self.session_id,
            'project': self.metrics['project_info'].get('project_type', 'unknown'),
            'automation_generated': {
                category: len(items)
                for category, items in self.metrics['automation_generated'].items()
            },
            'time_metrics': {
                'setup_time_hours': round(self.metrics['time_tracking']['setup_time_minutes'] / 60, 1),
                'estimated_saved_hours': self.metrics['time_tracking']['estimated_time_saved_hours'],
                'actual_saved_hours': self.metrics['time_tracking']['actual_time_saved_hours'],
                'accuracy': f"{self.metrics['time_tracking']['accuracy']}%",
                'net_gain_hours': roi.get('net_gain_hours', 0)
            },
            'roi': roi,
            'effectiveness': effectiveness,
            'value': {
                'issues_prevented': self.metrics['value_metrics']['issues_prevented'],
                'quality_improvements_count': len(self.metrics['value_metrics']['quality_improvements']),
                'average_satisfaction': avg_satisfaction
            },
            'most_used': {
                'skills': sorted(
                    self.metrics['usage_metrics']['skills_run_count'].items(),
                    key=lambda x: x[1],
                    reverse=True
                )[:3],
                'commands': sorted(
                    self.metrics['usage_metrics']['commands_run_count'].items(),
                    key=lambda x: x[1],
                    reverse=True
                )[:3]
            }
        }

    def export_report(self) -> str:
        """Export formatted metrics report"""
        summary = self.get_summary()

        report = f"""
# Automation Metrics Report
**Session:** {summary['session_id']}
**Project Type:** {summary['project']}

## Automation Generated
- **Agents:** {summary['automation_generated']['agents']}
- **Skills:** {summary['automation_generated']['skills']}
- **Commands:** {summary['automation_generated']['commands']}
- **Hooks:** {summary['automation_generated']['hooks']}

## Time Savings
- **Setup Time:** {summary['time_metrics']['setup_time_hours']} hours
- **Estimated Savings:** {summary['time_metrics']['estimated_saved_hours']} hours
- **Actual Savings:** {summary['time_metrics']['actual_saved_hours']} hours
- **Accuracy:** {summary['time_metrics']['accuracy']}
- **Net Gain:** {summary['time_metrics']['net_gain_hours']} hours

## ROI
- **Return on Investment:** {summary['roi']['roi']}x
- **Break-Even:** {'✅ Yes' if summary['roi']['break_even_reached'] else '❌ Not yet'}

## Effectiveness
- **Usage Rate:** {summary['effectiveness']['effectiveness_percent']}%
- **Generated:** {summary['effectiveness']['total_generated']} items
- **Actually Used:** {summary['effectiveness']['total_used']} items
- **Unused:** {summary['effectiveness']['unused']} items

## Value Delivered
- **Issues Prevented:** {summary['value']['issues_prevented']}
- **Quality Improvements:** {summary['value']['quality_improvements_count']}
- **User Satisfaction:** {summary['value']['average_satisfaction']}/5

## Most Used Automation
"""

        if summary['most_used']['skills']:
            report += "\n**Skills:**\n"
            for skill, count in summary['most_used']['skills']:
                report += f"- {skill}: {count} times\n"

        if summary['most_used']['commands']:
            report += "\n**Commands:**\n"
            for cmd, count in summary['most_used']['commands']:
                report += f"- {cmd}: {count} times\n"

        return report

# Example usage
if __name__ == '__main__':
    tracker = MetricsTracker('test-session-123')

    # Simulate automation setup
    tracker.set_project_info({
        'project_type': 'programming',
        'project_name': 'my-web-app'
    })

    tracker.record_automation_generated('skills', ['security-scanner', 'test-generator'])
    tracker.record_automation_generated('commands', ['/security-check', '/generate-tests'])

    tracker.record_setup_time(30)  # 30 minutes to set up
    tracker.record_estimated_time_saved(50)  # Estimated 50 hours saved

    # Simulate usage over time
    tracker.record_skill_usage('security-scanner')
    tracker.record_skill_usage('security-scanner')
    tracker.record_skill_usage('test-generator')

    tracker.record_command_usage('/security-check')

    # Record actual time saved
    tracker.record_actual_time_saved(5, 'Security scan caught 3 vulnerabilities before deployment')
    tracker.record_actual_time_saved(8, 'Auto-generated 15 test scaffolds')

    # Record quality improvements
    tracker.record_quality_improvement('test_coverage', 42, 75)

    # Record issues prevented
    tracker.record_issue_prevented('security', 'SQL injection vulnerability caught')

    # User feedback
    tracker.record_user_feedback(5, 'This saved me so much time!')

    print(tracker.export_report())
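    # Indicative numbers for the run above (derived from the calls, a sketch):
    # setup 0.5 h, actual savings 13 h, accuracy 26.0%, ROI 26.0x, and
    # effectiveness 75.0% (3 of the 4 generated items were actually used).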
304
skills/meta-automation-architect/scripts/rollback_manager.py
Normal file
@@ -0,0 +1,304 @@
#!/usr/bin/env python3
"""
Rollback Manager
Allows undoing automation if it's not helpful
Creates backups and can restore to pre-automation state
"""

import json
import shutil
from pathlib import Path
from datetime import datetime
from typing import Dict, Optional

class RollbackManager:
    """Manages rollback of automation changes"""

    def __init__(self, session_id: str):
        self.session_id = session_id
        self.backup_dir = Path(f".claude/meta-automation/backups/{session_id}")
        self.manifest_path = self.backup_dir / "manifest.json"
        self.backup_dir.mkdir(parents=True, exist_ok=True)

    def create_backup(self, description: str = "Automation setup") -> str:
        """
        Create backup before making changes

        Args:
            description: What this backup is for

        Returns:
            Backup ID
        """
        backup_id = datetime.now().strftime("%Y%m%d_%H%M%S")

        # Create backup manifest
        manifest = {
            'backup_id': backup_id,
            'session_id': self.session_id,
            'created_at': datetime.now().isoformat(),
            'description': description,
            'backed_up_files': [],
            'created_files': [],  # Files that didn't exist before
            'can_rollback': True
        }

        # Save manifest
        with open(self.manifest_path, 'w') as f:
            json.dump(manifest, f, indent=2)

        return backup_id

    def track_file_creation(self, file_path: str):
        """
        Track that a file was created by automation

        Args:
            file_path: Path to file that was created
        """
        manifest = self._load_manifest()
        if manifest:
            if file_path not in manifest['created_files']:
                manifest['created_files'].append(file_path)
                self._save_manifest(manifest)

    def backup_file_before_change(self, file_path: str):
        """
        Backup a file before changing it

        Args:
            file_path: Path to file to backup
        """
        manifest = self._load_manifest()
        if not manifest:
            return

        source = Path(file_path)
        if not source.exists():
            return

        # Create backup
        backup_id = manifest['backup_id']
        backup_path = self.backup_dir / backup_id
        backup_path.mkdir(parents=True, exist_ok=True)

        # Preserve directory structure in backup
        rel_path = source.relative_to(Path.cwd()) if source.is_absolute() else source
        dest = backup_path / rel_path

        dest.parent.mkdir(parents=True, exist_ok=True)
        shutil.copy2(source, dest)

        # Track in manifest
        if str(rel_path) not in manifest['backed_up_files']:
            manifest['backed_up_files'].append(str(rel_path))
            self._save_manifest(manifest)

    def rollback(self) -> Dict:
        """
        Rollback all automation changes

        Returns:
            Summary of what was rolled back
        """
        manifest = self._load_manifest()
        if not manifest:
            return {
                'success': False,
                'message': 'No backup found for this session'
            }

        if not manifest['can_rollback']:
            return {
                'success': False,
                'message': 'Rollback already performed or backup corrupted'
            }

        files_restored = []
        files_deleted = []
        errors = []

        # Restore backed up files
        backup_id = manifest['backup_id']
        backup_path = self.backup_dir / backup_id

        for file_path in manifest['backed_up_files']:
            try:
                source = backup_path / file_path
                dest = Path(file_path)

                if source.exists():
                    dest.parent.mkdir(parents=True, exist_ok=True)
                    shutil.copy2(source, dest)
                    files_restored.append(file_path)
                else:
                    errors.append(f"Backup not found: {file_path}")
            except Exception as e:
                errors.append(f"Error restoring {file_path}: {str(e)}")

        # Delete files that were created
        for file_path in manifest['created_files']:
            try:
                path = Path(file_path)
                if path.exists():
                    path.unlink()
                    files_deleted.append(file_path)
            except Exception as e:
                errors.append(f"Error deleting {file_path}: {str(e)}")

        # Mark as rolled back
        manifest['can_rollback'] = False
        manifest['rolled_back_at'] = datetime.now().isoformat()
        self._save_manifest(manifest)

        return {
            'success': len(errors) == 0,
            'files_restored': files_restored,
            'files_deleted': files_deleted,
            'errors': errors,
            'summary': f"Restored {len(files_restored)} files, deleted {len(files_deleted)} files"
        }
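    # Typical result shape (a sketch with illustrative values):
    #   {'success': True, 'files_restored': ['existing.txt'],
    #    'files_deleted': ['new_skill.md'], 'errors': [],
    #    'summary': 'Restored 1 files, deleted 1 files'}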
    def get_backup_info(self) -> Optional[Dict]:
        """Get information about current backup"""
        manifest = self._load_manifest()
        if not manifest:
            return None

        return {
            'backup_id': manifest['backup_id'],
            'created_at': manifest['created_at'],
            'description': manifest['description'],
            'backed_up_files_count': len(manifest['backed_up_files']),
            'created_files_count': len(manifest['created_files']),
            'can_rollback': manifest['can_rollback'],
            'total_changes': len(manifest['backed_up_files']) + len(manifest['created_files'])
        }

    def preview_rollback(self) -> Dict:
        """
        Preview what would be rolled back

        Returns:
            Details of what would happen
        """
        manifest = self._load_manifest()
        if not manifest:
            return {
                'can_rollback': False,
                'message': 'No backup found'
            }

        return {
            'can_rollback': manifest['can_rollback'],
            'will_restore': manifest['backed_up_files'],
            'will_delete': manifest['created_files'],
            'total_changes': len(manifest['backed_up_files']) + len(manifest['created_files']),
            'created_at': manifest['created_at'],
            'description': manifest['description']
        }

    def _load_manifest(self) -> Optional[Dict]:
        """Load backup manifest"""
        if not self.manifest_path.exists():
            return None

        try:
            with open(self.manifest_path, 'r') as f:
                return json.load(f)
        except (json.JSONDecodeError, OSError):
            # Treat an unreadable manifest as "no backup" rather than crash
            return None

    def _save_manifest(self, manifest: Dict):
        """Save backup manifest"""
        with open(self.manifest_path, 'w') as f:
            json.dump(manifest, f, indent=2)

# Convenience wrapper for use in skills
class AutomationSnapshot:
    """
    Context manager for creating automatic backups

    Usage:
        with AutomationSnapshot(session_id, "Adding security checks") as snapshot:
            # Make changes
            create_new_file("skill.md")
            snapshot.track_creation("skill.md")

            modify_file("existing.md")
            snapshot.track_modification("existing.md")

        # Automatic backup created, can rollback later
    """

    def __init__(self, session_id: str, description: str):
        self.manager = RollbackManager(session_id)
        self.description = description

    def __enter__(self):
        self.manager.create_backup(self.description)
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        # Nothing to do on exit; the backup already exists on disk
        pass

    def track_creation(self, file_path: str):
        """Track file creation"""
        self.manager.track_file_creation(file_path)

    def track_modification(self, file_path: str):
        """Track file modification (backs up before change)"""
        self.manager.backup_file_before_change(file_path)

# Example usage
if __name__ == '__main__':
    import tempfile
    import os

    # Create test files
    with tempfile.TemporaryDirectory() as tmpdir:
        os.chdir(tmpdir)

        # Create some test files
        Path("existing.txt").write_text("original content")

        manager = RollbackManager("test-session")

        print("Creating backup...")
        backup_id = manager.create_backup("Test automation setup")

        # Simulate automation changes
        print("\nMaking changes...")
        manager.backup_file_before_change("existing.txt")
        Path("existing.txt").write_text("modified content")

        Path("new_skill.md").write_text("# New Skill")
        manager.track_file_creation("new_skill.md")

        Path("new_command.md").write_text("# New Command")
        manager.track_file_creation("new_command.md")

        # Show backup info
        print("\nBackup info:")
        info = manager.get_backup_info()
        print(json.dumps(info, indent=2))

        # Preview rollback
        print("\nRollback preview:")
        preview = manager.preview_rollback()
        print(json.dumps(preview, indent=2))

        # Perform rollback
        print("\nPerforming rollback...")
        result = manager.rollback()
        print(json.dumps(result, indent=2))

        # Check files
        print("\nFiles after rollback:")
        print(f"existing.txt exists: {Path('existing.txt').exists()}")
        if Path("existing.txt").exists():
            print(f"existing.txt content: {Path('existing.txt').read_text()}")
        print(f"new_skill.md exists: {Path('new_skill.md').exists()}")
        print(f"new_command.md exists: {Path('new_command.md').exists()}")
@@ -0,0 +1,94 @@
#!/usr/bin/env python3
"""
Simple Template Renderer
Renders templates with variable substitution
"""

import re
from pathlib import Path
from typing import Dict, Any

class TemplateRenderer:
    """Simple template renderer using {{variable}} syntax"""

    def __init__(self, template_dir: str = "templates"):
        self.template_dir = Path(__file__).parent.parent / template_dir

    def render(self, template_name: str, context: Dict[str, Any]) -> str:
        """
        Render a template with the given context

        Args:
            template_name: Name of template file (e.g., 'agent-base.md.template')
            context: Dictionary of variables to substitute

        Returns:
            Rendered template string
        """
        template_path = self.template_dir / template_name

        if not template_path.exists():
            raise FileNotFoundError(f"Template not found: {template_path}")

        template_content = template_path.read_text(encoding='utf-8')

        # Simple variable substitution using {{variable}} syntax
        def replace_var(match):
            var_name = match.group(1)
            value = context.get(var_name, f"{{{{MISSING: {var_name}}}}}")
            return str(value)

        rendered = re.sub(r'\{\{(\w+)\}\}', replace_var, template_content)

        return rendered
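    # For example (a sketch): a template containing "Hello {{name}}" rendered
    # with context {'name': 'World'} yields "Hello World"; unresolved variables
    # are kept as "{{MISSING: name}}" markers instead of raising.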
    def render_to_file(self, template_name: str, context: Dict[str, Any], output_path: str) -> None:
        """
        Render template and write to file

        Args:
            template_name: Name of template file
            context: Dictionary of variables
            output_path: Where to write rendered output
        """
        rendered = self.render(template_name, context)

        output = Path(output_path)
        output.parent.mkdir(parents=True, exist_ok=True)
        output.write_text(rendered, encoding='utf-8')

    def list_templates(self) -> list:
        """List available templates"""
        if not self.template_dir.exists():
            return []

        return [
            f.name for f in self.template_dir.iterdir()
            if f.is_file() and f.suffix == '.template'
        ]

# Example usage
if __name__ == '__main__':
    renderer = TemplateRenderer()

    # Example: Render an agent
    context = {
        'agent_name': 'security-analyzer',
        'agent_title': 'Security Analyzer',
        'description': 'Analyzes code for security vulnerabilities',
        'tools': 'Read, Grep, Glob, Bash',
        'color': 'Red',
        'model': 'sonnet',
        'session_id': 'test-123',
        'mission': 'Find security vulnerabilities in the codebase',
        'process': '1. Scan for common patterns\n2. Check dependencies\n3. Review auth code'
    }

    print("Available templates:")
    for template in renderer.list_templates():
        print(f"  - {template}")

    print("\nRendering example agent...")
    rendered = renderer.render('agent-base.md.template', context)
    print("\n" + "="*60)
    print(rendered[:500] + "...")
307
skills/meta-automation-architect/scripts/user_preferences.py
Normal file
@@ -0,0 +1,307 @@
#!/usr/bin/env python3
"""
User Preference Learning
Learns from user's choices to provide better recommendations over time
"""

import json
from pathlib import Path
from datetime import datetime
from typing import Dict, List

class UserPreferences:
    """Learns and stores user preferences for automation"""

    def __init__(self, storage_path: str = ".claude/meta-automation/user_preferences.json"):
        self.storage_path = Path(storage_path)
        self.storage_path.parent.mkdir(parents=True, exist_ok=True)
        self.preferences = self._load()

    def _load(self) -> Dict:
        """Load existing preferences or create new"""
        if self.storage_path.exists():
            try:
                with open(self.storage_path, 'r') as f:
                    return json.load(f)
            except (json.JSONDecodeError, OSError):
                # Corrupt or unreadable preferences: start fresh
                return self._create_new()
        return self._create_new()

    def _create_new(self) -> Dict:
        """Create new preferences structure"""
        return {
            'version': '1.0',
            'created_at': datetime.now().isoformat(),
            'projects_analyzed': 0,
            'automation_mode_preferences': {
                'quick': 0,
                'focused': 0,
                'comprehensive': 0
            },
            'agent_usage': {},
            'skill_usage': {},
            'project_type_history': {},
            'time_saved_total': 0,
            'cost_spent_total': 0,
            'satisfaction_ratings': [],
            'most_valuable_automations': [],
            'rarely_used': [],
            'integration_preferences': {
                'focus_on_gaps': 0,
                'enhance_existing': 0,
                'independent': 0
            },
            'sessions': []
        }

    def _save(self):
        """Save preferences to disk"""
        with open(self.storage_path, 'w') as f:
            json.dump(self.preferences, f, indent=2)

    def record_session(self, session_data: Dict):
        """
        Record a new automation session

        Args:
            session_data: {
                'session_id': str,
                'project_type': str,
                'mode': 'quick|focused|comprehensive',
                'agents_used': List[str],
                'skills_generated': List[str],
                'time_spent_minutes': int,
                'cost': float,
                'time_saved_estimate': int,  # hours
                'user_satisfaction': int,  # 1-5
                'integration_choice': str,  # focus_on_gaps|enhance_existing|independent
            }
        """
        # Update counts
        self.preferences['projects_analyzed'] += 1

        # Record mode preference
        mode = session_data.get('mode', 'quick')
        self.preferences['automation_mode_preferences'][mode] += 1

        # Record agent usage
        for agent in session_data.get('agents_used', []):
            if agent not in self.preferences['agent_usage']:
                self.preferences['agent_usage'][agent] = 0
            self.preferences['agent_usage'][agent] += 1

        # Record skill usage
        for skill in session_data.get('skills_generated', []):
            if skill not in self.preferences['skill_usage']:
                self.preferences['skill_usage'][skill] = 0
            self.preferences['skill_usage'][skill] += 1

        # Record project type
        project_type = session_data.get('project_type', 'unknown')
        if project_type not in self.preferences['project_type_history']:
            self.preferences['project_type_history'][project_type] = 0
        self.preferences['project_type_history'][project_type] += 1

        # Track totals
        self.preferences['time_saved_total'] += session_data.get('time_saved_estimate', 0)
        self.preferences['cost_spent_total'] += session_data.get('cost', 0)

        # Track satisfaction
        satisfaction = session_data.get('user_satisfaction')
        if satisfaction:
            self.preferences['satisfaction_ratings'].append({
                'session_id': session_data.get('session_id'),
                'rating': satisfaction,
                'date': datetime.now().isoformat()
            })

        # Track integration preference
        integration = session_data.get('integration_choice')
        if integration in self.preferences['integration_preferences']:
            self.preferences['integration_preferences'][integration] += 1

        # Store full session
        self.preferences['sessions'].append({
            **session_data,
            'recorded_at': datetime.now().isoformat()
        })

        self._save()

    def get_recommended_mode(self) -> str:
        """Get recommended automation mode based on history"""
        prefs = self.preferences['automation_mode_preferences']

        if self.preferences['projects_analyzed'] == 0:
            return 'quick'  # Default for first-time users

        # Return mode user uses most
        return max(prefs.items(), key=lambda x: x[1])[0]

    def get_recommended_agents(self, project_type: str, count: int = 5) -> List[str]:
        """Get recommended agents based on past usage and project type"""
        # Get agents user has used
        agent_usage = self.preferences['agent_usage']

        if not agent_usage:
            # Default recommendations for new users
            defaults = {
                'programming': ['project-analyzer', 'security-analyzer', 'test-coverage-analyzer'],
                'academic_writing': ['project-analyzer', 'latex-structure-analyzer', 'citation-analyzer'],
                'educational': ['project-analyzer', 'learning-path-analyzer', 'assessment-analyzer'],
            }
            return defaults.get(project_type, ['project-analyzer'])

        # Sort by usage count
        sorted_agents = sorted(agent_usage.items(), key=lambda x: x[1], reverse=True)

        return [agent for agent, _ in sorted_agents[:count]]

    def get_rarely_used(self) -> List[str]:
        """Get agents/skills that user never finds valuable"""
        rarely_used = []

        # Check for agents used only once or twice
        for agent, count in self.preferences['agent_usage'].items():
            if count <= 2 and self.preferences['projects_analyzed'] > 5:
                rarely_used.append(agent)

        return rarely_used

    def should_skip_agent(self, agent_name: str) -> bool:
        """Check if this agent is rarely useful for this user"""
        rarely_used = self.get_rarely_used()
        return agent_name in rarely_used

    def get_integration_preference(self) -> str:
        """Get preferred integration approach"""
        prefs = self.preferences['integration_preferences']

        if sum(prefs.values()) == 0:
            return 'focus_on_gaps'  # Default

        return max(prefs.items(), key=lambda x: x[1])[0]

    def get_statistics(self) -> Dict:
        """Get usage statistics"""
        total_sessions = self.preferences['projects_analyzed']

        if total_sessions == 0:
            return {
                'total_sessions': 0,
                'message': 'No automation sessions yet'
            }

        avg_satisfaction = 0
        if self.preferences['satisfaction_ratings']:
            ratings = [r['rating'] for r in self.preferences['satisfaction_ratings']]
            avg_satisfaction = sum(ratings) / len(ratings)

        return {
            'total_sessions': total_sessions,
            'time_saved_total_hours': self.preferences['time_saved_total'],
            'cost_spent_total': round(self.preferences['cost_spent_total'], 2),
            'average_satisfaction': round(avg_satisfaction, 1),
            'preferred_mode': self.get_recommended_mode(),
            'most_used_agents': sorted(
                self.preferences['agent_usage'].items(),
                key=lambda x: x[1],
                reverse=True
            )[:5],
            'project_types': self.preferences['project_type_history'],
            'roi': round(self.preferences['time_saved_total'] / max(1, self.preferences['cost_spent_total'] * 60), 1)
        }
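    # Worked example (a sketch, using the demo below): 60 hours saved at a
    # total cost of $0.12 gives roi = round(60 / max(1, 0.12 * 60), 1) == 8.3.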
    def get_recommendations_for_user(self, project_type: str) -> Dict:
        """Get personalized recommendations"""
        stats = self.get_statistics()

        if stats['total_sessions'] == 0:
            return {
                'recommended_mode': 'quick',
                'reason': 'First time - start with quick analysis to see how it works',
                'recommended_agents': ['project-analyzer'],
                'skip_agents': []
            }

        mode = self.get_recommended_mode()
        return {
            'recommended_mode': mode,
            'reason': f"You've used {mode} mode {self.preferences['automation_mode_preferences'][mode]} times",
            'recommended_agents': self.get_recommended_agents(project_type),
            'skip_agents': self.get_rarely_used(),
            'integration_preference': self.get_integration_preference(),
            'stats': {
                'total_time_saved': f"{stats['time_saved_total_hours']} hours",
                'average_satisfaction': stats.get('average_satisfaction', 0),
                'roi': f"{stats.get('roi', 0)}x return on investment"
            }
        }

    def export_report(self) -> str:
        """Export usage report"""
        stats = self.get_statistics()

        report = f"""
# Meta-Automation Usage Report

## Overview
- **Total Sessions:** {stats['total_sessions']}
- **Time Saved:** {stats.get('time_saved_total_hours', 0)} hours
- **Cost Spent:** ${stats.get('cost_spent_total', 0):.2f}
- **ROI:** {stats.get('roi', 0)}x (hours saved ÷ (dollars spent × 60))
- **Avg Satisfaction:** {stats.get('average_satisfaction', 0)}/5

## Your Preferences
- **Preferred Mode:** {stats.get('preferred_mode', 'quick')}
- **Integration Style:** {self.get_integration_preference()}

## Most Used Agents
"""

        for agent, count in stats.get('most_used_agents', []):
            report += f"- {agent}: {count} times\n"

        report += "\n## Project Types\n"
        for ptype, count in self.preferences['project_type_history'].items():
            report += f"- {ptype}: {count} projects\n"

        return report

# Example usage
if __name__ == '__main__':
    prefs = UserPreferences()

    # Simulate some sessions
    print("Simulating usage...\n")

    prefs.record_session({
        'session_id': 'session-1',
        'project_type': 'programming',
        'mode': 'quick',
        'agents_used': ['project-analyzer'],
        'skills_generated': [],
        'time_spent_minutes': 5,
        'cost': 0.03,
        'time_saved_estimate': 10,
        'user_satisfaction': 4,
        'integration_choice': 'focus_on_gaps'
    })

    prefs.record_session({
        'session_id': 'session-2',
        'project_type': 'programming',
        'mode': 'focused',
        'agents_used': ['project-analyzer', 'security-analyzer', 'test-coverage-analyzer'],
        'skills_generated': ['security-scanner', 'test-generator'],
        'time_spent_minutes': 8,
        'cost': 0.09,
        'time_saved_estimate': 50,
        'user_satisfaction': 5,
        'integration_choice': 'focus_on_gaps'
    })

    print(prefs.export_report())

    print("\nRecommendations for next programming project:")
    recs = prefs.get_recommendations_for_user('programming')
    print(json.dumps(recs, indent=2))
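    # Indicative output (a sketch, following from the two sessions above):
    # recommended_mode 'quick' (a 1-1 tie resolves to the first mode counted),
    # recommended_agents led by 'project-analyzer' (used in both sessions),
    # and skip_agents [] (too few sessions yet to flag rarely used automation).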