# Database Integration
## Overview
**Database integration specialist covering SQLAlchemy, connection pooling, query optimization, migrations, transactions, and production patterns.**
**Core principle**: Databases are stateful, high-latency external systems requiring careful connection management, query optimization, and migration strategies to maintain performance and reliability at scale.
## When to Use This Skill
Use when encountering:
- **Connection pooling**: Pool exhaustion, "too many connections" errors, pool configuration
- **Query optimization**: N+1 queries, slow endpoints, eager loading strategies
- **Migrations**: Schema changes, zero-downtime deployments, data backfills
- **Transactions**: Multi-step operations, rollback strategies, isolation levels
- **ORM vs Raw SQL**: Complex queries, performance optimization, query readability
- **Testing**: Database test strategies, fixtures, test isolation
- **Monitoring**: Query performance tracking, connection pool health
**Do NOT use for**:
- Database selection (PostgreSQL vs MySQL vs MongoDB)
- Database administration (backup, replication, sharding)
- Schema design principles (see general architecture resources)
## Connection Pool Configuration
### Pool Sizing Formula
**Calculate pool size based on deployment architecture**:
```python
# Formula: pool_size × num_workers ≤ (postgres_max_connections - reserved)
# Example: 10 workers × 5 connections each = 50 total ≤ 90 (100 max - 10 reserved)
from sqlalchemy import create_engine
from sqlalchemy.pool import QueuePool

DATABASE_URL = "postgresql://user:pass@host/db"

engine = create_engine(
    DATABASE_URL,
    poolclass=QueuePool,
    pool_size=5,           # Connections per worker
    max_overflow=10,       # Additional connections during spikes
    pool_pre_ping=True,    # CRITICAL: Verify connection before use
    pool_recycle=3600,     # Recycle after 1 hour (prevent stale connections)
    pool_timeout=30,       # Wait max 30s for connection from pool
    echo_pool=False,       # Enable for debugging pool issues
    connect_args={
        "connect_timeout": 10,
        "options": "-c statement_timeout=30000"  # 30s query timeout
    }
)
```
**Environment-based configuration**:
```python
from pydantic import BaseSettings  # Pydantic v1; in v2 use `from pydantic_settings import BaseSettings`
from sqlalchemy import create_engine

class DatabaseSettings(BaseSettings):
    database_url: str
    pool_size: int = 5
    max_overflow: int = 10
    pool_pre_ping: bool = True
    pool_recycle: int = 3600

    class Config:
        env_file = ".env"

settings = DatabaseSettings()

engine = create_engine(
    settings.database_url,
    pool_size=settings.pool_size,
    max_overflow=settings.max_overflow,
    pool_pre_ping=settings.pool_pre_ping,
    pool_recycle=settings.pool_recycle
)
```
**Async configuration** (asyncpg):
```python
from sqlalchemy.ext.asyncio import create_async_engine, AsyncSession
from sqlalchemy.orm import sessionmaker

engine = create_async_engine(
    "postgresql+asyncpg://user:pass@host/db",
    pool_size=20,          # Async handles more concurrent connections
    max_overflow=0,        # No overflow - fail fast
    pool_pre_ping=False,   # asyncpg handles internally
    pool_recycle=3600
)

async_session = sessionmaker(
    engine, class_=AsyncSession, expire_on_commit=False
)
```
### Pool Health Monitoring
**Health check endpoint**:
```python
from fastapi import Depends, FastAPI, HTTPException
from sqlalchemy import text
from sqlalchemy.orm import Session

app = FastAPI()

@app.get("/health/database")
async def database_health(db: Session = Depends(get_db)):
    """Check database connectivity and pool status"""
    try:
        # Simple query to verify connection
        db.execute(text("SELECT 1"))
        # Check pool statistics
        pool = db.get_bind().pool
        pool_status = {
            "size": pool.size(),
            "checked_in": pool.checkedin(),
            "checked_out": pool.checkedout(),
            "overflow": pool.overflow(),
            "total_connections": pool.size() + pool.overflow()
        }
        return {
            "status": "healthy",
            "pool": pool_status
        }
    except Exception as e:
        raise HTTPException(status_code=503, detail=f"Database unhealthy: {e}")
```
**Pool exhaustion debugging**:
```python
import logging

from sqlalchemy import event

logger = logging.getLogger(__name__)

# Enable pool event logging
@event.listens_for(engine, "connect")
def receive_connect(dbapi_conn, connection_record):
    logger.info(f"New connection created: {id(dbapi_conn)}")

@event.listens_for(engine, "checkout")
def receive_checkout(dbapi_conn, connection_record, connection_proxy):
    logger.debug(f"Connection checked out: {id(dbapi_conn)}")
    pool = connection_proxy._pool
    logger.debug(
        f"Pool status - size: {pool.size()}, "
        f"checked_out: {pool.checkedout()}, "
        f"overflow: {pool.overflow()}"
    )

@event.listens_for(engine, "checkin")
def receive_checkin(dbapi_conn, connection_record):
    logger.debug(f"Connection checked in: {id(dbapi_conn)}")
```
### Testing with NullPool
**Disable pooling in tests**:
```python
from sqlalchemy.pool import NullPool

# Test configuration - no connection pooling
test_engine = create_engine(
    "postgresql://user:pass@localhost/test_db",
    poolclass=NullPool,  # No pooling - each query gets new connection
    echo=True            # Log all SQL queries
)
```
## Query Optimization
### N+1 Query Detection
**Automatic detection in tests**:
```python
from sqlalchemy import event
from sqlalchemy.engine import Engine
import pytest

class QueryCounter:
    """Count queries executed during test"""
    def __init__(self):
        self.queries = []

    def __enter__(self):
        event.listen(Engine, "before_cursor_execute", self._before_cursor_execute)
        return self

    def __exit__(self, *args):
        event.remove(Engine, "before_cursor_execute", self._before_cursor_execute)

    def _before_cursor_execute(self, conn, cursor, statement, *args):
        self.queries.append(statement)

    @property
    def count(self):
        return len(self.queries)

# Test usage
def test_no_n_plus_1():
    with QueryCounter() as counter:
        users = get_users_with_posts()  # Should use eager loading
        # Access posts (should not trigger additional queries)
        for user in users:
            _ = [post.title for post in user.posts]
    # Should be 1-2 queries, not 101
    assert counter.count <= 2, f"N+1 detected: {counter.count} queries"
```
### Eager Loading Strategies
**Decision matrix**:
| Pattern | Queries | Use When | Example |
|---------|---------|----------|---------|
| `joinedload()` | 1 (JOIN) | One-to-one, small one-to-many | User → Profile |
| `selectinload()` | 2 (IN clause) | One-to-many with many rows | User → Posts |
| `subqueryload()` | 2 (subquery) | Legacy alternative | Use selectinload instead |
| `raiseload()` | 0 (raises error) | Prevent lazy loading | Production safety |
**joinedload() - Single query with JOIN**:
```python
from sqlalchemy.orm import joinedload

# Single query: SELECT * FROM users LEFT OUTER JOIN posts ON ...
users = db.query(User).options(
    joinedload(User.posts)
).all()

# Best for one-to-one or small one-to-many
user = db.query(User).options(
    joinedload(User.profile)  # One-to-one
).filter(User.id == user_id).first()
```
**selectinload() - Two queries (more efficient for many rows)**:
```python
from sqlalchemy.orm import selectinload

# Query 1: SELECT * FROM users
# Query 2: SELECT * FROM posts WHERE user_id IN (1, 2, 3, ...)
users = db.query(User).options(
    selectinload(User.posts)
).all()
# Best for one-to-many with many related rows
```
**Nested eager loading**:
```python
# Load users → posts → comments (3 queries total)
users = db.query(User).options(
    selectinload(User.posts).selectinload(Post.comments)
).all()
```
**Conditional eager loading**:
```python
from sqlalchemy.orm import selectinload

# Only load published posts (SQLAlchemy 1.4+: attach loader criteria with and_())
users = db.query(User).options(
    selectinload(User.posts.and_(Post.published == True))
).all()
```
**Prevent lazy loading in production** (raiseload):
```python
from sqlalchemy.orm import raiseload

# Raise error if any relationship accessed without eager loading
users = db.query(User).options(
    raiseload('*')  # Disable all lazy loading
).all()

# This will raise an error:
# user.posts  # InvalidRequestError: 'User.posts' is not available due to lazy='raise'
```
### Query Performance Measurement
**Log slow queries**:
```python
from sqlalchemy import event
from sqlalchemy.engine import Engine
import time
import logging

logger = logging.getLogger(__name__)
SLOW_QUERY_THRESHOLD = 1.0  # seconds

@event.listens_for(Engine, "before_cursor_execute")
def before_cursor_execute(conn, cursor, statement, parameters, context, executemany):
    conn.info.setdefault('query_start_time', []).append(time.time())

@event.listens_for(Engine, "after_cursor_execute")
def after_cursor_execute(conn, cursor, statement, parameters, context, executemany):
    total_time = time.time() - conn.info['query_start_time'].pop()
    if total_time > SLOW_QUERY_THRESHOLD:
        logger.warning(
            f"Slow query ({total_time:.2f}s): {statement[:200]}",
            extra={
                "duration": total_time,
                "statement": statement,
                "parameters": parameters
            }
        )
```
**EXPLAIN ANALYZE for query optimization**:
```python
from sqlalchemy import text

def explain_query(db: Session, query):
    """Get query execution plan"""
    compiled = query.statement.compile(
        compile_kwargs={"literal_binds": True}
    )
    explain_result = db.execute(
        text(f"EXPLAIN ANALYZE {compiled}")
    ).fetchall()
    return "\n".join([row[0] for row in explain_result])

# Usage
query = db.query(User).join(Post).filter(Post.published == True)
plan = explain_query(db, query)
print(plan)
```
### Deferred Column Loading
**Exclude large columns from initial query**:
```python
from sqlalchemy.orm import defer, undefer

# Don't load large 'bio' column initially
users = db.query(User).options(
    defer(User.bio),           # Skip this column
    defer(User.profile_image)  # Skip binary data
).all()

# Load specific user's bio when needed
user = db.query(User).options(
    undefer(User.bio)  # Load this column
).filter(User.id == user_id).first()
```
**Load only specific columns**:
```python
from sqlalchemy.orm import load_only

# Only load id and name (ignore all other columns)
users = db.query(User).options(
    load_only(User.id, User.name)
).all()
```
## Zero-Downtime Migrations
### Migration Decision Matrix
| Operation | Locking | Approach | Downtime |
|-----------|---------|----------|----------|
| Add nullable column | None | Single migration | No |
| Add NOT NULL column | Table lock | Multi-phase (nullable → backfill → NOT NULL) | No |
| Add index | Share lock | `CREATE INDEX CONCURRENTLY` | No |
| Add foreign key | Share lock | `NOT VALID` + `VALIDATE CONSTRAINT` | No |
| Drop column | None | Multi-phase (stop using → drop) | No |
| Rename column | None | Multi-phase (add new → dual write → drop old) | No |
| Alter column type | Table lock | Multi-phase or rebuild table | Maybe |
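The rename and drop rows in the matrix are multi-phase but have no worked example below; a minimal sketch of the final drop phase only (after all deployed code has stopped reading the old column), assuming an Alembic setup and a hypothetical leftover `users.legacy_name` column, might look like:
```python
# migrations/versions/00X_drop_legacy_name.py (hypothetical)
from alembic import op
import sqlalchemy as sa

def upgrade():
    # Safe only once no deployed code reads or writes users.legacy_name
    op.drop_column('users', 'legacy_name')

def downgrade():
    # Restores the column, but not the data it held
    op.add_column('users', sa.Column('legacy_name', sa.String(), nullable=True))
```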
### Multi-Phase NOT NULL Migration
**Phase 1: Add nullable column**:
```python
# migrations/versions/001_add_email_verified.py
from alembic import op
import sqlalchemy as sa

def upgrade():
    # Fast: no table rewrite
    op.add_column('users', sa.Column('email_verified', sa.Boolean(), nullable=True))
    # Set default for new rows
    op.execute("ALTER TABLE users ALTER COLUMN email_verified SET DEFAULT false")

def downgrade():
    op.drop_column('users', 'email_verified')
```
**Phase 2: Backfill in batches**:
```python
# migrations/versions/002_backfill_email_verified.py
from alembic import op
import sqlalchemy as sa

def upgrade():
    """Backfill existing rows in batches"""
    connection = op.get_bind()
    # Process in batches to avoid long transactions
    batch_size = 10000
    total_updated = 0
    while True:
        result = connection.execute(sa.text("""
            UPDATE users
            SET email_verified = false
            WHERE email_verified IS NULL
            AND id IN (
                SELECT id FROM users
                WHERE email_verified IS NULL
                ORDER BY id
                LIMIT :batch_size
            )
        """), {"batch_size": batch_size})
        rows_updated = result.rowcount
        total_updated += rows_updated
        if rows_updated == 0:
            break
    print(f"Backfilled {total_updated} rows")

def downgrade():
    pass  # No rollback needed
```
**Phase 3: Add NOT NULL constraint**:
```python
# migrations/versions/003_make_email_verified_not_null.py
from alembic import op
import sqlalchemy as sa

def upgrade():
    # Verify no NULLs remain
    connection = op.get_bind()
    result = connection.execute(sa.text(
        "SELECT COUNT(*) FROM users WHERE email_verified IS NULL"
    ))
    null_count = result.scalar()
    if null_count > 0:
        raise Exception(f"Cannot add NOT NULL: {null_count} NULL values remain")
    # Add NOT NULL constraint (fast since all values are set)
    op.alter_column('users', 'email_verified', nullable=False)

def downgrade():
    op.alter_column('users', 'email_verified', nullable=True)
```
### Concurrent Index Creation
**Without CONCURRENTLY (blocks writes)**:
```python
# BAD: Locks table during index creation
def upgrade():
    op.create_index('idx_users_email', 'users', ['email'])
```
**With CONCURRENTLY (no locks)**:
```python
# GOOD: No blocking, safe for production
def upgrade():
    # CONCURRENTLY cannot run inside a transaction block, so use an autocommit
    # block (Alembic wraps each migration in a transaction by default)
    with op.get_context().autocommit_block():
        op.execute("""
            CREATE INDEX CONCURRENTLY IF NOT EXISTS idx_users_email
            ON users (email)
        """)

def downgrade():
    with op.get_context().autocommit_block():
        op.execute("DROP INDEX CONCURRENTLY IF EXISTS idx_users_email")
```
**Partial index for efficiency**:
```python
def upgrade():
    with op.get_context().autocommit_block():
        op.execute("""
            CREATE INDEX CONCURRENTLY idx_users_active_email
            ON users (email)
            WHERE deleted_at IS NULL
        """)
```
### Adding Foreign Keys Without Blocking
**Using NOT VALID constraint**:
```python
# migrations/versions/004_add_foreign_key.py
from alembic import op

def upgrade():
    # Phase 1: Add constraint without validating existing rows (fast)
    op.execute("""
        ALTER TABLE posts
        ADD CONSTRAINT fk_posts_user_id
        FOREIGN KEY (user_id)
        REFERENCES users (id)
        NOT VALID
    """)
    # Phase 2: Validate constraint in background (can be canceled/restarted)
    op.execute("""
        ALTER TABLE posts
        VALIDATE CONSTRAINT fk_posts_user_id
    """)

def downgrade():
    op.drop_constraint('fk_posts_user_id', 'posts', type_='foreignkey')
```
### Migration Monitoring
**Track migration progress**:
```sql
-- Check backfill progress
SELECT
COUNT(*) FILTER (WHERE email_verified IS NULL) as null_count,
COUNT(*) as total_count,
ROUND(100.0 * COUNT(*) FILTER (WHERE email_verified IS NOT NULL) / COUNT(*), 2) as pct_complete
FROM users;
-- Check index creation progress (PostgreSQL 12+)
SELECT
phase,
ROUND(100.0 * blocks_done / NULLIF(blocks_total, 0), 2) as pct_complete
FROM pg_stat_progress_create_index
WHERE relid = 'users'::regclass;
```
## Transaction Management
### Basic Transaction Pattern
**Context manager with automatic rollback**:
```python
from contextlib import contextmanager
from sqlalchemy.orm import Session

@contextmanager
def transactional_session(db: Session):
    """Context manager for automatic rollback on error"""
    try:
        yield db
        db.commit()
    except Exception:
        db.rollback()
        raise
    finally:
        db.close()

# Usage
with transactional_session(db) as session:
    user = User(name="Alice")
    session.add(user)
    # Automatic commit on success, rollback on exception
```
### Savepoints for Partial Rollback
**Nested transactions with savepoints**:
```python
def create_order_with_retry(db: Session, order_data: dict):
    """Use savepoints to retry failed steps without losing entire transaction"""
    # Start main transaction
    order = Order(**order_data)
    db.add(order)
    db.flush()  # Get order.id

    # Try payment with savepoint
    sp = db.begin_nested()  # Create savepoint
    try:
        payment = process_payment(order.total)
        order.payment_id = payment.id
    except PaymentError:
        sp.rollback()  # Rollback to savepoint (keep order)
        # Try alternative payment method
        sp = db.begin_nested()
        try:
            payment = process_backup_payment(order.total)
            order.payment_id = payment.id
        except PaymentError:
            sp.rollback()
            raise HTTPException(status_code=402, detail="All payment methods failed")

    db.commit()  # Commit entire transaction
    return order
```
### Locking Strategies
**Optimistic locking with version column**:
```python
import sqlalchemy as sa
from sqlalchemy import Column, Integer, String

class Product(Base):
    __tablename__ = "products"
    id = Column(Integer, primary_key=True)
    name = Column(String)
    inventory = Column(Integer)
    version = Column(Integer, nullable=False, default=1)  # Version column

# Usage
def decrement_inventory(db: Session, product_id: int, quantity: int):
    product = db.query(Product).filter(Product.id == product_id).first()
    if product.inventory < quantity:
        raise ValueError("Insufficient inventory")

    # Update with version check
    rows_updated = db.execute(
        sa.update(Product)
        .where(Product.id == product_id)
        .where(Product.version == product.version)  # Check version hasn't changed
        .values(
            inventory=Product.inventory - quantity,
            version=Product.version + 1
        )
    ).rowcount
    if rows_updated == 0:
        # Version mismatch - another transaction modified this row
        raise HTTPException(status_code=409, detail="Product was modified by another transaction")
    db.commit()
```
**Pessimistic locking with SELECT FOR UPDATE**:
```python
def decrement_inventory_with_lock(db: Session, product_id: int, quantity: int):
    """Acquire row lock to prevent concurrent modifications"""
    # Lock the row (blocks other transactions)
    product = db.query(Product).filter(
        Product.id == product_id
    ).with_for_update().first()  # SELECT ... FOR UPDATE

    if not product:
        raise HTTPException(status_code=404, detail="Product not found")
    if product.inventory < quantity:
        raise HTTPException(status_code=400, detail="Insufficient inventory")

    product.inventory -= quantity
    db.commit()
    # Lock released after commit
```
**Lock timeout to prevent deadlocks**:
```python
from sqlalchemy import text

def with_lock_timeout(db: Session, timeout_ms: int = 5000):
    """Set lock timeout for this transaction"""
    db.execute(text(f"SET LOCAL lock_timeout = '{timeout_ms}ms'"))

# Usage
try:
    with_lock_timeout(db, 3000)  # 3 second timeout
    product = db.query(Product).with_for_update().filter(...).first()
except Exception as e:
    if "lock timeout" in str(e).lower():
        raise HTTPException(status_code=409, detail="Resource locked by another transaction")
    raise
```
### Isolation Levels
**Configure isolation level**:
```python
from sqlalchemy import create_engine
from sqlalchemy.orm import Session

# Default: READ COMMITTED
engine = create_engine(
    DATABASE_URL,
    isolation_level="REPEATABLE READ"  # Options: READ UNCOMMITTED, READ COMMITTED, REPEATABLE READ, SERIALIZABLE
)

# Per-transaction isolation
with Session(engine) as session:
    session.connection(execution_options={"isolation_level": "SERIALIZABLE"})
    # ... transaction logic ...
```
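At SERIALIZABLE (and sometimes REPEATABLE READ), PostgreSQL aborts one of two conflicting transactions with SQLSTATE 40001, and the caller is expected to retry. A minimal retry sketch, assuming the psycopg2 driver and a caller-supplied `work(session)` callable (both assumptions, not part of the original example):
```python
from sqlalchemy.exc import OperationalError
from sqlalchemy.orm import Session

def run_serializable(engine, work, max_attempts: int = 3):
    """Retry work(session) when PostgreSQL reports a serialization failure (SQLSTATE 40001)."""
    for attempt in range(1, max_attempts + 1):
        with Session(engine) as session:
            session.connection(execution_options={"isolation_level": "SERIALIZABLE"})
            try:
                result = work(session)
                session.commit()
                return result
            except OperationalError as e:
                session.rollback()
                # psycopg2 exposes the SQLSTATE as e.orig.pgcode (driver assumption)
                if getattr(e.orig, "pgcode", None) == "40001" and attempt < max_attempts:
                    continue  # Conflict with a concurrent transaction - retry
                raise
```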
## Raw SQL vs ORM
### Decision Matrix
| Use ORM When | Use Raw SQL When |
|--------------|------------------|
| CRUD operations | Complex CTEs (Common Table Expressions) |
| Simple joins (<3 tables) | Window functions with PARTITION BY |
| Type safety critical | Performance-critical queries |
| Database portability needed | Database-specific optimizations (PostgreSQL arrays, JSONB) |
| Code readability with ORM is good | ORM query becomes unreadable (>10 lines) |
### Raw SQL with Type Safety
**Parameterized queries with Pydantic results**:
```python
from fastapi import Depends
from sqlalchemy import text
from sqlalchemy.orm import Session
from pydantic import BaseModel
from typing import List, Optional

class CustomerReport(BaseModel):
    id: int
    name: str
    region: str
    total_spent: float
    order_count: int
    rank_in_region: int

@app.get("/reports/top-customers")
def get_top_customers(
    db: Session = Depends(get_db),
    region: Optional[str] = None,
    limit: int = 100
) -> List[CustomerReport]:
    """Complex report with CTEs and window functions"""
    query = text("""
        WITH customer_totals AS (
            SELECT
                u.id,
                u.name,
                u.region,
                COUNT(o.id) as order_count,
                COALESCE(SUM(o.total), 0) as total_spent
            FROM users u
            LEFT JOIN orders o ON u.id = o.user_id
            WHERE u.deleted_at IS NULL
            AND (:region IS NULL OR u.region = :region)
            GROUP BY u.id, u.name, u.region
        ),
        ranked AS (
            SELECT
                *,
                ROW_NUMBER() OVER (
                    PARTITION BY region
                    ORDER BY total_spent DESC
                ) as rank_in_region
            FROM customer_totals
        )
        SELECT * FROM ranked
        WHERE total_spent > 0
        ORDER BY total_spent DESC
        LIMIT :limit
    """)
    result = db.execute(query, {"region": region, "limit": limit})
    # Type-safe results with Pydantic
    return [CustomerReport(**dict(row._mapping)) for row in result]
```
### Hybrid Approach
**Combine ORM and raw SQL**:
```python
def get_user_analytics(db: Session, user_id: int):
    """Use raw SQL for complex aggregation, ORM for simple queries"""
    # Complex aggregation in raw SQL
    analytics_query = text("""
        SELECT
            COUNT(*) as total_orders,
            SUM(total) as lifetime_value,
            AVG(total) as avg_order_value,
            MAX(created_at) as last_order_date,
            MIN(created_at) as first_order_date
        FROM orders
        WHERE user_id = :user_id
    """)
    analytics = db.execute(analytics_query, {"user_id": user_id}).first()

    # Simple ORM query for user details
    user = db.query(User).filter(User.id == user_id).first()

    return {
        "user": {
            "id": user.id,
            "name": user.name,
            "email": user.email
        },
        "analytics": {
            "total_orders": analytics.total_orders,
            "lifetime_value": float(analytics.lifetime_value or 0),
            "avg_order_value": float(analytics.avg_order_value or 0),
            "first_order": analytics.first_order_date,
            "last_order": analytics.last_order_date
        }
    }
```
### Query Optimization Checklist
**Before optimizing**:
1. **Measure with EXPLAIN ANALYZE**:
```sql
EXPLAIN ANALYZE
SELECT * FROM users JOIN orders ON users.id = orders.user_id;
```
2. **Look for**:
- Sequential scans on large tables → Add index
- High loop counts → N+1 query problem
- Hash joins on small tables → Consider nested loop
- Sort operations → Consider index on ORDER BY columns
3. **Optimize**:
- Add indexes on foreign keys, WHERE clauses, ORDER BY columns
- Use LIMIT for pagination
- Use EXISTS instead of IN for large subqueries (see the sketch after this list)
- Denormalize for read-heavy workloads
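As a sketch of the EXISTS-vs-IN point, using the document's `User` and `Order` models (the `Order.total > 100` filter is purely illustrative):
```python
# IN: the subquery's id list is compared against each user row
subq = db.query(Order.user_id).filter(Order.total > 100)
users_in = db.query(User).filter(User.id.in_(subq)).all()

# EXISTS: correlated semi-join, which often plans better for large subqueries
has_large_order = db.query(Order).filter(
    Order.user_id == User.id, Order.total > 100
).exists()
users_exists = db.query(User).filter(has_large_order).all()
```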
**Index usage verification**:
```sql
-- Check if index is being used
EXPLAIN SELECT * FROM users WHERE email = 'test@example.com';
-- Look for "Index Scan using idx_users_email"
-- Check index statistics
SELECT
schemaname,
tablename,
indexname,
idx_scan as index_scans,
idx_tup_read as tuples_read,
idx_tup_fetch as tuples_fetched
FROM pg_stat_user_indexes
WHERE tablename = 'users';
```
## Testing Strategies
### Test Database Setup
**Separate test database with fixtures**:
```python
import pytest
from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker
from sqlalchemy.pool import NullPool
@pytest.fixture(scope="session")
def test_engine():
"""Create test database engine"""
engine = create_engine(
"postgresql://user:pass@localhost/test_db",
poolclass=NullPool, # No pooling in tests
echo=True # Log all queries
)
# Create all tables
Base.metadata.create_all(engine)
yield engine
# Drop all tables after tests
Base.metadata.drop_all(engine)
@pytest.fixture(scope="function")
def db_session(test_engine):
"""Create fresh database session for each test"""
connection = test_engine.connect()
transaction = connection.begin()
Session = sessionmaker(bind=connection)
session = Session()
yield session
# Rollback transaction (undo all changes)
session.close()
transaction.rollback()
connection.close()
```
### Factory Pattern for Test Data
**Use factories for consistent test data**:
```python
from factory import Faker, SubFactory
from factory.alchemy import SQLAlchemyModelFactory

class UserFactory(SQLAlchemyModelFactory):
    class Meta:
        model = User
        sqlalchemy_session = db_session

    name = Faker('name')
    email = Faker('email')
    created_at = Faker('date_time')

class PostFactory(SQLAlchemyModelFactory):
    class Meta:
        model = Post
        sqlalchemy_session = db_session

    title = Faker('sentence')
    content = Faker('text')
    user = SubFactory(UserFactory)  # Auto-create related user

# Test usage
def test_get_user_posts(db_session):
    user = UserFactory.create()
    PostFactory.create_batch(5, user=user)  # Create 5 posts for user
    posts = db_session.query(Post).filter(Post.user_id == user.id).all()
    assert len(posts) == 5
```
### Testing Transactions
**Test rollback behavior**:
```python
from sqlalchemy.exc import IntegrityError

def test_transaction_rollback(db_session):
    """Verify rollback on error"""
    user = User(name="Alice", email="alice@example.com")
    db_session.add(user)

    with pytest.raises(IntegrityError):
        # This should fail (duplicate email)
        user2 = User(name="Bob", email="alice@example.com")
        db_session.add(user2)
        db_session.commit()

    # Verify rollback occurred
    db_session.rollback()
    assert db_session.query(User).count() == 0
```
### Testing Migrations
**Test migration up and down**:
```python
from alembic import command
from alembic.config import Config

def test_migration_upgrade_downgrade():
    """Test migration can be applied and reversed"""
    alembic_cfg = Config("alembic.ini")
    alembic_cfg.set_main_option("sqlalchemy.url", TEST_DATABASE_URL)

    # Apply migration
    command.upgrade(alembic_cfg, "head")
    # Verify schema changes
    # ... assertions ...

    # Rollback migration
    command.downgrade(alembic_cfg, "-1")
    # Verify rollback
    # ... assertions ...
```
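Schema assertions of this kind can be written against the live database with SQLAlchemy's inspector. A minimal sketch, assuming the `email_verified` column from the earlier migrations and a hypothetical `assert_column_exists` helper:
```python
import sqlalchemy as sa

def assert_column_exists(engine, table: str, column: str) -> None:
    """Assert that a column is present in the live schema (hypothetical helper)."""
    columns = {c["name"] for c in sa.inspect(engine).get_columns(table)}
    assert column in columns, f"{table}.{column} missing after migration"

# After command.upgrade(...):   assert_column_exists(test_engine, "users", "email_verified")
# After command.downgrade(...): the same check should fail, i.e. the column is gone
```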
## Monitoring and Observability
### Query Performance Tracking
**Track slow queries with middleware**:
```python
from fastapi import Request
from sqlalchemy import event
from sqlalchemy.engine import Engine
import time
import logging

logger = logging.getLogger(__name__)

@app.middleware("http")
async def track_db_queries(request: Request, call_next):
    """Track database query performance per request"""
    query_count = 0
    total_query_time = 0.0

    def before_query(conn, cursor, statement, parameters, context, executemany):
        conn.info.setdefault("request_query_start", []).append(time.time())

    def after_query(conn, cursor, statement, parameters, context, executemany):
        nonlocal query_count, total_query_time
        duration = time.time() - conn.info["request_query_start"].pop()
        query_count += 1
        total_query_time += duration
        if duration > 1.0:  # Log slow queries
            logger.warning(
                f"Slow query ({duration:.2f}s): {statement[:200]}",
                extra={
                    "duration": duration,
                    "path": request.url.path
                }
            )

    # Attach listeners (Engine-wide, so concurrent requests share the counters)
    event.listen(Engine, "before_cursor_execute", before_query)
    event.listen(Engine, "after_cursor_execute", after_query)
    try:
        response = await call_next(request)
    finally:
        # Remove listeners
        event.remove(Engine, "before_cursor_execute", before_query)
        event.remove(Engine, "after_cursor_execute", after_query)

    # Add headers
    response.headers["X-DB-Query-Count"] = str(query_count)
    response.headers["X-DB-Query-Time"] = f"{total_query_time:.3f}s"
    return response
```
### Connection Pool Metrics
**Expose pool metrics for monitoring**:
```python
import asyncio

from prometheus_client import Gauge

pool_size_gauge = Gauge('db_pool_size', 'Number of connections in pool')
pool_checked_out_gauge = Gauge('db_pool_checked_out', 'Connections currently checked out')
pool_overflow_gauge = Gauge('db_pool_overflow', 'Overflow connections')

@app.on_event("startup")
async def start_pool_metrics():
    """Collect pool metrics periodically"""
    async def collect_metrics():
        while True:
            pool = engine.pool
            pool_size_gauge.set(pool.size())
            pool_checked_out_gauge.set(pool.checkedout())
            pool_overflow_gauge.set(pool.overflow())
            await asyncio.sleep(10)  # Every 10 seconds

    asyncio.create_task(collect_metrics())
```
## Anti-Patterns
| Anti-Pattern | Why Bad | Fix |
|--------------|---------|-----|
| **No connection pooling** | Creates new connection per request (slow) | Use `create_engine()` with pool |
| **pool_pre_ping=False** | Fails on stale connections | Always `pool_pre_ping=True` in production |
| **Lazy loading in loops** | N+1 query problem | Use `joinedload()` or `selectinload()` |
| **No query timeout** | Slow queries block workers | Set `statement_timeout` in connect_args |
| **Large transactions** | Locks held too long, blocking | Break into smaller transactions |
| **No migration rollback** | Can't undo bad migrations | Always test downgrade path |
| **String interpolation in SQL** | SQL injection vulnerability | Use parameterized queries with `text()` |
| **No index on foreign keys** | Slow joins | Add index on all foreign key columns |
| **Blocking migrations** | Downtime during deployment | Use `CONCURRENTLY`, `NOT VALID` patterns |
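To illustrate the string-interpolation row, a minimal before/after sketch (the hypothetical `find_user_by_email` helper is only for illustration; the vulnerable version appears as a comment):
```python
from sqlalchemy import text

def find_user_by_email(db, email: str):
    # BAD (SQL injection): db.execute(text(f"SELECT * FROM users WHERE email = '{email}'"))
    # GOOD: bind parameters are sent separately from the SQL text
    return db.execute(
        text("SELECT * FROM users WHERE email = :email"),
        {"email": email}
    ).first()
```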
## Cross-References
**Related skills**:
- **FastAPI dependency injection** → `fastapi-development` (database dependencies)
- **API testing** → `api-testing` (testing database code)
- **Microservices** → `microservices-architecture` (per-service databases)
- **Security** → `ordis-security-architect` (SQL injection, connection security)
## Further Reading
- **SQLAlchemy docs**: https://docs.sqlalchemy.org/
- **Alembic migrations**: https://alembic.sqlalchemy.org/
- **PostgreSQL performance**: https://www.postgresql.org/docs/current/performance-tips.html
- **Database Reliability Engineering** by Laine Campbell