Initial commit

This commit is contained in:
Zhongwei Li
2025-11-29 18:00:27 +08:00
commit 0c6988a884
19 changed files with 5729 additions and 0 deletions

View File

@@ -0,0 +1,469 @@
#!/usr/bin/env -S uv run --script --quiet
# /// script
# requires-python = ">=3.11"
# dependencies = []
# ///
"""
CEPH Cluster Health Checker
Validates CEPH storage cluster health including:
- Cluster health status
- Monitor and manager status
- OSD status and distribution
- Pool configuration and usage
- PG state verification
Usage:
python check_ceph_health.py [--node NODE] [--json]
Examples:
# Check CEPH health (requires SSH access to cluster node)
python check_ceph_health.py --node foxtrot
# Output as JSON for parsing
python check_ceph_health.py --node foxtrot --json
# Check minimum OSD count
python check_ceph_health.py --node foxtrot --min-osds 12
"""
import argparse
import json
import re
import subprocess
import sys
from dataclasses import dataclass, asdict, field
from typing import Dict, List, Optional
@dataclass
class OSDStatus:
    """Status of a single OSD, as parsed from `ceph osd tree` (see check_osds)."""
    osd_id: int        # numeric OSD id (osd.<id>); -1 when missing from the tree node
    host: str          # 'name' field of the OSD tree node
    status: str        # up/down ('unknown' when absent)
    in_cluster: bool   # True when the tree node's 'exists' flag equals 1
    weight: float      # 'crush_weight' from the OSD tree
    device_class: str  # e.g. hdd/ssd; 'unknown' when absent
@dataclass
class PoolStatus:
    """Pool configuration and usage, combined from `ceph osd pool ls detail`
    and `ceph osd pool stats` (see check_pools)."""
    name: str             # 'pool_name' from pool detail
    pool_id: int          # 'pool' field from pool detail
    size: int             # pool 'size' setting
    min_size: int         # pool 'min_size' setting
    pg_num: int           # placement group count
    pgp_num: int          # placement group count for placement purposes
    used_bytes: int       # 'bytes_used' from pool stats
    max_avail_bytes: int  # 'max_avail' from pool stats
    percent_used: float   # 'percent_used' from pool stats, scaled to 0-100
@dataclass
class MonitorStatus:
    """One monitor from `ceph mon dump` (see check_monitors)."""
    name: str        # monitor name
    rank: int        # monitor rank; -1 when absent
    address: str     # 'addr' field
    in_quorum: bool  # True when this rank appears in `ceph quorum_status`
@dataclass
class ManagerStatus:
    """One manager daemon from `ceph mgr dump` (see check_managers)."""
    name: str     # manager daemon name
    active: bool  # True for the active mgr, False for standbys
    # For the active mgr this is 'active_addr'; for standbys check_managers
    # stores the 'gid' field here — presumably not a real address, confirm intent.
    address: str
@dataclass
class CEPHHealth:
    """Aggregated CEPH cluster health populated by CEPHHealthChecker."""
    status: str  # HEALTH_OK, HEALTH_WARN, HEALTH_ERR (UNKNOWN before/without checks)
    num_osds: int
    num_up_osds: int
    num_in_osds: int
    num_pgs: int
    num_active_clean_pgs: int
    monitors: List[MonitorStatus] = field(default_factory=list)
    managers: List[ManagerStatus] = field(default_factory=list)
    osds: List[OSDStatus] = field(default_factory=list)
    pools: List[PoolStatus] = field(default_factory=list)
    data_bytes: int = 0   # pgmap 'data_bytes' (logical data stored)
    used_bytes: int = 0   # pgmap 'bytes_used'
    avail_bytes: int = 0  # pgmap 'bytes_avail'
    warnings: List[str] = field(default_factory=list)  # non-fatal findings
    errors: List[str] = field(default_factory=list)    # fatal findings; non-empty => unhealthy

    @property
    def is_healthy(self) -> bool:
        """Check if CEPH is in healthy state"""
        # Healthy only when status is HEALTH_OK, every OSD is both up and in,
        # every PG is active+clean, and no errors were recorded (warnings allowed).
        return (
            self.status == 'HEALTH_OK' and
            self.num_up_osds == self.num_osds and
            self.num_in_osds == self.num_osds and
            self.num_active_clean_pgs == self.num_pgs and
            len(self.errors) == 0
        )

    @property
    def percent_used(self) -> float:
        """Calculate cluster usage percentage"""
        # NOTE(review): the denominator is data_bytes (logical stored data),
        # not raw capacity (used_bytes + avail_bytes); with replication this
        # ratio can exceed 100% — confirm which is intended.
        if self.data_bytes == 0:
            return 0.0
        return (self.used_bytes / self.data_bytes) * 100
