Initial commit

This commit is contained in:
Zhongwei Li
2025-11-29 17:57:48 +08:00
commit 70dd319c2b
8 changed files with 1638 additions and 0 deletions

21
hooks/hooks.json Normal file
View File

@@ -0,0 +1,21 @@
{
"hooks": {
"PostToolUse": [
{
"matcher": "Write|Edit|MultiEdit",
"hooks": [
{
"type": "command",
"command": "${CLAUDE_PLUGIN_ROOT}/hooks/scripts/typescript-validator.py",
"description": "Validate TypeScript code"
},
{
"type": "command",
"command": "${CLAUDE_PLUGIN_ROOT}/hooks/scripts/import-organizer.py",
"description": "Organize imports automatically"
}
]
}
]
}
}

318
hooks/scripts/import-organizer.py Executable file
View File

@@ -0,0 +1,318 @@
#!/usr/bin/env -S uv run --script
# /// script
# requires-python = ">=3.10"
# dependencies = []
# ///
import json
import re
import sys
from datetime import datetime
from pathlib import Path
from typing import Any
class ImportOrganizer:
def __init__(self, input_data: dict[str, Any]):
self.input = input_data
self.import_groups = {
"react": [],
"thirdParty": [],
"absolute": [],
"relative": [],
"types": [],
}
def organize(self) -> dict[str, Any]:
"""Main organization entry point"""
tool_input = self.input.get("tool_input", {})
output = self.input.get("output", {})
content = tool_input.get("content")
file_path = tool_input.get("file_path")
# Security: Basic input validation
if file_path and (
"../" in file_path or "..\\" in file_path or file_path.startswith("/")
):
return self.skip("Potentially unsafe file path detected")
# Only process TypeScript/JavaScript files
file_ext = Path(file_path).suffix if file_path else ""
if file_ext not in [".ts", ".tsx", ".js", ".jsx"]:
return self.skip("Not a TypeScript/JavaScript file")
# Work with the output content if available (PostToolUse), otherwise input content
code_content = output.get("content") or content
if not code_content:
return self.skip("No content to organize")
try:
organized = self.organize_imports(code_content)
# If content changed, write it back
if organized != code_content:
self.write_organized_content(file_path, organized)
return self.success("Imports organized successfully")
else:
return self.skip("Imports already organized")
except Exception as error:
return self.error(f"Failed to organize imports: {error}")
def organize_imports(self, content: str) -> str:
"""Parse and organize imports"""
lines = content.split("\n")
first_import_index = -1
last_import_index = -1
file_header = []
# Find import boundaries and directives
for i, line in enumerate(lines):
trimmed_line = line.strip()
# Check for 'use client' or 'use server' directives
if trimmed_line in ["'use client'", '"use client"']:
file_header.append(line)
continue
if trimmed_line in ["'use server'", '"use server"']:
file_header.append(line)
continue
# Skip shebang and comments at the top
if i == 0 and trimmed_line.startswith("#!"):
file_header.append(line)
continue
# Detect imports
if self.is_import_line(trimmed_line):
if first_import_index == -1:
first_import_index = i
last_import_index = i
self.categorize_import(line)
elif first_import_index != -1 and trimmed_line != "":
# Stop when we hit non-import, non-empty content
break
# If no imports found, return original content
if first_import_index == -1:
return content
# Build organized imports
organized_imports = self.build_organized_imports()
# Reconstruct the file
before_imports = lines[:first_import_index]
after_imports = lines[last_import_index + 1 :]
# Combine everything
result = []
result.extend(file_header)
if file_header:
result.append("") # Add blank line after directives
result.extend([line for line in before_imports if line not in file_header])
result.extend(organized_imports)
result.extend(after_imports)
return "\n".join(result)
def is_import_line(self, line: str) -> bool:
"""Check if a line is an import statement"""
return bool(
re.match(r"^import\s+", line)
or re.match(r"^import\s*{", line)
or re.match(r"^import\s*type", line)
)
def categorize_import(self, import_line: str):
"""Categorize import into appropriate group"""
trimmed = import_line.strip()
# Type imports
if "import type" in trimmed or "import { type" in trimmed:
self.import_groups["types"].append(import_line)
return
# Extract the module path
module_match = re.search(r"from\s+['\"]([^'\"]+)['\"]", import_line)
if not module_match:
# Handle side-effect imports (import 'module')
if "react" in import_line or "next" in import_line:
self.import_groups["react"].append(import_line)
else:
self.import_groups["thirdParty"].append(import_line)
return
module_path = module_match.group(1)
# React/Next.js imports
if self.is_react_import(module_path):
self.import_groups["react"].append(import_line)
# Absolute imports (@/)
elif module_path.startswith("@/"):
self.import_groups["absolute"].append(import_line)
# Relative imports
elif module_path.startswith("."):
self.import_groups["relative"].append(import_line)
# Third-party imports
else:
self.import_groups["thirdParty"].append(import_line)
def is_react_import(self, module_path: str) -> bool:
"""Check if import is React/Next.js related"""
react_patterns = [
"react",
"react-dom",
"next",
"@next",
"next/",
"@vercel",
]
return any(
module_path == pattern or module_path.startswith(pattern + "/")
for pattern in react_patterns
)
def build_organized_imports(self) -> list[str]:
"""Build organized import groups"""
groups = []
# Add each group with proper spacing
if self.import_groups["react"]:
groups.extend(self.sort_imports(self.import_groups["react"]))
if self.import_groups["thirdParty"]:
if groups:
groups.append("") # Add blank line
groups.extend(self.sort_imports(self.import_groups["thirdParty"]))
if self.import_groups["absolute"]:
if groups:
groups.append("") # Add blank line
groups.extend(self.sort_imports(self.import_groups["absolute"]))
if self.import_groups["relative"]:
if groups:
groups.append("") # Add blank line
groups.extend(self.sort_imports(self.import_groups["relative"]))
if self.import_groups["types"]:
if groups:
groups.append("") # Add blank line
groups.extend(self.sort_imports(self.import_groups["types"]))
return groups
def sort_imports(self, imports: list[str]) -> list[str]:
"""Sort imports alphabetically within a group"""
def get_path(imp: str) -> str:
match = re.search(r"from\s+['\"]([^'\"]+)['\"]", imp)
return match.group(1) if match else imp
return sorted(imports, key=get_path)
def write_organized_content(self, file_path: str, content: str):
"""Write organized content back to file"""
try:
with open(file_path, "w", encoding="utf-8") as f:
f.write(content)
except Exception as error:
raise Exception(f"Failed to write file: {error}")
def success(self, message: str) -> dict[str, Any]:
"""Return success response"""
return {"success": True, "message": f"{message}", "modified": True}
def skip(self, reason: str) -> dict[str, Any]:
"""Return skip response"""
return {"success": True, "message": f" Skipped: {reason}", "modified": False}
def error(self, message: str) -> dict[str, Any]:
"""Return error response"""
return {"success": False, "message": f"{message}", "modified": False}
def log_import_organizer_activity(input_data, result):
"""Log import organizer activity to a structured JSON file."""
try:
# Ensure log directory exists
log_dir = Path.cwd() / "logs"
log_dir.mkdir(parents=True, exist_ok=True)
log_path = log_dir / "import_organizer.json"
# Read existing log data or initialize empty list
if log_path.exists():
with open(log_path) as f:
try:
log_data = json.load(f)
except (json.JSONDecodeError, ValueError):
log_data = []
else:
log_data = []
# Add timestamp and hook event name to the log entry
timestamp = datetime.now().strftime("%b %d, %I:%M%p").lower()
log_entry = input_data.copy()
log_entry["timestamp"] = timestamp
log_entry["hook_event_name"] = "ImportOrganizer"
log_entry["result"] = result
log_entry["working_directory"] = str(Path.cwd())
# Append new data
log_data.append(log_entry)
# Write back to file with formatting
with open(log_path, "w") as f:
json.dump(log_data, f, indent=2)
except Exception as e:
# Don't let logging errors break the hook
print(f"Logging error: {e}", file=sys.stderr)
def main():
"""Main execution"""
input_data = None
result = None
try:
input_data = json.load(sys.stdin)
# Extract file path for user-friendly message
tool_input = input_data.get("tool_input", {})
file_path = tool_input.get("file_path", "")
file_name = Path(file_path).name if file_path else "file"
# Show friendly message
print(f"📦 Organizing imports in {file_name}...", file=sys.stderr)
organizer = ImportOrganizer(input_data)
result = organizer.organize()
# Log the activity
log_import_organizer_activity(input_data, result)
# Show result to user
if result.get("modified", False):
print(f"✅ Imports organized in {file_name}", file=sys.stderr)
else:
print(f"👍 Imports already organized in {file_name}", file=sys.stderr)
# For PostToolUse hooks, we don't need to return approve/block
print(json.dumps({"message": result["message"]}))
except Exception as error:
# Log the error if we have input_data
if input_data:
error_result = {
"success": False,
"message": f"Import organizer error: {error}",
"modified": False,
}
log_import_organizer_activity(input_data, error_result)
print(json.dumps({"message": f"Import organizer error: {error}"}))
if __name__ == "__main__":
main()

