Initial commit

This commit is contained in:
Zhongwei Li
2025-11-29 17:57:23 +08:00
commit 06f0dc99da
10 changed files with 1701 additions and 0 deletions

32
hooks/hooks.json Normal file
View File

@@ -0,0 +1,32 @@
{
"hooks": {
"PostToolUse": [
{
"matcher": "Write|Edit|MultiEdit",
"hooks": [
{
"type": "command",
"command": "${CLAUDE_PLUGIN_ROOT}/hooks/scripts/code-quality-reporter.py",
"description": "Report code quality metrics"
},
{
"type": "command",
"command": "${CLAUDE_PLUGIN_ROOT}/hooks/scripts/universal-linter.py",
"description": "Universal code linting"
}
]
}
],
"SessionStart": [
{
"hooks": [
{
"type": "command",
"command": "${CLAUDE_PLUGIN_ROOT}/hooks/scripts/pnpm-enforcer.py",
"description": "Enforce pnpm usage"
}
]
}
]
}
}

View File

@@ -0,0 +1,382 @@
#!/usr/bin/env -S uv run --script
# /// script
# requires-python = ">=3.10"
# dependencies = []
# ///
import json
import sys
from datetime import datetime
from pathlib import Path
from typing import Any
class CodeQualityReporter:
def __init__(self):
self.session_file = Path(__file__).parent / ".session-quality.json"
self.reports_dir = Path.cwd() / "docs" / "reports"
self.ensure_reports_directory()
self.load_session()
def ensure_reports_directory(self):
"""Ensure reports directory exists"""
try:
self.reports_dir.mkdir(parents=True, exist_ok=True)
except Exception:
# Silently fail - don't interrupt the workflow
pass
def load_session(self):
"""Load or initialize session data"""
try:
if self.session_file.exists():
with open(self.session_file, encoding="utf-8") as f:
data = json.load(f)
# Convert list back to set for filesModified
self.session = data
if isinstance(data.get("filesModified"), list):
self.session["filesModified"] = set(data["filesModified"])
else:
self.session["filesModified"] = set()
else:
self.session = self.create_new_session()
except Exception:
self.session = self.create_new_session()
def create_new_session(self) -> dict[str, Any]:
"""Create a new session"""
return {
"startTime": datetime.now().isoformat(),
"filesModified": set(),
"violations": [],
"improvements": [],
"statistics": {
"totalFiles": 0,
"totalViolations": 0,
"blockedOperations": 0,
"autoFixed": 0,
},
}
def process_event(self, input_data: dict[str, Any]) -> dict[str, str] | None:
"""Process hook event"""
event = input_data.get("event")
tool_name = input_data.get("tool_name")
tool_input = input_data.get("tool_input", {})
message = input_data.get("message")
file_path = tool_input.get("file_path")
# Security: Validate file path
if file_path:
try:
resolved_path = Path(file_path).resolve()
cwd = Path.cwd()
# Ensure the path is within the current working directory
resolved_path.relative_to(cwd)
except (ValueError, OSError):
return {"message": "Invalid or unsafe file path detected"}
# Track file modifications
if file_path and tool_name in ["Write", "Edit", "MultiEdit", "Task"]:
self.session["filesModified"].add(file_path)
self.session["statistics"]["totalFiles"] += 1
# Track violations and improvements
if message:
if "" in message:
self.session["statistics"]["blockedOperations"] += 1
self.record_violation(message, file_path)
elif "⚠️" in message:
self.session["statistics"]["totalViolations"] += 1
self.record_violation(message, file_path)
elif "" in message and "organized" in message:
self.session["statistics"]["autoFixed"] += 1
self.record_improvement(message, file_path)
# Save session data
self.save_session()
# Generate report on Stop event
if event == "Stop":
return self.generate_report()
return None
def record_violation(self, message: str, file_path: str | None):
"""Record a violation"""
lines = message.split("\n")
violations = [
line.strip()[2:] # Remove '- '
for line in lines
if ":" in line and line.strip().startswith("-")
]
for violation in violations:
self.session["violations"].append(
{
"file": file_path or "unknown",
"issue": violation,
"timestamp": datetime.now().isoformat(),
}
)
def record_improvement(self, message: str, file_path: str | None):
"""Record an improvement"""
self.session["improvements"].append(
{
"file": file_path or "unknown",
"action": message.split("\n")[0],
"timestamp": datetime.now().isoformat(),
}
)
def save_session(self):
"""Save session data"""
try:
# Convert Set to List for JSON serialization
session_data = {
**self.session,
"filesModified": list(self.session["filesModified"]),
}
with open(self.session_file, "w", encoding="utf-8") as f:
json.dump(session_data, f, indent=2)
except Exception:
# Silently fail - don't interrupt the workflow
pass
def generate_report(self) -> dict[str, str]:
"""Generate quality report"""
duration = self.calculate_duration()
top_issues = self.get_top_issues()
file_stats = self.get_file_statistics()
report = [
"# Code Quality Session Report",
"",
f"**Duration:** {duration} ",
f'**Files Modified:** {len(self.session["filesModified"])} ',
f"**Generated:** {datetime.now().isoformat()}",
"",
"## Statistics",
"",
f'- **Total Operations:** {self.session["statistics"]["totalFiles"]}',
f'- **Violations Found:** {self.session["statistics"]["totalViolations"]}',
f'- **Operations Blocked:** {self.session["statistics"]["blockedOperations"]}',
f'- **Auto-fixes Applied:** {self.session["statistics"]["autoFixed"]}',
"",
]
if top_issues:
report.extend(["## Top Issues", ""])
for issue in top_issues:
report.append(f'- **{issue["type"]}** ({issue["count"]} occurrences)')
report.append("")
if self.session["improvements"]:
report.extend(["## Improvements Made", ""])
for imp in self.session["improvements"][:5]:
report.append(f'- **{Path(imp["file"]).name}:** {imp["action"]}')
report.append("")
if file_stats["mostProblematic"]:
report.extend(["## Files Needing Attention", ""])
for file in file_stats["mostProblematic"]:
report.append(f'- **{file["path"]}** ({file["issues"]} issues)')
report.append("")
report.extend(["## Recommendations", ""])
for rec in self.get_recommendations():
report.append(f'- {rec.lstrip("- ")}')
report.extend(
[
"",
"## Reference",
"",
"For detailed coding standards, see: [docs/architecture/coding-standards.md](../architecture/coding-standards.md)",
]
)
# Save report to file with proper naming
self.save_report_to_file("\n".join(report))
# Clean up session file
self.cleanup()
return {"message": "📊 Code quality session report generated"}
def save_report_to_file(self, report_content: str):
"""Save report to file with proper kebab-case naming"""
try:
timestamp = datetime.now().isoformat()[:19].replace(":", "-")
filename = f"code-quality-session-{timestamp}.md"
filepath = self.reports_dir / filename
with open(filepath, "w", encoding="utf-8") as f:
f.write(report_content)
print(f"📁 Report saved: docs/reports/{filename}", file=sys.stderr)
except Exception as error:
print(f"⚠️ Failed to save report: {error}", file=sys.stderr)
def calculate_duration(self) -> str:
"""Calculate session duration"""
start = datetime.fromisoformat(self.session["startTime"])
end = datetime.now()
diff = end - start
hours = int(diff.total_seconds() // 3600)
minutes = int((diff.total_seconds() % 3600) // 60)
if hours > 0:
return f"{hours}h {minutes}m"
return f"{minutes}m"
def get_top_issues(self) -> list[dict[str, Any]]:
"""Get top issues by frequency"""
issue_counts = {}
for violation in self.session["violations"]:
issue_type = violation["issue"].split(":")[0]
issue_counts[issue_type] = issue_counts.get(issue_type, 0) + 1
return sorted(
[{"type": type_, "count": count} for type_, count in issue_counts.items()],
key=lambda x: x["count"],
reverse=True,
)[:5]
def get_file_statistics(self) -> dict[str, list[dict[str, Any]]]:
"""Get file statistics"""
file_issues = {}
for violation in self.session["violations"]:
if violation["file"] and violation["file"] != "unknown":
file_issues[violation["file"]] = (
file_issues.get(violation["file"], 0) + 1
)
most_problematic = sorted(
[
{"path": Path(path).name, "issues": issues}
for path, issues in file_issues.items()
],
key=lambda x: x["issues"],
reverse=True,
)[:3]
return {"mostProblematic": most_problematic}
def get_recommendations(self) -> list[str]:
"""Generate recommendations based on findings"""
recommendations = []
top_issues = self.get_top_issues()
# Check for specific issue patterns
has_any_type = any("Any Type" in issue["type"] for issue in top_issues)
has_var = any("Var" in issue["type"] for issue in top_issues)
has_null_safety = any("Null Safety" in issue["type"] for issue in top_issues)
if has_any_type:
recommendations.extend(
[
' - Replace "any" types with "unknown" or specific types',
" - Run: pnpm typecheck to identify type issues",
]
)
if has_var:
recommendations.extend(
[
' - Use "const" or "let" instead of "var"',
" - Enable no-var ESLint rule for automatic detection",
]
)
if has_null_safety:
recommendations.extend(
[
" - Use optional chaining (?.) for nullable values",
" - Add null checks before property access",
]
)
if self.session["statistics"]["blockedOperations"] > 0:
recommendations.extend(
[
" - Review blocked operations and fix violations",
" - Run: pnpm biome:check for comprehensive linting",
]
)
if not recommendations:
recommendations.extend(
[
" - Great job! Continue following coding standards",
" - Consider running: pnpm code-quality for full validation",
]
)
return recommendations
def cleanup(self):
"""Clean up session data"""
try:
if self.session_file.exists():
self.session_file.unlink()
except Exception:
# Silently fail
pass
def main():
"""Main execution"""
try:
input_data = json.load(sys.stdin)
# Comprehensive logging functionality
# Ensure log directory exists
log_dir = Path.cwd() / "logs"
log_dir.mkdir(parents=True, exist_ok=True)
log_path = log_dir / "code_quality_reporter.json"
# Read existing log data or initialize empty list
if log_path.exists():
with open(log_path) as f:
try:
log_data = json.load(f)
except (json.JSONDecodeError, ValueError):
log_data = []
else:
log_data = []
# Add timestamp to the log entry
timestamp = datetime.now().strftime("%b %d, %I:%M%p").lower()
input_data["timestamp"] = timestamp
# Process the event and get results
reporter = CodeQualityReporter()
result = reporter.process_event(input_data)
# Add processing result to log entry if available
if result:
input_data["processing_result"] = result
# Append new data to log
log_data.append(input_data)
# Write back to file with formatting
with open(log_path, "w") as f:
json.dump(log_data, f, indent=2)
if result:
print(json.dumps(result))
else:
# No output for non-Stop events
print(json.dumps({"message": ""}))
except Exception as error:
print(json.dumps({"message": f"Reporter error: {error}"}))
if __name__ == "__main__":
main()

202
hooks/scripts/pnpm-enforcer.py Executable file
View File

@@ -0,0 +1,202 @@
#!/usr/bin/env -S uv run --script
# /// script
# requires-python = ">=3.10"
# dependencies = []
# ///
import json
import re
import sys
from datetime import datetime
from pathlib import Path
from typing import Any
class PnpmEnforcer:
def __init__(self, input_data: dict[str, Any]):
self.input = input_data
def detect_npm_usage(self, command: str) -> dict[str, Any] | None:
"""Check if command contains npm or npx usage"""
if not command or not isinstance(command, str):
return None
# Common npm/npx patterns to block
npm_patterns = [
r"(?:^|\s|;|&&|\|\|)npm\s+",
r"(?:^|\s|;|&&|\|\|)npx\s+",
r"(?:^|\s|;|&&|\|\|)npm$",
r"(?:^|\s|;|&&|\|\|)npx$",
]
for pattern in npm_patterns:
match = re.search(pattern, command)
if match:
return {
"detected": True,
"original": command.strip(),
"suggestion": self.generate_pnpm_alternative(command),
}
return None
def generate_pnpm_alternative(self, command: str) -> str:
"""Generate pnpm alternative for npm/npx commands"""
# Common npm -> pnpm conversions
conversions = [
# Basic package management
(r"npm install(?:\s|$)", "pnpm install"),
(r"npm i(?:\s|$)", "pnpm install"),
(r"npm install\s+(.+)", r"pnpm add \1"),
(r"npm i\s+(.+)", r"pnpm add \1"),
(r"npm install\s+--save-dev\s+(.+)", r"pnpm add -D \1"),
(r"npm install\s+-D\s+(.+)", r"pnpm add -D \1"),
# Global installs are project-specific in CDEV
(
r"npm install\s+--global\s+(.+)",
r"# Global installs not supported - use npx or install as dev dependency",
),
(
r"npm install\s+-g\s+(.+)",
r"# Global installs not supported - use npx or install as dev dependency",
),
# Uninstall
(r"npm uninstall\s+(.+)", r"pnpm remove \1"),
(r"npm remove\s+(.+)", r"pnpm remove \1"),
(r"npm rm\s+(.+)", r"pnpm remove \1"),
# Scripts
(r"npm run\s+(.+)", r"pnpm run \1"),
(r"npm start", "pnpm start"),
(r"npm test", "pnpm test"),
(r"npm build", "pnpm build"),
(r"npm dev", "pnpm dev"),
# Other commands
(r"npm list", "pnpm list"),
(r"npm ls", "pnpm list"),
(r"npm outdated", "pnpm outdated"),
(r"npm update", "pnpm update"),
(r"npm audit", "pnpm audit"),
(r"npm ci", "pnpm install --frozen-lockfile"),
# npx commands
(r"npx\s+(.+)", r"pnpm dlx \1"),
(r"npx", "pnpm dlx"),
]
suggestion = command
for pattern, replacement in conversions:
if re.search(pattern, command):
suggestion = re.sub(pattern, replacement, command)
break
# If no specific conversion found, do basic substitution
if suggestion == command:
suggestion = re.sub(r"(?:^|\s)npm(?:\s|$)", " pnpm ", command)
suggestion = re.sub(r"(?:^|\s)npx(?:\s|$)", " pnpm dlx ", suggestion)
suggestion = suggestion.strip()
return suggestion
def validate(self) -> dict[str, Any]:
"""Validate and process the bash command"""
try:
# Parse Claude Code hook input format
tool_name = self.input.get("tool_name")
if tool_name != "Bash":
return self.approve()
tool_input = self.input.get("tool_input", {})
command = tool_input.get("command")
if not command:
return self.approve()
# Check for npm/npx usage
npm_usage = self.detect_npm_usage(command)
if npm_usage:
return self.block(npm_usage)
return self.approve()
except Exception as error:
return self.approve(f"PNPM enforcer error: {error}")
def approve(self, custom_message: str | None = None) -> dict[str, Any]:
"""Approve the command"""
return {"approve": True, "message": custom_message or "✅ Command approved"}
def block(self, npm_usage: dict[str, Any]) -> dict[str, Any]:
"""Block npm/npx command and suggest pnpm alternative"""
message = [
"🚫 NPM/NPX Usage Blocked",
"",
f'❌ Blocked command: {npm_usage["original"]}',
f'✅ Use this instead: {npm_usage["suggestion"]}',
"",
"📋 Why pnpm?",
" • Faster installation and better disk efficiency",
" • More reliable dependency resolution",
" • Better monorepo support",
" • Consistent with project standards",
"",
"💡 Quick pnpm reference:",
" • pnpm install → Install dependencies",
" • pnpm add <pkg> → Add package",
" • pnpm add -D <pkg> → Add dev dependency",
" • pnpm run <script> → Run package script",
" • pnpm dlx <cmd> → Execute package (like npx)",
"",
"Please use the suggested pnpm command instead.",
]
return {"approve": False, "message": "\n".join(message)}
def main():
"""Main execution"""
try:
input_data = json.load(sys.stdin)
# Ensure log directory exists
log_dir = Path.cwd() / "logs"
log_dir.mkdir(parents=True, exist_ok=True)
log_path = log_dir / "pnpm_enforcer.json"
# Read existing log data or initialize empty list
if log_path.exists():
with open(log_path) as f:
try:
log_data = json.load(f)
except (json.JSONDecodeError, ValueError):
log_data = []
else:
log_data = []
# Add timestamp to the log entry
timestamp = datetime.now().strftime("%b %d, %I:%M%p").lower()
input_data["timestamp"] = timestamp
# Process enforcement logic
enforcer = PnpmEnforcer(input_data)
result = enforcer.validate()
# Add result to log entry
input_data["enforcement_result"] = result
# Append new data to log
log_data.append(input_data)
# Write back to file with formatting
with open(log_path, "w") as f:
json.dump(log_data, f, indent=2)
print(json.dumps(result))
except Exception as error:
print(json.dumps({"approve": True, "message": f"PNPM enforcer error: {error}"}))
if __name__ == "__main__":
main()

509
hooks/scripts/universal-linter.py Executable file
View File

@@ -0,0 +1,509 @@
#!/usr/bin/env -S uv run --script
# /// script
# requires-python = ">=3.10"
# dependencies = []
# ///
import hashlib
import json
import subprocess
import sys
from datetime import datetime, timedelta
from pathlib import Path
from typing import Any
# Simple file validation cache to prevent redundant work
validation_cache = {}
CACHE_TTL = timedelta(minutes=5)
def get_file_hash(file_path: str) -> str | None:
"""Generate file hash for cache key"""
try:
path = Path(file_path)
if not path.exists():
return None
content = path.read_text(encoding="utf-8")
mtime = path.stat().st_mtime
return hashlib.md5(f"{content}{mtime}".encode()).hexdigest()
except Exception:
return None
def is_cached_valid(file_path: str) -> dict[str, Any] | None:
"""Check if file was recently validated"""
file_hash = get_file_hash(file_path)
if not file_hash:
return None
cache_key = f"{file_path}:{file_hash}"
cached = validation_cache.get(cache_key)
if cached and datetime.now() - cached["timestamp"] < CACHE_TTL:
return cached["result"]
return None
def cache_result(file_path: str, result: dict[str, Any]):
"""Cache validation result"""
file_hash = get_file_hash(file_path)
if not file_hash:
return
cache_key = f"{file_path}:{file_hash}"
validation_cache[cache_key] = {"result": result, "timestamp": datetime.now()}
def should_validate_file(file_path: str, project_type: str) -> bool:
"""Check if file should be validated"""
if not file_path:
return False
# Skip non-existent files
if not Path(file_path).exists():
return False
# Get file extension
ext = Path(file_path).suffix
# Check based on project type
if project_type == "javascript":
return ext in [".ts", ".tsx", ".js", ".jsx", ".mjs", ".cjs"]
elif project_type == "python":
return ext in [".py", ".pyi"]
elif project_type == "rust":
return ext in [".rs"]
elif project_type == "go":
return ext in [".go"]
# For unknown project types, try to validate common code files
return ext in [".ts", ".tsx", ".js", ".jsx", ".py", ".rs", ".go"]
def detect_package_manager() -> str:
"""Detect which package manager to use based on project files"""
project_root = Path.cwd()
# Check for lock files in order of preference
if (project_root / "pnpm-lock.yaml").exists():
return "pnpm"
elif (project_root / "yarn.lock").exists():
return "yarn"
elif (project_root / "package-lock.json").exists():
return "npm"
# Fallback to npm if no lock file found
return "npm"
def detect_project_type() -> str:
"""Detect project type based on files and dependencies"""
project_root = Path.cwd()
# Check for Python files
if (project_root / "pyproject.toml").exists() or (
project_root / "requirements.txt"
).exists():
return "python"
# Check for Rust files
if (project_root / "Cargo.toml").exists():
return "rust"
# Check for package.json (JavaScript/TypeScript)
if (project_root / "package.json").exists():
return "javascript"
# Check for Go files
if (project_root / "go.mod").exists():
return "go"
return "unknown"
def get_available_linters(project_type: str) -> list:
"""Get available linting tools for the project"""
linters = []
project_root = Path.cwd()
if project_type == "python":
# Check for Python linters
if subprocess.run(["which", "ruff"], capture_output=True).returncode == 0:
linters.append(("ruff", ["ruff", "check", "--fix"]))
if subprocess.run(["which", "black"], capture_output=True).returncode == 0:
linters.append(("black", ["black", "."]))
if subprocess.run(["which", "flake8"], capture_output=True).returncode == 0:
linters.append(("flake8", ["flake8"]))
if subprocess.run(["which", "pylint"], capture_output=True).returncode == 0:
linters.append(("pylint", ["pylint"]))
elif project_type == "javascript":
package_manager = detect_package_manager()
# Check package.json for available scripts and dependencies
package_json_path = project_root / "package.json"
if package_json_path.exists():
try:
with open(package_json_path) as f:
package_data = json.load(f)
scripts = package_data.get("scripts", {})
deps = {
**package_data.get("dependencies", {}),
**package_data.get("devDependencies", {}),
}
# Check for common linting scripts
if "lint" in scripts:
linters.append(("lint", [package_manager, "run", "lint"]))
if "lint:fix" in scripts:
linters.append(("lint:fix", [package_manager, "run", "lint:fix"]))
# Check for Biome
if "biome" in scripts or "@biomejs/biome" in deps:
linters.append(
("biome", [package_manager, "biome", "check", "--apply"])
)
# Check for ESLint
if "eslint" in deps:
linters.append(("eslint", [package_manager, "run", "lint"]))
# Check for Prettier
if "prettier" in deps:
linters.append(("prettier", [package_manager, "run", "format"]))
except (json.JSONDecodeError, FileNotFoundError):
pass
elif project_type == "rust":
# Check for Rust tools
if subprocess.run(["which", "cargo"], capture_output=True).returncode == 0:
linters.append(("clippy", ["cargo", "clippy", "--fix", "--allow-dirty"]))
linters.append(("fmt", ["cargo", "fmt"]))
elif project_type == "go":
# Check for Go tools
if subprocess.run(["which", "go"], capture_output=True).returncode == 0:
linters.append(("fmt", ["go", "fmt", "./..."]))
linters.append(("vet", ["go", "vet", "./..."]))
if (
subprocess.run(["which", "golangci-lint"], capture_output=True).returncode
== 0
):
linters.append(("golangci-lint", ["golangci-lint", "run", "--fix"]))
return linters
def get_available_type_checkers(project_type: str) -> list:
"""Get available type checking tools for the project"""
type_checkers = []
project_root = Path.cwd()
if project_type == "python":
if subprocess.run(["which", "mypy"], capture_output=True).returncode == 0:
type_checkers.append(("mypy", ["mypy", "."]))
if subprocess.run(["which", "pyright"], capture_output=True).returncode == 0:
type_checkers.append(("pyright", ["pyright"]))
elif project_type == "javascript":
package_manager = detect_package_manager()
package_json_path = project_root / "package.json"
if package_json_path.exists():
try:
with open(package_json_path) as f:
package_data = json.load(f)
scripts = package_data.get("scripts", {})
deps = {
**package_data.get("dependencies", {}),
**package_data.get("devDependencies", {}),
}
# Check for TypeScript
if "typecheck" in scripts:
type_checkers.append(
("typecheck", [package_manager, "run", "typecheck"])
)
elif "typescript" in deps:
type_checkers.append(("tsc", [package_manager, "tsc", "--noEmit"]))
except (json.JSONDecodeError, FileNotFoundError):
pass
elif project_type == "rust":
# Rust has built-in type checking via cargo check
if subprocess.run(["which", "cargo"], capture_output=True).returncode == 0:
type_checkers.append(("check", ["cargo", "check"]))
elif project_type == "go":
# Go has built-in type checking via go build
if subprocess.run(["which", "go"], capture_output=True).returncode == 0:
type_checkers.append(("build", ["go", "build", "./..."]))
return type_checkers
def run_linting_checks(file_path: str, project_type: str) -> list:
"""Run all available linting checks"""
results = []
linters = get_available_linters(project_type)
if not linters:
return [
{
"success": True,
"message": " No linters available, skipping checks",
"output": "",
}
]
for linter_name, linter_cmd in linters:
try:
# For file-specific linters, add the file path
if linter_name in ["ruff", "biome"] and file_path:
cmd = linter_cmd + [file_path]
else:
cmd = linter_cmd
result = subprocess.run(cmd, capture_output=True, text=True, check=True)
results.append(
{
"success": True,
"message": f'{linter_name} check passed for {Path(file_path).name if file_path else "project"}',
"output": result.stdout,
"linter": linter_name,
}
)
except subprocess.CalledProcessError as error:
error_output = error.stdout or error.stderr or str(error)
results.append(
{
"success": False,
"message": f'{linter_name} found issues in {Path(file_path).name if file_path else "project"}',
"output": error_output,
"fix": f'Run: {" ".join(cmd)}',
"linter": linter_name,
}
)
except FileNotFoundError:
results.append(
{
"success": True,
"message": f" {linter_name} not available, skipping check",
"output": "",
"linter": linter_name,
}
)
return results
def run_type_checks(project_type: str) -> list:
"""Run all available type checking"""
results = []
type_checkers = get_available_type_checkers(project_type)
if not type_checkers:
return [
{
"success": True,
"message": " No type checkers available, skipping checks",
"output": "",
}
]
for checker_name, checker_cmd in type_checkers:
try:
result = subprocess.run(
checker_cmd, capture_output=True, text=True, check=True
)
results.append(
{
"success": True,
"message": f"{checker_name} type check passed",
"output": result.stdout,
"checker": checker_name,
}
)
except subprocess.CalledProcessError as error:
error_output = error.stdout or error.stderr or str(error)
results.append(
{
"success": False,
"message": f"{checker_name} type check failed",
"output": error_output,
"fix": f'Run: {" ".join(checker_cmd)}',
"checker": checker_name,
}
)
except FileNotFoundError:
results.append(
{
"success": True,
"message": f" {checker_name} not available, skipping check",
"output": "",
"checker": checker_name,
}
)
return results
def validate_file(file_path: str) -> dict[str, Any]:
"""Validate a single file"""
# Check cache first
cached = is_cached_valid(file_path)
if cached:
return cached
# Detect project type
project_type = detect_project_type()
# Check if file should be validated
if not should_validate_file(file_path, project_type):
result = {
"approve": True,
"message": f" Skipped {Path(file_path).name} (not a supported file type for {project_type} project)",
}
return result
# Run linting checks
lint_results = run_linting_checks(file_path, project_type)
# Run type checking (project-wide)
type_results = run_type_checks(project_type)
# Combine all results
all_results = lint_results + type_results
all_passed = all(result["success"] for result in all_results)
if all_passed:
successful_tools = [
r.get("linter", r.get("checker", "tool"))
for r in all_results
if r["success"]
]
tools_used = ", ".join(filter(None, successful_tools))
result = {
"approve": True,
"message": f"✅ All checks passed for {Path(file_path).name}"
+ (f" ({tools_used})" if tools_used else ""),
}
else:
issues = []
fixes = []
for check_result in all_results:
if not check_result["success"]:
issues.append(check_result["message"])
if "fix" in check_result:
fixes.append(check_result["fix"])
message_parts = ["❌ Validation failed:"] + issues
if fixes:
message_parts.extend(["", "🔧 Fixes:"] + fixes)
result = {"approve": False, "message": "\n".join(message_parts)}
# Cache result
cache_result(file_path, result)
return result
def main():
"""Main execution"""
try:
input_data = json.load(sys.stdin)
# Extract file path from tool input
tool_input = input_data.get("tool_input", {})
file_path = tool_input.get("file_path")
if not file_path:
# No file path provided, approve by default
result = {
"approve": True,
"message": " No file path provided, skipping validation",
}
else:
# Show user-friendly message that linter is running
file_name = Path(file_path).name if file_path else "file"
print(f"🔍 Running linter on {file_name}...", file=sys.stderr)
result = validate_file(file_path)
# Show result to user
if result.get("approve", True):
print(f"✨ Linting complete for {file_name}", file=sys.stderr)
else:
print(
f"🔧 Linter found issues in {file_name} (see details above)",
file=sys.stderr,
)
# Log the linting activity
try:
# Ensure log directory exists
log_dir = Path.cwd() / "logs"
log_dir.mkdir(parents=True, exist_ok=True)
log_path = log_dir / "universal_linter.json"
# Read existing log data or initialize empty list
if log_path.exists():
with open(log_path) as f:
try:
log_data = json.load(f)
except (json.JSONDecodeError, ValueError):
log_data = []
else:
log_data = []
# Create log entry with relevant data
log_entry = {
"file_path": file_path,
"project_type": detect_project_type() if file_path else "unknown",
"result": result.get("approve", True),
"message": result.get("message", ""),
"tool_input": tool_input,
"session_id": input_data.get("session_id", "unknown"),
}
# Add timestamp to the log entry
timestamp = datetime.now().strftime("%b %d, %I:%M%p").lower()
log_entry["timestamp"] = timestamp
# Append new data
log_data.append(log_entry)
# Write back to file with formatting
with open(log_path, "w") as f:
json.dump(log_data, f, indent=2)
except Exception:
# Don't let logging errors break the hook
pass
print(json.dumps(result))
except Exception as error:
print(
json.dumps({"approve": True, "message": f"Universal linter error: {error}"})
)
if __name__ == "__main__":
main()