class CEPHHealthChecker:
    """Collect CEPH cluster health by running `ceph` commands on a node over SSH.

    Findings are accumulated in ``self.health``; a failed remote command is
    recorded as an error and the remaining checks still run.
    """

    def __init__(self, node: str):
        """
        Args:
            node: Hostname or IP address of a cluster node reachable as root via SSH.

        Raises:
            ValueError: If *node* does not look like a hostname or IP address.
        """
        # Validate node is a valid hostname or IP address
        if not self._validate_node(node):
            raise ValueError(f"Invalid node name or IP address: {node}")
        self.node = node
        self.health = CEPHHealth(
            status="UNKNOWN",
            num_osds=0,
            num_up_osds=0,
            num_in_osds=0,
            num_pgs=0,
            num_active_clean_pgs=0
        )

    def _validate_node(self, node: str) -> bool:
        """Return True if *node* looks like a valid hostname or IPv4/IPv6 address."""
        # Allow valid hostnames and IPv4/IPv6 addresses.
        # NOTE(review): the IPv4 pattern accepts out-of-range octets
        # (e.g. 999.0.0.1) — it is an injection guard, not strict validation.
        hostname_pattern = r'^[a-zA-Z0-9]([a-zA-Z0-9\-]{0,61}[a-zA-Z0-9])?(\.[a-zA-Z0-9]([a-zA-Z0-9\-]{0,61}[a-zA-Z0-9])?)*$'
        ipv4_pattern = r'^(\d{1,3}\.){3}\d{1,3}$'
        ipv6_pattern = r'^([0-9a-fA-F]{0,4}:){2,7}[0-9a-fA-F]{0,4}$'
        return bool(
            re.match(hostname_pattern, node) or
            re.match(ipv4_pattern, node) or
            re.match(ipv6_pattern, node)
        )

    def run_command(self, command: str) -> str:
        """Execute *command* on the remote node via SSH and return its stdout.

        On timeout or non-zero exit the failure is appended to
        ``self.health.errors`` and "" is returned, so callers'
        ``if not output`` guards degrade gracefully.  (Previously this raised
        RuntimeError, which made those guards unreachable and aborted the
        whole run on the first failed command.)
        """
        try:
            # Use -- to prevent SSH option injection
            result = subprocess.run(
                ["ssh", "-o", "BatchMode=yes", f"root@{self.node}", "--", command],
                capture_output=True,
                text=True,
                check=True,
                timeout=30
            )
            return result.stdout
        except subprocess.TimeoutExpired:
            self.health.errors.append(f"Command timed out after 30s: {command}")
            return ""
        except subprocess.CalledProcessError as e:
            self.health.errors.append(f"Command failed: {command}: {e.stderr}")
            return ""

    def check_ceph_status(self):
        """Parse `ceph status` JSON: overall health, OSD/PG counts, usage, health checks."""
        output = self.run_command("ceph status --format json")
        if not output:
            self.health.errors.append("Failed to get CEPH status")
            return
        try:
            status_data = json.loads(output)
            # Parse overall health
            self.health.status = status_data.get('health', {}).get('status', 'UNKNOWN')
            # Parse OSD map.
            # NOTE(review): the nested ['osdmap']['osdmap'] layout matches some
            # ceph releases; others expose the counters directly under
            # ['osdmap'] — confirm against the target cluster version.
            osd_map = status_data.get('osdmap', {}).get('osdmap', {})
            self.health.num_osds = osd_map.get('num_osds', 0)
            self.health.num_up_osds = osd_map.get('num_up_osds', 0)
            self.health.num_in_osds = osd_map.get('num_in_osds', 0)
            # Parse PG map
            pg_map = status_data.get('pgmap', {})
            self.health.num_pgs = pg_map.get('num_pgs', 0)
            # Parse PG states: only the 'active+clean' bucket is counted
            pg_states = pg_map.get('pgs_by_state', [])
            for state in pg_states:
                if state.get('state_name') == 'active+clean':
                    self.health.num_active_clean_pgs = state.get('count', 0)
            # Parse storage usage
            self.health.data_bytes = pg_map.get('data_bytes', 0)
            self.health.used_bytes = pg_map.get('bytes_used', 0)
            self.health.avail_bytes = pg_map.get('bytes_avail', 0)
            # Map ceph health checks to our warnings/errors by severity
            health_checks = status_data.get('health', {}).get('checks', {})
            for check_name, check_data in health_checks.items():
                severity = check_data.get('severity', '')
                summary = check_data.get('summary', {}).get('message', '')
                if severity == 'HEALTH_ERR':
                    self.health.errors.append(f"{check_name}: {summary}")
                elif severity == 'HEALTH_WARN':
                    self.health.warnings.append(f"{check_name}: {summary}")
        except (json.JSONDecodeError, KeyError) as e:
            self.health.errors.append(f"Failed to parse CEPH status: {e}")

    def check_monitors(self):
        """Collect the monitor roster and warn if any monitor is out of quorum."""
        output = self.run_command("ceph mon dump --format json")
        if not output:
            self.health.warnings.append("Failed to get monitor status")
            return
        try:
            mon_data = json.loads(output)
            quorum = set()
            # Quorum membership is a list of monitor ranks
            quorum_output = self.run_command("ceph quorum_status --format json")
            if quorum_output:
                quorum_data = json.loads(quorum_output)
                quorum = set(quorum_data.get('quorum', []))
            # Parse monitors
            for mon in mon_data.get('mons', []):
                self.health.monitors.append(MonitorStatus(
                    name=mon.get('name', ''),
                    rank=mon.get('rank', -1),
                    address=mon.get('addr', ''),
                    in_quorum=mon.get('rank', -1) in quorum
                ))
            # Check if all monitors are in quorum
            not_in_quorum = [m.name for m in self.health.monitors if not m.in_quorum]
            if not_in_quorum:
                self.health.warnings.append(
                    f"Monitors not in quorum: {', '.join(not_in_quorum)}"
                )
        except (json.JSONDecodeError, KeyError) as e:
            self.health.warnings.append(f"Failed to parse monitor status: {e}")

    def check_managers(self):
        """Collect the active manager and standbys from `ceph mgr dump`."""
        output = self.run_command("ceph mgr dump --format json")
        if not output:
            self.health.warnings.append("Failed to get manager status")
            return
        try:
            mgr_data = json.loads(output)
            # Active manager
            active_name = mgr_data.get('active_name', '')
            active_addr = mgr_data.get('active_addr', '')
            if active_name:
                self.health.managers.append(ManagerStatus(
                    name=active_name,
                    active=True,
                    address=active_addr
                ))
            # Standby managers.
            # NOTE(review): 'gid' is stored in the address slot; mgr dump does
            # not appear to expose standby addresses — confirm intent.
            for standby in mgr_data.get('standbys', []):
                self.health.managers.append(ManagerStatus(
                    name=standby.get('name', ''),
                    active=False,
                    address=standby.get('gid', '')
                ))
        except (json.JSONDecodeError, KeyError) as e:
            self.health.warnings.append(f"Failed to parse manager status: {e}")

    def check_osds(self):
        """Parse `ceph osd tree` and record an error for any OSD not 'up'."""
        output = self.run_command("ceph osd tree --format json")
        if not output:
            self.health.warnings.append("Failed to get OSD tree")
            return
        try:
            osd_data = json.loads(output)
            # The tree also contains host/root buckets; keep only OSD leaves
            for node in osd_data.get('nodes', []):
                if node.get('type') == 'osd':
                    osd_id = node.get('id', -1)
                    status = node.get('status', 'unknown')
                    in_cluster = node.get('exists', 0) == 1
                    self.health.osds.append(OSDStatus(
                        osd_id=osd_id,
                        host=node.get('name', 'unknown'),
                        status=status,
                        in_cluster=in_cluster,
                        weight=node.get('crush_weight', 0.0),
                        device_class=node.get('device_class', 'unknown')
                    ))
            # Check for down OSDs
            down_osds = [o.osd_id for o in self.health.osds if o.status != 'up']
            if down_osds:
                self.health.errors.append(f"OSDs down: {down_osds}")
        except (json.JSONDecodeError, KeyError) as e:
            self.health.warnings.append(f"Failed to parse OSD tree: {e}")

    def check_pools(self):
        """Collect pool configuration plus per-pool usage stats."""
        output = self.run_command("ceph osd pool ls detail --format json")
        if not output:
            self.health.warnings.append("Failed to get pool information")
            return
        try:
            pool_data = json.loads(output)
            for pool in pool_data:
                pool_name = pool.get('pool_name', '')
                # One extra remote call per pool for usage stats.  Initialize
                # pool_stats first so a failed/empty stats call cannot leave it
                # unbound and the pool is still recorded (with zeroed usage).
                pool_stats = {}
                stats_output = self.run_command(f"ceph osd pool stats {pool_name} --format json")
                if stats_output:
                    stats = json.loads(stats_output)
                    pool_stats = stats[0] if stats else {}
                self.health.pools.append(PoolStatus(
                    name=pool_name,
                    pool_id=pool.get('pool', 0),
                    size=pool.get('size', 0),
                    min_size=pool.get('min_size', 0),
                    pg_num=pool.get('pg_num', 0),
                    pgp_num=pool.get('pgp_num', 0),
                    used_bytes=pool_stats.get('bytes_used', 0),
                    max_avail_bytes=pool_stats.get('max_avail', 0),
                    percent_used=pool_stats.get('percent_used', 0.0) * 100
                ))
        except (json.JSONDecodeError, KeyError) as e:
            self.health.warnings.append(f"Failed to parse pool information: {e}")

    def check_pg_state(self):
        """Record an error unless every PG is active+clean.

        Relies on counts already gathered by check_ceph_status.
        """
        if self.health.num_active_clean_pgs != self.health.num_pgs:
            self.health.errors.append(
                f"Not all PGs active+clean: {self.health.num_active_clean_pgs}/{self.health.num_pgs}"
            )

    def run_all_checks(self) -> CEPHHealth:
        """Run all health checks and return the populated CEPHHealth."""
        self.check_ceph_status()
        self.check_monitors()
        self.check_managers()
        self.check_osds()
        self.check_pools()
        self.check_pg_state()
        return self.health
