#!/usr/bin/env python3
"""
Audit Prometheus alert rules against best practices.

Checks for: alert naming, severity labels, runbook links, expression quality.
"""

import argparse
import sys
import os
import re
from typing import Dict, List, Any, Optional
from pathlib import Path

try:
    import yaml
except ImportError:
    # Do not exit at import time: the checks that do not need YAML stay
    # importable, and `--help` keeps working. main() and analyze_file()
    # report the missing dependency when it is actually required.
    yaml = None

MISSING_YAML_MESSAGE = (
    "⚠️  Warning: 'PyYAML' library not found. Install with: pip install pyyaml"
)
SEPARATOR = "=" * 60
# Number of unique recommendations printed unless verbose output is requested.
MAX_RECOMMENDATIONS = 10


class AlertQualityChecker:
    """Collect quality findings for Prometheus alerting rules.

    Each ``check_*`` method returns a list of hard issues for one rule.
    Softer findings are appended to ``self.warnings`` and
    ``self.recommendations``; ``analyze_file`` clears both at the start of
    every call so they always describe the most recently analyzed file.
    """

    VALID_SEVERITIES = ('critical', 'warning', 'info')
    GENERIC_NAMES = ('Alert', 'Test', 'Warning', 'Error')
    AGGREGATIONS = ('sum(', 'avg(', 'min(', 'max(', 'count(')
    COMPARISON_OPERATORS = ('>', '<', '==', '!=')
    # absent()/absent_over_time() already yield an alertable vector, so they
    # need no threshold comparison.
    ABSENCE_FUNCTIONS = ('absent(', 'absent_over_time(')
    # Critical alerts whose 'for' is at or below this many seconds are flagged.
    MIN_CRITICAL_FOR_SECONDS = 60

    _PASCAL_CASE_RE = re.compile(r'^[A-Z][a-zA-Z0-9]*$')
    _DURATION_PART_RE = re.compile(r'(\d+)(ms|[smhdwy])')
    _DURATION_UNIT_MS = {
        'ms': 1,
        's': 1000,
        'm': 60 * 1000,
        'h': 3600 * 1000,
        'd': 86400 * 1000,
        'w': 7 * 86400 * 1000,
        'y': 365 * 86400 * 1000,
    }

    def __init__(self):
        self.issues = []
        self.warnings = []
        self.recommendations = []

    @staticmethod
    def _section(rule: Dict[str, Any], key: str) -> Dict[str, Any]:
        """Return ``rule[key]`` if it is a mapping, otherwise an empty dict.

        YAML such as ``labels:`` (no value) loads as ``None``; treating that
        as "no entries" avoids crashing on otherwise readable rules.
        """
        value = rule.get(key)
        return value if isinstance(value, dict) else {}

    @classmethod
    def _parse_duration_seconds(cls, value: Any) -> Optional[float]:
        """Convert a Prometheus duration such as ``1h30m`` into seconds.

        Returns ``None`` when *value* is not a sequence of
        ``<number><unit>`` parts (units: ms, s, m, h, d, w, y). A bare ``0``
        is accepted as zero.
        """
        text = str(value).strip()
        if text == '0':
            return 0.0

        total_ms = 0
        position = 0
        for part in cls._DURATION_PART_RE.finditer(text):
            if part.start() != position:
                # Unrecognized characters between parts.
                return None
            total_ms += int(part.group(1)) * cls._DURATION_UNIT_MS[part.group(2)]
            position = part.end()

        if position == 0 or position != len(text):
            return None
        return total_ms / 1000

    def check_alert_name(self, alert_name: str) -> List[str]:
        """Check alert naming conventions.

        Returns one message per violated rule: PascalCase format, minimum
        length of five characters, and not being one of the generic names.
        """
        alert_name = str(alert_name)
        issues = []

        # Must start with an uppercase letter and contain only alphanumerics.
        if not self._PASCAL_CASE_RE.match(alert_name):
            issues.append(
                f"Alert name '{alert_name}' should use PascalCase (e.g., HighCPUUsage)"
            )

        # Should be descriptive
        if len(alert_name) < 5:
            issues.append(
                f"Alert name '{alert_name}' is too short, use descriptive names"
            )

        # Avoid generic names
        if alert_name in self.GENERIC_NAMES:
            issues.append(f"Alert name '{alert_name}' is too generic")

        return issues

    def check_labels(self, alert: Dict[str, Any]) -> List[str]:
        """Check required and recommended labels.

        A missing or invalid ``severity`` is returned as an issue; missing
        ``team`` and ``component``/``service`` labels are recorded as
        recommendations on the checker.
        """
        issues = []
        labels = self._section(alert, 'labels')

        # Required labels
        if 'severity' not in labels:
            issues.append("Missing required 'severity' label (critical/warning/info)")
        elif labels['severity'] not in self.VALID_SEVERITIES:
            issues.append(
                f"Severity '{labels['severity']}' should be one of: critical, warning, info"
            )

        # Recommended labels
        if 'team' not in labels:
            self.recommendations.append("Consider adding 'team' label for routing")

        if 'component' not in labels and 'service' not in labels:
            self.recommendations.append("Consider adding 'component' or 'service' label")

        return issues

    def check_annotations(self, alert: Dict[str, Any]) -> List[str]:
        """Check annotations quality.

        Missing ``summary``/``description`` and a summary shorter than ten
        characters are returned as issues; a missing runbook link and a
        summary without template variables are recorded as recommendations.
        """
        issues = []
        annotations = self._section(alert, 'annotations')

        # Required annotations
        if 'summary' not in annotations:
            issues.append("Missing 'summary' annotation")
        else:
            raw_summary = annotations['summary']
            # A null or non-string summary must not crash the length check.
            summary = '' if raw_summary is None else str(raw_summary)
            if len(summary) < 10:
                issues.append("Summary annotation is too short, provide clear description")
            # Check for templating
            if '{{' not in summary:
                self.recommendations.append(
                    "Consider using template variables in summary (e.g., {{ $value }})"
                )

        if 'description' not in annotations:
            issues.append("Missing 'description' annotation")

        # Runbook
        if 'runbook_url' not in annotations and 'runbook' not in annotations:
            self.recommendations.append("Consider adding 'runbook_url' for incident response")

        return issues

    def check_expression(self, expr: str, alert_name: str) -> List[str]:
        """Check PromQL expression quality using simple textual heuristics.

        *expr* is coerced to ``str`` because YAML may load a bare number.
        *alert_name* is accepted for interface compatibility and is unused.
        """
        expr = str(expr)
        issues = []

        # Should have a threshold, unless the expression tests for absence.
        has_comparison = any(op in expr for op in self.COMPARISON_OPERATORS)
        tests_absence = any(fn in expr for fn in self.ABSENCE_FUNCTIONS)
        if not has_comparison and not tests_absence:
            issues.append("Expression should include a comparison operator")

        # Should use rate() for counters
        if '_total' in expr and 'rate(' not in expr and 'increase(' not in expr:
            self.recommendations.append(
                "Consider using rate() or increase() for counter metrics (*_total)"
            )

        # Avoid instant queries without aggregation
        if not any(agg in expr for agg in self.AGGREGATIONS):
            if expr.count('{') > 1:  # Multiple metrics without aggregation
                self.recommendations.append(
                    "Consider aggregating metrics with sum(), avg(), etc."
                )

        # Check for proper time windows
        if '[' not in expr and 'rate(' in expr:
            issues.append("rate() requires a time window (e.g., rate(metric[5m]))")

        return issues

    def check_for_duration(self, rule: Dict[str, Any]) -> List[str]:
        """Check for 'for' clause to prevent flapping.

        A critical alert without ``for`` is an issue; other alerts without
        it get a warning. For critical alerts the duration is parsed and
        compared numerically, so ``11m`` is not mistaken for ``1m``.
        """
        issues = []
        severity = self._section(rule, 'labels').get('severity', 'unknown')

        if 'for' not in rule:
            if severity == 'critical':
                issues.append("Critical alerts should have 'for' clause to prevent flapping")
            else:
                self.warnings.append("Consider adding 'for' clause to prevent alert flapping")
            return issues

        duration = rule['for']
        seconds = self._parse_duration_seconds(duration)
        if seconds is None:
            self.warnings.append(
                f"'for' duration ({duration}) is not a recognized duration (e.g., 5m)"
            )
        elif severity == 'critical' and seconds <= self.MIN_CRITICAL_FOR_SECONDS:
            self.warnings.append(
                f"'for' duration ({duration}) might be too short for critical alerts"
            )

        return issues

    def check_alert_rule(self, rule: Dict[str, Any]) -> Dict[str, Any]:
        """Check a single alert rule.

        Returns a dict with the alert name, the combined list of issues
        from every check, and the rule's severity (``'unknown'`` if unset).
        """
        alert_name = str(rule.get('alert', 'Unknown'))
        issues = []

        # Check alert name
        issues.extend(self.check_alert_name(alert_name))

        # Check expression
        if 'expr' not in rule:
            issues.append("Missing 'expr' field")
        else:
            issues.extend(self.check_expression(rule['expr'], alert_name))

        # Check labels
        issues.extend(self.check_labels(rule))

        # Check annotations
        issues.extend(self.check_annotations(rule))

        # Check for duration
        issues.extend(self.check_for_duration(rule))

        return {
            "alert": alert_name,
            "issues": issues,
            "severity": self._section(rule, 'labels').get('severity', 'unknown'),
        }

    def analyze_file(self, filepath: str) -> Dict[str, Any]:
        """Analyze a Prometheus rules file.

        Returns a dict with ``file``, ``groups``, ``alerts_checked`` and
        ``results`` on success, or ``file`` and ``error`` on failure.
        Warnings and recommendations from earlier calls are discarded first
        so they are not attributed to this file.
        """
        self.issues = []
        self.warnings = []
        self.recommendations = []

        if yaml is None:
            return {"file": filepath, "error": "PyYAML is required to read rule files"}

        try:
            with open(filepath, 'r', encoding='utf-8') as f:
                data = yaml.safe_load(f)
        except (OSError, UnicodeDecodeError, yaml.YAMLError) as e:
            return {"file": filepath, "error": f"Failed to parse file: {e}"}

        if not data:
            return {"file": filepath, "error": "Empty or invalid YAML file"}
        if not isinstance(data, dict):
            return {
                "file": filepath,
                "error": "Expected a mapping with a top-level 'groups' key",
            }

        groups = data.get('groups') or []
        if not isinstance(groups, list):
            return {"file": filepath, "error": "'groups' must be a list"}

        results = []
        for group in groups:
            if not isinstance(group, dict):
                continue
            group_name = group.get('name', 'Unknown')
            rules = group.get('rules') or []
            if not isinstance(rules, list):
                continue

            for rule in rules:
                # Only check alerting rules, not recording rules
                if isinstance(rule, dict) and 'alert' in rule:
                    result = self.check_alert_rule(rule)
                    result['group'] = group_name
                    results.append(result)

        return {
            "file": filepath,
            "groups": len(groups),
            "alerts_checked": len(results),
            "results": results,
        }


