Files
2025-11-30 08:46:06 +08:00

225 lines
6.8 KiB
Python
Executable File

#!/usr/bin/env python3
"""
Analyze Pod resources from must-gather data.
Displays output similar to 'oc get pods -A' command.
"""
import sys
import os
import yaml
import argparse
from pathlib import Path
from datetime import datetime
from typing import List, Dict, Any, Optional
def parse_pod(file_path: Path) -> Optional[Dict[str, Any]]:
"""Parse a single pod YAML file."""
try:
with open(file_path, 'r') as f:
doc = yaml.safe_load(f)
if doc and doc.get('kind') == 'Pod':
return doc
except Exception as e:
print(f"Warning: Failed to parse {file_path}: {e}", file=sys.stderr)
return None
def calculate_age(creation_timestamp: str) -> str:
"""Calculate age from creation timestamp."""
try:
ts = datetime.fromisoformat(creation_timestamp.replace('Z', '+00:00'))
now = datetime.now(ts.tzinfo)
delta = now - ts
days = delta.days
hours = delta.seconds // 3600
minutes = (delta.seconds % 3600) // 60
if days > 0:
return f"{days}d"
elif hours > 0:
return f"{hours}h"
elif minutes > 0:
return f"{minutes}m"
else:
return "<1m"
except Exception:
return ""
def get_pod_status(pod: Dict[str, Any]) -> Dict[str, Any]:
"""Extract pod status information."""
metadata = pod.get('metadata', {})
status = pod.get('status', {})
spec = pod.get('spec', {})
name = metadata.get('name', 'unknown')
namespace = metadata.get('namespace', 'unknown')
creation_time = metadata.get('creationTimestamp', '')
# Get container statuses
container_statuses = status.get('containerStatuses', [])
init_container_statuses = status.get('initContainerStatuses', [])
# Calculate ready containers
total_containers = len(spec.get('containers', []))
ready_containers = sum(1 for cs in container_statuses if cs.get('ready', False))
# Get overall phase
phase = status.get('phase', 'Unknown')
# Determine more specific status
pod_status = phase
reason = status.get('reason', '')
# Check for specific container states
for cs in container_statuses:
state = cs.get('state', {})
if 'waiting' in state:
waiting = state['waiting']
pod_status = waiting.get('reason', 'Waiting')
elif 'terminated' in state:
terminated = state['terminated']
if terminated.get('exitCode', 0) != 0:
pod_status = terminated.get('reason', 'Error')
# Check init containers
for ics in init_container_statuses:
state = ics.get('state', {})
if 'waiting' in state:
waiting = state['waiting']
if waiting.get('reason') in ['CrashLoopBackOff', 'ImagePullBackOff', 'ErrImagePull']:
pod_status = f"Init:{waiting.get('reason', 'Waiting')}"
# Calculate total restarts
total_restarts = sum(cs.get('restartCount', 0) for cs in container_statuses)
# Calculate age
age = calculate_age(creation_time) if creation_time else ''
return {
'namespace': namespace,
'name': name,
'ready': f"{ready_containers}/{total_containers}",
'status': pod_status,
'restarts': str(total_restarts),
'age': age,
'node': spec.get('nodeName', ''),
'is_problem': pod_status not in ['Running', 'Succeeded', 'Completed'] or total_restarts > 0
}
def print_pods_table(pods: List[Dict[str, Any]], show_namespace: bool = True):
"""Print pods in a formatted table like 'oc get pods'."""
if not pods:
print("No resources found.")
return
# Print header
if show_namespace:
print(f"{'NAMESPACE':<42} {'NAME':<50} {'READY':<7} {'STATUS':<20} {'RESTARTS':<9} AGE")
else:
print(f"{'NAME':<50} {'READY':<7} {'STATUS':<20} {'RESTARTS':<9} AGE")
# Print rows
for pod in pods:
name = pod['name'][:50]
ready = pod['ready'][:7]
status = pod['status'][:20]
restarts = pod['restarts'][:9]
age = pod['age']
if show_namespace:
namespace = pod['namespace'][:42]
print(f"{namespace:<42} {name:<50} {ready:<7} {status:<20} {restarts:<9} {age}")
else:
print(f"{name:<50} {ready:<7} {status:<20} {restarts:<9} {age}")
def analyze_pods(must_gather_path: str, namespace: Optional[str] = None, problems_only: bool = False):
"""Analyze all pods in a must-gather directory."""
base_path = Path(must_gather_path)
pods = []
# Find all pod YAML files
# Structure: namespaces/<namespace>/pods/<pod-name>/<pod-name>.yaml
if namespace:
# Specific namespace
patterns = [
f"namespaces/{namespace}/pods/*/*.yaml",
f"*/namespaces/{namespace}/pods/*/*.yaml",
]
else:
# All namespaces
patterns = [
"namespaces/*/pods/*/*.yaml",
"*/namespaces/*/pods/*/*.yaml",
]
for pattern in patterns:
for pod_file in base_path.glob(pattern):
pod = parse_pod(pod_file)
if pod:
pod_status = get_pod_status(pod)
pods.append(pod_status)
if not pods:
print("No resources found.")
return 1
# Remove duplicates
seen = set()
unique_pods = []
for p in pods:
key = f"{p['namespace']}/{p['name']}"
if key not in seen:
seen.add(key)
unique_pods.append(p)
# Sort by namespace, then name
unique_pods.sort(key=lambda x: (x['namespace'], x['name']))
# Filter if problems only
if problems_only:
unique_pods = [p for p in unique_pods if p['is_problem']]
if not unique_pods:
print("No resources found.")
return 0
# Print results
print_pods_table(unique_pods, show_namespace=(namespace is None))
return 0
def main():
parser = argparse.ArgumentParser(
description='Analyze pod resources from must-gather data',
formatter_class=argparse.RawDescriptionHelpFormatter,
epilog="""
Examples:
%(prog)s ./must-gather.local.123456789
%(prog)s ./must-gather.local.123456789 --namespace openshift-etcd
%(prog)s ./must-gather.local.123456789 --problems-only
"""
)
parser.add_argument('must_gather_path', help='Path to must-gather directory')
parser.add_argument('-n', '--namespace', help='Filter by namespace')
parser.add_argument('-p', '--problems-only', action='store_true',
help='Show only pods with issues')
args = parser.parse_args()
if not os.path.isdir(args.must_gather_path):
print(f"Error: Directory not found: {args.must_gather_path}", file=sys.stderr)
return 1
return analyze_pods(args.must_gather_path, args.namespace, args.problems_only)
if __name__ == '__main__':
sys.exit(main())