def human_readable_size(bytes_val: int) -> str:
    """Format a byte count as a human-readable string, e.g. "1.50 KB".

    Steps through B..PB, dividing by 1024 until the value fits; anything
    beyond PB is reported in EB.
    """
    size = float(bytes_val)
    units = ('B', 'KB', 'MB', 'GB', 'TB', 'PB')
    idx = 0
    while idx < len(units):
        if size < 1024.0:
            return f"{size:.2f} {units[idx]}"
        size /= 1024.0
        idx += 1
    return f"{size:.2f} EB"
def main():
    """CLI entry point: run all CEPH checks and exit 0 if healthy, 1 otherwise."""
    parser = argparse.ArgumentParser(
        description="Check CEPH cluster health",
        formatter_class=argparse.RawDescriptionHelpFormatter,
        epilog=__doc__
    )
    parser.add_argument(
        '--node',
        default='foxtrot',
        help='Cluster node to check (default: foxtrot)'
    )
    parser.add_argument(
        '--json',
        action='store_true',
        help='Output as JSON'
    )
    parser.add_argument(
        '--min-osds',
        type=int,
        help='Minimum expected OSD count (error if below this)'
    )
    args = parser.parse_args()

    # Run health checks
    checker = CEPHHealthChecker(args.node)
    health = checker.run_all_checks()

    # Check minimum OSD count.  Compare against None explicitly so that
    # `--min-osds 0` is not silently ignored (0 is falsy).
    if args.min_osds is not None and health.num_osds < args.min_osds:
        health.errors.append(
            f"OSD count below minimum: {health.num_osds} < {args.min_osds}"
        )

    if args.json:
        # Output as JSON
        print(json.dumps(asdict(health), indent=2))
        # Exit with appropriate code based on health status
        sys.exit(0 if health.is_healthy else 1)
    else:
        # Human-readable output
        print("CEPH Cluster Health Check")
        print("=" * 60)
        print(f"Overall Status: {health.status}")
        print(f"OSDs: {health.num_up_osds}/{health.num_osds} up, {health.num_in_osds}/{health.num_osds} in")
        print(f"PGs: {health.num_active_clean_pgs}/{health.num_pgs} active+clean")
        print(f"Usage: {health.percent_used:.1f}% ({human_readable_size(health.used_bytes)}/{human_readable_size(health.data_bytes)})")
        print("\nMonitors:")
        for mon in health.monitors:
            # NOTE(review): both marker branches are empty strings — status
            # glyphs appear to have been lost; confirm intended markers.
            quorum_status = "" if mon.in_quorum else ""
            print(f" {quorum_status} {mon.name} (rank: {mon.rank}, {mon.address})")
        print("\nManagers:")
        for mgr in health.managers:
            active_status = "ACTIVE" if mgr.active else "STANDBY"
            print(f" {mgr.name} ({active_status}, {mgr.address})")
        print("\nOSDs:")
        for osd in health.osds:
            status = "" if osd.status == 'up' else ""
            in_status = "in" if osd.in_cluster else "out"
            print(f" {status} osd.{osd.osd_id} on {osd.host} ({in_status}, {osd.device_class})")
        print("\nPools:")
        for pool in health.pools:
            print(f" {pool.name}: size={pool.size}, min_size={pool.min_size}, "
                  f"pgs={pool.pg_num}, used={pool.percent_used:.1f}%")
        if health.warnings:
            print("\nWarnings:")
            for warning in health.warnings:
                print(f"{warning}")
        if health.errors:
            print("\nErrors:")
            for error in health.errors:
                print(f"{error}")
        print("\n" + "=" * 60)
        if health.is_healthy:
            print("Status: ✓ HEALTHY")
            sys.exit(0)
        else:
            print("Status: ✗ UNHEALTHY")
            sys.exit(1)
