Initial commit

This commit is contained in:
Zhongwei Li
2025-11-29 18:47:33 +08:00
commit 5edac65f28
21 changed files with 6893 additions and 0 deletions

View File

@@ -0,0 +1 @@
# Utility modules for Bubble Tea maintenance agent

328
scripts/utils/go_parser.py Normal file
View File

@@ -0,0 +1,328 @@
#!/usr/bin/env python3
"""
Go code parser utilities for Bubble Tea maintenance agent.
Extracts models, functions, types, and code structure.
"""
import re
from typing import Dict, List, Tuple, Optional
from pathlib import Path
def extract_model_struct(content: str) -> Optional[Dict[str, any]]:
"""Extract the main model struct from Go code."""
# Pattern: type XxxModel struct { ... }
pattern = r'type\s+(\w*[Mm]odel)\s+struct\s*\{([^}]+)\}'
match = re.search(pattern, content, re.DOTALL)
if not match:
return None
model_name = match.group(1)
model_body = match.group(2)
# Parse fields
fields = []
for line in model_body.split('\n'):
line = line.strip()
if not line or line.startswith('//'):
continue
# Parse field: name type [tag]
field_match = re.match(r'(\w+)\s+([^\s`]+)(?:\s+`([^`]+)`)?', line)
if field_match:
fields.append({
"name": field_match.group(1),
"type": field_match.group(2),
"tag": field_match.group(3) if field_match.group(3) else None
})
return {
"name": model_name,
"fields": fields,
"field_count": len(fields),
"raw_body": model_body
}
def extract_update_function(content: str) -> Optional[Dict[str, any]]:
"""Extract the Update() function."""
# Find Update function
pattern = r'func\s+\((\w+)\s+(\*?)(\w+)\)\s+Update\s*\([^)]*\)\s*\([^)]*\)\s*\{(.+?)(?=\nfunc\s|\Z)'
match = re.search(pattern, content, re.DOTALL | re.MULTILINE)
if not match:
return None
receiver_name = match.group(1)
is_pointer = match.group(2) == '*'
receiver_type = match.group(3)
function_body = match.group(4)
# Count cases in switch statements
case_count = len(re.findall(r'\bcase\s+', function_body))
# Find message types handled
handled_messages = re.findall(r'case\s+(\w+\.?\w*):', function_body)
return {
"receiver_name": receiver_name,
"receiver_type": receiver_type,
"is_pointer_receiver": is_pointer,
"body_lines": len(function_body.split('\n')),
"case_count": case_count,
"handled_messages": list(set(handled_messages)),
"raw_body": function_body
}
def extract_view_function(content: str) -> Optional[Dict[str, any]]:
"""Extract the View() function."""
pattern = r'func\s+\((\w+)\s+(\*?)(\w+)\)\s+View\s*\(\s*\)\s+string\s*\{(.+?)(?=\nfunc\s|\Z)'
match = re.search(pattern, content, re.DOTALL | re.MULTILINE)
if not match:
return None
receiver_name = match.group(1)
is_pointer = match.group(2) == '*'
receiver_type = match.group(3)
function_body = match.group(4)
# Analyze complexity
string_concat_count = len(re.findall(r'\+\s*"', function_body))
lipgloss_calls = len(re.findall(r'lipgloss\.', function_body))
return {
"receiver_name": receiver_name,
"receiver_type": receiver_type,
"is_pointer_receiver": is_pointer,
"body_lines": len(function_body.split('\n')),
"string_concatenations": string_concat_count,
"lipgloss_calls": lipgloss_calls,
"raw_body": function_body
}
def extract_init_function(content: str) -> Optional[Dict[str, any]]:
"""Extract the Init() function."""
pattern = r'func\s+\((\w+)\s+(\*?)(\w+)\)\s+Init\s*\(\s*\)\s+tea\.Cmd\s*\{(.+?)(?=\nfunc\s|\Z)'
match = re.search(pattern, content, re.DOTALL | re.MULTILINE)
if not match:
return None
receiver_name = match.group(1)
is_pointer = match.group(2) == '*'
receiver_type = match.group(3)
function_body = match.group(4)
return {
"receiver_name": receiver_name,
"receiver_type": receiver_type,
"is_pointer_receiver": is_pointer,
"body_lines": len(function_body.split('\n')),
"raw_body": function_body
}
def extract_custom_messages(content: str) -> List[Dict[str, any]]:
"""Extract custom message type definitions."""
# Pattern: type xxxMsg struct { ... }
pattern = r'type\s+(\w+Msg)\s+struct\s*\{([^}]*)\}'
matches = re.finditer(pattern, content, re.DOTALL)
messages = []
for match in matches:
msg_name = match.group(1)
msg_body = match.group(2)
# Parse fields
fields = []
for line in msg_body.split('\n'):
line = line.strip()
if not line or line.startswith('//'):
continue
field_match = re.match(r'(\w+)\s+([^\s]+)', line)
if field_match:
fields.append({
"name": field_match.group(1),
"type": field_match.group(2)
})
messages.append({
"name": msg_name,
"fields": fields,
"field_count": len(fields)
})
return messages
def extract_tea_commands(content: str) -> List[Dict[str, any]]:
"""Extract tea.Cmd functions."""
# Pattern: func xxxCmd() tea.Msg { ... }
pattern = r'func\s+(\w+)\s*\(\s*\)\s+tea\.Msg\s*\{(.+?)^\}'
matches = re.finditer(pattern, content, re.DOTALL | re.MULTILINE)
commands = []
for match in matches:
cmd_name = match.group(1)
cmd_body = match.group(2)
# Check for blocking operations
has_http = bool(re.search(r'\bhttp\.(Get|Post|Do)', cmd_body))
has_sleep = bool(re.search(r'time\.Sleep', cmd_body))
has_io = bool(re.search(r'\bos\.(Open|Read|Write)', cmd_body))
commands.append({
"name": cmd_name,
"body_lines": len(cmd_body.split('\n')),
"has_http": has_http,
"has_sleep": has_sleep,
"has_io": has_io,
"is_blocking": has_http or has_io # sleep is expected in commands
})
return commands
def extract_imports(content: str) -> List[str]:
"""Extract import statements."""
imports = []
# Single import
single_pattern = r'import\s+"([^"]+)"'
imports.extend(re.findall(single_pattern, content))
# Multi-line import block
block_pattern = r'import\s+\(([^)]+)\)'
block_matches = re.finditer(block_pattern, content, re.DOTALL)
for match in block_matches:
block_content = match.group(1)
# Extract quoted imports
quoted = re.findall(r'"([^"]+)"', block_content)
imports.extend(quoted)
return list(set(imports))
def find_bubbletea_components(content: str) -> List[Dict[str, any]]:
"""Find usage of Bubble Tea components (list, viewport, etc.)."""
components = []
component_patterns = {
"list": r'list\.Model',
"viewport": r'viewport\.Model',
"textinput": r'textinput\.Model',
"textarea": r'textarea\.Model',
"table": r'table\.Model',
"progress": r'progress\.Model',
"spinner": r'spinner\.Model',
"timer": r'timer\.Model',
"stopwatch": r'stopwatch\.Model',
"filepicker": r'filepicker\.Model',
"paginator": r'paginator\.Model',
}
for comp_name, pattern in component_patterns.items():
if re.search(pattern, content):
# Count occurrences
count = len(re.findall(pattern, content))
components.append({
"component": comp_name,
"occurrences": count
})
return components
def analyze_code_structure(file_path: Path) -> Dict[str, any]:
"""Comprehensive code structure analysis."""
try:
content = file_path.read_text()
except Exception as e:
return {"error": str(e)}
return {
"model": extract_model_struct(content),
"update": extract_update_function(content),
"view": extract_view_function(content),
"init": extract_init_function(content),
"custom_messages": extract_custom_messages(content),
"tea_commands": extract_tea_commands(content),
"imports": extract_imports(content),
"components": find_bubbletea_components(content),
"file_size": len(content),
"line_count": len(content.split('\n')),
"uses_lipgloss": '"github.com/charmbracelet/lipgloss"' in content,
"uses_bubbletea": '"github.com/charmbracelet/bubbletea"' in content
}
def find_function_by_name(content: str, func_name: str) -> Optional[str]:
"""Find a specific function by name and return its body."""
pattern = rf'func\s+(?:\([^)]+\)\s+)?{func_name}\s*\([^)]*\)[^{{]*\{{(.+?)(?=\nfunc\s|\Z)'
match = re.search(pattern, content, re.DOTALL | re.MULTILINE)
if match:
return match.group(1)
return None
def extract_state_machine_states(content: str) -> Optional[Dict[str, any]]:
"""Extract state machine enum if present."""
# Pattern: type xxxState int; const ( state1 state2 = iota ... )
state_type_pattern = r'type\s+(\w+State)\s+(int|string)'
state_type_match = re.search(state_type_pattern, content)
if not state_type_match:
return None
state_type = state_type_match.group(1)
# Find const block with iota
const_pattern = rf'const\s+\(([^)]+)\)'
const_matches = re.finditer(const_pattern, content, re.DOTALL)
states = []
for const_match in const_matches:
const_body = const_match.group(1)
if state_type in const_body and 'iota' in const_body:
# Extract state names
state_names = re.findall(rf'(\w+)\s+{state_type}', const_body)
states = state_names
break
return {
"type": state_type,
"states": states,
"count": len(states)
}
# Example usage and testing
if __name__ == "__main__":
import sys
if len(sys.argv) < 2:
print("Usage: go_parser.py <go_file>")
sys.exit(1)
file_path = Path(sys.argv[1])
result = analyze_code_structure(file_path)
import json
print(json.dumps(result, indent=2))

