# FastMCP Production Patterns Reference
Battle-tested patterns for production-ready FastMCP servers.
## Self-Contained Server Pattern
**Problem:** Circular imports break cloud deployment
**Solution:** Keep all utilities in one file
```python
# src/utils.py - All utilities in one place
import os
from datetime import datetime
from typing import Any, Dict


class Config:
    """Configuration from environment."""
    SERVER_NAME = os.getenv("SERVER_NAME", "FastMCP Server")
    API_KEY = os.getenv("API_KEY", "")
    CACHE_TTL = int(os.getenv("CACHE_TTL", "300"))


def format_success(data: Any) -> Dict[str, Any]:
    """Format successful response."""
    return {
        "success": True,
        "data": data,
        "timestamp": datetime.now().isoformat()
    }


def format_error(error: str, code: str = "ERROR") -> Dict[str, Any]:
    """Format error response."""
    return {
        "success": False,
        "error": error,
        "code": code,
        "timestamp": datetime.now().isoformat()
    }


# Usage in tools
from .utils import format_success, format_error, Config


@mcp.tool()
async def process_data(data: dict) -> dict:
    try:
        result = await process(data)
        return format_success(result)
    except Exception as e:
        return format_error(str(e))
```
**Why it works:**
- No circular dependencies
- Cloud deployment safe
- Easy to maintain
- Single source of truth
## Lazy Initialization Pattern
**Problem:** Creating expensive resources at import time can fail in cloud deployments
**Solution:** Initialize resources only when needed
```python
class ResourceManager:
    """Manages expensive resources with lazy initialization."""
    _db_pool = None
    _cache = None

    @classmethod
    async def get_db(cls):
        """Get database pool (create on first use)."""
        if cls._db_pool is None:
            cls._db_pool = await create_db_pool()
        return cls._db_pool

    @classmethod
    async def get_cache(cls):
        """Get cache (create on first use)."""
        if cls._cache is None:
            cls._cache = await create_cache()
        return cls._cache


# Usage - no initialization at module level
manager = ResourceManager()  # Lightweight


@mcp.tool()
async def database_operation():
    db = await manager.get_db()  # Initialization happens here
    return await db.query("SELECT * FROM users")
```
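One gap in the pattern above: if several tool calls arrive before the first initialization finishes, each of them sees `None` and creates its own resource. A minimal sketch of one way to guard against that, assuming a hypothetical `LazyResource` wrapper and an async `factory` callable (neither is part of FastMCP):

```python
import asyncio


class LazyResource:
    """Create an expensive resource once, on first use, even under concurrency."""

    def __init__(self, factory):
        self._factory = factory  # async callable that builds the resource
        self._value = None
        self._lock = None        # created on first use, not at import time

    async def get(self):
        if self._value is not None:
            return self._value
        if self._lock is None:
            self._lock = asyncio.Lock()
        async with self._lock:
            # Re-check: another coroutine may have finished while we waited
            if self._value is None:
                self._value = await self._factory()
        return self._value
```

The lock itself is also created lazily, so constructing `LazyResource(...)` at module level stays cheap and does not touch the event loop.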
## Connection Pooling Pattern
**Problem:** Creating new connections for each request is slow
**Solution:** Reuse HTTP clients with connection pooling
```python
import os
from typing import Optional

import httpx

# Placeholder base URL; set API_BASE_URL for your own service
API_BASE_URL = os.getenv("API_BASE_URL", "https://api.example.com")


class APIClient:
    _instance: Optional[httpx.AsyncClient] = None

    @classmethod
    async def get_client(cls) -> httpx.AsyncClient:
        """Get or create shared HTTP client."""
        if cls._instance is None:
            cls._instance = httpx.AsyncClient(
                base_url=API_BASE_URL,
                timeout=httpx.Timeout(30.0),
                limits=httpx.Limits(
                    max_keepalive_connections=5,
                    max_connections=10
                )
            )
        return cls._instance

    @classmethod
    async def cleanup(cls):
        """Cleanup on shutdown."""
        if cls._instance:
            await cls._instance.aclose()
            cls._instance = None


@mcp.tool()
async def api_request(endpoint: str) -> dict:
    client = await APIClient.get_client()
    response = await client.get(endpoint)
    return response.json()
```
## Retry with Exponential Backoff
**Problem:** Transient failures cause tool failures
**Solution:** Automatic retry with exponential backoff
```python
import asyncio

import httpx


async def retry_with_backoff(
    func,
    max_retries: int = 3,
    initial_delay: float = 1.0,
    exponential_base: float = 2.0
):
    """Retry function with exponential backoff."""
    delay = initial_delay
    last_exception = None
    for attempt in range(max_retries):
        try:
            return await func()
        except (httpx.TimeoutException, httpx.NetworkError) as e:
            last_exception = e
            if attempt < max_retries - 1:
                await asyncio.sleep(delay)
                delay *= exponential_base
    raise last_exception


@mcp.tool()
async def resilient_api_call(endpoint: str) -> dict:
    """API call with automatic retry."""
    async def make_call():
        # APIClient is the shared client from the Connection Pooling Pattern
        client = await APIClient.get_client()
        response = await client.get(endpoint)
        response.raise_for_status()
        return response.json()

    try:
        data = await retry_with_backoff(make_call)
        return {"success": True, "data": data}
    except Exception as e:
        return {"success": False, "error": str(e)}
```
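With the defaults above, the waits between attempts are 1 s and then 2 s. When many clients retry at once, fixed delays tend to line the retries up, so a common refinement is to cap the delay and randomize it ("jitter"). The following is a sketch of that variation using only the standard library; `backoff_delays`, `retry_with_jitter`, `max_delay`, and the injectable `sleep` parameter are hypothetical names introduced here, and the built-in `ConnectionError`/`TimeoutError` stand in for the httpx exceptions:

```python
import asyncio
import random


def backoff_delays(max_retries, initial_delay=1.0, exponential_base=2.0, max_delay=30.0):
    """Waits between attempts: initial_delay * base**n, capped at max_delay."""
    return [
        min(initial_delay * exponential_base ** n, max_delay)
        for n in range(max_retries - 1)
    ]


async def retry_with_jitter(func, max_retries=3,
                            retry_on=(ConnectionError, TimeoutError),
                            sleep=asyncio.sleep):
    """Retry an async callable, sleeping a random time up to each scheduled delay."""
    if max_retries < 1:
        raise ValueError("max_retries must be at least 1")
    delays = backoff_delays(max_retries)
    for attempt in range(max_retries):
        try:
            return await func()
        except retry_on:
            if attempt == max_retries - 1:
                raise  # out of attempts: surface the last error
            await sleep(random.uniform(0, delays[attempt]))
```

Passing `sleep` as a parameter is only there to make the function testable without real waiting.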
## Time-Based Caching Pattern
**Problem:** Repeated API calls for same data waste time/money
**Solution:** Cache with TTL (time-to-live)
```python
import time


class TimeBasedCache:
    def __init__(self, ttl: int = 300):
        self.ttl = ttl
        self.cache = {}
        self.timestamps = {}

    def get(self, key: str):
        if key in self.cache:
            if time.time() - self.timestamps[key] < self.ttl:
                return self.cache[key]
            else:
                # Entry expired: evict it
                del self.cache[key]
                del self.timestamps[key]
        return None

    def set(self, key: str, value):
        self.cache[key] = value
        self.timestamps[key] = time.time()


cache = TimeBasedCache(ttl=300)


@mcp.tool()
async def cached_fetch(resource_id: str) -> dict:
    """Fetch with caching."""
    cache_key = f"resource:{resource_id}"
    cached = cache.get(cache_key)
    if cached is not None:  # An empty result is still a valid cache hit
        return {"data": cached, "from_cache": True}
    data = await fetch_from_api(resource_id)
    cache.set(cache_key, data)
    return {"data": data, "from_cache": False}
```
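The same TTL idea can be packaged as a decorator so that individual tools do not repeat the get/set boilerplate. This is a sketch rather than a FastMCP feature: `ttl_cached` and its `clock` parameter are hypothetical, it caches on positional arguments only (which must be hashable), and it never evicts expired entries that are not requested again.

```python
import asyncio
import functools
import time


def ttl_cached(ttl: float, clock=time.monotonic):
    """Cache an async function's results per argument tuple for ttl seconds."""
    def decorator(func):
        entries = {}  # args -> (expires_at, value)

        @functools.wraps(func)
        async def wrapper(*args):
            now = clock()
            hit = entries.get(args)
            if hit is not None and hit[0] > now:
                return hit[1]  # still fresh
            value = await func(*args)
            entries[args] = (now + ttl, value)
            return value

        return wrapper
    return decorator
```

`time.monotonic` is used as the default clock because it is unaffected by wall-clock adjustments; the parameter exists mainly so tests can supply a fake clock.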
## Structured Error Responses
**Problem:** Inconsistent error formats make debugging hard
**Solution:** Standardized error response format
```python
from datetime import datetime
from enum import Enum
from typing import Optional


class ErrorCode(Enum):
    VALIDATION_ERROR = "VALIDATION_ERROR"
    NOT_FOUND = "NOT_FOUND"
    API_ERROR = "API_ERROR"
    TIMEOUT = "TIMEOUT"
    UNKNOWN = "UNKNOWN"


def create_error(code: ErrorCode, message: str, details: Optional[dict] = None):
    """Create structured error response."""
    return {
        "success": False,
        "error": {
            "code": code.value,
            "message": message,
            "details": details or {},
            "timestamp": datetime.now().isoformat()
        }
    }


@mcp.tool()
async def validated_operation(data: str) -> dict:
    if not data:
        return create_error(
            ErrorCode.VALIDATION_ERROR,
            "Data is required",
            {"field": "data"}
        )
    try:
        result = await process(data)
        return {"success": True, "data": result}
    except Exception as e:
        return create_error(ErrorCode.UNKNOWN, str(e))
```
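The example above reports every unexpected exception as `UNKNOWN`, which leaves `NOT_FOUND`, `API_ERROR`, and `TIMEOUT` unused. One possible way to fill them in is a small classifier that maps exception types to codes. The mapping below is an assumption chosen for illustration (the function name `classify_exception` is hypothetical), and a real server would add its own HTTP or database exception types:

```python
import asyncio
from enum import Enum


class ErrorCode(Enum):
    VALIDATION_ERROR = "VALIDATION_ERROR"
    NOT_FOUND = "NOT_FOUND"
    API_ERROR = "API_ERROR"
    TIMEOUT = "TIMEOUT"
    UNKNOWN = "UNKNOWN"


def classify_exception(exc: BaseException) -> ErrorCode:
    """Map an exception to an ErrorCode (illustrative mapping only)."""
    if isinstance(exc, (asyncio.TimeoutError, TimeoutError)):
        return ErrorCode.TIMEOUT
    if isinstance(exc, (KeyError, FileNotFoundError)):
        return ErrorCode.NOT_FOUND
    if isinstance(exc, (ValueError, TypeError)):
        return ErrorCode.VALIDATION_ERROR
    if isinstance(exc, ConnectionError):
        return ErrorCode.API_ERROR
    return ErrorCode.UNKNOWN
```

In a tool, the final `except Exception as e` branch could then call `create_error(classify_exception(e), str(e))` instead of hard-coding `ErrorCode.UNKNOWN`.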
## Environment-Based Configuration
**Problem:** Different settings for dev/staging/production
**Solution:** Environment-based configuration class
```python
import os
from enum import Enum


class Environment(Enum):
    DEVELOPMENT = "development"
    STAGING = "staging"
    PRODUCTION = "production"


class Config:
    ENV = Environment(os.getenv("ENVIRONMENT", "development"))
    SETTINGS = {
        Environment.DEVELOPMENT: {
            "debug": True,
            "cache_ttl": 60,
            "log_level": "DEBUG"
        },
        Environment.STAGING: {
            "debug": True,
            "cache_ttl": 300,
            "log_level": "INFO"
        },
        Environment.PRODUCTION: {
            "debug": False,
            "cache_ttl": 3600,
            "log_level": "WARNING"
        }
    }

    @classmethod
    def get(cls, key: str):
        return cls.SETTINGS[cls.ENV].get(key)


# Use configuration
cache_ttl = Config.get("cache_ttl")
debug_mode = Config.get("debug")
```
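Note that `Environment(os.getenv(...))` raises a bare `ValueError` at import time if `ENVIRONMENT` holds an unexpected value such as `prod` or `Production`. A sketch of a more forgiving parser, assuming a hypothetical helper named `resolve_environment` that normalizes case and whitespace and fails with a readable message:

```python
from enum import Enum
from typing import Optional


class Environment(Enum):
    DEVELOPMENT = "development"
    STAGING = "staging"
    PRODUCTION = "production"


def resolve_environment(raw: Optional[str]) -> Environment:
    """Parse an ENVIRONMENT value; unset means development, typos fail loudly."""
    name = (raw or "development").strip().lower()
    try:
        return Environment(name)
    except ValueError:
        valid = ", ".join(e.value for e in Environment)
        raise ValueError(
            f"ENVIRONMENT must be one of {valid}; got {raw!r}"
        ) from None
```

`Config.ENV` could then be set with `resolve_environment(os.getenv("ENVIRONMENT"))`. Failing on unknown values, rather than silently falling back to development settings, is a deliberate choice in this sketch.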
## Health Check Pattern
**Problem:** Need to monitor server health in production
**Solution:** Comprehensive health check resource
```python
from datetime import datetime

import psutil


@mcp.resource("health://status")
async def health_check() -> dict:
    """Comprehensive health check."""
    checks = {}

    # Check API connectivity
    try:
        client = await APIClient.get_client()
        response = await client.get("/health", timeout=5)
        checks["api"] = response.status_code == 200
    except Exception:
        checks["api"] = False

    # Check database (if applicable)
    try:
        db = await ResourceManager.get_db()
        await db.execute("SELECT 1")
        checks["database"] = True
    except Exception:
        checks["database"] = False

    # System resources. cpu_percent() without an interval measures usage
    # since the previous call, so the very first reading is 0.0.
    checks["memory_percent"] = psutil.virtual_memory().percent
    checks["cpu_percent"] = psutil.cpu_percent()

    # Overall status
    all_healthy = (
        checks.get("api", True) and
        checks.get("database", True) and
        checks["memory_percent"] < 90 and
        checks["cpu_percent"] < 90
    )
    return {
        "status": "healthy" if all_healthy else "degraded",
        "timestamp": datetime.now().isoformat(),
        "checks": checks
    }
```
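A health check that hangs is not much better than one that fails, and only the API probe above has a timeout. The sketch below, using hypothetical helpers `run_check` and `summarize_health`, wraps any async probe in `asyncio.wait_for` and separates the "healthy or degraded" decision into a pure function that can be tested without real dependencies. The 90 percent threshold mirrors the example above.

```python
import asyncio


async def run_check(check, timeout: float = 5.0) -> bool:
    """Run one async probe; any exception or timeout counts as unhealthy."""
    try:
        await asyncio.wait_for(check(), timeout=timeout)
        return True
    except Exception:
        return False


def summarize_health(checks: dict, max_percent: float = 90.0) -> str:
    """Return "healthy" or "degraded" from boolean and percentage checks."""
    for value in checks.values():
        if isinstance(value, bool):  # test bool first: bool is a subclass of int
            if not value:
                return "degraded"
        elif value >= max_percent:
            return "degraded"
    return "healthy"
```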
## Parallel Processing Pattern
**Problem:** Sequential processing is slow for batch operations
**Solution:** Process items in parallel
```python
import asyncio


@mcp.tool()
async def batch_process(items: list[str]) -> dict:
    """Process multiple items in parallel."""
    async def process_single(item: str):
        try:
            result = await process_item(item)
            return {"item": item, "success": True, "result": result}
        except Exception as e:
            return {"item": item, "success": False, "error": str(e)}

    # Process all items in parallel
    tasks = [process_single(item) for item in items]
    results = await asyncio.gather(*tasks)

    successful = [r for r in results if r["success"]]
    failed = [r for r in results if not r["success"]]
    return {
        "total": len(items),
        "successful": len(successful),
        "failed": len(failed),
        "results": results
    }
```
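`asyncio.gather` starts every item at once, which can overwhelm a downstream API or a small connection pool when the batch is large. A minimal sketch of bounding the concurrency with `asyncio.Semaphore`; the helper name `map_bounded` and its `limit` parameter are assumptions introduced for this example:

```python
import asyncio


async def map_bounded(func, items, limit: int = 5):
    """Apply async func to every item, running at most `limit` calls at a time."""
    semaphore = asyncio.Semaphore(limit)

    async def run(item):
        async with semaphore:  # waits here while `limit` calls are in flight
            return await func(item)

    # gather preserves input order in its result list
    return await asyncio.gather(*(run(item) for item in items))
```

In `batch_process`, `results = await map_bounded(process_single, items, limit=10)` would replace the unbounded `gather` call. A limit close to the HTTP client's `max_connections` is a reasonable starting point.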
## State Management Pattern
**Problem:** Shared state causes race conditions
**Solution:** Coroutine-safe state management with per-key `asyncio.Lock` objects (note that `asyncio.Lock` is not thread-safe)
```python
import asyncio
import inspect


class StateManager:
    def __init__(self):
        self._state = {}
        self._locks = {}

    async def get(self, key: str, default=None):
        return self._state.get(key, default)

    async def set(self, key: str, value):
        if key not in self._locks:
            self._locks[key] = asyncio.Lock()
        async with self._locks[key]:
            self._state[key] = value

    async def update(self, key: str, updater):
        """Update with a sync or async function."""
        if key not in self._locks:
            self._locks[key] = asyncio.Lock()
        async with self._locks[key]:
            current = self._state.get(key)
            result = updater(current)
            if inspect.isawaitable(result):  # Support async updaters too
                result = await result
            self._state[key] = result
            return result


state = StateManager()


@mcp.tool()
async def increment_counter(name: str) -> dict:
    new_value = await state.update(
        f"counter_{name}",
        lambda x: (x or 0) + 1
    )
    return {"counter": name, "value": new_value}
```
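Why the lock matters: in asyncio, a read-modify-write sequence is only atomic if there is no `await` between the read and the write. The following self-contained sketch (all names are hypothetical, and `asyncio.sleep(0)` stands in for any real awaited work such as an API call) shows updates being lost without a lock and preserved with one:

```python
import asyncio


async def unsafe_increment(state: dict, key: str):
    current = state.get(key, 0)
    await asyncio.sleep(0)      # any await lets other coroutines run
    state[key] = current + 1    # may overwrite a newer value


async def safe_increment(state: dict, key: str, lock: asyncio.Lock):
    async with lock:            # read and write happen as one unit
        current = state.get(key, 0)
        await asyncio.sleep(0)
        state[key] = current + 1


async def run_demo(n: int = 50):
    """Return (unsafe_total, safe_total) after n concurrent increments each."""
    unsafe, safe = {}, {}
    lock = asyncio.Lock()
    await asyncio.gather(*(unsafe_increment(unsafe, "hits") for _ in range(n)))
    await asyncio.gather(*(safe_increment(safe, "hits", lock) for _ in range(n)))
    return unsafe["hits"], safe["hits"]
```

Running `asyncio.run(run_demo())` should show the locked counter reaching `n` while the unlocked one falls short, because every unlocked coroutine reads the old value before any of them writes.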
## Anti-Patterns to Avoid
### ❌ Factory Functions in `__init__.py`
```python
# DON'T DO THIS
# shared/__init__.py
def get_api_client():
    from .api_client import APIClient  # Circular import risk
    return APIClient()
```
### ❌ Blocking Operations in Async
```python
import asyncio
import time


# DON'T DO THIS
@mcp.tool()
async def bad_async():
    time.sleep(5)  # Blocks entire event loop!
    return "done"


# DO THIS INSTEAD
@mcp.tool()
async def good_async():
    await asyncio.sleep(5)
    return "done"
```
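Swapping `time.sleep` for `asyncio.sleep` only helps when an async equivalent exists. For blocking calls that have none (a synchronous SDK, heavy file I/O), one option is to run them in a worker thread with `asyncio.to_thread`, available in Python 3.9 and later. In this sketch, `blocking_work` and `run_blocking` are hypothetical stand-ins:

```python
import asyncio
import time


def blocking_work(seconds: float) -> str:
    """Stand-in for a synchronous library call that cannot be awaited."""
    time.sleep(seconds)
    return "done"


async def run_blocking(func, *args):
    """Run a blocking function in a worker thread so the event loop stays free."""
    return await asyncio.to_thread(func, *args)
```

Inside a tool this becomes `result = await run_blocking(blocking_work, 5)`, and other requests keep being served while the thread sleeps. This suits I/O-bound work; CPU-bound work is still limited by the GIL and may need a process pool instead.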
### ❌ Global Mutable State
```python
# DON'T DO THIS
results = []  # Race conditions!


@mcp.tool()
async def add_result(data: str):
    results.append(data)
```
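Beyond races, a bare module-level list is shared by every caller and grows without bound. If results genuinely need to be kept in memory, one hedged alternative is to hide the storage behind a small bounded class. `ResultStore` and `max_items` are hypothetical names for this sketch; for read-modify-write updates, the lock-based `StateManager` above remains the better fit.

```python
from collections import deque


class ResultStore:
    """Bounded, encapsulated replacement for a module-level list."""

    def __init__(self, max_items: int = 1000):
        # A deque with maxlen silently drops the oldest entry when full
        self._items = deque(maxlen=max_items)

    def add(self, item: str) -> int:
        """Store an item and return the current number of stored items."""
        self._items.append(item)
        return len(self._items)

    def snapshot(self) -> list:
        """Return a copy so callers cannot mutate the stored results."""
        return list(self._items)
```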
## Production Deployment Checklist
- [ ] Module-level server object
- [ ] Environment variables for all config
- [ ] Connection pooling for HTTP clients
- [ ] Retry logic for transient failures
- [ ] Caching for expensive operations
- [ ] Structured error responses
- [ ] Health check endpoint
- [ ] Logging configured
- [ ] No circular imports
- [ ] No import-time async execution
- [ ] Rate limiting if needed
- [ ] Graceful shutdown handling
## Resources
- **Production Examples**: See `self-contained-server.py` template
- **Error Handling**: See `error-handling.py` template
- **API Patterns**: See `api-client-pattern.py` template