def print_results(analysis: Dict[str, Any], checker: AlertQualityChecker,
                  verbose: bool = False):
    """Pretty print analysis results.

    Args:
        analysis: Result dict returned by ``AlertQualityChecker.analyze_file``.
        checker: The checker that produced *analysis*; its ``warnings`` and
            ``recommendations`` lists are printed after de-duplication.
        verbose: When true, print every unique recommendation instead of
            only the first ``MAX_RECOMMENDATIONS``.
    """
    print("\n" + SEPARATOR)
    print("🚨 ALERT QUALITY CHECK RESULTS")
    print(SEPARATOR)

    if "error" in analysis:
        lead = "\n"
        if "file" in analysis:
            # Name the failing file so multi-file runs stay readable.
            print(f"\n📁 File: {analysis['file']}")
            lead = ""
        print(f"{lead}❌ Error: {analysis['error']}")
        return

    print(f"\n📁 File: {analysis['file']}")
    print(f"📊 Groups: {analysis['groups']}")
    print(f"🔔 Alerts Checked: {analysis['alerts_checked']}")

    failing = [result for result in analysis['results'] if result['issues']]
    failing_count = len(failing)

    print(f"\n{SEPARATOR}")
    print("📈 Summary:")
    print(f"   ❌ Alerts with Issues: {failing_count}")
    print(f"   ⚠️  Warnings: {len(checker.warnings)}")
    print(f"   💡 Recommendations: {len(checker.recommendations)}")

    # Print detailed results
    if failing:
        print(f"\n{SEPARATOR}")
        print("❌ ALERTS WITH ISSUES:")
        print(SEPARATOR)

        for result in failing:
            print(f"\n🔔 Alert: {result['alert']} (Group: {result['group']})")
            print(f"   Severity: {result['severity']}")
            print("   Issues:")
            for issue in result['issues']:
                print(f"     • {issue}")

    # Print warnings
    if checker.warnings:
        print(f"\n{SEPARATOR}")
        print("⚠️  WARNINGS:")
        print(SEPARATOR)
        # dict.fromkeys removes duplicates while keeping first-seen order,
        # so repeated runs print the same output.
        for warning in dict.fromkeys(checker.warnings):
            print(f"• {warning}")

    # Print recommendations
    if checker.recommendations:
        print(f"\n{SEPARATOR}")
        print("💡 RECOMMENDATIONS:")
        print(SEPARATOR)
        unique_recs = list(dict.fromkeys(checker.recommendations))
        shown = unique_recs if verbose else unique_recs[:MAX_RECOMMENDATIONS]
        for rec in shown:
            print(f"• {rec}")
        hidden = len(unique_recs) - len(shown)
        if hidden > 0:
            print(f"... and {hidden} more (use --verbose to show all)")

    # Overall score
    total_alerts = analysis['alerts_checked']
    if total_alerts > 0:
        passing = total_alerts - failing_count
        quality_score = (passing / total_alerts) * 100
        print(f"\n{SEPARATOR}")
        print(f"📊 Quality Score: {quality_score:.1f}% ({passing}/{total_alerts} alerts passing)")
        print(f"{SEPARATOR}\n")


