Common Python Backend Architecture Patterns

This document provides reference implementations and patterns for common architectural decisions.

1. Repository Pattern

from abc import ABC, abstractmethod
from typing import Generic, TypeVar, Optional, List
from sqlalchemy.orm import Session

T = TypeVar('T')

class BaseRepository(ABC, Generic[T]):
    """Abstract base repository for data access"""

    @abstractmethod
    async def get(self, id: int) -> Optional[T]:
        pass

    @abstractmethod
    async def list(self, skip: int = 0, limit: int = 100) -> List[T]:
        pass

    @abstractmethod
    async def create(self, obj: T) -> T:
        pass

    @abstractmethod
    async def update(self, id: int, obj: T) -> Optional[T]:
        pass

    @abstractmethod
    async def delete(self, id: int) -> bool:
        pass


class SQLAlchemyRepository(BaseRepository[T]):
    """SQLAlchemy implementation of repository pattern"""

    def __init__(self, session: Session, model_class: type):
        self.session = session
        self.model_class = model_class

    async def get(self, id: int) -> Optional[T]:
        return self.session.query(self.model_class).filter_by(id=id).first()

    async def list(self, skip: int = 0, limit: int = 100) -> List[T]:
        return self.session.query(self.model_class).offset(skip).limit(limit).all()

    async def create(self, obj: T) -> T:
        self.session.add(obj)
        self.session.commit()
        self.session.refresh(obj)
        return obj

    async def update(self, id: int, obj: T) -> Optional[T]:
        existing = await self.get(id)
        if existing:
            for key, value in obj.__dict__.items():
                if key.startswith("_"):  # skip SQLAlchemy internal state (_sa_instance_state)
                    continue
                setattr(existing, key, value)
            self.session.commit()
            return existing
        return None

    async def delete(self, id: int) -> bool:
        obj = await self.get(id)
        if obj:
            self.session.delete(obj)
            self.session.commit()
            return True
        return False
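
A note on wiring: sections 2 and 6 below call a find_by_email method on a UserRepository that the classes above do not define. A minimal sketch of that concrete repository, assuming a SQLAlchemy User model with an email column:

class UserRepository(SQLAlchemyRepository[User]):
    """User-specific repository built on the generic SQLAlchemy repository"""

    def __init__(self, session: Session):
        super().__init__(session, User)

    async def find_by_email(self, email: str) -> Optional[User]:
        # Assumes the User model has a (preferably indexed) email column
        return self.session.query(User).filter_by(email=email).first()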

2. Service Layer Pattern

from typing import Protocol
from .repositories import UserRepository
from .models import User
from .schemas import UserCreate, UserUpdate

class IUserService(Protocol):
    """Interface for user service"""

    async def get_user(self, user_id: int) -> User:
        ...

    async def create_user(self, user_data: UserCreate) -> User:
        ...


class UserService:
    """Service layer for user business logic"""

    def __init__(self, user_repo: UserRepository):
        self.user_repo = user_repo

    async def get_user(self, user_id: int) -> User:
        user = await self.user_repo.get(user_id)
        if not user:
            raise ValueError(f"User {user_id} not found")
        return user

    async def create_user(self, user_data: UserCreate) -> User:
        # Business logic here
        if await self._email_exists(user_data.email):
            raise ValueError("Email already registered")

        user = User(**user_data.model_dump())  # .dict() in Pydantic v1
        return await self.user_repo.create(user)

    async def _email_exists(self, email: str) -> bool:
        # Check if email exists
        return await self.user_repo.find_by_email(email) is not None

3. Dependency Injection with FastAPI

from fastapi import Depends, FastAPI
from sqlalchemy.orm import Session
from typing import Generator

app = FastAPI()

# Database session dependency
# (SessionLocal is the sessionmaker(bind=engine) factory from your database module)
def get_db() -> Generator[Session, None, None]:
    db = SessionLocal()
    try:
        yield db
    finally:
        db.close()

# Repository dependency
def get_user_repository(db: Session = Depends(get_db)) -> UserRepository:
    return UserRepository(db)

# Service dependency
def get_user_service(
    user_repo: UserRepository = Depends(get_user_repository)
) -> UserService:
    return UserService(user_repo)

# Route using dependency injection
@app.get("/users/{user_id}")
async def get_user(
    user_id: int,
    user_service: UserService = Depends(get_user_service)
):
    return await user_service.get_user(user_id)
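
A practical payoff of this wiring is testability: FastAPI's dependency_overrides lets a test swap the real service for a stub without touching the route. A minimal sketch, where FakeUserService is a hypothetical test double:

from fastapi.testclient import TestClient

class FakeUserService:
    """Hypothetical in-memory stand-in for UserService"""

    async def get_user(self, user_id: int):
        return {"id": user_id, "email": "test@example.com"}

app.dependency_overrides[get_user_service] = lambda: FakeUserService()

client = TestClient(app)
assert client.get("/users/1").json()["id"] == 1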

4. Event-Driven Architecture

from typing import Callable, Dict, List, Any
from dataclasses import dataclass, field
from datetime import datetime
import asyncio

@dataclass
class Event:
    """Base event class"""
    event_type: str
    data: Dict[str, Any] = field(default_factory=dict)  # default keeps subclasses with a default event_type valid
    timestamp: datetime = field(default_factory=datetime.utcnow)  # evaluated per event, not at class definition

class EventBus:
    """Simple in-memory event bus"""

    def __init__(self):
        self._subscribers: Dict[str, List[Callable]] = {}

    def subscribe(self, event_type: str, handler: Callable):
        """Subscribe to an event type"""
        if event_type not in self._subscribers:
            self._subscribers[event_type] = []
        self._subscribers[event_type].append(handler)

    async def publish(self, event: Event):
        """Publish an event to all subscribers"""
        handlers = self._subscribers.get(event.event_type, [])
        await asyncio.gather(*[handler(event) for handler in handlers])

# Usage
event_bus = EventBus()

@dataclass
class UserCreatedEvent(Event):
    event_type: str = "user.created"

async def send_welcome_email(event: Event):
    print(f"Sending welcome email for user {event.data['user_id']}")

async def create_user_profile(event: Event):
    print(f"Creating profile for user {event.data['user_id']}")

# Subscribe handlers
event_bus.subscribe("user.created", send_welcome_email)
event_bus.subscribe("user.created", create_user_profile)

# Publish event (from inside an async function / running event loop)
await event_bus.publish(UserCreatedEvent(data={"user_id": 123}))

5. Circuit Breaker Pattern

from enum import Enum
from datetime import datetime, timedelta
from typing import Callable, Any
import asyncio

class CircuitState(Enum):
    CLOSED = "closed"
    OPEN = "open"
    HALF_OPEN = "half_open"

class CircuitBreaker:
    """Circuit breaker for external service calls"""

    def __init__(
        self,
        failure_threshold: int = 5,
        timeout: int = 60,
        recovery_timeout: int = 30
    ):
        self.failure_threshold = failure_threshold
        self.timeout = timeout
        self.recovery_timeout = recovery_timeout
        self.failure_count = 0
        self.last_failure_time = None
        self.state = CircuitState.CLOSED

    async def call(self, func: Callable, *args, **kwargs) -> Any:
        """Execute function with circuit breaker protection"""

        if self.state == CircuitState.OPEN:
            if self._should_attempt_reset():
                self.state = CircuitState.HALF_OPEN
            else:
                raise Exception("Circuit breaker is OPEN")

        try:
            result = await asyncio.wait_for(
                func(*args, **kwargs),
                timeout=self.timeout
            )
            self._on_success()
            return result
        except Exception as e:
            self._on_failure()
            raise e

    def _on_success(self):
        """Handle successful call"""
        self.failure_count = 0
        self.state = CircuitState.CLOSED

    def _on_failure(self):
        """Handle failed call"""
        self.failure_count += 1
        self.last_failure_time = datetime.utcnow()

        # A failure while half-open, or too many failures while closed, opens the circuit
        if self.state == CircuitState.HALF_OPEN or self.failure_count >= self.failure_threshold:
            self.state = CircuitState.OPEN

    def _should_attempt_reset(self) -> bool:
        """Check if enough time has passed to retry"""
        if self.last_failure_time is None:
            return True

        return (
            datetime.utcnow() - self.last_failure_time
        ).total_seconds() >= self.recovery_timeout

# Usage
circuit_breaker = CircuitBreaker(failure_threshold=3, timeout=5)

async def call_external_api():
    # External API call
    pass

# From inside an async function:
result = await circuit_breaker.call(call_external_api)
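
A concrete version of the placeholder above, using the third-party httpx client (an assumption, not part of the original example):

import httpx

async def call_external_api() -> dict:
    # Illustrative URL only
    async with httpx.AsyncClient() as client:
        response = await client.get("https://api.example.com/data")
        response.raise_for_status()
        return response.json()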

6. CQRS Pattern

from abc import ABC, abstractmethod
from typing import Any, Generic, Optional, TypeVar
from pydantic import BaseModel

# Commands
class Command(BaseModel):
    """Base command"""
    pass

class CreateUserCommand(Command):
    email: str
    name: str

# Command Handlers
TCommand = TypeVar('TCommand', bound=Command)

class CommandHandler(ABC, Generic[TCommand]):
    """Abstract command handler"""

    @abstractmethod
    async def handle(self, command: TCommand) -> Any:
        pass

class CreateUserCommandHandler(CommandHandler[CreateUserCommand]):
    """Handler for creating users"""

    def __init__(self, user_repo: UserRepository):
        self.user_repo = user_repo

    async def handle(self, command: CreateUserCommand) -> User:
        user = User(email=command.email, name=command.name)
        return await self.user_repo.create(user)

# Queries
class Query(BaseModel):
    """Base query"""
    pass

class GetUserQuery(Query):
    user_id: int

# Query Handlers
TQuery = TypeVar('TQuery', bound=Query)

class QueryHandler(ABC, Generic[TQuery]):
    """Abstract query handler"""

    @abstractmethod
    async def handle(self, query: TQuery) -> Any:
        pass

class GetUserQueryHandler(QueryHandler[GetUserQuery]):
    """Handler for getting users"""

    def __init__(self, user_repo: UserRepository):
        self.user_repo = user_repo

    async def handle(self, query: GetUserQuery) -> Optional[User]:
        return await self.user_repo.get(query.user_id)

# Command Bus
class CommandBus:
    """Simple command bus"""

    def __init__(self):
        self._handlers = {}

    def register(self, command_type: type, handler: CommandHandler):
        self._handlers[command_type] = handler

    async def execute(self, command: Command):
        handler = self._handlers.get(type(command))
        if not handler:
            raise ValueError(f"No handler for {type(command)}")
        return await handler.handle(command)
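
A minimal wiring sketch, assuming user_repo is a UserRepository instance constructed as in the earlier sections:

# Usage
command_bus = CommandBus()
command_bus.register(CreateUserCommand, CreateUserCommandHandler(user_repo))

# From inside an async function:
new_user = await command_bus.execute(
    CreateUserCommand(email="alice@example.com", name="Alice")
)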

7. Retry Pattern with Exponential Backoff

import asyncio
from typing import Awaitable, Callable, TypeVar
from functools import wraps

T = TypeVar('T')

def retry_with_backoff(
    max_retries: int = 3,
    base_delay: float = 1.0,
    max_delay: float = 60.0,
    exponential_base: float = 2.0
):
    """Decorator for retry logic with exponential backoff"""

    def decorator(func: Callable[..., Awaitable[T]]) -> Callable[..., Awaitable[T]]:
        @wraps(func)
        async def wrapper(*args, **kwargs) -> T:
            retries = 0

            while retries < max_retries:
                try:
                    return await func(*args, **kwargs)
                except Exception as e:
                    retries += 1

                    if retries >= max_retries:
                        raise e

                    delay = min(
                        base_delay * (exponential_base ** (retries - 1)),
                        max_delay
                    )

                    print(f"Retry {retries}/{max_retries} after {delay}s")
                    await asyncio.sleep(delay)

            raise Exception("Max retries exceeded")

        return wrapper
    return decorator

# Usage
@retry_with_backoff(max_retries=3, base_delay=1.0)
async def fetch_data_from_api():
    # API call that might fail
    pass
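
For production code, the third-party tenacity library (not used elsewhere in this document) provides equivalent behaviour without hand-rolled loops; a rough sketch:

from tenacity import retry, stop_after_attempt, wait_exponential

@retry(stop=stop_after_attempt(3), wait=wait_exponential(multiplier=1.0, max=60))
async def fetch_data_from_api():
    # API call that might fail
    ...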

8. Settings Management

from pydantic_settings import BaseSettings, SettingsConfigDict
from functools import lru_cache

class Settings(BaseSettings):
    """Application settings"""

    # API Settings
    api_title: str = "My API"
    api_version: str = "1.0.0"

    # Database
    database_url: str
    db_pool_size: int = 5

    # Redis
    redis_url: str
    redis_ttl: int = 3600

    # Security
    secret_key: str
    algorithm: str = "HS256"
    access_token_expire_minutes: int = 30

    # External Services
    external_api_url: str
    external_api_key: str

    # Observability
    log_level: str = "INFO"
    sentry_dsn: str | None = None

    model_config = SettingsConfigDict(env_file=".env", case_sensitive=False)

@lru_cache()
def get_settings() -> Settings:
    """Cached settings instance"""
    return Settings()

# Usage
settings = get_settings()
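
Fields declared without defaults (database_url, redis_url, secret_key, external_api_url, external_api_key) must be supplied via the environment or the .env file; an illustrative .env with placeholder values:

DATABASE_URL=postgresql://user:pass@localhost:5432/app
REDIS_URL=redis://localhost:6379/0
SECRET_KEY=change-me
EXTERNAL_API_URL=https://api.example.com
EXTERNAL_API_KEY=change-me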

9. Background Task Processing

from celery import Celery
from typing import Any

# Celery app
celery_app = Celery(
    "tasks",
    broker="redis://localhost:6379/0",
    backend="redis://localhost:6379/0"
)

celery_app.conf.update(
    task_serializer="json",
    accept_content=["json"],
    result_serializer="json",
    timezone="UTC",
    enable_utc=True,
    task_track_started=True,
    task_time_limit=300,  # 5 minutes
    task_soft_time_limit=240,  # 4 minutes
)

@celery_app.task(bind=True, max_retries=3)
def process_data(self, data: dict) -> Any:
    """Background task with retry logic"""
    try:
        # Process data (heavy_computation is a placeholder for your own logic)
        result = heavy_computation(data)
        return result
    except Exception as exc:
        # Retry with exponential backoff
        raise self.retry(exc=exc, countdown=2 ** self.request.retries)

# FastAPI integration
from fastapi import FastAPI, BackgroundTasks

app = FastAPI()

@app.post("/process")
async def trigger_processing(data: dict, background_tasks: BackgroundTasks):
    # Option 1: FastAPI background tasks (for quick, fire-and-forget work; quick_task is a placeholder)
    background_tasks.add_task(quick_task, data)

    # Option 2: Celery (for long-running tasks)
    task = process_data.delay(data)

    return {"task_id": task.id}

@app.get("/task/{task_id}")
async def get_task_status(task_id: str):
    task = celery_app.AsyncResult(task_id)
    return {
        "task_id": task_id,
        "status": task.status,
        "result": task.result if task.ready() else None
    }
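
Note that neither path runs the Celery task by itself: a worker must be started against the same broker, e.g. with celery -A tasks worker --loglevel=info (module name assumed from the example above). The FastAPI BackgroundTasks option, by contrast, runs in the API process after the response has been sent.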

10. API Versioning

from fastapi import APIRouter, FastAPI
from enum import Enum

class APIVersion(str, Enum):
    V1 = "v1"
    V2 = "v2"

app = FastAPI()

# Version 1 router
router_v1 = APIRouter(prefix="/api/v1", tags=["v1"])

@router_v1.get("/users/{user_id}")
async def get_user_v1(user_id: int):
    return {"id": user_id, "version": "v1"}

# Version 2 router
router_v2 = APIRouter(prefix="/api/v2", tags=["v2"])

@router_v2.get("/users/{user_id}")
async def get_user_v2(user_id: int):
    return {
        "id": user_id,
        "version": "v2",
        "additional_field": "new in v2"
    }

app.include_router(router_v1)
app.include_router(router_v2)

# Header-based versioning (alternative)
from fastapi import Header

@app.get("/users/{user_id}")
async def get_user(
    user_id: int,
    api_version: str = Header(default="v1", alias="X-API-Version")
):
    if api_version == "v2":
        return {"id": user_id, "version": "v2"}
    return {"id": user_id, "version": "v1"}

11. Middleware Patterns

from fastapi import FastAPI, Request
from fastapi.responses import JSONResponse
from starlette.middleware.base import BaseHTTPMiddleware
import time
import logging

class LoggingMiddleware(BaseHTTPMiddleware):
    """Middleware for request/response logging"""

    async def dispatch(self, request: Request, call_next):
        start_time = time.time()

        # Log request
        logging.info(f"Request: {request.method} {request.url}")

        response = await call_next(request)

        # Log response
        process_time = time.time() - start_time
        logging.info(
            f"Response: {response.status_code} "
            f"(took {process_time:.2f}s)"
        )

        response.headers["X-Process-Time"] = str(process_time)
        return response

class RateLimitMiddleware(BaseHTTPMiddleware):
    """Simple in-memory rate limiting middleware (per process; use Redis or similar for multi-worker deployments)"""

    def __init__(self, app, requests_per_minute: int = 60):
        super().__init__(app)
        self.requests_per_minute = requests_per_minute
        self.request_counts = {}  # grows unbounded; prune expired minutes in real use

    async def dispatch(self, request: Request, call_next):
        client_ip = request.client.host
        current_minute = int(time.time() / 60)

        key = f"{client_ip}:{current_minute}"
        self.request_counts[key] = self.request_counts.get(key, 0) + 1

        if self.request_counts[key] > self.requests_per_minute:
            return JSONResponse(
                status_code=429,
                content={"error": "Rate limit exceeded"}
            )

        return await call_next(request)

# Add middleware to app
app = FastAPI()
app.add_middleware(LoggingMiddleware)
app.add_middleware(RateLimitMiddleware, requests_per_minute=100)
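
A common companion to these two is a correlation ID middleware, which pairs naturally with the request_id binding shown in the structured logging section below. A minimal sketch (the X-Request-ID header name is a convention, not something required above):

import uuid

class RequestIDMiddleware(BaseHTTPMiddleware):
    """Attach a request ID to each request/response for log correlation"""

    async def dispatch(self, request: Request, call_next):
        request_id = request.headers.get("X-Request-ID", str(uuid.uuid4()))
        request.state.request_id = request_id
        response = await call_next(request)
        response.headers["X-Request-ID"] = request_id
        return response

app.add_middleware(RequestIDMiddleware)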

12. Structured Logging

import structlog
import logging

# Route structlog output through the standard library logger so messages are actually emitted
logging.basicConfig(format="%(message)s", level=logging.INFO)

# Configure structlog
structlog.configure(
    processors=[
        structlog.stdlib.filter_by_level,
        structlog.stdlib.add_logger_name,
        structlog.stdlib.add_log_level,
        structlog.stdlib.PositionalArgumentsFormatter(),
        structlog.processors.TimeStamper(fmt="iso"),
        structlog.processors.StackInfoRenderer(),
        structlog.processors.format_exc_info,
        structlog.processors.UnicodeDecoder(),
        structlog.processors.JSONRenderer()
    ],
    context_class=dict,
    logger_factory=structlog.stdlib.LoggerFactory(),
    cache_logger_on_first_use=True,
)

# Get logger
logger = structlog.get_logger()

# Usage
logger.info(
    "user_created",
    user_id=123,
    email="user@example.com",
    ip_address="192.168.1.1"
)

# Context binding
logger = logger.bind(request_id="abc-123")
logger.info("processing_request")
logger.info("request_completed", duration_ms=150)

These patterns provide battle-tested solutions for common architectural challenges in Python backend development.