#!/usr/bin/env python3
"""
Dependency evaluator script for gathering package ecosystem data.
Automates command execution and data collection for dependency analysis.

Uses only Python standard library - no external dependencies required.
"""

import argparse
import json
import re
import subprocess
import sys
import urllib.error
import urllib.request
from datetime import datetime, timezone
from typing import Any, Dict, List, Optional, Tuple


class DependencyEvaluator:
    """Main class for evaluating dependencies across package ecosystems."""

    def __init__(self, package_name: str, ecosystem: str):
        """
        Initialize the dependency evaluator.

        Args:
            package_name: Name of the package to evaluate
            ecosystem: Package ecosystem (npm, pypi, cargo, go)
        """
        self.package_name = package_name
        self.ecosystem = ecosystem.lower()
        self.errors: List[str] = []
        self.warnings: List[str] = []

    def run_command(self, cmd: List[str], timeout: int = 30) -> Tuple[bool, str, str]:
        """
        Execute an external command and return its results.

        Args:
            cmd: Command and arguments as a list
            timeout: Command timeout in seconds

        Returns:
            Tuple of (success, stdout, stderr)
        """
        try:
            result = subprocess.run(
                cmd,
                capture_output=True,
                text=True,
                timeout=timeout
            )
            return (result.returncode == 0, result.stdout, result.stderr)
        except subprocess.TimeoutExpired:
            self.warnings.append(f"Command timed out after {timeout}s: {' '.join(cmd)}")
            return (False, "", f"Timeout after {timeout}s")
        except FileNotFoundError:
            self.warnings.append(f"Command not found: {cmd[0]}")
            return (False, "", f"Command not found: {cmd[0]}")
        except Exception as e:
            self.warnings.append(f"Command failed: {' '.join(cmd)} - {str(e)}")
            return (False, "", str(e))

    def fetch_url(self, url: str, timeout: int = 10) -> Optional[Dict[str, Any]]:
        """
        Fetch JSON data from a URL.

        Args:
            url: URL to fetch
            timeout: Request timeout in seconds

        Returns:
            Parsed JSON data or None on failure
        """
        try:
            req = urllib.request.Request(url)
            req.add_header('User-Agent', 'dependency-evaluator/1.0')

            with urllib.request.urlopen(req, timeout=timeout) as response:
                data = response.read().decode('utf-8')
                return json.loads(data)
        except urllib.error.HTTPError as e:
            if e.code == 404:
                self.errors.append(f"Resource not found: {url}")
            elif e.code == 403:
                self.warnings.append(f"Access forbidden (rate limit?): {url}")
            else:
                self.warnings.append(f"HTTP {e.code} error fetching {url}")
            return None
        except urllib.error.URLError as e:
            self.warnings.append(f"Network error fetching {url}: {str(e)}")
            return None
        except json.JSONDecodeError as e:
            self.warnings.append(f"Invalid JSON from {url}: {str(e)}")
            return None
        except Exception as e:
            self.warnings.append(f"Error fetching {url}: {str(e)}")
            return None
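
    # Illustrative call (the endpoint is real, but the value shown is only a
    # sketch of the shape this method returns):
    #
    #     info = evaluator.fetch_url("https://pypi.org/pypi/requests/json")
    #     # info -> {"info": {...}, "releases": {...}} on success, None on failure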

    def gather_npm_data(self) -> Dict[str, Any]:
        """Gather data for npm packages."""
        data = {}

        # Get package metadata
        success, stdout, stderr = self.run_command(['npm', 'view', self.package_name, '--json'])
        if success and stdout:
            try:
                npm_data = json.loads(stdout)
                data['latest_version'] = npm_data.get('version', '')
                data['license'] = npm_data.get('license', '')
                data['description'] = npm_data.get('description', '')
                data['homepage'] = npm_data.get('homepage', '')
                # npm's "repository" field may be a string or an object with a "url" key
                repository = npm_data.get('repository', '')
                if isinstance(repository, dict):
                    data['repository_url'] = repository.get('url', '')
                else:
                    data['repository_url'] = repository
                data['maintainers'] = npm_data.get('maintainers', [])
                data['keywords'] = npm_data.get('keywords', [])
            except json.JSONDecodeError:
                self.warnings.append("Failed to parse npm view output")

        # Get version history
        success, stdout, stderr = self.run_command(['npm', 'view', self.package_name, 'time', '--json'])
        if success and stdout:
            try:
                time_data = json.loads(stdout)
                data['publish_history'] = time_data
                data['versions_count'] = len([k for k in time_data.keys() if k not in ['created', 'modified']])
            except json.JSONDecodeError:
                self.warnings.append("Failed to parse npm time output")

        # Get all versions
        success, stdout, stderr = self.run_command(['npm', 'view', self.package_name, 'versions', '--json'])
        if success and stdout:
            try:
                versions = json.loads(stdout)
                # npm returns a bare string when the package has a single version
                data['all_versions'] = versions if isinstance(versions, list) else [versions]
            except json.JSONDecodeError:
                self.warnings.append("Failed to parse npm versions output")

        return data
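
    # For reference, a hedged sketch of what `npm view lodash --json` emits
    # (heavily truncated; exact fields vary by package and npm version):
    #
    #     {
    #       "name": "lodash",
    #       "version": "4.17.21",
    #       "license": "MIT",
    #       "repository": {"type": "git", "url": "git+https://github.com/lodash/lodash.git"},
    #       ...
    #     }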

    def gather_pypi_data(self) -> Dict[str, Any]:
        """Gather data for PyPI packages."""
        data = {}

        # Use the PyPI JSON API
        pypi_url = f"https://pypi.org/pypi/{self.package_name}/json"
        pypi_data = self.fetch_url(pypi_url)

        if pypi_data:
            info = pypi_data.get('info', {})
            data['latest_version'] = info.get('version', '')
            data['license'] = info.get('license', '')
            data['description'] = info.get('summary', '')
            data['homepage'] = info.get('home_page', '')
            # "project_urls" can be null in the PyPI JSON, so guard before .get()
            project_urls = info.get('project_urls') or {}
            data['repository_url'] = project_urls.get('Source', info.get('project_url', ''))
            data['author'] = info.get('author', '')
            data['keywords'] = info.get('keywords', '').split(',') if info.get('keywords') else []

            # Get release history
            releases = pypi_data.get('releases', {})
            data['versions_count'] = len(releases)
            data['publish_history'] = {
                version: release_list[0].get('upload_time', '') if release_list else ''
                for version, release_list in releases.items()
            }

        return data
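
    # Hedged sketch of the PyPI JSON API response consumed above (truncated;
    # see https://pypi.org/pypi/<package>/json for a live document):
    #
    #     {
    #       "info": {"version": "2.31.0", "license": "Apache 2.0",
    #                "project_urls": {"Source": "https://github.com/psf/requests"}, ...},
    #       "releases": {"2.31.0": [{"upload_time": "2023-05-22T15:12:42", ...}], ...}
    #     }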

    def gather_cargo_data(self) -> Dict[str, Any]:
        """Gather data for Cargo/Rust crates."""
        data = {}

        # Use the crates.io API
        crates_url = f"https://crates.io/api/v1/crates/{self.package_name}"
        crate_data = self.fetch_url(crates_url)

        if crate_data and 'crate' in crate_data:
            crate = crate_data['crate']
            data['latest_version'] = crate.get('max_version', '')
            # The license field can be null; normalize SPDX "OR" expressions
            # (e.g. "MIT OR Apache-2.0") into a comma-separated list
            license_expr = crate.get('license') or ''
            data['license'] = ', '.join(license_expr.split(' OR '))
            data['description'] = crate.get('description', '')
            data['homepage'] = crate.get('homepage', '')
            data['repository_url'] = crate.get('repository', '')
            data['downloads'] = crate.get('downloads', 0)
            data['recent_downloads'] = crate.get('recent_downloads', 0)

        # Get versions
        versions_url = f"https://crates.io/api/v1/crates/{self.package_name}/versions"
        versions_data = self.fetch_url(versions_url)
        if versions_data and 'versions' in versions_data:
            data['versions_count'] = len(versions_data['versions'])
            data['all_versions'] = [v.get('num', '') for v in versions_data['versions']]

        return data
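
    # Hedged sketch of the crates.io response shape consumed above (truncated):
    #
    #     {"crate": {"max_version": "1.0.0", "license": "MIT OR Apache-2.0",
    #                "repository": "https://github.com/owner/repo", ...}}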

    def gather_go_data(self) -> Dict[str, Any]:
        """Gather data for Go modules."""
        data = {}

        # Query the module proxy via `go list`. The explicit @latest suffix lets
        # this run outside a module directory; a bare module path would require
        # a go.mod context.
        success, stdout, stderr = self.run_command(
            ['go', 'list', '-m', '-json', f'{self.package_name}@latest']
        )
        if success and stdout:
            try:
                go_data = json.loads(stdout)
                data['module_path'] = go_data.get('Path', '')
                data['latest_version'] = go_data.get('Version', '')
                data['time'] = go_data.get('Time', '')
            except json.JSONDecodeError:
                self.warnings.append("Failed to parse go list output")

        return data
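
    # Hedged sketch of `go list -m -json <module>@latest` output (truncated):
    #
    #     {"Path": "golang.org/x/text", "Version": "v0.14.0",
    #      "Time": "2023-10-11T21:21:26Z"}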

    def extract_github_repo(self, repo_url: str) -> Optional[Tuple[str, str]]:
        """
        Extract owner and repo name from a GitHub URL.

        Args:
            repo_url: GitHub repository URL

        Returns:
            Tuple of (owner, repo) or None
        """
        if not repo_url:
            return None

        # Handle https, git+https, and ssh (git@github.com:owner/repo) formats.
        # The repo segment may itself contain dots (e.g. socket.io), so match
        # greedily and strip a trailing .git suffix afterwards.
        match = re.search(r'github\.com[:/]([^/]+)/([^/#?]+)', repo_url)
        if match:
            owner, repo = match.groups()
            if repo.endswith('.git'):
                repo = repo[:-4]
            return (owner, repo)

        return None
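
    # Illustrative inputs and the (owner, repo) pairs they should yield:
    #
    #     extract_github_repo("git+https://github.com/lodash/lodash.git")  # ("lodash", "lodash")
    #     extract_github_repo("git@github.com:psf/requests.git")           # ("psf", "requests")
    #     extract_github_repo("https://gitlab.com/x/y")                    # None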

    def gather_github_data(self, repo_url: str) -> Dict[str, Any]:
        """
        Gather data from a GitHub repository.

        Args:
            repo_url: GitHub repository URL

        Returns:
            Dictionary of GitHub data
        """
        data = {}

        github_info = self.extract_github_repo(repo_url)
        if not github_info:
            self.warnings.append(f"Could not parse GitHub URL: {repo_url}")
            return data

        owner, repo = github_info
        data['repository_url'] = f"https://github.com/{owner}/{repo}"

        # Try the gh CLI first (it handles authentication), then fall back to
        # the unauthenticated REST API.
        repo_data = None
        success, stdout, stderr = self.run_command(['gh', 'api', f'repos/{owner}/{repo}'])
        if success and stdout:
            try:
                repo_data = json.loads(stdout)
            except json.JSONDecodeError:
                self.warnings.append("Failed to parse gh api output")
        if repo_data is None:
            api_url = f"https://api.github.com/repos/{owner}/{repo}"
            repo_data = self.fetch_url(api_url)

        if repo_data:
            data['pushed_at'] = repo_data.get('pushed_at', '')
            data['open_issues_count'] = repo_data.get('open_issues_count', 0)
            data['stargazers_count'] = repo_data.get('stargazers_count', 0)
            data['forks_count'] = repo_data.get('forks_count', 0)
            data['watchers_count'] = repo_data.get('watchers_count', 0)
            data['default_branch'] = repo_data.get('default_branch', '')

        # Get community health
        success, stdout, stderr = self.run_command(['gh', 'api', f'repos/{owner}/{repo}/community/profile'])
        if success and stdout:
            try:
                community_data = json.loads(stdout)
                data['community_health'] = {
                    'health_percentage': community_data.get('health_percentage', 0),
                    'files': community_data.get('files', {})
                }
            except json.JSONDecodeError:
                pass

        # Get contributors count (note: this counts only the first page of
        # results, so it is a lower bound for large projects)
        success, stdout, stderr = self.run_command(['gh', 'api', f'repos/{owner}/{repo}/contributors', '--jq', 'length'])
        if success and stdout.strip().isdigit():
            data['contributors_count'] = int(stdout.strip())

        # Get license
        success, stdout, stderr = self.run_command(['gh', 'api', f'repos/{owner}/{repo}/license', '--jq', '.license.spdx_id'])
        if success and stdout.strip():
            data['license_info'] = {'spdx_id': stdout.strip()}

        return data
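
    # Illustrative CLI equivalents of the calls above (require the gh CLI to
    # be installed and authenticated; "psf/requests" is just a sample repo):
    #
    #     gh api repos/psf/requests --jq '.stargazers_count'
    #     gh api repos/psf/requests/community/profile --jq '.health_percentage'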

    def gather_security_data(self) -> Dict[str, Any]:
        """Gather security-related data."""
        data = {}

        if self.ecosystem == 'npm':
            # npm audit needs a package.json to resolve against, which we don't
            # have in isolation; a temporary package.json would be required.
            self.warnings.append("npm audit requires package.json context - skipping")

        return data
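
    # A hedged sketch of the temp-package.json workaround hinted at above
    # (not wired into the class): write a throwaway manifest, generate a
    # lockfile without installing, then audit against it.
    #
    #     import tempfile, os
    #     with tempfile.TemporaryDirectory() as tmp:
    #         manifest = {"name": "audit-probe", "version": "0.0.0",
    #                     "dependencies": {self.package_name: "*"}}
    #         with open(os.path.join(tmp, "package.json"), "w") as f:
    #             json.dump(manifest, f)
    #         subprocess.run(["npm", "install", "--package-lock-only"], cwd=tmp)
    #         subprocess.run(["npm", "audit", "--json"], cwd=tmp)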

    def gather_dependency_footprint(self) -> Dict[str, Any]:
        """Gather dependency tree information."""
        data = {
            'direct_dependencies': 0,
            'total_dependencies': 0,
            'tree_depth': 1
        }

        if self.ecosystem == 'npm':
            # npm ls only works against an installed tree
            self.warnings.append("npm ls requires package installation - skipping")

        return data
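
    # Direct dependencies could be approximated without an install via the
    # registry metadata, e.g. (hedged sketch, not wired in):
    #
    #     ok, out, _ = self.run_command(['npm', 'view', self.package_name, 'dependencies', '--json'])
    #     # json.loads(out) -> {"dep-name": "^1.2.3", ...} when ok and out are truthy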

    def evaluate(self) -> Dict[str, Any]:
        """
        Run the full evaluation and return structured results.

        Returns:
            Dictionary containing all gathered data
        """
        result = {
            'package': self.package_name,
            'ecosystem': self.ecosystem,
            # datetime.utcnow() is deprecated since Python 3.12; use an aware UTC time
            'timestamp': datetime.now(timezone.utc).isoformat().replace('+00:00', 'Z'),
            'registry_data': {},
            'github_data': {},
            'security_data': {},
            'dependency_footprint': {},
            'errors': [],
            'warnings': []
        }

        # Gather ecosystem-specific data
        if self.ecosystem == 'npm':
            result['registry_data'] = self.gather_npm_data()
        elif self.ecosystem == 'pypi':
            result['registry_data'] = self.gather_pypi_data()
        elif self.ecosystem == 'cargo':
            result['registry_data'] = self.gather_cargo_data()
        elif self.ecosystem == 'go':
            result['registry_data'] = self.gather_go_data()
        else:
            self.errors.append(f"Unsupported ecosystem: {self.ecosystem}")
            result['errors'] = self.errors
            result['warnings'] = self.warnings
            return result

        # Gather GitHub data if a repository URL was found
        repo_url = result['registry_data'].get('repository_url', '')
        if repo_url and 'github.com' in repo_url:
            result['github_data'] = self.gather_github_data(repo_url)

        # Gather security data
        result['security_data'] = self.gather_security_data()

        # Gather dependency footprint
        result['dependency_footprint'] = self.gather_dependency_footprint()

        # Add accumulated errors and warnings
        result['errors'] = self.errors
        result['warnings'] = self.warnings

        return result
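
    # Hedged sketch of the JSON document evaluate() produces (field values
    # are illustrative only):
    #
    #     {
    #       "package": "requests", "ecosystem": "pypi",
    #       "timestamp": "2024-01-01T00:00:00Z",
    #       "registry_data": {"latest_version": "2.31.0", ...},
    #       "github_data": {"stargazers_count": 51000, ...},
    #       "errors": [], "warnings": [...]
    #     }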


def main():
    """Main entry point for the script."""
    parser = argparse.ArgumentParser(
        description='Evaluate a package dependency across different ecosystems'
    )
    parser.add_argument(
        'package',
        help='Package name to evaluate'
    )
    parser.add_argument(
        'ecosystem',
        choices=['npm', 'pypi', 'cargo', 'go'],
        help='Package ecosystem'
    )

    args = parser.parse_args()

    evaluator = DependencyEvaluator(args.package, args.ecosystem)
    result = evaluator.evaluate()

    # Output JSON to stdout
    print(json.dumps(result, indent=2))

    # Exit non-zero if there were hard errors
    sys.exit(1 if result['errors'] else 0)


if __name__ == '__main__':
    main()
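
# Example post-processing of the emitted JSON with jq (hedged: assumes the
# script is on PATH as evaluate_dependency.py):
#
#     evaluate_dependency.py lodash npm | jq '.registry_data.latest_version'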