def main(argv: Optional[List[str]] = None):
    """Command-line entry point.

    Args:
        argv: Argument list to parse; ``None`` means ``sys.argv[1:]``.

    Exits with status 1 when PyYAML is missing, the path does not exist, or
    a directory contains no ``.yml``/``.yaml`` files.
    """
    parser = argparse.ArgumentParser(
        description="Audit Prometheus alert rules for quality and best practices",
        formatter_class=argparse.RawDescriptionHelpFormatter,
        epilog="""
Examples:
  # Check a single file
  python3 alert_quality_checker.py alerts.yml

  # Check all YAML files in a directory
  python3 alert_quality_checker.py /path/to/prometheus/rules/

Best Practices Checked:
  ✓ Alert naming conventions (PascalCase, descriptive)
  ✓ Required labels (severity)
  ✓ Required annotations (summary, description)
  ✓ Runbook URL presence
  ✓ PromQL expression quality
  ✓ 'for' clause to prevent flapping
  ✓ Template variable usage
        """
    )

    parser.add_argument('path', help='Path to alert rules file or directory')
    parser.add_argument('--verbose', action='store_true', help='Show all recommendations')

    args = parser.parse_args(argv)

    # Checked after argument parsing so that --help works without PyYAML.
    if yaml is None:
        print(MISSING_YAML_MESSAGE)
        sys.exit(1)

    checker = AlertQualityChecker()

    # Check if path is file or directory
    path = Path(args.path)

    if path.is_file():
        files = [str(path)]
    elif path.is_dir():
        # Sorted and de-duplicated so reports come out in a stable order.
        candidates = set(path.rglob('*.yml')) | set(path.rglob('*.yaml'))
        files = [str(f) for f in sorted(candidates) if f.is_file()]
    else:
        print(f"❌ Path not found: {args.path}")
        sys.exit(1)

    if not files:
        print(f"❌ No YAML files found in: {args.path}")
        sys.exit(1)

    print(f"🔍 Checking {len(files)} file(s)...")

    for filepath in files:
        analysis = checker.analyze_file(filepath)
        print_results(analysis, checker, verbose=args.verbose)


if __name__ == "__main__":
    main()