#!/usr/bin/env python3
"""
Database backup and restore tool for MongoDB and PostgreSQL.
Supports compression, verification, and retention-based cleanup.
"""
import argparse
import gzip
import json
import shutil
import subprocess
import sys
from dataclasses import dataclass
from datetime import datetime
from pathlib import Path
from typing import List, Optional


@dataclass
class BackupInfo:
    """Backup metadata."""

    filename: str
    database_type: str
    database_name: str
    timestamp: datetime
    size_bytes: int
    compressed: bool
    verified: bool = False


class BackupManager:
    """Manages database backups for MongoDB and PostgreSQL."""

    def __init__(self, db_type: str, backup_dir: str = "./backups"):
        """
        Initialize backup manager.

        Args:
            db_type: Database type ('mongodb' or 'postgres')
            backup_dir: Directory to store backups
        """
        self.db_type = db_type.lower()
        self.backup_dir = Path(backup_dir)
        # parents=True so a nested backup directory is created as needed
        self.backup_dir.mkdir(parents=True, exist_ok=True)

    def create_backup(
        self,
        uri: str,
        database: Optional[str] = None,
        compress: bool = True,
        verify: bool = True
    ) -> Optional[BackupInfo]:
        """
        Create database backup.

        Args:
            uri: Database connection string
            database: Database name (optional for MongoDB)
            compress: Compress backup file
            verify: Verify backup after creation

        Returns:
            BackupInfo if successful, None otherwise
        """
        timestamp = datetime.now()
        date_str = timestamp.strftime("%Y%m%d_%H%M%S")
        if self.db_type == "mongodb":
            return self._backup_mongodb(uri, database, date_str, compress, verify)
        elif self.db_type == "postgres":
            return self._backup_postgres(uri, database, date_str, compress, verify)
        else:
            print(f"Error: Unsupported database type: {self.db_type}")
            return None

    def _backup_mongodb(
        self,
        uri: str,
        database: Optional[str],
        date_str: str,
        compress: bool,
        verify: bool
    ) -> Optional[BackupInfo]:
        """Create MongoDB backup using mongodump."""
        db_name = database or "all"
        filename = f"mongodb_{db_name}_{date_str}"
        backup_path = self.backup_dir / filename
        try:
            # mongodump writes a directory tree under --out (one
            # subdirectory per dumped database)
            cmd = ["mongodump", "--uri", uri, "--out", str(backup_path)]
            if database:
                # Note: some mongodump releases reject --db combined with
                # --uri; the database can also be embedded in the URI
                cmd.extend(["--db", database])
            print(f"Creating MongoDB backup: {filename}")
            result = subprocess.run(cmd, capture_output=True, text=True)
            if result.returncode != 0:
                print(f"Error: {result.stderr}")
                return None

            # Compress if requested: pack the dump directory into a
            # .tar.gz archive, then drop the uncompressed tree
            if compress:
                archive_path = backup_path.with_suffix(".tar.gz")
                print("Compressing backup...")
                shutil.make_archive(str(backup_path), "gztar", backup_path)
                shutil.rmtree(backup_path)
                backup_path = archive_path
                filename = archive_path.name

            size_bytes = self._get_size(backup_path)
            backup_info = BackupInfo(
                filename=filename,
                database_type="mongodb",
                database_name=db_name,
                timestamp=datetime.now(),
                size_bytes=size_bytes,
                compressed=compress
            )
            if verify:
                backup_info.verified = self._verify_backup(backup_info)
            self._save_metadata(backup_info)
            print(f"✓ Backup created: {filename} ({self._format_size(size_bytes)})")
            return backup_info
        except Exception as e:
            print(f"Error creating MongoDB backup: {e}")
            return None

    def _backup_postgres(
        self,
        uri: str,
        database: Optional[str],
        date_str: str,
        compress: bool,
        verify: bool
    ) -> Optional[BackupInfo]:
        """Create PostgreSQL backup using pg_dump."""
        if not database:
            print("Error: Database name required for PostgreSQL backup")
            return None
        ext = ".sql.gz" if compress else ".sql"
        filename = f"postgres_{database}_{date_str}{ext}"
        backup_path = self.backup_dir / filename
        try:
            cmd = ["pg_dump", uri]
            if compress:
                # Stream pg_dump through gzip: pg_dump ... | gzip > file
                with open(backup_path, "wb") as f:
                    dump_proc = subprocess.Popen(cmd, stdout=subprocess.PIPE)
                    gzip_proc = subprocess.Popen(
                        ["gzip"],
                        stdin=dump_proc.stdout,
                        stdout=f
                    )
                    # Close our copy so dump_proc gets SIGPIPE if gzip exits
                    dump_proc.stdout.close()
                    gzip_proc.communicate()
                    # wait() populates returncode; it is None otherwise
                    dump_proc.wait()
                if dump_proc.returncode != 0 or gzip_proc.returncode != 0:
                    print("Error: pg_dump failed")
                    return None
            else:
                with open(backup_path, "w") as f:
                    result = subprocess.run(cmd, stdout=f, stderr=subprocess.PIPE, text=True)
                if result.returncode != 0:
                    print(f"Error: {result.stderr}")
                    return None

            size_bytes = backup_path.stat().st_size
            backup_info = BackupInfo(
                filename=filename,
                database_type="postgres",
                database_name=database,
                timestamp=datetime.now(),
                size_bytes=size_bytes,
                compressed=compress
            )
            if verify:
                backup_info.verified = self._verify_backup(backup_info)
            self._save_metadata(backup_info)
            print(f"✓ Backup created: {filename} ({self._format_size(size_bytes)})")
            return backup_info
        except Exception as e:
            print(f"Error creating PostgreSQL backup: {e}")
            return None

    def restore_backup(self, filename: str, uri: str, dry_run: bool = False) -> bool:
        """
        Restore database from backup.

        Args:
            filename: Backup filename
            uri: Database connection string
            dry_run: If True, only show what would be done

        Returns:
            True if successful, False otherwise
        """
        backup_path = self.backup_dir / filename
        if not backup_path.exists():
            print(f"Error: Backup not found: {filename}")
            return False

        # Load metadata; _save_metadata stores it alongside the backup
        # as "<backup filename>.json"
        metadata_path = self.backup_dir / f"{filename}.json"
        if metadata_path.exists():
            with open(metadata_path) as f:
                metadata = json.load(f)
            print(f"Restoring backup from {metadata['timestamp']}")
            print(f"Database: {metadata['database_name']}")

        if dry_run:
            print(f"Would restore from: {backup_path}")
            return True

        print(f"Restoring backup: {filename}")
        try:
            if self.db_type == "mongodb":
                return self._restore_mongodb(backup_path, uri)
            elif self.db_type == "postgres":
                return self._restore_postgres(backup_path, uri)
            else:
                print(f"Error: Unsupported database type: {self.db_type}")
                return False
        except Exception as e:
            print(f"Error restoring backup: {e}")
            return False

    def _restore_mongodb(self, backup_path: Path, uri: str) -> bool:
        """Restore MongoDB backup using mongorestore."""
        try:
            # Extract if compressed
            restore_path = backup_path
            if backup_path.suffix == ".gz":
                print("Extracting backup...")
                extract_path = backup_path.with_suffix("")
                shutil.unpack_archive(backup_path, extract_path)
                restore_path = extract_path

            cmd = ["mongorestore", "--uri", uri, str(restore_path)]
            result = subprocess.run(cmd, capture_output=True, text=True)

            # Cleanup extracted files
            if restore_path != backup_path and restore_path.is_dir():
                shutil.rmtree(restore_path)

            if result.returncode != 0:
                print(f"Error: {result.stderr}")
                return False
            print("✓ Restore completed")
            return True
        except Exception as e:
            print(f"Error restoring MongoDB: {e}")
            return False

    def _restore_postgres(self, backup_path: Path, uri: str) -> bool:
        """Restore PostgreSQL backup using psql."""
        try:
            cmd = ["psql", uri]
            if backup_path.suffix == ".gz":
                # Passing the gzip file object straight to stdin would hand
                # psql the underlying (still compressed) file descriptor,
                # so decompress explicitly and stream through a pipe
                proc = subprocess.Popen(
                    cmd,
                    stdin=subprocess.PIPE,
                    stdout=subprocess.DEVNULL,
                    stderr=subprocess.PIPE,
                )
                with gzip.open(backup_path, "rb") as f:
                    shutil.copyfileobj(f, proc.stdin)
                _, stderr_bytes = proc.communicate()
                returncode = proc.returncode
                stderr = stderr_bytes.decode(errors="replace")
            else:
                with open(backup_path) as f:
                    result = subprocess.run(
                        cmd,
                        stdin=f,
                        capture_output=True,
                        text=True
                    )
                returncode = result.returncode
                stderr = result.stderr
            if returncode != 0:
                print(f"Error: {stderr}")
                return False
            print("✓ Restore completed")
            return True
        except Exception as e:
            print(f"Error restoring PostgreSQL: {e}")
            return False

    def list_backups(self) -> List[BackupInfo]:
        """
        List all backups.

        Returns:
            List of BackupInfo objects
        """
        backups = []
        for metadata_file in sorted(self.backup_dir.glob("*.json")):
            try:
                with open(metadata_file) as f:
                    data = json.load(f)
                backup_info = BackupInfo(
                    filename=data["filename"],
                    database_type=data["database_type"],
                    database_name=data["database_name"],
                    timestamp=datetime.fromisoformat(data["timestamp"]),
                    size_bytes=data["size_bytes"],
                    compressed=data["compressed"],
                    verified=data.get("verified", False)
                )
                backups.append(backup_info)
            except Exception as e:
                print(f"Error reading metadata {metadata_file}: {e}")
        return backups

    def cleanup_old_backups(self, retention_days: int, dry_run: bool = False) -> int:
        """
        Remove backups older than retention period.

        Args:
            retention_days: Number of days to retain backups
            dry_run: If True, only show what would be deleted

        Returns:
            Number of backups removed
        """
        cutoff = datetime.now().timestamp() - (retention_days * 24 * 3600)
        removed = 0
        for backup_file in self.backup_dir.glob("*"):
            if backup_file.suffix == ".json":
                continue
            if backup_file.stat().st_mtime < cutoff:
                if dry_run:
                    print(f"Would remove: {backup_file.name}")
                else:
                    print(f"Removing: {backup_file.name}")
                    # Uncompressed MongoDB backups are directories
                    if backup_file.is_dir():
                        shutil.rmtree(backup_file)
                    else:
                        backup_file.unlink()
                    # Remove the "<backup filename>.json" metadata sidecar
                    metadata_file = backup_file.with_name(backup_file.name + ".json")
                    if metadata_file.exists():
                        metadata_file.unlink()
                removed += 1
        return removed

    def _verify_backup(self, backup_info: BackupInfo) -> bool:
        """
        Verify backup integrity.

        Args:
            backup_info: Backup information

        Returns:
            True if backup is valid, False otherwise
        """
        backup_path = self.backup_dir / backup_info.filename
        if not backup_path.exists():
            return False
        # Basic verification: backup exists and is non-empty
        # (_get_size also handles uncompressed directory dumps)
        if self._get_size(backup_path) == 0:
            return False
        # Could add more verification here (checksums, test restore, etc.)
        return True

    def _get_size(self, path: Path) -> int:
        """Get total size of file or directory."""
        if path.is_file():
            return path.stat().st_size
        elif path.is_dir():
            total = 0
            for item in path.rglob("*"):
                if item.is_file():
                    total += item.stat().st_size
            return total
        return 0

    def _format_size(self, size_bytes: int) -> str:
        """Format size in human-readable format."""
        for unit in ["B", "KB", "MB", "GB", "TB"]:
            if size_bytes < 1024:
                return f"{size_bytes:.2f} {unit}"
            size_bytes /= 1024
        return f"{size_bytes:.2f} PB"

    def _save_metadata(self, backup_info: BackupInfo):
        """Save backup metadata to JSON file."""
        metadata_path = self.backup_dir / f"{backup_info.filename}.json"
        metadata = {
            "filename": backup_info.filename,
            "database_type": backup_info.database_type,
            "database_name": backup_info.database_name,
            "timestamp": backup_info.timestamp.isoformat(),
            "size_bytes": backup_info.size_bytes,
            "compressed": backup_info.compressed,
            "verified": backup_info.verified
        }
        with open(metadata_path, "w") as f:
            json.dump(metadata, f, indent=2)


