Files
gh-jezweb-claude-skills-ski…/templates/api-client-pattern.py
2025-11-30 08:24:49 +08:00

414 lines
11 KiB
Python

"""
FastMCP API Client Pattern
===========================
Manual API integration with connection pooling, caching, and retry logic.
"""
from fastmcp import FastMCP
import httpx
import os
import time
import asyncio
from typing import Optional, Any, Dict
from datetime import datetime
from dotenv import load_dotenv
load_dotenv()
mcp = FastMCP("API Client Pattern")
# ============================================================================
# Configuration
# ============================================================================
class Config:
"""API configuration from environment."""
API_BASE_URL = os.getenv("API_BASE_URL", "https://api.example.com")
API_KEY = os.getenv("API_KEY", "")
API_TIMEOUT = int(os.getenv("API_TIMEOUT", "30"))
CACHE_TTL = int(os.getenv("CACHE_TTL", "300")) # 5 minutes
MAX_RETRIES = int(os.getenv("MAX_RETRIES", "3"))
# ============================================================================
# API Client with Connection Pooling
# ============================================================================
class APIClient:
"""Singleton API client with connection pooling."""
_instance: Optional[httpx.AsyncClient] = None
@classmethod
async def get_client(cls) -> httpx.AsyncClient:
"""Get or create the shared HTTP client."""
if cls._instance is None:
cls._instance = httpx.AsyncClient(
base_url=Config.API_BASE_URL,
headers={
"Authorization": f"Bearer {Config.API_KEY}",
"Content-Type": "application/json",
"User-Agent": "FastMCP-Client/1.0"
},
timeout=httpx.Timeout(Config.API_TIMEOUT),
limits=httpx.Limits(
max_keepalive_connections=5,
max_connections=10
)
)
return cls._instance
@classmethod
async def cleanup(cls):
"""Cleanup the HTTP client."""
if cls._instance:
await cls._instance.aclose()
cls._instance = None
# ============================================================================
# Cache Implementation
# ============================================================================
class SimpleCache:
"""Time-based cache for API responses."""
def __init__(self, ttl: int = 300):
self.ttl = ttl
self.cache: Dict[str, Any] = {}
self.timestamps: Dict[str, float] = {}
def get(self, key: str) -> Optional[Any]:
"""Get cached value if not expired."""
if key in self.cache:
if time.time() - self.timestamps[key] < self.ttl:
return self.cache[key]
else:
# Expired, remove it
del self.cache[key]
del self.timestamps[key]
return None
def set(self, key: str, value: Any):
"""Set cache value with timestamp."""
self.cache[key] = value
self.timestamps[key] = time.time()
def invalidate(self, pattern: Optional[str] = None):
"""Invalidate cache entries."""
if pattern:
keys_to_delete = [k for k in self.cache if pattern in k]
for key in keys_to_delete:
del self.cache[key]
del self.timestamps[key]
else:
self.cache.clear()
self.timestamps.clear()
# Global cache instance
cache = SimpleCache(ttl=Config.CACHE_TTL)
# ============================================================================
# Retry Logic with Exponential Backoff
# ============================================================================
async def retry_with_backoff(
func,
max_retries: int = 3,
initial_delay: float = 1.0,
exponential_base: float = 2.0
):
"""Retry function with exponential backoff."""
delay = initial_delay
last_exception = None
for attempt in range(max_retries):
try:
return await func()
except (httpx.TimeoutException, httpx.NetworkError) as e:
last_exception = e
if attempt < max_retries - 1:
print(f"Attempt {attempt + 1} failed, retrying in {delay}s...")
await asyncio.sleep(delay)
delay *= exponential_base
except httpx.HTTPStatusError as e:
# Don't retry client errors (4xx)
if 400 <= e.response.status_code < 500:
raise
last_exception = e
if attempt < max_retries - 1:
await asyncio.sleep(delay)
delay *= exponential_base
raise last_exception
# ============================================================================
# API Tools
# ============================================================================
@mcp.tool()
async def api_get(
endpoint: str,
use_cache: bool = True
) -> dict:
"""
Make a GET request to the API.
Args:
endpoint: API endpoint path (e.g., "/users/123")
use_cache: Whether to use cached response if available
Returns:
API response data or error
"""
cache_key = f"GET:{endpoint}"
# Check cache
if use_cache:
cached = cache.get(cache_key)
if cached:
return {
"success": True,
"data": cached,
"from_cache": True,
"timestamp": datetime.now().isoformat()
}
# Make request with retry
async def make_request():
client = await APIClient.get_client()
response = await client.get(endpoint)
response.raise_for_status()
return response.json()
try:
data = await retry_with_backoff(make_request, max_retries=Config.MAX_RETRIES)
# Cache successful response
if use_cache:
cache.set(cache_key, data)
return {
"success": True,
"data": data,
"from_cache": False,
"timestamp": datetime.now().isoformat()
}
except httpx.HTTPStatusError as e:
return {
"success": False,
"error": f"HTTP {e.response.status_code}",
"message": e.response.text,
"endpoint": endpoint
}
except Exception as e:
return {
"success": False,
"error": str(e),
"endpoint": endpoint
}
@mcp.tool()
async def api_post(
endpoint: str,
data: dict,
invalidate_cache: bool = True
) -> dict:
"""
Make a POST request to the API.
Args:
endpoint: API endpoint path
data: Request body data
invalidate_cache: Whether to invalidate related cache entries
Returns:
API response or error
"""
async def make_request():
client = await APIClient.get_client()
response = await client.post(endpoint, json=data)
response.raise_for_status()
return response.json()
try:
result = await retry_with_backoff(make_request, max_retries=Config.MAX_RETRIES)
# Invalidate cache for related endpoints
if invalidate_cache:
cache.invalidate(endpoint.split('/')[1] if '/' in endpoint else endpoint)
return {
"success": True,
"data": result,
"timestamp": datetime.now().isoformat()
}
except httpx.HTTPStatusError as e:
return {
"success": False,
"error": f"HTTP {e.response.status_code}",
"message": e.response.text
}
except Exception as e:
return {
"success": False,
"error": str(e)
}
@mcp.tool()
async def api_put(
endpoint: str,
data: dict,
invalidate_cache: bool = True
) -> dict:
"""Make a PUT request to the API."""
async def make_request():
client = await APIClient.get_client()
response = await client.put(endpoint, json=data)
response.raise_for_status()
return response.json()
try:
result = await retry_with_backoff(make_request)
if invalidate_cache:
cache.invalidate(endpoint)
return {"success": True, "data": result}
except Exception as e:
return {"success": False, "error": str(e)}
@mcp.tool()
async def api_delete(
endpoint: str,
invalidate_cache: bool = True
) -> dict:
"""Make a DELETE request to the API."""
async def make_request():
client = await APIClient.get_client()
response = await client.delete(endpoint)
response.raise_for_status()
return response.status_code
try:
status = await retry_with_backoff(make_request)
if invalidate_cache:
cache.invalidate(endpoint)
return {
"success": True,
"status_code": status,
"deleted": True
}
except Exception as e:
return {"success": False, "error": str(e)}
@mcp.tool()
async def batch_api_requests(
endpoints: list[str],
use_cache: bool = True
) -> dict:
"""
Make multiple GET requests in parallel.
Args:
endpoints: List of endpoint paths
use_cache: Whether to use cache
Returns:
Batch results with successes and failures
"""
async def fetch_one(endpoint: str):
return await api_get(endpoint, use_cache=use_cache)
results = await asyncio.gather(*[fetch_one(ep) for ep in endpoints])
successful = [r for r in results if r.get("success")]
failed = [r for r in results if not r.get("success")]
return {
"total": len(endpoints),
"successful": len(successful),
"failed": len(failed),
"results": results
}
@mcp.tool()
def clear_cache(pattern: Optional[str] = None) -> dict:
"""
Clear API response cache.
Args:
pattern: Optional pattern to match cache keys (clears all if not provided)
Returns:
Cache clear status
"""
try:
cache.invalidate(pattern)
return {
"success": True,
"message": f"Cache cleared{f' for pattern: {pattern}' if pattern else ''}"
}
except Exception as e:
return {"success": False, "error": str(e)}
# ============================================================================
# Resources
# ============================================================================
@mcp.resource("info://api-status")
async def api_status() -> dict:
"""Check API connectivity and status."""
try:
client = await APIClient.get_client()
response = await client.get("/health", timeout=5)
return {
"api_reachable": True,
"status_code": response.status_code,
"healthy": response.status_code == 200,
"timestamp": datetime.now().isoformat()
}
except Exception as e:
return {
"api_reachable": False,
"error": str(e),
"timestamp": datetime.now().isoformat()
}
@mcp.resource("info://cache-stats")
def cache_statistics() -> dict:
"""Get cache statistics."""
return {
"total_entries": len(cache.cache),
"ttl_seconds": cache.ttl,
"entries": list(cache.cache.keys())
}
# ============================================================================
# Main
# ============================================================================
if __name__ == "__main__":
try:
mcp.run()
finally:
# Cleanup on exit
import asyncio
asyncio.run(APIClient.cleanup())