Files
gh-jezweb-claude-skills-ski…/references/production-patterns.md
2025-11-30 08:24:49 +08:00

12 KiB

FastMCP Production Patterns Reference

Battle-tested patterns for production-ready FastMCP servers.

Self-Contained Server Pattern

Problem: Circular imports break cloud deployment Solution: Keep all utilities in one file

# src/utils.py - All utilities in one place
import os
from typing import Dict, Any
from datetime import datetime

class Config:
    """Configuration from environment."""
    SERVER_NAME = os.getenv("SERVER_NAME", "FastMCP Server")
    API_KEY = os.getenv("API_KEY", "")
    CACHE_TTL = int(os.getenv("CACHE_TTL", "300"))

def format_success(data: Any) -> Dict[str, Any]:
    """Format successful response."""
    return {
        "success": True,
        "data": data,
        "timestamp": datetime.now().isoformat()
    }

def format_error(error: str, code: str = "ERROR") -> Dict[str, Any]:
    """Format error response."""
    return {
        "success": False,
        "error": error,
        "code": code,
        "timestamp": datetime.now().isoformat()
    }

# Usage in tools
from .utils import format_success, format_error, Config

@mcp.tool()
async def process_data(data: dict) -> dict:
    try:
        result = await process(data)
        return format_success(result)
    except Exception as e:
        return format_error(str(e))

Why it works:

  • No circular dependencies
  • Cloud deployment safe
  • Easy to maintain
  • Single source of truth

Lazy Initialization Pattern

Problem: Creating expensive resources at import time fails in cloud Solution: Initialize resources only when needed

class ResourceManager:
    """Manages expensive resources with lazy initialization."""
    _db_pool = None
    _cache = None

    @classmethod
    async def get_db(cls):
        """Get database pool (create on first use)."""
        if cls._db_pool is None:
            cls._db_pool = await create_db_pool()
        return cls._db_pool

    @classmethod
    async def get_cache(cls):
        """Get cache (create on first use)."""
        if cls._cache is None:
            cls._cache = await create_cache()
        return cls._cache

# Usage - no initialization at module level
manager = ResourceManager()  # Lightweight

@mcp.tool()
async def database_operation():
    db = await manager.get_db()  # Initialization happens here
    return await db.query("SELECT * FROM users")

Connection Pooling Pattern

Problem: Creating new connections for each request is slow Solution: Reuse HTTP clients with connection pooling

import httpx

class APIClient:
    _instance: Optional[httpx.AsyncClient] = None

    @classmethod
    async def get_client(cls) -> httpx.AsyncClient:
        """Get or create shared HTTP client."""
        if cls._instance is None:
            cls._instance = httpx.AsyncClient(
                base_url=API_BASE_URL,
                timeout=httpx.Timeout(30.0),
                limits=httpx.Limits(
                    max_keepalive_connections=5,
                    max_connections=10
                )
            )
        return cls._instance

    @classmethod
    async def cleanup(cls):
        """Cleanup on shutdown."""
        if cls._instance:
            await cls._instance.aclose()
            cls._instance = None

@mcp.tool()
async def api_request(endpoint: str) -> dict:
    client = await APIClient.get_client()
    response = await client.get(endpoint)
    return response.json()

Retry with Exponential Backoff

Problem: Transient failures cause tool failures Solution: Automatic retry with exponential backoff

import asyncio

async def retry_with_backoff(
    func,
    max_retries: int = 3,
    initial_delay: float = 1.0,
    exponential_base: float = 2.0
):
    """Retry function with exponential backoff."""
    delay = initial_delay
    last_exception = None

    for attempt in range(max_retries):
        try:
            return await func()
        except (httpx.TimeoutException, httpx.NetworkError) as e:
            last_exception = e
            if attempt < max_retries - 1:
                await asyncio.sleep(delay)
                delay *= exponential_base

    raise last_exception

@mcp.tool()
async def resilient_api_call(endpoint: str) -> dict:
    """API call with automatic retry."""
    async def make_call():
        client = await APIClient.get_client()
        response = await client.get(endpoint)
        response.raise_for_status()
        return response.json()

    try:
        data = await retry_with_backoff(make_call)
        return {"success": True, "data": data}
    except Exception as e:
        return {"success": False, "error": str(e)}

Time-Based Caching Pattern

Problem: Repeated API calls for same data waste time/money Solution: Cache with TTL (time-to-live)

import time

class TimeBasedCache:
    def __init__(self, ttl: int = 300):
        self.ttl = ttl
        self.cache = {}
        self.timestamps = {}

    def get(self, key: str):
        if key in self.cache:
            if time.time() - self.timestamps[key] < self.ttl:
                return self.cache[key]
            else:
                del self.cache[key]
                del self.timestamps[key]
        return None

    def set(self, key: str, value):
        self.cache[key] = value
        self.timestamps[key] = time.time()

cache = TimeBasedCache(ttl=300)

@mcp.tool()
async def cached_fetch(resource_id: str) -> dict:
    """Fetch with caching."""
    cache_key = f"resource:{resource_id}"

    cached = cache.get(cache_key)
    if cached:
        return {"data": cached, "from_cache": True}

    data = await fetch_from_api(resource_id)
    cache.set(cache_key, data)

    return {"data": data, "from_cache": False}

Structured Error Responses

Problem: Inconsistent error formats make debugging hard Solution: Standardized error response format

from enum import Enum

class ErrorCode(Enum):
    VALIDATION_ERROR = "VALIDATION_ERROR"
    NOT_FOUND = "NOT_FOUND"
    API_ERROR = "API_ERROR"
    TIMEOUT = "TIMEOUT"
    UNKNOWN = "UNKNOWN"

