# FastMCP Production Patterns Reference

Battle-tested patterns for production-ready FastMCP servers.

## Self-Contained Server Pattern

**Problem:** Circular imports break cloud deployment

**Solution:** Keep all utilities in one file
```python
|
|
# src/utils.py - All utilities in one place
import os
from typing import Dict, Any
from datetime import datetime


class Config:
    """Configuration loaded from environment variables."""

    SERVER_NAME = os.getenv("SERVER_NAME", "FastMCP Server")
    API_KEY = os.getenv("API_KEY", "")
    CACHE_TTL = int(os.getenv("CACHE_TTL", "300"))


def format_success(data: Any) -> Dict[str, Any]:
    """Wrap *data* in the standard success envelope."""
    envelope: Dict[str, Any] = {"success": True}
    envelope["data"] = data
    envelope["timestamp"] = datetime.now().isoformat()
    return envelope


def format_error(error: str, code: str = "ERROR") -> Dict[str, Any]:
    """Wrap an error message in the standard failure envelope."""
    return dict(
        success=False,
        error=error,
        code=code,
        timestamp=datetime.now().isoformat(),
    )
|
|
|
|
# Usage in tools
from .utils import format_success, format_error, Config


@mcp.tool()
async def process_data(data: dict) -> dict:
    """Process *data*, returning the standard success/error envelope."""
    try:
        return format_success(await process(data))
    except Exception as e:
        return format_error(str(e))
|
|
```
|
|
|
|
**Why it works:**

- No circular dependencies
- Cloud deployment safe
- Easy to maintain
- Single source of truth

## Lazy Initialization Pattern

**Problem:** Creating expensive resources at import time fails in cloud

**Solution:** Initialize resources only when needed
```python
|
|
class ResourceManager:
    """Manages expensive resources with lazy initialization.

    Resources are created on first use so nothing heavy runs at import
    time (import-time work breaks cloud deployment).  Creation is
    guarded by locks: the original check-then-create spanned an
    ``await``, so two requests arriving before the first creation
    finished would each build their own pool and leak one.
    """

    _db_pool = None
    _cache = None
    _db_lock = asyncio.Lock()
    _cache_lock = asyncio.Lock()

    @classmethod
    async def get_db(cls):
        """Get the shared database pool (created on first use)."""
        if cls._db_pool is None:
            async with cls._db_lock:
                # Re-check: another task may have finished creating the
                # pool while we were waiting on the lock.
                if cls._db_pool is None:
                    cls._db_pool = await create_db_pool()
        return cls._db_pool

    @classmethod
    async def get_cache(cls):
        """Get the shared cache (created on first use)."""
        if cls._cache is None:
            async with cls._cache_lock:
                if cls._cache is None:
                    cls._cache = await create_cache()
        return cls._cache
|
|
|
|
# Usage - no initialization at module level
manager = ResourceManager()  # Lightweight: nothing expensive happens here


@mcp.tool()
async def database_operation():
    """Run a query; the pool is created lazily on the first call."""
    pool = await manager.get_db()  # Initialization happens here
    rows = await pool.query("SELECT * FROM users")
    return rows
|
|
```
|
|
|
|
## Connection Pooling Pattern

**Problem:** Creating new connections for each request is slow

**Solution:** Reuse HTTP clients with connection pooling
```python
|
|
import httpx
from typing import Optional  # was referenced below but never imported


class APIClient:
    """Shared HTTP client with keep-alive connection pooling."""

    # Single shared client; reused across all tool calls.
    _instance: Optional[httpx.AsyncClient] = None

    @classmethod
    async def get_client(cls) -> httpx.AsyncClient:
        """Get or create the shared HTTP client.

        Safe to lazily create without a lock: AsyncClient construction
        involves no ``await``, so the check-and-assign is atomic from
        the event loop's point of view.
        """
        if cls._instance is None:
            cls._instance = httpx.AsyncClient(
                base_url=API_BASE_URL,
                timeout=httpx.Timeout(30.0),
                limits=httpx.Limits(
                    max_keepalive_connections=5,
                    max_connections=10
                )
            )
        return cls._instance

    @classmethod
    async def cleanup(cls):
        """Close the shared client on shutdown; idempotent."""
        if cls._instance:
            await cls._instance.aclose()
            cls._instance = None
|
|
|
|
@mcp.tool()
async def api_request(endpoint: str) -> dict:
    """GET *endpoint* through the pooled shared client."""
    http = await APIClient.get_client()
    resp = await http.get(endpoint)
    return resp.json()
|
|
```
|
|
|
|
## Retry with Exponential Backoff

