Files
gh-epieczko-betty/skills/plugin.build/plugin_build.py
2025-11-29 18:26:08 +08:00

649 lines
20 KiB
Python
Executable File

#!/usr/bin/env python3
"""
plugin_build.py - Implementation of the plugin.build Skill
Bundles plugin directory into a deployable Claude Code plugin package.
"""
import os
import sys
import json
import yaml
import tarfile
import zipfile
import hashlib
import shutil
from typing import Dict, Any, List, Optional, Tuple
from datetime import datetime, timezone
from pathlib import Path
from pydantic import ValidationError as PydanticValidationError
from betty.config import BASE_DIR
from betty.logging_utils import setup_logger
from betty.telemetry_capture import capture_skill_execution
from betty.models import PluginManifest
from utils.telemetry_utils import capture_telemetry
logger = setup_logger(__name__)
def load_plugin_yaml(plugin_path: str) -> Dict[str, Any]:
"""
Load and parse plugin.yaml file.
Args:
plugin_path: Path to plugin.yaml
Returns:
Parsed plugin configuration
Raises:
FileNotFoundError: If plugin.yaml doesn't exist
yaml.YAMLError: If YAML is invalid
ValueError: If schema validation fails
"""
try:
with open(plugin_path) as f:
config = yaml.safe_load(f)
# Validate with Pydantic schema
try:
PluginManifest.model_validate(config)
logger.info(f"✅ Loaded and validated plugin.yaml from {plugin_path}")
except PydanticValidationError as exc:
errors = []
for error in exc.errors():
field = ".".join(str(loc) for loc in error["loc"])
message = error["msg"]
errors.append(f"{field}: {message}")
error_msg = f"Plugin schema validation failed: {'; '.join(errors)}"
logger.error(f"{error_msg}")
raise ValueError(error_msg)
return config
except FileNotFoundError:
logger.error(f"❌ plugin.yaml not found: {plugin_path}")
raise
except yaml.YAMLError as e:
logger.error(f"❌ Invalid YAML in {plugin_path}: {e}")
raise
def validate_entrypoints(config: Dict[str, Any], base_dir: str) -> Tuple[List[Dict], List[str]]:
"""
Validate that all entrypoint handlers exist on disk.
Args:
config: Plugin configuration
base_dir: Base directory for the plugin
Returns:
Tuple of (valid_entrypoints, missing_files)
"""
valid_entrypoints = []
missing_files = []
commands = config.get("commands", [])
logger.info(f"📝 Validating {len(commands)} command entrypoints...")
for command in commands:
name = command.get("name", "unknown")
handler = command.get("handler", {})
script_path = handler.get("script", "")
if not script_path:
missing_files.append(f"Command '{name}': no handler script specified")
continue
full_path = os.path.join(base_dir, script_path)
if os.path.exists(full_path):
valid_entrypoints.append({
"command": name,
"handler": script_path,
"runtime": handler.get("runtime", "python"),
"path": full_path
})
logger.debug(f"{name}: {script_path}")
else:
missing_files.append(f"Command '{name}': handler not found at {script_path}")
logger.warning(f"{name}: {script_path} (not found)")
return valid_entrypoints, missing_files
def gather_package_files(config: Dict[str, Any], base_dir: str) -> List[Tuple[str, str]]:
"""
Gather all files that need to be included in the package.
Args:
config: Plugin configuration
base_dir: Base directory for the plugin
Returns:
List of (source_path, archive_path) tuples
"""
files_to_package = []
# Always include plugin.yaml
plugin_yaml_path = os.path.join(base_dir, "plugin.yaml")
if os.path.exists(plugin_yaml_path):
files_to_package.append((plugin_yaml_path, "plugin.yaml"))
logger.debug(" + plugin.yaml")
# Gather all skill directories from commands
skill_dirs = set()
commands = config.get("commands", [])
for command in commands:
handler = command.get("handler", {})
script_path = handler.get("script", "")
if script_path.startswith("skills/"):
# Extract skill directory (e.g., "skills/api.validate" from "skills/api.validate/api_validate.py")
parts = script_path.split("/")
if len(parts) >= 2:
skill_dir = f"{parts[0]}/{parts[1]}"
skill_dirs.add(skill_dir)
# Add all files from skill directories
logger.info(f"📦 Gathering files from {len(skill_dirs)} skill directories...")
for skill_dir in sorted(skill_dirs):
skill_path = os.path.join(base_dir, skill_dir)
if os.path.isdir(skill_path):
for root, dirs, files in os.walk(skill_path):
# Skip __pycache__ and .pyc files
dirs[:] = [d for d in dirs if d != "__pycache__"]
for file in files:
if file.endswith(".pyc") or file.startswith("."):
continue
source_path = os.path.join(root, file)
rel_path = os.path.relpath(source_path, base_dir)
files_to_package.append((source_path, rel_path))
logger.debug(f" + {rel_path}")
# Add betty/ utility package
betty_dir = os.path.join(base_dir, "betty")
if os.path.isdir(betty_dir):
logger.info("📦 Adding betty/ utility package...")
for root, dirs, files in os.walk(betty_dir):
dirs[:] = [d for d in dirs if d != "__pycache__"]
for file in files:
if file.endswith(".pyc") or file.startswith("."):
continue
source_path = os.path.join(root, file)
rel_path = os.path.relpath(source_path, base_dir)
files_to_package.append((source_path, rel_path))
logger.debug(f" + {rel_path}")
# Add registry/ files
registry_dir = os.path.join(base_dir, "registry")
if os.path.isdir(registry_dir):
logger.info("📦 Adding registry/ files...")
for file in os.listdir(registry_dir):
if file.endswith(".json"):
source_path = os.path.join(registry_dir, file)
rel_path = f"registry/{file}"
files_to_package.append((source_path, rel_path))
logger.debug(f" + {rel_path}")
# Add optional files if they exist
optional_files = [
"requirements.txt",
"README.md",
"LICENSE",
"CHANGELOG.md"
]
for filename in optional_files:
file_path = os.path.join(base_dir, filename)
if os.path.exists(file_path):
files_to_package.append((file_path, filename))
logger.debug(f" + {filename}")
logger.info(f"📦 Total files to package: {len(files_to_package)}")
return files_to_package
def create_tarball(files: List[Tuple[str, str]], output_path: str, plugin_name: str) -> str:
"""
Create a .tar.gz archive.
Args:
files: List of (source_path, archive_path) tuples
output_path: Output directory
plugin_name: Base name for the archive
Returns:
Path to created archive
"""
archive_path = os.path.join(output_path, f"{plugin_name}.tar.gz")
logger.info(f"🗜️ Creating tar.gz archive: {archive_path}")
with tarfile.open(archive_path, "w:gz") as tar:
for source_path, archive_path_in_tar in files:
# Add files with plugin name prefix
arcname = f"{plugin_name}/{archive_path_in_tar}"
tar.add(source_path, arcname=arcname)
logger.debug(f" Added: {arcname}")
return archive_path
def create_zip(files: List[Tuple[str, str]], output_path: str, plugin_name: str) -> str:
"""
Create a .zip archive.
Args:
files: List of (source_path, archive_path) tuples
output_path: Output directory
plugin_name: Base name for the archive
Returns:
Path to created archive
"""
archive_path = os.path.join(output_path, f"{plugin_name}.zip")
logger.info(f"🗜️ Creating zip archive: {archive_path}")
with zipfile.ZipFile(archive_path, "w", zipfile.ZIP_DEFLATED) as zf:
for source_path, archive_path_in_zip in files:
# Add files with plugin name prefix
arcname = f"{plugin_name}/{archive_path_in_zip}"
zf.write(source_path, arcname=arcname)
logger.debug(f" Added: {arcname}")
return archive_path
def calculate_checksum(file_path: str) -> Dict[str, str]:
"""
Calculate checksums for the package file.
Args:
file_path: Path to the package file
Returns:
Dictionary with md5 and sha256 checksums
"""
logger.info("🔐 Calculating checksums...")
md5_hash = hashlib.md5()
sha256_hash = hashlib.sha256()
with open(file_path, "rb") as f:
for chunk in iter(lambda: f.read(4096), b""):
md5_hash.update(chunk)
sha256_hash.update(chunk)
checksums = {
"md5": md5_hash.hexdigest(),
"sha256": sha256_hash.hexdigest()
}
logger.info(f" MD5: {checksums['md5']}")
logger.info(f" SHA256: {checksums['sha256']}")
return checksums
def generate_build_report(
config: Dict[str, Any],
valid_entrypoints: List[Dict],
missing_files: List[str],
package_path: str,
checksums: Dict[str, str],
files_count: int
) -> Dict[str, Any]:
"""
Generate build report JSON.
Args:
config: Plugin configuration
valid_entrypoints: List of validated entrypoints
missing_files: List of missing files
package_path: Path to created package
checksums: Package checksums
files_count: Number of files packaged
Returns:
Build report dictionary
"""
file_size = os.path.getsize(package_path)
report = {
"build_timestamp": datetime.now(timezone.utc).isoformat(),
"plugin": {
"name": config.get("name"),
"version": config.get("version"),
"description": config.get("description")
},
"validation": {
"total_commands": len(config.get("commands", [])),
"valid_entrypoints": len(valid_entrypoints),
"missing_files": missing_files,
"has_errors": len(missing_files) > 0
},
"package": {
"path": package_path,
"size_bytes": file_size,
"size_human": f"{file_size / 1024 / 1024:.2f} MB" if file_size > 1024 * 1024 else f"{file_size / 1024:.2f} KB",
"files_count": files_count,
"format": "tar.gz" if package_path.endswith(".tar.gz") else "zip",
"checksums": checksums
},
"entrypoints": valid_entrypoints
}
return report
def generate_manifest(
config: Dict[str, Any],
valid_entrypoints: List[Dict],
checksums: Dict[str, str],
package_path: str
) -> Dict[str, Any]:
"""
Generate manifest.json for the plugin package.
Args:
config: Plugin configuration
valid_entrypoints: List of validated entrypoints
checksums: Package checksums
package_path: Path to created package
Returns:
Manifest dictionary
"""
file_size = os.path.getsize(package_path)
package_filename = os.path.basename(package_path)
manifest = {
"name": config.get("name"),
"version": config.get("version"),
"description": config.get("description"),
"author": config.get("author", {}),
"license": config.get("license"),
"metadata": {
"homepage": config.get("metadata", {}).get("homepage"),
"repository": config.get("metadata", {}).get("repository"),
"documentation": config.get("metadata", {}).get("documentation"),
"tags": config.get("metadata", {}).get("tags", []),
"generated_at": datetime.now(timezone.utc).isoformat()
},
"requirements": {
"python": config.get("requirements", {}).get("python"),
"packages": config.get("requirements", {}).get("packages", [])
},
"permissions": config.get("permissions", []),
"package": {
"filename": package_filename,
"size_bytes": file_size,
"checksums": checksums
},
"entrypoints": [
{
"command": ep["command"],
"handler": ep["handler"],
"runtime": ep["runtime"]
}
for ep in valid_entrypoints
],
"commands_count": len(valid_entrypoints),
"agents": [
{
"name": agent.get("name"),
"description": agent.get("description")
}
for agent in config.get("agents", [])
]
}
return manifest
def create_plugin_preview(config: Dict[str, Any], output_path: str) -> Optional[str]:
"""
Create plugin.preview.yaml with current plugin configuration.
This allows reviewing changes before overwriting plugin.yaml.
Args:
config: Plugin configuration
output_path: Directory to write preview file
Returns:
Path to preview file or None if creation failed
"""
try:
preview_path = os.path.join(output_path, "plugin.preview.yaml")
# Add preview metadata
preview_config = config.copy()
if "metadata" not in preview_config:
preview_config["metadata"] = {}
preview_config["metadata"]["preview_generated_at"] = datetime.now(timezone.utc).isoformat()
preview_config["metadata"]["preview_note"] = "Review before applying to plugin.yaml"
with open(preview_path, "w") as f:
yaml.dump(preview_config, f, default_flow_style=False, sort_keys=False)
logger.info(f"📋 Preview file created: {preview_path}")
return preview_path
except Exception as e:
logger.warning(f"⚠️ Failed to create preview file: {e}")
return None
def build_plugin(
plugin_path: str = None,
output_format: str = "tar.gz",
output_dir: str = None
) -> Dict[str, Any]:
"""
Main build function.
Args:
plugin_path: Path to plugin.yaml (defaults to ./plugin.yaml)
output_format: Package format (tar.gz or zip)
output_dir: Output directory (defaults to ./dist)
Returns:
Build result dictionary
"""
# Track execution time for telemetry
start_time = datetime.now(timezone.utc)
# Set defaults
if plugin_path is None:
plugin_path = os.path.join(BASE_DIR, "plugin.yaml")
if output_dir is None:
output_dir = os.path.join(BASE_DIR, "dist")
# Normalize format
output_format = output_format.lower()
if output_format not in ["tar.gz", "zip"]:
raise ValueError(f"Unsupported output format: {output_format}. Use 'tar.gz' or 'zip'")
logger.info("🏗️ Starting plugin build...")
logger.info(f"📄 Plugin: {plugin_path}")
logger.info(f"📦 Format: {output_format}")
logger.info(f"📁 Output: {output_dir}")
# Load plugin.yaml
config = load_plugin_yaml(plugin_path)
# Get base directory (parent of plugin.yaml)
base_dir = os.path.dirname(os.path.abspath(plugin_path))
# Validate entrypoints
valid_entrypoints, missing_files = validate_entrypoints(config, base_dir)
if missing_files:
logger.warning(f"⚠️ Found {len(missing_files)} missing files:")
for missing in missing_files:
logger.warning(f" - {missing}")
# Gather files to package
files_to_package = gather_package_files(config, base_dir)
# Create output directory
os.makedirs(output_dir, exist_ok=True)
# Generate package name
plugin_name = config.get("name", "plugin")
plugin_version = config.get("version", "1.0.0")
package_basename = f"{plugin_name}-{plugin_version}"
# Create package
if output_format == "tar.gz":
package_path = create_tarball(files_to_package, output_dir, package_basename)
else:
package_path = create_zip(files_to_package, output_dir, package_basename)
# Calculate checksums
checksums = calculate_checksum(package_path)
# Generate build report
build_report = generate_build_report(
config,
valid_entrypoints,
missing_files,
package_path,
checksums,
len(files_to_package)
)
# Write build report JSON
report_path = os.path.join(output_dir, f"{package_basename}-build-report.json")
with open(report_path, "w") as f:
json.dump(build_report, f, indent=2)
logger.info(f"📊 Build report: {report_path}")
# Generate and write manifest.json
manifest = generate_manifest(config, valid_entrypoints, checksums, package_path)
manifest_path = os.path.join(output_dir, "manifest.json")
with open(manifest_path, "w") as f:
json.dump(manifest, f, indent=2)
logger.info(f"📋 Manifest: {manifest_path}")
# Create plugin preview (optional)
preview_path = create_plugin_preview(config, output_dir)
# Build result
result = {
"ok": not build_report["validation"]["has_errors"],
"status": "success" if not missing_files else "success_with_warnings",
"package_path": package_path,
"report_path": report_path,
"manifest_path": manifest_path,
"preview_path": preview_path,
"build_report": build_report,
"manifest": manifest
}
# Calculate execution duration and capture telemetry
end_time = datetime.now(timezone.utc)
duration_ms = int((end_time - start_time).total_seconds() * 1000)
capture_skill_execution(
skill_name="plugin.build",
inputs={
"plugin_path": plugin_path,
"output_format": output_format,
"output_dir": output_dir
},
status="success" if result["ok"] else "success_with_warnings",
duration_ms=duration_ms,
caller="cli",
plugin_name=config.get("name"),
plugin_version=config.get("version"),
files_count=len(files_to_package),
package_size_bytes=os.path.getsize(package_path),
warnings_count=len(missing_files)
)
return result
@capture_telemetry(skill_name="plugin.build", caller="cli")
def main():
"""Main CLI entry point."""
import argparse
parser = argparse.ArgumentParser(
description="Build deployable Claude Code plugin package"
)
parser.add_argument(
"plugin_path",
nargs="?",
default=None,
help="Path to plugin.yaml (defaults to ./plugin.yaml)"
)
parser.add_argument(
"--format",
choices=["tar.gz", "zip"],
default="tar.gz",
help="Package format (default: tar.gz)"
)
parser.add_argument(
"--output-dir",
default=None,
help="Output directory (default: ./dist)"
)
args = parser.parse_args()
try:
result = build_plugin(
plugin_path=args.plugin_path,
output_format=args.format,
output_dir=args.output_dir
)
# Print summary
report = result["build_report"]
logger.info("\n" + "=" * 60)
logger.info("🎉 BUILD COMPLETE")
logger.info("=" * 60)
logger.info(f"📦 Package: {result['package_path']}")
logger.info(f"📊 Report: {result['report_path']}")
logger.info(f"📋 Manifest: {result['manifest_path']}")
if result.get('preview_path'):
logger.info(f"👁️ Preview: {result['preview_path']}")
logger.info(f"✅ Commands: {report['validation']['valid_entrypoints']}/{report['validation']['total_commands']}")
logger.info(f"📏 Size: {report['package']['size_human']}")
logger.info(f"📝 Files: {report['package']['files_count']}")
if report["validation"]["missing_files"]:
logger.warning(f"⚠️ Warnings: {len(report['validation']['missing_files'])}")
logger.info("=" * 60)
print(json.dumps(result, indent=2))
sys.exit(0 if result["ok"] else 1)
except Exception as e:
logger.error(f"❌ Build failed: {e}")
result = {
"ok": False,
"status": "failed",
"error": str(e)
}
print(json.dumps(result, indent=2))
sys.exit(1)
if __name__ == "__main__":
main()