#!/usr/bin/env python3
"""
Database performance analysis tool for MongoDB and PostgreSQL.

Analyzes slow queries, recommends indexes, and generates reports.
"""

import argparse
import json
import sys
from dataclasses import dataclass, asdict
from datetime import datetime
from typing import Any, Dict, List, Optional

try:
    from pymongo import MongoClient
    MONGO_AVAILABLE = True
except ImportError:
    MONGO_AVAILABLE = False

try:
    import psycopg2
    from psycopg2.extras import RealDictCursor
    POSTGRES_AVAILABLE = True
except ImportError:
    POSTGRES_AVAILABLE = False


@dataclass
class SlowQuery:
    """Represents a slow query."""
    query: str
    execution_time_ms: float
    count: int
    collection_or_table: Optional[str] = None
    index_used: Optional[str] = None


@dataclass
class IndexRecommendation:
    """Index recommendation."""
    collection_or_table: str
    fields: List[str]
    reason: str
    estimated_benefit: str


@dataclass
class PerformanceReport:
    """Performance analysis report."""
    database_type: str
    database_name: str
    timestamp: datetime
    slow_queries: List[SlowQuery]
    index_recommendations: List[IndexRecommendation]
    database_metrics: Dict[str, Any]


class PerformanceAnalyzer:
    """Analyzes database performance."""

    def __init__(self, db_type: str, connection_string: str, threshold_ms: int = 100):
        """
        Initialize performance analyzer.

        Args:
            db_type: Database type ('mongodb' or 'postgres')
            connection_string: Database connection string
            threshold_ms: Slow query threshold in milliseconds
        """
        self.db_type = db_type.lower()
        self.connection_string = connection_string
        self.threshold_ms = threshold_ms
        self.client = None
        self.db = None
        self.conn = None

    def connect(self) -> bool:
        """Connect to the database."""
        try:
            if self.db_type == "mongodb":
                if not MONGO_AVAILABLE:
                    print("Error: pymongo not installed")
                    return False
                self.client = MongoClient(self.connection_string)
                # get_default_database() requires the database name in the URI
                self.db = self.client.get_default_database()
                self.client.server_info()  # Force a round trip to verify connectivity
                return True
            elif self.db_type == "postgres":
                if not POSTGRES_AVAILABLE:
                    print("Error: psycopg2 not installed")
                    return False
                self.conn = psycopg2.connect(self.connection_string)
                return True
            else:
                print(f"Error: Unsupported database type: {self.db_type}")
                return False
        except Exception as e:
            print(f"Connection error: {e}")
            return False

    def disconnect(self):
        """Disconnect from the database."""
        try:
            if self.client:
                self.client.close()
            if self.conn:
                self.conn.close()
        except Exception as e:
            print(f"Disconnect error: {e}")
    def analyze(self) -> Optional[PerformanceReport]:
        """
        Analyze database performance.

        Returns:
            PerformanceReport if successful, None otherwise
        """
        try:
            if self.db_type == "mongodb":
                return self._analyze_mongodb()
            elif self.db_type == "postgres":
                return self._analyze_postgres()
            else:
                return None
        except Exception as e:
            print(f"Analysis error: {e}")
            return None

    def _analyze_mongodb(self) -> PerformanceReport:
        """Analyze MongoDB performance."""
        slow_queries = []
        index_recommendations = []

        # Enable profiling if it is not already enabled
        profiling_level = self.db.command("profile", -1)
        if profiling_level.get("was", 0) == 0:
            self.db.command("profile", 1, slowms=self.threshold_ms)

        # Get slow queries from system.profile
        for doc in self.db.system.profile.find(
            {"millis": {"$gte": self.threshold_ms}}, limit=50
        ).sort("millis", -1):
            query_str = json.dumps(doc.get("command", {}), default=str)
            slow_queries.append(SlowQuery(
                query=query_str,
                execution_time_ms=doc.get("millis", 0),
                count=1,
                collection_or_table=doc["ns"].split(".")[-1] if "ns" in doc else None,
                index_used=doc.get("planSummary")
            ))

        # Analyze collections for index recommendations
        for coll_name in self.db.list_collection_names():
            if coll_name.startswith("system."):
                continue
            coll = self.db[coll_name]

            # Check whether the collection has any secondary indexes
            indexes = list(coll.list_indexes())
            if len(indexes) <= 1:  # Only the default _id index
                # Sample documents to find frequently occurring fields
                sample = list(coll.find().limit(100))
                if sample:
                    # Count how often each field appears across the sample
                    field_freq = {}
                    for doc in sample:
                        for field in doc.keys():
                            if field != "_id":
                                field_freq[field] = field_freq.get(field, 0) + 1
                    # Recommend an index on the most common field
                    if field_freq:
                        top_field = max(field_freq.items(), key=lambda x: x[1])[0]
                        index_recommendations.append(IndexRecommendation(
                            collection_or_table=coll_name,
                            fields=[top_field],
                            reason="Frequently occurring field without index",
                            estimated_benefit="High"
                        ))

        # Get database metrics
        server_status = self.client.admin.command("serverStatus")
        db_stats = self.db.command("dbStats")
        metrics = {
            "connections": server_status.get("connections", {}).get("current", 0),
            # Cumulative query opcounter since server start, not a per-second rate
            "total_query_ops": server_status.get("opcounters", {}).get("query", 0),
            "database_size_mb": db_stats.get("dataSize", 0) / (1024 * 1024),
            "index_size_mb": db_stats.get("indexSize", 0) / (1024 * 1024),
            "collections": db_stats.get("collections", 0)
        }

        return PerformanceReport(
            database_type="mongodb",
            database_name=self.db.name,
            timestamp=datetime.now(),
            slow_queries=slow_queries[:10],  # Top 10
            index_recommendations=index_recommendations,
            database_metrics=metrics
        )

    def _analyze_postgres(self) -> PerformanceReport:
        """Analyze PostgreSQL performance."""
        slow_queries = []
        index_recommendations = []

        with self.conn.cursor(cursor_factory=RealDictCursor) as cur:
            # Check if the pg_stat_statements extension is installed
            # (it must also be listed in shared_preload_libraries)
            cur.execute("""
                SELECT EXISTS (
                    SELECT 1 FROM pg_extension WHERE extname = 'pg_stat_statements'
                ) AS has_extension
            """)
            has_pg_stat_statements = cur.fetchone()["has_extension"]

            if has_pg_stat_statements:
                # Get slow queries from pg_stat_statements (column names are the
                # PostgreSQL 13+ spelling; older versions use mean_time / total_time)
                cur.execute("""
                    SELECT query, mean_exec_time, calls, total_exec_time
                    FROM pg_stat_statements
                    WHERE mean_exec_time >= %s
                    ORDER BY mean_exec_time DESC
                    LIMIT 10
                """, (self.threshold_ms,))
                for row in cur.fetchall():
                    slow_queries.append(SlowQuery(
                        query=row["query"],
                        execution_time_ms=row["mean_exec_time"],
                        count=row["calls"]
                    ))

            # Find tables with sequential scans (potential index candidates)
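            # The thresholds below (1000+ sequential scans, sequential scans more
            # than 2x index scans) are heuristic assumptions; tune them for your
            # workload rather than treating them as fixed rules.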
cur.execute(""" SELECT schemaname, tablename, seq_scan, seq_tup_read, idx_scan FROM pg_stat_user_tables WHERE seq_scan > 1000 AND (idx_scan IS NULL OR seq_scan > idx_scan * 2) ORDER BY seq_tup_read DESC LIMIT 10 """) for row in cur.fetchall(): index_recommendations.append(IndexRecommendation( collection_or_table=f"{row['schemaname']}.{row['tablename']}", fields=[""], reason=f"High sequential scans ({row['seq_scan']}) vs index scans ({row['idx_scan'] or 0})", estimated_benefit="High" if row["seq_tup_read"] > 100000 else "Medium" )) # Find unused indexes cur.execute(""" SELECT schemaname, tablename, indexname, idx_scan FROM pg_stat_user_indexes WHERE idx_scan = 0 AND indexname NOT LIKE '%_pkey' ORDER BY pg_relation_size(indexrelid) DESC """) unused_indexes = [] for row in cur.fetchall(): unused_indexes.append( f"{row['schemaname']}.{row['tablename']}.{row['indexname']}" ) # Database metrics cur.execute(""" SELECT sum(numbackends) AS connections, sum(xact_commit) AS commits, sum(xact_rollback) AS rollbacks FROM pg_stat_database WHERE datname = current_database() """) stats = cur.fetchone() cur.execute(""" SELECT pg_database_size(current_database()) AS db_size """) db_size = cur.fetchone()["db_size"] cur.execute(""" SELECT sum(heap_blks_hit) / NULLIF(sum(heap_blks_hit) + sum(heap_blks_read), 0) AS cache_hit_ratio FROM pg_statio_user_tables """) cache_ratio = cur.fetchone()["cache_hit_ratio"] or 0 metrics = { "connections": stats["connections"], "commits": stats["commits"], "rollbacks": stats["rollbacks"], "database_size_mb": db_size / (1024 * 1024), "cache_hit_ratio": float(cache_ratio), "unused_indexes": unused_indexes } return PerformanceReport( database_type="postgres", database_name=self.conn.info.dbname, timestamp=datetime.now(), slow_queries=slow_queries, index_recommendations=index_recommendations, database_metrics=metrics ) def print_report(self, report: PerformanceReport): """Print performance report.""" print("=" * 80) print(f"Database Performance Report - {report.database_type.upper()}") print(f"Database: {report.database_name}") print(f"Timestamp: {report.timestamp}") print("=" * 80) print("\n## Database Metrics") print("-" * 80) for key, value in report.database_metrics.items(): if isinstance(value, float): print(f"{key}: {value:.2f}") else: print(f"{key}: {value}") print("\n## Slow Queries") print("-" * 80) if report.slow_queries: for i, query in enumerate(report.slow_queries, 1): print(f"\n{i}. Execution Time: {query.execution_time_ms:.2f}ms | Count: {query.count}") if query.collection_or_table: print(f" Collection/Table: {query.collection_or_table}") if query.index_used: print(f" Index Used: {query.index_used}") print(f" Query: {query.query[:200]}...") else: print("No slow queries found") print("\n## Index Recommendations") print("-" * 80) if report.index_recommendations: for i, rec in enumerate(report.index_recommendations, 1): print(f"\n{i}. 
{rec.collection_or_table}") print(f" Fields: {', '.join(rec.fields)}") print(f" Reason: {rec.reason}") print(f" Estimated Benefit: {rec.estimated_benefit}") if report.database_type == "mongodb": index_spec = {field: 1 for field in rec.fields} print(f" Command: db.{rec.collection_or_table}.createIndex({json.dumps(index_spec)})") elif report.database_type == "postgres": fields_str = ", ".join(rec.fields) print(f" Command: CREATE INDEX idx_{rec.collection_or_table.replace('.', '_')}_{rec.fields[0]} ON {rec.collection_or_table}({fields_str});") else: print("No index recommendations") print("\n" + "=" * 80) def save_report(self, report: PerformanceReport, filename: str): """Save report to JSON file.""" # Convert dataclasses to dict report_dict = { "database_type": report.database_type, "database_name": report.database_name, "timestamp": report.timestamp.isoformat(), "slow_queries": [asdict(q) for q in report.slow_queries], "index_recommendations": [asdict(r) for r in report.index_recommendations], "database_metrics": report.database_metrics } with open(filename, "w") as f: json.dump(report_dict, f, indent=2, default=str) print(f"\nReport saved to: {filename}") def main(): """Main entry point.""" parser = argparse.ArgumentParser(description="Database performance analysis tool") parser.add_argument("--db", required=True, choices=["mongodb", "postgres"], help="Database type") parser.add_argument("--uri", required=True, help="Database connection string") parser.add_argument("--threshold", type=int, default=100, help="Slow query threshold in milliseconds (default: 100)") parser.add_argument("--output", help="Save report to JSON file") args = parser.parse_args() analyzer = PerformanceAnalyzer(args.db, args.uri, args.threshold) if not analyzer.connect(): sys.exit(1) try: print(f"Analyzing {args.db} performance (threshold: {args.threshold}ms)...") report = analyzer.analyze() if report: analyzer.print_report(report) if args.output: analyzer.save_report(report, args.output) sys.exit(0) else: print("Analysis failed") sys.exit(1) finally: analyzer.disconnect() if __name__ == "__main__": main()