**Problem:** Transient failures cause tool failures

**Solution:** Automatic retry with exponential backoff
```python
|
|
import asyncio


async def retry_with_backoff(
    func,
    max_retries: int = 3,
    initial_delay: float = 1.0,
    exponential_base: float = 2.0
):
    """Call *func* (a zero-argument async callable) with retries.

    Args:
        func: Coroutine function to invoke.
        max_retries: Total number of attempts; must be >= 1.
        initial_delay: Seconds to sleep after the first failure.
        exponential_base: Multiplier applied to the delay per retry.

    Only transient network errors are retried; any other exception
    propagates immediately.  The last transient error is re-raised once
    all attempts are exhausted.
    """
    if max_retries < 1:
        # Without this guard the loop never runs and we would end up
        # executing `raise None`, which is a confusing TypeError.
        raise ValueError("max_retries must be at least 1")

    delay = initial_delay
    last_exception = None

    for attempt in range(max_retries):
        try:
            return await func()
        except (httpx.TimeoutException, httpx.NetworkError) as e:
            last_exception = e
            if attempt < max_retries - 1:
                await asyncio.sleep(delay)
                delay *= exponential_base

    raise last_exception
|
|
|
|
@mcp.tool()
async def resilient_api_call(endpoint: str) -> dict:
    """API call with automatic retry."""

    async def make_call():
        http = await APIClient.get_client()
        resp = await http.get(endpoint)
        resp.raise_for_status()
        return resp.json()

    try:
        return {"success": True, "data": await retry_with_backoff(make_call)}
    except Exception as e:
        return {"success": False, "error": str(e)}
|
|
```
|
|
|
|
## Time-Based Caching Pattern

**Problem:** Repeated API calls for same data waste time/money

**Solution:** Cache with TTL (time-to-live)
```python
|
|
import time


class TimeBasedCache:
    """Dict-backed cache whose entries expire after *ttl* seconds."""

    def __init__(self, ttl: int = 300):
        self.ttl = ttl
        self.cache = {}
        self.timestamps = {}

    def get(self, key: str):
        """Return the cached value, or None if absent or expired."""
        if key not in self.cache:
            return None
        age = time.time() - self.timestamps[key]
        if age < self.ttl:
            return self.cache[key]
        # Expired: evict eagerly so stale entries don't accumulate.
        del self.cache[key]
        del self.timestamps[key]
        return None

    def set(self, key: str, value):
        """Store *value* and stamp it with the current time."""
        self.cache[key] = value
        self.timestamps[key] = time.time()
|
|
|
|
cache = TimeBasedCache(ttl=300)


@mcp.tool()
async def cached_fetch(resource_id: str) -> dict:
    """Fetch a resource, serving repeated requests from the TTL cache."""
    cache_key = f"resource:{resource_id}"

    cached = cache.get(cache_key)
    # Check `is not None` rather than truthiness: a legitimately falsy
    # payload ({}, [], 0, "") must count as a cache hit, not trigger a
    # refetch on every call.  (A None payload is still indistinguishable
    # from a miss with this cache API.)
    if cached is not None:
        return {"data": cached, "from_cache": True}

    data = await fetch_from_api(resource_id)
    cache.set(cache_key, data)

    return {"data": data, "from_cache": False}
|
|
```
|
|
|
|
## Structured Error Responses

**Problem:** Inconsistent error formats make debugging hard