def create_error(code: ErrorCode, message: str, details: dict = None):
    """Create structured error response."""
    return {
        "success": False,
        "error": {
            "code": code.value,
            "message": message,
            "details": details or {},
            "timestamp": datetime.now().isoformat()
        }
    }

@mcp.tool()
async def validated_operation(data: str) -> dict:
    if not data:
        return create_error(
            ErrorCode.VALIDATION_ERROR,
            "Data is required",
            {"field": "data"}
        )

    try:
        result = await process(data)
        return {"success": True, "data": result}
    except Exception as e:
        return create_error(ErrorCode.UNKNOWN, str(e))

Environment-Based Configuration

Problem: Different settings for dev/staging/production Solution: Environment-based configuration class

import os
from enum import Enum

class Environment(Enum):
    DEVELOPMENT = "development"
    STAGING = "staging"
    PRODUCTION = "production"

class Config:
    ENV = Environment(os.getenv("ENVIRONMENT", "development"))

    SETTINGS = {
        Environment.DEVELOPMENT: {
            "debug": True,
            "cache_ttl": 60,
            "log_level": "DEBUG"
        },
        Environment.STAGING: {
            "debug": True,
            "cache_ttl": 300,
            "log_level": "INFO"
        },
        Environment.PRODUCTION: {
            "debug": False,
            "cache_ttl": 3600,
            "log_level": "WARNING"
        }
    }

    @classmethod
    def get(cls, key: str):
        return cls.SETTINGS[cls.ENV].get(key)

# Use configuration
cache_ttl = Config.get("cache_ttl")
debug_mode = Config.get("debug")

Health Check Pattern

Problem: Need to monitor server health in production Solution: Comprehensive health check resource

@mcp.resource("health://status")
async def health_check() -> dict:
    """Comprehensive health check."""
    checks = {}

    # Check API connectivity
    try:
        client = await APIClient.get_client()
        response = await client.get("/health", timeout=5)
        checks["api"] = response.status_code == 200
    except:
        checks["api"] = False

    # Check database (if applicable)
    try:
        db = await ResourceManager.get_db()
        await db.execute("SELECT 1")
        checks["database"] = True
    except:
        checks["database"] = False

    # System resources
    import psutil
    checks["memory_percent"] = psutil.virtual_memory().percent
    checks["cpu_percent"] = psutil.cpu_percent()

    # Overall status
    all_healthy = (
        checks.get("api", True) and
        checks.get("database", True) and
        checks["memory_percent"] < 90 and
        checks["cpu_percent"] < 90
    )

    return {
        "status": "healthy" if all_healthy else "degraded",
        "timestamp": datetime.now().isoformat(),
        "checks": checks
    }

Parallel Processing Pattern

Problem: Sequential processing is slow for batch operations Solution: Process items in parallel

import asyncio

@mcp.tool()
async def batch_process(items: list[str]) -> dict:
    """Process multiple items in parallel."""
    async def process_single(item: str):
        try:
            result = await process_item(item)
            return {"item": item, "success": True, "result": result}
        except Exception as e:
            return {"item": item, "success": False, "error": str(e)}

    # Process all items in parallel
    tasks = [process_single(item) for item in items]
    results = await asyncio.gather(*tasks)

    successful = [r for r in results if r["success"]]
    failed = [r for r in results if not r["success"]]

    return {
        "total": len(items),
        "successful": len(successful),
        "failed": len(failed),
        "results": results
    }

State Management Pattern

Problem: Shared state causes race conditions Solution: Thread-safe state management with locks

import asyncio

class StateManager:
    def __init__(self):
        self._state = {}
        self._locks = {}

    async def get(self, key: str, default=None):
        return self._state.get(key, default)

    async def set(self, key: str, value):
        if key not in self._locks:
            self._locks[key] = asyncio.Lock()

        async with self._locks[key]:
            self._state[key] = value

    async def update(self, key: str, updater):
        """Update with function."""
        if key not in self._locks:
            self._locks[key] = asyncio.Lock()

        async with self._locks[key]:
            current = self._state.get(key)
            self._state[key] = await updater(current)
            return self._state[key]

state = StateManager()

@mcp.tool()
async def increment_counter(name: str) -> dict:
    new_value = await state.update(
        f"counter_{name}",
        lambda x: (x or 0) + 1
    )
    return {"counter": name, "value": new_value}

Anti-Patterns to Avoid

Factory Functions in init.py

# DON'T DO THIS
# shared/__init__.py
def get_api_client():
    from .api_client import APIClient  # Circular import risk
    return APIClient()

Blocking Operations in Async

# DON'T DO THIS
@mcp.tool()
async def bad_async():
    time.sleep(5)  # Blocks entire event loop!
    return "done"

# DO THIS INSTEAD
@mcp.tool()
async def good_async():
    await asyncio.sleep(5)
    return "done"

Global Mutable State

# DON'T DO THIS
results = []  # Race conditions!

@mcp.tool()
async def add_result(data: str):
    results.append(data)

Production Deployment Checklist

  • Module-level server object
  • Environment variables for all config
  • Connection pooling for HTTP clients
  • Retry logic for transient failures
  • Caching for expensive operations
  • Structured error responses
  • Health check endpoint
  • Logging configured
  • No circular imports
  • No import-time async execution
  • Rate limiting if needed
  • Graceful shutdown handling

Resources

  • Production Examples: See self-contained-server.py template
  • Error Handling: See error-handling.py template
  • API Patterns: See api-client-pattern.py template