View File

@@ -0,0 +1,638 @@
#!/usr/bin/env -S uv run --script
# /// script
# requires-python = ">=3.10"
# dependencies = []
# ///
import hashlib
import json
import logging
import os
import re
import subprocess
import sys
import threading
from collections import OrderedDict
from datetime import datetime, timedelta
from pathlib import Path
from typing import Any
# Configure logging for cache operations
logging.basicConfig(level=logging.WARNING)
logger = logging.getLogger(__name__)
# Thread-safe LRU cache with size limit
class ThreadSafeLRUCache:
def __init__(self, max_size: int = 100, ttl: timedelta = timedelta(minutes=5)):
self.max_size = max_size
self.ttl = ttl
self._cache: OrderedDict = OrderedDict()
self._lock = threading.RLock()
def get(self, key: str) -> dict[str, Any] | None:
"""Get cached value if exists and not expired"""
with self._lock:
if key not in self._cache:
return None
entry = self._cache[key]
if datetime.now() - entry["timestamp"] >= self.ttl:
# Remove expired entry
del self._cache[key]
return None
# Move to end (most recently used)
self._cache.move_to_end(key)
return entry["result"]
def set(self, key: str, value: dict[str, Any]) -> None:
"""Set cached value with automatic cleanup"""
with self._lock:
# Remove oldest entries if at capacity
while len(self._cache) >= self.max_size:
self._cache.popitem(last=False)
self._cache[key] = {"result": value, "timestamp": datetime.now()}
# Move to end
self._cache.move_to_end(key)
def clear_expired(self) -> int:
"""Clear expired entries and return count removed"""
with self._lock:
current_time = datetime.now()
expired_keys = [
key
for key, entry in self._cache.items()
if current_time - entry["timestamp"] >= self.ttl
]
for key in expired_keys:
del self._cache[key]
return len(expired_keys)
def size(self) -> int:
"""Get current cache size"""
with self._lock:
return len(self._cache)
# Global cache instance
validation_cache = ThreadSafeLRUCache(max_size=100, ttl=timedelta(minutes=5))
# Configuration
DEBUG_MODE = os.environ.get("CLAUDE_HOOKS_DEBUG") == "1"
FAST_MODE = "--fast" in sys.argv
class TypeScriptValidator:
def __init__(self, hook_input: dict[str, Any]):
self.hook_input = hook_input
self.errors: list[str] = []
self.warnings: list[str] = []
self.violations: list[dict[str, Any]] = []
self.blockers: list[str] = []
self.results: dict[str, Any] = {
"biome": None,
"typecheck": None,
"codeStandards": None,
}
async def validate(self) -> dict[str, Any]:
"""Main validation entry point"""
tool_input = self.hook_input.get("tool_input")
phase = self.hook_input.get("phase")
# Extract file path and determine if we should validate
file_path = self.extract_file_path(tool_input)
if not file_path or not self.should_validate_file(file_path):
return self.approve("File skipped - not a TypeScript/JavaScript file")
# Check cache first
cached = self.get_cached_result(file_path)
if cached and not FAST_MODE:
if DEBUG_MODE:
print(
f"Using cached TypeScript validation for: {file_path}",
file=sys.stderr,
)
return cached
# Determine validation mode based on phase and context
validation_mode = self.determine_validation_mode(tool_input, phase)
if DEBUG_MODE:
print(
f"TypeScript validation mode: {validation_mode['type']} ({validation_mode['reason']})",
file=sys.stderr,
)
# Run validation steps
self.validate_biome(file_path, validation_mode)
self.validate_typecheck(validation_mode)
self.validate_coding_standards(tool_input, file_path)
# Determine final result
final_result = self.get_final_result()
# Cache result
self.cache_result(file_path, final_result)
return final_result
def extract_file_path(self, tool_input: Any) -> str | None:
"""Extract file path from tool input"""
if isinstance(tool_input, dict):
return tool_input.get("file_path")
return None
def should_validate_file(self, file_path: str) -> bool:
"""Check if file should be validated"""
if not file_path:
return False
ext = Path(file_path).suffix
return ext in [".ts", ".tsx", ".js", ".jsx"]
def get_cached_result(self, file_path: str) -> dict[str, Any] | None:
"""Get cached validation result"""
try:
if not Path(file_path).exists():
return None
with open(file_path, encoding="utf-8") as f:
content = f.read()
mtime = Path(file_path).stat().st_mtime
# Use SHA-256 for better performance and security
cache_key = hashlib.sha256(f"{content}{mtime}".encode()).hexdigest()
return validation_cache.get(f"{file_path}:{cache_key}")
except FileNotFoundError:
logger.warning(f"File not found for cache lookup: {file_path}")
return None
except PermissionError:
logger.warning(f"Permission denied reading file for cache: {file_path}")
return None
except UnicodeDecodeError:
logger.warning(f"Unicode decode error reading file for cache: {file_path}")
return None
except OSError as e:
logger.warning(f"OS error reading file for cache {file_path}: {e}")
return None
except Exception as e:
logger.error(f"Unexpected error in cache lookup for {file_path}: {e}")
return None
def cache_result(self, file_path: str, result: dict[str, Any]):
"""Cache validation result"""
try:
if not Path(file_path).exists():
return
with open(file_path, encoding="utf-8") as f:
content = f.read()
mtime = Path(file_path).stat().st_mtime
# Use SHA-256 for better performance and security
cache_key = hashlib.sha256(f"{content}{mtime}".encode()).hexdigest()
validation_cache.set(f"{file_path}:{cache_key}", result)
# Periodically clean up expired entries
if validation_cache.size() > 80: # Clean when 80% full
expired_count = validation_cache.clear_expired()
if expired_count > 0 and DEBUG_MODE:
logger.info(f"Cleaned {expired_count} expired cache entries")
except FileNotFoundError:
logger.warning(f"File not found for caching: {file_path}")
except PermissionError:
logger.warning(f"Permission denied reading file for caching: {file_path}")
except UnicodeDecodeError:
logger.warning(
f"Unicode decode error reading file for caching: {file_path}"
)
except OSError as e:
logger.warning(f"OS error reading file for caching {file_path}: {e}")
except Exception as e:
logger.error(f"Unexpected error caching result for {file_path}: {e}")
def determine_validation_mode(
self, tool_input: Any, phase: str | None
) -> dict[str, str]:
"""Determine validation mode based on phase and context"""
if phase == "Stop":
return {"type": "full", "reason": "Stop phase requires full validation"}
if isinstance(tool_input, dict) and tool_input.get("file_path"):
return {"type": "file-specific", "reason": "File-specific validation"}
return {"type": "incremental", "reason": "Incremental validation"}
def validate_biome(self, file_path: str, validation_mode: dict[str, str]):
"""Run Biome validation (formatting, linting, imports)"""
try:
biome_command = self.build_biome_command(file_path, validation_mode)
if DEBUG_MODE:
print(f"Running: {' '.join(biome_command)}", file=sys.stderr)
subprocess.run(biome_command, check=True, capture_output=True, text=True)
self.results["biome"] = {
"success": True,
"message": "Biome validation passed",
}
except subprocess.CalledProcessError as error:
error_output = error.stdout or error.stderr or str(error)
# Parse Biome error types
biome_errors = []
if "Format" in error_output:
biome_errors.append(f"Biome formatting issues in {file_path}")
if "Lint" in error_output:
biome_errors.append(f"Biome linting issues in {file_path}")
if "Organize imports" in error_output:
biome_errors.append(f"Import organization issues in {file_path}")
if not biome_errors:
biome_errors.append(
f"Biome check failed for {file_path}: {error_output[:200]}"
)
self.errors.extend(biome_errors)
self.results["biome"] = {
"success": False,
"errors": biome_errors,
"fix": (
"Run 'pnpm biome:check --apply' on changed files"
if validation_mode["type"] == "incremental"
else "Run 'pnpm biome:check --apply' and fix all remaining issues"
),
}
def validate_typecheck(self, validation_mode: dict[str, str]):
"""Run TypeScript type checking"""
try:
typecheck_command = self.build_typecheck_command(validation_mode)
if DEBUG_MODE:
print(f"Running: {' '.join(typecheck_command)}", file=sys.stderr)
subprocess.run(
typecheck_command, check=True, capture_output=True, text=True
)
self.results["typecheck"] = {
"success": True,
"message": "TypeScript check passed",
}
except subprocess.CalledProcessError as error:
error_output = error.stdout or error.stderr or str(error)
self.errors.append(f"TypeScript type errors: {error_output[:300]}")
self.results["typecheck"] = {
"success": False,
"error": error_output,
"fix": (
"Fix TypeScript errors in modified files"
if validation_mode["type"] == "incremental"
else "Fix all TypeScript errors before completing task"
),
}
def validate_coding_standards(self, tool_input: Any, file_path: str):
"""Run coding standards validation"""
try:
content = (
tool_input.get("content") if isinstance(tool_input, dict) else None
)
if not content:
self.results["codeStandards"] = {
"success": True,
"message": "No content to validate",
}
return
# Run all coding standards checks
self.validate_no_any_type(content)
self.validate_no_var(content)
self.validate_null_safety(content)
self.validate_implicit_globals(content)
self.validate_empty_catch(content)
self.validate_magic_numbers(content)
self.validate_component_structure(content, file_path)
self.validate_api_route_structure(content, file_path)
self.validate_file_name(file_path)
self.results["codeStandards"] = {
"success": len(self.blockers) == 0,
"violations": len(self.violations),
"blockers": len(self.blockers),
}
except Exception as error:
self.warnings.append(f"Coding standards validation error: {error}")
self.results["codeStandards"] = {
"success": True,
"message": "Coding standards check skipped due to error",
}
def build_biome_command(
self, file_path: str, validation_mode: dict[str, str]
) -> list[str]:
"""Build Biome command based on validation mode"""
if validation_mode["type"] == "full":
return ["pnpm", "biome:check", "--apply"]
if validation_mode["type"] == "file-specific":
return ["pnpm", "biome", "check", file_path, "--apply"]
# For incremental validation, check changed files
try:
changed_files = subprocess.run(
["git", "diff", "--name-only", "HEAD"],
capture_output=True,
text=True,
check=True,
).stdout.strip()
staged_files = subprocess.run(
["git", "diff", "--cached", "--name-only"],
capture_output=True,
text=True,
check=True,
).stdout.strip()
if not changed_files and not staged_files:
return ["pnpm", "biome", "check", file_path, "--apply"]
# Build command for changed files
all_files = []
if changed_files:
all_files.extend(changed_files.split("\n"))
if staged_files:
all_files.extend(staged_files.split("\n"))
# Filter for TypeScript/JavaScript files
ts_files = [
f for f in all_files if Path(f).suffix in [".ts", ".tsx", ".js", ".jsx"]
]
if ts_files:
command = ["pnpm", "biome", "check"] + ts_files + ["--apply"]
return command
else:
return ["pnpm", "biome", "check", file_path, "--apply"]
except subprocess.CalledProcessError:
return ["pnpm", "biome", "check", file_path, "--apply"]
def build_typecheck_command(self, validation_mode: dict[str, str]) -> list[str]:
"""Build TypeScript check command"""
if validation_mode["type"] == "full":
return ["pnpm", "typecheck"]
else:
return ["pnpm", "typecheck", "--noEmit"]
def validate_no_any_type(self, content: str):
"""Check for 'any' type usage"""
any_pattern = r"\b:\s*any\b"
matches = re.findall(any_pattern, content)
if matches:
self.violations.append(
{
"rule": "No Any Type",
"message": f'Found {len(matches)} usage(s) of "any" type',
"severity": "error",
}
)
self.blockers.append('Use "unknown" or specific types instead of "any"')
def validate_no_var(self, content: str):
"""Check for 'var' declarations"""
var_pattern = r"\bvar\s+\w+"
matches = re.findall(var_pattern, content)
if matches:
self.violations.append(
{
"rule": "No Var",
"message": f'Found {len(matches)} usage(s) of "var" declaration',
"severity": "error",
}
)
self.blockers.append('Use "const" or "let" instead of "var"')
def validate_null_safety(self, content: str):
"""Check for null safety issues"""
# DISABLED: This regex-based check causes too many false positives
# TypeScript's type system and strict null checks handle this better
# To properly implement this, we would need AST parsing to understand:
# - Type guarantees (non-nullable types)
# - Control flow analysis (null checks before access)
# - Type guards and narrowing
#
# Example false positives this regex would catch:
# - myArray.map() where myArray is guaranteed non-null by type
# - obj.method() after explicit null check
# - React component props that are required
#
# If you need null safety checks, enable TypeScript's strictNullChecks instead
pass
def validate_implicit_globals(self, content: str):
"""Check for implicit global variables"""
# DISABLED: This regex-based check is too simplistic and causes false positives
# Issues with the current approach:
# - Doesn't understand scoping (function parameters, block scope, module scope)
# - Doesn't recognize property assignments (this.prop = value, obj.prop = value)
# - Doesn't understand destructuring assignments
# - Doesn't recognize TypeScript class properties
# - Doesn't handle imports/exports
#
# Example false positives:
# - Class property assignments: this.name = 'value'
# - Object property updates: user.name = 'new name'
# - Array element updates: items[0] = newItem
# - Destructuring: const { name } = user; name = 'new'
# - Function parameters: function(param) { param = transform(param) }
#
# TypeScript's noImplicitAny and strict mode handle this properly
pass
def validate_empty_catch(self, content: str):
"""Check for empty catch blocks"""
empty_catch_pattern = r"catch\s*\(\s*\w*\s*\)\s*\{\s*\}"
if re.search(empty_catch_pattern, content):
self.violations.append(
{
"rule": "Empty Catch",
"message": "Empty catch block detected",
"severity": "warning",
}
)
def validate_magic_numbers(self, content: str):
"""Check for magic numbers"""
magic_number_pattern = r"\b\d{2,}\b"
matches = re.findall(magic_number_pattern, content)
if len(matches) > 3:
self.violations.append(
{
"rule": "Magic Numbers",
"message": f"Found {len(matches)} potential magic numbers",
"severity": "warning",
}
)
def validate_component_structure(self, content: str, file_path: str):
"""Validate React component structure"""
if Path(file_path).suffix in [".tsx", ".jsx"]:
if "export default" not in content:
self.violations.append(
{
"rule": "Component Structure",
"message": "React component should have default export",
"severity": "warning",
}
)
def validate_api_route_structure(self, content: str, file_path: str):
"""Validate API route structure"""
if "/api/" in file_path:
if "export" not in content:
self.violations.append(
{
"rule": "API Route Structure",
"message": "API route should export handler functions",
"severity": "warning",
}
)
def validate_file_name(self, file_path: str):
"""Validate file naming conventions"""
file_name = Path(file_path).name
if not re.match(r"^[a-z0-9-_.]+$", file_name):
self.violations.append(
{
"rule": "File Naming",
"message": f'File name "{file_name}" should use kebab-case',
"severity": "warning",
}
)
def get_final_result(self) -> dict[str, Any]:
"""Determine final validation result"""
if self.errors or self.blockers:
return self.block()
else:
return self.approve()
def approve(self, custom_message: str | None = None) -> dict[str, Any]:
"""Approve validation"""
message = custom_message or "✅ TypeScript validation passed"
if self.warnings:
message += f" ({len(self.warnings)} warnings)"
return {"approve": True, "message": message}
def block(self) -> dict[str, Any]:
"""Block validation due to errors"""
message_parts = ["❌ TypeScript validation failed:"]
if self.errors:
message_parts.extend([f" - {error}" for error in self.errors])
if self.blockers:
message_parts.append("")
message_parts.append("🔧 Required fixes:")
message_parts.extend([f" - {blocker}" for blocker in self.blockers])
return {"approve": False, "message": "\n".join(message_parts)}
async def main():
"""Main execution"""
try:
input_data = json.load(sys.stdin)
# Ensure log directory exists
log_dir = Path.cwd() / "logs"
log_dir.mkdir(parents=True, exist_ok=True)
log_path = log_dir / "typescript_validator.json"
# Read existing log data or initialize empty list
if log_path.exists():
with open(log_path) as f:
try:
log_data = json.load(f)
except (json.JSONDecodeError, ValueError):
log_data = []
else:
log_data = []
# Add timestamp to the log entry
timestamp = datetime.now().strftime("%b %d, %I:%M%p").lower()
input_data["timestamp"] = timestamp
# Run validation
validator = TypeScriptValidator(input_data)
result = await validator.validate()
# Add result to log entry
input_data["result"] = result
# Append new data
log_data.append(input_data)
# Write back to file with formatting
with open(log_path, "w") as f:
json.dump(log_data, f, indent=2)
print(json.dumps(result))
except Exception as error:
error_result = {
"approve": False,
"message": f"TypeScript validator error: {error}",
}
# Try to log the error as well
try:
log_dir = Path.cwd() / "logs"
log_dir.mkdir(parents=True, exist_ok=True)
log_path = log_dir / "typescript_validator.json"
if log_path.exists():
with open(log_path) as f:
try:
log_data = json.load(f)
except (json.JSONDecodeError, ValueError):
log_data = []
else:
log_data = []
timestamp = datetime.now().strftime("%b %d, %I:%M%p").lower()
error_entry = {
"timestamp": timestamp,
"error": str(error),
"result": error_result,
}
log_data.append(error_entry)
with open(log_path, "w") as f:
json.dump(log_data, f, indent=2)
except Exception:
# If logging fails, continue with the original error response
pass
print(json.dumps(error_result))
sys.exit(1)
if __name__ == "__main__":
import asyncio
asyncio.run(main())