Files
gh-atc-net-atc-agentic-tool…/skills/iot-edge-module/scripts/detect_project_structure.py
2025-11-29 17:58:35 +08:00

292 lines
9.5 KiB
Python

#!/usr/bin/env python3
"""
Detect IoT Edge project structure by scanning for existing patterns.
This script auto-detects:
- Modules base path
- Contracts project path and name
- Deployment manifests location
- Project namespace
- Container registry (from existing manifests)
- NuGet feed URL (from existing Dockerfiles)
Returns JSON with detected configuration or empty fields if not found.
"""
import json
import os
import re
import sys
from pathlib import Path
from typing import Optional, Dict, Any
def find_files_recursive(root_path: Path, pattern: str) -> list[Path]:
"""Find files matching glob pattern recursively, excluding test directories."""
files = list(root_path.rglob(pattern))
# Always filter out test directories
files = [f for f in files if not any(part.lower().startswith('test') for part in f.parts)]
return files
def find_modules_base_path(root_path: Path) -> Optional[str]:
"""
Find modules base path by looking for existing modules.
Expected pattern: **/modules/*/Program.cs
"""
module_programs = find_files_recursive(root_path, "modules/*/Program.cs")
if module_programs:
# Extract the modules directory path
# E.g., /path/to/src/IoTEdgeModules/modules/foo/Program.cs -> src/IoTEdgeModules/modules
first_match = module_programs[0]
modules_dir = first_match.parent.parent # Go up from /foo/Program.cs to /modules
relative_path = modules_dir.relative_to(root_path)
return str(relative_path).replace('\\', '/')
return None
def find_contracts_project(root_path: Path) -> Optional[Dict[str, str]]:
"""
Find contracts project by looking for *Modules.Contracts*.csproj or *Contracts*.csproj
Returns dict with path and name, or None if not found.
"""
# Try specific pattern first
contracts_projects = find_files_recursive(root_path, "*Modules.Contracts*.csproj")
# Fallback to generic contracts pattern
if not contracts_projects:
contracts_projects = find_files_recursive(root_path, "*Contracts*.csproj")
if contracts_projects:
first_match = contracts_projects[0]
project_dir = first_match.parent
project_name = first_match.stem # Filename without .csproj
relative_path = project_dir.relative_to(root_path)
return {
"path": str(relative_path).replace('\\', '/'),
"name": project_name
}
return None
def find_deployment_manifests(root_path: Path) -> list[str]:
"""
Find all deployment manifest files (*.deployment.manifest.json).
Returns list of relative paths.
"""
manifests = find_files_recursive(root_path, "*.deployment.manifest.json")
return [str(m.relative_to(root_path)).replace('\\', '/') for m in manifests]
def extract_namespace_from_csharp(root_path: Path, contracts_path: Optional[str] = None) -> Optional[str]:
"""
Extract project namespace by:
1. Reading a C# file from contracts project (if available)
2. Finding pattern: namespace <Something>.Modules.Contracts.<ModuleName>
3. Extracting base: <Something>
"""
cs_files = []
# Try contracts project first
if contracts_path:
contracts_full_path = root_path / contracts_path
cs_files = list(contracts_full_path.rglob("*.cs"))
# Fallback: scan any .cs file in project
if not cs_files:
cs_files = list(root_path.rglob("modules/*/*.cs"))[:10] # Sample first 10
# Pattern to match: namespace <Something>.Modules.Contracts.<ModuleName> or similar
namespace_pattern = re.compile(r'namespace\s+([A-Za-z0-9.]+?)\.Modules(?:\.Contracts)?(?:\.[A-Za-z0-9]+)?;')
for cs_file in cs_files:
try:
content = cs_file.read_text(encoding='utf-8')
match = namespace_pattern.search(content)
if match:
# Extract the base namespace before .Modules
return match.group(1)
except Exception:
continue
return None
def extract_container_registry(root_path: Path, manifest_paths: list[str]) -> Optional[str]:
"""
Extract container registry URL from existing module.json or deployment manifests.
Looks for "repository": "registry.azurecr.io/modulename" pattern.
"""
registry_pattern = re.compile(r'"repository":\s*"([^/"]+)/[^"]+"')
# First try module.json files (more reliable)
module_jsons = find_files_recursive(root_path, "modules/*/module.json")
for module_json in module_jsons[:5]: # Check first 5
try:
content = module_json.read_text(encoding='utf-8')
match = registry_pattern.search(content)
if match:
return match.group(1)
except Exception:
continue
# Fallback to deployment manifests
for manifest_path in manifest_paths:
try:
full_path = root_path / manifest_path
content = full_path.read_text(encoding='utf-8')
match = registry_pattern.search(content)
if match:
return match.group(1)
except Exception:
continue
return None
def extract_nuget_feed_url(root_path: Path, modules_base_path: Optional[str]) -> Optional[str]:
"""
Extract NuGet feed URL from existing Dockerfiles.
Looks for VSS_NUGET_EXTERNAL_FEED_ENDPOINTS pattern.
"""
dockerfiles = find_files_recursive(root_path, "modules/*/Dockerfile*")
# Match both regular and escaped quotes: "endpoint":"url" or \"endpoint\":\"url\"
nuget_pattern = re.compile(r'endpoint\\?":\\s*\\?"(https://[^"\\]+/nuget/v3/index\.json)\\?"')
for dockerfile in dockerfiles[:10]: # Check first 10
try:
content = dockerfile.read_text(encoding='utf-8')
match = nuget_pattern.search(content)
if match:
return match.group(1)
except Exception:
continue
return None
def load_saved_config(root_path: Path) -> Optional[Dict[str, Any]]:
"""Load saved project configuration if it exists."""
config_path = root_path / ".claude" / ".iot-edge-module-config.json"
if config_path.exists():
try:
return json.loads(config_path.read_text(encoding='utf-8'))
except Exception:
return None
return None
def save_project_config(root_path: Path, config: Dict[str, Any]) -> bool:
"""Save project configuration for future runs."""
config_path = root_path / ".claude" / ".iot-edge-module-config.json"
try:
config_path.parent.mkdir(parents=True, exist_ok=True)
config_path.write_text(json.dumps(config, indent=2), encoding='utf-8')
return True
except Exception as e:
print(f"Warning: Could not save config: {e}", file=sys.stderr)
return False
def detect_project_structure(root_path: Path, force_detect: bool = False) -> Dict[str, Any]:
"""
Detect project structure by scanning for patterns.
Args:
root_path: Root directory of the project
force_detect: If True, ignore saved config and re-detect
Returns:
Dictionary with detected configuration
"""
# Try to load saved config first
if not force_detect:
saved_config = load_saved_config(root_path)
if saved_config:
saved_config["config_source"] = "saved"
return saved_config
# Perform detection
modules_base_path = find_modules_base_path(root_path)
contracts_project = find_contracts_project(root_path)
manifest_paths = find_deployment_manifests(root_path)
contracts_path = contracts_project["path"] if contracts_project else None
project_namespace = extract_namespace_from_csharp(root_path, contracts_path)
container_registry = extract_container_registry(root_path, manifest_paths)
nuget_feed_url = extract_nuget_feed_url(root_path, modules_base_path)
config = {
"config_source": "detected",
"modules_base_path": modules_base_path,
"contracts_project_path": contracts_path,
"contracts_project_name": contracts_project["name"] if contracts_project else None,
"manifests_found": manifest_paths,
"manifests_base_path": os.path.dirname(manifest_paths[0]) if manifest_paths else None,
"project_namespace": project_namespace,
"container_registry": container_registry,
"nuget_feed_url": nuget_feed_url,
"has_contracts_project": contracts_project is not None,
"has_nuget_feed": nuget_feed_url is not None
}
return config
def main():
"""Main entry point."""
import argparse
parser = argparse.ArgumentParser(
description="Detect IoT Edge project structure"
)
parser.add_argument(
"--root",
type=str,
default=".",
help="Root directory of the project (default: current directory)"
)
parser.add_argument(
"--force",
action="store_true",
help="Force re-detection, ignore saved config"
)
parser.add_argument(
"--save",
action="store_true",
help="Save detected configuration for future runs"
)
args = parser.parse_args()
root_path = Path(args.root).resolve()
if not root_path.exists():
print(json.dumps({"error": f"Path does not exist: {root_path}"}))
sys.exit(1)
# Detect structure
config = detect_project_structure(root_path, force_detect=args.force)
# Save if requested
if args.save and config["config_source"] == "detected":
if save_project_config(root_path, config):
config["config_saved"] = True
# Output as JSON
print(json.dumps(config, indent=2))
if __name__ == "__main__":
main()