def main():
    """Main entry point."""
    parser = argparse.ArgumentParser(description="Database backup tool")
    parser.add_argument("--db", required=True, choices=["mongodb", "postgres"],
                        help="Database type")
    parser.add_argument("--backup-dir", default="./backups",
                        help="Backup directory")
    subparsers = parser.add_subparsers(dest="command", required=True)

    # Backup command
    backup_parser = subparsers.add_parser("backup", help="Create backup")
    backup_parser.add_argument("--uri", required=True, help="Database connection string")
    backup_parser.add_argument("--database", help="Database name")
    backup_parser.add_argument("--no-compress", action="store_true",
                               help="Disable compression")
    backup_parser.add_argument("--no-verify", action="store_true",
                               help="Skip verification")

    # Restore command
    restore_parser = subparsers.add_parser("restore", help="Restore backup")
    restore_parser.add_argument("filename", help="Backup filename")
    restore_parser.add_argument("--uri", required=True, help="Database connection string")
    restore_parser.add_argument("--dry-run", action="store_true",
                                help="Show what would be done")

    # List command
    subparsers.add_parser("list", help="List backups")

    # Cleanup command
    cleanup_parser = subparsers.add_parser("cleanup", help="Remove old backups")
    cleanup_parser.add_argument("--retention-days", type=int, default=7,
                                help="Days to retain backups (default: 7)")
    cleanup_parser.add_argument("--dry-run", action="store_true",
                                help="Show what would be removed")

    args = parser.parse_args()
    manager = BackupManager(args.db, args.backup_dir)

    if args.command == "backup":
        backup_info = manager.create_backup(
            args.uri,
            args.database,
            compress=not args.no_compress,
            verify=not args.no_verify
        )
        sys.exit(0 if backup_info else 1)
elif args.command == "restore":
success = manager.restore_backup(args.filename, args.uri, args.dry_run)
sys.exit(0 if success else 1)
elif args.command == "list":
backups = manager.list_backups()
print(f"Total backups: {len(backups)}\n")
for backup in backups:
verified_str = "" if backup.verified else "?"
print(f"[{verified_str}] {backup.filename}")
print(f" Database: {backup.database_name}")
print(f" Created: {backup.timestamp}")
print(f" Size: {manager._format_size(backup.size_bytes)}")
print()
elif args.command == "cleanup":
removed = manager.cleanup_old_backups(args.retention_days, args.dry_run)
print(f"Removed {removed} backup(s)")
if __name__ == "__main__":
main()