17 KiB
17 KiB
Common Python Backend Architecture Patterns
This document provides reference implementations and patterns for common architectural decisions.
1. Repository Pattern
from abc import ABC, abstractmethod
from typing import Generic, TypeVar, Optional, List
from sqlalchemy.orm import Session
T = TypeVar('T')
class BaseRepository(ABC, Generic[T]):
"""Abstract base repository for data access"""
@abstractmethod
async def get(self, id: int) -> Optional[T]:
pass
@abstractmethod
async def list(self, skip: int = 0, limit: int = 100) -> List[T]:
pass
@abstractmethod
async def create(self, obj: T) -> T:
pass
@abstractmethod
async def update(self, id: int, obj: T) -> Optional[T]:
pass
@abstractmethod
async def delete(self, id: int) -> bool:
pass
class SQLAlchemyRepository(BaseRepository[T]):
"""SQLAlchemy implementation of repository pattern"""
def __init__(self, session: Session, model_class: type):
self.session = session
self.model_class = model_class
async def get(self, id: int) -> Optional[T]:
return self.session.query(self.model_class).filter_by(id=id).first()
async def list(self, skip: int = 0, limit: int = 100) -> List[T]:
return self.session.query(self.model_class).offset(skip).limit(limit).all()
async def create(self, obj: T) -> T:
self.session.add(obj)
self.session.commit()
self.session.refresh(obj)
return obj
async def update(self, id: int, obj: T) -> Optional[T]:
existing = await self.get(id)
if existing:
for key, value in obj.__dict__.items():
setattr(existing, key, value)
self.session.commit()
return existing
return None
async def delete(self, id: int) -> bool:
obj = await self.get(id)
if obj:
self.session.delete(obj)
self.session.commit()
return True
return False
2. Service Layer Pattern
from typing import Protocol
from .repositories import UserRepository
from .models import User
from .schemas import UserCreate, UserUpdate
class IUserService(Protocol):
"""Interface for user service"""
async def get_user(self, user_id: int) -> User:
...
async def create_user(self, user_data: UserCreate) -> User:
...
class UserService:
"""Service layer for user business logic"""
def __init__(self, user_repo: UserRepository):
self.user_repo = user_repo
async def get_user(self, user_id: int) -> User:
user = await self.user_repo.get(user_id)
if not user:
raise ValueError(f"User {user_id} not found")
return user
async def create_user(self, user_data: UserCreate) -> User:
# Business logic here
if await self._email_exists(user_data.email):
raise ValueError("Email already registered")
user = User(**user_data.dict())
return await self.user_repo.create(user)
async def _email_exists(self, email: str) -> bool:
# Check if email exists
return await self.user_repo.find_by_email(email) is not None
3. Dependency Injection with FastAPI
from fastapi import Depends, FastAPI
from sqlalchemy.orm import Session
from typing import Generator
app = FastAPI()
# Database session dependency
def get_db() -> Generator[Session, None, None]:
db = SessionLocal()
try:
yield db
finally:
db.close()
# Repository dependency
def get_user_repository(db: Session = Depends(get_db)) -> UserRepository:
return UserRepository(db)
# Service dependency
def get_user_service(
user_repo: UserRepository = Depends(get_user_repository)
) -> UserService:
return UserService(user_repo)
# Route using dependency injection
@app.get("/users/{user_id}")
async def get_user(
user_id: int,
user_service: UserService = Depends(get_user_service)
):
return await user_service.get_user(user_id)
4. Event-Driven Architecture
from typing import Callable, Dict, List, Any
from dataclasses import dataclass
from datetime import datetime
import asyncio
@dataclass
class Event:
"""Base event class"""
event_type: str
data: Dict[str, Any]
timestamp: datetime = datetime.utcnow()
class EventBus:
"""Simple in-memory event bus"""
def __init__(self):
self._subscribers: Dict[str, List[Callable]] = {}
def subscribe(self, event_type: str, handler: Callable):
"""Subscribe to an event type"""
if event_type not in self._subscribers:
self._subscribers[event_type] = []
self._subscribers[event_type].append(handler)
async def publish(self, event: Event):
"""Publish an event to all subscribers"""
handlers = self._subscribers.get(event.event_type, [])
await asyncio.gather(*[handler(event) for handler in handlers])
# Usage
event_bus = EventBus()
@dataclass
class UserCreatedEvent(Event):
event_type: str = "user.created"
async def send_welcome_email(event: Event):
print(f"Sending welcome email for user {event.data['user_id']}")
async def create_user_profile(event: Event):
print(f"Creating profile for user {event.data['user_id']}")
# Subscribe handlers
event_bus.subscribe("user.created", send_welcome_email)
event_bus.subscribe("user.created", create_user_profile)
# Publish event
await event_bus.publish(UserCreatedEvent(data={"user_id": 123}))
5. Circuit Breaker Pattern
from enum import Enum
from datetime import datetime, timedelta
from typing import Callable, Any
import asyncio
class CircuitState(Enum):
CLOSED = "closed"
OPEN = "open"
HALF_OPEN = "half_open"
class CircuitBreaker:
"""Circuit breaker for external service calls"""
def __init__(
self,
failure_threshold: int = 5,
timeout: int = 60,
recovery_timeout: int = 30
):
self.failure_threshold = failure_threshold
self.timeout = timeout
self.recovery_timeout = recovery_timeout
self.failure_count = 0
self.last_failure_time = None
self.state = CircuitState.CLOSED
async def call(self, func: Callable, *args, **kwargs) -> Any:
"""Execute function with circuit breaker protection"""
if self.state == CircuitState.OPEN:
if self._should_attempt_reset():
self.state = CircuitState.HALF_OPEN
else:
raise Exception("Circuit breaker is OPEN")
try:
result = await asyncio.wait_for(
func(*args, **kwargs),
timeout=self.timeout
)
self._on_success()
return result
except Exception as e:
self._on_failure()
raise e
def _on_success(self):
"""Handle successful call"""
self.failure_count = 0
self.state = CircuitState.CLOSED
def _on_failure(self):
"""Handle failed call"""
self.failure_count += 1
self.last_failure_time = datetime.utcnow()
if self.failure_count >= self.failure_threshold:
self.state = CircuitState.OPEN
def _should_attempt_reset(self) -> bool:
"""Check if enough time has passed to retry"""
if self.last_failure_time is None:
return True
return (
datetime.utcnow() - self.last_failure_time
).total_seconds() >= self.recovery_timeout
# Usage
circuit_breaker = CircuitBreaker(failure_threshold=3, timeout=5)
async def call_external_api():
# External API call
pass
result = await circuit_breaker.call(call_external_api)
6. CQRS Pattern
from abc import ABC, abstractmethod
from typing import Generic, TypeVar
from pydantic import BaseModel
# Commands
class Command(BaseModel):
"""Base command"""
pass
class CreateUserCommand(Command):
email: str
name: str
# Command Handlers
TCommand = TypeVar('TCommand', bound=Command)
class CommandHandler(ABC, Generic[TCommand]):
"""Abstract command handler"""
@abstractmethod
async def handle(self, command: TCommand) -> Any:
pass
class CreateUserCommandHandler(CommandHandler[CreateUserCommand]):
"""Handler for creating users"""
def __init__(self, user_repo: UserRepository):
self.user_repo = user_repo
async def handle(self, command: CreateUserCommand) -> User:
user = User(email=command.email, name=command.name)
return await self.user_repo.create(user)
# Queries
class Query(BaseModel):
"""Base query"""
pass
class GetUserQuery(Query):
user_id: int
# Query Handlers
TQuery = TypeVar('TQuery', bound=Query)
class QueryHandler(ABC, Generic[TQuery]):
"""Abstract query handler"""
@abstractmethod
async def handle(self, query: TQuery) -> Any:
pass
class GetUserQueryHandler(QueryHandler[GetUserQuery]):
"""Handler for getting users"""
def __init__(self, user_repo: UserRepository):
self.user_repo = user_repo
async def handle(self, query: GetUserQuery) -> User:
return await self.user_repo.get(query.user_id)
# Command Bus
class CommandBus:
"""Simple command bus"""
def __init__(self):
self._handlers = {}
def register(self, command_type: type, handler: CommandHandler):
self._handlers[command_type] = handler
async def execute(self, command: Command):
handler = self._handlers.get(type(command))
if not handler:
raise ValueError(f"No handler for {type(command)}")
return await handler.handle(command)
7. Retry Pattern with Exponential Backoff
import asyncio
from typing import Callable, TypeVar, Any
from functools import wraps
T = TypeVar('T')
def retry_with_backoff(
max_retries: int = 3,
base_delay: float = 1.0,
max_delay: float = 60.0,
exponential_base: float = 2.0
):
"""Decorator for retry logic with exponential backoff"""
def decorator(func: Callable[..., T]) -> Callable[..., T]:
@wraps(func)
async def wrapper(*args, **kwargs) -> T:
retries = 0
while retries < max_retries:
try:
return await func(*args, **kwargs)
except Exception as e:
retries += 1
if retries >= max_retries:
raise e
delay = min(
base_delay * (exponential_base ** (retries - 1)),
max_delay
)
print(f"Retry {retries}/{max_retries} after {delay}s")
await asyncio.sleep(delay)
raise Exception("Max retries exceeded")
return wrapper
return decorator
# Usage
@retry_with_backoff(max_retries=3, base_delay=1.0)
async def fetch_data_from_api():
# API call that might fail
pass
8. Settings Management
from pydantic_settings import BaseSettings
from functools import lru_cache
class Settings(BaseSettings):
"""Application settings"""
# API Settings
api_title: str = "My API"
api_version: str = "1.0.0"
# Database
database_url: str
db_pool_size: int = 5
# Redis
redis_url: str
redis_ttl: int = 3600
# Security
secret_key: str
algorithm: str = "HS256"
access_token_expire_minutes: int = 30
# External Services
external_api_url: str
external_api_key: str
# Observability
log_level: str = "INFO"
sentry_dsn: str | None = None
class Config:
env_file = ".env"
case_sensitive = False
@lru_cache()
def get_settings() -> Settings:
"""Cached settings instance"""
return Settings()
# Usage
settings = get_settings()
9. Background Task Processing
from celery import Celery
from typing import Any
# Celery app
celery_app = Celery(
"tasks",
broker="redis://localhost:6379/0",
backend="redis://localhost:6379/0"
)
celery_app.conf.update(
task_serializer="json",
accept_content=["json"],
result_serializer="json",
timezone="UTC",
enable_utc=True,
task_track_started=True,
task_time_limit=300, # 5 minutes
task_soft_time_limit=240, # 4 minutes
)
@celery_app.task(bind=True, max_retries=3)
def process_data(self, data: dict) -> Any:
"""Background task with retry logic"""
try:
# Process data
result = heavy_computation(data)
return result
except Exception as exc:
# Retry with exponential backoff
raise self.retry(exc=exc, countdown=2 ** self.request.retries)
# FastAPI integration
from fastapi import FastAPI, BackgroundTasks
app = FastAPI()
@app.post("/process")
async def trigger_processing(data: dict, background_tasks: BackgroundTasks):
# Option 1: FastAPI background tasks (for quick tasks)
background_tasks.add_task(quick_task, data)
# Option 2: Celery (for long-running tasks)
task = process_data.delay(data)
return {"task_id": task.id}
@app.get("/task/{task_id}")
async def get_task_status(task_id: str):
task = celery_app.AsyncResult(task_id)
return {
"task_id": task_id,
"status": task.status,
"result": task.result if task.ready() else None
}
10. API Versioning
from fastapi import APIRouter, FastAPI
from enum import Enum
class APIVersion(str, Enum):
V1 = "v1"
V2 = "v2"
app = FastAPI()
# Version 1 router
router_v1 = APIRouter(prefix="/api/v1", tags=["v1"])
@router_v1.get("/users/{user_id}")
async def get_user_v1(user_id: int):
return {"id": user_id, "version": "v1"}
# Version 2 router
router_v2 = APIRouter(prefix="/api/v2", tags=["v2"])
@router_v2.get("/users/{user_id}")
async def get_user_v2(user_id: int):
return {
"id": user_id,
"version": "v2",
"additional_field": "new in v2"
}
app.include_router(router_v1)
app.include_router(router_v2)
# Header-based versioning (alternative)
from fastapi import Header
@app.get("/users/{user_id}")
async def get_user(
user_id: int,
api_version: str = Header(default="v1", alias="X-API-Version")
):
if api_version == "v2":
return {"id": user_id, "version": "v2"}
return {"id": user_id, "version": "v1"}
11. Middleware Patterns
from fastapi import FastAPI, Request
from starlette.middleware.base import BaseHTTPMiddleware
import time
import logging
class LoggingMiddleware(BaseHTTPMiddleware):
"""Middleware for request/response logging"""
async def dispatch(self, request: Request, call_next):
start_time = time.time()
# Log request
logging.info(f"Request: {request.method} {request.url}")
response = await call_next(request)
# Log response
process_time = time.time() - start_time
logging.info(
f"Response: {response.status_code} "
f"(took {process_time:.2f}s)"
)
response.headers["X-Process-Time"] = str(process_time)
return response
class RateLimitMiddleware(BaseHTTPMiddleware):
"""Simple rate limiting middleware"""
def __init__(self, app, requests_per_minute: int = 60):
super().__init__(app)
self.requests_per_minute = requests_per_minute
self.request_counts = {}
async def dispatch(self, request: Request, call_next):
client_ip = request.client.host
current_minute = int(time.time() / 60)
key = f"{client_ip}:{current_minute}"
self.request_counts[key] = self.request_counts.get(key, 0) + 1
if self.request_counts[key] > self.requests_per_minute:
return JSONResponse(
status_code=429,
content={"error": "Rate limit exceeded"}
)
return await call_next(request)
# Add middleware to app
app = FastAPI()
app.add_middleware(LoggingMiddleware)
app.add_middleware(RateLimitMiddleware, requests_per_minute=100)
12. Structured Logging
import structlog
from typing import Any
import logging
# Configure structlog
structlog.configure(
processors=[
structlog.stdlib.filter_by_level,
structlog.stdlib.add_logger_name,
structlog.stdlib.add_log_level,
structlog.stdlib.PositionalArgumentsFormatter(),
structlog.processors.TimeStamper(fmt="iso"),
structlog.processors.StackInfoRenderer(),
structlog.processors.format_exc_info,
structlog.processors.UnicodeDecoder(),
structlog.processors.JSONRenderer()
],
context_class=dict,
logger_factory=structlog.stdlib.LoggerFactory(),
cache_logger_on_first_use=True,
)
# Get logger
logger = structlog.get_logger()
# Usage
logger.info(
"user_created",
user_id=123,
email="user@example.com",
ip_address="192.168.1.1"
)
# Context binding
logger = logger.bind(request_id="abc-123")
logger.info("processing_request")
logger.info("request_completed", duration_ms=150)
These patterns provide battle-tested solutions for common architectural challenges in Python backend development.