Initial commit

Zhongwei Li
2025-11-30 08:24:49 +08:00
commit 99a553f8ab
25 changed files with 6408 additions and 0 deletions

.claude-plugin/plugin.json Normal file

@@ -0,0 +1,12 @@
{
"name": "fastmcp",
"description": "Build MCP servers in Python with FastMCP framework to expose tools, resources, and prompts to LLMs. Supports storage backends (memory/disk/Redis), middleware, OAuth Proxy, OpenAPI integration, and FastMCP Cloud deployment. Use when: creating MCP servers, defining tools or resources, implementing OAuth authentication, configuring storage backends for tokens/cache, adding middleware for logging/rate limiting, deploying to FastMCP Cloud, or troubleshooting module-level server, storage, lifespan, mi",
"version": "1.0.0",
"author": {
"name": "Jeremy Dawes",
"email": "jeremy@jezweb.net"
},
"skills": [
"./"
]
}

README.md Normal file

@@ -0,0 +1,3 @@
# fastmcp
Build MCP servers in Python with FastMCP framework to expose tools, resources, and prompts to LLMs. Supports storage backends (memory/disk/Redis), middleware, OAuth Proxy, OpenAPI integration, and FastMCP Cloud deployment. Use when: creating MCP servers, defining tools or resources, implementing OAuth authentication, configuring storage backends for tokens/cache, adding middleware for logging/rate limiting, deploying to FastMCP Cloud, or troubleshooting module-level server, storage, lifespan, middleware order, circular imports, or OAuth errors.

SKILL.md Normal file

@@ -0,0 +1,574 @@
---
name: fastmcp
description: |
Build MCP servers in Python with FastMCP framework to expose tools, resources, and prompts to LLMs. Supports
storage backends (memory/disk/Redis), middleware, OAuth Proxy, OpenAPI integration, and FastMCP Cloud deployment.
Use when: creating MCP servers, defining tools or resources, implementing OAuth authentication, configuring
storage backends for tokens/cache, adding middleware for logging/rate limiting, deploying to FastMCP Cloud,
or troubleshooting module-level server, storage, lifespan, middleware order, circular imports, or OAuth errors.
Keywords: FastMCP, MCP server Python, Model Context Protocol Python, fastmcp framework, mcp tools, mcp resources, mcp prompts, fastmcp storage, fastmcp memory storage, fastmcp disk storage, fastmcp redis, fastmcp dynamodb, fastmcp lifespan, fastmcp middleware, fastmcp oauth proxy, server composition mcp, fastmcp import, fastmcp mount, fastmcp cloud, fastmcp deployment, mcp authentication, fastmcp icons, openapi mcp, claude mcp server, fastmcp testing, storage misconfiguration, lifespan issues, middleware order, circular imports, module-level server, async await mcp
license: MIT
metadata:
version: "2.0.0"
package_version: "fastmcp>=2.13.1"
python_version: ">=3.10"
token_savings: "90-95%"
errors_prevented: 25
production_tested: true
last_updated: "2025-11-25"
---
# FastMCP - Build MCP Servers in Python
FastMCP is a Python framework for building Model Context Protocol (MCP) servers that expose tools, resources, and prompts to Large Language Models like Claude. This skill provides production-tested patterns, error prevention, and deployment strategies for building robust MCP servers.
## Quick Start
### Installation
```bash
pip install fastmcp
# or
uv pip install fastmcp
```
### Minimal Server
```python
from fastmcp import FastMCP
# MUST be at module level for FastMCP Cloud
mcp = FastMCP("My Server")
@mcp.tool()
async def hello(name: str) -> str:
"""Say hello to someone."""
return f"Hello, {name}!"
if __name__ == "__main__":
mcp.run()
```
**Run it:**
```bash
# Local development
python server.py
# With FastMCP CLI
fastmcp dev server.py
# HTTP mode
python server.py --transport http --port 8000
```
## What's New in v2.13.1 (November 2025)
**Meta Parameter Support:**
- ToolResult can return metadata alongside results (enables OpenAI Apps SDK integration)
- Client-sent meta parameters now supported
**Authentication Improvements:**
- `DebugTokenVerifier` for custom token validation during development
- OCI (Oracle Cloud Infrastructure) authentication provider added
- Enhanced OAuth error handling and messaging
- Improved CSP policies for OAuth consent screens
**Utilities & Developer Experience:**
- `Image.to_data_uri()` method added for easier icon embedding
- Manual Client initialization control (defer connection until needed)
- 20+ bug fixes: URL encoding in Cursor deeplinks, OAuth metadata endpoint handling, Windows test timeouts, token cache expiration
**Security Update:**
- **CVE-2025-61920**: authlib updated to 1.6.5
- Safer Windows API validation for Cursor deeplink URLs
**Known Compatibility:**
- MCP SDK 1.21.1 excluded due to integration test failures (use 1.21.0 or 1.22.0+)
---
## Core Concepts
### Tools
Functions LLMs can call. Best practices: Clear names, comprehensive docstrings (LLMs read these!), strong type hints (Pydantic validates), structured returns, error handling.
```python
import httpx  # needed for the async HTTP call below
@mcp.tool()
async def async_tool(url: str) -> dict: # Use async for I/O
async with httpx.AsyncClient() as client:
return (await client.get(url)).json()
```
### Resources
Expose data to LLMs. URI schemes: `data://`, `file://`, `resource://`, `info://`, `api://`, or custom.
```python
@mcp.resource("user://{user_id}/profile") # Template with parameters
async def get_user(user_id: str) -> dict: # CRITICAL: param names must match
return await fetch_user_from_db(user_id)
```
### Prompts
Pre-configured prompts with parameters.
```python
@mcp.prompt("analyze")
def analyze_prompt(topic: str) -> str:
return f"Analyze {topic} considering: state, challenges, opportunities, recommendations."
```
## Context Features
Inject `Context` parameter (with type hint!) for advanced features:
**Elicitation (User Input):**
```python
from fastmcp import Context
@mcp.tool()
async def confirm_action(action: str, context: Context) -> dict:
confirmed = await context.request_elicitation(prompt=f"Confirm {action}?", response_type=str)
return {"status": "completed" if confirmed.lower() == "yes" else "cancelled"}
```
**Progress Tracking:**
```python
@mcp.tool()
async def batch_import(file_path: str, context: Context) -> dict:
data = await read_file(file_path)
for i, item in enumerate(data):
await context.report_progress(i + 1, len(data), f"Importing {i + 1}/{len(data)}")
await import_item(item)
return {"imported": len(data)}
```
**Sampling (LLM calls from tools):**
```python
@mcp.tool()
async def enhance_text(text: str, context: Context) -> str:
response = await context.request_sampling(
messages=[{"role": "user", "content": f"Enhance: {text}"}],
temperature=0.7
)
return response["content"]
```
## Storage Backends
Built on `py-key-value-aio` for OAuth tokens, response caching, persistent state.
**Available Backends:**
- **Memory** (default): Ephemeral, fast, dev-only
- **Disk**: Persistent, encrypted with `FernetEncryptionWrapper`, platform-aware (Mac/Windows default)
- **Redis**: Distributed, production, multi-instance
- **Others**: DynamoDB, MongoDB, Elasticsearch, Memcached, RocksDB, Valkey
**Basic Usage:**
```python
from key_value.stores import DiskStore, RedisStore
from key_value.encryption import FernetEncryptionWrapper
from cryptography.fernet import Fernet
# Disk (persistent, single instance)
mcp = FastMCP("Server", storage=DiskStore(path="/app/data/storage"))
# Redis (distributed, production)
mcp = FastMCP("Server", storage=RedisStore(
host=os.getenv("REDIS_HOST"), password=os.getenv("REDIS_PASSWORD")
))
# Encrypted storage (recommended)
mcp = FastMCP("Server", storage=FernetEncryptionWrapper(
key_value=DiskStore(path="/app/data"),
fernet=Fernet(os.getenv("STORAGE_ENCRYPTION_KEY"))
))
```
**Platform Defaults:** Mac/Windows use Disk, Linux uses Memory. Override with `storage` parameter.
## Server Lifespans
**⚠️ Breaking Change in v2.13.0**: Lifespan behavior changed from per-session to per-server-instance.
Initialize/cleanup resources once per server (NOT per session) - critical for DB connections, API clients.
```python
from contextlib import asynccontextmanager
from dataclasses import dataclass
@dataclass
class AppContext:
db: Database
api_client: httpx.AsyncClient
@asynccontextmanager
async def app_lifespan(server: FastMCP):
"""Runs ONCE per server instance."""
db = await Database.connect(os.getenv("DATABASE_URL"))
api_client = httpx.AsyncClient(base_url=os.getenv("API_BASE_URL"), timeout=30.0)
try:
yield AppContext(db=db, api_client=api_client)
finally:
await db.disconnect()
await api_client.aclose()
mcp = FastMCP("Server", lifespan=app_lifespan)
# Access in tools
@mcp.tool()
async def query_db(sql: str, context: Context) -> list:
app_ctx = context.fastmcp_context.lifespan_context
return await app_ctx.db.query(sql)
```
**ASGI Integration (FastAPI/Starlette):**
```python
mcp = FastMCP("Server", lifespan=mcp_lifespan)
app = FastAPI(lifespan=mcp.lifespan) # ✅ MUST pass lifespan!
```
**State Management:**
```python
context.fastmcp_context.set_state(key, value) # Store
context.fastmcp_context.get_state(key, default=None) # Retrieve
```
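For example, a value stashed by one step can be read later while handling the same request. A minimal sketch, assuming the Context injection shown earlier; the `user_id` and `last_action` keys are illustrative, not a FastMCP convention:
```python
@mcp.tool()
async def audited_action(action: str, context: Context) -> dict:
    # Read a value some earlier step (e.g. auth middleware) may have stored
    user = context.fastmcp_context.get_state("user_id", default="anonymous")
    # Stash a value for later hooks handling this same call
    context.fastmcp_context.set_state("last_action", action)
    return {"user": user, "action": action}
```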
## Middleware System
**8 Built-in Types:** TimingMiddleware, ResponseCachingMiddleware, LoggingMiddleware, RateLimitingMiddleware, ErrorHandlingMiddleware, ToolInjectionMiddleware, PromptToolMiddleware, ResourceToolMiddleware
**Execution Order (order matters!):**
```
Request Flow:
→ ErrorHandlingMiddleware (catches errors)
→ TimingMiddleware (starts timer)
→ LoggingMiddleware (logs request)
→ RateLimitingMiddleware (checks rate limit)
→ ResponseCachingMiddleware (checks cache)
→ Tool/Resource Handler
```
**Basic Usage:**
```python
from fastmcp.middleware import (ErrorHandlingMiddleware, TimingMiddleware, LoggingMiddleware,
                                RateLimitingMiddleware, ResponseCachingMiddleware)
from key_value.stores import RedisStore  # used by the caching middleware below
mcp.add_middleware(ErrorHandlingMiddleware()) # First: catch errors
mcp.add_middleware(TimingMiddleware()) # Second: time requests
mcp.add_middleware(LoggingMiddleware(level="INFO"))
mcp.add_middleware(RateLimitingMiddleware(max_requests=100, window_seconds=60))
mcp.add_middleware(ResponseCachingMiddleware(ttl_seconds=300, storage=RedisStore()))
```
**Custom Middleware:**
```python
from fastmcp.middleware import BaseMiddleware
class AccessControlMiddleware(BaseMiddleware):
    def __init__(self, allowed_users: set[str]):
        self.allowed_users = allowed_users
    async def on_call_tool(self, tool_name, arguments, context):
        user = context.fastmcp_context.get_state("user_id")
        if user not in self.allowed_users:
            raise PermissionError(f"User {user!r} is not authorized")
        return await self.next(tool_name, arguments, context)
```
**Hook Hierarchy:** `on_message` (all) → `on_request`/`on_notification` → `on_call_tool`/`on_read_resource`/`on_get_prompt` → `on_list_*` (list operations)
## Server Composition
**Two Strategies:**
1. **`import_server()`** - Static snapshot: One-time copy at import, changes don't propagate, fast (no runtime delegation). Use for: Finalized component bundles.
2. **`mount()`** - Dynamic link: Live runtime link, changes immediately visible, runtime delegation (slower). Use for: Modular runtime composition.
**Basic Usage:**
```python
# Import (static)
main_server.import_server(api_server) # One-time copy
# Mount (dynamic)
main_server.mount(api_server, prefix="api") # Tools: api.fetch_data
main_server.mount(db_server, prefix="db") # Resources: resource://db/path
```
**Tag Filtering:**
```python
@api_server.tool(tags=["public"])
def public_api(): pass
main_server.import_server(api_server, include_tags=["public"]) # Only public
main_server.mount(api_server, prefix="api", exclude_tags=["admin"]) # No admin
```
**Resource Prefix Formats:**
- **Path** (default since v2.4.0): `resource://prefix/path`
- **Protocol** (legacy): `prefix+resource://path`
```python
main_server.mount(subserver, prefix="api", resource_prefix_format="path")
```
## OAuth & Authentication
**4 Authentication Patterns:**
1. **Token Validation** (`JWTVerifier`): Validate external tokens
2. **External Identity Providers** (`RemoteAuthProvider`): OAuth 2.0/OIDC with DCR
3. **OAuth Proxy** (`OAuthProxy`): Bridge to providers without DCR (GitHub, Google, Azure, AWS, Discord, Facebook)
4. **Full OAuth** (`OAuthProvider`): Complete authorization server
**Pattern 1: Token Validation**
```python
from fastmcp.auth import JWTVerifier
auth = JWTVerifier(issuer="https://auth.example.com", audience="my-server",
public_key=os.getenv("JWT_PUBLIC_KEY"))
mcp = FastMCP("Server", auth=auth)
```
**Pattern 3: OAuth Proxy (Production)**
```python
from fastmcp.auth import OAuthProxy
from key_value.stores import RedisStore
from key_value.encryption import FernetEncryptionWrapper
from cryptography.fernet import Fernet
auth = OAuthProxy(
jwt_signing_key=os.environ["JWT_SIGNING_KEY"],
client_storage=FernetEncryptionWrapper(
key_value=RedisStore(host=os.getenv("REDIS_HOST"), password=os.getenv("REDIS_PASSWORD")),
fernet=Fernet(os.environ["STORAGE_ENCRYPTION_KEY"])
),
upstream_authorization_endpoint="https://github.com/login/oauth/authorize",
upstream_token_endpoint="https://github.com/login/oauth/access_token",
upstream_client_id=os.getenv("GITHUB_CLIENT_ID"),
upstream_client_secret=os.getenv("GITHUB_CLIENT_SECRET"),
enable_consent_screen=True # CRITICAL: Prevents confused deputy attacks
)
mcp = FastMCP("GitHub Auth", auth=auth)
```
**OAuth Proxy Features:** Token factory pattern (issues own JWTs), consent screens (prevents bypass), PKCE support, RFC 7662 token introspection
**Supported Providers:** GitHub, Google, Azure, AWS Cognito, Discord, Facebook, WorkOS, AuthKit, Descope, Scalekit, OCI (v2.13.1)
## Icons, API Integration, Cloud Deployment
**Icons:** Add to servers, tools, resources, prompts. Use `Icon(url, size)`, data URIs via `Icon.from_file()` or `Image.to_data_uri()` (v2.13.1).
**API Integration (3 Patterns):**
1. **Manual**: `httpx.AsyncClient` with base_url/headers/timeout
2. **OpenAPI Auto-Gen**: `FastMCP.from_openapi(spec, client, route_maps)` - GET→Resources/Templates, POST/PUT/DELETE→Tools (sketched below)
3. **FastAPI Conversion**: `FastMCP.from_fastapi(app, httpx_client_kwargs)`
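A compact, hedged sketch of Pattern 2, condensed from references/integration-patterns.md in this commit; the spec URL and base URL are placeholders:
```python
import httpx
from fastmcp import FastMCP

# Placeholder endpoints - substitute your API's OpenAPI spec and base URL
spec = httpx.get("https://api.example.com/openapi.json").json()
client = httpx.AsyncClient(base_url="https://api.example.com")

# route_maps (see the integration-patterns reference) control whether routes
# become tools, resources, or resource templates
mcp = FastMCP.from_openapi(openapi_spec=spec, client=client, name="API Server")
```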
**Cloud Deployment Critical Requirements:**
1. **Module-level server** named `mcp`, `server`, or `app`
2. **PyPI dependencies only** in requirements.txt
3. **Public GitHub repo** (or accessible)
4. **Environment variables** for config
```python
# ✅ CORRECT: Module-level export
mcp = FastMCP("server") # At module level!
# ❌ WRONG: Function-wrapped
def create_server():
return FastMCP("server") # Too late for cloud!
```
**Deployment:** https://fastmcp.cloud → Sign in → Create Project → Select repo → Deploy
**Client Config (Claude Desktop):**
```json
{"mcpServers": {"my-server": {"url": "https://project.fastmcp.app/mcp", "transport": "http"}}}
```
## 25 Common Errors (With Solutions)
### Error 1: Missing Server Object
**Error:** `RuntimeError: No server object found at module level`
**Cause:** Server not exported at module level (FastMCP Cloud requirement)
**Solution:** `mcp = FastMCP("server")` at module level, not inside functions
### Error 2: Async/Await Confusion
**Error:** `RuntimeError: no running event loop`, `TypeError: object coroutine can't be used in 'await'`
**Cause:** Mixing sync/async incorrectly
**Solution:** Use `async def` for tools with `await`, sync `def` for non-async code
### Error 3: Context Not Injected
**Error:** `TypeError: missing 1 required positional argument: 'context'`
**Cause:** Missing `Context` type annotation
**Solution:** `async def tool(context: Context)` - type hint required!
### Error 4: Resource URI Syntax
**Error:** `ValueError: Invalid resource URI: missing scheme`
**Cause:** Resource URI missing scheme prefix
**Solution:** Use `@mcp.resource("data://config")` not `@mcp.resource("config")`
### Error 5: Resource Template Parameter Mismatch
**Error:** `TypeError: get_user() missing 1 required positional argument`
**Cause:** Function parameter names don't match URI template
**Solution:** `@mcp.resource("user://{user_id}/profile")` → `def get_user(user_id: str)` - names must match exactly
---
### Error 6: Pydantic Validation Error
**Error:** `ValidationError: value is not a valid integer`
**Cause:** Type hints don't match provided data
**Solution:** Use Pydantic models: `class Params(BaseModel): query: str = Field(min_length=1)`
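A small sketch of that approach, assuming the module-level `mcp` from the Quick Start; the model and field names are illustrative:
```python
from pydantic import BaseModel, Field

class SearchParams(BaseModel):
    query: str = Field(min_length=1)
    limit: int = Field(default=10, ge=1, le=100)

@mcp.tool()
async def search(params: SearchParams) -> dict:
    # Pydantic validates the arguments before the tool body runs
    return {"query": params.query, "limit": params.limit}
```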
### Error 7: Transport/Protocol Mismatch
**Error:** `ConnectionError: Server using different transport`
**Cause:** Client and server using incompatible transports
**Solution:** Match transports - stdio: `mcp.run()` + `{"command": "python", "args": ["server.py"]}`, HTTP: `mcp.run(transport="http", port=8000)` + `{"url": "http://localhost:8000/mcp", "transport": "http"}`
### Error 8: Import Errors (Editable Package)
**Error:** `ModuleNotFoundError: No module named 'my_package'`
**Cause:** Package not properly installed
**Solution:** `pip install -e .` or use absolute imports or `export PYTHONPATH="/path/to/project"`
### Error 9: Deprecation Warnings
**Error:** `DeprecationWarning: 'mcp.settings' is deprecated`
**Cause:** Using old FastMCP v1 API
**Solution:** Use `os.getenv("API_KEY")` instead of `mcp.settings.get("API_KEY")`
### Error 10: Port Already in Use
**Error:** `OSError: [Errno 48] Address already in use`
**Cause:** Port 8000 already occupied
**Solution:** Use different port `--port 8001` or kill process `lsof -ti:8000 | xargs kill -9`
### Error 11: Schema Generation Failures
**Error:** `TypeError: Object of type 'ndarray' is not JSON serializable`
**Cause:** Unsupported type hints (NumPy arrays, custom classes)
**Solution:** Return JSON-compatible types: `list[float]` or convert: `{"values": np_array.tolist()}`
### Error 12: JSON Serialization
**Error:** `TypeError: Object of type 'datetime' is not JSON serializable`
**Cause:** Returning non-JSON-serializable objects
**Solution:** Convert: `datetime.now().isoformat()`, bytes: `.decode('utf-8')`
### Error 13: Circular Import Errors
**Error:** `ImportError: cannot import name 'X' from partially initialized module`
**Cause:** Circular dependency (common in cloud deployment)
**Solution:** Use direct imports in `__init__.py`: `from .api_client import APIClient` or lazy imports in functions
### Error 14: Python Version Compatibility
**Error:** `DeprecationWarning: datetime.utcnow() is deprecated`
**Cause:** Using deprecated Python 3.12+ methods
**Solution:** Use `datetime.now(timezone.utc)` instead of `datetime.utcnow()`
### Error 15: Import-Time Execution
**Error:** `RuntimeError: Event loop is closed`
**Cause:** Creating async resources at module import time
**Solution:** Use lazy initialization - create connection class with async `connect()` method, call when needed in tools
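A minimal sketch of that pattern, assuming the module-level `mcp` from the Quick Start; `asyncpg` is just an example async driver and the class name is illustrative:
```python
import os
import asyncpg  # example async driver - any async client follows the same shape

class LazyDatabase:
    """Nothing async runs at import time; the pool is created on first use."""
    def __init__(self, dsn: str):
        self._dsn = dsn
        self._pool = None

    async def pool(self) -> asyncpg.Pool:
        if self._pool is None:
            self._pool = await asyncpg.create_pool(self._dsn)
        return self._pool

db = LazyDatabase(os.getenv("DATABASE_URL", ""))  # safe at module level

@mcp.tool()
async def fetch_rows(sql: str) -> list[dict]:
    pool = await db.pool()  # connection created inside a running event loop
    async with pool.acquire() as conn:
        return [dict(row) for row in await conn.fetch(sql)]
```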
---
### Error 16: Storage Backend Not Configured
**Error:** `RuntimeError: OAuth tokens lost on restart`, `ValueError: Cache not persisting`
**Cause:** Using default memory storage in production without persistence
**Solution:** Use encrypted DiskStore (single instance) or RedisStore (multi-instance) with `FernetEncryptionWrapper`
### Error 17: Lifespan Not Passed to ASGI App
**Error:** `RuntimeError: Database connection never initialized`, `Warning: MCP lifespan hooks not running`
**Cause:** FastMCP with FastAPI/Starlette without passing lifespan (v2.13.0 requirement)
**Solution:** `app = FastAPI(lifespan=mcp.lifespan)` - MUST pass lifespan!
### Error 18: Middleware Execution Order Error
**Error:** `RuntimeError: Rate limit not checked before caching`
**Cause:** Incorrect middleware ordering (order matters!)
**Solution:** ErrorHandling → Timing → Logging → RateLimiting → ResponseCaching (this order)
### Error 19: Circular Middleware Dependencies
**Error:** `RecursionError: maximum recursion depth exceeded`
**Cause:** Middleware not calling `self.next()` or calling incorrectly
**Solution:** Always call `result = await self.next(tool_name, arguments, context)` in middleware hooks
### Error 20: Import vs Mount Confusion
**Error:** `RuntimeError: Subserver changes not reflected`, `ValueError: Unexpected tool namespacing`
**Cause:** Using `import_server()` when `mount()` was needed (or vice versa)
**Solution:** `import_server()` for static bundles (one-time copy), `mount()` for dynamic composition (live link)
### Error 21: Resource Prefix Format Mismatch
**Error:** `ValueError: Resource not found: resource://api/users`
**Cause:** Using wrong resource prefix format
**Solution:** Path format (default v2.4.0+): `resource://prefix/path`, Protocol (legacy): `prefix+resource://path` - set with `resource_prefix_format="path"`
### Error 22: OAuth Proxy Without Consent Screen
**Error:** `SecurityWarning: Authorization bypass possible`
**Cause:** OAuth Proxy without consent screen (security vulnerability)
**Solution:** Always set `enable_consent_screen=True` - prevents confused deputy attacks (CRITICAL)
### Error 23: Missing JWT Signing Key in Production
**Error:** `ValueError: JWT signing key required for OAuth Proxy`
**Cause:** OAuth Proxy missing `jwt_signing_key`
**Solution:** Generate: `secrets.token_urlsafe(32)`, store in `FASTMCP_JWT_SIGNING_KEY` env var, pass to `OAuthProxy(jwt_signing_key=...)`
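Generating the key is a one-liner, as described above:
```python
import secrets
print(secrets.token_urlsafe(32))  # store the output in FASTMCP_JWT_SIGNING_KEY
```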
### Error 24: Icon Data URI Format Error
**Error:** `ValueError: Invalid data URI format`
**Cause:** Incorrectly formatted data URI for icons
**Solution:** Use `Icon.from_file("/path/icon.png", size="medium")` or `Image.to_data_uri()` (v2.13.1) - don't manually format
### Error 25: Lifespan Behavior Change (v2.13.0)
**Error:** `Warning: Lifespan runs per-server, not per-session`
**Cause:** Expecting v2.12 behavior (per-session) in v2.13.0+ (per-server)
**Solution:** v2.13.0+ lifespans run ONCE per server, not per session - use middleware for per-session logic
---
## Production Patterns, Testing, CLI
**4 Production Patterns:**
1. **Utils Module**: Single `utils.py` with Config class, format_success/error helpers
2. **Connection Pooling**: Singleton `httpx.AsyncClient` with `get_client()` class method
3. **Retry with Backoff**: `retry_with_backoff(func, max_retries=3, initial_delay=1.0, exponential_base=2.0)` (see the sketch after this list)
4. **Time-Based Caching**: `TimeBasedCache(ttl=300)` with `.get()` and `.set()` methods
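A hedged sketch of the retry helper named in pattern 3; the signature follows the description above, while the body (including the small jitter) is an illustrative implementation:
```python
import asyncio
import random

async def retry_with_backoff(func, max_retries=3, initial_delay=1.0, exponential_base=2.0):
    """Call a zero-argument async callable, retrying with exponential backoff."""
    delay = initial_delay
    for attempt in range(max_retries):
        try:
            return await func()
        except Exception:
            if attempt == max_retries - 1:
                raise  # out of retries - surface the original error
            await asyncio.sleep(delay + random.uniform(0, 0.1))  # small jitter
            delay *= exponential_base
```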
**Testing:**
- Unit: `pytest` + `create_test_client(test_server)` + `await client.call_tool()`
- Integration: `Client("server.py")` + `list_tools()` + `call_tool()` + `list_resources()` (pytest sketch below)
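A hedged sketch of the integration style listed above, using pytest with the pytest-asyncio plugin; `server.py` and the `hello` tool refer to the Quick Start server, and the exact result shape depends on the FastMCP version:
```python
import pytest
from fastmcp import Client

@pytest.mark.asyncio  # requires pytest-asyncio
async def test_hello_tool():
    async with Client("server.py") as client:
        tools = await client.list_tools()
        assert any(t.name == "hello" for t in tools)
        result = await client.call_tool("hello", {"name": "World"})
        assert result is not None  # result shape varies by FastMCP version
```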
**CLI Commands:**
```bash
fastmcp dev server.py # Run with inspector
fastmcp install server.py # Install to Claude Desktop
FASTMCP_LOG_LEVEL=DEBUG fastmcp dev # Debug logging
```
**Best Practices:** Factory pattern with module-level export, environment config with validation, comprehensive docstrings (LLMs read these!), health check resources
**Project Structure:**
- Simple: `server.py`, `requirements.txt`, `.env`, `README.md`
- Production: `src/` (server.py, utils.py, tools/, resources/, prompts/), `tests/`, `pyproject.toml`
---
## References & Summary
**Official:** https://github.com/jlowin/fastmcp, https://fastmcp.cloud, https://modelcontextprotocol.io, Context7: `/jlowin/fastmcp`
**Related Skills:** openai-api, claude-api, cloudflare-worker-base
**Package Versions:** fastmcp>=2.13.1, Python>=3.10, httpx, pydantic, py-key-value-aio, cryptography
**15 Key Takeaways:**
1. Module-level server export (FastMCP Cloud)
2. Persistent storage (Disk/Redis) for OAuth/caching
3. Server lifespans for resource management
4. Middleware order: errors → timing → logging → rate limiting → caching
5. Composition: `import_server()` (static) vs `mount()` (dynamic)
6. OAuth security: consent screens + encrypted storage + JWT signing
7. Async/await properly (don't block event loop)
8. Structured error handling
9. Avoid circular imports
10. Test locally (`fastmcp dev`)
11. Environment variables (never hardcode secrets)
12. Comprehensive docstrings (LLMs read!)
13. Production patterns (utils, pooling, retry, caching)
14. OpenAPI auto-generation
15. Health checks + monitoring
**Production Readiness:** Encrypted storage, 4 auth patterns, 8 middleware types, modular composition, OAuth security (consent screens, PKCE, RFC 7662), response caching, connection pooling, timing middleware
**Prevents 25 errors. 90-95% token savings.**

plugin.lock.json Normal file

@@ -0,0 +1,129 @@
{
"$schema": "internal://schemas/plugin.lock.v1.json",
"pluginId": "gh:jezweb/claude-skills:skills/fastmcp",
"normalized": {
"repo": null,
"ref": "refs/tags/v20251128.0",
"commit": "97aca21de1fa52f62b8a5e0ce7433d489a0ebd99",
"treeHash": "9efddbdaf38c9c2cd990fbfdbfcbfd5acf1ac3c8f580a51e87afaddcca43a033",
"generatedAt": "2025-11-28T10:19:05.064195Z",
"toolVersion": "publish_plugins.py@0.2.0"
},
"origin": {
"remote": "git@github.com:zhongweili/42plugin-data.git",
"branch": "master",
"commit": "aa1497ed0949fd50e99e70d6324a29c5b34f9390",
"repoRoot": "/Users/zhongweili/projects/openmind/42plugin-data"
},
"manifest": {
"name": "fastmcp",
"description": "Build MCP servers in Python with FastMCP framework to expose tools, resources, and prompts to LLMs. Supports storage backends (memory/disk/Redis), middleware, OAuth Proxy, OpenAPI integration, and FastMCP Cloud deployment. Use when: creating MCP servers, defining tools or resources, implementing OAuth authentication, configuring storage backends for tokens/cache, adding middleware for logging/rate limiting, deploying to FastMCP Cloud, or troubleshooting module-level server, storage, lifespan, mi",
"version": "1.0.0"
},
"content": {
"files": [
{
"path": "README.md",
"sha256": "47b2dd2996611a0f8f0f62d23011e5a827ff98ba67c9e6b8505417f77b1db58b"
},
{
"path": "SKILL.md",
"sha256": "8c7db20de376f872edc104dbca16950c6ff473b9e039ace247b2fab316cd839d"
},
{
"path": "references/cloud-deployment.md",
"sha256": "b82de5d1fd772cbb57d71397baec3caee6a6bac0bd3c7678cab065eefe53877d"
},
{
"path": "references/context-features.md",
"sha256": "5d1bde1e2f45647ad54e06e3e356d0fddb5b9d449efa6b31fe813a02280f778c"
},
{
"path": "references/cli-commands.md",
"sha256": "92e8cda170b9666b065ee8e29c3df1138ff24e8f719093f226d59f1f420844aa"
},
{
"path": "references/integration-patterns.md",
"sha256": "4cc6b8a207a86b82777855c0e147cd9ef7d3e1110afcae1cf3f55185b65dd75c"
},
{
"path": "references/production-patterns.md",
"sha256": "f01f284f95dc0701918caa656d5d099b0085d9c889764f6d72fddd15018306b4"
},
{
"path": "references/common-errors.md",
"sha256": "90e5314b3125eb305e11510f9a6f2cfccc99d19d386b7004a7eda215e646bb04"
},
{
"path": "scripts/test-server.sh",
"sha256": "20f71294a1fb4a4da7f0f79dd6301bd0c264dab002d121aaad8d8860fe2aa86d"
},
{
"path": "scripts/check-versions.sh",
"sha256": "f2ebc6a7f9650ee1420e98ce72c43629da1b6f69f3f9f0e28940e66bc896a38f"
},
{
"path": "scripts/deploy-cloud.sh",
"sha256": "e8a2c40353343f301762b7c095a809223a0163cdd42d82f447c543a2ef258c94"
},
{
"path": ".claude-plugin/plugin.json",
"sha256": "c89a65c42c2fa1a363effb24e8d548514cca893f74838c361626f60c6b105d22"
},
{
"path": "templates/client-example.py",
"sha256": "c41a43876ee5c998f99e2392e36a0c47b40ed22ffcea1adf9b12468a21b97987"
},
{
"path": "templates/resources-examples.py",
"sha256": "db8a6bade0f71c65d4dc9c64f811854c8c2356ffcb9773cc08e3810854361775"
},
{
"path": "templates/prompts-examples.py",
"sha256": "c2f351a7e419e0c13bd407253e88940c601103a6b54253dbe14ec3e89fa7cb31"
},
{
"path": "templates/requirements.txt",
"sha256": "258fb87b46abe5fcdf062010618a4d513c8038f8d3a079ef08aa6fe777ff143d"
},
{
"path": "templates/pyproject.toml",
"sha256": "e78ad123f984a1364036170c2207b665e14c61750cc7abe5fd2ab853cab69a7a"
},
{
"path": "templates/error-handling.py",
"sha256": "728cb26c2c9d00bacc4e3f55eb21c59555eb58ca60c3ae05c435809a05fc1a18"
},
{
"path": "templates/openapi-integration.py",
"sha256": "ea9957f4369da85a751319cff998d15fa597a04397844eff418d57f738e1904f"
},
{
"path": "templates/tools-examples.py",
"sha256": "ddc370c3c370e30eac9b47536493dd9fbee456494c2d5c64bef605c750620c15"
},
{
"path": "templates/self-contained-server.py",
"sha256": "0a5f294560f21e72a0b9f547d00e89c298613b2c1eb2f48140e5b54c66247eb2"
},
{
"path": "templates/basic-server.py",
"sha256": "00c8fa8522e7bf656528a65edbdbfe8c75dd18af452f89ef9106d0e509f21ca9"
},
{
"path": "templates/.env.example",
"sha256": "02acb65832c53e688bd2d9d1b6f561c6b7a44241ae40753560fe6b5aedf12888"
},
{
"path": "templates/api-client-pattern.py",
"sha256": "5a0693d3506123380af137fe00e370c14cfe67c3aa6304faa712165967989010"
}
],
"dirSha256": "9efddbdaf38c9c2cd990fbfdbfcbfd5acf1ac3c8f580a51e87afaddcca43a033"
},
"security": {
"scannedAt": null,
"scannerVersion": null,
"flags": []
}
}

references/cli-commands.md Normal file

@@ -0,0 +1,417 @@
# FastMCP CLI Commands Reference
Complete reference for FastMCP command-line interface.
## Installation
```bash
# Install FastMCP
pip install fastmcp
# or with uv
uv pip install fastmcp
# Check version
fastmcp --version
```
## Development Commands
### `fastmcp dev`
Run server with inspector interface (recommended for development).
```bash
# Basic usage
fastmcp dev server.py
# With options
fastmcp dev server.py --port 8000
# Enable debug logging
FASTMCP_LOG_LEVEL=DEBUG fastmcp dev server.py
```
**Features:**
- Interactive inspector UI
- Hot reload on file changes
- Detailed logging
- Tool/resource inspection
### `fastmcp run`
Run server normally (production-like).
```bash
# stdio transport (default)
fastmcp run server.py
# HTTP transport
fastmcp run server.py --transport http --port 8000
# SSE transport
fastmcp run server.py --transport sse
```
**Options:**
- `--transport`: `stdio`, `http`, or `sse`
- `--port`: Port number (for HTTP/SSE)
- `--host`: Host address (default: 127.0.0.1)
### `fastmcp inspect`
Inspect server without running it.
```bash
# Inspect tools and resources
fastmcp inspect server.py
# Output as JSON
fastmcp inspect server.py --json
# Show detailed information
fastmcp inspect server.py --verbose
```
**Output includes:**
- List of tools
- List of resources
- List of prompts
- Configuration details
## Installation Commands
### `fastmcp install`
Install server to Claude Desktop.
```bash
# Basic installation
fastmcp install server.py
# With custom name
fastmcp install server.py --name "My Server"
# Specify config location
fastmcp install server.py --config-path ~/.config/Claude/claude_desktop_config.json
```
**What it does:**
- Adds server to Claude Desktop configuration
- Sets up proper command and arguments
- Validates server before installing
### Claude Desktop Configuration
Manual configuration (if not using `fastmcp install`):
```json
{
"mcpServers": {
"my-server": {
"command": "python",
"args": ["/absolute/path/to/server.py"],
"env": {
"API_KEY": "your-key"
}
}
}
}
```
**Config locations:**
- **macOS**: `~/Library/Application Support/Claude/claude_desktop_config.json`
- **Linux**: `~/.config/Claude/claude_desktop_config.json`
- **Windows**: `%APPDATA%\Claude\claude_desktop_config.json`
## Python Direct Execution
### Run with Python
```bash
# stdio (default)
python server.py
# HTTP transport
python server.py --transport http --port 8000
# With arguments
python server.py --transport http --port 8000 --host 0.0.0.0
```
### Custom Argument Parsing
```python
# server.py
if __name__ == "__main__":
import sys
# Parse custom arguments
if "--test" in sys.argv:
run_tests()
elif "--migrate" in sys.argv:
run_migrations()
else:
mcp.run()
```
## Environment Variables
### FastMCP-Specific Variables
```bash
# Logging
export FASTMCP_LOG_LEVEL=DEBUG # DEBUG, INFO, WARNING, ERROR
export FASTMCP_LOG_FILE=/path/to/log.txt
# Environment
export FASTMCP_ENV=production # development, staging, production
# Custom variables (your server)
export API_KEY=your-key
export DATABASE_URL=postgres://...
```
### Using with Commands
```bash
# Inline environment variables
API_KEY=test fastmcp dev server.py
# From .env file
set -a && source .env && set +a && fastmcp dev server.py
```
## Testing Commands
### Run Tests with Client
```python
# test.py
import asyncio
from fastmcp import Client
async def test():
async with Client("server.py") as client:
tools = await client.list_tools()
print(f"Tools: {[t.name for t in tools]}")
asyncio.run(test())
```
```bash
# Run tests
python test.py
```
### Integration Testing
```bash
# Start server in background
fastmcp run server.py --transport http --port 8000 &
SERVER_PID=$!
# Run tests
pytest tests/
# Kill server
kill $SERVER_PID
```
## Debugging Commands
### Enable Debug Logging
```bash
# Full debug output
FASTMCP_LOG_LEVEL=DEBUG fastmcp dev server.py
# Python logging
PYTHONVERBOSE=1 fastmcp dev server.py
# Trace imports
PYTHONPATH=. python -v server.py
```
### Check Python Environment
```bash
# Check Python version
python --version
# Check installed packages
pip list | grep fastmcp
# Check import paths
python -c "import sys; print('\n'.join(sys.path))"
```
### Validate Server
```bash
# Check syntax
python -m py_compile server.py
# Check imports
python -c "import server; print('OK')"
# Inspect structure
fastmcp inspect server.py --verbose
```
## Deployment Commands
### Prepare for Deployment
```bash
# Freeze dependencies
pip freeze > requirements.txt
# Clean specific to FastMCP
echo "fastmcp>=2.12.0" > requirements.txt
echo "httpx>=0.27.0" >> requirements.txt
# Test with clean environment
python -m venv test_env
source test_env/bin/activate
pip install -r requirements.txt
python server.py
```
### Git Commands for Deployment
```bash
# Prepare for cloud deployment
git add server.py requirements.txt
git commit -m "Prepare for deployment"
# Create GitHub repo
gh repo create my-mcp-server --public
# Push
git push -u origin main
```
## Performance Commands
### Profiling
```bash
# Profile with cProfile
python -m cProfile -o profile.stats server.py
# Analyze profile
python -m pstats profile.stats
```
### Memory Profiling
```bash
# Install memory_profiler
pip install memory_profiler
# Run with memory profiling
python -m memory_profiler server.py
```
## Batch Operations
### Multiple Servers
```bash
# Start multiple servers
fastmcp run server1.py --port 8000 &
fastmcp run server2.py --port 8001 &
fastmcp run server3.py --port 8002 &
# Kill all
killall -9 python
```
### Process Management
```bash
# Use screen/tmux for persistent sessions
screen -S fastmcp
fastmcp dev server.py
# Detach: Ctrl+A, D
# Reattach
screen -r fastmcp
```
## Common Command Patterns
### Local Development
```bash
# Quick iteration cycle
fastmcp dev server.py # Edit, save, auto-reload
```
### Testing with HTTP Client
```bash
# Start HTTP server
fastmcp run server.py --transport http --port 8000
# Test with curl
curl -X POST http://localhost:8000/mcp \
-H "Content-Type: application/json" \
  -d '{"jsonrpc": "2.0", "id": 1, "method": "tools/list"}'
```
### Production-like Testing
```bash
# Set production environment
export ENVIRONMENT=production
export FASTMCP_LOG_LEVEL=WARNING
# Run
fastmcp run server.py
```
## Troubleshooting Commands
### Server Won't Start
```bash
# Check for syntax errors
python -m py_compile server.py
# Check for missing dependencies
pip check
# Verify FastMCP installation
python -c "import fastmcp; print(fastmcp.__version__)"
```
### Port Already in Use
```bash
# Find process using port
lsof -i :8000
# Kill process
lsof -ti:8000 | xargs kill -9
# Use different port
fastmcp run server.py --port 8001
```
### Permission Issues
```bash
# Make server executable
chmod +x server.py
# Fix Python path
export PYTHONPATH="${PYTHONPATH}:$(pwd)"
```
## Resources
- **FastMCP CLI Docs**: https://github.com/jlowin/fastmcp#cli
- **MCP Protocol**: https://modelcontextprotocol.io
- **Context7**: `/jlowin/fastmcp`

references/cloud-deployment.md Normal file

@@ -0,0 +1,309 @@
# FastMCP Cloud Deployment Guide
Complete guide for deploying FastMCP servers to FastMCP Cloud.
## Critical Requirements
**❗️ MUST HAVE** for FastMCP Cloud:
1. **Module-level server object** named `mcp`, `server`, or `app`
2. **PyPI dependencies only** in `requirements.txt`
3. **Public GitHub repository** (or accessible to FastMCP Cloud)
4. **Environment variables** for configuration (no hardcoded secrets)
## Cloud-Ready Server Pattern
```python
# server.py
from fastmcp import FastMCP
import os
# ✅ CORRECT: Module-level export
mcp = FastMCP("production-server")
# ✅ Use environment variables
API_KEY = os.getenv("API_KEY")
DATABASE_URL = os.getenv("DATABASE_URL")
@mcp.tool()
async def production_tool(data: str) -> dict:
if not API_KEY:
return {"error": "API_KEY not configured"}
return {"status": "success", "data": data}
if __name__ == "__main__":
mcp.run()
```
## Common Anti-Patterns
### ❌ WRONG: Function-Wrapped Server
```python
def create_server():
mcp = FastMCP("server")
return mcp
if __name__ == "__main__":
server = create_server() # Too late for cloud!
server.run()
```
### ✅ CORRECT: Factory with Module Export
```python
def create_server() -> FastMCP:
mcp = FastMCP("server")
# Complex setup logic here
return mcp
# Export at module level
mcp = create_server()
if __name__ == "__main__":
mcp.run()
```
## Deployment Steps
### 1. Prepare Repository
```bash
# Initialize git
git init
# Add files
git add .
# Commit
git commit -m "Initial MCP server"
# Create GitHub repo
gh repo create my-mcp-server --public
# Push
git push -u origin main
```
### 2. Deploy to FastMCP Cloud
1. Visit https://fastmcp.cloud
2. Sign in with GitHub
3. Click "Create Project"
4. Select your repository
5. Configure:
- **Server Name**: Your project name
- **Entrypoint**: `server.py`
- **Environment Variables**: Add all needed variables
### 3. Configure Environment Variables
In FastMCP Cloud dashboard, add:
- `API_KEY`
- `DATABASE_URL`
- `CACHE_TTL`
- Any custom variables
### 4. Access Your Server
- **URL**: `https://your-project.fastmcp.app/mcp`
- **Auto-deploy**: Pushes to main branch auto-deploy
- **PR Previews**: Pull requests get preview deployments
## Project Structure Requirements
### Minimal Structure
```
my-mcp-server/
├── server.py # Main entry point (required)
├── requirements.txt # PyPI dependencies (required)
├── .env # Local dev only (git-ignored)
├── .gitignore # Must ignore .env
└── README.md # Documentation (recommended)
```
### Production Structure
```
my-mcp-server/
├── src/
│ ├── server.py # Main entry point
│ ├── utils.py # Self-contained utilities
│ └── tools/ # Tool modules
│ ├── __init__.py
│ └── api_tools.py
├── requirements.txt
├── .env.example # Template for .env
├── .gitignore
└── README.md
```
## Requirements.txt Rules
### ✅ ALLOWED: PyPI Packages
```txt
fastmcp>=2.12.0
httpx>=0.27.0
python-dotenv>=1.0.0
pydantic>=2.0.0
```
### ❌ NOT ALLOWED: Non-PyPI Dependencies
```txt
# Don't use these in cloud:
git+https://github.com/user/repo.git
-e ./local-package
./wheels/package.whl
```
## Environment Variables Best Practices
### ✅ GOOD: Environment-based Configuration
```python
import os
class Config:
API_KEY = os.getenv("API_KEY", "")
BASE_URL = os.getenv("BASE_URL", "https://api.example.com")
DEBUG = os.getenv("DEBUG", "false").lower() == "true"
@classmethod
def validate(cls):
if not cls.API_KEY:
raise ValueError("API_KEY is required")
```
### ❌ BAD: Hardcoded Values
```python
# Never do this in cloud:
API_KEY = "sk-1234567890" # Exposed in repository!
DATABASE_URL = "postgresql://user:pass@host/db" # Insecure!
```
## Avoiding Circular Imports
**Critical for cloud deployment!**
### ❌ WRONG: Factory Function in `__init__.py`
```python
# shared/__init__.py
def get_api_client():
from .api_client import APIClient # Circular import risk
return APIClient()
# shared/monitoring.py
from . import get_api_client # Creates circle!
```
### ✅ CORRECT: Direct Imports
```python
# shared/__init__.py
from .api_client import APIClient
from .cache import CacheManager
# shared/monitoring.py
from .api_client import APIClient
client = APIClient() # Create directly
```
## Testing Before Deployment
### Local Testing
```bash
# Test with stdio (default)
fastmcp dev server.py
# Test with HTTP
python server.py --transport http --port 8000
```
### Pre-Deployment Checklist
- [ ] Server object exported at module level
- [ ] Only PyPI dependencies in requirements.txt
- [ ] No hardcoded secrets (all in environment variables)
- [ ] `.env` file in `.gitignore`
- [ ] No circular imports
- [ ] No import-time async execution
- [ ] Works with `fastmcp dev server.py`
- [ ] Git repository committed and pushed
- [ ] All required environment variables documented
## Monitoring Deployment
### Check Deployment Logs
FastMCP Cloud provides:
- Build logs
- Runtime logs
- Error logs
### Health Check Endpoint
Add a health check resource:
```python
@mcp.resource("health://status")
async def health_check() -> dict:
return {
"status": "healthy",
"timestamp": datetime.now().isoformat(),
"version": "1.0.0"
}
```
### Common Deployment Errors
1. **"No server object found"**
- Fix: Export server at module level
2. **"Module not found"**
- Fix: Use only PyPI packages
3. **"Import error: circular dependency"**
- Fix: Avoid factory functions in `__init__.py`
4. **"Environment variable not set"**
- Fix: Add variables in FastMCP Cloud dashboard
## Continuous Deployment
FastMCP Cloud automatically deploys when you push to main:
```bash
# Make changes
git add .
git commit -m "Add new feature"
git push
# Deployment happens automatically!
# Check status at fastmcp.cloud
```
## Rollback Strategy
If deployment fails:
```bash
# Revert to previous commit
git revert HEAD
git push
# Or reset to specific commit
git reset --hard <commit-hash>
git push --force # Use with caution!
```
## Resources
- **FastMCP Cloud**: https://fastmcp.cloud
- **FastMCP GitHub**: https://github.com/jlowin/fastmcp
- **Deployment Docs**: Check FastMCP Cloud documentation

references/common-errors.md Normal file

@@ -0,0 +1,118 @@
# FastMCP Common Errors Reference
Quick reference for the 15 most common FastMCP errors and their solutions.
## Error 1: Missing Server Object
**Error:** `RuntimeError: No server object found at module level`
**Fix:** Export server at module level: `mcp = FastMCP("name")`
**Why:** FastMCP Cloud requires module-level server object
**Source:** FastMCP Cloud documentation
## Error 2: Async/Await Confusion
**Error:** `RuntimeError: no running event loop`
**Fix:** Use `async def` for async operations, don't mix sync/async
**Example:** Use `await client.get()` not `client.get()`
**Source:** GitHub issues #156, #203
## Error 3: Context Not Injected
**Error:** `TypeError: missing required argument 'context'`
**Fix:** Add type hint: `async def tool(context: Context):`
**Why:** Type hint is required for context injection
**Source:** FastMCP v2 migration guide
## Error 4: Resource URI Syntax
**Error:** `ValueError: Invalid resource URI`
**Fix:** Include scheme: `@mcp.resource("data://config")`
**Valid schemes:** `data://`, `file://`, `info://`, `api://`
**Source:** MCP Protocol specification
## Error 5: Resource Template Parameter Mismatch
**Error:** `TypeError: missing positional argument`
**Fix:** Match parameter names: `user://{user_id}` → `def get_user(user_id: str)`
**Why:** Parameter names must exactly match URI template
**Source:** FastMCP patterns documentation
## Error 6: Pydantic Validation Error
**Error:** `ValidationError: value is not valid`
**Fix:** Ensure type hints match data types
**Best practice:** Use Pydantic models for complex validation
**Source:** Pydantic documentation
## Error 7: Transport/Protocol Mismatch
**Error:** `ConnectionError: different transport`
**Fix:** Match client/server transport (stdio or http)
**Server:** `mcp.run(transport="http")`
**Client:** `{"transport": "http", "url": "..."}`
**Source:** MCP transport specification
## Error 8: Import Errors (Editable Package)
**Error:** `ModuleNotFoundError: No module named 'my_package'`
**Fix:** Install in editable mode: `pip install -e .`
**Alternative:** Use absolute imports or add to PYTHONPATH
**Source:** Python packaging documentation
## Error 9: Deprecation Warnings
**Error:** `DeprecationWarning: 'mcp.settings' deprecated`
**Fix:** Use `os.getenv()` instead of `mcp.settings.get()`
**Why:** FastMCP v2 removed settings API
**Source:** FastMCP v2 migration guide
## Error 10: Port Already in Use
**Error:** `OSError: [Errno 48] Address already in use`
**Fix:** Use different port: `--port 8001`
**Alternative:** Kill process: `lsof -ti:8000 | xargs kill -9`
**Source:** Common networking issue
## Error 11: Schema Generation Failures
**Error:** `TypeError: not JSON serializable`
**Fix:** Use JSON-compatible types (no NumPy arrays, custom classes)
**Example:** Convert: `data.tolist()` or `data.to_dict()`
**Source:** JSON serialization requirements
## Error 12: JSON Serialization
**Error:** `TypeError: Object of type 'datetime' not JSON serializable`
**Fix:** Convert to string: `datetime.now().isoformat()`
**Apply to:** datetime, bytes, custom objects
**Source:** JSON specification
## Error 13: Circular Import Errors
**Error:** `ImportError: cannot import name 'X' from partially initialized module`
**Fix:** Avoid factory functions in `__init__.py`, use direct imports
**Example:** Import `APIClient` directly, don't use `get_api_client()` factory
**Why:** Cloud deployment initializes modules differently
**Source:** Production cloud deployment errors
## Error 14: Python Version Compatibility
**Error:** `DeprecationWarning: datetime.utcnow() deprecated`
**Fix:** Use `datetime.now(timezone.utc)` (Python 3.12+)
**Why:** Python 3.12+ deprecated some datetime methods
**Source:** Python 3.12 release notes
## Error 15: Import-Time Execution
**Error:** `RuntimeError: Event loop is closed`
**Fix:** Don't create async resources at module level
**Example:** Use lazy initialization: create resources when needed, not at import
**Why:** Event loop not available during module import
**Source:** Async event loop management
---
## Quick Debugging Checklist
When encountering errors:
1. ✅ Check server is exported at module level
2. ✅ Verify async/await usage is correct
3. ✅ Ensure Context type hints are present
4. ✅ Validate resource URIs have scheme prefixes
5. ✅ Match resource template parameters exactly
6. ✅ Use JSON-serializable types only
7. ✅ Avoid circular imports (especially in `__init__.py`)
8. ✅ Don't execute async code at module level
9. ✅ Test locally with `fastmcp dev server.py` before deploying
## Getting Help
- **FastMCP GitHub**: https://github.com/jlowin/fastmcp/issues
- **Context7 Docs**: `/jlowin/fastmcp`
- **This Skill**: See SKILL.md for detailed solutions

references/context-features.md Normal file

@@ -0,0 +1,475 @@
# FastMCP Context Features Reference
Complete reference for FastMCP's advanced context features: elicitation, progress tracking, and sampling.
## Context Injection
To use context features, inject Context into your tool:
```python
from fastmcp import Context
@mcp.tool()
async def tool_with_context(param: str, context: Context) -> dict:
"""Tool that uses context features."""
# Access context features here
pass
```
**Important:** Context parameter MUST have type hint `Context` for injection to work.
## Feature 1: Elicitation (User Input)
Request user input during tool execution.
### Basic Usage
```python
from fastmcp import Context
@mcp.tool()
async def confirm_action(action: str, context: Context) -> dict:
"""Request user confirmation."""
# Request user input
user_response = await context.request_elicitation(
prompt=f"Confirm {action}? (yes/no)",
response_type=str
)
if user_response.lower() == "yes":
result = await perform_action(action)
return {"status": "completed", "action": action}
else:
return {"status": "cancelled", "action": action}
```
### Type-Based Elicitation
```python
@mcp.tool()
async def collect_user_info(context: Context) -> dict:
"""Collect information from user."""
# String input
name = await context.request_elicitation(
prompt="What is your name?",
response_type=str
)
# Boolean input
confirmed = await context.request_elicitation(
prompt="Do you want to continue?",
response_type=bool
)
# Numeric input
count = await context.request_elicitation(
prompt="How many items?",
response_type=int
)
return {
"name": name,
"confirmed": confirmed,
"count": count
}
```
### Custom Type Elicitation
```python
from dataclasses import dataclass
@dataclass
class UserChoice:
option: str
reason: str
@mcp.tool()
async def get_user_choice(options: list[str], context: Context) -> dict:
"""Get user choice with reasoning."""
choice = await context.request_elicitation(
prompt=f"Choose from: {', '.join(options)}",
response_type=UserChoice
)
return {
"selected": choice.option,
"reason": choice.reason
}
```
### Client Handler for Elicitation
Client must provide handler:
```python
from fastmcp import Client
async def elicitation_handler(message: str, response_type: type, context: dict):
"""Handle elicitation requests."""
if response_type == str:
return input(f"{message}: ")
elif response_type == bool:
response = input(f"{message} (y/n): ")
return response.lower() == 'y'
elif response_type == int:
return int(input(f"{message}: "))
else:
return input(f"{message}: ")
async with Client(
"server.py",
elicitation_handler=elicitation_handler
) as client:
result = await client.call_tool("collect_user_info", {})
```
## Feature 2: Progress Tracking
Report progress for long-running operations.
### Basic Progress
```python
@mcp.tool()
async def long_operation(count: int, context: Context) -> dict:
"""Operation with progress tracking."""
for i in range(count):
# Report progress
await context.report_progress(
progress=i + 1,
total=count,
message=f"Processing item {i + 1}/{count}"
)
# Do work
await asyncio.sleep(0.1)
return {"status": "completed", "processed": count}
```
### Multi-Phase Progress
```python
@mcp.tool()
async def multi_phase_operation(data: list, context: Context) -> dict:
"""Operation with multiple phases."""
# Phase 1: Loading
await context.report_progress(0, 3, "Phase 1: Loading data")
loaded = await load_data(data)
# Phase 2: Processing
await context.report_progress(1, 3, "Phase 2: Processing")
for i, item in enumerate(loaded):
await context.report_progress(
progress=i,
total=len(loaded),
message=f"Processing {i + 1}/{len(loaded)}"
)
await process_item(item)
# Phase 3: Saving
await context.report_progress(2, 3, "Phase 3: Saving results")
await save_results()
await context.report_progress(3, 3, "Complete!")
return {"status": "completed", "items": len(loaded)}
```
### Indeterminate Progress
For operations where total is unknown:
```python
@mcp.tool()
async def indeterminate_operation(context: Context) -> dict:
"""Operation with unknown duration."""
stages = [
"Initializing",
"Loading data",
"Processing",
"Finalizing"
]
for stage in stages:
# No total - shows as spinner/indeterminate
await context.report_progress(
progress=stages.index(stage),
total=None,
message=stage
)
await perform_stage(stage)
return {"status": "completed"}
```
### Client Handler for Progress
```python
async def progress_handler(progress: float, total: float | None, message: str | None):
"""Handle progress updates."""
if total:
pct = (progress / total) * 100
# Use \r for same-line update
print(f"\r[{pct:.1f}%] {message}", end="", flush=True)
else:
# Indeterminate progress
print(f"\n[PROGRESS] {message}")
async with Client(
"server.py",
progress_handler=progress_handler
) as client:
result = await client.call_tool("long_operation", {"count": 100})
```
## Feature 3: Sampling (LLM Integration)
Request LLM completions from within tools.
### Basic Sampling
```python
@mcp.tool()
async def enhance_text(text: str, context: Context) -> str:
"""Enhance text using LLM."""
response = await context.request_sampling(
messages=[{
"role": "system",
"content": "You are a professional copywriter."
}, {
"role": "user",
"content": f"Enhance this text: {text}"
}],
temperature=0.7,
max_tokens=500
)
return response["content"]
```
### Structured Output with Sampling
```python
@mcp.tool()
async def classify_text(text: str, context: Context) -> dict:
"""Classify text using LLM."""
prompt = f"""
Classify this text: {text}
Return JSON with:
- category: one of [news, blog, academic, social]
- sentiment: one of [positive, negative, neutral]
- topics: list of main topics
Return as JSON object.
"""
response = await context.request_sampling(
messages=[{"role": "user", "content": prompt}],
temperature=0.3, # Lower for consistency
response_format="json"
)
import json
return json.loads(response["content"])
```
### Multi-Turn Sampling
```python
@mcp.tool()
async def interactive_analysis(topic: str, context: Context) -> dict:
"""Multi-turn analysis with LLM."""
# First turn: Generate questions
questions_response = await context.request_sampling(
messages=[{
"role": "user",
"content": f"Generate 3 key questions about: {topic}"
}],
max_tokens=200
)
# Second turn: Answer questions
analysis_response = await context.request_sampling(
messages=[{
"role": "user",
"content": f"Answer these questions about {topic}:\n{questions_response['content']}"
}],
max_tokens=500
)
return {
"topic": topic,
"questions": questions_response["content"],
"analysis": analysis_response["content"]
}
```
### Client Handler for Sampling
Client provides LLM access:
```python
async def sampling_handler(messages, params, context):
"""Handle LLM sampling requests."""
# Call your LLM API
from openai import AsyncOpenAI
client = AsyncOpenAI()
response = await client.chat.completions.create(
model=params.get("model", "gpt-4"),
messages=messages,
temperature=params.get("temperature", 0.7),
max_tokens=params.get("max_tokens", 1000)
)
return {
"content": response.choices[0].message.content,
"model": response.model,
"usage": {
"prompt_tokens": response.usage.prompt_tokens,
"completion_tokens": response.usage.completion_tokens,
"total_tokens": response.usage.total_tokens
}
}
async with Client(
"server.py",
sampling_handler=sampling_handler
) as client:
result = await client.call_tool("enhance_text", {"text": "Hello world"})
```
## Combined Example
All context features together:
```python
@mcp.tool()
async def comprehensive_task(data: list, context: Context) -> dict:
"""Task using all context features."""
# 1. Elicitation: Confirm operation
confirmed = await context.request_elicitation(
prompt="Start processing?",
response_type=bool
)
if not confirmed:
return {"status": "cancelled"}
# 2. Progress: Track processing
results = []
for i, item in enumerate(data):
await context.report_progress(
progress=i + 1,
total=len(data),
message=f"Processing {i + 1}/{len(data)}"
)
# 3. Sampling: Use LLM for processing
enhanced = await context.request_sampling(
messages=[{
"role": "user",
"content": f"Analyze this item: {item}"
}],
temperature=0.5
)
results.append({
"item": item,
"analysis": enhanced["content"]
})
return {
"status": "completed",
"total": len(data),
"results": results
}
```
## Best Practices
### Elicitation
- **Clear prompts**: Be specific about what you're asking
- **Type validation**: Use appropriate response_type
- **Handle cancellation**: Allow users to cancel operations
- **Provide context**: Explain why input is needed
### Progress Tracking
- **Regular updates**: Report every 5-10% or every item
- **Meaningful messages**: Describe what's happening
- **Phase indicators**: Show which phase of operation
- **Final confirmation**: Report 100% completion
### Sampling
- **System prompts**: Set clear instructions
- **Temperature control**: Lower for factual, higher for creative
- **Token limits**: Set reasonable max_tokens
- **Error handling**: Handle API failures gracefully
- **Cost awareness**: Sampling uses LLM API (costs money)
## Error Handling
### Context Not Available
```python
@mcp.tool()
async def safe_context_usage(context: Context) -> dict:
"""Safely use context features."""
# Check if feature is available
if hasattr(context, 'report_progress'):
await context.report_progress(0, 100, "Starting")
if hasattr(context, 'request_elicitation'):
response = await context.request_elicitation(
prompt="Continue?",
response_type=bool
)
else:
# Fallback behavior
response = True
return {"status": "completed"}
```
### Timeout Handling
```python
import asyncio
@mcp.tool()
async def elicitation_with_timeout(context: Context) -> dict:
"""Elicitation with timeout."""
try:
response = await asyncio.wait_for(
context.request_elicitation(
prompt="Your input (30 seconds):",
response_type=str
),
timeout=30.0
)
return {"status": "completed", "input": response}
except asyncio.TimeoutError:
return {"status": "timeout", "message": "No input received"}
```
## Context Feature Availability
| Feature | Claude Desktop | Claude Code CLI | FastMCP Cloud | Custom Client |
|---------|---------------|----------------|---------------|---------------|
| Elicitation | ✅ | ✅ | ⚠️ Depends | ✅ With handler |
| Progress | ✅ | ✅ | ✅ | ✅ With handler |
| Sampling | ✅ | ✅ | ⚠️ Depends | ✅ With handler |
⚠️ = Feature availability depends on client implementation
## Resources
- **Context API**: See SKILL.md for full Context API reference
- **Client Handlers**: See `client-example.py` template
- **MCP Protocol**: https://modelcontextprotocol.io

references/integration-patterns.md Normal file

@@ -0,0 +1,456 @@
# FastMCP Integration Patterns Reference
Quick reference for API integration patterns with FastMCP.
## Pattern 1: Manual API Integration
Best for simple APIs or when you need fine control.
```python
import httpx
from fastmcp import FastMCP
mcp = FastMCP("API Integration")
# Reusable client
client = httpx.AsyncClient(
base_url="https://api.example.com",
headers={"Authorization": f"Bearer {API_KEY}"},
timeout=30.0
)
@mcp.tool()
async def fetch_data(endpoint: str) -> dict:
"""Fetch from API."""
response = await client.get(endpoint)
response.raise_for_status()
return response.json()
```
**Pros:**
- Full control over requests
- Easy to customize
- Simple to understand
**Cons:**
- Manual tool creation for each endpoint
- More boilerplate code
## Pattern 2: OpenAPI/Swagger Auto-Generation
Best for well-documented APIs with OpenAPI specs.
```python
from fastmcp import FastMCP
from fastmcp.server.openapi import RouteMap, MCPType
import httpx
# Load spec
spec = httpx.get("https://api.example.com/openapi.json").json()
# Create client
client = httpx.AsyncClient(
base_url="https://api.example.com",
headers={"Authorization": f"Bearer {API_KEY}"}
)
# Auto-generate server
mcp = FastMCP.from_openapi(
openapi_spec=spec,
client=client,
name="API Server",
route_maps=[
# GET + params → Resource Templates
RouteMap(
methods=["GET"],
pattern=r".*\{.*\}.*",
mcp_type=MCPType.RESOURCE_TEMPLATE
),
# GET no params → Resources
RouteMap(
methods=["GET"],
mcp_type=MCPType.RESOURCE
),
# POST/PUT/DELETE → Tools
RouteMap(
methods=["POST", "PUT", "PATCH", "DELETE"],
mcp_type=MCPType.TOOL
),
]
)
```
**Pros:**
- Instant integration (minutes not hours)
- Auto-updates with spec changes
- No manual endpoint mapping
**Cons:**
- Requires OpenAPI/Swagger spec
- Less control over individual endpoints
- May include unwanted endpoints
## Pattern 3: FastAPI Conversion
Best for converting existing FastAPI applications.
```python
from fastapi import FastAPI
from fastmcp import FastMCP
# Existing FastAPI app
app = FastAPI()
@app.get("/users/{user_id}")
def get_user(user_id: int):
return {"id": user_id, "name": "User"}
# Convert to MCP
mcp = FastMCP.from_fastapi(
app=app,
httpx_client_kwargs={
"headers": {"Authorization": "Bearer token"}
}
)
```
**Pros:**
- Reuse existing FastAPI code
- Minimal changes needed
- Familiar FastAPI patterns
**Cons:**
- FastAPI must be running separately
- Extra HTTP hop (slower)
## Route Mapping Strategies
### Strategy 1: By HTTP Method
```python
route_maps = [
RouteMap(methods=["GET"], mcp_type=MCPType.RESOURCE),
RouteMap(methods=["POST"], mcp_type=MCPType.TOOL),
RouteMap(methods=["PUT", "PATCH"], mcp_type=MCPType.TOOL),
RouteMap(methods=["DELETE"], mcp_type=MCPType.TOOL),
]
```
### Strategy 2: By Path Pattern
```python
route_maps = [
# Admin endpoints → Exclude
RouteMap(
pattern=r"/admin/.*",
mcp_type=MCPType.EXCLUDE
),
# Internal → Exclude
RouteMap(
pattern=r"/internal/.*",
mcp_type=MCPType.EXCLUDE
),
# Health → Exclude
RouteMap(
pattern=r"/(health|healthz)",
mcp_type=MCPType.EXCLUDE
),
# Everything else
RouteMap(mcp_type=MCPType.TOOL),
]
```
### Strategy 3: By Parameters
```python
route_maps = [
# Has path parameters → Resource Template
RouteMap(
pattern=r".*\{[^}]+\}.*",
mcp_type=MCPType.RESOURCE_TEMPLATE
),
# No parameters → Static Resource or Tool
RouteMap(
methods=["GET"],
mcp_type=MCPType.RESOURCE
),
RouteMap(
methods=["POST", "PUT", "DELETE"],
mcp_type=MCPType.TOOL
),
]
```
## Authentication Patterns
### API Key Authentication
```python
client = httpx.AsyncClient(
base_url="https://api.example.com",
headers={"X-API-Key": os.getenv("API_KEY")}
)
```
### Bearer Token
```python
client = httpx.AsyncClient(
base_url="https://api.example.com",
headers={"Authorization": f"Bearer {os.getenv('API_TOKEN')}"}
)
```
### OAuth2 with Token Refresh
```python
from datetime import datetime, timedelta
class OAuth2Client:
def __init__(self):
self.access_token = None
self.expires_at = None
async def get_token(self) -> str:
if not self.expires_at or datetime.now() > self.expires_at:
await self.refresh_token()
return self.access_token
async def refresh_token(self):
async with httpx.AsyncClient() as client:
response = await client.post(
"https://auth.example.com/token",
data={
"grant_type": "client_credentials",
"client_id": CLIENT_ID,
"client_secret": CLIENT_SECRET
}
)
data = response.json()
self.access_token = data["access_token"]
self.expires_at = datetime.now() + timedelta(
seconds=data["expires_in"] - 60
)
oauth = OAuth2Client()
@mcp.tool()
async def authenticated_request(endpoint: str) -> dict:
token = await oauth.get_token()
async with httpx.AsyncClient() as client:
response = await client.get(
endpoint,
headers={"Authorization": f"Bearer {token}"}
)
return response.json()
```
## Error Handling Patterns
### Basic Error Handling
```python
@mcp.tool()
async def safe_api_call(endpoint: str) -> dict:
try:
response = await client.get(endpoint)
response.raise_for_status()
return {"success": True, "data": response.json()}
except httpx.HTTPStatusError as e:
return {
"success": False,
"error": f"HTTP {e.response.status_code}",
"message": e.response.text
}
except httpx.TimeoutException:
return {"success": False, "error": "Request timeout"}
except Exception as e:
return {"success": False, "error": str(e)}
```
### Retry with Exponential Backoff
```python
import asyncio
async def retry_with_backoff(func, max_retries=3):
delay = 1.0
for attempt in range(max_retries):
try:
return await func()
except (httpx.TimeoutException, httpx.NetworkError) as e:
if attempt < max_retries - 1:
await asyncio.sleep(delay)
delay *= 2
else:
raise
```
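A sketch of using the helper inside a tool; `client` is assumed to be the shared httpx client from Pattern 1:
```python
@mcp.tool()
async def fetch_with_retry(endpoint: str) -> dict:
    """GET an endpoint, retrying transient network failures."""
    async def make_call():
        response = await client.get(endpoint)  # shared client from Pattern 1
        response.raise_for_status()
        return response.json()
    try:
        return {"success": True, "data": await retry_with_backoff(make_call)}
    except Exception as e:
        return {"success": False, "error": str(e)}
```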
## Caching Patterns
### Simple Time-Based Cache
```python
import time
class SimpleCache:
def __init__(self, ttl=300):
self.cache = {}
self.timestamps = {}
self.ttl = ttl
def get(self, key: str):
if key in self.cache:
if time.time() - self.timestamps[key] < self.ttl:
return self.cache[key]
return None
def set(self, key: str, value):
self.cache[key] = value
self.timestamps[key] = time.time()
cache = SimpleCache()
@mcp.tool()
async def cached_fetch(endpoint: str) -> dict:
# Check cache
cached = cache.get(endpoint)
if cached:
return {"data": cached, "from_cache": True}
# Fetch from API
data = await fetch_from_api(endpoint)
cache.set(endpoint, data)
return {"data": data, "from_cache": False}
```
## Rate Limiting Patterns
### Simple Rate Limiter
```python
from collections import deque
from datetime import datetime, timedelta
class RateLimiter:
def __init__(self, max_requests: int, time_window: int):
self.max_requests = max_requests
self.time_window = timedelta(seconds=time_window)
self.requests = deque()
async def acquire(self):
now = datetime.now()
# Remove old requests
while self.requests and now - self.requests[0] > self.time_window:
self.requests.popleft()
# Check limit
if len(self.requests) >= self.max_requests:
sleep_time = (self.requests[0] + self.time_window - now).total_seconds()
await asyncio.sleep(sleep_time)
return await self.acquire()
self.requests.append(now)
limiter = RateLimiter(100, 60) # 100 requests per minute
@mcp.tool()
async def rate_limited_call(endpoint: str) -> dict:
await limiter.acquire()
return await api_call(endpoint)
```
## Connection Pooling
### Singleton Client Pattern
```python
class APIClient:
_instance = None
@classmethod
async def get_client(cls):
if cls._instance is None:
cls._instance = httpx.AsyncClient(
base_url=API_BASE_URL,
timeout=30.0,
limits=httpx.Limits(
max_keepalive_connections=5,
max_connections=10
)
)
return cls._instance
@classmethod
async def cleanup(cls):
if cls._instance:
await cls._instance.aclose()
cls._instance = None
# Use in tools
@mcp.tool()
async def api_request(endpoint: str) -> dict:
client = await APIClient.get_client()
response = await client.get(endpoint)
return response.json()
```
## Batch Request Patterns
### Parallel Batch Requests
```python
@mcp.tool()
async def batch_fetch(endpoints: list[str]) -> dict:
"""Fetch multiple endpoints in parallel."""
async def fetch_one(endpoint: str):
try:
response = await client.get(endpoint)
return {"endpoint": endpoint, "success": True, "data": response.json()}
except Exception as e:
return {"endpoint": endpoint, "success": False, "error": str(e)}
results = await asyncio.gather(*[fetch_one(ep) for ep in endpoints])
return {
"total": len(endpoints),
"successful": len([r for r in results if r["success"]]),
"results": results
}
```
## Webhook Patterns
### Webhook Receiver
```python
from fastapi import FastAPI, Request
app = FastAPI()
@app.post("/webhook")
async def handle_webhook(request: Request):
data = await request.json()
# Process webhook
return {"status": "received"}
# Add to MCP server
mcp = FastMCP.from_fastapi(app)
```
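A sketch extending this so received payloads are queryable over MCP, assuming the generated server accepts additional tool registrations like any FastMCP instance. The in-memory list is illustrative only; use a proper store in production:
```python
from fastapi import FastAPI, Request
from fastmcp import FastMCP

app = FastAPI()
received_events: list[dict] = []  # illustrative in-memory store

@app.post("/webhook")
async def handle_webhook(request: Request):
    received_events.append(await request.json())
    return {"status": "received"}

mcp = FastMCP.from_fastapi(app)

@mcp.tool()
def list_webhook_events(limit: int = 10) -> dict:
    """Return the most recently received webhook payloads."""
    return {"total": len(received_events), "events": received_events[-limit:]}
```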
## When to Use Each Pattern
| Pattern | Use When | Avoid When |
|---------|----------|------------|
| Manual Integration | Simple API, custom logic needed | API has 50+ endpoints |
| OpenAPI Auto-gen | Well-documented API, many endpoints | No OpenAPI spec available |
| FastAPI Conversion | Existing FastAPI app | Starting from scratch |
| Custom Route Maps | Need precise control | Simple use case |
| Connection Pooling | High-frequency requests | Single request needed |
| Caching | Expensive API calls, data rarely changes | Real-time data required |
| Rate Limiting | API has rate limits | No limits or internal API |
## Resources
- **FastMCP OpenAPI**: FastMCP.from_openapi documentation
- **FastAPI Integration**: FastMCP.from_fastapi documentation
- **HTTPX Docs**: https://www.python-httpx.org
- **OpenAPI Spec**: https://spec.openapis.org

View File

@@ -0,0 +1,488 @@
# FastMCP Production Patterns Reference
Battle-tested patterns for production-ready FastMCP servers.
## Self-Contained Server Pattern
**Problem:** Circular imports break cloud deployment
**Solution:** Keep all utilities in one file
```python
# src/utils.py - All utilities in one place
import os
from typing import Dict, Any
from datetime import datetime
class Config:
"""Configuration from environment."""
SERVER_NAME = os.getenv("SERVER_NAME", "FastMCP Server")
API_KEY = os.getenv("API_KEY", "")
CACHE_TTL = int(os.getenv("CACHE_TTL", "300"))
def format_success(data: Any) -> Dict[str, Any]:
"""Format successful response."""
return {
"success": True,
"data": data,
"timestamp": datetime.now().isoformat()
}
def format_error(error: str, code: str = "ERROR") -> Dict[str, Any]:
"""Format error response."""
return {
"success": False,
"error": error,
"code": code,
"timestamp": datetime.now().isoformat()
}
# Usage in tools
from .utils import format_success, format_error, Config
@mcp.tool()
async def process_data(data: dict) -> dict:
try:
result = await process(data)
return format_success(result)
except Exception as e:
return format_error(str(e))
```
**Why it works:**
- No circular dependencies
- Cloud deployment safe
- Easy to maintain
- Single source of truth
## Lazy Initialization Pattern
**Problem:** Creating expensive resources at import time fails in cloud
**Solution:** Initialize resources only when needed
```python
class ResourceManager:
"""Manages expensive resources with lazy initialization."""
_db_pool = None
_cache = None
@classmethod
async def get_db(cls):
"""Get database pool (create on first use)."""
if cls._db_pool is None:
cls._db_pool = await create_db_pool()
return cls._db_pool
@classmethod
async def get_cache(cls):
"""Get cache (create on first use)."""
if cls._cache is None:
cls._cache = await create_cache()
return cls._cache
# Usage - no initialization at module level
manager = ResourceManager() # Lightweight
@mcp.tool()
async def database_operation():
db = await manager.get_db() # Initialization happens here
return await db.query("SELECT * FROM users")
```
## Connection Pooling Pattern
**Problem:** Creating new connections for each request is slow
**Solution:** Reuse HTTP clients with connection pooling
```python
import httpx
from typing import Optional
class APIClient:
_instance: Optional[httpx.AsyncClient] = None
@classmethod
async def get_client(cls) -> httpx.AsyncClient:
"""Get or create shared HTTP client."""
if cls._instance is None:
cls._instance = httpx.AsyncClient(
base_url=API_BASE_URL,
timeout=httpx.Timeout(30.0),
limits=httpx.Limits(
max_keepalive_connections=5,
max_connections=10
)
)
return cls._instance
@classmethod
async def cleanup(cls):
"""Cleanup on shutdown."""
if cls._instance:
await cls._instance.aclose()
cls._instance = None
@mcp.tool()
async def api_request(endpoint: str) -> dict:
client = await APIClient.get_client()
response = await client.get(endpoint)
return response.json()
```
## Retry with Exponential Backoff
**Problem:** Transient failures cause tool failures
**Solution:** Automatic retry with exponential backoff
```python
import asyncio
async def retry_with_backoff(
func,
max_retries: int = 3,
initial_delay: float = 1.0,
exponential_base: float = 2.0
):
"""Retry function with exponential backoff."""
delay = initial_delay
last_exception = None
for attempt in range(max_retries):
try:
return await func()
except (httpx.TimeoutException, httpx.NetworkError) as e:
last_exception = e
if attempt < max_retries - 1:
await asyncio.sleep(delay)
delay *= exponential_base
raise last_exception
@mcp.tool()
async def resilient_api_call(endpoint: str) -> dict:
"""API call with automatic retry."""
async def make_call():
client = await APIClient.get_client()
response = await client.get(endpoint)
response.raise_for_status()
return response.json()
try:
data = await retry_with_backoff(make_call)
return {"success": True, "data": data}
except Exception as e:
return {"success": False, "error": str(e)}
```
## Time-Based Caching Pattern
**Problem:** Repeated API calls for the same data waste time and money
**Solution:** Cache with TTL (time-to-live)
```python
import time
class TimeBasedCache:
def __init__(self, ttl: int = 300):
self.ttl = ttl
self.cache = {}
self.timestamps = {}
def get(self, key: str):
if key in self.cache:
if time.time() - self.timestamps[key] < self.ttl:
return self.cache[key]
else:
del self.cache[key]
del self.timestamps[key]
return None
def set(self, key: str, value):
self.cache[key] = value
self.timestamps[key] = time.time()
cache = TimeBasedCache(ttl=300)
@mcp.tool()
async def cached_fetch(resource_id: str) -> dict:
"""Fetch with caching."""
cache_key = f"resource:{resource_id}"
cached = cache.get(cache_key)
if cached:
return {"data": cached, "from_cache": True}
data = await fetch_from_api(resource_id)
cache.set(cache_key, data)
return {"data": data, "from_cache": False}
```
## Structured Error Responses
**Problem:** Inconsistent error formats make debugging hard
**Solution:** Standardized error response format
```python
from enum import Enum
class ErrorCode(Enum):
VALIDATION_ERROR = "VALIDATION_ERROR"
NOT_FOUND = "NOT_FOUND"
API_ERROR = "API_ERROR"
TIMEOUT = "TIMEOUT"
UNKNOWN = "UNKNOWN"
def create_error(code: ErrorCode, message: str, details: dict = None):
"""Create structured error response."""
return {
"success": False,
"error": {
"code": code.value,
"message": message,
"details": details or {},
"timestamp": datetime.now().isoformat()
}
}
@mcp.tool()
async def validated_operation(data: str) -> dict:
if not data:
return create_error(
ErrorCode.VALIDATION_ERROR,
"Data is required",
{"field": "data"}
)
try:
result = await process(data)
return {"success": True, "data": result}
except Exception as e:
return create_error(ErrorCode.UNKNOWN, str(e))
```
## Environment-Based Configuration
**Problem:** Different settings for dev/staging/production
**Solution:** Environment-based configuration class
```python
import os
from enum import Enum
class Environment(Enum):
DEVELOPMENT = "development"
STAGING = "staging"
PRODUCTION = "production"
class Config:
ENV = Environment(os.getenv("ENVIRONMENT", "development"))
SETTINGS = {
Environment.DEVELOPMENT: {
"debug": True,
"cache_ttl": 60,
"log_level": "DEBUG"
},
Environment.STAGING: {
"debug": True,
"cache_ttl": 300,
"log_level": "INFO"
},
Environment.PRODUCTION: {
"debug": False,
"cache_ttl": 3600,
"log_level": "WARNING"
}
}
@classmethod
def get(cls, key: str):
return cls.SETTINGS[cls.ENV].get(key)
# Use configuration
cache_ttl = Config.get("cache_ttl")
debug_mode = Config.get("debug")
```
## Health Check Pattern
**Problem:** Need to monitor server health in production
**Solution:** Comprehensive health check resource
```python
@mcp.resource("health://status")
async def health_check() -> dict:
"""Comprehensive health check."""
checks = {}
# Check API connectivity
try:
client = await APIClient.get_client()
response = await client.get("/health", timeout=5)
checks["api"] = response.status_code == 200
    except Exception:
checks["api"] = False
# Check database (if applicable)
try:
db = await ResourceManager.get_db()
await db.execute("SELECT 1")
checks["database"] = True
    except Exception:
checks["database"] = False
# System resources
import psutil
checks["memory_percent"] = psutil.virtual_memory().percent
checks["cpu_percent"] = psutil.cpu_percent()
# Overall status
all_healthy = (
checks.get("api", True) and
checks.get("database", True) and
checks["memory_percent"] < 90 and
checks["cpu_percent"] < 90
)
return {
"status": "healthy" if all_healthy else "degraded",
"timestamp": datetime.now().isoformat(),
"checks": checks
}
```
## Parallel Processing Pattern
**Problem:** Sequential processing is slow for batch operations
**Solution:** Process items in parallel
```python
import asyncio
@mcp.tool()
async def batch_process(items: list[str]) -> dict:
"""Process multiple items in parallel."""
async def process_single(item: str):
try:
result = await process_item(item)
return {"item": item, "success": True, "result": result}
except Exception as e:
return {"item": item, "success": False, "error": str(e)}
# Process all items in parallel
tasks = [process_single(item) for item in items]
results = await asyncio.gather(*tasks)
successful = [r for r in results if r["success"]]
failed = [r for r in results if not r["success"]]
return {
"total": len(items),
"successful": len(successful),
"failed": len(failed),
"results": results
}
```
## State Management Pattern
**Problem:** Shared state causes race conditions
**Solution:** Thread-safe state management with locks
```python
import asyncio
class StateManager:
def __init__(self):
self._state = {}
self._locks = {}
async def get(self, key: str, default=None):
return self._state.get(key, default)
async def set(self, key: str, value):
if key not in self._locks:
self._locks[key] = asyncio.Lock()
async with self._locks[key]:
self._state[key] = value
    async def update(self, key: str, updater):
        """Update with a sync or async updater function."""
        if key not in self._locks:
            self._locks[key] = asyncio.Lock()
        async with self._locks[key]:
            current = self._state.get(key)
            result = updater(current)
            if asyncio.iscoroutine(result):
                result = await result  # support async updaters too
            self._state[key] = result
            return self._state[key]
state = StateManager()
@mcp.tool()
async def increment_counter(name: str) -> dict:
new_value = await state.update(
f"counter_{name}",
lambda x: (x or 0) + 1
)
return {"counter": name, "value": new_value}
```
## Anti-Patterns to Avoid
### ❌ Factory Functions in __init__.py
```python
# DON'T DO THIS
# shared/__init__.py
def get_api_client():
from .api_client import APIClient # Circular import risk
return APIClient()
```
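A sketch of a safer alternative: import the concrete module directly (no package-level factory) and create the client lazily, as in the Lazy Initialization and Connection Pooling patterns above. Module paths are illustrative:
```python
# DO THIS INSTEAD
# tools.py - import the concrete module, no factory in __init__.py
from shared.api_client import APIClient  # illustrative path

async def call_api(endpoint: str) -> dict:
    client = await APIClient.get_client()  # created lazily on first use
    response = await client.get(endpoint)
    return response.json()
```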
### ❌ Blocking Operations in Async
```python
# DON'T DO THIS
@mcp.tool()
async def bad_async():
time.sleep(5) # Blocks entire event loop!
return "done"
# DO THIS INSTEAD
@mcp.tool()
async def good_async():
await asyncio.sleep(5)
return "done"
```
### ❌ Global Mutable State
```python
# DON'T DO THIS
results = [] # Race conditions!
@mcp.tool()
async def add_result(data: str):
results.append(data)
```
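A sketch of the fix, reusing the lock-guarded `StateManager` from the State Management pattern above:
```python
# DO THIS INSTEAD
state = StateManager()  # lock-guarded shared state (defined above)

@mcp.tool()
async def add_result(data: str) -> dict:
    results = await state.update(
        "results",
        lambda current: (current or []) + [data]
    )
    return {"count": len(results)}
```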
## Production Deployment Checklist
- [ ] Module-level server object
- [ ] Environment variables for all config
- [ ] Connection pooling for HTTP clients
- [ ] Retry logic for transient failures
- [ ] Caching for expensive operations
- [ ] Structured error responses
- [ ] Health check endpoint
- [ ] Logging configured
- [ ] No circular imports
- [ ] No import-time async execution
- [ ] Rate limiting if needed
- [ ] Graceful shutdown handling
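A minimal sketch covering the logging and graceful-shutdown checklist items, assuming the pooled `APIClient` defined above:
```python
import asyncio
import logging

logging.basicConfig(level=logging.INFO)  # "Logging configured"
logger = logging.getLogger("fastmcp-server")

if __name__ == "__main__":
    try:
        mcp.run()
    finally:
        # "Graceful shutdown handling": close pooled HTTP connections
        logger.info("Shutting down, closing HTTP client")
        asyncio.run(APIClient.cleanup())
```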
## Resources
- **Production Examples**: See `self-contained-server.py` template
- **Error Handling**: See `error-handling.py` template
- **API Patterns**: See `api-client-pattern.py` template

102
scripts/check-versions.sh Executable file
View File

@@ -0,0 +1,102 @@
#!/bin/bash
# FastMCP Version Checker
# Verifies that FastMCP and dependencies are up to date
set -e
echo "======================================"
echo "FastMCP Version Checker"
echo "======================================"
echo ""
# Colors
GREEN='\033[0;32m'
YELLOW='\033[1;33m'
RED='\033[0;31m'
NC='\033[0m' # No Color
# Check if Python is installed
if ! command -v python3 &> /dev/null; then
echo -e "${RED}${NC} Python 3 is not installed"
exit 1
fi
echo -e "${GREEN}${NC} Python $(python3 --version)"
echo ""
# Check Python version
PYTHON_VERSION=$(python3 -c 'import sys; print(f"{sys.version_info.major}.{sys.version_info.minor}")')
REQUIRED_VERSION="3.10"
if [ "$(printf '%s\n' "$REQUIRED_VERSION" "$PYTHON_VERSION" | sort -V | head -n1)" != "$REQUIRED_VERSION" ]; then
echo -e "${RED}${NC} Python $PYTHON_VERSION is too old. FastMCP requires Python $REQUIRED_VERSION or later"
exit 1
fi
echo -e "${GREEN}${NC} Python version $PYTHON_VERSION meets requirements"
echo ""
# Check if pip is installed
if ! command -v pip3 &> /dev/null; then
echo -e "${RED}${NC} pip3 is not installed"
exit 1
fi
echo "Checking package versions..."
echo ""
# Function to check package version
check_package() {
local package=$1
local min_version=$2
if pip3 show "$package" &> /dev/null; then
local installed_version=$(pip3 show "$package" | grep "Version:" | awk '{print $2}')
echo -e "${GREEN}${NC} $package: $installed_version (required: >=$min_version)"
# Note: This is a simple check. For production, use more robust version comparison
if [ "$installed_version" != "$min_version" ]; then
if [ "$(printf '%s\n' "$min_version" "$installed_version" | sort -V | head -n1)" != "$min_version" ]; then
echo -e " ${YELLOW}${NC} Installed version is older than minimum required"
fi
fi
else
echo -e "${RED}${NC} $package: Not installed (required: >=$min_version)"
fi
}
# Check core packages
check_package "fastmcp" "2.12.0"
check_package "httpx" "0.27.0"
check_package "python-dotenv" "1.0.0"
check_package "pydantic" "2.0.0"
echo ""
echo "Checking optional packages..."
echo ""
# Check optional packages
if pip3 show "psutil" &> /dev/null; then
check_package "psutil" "5.9.0"
else
echo -e "${YELLOW}${NC} psutil: Not installed (optional, for health checks)"
fi
if pip3 show "pytest" &> /dev/null; then
check_package "pytest" "8.0.0"
else
echo -e "${YELLOW}${NC} pytest: Not installed (optional, for testing)"
fi
echo ""
echo "======================================"
echo "Version check complete!"
echo "======================================"
echo ""
# Suggestions
echo "Suggestions:"
echo " - To update FastMCP: pip install --upgrade fastmcp"
echo " - To update all dependencies: pip install --upgrade -r requirements.txt"
echo " - To see outdated packages: pip list --outdated"
echo ""

231
scripts/deploy-cloud.sh Executable file
View File

@@ -0,0 +1,231 @@
#!/bin/bash
# FastMCP Cloud Deployment Checker
# Validates server is ready for FastMCP Cloud deployment
set -e
# Colors
GREEN='\033[0;32m'
YELLOW='\033[1;33m'
RED='\033[0;31m'
BLUE='\033[0;34m'
NC='\033[0m' # No Color
echo "======================================"
echo "FastMCP Cloud Deployment Checker"
echo "======================================"
echo ""
# Check arguments
if [ $# -eq 0 ]; then
echo "Usage: $0 <server.py>"
echo ""
echo "Example:"
echo " $0 server.py"
exit 1
fi
SERVER_PATH=$1
ERRORS=0
WARNINGS=0
# Function to check requirement
check_required() {
local description=$1
local command=$2
if eval "$command" &> /dev/null; then
echo -e "${GREEN}${NC} $description"
return 0
else
echo -e "${RED}${NC} $description"
ERRORS=$((ERRORS + 1))
return 1
fi
}
# Function to check warning
check_warning() {
local description=$1
local command=$2
if eval "$command" &> /dev/null; then
echo -e "${GREEN}${NC} $description"
return 0
else
echo -e "${YELLOW}${NC} $description"
WARNINGS=$((WARNINGS + 1))
return 1
fi
}
# 1. Check server file exists
echo "Checking server file..."
check_required "Server file exists: $SERVER_PATH" "test -f '$SERVER_PATH'"
echo ""
# 2. Check Python syntax
echo "Checking Python syntax..."
check_required "Python syntax is valid" "python3 -m py_compile '$SERVER_PATH'"
echo ""
# 3. Check for module-level server object
echo "Checking module-level server object..."
if grep -q "^mcp = FastMCP\|^server = FastMCP\|^app = FastMCP" "$SERVER_PATH"; then
echo -e "${GREEN}${NC} Found module-level server object (mcp/server/app)"
else
echo -e "${RED}${NC} No module-level server object found"
echo " Expected: mcp = FastMCP(...) at module level"
ERRORS=$((ERRORS + 1))
fi
echo ""
# 4. Check requirements.txt
echo "Checking requirements.txt..."
if [ -f "requirements.txt" ]; then
echo -e "${GREEN}${NC} requirements.txt exists"
# Check for non-PyPI dependencies
if grep -q "^git+\|^-e \|\.whl$\|\.tar.gz$" requirements.txt; then
echo -e "${RED}${NC} requirements.txt contains non-PyPI dependencies"
echo " FastMCP Cloud requires PyPI packages only"
ERRORS=$((ERRORS + 1))
else
echo -e "${GREEN}${NC} All dependencies are PyPI packages"
fi
# Check for fastmcp
if grep -q "^fastmcp" requirements.txt; then
echo -e "${GREEN}${NC} FastMCP is in requirements.txt"
else
echo -e "${YELLOW}${NC} FastMCP not found in requirements.txt"
WARNINGS=$((WARNINGS + 1))
fi
else
echo -e "${RED}${NC} requirements.txt not found"
ERRORS=$((ERRORS + 1))
fi
echo ""
# 5. Check for hardcoded secrets
echo "Checking for hardcoded secrets..."
if grep -i "api_key\s*=\s*[\"']" "$SERVER_PATH" | grep -v "os.getenv\|os.environ" > /dev/null; then
echo -e "${RED}${NC} Found hardcoded API keys (possible security issue)"
ERRORS=$((ERRORS + 1))
else
echo -e "${GREEN}${NC} No hardcoded API keys found"
fi
if grep -i "password\s*=\s*[\"']\|secret\s*=\s*[\"']" "$SERVER_PATH" | grep -v "os.getenv\|os.environ" > /dev/null; then
echo -e "${YELLOW}${NC} Found possible hardcoded passwords/secrets"
WARNINGS=$((WARNINGS + 1))
fi
echo ""
# 6. Check .gitignore
echo "Checking .gitignore..."
if [ -f ".gitignore" ]; then
echo -e "${GREEN}${NC} .gitignore exists"
if grep -q "\.env$" .gitignore; then
echo -e "${GREEN}${NC} .env is in .gitignore"
else
echo -e "${YELLOW}${NC} .env not in .gitignore"
WARNINGS=$((WARNINGS + 1))
fi
else
echo -e "${YELLOW}${NC} .gitignore not found"
WARNINGS=$((WARNINGS + 1))
fi
echo ""
# 7. Check for circular imports
echo "Checking for potential circular imports..."
if grep -r "from __init__ import\|from . import.*get_" . --include="*.py" 2>/dev/null | grep -v ".git" > /dev/null; then
echo -e "${YELLOW}${NC} Possible circular import pattern detected (factory functions)"
WARNINGS=$((WARNINGS + 1))
else
echo -e "${GREEN}${NC} No obvious circular import patterns"
fi
echo ""
# 8. Check git repository
echo "Checking git repository..."
if [ -d ".git" ]; then
echo -e "${GREEN}${NC} Git repository initialized"
# Check if there are uncommitted changes
if [ -z "$(git status --porcelain)" ]; then
echo -e "${GREEN}${NC} No uncommitted changes"
else
echo -e "${YELLOW}${NC} There are uncommitted changes"
WARNINGS=$((WARNINGS + 1))
fi
# Check if remote is set
if git remote -v | grep -q "origin"; then
echo -e "${GREEN}${NC} Git remote (origin) configured"
REMOTE_URL=$(git remote get-url origin)
echo " Remote: $REMOTE_URL"
else
echo -e "${YELLOW}${NC} No git remote configured"
echo " Run: gh repo create <name> --public"
WARNINGS=$((WARNINGS + 1))
fi
else
echo -e "${YELLOW}${NC} Not a git repository"
echo " Run: git init"
WARNINGS=$((WARNINGS + 1))
fi
echo ""
# 9. Test server can run
echo "Testing server execution..."
if timeout 5 python3 "$SERVER_PATH" --help &> /dev/null || timeout 5 fastmcp inspect "$SERVER_PATH" &> /dev/null; then
echo -e "${GREEN}${NC} Server can be loaded"
else
echo -e "${YELLOW}${NC} Could not verify server loads correctly"
WARNINGS=$((WARNINGS + 1))
fi
echo ""
# Summary
echo "======================================"
echo "Deployment Check Summary"
echo "======================================"
echo ""
if [ $ERRORS -eq 0 ] && [ $WARNINGS -eq 0 ]; then
echo -e "${GREEN}✓ Ready for deployment!${NC}"
echo ""
echo "Next steps:"
echo " 1. Commit changes: git add . && git commit -m 'Ready for deployment'"
echo " 2. Push to GitHub: git push -u origin main"
echo " 3. Visit https://fastmcp.cloud"
echo " 4. Connect your repository"
echo " 5. Add environment variables"
echo " 6. Deploy!"
exit 0
elif [ $ERRORS -eq 0 ]; then
echo -e "${YELLOW}⚠ Ready with warnings (${WARNINGS} warnings)${NC}"
echo ""
echo "Review warnings above before deploying."
echo ""
echo "To deploy anyway:"
echo " 1. git add . && git commit -m 'Ready for deployment'"
echo " 2. git push -u origin main"
echo " 3. Visit https://fastmcp.cloud"
exit 0
else
echo -e "${RED}✗ Not ready for deployment (${ERRORS} errors, ${WARNINGS} warnings)${NC}"
echo ""
echo "Fix the errors above before deploying."
echo ""
echo "Common fixes:"
echo " - Export server at module level: mcp = FastMCP('name')"
echo " - Use only PyPI packages in requirements.txt"
echo " - Use os.getenv() for secrets, not hardcoded values"
echo " - Initialize git: git init"
echo " - Create .gitignore with .env"
exit 1
fi

187
scripts/test-server.sh Executable file
View File

@@ -0,0 +1,187 @@
#!/bin/bash
# FastMCP Server Tester
# Tests a FastMCP server using the FastMCP Client
set -e
# Colors
GREEN='\033[0;32m'
YELLOW='\033[1;33m'
RED='\033[0;31m'
NC='\033[0m' # No Color
echo "======================================"
echo "FastMCP Server Tester"
echo "======================================"
echo ""
# Check arguments
if [ $# -eq 0 ]; then
echo "Usage: $0 <server.py> [--http] [--port 8000]"
echo ""
echo "Examples:"
echo " $0 server.py # Test stdio server"
echo " $0 server.py --http --port 8000 # Test HTTP server"
exit 1
fi
SERVER_PATH=$1
TRANSPORT="stdio"
PORT="8000"
# Parse arguments
shift
while [[ $# -gt 0 ]]; do
case $1 in
--http)
TRANSPORT="http"
shift
;;
--port)
PORT="$2"
shift 2
;;
*)
echo "Unknown option: $1"
exit 1
;;
esac
done
# Check if server file exists
if [ ! -f "$SERVER_PATH" ]; then
echo -e "${RED}${NC} Server file not found: $SERVER_PATH"
exit 1
fi
echo -e "${GREEN}${NC} Found server: $SERVER_PATH"
echo -e "${GREEN}${NC} Transport: $TRANSPORT"
if [ "$TRANSPORT" = "http" ]; then
echo -e "${GREEN}${NC} Port: $PORT"
fi
echo ""
# Create test script
TEST_SCRIPT=$(mktemp)
cat > "$TEST_SCRIPT" << 'EOF'
import asyncio
import sys
from fastmcp import Client
async def test_server(server_path, transport, port):
"""Test MCP server."""
print("Starting server test...\n")
try:
if transport == "http":
server_url = f"http://localhost:{port}/mcp"
print(f"Connecting to HTTP server at {server_url}...")
client_context = Client(server_url)
else:
print(f"Connecting to stdio server: {server_path}...")
client_context = Client(server_path)
async with client_context as client:
print("✓ Connected to server\n")
# Test: List tools
print("Testing: List tools")
tools = await client.list_tools()
print(f"✓ Found {len(tools)} tools")
for tool in tools:
print(f" - {tool.name}: {tool.description[:60]}...")
print()
# Test: List resources
print("Testing: List resources")
resources = await client.list_resources()
print(f"✓ Found {len(resources)} resources")
for resource in resources:
print(f" - {resource.uri}: {resource.description[:60] if resource.description else 'No description'}...")
print()
# Test: List prompts
print("Testing: List prompts")
prompts = await client.list_prompts()
print(f"✓ Found {len(prompts)} prompts")
for prompt in prompts:
print(f" - {prompt.name}: {prompt.description[:60] if prompt.description else 'No description'}...")
print()
# Test: Call first tool (if any)
if tools:
print(f"Testing: Call tool '{tools[0].name}'")
try:
# Try calling with empty args (may fail if required params)
result = await client.call_tool(tools[0].name, {})
print(f"✓ Tool executed successfully")
print(f" Result: {str(result.data)[:100]}...")
except Exception as e:
print(f"⚠ Tool call failed (may require parameters): {e}")
print()
# Test: Read first resource (if any)
if resources:
print(f"Testing: Read resource '{resources[0].uri}'")
try:
data = await client.read_resource(resources[0].uri)
print(f"✓ Resource read successfully")
print(f" Data: {str(data)[:100]}...")
except Exception as e:
print(f"✗ Failed to read resource: {e}")
print()
print("=" * 50)
print("✓ Server test completed successfully!")
print("=" * 50)
return 0
except Exception as e:
print(f"\n✗ Server test failed: {e}")
import traceback
traceback.print_exc()
return 1
if __name__ == "__main__":
server_path = sys.argv[1]
transport = sys.argv[2] if len(sys.argv) > 2 else "stdio"
port = sys.argv[3] if len(sys.argv) > 3 else "8000"
exit_code = asyncio.run(test_server(server_path, transport, port))
sys.exit(exit_code)
EOF
# Run test
echo "Running tests..."
echo ""
if [ "$TRANSPORT" = "http" ]; then
# For HTTP, start server in background
echo "Starting HTTP server in background..."
python3 "$SERVER_PATH" --transport http --port "$PORT" &
SERVER_PID=$!
# Wait for server to start
sleep 2
# Run test
python3 "$TEST_SCRIPT" "$SERVER_PATH" "$TRANSPORT" "$PORT"
TEST_EXIT=$?
# Kill server
kill $SERVER_PID 2>/dev/null || true
# Cleanup
rm "$TEST_SCRIPT"
exit $TEST_EXIT
else
# For stdio, run test directly
python3 "$TEST_SCRIPT" "$SERVER_PATH" "$TRANSPORT"
TEST_EXIT=$?
# Cleanup
rm "$TEST_SCRIPT"
exit $TEST_EXIT
fi

49
templates/.env.example Normal file
View File

@@ -0,0 +1,49 @@
# FastMCP Server Configuration
# Copy this file to .env and fill in your values
# Server Configuration
SERVER_NAME="My FastMCP Server"
ENVIRONMENT="development" # development, staging, production
# API Configuration (if integrating with external API)
API_BASE_URL="https://api.example.com"
API_KEY="your-api-key-here"
API_SECRET="your-api-secret-here"
API_TIMEOUT="30"
# Database Configuration (if using database)
DATABASE_URL="postgresql://user:password@localhost:5432/dbname"
# Cache Configuration
CACHE_TTL="300" # Cache time-to-live in seconds (5 minutes)
ENABLE_CACHE="true"
# Retry Configuration
MAX_RETRIES="3"
# OpenAPI Configuration (if using OpenAPI integration)
OPENAPI_SPEC_URL="https://api.example.com/openapi.json"
# Logging
LOG_LEVEL="INFO" # DEBUG, INFO, WARNING, ERROR
# Features (optional)
ENABLE_PROGRESS_TRACKING="true"
ENABLE_ELICITATION="true"
ENABLE_SAMPLING="true"
# Rate Limiting (optional)
RATE_LIMIT_REQUESTS="100" # Max requests per time window
RATE_LIMIT_WINDOW="60" # Time window in seconds
# Security (optional)
ALLOWED_ORIGINS="*"
ENABLE_CORS="true"
# FastMCP Cloud (for deployment)
# These will be set automatically by FastMCP Cloud
# FASTMCP_ENV="production"
# FASTMCP_REGION="us-west"
# Custom Configuration
# Add any custom environment variables your server needs

View File

@@ -0,0 +1,413 @@
"""
FastMCP API Client Pattern
===========================
Manual API integration with connection pooling, caching, and retry logic.
"""
from fastmcp import FastMCP
import httpx
import os
import time
import asyncio
from typing import Optional, Any, Dict
from datetime import datetime
from dotenv import load_dotenv
load_dotenv()
mcp = FastMCP("API Client Pattern")
# ============================================================================
# Configuration
# ============================================================================
class Config:
"""API configuration from environment."""
API_BASE_URL = os.getenv("API_BASE_URL", "https://api.example.com")
API_KEY = os.getenv("API_KEY", "")
API_TIMEOUT = int(os.getenv("API_TIMEOUT", "30"))
CACHE_TTL = int(os.getenv("CACHE_TTL", "300")) # 5 minutes
MAX_RETRIES = int(os.getenv("MAX_RETRIES", "3"))
# ============================================================================
# API Client with Connection Pooling
# ============================================================================
class APIClient:
"""Singleton API client with connection pooling."""
_instance: Optional[httpx.AsyncClient] = None
@classmethod
async def get_client(cls) -> httpx.AsyncClient:
"""Get or create the shared HTTP client."""
if cls._instance is None:
cls._instance = httpx.AsyncClient(
base_url=Config.API_BASE_URL,
headers={
"Authorization": f"Bearer {Config.API_KEY}",
"Content-Type": "application/json",
"User-Agent": "FastMCP-Client/1.0"
},
timeout=httpx.Timeout(Config.API_TIMEOUT),
limits=httpx.Limits(
max_keepalive_connections=5,
max_connections=10
)
)
return cls._instance
@classmethod
async def cleanup(cls):
"""Cleanup the HTTP client."""
if cls._instance:
await cls._instance.aclose()
cls._instance = None
# ============================================================================
# Cache Implementation
# ============================================================================
class SimpleCache:
"""Time-based cache for API responses."""
def __init__(self, ttl: int = 300):
self.ttl = ttl
self.cache: Dict[str, Any] = {}
self.timestamps: Dict[str, float] = {}
def get(self, key: str) -> Optional[Any]:
"""Get cached value if not expired."""
if key in self.cache:
if time.time() - self.timestamps[key] < self.ttl:
return self.cache[key]
else:
# Expired, remove it
del self.cache[key]
del self.timestamps[key]
return None
def set(self, key: str, value: Any):
"""Set cache value with timestamp."""
self.cache[key] = value
self.timestamps[key] = time.time()
def invalidate(self, pattern: Optional[str] = None):
"""Invalidate cache entries."""
if pattern:
keys_to_delete = [k for k in self.cache if pattern in k]
for key in keys_to_delete:
del self.cache[key]
del self.timestamps[key]
else:
self.cache.clear()
self.timestamps.clear()
# Global cache instance
cache = SimpleCache(ttl=Config.CACHE_TTL)
# ============================================================================
# Retry Logic with Exponential Backoff
# ============================================================================
async def retry_with_backoff(
func,
max_retries: int = 3,
initial_delay: float = 1.0,
exponential_base: float = 2.0
):
"""Retry function with exponential backoff."""
delay = initial_delay
last_exception = None
for attempt in range(max_retries):
try:
return await func()
except (httpx.TimeoutException, httpx.NetworkError) as e:
last_exception = e
if attempt < max_retries - 1:
print(f"Attempt {attempt + 1} failed, retrying in {delay}s...")
await asyncio.sleep(delay)
delay *= exponential_base
except httpx.HTTPStatusError as e:
# Don't retry client errors (4xx)
if 400 <= e.response.status_code < 500:
raise
last_exception = e
if attempt < max_retries - 1:
await asyncio.sleep(delay)
delay *= exponential_base
raise last_exception
# ============================================================================
# API Tools
# ============================================================================
@mcp.tool()
async def api_get(
endpoint: str,
use_cache: bool = True
) -> dict:
"""
Make a GET request to the API.
Args:
endpoint: API endpoint path (e.g., "/users/123")
use_cache: Whether to use cached response if available
Returns:
API response data or error
"""
cache_key = f"GET:{endpoint}"
# Check cache
if use_cache:
cached = cache.get(cache_key)
if cached:
return {
"success": True,
"data": cached,
"from_cache": True,
"timestamp": datetime.now().isoformat()
}
# Make request with retry
async def make_request():
client = await APIClient.get_client()
response = await client.get(endpoint)
response.raise_for_status()
return response.json()
try:
data = await retry_with_backoff(make_request, max_retries=Config.MAX_RETRIES)
# Cache successful response
if use_cache:
cache.set(cache_key, data)
return {
"success": True,
"data": data,
"from_cache": False,
"timestamp": datetime.now().isoformat()
}
except httpx.HTTPStatusError as e:
return {
"success": False,
"error": f"HTTP {e.response.status_code}",
"message": e.response.text,
"endpoint": endpoint
}
except Exception as e:
return {
"success": False,
"error": str(e),
"endpoint": endpoint
}
@mcp.tool()
async def api_post(
endpoint: str,
data: dict,
invalidate_cache: bool = True
) -> dict:
"""
Make a POST request to the API.
Args:
endpoint: API endpoint path
data: Request body data
invalidate_cache: Whether to invalidate related cache entries
Returns:
API response or error
"""
async def make_request():
client = await APIClient.get_client()
response = await client.post(endpoint, json=data)
response.raise_for_status()
return response.json()
try:
result = await retry_with_backoff(make_request, max_retries=Config.MAX_RETRIES)
# Invalidate cache for related endpoints
if invalidate_cache:
cache.invalidate(endpoint.split('/')[1] if '/' in endpoint else endpoint)
return {
"success": True,
"data": result,
"timestamp": datetime.now().isoformat()
}
except httpx.HTTPStatusError as e:
return {
"success": False,
"error": f"HTTP {e.response.status_code}",
"message": e.response.text
}
except Exception as e:
return {
"success": False,
"error": str(e)
}
@mcp.tool()
async def api_put(
endpoint: str,
data: dict,
invalidate_cache: bool = True
) -> dict:
"""Make a PUT request to the API."""
async def make_request():
client = await APIClient.get_client()
response = await client.put(endpoint, json=data)
response.raise_for_status()
return response.json()
try:
result = await retry_with_backoff(make_request)
if invalidate_cache:
cache.invalidate(endpoint)
return {"success": True, "data": result}
except Exception as e:
return {"success": False, "error": str(e)}
@mcp.tool()
async def api_delete(
endpoint: str,
invalidate_cache: bool = True
) -> dict:
"""Make a DELETE request to the API."""
async def make_request():
client = await APIClient.get_client()
response = await client.delete(endpoint)
response.raise_for_status()
return response.status_code
try:
status = await retry_with_backoff(make_request)
if invalidate_cache:
cache.invalidate(endpoint)
return {
"success": True,
"status_code": status,
"deleted": True
}
except Exception as e:
return {"success": False, "error": str(e)}
@mcp.tool()
async def batch_api_requests(
endpoints: list[str],
use_cache: bool = True
) -> dict:
"""
Make multiple GET requests in parallel.
Args:
endpoints: List of endpoint paths
use_cache: Whether to use cache
Returns:
Batch results with successes and failures
"""
async def fetch_one(endpoint: str):
return await api_get(endpoint, use_cache=use_cache)
results = await asyncio.gather(*[fetch_one(ep) for ep in endpoints])
successful = [r for r in results if r.get("success")]
failed = [r for r in results if not r.get("success")]
return {
"total": len(endpoints),
"successful": len(successful),
"failed": len(failed),
"results": results
}
@mcp.tool()
def clear_cache(pattern: Optional[str] = None) -> dict:
"""
Clear API response cache.
Args:
pattern: Optional pattern to match cache keys (clears all if not provided)
Returns:
Cache clear status
"""
try:
cache.invalidate(pattern)
return {
"success": True,
"message": f"Cache cleared{f' for pattern: {pattern}' if pattern else ''}"
}
except Exception as e:
return {"success": False, "error": str(e)}
# ============================================================================
# Resources
# ============================================================================
@mcp.resource("info://api-status")
async def api_status() -> dict:
"""Check API connectivity and status."""
try:
client = await APIClient.get_client()
response = await client.get("/health", timeout=5)
return {
"api_reachable": True,
"status_code": response.status_code,
"healthy": response.status_code == 200,
"timestamp": datetime.now().isoformat()
}
except Exception as e:
return {
"api_reachable": False,
"error": str(e),
"timestamp": datetime.now().isoformat()
}
@mcp.resource("info://cache-stats")
def cache_statistics() -> dict:
"""Get cache statistics."""
return {
"total_entries": len(cache.cache),
"ttl_seconds": cache.ttl,
"entries": list(cache.cache.keys())
}
# ============================================================================
# Main
# ============================================================================
if __name__ == "__main__":
try:
mcp.run()
finally:
# Cleanup on exit
import asyncio
asyncio.run(APIClient.cleanup())

116
templates/basic-server.py Normal file
View File

@@ -0,0 +1,116 @@
"""
Basic FastMCP Server Template
==============================
A minimal working FastMCP server with essential patterns.
"""
from fastmcp import FastMCP
import os
# Load environment variables (optional)
from dotenv import load_dotenv
load_dotenv()
# ============================================================================
# CRITICAL: Server must be at module level for FastMCP Cloud
# ============================================================================
mcp = FastMCP(
name="My Basic Server",
instructions="""
This is a basic MCP server demonstrating core patterns.
Available tools:
- greet: Say hello to someone
- calculate: Perform basic math operations
Available resources:
- info://status: Server status information
"""
)
# ============================================================================
# Tools
# ============================================================================
@mcp.tool()
def greet(name: str, greeting: str = "Hello") -> str:
"""
Greet someone by name.
Args:
name: The name of the person to greet
greeting: The greeting to use (default: "Hello")
Returns:
A greeting message
"""
return f"{greeting}, {name}!"
@mcp.tool()
async def calculate(operation: str, a: float, b: float) -> dict:
"""
Perform a mathematical operation.
Args:
operation: The operation to perform (add, subtract, multiply, divide)
a: First number
b: Second number
Returns:
Dictionary with the result or error message
"""
operations = {
"add": lambda x, y: x + y,
"subtract": lambda x, y: x - y,
"multiply": lambda x, y: x * y,
"divide": lambda x, y: x / y if y != 0 else None
}
if operation not in operations:
return {
"error": f"Unknown operation: {operation}",
"valid_operations": list(operations.keys())
}
result = operations[operation](a, b)
if result is None:
return {"error": "Division by zero"}
return {
"operation": operation,
"a": a,
"b": b,
"result": result
}
# ============================================================================
# Resources
# ============================================================================
@mcp.resource("info://status")
def server_status() -> dict:
"""Get current server status."""
from datetime import datetime
return {
"server": "My Basic Server",
"status": "operational",
"timestamp": datetime.now().isoformat(),
"version": "1.0.0"
}
# ============================================================================
# Main Execution
# ============================================================================
if __name__ == "__main__":
# Run with stdio transport (default)
mcp.run()
# Alternative: HTTP transport for testing
# mcp.run(transport="http", port=8000)

309
templates/client-example.py Normal file
View File

@@ -0,0 +1,309 @@
"""
FastMCP Client Example
======================
Testing MCP servers with the FastMCP Client.
"""
import asyncio
from fastmcp import Client
from typing import Optional
async def test_basic_server():
"""Test a basic MCP server."""
print("\n=== Testing Basic Server ===\n")
async with Client("basic-server.py") as client:
# List available tools
tools = await client.list_tools()
print(f"Available tools: {len(tools)}")
for tool in tools:
print(f" - {tool.name}: {tool.description}")
# Call a tool
print("\nCalling 'greet' tool...")
result = await client.call_tool("greet", {"name": "World"})
print(f"Result: {result.data}")
# Call another tool
print("\nCalling 'calculate' tool...")
result = await client.call_tool("calculate", {
"operation": "add",
"a": 10,
"b": 5
})
print(f"Result: {result.data}")
# List resources
print("\nAvailable resources:")
resources = await client.list_resources()
for resource in resources:
print(f" - {resource.uri}: {resource.description}")
# Read a resource
print("\nReading 'info://status' resource...")
status = await client.read_resource("info://status")
print(f"Status: {status}")
async def test_tools_examples():
"""Test the tools examples server."""
print("\n=== Testing Tools Examples ===\n")
async with Client("tools-examples.py") as client:
# Test sync tool
print("Testing sync tool...")
result = await client.call_tool("simple_sync_tool", {"text": "hello"})
print(f"Sync result: {result.data}")
# Test async tool
print("\nTesting async tool...")
result = await client.call_tool("simple_async_tool", {"text": "WORLD"})
print(f"Async result: {result.data}")
# Test validated search
print("\nTesting validated search...")
result = await client.call_tool("validated_search", {
"params": {
"query": "python",
"limit": 5
}
})
print(f"Search result: {result.data}")
# Test batch processing
print("\nTesting batch process...")
result = await client.call_tool("batch_process", {
"items": ["item1", "item2", "item3"]
})
print(f"Batch result: {result.data}")
async def test_resources_examples():
"""Test the resources examples server."""
print("\n=== Testing Resources Examples ===\n")
async with Client("resources-examples.py") as client:
# List all resources
resources = await client.list_resources()
print(f"Total resources: {len(resources)}")
# Test static resource
print("\nReading static resource...")
config = await client.read_resource("data://config")
print(f"Config: {config}")
# Test dynamic resource
print("\nReading dynamic resource...")
status = await client.read_resource("info://status")
print(f"Status: {status}")
# Test resource template
print("\nReading resource template...")
profile = await client.read_resource("user://123/profile")
print(f"User profile: {profile}")
async def test_with_error_handling():
"""Test server with comprehensive error handling."""
print("\n=== Testing with Error Handling ===\n")
async with Client("error-handling.py") as client:
# Test successful operation
print("Testing successful operation...")
try:
result = await client.call_tool("divide_numbers", {
"a": 10,
"b": 2
})
print(f"Success: {result.data}")
except Exception as e:
print(f"Error: {e}")
# Test error case
print("\nTesting error case (division by zero)...")
try:
result = await client.call_tool("divide_numbers", {
"a": 10,
"b": 0
})
print(f"Result: {result.data}")
except Exception as e:
print(f"Error: {e}")
# Test validation error
print("\nTesting validation error...")
try:
result = await client.call_tool("validated_operation", {
"data": ""
})
print(f"Result: {result.data}")
except Exception as e:
print(f"Error: {e}")
async def test_http_server():
"""Test server running on HTTP transport."""
print("\n=== Testing HTTP Server ===\n")
# Note: Server must be running on http://localhost:8000
# Start with: python server.py --transport http --port 8000
try:
async with Client("http://localhost:8000/mcp") as client:
print("Connected to HTTP server")
tools = await client.list_tools()
print(f"Available tools: {len(tools)}")
if tools:
result = await client.call_tool(tools[0].name, {})
print(f"Tool result: {result.data}")
except Exception as e:
print(f"Could not connect to HTTP server: {e}")
print("Make sure server is running with: python server.py --transport http --port 8000")
async def test_with_handlers():
"""Test server with client handlers (elicitation, progress, sampling)."""
print("\n=== Testing with Handlers ===\n")
# Define handlers
async def elicitation_handler(message: str, response_type: type, context: dict):
"""Handle elicitation requests."""
print(f"\n[ELICIT] {message}")
return input("Your response: ")
async def progress_handler(progress: float, total: Optional[float], message: Optional[str]):
"""Handle progress updates."""
if total:
pct = (progress / total) * 100
print(f"\r[PROGRESS] {pct:.1f}% - {message}", end="", flush=True)
else:
print(f"\n[PROGRESS] {message}")
async def sampling_handler(messages, params, context):
"""Handle sampling requests (LLM completions)."""
print(f"\n[SAMPLE] LLM request with {len(messages)} messages")
# In production, call actual LLM
return {
"content": "Mock LLM response",
"model": params.get("model", "mock"),
"usage": {"tokens": 100}
}
# Create client with handlers
async with Client(
"server.py",
elicitation_handler=elicitation_handler,
progress_handler=progress_handler,
sampling_handler=sampling_handler
) as client:
print("Client created with handlers")
# Test tools that use handlers
# Note: Requires server to have tools using context.request_elicitation, etc.
tools = await client.list_tools()
print(f"Available tools: {len(tools)}")
async def comprehensive_test():
"""Run comprehensive test suite."""
print("=" * 60)
print("FastMCP Client Test Suite")
print("=" * 60)
tests = [
("Basic Server", test_basic_server),
("Tools Examples", test_tools_examples),
("Resources Examples", test_resources_examples),
("Error Handling", test_with_error_handling),
# ("HTTP Server", test_http_server), # Uncomment if HTTP server is running
# ("With Handlers", test_with_handlers), # Uncomment if server supports handlers
]
for test_name, test_func in tests:
try:
await test_func()
except Exception as e:
print(f"\n{test_name} failed: {e}")
else:
print(f"\n{test_name} passed")
print("\n" + "-" * 60)
print("\nTest suite completed!")
async def interactive_client():
"""Interactive client for manual testing."""
print("\n=== Interactive FastMCP Client ===\n")
server_path = input("Enter server path or URL: ").strip()
if not server_path:
server_path = "basic-server.py"
print(f"Using default: {server_path}")
async with Client(server_path) as client:
print(f"\n✅ Connected to: {server_path}\n")
while True:
print("\nOptions:")
print("1. List tools")
print("2. List resources")
print("3. Call tool")
print("4. Read resource")
print("5. Exit")
choice = input("\nChoice: ").strip()
if choice == "1":
tools = await client.list_tools()
print(f"\n📋 Available tools ({len(tools)}):")
for i, tool in enumerate(tools, 1):
print(f" {i}. {tool.name}")
print(f" {tool.description}")
elif choice == "2":
resources = await client.list_resources()
print(f"\n📋 Available resources ({len(resources)}):")
for i, resource in enumerate(resources, 1):
print(f" {i}. {resource.uri}")
print(f" {resource.description}")
elif choice == "3":
tool_name = input("Tool name: ").strip()
print("Arguments (as JSON): ")
import json
try:
args = json.loads(input().strip())
result = await client.call_tool(tool_name, args)
print(f"\n✅ Result: {result.data}")
except Exception as e:
print(f"\n❌ Error: {e}")
elif choice == "4":
uri = input("Resource URI: ").strip()
try:
data = await client.read_resource(uri)
print(f"\n✅ Data: {data}")
except Exception as e:
print(f"\n❌ Error: {e}")
elif choice == "5":
print("\nGoodbye!")
break
else:
print("Invalid choice")
if __name__ == "__main__":
import sys
if len(sys.argv) > 1 and sys.argv[1] == "--interactive":
asyncio.run(interactive_client())
else:
asyncio.run(comprehensive_test())

422
templates/error-handling.py Normal file
View File

@@ -0,0 +1,422 @@
"""
FastMCP Error Handling Template
================================
Comprehensive error handling patterns with structured responses and retry logic.
"""
from fastmcp import FastMCP
import asyncio
import httpx
from enum import Enum
from typing import Dict, Any, Optional
from datetime import datetime
mcp = FastMCP("Error Handling Examples")
# ============================================================================
# Error Code Enum
# ============================================================================
class ErrorCode(Enum):
"""Standard error codes."""
VALIDATION_ERROR = "VALIDATION_ERROR"
NOT_FOUND = "NOT_FOUND"
UNAUTHORIZED = "UNAUTHORIZED"
RATE_LIMITED = "RATE_LIMITED"
API_ERROR = "API_ERROR"
TIMEOUT = "TIMEOUT"
NETWORK_ERROR = "NETWORK_ERROR"
UNKNOWN_ERROR = "UNKNOWN_ERROR"
# ============================================================================
# Response Formatters
# ============================================================================
def create_success(data: Any, message: str = "Success") -> Dict[str, Any]:
"""Create structured success response."""
return {
"success": True,
"message": message,
"data": data,
"timestamp": datetime.now().isoformat()
}
def create_error(
code: ErrorCode,
message: str,
details: Optional[Dict[str, Any]] = None
) -> Dict[str, Any]:
"""Create structured error response."""
return {
"success": False,
"error": {
"code": code.value,
"message": message,
"details": details or {},
"timestamp": datetime.now().isoformat()
}
}
# ============================================================================
# Retry Logic
# ============================================================================
async def retry_with_backoff(
func,
max_retries: int = 3,
initial_delay: float = 1.0,
exponential_base: float = 2.0,
catch_exceptions: tuple = (Exception,)
):
"""
Retry function with exponential backoff.
Args:
func: Async function to retry
max_retries: Maximum number of retry attempts
initial_delay: Initial delay in seconds
exponential_base: Base for exponential backoff
catch_exceptions: Tuple of exceptions to catch and retry
Returns:
Function result if successful
Raises:
Last exception if all retries fail
"""
delay = initial_delay
last_exception = None
for attempt in range(max_retries):
try:
return await func()
except catch_exceptions as e:
last_exception = e
if attempt < max_retries - 1:
await asyncio.sleep(delay)
delay *= exponential_base
raise last_exception
# ============================================================================
# Tools with Error Handling
# ============================================================================
@mcp.tool()
def divide_numbers(a: float, b: float) -> dict:
"""
Divide two numbers with error handling.
Args:
a: Numerator
b: Denominator
Returns:
Result or error
"""
try:
if b == 0:
return create_error(
ErrorCode.VALIDATION_ERROR,
"Division by zero is not allowed",
{"a": a, "b": b}
)
result = a / b
return create_success(
{"result": result, "a": a, "b": b},
"Division successful"
)
except Exception as e:
return create_error(
ErrorCode.UNKNOWN_ERROR,
f"Unexpected error: {str(e)}"
)
@mcp.tool()
async def validated_operation(data: str, min_length: int = 1) -> dict:
"""
Operation with input validation.
Args:
data: Input data to validate
min_length: Minimum required length
Returns:
Processed result or validation error
"""
# Validate input
if not data:
return create_error(
ErrorCode.VALIDATION_ERROR,
"Data is required",
{"field": "data", "constraint": "not_empty"}
)
if len(data) < min_length:
return create_error(
ErrorCode.VALIDATION_ERROR,
f"Data must be at least {min_length} characters",
{"field": "data", "min_length": min_length, "actual_length": len(data)}
)
# Process data
try:
processed = data.upper()
return create_success(
{"original": data, "processed": processed},
"Data processed successfully"
)
except Exception as e:
return create_error(
ErrorCode.UNKNOWN_ERROR,
str(e)
)
@mcp.tool()
async def resilient_api_call(url: str) -> dict:
"""
API call with retry logic and comprehensive error handling.
Args:
url: URL to fetch
Returns:
API response or detailed error
"""
async def make_request():
async with httpx.AsyncClient(timeout=10) as client:
response = await client.get(url)
response.raise_for_status()
return response.json()
try:
data = await retry_with_backoff(
make_request,
max_retries=3,
catch_exceptions=(httpx.TimeoutException, httpx.NetworkError)
)
return create_success(
data,
"API call successful"
)
except httpx.TimeoutException:
return create_error(
ErrorCode.TIMEOUT,
"Request timed out",
{"url": url, "timeout_seconds": 10}
)
except httpx.NetworkError as e:
return create_error(
ErrorCode.NETWORK_ERROR,
"Network error occurred",
{"url": url, "error": str(e)}
)
except httpx.HTTPStatusError as e:
if e.response.status_code == 404:
return create_error(
ErrorCode.NOT_FOUND,
"Resource not found",
{"url": url, "status_code": 404}
)
elif e.response.status_code == 401:
return create_error(
ErrorCode.UNAUTHORIZED,
"Unauthorized access",
{"url": url, "status_code": 401}
)
elif e.response.status_code == 429:
return create_error(
ErrorCode.RATE_LIMITED,
"Rate limit exceeded",
{"url": url, "status_code": 429}
)
else:
return create_error(
ErrorCode.API_ERROR,
f"HTTP {e.response.status_code}",
{"url": url, "status_code": e.response.status_code}
)
except Exception as e:
return create_error(
ErrorCode.UNKNOWN_ERROR,
f"Unexpected error: {str(e)}",
{"url": url}
)
@mcp.tool()
async def batch_with_error_recovery(items: list[str]) -> dict:
"""
Batch process items with individual error recovery.
Args:
items: List of items to process
Returns:
Results with successes and failures tracked separately
"""
results = []
errors = []
for i, item in enumerate(items):
try:
# Simulate processing
if not item:
raise ValueError("Empty item")
await asyncio.sleep(0.1)
results.append({
"index": i,
"item": item,
"processed": item.upper(),
"success": True
})
except ValueError as e:
errors.append({
"index": i,
"item": item,
"error": str(e),
"code": ErrorCode.VALIDATION_ERROR.value
})
except Exception as e:
errors.append({
"index": i,
"item": item,
"error": str(e),
"code": ErrorCode.UNKNOWN_ERROR.value
})
return {
"success": len(errors) == 0,
"total": len(items),
"successful": len(results),
"failed": len(errors),
"results": results,
"errors": errors,
"timestamp": datetime.now().isoformat()
}
@mcp.tool()
async def safe_database_operation(query: str) -> dict:
"""
Simulated database operation with error handling.
Args:
query: SQL query (simulated)
Returns:
Query result or error
"""
try:
# Validate query
if "DROP" in query.upper():
return create_error(
ErrorCode.UNAUTHORIZED,
"DROP operations not allowed",
{"query": query}
)
if not query.strip():
return create_error(
ErrorCode.VALIDATION_ERROR,
"Query cannot be empty"
)
# Simulate query execution
await asyncio.sleep(0.1)
# Simulate success
mock_data = [
{"id": 1, "name": "Alice"},
{"id": 2, "name": "Bob"}
]
return create_success(
{"rows": mock_data, "count": len(mock_data)},
"Query executed successfully"
)
except Exception as e:
return create_error(
ErrorCode.API_ERROR,
f"Database error: {str(e)}",
{"query": query}
)
# ============================================================================
# Resources with Error Handling
# ============================================================================
@mcp.resource("health://detailed")
async def detailed_health_check() -> dict:
"""Comprehensive health check with error tracking."""
checks = {}
errors = []
# Check API connectivity
try:
async with httpx.AsyncClient(timeout=5) as client:
response = await client.get("https://api.example.com/health")
checks["api"] = {
"status": "healthy",
"status_code": response.status_code
}
except Exception as e:
checks["api"] = {
"status": "unhealthy",
"error": str(e)
}
errors.append(f"API check failed: {e}")
# Check system resources
try:
import psutil
checks["system"] = {
"status": "healthy",
"cpu_percent": psutil.cpu_percent(),
"memory_percent": psutil.virtual_memory().percent
}
except Exception as e:
checks["system"] = {
"status": "error",
"error": str(e)
}
# Overall status
all_healthy = all(
check.get("status") == "healthy"
for check in checks.values()
)
return {
"status": "healthy" if all_healthy else "degraded",
"checks": checks,
"errors": errors if errors else None,
"timestamp": datetime.now().isoformat()
}
# ============================================================================
# Main
# ============================================================================
if __name__ == "__main__":
mcp.run()
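# Smoke-test sketch using FastMCP's in-memory client (assumes this file is importable
# as a module exposing `mcp`):
#
#     from fastmcp import Client
#
#     async with Client(mcp) as client:
#         result = await client.call_tool("divide_numbers", {"a": 10, "b": 0})
#         # -> structured VALIDATION_ERROR response rather than a raised exception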

View File

@@ -0,0 +1,259 @@
"""
FastMCP OpenAPI Integration Template
=====================================
Auto-generate MCP server from OpenAPI/Swagger specification.
"""
from fastmcp import FastMCP
from fastmcp.server.openapi import RouteMap, MCPType
import httpx
import os
from dotenv import load_dotenv
load_dotenv()
# ============================================================================
# Configuration
# ============================================================================
API_BASE_URL = os.getenv("API_BASE_URL", "https://api.example.com")
API_KEY = os.getenv("API_KEY", "")
OPENAPI_SPEC_URL = os.getenv("OPENAPI_SPEC_URL", f"{API_BASE_URL}/openapi.json")
# ============================================================================
# Load OpenAPI Specification
# ============================================================================
def load_openapi_spec():
"""Load OpenAPI specification from URL or file."""
try:
# Try loading from URL
response = httpx.get(OPENAPI_SPEC_URL, timeout=10)
response.raise_for_status()
return response.json()
except Exception as e:
print(f"Failed to load OpenAPI spec from URL: {e}")
# Fallback: try loading from local file
try:
import json
with open("openapi.json") as f:
return json.load(f)
except FileNotFoundError:
print("Error: OpenAPI spec not found. Please provide OPENAPI_SPEC_URL or openapi.json file")
return None
spec = load_openapi_spec()
# ============================================================================
# Create Authenticated HTTP Client
# ============================================================================
client = httpx.AsyncClient(
base_url=API_BASE_URL,
headers={
"Authorization": f"Bearer {API_KEY}",
"Content-Type": "application/json"
},
timeout=httpx.Timeout(30.0),
limits=httpx.Limits(
max_keepalive_connections=5,
max_connections=10
)
)
# ============================================================================
# Define Route Mapping Strategy
# ============================================================================
# Route maps are applied in order and the first match wins, so exclusions must come
# before the broad GET/POST rules or they will never fire.
route_maps = [
    # Exclude internal endpoints
    RouteMap(
        pattern=r"/internal/.*",
        mcp_type=MCPType.EXCLUDE
    ),
    # Exclude health checks
    RouteMap(
        pattern=r"/(health|healthz|readiness|liveness)",
        mcp_type=MCPType.EXCLUDE
    ),
    # GET endpoints with path parameters → Resource Templates
    # Example: /users/{user_id} → resource template
    RouteMap(
        methods=["GET"],
        pattern=r".*\{.*\}.*",  # Has path parameters
        mcp_type=MCPType.RESOURCE_TEMPLATE
    ),
    # GET endpoints without parameters → Static Resources
    # Example: /users → static resource
    RouteMap(
        methods=["GET"],
        pattern=r"^(?!.*\{.*\}).*$",  # No path parameters
        mcp_type=MCPType.RESOURCE
    ),
    # POST/PUT/PATCH → Tools (create/update operations)
    RouteMap(
        methods=["POST", "PUT", "PATCH"],
        mcp_type=MCPType.TOOL
    ),
    # DELETE → Tools (delete operations)
    RouteMap(
        methods=["DELETE"],
        mcp_type=MCPType.TOOL
    )
]
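# With the maps above, a typical spec (hypothetical paths) is exposed as:
#   GET    /users/{user_id}   -> resource template
#   GET    /users             -> static resource
#   POST   /users             -> tool
#   DELETE /users/{user_id}   -> tool
#   GET    /internal/metrics  -> excluded
#   GET    /health            -> excluded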
# ============================================================================
# Generate MCP Server from OpenAPI
# ============================================================================
if spec:
mcp = FastMCP.from_openapi(
openapi_spec=spec,
client=client,
name="API Integration Server",
route_maps=route_maps
)
print(f"✅ Generated MCP server from OpenAPI spec")
print(f" Base URL: {API_BASE_URL}")
else:
# Fallback: create empty server if spec not available
mcp = FastMCP("API Integration Server")
print("⚠️ Running without OpenAPI spec - please configure OPENAPI_SPEC_URL")
# ============================================================================
# Add Custom Tools (on top of auto-generated ones)
# ============================================================================
@mcp.tool()
async def process_api_response(data: dict, operation: str = "format") -> dict:
"""
Process API response data.
Custom tool to transform or analyze data from API endpoints.
"""
if operation == "format":
# Format the data nicely
return {
"formatted": True,
"data": data,
"count": len(data) if isinstance(data, (list, dict)) else 1
}
elif operation == "summarize":
# Summarize the data
if isinstance(data, list):
return {
"type": "list",
"count": len(data),
"sample": data[:3] if len(data) > 3 else data
}
elif isinstance(data, dict):
return {
"type": "dict",
"keys": list(data.keys()),
"size": len(data)
}
else:
return {
"type": type(data).__name__,
"value": str(data)
}
return {"error": f"Unknown operation: {operation}"}
@mcp.tool()
async def batch_api_request(endpoints: list[str]) -> dict:
"""
Make multiple API requests in parallel.
Useful for gathering data from multiple endpoints efficiently.
"""
import asyncio
async def fetch_endpoint(endpoint: str):
try:
response = await client.get(endpoint)
response.raise_for_status()
return {
"endpoint": endpoint,
"success": True,
"data": response.json()
}
except Exception as e:
return {
"endpoint": endpoint,
"success": False,
"error": str(e)
}
# Execute all requests in parallel
results = await asyncio.gather(*[fetch_endpoint(ep) for ep in endpoints])
successful = [r for r in results if r["success"]]
failed = [r for r in results if not r["success"]]
return {
"total": len(endpoints),
"successful": len(successful),
"failed": len(failed),
"results": results
}
# ============================================================================
# Add Custom Resources
# ============================================================================
@mcp.resource("info://api-config")
def api_configuration() -> dict:
"""Get API configuration details."""
return {
"base_url": API_BASE_URL,
"spec_url": OPENAPI_SPEC_URL,
"authenticated": bool(API_KEY),
"spec_loaded": spec is not None
}
@mcp.resource("info://available-endpoints")
def list_available_endpoints() -> dict:
"""List all available API endpoints."""
if not spec:
return {"error": "OpenAPI spec not loaded"}
endpoints = []
for path, path_item in spec.get("paths", {}).items():
for method in path_item.keys():
if method.upper() in ["GET", "POST", "PUT", "PATCH", "DELETE"]:
operation = path_item[method]
endpoints.append({
"path": path,
"method": method.upper(),
"summary": operation.get("summary", ""),
"description": operation.get("description", "")
})
return {
"total": len(endpoints),
"endpoints": endpoints
}
# ============================================================================
# Main Execution
# ============================================================================
if __name__ == "__main__":
mcp.run()

View File

@@ -0,0 +1,348 @@
"""
FastMCP Prompts Examples
=========================
Examples of pre-configured prompts for LLMs.
"""
from fastmcp import FastMCP
from datetime import datetime
mcp = FastMCP("Prompts Examples")
# ============================================================================
# Basic Prompts
# ============================================================================
@mcp.prompt("help")
def help_prompt() -> str:
"""Generate help text for the server."""
return """
Welcome to the FastMCP Prompts Examples Server!
This server demonstrates various prompt patterns for LLM interactions.
Available Tools:
- search: Search for items in the database
- analyze: Analyze data and generate insights
- summarize: Create summaries of text content
Available Resources:
- info://status: Current server status
- data://config: Server configuration
- data://users: List of all users
How to Use:
1. Use the search tool to find items
2. Use the analyze tool to generate insights from data
3. Use the summarize tool to create concise summaries
For specific tasks, use the pre-configured prompts:
- /analyze: Analyze a topic in depth
- /report: Generate a comprehensive report
- /review: Review and provide feedback
"""
@mcp.prompt("analyze")
def analyze_prompt(topic: str) -> str:
"""Generate a prompt for analyzing a topic."""
return f"""
Please analyze the following topic: {topic}
Consider the following aspects:
1. Current State: What is the current situation?
2. Challenges: What are the main challenges or issues?
3. Opportunities: What opportunities exist for improvement?
4. Data Points: What data supports your analysis?
5. Recommendations: What specific actions do you recommend?
Use the available tools to:
- Search for relevant data using the search tool
- Gather statistics and metrics
- Review related information
Provide a structured analysis with:
- Executive Summary
- Detailed Findings
- Data-Driven Insights
- Actionable Recommendations
"""
# ============================================================================
# Prompts with Parameters
# ============================================================================
@mcp.prompt("report")
def report_prompt(
subject: str,
timeframe: str = "last month",
detail_level: str = "summary"
) -> str:
"""Generate a report prompt with parameters."""
return f"""
Generate a comprehensive report on: {subject}
Timeframe: {timeframe}
Detail Level: {detail_level}
Report Structure:
1. Executive Summary
- Key findings
- Critical metrics
- Main recommendations
2. Data Analysis
- Quantitative metrics
- Trend analysis
- Comparative analysis
3. Insights
- Patterns discovered
- Anomalies identified
- Correlations found
4. Recommendations
- Short-term actions
- Long-term strategies
- Resource requirements
Please use the available tools to gather:
- Statistical data
- User information
- System metrics
- Historical trends
Format: {detail_level.upper()}
- "summary": High-level overview with key points
- "detailed": In-depth analysis with supporting data
- "comprehensive": Full analysis with all available data points
"""
@mcp.prompt("review")
def review_prompt(
item_type: str,
item_id: str,
focus_areas: str = "all"
) -> str:
"""Generate a review prompt."""
return f"""
Review the {item_type} (ID: {item_id})
Focus Areas: {focus_areas}
Review Criteria:
1. Quality Assessment
- Overall quality rating
- Strengths identified
- Areas for improvement
2. Completeness
- Required elements present
- Missing components
- Suggestions for additions
3. Consistency
- Internal consistency
- Alignment with standards
- Conformance to guidelines
4. Performance
- Efficiency metrics
- Resource utilization
- Optimization opportunities
5. Recommendations
- Priority improvements
- Nice-to-have enhancements
- Long-term considerations
Please gather relevant data using available tools and provide a structured review.
"""
# ============================================================================
# Task-Specific Prompts
# ============================================================================
@mcp.prompt("summarize")
def summarize_prompt(content_type: str = "text") -> str:
"""Generate a summarization prompt."""
return f"""
Create a comprehensive summary of the {content_type}.
Summary Guidelines:
1. Key Points
- Extract the most important information
- Identify main themes or topics
- Highlight critical details
2. Structure
- Opening: Context and overview
- Body: Main points organized logically
- Closing: Conclusions and implications
3. Audience Consideration
- Write for clarity and understanding
- Define technical terms if needed
- Provide context where necessary
4. Length
- Brief: 2-3 sentences
- Standard: 1 paragraph
- Detailed: 2-3 paragraphs
Output Format:
- Start with a one-sentence overview
- Follow with detailed points
- End with key takeaways
Use available tools to gather additional context if needed.
"""
@mcp.prompt("compare")
def compare_prompt(item1: str, item2: str, criteria: str = "general") -> str:
"""Generate a comparison prompt."""
return f"""
Compare and contrast: {item1} vs {item2}
Comparison Criteria: {criteria}
Analysis Framework:
1. Similarities
- Common features
- Shared characteristics
- Aligned goals or purposes
2. Differences
- Unique features
- Distinct characteristics
- Divergent approaches
3. Strengths and Weaknesses
- {item1} strengths
- {item1} weaknesses
- {item2} strengths
- {item2} weaknesses
4. Use Cases
- When to choose {item1}
- When to choose {item2}
- Situational recommendations
5. Conclusion
- Overall assessment
- Best fit scenarios
- Decision factors
Please gather data using available tools and provide a balanced comparison.
"""
# ============================================================================
# Workflow Prompts
# ============================================================================
@mcp.prompt("troubleshoot")
def troubleshoot_prompt(problem_description: str) -> str:
"""Generate a troubleshooting prompt."""
return f"""
Troubleshoot the following issue:
Problem: {problem_description}
Troubleshooting Process:
1. Problem Definition
- Describe the issue clearly
- Identify symptoms
- Note when it started
2. Information Gathering
- Use available tools to gather:
* System status
* Error logs
* Configuration details
* Recent changes
3. Analysis
- Identify potential causes
- Determine root cause
- Assess impact
4. Solution Development
- Propose solutions (short-term and long-term)
- Evaluate each solution
- Recommend best approach
5. Implementation Plan
- Step-by-step resolution
- Required resources
- Expected timeline
- Verification steps
6. Prevention
- Preventive measures
- Monitoring recommendations
- Documentation needs
Please be systematic and thorough in your analysis.
"""
@mcp.prompt("plan")
def plan_prompt(objective: str, constraints: str = "none") -> str:
"""Generate a planning prompt."""
return f"""
Create a detailed plan for: {objective}
Constraints: {constraints}
Planning Framework:
1. Objective Analysis
- Clear definition of success
- Key success criteria
- Expected outcomes
2. Current State Assessment
- Available resources
- Existing capabilities
- Known limitations
3. Strategy Development
- Approach options
- Recommended strategy
- Rationale
4. Action Plan
- Phase 1: Foundation
- Phase 2: Implementation
- Phase 3: Optimization
5. Resource Requirements
- Personnel
- Technology
- Budget
- Time
6. Risk Management
- Identified risks
- Mitigation strategies
- Contingency plans
7. Success Metrics
- KPIs to track
- Measurement methods
- Review milestones
Use available tools to gather supporting data and insights.
"""
# ============================================================================
# Main
# ============================================================================
if __name__ == "__main__":
mcp.run()

92
templates/pyproject.toml Normal file
View File

@@ -0,0 +1,92 @@
[project]
name = "my-fastmcp-server"
version = "1.0.0"
description = "My FastMCP Server"
readme = "README.md"
requires-python = ">=3.10"
license = {text = "MIT"}
authors = [
{name = "Your Name", email = "your.email@example.com"}
]
dependencies = [
"fastmcp>=2.12.0",
"httpx>=0.27.0",
"python-dotenv>=1.0.0",
"pydantic>=2.0.0",
"psutil>=5.9.0",
]
[project.optional-dependencies]
dev = [
"pytest>=8.0.0",
"pytest-asyncio>=0.23.0",
"ruff>=0.3.0",
"mypy>=1.8.0",
]
[project.urls]
Homepage = "https://github.com/yourusername/my-fastmcp-server"
Repository = "https://github.com/yourusername/my-fastmcp-server"
Issues = "https://github.com/yourusername/my-fastmcp-server/issues"
[build-system]
requires = ["setuptools>=61.0", "wheel"]
build-backend = "setuptools.build_meta"
[tool.setuptools]
packages = ["src"]
[tool.setuptools.package-data]
src = ["py.typed"]
# Ruff Configuration
[tool.ruff]
line-length = 100
target-version = "py310"
[tool.ruff.lint]
select = [
"E", # pycodestyle errors
"W", # pycodestyle warnings
"F", # pyflakes
"I", # isort
"B", # flake8-bugbear
"C4", # flake8-comprehensions
]
ignore = [
"E501", # line too long (handled by formatter)
"B008", # do not perform function calls in argument defaults
]
# Mypy Configuration
[tool.mypy]
python_version = "3.10"
warn_return_any = true
warn_unused_configs = true
disallow_untyped_defs = false
disallow_any_generics = false
check_untyped_defs = true
# Pytest Configuration
[tool.pytest.ini_options]
testpaths = ["tests"]
python_files = ["test_*.py"]
python_classes = ["Test*"]
python_functions = ["test_*"]
asyncio_mode = "auto"
addopts = "-v --tb=short"
# Coverage Configuration
[tool.coverage.run]
source = ["src"]
omit = ["*/tests/*", "*/test_*"]
[tool.coverage.report]
exclude_lines = [
"pragma: no cover",
"def __repr__",
"raise AssertionError",
"raise NotImplementedError",
"if __name__ == .__main__.:",
"if TYPE_CHECKING:",
]
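# Typical workflow with this configuration (commands assume pip and a local checkout):
#   pip install -e ".[dev]"          # editable install with dev extras
#   ruff check . && mypy src         # lint and type-check
#   pytest                           # run the async-aware test suite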

View File

@@ -0,0 +1,37 @@
# FastMCP Core
fastmcp>=2.13.0
# Storage Backends (for production persistence)
py-key-value-aio>=0.1.0 # Memory, Disk, Redis, DynamoDB storage
# Encryption (for secure token storage)
cryptography>=42.0.0
# HTTP Client (for API integrations)
httpx>=0.27.0
# Environment Variables
python-dotenv>=1.0.0
# Validation (optional, for complex validation)
pydantic>=2.0.0
# System Monitoring (optional, for health checks)
psutil>=5.9.0
# Async Support
asyncio-extras>=1.3.2
# Optional Storage Backend Dependencies
# Uncomment as needed for specific backends:
# redis>=5.0.0 # For RedisStore
# boto3>=1.34.0 # For DynamoDB
# pymongo>=4.6.0 # For MongoDB
# elasticsearch>=8.12.0 # For Elasticsearch
# Development Dependencies (optional)
# Uncomment for development
# pytest>=8.0.0
# pytest-asyncio>=0.23.0
# ruff>=0.3.0
# mypy>=1.8.0

View File

@@ -0,0 +1,216 @@
"""
FastMCP Resources Examples
===========================
Examples of static resources, dynamic resources, and resource templates.
"""
from fastmcp import FastMCP
from datetime import datetime
from typing import List
import os
mcp = FastMCP("Resources Examples")
# ============================================================================
# Static Resources
# ============================================================================
@mcp.resource("data://config")
def get_config() -> dict:
"""Static configuration resource."""
return {
"version": "1.0.0",
"environment": os.getenv("ENVIRONMENT", "development"),
"features": ["search", "analytics", "notifications"],
"limits": {
"max_requests": 1000,
"max_results": 100
}
}
@mcp.resource("info://server")
def server_info() -> dict:
"""Server metadata."""
return {
"name": "Resources Examples Server",
"description": "Demonstrates various resource patterns",
"version": "1.0.0",
"capabilities": [
"static_resources",
"dynamic_resources",
"resource_templates"
]
}
# ============================================================================
# Dynamic Resources
# ============================================================================
@mcp.resource("info://status")
async def server_status() -> dict:
"""Dynamic status resource (updated on each read)."""
import psutil
return {
"status": "operational",
"timestamp": datetime.now().isoformat(),
"uptime_seconds": 0, # Would track actual uptime
"system": {
"cpu_percent": psutil.cpu_percent(interval=1),
"memory_percent": psutil.virtual_memory().percent,
"disk_percent": psutil.disk_usage('/').percent
}
}
@mcp.resource("data://statistics")
async def get_statistics() -> dict:
"""Dynamic statistics resource."""
return {
"timestamp": datetime.now().isoformat(),
"total_requests": 1234, # Would track actual requests
"active_connections": 5,
"cache_hit_rate": 0.87,
"average_response_time_ms": 45.3
}
# ============================================================================
# Resource Templates (with parameters)
# ============================================================================
@mcp.resource("user://{user_id}/profile")
async def get_user_profile(user_id: str) -> dict:
"""Get user profile by ID."""
# In production, fetch from database
return {
"id": user_id,
"name": f"User {user_id}",
"email": f"user{user_id}@example.com",
"created_at": "2024-01-01T00:00:00Z",
"role": "user"
}
@mcp.resource("user://{user_id}/posts")
async def get_user_posts(user_id: str) -> List[dict]:
"""Get posts for a specific user."""
# In production, fetch from database
return [
{
"id": 1,
"user_id": user_id,
"title": "First Post",
"content": "Hello, world!",
"created_at": "2024-01-01T00:00:00Z"
},
{
"id": 2,
"user_id": user_id,
"title": "Second Post",
"content": "Another post",
"created_at": "2024-01-02T00:00:00Z"
}
]
@mcp.resource("org://{org_id}/team/{team_id}/members")
async def get_team_members(org_id: str, team_id: str) -> List[dict]:
"""Get team members with org and team context."""
# In production, fetch from database with filters
return [
{
"id": 1,
"name": "Alice",
"role": "engineer",
"org_id": org_id,
"team_id": team_id
},
{
"id": 2,
"name": "Bob",
"role": "designer",
"org_id": org_id,
"team_id": team_id
}
]
@mcp.resource("api://{version}/config")
async def get_versioned_config(version: str) -> dict:
"""Get configuration for specific API version."""
configs = {
"v1": {
"api_version": "v1",
"endpoints": ["/users", "/posts"],
"deprecated": True
},
"v2": {
"api_version": "v2",
"endpoints": ["/users", "/posts", "/comments", "/likes"],
"deprecated": False
}
}
return configs.get(version, {"error": f"Unknown version: {version}"})
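# Reading templated resources (sketch, using FastMCP's in-memory client against this
# module's `mcp` instance; parameter values are illustrative):
#
#     from fastmcp import Client
#
#     async with Client(mcp) as client:
#         profile = await client.read_resource("user://42/profile")
#         v2_config = await client.read_resource("api://v2/config")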
# ============================================================================
# File-based Resources
# ============================================================================
@mcp.resource("file://docs/{filename}")
async def get_documentation(filename: str) -> dict:
"""Get documentation file content."""
# In production, read actual files
docs = {
"getting-started.md": {
"filename": "getting-started.md",
"content": "# Getting Started\n\nWelcome to the docs!",
"last_modified": "2024-01-01T00:00:00Z"
},
"api-reference.md": {
"filename": "api-reference.md",
"content": "# API Reference\n\nAPI documentation here.",
"last_modified": "2024-01-02T00:00:00Z"
}
}
return docs.get(filename, {"error": f"File not found: {filename}"})
# ============================================================================
# List-style Resources
# ============================================================================
@mcp.resource("data://users")
async def list_users() -> List[dict]:
"""List all users."""
# In production, fetch from database
return [
{"id": "1", "name": "Alice", "email": "alice@example.com"},
{"id": "2", "name": "Bob", "email": "bob@example.com"},
{"id": "3", "name": "Charlie", "email": "charlie@example.com"}
]
@mcp.resource("data://categories")
def list_categories() -> List[str]:
"""List available categories."""
return [
"Technology",
"Science",
"Business",
"Entertainment",
"Sports"
]
# ============================================================================
# Main
# ============================================================================
if __name__ == "__main__":
mcp.run()

View File

@@ -0,0 +1,425 @@
"""
Self-Contained FastMCP Server
==============================
Production pattern with all utilities in one file.
Avoids circular import issues common in cloud deployment.
"""
from fastmcp import FastMCP, Context
import os
import time
import asyncio
import httpx
from typing import Dict, Any, Optional, List
from datetime import datetime
from dotenv import load_dotenv
load_dotenv()
# ============================================================================
# Configuration (Self-contained)
# ============================================================================
class Config:
"""Application configuration from environment variables."""
SERVER_NAME = os.getenv("SERVER_NAME", "Self-Contained Server")
SERVER_VERSION = "1.0.0"
API_BASE_URL = os.getenv("API_BASE_URL", "")
API_KEY = os.getenv("API_KEY", "")
CACHE_TTL = int(os.getenv("CACHE_TTL", "300"))
MAX_RETRIES = int(os.getenv("MAX_RETRIES", "3"))
# ============================================================================
# Utilities (All in one place)
# ============================================================================
def format_success(data: Any, message: str = "Success") -> Dict[str, Any]:
"""Format successful response."""
return {
"success": True,
"message": message,
"data": data,
"timestamp": datetime.now().isoformat()
}
def format_error(error: str, code: str = "ERROR") -> Dict[str, Any]:
"""Format error response."""
return {
"success": False,
"error": error,
"code": code,
"timestamp": datetime.now().isoformat()
}
# ============================================================================
# API Client (Lazy Initialization)
# ============================================================================
class APIClient:
"""Singleton HTTP client with lazy initialization."""
_instance: Optional[httpx.AsyncClient] = None
@classmethod
async def get_client(cls) -> Optional[httpx.AsyncClient]:
"""Get or create HTTP client (only when needed)."""
if not Config.API_BASE_URL or not Config.API_KEY:
return None
if cls._instance is None:
cls._instance = httpx.AsyncClient(
base_url=Config.API_BASE_URL,
headers={"Authorization": f"Bearer {Config.API_KEY}"},
timeout=httpx.Timeout(30.0)
)
return cls._instance
@classmethod
async def cleanup(cls):
"""Cleanup HTTP client."""
if cls._instance:
await cls._instance.aclose()
cls._instance = None
# ============================================================================
# Cache (Simple Implementation)
# ============================================================================
class SimpleCache:
"""Time-based cache."""
_cache: Dict[str, Any] = {}
_timestamps: Dict[str, float] = {}
@classmethod
def get(cls, key: str) -> Optional[Any]:
"""Get cached value if not expired."""
if key in cls._cache:
if time.time() - cls._timestamps[key] < Config.CACHE_TTL:
return cls._cache[key]
else:
del cls._cache[key]
del cls._timestamps[key]
return None
@classmethod
def set(cls, key: str, value: Any):
"""Set cache value."""
cls._cache[key] = value
cls._timestamps[key] = time.time()
@classmethod
def clear(cls):
"""Clear all cache."""
cls._cache.clear()
cls._timestamps.clear()
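# Usage sketch (keys and values are illustrative):
#
#     SimpleCache.set("users:list", ["alice", "bob"])
#     SimpleCache.get("users:list")   # -> ["alice", "bob"] until CACHE_TTL expires, then None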
# ============================================================================
# Retry Logic
# ============================================================================
async def retry_with_backoff(func, max_retries: int = 3):
"""Retry function with exponential backoff."""
delay = 1.0
last_exception = None
for attempt in range(max_retries):
try:
return await func()
except Exception as e:
last_exception = e
if attempt < max_retries - 1:
await asyncio.sleep(delay)
delay *= 2
raise last_exception
# ============================================================================
# Server Creation (Module Level - Required for Cloud!)
# ============================================================================
mcp = FastMCP(
name=Config.SERVER_NAME,
instructions=f"""
{Config.SERVER_NAME} v{Config.SERVER_VERSION}
A self-contained MCP server with production patterns.
Available tools:
- process_data: Process and transform data
- fetch_from_api: Fetch data from external API (if configured)
- calculate: Perform calculations
Available resources:
- info://status: Server status
- info://config: Configuration details
"""
)
# ============================================================================
# Tools
# ============================================================================
@mcp.tool()
async def process_data(
data: List[Dict[str, Any]],
operation: str = "summarize"
) -> dict:
"""
Process a list of data items.
Args:
data: List of data items
operation: Operation to perform (summarize, filter, transform)
Returns:
Processed result
"""
try:
if operation == "summarize":
return format_success({
"count": len(data),
"first": data[0] if data else None,
"last": data[-1] if data else None
})
elif operation == "filter":
filtered = [d for d in data if d.get("active", False)]
return format_success({
"original_count": len(data),
"filtered_count": len(filtered),
"data": filtered
})
elif operation == "transform":
transformed = [
{**d, "processed_at": datetime.now().isoformat()}
for d in data
]
return format_success({"data": transformed})
else:
return format_error(f"Unknown operation: {operation}", "INVALID_OPERATION")
except Exception as e:
return format_error(str(e), "PROCESSING_ERROR")
@mcp.tool()
async def fetch_from_api(
endpoint: str,
use_cache: bool = True
) -> dict:
"""
Fetch data from external API.
Args:
endpoint: API endpoint path
use_cache: Whether to use cached response
Returns:
API response or error
"""
if not Config.API_BASE_URL:
return format_error("API not configured", "API_NOT_CONFIGURED")
cache_key = f"api:{endpoint}"
# Check cache
if use_cache:
cached = SimpleCache.get(cache_key)
if cached:
return format_success(cached, "Retrieved from cache")
# Fetch from API
try:
client = await APIClient.get_client()
if not client:
return format_error("API client not available", "CLIENT_ERROR")
async def make_request():
response = await client.get(endpoint)
response.raise_for_status()
return response.json()
data = await retry_with_backoff(make_request, max_retries=Config.MAX_RETRIES)
# Cache the result
if use_cache:
SimpleCache.set(cache_key, data)
return format_success(data, "Fetched from API")
except httpx.HTTPStatusError as e:
return format_error(
f"HTTP {e.response.status_code}",
"HTTP_ERROR"
)
except Exception as e:
return format_error(str(e), "API_ERROR")
@mcp.tool()
def calculate(operation: str, a: float, b: float) -> dict:
"""
Perform mathematical operations.
Args:
operation: add, subtract, multiply, divide
a: First number
b: Second number
Returns:
Calculation result
"""
operations = {
"add": lambda x, y: x + y,
"subtract": lambda x, y: x - y,
"multiply": lambda x, y: x * y,
"divide": lambda x, y: x / y if y != 0 else None
}
if operation not in operations:
return format_error(
f"Unknown operation: {operation}",
"INVALID_OPERATION"
)
result = operations[operation](a, b)
if result is None:
return format_error("Division by zero", "DIVISION_BY_ZERO")
return format_success({
"operation": operation,
"a": a,
"b": b,
"result": result
})
@mcp.tool()
async def batch_process_with_progress(
items: List[str],
context: Context
) -> dict:
"""
Process items with progress tracking.
Args:
items: List of items to process
context: FastMCP context for progress reporting
Returns:
Processing results
"""
results = []
total = len(items)
for i, item in enumerate(items):
# Report progress
await context.report_progress(
progress=i + 1,
total=total,
message=f"Processing {i + 1}/{total}: {item}"
)
# Simulate processing
await asyncio.sleep(0.1)
results.append({
"item": item,
"processed": item.upper(),
"index": i
})
return format_success({
"total": total,
"results": results
}, "Batch processing complete")
@mcp.tool()
def clear_cache() -> dict:
"""Clear all cached data."""
try:
SimpleCache.clear()
return format_success({"cleared": True}, "Cache cleared")
except Exception as e:
return format_error(str(e), "CACHE_ERROR")
# ============================================================================
# Resources
# ============================================================================
@mcp.resource("info://status")
async def server_status() -> dict:
"""Get current server status."""
return {
"server": Config.SERVER_NAME,
"version": Config.SERVER_VERSION,
"status": "operational",
"timestamp": datetime.now().isoformat(),
"api_configured": bool(Config.API_BASE_URL and Config.API_KEY),
"cache_entries": len(SimpleCache._cache)
}
@mcp.resource("info://config")
def server_config() -> dict:
"""Get server configuration (non-sensitive)."""
return {
"server_name": Config.SERVER_NAME,
"version": Config.SERVER_VERSION,
"cache_ttl": Config.CACHE_TTL,
"max_retries": Config.MAX_RETRIES,
"api_configured": bool(Config.API_BASE_URL)
}
@mcp.resource("health://check")
async def health_check() -> dict:
"""Comprehensive health check."""
checks = {}
# Check API
if Config.API_BASE_URL:
try:
client = await APIClient.get_client()
if client:
response = await client.get("/health", timeout=5)
checks["api"] = response.status_code == 200
        except Exception:
checks["api"] = False
else:
checks["api"] = None # Not configured
# Check cache
checks["cache"] = True # Always available
return {
"status": "healthy" if all(v for v in checks.values() if v is not None) else "degraded",
"checks": checks,
"timestamp": datetime.now().isoformat()
}
# ============================================================================
# Main Execution
# ============================================================================
if __name__ == "__main__":
try:
print(f"Starting {Config.SERVER_NAME}...")
mcp.run()
except KeyboardInterrupt:
print("\nShutting down...")
    finally:
        # Cleanup the shared HTTP client (asyncio is already imported at module level)
        asyncio.run(APIClient.cleanup())

221
templates/tools-examples.py Normal file
View File

@@ -0,0 +1,221 @@
"""
FastMCP Tools Examples
======================
Comprehensive examples of tool patterns: sync, async, validation, error handling.
"""
from fastmcp import FastMCP, Context
from pydantic import BaseModel, Field, field_validator
from typing import Optional, List, Dict, Any
from datetime import datetime
import asyncio
mcp = FastMCP("Tools Examples")
# ============================================================================
# Basic Tools
# ============================================================================
@mcp.tool()
def simple_sync_tool(text: str) -> str:
"""Simple synchronous tool."""
return text.upper()
@mcp.tool()
async def simple_async_tool(text: str) -> str:
"""Simple asynchronous tool."""
await asyncio.sleep(0.1) # Simulate async operation
return text.lower()
# ============================================================================
# Tools with Validation
# ============================================================================
class SearchParams(BaseModel):
"""Validated search parameters."""
query: str = Field(min_length=1, max_length=100, description="Search query")
limit: int = Field(default=10, ge=1, le=100, description="Maximum results")
offset: int = Field(default=0, ge=0, description="Offset for pagination")
    @field_validator("query")
    @classmethod
    def clean_query(cls, v: str) -> str:
        return v.strip()
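# Validation sketch: Pydantic enforces the Field constraints before the tool body runs.
#
#     SearchParams(query="  hello  ", limit=5)   # query is stripped to "hello"
#     SearchParams(query="", limit=200)          # raises pydantic.ValidationError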
@mcp.tool()
async def validated_search(params: SearchParams) -> dict:
"""
Search with validated parameters.
Pydantic automatically validates all parameters.
"""
return {
"query": params.query,
"limit": params.limit,
"offset": params.offset,
"results": [
{"id": 1, "title": f"Result for: {params.query}"},
{"id": 2, "title": f"Another result for: {params.query}"}
]
}
# ============================================================================
# Tools with Optional Parameters
# ============================================================================
@mcp.tool()
def process_text(
text: str,
uppercase: bool = False,
prefix: Optional[str] = None,
suffix: Optional[str] = None
) -> str:
"""Process text with optional transformations."""
result = text
if uppercase:
result = result.upper()
if prefix:
result = f"{prefix}{result}"
if suffix:
result = f"{result}{suffix}"
return result
# ============================================================================
# Tools with Complex Return Types
# ============================================================================
@mcp.tool()
async def batch_process(items: List[str]) -> Dict[str, Any]:
"""Process multiple items and return detailed results."""
results = []
errors = []
for i, item in enumerate(items):
try:
# Simulate processing
await asyncio.sleep(0.1)
results.append({
"index": i,
"item": item,
"processed": item.upper(),
"success": True
})
except Exception as e:
errors.append({
"index": i,
"item": item,
"error": str(e),
"success": False
})
return {
"total": len(items),
"successful": len(results),
"failed": len(errors),
"results": results,
"errors": errors,
"timestamp": datetime.now().isoformat()
}
# ============================================================================
# Tools with Error Handling
# ============================================================================
@mcp.tool()
async def safe_operation(data: dict) -> dict:
"""Operation with comprehensive error handling."""
try:
# Validate input
if not data:
return {
"success": False,
"error": "Data is required",
"code": "VALIDATION_ERROR"
}
# Simulate operation
await asyncio.sleep(0.1)
processed_data = {k: v.upper() if isinstance(v, str) else v for k, v in data.items()}
return {
"success": True,
"data": processed_data,
"timestamp": datetime.now().isoformat()
}
except KeyError as e:
return {
"success": False,
"error": f"Missing key: {e}",
"code": "KEY_ERROR"
}
except Exception as e:
return {
"success": False,
"error": str(e),
"code": "UNKNOWN_ERROR"
}
# ============================================================================
# Tools with Context (for Elicitation, Progress, Sampling)
# ============================================================================
@mcp.tool()
async def tool_with_progress(count: int, context: Context) -> dict:
"""Tool that reports progress."""
results = []
for i in range(count):
# Report progress if available
if context and hasattr(context, 'report_progress'):
await context.report_progress(
progress=i + 1,
total=count,
message=f"Processing item {i + 1}/{count}"
)
# Simulate work
await asyncio.sleep(0.1)
results.append(f"Item {i + 1}")
return {
"count": count,
"results": results,
"status": "completed"
}
@mcp.tool()
async def tool_with_elicitation(context: Context) -> dict:
    """Tool that requests user input via elicitation."""
    if context and hasattr(context, 'elicit'):
        # Ask the connected client for a value; requires a client that supports elicitation
        result = await context.elicit("What is your name?", response_type=str)
        if result.action == "accept":
            return {
                "greeting": f"Hello, {result.data}!",
                "timestamp": datetime.now().isoformat()
            }
        return {"error": f"Elicitation was {result.action}"}
    else:
        return {"error": "Elicitation not available"}
# ============================================================================
# Main
# ============================================================================
if __name__ == "__main__":
mcp.run()