#!/usr/bin/env python3
"""
Database performance analysis tool for MongoDB and PostgreSQL.
Analyzes slow queries, recommends indexes, and generates reports.
"""
import argparse
import json
import sys
from dataclasses import dataclass, asdict
from datetime import datetime
from typing import Any, Dict, List, Optional

try:
    from pymongo import MongoClient
    MONGO_AVAILABLE = True
except ImportError:
    MONGO_AVAILABLE = False

try:
    import psycopg2
    from psycopg2.extras import RealDictCursor
    POSTGRES_AVAILABLE = True
except ImportError:
    POSTGRES_AVAILABLE = False


@dataclass
class SlowQuery:
    """Represents a slow query."""
    query: str
    execution_time_ms: float
    count: int
    collection_or_table: Optional[str] = None
    index_used: Optional[str] = None


@dataclass
class IndexRecommendation:
    """Index recommendation."""
    collection_or_table: str
    fields: List[str]
    reason: str
    estimated_benefit: str


@dataclass
class PerformanceReport:
    """Performance analysis report."""
    database_type: str
    database_name: str
    timestamp: datetime
    slow_queries: List[SlowQuery]
    index_recommendations: List[IndexRecommendation]
    database_metrics: Dict[str, Any]


class PerformanceAnalyzer:
    """Analyzes database performance."""

    def __init__(self, db_type: str, connection_string: str, threshold_ms: int = 100):
        """
        Initialize performance analyzer.

        Args:
            db_type: Database type ('mongodb' or 'postgres')
            connection_string: Database connection string
            threshold_ms: Slow query threshold in milliseconds
        """
        self.db_type = db_type.lower()
        self.connection_string = connection_string
        self.threshold_ms = threshold_ms
        self.client = None
        self.db = None
        self.conn = None
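
    # Minimal programmatic sketch (the URI is illustrative):
    #   analyzer = PerformanceAnalyzer("postgres", "postgresql://localhost/mydb",
    #                                  threshold_ms=250)
    #   if analyzer.connect():
    #       report = analyzer.analyze()
    #       analyzer.disconnect()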

    def connect(self) -> bool:
        """Connect to database."""
        try:
            if self.db_type == "mongodb":
                if not MONGO_AVAILABLE:
                    print("Error: pymongo not installed")
                    return False
                self.client = MongoClient(self.connection_string)
                self.db = self.client.get_default_database()
                self.client.server_info()  # force a round trip to validate the connection
                return True
            elif self.db_type == "postgres":
                if not POSTGRES_AVAILABLE:
                    print("Error: psycopg2 not installed")
                    return False
                self.conn = psycopg2.connect(self.connection_string)
                return True
            else:
                print(f"Error: Unsupported database type: {self.db_type}")
                return False
        except Exception as e:
            print(f"Connection error: {e}")
            return False

    def disconnect(self):
        """Disconnect from database."""
        try:
            if self.client:
                self.client.close()
            if self.conn:
                self.conn.close()
        except Exception as e:
            print(f"Disconnect error: {e}")

    def analyze(self) -> Optional[PerformanceReport]:
        """
        Analyze database performance.

        Returns:
            PerformanceReport if successful, None otherwise
        """
        try:
            if self.db_type == "mongodb":
                return self._analyze_mongodb()
            elif self.db_type == "postgres":
                return self._analyze_postgres()
            else:
                return None
        except Exception as e:
            print(f"Analysis error: {e}")
            return None

    def _analyze_mongodb(self) -> PerformanceReport:
        """Analyze MongoDB performance."""
        slow_queries = []
        index_recommendations = []
        # Enable profiling if not enabled
        profiling_level = self.db.command("profile", -1)
        if profiling_level.get("was", 0) == 0:
            self.db.command("profile", 1, slowms=self.threshold_ms)
        # Get slow queries from system.profile
        for doc in self.db.system.profile.find(
            {"millis": {"$gte": self.threshold_ms}},
            limit=50
        ).sort("millis", -1):
            query_str = json.dumps(doc.get("command", {}), default=str)
            # "ns" is "<db>.<collection>"; split once so dotted collection names survive
            ns = doc.get("ns", "")
            slow_queries.append(SlowQuery(
                query=query_str,
                execution_time_ms=doc.get("millis", 0),
                count=1,
                collection_or_table=ns.split(".", 1)[1] if "." in ns else None,
                index_used=doc.get("planSummary")
            ))
        # Analyze collections for index recommendations
        for coll_name in self.db.list_collection_names():
            if coll_name.startswith("system."):
                continue
            coll = self.db[coll_name]
            # Check whether the collection has any index beyond the default
            indexes = list(coll.list_indexes())
            if len(indexes) <= 1:  # Only the _id index
                # Sample documents to find frequently occurring fields
                sample = list(coll.find().limit(100))
                if sample:
                    # Count how often each field appears across the sample
                    field_freq = {}
                    for doc in sample:
                        for field in doc.keys():
                            if field != "_id":
                                field_freq[field] = field_freq.get(field, 0) + 1
                    # Recommend an index on the most common field
                    if field_freq:
                        top_field = max(field_freq.items(), key=lambda x: x[1])[0]
                        index_recommendations.append(IndexRecommendation(
                            collection_or_table=coll_name,
                            fields=[top_field],
                            reason="Frequently occurring field without index",
                            estimated_benefit="High"
                        ))
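        # Heuristic only: field frequency in a 100-document sample is a rough
        # proxy for query patterns. Where available, the $indexStats aggregation
        # stage or the profiler's recorded query shapes give better evidence.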
        # Get database metrics
        server_status = self.client.admin.command("serverStatus")
        db_stats = self.db.command("dbStats")
        metrics = {
            "connections": server_status.get("connections", {}).get("current", 0),
            # opcounters are cumulative since server start, not a per-second rate
            "query_ops_total": server_status.get("opcounters", {}).get("query", 0),
            "database_size_mb": db_stats.get("dataSize", 0) / (1024 * 1024),
            "index_size_mb": db_stats.get("indexSize", 0) / (1024 * 1024),
            "collections": db_stats.get("collections", 0)
        }
        return PerformanceReport(
            database_type="mongodb",
            database_name=self.db.name,
            timestamp=datetime.now(),
            slow_queries=slow_queries[:10],  # Top 10
            index_recommendations=index_recommendations,
            database_metrics=metrics
        )

    def _analyze_postgres(self) -> PerformanceReport:
        """Analyze PostgreSQL performance."""
        slow_queries = []
        index_recommendations = []
        with self.conn.cursor(cursor_factory=RealDictCursor) as cur:
            # Check if pg_stat_statements extension is available
            cur.execute("""
                SELECT EXISTS (
                    SELECT 1 FROM pg_extension WHERE extname = 'pg_stat_statements'
                ) AS has_extension
            """)
            has_pg_stat_statements = cur.fetchone()["has_extension"]
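            # pg_stat_statements must be in shared_preload_libraries and created
            # via CREATE EXTENSION pg_stat_statements. Column names below assume
            # PostgreSQL 13+ (mean_exec_time; older releases expose mean_time).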
            if has_pg_stat_statements:
                # Get slow queries from pg_stat_statements
                cur.execute("""
                    SELECT
                        query,
                        mean_exec_time,
                        calls,
                        total_exec_time
                    FROM pg_stat_statements
                    WHERE mean_exec_time >= %s
                    ORDER BY mean_exec_time DESC
                    LIMIT 10
                """, (self.threshold_ms,))
                for row in cur.fetchall():
                    slow_queries.append(SlowQuery(
                        query=row["query"],
                        execution_time_ms=row["mean_exec_time"],
                        count=row["calls"]
                    ))
            # Find tables with sequential scans (potential index candidates)
            cur.execute("""
                SELECT
                    schemaname,
                    tablename,
                    seq_scan,
                    seq_tup_read,
                    idx_scan
                FROM pg_stat_user_tables
                WHERE seq_scan > 1000
                  AND (idx_scan IS NULL OR seq_scan > idx_scan * 2)
                ORDER BY seq_tup_read DESC
                LIMIT 10
            """)
            for row in cur.fetchall():
                index_recommendations.append(IndexRecommendation(
                    collection_or_table=f"{row['schemaname']}.{row['tablename']}",
                    fields=["<analyze query patterns>"],
                    reason=f"High sequential scans ({row['seq_scan']}) vs index scans ({row['idx_scan'] or 0})",
                    estimated_benefit="High" if row["seq_tup_read"] > 100000 else "Medium"
                ))
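            # The field list is a deliberate placeholder: choose real columns by
            # running EXPLAIN (ANALYZE, BUFFERS) on the hottest queries that hit
            # these tables rather than guessing from scan counts alone.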
            # Find unused indexes (escape the underscore so LIKE treats it literally)
            cur.execute(r"""
                SELECT
                    schemaname,
                    tablename,
                    indexname,
                    idx_scan
                FROM pg_stat_user_indexes
                WHERE idx_scan = 0
                  AND indexname NOT LIKE '%\_pkey'
                ORDER BY pg_relation_size(indexrelid) DESC
            """)
            unused_indexes = []
            for row in cur.fetchall():
                unused_indexes.append(
                    f"{row['schemaname']}.{row['tablename']}.{row['indexname']}"
                )
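            # A reported index can usually be dropped with
            # DROP INDEX CONCURRENTLY <schema>.<index>; but check idx_scan on
            # every replica first, since these statistics are per-server.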
            # Database metrics
            cur.execute("""
                SELECT
                    sum(numbackends) AS connections,
                    sum(xact_commit) AS commits,
                    sum(xact_rollback) AS rollbacks
                FROM pg_stat_database
                WHERE datname = current_database()
            """)
            stats = cur.fetchone()
            cur.execute("""
                SELECT pg_database_size(current_database()) AS db_size
            """)
            db_size = cur.fetchone()["db_size"]
            # Cast to float: sum() yields bigint, and bigint division truncates
            # to zero, which would make the ratio useless
            cur.execute("""
                SELECT
                    sum(heap_blks_hit)::float8 / NULLIF(sum(heap_blks_hit) + sum(heap_blks_read), 0) AS cache_hit_ratio
                FROM pg_statio_user_tables
            """)
            cache_ratio = cur.fetchone()["cache_hit_ratio"] or 0
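            # Rule of thumb, not a hard threshold: a sustained cache hit ratio
            # below ~0.99 on a read-heavy workload often indicates undersized
            # shared_buffers or a working set that exceeds available RAM.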
            metrics = {
                "connections": stats["connections"],
                "commits": stats["commits"],
                "rollbacks": stats["rollbacks"],
                "database_size_mb": db_size / (1024 * 1024),
                "cache_hit_ratio": float(cache_ratio),
                "unused_indexes": unused_indexes
            }
        return PerformanceReport(
            database_type="postgres",
            database_name=self.conn.info.dbname,
            timestamp=datetime.now(),
            slow_queries=slow_queries,
            index_recommendations=index_recommendations,
            database_metrics=metrics
        )

    def print_report(self, report: PerformanceReport):
        """Print performance report."""
        print("=" * 80)
        print(f"Database Performance Report - {report.database_type.upper()}")
        print(f"Database: {report.database_name}")
        print(f"Timestamp: {report.timestamp}")
        print("=" * 80)
        print("\n## Database Metrics")
        print("-" * 80)
        for key, value in report.database_metrics.items():
            if isinstance(value, float):
                print(f"{key}: {value:.2f}")
            else:
                print(f"{key}: {value}")
        print("\n## Slow Queries")
        print("-" * 80)
        if report.slow_queries:
            for i, query in enumerate(report.slow_queries, 1):
                print(f"\n{i}. Execution Time: {query.execution_time_ms:.2f}ms | Count: {query.count}")
                if query.collection_or_table:
                    print(f"   Collection/Table: {query.collection_or_table}")
                if query.index_used:
                    print(f"   Index Used: {query.index_used}")
                # Truncate long query text; only add an ellipsis when we cut it
                suffix = "..." if len(query.query) > 200 else ""
                print(f"   Query: {query.query[:200]}{suffix}")
        else:
            print("No slow queries found")
print("\n## Index Recommendations")
print("-" * 80)
if report.index_recommendations:
for i, rec in enumerate(report.index_recommendations, 1):
print(f"\n{i}. {rec.collection_or_table}")
print(f" Fields: {', '.join(rec.fields)}")
print(f" Reason: {rec.reason}")
print(f" Estimated Benefit: {rec.estimated_benefit}")
if report.database_type == "mongodb":
index_spec = {field: 1 for field in rec.fields}
print(f" Command: db.{rec.collection_or_table}.createIndex({json.dumps(index_spec)})")
elif report.database_type == "postgres":
fields_str = ", ".join(rec.fields)
print(f" Command: CREATE INDEX idx_{rec.collection_or_table.replace('.', '_')}_{rec.fields[0]} ON {rec.collection_or_table}({fields_str});")
else:
print("No index recommendations")
print("\n" + "=" * 80)

    def save_report(self, report: PerformanceReport, filename: str):
        """Save report to JSON file."""
        # Convert dataclasses to dicts
        report_dict = {
            "database_type": report.database_type,
            "database_name": report.database_name,
            "timestamp": report.timestamp.isoformat(),
            "slow_queries": [asdict(q) for q in report.slow_queries],
            "index_recommendations": [asdict(r) for r in report.index_recommendations],
            "database_metrics": report.database_metrics
        }
        with open(filename, "w") as f:
            json.dump(report_dict, f, indent=2, default=str)
        print(f"\nReport saved to: {filename}")


def main():
    """Main entry point."""
    parser = argparse.ArgumentParser(description="Database performance analysis tool")
    parser.add_argument("--db", required=True, choices=["mongodb", "postgres"],
                        help="Database type")
    parser.add_argument("--uri", required=True, help="Database connection string")
    parser.add_argument("--threshold", type=int, default=100,
                        help="Slow query threshold in milliseconds (default: 100)")
    parser.add_argument("--output", help="Save report to JSON file")
    args = parser.parse_args()

    analyzer = PerformanceAnalyzer(args.db, args.uri, args.threshold)
    if not analyzer.connect():
        sys.exit(1)
    try:
        print(f"Analyzing {args.db} performance (threshold: {args.threshold}ms)...")
        report = analyzer.analyze()
        if report:
            analyzer.print_report(report)
            if args.output:
                analyzer.save_report(report, args.output)
            sys.exit(0)
        else:
            print("Analysis failed")
            sys.exit(1)
    finally:
        analyzer.disconnect()


if __name__ == "__main__":
    main()