# Python Runtime API Reference

Complete reference for MXCP Python endpoints, including wrapping external libraries and packages.
## Database Access

```python
from mxcp.runtime import db

# Execute a parameterized query; results is a list of row dicts
results = db.execute(
    "SELECT * FROM users WHERE id = $id",
    {"id": user_id}
)

# Get the first result
first = results[0] if results else None

# Iterate over results
for row in results:
    print(row["name"])
```

**Important:** Always access the database through `db.execute()`; never cache `db.connection`.
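To make the rule concrete, here is a minimal sketch of the anti-pattern versus the supported pattern (the query and table are illustrative):

```python
from mxcp.runtime import db

# ❌ Don't: grab and cache the underlying connection at import time.
# MXCP may close or replace it (for example, during a database reload).
# conn = db.connection

# ✅ Do: go through db.execute() on every call
def active_users() -> list:
    return db.execute("SELECT * FROM users WHERE active = $a", {"a": True})
```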
## Configuration & Secrets

```python
from mxcp.runtime import config

# Get a secret (returns a dict of parameters)
secret = config.get_secret("api_key")
api_key = secret["value"] if secret else None

# For complex secrets (like HTTP with headers)
http_secret = config.get_secret("api_service")
if http_secret:
    token = http_secret.get("BEARER_TOKEN")
    headers = http_secret.get("EXTRA_HTTP_HEADERS", {})

# Get settings
project_name = config.get_setting("project")
debug_mode = config.get_setting("debug", default=False)

# Access the full configs
user_config = config.user_config
site_config = config.site_config
```
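Because `config.get_secret()` returns `None` when a secret is missing, endpoints that require one can fail fast with a clear message. The helper below is a hypothetical convenience, not part of `mxcp.runtime`:

```python
from mxcp.runtime import config

def require_secret(name: str, key: str = "value") -> str:
    """Return one field of a secret or raise a clear error.
    Hypothetical helper - not part of mxcp.runtime."""
    secret = config.get_secret(name)
    if not secret or key not in secret:
        raise RuntimeError(f"Secret '{name}' (field '{key}') is not configured")
    return secret[key]

# Usage:
# api_key = require_secret("api_key")
```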
## Lifecycle Hooks

```python
from mxcp.runtime import on_init, on_shutdown
import httpx

client = None

@on_init
def setup():
    """Initialize resources at startup"""
    global client
    client = httpx.Client()
    print("Client initialized")

@on_shutdown
def cleanup():
    """Clean up resources at shutdown"""
    global client
    if client:
        client.close()
```

**IMPORTANT:** Lifecycle hooks are for Python resources ONLY:

- ✅ USE FOR: HTTP clients, external API connections, ML model loading, cache clients
- ❌ DON'T USE FOR: database management, DuckDB connections, dbt operations

The DuckDB connection is managed automatically by MXCP. These hooks are for managing Python-specific resources that need initialization at server startup and cleanup at shutdown.
## Async Functions

```python
import asyncio
import aiohttp

async def fetch_data(urls: list[str]) -> list[dict]:
    """Fetch from multiple URLs concurrently"""
    async def fetch_one(url: str) -> dict:
        async with aiohttp.ClientSession() as session:
            async with session.get(url) as response:
                return await response.json()

    results = await asyncio.gather(*[fetch_one(url) for url in urls])
    return results
```
## Return Types

Match your function's return value to the endpoint's declared return type:

```python
# Array return
def list_items() -> list:
    return [{"id": 1}, {"id": 2}]

# Object return
def get_stats() -> dict:
    return {"total": 100, "active": 75}

# Scalar return
def count_items() -> int:
    return 42
```
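For example, the endpoint definition for `get_stats()` above would declare a matching `object` return. This is a minimal sketch following the tool YAML format shown later in this reference; the file path is illustrative and field support beyond `type` may vary:

```yaml
# tools/get_stats.yml
mxcp: 1
tool:
  name: get_stats
  description: "Return aggregate statistics"
  language: python
  return:
    type: object
  source:
    file: ../python/stats.py
```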
## Shared Modules

Organize code in subdirectories:

```python
# python/utils/validators.py
import re

def validate_email(email: str) -> bool:
    return bool(re.match(r'^[\w\.-]+@[\w\.-]+\.\w+$', email))
```

```python
# python/main_tool.py
from utils.validators import validate_email

def process_user(email: str) -> dict:
    if not validate_email(email):
        return {"error": "Invalid email"}
    return {"status": "ok"}
```
## Error Handling

```python
def safe_divide(a: float, b: float) -> dict:
    if b == 0:
        return {"error": "Division by zero"}
    return {"result": a / b}
```
## External API Integration Pattern

```python
import json

import httpx
from mxcp.runtime import config, db

async def call_external_api(param: str) -> dict:
    # Get the API key
    secret = config.get_secret("external_api")
    if not secret:
        return {"error": "Secret 'external_api' is not configured"}
    api_key = secret["value"]

    # Check the cache (db.execute returns a list of row dicts;
    # note the DuckDB interval syntax for the freshness window)
    cached = db.execute(
        "SELECT data FROM cache WHERE key = $key AND ts > now() - INTERVAL 1 HOUR",
        {"key": param}
    )
    if cached:
        return json.loads(cached[0]["data"])

    # Make the API call
    async with httpx.AsyncClient() as client:
        response = await client.get(
            "https://api.example.com/data",
            params={"q": param, "key": api_key}
        )
        data = response.json()

    # Cache the result (named parameters must match the $ placeholders)
    db.execute(
        "INSERT OR REPLACE INTO cache (key, data, ts) VALUES ($key, $data, CURRENT_TIMESTAMP)",
        {"key": param, "data": json.dumps(data)}
    )
    return data
```
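The pattern above assumes a `cache` table already exists. A minimal sketch of the DDL it expects (the column types are an assumption):

```python
from mxcp.runtime import db

def ensure_cache_table() -> dict:
    """One-time setup for the cache table used by call_external_api"""
    db.execute("""
        CREATE TABLE IF NOT EXISTS cache (
            key  TEXT PRIMARY KEY,
            data TEXT,       -- JSON-serialized payload
            ts   TIMESTAMP
        )
    """)
    return {"status": "ok"}
```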
## Database Reload (Advanced)

Use `reload_duckdb` only when external tools need exclusive database access:

```python
from mxcp.runtime import reload_duckdb

def rebuild_database() -> dict:
    """Trigger a database rebuild"""
    def rebuild():
        # Runs with exclusive database access
        import subprocess
        subprocess.run(["dbt", "run"], check=True)

    reload_duckdb(
        payload_func=rebuild,
        description="Rebuilding with dbt"
    )
    return {"status": "Reload scheduled"}
```

Note: normally you don't need this; use `db.execute()` for direct operations. See the detailed Database Reload section below.
## Wrapping External Libraries

### Pattern 1: Simple Library Wrapper

Use case: expose an existing Python library as an MCP tool.

```python
# python/library_wrapper.py
"""Wrapper for an existing library like requests, pandas, etc."""
import requests
from mxcp.runtime import config

def fetch_url(url: str, method: str = "GET", headers: dict = None) -> dict:
    """Wrap the requests library as an MCP tool"""
    try:
        # Get auth if needed
        secret = config.get_secret("api_token")
        if secret and headers is None:
            headers = {"Authorization": f"Bearer {secret['token']}"}

        response = requests.request(method, url, headers=headers, timeout=30)
        response.raise_for_status()
        is_json = response.headers.get('content-type', '').startswith('application/json')
        return {
            "status_code": response.status_code,
            "headers": dict(response.headers),
            "body": response.json() if is_json else response.text
        }
    except requests.RequestException as e:
        return {"error": str(e), "status": "failed"}
```

```yaml
# tools/http_request.yml
mxcp: 1
tool:
  name: http_request
  description: "Make HTTP requests using the requests library"
  language: python
  parameters:
    - name: url
      type: string
    - name: method
      type: string
      default: "GET"
  return:
    type: object
  source:
    file: ../python/library_wrapper.py
```
### Pattern 2: Data Science Library Wrapper (pandas, numpy)

```python
# python/data_analysis.py
"""Wrap pandas for data analysis"""
import pandas as pd
import numpy as np
from mxcp.runtime import db

def analyze_dataframe(table_name: str) -> dict:
    """Analyze a table using pandas"""
    # db.execute() returns a list of row dicts; build a DataFrame from it
    df = pd.DataFrame(db.execute(f"SELECT * FROM {table_name}"))

    # Pandas analysis
    analysis = {
        "shape": df.shape,
        "columns": list(df.columns),
        "dtypes": df.dtypes.astype(str).to_dict(),
        "missing_values": df.isnull().sum().to_dict(),
        "summary_stats": df.describe().to_dict(),
        "memory_usage": int(df.memory_usage(deep=True).sum())
    }

    # Numeric column correlations
    numeric_cols = df.select_dtypes(include=[np.number]).columns
    if len(numeric_cols) > 1:
        analysis["correlations"] = df[numeric_cols].corr().to_dict()
    return analysis

def pandas_query(table_name: str, operation: str) -> dict:
    """Execute pandas operations on a DuckDB table"""
    df = pd.DataFrame(db.execute(f"SELECT * FROM {table_name}"))

    # Support common pandas operations
    if operation == "describe":
        result = df.describe().to_dict()
    elif operation == "head":
        result = df.head(10).to_dict('records')
    elif operation == "value_counts":
        # For the first categorical column, if any
        cat_cols = df.select_dtypes(include=['object']).columns
        if len(cat_cols) == 0:
            return {"error": "No categorical columns found"}
        result = df[cat_cols[0]].value_counts().to_dict()
    else:
        return {"error": f"Unknown operation: {operation}"}
    return {"operation": operation, "result": result}
```
### Pattern 3: ML Library Wrapper (scikit-learn)

```python
# python/ml_wrapper.py
"""Wrap scikit-learn for ML tasks"""
import os
import pickle

import pandas as pd
from sklearn.ensemble import RandomForestClassifier
from sklearn.model_selection import train_test_split
from mxcp.runtime import db, on_init

# Global model store
models = {}

@on_init
def load_models():
    """Load saved models on startup"""
    global models
    model_dir = "models"
    if os.path.exists(model_dir):
        for file in os.listdir(model_dir):
            if file.endswith('.pkl'):
                model_name = file[:-4]
                with open(os.path.join(model_dir, file), 'rb') as f:
                    models[model_name] = pickle.load(f)

def train_classifier(
    table_name: str,
    target_column: str,
    feature_columns: list[str],
    model_name: str = "default"
) -> dict:
    """Train a classifier on a DuckDB table"""
    # Load data (db.execute returns a list of row dicts)
    df = pd.DataFrame(db.execute(f"SELECT * FROM {table_name}"))
    X = df[feature_columns]
    y = df[target_column]

    # Split data
    X_train, X_test, y_train, y_test = train_test_split(
        X, y, test_size=0.2, random_state=42
    )

    # Train model
    model = RandomForestClassifier(n_estimators=100, random_state=42)
    model.fit(X_train, y_train)

    # Evaluate
    train_score = model.score(X_train, y_train)
    test_score = model.score(X_test, y_test)

    # Save model
    models[model_name] = model
    os.makedirs("models", exist_ok=True)
    with open(f"models/{model_name}.pkl", 'wb') as f:
        pickle.dump(model, f)

    return {
        "model_name": model_name,
        "train_accuracy": train_score,
        "test_accuracy": test_score,
        "feature_importance": {
            col: float(imp)
            for col, imp in zip(feature_columns, model.feature_importances_)
        }
    }

def predict(model_name: str, features: dict) -> dict:
    """Make a prediction with a trained model"""
    if model_name not in models:
        return {"error": f"Model '{model_name}' not found"}
    model = models[model_name]

    # Convert features to a single-row DataFrame
    feature_df = pd.DataFrame([features])
    prediction = model.predict(feature_df)[0]
    probabilities = (
        model.predict_proba(feature_df)[0]
        if hasattr(model, 'predict_proba')
        else None
    )
    return {
        "prediction": prediction.item() if hasattr(prediction, 'item') else prediction,
        "probabilities": probabilities.tolist() if probabilities is not None else None
    }
```
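A usage sketch for the two functions above (the table, columns, and printed values are purely illustrative):

```python
# Train on a hypothetical "customers" table, then predict
train_classifier("customers", "churned", ["age", "tenure_months"], model_name="churn")

result = predict("churn", {"age": 42, "tenure_months": 18})
# e.g. {"prediction": 0, "probabilities": [0.83, 0.17]}
```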
### Pattern 4: API Client Library Wrapper

```python
# python/api_client_wrapper.py
"""Wrap an API client library (e.g., stripe, twilio, sendgrid)"""
import stripe
from mxcp.runtime import config, on_init

@on_init
def initialize_stripe():
    """Configure Stripe on startup"""
    secret = config.get_secret("stripe")
    if secret:
        stripe.api_key = secret["api_key"]

def create_customer(email: str, name: str) -> dict:
    """Wrap Stripe customer creation"""
    try:
        customer = stripe.Customer.create(
            email=email,
            name=name
        )
        return {
            "customer_id": customer.id,
            "email": customer.email,
            "name": customer.name,
            "created": customer.created
        }
    except stripe.error.StripeError as e:
        return {"error": str(e), "type": e.__class__.__name__}

def list_charges(customer_id: str = None, limit: int = 10) -> dict:
    """Wrap Stripe charges listing"""
    try:
        charges = stripe.Charge.list(
            customer=customer_id,
            limit=limit
        )
        return {
            "charges": [
                {
                    "id": charge.id,
                    "amount": charge.amount,
                    "currency": charge.currency,
                    "status": charge.status,
                    "created": charge.created
                }
                for charge in charges.data
            ]
        }
    except stripe.error.StripeError as e:
        return {"error": str(e)}
```
### Pattern 5: Async Library Wrapper

```python
# python/async_library_wrapper.py
"""Wrap async libraries like httpx, aiohttp"""
import asyncio

import httpx
from mxcp.runtime import config

async def batch_fetch(urls: list[str]) -> list[dict]:
    """Fetch multiple URLs concurrently"""
    async with httpx.AsyncClient(timeout=30.0) as client:
        async def fetch_one(url: str) -> dict:
            try:
                response = await client.get(url)
                is_json = response.headers.get('content-type', '').startswith('application/json')
                return {
                    "url": url,
                    "status": response.status_code,
                    "data": response.json() if is_json else response.text
                }
            except Exception as e:
                return {"url": url, "error": str(e)}

        return await asyncio.gather(*[fetch_one(url) for url in urls])

async def graphql_query(endpoint: str, query: str, variables: dict = None) -> dict:
    """Wrap a GraphQL client"""
    secret = config.get_secret("graphql_api")
    headers = {"Authorization": f"Bearer {secret['token']}"} if secret else {}

    async with httpx.AsyncClient() as client:
        response = await client.post(
            endpoint,
            json={"query": query, "variables": variables or {}},
            headers=headers
        )
        return response.json()
```
### Pattern 6: Complex Library with State Management

```python
# python/stateful_library_wrapper.py
"""Wrap libraries that maintain state (e.g., database connections, cache clients)"""
from redis import Redis
from mxcp.runtime import config, on_init, on_shutdown

redis_client = None

@on_init
def connect_redis():
    """Initialize the Redis connection on startup"""
    global redis_client
    secret = config.get_secret("redis")
    if secret:
        redis_client = Redis(
            host=secret["host"],
            port=secret.get("port", 6379),
            password=secret.get("password"),
            decode_responses=True
        )

@on_shutdown
def disconnect_redis():
    """Clean up the Redis connection"""
    global redis_client
    if redis_client:
        redis_client.close()

def cache_set(key: str, value: str, ttl: int = 3600) -> dict:
    """Set a value in the Redis cache"""
    if not redis_client:
        return {"error": "Redis not configured"}
    try:
        redis_client.setex(key, ttl, value)
        return {"status": "success", "key": key, "ttl": ttl}
    except Exception as e:
        return {"error": str(e)}

def cache_get(key: str) -> dict:
    """Get a value from the Redis cache"""
    if not redis_client:
        return {"error": "Redis not configured"}
    try:
        value = redis_client.get(key)
        return {"key": key, "value": value, "found": value is not None}
    except Exception as e:
        return {"error": str(e)}
```
## Dependency Management

### requirements.txt

Always include dependencies for wrapped libraries:

```text
# requirements.txt

# HTTP clients
requests>=2.31.0
httpx>=0.24.0
aiohttp>=3.8.0

# Data processing
pandas>=2.0.0
numpy>=1.24.0
openpyxl>=3.1.0          # For Excel support

# ML libraries
scikit-learn>=1.3.0

# API clients
stripe>=5.4.0
twilio>=8.0.0
sendgrid>=6.10.0

# Database/Cache
redis>=4.5.0
psycopg2-binary>=2.9.0   # For PostgreSQL

# Other common libraries
pillow>=10.0.0           # Image processing
beautifulsoup4>=4.12.0   # HTML parsing
lxml>=4.9.0              # XML parsing
```
### Installing Dependencies

```bash
# In the project directory
pip install -r requirements.txt

# Or install specific libraries
pip install pandas requests
```
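To pin exact versions once the project works (see the version-pinning best practice below), snapshot the environment:

```bash
# Record the exact versions currently installed
pip freeze > requirements.txt
```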
## Error Handling for Library Wrappers

Always handle library-specific exceptions:

```python
def safe_library_call(param: str) -> dict:
    """Template for safe library wrapping"""
    try:
        # Import the library (can fail if not installed)
        import some_library

        # Use the library
        result = some_library.do_something(param)
        return {"success": True, "result": result}
    except ImportError as e:
        return {
            "error": "Library not installed",
            "message": str(e),
            "fix": "Run: pip install some_library"
        }
    except some_library.SpecificError as e:
        return {
            "error": "Library-specific error",
            "message": str(e),
            "type": e.__class__.__name__
        }
    except Exception as e:
        return {
            "error": "Unexpected error",
            "message": str(e),
            "type": e.__class__.__name__
        }
```
## Database Reload (Advanced)

**Important:** In most cases, you DON'T need this feature. Use `db.execute()` directly for database operations.

The `reload_duckdb()` function allows Python endpoints to trigger a safe reload of the DuckDB database. This is only needed when external processes require exclusive access to the database file.

### When to Use

Use `reload_duckdb()` ONLY when:

- External tools need exclusive database access (e.g., running `dbt` as a subprocess)
- You're replacing the entire database file
- External processes cannot operate within the same Python process

### When NOT to Use

- ❌ Regular database operations (use `db.execute()` instead)
- ❌ Running dbt (use the dbt Python API directly in the same process, as sketched below)
- ❌ Loading data from APIs/files (use `db.execute()` to insert data)

DuckDB's concurrency model allows the MXCP process to own the connection while multiple threads operate safely. Only use `reload_duckdb()` if you absolutely must have an external process update the database file.
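For the dbt case, here is a minimal sketch of running dbt in-process with its programmatic API (available in dbt-core 1.5+), which avoids the exclusive-access problem entirely:

```python
# Requires dbt-core >= 1.5. Running dbt inside the MXCP process
# means no exclusive file access (and no reload_duckdb) is needed.
from dbt.cli.main import dbtRunner

def run_dbt_models() -> dict:
    result = dbtRunner().invoke(["run"])
    return {"success": result.success}
```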
### API

```python
from mxcp.runtime import reload_duckdb

def update_data_endpoint() -> dict:
    """Endpoint that triggers a data refresh"""
    def rebuild_database():
        """
        This function runs with all connections closed.
        You have exclusive access to the DuckDB file.
        """
        # Example: run an external tool
        import subprocess
        subprocess.run(["dbt", "run", "--target", "prod"], check=True)

        # Or: replace the file with a pre-built database
        # import shutil
        # shutil.copy("/staging/analytics.duckdb", "/app/data/analytics.duckdb")

        # Or: load fresh data directly
        # import duckdb
        # import pandas as pd
        # df = pd.read_parquet("s3://bucket/latest-data.parquet")
        # conn = duckdb.connect("/app/data/analytics.duckdb")
        # conn.execute("CREATE OR REPLACE TABLE sales AS SELECT * FROM df")
        # conn.close()

    # Schedule the reload (happens asynchronously)
    reload_duckdb(
        payload_func=rebuild_database,
        description="Updating analytics data"
    )

    # Return immediately - the reload happens in the background
    return {
        "status": "scheduled",
        "message": "Data refresh will complete in background"
    }
```
### How It Works

When you call `reload_duckdb()`:

1. **Queues the reload** - the function returns immediately to the client
2. **Drains active requests** - existing requests complete normally
3. **Shuts down the runtime** - closes Python hooks and DuckDB connections
4. **Runs your payload** - with all connections closed and exclusive access
5. **Restarts the runtime** - with fresh configuration and connections
6. **Processes waiting requests** - against the updated data
### Real-World Example

```python
from datetime import datetime

import requests
from mxcp.runtime import reload_duckdb

def scheduled_update(source: str = "api") -> dict:
    """Endpoint called by cron to update data"""
    def rebuild_from_api():
        # Fetch data from the external API
        response = requests.get("https://api.example.com/analytics/export")
        data = response.json()

        # Write to DuckDB (exclusive access guaranteed)
        import duckdb
        import pandas as pd
        conn = duckdb.connect("/app/data/analytics.duckdb")

        # Replace the old data with the fresh records
        df = pd.DataFrame(data)
        conn.execute("DROP TABLE IF EXISTS daily_metrics")
        conn.execute("CREATE TABLE daily_metrics AS SELECT * FROM df")

        # Update metadata
        conn.execute("""
            INSERT INTO update_log (timestamp, source, record_count)
            VALUES (?, ?, ?)
        """, [datetime.now(), source, len(data)])
        conn.close()

    reload_duckdb(
        payload_func=rebuild_from_api,
        description=f"Scheduled update from {source}"
    )
    return {
        "status": "scheduled",
        "source": source,
        "timestamp": datetime.now().isoformat()
    }
```
### Best Practices

- **Avoid when possible** - prefer direct `db.execute()` operations
- **Return immediately** - don't wait for the reload in your endpoint
- **Handle errors in the payload** - wrap payload logic in try/except (see the sketch below)
- **Keep the payload fast** - long-running payloads block new requests
- **Document the behavior** - let users know the data refresh is asynchronous
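A sketch of the error-handling practice: because the payload runs after the endpoint has already responded, failures should be caught and logged rather than raised (the dbt command is just an example payload):

```python
import logging
import subprocess

logger = logging.getLogger(__name__)

def rebuild_with_error_handling():
    """Payload that records failures instead of raising"""
    try:
        subprocess.run(["dbt", "run"], check=True)
    except Exception:
        # The client already got its response, so log for operators
        logger.exception("Database rebuild failed")
```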
## Plugin System

MXCP supports a plugin system for extending DuckDB with custom Python functions.

### Accessing Plugins

```python
from mxcp.runtime import plugins

# Get a specific plugin
my_plugin = plugins.get("my_custom_plugin")
if my_plugin:
    result = my_plugin.some_method()

# List available plugins
available_plugins = plugins.list()
print(f"Available plugins: {available_plugins}")
```
### Example Usage

```python
from mxcp.runtime import plugins

def use_custom_function(data: str) -> dict:
    """Use a custom DuckDB function from a plugin"""
    # Get the plugin
    text_plugin = plugins.get("text_processing")
    if not text_plugin:
        return {"error": "text_processing plugin not available"}

    # Use plugin functionality
    result = text_plugin.normalize_text(data)
    return {"normalized": result}
```
### Plugin Definition

Plugins are defined in the `plugins/` directory:

```python
# plugins/my_plugin.py
def custom_transform(value: str) -> str:
    """Custom transformation logic"""
    return value.upper()

# Register with DuckDB if needed
def register_functions(conn):
    """Register custom functions with DuckDB"""
    conn.create_function("custom_upper", custom_transform)
```
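Once registered, the function can be called from SQL like any built-in. A sketch, assuming the plugin above has been loaded:

```python
from mxcp.runtime import db

def shout(value: str) -> dict:
    """Call the plugin-registered custom_upper function from SQL"""
    rows = db.execute("SELECT custom_upper($v) AS result", {"v": value})
    return {"result": rows[0]["result"] if rows else None}
```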
See the official MXCP documentation for the complete plugin development guide.
## Best Practices for Library Wrapping

- **Initialize once**: use `@on_init` for expensive setup (connections, model loading)
- **Clean up**: use `@on_shutdown` to release resources (HTTP clients, NOT the database)
- **Handle errors**: catch library-specific exceptions and return error dicts
- **Document dependencies**: list them in requirements.txt with versions
- **Type hints**: add them for better IDE support and documentation
- **Async when appropriate**: use async for I/O-bound library operations
- **State management**: use global variables plus lifecycle hooks for stateful clients
- **Version pin**: pin library versions to avoid breaking changes
- **Timeout handling**: add timeouts for network operations
- **Return simple types**: convert library-specific objects to dicts/lists
## General Best Practices

- **Database Access**: always use `db.execute()`, never cache connections
- **Error Handling**: return error dicts instead of raising exceptions
- **Type Hints**: use them for better IDE support
- **Logging**: use standard Python logging (see the sketch below)
- **Resource Management**: use context managers
- **Async**: use it for I/O-bound operations
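A minimal sketch of the logging practice inside an endpoint (the endpoint name and logic are illustrative):

```python
import logging

logger = logging.getLogger(__name__)

def process_order(order_id: str) -> dict:
    logger.info("Processing order %s", order_id)
    try:
        ...  # business logic goes here
        return {"status": "ok", "order_id": order_id}
    except Exception as e:
        logger.exception("Failed to process order %s", order_id)
        return {"error": str(e)}
```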