if __name__ == '__main__':
main()

View File

@@ -0,0 +1,339 @@
#!/usr/bin/env -S uv run --script --quiet
# /// script
# requires-python = ">=3.11"
# dependencies = []
# ///
"""
Proxmox Cluster Health Checker
Validates Proxmox cluster health including:
- Cluster quorum status
- Node membership and status
- Corosync ring health
- Resource manager status
- Configuration version sync
Usage:
python check_cluster_health.py [--node NODE] [--json]
Examples:
# Check cluster health (requires SSH access to cluster node)
python check_cluster_health.py --node foxtrot
# Output as JSON for parsing
python check_cluster_health.py --node foxtrot --json
"""
import argparse
import json
import re
import subprocess
import sys
from dataclasses import dataclass, asdict
from typing import Dict, List, Optional
@dataclass
class NodeStatus:
    """One cluster node, parsed from a `pvecm nodes` row (see check_nodes)."""
    name: str      # node name column
    online: bool   # always True today: presence in `pvecm nodes` is taken as online
    node_id: int   # numeric node id (first column)
    ip: str        # address column; 'unknown' when the row has no fourth field
@dataclass
class CorosyncStatus:
    """One corosync ring, parsed from `corosync-cfgtool -s` (see check_corosync)."""
    ring_id: int      # number from the "RING ID" line
    nodes: List[str]  # currently always empty (reserved for future parsing)
    status: str       # raw text after "status =" for the ring
@dataclass
class ClusterHealth:
    """Overall Proxmox cluster health populated by ClusterHealthChecker."""
    cluster_name: str                     # "Cluster name:" from `pvecm status`
    quorate: bool                         # True when "Quorate: Yes"
    node_count: int                       # "Nodes:" from `pvecm status`
    expected_votes: int                   # "Expected votes:"
    total_votes: int                      # "Total votes:"
    nodes: List[NodeStatus]               # parsed from `pvecm nodes`
    corosync_rings: List[CorosyncStatus]  # parsed from `corosync-cfgtool -s`
    config_version: Optional[int]         # totem.config_version, if readable
    warnings: List[str]                   # non-fatal findings
    errors: List[str]                     # fatal findings; non-empty => unhealthy

    @property
    def is_healthy(self) -> bool:
        """Check if cluster is in healthy state"""
        # Healthy = has quorum and nothing recorded as an error (warnings allowed).
        return self.quorate and len(self.errors) == 0