View File

@@ -0,0 +1 @@
# Validator modules for Bubble Tea maintenance agent

View File

@@ -0,0 +1,349 @@
#!/usr/bin/env python3
"""
Common validation utilities for Bubble Tea maintenance agent.
"""
from typing import Dict, List, Any, Optional
def validate_result_structure(result: Dict[str, Any], required_keys: List[str]) -> Dict[str, Any]:
"""
Validate that a result dictionary has required keys.
Args:
result: Result dictionary to validate
required_keys: List of required key names
Returns:
Validation dict with status, summary, and checks
"""
if 'error' in result:
return {
"status": "error",
"summary": result['error'],
"valid": False
}
checks = {}
for key in required_keys:
checks[f"has_{key}"] = key in result and result[key] is not None
all_pass = all(checks.values())
status = "pass" if all_pass else "fail"
summary = "Validation passed" if all_pass else f"Missing required keys: {[k for k, v in checks.items() if not v]}"
return {
"status": status,
"summary": summary,
"checks": checks,
"valid": all_pass
}
def validate_issue_list(issues: List[Dict[str, Any]]) -> Dict[str, Any]:
"""
Validate a list of issues has proper structure.
Expected issue structure:
- severity: CRITICAL, HIGH, WARNING, or INFO
- category: performance, layout, reliability, etc.
- issue: Description
- location: File path and line number
- explanation: Why it's a problem
- fix: How to fix it
"""
if not isinstance(issues, list):
return {
"status": "error",
"summary": "Issues must be a list",
"valid": False
}
required_fields = ["severity", "issue", "location", "explanation"]
valid_severities = ["CRITICAL", "HIGH", "MEDIUM", "WARNING", "LOW", "INFO"]
checks = {
"is_list": True,
"all_have_severity": True,
"valid_severity_values": True,
"all_have_issue": True,
"all_have_location": True,
"all_have_explanation": True
}
for issue in issues:
if not isinstance(issue, dict):
checks["is_list"] = False
continue
if "severity" not in issue:
checks["all_have_severity"] = False
elif issue["severity"] not in valid_severities:
checks["valid_severity_values"] = False
if "issue" not in issue or not issue["issue"]:
checks["all_have_issue"] = False
if "location" not in issue or not issue["location"]:
checks["all_have_location"] = False
if "explanation" not in issue or not issue["explanation"]:
checks["all_have_explanation"] = False
all_pass = all(checks.values())
status = "pass" if all_pass else "warning"
failed = [k for k, v in checks.items() if not v]
summary = "All issues properly structured" if all_pass else f"Issues have problems: {failed}"
return {
"status": status,
"summary": summary,
"checks": checks,
"valid": all_pass,
"issue_count": len(issues)
}
def validate_score(score: int, min_val: int = 0, max_val: int = 100) -> bool:
"""Validate a numeric score is in range."""
return isinstance(score, (int, float)) and min_val <= score <= max_val
def validate_health_score(health_score: int) -> Dict[str, Any]:
"""Validate health score and categorize."""
if not validate_score(health_score):
return {
"status": "error",
"summary": "Invalid health score",
"valid": False
}
if health_score >= 90:
category = "excellent"
status = "pass"
elif health_score >= 75:
category = "good"
status = "pass"
elif health_score >= 60:
category = "fair"
status = "warning"
elif health_score >= 40:
category = "poor"
status = "warning"
else:
category = "critical"
status = "critical"
return {
"status": status,
"summary": f"{category.capitalize()} health ({health_score}/100)",
"category": category,
"valid": True,
"score": health_score
}
def validate_file_path(file_path: str) -> bool:
"""Validate file path format."""
from pathlib import Path
try:
path = Path(file_path)
return path.exists()
except Exception:
return False
def validate_best_practices_compliance(compliance: Dict[str, Dict[str, Any]]) -> Dict[str, Any]:
"""Validate best practices compliance structure."""
if not isinstance(compliance, dict):
return {
"status": "error",
"summary": "Compliance must be a dictionary",
"valid": False
}
required_tip_fields = ["status", "score", "message"]
valid_statuses = ["pass", "fail", "warning", "info"]
checks = {
"has_tips": len(compliance) > 0,
"all_tips_valid": True,
"valid_statuses": True,
"valid_scores": True
}
for tip_name, tip_data in compliance.items():
if not isinstance(tip_data, dict):
checks["all_tips_valid"] = False
continue
for field in required_tip_fields:
if field not in tip_data:
checks["all_tips_valid"] = False
if tip_data.get("status") not in valid_statuses:
checks["valid_statuses"] = False
if not validate_score(tip_data.get("score", -1)):
checks["valid_scores"] = False
all_pass = all(checks.values())
status = "pass" if all_pass else "warning"
return {
"status": status,
"summary": f"Validated {len(compliance)} tips",
"checks": checks,
"valid": all_pass,
"tip_count": len(compliance)
}
def validate_bottlenecks(bottlenecks: List[Dict[str, Any]]) -> Dict[str, Any]:
"""Validate performance bottleneck list."""
if not isinstance(bottlenecks, list):
return {
"status": "error",
"summary": "Bottlenecks must be a list",
"valid": False
}
required_fields = ["severity", "category", "issue", "location", "explanation", "fix"]
valid_severities = ["CRITICAL", "HIGH", "MEDIUM", "LOW"]
valid_categories = ["performance", "memory", "io", "rendering"]
checks = {
"is_list": True,
"all_have_severity": True,
"valid_severities": True,
"all_have_category": True,
"valid_categories": True,
"all_have_fix": True
}
for bottleneck in bottlenecks:
if not isinstance(bottleneck, dict):
checks["is_list"] = False
continue
if "severity" not in bottleneck:
checks["all_have_severity"] = False
elif bottleneck["severity"] not in valid_severities:
checks["valid_severities"] = False
if "category" not in bottleneck:
checks["all_have_category"] = False
elif bottleneck["category"] not in valid_categories:
checks["valid_categories"] = False
if "fix" not in bottleneck or not bottleneck["fix"]:
checks["all_have_fix"] = False
all_pass = all(checks.values())
status = "pass" if all_pass else "warning"
return {
"status": status,
"summary": f"Validated {len(bottlenecks)} bottlenecks",
"checks": checks,
"valid": all_pass,
"bottleneck_count": len(bottlenecks)
}
def validate_architecture_analysis(result: Dict[str, Any]) -> Dict[str, Any]:
"""Validate architecture analysis result."""
required_keys = ["current_pattern", "complexity_score", "recommended_pattern", "refactoring_steps"]
checks = {}
for key in required_keys:
checks[f"has_{key}"] = key in result and result[key] is not None
# Validate complexity score
if "complexity_score" in result:
checks["valid_complexity_score"] = validate_score(result["complexity_score"])
else:
checks["valid_complexity_score"] = False
# Validate refactoring steps
if "refactoring_steps" in result:
checks["has_refactoring_steps"] = isinstance(result["refactoring_steps"], list) and len(result["refactoring_steps"]) > 0
else:
checks["has_refactoring_steps"] = False
all_pass = all(checks.values())
status = "pass" if all_pass else "warning"
return {
"status": status,
"summary": "Architecture analysis validated" if all_pass else "Architecture analysis incomplete",
"checks": checks,
"valid": all_pass
}
def validate_layout_fixes(fixes: List[Dict[str, Any]]) -> Dict[str, Any]:
"""Validate layout fix list."""
if not isinstance(fixes, list):
return {
"status": "error",
"summary": "Fixes must be a list",
"valid": False
}
required_fields = ["location", "original", "fixed", "explanation"]
checks = {
"is_list": True,
"all_have_location": True,
"all_have_explanation": True,
"all_have_fix": True
}
for fix in fixes:
if not isinstance(fix, dict):
checks["is_list"] = False
continue
if "location" not in fix or not fix["location"]:
checks["all_have_location"] = False
if "explanation" not in fix or not fix["explanation"]:
checks["all_have_explanation"] = False
if "fixed" not in fix or not fix["fixed"]:
checks["all_have_fix"] = False
all_pass = all(checks.values())
status = "pass" if all_pass else "warning"
return {
"status": status,
"summary": f"Validated {len(fixes)} fixes",
"checks": checks,
"valid": all_pass,
"fix_count": len(fixes)
}
# Example usage
if __name__ == "__main__":
# Test validation functions
test_issues = [
{
"severity": "CRITICAL",
"category": "performance",
"issue": "Blocking operation",
"location": "main.go:45",
"explanation": "HTTP call blocks event loop",
"fix": "Move to tea.Cmd"
}
]
result = validate_issue_list(test_issues)
print(f"Issue validation: {result}")
health_result = validate_health_score(75)
print(f"Health validation: {health_result}")