**Solution:** Standardized error response format
```python
|
|
from enum import Enum
from datetime import datetime  # needed for the timestamp below
from typing import Optional


class ErrorCode(Enum):
    """Stable machine-readable error categories."""

    VALIDATION_ERROR = "VALIDATION_ERROR"
    NOT_FOUND = "NOT_FOUND"
    API_ERROR = "API_ERROR"
    TIMEOUT = "TIMEOUT"
    UNKNOWN = "UNKNOWN"


def create_error(code: ErrorCode, message: str, details: Optional[dict] = None):
    """Create a structured error response.

    Args:
        code: An ErrorCode member; its string value is serialized.
        message: Human-readable explanation.
        details: Optional extra context (field names, ids, ...);
            defaults to an empty dict in the output.
    """
    return {
        "success": False,
        "error": {
            "code": code.value,
            "message": message,
            "details": details or {},
            "timestamp": datetime.now().isoformat()
        }
    }
|
|
|
|
@mcp.tool()
async def validated_operation(data: str) -> dict:
    """Validate input, process it, and return a structured result."""
    if not data:
        return create_error(
            ErrorCode.VALIDATION_ERROR,
            "Data is required",
            {"field": "data"},
        )

    try:
        return {"success": True, "data": await process(data)}
    except Exception as e:
        return create_error(ErrorCode.UNKNOWN, str(e))
|
|
```
|
|
|
|
## Environment-Based Configuration

**Problem:** Different settings for dev/staging/production

**Solution:** Environment-based configuration class
```python
|
|
import os
|
|
from enum import Enum
|
|
|
|
class Environment(Enum):
|
|
DEVELOPMENT = "development"
|
|
STAGING = "staging"
|
|
PRODUCTION = "production"
|
|
|
|
class Config:
|
|
ENV = Environment(os.getenv("ENVIRONMENT", "development"))
|
|
|
|
SETTINGS = {
|
|
Environment.DEVELOPMENT: {
|
|
"debug": True,
|
|
"cache_ttl": 60,
|
|
"log_level": "DEBUG"
|
|
},
|
|
Environment.STAGING: {
|
|
"debug": True,
|
|
"cache_ttl": 300,
|
|
"log_level": "INFO"
|
|
},
|
|
Environment.PRODUCTION: {
|
|
"debug": False,
|
|
"cache_ttl": 3600,
|
|
"log_level": "WARNING"
|
|
}
|
|
}
|
|
|
|
@classmethod
|
|
def get(cls, key: str):
|
|
return cls.SETTINGS[cls.ENV].get(key)
|
|
|
|
# Use configuration
|
|
cache_ttl = Config.get("cache_ttl")
|
|
debug_mode = Config.get("debug")
|
|
```
|
|
|
|
## Health Check Pattern

**Problem:** Need to monitor server health in production

**Solution:** Comprehensive health check resource
```python
|
|
@mcp.resource("health://status")
async def health_check() -> dict:
    """Comprehensive health check: API, database, and system load."""
    checks = {}

    # Check API connectivity
    try:
        client = await APIClient.get_client()
        response = await client.get("/health", timeout=5)
        checks["api"] = response.status_code == 200
    except Exception:
        # Narrow from a bare `except:` -- that would also swallow
        # SystemExit/KeyboardInterrupt and mask shutdown signals.
        checks["api"] = False

    # Check database (if applicable)
    try:
        db = await ResourceManager.get_db()
        await db.execute("SELECT 1")
        checks["database"] = True
    except Exception:
        checks["database"] = False

    # System resources (local import keeps psutil optional at startup)
    import psutil
    checks["memory_percent"] = psutil.virtual_memory().percent
    checks["cpu_percent"] = psutil.cpu_percent()

    # Overall status: every probe healthy and resources under 90%.
    all_healthy = (
        checks.get("api", True) and
        checks.get("database", True) and
        checks["memory_percent"] < 90 and
        checks["cpu_percent"] < 90
    )

    return {
        "status": "healthy" if all_healthy else "degraded",
        "timestamp": datetime.now().isoformat(),
        "checks": checks
    }
|
|
```
|
|
|
|
## Parallel Processing Pattern

**Problem:** Sequential processing is slow for batch operations