class ClusterHealthChecker:
    """Check Proxmox cluster health by running commands on a node over SSH.

    Findings accumulate in ``self.health``; a failed remote command is
    recorded as an error and returns "", so remaining checks still run.
    """

    def __init__(self, node: str):
        """
        Args:
            node: Hostname or IP address of a cluster node reachable as root via SSH.

        Raises:
            ValueError: If *node* does not look like a hostname or IP address.
        """
        # Validate node is a valid hostname or IP address
        if not self._validate_node(node):
            raise ValueError(f"Invalid node name or IP address: {node}")
        self.node = node
        self.health = ClusterHealth(
            cluster_name="",
            quorate=False,
            node_count=0,
            expected_votes=0,
            total_votes=0,
            nodes=[],
            corosync_rings=[],
            config_version=None,
            warnings=[],
            errors=[]
        )

    def _validate_node(self, node: str) -> bool:
        """Return True if *node* looks like a valid hostname or IPv4/IPv6 address."""
        # Allow valid hostnames and IPv4/IPv6 addresses.  Uses the module-level
        # `re` import; the redundant function-local `import re` was removed.
        hostname_pattern = r'^[a-zA-Z0-9]([a-zA-Z0-9\-]{0,61}[a-zA-Z0-9])?(\.[a-zA-Z0-9]([a-zA-Z0-9\-]{0,61}[a-zA-Z0-9])?)*$'
        ipv4_pattern = r'^(\d{1,3}\.){3}\d{1,3}$'
        ipv6_pattern = r'^([0-9a-fA-F]{0,4}:){2,7}[0-9a-fA-F]{0,4}$'
        return bool(
            re.match(hostname_pattern, node) or
            re.match(ipv4_pattern, node) or
            re.match(ipv6_pattern, node)
        )

    def run_command(self, command: str) -> str:
        """Execute *command* on the remote node via SSH.

        Returns stdout, or "" after recording the failure (timeout or
        non-zero exit) in ``self.health.errors``.
        """
        try:
            # Use -- to prevent SSH option injection
            result = subprocess.run(
                ["ssh", "-o", "BatchMode=yes", f"root@{self.node}", "--", command],
                capture_output=True,
                text=True,
                check=True,
                timeout=30
            )
            return result.stdout
        except subprocess.TimeoutExpired:
            self.health.errors.append(f"Command timed out: {command}")
            return ""
        except subprocess.CalledProcessError as e:
            self.health.errors.append(f"Command failed: {command}: {e.stderr}")
            return ""

    def check_cluster_status(self):
        """Parse `pvecm status`: cluster name, quorum, node count, vote totals."""
        output = self.run_command("pvecm status")
        if not output:
            self.health.errors.append("Failed to get cluster status")
            return
        # Parse cluster name
        cluster_match = re.search(r'Cluster name:\s+(\S+)', output)
        if cluster_match:
            self.health.cluster_name = cluster_match.group(1)
        # Parse quorum status
        quorum_match = re.search(r'Quorate:\s+(\w+)', output)
        if quorum_match:
            self.health.quorate = quorum_match.group(1).lower() == 'yes'
            if not self.health.quorate:
                self.health.errors.append("Cluster does not have quorum!")
        # Parse node count
        node_match = re.search(r'Nodes:\s+(\d+)', output)
        if node_match:
            self.health.node_count = int(node_match.group(1))
        # Parse expected votes
        expected_match = re.search(r'Expected votes:\s+(\d+)', output)
        if expected_match:
            self.health.expected_votes = int(expected_match.group(1))
        # Parse total votes
        total_match = re.search(r'Total votes:\s+(\d+)', output)
        if total_match:
            self.health.total_votes = int(total_match.group(1))
        # Check we hold a strict majority of the expected votes
        if self.health.total_votes < (self.health.expected_votes // 2 + 1):
            self.health.errors.append(
                f"Insufficient votes: {self.health.total_votes}/{self.health.expected_votes}"
            )

    def check_nodes(self):
        """Parse the `pvecm nodes` membership table into NodeStatus records."""
        output = self.run_command("pvecm nodes")
        if not output:
            self.health.warnings.append("Failed to get node list")
            return
        # Parse node list (skip header)
        lines = output.strip().split('\n')[1:]  # Skip header
        for line in lines:
            if not line.strip():
                continue
            # Example: " 1 0x00000001 foxtrot 192.168.3.5"
            parts = line.split()
            if len(parts) >= 3:
                try:
                    node_id = int(parts[0])
                    name = parts[2] if len(parts) >= 3 else "unknown"
                    ip = parts[3] if len(parts) >= 4 else "unknown"
                    online = True  # If in list, assumed online
                    self.health.nodes.append(NodeStatus(
                        name=name,
                        online=online,
                        node_id=node_id,
                        ip=ip
                    ))
                except (ValueError, IndexError) as e:
                    self.health.warnings.append(f"Failed to parse node line: {line}: {e}")
        # Verify the parsed list agrees with the count from pvecm status
        if len(self.health.nodes) != self.health.node_count:
            self.health.warnings.append(
                f"Node count mismatch: expected {self.health.node_count}, found {len(self.health.nodes)}"
            )

    def check_corosync(self):
        """Parse `corosync-cfgtool -s`; any ring without 'no faults' is an error."""
        output = self.run_command("corosync-cfgtool -s")
        if not output:
            self.health.warnings.append("Failed to get corosync status")
            return
        # Parse corosync status.  Example output:
        #   Printing ring status.
        #   Local node ID 1
        #   RING ID 0
        #           id      = 192.168.8.5
        #           status  = ring 0 active with no faults
        current_ring = None
        for line in output.split('\n'):
            line = line.strip()
            if line.startswith('RING ID'):
                ring_match = re.search(r'RING ID (\d+)', line)
                if ring_match:
                    current_ring = int(ring_match.group(1))
            elif 'status' in line.lower() and current_ring is not None:
                # The current_ring guard also skips the "Printing ring status." banner
                status_match = re.search(r'status\s*=\s*(.+)', line)
                if status_match:
                    status = status_match.group(1)
                    # Check for faults
                    if 'no faults' not in status.lower():
                        self.health.errors.append(f"Corosync ring {current_ring}: {status}")
                    self.health.corosync_rings.append(CorosyncStatus(
                        ring_id=current_ring,
                        nodes=[],  # Could parse this if needed
                        status=status
                    ))

    def check_config_version(self):
        """Read totem.config_version from corosync's cmap (best effort).

        NOTE(review): `corosync-cmapctl -b` typically prints
        "key (type) = value"; int() on the full line would land in the
        warning path — confirm the expected output format.
        """
        output = self.run_command("corosync-cmapctl -b totem.config_version")
        if output:
            try:
                self.health.config_version = int(output.strip())
            except ValueError:
                self.health.warnings.append("Failed to parse config version")

    def check_resource_manager(self):
        """Verify the pve-cluster service and pmxcfs filesystem look healthy."""
        output = self.run_command("systemctl is-active pve-cluster")
        if output.strip() != "active":
            self.health.errors.append("pve-cluster service is not active")
        # Check pmxcfs filesystem (the pipe executes in the remote shell)
        output = self.run_command("pvecm status | grep -i 'cluster filesystem'")
        if output and 'online' not in output.lower():
            self.health.warnings.append("Cluster filesystem may not be online")

    def run_all_checks(self) -> ClusterHealth:
        """Run all health checks and return the populated ClusterHealth."""
        self.check_cluster_status()
        self.check_nodes()
        self.check_corosync()
        self.check_config_version()
        self.check_resource_manager()
        return self.health
def main():
    """CLI entry point: run all cluster checks and exit 0 if healthy, 1 otherwise."""
    parser = argparse.ArgumentParser(
        description="Check Proxmox cluster health",
        formatter_class=argparse.RawDescriptionHelpFormatter,
        epilog=__doc__
    )
    parser.add_argument(
        '--node',
        default='foxtrot',
        help='Cluster node to check (default: foxtrot)'
    )
    parser.add_argument(
        '--json',
        action='store_true',
        help='Output as JSON'
    )
    args = parser.parse_args()

    # Run health checks
    checker = ClusterHealthChecker(args.node)
    health = checker.run_all_checks()

    if args.json:
        # Output as JSON
        print(json.dumps(asdict(health), indent=2))
        # The exit code must reflect health in JSON mode too (previously this
        # path fell through and always exited 0, unlike the branch below).
        sys.exit(0 if health.is_healthy else 1)
    else:
        # Human-readable output
        print(f"Cluster Health Check: {health.cluster_name}")
        print("=" * 60)
        print(f"Quorum Status: {'✓ YES' if health.quorate else '✗ NO'}")
        print(f"Nodes: {health.node_count} ({health.total_votes}/{health.expected_votes} votes)")
        # Compare against None so a legitimate version of 0 is still printed
        if health.config_version is not None:
            print(f"Config Version: {health.config_version}")
        print("\nNodes:")
        for node in health.nodes:
            # NOTE(review): both marker branches are empty strings — status
            # glyphs appear to have been lost; confirm intended markers.
            status = "" if node.online else ""
            print(f" {status} {node.name} (ID: {node.node_id}, IP: {node.ip})")
        print("\nCorosync Rings:")
        for ring in health.corosync_rings:
            print(f" Ring {ring.ring_id}: {ring.status}")
        if health.warnings:
            print("\nWarnings:")
            for warning in health.warnings:
                print(f"{warning}")
        if health.errors:
            print("\nErrors:")
            for error in health.errors:
                print(f"{error}")
        print("\n" + "=" * 60)
        if health.is_healthy:
            print("Status: ✓ HEALTHY")
            sys.exit(0)
        else:
            print("Status: ✗ UNHEALTHY")
            sys.exit(1)
if __name__ == '__main__':
main()

View File

@@ -0,0 +1,252 @@
#!/usr/bin/env -S uv run --script --quiet
# /// script
# dependencies = ["proxmoxer", "requests"]
# ///
"""
Display Proxmox cluster health and resource usage.
Usage:
./cluster_status.py
./cluster_status.py --node foxtrot
./cluster_status.py --detailed
Environment Variables:
PROXMOX_VE_ENDPOINT - Proxmox API endpoint (e.g., https://192.168.3.5:8006)
PROXMOX_VE_USERNAME - Username (e.g., root@pam)
PROXMOX_VE_PASSWORD - Password
OR
PROXMOX_VE_API_TOKEN - API token (user@realm!token-id=secret)
"""
import argparse
import os
import sys
from proxmoxer import ProxmoxAPI, ResourceException
class ClusterMonitor:
    """Monitor Proxmox cluster health and resources via the Proxmox API."""

    def __init__(self, endpoint: str, auth_type: str, **auth_kwargs):
        """Initialize the Proxmox connection.

        Args:
            endpoint: API endpoint; "https://" and ":8006" are stripped.
            auth_type: "token" for API-token auth, anything else for password auth.
            **auth_kwargs: token=... for token auth, or user=.../password=... .

        Exits the process with code 1 if the connection cannot be created.
        """
        self.endpoint = endpoint.replace("https://", "").replace(":8006", "")
        try:
            if auth_type == "token":
                # Token format: user@realm!token-id=secret.  Split with
                # maxsplit=1 so an extra '!' or '=' in the secret does not
                # raise "too many values to unpack".
                user, token = auth_kwargs["token"].split("!", 1)
                token_name, token_value = token.split("=", 1)
                self.proxmox = ProxmoxAPI(
                    self.endpoint,
                    user=user,
                    token_name=token_name,
                    token_value=token_value,
                    verify_ssl=False
                )
            else:
                self.proxmox = ProxmoxAPI(
                    self.endpoint,
                    user=auth_kwargs["user"],
                    password=auth_kwargs["password"],
                    verify_ssl=False
                )
        except Exception as e:
            print(f"❌ Failed to connect to Proxmox: {e}", file=sys.stderr)
            sys.exit(1)

    def get_cluster_status(self):
        """Return the /cluster/status resource list, or None on API error."""
        try:
            status = self.proxmox.cluster.status.get()
            return status
        except ResourceException as e:
            print(f"❌ Failed to get cluster status: {e}", file=sys.stderr)
            return None

    def get_node_status(self, node_name: str):
        """Return /nodes/{node}/status for *node_name*, or None on API error."""
        try:
            status = self.proxmox.nodes(node_name).status.get()
            return status
        except ResourceException as e:
            print(f"❌ Failed to get node status: {e}", file=sys.stderr)
            return None

    def get_node_vms(self, node_name: str):
        """Return the QEMU VM list for *node_name*, or [] on API error."""
        try:
            vms = self.proxmox.nodes(node_name).qemu.get()
            return vms
        except ResourceException as e:
            print(f"❌ Failed to get VMs: {e}", file=sys.stderr)
            return []

    def display_cluster_overview(self):
        """Print a one-row-per-node overview of the cluster."""
        print("🖥️ Proxmox Cluster Status")
        print("=" * 70)
        cluster_status = self.get_cluster_status()
        if not cluster_status:
            return
        # Find the single 'cluster' entry among the status items
        cluster_info = next((item for item in cluster_status if item['type'] == 'cluster'), None)
        if cluster_info:
            print(f"\n📊 Cluster: {cluster_info.get('name', 'N/A')}")
            print(f" Quorum: {cluster_info.get('quorate', 0)} (nodes: {cluster_info.get('nodes', 0)})")
        # Node statuses
        nodes = [item for item in cluster_status if item['type'] == 'node']
        print(f"\n🔧 Nodes ({len(nodes)}):")
        print(f"{'Node':<15} {'Status':<10} {'CPU':<12} {'Memory':<20} {'VMs':<8}")
        print("-" * 70)
        for node_info in nodes:
            node_name = node_info['name']
            online = "✓ Online" if node_info.get('online', 0) == 1 else "✗ Offline"
            # Get detailed status; fall back to an N/A row on failure
            detailed = self.get_node_status(node_name)
            if not detailed:
                print(f"{node_name:<15} {online:<10} {'N/A':<12} {'N/A':<20} {'N/A':<8}")
                continue
            # CPU usage, scaled from a fraction to percent
            cpu_pct = detailed.get('cpu', 0) * 100
            cpu_str = f"{cpu_pct:.1f}%"
            # Memory usage
            mem_used = detailed.get('memory', {}).get('used', 0) / (1024**3)  # GB
            mem_total = detailed.get('memory', {}).get('total', 0) / (1024**3)  # GB
            mem_pct = (mem_used / mem_total * 100) if mem_total > 0 else 0
            mem_str = f"{mem_used:.1f}/{mem_total:.1f}GB ({mem_pct:.1f}%)"
            # VM count (running/total)
            vms = self.get_node_vms(node_name)
            vm_count = len(vms)
            running_vms = len([vm for vm in vms if vm.get('status') == 'running'])
            vm_str = f"{running_vms}/{vm_count}"
            print(f"{node_name:<15} {online:<10} {cpu_str:<12} {mem_str:<20} {vm_str:<8}")
        print("=" * 70)

    def display_node_detail(self, node_name: str):
        """Print detailed system, memory, storage, and VM info for one node."""
        print(f"\n🔍 Node Details: {node_name}")
        print("=" * 70)
        status = self.get_node_status(node_name)
        if not status:
            return
        # System info
        print(f"\n💻 System:")
        print(f" Uptime: {status.get('uptime', 0) / 86400:.1f} days")
        # The API may report loadavg entries as strings; coerce to float so
        # the :.2f format cannot raise, and fall back to N/A when missing.
        # (Previously the 'N/A' default was fed straight to :.2f and crashed.)
        try:
            load1 = float(status.get('loadavg', [])[0])
            print(f" Load Average: {load1:.2f}")
        except (IndexError, TypeError, ValueError):
            print(" Load Average: N/A")
        print(f" CPU Cores: {status.get('cpuinfo', {}).get('cpus', 'N/A')}")
        # CPU
        cpu_pct = status.get('cpu', 0) * 100
        print(f"\n🖥️ CPU Usage: {cpu_pct:.1f}%")
        # Memory
        mem = status.get('memory', {})
        mem_used = mem.get('used', 0) / (1024**3)
        mem_total = mem.get('total', 0) / (1024**3)
        mem_free = mem.get('free', 0) / (1024**3)
        mem_pct = (mem_used / mem_total * 100) if mem_total > 0 else 0
        print(f"\n💾 Memory:")
        print(f" Used: {mem_used:.2f} GB ({mem_pct:.1f}%)")
        print(f" Free: {mem_free:.2f} GB")
        print(f" Total: {mem_total:.2f} GB")
        # Storage
        root = status.get('rootfs', {})
        root_used = root.get('used', 0) / (1024**3)
        root_total = root.get('total', 0) / (1024**3)
        root_avail = root.get('avail', 0) / (1024**3)
        root_pct = (root_used / root_total * 100) if root_total > 0 else 0
        print(f"\n💿 Root Filesystem:")
        print(f" Used: {root_used:.2f} GB ({root_pct:.1f}%)")
        print(f" Available: {root_avail:.2f} GB")
        print(f" Total: {root_total:.2f} GB")
        # VMs
        vms = self.get_node_vms(node_name)
        print(f"\n🖼️ Virtual Machines ({len(vms)}):")
        if vms:
            print(f" {'VMID':<8} {'Name':<25} {'Status':<10} {'CPU':<8} {'Memory':<15}")
            print(" " + "-" * 66)
            for vm in vms:
                vmid = vm.get('vmid', 'N/A')
                name = vm.get('name', 'N/A')[:24]
                status = vm.get('status', 'unknown')
                # CPU/memory are only meaningful for running VMs
                cpu_pct = vm.get('cpu', 0) * 100 if vm.get('status') == 'running' else 0
                mem = vm.get('mem', 0) / (1024**2) if vm.get('status') == 'running' else 0  # MB
                status_icon = "▶️" if status == "running" else "⏸️"
                print(f" {vmid:<8} {name:<25} {status_icon} {status:<8} {cpu_pct:>6.1f}% {mem:>8.0f} MB")
        else:
            print(" No VMs found")
        print("=" * 70)
def main():
    """Entry point: read auth from the environment and dispatch on CLI flags."""
    parser = argparse.ArgumentParser(
        description="Display Proxmox cluster health and resource usage"
    )
    parser.add_argument(
        "--node",
        help="Show detailed info for specific node"
    )
    parser.add_argument(
        "--detailed",
        action="store_true",
        help="Show detailed info for all nodes"
    )
    args = parser.parse_args()

    # Connection settings come from the environment.
    endpoint = os.getenv("PROXMOX_VE_ENDPOINT")
    if not endpoint:
        print("❌ PROXMOX_VE_ENDPOINT environment variable required", file=sys.stderr)
        sys.exit(1)

    # Token auth wins over username/password when both are present.
    token = os.getenv("PROXMOX_VE_API_TOKEN")
    user = os.getenv("PROXMOX_VE_USERNAME")
    password = os.getenv("PROXMOX_VE_PASSWORD")
    if token:
        monitor = ClusterMonitor(endpoint, "token", token=token)
    elif user and password:
        monitor = ClusterMonitor(endpoint, "password", user=user, password=password)
    else:
        print("❌ Authentication required: set PROXMOX_VE_API_TOKEN or PROXMOX_VE_USERNAME/PASSWORD", file=sys.stderr)
        sys.exit(1)

    # Dispatch: one node's detail, overview + per-node detail, or overview only.
    if args.node:
        monitor.display_node_detail(args.node)
        return
    monitor.display_cluster_overview()
    if args.detailed:
        status_items = monitor.get_cluster_status()
        if status_items:
            for item in status_items:
                if item['type'] == 'node':
                    monitor.display_node_detail(item['name'])
if __name__ == "__main__":
main()

View File

@@ -0,0 +1,224 @@
#!/usr/bin/env -S uv run --script --quiet
# /// script
# dependencies = ["proxmoxer", "requests"]
# ///
"""
Validate Proxmox VM template health and configuration.
Usage:
./validate_template.py --template-id 9000 --node foxtrot
./validate_template.py --template-id 9000 --all-nodes
Environment Variables:
PROXMOX_VE_ENDPOINT - Proxmox API endpoint (e.g., https://192.168.3.5:8006)
PROXMOX_VE_USERNAME - Username (e.g., root@pam)
PROXMOX_VE_PASSWORD - Password
OR
PROXMOX_VE_API_TOKEN - API token (user@realm!token-id=secret)
"""
import argparse
import os
import sys
from proxmoxer import ProxmoxAPI, ResourceException
class TemplateValidator:
    """Validates Proxmox VM templates via the Proxmox API."""

    def __init__(self, endpoint: str, auth_type: str, **auth_kwargs):
        """Initialize the Proxmox connection.

        Args:
            endpoint: API endpoint; "https://" and ":8006" are stripped.
            auth_type: "token" for API-token auth, anything else for password auth.
            **auth_kwargs: token=... for token auth, or user=.../password=... .

        Exits the process with code 1 if the connection cannot be created.
        """
        self.endpoint = endpoint.replace("https://", "").replace(":8006", "")
        try:
            if auth_type == "token":
                # Token format: user@realm!token-id=secret.  Split with
                # maxsplit=1 so an extra '!' or '=' in the secret does not
                # raise "too many values to unpack".
                user, token = auth_kwargs["token"].split("!", 1)
                token_name, token_value = token.split("=", 1)
                self.proxmox = ProxmoxAPI(
                    self.endpoint,
                    user=user,
                    token_name=token_name,
                    token_value=token_value,
                    verify_ssl=False
                )
            else:
                self.proxmox = ProxmoxAPI(
                    self.endpoint,
                    user=auth_kwargs["user"],
                    password=auth_kwargs["password"],
                    verify_ssl=False
                )
        except Exception as e:
            print(f"❌ Failed to connect to Proxmox: {e}", file=sys.stderr)
            sys.exit(1)

    def find_template(self, template_id: int, node: str = None):
        """Locate *template_id* on *node* (or every node when None).

        Returns (node_name, vm_info) on success, (None, None) otherwise.
        Nodes that fail to answer are skipped.
        """
        nodes = [node] if node else [n['node'] for n in self.proxmox.nodes.get()]
        for node_name in nodes:
            try:
                vms = self.proxmox.nodes(node_name).qemu.get()
                for vm in vms:
                    if vm['vmid'] == template_id:
                        return node_name, vm
            except ResourceException:
                continue
        return None, None

    def validate_template(self, template_id: int, node: str = None):
        """Run all checks against *template_id* and print a report.

        Returns True when every check passed, False otherwise.
        """
        print(f"🔍 Validating template {template_id}...")
        # Find template
        node_name, vm_info = self.find_template(template_id, node)
        if not node_name:
            print(f"❌ Template {template_id} not found", file=sys.stderr)
            return False
        print(f"✓ Found on node: {node_name}")
        # Check if it's actually a template
        if vm_info.get('template', 0) != 1:
            print(f"❌ VM {template_id} is not a template", file=sys.stderr)
            return False
        print(f"✓ Confirmed as template")
        # Get detailed config
        try:
            config = self.proxmox.nodes(node_name).qemu(template_id).config.get()
        except ResourceException as e:
            print(f"❌ Failed to get template config: {e}", file=sys.stderr)
            return False
        # Validation checks: name -> (passed, message)
        checks = {
            "Cloud-init drive": self._check_cloudinit(config),
            "QEMU guest agent": self._check_agent(config),
            "SCSI controller": self._check_scsi(config),
            "Boot disk": self._check_boot_disk(config),
            "Serial console": self._check_serial(config),
            "EFI disk": self._check_efi(config),
        }
        # Print results
        print("\n📋 Validation Results:")
        print("-" * 50)
        all_passed = True
        for check_name, (passed, message) in checks.items():
            # NOTE(review): both marker branches are empty strings — status
            # glyphs appear to have been lost; confirm intended markers.
            status = "" if passed else ""
            print(f"{status} {check_name}: {message}")
            if not passed:
                all_passed = False
        print("-" * 50)
        # Print template info
        print(f"\n📊 Template Info:")
        print(f" Name: {config.get('name', 'N/A')}")
        print(f" Memory: {config.get('memory', 'N/A')} MB")
        print(f" Cores: {config.get('cores', 'N/A')}")
        print(f" Sockets: {config.get('sockets', 'N/A')}")
        if all_passed:
            print(f"\n✅ Template {template_id} is properly configured")
        else:
            print(f"\n⚠️ Template {template_id} has configuration issues")
        return all_passed

    def _check_cloudinit(self, config):
        """Check for a cloud-init drive on any IDE slot."""
        for key in config:
            if key.startswith('ide') and 'cloudinit' in str(config[key]):
                return True, f"Found at {key}"
        return False, "Missing cloud-init drive (should be ide2)"

    def _check_agent(self, config):
        """Check that the QEMU guest agent is enabled.

        The 'agent' value may carry extra options (e.g.
        "1,fstrim_cloned_disks=1"); only the first comma-separated token is
        the enabled flag, so values with options are now accepted too.
        """
        agent = str(config.get('agent', '0'))
        enabled_flag = agent.split(',', 1)[0]
        if enabled_flag in ('1', 'enabled=1'):
            return True, "Enabled"
        return False, "Not enabled (recommended for IP detection)"

    def _check_scsi(self, config):
        """Check that the SCSI controller is a virtio variant."""
        scsihw = config.get('scsihw', '')
        if 'virtio' in scsihw:
            return True, f"Using {scsihw}"
        return False, f"Not using virtio-scsi (found: {scsihw or 'none'})"

    def _check_boot_disk(self, config):
        """Check for at least one SCSI disk (scsiN keys, excluding scsihw)."""
        for key in config:
            if key.startswith('scsi') and key != 'scsihw':
                return True, f"Found at {key}"
        return False, "No SCSI disk found"

    def _check_serial(self, config):
        """Check for a serial console device."""
        if 'serial0' in config:
            return True, "Configured"
        return False, "Not configured (recommended for cloud images)"

    def _check_efi(self, config):
        """Check for an EFI vars disk."""
        if 'efidisk0' in config:
            return True, "Configured"
        return False, "Not configured (needed for UEFI boot)"
def main():
    """Script entry point: authenticate from the environment and validate one template."""
    parser = argparse.ArgumentParser(
        description="Validate Proxmox VM template health and configuration"
    )
    parser.add_argument(
        "--template-id",
        type=int,
        required=True,
        help="Template VM ID (e.g., 9000)"
    )
    parser.add_argument(
        "--node",
        help="Specific Proxmox node to check (default: search all nodes)"
    )
    parser.add_argument(
        "--all-nodes",
        action="store_true",
        help="Search all nodes in cluster"
    )
    args = parser.parse_args()

    # The endpoint is mandatory; everything else depends on the auth method.
    endpoint = os.getenv("PROXMOX_VE_ENDPOINT")
    if not endpoint:
        print("❌ PROXMOX_VE_ENDPOINT environment variable required", file=sys.stderr)
        sys.exit(1)

    # Prefer API-token auth; fall back to username/password.
    token = os.getenv("PROXMOX_VE_API_TOKEN")
    user = os.getenv("PROXMOX_VE_USERNAME")
    secret = os.getenv("PROXMOX_VE_PASSWORD")
    if token:
        checker = TemplateValidator(endpoint, "token", token=token)
    elif user and secret:
        checker = TemplateValidator(endpoint, "password", user=user, password=secret)
    else:
        print("❌ Authentication required: set PROXMOX_VE_API_TOKEN or PROXMOX_VE_USERNAME/PASSWORD", file=sys.stderr)
        sys.exit(1)

    # --all-nodes overrides --node and searches the whole cluster.
    target_node = None if args.all_nodes else args.node
    sys.exit(0 if checker.validate_template(args.template_id, target_node) else 1)
if __name__ == "__main__":
main()