**Solution:** Process items in parallel
```python
|
|
import asyncio


@mcp.tool()
async def batch_process(items: list[str]) -> dict:
    """Process multiple items in parallel."""

    async def process_single(item: str):
        try:
            outcome = await process_item(item)
        except Exception as e:
            return {"item": item, "success": False, "error": str(e)}
        return {"item": item, "success": True, "result": outcome}

    # Fan out: one concurrent task per item, awaited together.
    results = await asyncio.gather(*(process_single(i) for i in items))

    ok_count = sum(1 for r in results if r["success"])

    return {
        "total": len(items),
        "successful": ok_count,
        "failed": len(items) - ok_count,
        "results": results
    }
|
|
```
|
|
|
|
## State Management Pattern

**Problem:** Shared state causes race conditions

**Solution:** Thread-safe state management with locks
```python
|
|
import asyncio
import inspect


class StateManager:
    """Async-safe key/value store with per-key write locks."""

    def __init__(self):
        self._state = {}
        self._locks = {}

    def _lock_for(self, key: str) -> asyncio.Lock:
        # setdefault is atomic enough for the single-threaded event
        # loop: there is no await between check and creation.
        return self._locks.setdefault(key, asyncio.Lock())

    async def get(self, key: str, default=None):
        """Read a value; lock-free since reads don't mutate state."""
        return self._state.get(key, default)

    async def set(self, key: str, value):
        """Write *value* under the key's lock."""
        async with self._lock_for(key):
            self._state[key] = value

    async def update(self, key: str, updater):
        """Atomically apply *updater* (sync or async) to the value.

        The original `await updater(current)` broke for plain
        callables: the doc's own increment_counter passes a sync
        lambda, and awaiting its int result raises TypeError.  Accept
        both styles by awaiting only genuinely awaitable results.
        """
        async with self._lock_for(key):
            current = self._state.get(key)
            result = updater(current)
            if inspect.isawaitable(result):
                result = await result
            self._state[key] = result
            return self._state[key]
|
|
|
|
state = StateManager()


@mcp.tool()
async def increment_counter(name: str) -> dict:
    """Atomically bump the named counter and report its new value."""
    key = f"counter_{name}"
    new_value = await state.update(key, lambda x: (x or 0) + 1)
    return {"counter": name, "value": new_value}
|
|
```
|
|
|
|
## Anti-Patterns to Avoid
|
|
|
|
### ❌ Factory Functions in __init__.py
|
|
|
|
```python
|
|
# DON'T DO THIS
|
|
# shared/__init__.py
|
|
def get_api_client():
|
|
from .api_client import APIClient # Circular import risk
|
|
return APIClient()
|
|
```
|
|
|
|
### ❌ Blocking Operations in Async
|
|
|
|
```python
|
|
# DON'T DO THIS
|
|
@mcp.tool()
|
|
async def bad_async():
|
|
time.sleep(5) # Blocks entire event loop!
|
|
return "done"
|
|
|
|
# DO THIS INSTEAD
|
|
@mcp.tool()
|
|
async def good_async():
|
|
await asyncio.sleep(5)
|
|
return "done"
|
|
```
|
|
|
|
### ❌ Global Mutable State
|
|
|
|
```python
|
|
# DON'T DO THIS
|
|
results = [] # Race conditions!
|
|
|
|
@mcp.tool()
|
|
async def add_result(data: str):
|
|
results.append(data)
|
|
```
|
|
|
|
## Production Deployment Checklist

- [ ] Module-level server object
- [ ] Environment variables for all config
- [ ] Connection pooling for HTTP clients
- [ ] Retry logic for transient failures
- [ ] Caching for expensive operations
- [ ] Structured error responses
- [ ] Health check endpoint
- [ ] Logging configured
- [ ] No circular imports
- [ ] No import-time async execution
- [ ] Rate limiting if needed
- [ ] Graceful shutdown handling
|
|
|
## Resources

- **Production Examples**: See `self-contained-server.py` template
- **Error Handling**: See `error-handling.py` template
- **API Patterns**: See `api-client-pattern.py` template